# Введение в MapReduce модель на Python


In [None]:
from typing import NamedTuple # requires python 3.6+
from typing import Iterator

In [None]:
def MAP(_, row:NamedTuple):
  if (row.gender == 'female'):
    yield (row.age, row)

def REDUCE(age:str, rows:Iterator[NamedTuple]):
  sum = 0
  count = 0
  for row in rows:
    sum += row.social_contacts
    count += 1
  if (count > 0):
    yield (age, sum/count)
  else:
    yield (age, 0)

Модель элемента данных

In [None]:
class User(NamedTuple):
  id: int
  age: str
  social_contacts: int
  gender: str

In [None]:
input_collection = [
    User(id=0, age=55, gender='male', social_contacts=20),
    User(id=1, age=25, gender='female', social_contacts=240),
    User(id=2, age=25, gender='female', social_contacts=500),
    User(id=3, age=33, gender='female', social_contacts=800)
]

Функция RECORDREADER моделирует чтение элементов с диска или по сети.

In [None]:
def RECORDREADER():
  return [(u.id, u) for u in input_collection]

In [None]:
list(RECORDREADER())

[(0, User(id=0, age=55, social_contacts=20, gender='male')),
 (1, User(id=1, age=25, social_contacts=240, gender='female')),
 (2, User(id=2, age=25, social_contacts=500, gender='female')),
 (3, User(id=3, age=33, social_contacts=800, gender='female'))]

In [None]:
def flatten(nested_iterable):
  for iterable in nested_iterable:
    for element in iterable:
      yield element

In [None]:
map_output = flatten(map(lambda x: MAP(*x), RECORDREADER()))
map_output = list(map_output) # materialize
map_output

[(25, User(id=1, age=25, social_contacts=240, gender='female')),
 (25, User(id=2, age=25, social_contacts=500, gender='female')),
 (33, User(id=3, age=33, social_contacts=800, gender='female'))]

In [None]:
def groupbykey(iterable):
  t = {}
  for (k2, v2) in iterable:
    t[k2] = t.get(k2, []) + [v2]
  return t.items()

In [None]:
shuffle_output = groupbykey(map_output)
shuffle_output = list(shuffle_output)
shuffle_output

[(25,
  [User(id=1, age=25, social_contacts=240, gender='female'),
   User(id=2, age=25, social_contacts=500, gender='female')]),
 (33, [User(id=3, age=33, social_contacts=800, gender='female')])]

In [None]:
reduce_output = flatten(map(lambda x: REDUCE(*x), shuffle_output))
reduce_output = list(reduce_output)
reduce_output

[(25, 370.0), (33, 800.0)]

Все действия одним конвейером!

In [None]:
list(flatten(map(lambda x: REDUCE(*x), groupbykey(flatten(map(lambda x: MAP(*x), RECORDREADER()))))))

[(25, 370.0), (33, 800.0)]

# **MapReduce**
Выделим общую для всех пользователей часть системы в отдельную функцию высшего порядка. Это наиболее простая модель MapReduce, без учёта распределённого хранения данных.

Пользователь для решения своей задачи реализует RECORDREADER, MAP, REDUCE.

In [None]:
def flatten(nested_iterable):
  for iterable in nested_iterable:
    for element in iterable:
      yield element

def groupbykey(iterable):
  t = {}
  for (k2, v2) in iterable:
    t[k2] = t.get(k2, []) + [v2]
  return t.items()

def MapReduce(RECORDREADER, MAP, REDUCE):
  return flatten(map(lambda x: REDUCE(*x), groupbykey(flatten(map(lambda x: MAP(*x), RECORDREADER())))))

## Спецификация MapReduce



```
f (k1, v1) -> (k2,v2)*
g (k2, v2*) -> (k3,v3)*

mapreduce ((k1,v1)*) -> (k3,v3)*
groupby ((k2,v2)*) -> (k2,v2*)*
flatten (e2**) -> e2*

mapreduce .map(f).flatten.groupby(k2).map(g).flatten
```




# Примеры

## SQL

In [None]:
from typing import NamedTuple # requires python 3.6+
from typing import Iterator

class User(NamedTuple):
  id: int
  age: str
  social_contacts: int
  gender: str

input_collection = [
    User(id=0, age=55, gender='male', social_contacts=20),
    User(id=1, age=25, gender='female', social_contacts=240),
    User(id=2, age=25, gender='female', social_contacts=500),
    User(id=3, age=33, gender='female', social_contacts=800)
]

def MAP(_, row:NamedTuple):
  if (row.gender == 'female'):
    yield (row.age, row)

def REDUCE(age:str, rows:Iterator[NamedTuple]):
  sum = 0
  count = 0
  for row in rows:
    sum += row.social_contacts
    count += 1
  if (count > 0):
    yield (age, sum/count)
  else:
    yield (age, 0)

def RECORDREADER():
  return [(u.id, u) for u in input_collection]

output = MapReduce(RECORDREADER, MAP, REDUCE)
output = list(output)
output

[(25, 370.0), (33, 800.0)]

## Matrix-Vector multiplication

In [None]:
from typing import Iterator
import numpy as np

mat = np.ones((5,4))
vec = np.random.rand(4) # in-memory vector in all map tasks

def MAP(coordinates:(int, int), value:int):
  i, j = coordinates
  yield (i, value*vec[j])

def REDUCE(i:int, products:Iterator[NamedTuple]):
  sum = 0
  for p in products:
    sum += p
  yield (i, sum)

def RECORDREADER():
  for i in range(mat.shape[0]):
    for j in range(mat.shape[1]):
      yield ((i, j), mat[i,j])

output = MapReduce(RECORDREADER, MAP, REDUCE)
output = list(output)
output

[(0, 2.1192202162349965),
 (1, 2.1192202162349965),
 (2, 2.1192202162349965),
 (3, 2.1192202162349965),
 (4, 2.1192202162349965)]

## Inverted index

In [None]:
from typing import Iterator

d1 = "it is what it is"
d2 = "what is it"
d3 = "it is a banana"
documents = [d1, d2, d3]

def RECORDREADER():
  for (docid, document) in enumerate(documents):
    yield ("{}".format(docid), document)

def MAP(docId:str, body:str):
  for word in set(body.split(' ')):
    yield (word, docId)

def REDUCE(word:str, docIds:Iterator[str]):
  yield (word, sorted(docIds))

output = MapReduce(RECORDREADER, MAP, REDUCE)
output = list(output)
output

[('it', ['0', '1', '2']),
 ('what', ['0', '1']),
 ('is', ['0', '1', '2']),
 ('a', ['2']),
 ('banana', ['2'])]

## WordCount

In [None]:
from typing import Iterator

d1 = """
it is what it is
it is what it is
it is what it is"""
d2 = """
what is it
what is it"""
d3 = """
it is a banana"""
documents = [d1, d2, d3]

def RECORDREADER():
  for (docid, document) in enumerate(documents):
    for (lineid, line) in enumerate(document.split('\n')):
      yield ("{}:{}".format(docid,lineid), line)

def MAP(docId:str, line:str):
  for word in line.split(" "):
    yield (word, 1)

def REDUCE(word:str, counts:Iterator[int]):
  sum = 0
  for c in counts:
    sum += c
  yield (word, sum)

output = MapReduce(RECORDREADER, MAP, REDUCE)
output = list(output)
output

[('', 3), ('it', 9), ('is', 9), ('what', 5), ('a', 1), ('banana', 1)]

# MapReduce Distributed

Добавляется в модель фабрика RECORDREARER-ов --- INPUTFORMAT, функция распределения промежуточных результатов по партициям PARTITIONER, и функция COMBINER для частичной аггрегации промежуточных результатов до распределения по новым партициям.

In [None]:
def flatten(nested_iterable):
  for iterable in nested_iterable:
    for element in iterable:
      yield element

def groupbykey(iterable):
  t = {}
  for (k2, v2) in iterable:
    t[k2] = t.get(k2, []) + [v2]
  return t.items()

def groupbykey_distributed(map_partitions, PARTITIONER):
  global reducers
  partitions = [dict() for _ in range(reducers)]
  for map_partition in map_partitions:
    for (k2, v2) in map_partition:
      p = partitions[PARTITIONER(k2)]
      p[k2] = p.get(k2, []) + [v2]
  return [(partition_id, sorted(partition.items(), key=lambda x: x[0])) for (partition_id, partition) in enumerate(partitions)]

def PARTITIONER(obj):
  global reducers
  return hash(obj) % reducers

def MapReduceDistributed(INPUTFORMAT, MAP, REDUCE, PARTITIONER=PARTITIONER, COMBINER=None):
  map_partitions = map(lambda record_reader: flatten(map(lambda k1v1: MAP(*k1v1), record_reader)), INPUTFORMAT())
  if COMBINER != None:
    map_partitions = map(lambda map_partition: flatten(map(lambda k2v2: COMBINER(*k2v2), groupbykey(map_partition))), map_partitions)
  reduce_partitions = groupbykey_distributed(map_partitions, PARTITIONER) # shuffle
  reduce_outputs = map(lambda reduce_partition: (reduce_partition[0], flatten(map(lambda reduce_input_group: REDUCE(*reduce_input_group), reduce_partition[1]))), reduce_partitions)

  print("{} key-value pairs were sent over a network.".format(sum([len(vs) for (k,vs) in flatten([partition for (partition_id, partition) in reduce_partitions])])))
  return reduce_outputs

## Спецификация MapReduce Distributed


