# Введение в MapReduce модель на Python


In [1]:
from typing import NamedTuple # requires python 3.6+
from typing import Iterator

In [None]:
def MAP(_, row:NamedTuple):
  if (row.gender == 'female'):
    yield (row.age, row)
    
def REDUCE(age:str, rows:Iterator[NamedTuple]):
  sum = 0
  count = 0
  for row in rows:
    sum += row.social_contacts
    count += 1
  if (count > 0):
    yield (age, sum/count)
  else:
    yield (age, 0)

Модель элемента данных

In [None]:
class User(NamedTuple):
  id: int
  age: str
  social_contacts: int
  gender: str

In [None]:
input_collection = [
    User(id=0, age=55, gender='male', social_contacts=20),
    User(id=1, age=25, gender='female', social_contacts=240),
    User(id=2, age=25, gender='female', social_contacts=500),
    User(id=3, age=33, gender='female', social_contacts=800)
]

Функция RECORDREADER моделирует чтение элементов с диска или по сети.

In [None]:
def RECORDREADER():
  return [(u.id, u) for u in input_collection]

In [None]:
list(RECORDREADER())

[(0, User(id=0, age=55, social_contacts=20, gender='male')),
 (1, User(id=1, age=25, social_contacts=240, gender='female')),
 (2, User(id=2, age=25, social_contacts=500, gender='female')),
 (3, User(id=3, age=33, social_contacts=800, gender='female'))]

In [None]:
def flatten(nested_iterable):
  for iterable in nested_iterable:
    for element in iterable:
      yield element

In [None]:
map_output = flatten(map(lambda x: MAP(*x), RECORDREADER()))
map_output = list(map_output) # materialize
map_output

[(25, User(id=1, age=25, social_contacts=240, gender='female')),
 (25, User(id=2, age=25, social_contacts=500, gender='female')),
 (33, User(id=3, age=33, social_contacts=800, gender='female'))]

In [None]:
def groupbykey(iterable):
  t = {}
  for (k2, v2) in iterable:
    t[k2] = t.get(k2, []) + [v2]
  return t.items()

In [None]:
shuffle_output = groupbykey(map_output)
shuffle_output = list(shuffle_output)
shuffle_output

[(25,
  [User(id=1, age=25, social_contacts=240, gender='female'),
   User(id=2, age=25, social_contacts=500, gender='female')]),
 (33, [User(id=3, age=33, social_contacts=800, gender='female')])]

In [None]:
reduce_output = flatten(map(lambda x: REDUCE(*x), shuffle_output))
reduce_output = list(reduce_output)
reduce_output

[(25, 370.0), (33, 800.0)]

Все действия одним конвейером!

In [None]:
list(flatten(map(lambda x: REDUCE(*x), groupbykey(flatten(map(lambda x: MAP(*x), RECORDREADER()))))))

[(25, 370.0), (33, 800.0)]

# **MapReduce**
Выделим общую для всех пользователей часть системы в отдельную функцию высшего порядка. Это наиболее простая модель MapReduce, без учёта распределённого хранения данных. 

Пользователь для решения своей задачи реализует RECORDREADER, MAP, REDUCE.

In [4]:
def flatten(nested_iterable):
  for iterable in nested_iterable:
    for element in iterable:
      yield element

def groupbykey(iterable):
  t = {}
  for (k2, v2) in iterable:
    t[k2] = t.get(k2, []) + [v2]
  return t.items()

def MapReduce(RECORDREADER, MAP, REDUCE):
  return flatten(map(lambda x: REDUCE(*x), groupbykey(flatten(map(lambda x: MAP(*x), RECORDREADER())))))

## Спецификация MapReduce



```
f (k1, v1) -> (k2,v2)*
g (k2, v2*) -> (k3,v3)*
 
mapreduce ((k1,v1)*) -> (k3,v3)*
groupby ((k2,v2)*) -> (k2,v2*)*
flatten (e2**) -> e2*
 
mapreduce .map(f).flatten.groupby(k2).map(g).flatten
```




# Примеры

## SQL 

In [None]:
from typing import NamedTuple # requires python 3.6+
from typing import Iterator

class User(NamedTuple):
  id: int
  age: str
  social_contacts: int
  gender: str
    
input_collection = [
    User(id=0, age=55, gender='male', social_contacts=20),
    User(id=1, age=25, gender='female', social_contacts=240),
    User(id=2, age=25, gender='female', social_contacts=500),
    User(id=3, age=33, gender='female', social_contacts=800)
]

def MAP(_, row:NamedTuple):
  if (row.gender == 'female'):
    yield (row.age, row)
    
def REDUCE(age:str, rows:Iterator[NamedTuple]):
  sum = 0
  count = 0
  for row in rows:
    sum += row.social_contacts
    count += 1
  if (count > 0):
    yield (age, sum/count)
  else:
    yield (age, 0)
 
def RECORDREADER():
  return [(u.id, u) for u in input_collection]

output = MapReduce(RECORDREADER, MAP, REDUCE)
output = list(output)
output

[(25, 370.0), (33, 800.0)]

## Matrix-Vector multiplication 

In [None]:
from typing import Iterator
import numpy as np

mat = np.ones((5,4))
vec = np.random.rand(4) # in-memory vector in all map tasks

def MAP(coordinates:(int, int), value:int):
  i, j = coordinates
  yield (i, value*vec[j])
 
def REDUCE(i:int, products:Iterator[NamedTuple]):
  sum = 0
  for p in products:
    sum += p
  yield (i, sum)

def RECORDREADER():
  for i in range(mat.shape[0]):
    for j in range(mat.shape[1]):
      yield ((i, j), mat[i,j])
      
output = MapReduce(RECORDREADER, MAP, REDUCE)
output = list(output)
output

[(0, 2.905589809636405),
 (1, 2.905589809636405),
 (2, 2.905589809636405),
 (3, 2.905589809636405),
 (4, 2.905589809636405)]

## Inverted index 

In [None]:
from typing import Iterator

d1 = "it is what it is"
d2 = "what is it"
d3 = "it is a banana"
documents = [d1, d2, d3]

def RECORDREADER():
  for (docid, document) in enumerate(documents):
    yield ("{}".format(docid), document)
      
def MAP(docId:str, body:str):
  for word in set(body.split(' ')):
    yield (word, docId)
 
def REDUCE(word:str, docIds:Iterator[str]):
  yield (word, sorted(docIds))

output = MapReduce(RECORDREADER, MAP, REDUCE)
output = list(output)
output

[('what', ['0', '1']),
 ('is', ['0', '1', '2']),
 ('it', ['0', '1', '2']),
 ('a', ['2']),
 ('banana', ['2'])]

## WordCount

In [None]:
from typing import Iterator

d1 = """
it is what it is
it is what it is
it is what it is"""
d2 = """
what is it
what is it"""
d3 = """
it is a banana"""
documents = [d1, d2, d3]

def RECORDREADER():
  for (docid, document) in enumerate(documents):
    for (lineid, line) in enumerate(document.split('\n')):
      yield ("{}:{}".format(docid,lineid), line)

def MAP(docId:str, line:str):
  for word in line.split(" "):  
    yield (word, 1)
 
def REDUCE(word:str, counts:Iterator[int]):
  sum = 0
  for c in counts:
    sum += c
  yield (word, sum)

