# Введение в MapReduce модель на Python


In [1]:
from typing import NamedTuple # requires python 3.6+
from typing import Iterator

In [2]:
def MAP(_, row:NamedTuple):
  if (row.gender == 'female'):
    yield (row.age, row)
    
def REDUCE(age:str, rows:Iterator[NamedTuple]):
  sum = 0
  count = 0
  for row in rows:
    sum += row.social_contacts
    count += 1
  if (count > 0):
    yield (age, sum/count)
  else:
    yield (age, 0)

Модель элемента данных

In [3]:
class User(NamedTuple):
  id: int
  age: str
  social_contacts: int
  gender: str

In [4]:
input_collection = [
    User(id=0, age=55, gender='male', social_contacts=20),
    User(id=1, age=25, gender='female', social_contacts=240),
    User(id=2, age=25, gender='female', social_contacts=500),
    User(id=3, age=33, gender='female', social_contacts=800)
]

Функция RECORDREADER моделирует чтение элементов с диска или по сети.

In [5]:
def RECORDREADER():
  return [(u.id, u) for u in input_collection]

In [6]:
list(RECORDREADER())

[(0, User(id=0, age=55, social_contacts=20, gender='male')),
 (1, User(id=1, age=25, social_contacts=240, gender='female')),
 (2, User(id=2, age=25, social_contacts=500, gender='female')),
 (3, User(id=3, age=33, social_contacts=800, gender='female'))]

In [7]:
def flatten(nested_iterable):
  for iterable in nested_iterable:
    for element in iterable:
      yield element

In [8]:
map_output = flatten(map(lambda x: MAP(*x), RECORDREADER()))
map_output = list(map_output) # materialize
map_output

[(25, User(id=1, age=25, social_contacts=240, gender='female')),
 (25, User(id=2, age=25, social_contacts=500, gender='female')),
 (33, User(id=3, age=33, social_contacts=800, gender='female'))]

In [9]:
def groupbykey(iterable):
  t = {}
  for (k2, v2) in iterable:
    t[k2] = t.get(k2, []) + [v2]
  return t.items()

In [10]:
shuffle_output = groupbykey(map_output)
shuffle_output = list(shuffle_output)
shuffle_output

[(25,
  [User(id=1, age=25, social_contacts=240, gender='female'),
   User(id=2, age=25, social_contacts=500, gender='female')]),
 (33, [User(id=3, age=33, social_contacts=800, gender='female')])]

In [11]:
reduce_output = flatten(map(lambda x: REDUCE(*x), shuffle_output))
reduce_output = list(reduce_output)
reduce_output

[(25, 370.0), (33, 800.0)]

Все действия одним конвейером!

In [12]:
list(flatten(map(lambda x: REDUCE(*x), groupbykey(flatten(map(lambda x: MAP(*x), RECORDREADER()))))))

[(25, 370.0), (33, 800.0)]

# **MapReduce**
Выделим общую для всех пользователей часть системы в отдельную функцию высшего порядка. Это наиболее простая модель MapReduce, без учёта распределённого хранения данных. 

Пользователь для решения своей задачи реализует RECORDREADER, MAP, REDUCE.

In [13]:
def flatten(nested_iterable):
  for iterable in nested_iterable:
    for element in iterable:
      yield element

def groupbykey(iterable):
  t = {}
  for (k2, v2) in iterable:
    t[k2] = t.get(k2, []) + [v2]
  return t.items()

def MapReduce(RECORDREADER, MAP, REDUCE):
  return flatten(map(lambda x: REDUCE(*x), groupbykey(flatten(map(lambda x: MAP(*x), RECORDREADER())))))

## Спецификация MapReduce



```
f (k1, v1) -> (k2,v2)*
g (k2, v2*) -> (k3,v3)*
 
mapreduce ((k1,v1)*) -> (k3,v3)*
groupby ((k2,v2)*) -> (k2,v2*)*
flatten (e2**) -> e2*
 
mapreduce .map(f).flatten.groupby(k2).map(g).flatten
```




# Примеры

## SQL 

In [14]:
from typing import NamedTuple # requires python 3.6+
from typing import Iterator

class User(NamedTuple):
  id: int
  age: str
  social_contacts: int
  gender: str
    
input_collection = [
    User(id=0, age=55, gender='male', social_contacts=20),
    User(id=1, age=25, gender='female', social_contacts=240),
    User(id=2, age=25, gender='female', social_contacts=500),
    User(id=3, age=33, gender='female', social_contacts=800)
]

def MAP(_, row:NamedTuple):
  if (row.gender == 'female'):
    yield (row.age, row)
    
def REDUCE(age:str, rows:Iterator[NamedTuple]):
  sum = 0
  count = 0
  for row in rows:
    sum += row.social_contacts
    count += 1
  if (count > 0):
    yield (age, sum/count)
  else:
    yield (age, 0)
 
def RECORDREADER():
  return [(u.id, u) for u in input_collection]

output = MapReduce(RECORDREADER, MAP, REDUCE)
output = list(output)
output

[(25, 370.0), (33, 800.0)]

## Matrix-Vector multiplication 

In [15]:
from typing import Iterator
import numpy as np

mat = np.ones((5,4))
vec = np.random.rand(4) # in-memory vector in all map tasks

def MAP(coordinates:(int, int), value:int):
  i, j = coordinates
  yield (i, value*vec[j])
 
def REDUCE(i:int, products:Iterator[NamedTuple]):
  sum = 0
  for p in products:
    sum += p
  yield (i, sum)

def RECORDREADER():
  for i in range(mat.shape[0]):
    for j in range(mat.shape[1]):
      yield ((i, j), mat[i,j])
      
output = MapReduce(RECORDREADER, MAP, REDUCE)
output = list(output)
output

[(0, np.float64(1.7652964521298804)),
 (1, np.float64(1.7652964521298804)),
 (2, np.float64(1.7652964521298804)),
 (3, np.float64(1.7652964521298804)),
 (4, np.float64(1.7652964521298804))]

## Inverted index 

In [16]:
from typing import Iterator

d1 = "it is what it is"
d2 = "what is it"
d3 = "it is a banana"
documents = [d1, d2, d3]

