# Введение в MapReduce модель на Python


In [1]:
from typing import NamedTuple # requires python 3.6+
from typing import Iterator


In [2]:
def MAP(_, row:NamedTuple):
  if (row.gender == 'female'):
    yield (row.age, row)

def REDUCE(age:str, rows:Iterator[NamedTuple]):
  sum = 0
  count = 0
  for row in rows:
    sum += row.social_contacts
    count += 1
  if (count > 0):
    yield (age, sum/count)
  else:
    yield (age, 0)

Модель элемента данных

In [3]:
class User(NamedTuple):
  id: int
  age: str
  social_contacts: int
  gender: str
  
  def __str__(self):
        return f"User(id={self.id}, age={self.age}, gender={self.gender}, social_contacts={self.social_contacts})"

In [4]:
input_collection = [
    User(id=0, age=55, gender='male', social_contacts=20),
    User(id=1, age=25, gender='female', social_contacts=240),
    User(id=2, age=25, gender='female', social_contacts=500),
    User(id=3, age=33, gender='female', social_contacts=800)
]

Функция RECORDREADER моделирует чтение элементов с диска или по сети.

In [5]:
def RECORDREADER():
  return [(u.id, u) for u in input_collection]

In [6]:
list(RECORDREADER())

[(0, User(id=0, age=55, social_contacts=20, gender='male')),
 (1, User(id=1, age=25, social_contacts=240, gender='female')),
 (2, User(id=2, age=25, social_contacts=500, gender='female')),
 (3, User(id=3, age=33, social_contacts=800, gender='female'))]

In [7]:
def flatten(nested_iterable):
  for iterable in nested_iterable:
    for element in iterable:
      yield element

In [8]:
map_output = flatten(map(lambda x: MAP(*x), RECORDREADER()))
map_output = list(map_output) # materialize
map_output

[(25, User(id=1, age=25, social_contacts=240, gender='female')),
 (25, User(id=2, age=25, social_contacts=500, gender='female')),
 (33, User(id=3, age=33, social_contacts=800, gender='female'))]

In [9]:
def groupbykey(iterable):
  t = {}
  for (k2, v2) in iterable:
    t[k2] = t.get(k2, []) + [v2]
  return t.items()

In [10]:
shuffle_output = groupbykey(map_output)
shuffle_output = list(shuffle_output)
shuffle_output

[(25,
  [User(id=1, age=25, social_contacts=240, gender='female'),
   User(id=2, age=25, social_contacts=500, gender='female')]),
 (33, [User(id=3, age=33, social_contacts=800, gender='female')])]

In [11]:
reduce_output = flatten(map(lambda x: REDUCE(*x), shuffle_output))
reduce_output = list(reduce_output)
reduce_output

[(25, 370.0), (33, 800.0)]

Все действия одним конвейером!

In [12]:
list(flatten(map(lambda x: REDUCE(*x), groupbykey(flatten(map(lambda x: MAP(*x), RECORDREADER()))))))

[(25, 370.0), (33, 800.0)]

# **MapReduce**
Выделим общую для всех пользователей часть системы в отдельную функцию высшего порядка. Это наиболее простая модель MapReduce, без учёта распределённого хранения данных.

Пользователь для решения своей задачи реализует RECORDREADER, MAP, REDUCE.

In [13]:
def flatten(nested_iterable):
  for iterable in nested_iterable:
    for element in iterable:
      yield element

def groupbykey(iterable):
  t = {}
  for (k2, v2) in iterable:
    t[k2] = t.get(k2, []) + [v2]
  return t.items()

def MapReduce(RECORDREADER, MAP, REDUCE):
  return flatten(map(lambda x: REDUCE(*x), groupbykey(flatten(map(lambda x: MAP(*x), RECORDREADER())))))

## Спецификация MapReduce



```
f (k1, v1) -> (k2,v2)*
g (k2, v2*) -> (k3,v3)*

mapreduce ((k1,v1)*) -> (k3,v3)*
groupby ((k2,v2)*) -> (k2,v2*)*
flatten (e2**) -> e2*

mapreduce .map(f).flatten.groupby(k2).map(g).flatten
```




# Примеры

## SQL

In [14]:
from typing import NamedTuple # requires python 3.6+
from typing import Iterator

class User(NamedTuple):
  id: int
  age: str
  social_contacts: int
  gender: str

input_collection = [
    User(id=0, age=55, gender='male', social_contacts=20),
    User(id=1, age=25, gender='female', social_contacts=240),
    User(id=2, age=25, gender='female', social_contacts=500),
    User(id=3, age=33, gender='female', social_contacts=800)
]

def MAP(_, row:NamedTuple):
  if (row.gender == 'female'):
    yield (row.age, row)

def REDUCE(age:str, rows:Iterator[NamedTuple]):
  sum = 0
  count = 0
  for row in rows:
    sum += row.social_contacts
    count += 1
  if (count > 0):
    yield (age, sum/count)
  else:
    yield (age, 0)

def RECORDREADER():
  return [(u.id, u) for u in input_collection]

output = MapReduce(RECORDREADER, MAP, REDUCE)
output = list(output)
output

[(25, 370.0), (33, 800.0)]

## Matrix-Vector multiplication

In [15]:
from typing import Iterator
import numpy as np

mat = np.ones((5,4))
vec = np.random.rand(5) # in-memory vector in all map tasks

def MAP(coordinates:(int, int), value:int):
  i, j = coordinates
  yield (i, value*vec[j])

def REDUCE(i:int, products:Iterator[NamedTuple]):
  sum = 0
  for p in products:
    sum += p
  yield (i, sum)

def RECORDREADER():
  for i in range(mat.shape[0]):
    for j in range(mat.shape[1]):
      yield ((i, j), mat[i,j])

output = MapReduce(RECORDREADER, MAP, REDUCE)
output = list(output)
output

[(0, np.float64(2.462688996428474)),
 (1, np.float64(2.462688996428474)),
 (2, np.float64(2.462688996428474)),
 (3, np.float64(2.462688996428474)),
 (4, np.float64(2.462688996428474))]

## Inverted index

In [16]:
from typing import Iterator

d1 = "it is what it is"
d2 = "what is it"
d3 = "it is a banana"
documents = [d1, d2, d3]

def RECORDREADER():
  for (docid, document) in enumerate(documents):
    yield ("{}".format(docid), document)

def MAP(docId:str, body:str):
  for word in set(body.split(' ')):
    yield (word, docId)

def REDUCE(word:str, docIds:Iterator[str]):
  yield (word, sorted(docIds))

output = MapReduce(RECORDREADER, MAP, REDUCE)
output = list(output)
output