output = MapReduce(RECORDREADER, MAP, REDUCE)
output = list(output)
output

[('', 3), ('it', 9), ('is', 9), ('what', 5), ('a', 1), ('banana', 1)]

# MapReduce Distributed

Добавляется в модель фабрика RECORDREARER-ов --- INPUTFORMAT, функция распределения промежуточных результатов по партициям PARTITIONER, и функция COMBINER для частичной аггрегации промежуточных результатов до распределения по новым партициям.

In [None]:
def flatten(nested_iterable):
  for iterable in nested_iterable:
    for element in iterable:
      yield element

def groupbykey(iterable):
  t = {}
  for (k2, v2) in iterable:
    t[k2] = t.get(k2, []) + [v2]
  return t.items()
      
def groupbykey_distributed(map_partitions, PARTITIONER):
  global reducers
  partitions = [dict() for _ in range(reducers)]
  for map_partition in map_partitions:
    for (k2, v2) in map_partition:
      p = partitions[PARTITIONER(k2)]
      p[k2] = p.get(k2, []) + [v2]
  return [(partition_id, sorted(partition.items(), key=lambda x: x[0])) for (partition_id, partition) in enumerate(partitions)]
 
def PARTITIONER(obj):
  global reducers
  return hash(obj) % reducers
  
def MapReduceDistributed(INPUTFORMAT, MAP, REDUCE, PARTITIONER=PARTITIONER, COMBINER=None):
  map_partitions = map(lambda record_reader: flatten(map(lambda k1v1: MAP(*k1v1), record_reader)), INPUTFORMAT())
  if COMBINER != None:
    map_partitions = map(lambda map_partition: flatten(map(lambda k2v2: COMBINER(*k2v2), groupbykey(map_partition))), map_partitions)
  reduce_partitions = groupbykey_distributed(map_partitions, PARTITIONER) # shuffle
  reduce_outputs = map(lambda reduce_partition: (reduce_partition[0], flatten(map(lambda reduce_input_group: REDUCE(*reduce_input_group), reduce_partition[1]))), reduce_partitions)
  
  print("{} key-value pairs were sent over a network.".format(sum([len(vs) for (k,vs) in flatten([partition for (partition_id, partition) in reduce_partitions])])))
  return reduce_outputs

## Спецификация MapReduce Distributed


```
f (k1, v1) -> (k2,v2)*
g (k2, v2*) -> (k3,v3)*
 
e1 (k1, v1)
e2 (k2, v2)
partition1 (k2, v2)*
partition2 (k2, v2*)*
 
flatmap (e1->e2*, e1*) -> partition1*
groupby (partition1*) -> partition2*

mapreduce ((k1,v1)*) -> (k3,v3)*
mapreduce .flatmap(f).groupby(k2).flatmap(g)
```



## WordCount 

In [None]:
from typing import Iterator
import numpy as np

d1 = """
it is what it is
it is what it is
it is what it is"""
d2 = """
what is it
what is it"""
d3 = """
it is a banana"""
documents = [d1, d2, d3, d1, d2, d3]

maps = 3
reducers = 2

def INPUTFORMAT():
  global maps
  
  def RECORDREADER(split):
    for (docid, document) in enumerate(split):
      for (lineid, line) in enumerate(document.split('\n')):
        yield ("{}:{}".format(docid,lineid), line)
      
  split_size =  int(np.ceil(len(documents)/maps))
  for i in range(0, len(documents), split_size):
    yield RECORDREADER(documents[i:i+split_size])

def MAP(docId:str, line:str):
  for word in line.split(" "):  
    yield (word, 1)
 
def REDUCE(word:str, counts:Iterator[int]):
  sum = 0
  for c in counts:
    sum += c
  yield (word, sum)
  
# try to set COMBINER=REDUCER and look at the number of values sent over the network 
partitioned_output = MapReduceDistributed(INPUTFORMAT, MAP, REDUCE, COMBINER=None) 
partitioned_output = [(partition_id, list(partition)) for (partition_id, partition) in partitioned_output]
partitioned_output

56 key-value pairs were sent over a network.


[(0, [('', 6), ('a', 2), ('banana', 2), ('is', 18), ('it', 18), ('what', 10)]),
 (1, [])]

## TeraSort

In [None]:
import numpy as np

input_values = np.random.rand(30)
maps = 3
reducers = 2
min_value = 0.0
max_value = 1.0

def INPUTFORMAT():
  global maps
  
  def RECORDREADER(split):
    for value in split:
        yield (value, None)
      
  split_size =  int(np.ceil(len(input_values)/maps))
  for i in range(0, len(input_values), split_size):
    yield RECORDREADER(input_values[i:i+split_size])
    
def MAP(value:int, _):
  yield (value, None)
  
def PARTITIONER(key):
  global reducers
  global max_value
  global min_value
  bucket_size = (max_value-min_value)/reducers
  bucket_id = 0
  while((key>(bucket_id+1)*bucket_size) and ((bucket_id+1)*bucket_size<max_value)):
    bucket_id += 1
  return bucket_id

def REDUCE(value:int, _):
  yield (None,value)
  
partitioned_output = MapReduceDistributed(INPUTFORMAT, MAP, REDUCE, COMBINER=None, PARTITIONER=PARTITIONER)
partitioned_output = [(partition_id, list(partition)) for (partition_id, partition) in partitioned_output]
partitioned_output

30 key-value pairs were sent over a network.


[(0,
  [(None, 0.0059671639991895065),
   (None, 0.07724245496172),
   (None, 0.08440804135613444),
   (None, 0.13575647907181598),
   (None, 0.14404826813474803),
   (None, 0.21204275967955666),
   (None, 0.21869633101751806),
   (None, 0.25055756276216923),
   (None, 0.28642389538931257),
   (None, 0.3834487515438496),
   (None, 0.3913614390023946),
   (None, 0.4041378102237341),
   (None, 0.41854626274930695),
   (None, 0.4704310153549396),
   (None, 0.4776995227348928),
   (None, 0.48992216726013693)]),
 (1,
  [(None, 0.5005353544023048),
   (None, 0.5135664686748047),
   (None, 0.53391984417089),
   (None, 0.5587932025401512),
   (None, 0.5673804905854288),
   (None, 0.6926646597910275),
   (None, 0.7237444251339501),
   (None, 0.7557883138083207),
   (None, 0.785709769245918),
   (None, 0.7937098630029404),
   (None, 0.7942646850708935),
   (None, 0.9160468126494941),
   (None, 0.9618068292060864),
   (None, 0.9820764489731459)])]

# Упражнения
Упражнения взяты из Rajaraman A., Ullman J. D. Mining of massive datasets. – Cambridge University Press, 2011.


Для выполнения заданий переопределите функции RECORDREADER, MAP, REDUCE. Для модели распределённой системы может потребоваться переопределение функций PARTITION и COMBINER.

### Максимальное значение ряда

Разработайте MapReduce алгоритм, который находит максимальное число входного списка чисел.

In [17]:
import random
from functools import reduce

# Присваиваем каждому числу уникальный ключ
def MAP(index: int, value: int):
    return (f"num_{index}", value) 

# Выбираем максимальное число из значений
def REDUCE(pair1, pair2):
    return pair1 if pair1[1] > pair2[1] else pair2