```
f (k1, v1) -> (k2,v2)*
g (k2, v2*) -> (k3,v3)*

e1 (k1, v1)
e2 (k2, v2)
partition1 (k2, v2)*
partition2 (k2, v2*)*

flatmap (e1->e2*, e1*) -> partition1*
groupby (partition1*) -> partition2*

mapreduce ((k1,v1)*) -> (k3,v3)*
mapreduce .flatmap(f).groupby(k2).flatmap(g)
```



## WordCount

In [None]:
from typing import Iterator
import numpy as np

d1 = """
it is what it is
it is what it is
it is what it is"""
d2 = """
what is it
what is it"""
d3 = """
it is a banana"""
documents = [d1, d2, d3, d1, d2, d3]

maps = 3
reducers = 2

def INPUTFORMAT():
  global maps

  def RECORDREADER(split):
    for (docid, document) in enumerate(split):
      for (lineid, line) in enumerate(document.split('\n')):
        yield ("{}:{}".format(docid,lineid), line)

  split_size =  int(np.ceil(len(documents)/maps))
  for i in range(0, len(documents), split_size):
    yield RECORDREADER(documents[i:i+split_size])

def MAP(docId:str, line:str):
  for word in line.split(" "):
    yield (word, 1)

def REDUCE(word:str, counts:Iterator[int]):
  sum = 0
  for c in counts:
    sum += c
  yield (word, sum)

# try to set COMBINER=REDUCER and look at the number of values sent over the network
partitioned_output = MapReduceDistributed(INPUTFORMAT, MAP, REDUCE, COMBINER=None)
partitioned_output = [(partition_id, list(partition)) for (partition_id, partition) in partitioned_output]
partitioned_output

56 key-value pairs were sent over a network.


[(0, [('', 6), ('banana', 2), ('it', 18)]),
 (1, [('a', 2), ('is', 18), ('what', 10)])]

## TeraSort

In [None]:
import numpy as np

input_values = np.random.rand(30)
maps = 3
reducers = 2
min_value = 0.0
max_value = 1.0

def INPUTFORMAT():
  global maps

  def RECORDREADER(split):
    for value in split:
        yield (value, None)

  split_size =  int(np.ceil(len(input_values)/maps))
  for i in range(0, len(input_values), split_size):
    yield RECORDREADER(input_values[i:i+split_size])

def MAP(value:int, _):
  yield (value, None)

def PARTITIONER(key):
  global reducers
  global max_value
  global min_value
  bucket_size = (max_value-min_value)/reducers
  bucket_id = 0
  while((key>(bucket_id+1)*bucket_size) and ((bucket_id+1)*bucket_size<max_value)):
    bucket_id += 1
  return bucket_id

def REDUCE(value:int, _):
  yield (None,value)

partitioned_output = MapReduceDistributed(INPUTFORMAT, MAP, REDUCE, COMBINER=None, PARTITIONER=PARTITIONER)
partitioned_output = [(partition_id, list(partition)) for (partition_id, partition) in partitioned_output]
partitioned_output

30 key-value pairs were sent over a network.


[(0,
  [(None, 0.032860284966512165),
   (None, 0.07661220448388828),
   (None, 0.08858981274981514),
   (None, 0.09391596351787668),
   (None, 0.11051332590837382),
   (None, 0.1129398494626257),
   (None, 0.15918524595246009),
   (None, 0.20063783279613923),
   (None, 0.23079101426997117),
   (None, 0.23855394388915396),
   (None, 0.24362455523014204),
   (None, 0.2851774817461844),
   (None, 0.33879531744711133),
   (None, 0.3747200009927326),
   (None, 0.41407090297446225),
   (None, 0.4762066273280423),
   (None, 0.47884879549882686)]),
 (1,
  [(None, 0.5087602205159703),
   (None, 0.5486310799216224),
   (None, 0.6054204620998495),
   (None, 0.7009722508597502),
   (None, 0.7260582714390139),
   (None, 0.7448689241828621),
   (None, 0.7452774836351116),
   (None, 0.7528303140630986),
   (None, 0.7679000916491274),
   (None, 0.7820158019431032),
   (None, 0.8679969770770599),
   (None, 0.8690876453692609),
   (None, 0.9114578441602503)])]

# Упражнения
Упражнения взяты из Rajaraman A., Ullman J. D. Mining of massive datasets. – Cambridge University Press, 2011.


Для выполнения заданий переопределите функции RECORDREADER, MAP, REDUCE. Для модели распределённой системы может потребоваться переопределение функций PARTITION и COMBINER.

### Максимальное значение ряда

Разработайте MapReduce алгоритм, который находит максимальное число входного списка чисел.

In [None]:
from functools import reduce
import random

# Функция generate для генерации случайных чисел в заданном диапазоне
def generate(count):
    return [random.randint(0, 100) for _ in range(count)]

# Функция map - передаёт каждый элемент как отдельное значение
def mapper(num):
    return num

# Функция reduce - находит максимальное значение среди всех переданных значений
def reducer(a, b):
    return max(a, b)

# Входной список чисел
numbers = generate(15)
print(numbers)

# Применение map и reduce
mapped_values = map(mapper, numbers)
max_value = reduce(reducer, mapped_values)

print("Максимальное значение:", max_value)

[53, 97, 16, 76, 99, 30, 57, 26, 15, 29, 82, 51, 17, 84, 11]
Максимальное значение: 99


### Арифметическое среднее

Разработайте MapReduce алгоритм, который находит арифметическое среднее.

$$\overline{X} = \frac{1}{n}\sum_{i=0}^{n} x_i$$


In [None]:
from typing import List, Tuple, Iterator

# Функция для отображения числа как пары (ключ, значение)
def map_step(number: int) -> Tuple[int, int]:
    return (0, number)  # Используем фиксированный ключ 0

# Функция для вычисления среднего значения из пар (ключ, значение)
def reduce_step(key: int, pairs: Iterator[Tuple[int, int]]) -> float:
    total_sum = 0
    total_count = 0
    for _, value in pairs:
        total_sum += value
        total_count += 1
    if total_count > 0:
        return total_sum / total_count
    else:
        return 0

# Генерация случайного списка чисел
def generate(size: int) -> List[int]:
    return [random.randint(0, 100) for _ in range(size)]

# Генерация входных данных
input_data = generate(100)
print("Input data:", input_data)

# Преобразование данных в пары (ключ, значение)
mapped_data = list(map(map_step, input_data))
print("Mapped data:", mapped_data)

# Вычисление среднего значения
average_result = reduce_step(0, mapped_data)  # Группируем по ключу 0

print("Arithmetic mean:", average_result)