def RECORDREADER():
  for (docid, document) in enumerate(documents):
    yield ("{}".format(docid), document)
      
def MAP(docId:str, body:str):
  for word in set(body.split(' ')):
    yield (word, docId)
 
def REDUCE(word:str, docIds:Iterator[str]):
  yield (word, sorted(docIds))

output = MapReduce(RECORDREADER, MAP, REDUCE)
output = list(output)
output

[('it', ['0', '1', '2']),
 ('what', ['0', '1']),
 ('is', ['0', '1', '2']),
 ('banana', ['2']),
 ('a', ['2'])]

## WordCount

In [17]:
from typing import Iterator

d1 = """
it is what it is
it is what it is
it is what it is"""
d2 = """
what is it
what is it"""
d3 = """
it is a banana"""
documents = [d1, d2, d3]

def RECORDREADER():
  for (docid, document) in enumerate(documents):
    for (lineid, line) in enumerate(document.split('\n')):
      yield ("{}:{}".format(docid,lineid), line)

def MAP(docId:str, line:str):
  for word in line.split(" "):  
    yield (word, 1)
 
def REDUCE(word:str, counts:Iterator[int]):
  sum = 0
  for c in counts:
    sum += c
  yield (word, sum)

output = MapReduce(RECORDREADER, MAP, REDUCE)
output = list(output)
output

[('', 3), ('it', 9), ('is', 9), ('what', 5), ('a', 1), ('banana', 1)]

# MapReduce Distributed

Добавляется в модель фабрика RECORDREARER-ов --- INPUTFORMAT, функция распределения промежуточных результатов по партициям PARTITIONER, и функция COMBINER для частичной аггрегации промежуточных результатов до распределения по новым партициям.

In [18]:
def flatten(nested_iterable):
  for iterable in nested_iterable:
    for element in iterable:
      yield element

def groupbykey(iterable):
  t = {}
  for (k2, v2) in iterable:
    t[k2] = t.get(k2, []) + [v2]
  return t.items()
      
def groupbykey_distributed(map_partitions, PARTITIONER):
  global reducers
  partitions = [dict() for _ in range(reducers)]
  for map_partition in map_partitions:
    for (k2, v2) in map_partition:
      p = partitions[PARTITIONER(k2)]
      p[k2] = p.get(k2, []) + [v2]
  return [(partition_id, sorted(partition.items(), key=lambda x: x[0])) for (partition_id, partition) in enumerate(partitions)]
 
def PARTITIONER(obj):
  global reducers
  return hash(obj) % reducers
  
def MapReduceDistributed(INPUTFORMAT, MAP, REDUCE, PARTITIONER=PARTITIONER, COMBINER=None):
  map_partitions = map(lambda record_reader: flatten(map(lambda k1v1: MAP(*k1v1), record_reader)), INPUTFORMAT())
  if COMBINER != None:
    map_partitions = map(lambda map_partition: flatten(map(lambda k2v2: COMBINER(*k2v2), groupbykey(map_partition))), map_partitions)
  reduce_partitions = groupbykey_distributed(map_partitions, PARTITIONER) # shuffle
  reduce_outputs = map(lambda reduce_partition: (reduce_partition[0], flatten(map(lambda reduce_input_group: REDUCE(*reduce_input_group), reduce_partition[1]))), reduce_partitions)
  
  print("{} key-value pairs were sent over a network.".format(sum([len(vs) for (k,vs) in flatten([partition for (partition_id, partition) in reduce_partitions])])))
  return reduce_outputs

## Спецификация MapReduce Distributed


```
f (k1, v1) -> (k2,v2)*
g (k2, v2*) -> (k3,v3)*
 
e1 (k1, v1)
e2 (k2, v2)
partition1 (k2, v2)*
partition2 (k2, v2*)*
 
flatmap (e1->e2*, e1*) -> partition1*
groupby (partition1*) -> partition2*

mapreduce ((k1,v1)*) -> (k3,v3)*
mapreduce .flatmap(f).groupby(k2).flatmap(g)
```



## WordCount 

In [19]:
from typing import Iterator
import numpy as np

d1 = """
it is what it is
it is what it is
it is what it is"""
d2 = """
what is it
what is it"""
d3 = """
it is a banana"""
documents = [d1, d2, d3, d1, d2, d3]

maps = 3
reducers = 2

def INPUTFORMAT():
  global maps
  
  def RECORDREADER(split):
    for (docid, document) in enumerate(split):
      for (lineid, line) in enumerate(document.split('\n')):
        yield ("{}:{}".format(docid,lineid), line)
      
  split_size =  int(np.ceil(len(documents)/maps))
  for i in range(0, len(documents), split_size):
    yield RECORDREADER(documents[i:i+split_size])

def MAP(docId:str, line:str):
  for word in line.split(" "):  
    yield (word, 1)
 
def REDUCE(word:str, counts:Iterator[int]):
  sum = 0
  for c in counts:
    sum += c
  yield (word, sum)
  
# try to set COMBINER=REDUCER and look at the number of values sent over the network 
partitioned_output = MapReduceDistributed(INPUTFORMAT, MAP, REDUCE, COMBINER=None) 
partitioned_output = [(partition_id, list(partition)) for (partition_id, partition) in partitioned_output]
partitioned_output

56 key-value pairs were sent over a network.


[(0, [('', 6), ('it', 18), ('what', 10)]),
 (1, [('a', 2), ('banana', 2), ('is', 18)])]

## TeraSort

In [20]:
import numpy as np

input_values = np.random.rand(30)
maps = 3
reducers = 2
min_value = 0.0
max_value = 1.0

def INPUTFORMAT():
  global maps
  
  def RECORDREADER(split):
    for value in split:
        yield (value, None)
      
  split_size =  int(np.ceil(len(input_values)/maps))
  for i in range(0, len(input_values), split_size):
    yield RECORDREADER(input_values[i:i+split_size])
    
def MAP(value:int, _):
  yield (value, None)
  