# Генерируем список случайных чисел
def RECORDREADER(size, lb=0, ub=100):
    return [random.randint(lb, ub) for _ in range(size)]


record = RECORDREADER(10)
print("Сгенерированный список чисел:", record)

# Применяем MAP, создавая пары (ключ, значение)
mapped_values = [MAP(index, value) for index, value in enumerate(record)]

# Применяем REDUCE для нахождения максимального значения
result = reduce(REDUCE, mapped_values)

print("Максимальное значение:", result)


Сгенерированный список чисел: [26, 15, 56, 74, 86, 70, 16, 58, 75, 52]
Максимальное значение: ('num_4', 86)


### Арифметическое среднее

Разработайте MapReduce алгоритм, который находит арифметическое среднее.

$$\overline{X} = \frac{1}{n}\sum_{i=0}^{n} x_i$$


In [None]:
import random
from functools import reduce
from typing import Iterator, Tuple

def MAP(value: int):
    yield (1, value)  

# Вычисляем среднее арифметическое
def REDUCE(key, nums: Iterator[Tuple[int, int]]):
    total_sum = sum(n for _, n in nums)                         # Ищем сумму
    total_count = len(nums)                                     # Сохраняем количество элементов
    return total_sum / total_count if total_count > 0 else 0


def RECORDREADER(size, lb=0, ub=100):
    return [random.randint(lb, ub) for _ in range(size)]


record = RECORDREADER(10)
print("Сгенерированный список чисел:", record)


mapped_values = [pair for value in record for pair in MAP(value)]  

result = REDUCE("sum_count", mapped_values)
print("Среднее арифметическое:", result)


Сгенерированный список чисел: [51, 16, 38, 97, 100, 99, 40, 0, 58, 25]
Среднее арифметическое: 52.4


### GroupByKey на основе сортировки

Реализуйте groupByKey на основе сортировки, проверьте его работу на примерах

In [None]:
import random
from typing import Iterator
from itertools import groupby

def MAP(value: int):
    yield (1, value)  

# Функция для группировки по ключу
def groupByKey(mapped_values):
    sorted_values = sorted(mapped_values, key=lambda x: x[0])                                                       # Сортируем список по ключу
    grouped = {key: list(map(lambda x: x[1], group)) for key, group in groupby(sorted_values, key=lambda x: x[0])}  # Группируем по ключу
    return grouped

def REDUCE(key, nums: Iterator[int]):
    total_sum = sum(nums)  
    total_count = len(nums)  
    return key, total_sum / total_count if total_count > 0 else 0


def RECORDREADER(size, lb=0, ub=100):
    return [random.randint(lb, ub) for _ in range(size)]


record = RECORDREADER(10)
print("Сгенерированный список чисел:", record)

mapped_values = [pair for value in record for pair in MAP(value)] 
print("Mapped values (ключ, значение):", mapped_values)  

grouped_values = groupByKey(mapped_values)
print("Grouped values:", grouped_values) 

results = [REDUCE(key, values) for key, values in grouped_values.items()]
print("Среднее арифметическое:", results)


Сгенерированный список чисел: [74, 29, 60, 66, 86, 1, 2, 20, 84, 99]
Mapped values (ключ, значение): [(1, 74), (1, 29), (1, 60), (1, 66), (1, 86), (1, 1), (1, 2), (1, 20), (1, 84), (1, 99)]
Grouped values: {1: [74, 29, 60, 66, 86, 1, 2, 20, 84, 99]}
Среднее арифметическое: [(1, 52.1)]


### Drop duplicates (set construction, unique elements, distinct)

Реализуйте распределённую операцию исключения дубликатов

In [1]:
from typing import Iterator
import numpy as np


# Разворачиваем вложенные списки
def flatten(nested_iterable):
    for iterable in nested_iterable:
        for element in iterable:
            yield element



def groupbykey(iterable):
    grouped_data = {}
    for key, value in iterable:
        grouped_data.setdefault(key, []).append(value)          # Линеарно добавляем элементы
    return list(grouped_data.items())                           # Возвращаем список кортежей


# Распределенная группировка данных по ключу 
def groupbykey_distributed(map_partitions, PARTITIONER):
    global reducers                                             
    partitions = [dict() for _ in range(reducers)]              # Создаём столько пустых разделов в виде словарей, сколько есть редьюсеров
    for map_partition in map_partitions:
        for key, value in map_partition:
            p = partitions[PARTITIONER(key)]                    # Определяем, в какой раздел отправить ключ
            p.setdefault(key, []).append(value)                 # Добавляем value в список значений для соответствующего key в разделе p
    return [(partition_id, sorted(partition.items(), key=lambda x: x[0])) for (partition_id, partition) in enumerate(partitions)]

def PARTITIONER(obj):
    global reducers
    return hash(obj) % reducers                                  # Равномерно распределять ключи между reducers по хэшу


# Распределенная обработка данных
def MapReduceDistributed(INPUTFORMAT, MAP, REDUCE, PARTITIONER=PARTITIONER, COMBINER=None):
    map_partitions = map(lambda record_reader: flatten(map(lambda k1v1: MAP(*k1v1), record_reader)), INPUTFORMAT())
    if COMBINER is not None:
        map_partitions = map(lambda map_partition: flatten(map(lambda k2v2: COMBINER(*k2v2), groupbykey(map_partition))), map_partitions)
    reduce_partitions = groupbykey_distributed(map_partitions, PARTITIONER)
    reduce_outputs = map(lambda reduce_partition: (reduce_partition[0], flatten(map(lambda reduce_input_group: REDUCE(*reduce_input_group),
                                                                                    reduce_partition[1]))), reduce_partitions)
    total_values_sent = sum([len(vs) for (k, vs) in flatten([partition for (partition_id, partition) in reduce_partitions])])
    print(f"{total_values_sent} key-value pairs were sent over a network.")
    return reduce_outputs

# Данные
documents = [
    "Big data allows businesses to analyze vast amounts of information and uncover hidden trends.\n"
    "Machine learning is revolutionizing industries by making predictions more accurate.",
    
    "Cloud computing provides scalable infrastructure for data processing and storage.\n"
    "Artificial intelligence enables automation and enhances decision-making processes.",

    "Distributed systems improve fault tolerance and reliability for large-scale applications.\n"
    "Data science combines statistics, programming, and domain expertise to extract insights.",

    "Neural networks mimic the human brain to recognize patterns and optimize performance.\n"
    "Deep learning is a subset of machine learning that specializes in feature extraction."
]


maps = 2
reducers = 2

# Чтение данных
def INPUTFORMAT():
    global maps
    def RECORDREADER(split):
        for (docid, document) in enumerate(split):
            for (lineid, line) in enumerate(document.split('\n')):
                yield ("{}:{}".format(docid, lineid), line)
    split_size = int(np.ceil(len(documents) / maps))
    for i in range(0, len(documents), split_size):
        yield RECORDREADER(documents[i:i + split_size])

def MAP(docId: str, line: str):
    for word in line.split(" "):
        yield (word, word)

def REDUCE(key: str, value: Iterator[str]):
    yield key