[('what', ['0', '1']),
 ('it', ['0', '1', '2']),
 ('is', ['0', '1', '2']),
 ('a', ['2']),
 ('banana', ['2'])]

## WordCount

In [17]:
from typing import Iterator

d1 = """
it is what it is
it is what it is
it is what it is"""
d2 = """
what is it
what is it"""
d3 = """
it is a banana"""
documents = [d1, d2, d3]

def RECORDREADER():
  for (docid, document) in enumerate(documents):
    for (lineid, line) in enumerate(document.split('\n')):
      yield ("{}:{}".format(docid,lineid), line)

def MAP(docId:str, line:str):
  for word in line.split(" "):
    yield (word, 1)

def REDUCE(word:str, counts:Iterator[int]):
  sum = 0
  for c in counts:
    sum += c
  yield (word, sum)

output = MapReduce(RECORDREADER, MAP, REDUCE)
output = list(output)
output

[('', 3), ('it', 9), ('is', 9), ('what', 5), ('a', 1), ('banana', 1)]

# MapReduce Distributed

Добавляется в модель фабрика RECORDREARER-ов --- INPUTFORMAT, функция распределения промежуточных результатов по партициям PARTITIONER, и функция COMBINER для частичной аггрегации промежуточных результатов до распределения по новым партициям.

In [18]:
def flatten(nested_iterable):
  for iterable in nested_iterable:
    for element in iterable:
      yield element

def groupbykey(iterable):
  t = {}
  for (k2, v2) in iterable:
    t[k2] = t.get(k2, []) + [v2]
  return t.items()

def groupbykey_distributed(map_partitions, PARTITIONER):
  global reducers
  partitions = [dict() for _ in range(reducers)]
  for map_partition in map_partitions:
    for (k2, v2) in map_partition:
      p = partitions[PARTITIONER(k2)]
      p[k2] = p.get(k2, []) + [v2]
  return [(partition_id, sorted(partition.items(), key=lambda x: x[0])) for (partition_id, partition) in enumerate(partitions)]

def PARTITIONER(obj):
  global reducers
  return hash(obj) % reducers

def MapReduceDistributed(INPUTFORMAT, MAP, REDUCE, PARTITIONER=PARTITIONER, COMBINER=None):
  map_partitions = map(lambda record_reader: flatten(map(lambda k1v1: MAP(*k1v1), record_reader)), INPUTFORMAT())
  if COMBINER != None:
    map_partitions = map(lambda map_partition: flatten(map(lambda k2v2: COMBINER(*k2v2), groupbykey(map_partition))), map_partitions)
  reduce_partitions = groupbykey_distributed(map_partitions, PARTITIONER) # shuffle
  reduce_outputs = map(lambda reduce_partition: (reduce_partition[0], flatten(map(lambda reduce_input_group: REDUCE(*reduce_input_group), reduce_partition[1]))), reduce_partitions)

  print("{} key-value pairs were sent over a network.".format(sum([len(vs) for (k,vs) in flatten([partition for (partition_id, partition) in reduce_partitions])])))
  return reduce_outputs

## Спецификация MapReduce Distributed


```
f (k1, v1) -> (k2,v2)*
g (k2, v2*) -> (k3,v3)*

e1 (k1, v1)
e2 (k2, v2)
partition1 (k2, v2)*
partition2 (k2, v2*)*

flatmap (e1->e2*, e1*) -> partition1*
groupby (partition1*) -> partition2*

mapreduce ((k1,v1)*) -> (k3,v3)*
mapreduce .flatmap(f).groupby(k2).flatmap(g)
```



## WordCount

In [19]:
from typing import Iterator
import numpy as np

d1 = """
it is what it is
it is what it is
it is what it is"""
d2 = """
what is it
what is it"""
d3 = """
it is a banana"""
documents = [d1, d2, d3, d1, d2, d3]

maps = 3
reducers = 2

def INPUTFORMAT():
  global maps

  def RECORDREADER(split):
    for (docid, document) in enumerate(split):
      for (lineid, line) in enumerate(document.split('\n')):
        yield ("{}:{}".format(docid,lineid), line)

  split_size =  int(np.ceil(len(documents)/maps))
  for i in range(0, len(documents), split_size):
    yield RECORDREADER(documents[i:i+split_size])

def MAP(docId:str, line:str):
  for word in line.split(" "):
    yield (word, 1)

def REDUCE(word:str, counts:Iterator[int]):
  sum = 0
  for c in counts:
    sum += c
  yield (word, sum)

# try to set COMBINER=REDUCER and look at the number of values sent over the network
partitioned_output = MapReduceDistributed(INPUTFORMAT, MAP, REDUCE, COMBINER=None)
partitioned_output = [(partition_id, list(partition)) for (partition_id, partition) in partitioned_output]
partitioned_output

56 key-value pairs were sent over a network.


[(0, [('', 6), ('what', 10)]),
 (1, [('a', 2), ('banana', 2), ('is', 18), ('it', 18)])]

## TeraSort

In [20]:
import numpy as np

input_values = np.random.rand(30)
maps = 3
reducers = 2
min_value = 0.0
max_value = 1.0

def INPUTFORMAT():
  global maps

  def RECORDREADER(split):
    for value in split:
        yield (value, None)

  split_size =  int(np.ceil(len(input_values)/maps))
  for i in range(0, len(input_values), split_size):
    yield RECORDREADER(input_values[i:i+split_size])

def MAP(value:int, _):
  yield (value, None)

def PARTITIONER(key):
  global reducers
  global max_value
  global min_value
  bucket_size = (max_value-min_value)/reducers
  bucket_id = 0
  while((key>(bucket_id+1)*bucket_size) and ((bucket_id+1)*bucket_size<max_value)):
    bucket_id += 1
  return bucket_id

def REDUCE(value:int, _):
  yield (None,value)

partitioned_output = MapReduceDistributed(INPUTFORMAT, MAP, REDUCE, COMBINER=None, PARTITIONER=PARTITIONER)
partitioned_output = [(partition_id, list(partition)) for (partition_id, partition) in partitioned_output]
partitioned_output

30 key-value pairs were sent over a network.