def PARTITIONER(key):
  global reducers
  global max_value
  global min_value
  bucket_size = (max_value-min_value)/reducers
  bucket_id = 0
  while((key>(bucket_id+1)*bucket_size) and ((bucket_id+1)*bucket_size<max_value)):
    bucket_id += 1
  return bucket_id

def REDUCE(value:int, _):
  yield (None,value)
  
partitioned_output = MapReduceDistributed(INPUTFORMAT, MAP, REDUCE, COMBINER=None, PARTITIONER=PARTITIONER)
partitioned_output = [(partition_id, list(partition)) for (partition_id, partition) in partitioned_output]
partitioned_output

30 key-value pairs were sent over a network.


[(0,
  [(None, np.float64(0.05580414187843197)),
   (None, np.float64(0.07746310323829542)),
   (None, np.float64(0.1523061556840386)),
   (None, np.float64(0.1781435257413687)),
   (None, np.float64(0.20795357930234604)),
   (None, np.float64(0.2526211573705568)),
   (None, np.float64(0.41001255531541214)),
   (None, np.float64(0.4107689861626782)),
   (None, np.float64(0.4304497764238513)),
   (None, np.float64(0.43373485608087636)),
   (None, np.float64(0.4435789098092394)),
   (None, np.float64(0.44665171592205755)),
   (None, np.float64(0.45731269595244106))]),
 (1,
  [(None, np.float64(0.5147641690094529)),
   (None, np.float64(0.5218788961044795)),
   (None, np.float64(0.5374520058256215)),
   (None, np.float64(0.539627979266474)),
   (None, np.float64(0.558117619542932)),
   (None, np.float64(0.6108919840580103)),
   (None, np.float64(0.6354045639937652)),
   (None, np.float64(0.64571785153077)),
   (None, np.float64(0.6784012274182424)),
   (None, np.float64(0.7658267945594682

# Упражнения
Упражнения взяты из Rajaraman A., Ullman J. D. Mining of massive datasets. – Cambridge University Press, 2011.


Для выполнения заданий переопределите функции RECORDREADER, MAP, REDUCE. Для модели распределённой системы может потребоваться переопределение функций PARTITION и COMBINER.

### Максимальное значение ряда

Разработайте MapReduce алгоритм, который находит максимальное число входного списка чисел.

In [21]:
import numpy as np 
import random

In [22]:
def flatten(nested_iterable):
  for iterable in nested_iterable:
    for element in iterable:
      yield element

def groupbykey(iterable):
  t = {}
  for (k2, v2) in iterable:
    t[k2] = t.get(k2, []) + [v2]
  return t.items()

def MapReduce(RECORDREADER, MAP, REDUCE):
  return flatten(map(lambda x: REDUCE(*x), groupbykey(flatten(map(lambda x: MAP(*x), RECORDREADER)))))

In [23]:
def MAP(_, value):
    yield (None, value)

def REDUCE(_, values):
    yield max(values)

def RECORDREADER(count):
    return ((None, random.randint(0, 1000)) for _ in range(count))

values = list(RECORDREADER(15))
print(f'List of ints: {[val for _, val in values]}')
output = list(MapReduce(values, MAP, REDUCE))
print(f'Max elem of list: {output}')

List of ints: [472, 850, 593, 156, 852, 303, 556, 299, 148, 651, 971, 195, 611, 550, 228]
Max elem of list: [971]


### Арифметическое среднее

Разработайте MapReduce алгоритм, который находит арифметическое среднее.

$$\overline{X} = \frac{1}{n}\sum_{i=0}^{n} x_i$$


In [24]:
def MAP(key, value):
    yield (None, value)

def REDUCE(key, values):
    yield sum(values) / len(values)
    
def RECORDREADER(count):
    return ((None, random.randint(0, 1000)) for _ in range(count))
values = list(RECORDREADER(15))
output = list(MapReduce(values, MAP, REDUCE))
print(f'List of ints: {[val for _, val in values]}')
print(f'Avg of list: {output}')

List of ints: [583, 556, 251, 500, 587, 835, 743, 733, 881, 807, 82, 326, 485, 778, 39]
Avg of list: [545.7333333333333]


### GroupByKey на основе сортировки

Реализуйте groupByKey на основе сортировки, проверьте его работу на примерах

In [25]:
def groupbykey_sorted(iterable):
    sorted_items = sorted(iterable, key=lambda x: x[0])
    grouped = {}
    for key, value in sorted_items:
        grouped.setdefault(key, []).append(value)
    return grouped.items()

def MAP(key, value):
    yield (key, value)

def REDUCE(key, values):
    yield max(values) / len(values)
    
def RECORDREADER(count):
    return ((random.randint(0,3), random.randint(0, 1000)) for _ in range(count))

def MapReduce(RECORDREADER, MAP, REDUCE):
    return flatten(map(lambda x: REDUCE(*x), groupbykey_sorted(flatten(map(lambda x: MAP(*x), RECORDREADER)))))
values = list(RECORDREADER(10))
print(values)
print(list(MapReduce(values, MAP, REDUCE)))
print(groupbykey_sorted(values))

[(3, 876), (1, 67), (1, 111), (3, 102), (2, 772), (0, 175), (0, 426), (0, 401), (3, 584), (3, 376)]
[142.0, 55.5, 772.0, 219.0]
dict_items([(0, [175, 426, 401]), (1, [67, 111]), (2, [772]), (3, [876, 102, 584, 376])])


### Drop duplicates (set construction, unique elements, distinct)

Реализуйте распределённую операцию исключения дубликатов

In [26]:
import numpy as np

# Пример документов
doc1 = """
it is what it is
it is what it is
it is what it is"""
doc2 = """
what is it
what is it"""
doc3 = """
it is a dog"""

documents = [doc1, doc2, doc3, doc1, doc2, doc3]

num_mappers = 3
num_reducers = 2

# Группировка по ключам с распределением по партициям
def group_by_key_distributed(map_partitions, partitioner):
    partitions = [dict() for _ in range(num_reducers)]
    for map_partition in map_partitions:
        for key, value in map_partition:
            partition = partitions[partitioner(key)]
            partition.setdefault(key, []).append(value)
    return [(partition_id, sorted(partition.items(), key=lambda x: x[0])) for partition_id, partition in enumerate(partitions)]

# Функция партиционирования по хешу
def partitioner(obj):
    return hash(obj) % num_reducers

# Основная функция MapReduce с распределенным вычислением
def map_reduce_distributed(input_format, map_func, reduce_func, partitioner=partitioner, combiner=None):
    # Шаг 1: Применение маппинга
    map_partitions = map(lambda record_reader: flatten(map(lambda key_value: map_func(*key_value), record_reader)), input_format())
    
    # Если есть комбинирование, применяем его
    if combiner is not None:
        map_partitions = map(lambda map_partition: flatten(map(lambda key_value: combiner(*key_value), groupbykey(map_partition))), map_partitions)
    
    # Шаг 2: Группировка по ключам и распределение по партициям
    reduce_partitions = group_by_key_distributed(map_partitions, partitioner)
    
    # Шаг 3: Применение редукции
    reduce_outputs = map(lambda reduce_partition: (reduce_partition[0], flatten(map(lambda reduce_input_group: reduce_func(*reduce_input_group), reduce_partition[1]))), reduce_partitions)
    
    # Выводим количество переданных данных по сети
    total_values_sent = sum([len(values) for _, values in flatten([partition for partition_id, partition in reduce_partitions])])
    print(f"{total_values_sent} key-value pairs were sent over a network.")
    
    return reduce_outputs

# Функция для преобразования вложенных итераторов в один
def flatten(nested_iterable):
    for iterable in nested_iterable:
        for element in iterable:
            yield element

# Формат ввода
def input_format():
    def record_reader(split):
        for doc_id, document in enumerate(split):
            for line_id, line in enumerate(document.split('\n')):
                yield (f"{doc_id}:{line_id}", line)
    
    split_size = int(np.ceil(len(documents) / num_mappers))
    for i in range(0, len(documents), split_size):
        yield record_reader(documents[i:i + split_size])

# Маппинг: разбиваем строку на слова и возвращаем их как ключи и значения
def map_func(doc_id: str, line: str):
    # Игнорируем пустые строки
    for word in line.split(" "):
        if word.strip():  # Пропускаем пустые строки или слова
            yield (word, word)

# Редукция: просто возвращаем ключ
def reduce_func(key: str, values):
    yield key

# Запуск MapReduce
partitioned_output = map_reduce_distributed(input_format, map_func, reduce_func, combiner=None)
partitioned_output = [(partition_id, list(partition)) for partition_id, partition in partitioned_output]
print(partitioned_output)


50 key-value pairs were sent over a network.
[(0, ['dog', 'it', 'what']), (1, ['a', 'is'])]


#Операторы реляционной алгебры
### Selection (Выборка)

**The Map Function**: Для  каждого кортежа $t \in R$ вычисляется истинность предиката $C$. В случае истины создаётся пара ключ-значение $(t, t)$. В паре ключ и значение одинаковы, равны $t$.

**The Reduce Function:** Роль функции Reduce выполняет функция идентичности, которая возвращает то же значение, что получила на вход.



In [27]:
from  collections import  namedtuple

In [28]:
Person = namedtuple('Person', ['id', 'gender'])

# Теперь используем namedtuple в relation
relation = [
    Person(1, "male"),
    Person(2, "female"),
    Person(3, "female")
]


def RECORDREADER_select():
    for rec in relation:
        yield (None, rec)

def MAP_select(_, rec):
    if rec.gender == "female":
        # Используем строковое представление как ключ
        yield (rec, rec)

def REDUCE_select(key, values):
    # Выдаем один элемент из группы (они одинаковы)
    yield values[0]
    
def MapReduce(RECORDREADER, MAP, REDUCE):
    return flatten(map(lambda x: REDUCE(*x), groupbykey(flatten(map(lambda x: MAP(*x), RECORDREADER())))))

    
result_select = list(MapReduce(RECORDREADER_select, MAP_select, REDUCE_select))
print(result_select)

[Person(id=2, gender='female'), Person(id=3, gender='female')]


### Projection (Проекция)

Проекция на множество атрибутов $S$.

**The Map Function:** Для каждого кортежа $t \in R$ создайте кортеж $t′$, исключая  из $t$ те значения, атрибуты которых не принадлежат  $S$. Верните пару $(t′, t′)$.

**The Reduce Function:** Для каждого ключа $t′$, созданного любой Map задачей, вы получаете одну или несколько пар $(t′, t′)$. Reduce функция преобразует $(t′, [t′, t′, . . . , t′])$ в $(t′, t′)$, так, что для ключа $t′$ возвращается одна пара  $(t′, t′)$.

In [29]:
input_collection = [
    User(id=0, age=55, gender='male', social_contacts=20),
    User(id=1, age=25, gender='female', social_contacts=240),
    User(id=2, age=25, gender='female', social_contacts=500),
    User(id=2, age=35, gender='male', social_contacts=490),
    User(id=3, age=33, gender='female', social_contacts=800)
]

def RECORDREADER():
  return [(u.id, u) for u in input_collection]

# Проекция на атрибуты 'id' и 'age'
attributes = ['id', 'age']

def MAP_projection(_, rec):
    # Проекция на нужные атрибуты
    proj = {attr: getattr(rec, attr) for attr in attributes if hasattr(rec, attr)}
    yield (str(proj), proj)

# Функция REDUCE: для каждого ключа (t′) возвращаем один результат (t′, t′)
def REDUCE_projection(key, values):
    yield values[0] 


result_projection = list(MapReduce(RECORDREADER, MAP_projection, REDUCE_projection))
print(result_projection)

[{'id': 0, 'age': 55}, {'id': 1, 'age': 25}, {'id': 2, 'age': 25}, {'id': 2, 'age': 35}, {'id': 3, 'age': 33}]


### Union (Объединение)

**The Map Function:** Превратите каждый входной кортеж $t$ в пару ключ-значение $(t, t)$.

**The Reduce Function:** С каждым ключом $t$ будет ассоциировано одно или два значений. В обоих случаях создайте $(t, t)$ в качестве выходного значения.

In [30]:
from collections import namedtuple


input_collection_1 = [
    User(id=0, age=55, gender='male', social_contacts=20),
    User(id=1, age=25, gender='female', social_contacts=240),
    User(id=2, age=25, gender='female', social_contacts=500)
]

input_collection_2 = [
    User(id=3, age=35, gender='male', social_contacts=490),
    User(id=4, age=33, gender='female', social_contacts=800)
]


# Функция RECORDREADER: генерирует пары (t, t) для каждого элемента
def RECORDREADER_union():
    for user in input_collection_1 + input_collection_2:
        yield (user, user)

# Функция MAP: для каждого кортежа t генерируем пару (t, t)
def MAP_union(_, t):
    yield (t, t)

# Функция REDUCE: для каждого ключа t возвращаем пару (t, t)
def REDUCE_union(key, values):
    yield (key, key)


result_union = list(MapReduce(RECORDREADER_union, MAP_union, REDUCE_union))
for item in result_union:
    print(item)


(User(id=0, age=55, social_contacts=20, gender='male'), User(id=0, age=55, social_contacts=20, gender='male'))
(User(id=1, age=25, social_contacts=240, gender='female'), User(id=1, age=25, social_contacts=240, gender='female'))
(User(id=2, age=25, social_contacts=500, gender='female'), User(id=2, age=25, social_contacts=500, gender='female'))
(User(id=3, age=35, social_contacts=490, gender='male'), User(id=3, age=35, social_contacts=490, gender='male'))
(User(id=4, age=33, social_contacts=800, gender='female'), User(id=4, age=33, social_contacts=800, gender='female'))


### Intersection (Пересечение)

**The Map Function:** Превратите каждый кортеж $t$ в пары ключ-значение $(t, t)$.

**The Reduce Function:** Если для ключа $t$ есть список из двух элементов $[t, t]$ $-$ создайте пару $(t, t)$. Иначе, ничего не создавайте.

In [31]:
def RECORDREADER_intersection():
    for user in input_collection_1 + input_collection_2:
        yield (user.id, user)

def MAP_intersection(_, t):
    yield (t, t)

def REDUCE_intersection(key, values):
    if len(values) == 2:  # Если ключ встречается в обоих наборах данных
        yield (key, key)

result_intersection = list(MapReduce(RECORDREADER_intersection, MAP_intersection, REDUCE_intersection))
print(result_intersection)


[]


### Difference (Разница)

**The Map Function:** Для кортежа $t \in R$, создайте пару $(t, R)$, и для кортежа $t \in S$, создайте пару $(t, S)$. Задумка заключается в том, чтобы значение пары было именем отношения $R$ or $S$, которому принадлежит кортеж (а лучше, единичный бит, по которому можно два отношения различить $R$ or $S$), а не весь набор атрибутов отношения.

**The Reduce Function:** Для каждого ключа $t$, если соответствующее значение является списком $[R]$, создайте пару $(t, t)$. В иных случаях не предпринимайте действий.

In [32]:

def RECORDREADER_difference():
    return [(0, a) for a in input_collection_1] + [(1, b) for b in input_collection_2]


def MAP_difference(id, user):
    yield (user, id)

def REDUCE_difference(key, values):
    if values == [0]:
        yield (key)

def flatten(iterable):
    for item in iterable:
        if hasattr(item, '__iter__') and not isinstance(item, (str, bytes)):
            for sub in item:
                yield sub
        else:
            yield item

result_difference = list(MapReduce(RECORDREADER_difference, MAP_difference, REDUCE_difference))
print(result_difference)


[User(id=0, age=55, social_contacts=20, gender='male'), User(id=1, age=25, social_contacts=240, gender='female'), User(id=2, age=25, social_contacts=500, gender='female')]


### Natural Join

**The Map Function:** Для каждого кортежа $(a, b)$ отношения $R$, создайте пару $(b,(R, a))$. Для каждого кортежа $(b, c)$ отношения $S$, создайте пару $(b,(S, c))$.

**The Reduce Function:** Каждый ключ $b$ будет асоциирован со списком пар, которые принимают форму либо $(R, a)$, либо $(S, c)$. Создайте все пары, одни, состоящие из  первого компонента $R$, а другие, из первого компонента $S$, то есть $(R, a)$ и $(S, c)$. На выходе вы получаете последовательность пар ключ-значение из списков ключей и значений. Ключ не нужен. Каждое значение, это тройка $(a, b, c)$ такая, что $(R, a)$ и $(S, c)$ это принадлежат входному списку значений.

In [33]:
class User(NamedTuple):
    id: int
    age: str
    gender: str
    social_contacts: int
    location_id: int

class Location(NamedTuple):
    id: int 
    city: str
    country: str

In [34]:
input_collection = [
    User(id=0, age=55, gender='male', social_contacts=20,location_id=101),
    User(id=1, age=25, gender='female', social_contacts=240,location_id=102),
    User(id=2, age=25, gender='female', social_contacts=500,location_id=103),
    User(id=2, age=35, gender='male', social_contacts=490,location_id=102),
    User(id=3, age=33, gender='female', social_contacts=800,location_id=101)
]

locations = [
    Location(id=101, city="New York", country="USA"),
    Location(id=102, city="London", country="UK"),
    Location(id=103, city="Tokyo", country="Japan")
]


In [35]:
def RECORDREADER_join():
    return [(user.location_id, user) for user in input_collection] + [(location.id, location) for location in locations]

def MAP_join(loc_id, row):
    yield (loc_id, row)

def REDUCE_join(loc_id, rows):
    users = []
    loc = None

    for row in rows:
        if type(row) is User:
            users += [row]
        else:
            loc = row

    for row in rows:
        if type(row) is User:
            yield (row, row.location_id, loc)
            
result_join = list(MapReduce(RECORDREADER_join, MAP_join, REDUCE_join))
print(result_join)

[(User(id=0, age=55, gender='male', social_contacts=20, location_id=101), 101, Location(id=101, city='New York', country='USA')), (User(id=3, age=33, gender='female', social_contacts=800, location_id=101), 101, Location(id=101, city='New York', country='USA')), (User(id=1, age=25, gender='female', social_contacts=240, location_id=102), 102, Location(id=102, city='London', country='UK')), (User(id=2, age=35, gender='male', social_contacts=490, location_id=102), 102, Location(id=102, city='London', country='UK')), (User(id=2, age=25, gender='female', social_contacts=500, location_id=103), 103, Location(id=103, city='Tokyo', country='Japan'))]


### Grouping and Aggregation (Группировка и аггрегация)

**The Map Function:** Для каждого кортежа $(a, b, c$) создайте пару $(a, b)$.

**The Reduce Function:** Ключ представляет ту или иную группу. Примение аггрегирующую операцию $\theta$ к списку значений $[b1, b2, . . . , bn]$ ассоциированных с ключом $a$. Возвращайте в выходной поток $(a, x)$, где $x$ результат применения  $\theta$ к списку. Например, если $\theta$ это $SUM$, тогда $x = b1 + b2 + · · · + bn$, а если $\theta$ is $MAX$, тогда $x$ это максимальное из значений $b1, b2, . . . , bn$.

In [36]:
def RECORDREADER_group():
     return [(location_id, user, location) for user, location_id, location in result_join] 

def MAP_group(loc_id, user, loc):
    yield (loc_id, user)

def REDUCE_group(loc_id, rows):
    yield f"location id={loc_id} = {len(rows)} users"

result_group = MapReduce(RECORDREADER_group, MAP_group, REDUCE_group)
print(list(result_join))

[(User(id=0, age=55, gender='male', social_contacts=20, location_id=101), 101, Location(id=101, city='New York', country='USA')), (User(id=3, age=33, gender='female', social_contacts=800, location_id=101), 101, Location(id=101, city='New York', country='USA')), (User(id=1, age=25, gender='female', social_contacts=240, location_id=102), 102, Location(id=102, city='London', country='UK')), (User(id=2, age=35, gender='male', social_contacts=490, location_id=102), 102, Location(id=102, city='London', country='UK')), (User(id=2, age=25, gender='female', social_contacts=500, location_id=103), 103, Location(id=103, city='Tokyo', country='Japan'))]


# 

### Matrix-Vector multiplication

Случай, когда вектор не помещается в памяти Map задачи


In [37]:
input_matrix = [
    (1, 2, 3), (4, 5, 6), (7, 8, 9),
    (10, 11, 12), (13, 14, 15), (16, 17, 18),
    (19, 20, 21), (22, 23, 24), (25, 26, 27),
]

input_vector = [
    (1, 2), (3, 4), (5, 6)
]

# Функция для чтения матрицы
def RECORDREADER_matvec():
    return [(None, m) for m in input_matrix]

# MAP функция для перемножения матрицы и вектора
def MAP_matvec(_, matrix_row):
    row, col, value = matrix_row
    for vector_col, vector_value in input_vector:
        if vector_col == col:
            yield (row, value * vector_value)

# REDUCE функция для суммирования значений
def REDUCE_matvec(row, values):
    yield (row, sum(values))


output = list(MapReduce(RECORDREADER_matvec, MAP_matvec, REDUCE_matvec))

output


[(4, 36)]

## Matrix multiplication (Перемножение матриц)

Если у нас есть матрица $M$ с элементами $m_{ij}$ в строке $i$ и столбце $j$, и матрица $N$ с элементами $n_{jk}$ в строке $j$ и столбце $k$, тогда их произведение $P = MN$ есть матрица $P$ с элементами $p_{ik}$ в строке $i$ и столбце $k$, где

$$p_{ik} =\sum_{j} m_{ij}n_{jk}$$

Необходимым требованием является одинаковое количество столбцов в $M$ и строк в $N$, чтобы операция суммирования по  $j$ была осмысленной. Мы можем размышлять о матрице, как об отношении с тремя атрибутами: номер строки, номер столбца, само значение. Таким образом матрица $M$ предстваляется как отношение $ M(I, J, V )$, с кортежами $(i, j, m_{ij})$, и, аналогично, матрица $N$ представляется как отношение $N(J, K, W)$, с кортежами $(j, k, n_{jk})$. Так как большие матрицы как правило разреженные (большинство значений равно 0), и так как мы можем нулевыми значениями пренебречь (не хранить), такое реляционное представление достаточно эффективно для больших матриц. Однако, возможно, что координаты $i$, $j$, и $k$ неявно закодированы в смещение позиции элемента относительно начала файла, вместо явного хранения. Тогда, функция Map (или Reader) должна быть разработана таким образом, чтобы реконструировать компоненты $I$, $J$, и $K$ кортежей из смещения.

Произведение $MN$ это фактически join, за которым следуют группировка по ключу и аггрегация. Таким образом join отношений $M(I, J, V )$ и $N(J, K, W)$, имеющих общим только атрибут $J$, создаст кортежи $(i, j, k, v, w)$ из каждого кортежа $(i, j, v) \in M$ и кортежа $(j, k, w) \in N$. Такой 5 компонентный кортеж представляет пару элементов матрицы $(m_{ij} , n_{jk})$. Что нам хотелось бы получить на самом деле, это произведение этих элементов, то есть, 4 компонентный кортеж$(i, j, k, v \times w)$, так как он представляет произведение $m_{ij}n_{jk}$. Мы представляем отношение как результат одной MapReduce операции, в которой мы можем произвести группировку и аггрегацию, с $I$ и $K$  атрибутами, по которым идёт группировка, и суммой  $V \times W$. 





In [38]:
# MapReduce model
def flatten(nested_iterable):
  for iterable in nested_iterable:
    for element in iterable:
      yield element

def groupbykey(iterable):
  t = {}
  for (k2, v2) in iterable:
    t[k2] = t.get(k2, []) + [v2]
  return t.items()

def MapReduce(RECORDREADER, MAP, REDUCE):
  return flatten(map(lambda x: REDUCE(*x), groupbykey(flatten(map(lambda x: MAP(*x), RECORDREADER())))))

Реализуйте перемножение матриц с использованием модельного кода MapReduce для одной машины в случае, когда одна матрица хранится в памяти, а другая генерируется RECORDREADER-ом.

In [39]:
import numpy as np
I = 2
J = 3
K = 4*10
small_mat = np.random.rand(I,J) # it is legal to access this from RECORDREADER, MAP, REDUCE
big_mat = np.random.rand(J,K)

def RECORDREADER():
    for j in range(big_mat.shape[0]):
        for k in range(big_mat.shape[1]):
            yield ((j,k), big_mat[j,k])
      
def MAP(k1, v1):
    (j, k) = k1
    w = v1
    for i in range(I):
      yield (i, k), (small_mat[i,j], w)
  
def REDUCE(key, values):
    (i, k) = key
    mat_mul = 0
    for small_val, big_val in values:
      mat_mul += small_val * big_val
    yield ((i, k), mat_mul)

Проверьте своё решение

In [40]:
# CHECK THE SOLUTION
reference_solution = np.matmul(small_mat, big_mat) 
solution = MapReduce(RECORDREADER, MAP, REDUCE)

def asmatrix(reduce_output):
  reduce_output = list(reduce_output)
  I = max(i for ((i,k), vw) in reduce_output)+1
  K = max(k for ((i,k), vw) in reduce_output)+1
  mat = np.empty(shape=(I,K))
  for ((i,k), vw) in reduce_output:
    mat[i,k] = vw
  return mat

np.allclose(reference_solution, asmatrix(solution)) # should return true

True

In [41]:
reduce_output = list(MapReduce(RECORDREADER, MAP, REDUCE))
max(i for ((i,k), vw) in reduce_output)

1

Реализуйте перемножение матриц  с использованием модельного кода MapReduce для одной машины в случае, когда обе матрицы генерируются в RECORDREADER. Например, сначала одна, а потом другая.

In [45]:
def PARTITIONER(key):
    global reducers
    return hash(key) % reducers


In [46]:

I, J, K = 2, 3, 40

small_mat = np.random.rand(I, J)
big_mat = np.random.rand(J, K)

def RECORDREADER():
    for i in range(I):
        for j in range(J):
            yield ((i, j), small_mat[i, j])
    for j in range(J):
        for k in range(K):
            yield ((j, k), big_mat[j, k])

def MAP(item1, item2) -> Iterator:
    (i, j), value_small = item1
    (j2, k), value_big = item2
    if j == j2:
        yield ((i, k), value_small * value_big)

def REDUCE(key, values: Iterator[float]) -> Iterator:
    total_sum = sum(values)
    yield (key, total_sum)



def MapReduce(RECORDREADER, MAP, REDUCE):
    records = list(RECORDREADER())
    records_A = records[:I * J]
    records_B = records[I * J:]
    
    map_results = []
    for a in records_A:
        for b in records_B:
            map_results.extend(list(MAP(a, b)))
    
    grouped = groupbykey(map_results)
    
    reduce_results = []
    for key, values in grouped:
        reduce_results.extend(list(REDUCE(key, values)))
    
    return reduce_results


solution = MapReduce(RECORDREADER, MAP, REDUCE)

reference_solution = np.matmul(small_mat, big_mat)

print("Матрица, полученная через MapReduce:\n", asmatrix(solution))
print("Матрица, полученная через np.matmul:\n", reference_solution)
print("Совпадают ли результаты?", np.allclose(reference_solution, asmatrix(solution)))


Матрица, полученная через MapReduce:
 [[0.69344898 1.15714029 0.78895281 1.33639469 1.14510243 1.04031095
  0.82391379 1.23717313 0.64962312 0.41013825 0.95864276 0.32197672
  0.76316166 0.94126594 1.12459894 0.98508058 0.62772749 0.5341474
  1.39790172 0.71126176 0.66319503 1.13600959 1.31181978 1.14994801
  0.88910175 1.04092292 0.62213529 0.39658638 0.97567059 1.02078964
  0.35224152 0.91319019 0.58922948 0.64294556 0.52053716 1.12221225
  0.6206318  1.53677179 0.5186652  1.46275898]
 [0.36300631 0.72850702 0.44149822 0.98583034 0.94903095 0.66408314
  0.64471716 0.76593339 0.41288413 0.23135345 0.88967174 0.50886433
  0.50024427 0.29172432 0.60631533 0.46513952 0.18194715 0.07862231
  0.79435129 0.57424303 0.60048865 0.96706138 0.68645419 0.69965925
  0.94852644 0.68991709 0.16192874 0.32247161 0.79752936 0.32994309
  0.49914811 0.74819654 0.2457492  0.23216904 0.34059941 1.022792
  0.63061557 1.12133804 0.50812482 0.57399394]]
Матрица, полученная через np.matmul:
 [[0.69344898 1.1

Реализуйте перемножение матриц с использованием модельного кода MapReduce Distributed, когда каждая матрица генерируется в своём RECORDREADER. 

In [47]:
I, J, K = 2, 3, 40
small_mat = np.random.rand(I, J)
big_mat   = np.random.rand(J, K)

reducers = 3

def RECORDREADER_A():
    for i in range(I):
        for j in range(J):
            yield (('A', i, j), small_mat[i, j])
            
def RECORDREADER_B():
    for j in range(J):
        for k in range(K):
            yield (('B', j, k), big_mat[j, k])

def INPUTFORMAT():
    return [RECORDREADER_A(), RECORDREADER_B()]

def MAP(k1, v1):
    if k1[0] == 'A':
        _, i, j = k1
        a_val = v1
        for k in range(K):
            yield ((i, k), ('A', j, a_val))
    else:
        _, j, k = k1
        b_val = v1
        for i in range(I):
            yield ((i, k), ('B', j, b_val))

def COMBINER(k2, values):
    
    combined = {}
    for typ, j, val in values:
        key = (typ, j)
        combined[key] = combined.get(key, 0) + val
    for (typ, j), total in combined.items():
        yield (k2, (typ, j, total))

def REDUCE(k2, values):
    a_dict = {}
    b_dict = {}
    for typ, j, val in values:
        if typ == 'A':
            a_dict[j] = a_dict.get(j, 0) + val
        elif typ == 'B':
            b_dict[j] = b_dict.get(j, 0) + val
    s = 0
    for j in set(a_dict.keys()) & set(b_dict.keys()):
        s += a_dict[j] * b_dict[j]
    yield (k2, s)


distributed_result_iter = MapReduceDistributed(
    INPUTFORMAT=INPUTFORMAT,
    MAP=MAP,
    REDUCE=REDUCE,
    PARTITIONER=PARTITIONER,
    COMBINER=COMBINER
)

final_results = []
for partition_id, results_generator in distributed_result_iter:
    for (key, val) in results_generator:
        final_results.append((key, val))

C = np.zeros((I, K))
for ((i, k), value) in final_results:
    C[i, k] = value

# Проверяем с результатом numpy.dot
real_mult = small_mat.dot(big_mat)
print("Матрица, полученная через MapReduce:\n", C)
print("Матрица, полученная через numpy.dot:\n", real_mult)
print("Совпадает ли результат?", np.allclose(C, real_mult))

480 key-value pairs were sent over a network.
Матрица, полученная через MapReduce:
 [[0.29383169 0.72289452 0.74751656 0.43694946 0.65237524 0.48825533
  0.33077978 0.33956227 0.63267419 0.44054589 0.68247104 0.51750233
  0.16698029 0.38513024 0.9377877  0.14353625 0.25841132 0.30240276
  0.42228553 0.3356601  0.23963143 0.43210256 0.83982005 0.55402941
  0.18920238 0.16217367 0.59265324 0.39853879 0.60952021 0.51426189
  0.46999697 0.54758518 0.41906908 0.36412669 0.3164998  0.53845975
  0.60306668 0.74087874 0.62358186 0.64192154]
 [0.99686186 1.74920735 2.050573   1.4046806  1.43845383 1.2984176
  0.83648231 1.02008025 1.5105342  1.29362802 1.85717813 1.27949802
  0.547963   1.13459505 2.36292576 0.30201154 0.71122998 0.70972459
  1.19960803 0.79646647 0.78864498 1.14722006 2.081121   1.3335884
  0.49780696 0.52474224 1.62623415 0.90748916 1.42757181 1.39878602
  1.36393983 1.33739236 1.23586295 0.94565116 0.87530109 1.31044099
  1.42121153 1.77956857 1.63917598 1.79810712]]
Матрица

Обобщите предыдущее решение на случай, когда каждая матрица генерируется несколькими RECORDREADER-ами, и проверьте его работоспособность. Будет ли работать решение, если RECORDREADER-ы будут генерировать случайное подмножество элементов матрицы?

In [48]:
I, J, K = 2, 3, 40
small_mat = np.random.rand(I, J)
big_mat   = np.random.rand(J, K)

reducers = 3

# Для матрицы A разбиваем строки на две группы
def RECORDREADER_A1():
    for i in range(0, I // 2):
        for j in range(J):
            yield (('A', i, j), small_mat[i, j])

def RECORDREADER_A2():
    for i in range(I // 2, I):
        for j in range(J):
            yield (('A', i, j), small_mat[i, j])

# Для матрицы B разбиваем столбцы на две группы
def RECORDREADER_B1():
    for j in range(J):
        for k in range(0, K // 2):
            yield (('B', j, k), big_mat[j, k])

def RECORDREADER_B2():
    for j in range(J):
        for k in range(K // 2, K):
            yield (('B', j, k), big_mat[j, k])

def INPUTFORMAT():
    return [RECORDREADER_A1(), RECORDREADER_A2(),
            RECORDREADER_B1(), RECORDREADER_B2()]


distributed_result_iter = MapReduceDistributed(
    INPUTFORMAT=INPUTFORMAT,
    MAP=MAP,
    REDUCE=REDUCE,
    PARTITIONER=PARTITIONER,
    COMBINER=COMBINER
)

final_results = []
for partition_id, results_generator in distributed_result_iter:
    for (key, val) in results_generator:
        final_results.append((key, val))

# Собираем итоговую матрицу C по ключам (i,k)
C = np.zeros((I, K))
for ((i, k), value) in final_results:
    C[i, k] = value

# Вычисляем результат перемножения через numpy.dot
real_mult = small_mat.dot(big_mat)
print("Матрица, полученная через MapReduce:\n", C)
print("Матрица, полученная через numpy.dot:\n", real_mult)
print("Совпадают ли результаты?", np.allclose(C, real_mult))

# Решение будет работать, если объединение всех RECORDREADER-ов охватывает все элементы каждой матрицы.
# Если отдельные RECORDREADER-ы генерируют случайное подмножество элементов, но в сумме эти подмножества
# содержат все необходимые данные для вычисления каждого элемента результата, то перемножение будет выполнено корректно.
# Однако, если какой-либо элемент отсутствует во всех RECORDREADER-ах, итоговое произведение будет неполным.


480 key-value pairs were sent over a network.
Матрица, полученная через MapReduce:
 [[0.93331858 0.88469165 0.2055342  0.83223904 0.67455918 1.18350876
  0.5667464  1.04900377 1.04475848 0.1526974  0.42634728 0.37106209
  0.31455125 0.82896878 0.77421285 1.07497107 0.78131239 0.93916045
  0.14094894 0.92811111 0.56187345 0.8526406  0.84508886 0.47005497
  0.25439054 0.92817568 0.24678564 1.05053042 1.16109467 0.39576966
  0.36607763 1.10803933 0.80534546 0.75134841 0.65931211 0.85408006
  0.59336062 0.55434972 1.12801844 0.52443505]
 [1.49152021 1.7043509  0.54123676 1.63505869 1.60631076 1.94563346
  1.4662617  1.47256772 1.90394122 0.21120677 0.94134512 0.69651849
  0.51243872 1.51677064 0.8110271  1.87803013 1.15467169 1.48421527
  0.59025394 1.64947677 1.40617054 1.28292443 1.60886031 0.70538033
  0.6243506  1.37204285 0.62332631 1.61597456 1.8812271  0.89871866
  0.73593272 1.63244887 1.25970147 1.25658177 1.05155157 1.2428233
  1.25666404 1.15556409 1.76869072 1.11416205]]
Матриц