partitioned_output = MapReduceDistributed(INPUTFORMAT, MAP, REDUCE, COMBINER=None)
partitioned_output = [(partition_id, list(partition)) for (partition_id, partition) in partitioned_output]
print(partitioned_output)


88 key-value pairs were sent over a network.
[(0, ['Big', 'Deep', 'Distributed', 'a', 'accurate.', 'analyze', 'and', 'applications.', 'combines', 'computing', 'data', 'domain', 'enhances', 'expertise', 'extract', 'extraction.', 'human', 'industries', 'information', 'infrastructure', 'learning', 'machine', 'mimic', 'more', 'of', 'performance.', 'processing', 'provides', 'recognize', 'reliability', 'revolutionizing', 'science', 'specializes', 'subset', 'that', 'to', 'tolerance', 'vast']), (1, ['Artificial', 'Cloud', 'Data', 'Machine', 'Neural', 'allows', 'amounts', 'automation', 'brain', 'businesses', 'by', 'decision-making', 'enables', 'fault', 'feature', 'for', 'hidden', 'improve', 'in', 'insights.', 'intelligence', 'is', 'large-scale', 'making', 'networks', 'optimize', 'patterns', 'predictions', 'processes.', 'programming,', 'scalable', 'statistics,', 'storage.', 'systems', 'the', 'trends.', 'uncover'])]


#Операторы реляционной алгебры
### Selection (Выборка)

**The Map Function**: Для  каждого кортежа $t \in R$ вычисляется истинность предиката $C$. В случае истины создаётся пара ключ-значение $(t, t)$. В паре ключ и значение одинаковы, равны $t$.

**The Reduce Function:** Роль функции Reduce выполняет функция идентичности, которая возвращает то же значение, что получила на вход.



In [None]:
from collections import defaultdict
import random

def MAP(el_list, predicate):
    mapped_result = defaultdict(list)
    for t in el_list:                                # Проходим по каждому элементу и применяем предикат
        if predicate(t):                             # Проверяем условие
            mapped_result[t].append(t)               # Добавляем кортеж в результат

    return mapped_result.items()

def REDUCE(mapped_items):
    reduced_result = []
    for values in mapped_items:                     # Проходим по каждой паре и добавляем элементы в итоговый список
        for value in values:
            reduced_result.append(value)            
    return reduced_result

# Предикат для фильтрации кортежей (первый элемент больше 50)
def first_element_greater_than_50(t):
    return t[0] > 50

# Функция для генерации случайных записей (кортежей)
def generate_random_records(count, num_elements):
    return [tuple(random.randint(0, 100) for _ in range(num_elements)) for _ in range(count)]

# Пример разбиения данных на части
def partition_records(records, part_size):
    return [records[i:i + part_size] for i in range(0, len(records), part_size)]     # Разбиваеv список записей на части заданного размера

# Генерация случайных данных для теста
records = generate_random_records(10, 4)  
print(f"Сгенерированные записи: {records}")

# Разбиение данных на части
partitioned_records = partition_records(records, 3)  
print(f"Записи после разбиения на части: {partitioned_records}")


mapped_results = list(map(lambda x: MAP(x, first_element_greater_than_50), partitioned_records))
print(f"Результаты MAP: {mapped_results}")

final_result = REDUCE([item for sublist in mapped_results for item in sublist])
print(f"Финальный результат после REDUCE: {final_result}")


Сгенерированные записи: [(54, 22, 63, 95), (100, 75, 87, 95), (23, 46, 36, 4), (40, 21, 23, 26), (48, 53, 20, 96), (42, 3, 22, 55), (86, 61, 30, 59), (99, 58, 12, 23), (66, 90, 97, 33), (31, 94, 9, 87)]
Записи после разбиения на части: [[(54, 22, 63, 95), (100, 75, 87, 95), (23, 46, 36, 4)], [(40, 21, 23, 26), (48, 53, 20, 96), (42, 3, 22, 55)], [(86, 61, 30, 59), (99, 58, 12, 23), (66, 90, 97, 33)], [(31, 94, 9, 87)]]
Результаты MAP: [dict_items([((54, 22, 63, 95), [(54, 22, 63, 95)]), ((100, 75, 87, 95), [(100, 75, 87, 95)])]), dict_items([]), dict_items([((86, 61, 30, 59), [(86, 61, 30, 59)]), ((99, 58, 12, 23), [(99, 58, 12, 23)]), ((66, 90, 97, 33), [(66, 90, 97, 33)])]), dict_items([])]
Финальный результат после REDUCE: [(54, 22, 63, 95), [(54, 22, 63, 95)], (100, 75, 87, 95), [(100, 75, 87, 95)], (86, 61, 30, 59), [(86, 61, 30, 59)], (99, 58, 12, 23), [(99, 58, 12, 23)], (66, 90, 97, 33), [(66, 90, 97, 33)]]


### Projection (Проекция)

Проекция на множество атрибутов $S$.

**The Map Function:** Для каждого кортежа $t \in R$ создайте кортеж $t′$, исключая  из $t$ те значения, атрибуты которых не принадлежат  $S$. Верните пару $(t′, t′)$.

**The Reduce Function:** Для каждого ключа $t′$, созданного любой Map задачей, вы получаете одну или несколько пар $(t′, t′)$. Reduce функция преобразует $(t′, [t′, t′, . . . , t′])$ в $(t′, t′)$, так, что для ключа $t′$ возвращается одна пара  $(t′, t′)$.

In [10]:
import random
from collections import defaultdict

S = set([3, 13, 24, 58, 83, 92])

def MAP(t):
    res_list = [el for el in t if el in S]              # Фильтруем элементы, которые есть в S
    res = tuple(res_list)                               # Преобразуем результат в кортеж
    return (res, res)                                   # Возвращаем пару (t', t')

def REDUCE(key, values):
    return (key, key)                                   # Возвращаем пару (t', t') для каждого ключа


def RECORDREADER(count):
    return [(random.randint(0, 100), random.randint(0, 100), random.randint(0, 100), random.randint(0, 100)) for _ in range(count)]

def group_by_key(iterable):
    t = {}
    for (k2, v2) in iterable:
        t[k2] = t.get(k2, []) + [v2]                    # Группируем все значения по ключу k2
    return t.items()                                    # Возвращаем список кортежей (ключ, список значений)


record = RECORDREADER(10)  
print("Сгенерированные записи:", record)


map_output = list(map(lambda x: MAP(x), record))        # Применяем MAP ко всем кортежам
print("Результат MAP:", map_output)


shuffle_output = group_by_key(map_output)               # Группируем значения по ключу
print("Результат Shuffle:", list(shuffle_output))


reduce_output = list(map(lambda x: REDUCE(*x)[0], shuffle_output))  
print("Результат REDUCE:", reduce_output)


Сгенерированные записи: [(38, 66, 96, 22), (18, 87, 7, 57), (50, 52, 38, 42), (58, 46, 45, 69), (38, 66, 39, 42), (79, 84, 83, 34), (83, 96, 85, 13), (11, 91, 16, 10), (83, 23, 96, 24), (42, 82, 71, 42)]
Результат MAP: [((), ()), ((), ()), ((), ()), ((58,), (58,)), ((), ()), ((83,), (83,)), ((83, 13), (83, 13)), ((), ()), ((83, 24), (83, 24)), ((), ())]
Результат Shuffle: [((), [(), (), (), (), (), ()]), ((58,), [(58,)]), ((83,), [(83,)]), ((83, 13), [(83, 13)]), ((83, 24), [(83, 24)])]
Результат REDUCE: [(), (58,), (83,), (83, 13), (83, 24)]