Input data: [32, 58, 2, 68, 29, 29, 35, 97, 34, 8, 44, 0, 4, 10, 15, 49, 91, 35, 76, 44, 9, 99, 91, 35, 46, 90, 89, 41, 66, 24, 91, 93, 37, 58, 2, 25, 96, 91, 77, 23, 42, 8, 19, 33, 39, 41, 3, 22, 59, 5, 14, 71, 66, 63, 48, 14, 61, 8, 5, 19, 16, 2, 3, 45, 62, 49, 44, 10, 48, 29, 18, 44, 55, 13, 30, 97, 55, 32, 84, 68, 33, 22, 53, 68, 72, 100, 42, 30, 77, 20, 91, 31, 8, 4, 91, 57, 43, 21, 39, 21]
Mapped data: [(0, 32), (0, 58), (0, 2), (0, 68), (0, 29), (0, 29), (0, 35), (0, 97), (0, 34), (0, 8), (0, 44), (0, 0), (0, 4), (0, 10), (0, 15), (0, 49), (0, 91), (0, 35), (0, 76), (0, 44), (0, 9), (0, 99), (0, 91), (0, 35), (0, 46), (0, 90), (0, 89), (0, 41), (0, 66), (0, 24), (0, 91), (0, 93), (0, 37), (0, 58), (0, 2), (0, 25), (0, 96), (0, 91), (0, 77), (0, 23), (0, 42), (0, 8), (0, 19), (0, 33), (0, 39), (0, 41), (0, 3), (0, 22), (0, 59), (0, 5), (0, 14), (0, 71), (0, 66), (0, 63), (0, 48), (0, 14), (0, 61), (0, 8), (0, 5), (0, 19), (0, 16), (0, 2), (0, 3), (0, 45), (0, 62), (0, 49), (0, 44

### GroupByKey на основе сортировки

Реализуйте groupByKey на основе сортировки, проверьте его работу на примерах

In [None]:
import random
from typing import List, Tuple, Iterator

# Функция для создания пары (ключ, значение)
def map_step(number: int) -> Tuple[int, int]:
    return (0, number)  # Фиксированный ключ 0

# Функция для вычисления среднего значения по ключу
def reduce_step(key: int, values: Iterator[int]) -> List[float]:
    total_sum = sum(values)
    total_count = len(values)
    if total_count > 0:
        return [total_sum / total_count]  # Возвращаем в виде списка
    else:
        return [0]

# Функция для генерации случайного списка чисел
def generate_numbers(size: int) -> List[int]:
    return [random.randint(0, 100) for _ in range(size)]

# Функция для группировки значений по ключу с использованием сортировки
def group_by_key(data: List[Tuple[int, int]]) -> List[Tuple[int, List[int]]]:
    data.sort(key=lambda x: x[0])  # Сортировка по ключу
    grouped = {}
    for key, value in data:
        if key not in grouped:
            grouped[key] = []
        grouped[key].append(value)
    return list(grouped.items())

# Функция для развёртки вложенных списков
def flatten(nested_list: List[List]) -> Iterator:
    for sublist in nested_list:
        for item in sublist:
            yield item

# Генерация данных
input_data = generate_numbers(100)
print("Input data:", input_data)

# Применение Map: преобразование чисел в пары (ключ, значение)
mapped_data = list(map(map_step, input_data))
print("Mapped data:", mapped_data)

# Применение Shuffle: группировка по ключу на основе сортировки
shuffled_data = group_by_key(mapped_data)
print("Shuffled data:", shuffled_data)

# Применение Reduce: вычисление среднего по ключу
reduced_data = list(flatten([reduce_step(key, values) for key, values in shuffled_data]))
print("Reduced data (arithmetic mean):", reduced_data)

Input data: [74, 19, 79, 17, 93, 79, 46, 60, 21, 61, 52, 72, 50, 1, 57, 66, 64, 22, 94, 53, 93, 18, 59, 98, 21, 90, 59, 35, 78, 89, 52, 3, 64, 27, 89, 70, 28, 86, 32, 81, 99, 31, 45, 30, 82, 56, 65, 23, 83, 52, 40, 43, 9, 38, 3, 19, 11, 14, 74, 21, 10, 62, 90, 51, 48, 34, 80, 68, 34, 2, 97, 90, 52, 95, 25, 17, 49, 68, 27, 37, 34, 32, 25, 78, 24, 31, 12, 69, 31, 2, 76, 44, 74, 6, 85, 75, 68, 96, 48, 0]
Mapped data: [(0, 74), (0, 19), (0, 79), (0, 17), (0, 93), (0, 79), (0, 46), (0, 60), (0, 21), (0, 61), (0, 52), (0, 72), (0, 50), (0, 1), (0, 57), (0, 66), (0, 64), (0, 22), (0, 94), (0, 53), (0, 93), (0, 18), (0, 59), (0, 98), (0, 21), (0, 90), (0, 59), (0, 35), (0, 78), (0, 89), (0, 52), (0, 3), (0, 64), (0, 27), (0, 89), (0, 70), (0, 28), (0, 86), (0, 32), (0, 81), (0, 99), (0, 31), (0, 45), (0, 30), (0, 82), (0, 56), (0, 65), (0, 23), (0, 83), (0, 52), (0, 40), (0, 43), (0, 9), (0, 38), (0, 3), (0, 19), (0, 11), (0, 14), (0, 74), (0, 21), (0, 10), (0, 62), (0, 90), (0, 51), (0, 48), 

### Drop duplicates (set construction, unique elements, distinct)

Реализуйте распределённую операцию исключения дубликатов

In [None]:
import numpy as np
from typing import Iterator, List, Tuple

# Функция для выравнивания вложенных структур данных
def flatten(nested_iterable):
    for iterable in nested_iterable:
        for element in iterable:
            yield element

# Группировка данных по ключу
def group_by_key(pairs: List[Tuple[str, str]]):
    grouped = {}
    for key, value in pairs:
        grouped.setdefault(key, []).append(value)
    return list(grouped.items())

# Распределённая группировка данных по ключу с использованием партиционера
def group_by_key_distributed(map_partitions, partitioner):
    global reducers
    partitions = [dict() for _ in range(reducers)]
    for map_partition in map_partitions:
        for key, value in map_partition:
            partition = partitions[partitioner(key)]
            partition.setdefault(key, []).append(value)
    return [(pid, sorted(partition.items(), key=lambda x: x[0])) for pid, partition in enumerate(partitions)]

# Функция для вычисления партиции по ключу
def partitioner(key):
    global reducers
    return hash(key) % reducers

# Функция для выполнения операций MapReduce
def MapReduceDistributed(INPUTFORMAT, MAP, REDUCE, PARTITIONER=partitioner, COMBINER=None):
    # Применение функции MAP и выравнивание результатов
    map_partitions = map(lambda reader: flatten(map(lambda k1v1: MAP(*k1v1), reader)), INPUTFORMAT())

    # Если есть COMBINER, применяем его перед Shuffle
    if COMBINER:
        map_partitions = map(lambda partition: flatten(map(lambda k2v2: COMBINER(*k2v2), group_by_key(partition))), map_partitions)

    # Shuffle: распределённая группировка данных по ключу
    reduce_partitions = group_by_key_distributed(map_partitions, PARTITIONER)

    # Reduce: агрегация данных
    reduce_outputs = map(lambda partition: (partition[0], flatten(map(lambda group: REDUCE(*group), partition[1]))), reduce_partitions)

    # Подсчёт общего количества отправленных пар ключ-значение
    total_values_sent = sum([len(values) for (key, values) in flatten([part for (_, part) in reduce_partitions])])
    print(f"{total_values_sent} key-value pairs were sent over a network.")

    return reduce_outputs

# Пример входных данных
documents = [
    "cat dog cat",
    "dog cat mouse",
    "dog mouse mouse",
    "cat cat cat",
    "mouse dog cat"
]

maps = 3
reducers = 2

# Функция для подготовки входных данных
def INPUTFORMAT():
    global maps
    split_size = int(np.ceil(len(documents) / maps))

    def RECORDREADER(split):
        for doc_id, document in enumerate(split):
            for line_id, line in enumerate(document.split('\n')):
                yield (f"{doc_id}:{line_id}", line)

    for i in range(0, len(documents), split_size):
        yield RECORDREADER(documents[i:i + split_size])

# Функция MAP для преобразования данных в пары (слово, слово)
def MAP(doc_id: str, line: str):
    for word in line.split():
        yield (word, word)

# Функция REDUCE для исключения дубликатов
def REDUCE(key: str, values: Iterator[str]):
    yield key  # Оставляем только уникальные ключи

# Выполнение распределённой операции MapReduce для исключения дубликатов
partitioned_output = MapReduceDistributed(INPUTFORMAT, MAP, REDUCE, COMBINER=None)
result = [(pid, list(partition)) for pid, partition in partitioned_output]

# Вывод результата
print("Unique elements:", result)

15 key-value pairs were sent over a network.
Unique elements: [(0, []), (1, ['cat', 'dog', 'mouse'])]


#Операторы реляционной алгебры
### Selection (Выборка)

**The Map Function**: Для  каждого кортежа $t \in R$ вычисляется истинность предиката $C$. В случае истины создаётся пара ключ-значение $(t, t)$. В паре ключ и значение одинаковы, равны $t$.

**The Reduce Function:** Роль функции Reduce выполняет функция идентичности, которая возвращает то же значение, что получила на вход.



In [None]:
import random
from typing import List, Tuple, Iterator
from collections import defaultdict

# Функция для MAP операции: выборка кортежей, удовлетворяющих предикату C
def MAP(el_list: List[Tuple[int, int, int]]) -> Iterator[Tuple[Tuple[int, int, int], Tuple[int, int, int]]]:
    for t in el_list:
        if C(t):  # Проверка предиката C
            yield (t, t)  # Ключ и значение одинаковы, равны t

# Функция REDUCE: идентичность, возвращает то же значение, что получила на вход
def REDUCE(mapped_items: List[Tuple[Tuple[int, int, int], List[Tuple[int, int, int]]]]) -> List[Tuple[int, int, int]]:
    reduced_result = []
    for key, values in mapped_items:
        reduced_result.extend(values)  # Возвращаем значения без изменений
    return reduced_result

# Функция для группировки данных по ключу
def group_by_key(mapped_data: List[Tuple[Tuple[int, int, int], Tuple[int, int, int]]]):
    grouped = defaultdict(list)
    for key, value in mapped_data:
        grouped[key].append(value)
    return list(grouped.items())

# Предикат для фильтрации кортежей с чётным первым элементом
def C(t: Tuple[int, int, int]) -> bool:
    return t[0] % 2 == 0

# Генерация случайных записей (кортежей формата (x, y, z))
def RECORDREADER(count: int) -> List[Tuple[int, int, int]]:
    return [(random.randint(0, 100), random.randint(0, 100), random.randint(0, 100)) for _ in range(count)]

# Разбиение записей на равные части
def partition_records(record: List[Tuple[int, int, int]], num_partitions: int) -> List[List[Tuple[int, int, int]]]:
    partition_size = int(len(record) / num_partitions)
    return [record[i:i + partition_size] for i in range(0, len(record), partition_size)]

# Основная функция для выполнения операций MAP и REDUCE
def MapReduce(record: List[Tuple[int, int, int]], num_partitions: int):
    # Разделяем записи на части (разделы)
    partitions = partition_records(record, num_partitions)
    print("Partitions:", partitions)

    # Применяем MAP операцию к каждому разделу
    mapped_data = []
    for partition in partitions:
        mapped_data.extend(MAP(partition))
    print("Mapped Data:", mapped_data)

    # Выполняем группировку данных по ключу
    grouped_data = group_by_key(mapped_data)
    print("Grouped Data:", grouped_data)

    # Применяем REDUCE операцию к сгруппированным данным
    reduced_result = REDUCE(grouped_data)
    return reduced_result

# Генерация набора записей
record = RECORDREADER(10)
print("Input Records:", record)

# Количество разделов (партиций)
num_partitions = 3

# Выполнение операции MapReduce (Selection)
result = MapReduce(record, num_partitions)
print("Selection Result:", result)


Input Records: [(36, 76, 51), (92, 71, 7), (91, 85, 57), (69, 60, 65), (34, 64, 91), (85, 37, 0), (57, 18, 47), (65, 59, 83), (52, 16, 31), (30, 53, 95)]
Partitions: [[(36, 76, 51), (92, 71, 7), (91, 85, 57)], [(69, 60, 65), (34, 64, 91), (85, 37, 0)], [(57, 18, 47), (65, 59, 83), (52, 16, 31)], [(30, 53, 95)]]
Mapped Data: [((36, 76, 51), (36, 76, 51)), ((92, 71, 7), (92, 71, 7)), ((34, 64, 91), (34, 64, 91)), ((52, 16, 31), (52, 16, 31)), ((30, 53, 95), (30, 53, 95))]
Grouped Data: [((36, 76, 51), [(36, 76, 51)]), ((92, 71, 7), [(92, 71, 7)]), ((34, 64, 91), [(34, 64, 91)]), ((52, 16, 31), [(52, 16, 31)]), ((30, 53, 95), [(30, 53, 95)])]
Selection Result: [(36, 76, 51), (92, 71, 7), (34, 64, 91), (52, 16, 31), (30, 53, 95)]


### Projection (Проекция)

Проекция на множество атрибутов $S$.

**The Map Function:** Для каждого кортежа $t \in R$ создайте кортеж $t′$, исключая  из $t$ те значения, атрибуты которых не принадлежат  $S$. Верните пару $(t′, t′)$.

**The Reduce Function:** Для каждого ключа $t′$, созданного любой Map задачей, вы получаете одну или несколько пар $(t′, t′)$. Reduce функция преобразует $(t′, [t′, t′, . . . , t′])$ в $(t′, t′)$, так, что для ключа $t′$ возвращается одна пара  $(t′, t′)$.

In [None]:
import random
from collections import defaultdict
from typing import List, Tuple, Iterator

# Множество атрибутов S для проекции
S = {25, 90, 69}

# Функция MAP: создаёт пары (t', t'), где t' содержит только элементы из S
def MAP(t: Tuple[int, int, int]) -> Tuple[Tuple[int, ...], Tuple[int, ...]]:
    t_prime = tuple(el for el in t if el in S)
    return (t_prime, t_prime)

# Функция REDUCE: возвращает одну пару (t', t') для каждого ключа t'
def REDUCE(key: Tuple[int, ...], values: List[Tuple[int, ...]]) -> Tuple[Tuple[int, ...], Tuple[int, ...]]:
    return (key, key)

# Функция для группировки данных по ключу
def group_by_key(mapped_data: List[Tuple[Tuple[int, ...], Tuple[int, ...]]]):
    grouped = defaultdict(list)
    for key, value in mapped_data:
        grouped[key].append(value)
    return list(grouped.items())

# Генерация случайных записей (кортежей формата (x, y, z))
def RECORDREADER(count: int) -> List[Tuple[int, int, int]]:
    return [(random.randint(0, 100), random.randint(0, 100), random.randint(0, 100)) for _ in range(count)]

# Основная функция для выполнения операций MAP и REDUCE
def MapReduce(record: List[Tuple[int, int, int]]):
    # Применяем MAP операцию к записям
    mapped_data = [MAP(t) for t in record]
    print("MAP output:", mapped_data)

    # Выполняем группировку данных по ключу
    grouped_data = group_by_key(mapped_data)
    print("Grouped Data:", grouped_data)

    # Применяем REDUCE операцию к сгруппированным данным
    reduced_result = [REDUCE(key, values) for key, values in grouped_data]
    return reduced_result

# Генерация набора записей
record = RECORDREADER(10)
print("Input Records:", record)

# Выполнение операции MapReduce (Projection)
result = MapReduce(record)
print("Projection Result:", result)


Input Records: [(65, 54, 74), (50, 90, 28), (65, 35, 72), (24, 44, 92), (40, 12, 14), (41, 68, 15), (54, 26, 71), (87, 12, 66), (24, 58, 74), (11, 13, 29)]
MAP output: [((), ()), ((90,), (90,)), ((), ()), ((), ()), ((), ()), ((), ()), ((), ()), ((), ()), ((), ()), ((), ())]
Grouped Data: [((), [(), (), (), (), (), (), (), (), ()]), ((90,), [(90,)])]
Projection Result: [((), ()), ((90,), (90,))]


### Union (Объединение)

**The Map Function:** Превратите каждый входной кортеж $t$ в пару ключ-значение $(t, t)$.

**The Reduce Function:** С каждым ключом $t$ будет ассоциировано одно или два значений. В обоих случаях создайте $(t, t)$ в качестве выходного значения.

In [None]:
import random
from collections import defaultdict
from typing import List, Tuple

# Функция MAP: преобразует каждый кортеж t в пару (t, t)
def MAP(t: Tuple[int, int, int]) -> Tuple[Tuple[int, int, int], Tuple[int, int, int]]:
    return (t, t)

# Функция REDUCE: возвращает одну пару (t, t) для каждого ключа t
def REDUCE(key: Tuple[int, int, int], values: List[Tuple[int, int, int]]) -> Tuple[Tuple[int, int, int], Tuple[int, int, int]]:
    return (key, key)

# Функция для группировки данных по ключу
def group_by_key(mapped_data: List[Tuple[Tuple[int, int, int], Tuple[int, int, int]]]):
    grouped = defaultdict(list)
    for key, value in mapped_data:
        grouped[key].append(value)
    return list(grouped.items())

# Генерация случайных записей (кортежей формата (x, y, z))
def RECORDREADER(count: int) -> List[Tuple[int, int, int]]:
    return [(random.randint(0, 100), random.randint(0, 100), random.randint(0, 100)) for _ in range(count)]

# Основная функция для выполнения операций MAP и REDUCE
def MapReduce(record1: List[Tuple[int, int, int]], record2: List[Tuple[int, int, int]]):
    # Объединяем оба набора записей
    combined_record = record1 + record2
    print("Combined Records:", combined_record)

    # Применяем MAP операцию к объединённым записям
    mapped_data = [MAP(t) for t in combined_record]
    print("MAP output:", mapped_data)

    # Выполняем группировку данных по ключу
    grouped_data = group_by_key(mapped_data)
    print("Grouped Data:", grouped_data)

    # Применяем REDUCE операцию к сгруппированным данным
    reduced_result = [REDUCE(key, values) for key, values in grouped_data]
    return reduced_result

# Генерация двух наборов записей (пример объединения двух множеств)
record1 = RECORDREADER(5)
record2 = RECORDREADER(5)
print("Input Records Set 1:", record1)
print("Input Records Set 2:", record2)

# Выполнение операции MapReduce (Union)
result = MapReduce(record1, record2)
print("Union Result:", result)

Input Records Set 1: [(99, 23, 83), (53, 11, 2), (88, 54, 87), (89, 35, 68), (92, 21, 85)]
Input Records Set 2: [(65, 85, 24), (87, 44, 56), (24, 69, 6), (12, 69, 72), (85, 49, 50)]
Combined Records: [(99, 23, 83), (53, 11, 2), (88, 54, 87), (89, 35, 68), (92, 21, 85), (65, 85, 24), (87, 44, 56), (24, 69, 6), (12, 69, 72), (85, 49, 50)]
MAP output: [((99, 23, 83), (99, 23, 83)), ((53, 11, 2), (53, 11, 2)), ((88, 54, 87), (88, 54, 87)), ((89, 35, 68), (89, 35, 68)), ((92, 21, 85), (92, 21, 85)), ((65, 85, 24), (65, 85, 24)), ((87, 44, 56), (87, 44, 56)), ((24, 69, 6), (24, 69, 6)), ((12, 69, 72), (12, 69, 72)), ((85, 49, 50), (85, 49, 50))]
Grouped Data: [((99, 23, 83), [(99, 23, 83)]), ((53, 11, 2), [(53, 11, 2)]), ((88, 54, 87), [(88, 54, 87)]), ((89, 35, 68), [(89, 35, 68)]), ((92, 21, 85), [(92, 21, 85)]), ((65, 85, 24), [(65, 85, 24)]), ((87, 44, 56), [(87, 44, 56)]), ((24, 69, 6), [(24, 69, 6)]), ((12, 69, 72), [(12, 69, 72)]), ((85, 49, 50), [(85, 49, 50)])]
Union Result: [((99, 

### Intersection (Пересечение)

**The Map Function:** Превратите каждый кортеж $t$ в пары ключ-значение $(t, t)$.

**The Reduce Function:** Если для ключа $t$ есть список из двух элементов $[t, t]$ $-$ создайте пару $(t, t)$. Иначе, ничего не создавайте.

In [None]:
import random
from collections import defaultdict
from typing import List, Tuple

# Функция MAP: преобразует каждый кортеж t в пару (t, t)
def MAP(t: Tuple[int, int]) -> Tuple[Tuple[int, int], Tuple[int, int]]:
    return (t, t)

# Функция REDUCE: возвращает пару (t, t), если ключ присутствует дважды (в обоих множествах)
def REDUCE(key: Tuple[int, int], values: List[Tuple[int, int]]) -> Tuple[Tuple[int, int], Tuple[int, int]]:
    if len(values) == 2:  # Если ключ присутствует дважды (в обоих множествах)
        return (key, key)  # Возвращает пару (t, t)
    return None

# Функция для группировки данных по ключу
def group_by_key(mapped_data: List[Tuple[Tuple[int, int], Tuple[int, int]]]):
    grouped = defaultdict(list)
    for key, value in mapped_data:
        grouped[key].append(value)
    return list(grouped.items())

# Генерация случайных записей (кортежей формата (x, y))
def RECORDREADER(count: int) -> List[Tuple[int, int]]:
    return [(random.randint(0, 3), random.randint(0, 3)) for _ in range(count)]

# Основная функция для выполнения операций MAP и REDUCE
def MapReduce(record1: List[Tuple[int, int]], record2: List[Tuple[int, int]]):
    # Объединяем оба набора записей
    combined_record = record1 + record2
    print("Combined Records:", combined_record)

    # Применяем MAP операцию к объединённым записям
    mapped_data = [MAP(t) for t in combined_record]
    print("MAP output:", mapped_data)

    # Выполняем группировку данных по ключу
    grouped_data = group_by_key(mapped_data)
    print("Grouped Data:", grouped_data)

    # Применяем REDUCE операцию к сгруппированным данным
    reduced_result = [REDUCE(key, values) for key, values in grouped_data]

    # Фильтруем None значения из результата REDUCE
    return [res for res in reduced_result if res is not None]

# Генерация двух наборов записей (пример пересечения двух множеств)
record1 = RECORDREADER(10)
record2 = RECORDREADER(10)
print("Input Records Set 1:", record1)
print("Input Records Set 2:", record2)

# Выполнение операции MapReduce (Intersection)
result = MapReduce(record1, record2)
print("Intersection Result:", result)

Input Records Set 1: [(3, 2), (2, 1), (3, 3), (2, 0), (3, 0), (0, 3), (0, 1), (1, 0), (2, 2), (1, 2)]
Input Records Set 2: [(3, 2), (0, 0), (2, 3), (3, 2), (0, 0), (3, 1), (0, 0), (2, 2), (0, 2), (2, 3)]
Combined Records: [(3, 2), (2, 1), (3, 3), (2, 0), (3, 0), (0, 3), (0, 1), (1, 0), (2, 2), (1, 2), (3, 2), (0, 0), (2, 3), (3, 2), (0, 0), (3, 1), (0, 0), (2, 2), (0, 2), (2, 3)]
MAP output: [((3, 2), (3, 2)), ((2, 1), (2, 1)), ((3, 3), (3, 3)), ((2, 0), (2, 0)), ((3, 0), (3, 0)), ((0, 3), (0, 3)), ((0, 1), (0, 1)), ((1, 0), (1, 0)), ((2, 2), (2, 2)), ((1, 2), (1, 2)), ((3, 2), (3, 2)), ((0, 0), (0, 0)), ((2, 3), (2, 3)), ((3, 2), (3, 2)), ((0, 0), (0, 0)), ((3, 1), (3, 1)), ((0, 0), (0, 0)), ((2, 2), (2, 2)), ((0, 2), (0, 2)), ((2, 3), (2, 3))]
Grouped Data: [((3, 2), [(3, 2), (3, 2), (3, 2)]), ((2, 1), [(2, 1)]), ((3, 3), [(3, 3)]), ((2, 0), [(2, 0)]), ((3, 0), [(3, 0)]), ((0, 3), [(0, 3)]), ((0, 1), [(0, 1)]), ((1, 0), [(1, 0)]), ((2, 2), [(2, 2), (2, 2)]), ((1, 2), [(1, 2)]), ((0, 

### Difference (Разница)

**The Map Function:** Для кортежа $t \in R$, создайте пару $(t, R)$, и для кортежа $t \in S$, создайте пару $(t, S)$. Задумка заключается в том, чтобы значение пары было именем отношения $R$ or $S$, которому принадлежит кортеж (а лучше, единичный бит, по которому можно два отношения различить $R$ or $S$), а не весь набор атрибутов отношения.

**The Reduce Function:** Для каждого ключа $t$, если соответствующее значение является списком $[R]$, создайте пару $(t, t)$. В иных случаях не предпринимайте действий.

In [None]:
import random

# Определяем отношения
rels = [1, 2]  # 1 = R, 2 = S

# Класс для хранения данных кортежа и принадлежности к отношению
class Tuple:
    def __init__(self, data: tuple, rel_id: int):
        self.data = data
        self.rel_id = rel_id

    def __repr__(self):
        return f"Tuple(data={self.data}, rel_id={self.rel_id})"

# Генерация случайных кортежей с указанием принадлежности к отношению
def get_random_tuple(count):
    data = tuple((random.randint(0, 3), random.randint(0, 3)) for _ in range(count))
    rel_id = random.choice(rels)  # Выбираем случайное отношение R или S
    return Tuple(data, rel_id)

# Чтение записей: генерация списка случайных кортежей
def RECORDREADER(count):
    return [get_random_tuple(2) for _ in range(count)]

# MAP функция: создаёт пару (данные, принадлежность отношению)
def MAP(t: Tuple):
    # Если rel_id == 1, значит кортеж из R, иначе из S
    relation = 'R' if t.rel_id == 1 else 'S'
    return t.data, relation

# REDUCE функция: проверяет, есть ли кортеж только в R
def REDUCE(key, values):
    # Если кортеж есть только в R, возвращаем пару (key, key)
    if values == ['R']:
        return key, key
    return None

# Группировка значений по ключу
def group_by_key(iterable):
    grouped = {}
    for k, v in iterable:
        grouped.setdefault(k, []).append(v)
    return grouped.items()

# Генерация записей
record = RECORDREADER(1000)  # Увеличено количество записей
print("Records:", record)

# MAP: Применение функции MAP к каждому элементу
map_output = list(map(MAP, record))
print("MAP output:", map_output)

# SHUFFLE: Группировка значений по ключу
shuffle_output = list(group_by_key(map_output))
print("Shuffle output:", shuffle_output)

# REDUCE: Применение функции REDUCE к сгруппированным данным
reduce_output = list(filter(None, map(lambda x: REDUCE(*x), shuffle_output)))
print("Difference Result:", reduce_output)

Records: [Tuple(data=((2, 3), (2, 2)), rel_id=2), Tuple(data=((1, 2), (1, 1)), rel_id=1), Tuple(data=((0, 1), (0, 2)), rel_id=1), Tuple(data=((3, 1), (1, 1)), rel_id=1), Tuple(data=((0, 2), (0, 3)), rel_id=1), Tuple(data=((3, 2), (0, 1)), rel_id=2), Tuple(data=((2, 1), (0, 0)), rel_id=2), Tuple(data=((1, 2), (0, 1)), rel_id=2), Tuple(data=((3, 1), (0, 2)), rel_id=1), Tuple(data=((3, 2), (2, 1)), rel_id=2), Tuple(data=((3, 0), (1, 2)), rel_id=1), Tuple(data=((1, 2), (2, 3)), rel_id=1), Tuple(data=((2, 3), (3, 3)), rel_id=1), Tuple(data=((2, 3), (3, 2)), rel_id=2), Tuple(data=((1, 3), (1, 0)), rel_id=2), Tuple(data=((0, 0), (2, 3)), rel_id=1), Tuple(data=((0, 3), (3, 2)), rel_id=2), Tuple(data=((1, 2), (2, 3)), rel_id=1), Tuple(data=((3, 0), (2, 3)), rel_id=2), Tuple(data=((0, 0), (0, 3)), rel_id=1), Tuple(data=((1, 1), (0, 3)), rel_id=2), Tuple(data=((1, 1), (2, 2)), rel_id=2), Tuple(data=((2, 1), (3, 0)), rel_id=1), Tuple(data=((0, 2), (3, 0)), rel_id=2), Tuple(data=((3, 2), (2, 2)), r

### Natural Join

**The Map Function:** Для каждого кортежа $(a, b)$ отношения $R$, создайте пару $(b,(R, a))$. Для каждого кортежа $(b, c)$ отношения $S$, создайте пару $(b,(S, c))$.

**The Reduce Function:** Каждый ключ $b$ будет асоциирован со списком пар, которые принимают форму либо $(R, a)$, либо $(S, c)$. Создайте все пары, одни, состоящие из  первого компонента $R$, а другие, из первого компонента $S$, то есть $(R, a)$ и $(S, c)$. На выходе вы получаете последовательность пар ключ-значение из списков ключей и значений. Ключ не нужен. Каждое значение, это тройка $(a, b, c)$ такая, что $(R, a)$ и $(S, c)$ это принадлежат входному списку значений.

In [None]:
import random
from collections import defaultdict

# Определяем идентификаторы отношений: 1 — R, 2 — S
rels = [1, 2]

# Класс для хранения данных кортежа и принадлежности к отношению
class Tuple:
    def __init__(self, data: tuple, rel_id: int):
        self.data = data
        self.rel_id = rel_id

    def __repr__(self):
        return f"Tuple(data={self.data}, rel_id={self.rel_id})"

# Генерация случайного кортежа с данными и идентификатором отношения
def get_random_tuple():
    data = (random.randint(0, 3), random.randint(0, 3))
    rel_id = random.choice(rels)
    return Tuple(data, rel_id)

# Создание списка случайных кортежей заданного количества
def RECORDREADER(count):
    return [get_random_tuple() for _ in range(count)]

# MAP функция: создаёт пары (b, (R, a)) или (b, (S, c))
def MAP(t: Tuple):
    # Если кортеж принадлежит отношению R, создаём пару (b, (R, a))
    if t.rel_id == rels[0]:  # R
        return t.data[1], ('R', t.data[0])  # (b, (R, a))
    # Если кортеж принадлежит отношению S, создаём пару (b, (S, c))
    else:  # S
        return t.data[0], ('S', t.data[1])  # (b, (S, c))

# REDUCE функция: создаёт тройки (a, b, c)
def REDUCE(key, values):
    # Разделяем значения по отношениям R и S
    r_values = [v[1] for v in values if v[0] == 'R']  # a из (R, a)
    s_values = [v[1] for v in values if v[0] == 'S']  # c из (S, c)

    # Создаём тройки (a, b, c) для всех пар, где b общий
    result = [(a, key, c) for a in r_values for c in s_values]
    return result

# Группировка данных по ключу
def group_by_key(iterable):
    grouped = defaultdict(list)
    for k, v in iterable:
        grouped[k].append(v)
    return grouped.items()

# Генерация случайных кортежей
record = RECORDREADER(100)
print("Records:", record)

# MAP: Применение функции MAP к каждому элементу
map_output = list(map(MAP, record))
print("\nMAP output:", map_output)

# SHUFFLE: Группировка значений по ключу
shuffle_output = list(group_by_key(map_output))
print("\nShuffle output:", shuffle_output)

# REDUCE: Применение функции REDUCE к сгруппированным данным
reduce_output = [item for sublist in map(lambda x: REDUCE(*x), shuffle_output) for item in sublist]
print("\nNatural Join Result:", reduce_output)

Records: [Tuple(data=(3, 0), rel_id=1), Tuple(data=(0, 3), rel_id=1), Tuple(data=(2, 0), rel_id=1), Tuple(data=(2, 3), rel_id=1), Tuple(data=(2, 2), rel_id=2), Tuple(data=(2, 0), rel_id=1), Tuple(data=(2, 3), rel_id=1), Tuple(data=(3, 0), rel_id=1), Tuple(data=(2, 3), rel_id=2), Tuple(data=(0, 1), rel_id=1), Tuple(data=(2, 0), rel_id=2), Tuple(data=(1, 2), rel_id=1), Tuple(data=(0, 3), rel_id=1), Tuple(data=(3, 0), rel_id=1), Tuple(data=(3, 1), rel_id=1), Tuple(data=(0, 0), rel_id=1), Tuple(data=(3, 3), rel_id=2), Tuple(data=(3, 3), rel_id=1), Tuple(data=(0, 3), rel_id=1), Tuple(data=(1, 2), rel_id=1), Tuple(data=(0, 0), rel_id=2), Tuple(data=(3, 3), rel_id=1), Tuple(data=(0, 3), rel_id=2), Tuple(data=(2, 3), rel_id=1), Tuple(data=(2, 1), rel_id=2), Tuple(data=(3, 2), rel_id=2), Tuple(data=(2, 1), rel_id=1), Tuple(data=(0, 2), rel_id=2), Tuple(data=(0, 2), rel_id=1), Tuple(data=(0, 3), rel_id=2), Tuple(data=(1, 3), rel_id=1), Tuple(data=(3, 2), rel_id=2), Tuple(data=(0, 1), rel_id=2), 

### Grouping and Aggregation (Группировка и аггрегация)

**The Map Function:** Для каждого кортежа $(a, b, c$) создайте пару $(a, b)$.

**The Reduce Function:** Ключ представляет ту или иную группу. Примение аггрегирующую операцию $\theta$ к списку значений $[b1, b2, . . . , bn]$ ассоциированных с ключом $a$. Возвращайте в выходной поток $(a, x)$, где $x$ результат применения  $\theta$ к списку. Например, если $\theta$ это $SUM$, тогда $x = b1 + b2 + · · · + bn$, а если $\theta$ is $MAX$, тогда $x$ это максимальное из значений $b1, b2, . . . , bn$.

In [None]:
import random
from collections import defaultdict

# Генерация случайного кортежа с тремя элементами (a, b, c)
def get_random_tuple():
    return (random.randint(0, 3), random.randint(0, 3), random.randint(0, 3))

# Создание списка случайных кортежей заданного количества
def RECORDREADER(count):
    return [get_random_tuple() for _ in range(count)]

# MAP функция: создаёт пары (a, b)
def MAP(t):
    return (t[0], t[1])  # (a, b)

# Агрегирующая функция tetta: сумма всех значений
# Можно заменить на max(values) для поиска максимального значения
def tetta(values):
    return sum(values)

# REDUCE функция: применяет tetta к значениям, сгруппированным по ключу
def REDUCE(key, values):
    x = tetta(values)  # Применение агрегирующей функции
    return (key, x)

# Группировка данных по ключу
def group_by_key(iterable):
    grouped = defaultdict(list)
    for k, v in iterable:
        grouped[k].append(v)
    return grouped.items()

# Генерация случайных кортежей
record = RECORDREADER(100)
print("Records:", record)

# MAP: Применение функции MAP к каждому элементу
map_output = list(map(MAP, record))
print("\nMAP output:", map_output)

# SHUFFLE: Группировка значений по ключу
shuffle_output = list(group_by_key(map_output))
print("\nShuffle output:", shuffle_output)

# REDUCE: Применение функции REDUCE к сгруппированным данным
reduce_output = list(map(lambda x: REDUCE(*x), shuffle_output))
print("\nAggregation Result:", reduce_output)


Records: [(3, 2, 2), (1, 0, 0), (2, 0, 1), (0, 1, 1), (2, 1, 3), (1, 1, 1), (2, 3, 0), (2, 2, 3), (0, 3, 0), (1, 0, 0), (0, 1, 0), (2, 1, 1), (1, 0, 2), (1, 1, 0), (1, 1, 0), (1, 2, 2), (1, 1, 1), (2, 0, 0), (3, 2, 0), (1, 2, 1), (1, 3, 1), (0, 0, 2), (3, 0, 3), (2, 0, 3), (1, 0, 0), (3, 2, 1), (0, 0, 0), (0, 0, 1), (3, 3, 3), (0, 1, 1), (2, 0, 0), (2, 3, 2), (3, 3, 1), (1, 0, 0), (3, 1, 2), (3, 2, 1), (2, 0, 3), (2, 0, 3), (0, 2, 3), (0, 2, 0), (1, 0, 2), (3, 2, 3), (0, 2, 0), (2, 1, 2), (1, 1, 3), (2, 1, 1), (0, 1, 0), (1, 3, 3), (1, 0, 3), (1, 1, 0), (2, 3, 0), (0, 0, 2), (0, 1, 0), (3, 0, 0), (0, 1, 1), (2, 1, 0), (2, 0, 0), (3, 0, 3), (2, 3, 3), (0, 1, 1), (0, 1, 0), (1, 1, 0), (0, 3, 3), (2, 1, 1), (3, 3, 3), (1, 1, 2), (3, 2, 3), (2, 0, 1), (3, 2, 0), (3, 1, 3), (2, 3, 3), (0, 2, 1), (0, 3, 2), (3, 2, 1), (2, 1, 0), (1, 3, 1), (3, 1, 2), (0, 0, 3), (0, 3, 2), (0, 1, 1), (1, 1, 0), (3, 2, 0), (0, 2, 3), (2, 3, 1), (1, 0, 2), (1, 0, 0), (2, 0, 2), (1, 1, 3), (3, 0, 2), (2, 3, 3), 

#

### Matrix-Vector multiplication

Случай, когда вектор не помещается в памяти Map задачи


## Matrix multiplication (Перемножение матриц)

Если у нас есть матрица $M$ с элементами $m_{ij}$ в строке $i$ и столбце $j$, и матрица $N$ с элементами $n_{jk}$ в строке $j$ и столбце $k$, тогда их произведение $P = MN$ есть матрица $P$ с элементами $p_{ik}$ в строке $i$ и столбце $k$, где

$$p_{ik} =\sum_{j} m_{ij}n_{jk}$$

Необходимым требованием является одинаковое количество столбцов в $M$ и строк в $N$, чтобы операция суммирования по  $j$ была осмысленной. Мы можем размышлять о матрице, как об отношении с тремя атрибутами: номер строки, номер столбца, само значение. Таким образом матрица $M$ предстваляется как отношение $ M(I, J, V )$, с кортежами $(i, j, m_{ij})$, и, аналогично, матрица $N$ представляется как отношение $N(J, K, W)$, с кортежами $(j, k, n_{jk})$. Так как большие матрицы как правило разреженные (большинство значений равно 0), и так как мы можем нулевыми значениями пренебречь (не хранить), такое реляционное представление достаточно эффективно для больших матриц. Однако, возможно, что координаты $i$, $j$, и $k$ неявно закодированы в смещение позиции элемента относительно начала файла, вместо явного хранения. Тогда, функция Map (или Reader) должна быть разработана таким образом, чтобы реконструировать компоненты $I$, $J$, и $K$ кортежей из смещения.

Произведение $MN$ это фактически join, за которым следуют группировка по ключу и аггрегация. Таким образом join отношений $M(I, J, V )$ и $N(J, K, W)$, имеющих общим только атрибут $J$, создаст кортежи $(i, j, k, v, w)$ из каждого кортежа $(i, j, v) \in M$ и кортежа $(j, k, w) \in N$. Такой 5 компонентный кортеж представляет пару элементов матрицы $(m_{ij} , n_{jk})$. Что нам хотелось бы получить на самом деле, это произведение этих элементов, то есть, 4 компонентный кортеж$(i, j, k, v \times w)$, так как он представляет произведение $m_{ij}n_{jk}$. Мы представляем отношение как результат одной MapReduce операции, в которой мы можем произвести группировку и аггрегацию, с $I$ и $K$  атрибутами, по которым идёт группировка, и суммой  $V \times W$.





In [None]:
# MapReduce model
def flatten(nested_iterable):
  for iterable in nested_iterable:
    for element in iterable:
      yield element

def groupbykey(iterable):
  t = {}
  for (k2, v2) in iterable:
    t[k2] = t.get(k2, []) + [v2]
  return t.items()

def MapReduce(RECORDREADER, MAP, REDUCE):
  return flatten(map(lambda x: REDUCE(*x), groupbykey(flatten(map(lambda x: MAP(*x), RECORDREADER())))))

Реализуйте перемножение матриц с использованием модельного кода MapReduce для одной машины в случае, когда одна матрица хранится в памяти, а другая генерируется RECORDREADER-ом.

In [None]:
import numpy as np
I = 2
J = 3
K = 4*10
small_mat = np.random.rand(I,J) # it is legal to access this from RECORDREADER, MAP, REDUCE
big_mat = np.random.rand(J,K)

def RECORDREADER():
  for j in range(big_mat.shape[0]):
    for k in range(big_mat.shape[1]):
      yield ((j,k), big_mat[j,k])

def MAP(k1, v1):
  (j, k) = k1
  w = v1
  for i in range(small_mat.shape[0]):
    yield ((i, k), w * small_mat[i][j])

def REDUCE(key, values):
  (i, k) = key
  el_value = 0
  for v in values:
    el_value += v
  yield ((i, k), el_value)

Проверьте своё решение

In [None]:
# CHECK THE SOLUTION
reference_solution = np.matmul(small_mat, big_mat)
solution = MapReduce(RECORDREADER, MAP, REDUCE)

def asmatrix(reduce_output):
  reduce_output = list(reduce_output)
  I = max(i for ((i,k), vw) in reduce_output)+1
  K = max(k for ((i,k), vw) in reduce_output)+1
  mat = np.empty(shape=(I,K))
  for ((i,k), vw) in reduce_output:
    mat[i,k] = vw
  return mat

np.allclose(reference_solution, asmatrix(solution)) # should return true

True

In [None]:
reduce_output = list(MapReduce(RECORDREADER, MAP, REDUCE))
max(i for ((i,k), vw) in reduce_output)

1

Реализуйте перемножение матриц  с использованием модельного кода MapReduce для одной машины в случае, когда обе матрицы генерируются в RECORDREADER. Например, сначала одна, а потом другая.

In [None]:
import numpy as np
from collections import defaultdict

# Параметры матриц
I = 2  # Строки в small_mat
J = 3  # Общее измерение
K = 4 * 10  # Столбцы в big_mat

# Кэш для хранения сгенерированных значений
record_cache = []

# Функция RECORDREADER: Генерация обеих матриц с кэшированием
def RECORDREADER():
    global record_cache
    if not record_cache:  # Кэшируем только при первом вызове
        # Генерация первой матрицы small_mat
        for i in range(I):
            for j in range(J):
                value = np.random.rand()
                record_cache.append(((0, i, j), value))
                yield ((0, i, j), value)

        # Генерация второй матрицы big_mat
        for j in range(J):
            for k in range(K):
                value = np.random.rand()
                record_cache.append(((1, j, k), value))
                yield ((1, j, k), value)
    else:
        # Если кэш уже заполнен, используем его
        for record in record_cache:
            yield record

# MAP_JOIN: Подготовка данных для перемножения
def MAP_JOIN(k1, v1):
    mat_num, i_or_j, j_or_k = k1  # Корректное распаковка
    w = v1  # Значение элемента
    if mat_num == 0:
        # Для первой матрицы: (i, j)
        yield (j_or_k, (mat_num, i_or_j, w))
    else:
        # Для второй матрицы: (j, k)
        yield (i_or_j, (mat_num, j_or_k, w))

# REDUCE_JOIN: Перемножение элементов
def REDUCE_JOIN(key, values):
    from_first_mat = [v for v in values if v[0] == 0]
    from_second_mat = [v for v in values if v[0] == 1]
    # Перемножение всех пар значений с одинаковым j
    for f in from_first_mat:
        for s in from_second_mat:
            yield ((f[1], s[1]), f[2] * s[2])

# MAP_MUL: Подготовка данных для суммирования произведений
def MAP_MUL(k1, v1):
    (i, k) = k1  # Индексы результирующего элемента
    yield ((i, k), v1)

# REDUCE_MUL: Суммирование произведений
def REDUCE_MUL(key, values):
    res_el_value = sum(values)  # Сумма всех произведений
    yield (key, res_el_value)

# Функция для группировки данных (SHUFFLE)
def group_by_key(iterable):
    grouped = defaultdict(list)
    for k, v in iterable:
        grouped[k].append(v)
    return grouped.items()

# Реализация MapReduce
def MapReduce(record_reader, mapper, reducer):
    # MAP step
    map_output = []
    for record in record_reader():
        map_output.extend(mapper(*record))

    # SHUFFLE step
    shuffle_output = list(group_by_key(map_output))

    # REDUCE step
    reduce_output = []
    for key, values in shuffle_output:
        reduce_output.extend(reducer(key, values))

    return reduce_output

# JOIN этап
joined = MapReduce(RECORDREADER, MAP_JOIN, REDUCE_JOIN)

# GET_JOINED: Передача результатов объединения для суммирования
def GET_JOINED():
    for j in joined:
        yield j

# MULTIPLY этап
solution = MapReduce(GET_JOINED, MAP_MUL, REDUCE_MUL)

# Преобразование в матрицу
def asmatrix(solution):
    result_mat = np.zeros((I, K))
    for ((i, k), value) in solution:
        result_mat[i, k] = value
    return result_mat

# Проверка с использованием numpy.dot
small_mat = np.array([[v for ((_, i, j), v) in record_cache if _ == 0 and i == row] for row in range(I)])
big_mat = np.array([[v for ((_, j, k), v) in record_cache if _ == 1 and j == row] for row in range(J)])

reference_solution = np.matmul(small_mat, big_mat)
result_matrix = asmatrix(solution)

print("\nResultant Matrix (MapReduce):\n", result_matrix)
print("\nCheck with numpy.dot:\n", reference_solution)
print("\nDifference:\n", np.abs(result_matrix - reference_solution))
print("\nCorrect:", np.allclose(reference_solution, result_matrix))


Resultant Matrix (MapReduce):
 [[1.16998884 0.74857894 0.59655845 0.86089913 1.05084297 0.46238732
  0.60998335 0.5787672  0.2798795  0.97921592 0.11561596 0.58361453
  0.77478158 0.64598837 0.94937878 1.16834925 0.94250859 0.8203258
  0.84735214 0.51402636 0.67124381 0.77421719 0.84415353 1.40362589
  0.38366348 1.3688546  0.76300283 1.17995188 0.75686361 0.61431964
  0.54878967 0.50526924 0.2112256  1.05828342 1.33770277 0.09546581
  0.96574133 0.79282211 0.8390534  0.92028196]
 [1.06128623 0.69709042 0.38962363 0.86860325 0.74943382 0.53444034
  0.38190694 0.7241297  0.20140485 0.75592905 0.14266861 0.69429569
  0.81536438 0.47334684 0.60075156 0.91125492 0.67182009 0.83795434
  0.6641379  0.37693995 0.66195913 0.80375466 0.68340459 1.20984647
  0.39344813 1.12713526 0.80450172 0.99964765 0.53205305 0.62185751
  0.6939183  0.3743349  0.14938622 0.97306561 1.18723882 0.12266247
  0.59481013 0.86365745 0.59354622 0.63948498]]

Check with numpy.dot:
 [[1.16998884 0.74857894 0.59655845

Реализуйте перемножение матриц с использованием модельного кода MapReduce Distributed, когда каждая матрица генерируется в своём RECORDREADER.

In [None]:
import numpy as np
from collections import defaultdict

I, J, K = 2, 3, 40
maps = 4
reducers = 2

small_mat = np.random.rand(I, J)
big_mat = np.random.rand(J, K)

reference_solution = np.matmul(small_mat, big_mat)

def flatten(nested_iterable):
    for iterable in nested_iterable:
        yield from iterable

def group_by_key(iterable):
    t = {}
    for (k2, v2) in iterable:
        t[k2] = t.get(k2, []) + [v2]
    return t.items()

def groupbykey_distributed(map_partitions, PARTITIONER):
    partitions = defaultdict(dict)
    for map_partition in map_partitions:
        for k2, v2 in map_partition:
            partitions[PARTITIONER(k2)][k2] = partitions[PARTITIONER(k2)].get(k2, []) + [v2]
    return [(partition_id, sorted(partition.items(), key=lambda x: x[0])) for partition_id, partition in partitions.items()]

def PARTITIONER(obj):
    global reducers
    return hash(obj) % reducers

def MapReduceDistributed(INPUTFORMAT, MAP, REDUCE, PARTITIONER=PARTITIONER, COMBINER=None):
    map_partitions = map(lambda record_reader: flatten(map(lambda k1v1: MAP(*k1v1), record_reader)), INPUTFORMAT())

    if COMBINER:
        map_partitions = map(lambda map_partition: flatten(map(lambda k2v2: COMBINER(*k2v2), group_by_key(map_partition))), map_partitions)

    reduce_partitions = groupbykey_distributed(map_partitions, PARTITIONER)

    reduce_outputs = map(lambda reduce_partition: (reduce_partition[0], flatten(map(lambda reduce_input_group: REDUCE(*reduce_input_group), reduce_partition[1]))), reduce_partitions)

    print(f"{sum(len(vs) for _, vs in flatten([partition for _, partition in reduce_partitions]))} key-value pairs were sent over a network.")
    return reduce_outputs

def asmatrix(reduce_output):
    reduce_output = list(reduce_output)
    I_max = max(i for ((i, _), _) in reduce_output) + 1
    K_max = max(k for ((_, k), _) in reduce_output) + 1
    mat = np.zeros((I_max, K_max))
    for (i, k), value in reduce_output:
        mat[i, k] = value
    return mat

# RECORDREADER для первой матрицы
def RECORDREADER_SMALL():
    return [((0, i, j), small_mat[i, j]) for i in range(I) for j in range(J)]

# RECORDREADER для второй матрицы
def RECORDREADER_BIG():
    return [((1, j, k), big_mat[j, k]) for j in range(J) for k in range(K)]

# INPUTFORMAT теперь использует два RECORDREADER
def INPUTFORMAT():
    yield RECORDREADER_SMALL()
    yield RECORDREADER_BIG()

def MAP_JOIN(k1, v1):
    mat_num, i, j = k1
    if mat_num == 0:
        yield j, (mat_num, i, v1)
    else:
        yield i, (mat_num, j, v1)

def REDUCE_JOIN(key, values):
    from_first_mat = [v for v in values if v[0] == 0]
    from_second_mat = [v for v in values if v[0] == 1]
    for f in from_first_mat:
        for s in from_second_mat:
            yield (f[1], s[1]), f[2] * s[2]

def GET_JOINED():
    for j in joined:
        yield j[1]

def MAP_MUL(k1, v1):
    yield k1, v1

def REDUCE_MUL(key, values):
    yield key, sum(values)

# JOIN этап
joined = MapReduceDistributed(INPUTFORMAT, MAP_JOIN, REDUCE_JOIN)
joined = [(partition_id, list(partition)) for partition_id, partition in joined]
print("\nJOINED:")
print(joined)

# Этап умножения
mul_output = MapReduceDistributed(GET_JOINED, MAP_MUL, REDUCE_MUL)
pre_result = [(partition_id, list(partition)) for partition_id, partition in mul_output]
print("\nMULTIPLICATION RESULT:")
print(pre_result)

# Преобразование в матрицу
solution = [v for p in pre_result for v in p[1]]
result_matrix = asmatrix(solution)
print("\nRESULT MATRIX:")
print(result_matrix)

# Проверка результата
print("\nCorrect:", np.allclose(reference_solution, asmatrix(solution)))

126 key-value pairs were sent over a network.

JOINED:
[(0, [((0, 0), 0.09422732508183781), ((0, 1), 0.07045569979012038), ((0, 2), 0.47056192436413025), ((0, 3), 0.033253948026871136), ((0, 4), 0.33198032832192526), ((0, 5), 0.19415411919392067), ((0, 6), 0.12876782957202684), ((0, 7), 0.12515411237481228), ((0, 8), 0.19888056975756585), ((0, 9), 0.09614933581257196), ((0, 10), 0.46930267303061973), ((0, 11), 0.18295001947328113), ((0, 12), 0.48249466278740616), ((0, 13), 0.42546326557050784), ((0, 14), 0.39202910891184833), ((0, 15), 0.4025971936918404), ((0, 16), 0.34135000279031524), ((0, 17), 0.41438008131320786), ((0, 18), 0.09362128191982207), ((0, 19), 0.007947272425873), ((0, 20), 0.3151192596961567), ((0, 21), 0.4182480558888197), ((0, 22), 0.08972690965175653), ((0, 23), 0.23219100392743383), ((0, 24), 0.22245457297353013), ((0, 25), 0.19295720772340008), ((0, 26), 0.4325969222512119), ((0, 27), 0.2546895041260877), ((0, 28), 0.19766522161898864), ((0, 29), 0.021223949621030

Обобщите предыдущее решение на случай, когда каждая матрица генерируется несколькими RECORDREADER-ами, и проверьте его работоспособность. Будет ли работать решение, если RECORDREADER-ы будут генерировать случайное подмножество элементов матрицы?

In [None]:
import numpy as np
import random

I, J, K = 2, 3, 40
maps = 3
reducers = 2
random_subset_fraction = 0.7  # 70% элементов попадают в каждый RECORDREADER

small_mat = np.random.rand(I, J)
big_mat = np.random.rand(J, K)
reference_solution = np.matmul(small_mat, big_mat)

def flatten(nested_iterable):
    for iterable in nested_iterable:
        if iterable:  # Проверка на None
            yield from iterable

def groupbykey(iterable):
    t = {}
    for (k2, v2) in iterable:
        t[k2] = t.get(k2, []) + [v2]
    return t.items()

def groupbykey_distributed(map_partitions, PARTITIONER):
    global reducers
    partitions = [dict() for _ in range(reducers)]
    for map_partition in map_partitions:
        for (k2, v2) in map_partition:
            p = partitions[PARTITIONER(k2)]
            p[k2] = p.get(k2, []) + [v2]
    return [(partition_id, sorted(partition.items(), key=lambda x: x[0])) for (partition_id, partition) in enumerate(partitions)]

def PARTITIONER(obj):
    global reducers
    return hash(obj) % reducers

def MapReduceDistributed(INPUTFORMAT, MAP, REDUCE, PARTITIONER=PARTITIONER, COMBINER=None):
    map_partitions = map(lambda record_reader: flatten(map(lambda k1v1: MAP(*k1v1), record_reader)), INPUTFORMAT())

    if COMBINER is not None:
        map_partitions = map(lambda map_partition: flatten(map(lambda k2v2: COMBINER(*k2v2), groupbykey(map_partition))), map_partitions)

    reduce_partitions = groupbykey_distributed(map_partitions, PARTITIONER)

    reduce_outputs = map(lambda reduce_partition: (reduce_partition[0], flatten(map(lambda reduce_input_group: REDUCE(*reduce_input_group), reduce_partition[1]))), reduce_partitions)

    print("{} key-value pairs were sent over a network.".format(sum([len(vs) for (k, vs) in flatten([partition for (partition_id, partition) in reduce_partitions])])))
    return reduce_outputs

def asmatrix(reduce_output):
    reduce_output = list(reduce_output)
    I = max(i for ((i, k), vw) in reduce_output) + 1
    K = max(k for ((i, k), vw) in reduce_output) + 1
    mat = np.zeros(shape=(I, K))  # Используем нули для отсутствующих элементов
    for ((i, k), vw) in reduce_output:
        mat[i, k] = vw
    return mat

def INPUTFORMAT():
    first_mat = []

    for i in range(small_mat.shape[0]):
        for j in range(small_mat.shape[1]):
            first_mat.append(((0, i, j), small_mat[i, j]))

    global maps
    split_size = int(np.ceil(len(first_mat) / maps))

    for i in range(0, len(first_mat), split_size):
      yield first_mat[i:i+split_size]

    second_mat = []

    for j in range(big_mat.shape[0]):
        for k in range(big_mat.shape[1]):
            second_mat.append(((1, j, k), big_mat[j, k]))

    split_size = int(np.ceil(len(second_mat) / maps))

    for i in range(0, len(second_mat), split_size):
      yield second_mat[i:i+split_size]

def MAP_JOIN(k1, v1):
  (mat_num, i, j) = k1
  w = v1
  if mat_num == 0:
    yield (j, (mat_num, i, w))
  else:
    yield (i, (mat_num, j, w))

def REDUCE_JOIN(key, values):
  from_first_mat = [v for v in values if v[0] == 0]
  from_second_mat = [v for v in values if v[0] == 1]

  for f in from_first_mat:
    for s in from_second_mat:
      yield ((f[1], s[1]), f[2] * s[2])

def GET_JOINED():
    for j in joined:
        yield j[1]

def MAP_MUL(k1, v1):
    yield (k1, v1)
    return

def REDUCE_MUL(key, values):
    res_val = 0
    for v in values:
        res_val += v
    yield (key, res_val)
    return

# JOIN этап
partitioned_output = MapReduceDistributed(INPUTFORMAT, MAP_JOIN, REDUCE_JOIN, COMBINER=None)
joined = [(partition_id, list(partition)) for (partition_id, partition) in partitioned_output]
print("\nJOINED:", joined)

# MUL этап
mul_output = MapReduceDistributed(GET_JOINED, MAP_MUL, REDUCE_MUL, COMBINER=None)
pre_result = [(partition_id, list(partition)) for (partition_id, partition) in mul_output]
print("\nPRE RESULT:", pre_result)

# Обработка результата
solution = [v for p in pre_result for v in p[1]]

# Проверка результата
print("\nRESULT MATRIX:\n", asmatrix(solution))
print("\nREFERENCE MATRIX:\n", reference_solution)

# Проверка результата
print("\nCorrect:", np.allclose(reference_solution, asmatrix(solution)))

126 key-value pairs were sent over a network.

JOINED: [(0, [((0, 0), 0.017180822707976977), ((0, 1), 0.02884425049488929), ((0, 2), 0.0030628278431351916), ((0, 3), 0.028115301275349187), ((0, 4), 0.016818304678886085), ((0, 5), 0.0027028651050937493), ((0, 6), 0.04345143566404781), ((0, 7), 0.005652534961294009), ((0, 8), 0.003354427713820848), ((0, 9), 0.04351614563209779), ((0, 10), 0.012135733691456863), ((0, 11), 0.006801744472364681), ((0, 12), 0.027356355371579606), ((0, 13), 0.04014421769218702), ((0, 14), 0.0005325109032357497), ((0, 15), 0.011629825511373721), ((0, 16), 0.03861206479390508), ((0, 17), 0.03653269342881741), ((0, 18), 0.013900322213037287), ((0, 19), 0.015006293376084138), ((0, 20), 0.0322856102127996), ((0, 21), 0.017574810477525956), ((0, 22), 0.009435158173867439), ((0, 23), 0.0182270138177874), ((0, 24), 0.015448859417890615), ((0, 25), 0.00820345473075618), ((0, 26), 0.044439659541188316), ((0, 27), 0.004479188559004987), ((0, 28), 0.020735658895862007), 