[(0,
  [(None, np.float64(0.027138773292931906)),
   (None, np.float64(0.08356263600422342)),
   (None, np.float64(0.15681484862983108)),
   (None, np.float64(0.15987107383532406)),
   (None, np.float64(0.16157845934304993)),
   (None, np.float64(0.18990358218140502)),
   (None, np.float64(0.20299276342994776)),
   (None, np.float64(0.20911968010164816)),
   (None, np.float64(0.21105793005202045)),
   (None, np.float64(0.2301916297392297)),
   (None, np.float64(0.25658617713672927)),
   (None, np.float64(0.28832201484558895)),
   (None, np.float64(0.29922218092157216)),
   (None, np.float64(0.4827766626191481))]),
 (1,
  [(None, np.float64(0.5426546868573219)),
   (None, np.float64(0.5739871586959192)),
   (None, np.float64(0.6147482551207583)),
   (None, np.float64(0.62874950495668)),
   (None, np.float64(0.6309815281111341)),
   (None, np.float64(0.6522199394840711)),
   (None, np.float64(0.6544280147908885)),
   (None, np.float64(0.6626954044700207)),
   (None, np.float64(0.74195179

# Упражнения
Упражнения взяты из Rajaraman A., Ullman J. D. Mining of massive datasets. – Cambridge University Press, 2011.


Для выполнения заданий переопределите функции RECORDREADER, MAP, REDUCE. Для модели распределённой системы может потребоваться переопределение функций PARTITION и COMBINER.

### Максимальное значение ряда

Разработайте MapReduce алгоритм, который находит максимальное число входного списка чисел.

In [21]:
import random
from typing import Generator, Tuple, List

random_list = [random.randint(0, 10000) for _ in range(1000)]

def RECORDREADER() -> Generator[Tuple[int, int], None, None]:
    """
    Генератор, который читает данные из случайного списка.

    Возвращает кортежи, где первый элемент всегда 0 (идентификатор),
    а второй элемент - случайное значение из списка.
    """
    for value in random_list:
        yield 0, value

def MAP(id: int, value: int) -> Generator[Tuple[int, int], None, None]:
    """
    Функция отображения (Map), которая принимает идентификатор и значение.

    Возвращает кортеж, содержащий идентификатор и значение.
    """
    yield id, value

def REDUCE(id: int, values: List[int]) -> Generator[Tuple[int, int], None, None]:
    """
    Функция редукции, которая принимает идентификатор и список значений.

    Возвращает кортеж, содержащий идентификатор и максимальное значение из списка.
    """
    yield id, max(values)

print(list(MapReduce(RECORDREADER, MAP, REDUCE)))

[(0, 9996)]


### Арифметическое среднее

Разработайте MapReduce алгоритм, который находит арифметическое среднее.

$$\overline{X} = \frac{1}{n}\sum_{i=0}^{n} x_i$$


In [22]:
input_data = [random.randint(0, 10) for _ in range(3)]
print(input_data)

[0, 0, 2]


In [23]:
def RECORDREADER() -> Generator[Tuple[int, int], None, None]:
    """
    Генератор, который читает данные из случайного списка.

    Возвращает кортежи, где первый элемент всегда 0 (идентификатор),
    а второй элемент - случайное значение из списка.
    """
    for value in input_data:
        yield 0, value

def MAP(id: int, value: int) -> Generator[Tuple[int, int], None, None]:
    """
    Функция отображения, которая принимает идентификатор и значение.

    Возвращает кортеж, содержащий идентификатор и значение.
    """
    yield id, value

def REDUCE(id: int, values: List[int]) -> Generator[Tuple[int, int], None, None]:
    """
    Функция редукции, которая принимает идентификатор и список значений.

    Возвращает кортеж, содержащий идентификатор и арифметическое среднее из списка.
    """
    yield id, sum(values)/len(values)

print(list(MapReduce(RECORDREADER, MAP, REDUCE)))

[(0, 0.6666666666666666)]


### GroupByKey на основе сортировки

Реализуйте groupByKey на основе сортировки, проверьте его работу на примерах

In [24]:
from typing import List, Tuple, Iterator, Dict, Any
from collections import defaultdict

input_collection = [
    User(id=0, age=55, gender='male', social_contacts=20),
    User(id=1, age=25, gender='female', social_contacts=240),
    User(id=2, age=25, gender='female', social_contacts=500),
    User(id=3, age=33, gender='female', social_contacts=800)
]


def RECORDREADER(key: str) -> Iterator[Tuple[Any, User]]:
    """
    Генератор, который извлекает значения по указанному ключу (атрибуту) из коллекции пользователей.
    """
    for user in input_collection:
        yield getattr(user, key), user


def groupByKey(iterable: Iterator[Tuple[Any, Any]]) -> Iterator[Tuple[Any, List[Any]]]:
    """
    Группирует элементы по ключу.
    Принимает на вход итерируемый объект, состоящий из пар (key, value),
    """
    groups = defaultdict(list)  # Используем defaultdict для автоматического создания списков
    for key, value in iterable:
        groups[key].append(value)  # Добавляем значение в соответствующий список
    return groups.items()

def flatten(nested_iterable: Iterator[Iterator[Any]]) -> Iterator[Any]:
    """
    Разворачивает вложенные итерируемые объекты в плоский итератор.
    """
    for inner_iterable in nested_iterable:
        yield from inner_iterable


print(list(groupByKey(flatten(map(lambda x: MAP(*x), RECORDREADER("id"))))), end="\n\n")

print(list(groupByKey(flatten(map(lambda x: MAP(*x), RECORDREADER("gender"))))), end="\n\n")

print(list(groupByKey(flatten(map(lambda x: MAP(*x), RECORDREADER("age"))))))

[(0, [User(id=0, age=55, social_contacts=20, gender='male')]), (1, [User(id=1, age=25, social_contacts=240, gender='female')]), (2, [User(id=2, age=25, social_contacts=500, gender='female')]), (3, [User(id=3, age=33, social_contacts=800, gender='female')])]

[('male', [User(id=0, age=55, social_contacts=20, gender='male')]), ('female', [User(id=1, age=25, social_contacts=240, gender='female'), User(id=2, age=25, social_contacts=500, gender='female'), User(id=3, age=33, social_contacts=800, gender='female')])]

[(55, [User(id=0, age=55, social_contacts=20, gender='male')]), (25, [User(id=1, age=25, social_contacts=240, gender='female'), User(id=2, age=25, social_contacts=500, gender='female')]), (33, [User(id=3, age=33, social_contacts=800, gender='female')])]


### Drop duplicates (set construction, unique elements, distinct)

Реализуйте распределённую операцию исключения дубликатов

In [25]:
import numpy as np

users_data = [
    User(id=0, age=5, gender='male', social_contacts=20),
    User(id=1, age=2, gender='female', social_contacts=24),
    User(id=2, age=2, gender='female', social_contacts=50),
    User(id=3, age=3, gender='female', social_contacts=80),
    User(id=4, age=3, gender='female', social_contacts=80)
]

num_mappers = 2  # Количество мапперов
num_reducers = 2  # Количество редьюсеров

def INPUTFORMAT():
    """Функция, разбивающая данные на части для мапперов."""
    global num_mappers
    
    def RECORDREADER(user_partition):
        """Генератор, который поочередно возвращает пользователей из разбиения."""
        for user in user_partition:
            yield (user.id, user)
    
    partition_size = int(np.ceil(len(users_data) / num_mappers))  # Вычисляем размер каждой части
    
    # Разбиваем данные на части и создаем генераторы для каждой из них
    for start_idx in range(0, len(users_data), partition_size):
        yield RECORDREADER(users_data[start_idx:start_idx + partition_size])

def MAP(user_id: int, user: User):
    """Функция маппера: просто передаем данные пользователя дальше."""
    yield (user_id, user)

def REDUCE(_, user_list: list):
    """Функция редьюсера: удаляет дубликаты пользователей."""
    yield list(set(user_list))
    

map_reduce_output = MapReduceDistributed(INPUTFORMAT, MAP, REDUCE, COMBINER=None)

final_output = [(partition_id, list(partition)) for (partition_id, partition) in map_reduce_output]
final_output

5 key-value pairs were sent over a network.


[(0,
  [[User(id=0, age=5, social_contacts=20, gender='male')],
   [User(id=2, age=2, social_contacts=50, gender='female')],
   [User(id=4, age=3, social_contacts=80, gender='female')]]),
 (1,
  [[User(id=1, age=2, social_contacts=24, gender='female')],
   [User(id=3, age=3, social_contacts=80, gender='female')]])]

#Операторы реляционной алгебры
### Selection (Выборка)

**The Map Function**: Для  каждого кортежа $t \in R$ вычисляется истинность предиката $C$. В случае истины создаётся пара ключ-значение $(t, t)$. В паре ключ и значение одинаковы, равны $t$.

**The Reduce Function:** Роль функции Reduce выполняет функция идентичности, которая возвращает то же значение, что получила на вход.



In [26]:
user_data = [
    User(id=0, age=5, gender='male', social_contacts=20),
    User(id=1, age=2, gender='female', social_contacts=24),
    User(id=2, age=2, gender='male', social_contacts=50),
    User(id=3, age=3, gender='female', social_contacts=80)
]

def RECORDREADER():
    """Генератор, который поочередно возвращает пользователей."""
    for user in user_data:
        yield (user.id, user)

def MAP(user_id: int, user: User):
    """Функция маппера: фильтрует пользователей младше 3 лет."""
    if user.age < 3 and user.gender == 'female':
        yield (user, user)

def REDUCE(user_id: int, user: User):
    """Функция редьюсера: передает пользователя без изменений."""
    yield user, user

map_reduce_output = list(MapReduce(RECORDREADER, MAP, REDUCE))
map_reduce_output

[([User(id=1, age=2, social_contacts=24, gender='female')],
  [User(id=1, age=2, social_contacts=24, gender='female')])]

### Projection (Проекция)

Проекция на множество атрибутов $S$.

**The Map Function:** Для каждого кортежа $t \in R$ создайте кортеж $t′$, исключая  из $t$ те значения, атрибуты которых не принадлежат  $S$. Верните пару $(t′, t′)$.

**The Reduce Function:** Для каждого ключа $t′$, созданного любой Map задачей, вы получаете одну или несколько пар $(t′, t′)$. Reduce функция преобразует $(t′, [t′, t′, . . . , t′])$ в $(t′, t′)$, так, что для ключа $t′$ возвращается одна пара  $(t′, t′)$.

In [27]:
from typing import Generator, Tuple, List, NamedTuple

class User:
    def __init__(self, id: int, age: int, gender: str, social_contacts: int):
        self.id = id
        self.age = age
        self.gender = gender
        self.social_contacts = social_contacts

class UserProjection(NamedTuple):
    id: int
    gender: str

data = [
    User(id=0, age=5, gender='male', social_contacts=20),
    User(id=1, age=2, gender='female', social_contacts=24),
    User(id=2, age=2, gender='female', social_contacts=50),
    User(id=3, age=3, gender='female', social_contacts=80)
]

def RECORDREADER():
    """
    Генератор, который читает данные из коллекции data.
    """
    for user in data:
        yield (user.id, user)

def MAP(userId: int, user: User):
    """
    Создает проекцию пользователя и возвращает пару (t', t').
    """
    new_user = UserProjection(userId, user.gender)
    yield (new_user, new_user)

def REDUCE(key: UserProjection, usersList: List[UserProjection]):
    """
    Для каждого ключа t' возвращает одну пару (t', t').
    """
    yield (key, key)


output = list(MapReduce(RECORDREADER, MAP, REDUCE))
output

[(UserProjection(id=0, gender='male'), UserProjection(id=0, gender='male')),
 (UserProjection(id=1, gender='female'),
  UserProjection(id=1, gender='female')),
 (UserProjection(id=2, gender='female'),
  UserProjection(id=2, gender='female')),
 (UserProjection(id=3, gender='female'),
  UserProjection(id=3, gender='female'))]

### Union (Объединение)

**The Map Function:** Превратите каждый входной кортеж $t$ в пару ключ-значение $(t, t)$.

**The Reduce Function:** С каждым ключом $t$ будет ассоциировано одно или два значений. В обоих случаях создайте $(t, t)$ в качестве выходного значения.

In [28]:
import numpy as np
from typing import NamedTuple

class User(NamedTuple):
    id: int
    age: int
    social_contacts: int
    gender: str
    
    def __str__(self):
        return f"User(id={self.id}, age={self.age}, gender={self.gender}, social_contacts={self.social_contacts})"

# Исходные данные - два списка пользователей
users_data_1 = [
    User(id=0, age=5, gender='male', social_contacts=20),
    User(id=1, age=2, gender='female', social_contacts=24),
    User(id=2, age=2, gender='female', social_contacts=50),
    User(id=3, age=3, gender='female', social_contacts=80)
]

users_data_2 = [
    User(id=4, age=5, gender='male', social_contacts=20),
    User(id=5, age=2, gender='female', social_contacts=24),
    User(id=6, age=2, gender='female', social_contacts=50),
    User(id=7, age=3, gender='female', social_contacts=80)
]

# Объединяем списки данных
all_users_data = [users_data_1, users_data_2]

def RECORDREADER():
    """Генератор, который поочередно возвращает пользователей из всех списков."""
    for user_list in all_users_data:
        for user in user_list:
            yield (user.id, user)

def MAP(user_id: int, user: User):
    """Функция маппера: передает объект пользователя дальше."""
    yield (user, user)

def REDUCE(key: User, user_list: list):
    """Функция редьюсера: убирает дубликаты пользователей."""
    yield (key, key)

# Выполняем MapReduce-процесс
map_reduce_output = MapReduce(RECORDREADER, MAP, REDUCE)

# Преобразуем результат в читаемый формат
final_output = [str(user) for user in map_reduce_output]
final_output


["(User(id=0, age=5, social_contacts=20, gender='male'), User(id=0, age=5, social_contacts=20, gender='male'))",
 "(User(id=1, age=2, social_contacts=24, gender='female'), User(id=1, age=2, social_contacts=24, gender='female'))",
 "(User(id=2, age=2, social_contacts=50, gender='female'), User(id=2, age=2, social_contacts=50, gender='female'))",
 "(User(id=3, age=3, social_contacts=80, gender='female'), User(id=3, age=3, social_contacts=80, gender='female'))",
 "(User(id=4, age=5, social_contacts=20, gender='male'), User(id=4, age=5, social_contacts=20, gender='male'))",
 "(User(id=5, age=2, social_contacts=24, gender='female'), User(id=5, age=2, social_contacts=24, gender='female'))",
 "(User(id=6, age=2, social_contacts=50, gender='female'), User(id=6, age=2, social_contacts=50, gender='female'))",
 "(User(id=7, age=3, social_contacts=80, gender='female'), User(id=7, age=3, social_contacts=80, gender='female'))"]

### Intersection (Пересечение)

**The Map Function:** Превратите каждый кортеж $t$ в пары ключ-значение $(t, t)$.

**The Reduce Function:** Если для ключа $t$ есть список из двух элементов $[t, t]$ $-$ создайте пару $(t, t)$. Иначе, ничего не создавайте.

In [29]:
import numpy as np

user_data_source_1 = [
    User(id=0, age=5, gender='male', social_contacts=20),
    User(id=1, age=2, gender='female', social_contacts=24),
    User(id=2, age=2, gender='female', social_contacts=50),
    User(id=3, age=3, gender='female', social_contacts=80)
]

user_data_source_2 = [
    User(id=0, age=5, gender='male', social_contacts=20),
    User(id=1, age=2, gender='female', social_contacts=24),
    User(id=2, age=2, gender='female', social_contacts=50),
    User(id=4, age=5, gender='female', social_contacts=80)
]

all_user_data_sources = [user_data_source_1, user_data_source_2]

def RECORDREADER():
    """Генератор, который поочередно возвращает пользователей из всех списков."""
    for user_list in all_user_data_sources:
        for user in user_list:
            yield (user.id, user)

def MAP(user_id: int, user: User):
    """Функция маппера: передает объект пользователя дальше без изменений."""
    yield (user, user)

def REDUCE(user_key: User, user_occurrences: list):
    """Функция редьюсера: оставляет пользователей, которые встречаются дважды."""
    if len(user_occurrences) == 2:
        yield (user_key, user_key)

map_reduce_output = MapReduce(RECORDREADER, MAP, REDUCE)

final_output = [str(user) for user in map_reduce_output]
final_output


["(User(id=0, age=5, social_contacts=20, gender='male'), User(id=0, age=5, social_contacts=20, gender='male'))",
 "(User(id=1, age=2, social_contacts=24, gender='female'), User(id=1, age=2, social_contacts=24, gender='female'))",
 "(User(id=2, age=2, social_contacts=50, gender='female'), User(id=2, age=2, social_contacts=50, gender='female'))"]

### Difference (Разница)

**The Map Function:** Для кортежа $t \in R$, создайте пару $(t, R)$, и для кортежа $t \in S$, создайте пару $(t, S)$. Задумка заключается в том, чтобы значение пары было именем отношения $R$ or $S$, которому принадлежит кортеж (а лучше, единичный бит, по которому можно два отношения различить $R$ or $S$), а не весь набор атрибутов отношения.

**The Reduce Function:** Для каждого ключа $t$, если соответствующее значение является списком $[R]$, создайте пару $(t, t)$. В иных случаях не предпринимайте действий.

In [30]:
user_data_source_1 = [
    User(id=0, age=5, gender='male', social_contacts=20),
    User(id=1, age=2, gender='female', social_contacts=24),
    User(id=2, age=2, gender='female', social_contacts=50),
    User(id=3, age=3, gender='female', social_contacts=80)
]

user_data_source_2 = [
    User(id=0, age=5, gender='male', social_contacts=20),
    User(id=1, age=2, gender='female', social_contacts=24),
    User(id=3, age=2, gender='female', social_contacts=50),
    User(id=4, age=3, gender='female', social_contacts=80)
]

all_user_data_sources = [user_data_source_1, user_data_source_2]

def RECORDREADER():
    """Генератор, который поочередно возвращает пользователей из всех списков."""
    for source_id, user_list in enumerate(all_user_data_sources):
        for user in user_list:
            yield (source_id, user)

def MAP(source_id: int, user: User):
    """Функция маппера: связывает пользователя с его источником данных."""
    yield (user, source_id)

def REDUCE(user_key: User, source_list: list):
    """Функция редьюсера: оставляет только тех пользователей, которые есть только в первом списке."""
    if len(source_list) == 1 and source_list[0] == 0:
        yield (user_key, user_key)

map_reduce_output = list(MapReduce(RECORDREADER, MAP, REDUCE))
map_reduce_output

[(User(id=2, age=2, social_contacts=50, gender='female'),
  User(id=2, age=2, social_contacts=50, gender='female')),
 (User(id=3, age=3, social_contacts=80, gender='female'),
  User(id=3, age=3, social_contacts=80, gender='female'))]

### Natural Join

**The Map Function:** Для каждого кортежа $(a, b)$ отношения $R$, создайте пару $(b,(R, a))$. Для каждого кортежа $(b, c)$ отношения $S$, создайте пару $(b,(S, c))$.

**The Reduce Function:** Каждый ключ $b$ будет асоциирован со списком пар, которые принимают форму либо $(R, a)$, либо $(S, c)$. Создайте все пары, одни, состоящие из  первого компонента $R$, а другие, из первого компонента $S$, то есть $(R, a)$ и $(S, c)$. На выходе вы получаете последовательность пар ключ-значение из списков ключей и значений. Ключ не нужен. Каждое значение, это тройка $(a, b, c)$ такая, что $(R, a)$ и $(S, c)$ это принадлежат входному списку значений.

In [31]:
import numpy as np
from typing import NamedTuple

class UserProjection(NamedTuple):
    id: int
    gender: str

class GenderSalary(NamedTuple):
    gender: str
    salary: int

users = [
    UserProjection(id=1, gender='female'),
    UserProjection(id=2, gender='female'),
    UserProjection(id=3, gender='male'),
    UserProjection(id=4, gender='female')
]

salaries = [
    GenderSalary(gender='male', salary=500),
    GenderSalary(gender='female', salary=650)
]

data_list = [users, salaries]

def RECORDREADER():
    """Генератор, который поочередно возвращает записи из всех списков."""
    for dataset_id, dataset in enumerate(data_list):
        for obj in dataset:
            yield (dataset_id, obj)

def MAP(dataset_id: int, obj: NamedTuple):
    """Функция маппера: разделяет данные пользователей и зарплат."""
    if dataset_id == 0:
        yield (obj.gender, (0, obj.id))  # Пользователь, отмеченный как (0, id)
    else:
        yield (obj.gender, (1, obj.salary))  # Зарплата, отмеченная как (1, salary)

def REDUCE(gender: str, grouped_data: list):
    """Функция редьюсера: соединяет пользователей с их зарплатами по полу."""
    users_group = [entry for (dataset_id, entry) in grouped_data if dataset_id == 0]
    salaries_group = [entry for (dataset_id, entry) in grouped_data if dataset_id == 1]
    
    for user_id in users_group:
        for salary in salaries_group:
            yield (user_id, gender, salary)

map_reduce_output = MapReduce(RECORDREADER, MAP, REDUCE)
final_output = list(map_reduce_output)
final_output


[(1, 'female', 650), (2, 'female', 650), (4, 'female', 650), (3, 'male', 500)]

### Grouping and Aggregation (Группировка и аггрегация)

**The Map Function:** Для каждого кортежа $(a, b, c$) создайте пару $(a, b)$.

**The Reduce Function:** Ключ представляет ту или иную группу. Примените аггрегирующую операцию $\theta$ к списку значений $[b1, b2, . . . , bn]$ ассоциированных с ключом $a$. Возвращайте в выходной поток $(a, x)$, где $x$ результат применения  $\theta$ к списку. Например, если $\theta$ это $SUM$, тогда $x = b1 + b2 + · · · + bn$, а если $\theta$ is $MAX$, тогда $x$ это максимальное из значений $b1, b2, . . . , bn$.

In [32]:
class SalaryInfo(NamedTuple):
    group_id: int
    salary: int
    gender: str


class EmployeeData(NamedTuple):
    gender: str
    salary: int


salary_group_1 = [
    EmployeeData(gender='female', salary=150),
    EmployeeData(gender='male', salary=50),
    EmployeeData(gender='male', salary=85),
    EmployeeData(gender='male', salary=90),
    EmployeeData(gender='female', salary=250),
]

salary_group_2 = [
    EmployeeData(gender='male', salary=200),
    EmployeeData(gender='female', salary=350),
    EmployeeData(gender='female', salary=550),
    EmployeeData(gender='male', salary=650),
    EmployeeData(gender='male', salary=50),
]

data_groups = [salary_group_1, salary_group_2]

def RECORDREADER():
    """Функция-генератор: добавляет идентификатор группы к каждой записи."""
    for index, salary_list in enumerate(data_groups):
        for record in salary_list:
            yield (index, SalaryInfo(index, record.salary, record.gender))

def MAP(group_id: int, salary_info: SalaryInfo):
    """Функция маппера: передает объект зарплаты дальше без изменений."""
    yield (group_id, salary_info.salary)

def REDUCE(group_id: int, salaries: list):
    """Функция редьюсера: вычисляет суммарную зарплату по группам."""
    yield (group_id, sum(salaries))

# Выполнение MapReduce
output_result = MapReduce(RECORDREADER, MAP, REDUCE)
output_result = list(output_result)
output_result

[(0, 625), (1, 1800)]

#

### Matrix-Vector multiplication

Случай, когда вектор не помещается в памяти Map задачи


In [33]:
import numpy as np

# Исходные данные: матрица и вектор
matrix_data = np.ones((8, 4))
vector_data = np.array([2, 4, 1, 1])

num_mappers = 3
num_reducers = 1

def RECORDREADER():
    """Функция для разбиения матрицы на части и передачи в мапперы."""
    global num_mappers
    
    def record_reader(matrix_chunk):
        for row_idx in range(matrix_chunk.shape[0]):
            for col_idx in range(matrix_chunk.shape[1]):
                yield ((row_idx, col_idx), (matrix_chunk[row_idx, col_idx], vector_data[col_idx], matrix_chunk.shape[1]))
    
    chunk_size = int(np.ceil(len(matrix_data) / num_mappers))
    
    for i in range(0, len(matrix_data), chunk_size):
        yield record_reader(matrix_data[i: i + chunk_size])

def MAP(coords: (int, int), values: (int, int, int)):
    """Функция маппера: вычисляет произведение элемента матрицы и соответствующего элемента вектора."""
    row, col = coords
    matrix_value, vector_value, total_cols = values
    yield ((row, total_cols), matrix_value * vector_value)

def REDUCE(keys: (int, int), product_values: list):
    """Функция редьюсера: вычисляет среднее значение произведений для строки."""
    row, total_cols = keys
    yield (row, sum(product_values) / (len(product_values) / total_cols))

# Запуск MapReduce
result = MapReduceDistributed(RECORDREADER, MAP, REDUCE, COMBINER=None)
result = [(partition_id, list(partition)) for (partition_id, partition) in result]

result

32 key-value pairs were sent over a network.


[(0, [(0, np.float64(8.0)), (1, np.float64(8.0)), (2, np.float64(8.0))]),
 (1, [])]

## Matrix multiplication (Перемножение матриц)

Если у нас есть матрица $M$ с элементами $m_{ij}$ в строке $i$ и столбце $j$, и матрица $N$ с элементами $n_{jk}$ в строке $j$ и столбце $k$, тогда их произведение $P = MN$ есть матрица $P$ с элементами $p_{ik}$ в строке $i$ и столбце $k$, где

$$p_{ik} =\sum_{j} m_{ij}n_{jk}$$

Необходимым требованием является одинаковое количество столбцов в $M$ и строк в $N$, чтобы операция суммирования по  $j$ была осмысленной. Мы можем размышлять о матрице, как об отношении с тремя атрибутами: номер строки, номер столбца, само значение. Таким образом матрица $M$ предстваляется как отношение $ M(I, J, V )$, с кортежами $(i, j, m_{ij})$, и, аналогично, матрица $N$ представляется как отношение $N(J, K, W)$, с кортежами $(j, k, n_{jk})$. Так как большие матрицы как правило разреженные (большинство значений равно 0), и так как мы можем нулевыми значениями пренебречь (не хранить), такое реляционное представление достаточно эффективно для больших матриц. Однако, возможно, что координаты $i$, $j$, и $k$ неявно закодированы в смещение позиции элемента относительно начала файла, вместо явного хранения. Тогда, функция Map (или Reader) должна быть разработана таким образом, чтобы реконструировать компоненты $I$, $J$, и $K$ кортежей из смещения.

Произведение $MN$ это фактически join, за которым следуют группировка по ключу и аггрегация. Таким образом join отношений $M(I, J, V )$ и $N(J, K, W)$, имеющих общим только атрибут $J$, создаст кортежи $(i, j, k, v, w)$ из каждого кортежа $(i, j, v) \in M$ и кортежа $(j, k, w) \in N$. Такой 5 компонентный кортеж представляет пару элементов матрицы $(m_{ij} , n_{jk})$. Что нам хотелось бы получить на самом деле, это произведение этих элементов, то есть, 4 компонентный кортеж$(i, j, k, v \times w)$, так как он представляет произведение $m_{ij}n_{jk}$. Мы представляем отношение как результат одной MapReduce операции, в которой мы можем произвести группировку и аггрегацию, с $I$ и $K$  атрибутами, по которым идёт группировка, и суммой  $V \times W$.





In [34]:
# MapReduce model
def flatten(nested_iterable):
  for iterable in nested_iterable:
    for element in iterable:
      yield element

def groupbykey(iterable):
  t = {}
  for (k2, v2) in iterable:
    t[k2] = t.get(k2, []) + [v2]
  return t.items()

def MapReduce(RECORDREADER, MAP, REDUCE):
  return flatten(map(lambda x: REDUCE(*x), groupbykey(flatten(map(lambda x: MAP(*x), RECORDREADER())))))

Реализуйте перемножение матриц с использованием модельного кода MapReduce для одной машины в случае, когда одна матрица хранится в памяти, а другая генерируется RECORDREADER-ом.

In [35]:
import numpy as np

I = 2
J = 3
K = 4 * 10

small_mat = np.random.rand(I, J)  # Маленькая матрица, доступная во всех функциях
big_mat = np.random.rand(J, K)    # Большая матрица

def RECORDREADER():
    """Читает данные из большой матрицы и передает их в формате (индекс, значение)."""
    for j in range(big_mat.shape[0]):
        for k in range(big_mat.shape[1]):
            yield ((j, k), big_mat[j, k])

def MAP(coordinates: (int, int), value: int):
    """Функция маппера: берет значение из большой матрицы и умножает его на соответствующие элементы маленькой матрицы."""
    j, k = coordinates
    # Проходим по строкам маленькой матрицы
    # Индекс строки выходной матрицы — это строка из маленькой матрицы
    # Индекс столбца выходной матрицы — это столбец из большой матрицы
    for row in range(small_mat.shape[0]):
        yield ((row, k), value * small_mat[row][j])

def REDUCE(key, values):
    """Функция редьюсера: суммирует значения, соответствующие одному индексу в выходной матрице."""
    yield (key, sum(values))


Проверьте своё решение

In [36]:
# CHECK THE SOLUTION
reference_solution = np.matmul(small_mat, big_mat)
solution = MapReduce(RECORDREADER, MAP, REDUCE)

def asmatrix(reduce_output):
  reduce_output = list(reduce_output)
  I = max(i for ((i,k), vw) in reduce_output) + 1
  K = max(k for ((i,k), vw) in reduce_output) + 1
  mat = np.empty(shape=(I,K))
  for ((i,k), vw) in reduce_output:
    mat[i,k] = vw
  return mat

np.allclose(reference_solution, asmatrix(solution)) # should return true

True

In [37]:
reduce_output = list(MapReduce(RECORDREADER, MAP, REDUCE))
max(i for ((i,k), vw) in reduce_output)

1

Реализуйте перемножение матриц  с использованием модельного кода MapReduce для одной машины в случае, когда обе матрицы генерируются в RECORDREADER. Например, сначала одна, а потом другая.

In [38]:
def RECORDREADER():
    """Генерирует элементы маленькой и большой матриц в виде пар (индекс, значение)."""
    # Сначала записываем данные из маленькой матрицы
    for k in range(big_mat.shape[1]):
        for i in range(small_mat.shape[0]):
            for j in range(small_mat.shape[1]):
                yield ((i, k), (0, small_mat[i, j]))

    # Затем добавляем элементы большой матрицы
    for j in range(big_mat.shape[0]):
        for i in range(small_mat.shape[0]):
            for k in range(big_mat.shape[1]):
                yield ((i, k), (1, big_mat[j, k]))


def MAP(coordinates: (int, int), values: (int, int)):
    """Передает входные данные дальше без изменений."""
    yield (coordinates, values)


def REDUCE(coordinates: (int, int), values: list):
    """Умножает соответствующие элементы маленькой и большой матриц, затем суммирует их."""
    big_mat_vals = [val for mat_idx, val in values if mat_idx == 1]
    small_mat_vals = [val for mat_idx, val in values if mat_idx == 0]
    yield (coordinates, np.dot(small_mat_vals, big_mat_vals))


output = MapReduce(RECORDREADER, MAP, REDUCE)

# Проверка корректности результата
np.allclose(reference_solution, asmatrix(output))  # Должно вернуть True


True

Реализуйте перемножение матриц с использованием модельного кода MapReduce Distributed, когда каждая матрица генерируется в своём RECORDREADER.

In [39]:
maps = 2  # Количество мапперов
reducers = 1  # Количество редьюсеров

def INPUTFORMAT():
    """Определяет, как данные разбиваются на части перед обработкой."""

    global maps
    num_small_rows = small_mat.shape[0]
    num_big_rows = big_mat.shape[0]

    # Генерируем записи из маленькой матрицы
    def RECORDREADER_SMALL(small_chunk, row_offset):
        """Обрабатывает фрагмент маленькой матрицы и создает пары (координаты, значение)."""
        for col_idx in range(big_mat.shape[1]):
            for row_idx in range(small_chunk.shape[0]):
                for inner_idx in range(small_chunk.shape[1]):
                    yield ((row_idx + row_offset, col_idx), (0, small_chunk[row_idx, inner_idx]))

    chunk_size = int(np.ceil(num_small_rows / maps))
    for row in range(0, num_small_rows, chunk_size):
        yield RECORDREADER_SMALL(small_mat[row: row + chunk_size], row)

    # Генерируем записи из большой матрицы
    def RECORDREADER_BIG(big_chunk):
        """Обрабатывает фрагмент большой матрицы и создает пары (координаты, значение)."""
        for row_idx in range(num_small_rows):
            for inner_idx in range(big_chunk.shape[0]):
                for col_idx in range(big_chunk.shape[1]):
                    yield ((row_idx, col_idx), (1, big_chunk[inner_idx, col_idx]))

    chunk_size = int(np.ceil(num_big_rows / maps))
    for row in range(0, num_big_rows, chunk_size):
        yield RECORDREADER_BIG(big_mat[row: row + chunk_size])


def MAP(coords: (int, int), values: (int, int)):
    """Передает входные данные дальше без изменений."""
    yield (coords, values)


def REDUCE(coords: (int, int), values: list):
    """Обрабатывает пары значений из маленькой и большой матриц и выполняет их скалярное произведение."""
    small_values = [val for mat_type, val in values if mat_type == 0]
    big_values = [val for mat_type, val in values if mat_type == 1]
    yield (coords, np.dot(small_values, big_values))


partitioned_output = MapReduceDistributed(INPUTFORMAT, MAP, REDUCE, COMBINER=None)
partitioned_output = [(partition_id, list(partition)) for (partition_id, partition) in partitioned_output]


def asmatrix_distributed(mapped_results):
    """Преобразует результаты распределенного вычисления в матричный формат."""
    merged_results = []

    for _, values in mapped_results:
        for value in values:
            merged_results.append(value)

    # Объединяем одинаковые ключи, суммируя значения
    combined_dict = {}
    for key, value in merged_results:
        if key in combined_dict:
            combined_dict[key] += value
        else:
            combined_dict[key] = value

    # Конвертируем в формат кортежей
    return asmatrix((key, val) for key, val in combined_dict.items())


# Проверяем корректность вычислений
np.allclose(reference_solution, asmatrix_distributed(partitioned_output))  # Должно вернуть True

480 key-value pairs were sent over a network.


True

Обобщите предыдущее решение на случай, когда каждая матрица генерируется несколькими RECORDREADER-ами, и проверьте его работоспособность. Будет ли работать решение, если RECORDREADER-ы будут генерировать случайное подмножество элементов матрицы?

In [40]:
import random

def randomize_indices(matrix):
    """Создает список индексов элементов матрицы и перемешивает его случайным образом.
    
    Это позволяет моделировать случайный порядок выдачи элементов, что может использоваться 
    при разбиении данных между различными RECORDREADER-ами.
    """
    shuffled_indices = [(row, col) for row in range(matrix.shape[0]) for col in range(matrix.shape[1])]
    random.shuffle(shuffled_indices)
    return shuffled_indices

# Ответ на вопрос:
# Если RECORDREADER-ы будут выдавать случайное подмножество элементов матрицы,
# то стандартная обработка MapReduce может не сработать корректно, 
# поскольку возможны пропуски данных. Однако, если в конечном итоге будут
# собраны все элементы, то алгоритм суммирования и умножения по ключам 
# все равно отработает правильно.


### Рандомное распределение с использованием REDUCER'ов и RECORDREADER'ов

In [41]:
maps = 2
reducers = 1

def INPUTFORMAT():
    """Разбивает данные на небольшие части для параллельной обработки в MapReduce."""
    global maps
    num_small_rows = small_mat.shape[0]
    num_small_cols = small_mat.shape[1]

    num_big_rows = big_mat.shape[0]
    num_big_cols = big_mat.shape[1]

    # Читаем части малой матрицы и сопоставляем с большой
    def READ_SMALL_PART1(submatrix, index_pair):
        row_idx, col_idx = index_pair
        for col in range(num_big_cols):
            for i in range(submatrix.shape[0]):
                for j in range(submatrix.shape[1]):
                    yield ((i + row_idx, col), (0, col_idx, submatrix[i, j]))

    def READ_SMALL_PART2(submatrix, index_pair):
        row_idx, col_idx = index_pair
        for col in range(num_big_cols):
            for i in range(submatrix.shape[0]):
                for j in range(submatrix.shape[1]):
                    yield ((i + row_idx, col), (0, col_idx, submatrix[i, j]))

    # Читаем части большой матрицы и сопоставляем с малой
    def READ_BIG_PART1(submatrix, index_pair):
        row_idx, col_idx = index_pair
        for i in range(num_small_rows):
            for j in range(submatrix.shape[0]):
                for k in range(submatrix.shape[1]):
                    yield ((i, k + col_idx), (1, row_idx, submatrix[j, k]))

    def READ_BIG_PART2(submatrix, index_pair):
        row_idx, col_idx = index_pair
        for i in range(num_small_rows):
            for j in range(submatrix.shape[0]):
                for k in range(submatrix.shape[1]):
                    yield ((i, k + col_idx), (1, row_idx, submatrix[j, k]))

    # Списки обработчиков данных
    small_matrix_readers = {
        0: READ_SMALL_PART1,
        1: READ_SMALL_PART2
    }

    big_matrix_readers = {
        0: READ_BIG_PART1,
        1: READ_BIG_PART2
    }

    # Разбиваем малую матрицу по индексам и обрабатываем
    read_counter = -1
    small_matrix_indices = randomize_indices(small_mat)
    for (row, col) in small_matrix_indices:
        read_counter += 1
        yield small_matrix_readers[read_counter % len(small_matrix_readers)](np.array([[small_mat[row, col]]]), (row, col))

    # Разбиваем большую матрицу по индексам и обрабатываем
    read_counter = -1
    big_matrix_indices = randomize_indices(big_mat)
    for (row, col) in big_matrix_indices:
        read_counter += 1
        yield big_matrix_readers[read_counter % len(big_matrix_readers)](np.array([[big_mat[row, col]]]), (row, col))


def MAP(coords: (int, int), values: (int, int, int)):
    """Передает данные без изменений, распределяя их по ключу."""
    yield (coords, values)


def REDUCE(coords: (int, int), values: list):
    """Сортирует и выполняет скалярное произведение соответствующих значений из двух матриц."""
    sorted_values = sorted(values, key=lambda item: item[1])  # Сортировка по индексу строки/столбца
    small_values = [val for mat_type, col_idx, val in sorted_values if mat_type == 0]
    big_values = [val for mat_type, row_idx, val in sorted_values if mat_type == 1]
    yield (coords, np.dot(small_values, big_values))


partitioned_output = MapReduceDistributed(INPUTFORMAT, MAP, REDUCE, COMBINER=None)
partitioned_output = [(partition_id, list(partition)) for (partition_id, partition) in partitioned_output]

# Проверяем, совпадает ли результат с эталонным решением
np.allclose(reference_solution, asmatrix_distributed(partitioned_output))  # Должно вернуть True

480 key-value pairs were sent over a network.


True