### Union (Объединение)

**The Map Function:** Превратите каждый входной кортеж $t$ в пару ключ-значение $(t, t)$.

**The Reduce Function:** С каждым ключом $t$ будет ассоциировано одно или два значений. В обоих случаях создайте $(t, t)$ в качестве выходного значения.

In [11]:
import random
from collections import defaultdict

def MAP(t):
    return (t, t)                       # Возвращаем пару (t, t)


def REDUCE(key, values):
    return (key, key)                   # Возвращаем пару (key, key)

def RECORDREADER(count):
    return [(random.randint(0, 100), random.randint(0, 100), random.randint(0, 100), random.randint(0, 100)) for _ in range(count)]

def group_by_key(iterable):
    t = {}
    for (k2, v2) in iterable:
        t[k2] = t.get(k2, []) + [v2]    # Группируем значения по ключу k2
    return t.items()                    # Возвращаем список кортежей (ключ, список значений)


record = RECORDREADER(10) 
print("Сгенерированные записи:", record)


map_output = list(map(lambda x: MAP(x), record))  
print("Результат MAP:", map_output)


shuffle_output = group_by_key(map_output)  
print("Результат Shuffle:", list(shuffle_output))


reduce_output = list(map(lambda x: REDUCE(*x)[0], shuffle_output))  
print("Результат REDUCE:", reduce_output)


Сгенерированные записи: [(72, 6, 20, 85), (51, 95, 60, 57), (49, 55, 67, 72), (51, 45, 16, 23), (91, 89, 72, 56), (52, 38, 19, 10), (90, 39, 58, 18), (8, 8, 14, 41), (24, 68, 7, 92), (13, 71, 31, 20)]
Результат MAP: [((72, 6, 20, 85), (72, 6, 20, 85)), ((51, 95, 60, 57), (51, 95, 60, 57)), ((49, 55, 67, 72), (49, 55, 67, 72)), ((51, 45, 16, 23), (51, 45, 16, 23)), ((91, 89, 72, 56), (91, 89, 72, 56)), ((52, 38, 19, 10), (52, 38, 19, 10)), ((90, 39, 58, 18), (90, 39, 58, 18)), ((8, 8, 14, 41), (8, 8, 14, 41)), ((24, 68, 7, 92), (24, 68, 7, 92)), ((13, 71, 31, 20), (13, 71, 31, 20))]
Результат Shuffle: [((72, 6, 20, 85), [(72, 6, 20, 85)]), ((51, 95, 60, 57), [(51, 95, 60, 57)]), ((49, 55, 67, 72), [(49, 55, 67, 72)]), ((51, 45, 16, 23), [(51, 45, 16, 23)]), ((91, 89, 72, 56), [(91, 89, 72, 56)]), ((52, 38, 19, 10), [(52, 38, 19, 10)]), ((90, 39, 58, 18), [(90, 39, 58, 18)]), ((8, 8, 14, 41), [(8, 8, 14, 41)]), ((24, 68, 7, 92), [(24, 68, 7, 92)]), ((13, 71, 31, 20), [(13, 71, 31, 20)])]

### Intersection (Пересечение)

**The Map Function:** Превратите каждый кортеж $t$ в пары ключ-значение $(t, t)$.

**The Reduce Function:** Если для ключа $t$ есть список из двух элементов $[t, t]$ $-$ создайте пару $(t, t)$. Иначе, ничего не создавайте.

In [12]:
import random
from collections import defaultdict

def MAP(t):
    return (t, t)  


def REDUCE(key, values):
    if len(values) == 2:                # Если для ключа t есть два значения
        return (key, key)               # Возвращаем пару (t, t)
    return None                         # Если ключ появляется только один раз, ничего не возвращаем


def RECORDREADER(count):
    return [(random.randint(0, 3), random.randint(0, 3)) for _ in range(count)]


def group_by_key(iterable):
    t = {}
    for (k2, v2) in iterable:
        t[k2] = t.get(k2, []) + [v2]    # Группируем значения по ключу k2
    return t.items()                    # Возвращаем список кортежей (ключ, список значений)


record = RECORDREADER(10)  
print("Сгенерированные записи:", record)


map_output = list(map(lambda x: MAP(x), record))  
print("Результат MAP:", map_output)

shuffle_output = group_by_key(map_output) 
print("Результат Shuffle:", list(shuffle_output))


reduce_output = [el[0] for el in list(map(lambda x: REDUCE(*x), shuffle_output)) if el is not None]  
print("Результат REDUCE:", reduce_output)


Сгенерированные записи: [(0, 1), (2, 1), (0, 1), (3, 0), (3, 1), (0, 2), (2, 2), (1, 1), (3, 3), (2, 0)]
Результат MAP: [((0, 1), (0, 1)), ((2, 1), (2, 1)), ((0, 1), (0, 1)), ((3, 0), (3, 0)), ((3, 1), (3, 1)), ((0, 2), (0, 2)), ((2, 2), (2, 2)), ((1, 1), (1, 1)), ((3, 3), (3, 3)), ((2, 0), (2, 0))]
Результат Shuffle: [((0, 1), [(0, 1), (0, 1)]), ((2, 1), [(2, 1)]), ((3, 0), [(3, 0)]), ((3, 1), [(3, 1)]), ((0, 2), [(0, 2)]), ((2, 2), [(2, 2)]), ((1, 1), [(1, 1)]), ((3, 3), [(3, 3)]), ((2, 0), [(2, 0)])]
Результат REDUCE: [(0, 1)]


### Difference (Разница)

**The Map Function:** Для кортежа $t \in R$, создайте пару $(t, R)$, и для кортежа $t \in S$, создайте пару $(t, S)$. Задумка заключается в том, чтобы значение пары было именем отношения $R$ or $S$, которому принадлежит кортеж (а лучше, единичный бит, по которому можно два отношения различить $R$ or $S$), а не весь набор атрибутов отношения.

**The Reduce Function:** Для каждого ключа $t$, если соответствующее значение является списком $[R]$, создайте пару $(t, t)$. В иных случаях не предпринимайте действий.

In [14]:
import random
from collections import defaultdict

# Два отношения: R и S ( 0 и 1)
relations = [0, 1]

# Класс для хранения кортежа и его принадлежности к одному из двух отношений (R или S)
class Tuple:
    def __init__(self, data: tuple, rel_id: int):
        self.data = data
        self.rel_id = rel_id


# Генерируем случайный кортеж данных и назначает ему отношение (0 или 1)
def create_tuple(count):
    data = tuple(random.randint(0, 5) for _ in range(count))
    rel_id = random.choice(relations)                           # Выбираем 0 (R) или 1 (S)
    return Tuple(data, rel_id)

def RECORDREADER(count):
    return [create_tuple(3) for _ in range(count)]

def MAP(t: Tuple):
    return (t.data, t.rel_id)

def REDUCE(key, values):
    if values == [relations[0]]:  
        return (key, key)
    return None

def group_by_key(iterable):
    grouped = defaultdict(list)
    for k, v in iterable:
        grouped[k].append(v)
    return grouped.items()

records = RECORDREADER(10)
print("Сгенерированные записи:", [(r.data, r.rel_id) for r in records])

map_output = list(map(MAP, records))
print("MAP output:", map_output)

shuffle_output = group_by_key(map_output)
print("Shuffle output:", list(shuffle_output))

reduce_output = [el[0] for el in (REDUCE(*x) for x in shuffle_output) if el is not None]
print("Reduce output:", reduce_output)


Сгенерированные записи: [((0, 3, 3), 0), ((4, 1, 5), 0), ((5, 2, 2), 1), ((5, 0, 5), 1), ((0, 5, 0), 0), ((3, 1, 5), 1), ((4, 2, 2), 1), ((2, 3, 4), 1), ((2, 3, 3), 1), ((4, 5, 5), 0)]
MAP output: [((0, 3, 3), 0), ((4, 1, 5), 0), ((5, 2, 2), 1), ((5, 0, 5), 1), ((0, 5, 0), 0), ((3, 1, 5), 1), ((4, 2, 2), 1), ((2, 3, 4), 1), ((2, 3, 3), 1), ((4, 5, 5), 0)]
Shuffle output: [((0, 3, 3), [0]), ((4, 1, 5), [0]), ((5, 2, 2), [1]), ((5, 0, 5), [1]), ((0, 5, 0), [0]), ((3, 1, 5), [1]), ((4, 2, 2), [1]), ((2, 3, 4), [1]), ((2, 3, 3), [1]), ((4, 5, 5), [0])]
Reduce output: [(0, 3, 3), (4, 1, 5), (0, 5, 0), (4, 5, 5)]


### Natural Join

**The Map Function:** Для каждого кортежа $(a, b)$ отношения $R$, создайте пару $(b,(R, a))$. Для каждого кортежа $(b, c)$ отношения $S$, создайте пару $(b,(S, c))$.

**The Reduce Function:** Каждый ключ $b$ будет асоциирован со списком пар, которые принимают форму либо $(R, a)$, либо $(S, c)$. Создайте все пары, одни, состоящие из  первого компонента $R$, а другие, из первого компонента $S$, то есть $(R, a)$ и $(S, c)$. На выходе вы получаете последовательность пар ключ-значение из списков ключей и значений. Ключ не нужен. Каждое значение, это тройка $(a, b, c)$ такая, что $(R, a)$ и $(S, c)$ это принадлежат входному списку значений.

In [15]:
import random
from collections import defaultdict

relations = [0, 1]

class Tuple:
    def __init__(self, data: tuple, rel_id: int):
        self.data = data
        self.rel_id = rel_id

def create_tuple():
    data = (random.randint(0, 5), random.randint(0, 5))
    rel_id = random.choice(relations)  
    return Tuple(data, rel_id)

def RECORDREADER(count):
    return [create_tuple() for _ in range(count)]

def MAP(t: Tuple):
    if t.rel_id == relations[0]:                                 # Если принадлежит отношению R
        return (t.data[1], (t.rel_id, t.data[0]))                # (b, (R, a))
    else:                                                        # Если принадлежит отношению S
        return (t.data[0], (t.rel_id, t.data[1]))                # (b, (S, c))

def REDUCE(key, values):
    r_values = [v[1] for v in values if v[0] == relations[0]]    # Все a, где (R, a)
    s_values = [v[1] for v in values if v[0] == relations[1]]    # Все c, где (S, c)

    return [(a, key, c) for a in r_values for c in s_values]     # Декартово произведение

def group_by_key(iterable):
    grouped = defaultdict(list)
    for k, v in iterable:
        grouped[k].append(v)
    return grouped.items()

records = RECORDREADER(10)
print("Сгенерированные записи:", [(r.data, r.rel_id) for r in records])

map_output = list(map(MAP, records))
print("MAP output:", map_output)

shuffle_output = group_by_key(map_output)
print("Shuffle output:", list(shuffle_output))

reduce_output = [triple for sublist in (REDUCE(*x) for x in shuffle_output) for triple in sublist]
print("Reduce output:", reduce_output)


Сгенерированные записи: [((2, 1), 0), ((0, 5), 0), ((2, 5), 0), ((1, 4), 0), ((2, 1), 1), ((4, 2), 0), ((2, 0), 0), ((5, 3), 1), ((3, 1), 1), ((0, 4), 1)]
MAP output: [(1, (0, 2)), (5, (0, 0)), (5, (0, 2)), (4, (0, 1)), (2, (1, 1)), (2, (0, 4)), (0, (0, 2)), (5, (1, 3)), (3, (1, 1)), (0, (1, 4))]
Shuffle output: [(1, [(0, 2)]), (5, [(0, 0), (0, 2), (1, 3)]), (4, [(0, 1)]), (2, [(1, 1), (0, 4)]), (0, [(0, 2), (1, 4)]), (3, [(1, 1)])]
Reduce output: [(0, 5, 3), (2, 5, 3), (4, 2, 1), (2, 0, 4)]


### Grouping and Aggregation (Группировка и аггрегация)

**The Map Function:** Для каждого кортежа $(a, b, c$) создайте пару $(a, b)$.

**The Reduce Function:** Ключ представляет ту или иную группу. Примение аггрегирующую операцию $\theta$ к списку значений $[b1, b2, . . . , bn]$ ассоциированных с ключом $a$. Возвращайте в выходной поток $(a, x)$, где $x$ результат применения  $\theta$ к списку. Например, если $\theta$ это $SUM$, тогда $x = b1 + b2 + · · · + bn$, а если $\theta$ is $MAX$, тогда $x$ это максимальное из значений $b1, b2, . . . , bn$.

In [16]:
import random
from collections import defaultdict

# Генерируем случайный кортеж (a, b, c), где значения от 1 до 10
def get_tuple():
    return (random.randint(1, 10), random.randint(1, 10), random.randint(1, 10))

# Генерируем список случайных кортежей данных
def RECORDREADER(count):
    return [get_tuple() for _ in range(count)]

def MAP(t):
    return (t[0], t[1])

# Функция агрегирования. Принимает список значений и тип операции
def tetta(values, agg_func="SUM"):
    if agg_func == "SUM":
        return sum(values)
    elif agg_func == "MAX":
        return max(values)
    else:
        raise ValueError("Unsupported aggregation function. Use 'SUM' or 'MAX'.")

def REDUCE(key, values, agg_func="SUM"):
    x = tetta(values, agg_func)
    return (key, x)

def group_by_key(iterable):
    grouped = defaultdict(list)
    for k, v in iterable:
        grouped[k].append(v)
    return grouped.items()

records = RECORDREADER(10)
print("Сгенерированные записи:", records)

map_output = list(map(MAP, records))
print("MAP output:", map_output)

shuffle_output = group_by_key(map_output)
print("Shuffle output:", list(shuffle_output))

reduce_sum_output = [REDUCE(k, v, "SUM") for k, v in shuffle_output]
reduce_max_output = [REDUCE(k, v, "MAX") for k, v in shuffle_output]

print("Reduce output (SUM):", reduce_sum_output)
print("Reduce output (MAX):", reduce_max_output)

Сгенерированные записи: [(6, 3, 1), (3, 3, 8), (6, 10, 5), (10, 2, 8), (1, 5, 6), (1, 8, 3), (10, 10, 10), (6, 8, 9), (1, 4, 6), (6, 10, 5)]
MAP output: [(6, 3), (3, 3), (6, 10), (10, 2), (1, 5), (1, 8), (10, 10), (6, 8), (1, 4), (6, 10)]
Shuffle output: [(6, [3, 10, 8, 10]), (3, [3]), (10, [2, 10]), (1, [5, 8, 4])]
Reduce output (SUM): [(6, 31), (3, 3), (10, 12), (1, 17)]
Reduce output (MAX): [(6, 10), (3, 3), (10, 10), (1, 8)]


# 

### Matrix-Vector multiplication

Случай, когда вектор не помещается в памяти Map задачи


## Matrix multiplication (Перемножение матриц)

Если у нас есть матрица $M$ с элементами $m_{ij}$ в строке $i$ и столбце $j$, и матрица $N$ с элементами $n_{jk}$ в строке $j$ и столбце $k$, тогда их произведение $P = MN$ есть матрица $P$ с элементами $p_{ik}$ в строке $i$ и столбце $k$, где

$$p_{ik} =\sum_{j} m_{ij}n_{jk}$$

Необходимым требованием является одинаковое количество столбцов в $M$ и строк в $N$, чтобы операция суммирования по  $j$ была осмысленной. Мы можем размышлять о матрице, как об отношении с тремя атрибутами: номер строки, номер столбца, само значение. Таким образом матрица $M$ предстваляется как отношение $ M(I, J, V )$, с кортежами $(i, j, m_{ij})$, и, аналогично, матрица $N$ представляется как отношение $N(J, K, W)$, с кортежами $(j, k, n_{jk})$. Так как большие матрицы как правило разреженные (большинство значений равно 0), и так как мы можем нулевыми значениями пренебречь (не хранить), такое реляционное представление достаточно эффективно для больших матриц. Однако, возможно, что координаты $i$, $j$, и $k$ неявно закодированы в смещение позиции элемента относительно начала файла, вместо явного хранения. Тогда, функция Map (или Reader) должна быть разработана таким образом, чтобы реконструировать компоненты $I$, $J$, и $K$ кортежей из смещения.

Произведение $MN$ это фактически join, за которым следуют группировка по ключу и аггрегация. Таким образом join отношений $M(I, J, V )$ и $N(J, K, W)$, имеющих общим только атрибут $J$, создаст кортежи $(i, j, k, v, w)$ из каждого кортежа $(i, j, v) \in M$ и кортежа $(j, k, w) \in N$. Такой 5 компонентный кортеж представляет пару элементов матрицы $(m_{ij} , n_{jk})$. Что нам хотелось бы получить на самом деле, это произведение этих элементов, то есть, 4 компонентный кортеж$(i, j, k, v \times w)$, так как он представляет произведение $m_{ij}n_{jk}$. Мы представляем отношение как результат одной MapReduce операции, в которой мы можем произвести группировку и аггрегацию, с $I$ и $K$  атрибутами, по которым идёт группировка, и суммой  $V \times W$. 





In [20]:
# MapReduce model
def flatten(nested_iterable):
  for iterable in nested_iterable:
    for element in iterable:
      yield element

def groupbykey(iterable):
  t = {}
  for (k2, v2) in iterable:
    t[k2] = t.get(k2, []) + [v2]
  return t.items()

def MapReduce(RECORDREADER, MAP, REDUCE):
  return flatten(map(lambda x: REDUCE(*x), groupbykey(flatten(map(lambda x: MAP(*x), RECORDREADER())))))

Реализуйте перемножение матриц с использованием модельного кода MapReduce для одной машины в случае, когда одна матрица хранится в памяти, а другая генерируется RECORDREADER-ом.

In [None]:
import numpy as np
I = 2
J = 3
K = 4*10
small_mat = np.random.rand(I,J) # it is legal to access this from RECORDREADER, MAP, REDUCE
big_mat = np.random.rand(J,K)

def RECORDREADER():
  for j in range(big_mat.shape[0]):
    for k in range(big_mat.shape[1]):
      yield ((j,k), big_mat[j,k])
      
def MAP(k1, v1):
  (j, k) = k1
  w = v1
  for i in range(small_mat.shape[0]):             # Перебираем строки small_mat
    yield ((i, k), small_mat[i, j] * w)           # ((i, k), произведение элементов)
  
def REDUCE(key, values):
  (i, k) = key
  return [(key, sum(values))]                     # (i, k) -> сумма всех произведений

Проверьте своё решение

In [24]:
# CHECK THE SOLUTION
reference_solution = np.matmul(small_mat, big_mat) 
solution = MapReduce(RECORDREADER, MAP, REDUCE)

def asmatrix(reduce_output):
  reduce_output = list(reduce_output)
  I = max(i for ((i,k), vw) in reduce_output)+1
  K = max(k for ((i,k), vw) in reduce_output)+1
  mat = np.empty(shape=(I,K))
  for ((i,k), vw) in reduce_output:
    mat[i,k] = vw
  return mat

np.allclose(reference_solution, asmatrix(solution)) # should return true

True

In [25]:
reduce_output = list(MapReduce(RECORDREADER, MAP, REDUCE))
max(i for ((i,k), vw) in reduce_output)

1

Реализуйте перемножение матриц  с использованием модельного кода MapReduce для одной машины в случае, когда обе матрицы генерируются в RECORDREADER. Например, сначала одна, а потом другая.

In [None]:
import numpy as np

I = 2  
J = 3  
K = 4  

A = np.random.rand(I, J)  
B = np.random.rand(J, K)  

# MapReduce model
def flatten(nested_iterable):
    
    for iterable in nested_iterable:
        for element in iterable:
            yield element

def groupbykey(iterable):
    grouped = {}
    for (k, v) in iterable:
        grouped[k] = grouped.get(k, []) + [v]
    return grouped.items()

def MapReduce(RECORDREADER, MAP, REDUCE):
    records = RECORDREADER()
    mapped = list(flatten(map(lambda x: MAP(*x), records)))         # Применяем MAP к данным
    grouped = groupbykey(mapped)  
    reduced = flatten(map(lambda x: REDUCE(*x), grouped))           # Применяем REDUCE
    return reduced


def RECORDREADER():
    # Передаем A
    for i in range(A.shape[0]):                                     # I строк
        for j in range(A.shape[1]):                                 # J столбцов
            yield (("A", i, j), A[i, j])                            

    # Передаем B
    for j in range(B.shape[0]):                                     # J строк
        for k in range(B.shape[1]):                                 # K столбцов
            yield (("B", j, k), B[j, k])  

# Сопоставляем элементы A и B
def MAP(k1, v1):
    tag, i_or_j, j_or_k = k1                                       # Распаковываем ключ
    w = v1                                                         # Значение элемента

    if tag == "A":  
        i, j = i_or_j, j_or_k
        for k in range(K):                                         # Перебираем все столбцы B
            yield ((i, k), ("A", j, w))  

    elif tag == "B":  
        j, k = i_or_j, j_or_k
        for i in range(I):                                         # Перебираем все строки A
            yield ((i, k), ("B", j, w))  

# Перемножаем и суммируем элементы
def REDUCE(key, values):
    A_values = {j: w for tag, j, w in values if tag == "A"}        # Словарь A[j] -> значение
    B_values = {j: w for tag, j, w in values if tag == "B"}        # Словарь B[j] -> значение

    sum_result = sum(A_values[j] * B_values[j] for j in A_values if j in B_values)
    
    return [(key, sum_result)]                                     # (i, k) -> сумма произведений


solution = list(MapReduce(RECORDREADER, MAP, REDUCE))

# Преобразование результата в матрицу
def asmatrix(reduce_output):
    reduce_output = list(reduce_output)
    I = max(i for ((i, k), vw) in reduce_output) + 1
    K = max(k for ((i, k), vw) in reduce_output) + 1
    mat = np.zeros((I, K)) 
    for ((i, k), vw) in reduce_output:
        mat[i, k] = vw
    return mat


reference_solution = np.matmul(A, B)  
computed_solution = asmatrix(solution)  

print(np.allclose(reference_solution, computed_solution))  


True


Реализуйте перемножение матриц с использованием модельного кода MapReduce Distributed, когда каждая матрица генерируется в своём RECORDREADER. 

In [28]:
import numpy as np

I = 2  
J = 3  
K = 4  

A = np.random.rand(I, J)  
B = np.random.rand(J, K)  

def flatten(nested_iterable):
    for iterable in nested_iterable:
        for element in iterable:
            yield element

def groupbykey(iterable):
    grouped = {}
    for (k, v) in iterable:
        grouped[k] = grouped.get(k, []) + [v]
    return grouped.items()


def MapReduce(RECORDREADER_A, RECORDREADER_B, MAP, REDUCE):

    # Читаем A и В
    records_A = RECORDREADER_A()                
    records_B = RECORDREADER_B()  
    
    # MAP для A и В
    mapped_A = list(flatten(map(lambda x: MAP(*x), records_A)))  
    mapped_B = list(flatten(map(lambda x: MAP(*x), records_B)))  
    
    # Объединяем результаты и группируем по (i, k)
    mapped = mapped_A + mapped_B                
    grouped = groupbykey(mapped)  
    
    reduced = flatten(map(lambda x: REDUCE(*x), grouped))  
    return reduced

# Читаем матрицу A
def RECORDREADER_A():
    for i in range(A.shape[0]):  
        for j in range(A.shape[1]):  
            yield (("A", i, j), A[i, j])  

# Читаем матрицу B
def RECORDREADER_B():
    for j in range(B.shape[0]):  
        for k in range(B.shape[1]):  
            yield (("B", j, k), B[j, k])  

# Распределяем значения A и B
def MAP(k1, v1):
    tag, i_or_j, j_or_k = k1                # Разбираем ключ
    w = v1  

    if tag == "A":  
        i, j = i_or_j, j_or_k
        for k in range(K): 
            yield ((i, k), ("A", j, w))  

    elif tag == "B":  
        j, k = i_or_j, j_or_k
        for i in range(I):  
            yield ((i, k), ("B", j, w))  

# Перемножаем и суммируем
def REDUCE(key, values):
    A_values = {j: w for tag, j, w in values if tag == "A"}  
    B_values = {j: w for tag, j, w in values if tag == "B"}  

    sum_result = sum(A_values[j] * B_values[j] for j in A_values if j in B_values)
    
    return [(key, sum_result)]  

solution = list(MapReduce(RECORDREADER_A, RECORDREADER_B, MAP, REDUCE))

def asmatrix(reduce_output):
    reduce_output = list(reduce_output)
    I = max(i for ((i, k), vw) in reduce_output) + 1
    K = max(k for ((i, k), vw) in reduce_output) + 1
    mat = np.zeros((I, K))  
    for ((i, k), vw) in reduce_output:
        mat[i, k] = vw
    return mat

reference_solution = np.matmul(A, B)  
computed_solution = asmatrix(solution)  

print( np.allclose(reference_solution, computed_solution))  


True


Обобщите предыдущее решение на случай, когда каждая матрица генерируется несколькими RECORDREADER-ами, и проверьте его работоспособность. Будет ли работать решение, если RECORDREADER-ы будут генерировать случайное подмножество элементов матрицы?

In [33]:
import numpy as np
import random

I = 3  
J = 4  
K = 5  

A = np.random.rand(I, J)
B = np.random.rand(J, K)

# Количество RECORDREADER-ов для каждой матрицы
NUM_READERS_A = 3
NUM_READERS_B = 2

def flatten(nested_iterable):
    for iterable in nested_iterable:
        for element in iterable:
            yield element


def groupbykey(iterable):
    grouped = {}
    for (k, v) in iterable:
        grouped[k] = grouped.get(k, []) + [v]
    return grouped.items()


def MapReduce(RECORDREADERS_A, RECORDREADERS_B, MAP, REDUCE):
    # Все A и В
    records_A = flatten(reader() for reader in RECORDREADERS_A)  
    records_B = flatten(reader() for reader in RECORDREADERS_B)  
    
    # MAP для A и В
    mapped_A = list(flatten(map(lambda x: MAP(*x), records_A)))  
    mapped_B = list(flatten(map(lambda x: MAP(*x), records_B))) 
    
    # Объединяем результаты и группируем по (i, k)
    mapped = mapped_A + mapped_B  
    grouped = groupbykey(mapped)  
    
    reduced = flatten(map(lambda x: REDUCE(*x), grouped))  
    return reduced

# Читаем подмножество A
def RECORDREADER_A(reader_id):
    for i in range(I):
        for j in range(J):
            yield (("A", i, j), A[i, j]) 

# Читаем подмножество B
def RECORDREADER_B(reader_id):
    for j in range(J):
        for k in range(K):
            yield (("B", j, k), B[j, k]) 

# Распределяем значения A и B
def MAP(k1, v1):
    tag, i_or_j, j_or_k = k1
    w = v1  

    if tag == "A":  
        i, j = i_or_j, j_or_k
        for k in range(K):
            yield ((i, k), ("A", j, w))  

    elif tag == "B":  
        j, k = i_or_j, j_or_k
        for i in range(I): 
            yield ((i, k), ("B", j, w))  

def REDUCE(key, values):
    A_values = {j: w for tag, j, w in values if tag == "A"}  
    B_values = {j: w for tag, j, w in values if tag == "B"}  

    sum_result = sum(A_values[j] * B_values[j] for j in A_values if j in B_values)
    
    return [(key, sum_result)]  


RECORDREADERS_A = [lambda: RECORDREADER_A(i) for i in range(NUM_READERS_A)]
RECORDREADERS_B = [lambda: RECORDREADER_B(i) for i in range(NUM_READERS_B)]


solution = list(MapReduce(RECORDREADERS_A, RECORDREADERS_B, MAP, REDUCE))


def asmatrix(reduce_output):
    reduce_output = list(reduce_output)
    I = max(i for ((i, k), vw) in reduce_output) + 1
    K = max(k for ((i, k), vw) in reduce_output) + 1
    mat = np.zeros((I, K))  
    for ((i, k), vw) in reduce_output:
        mat[i, k] = vw
    return mat

reference_solution = np.matmul(A, B)  
computed_solution = asmatrix(solution)  

print(np.allclose(reference_solution, computed_solution))  


True
