# MapReduce

Сегодня мы коротко обсудим что представляет из себя технология MapReduce и разберем пару примеров ее использования.

## 1. Мотивировка

Представим, что к вам пришел менеджер со следующей задачей: надо уметь определять сколько каких слов встречается в отзывах к ресторанам. На вход вы получаете текст отзыва, на выходе словарь, где для каждого слова указано количество его повоторений. Вы быстро реализовали прототип на питоне.

In [1]:
import re
import collections

def extract_words(text):
    delimiters = [' ', '.', '?', '!', ':', ',', '-', '"']
    regex_pattern = '|'.join(map(re.escape, delimiters))
    for word in re.split(regex_pattern, text):
        if word:
            yield word.lower().strip()
            

def word_counts(text):
    return collections.Counter(extract_words(text))

In [2]:
word_counts(
    ('The food is no doubt superb, especially the wagyu. Staff were very attentive.' 
    'I have to say we were really impressed.')
)

Counter({'the': 2,
         'food': 1,
         'is': 1,
         'no': 1,
         'doubt': 1,
         'superb': 1,
         'especially': 1,
         'wagyu': 1,
         'staff': 1,
         'were': 2,
         'very': 1,
         'attentive': 1,
         'i': 1,
         'have': 1,
         'to': 1,
         'say': 1,
         'we': 1,
         'really': 1,
         'impressed': 1})

Идея с подсчетом оказалась очень удачной - ваш бизнес растет как грибы после дождя. Было решено адаптировать ее к анализу частот слов в запросах пользователей вашего интернет магазина. Начальство оптимистично смотрит в будущее и считает, что скоро нужно будет обрабатывать миллионы запросов в день. Вам же осталось всего лишь адаптировать прототип. Только есть одна проблема - теперь надо обарабатывать не несколько сотен или тысяч текстов, а десятки миллионов. Тут вы понимаете, что для решения задачи вам не хватит одной (даже очень мощной машины) хотя бы потому, что размер обрабатываемых данных не поместится ни на один диск! Именно с такой проблемой впервые столкнулись в компании Google в начале нулевых. Там же было придумано решение которое они называли [MapReduce](https://static.googleusercontent.com/media/research.google.com/en//archive/mapreduce-osdi04.pdf).

## 2. Постановка задачи и требования

Итак, вам нужно написать систему которая позволяла бы обрабатывать действительно огромные объемы данных. Естественно, что она должна уметь решать не только нашу задачу про подсчет слов, но и достаточно широкий класс других задач.

Ограничение от которого вы не сможете избавиться -- это хранение огромного массива данных. Так как все эти данные не поместятся на одну машину, то вам придется использовать несколько. Совокупность всех этих машин называют *кластером*, а сами машины *нодами*. Необходимость работы с несколькими машинами создает множество проблем. А именно, вам понадобятся:

 - **Распределенная файловая система.** Именно в ней будут хранится все массивы данных с которыми вы собрались работать.
 - **Модель вычислений.** Работа с кластером должна происходит через такой фреймворк/модель вычислений чтобы пользователи могли легко писать распараллеливаемые программы. Без параллелизации код пользователей не сможет полностью задействовать ресурсы кластера.
 - **Контроль исполнения задач.** На кластере будут запускаться задачи разных пользователей . Нужна система диспетчеризации и распараллеливания.
 - **Система управления ресурсами.** Разные задачи требуют разных ресурсов. Чьи-то задачи требуют много процессорного времени, а кто-то просто преобразует огромный массив данных в другой формат. Нужна система для оптимального распределения процессоров, жестких дисков, оперативной памяти.
 
При построении всех этих компонент надо учитывать важную особенность - в больших кластерах всегда ломается какая-нибудь машина, а может и несколько. Причин может быть много - сломалась или тормозит сеть, вышел из строя диск, пользовательская операция исчерпала все ресурсы или вошла в бесконечный цикл, машину отключили для замены оборудования. Кластер должен сам решать эти проблемы или минимизировать участие пользователя в их решении. Такое требование называется *fault tolerance*.

## 3. Распределенная файловая система

#### Интерфейс

Задача распределенной файловой системы дать пользователю удобную абстракцию, в которой он мог работать именно с файлами и не задумываться о деталях реализации. Естественной и привычной абстракцией была бы файловая система локальных компьютеров. Другими словами, мы хотим
 - Организацию файлов в по директориям
 - Возможность обращатся к файлам через пути. Например,
 ```
 ls //home/my_dir_with_big_data
 ```
 - Возможность создания, копирования и переноса директорий и файлов. Например,
 ```
 cp //home/my_dir1/big_table //home/my_dir2/big_table_copy
 ```
 - Возможность конкатенации файлов и приписывания к ним новых данных
 - Возможность получения метаинформации о файлах. Например размер, права доступа, время создания и модфикации.

Конечно, давать пользователю весь интерфейс обычной файловой системы здесь не получится хотя бы потому, что некоторые операции (например, запись в произвольное место файла) сложно реализовать эффективно в распределенной файловой системе.

#### Чанки

Работа с данными распределенными по нескольким машинам значительно отличается от работы на локальной файловой системе. Файлы которые мы будем хранить, наверняка не поместятся на одной машине. Решение здесь достаточно простое - мы разбиваем такие файлы на чанки (*chunks*) и храним на машинах именно их. Оптимальный размер чанка обычно несколько десятков или сотен мегабайт. Так как безотказных машин не бывает, то нельзя хранить на кластере чанки в единственном экземпляре. Каждый чанк надо продублировать на нескольких других машинах. Количество копий называется *коэффициентом репликации*. Из-за того, что данные хранятся в избыточном виде, а жесткие диски стоят денег, то чанки надо уметь архивировать. Для данных к которым редко обращаются можно использовать алгоритмы с сильным коэффициентом сжатия. 

Дупликация данных имеет и недостатки. Например, если вы захотите изменить содержимое одного чанка, то вы должны сделать это атомарно и со всеми его копиями. Это сложная задача, поэтому в первых реализациях распределнных файловых систем не давали такой возможности, а разрешали лишь добавлять новые чанки. Получается, в общем случае, для изменения файла вам надо его полность создать заново.

#### Метаданные

Как и в любой файловой системе надо уметь хранить не только сами данные, но и дополнительную информацию об этих данных. Например,

 - для каждого файла мы должны знать все его чанки 
 - для каждого чанка надо знать машины на которых он лежит
 - для каждой машины надо знать какие чанки на ней хранились, (на случай отказа машины)
 - для каждого файла мы должны знать его права доступа, время создания и прочую служебную информацию
 
Все это вместе называют метаданными. Они не занимают много места и поэтому их удобно хранить на одной машине которую называют *мастером*. Он занимается хранением и атомарным обновлением и получением метаинформации о файлах кластера. Вся метаинформация хранится в оперативной памяти мастера. Отметим, что сам мастер работает только с метаданными. Когда пользователи хотят прочитать или записать в файлы, то пересылка данных происходит между пользователем и машинами кластера, но по особому протоколу в котором участвует мастер. Мастер следит за консистентностью метаданных и успешностью операции записи/чтения.

#### Отказоустойчивость

Для поддержания консистентности мастер должен знать какие машины кластера в рабочем состоянии. Для этого он ожидает, что машины регулярно с постоянным интервалом времени будут отправлять мастеру сообщения о своем состоянии (так называемые *heartbeat*). Если от какой-то машины не приходит heartbeat, то она помечается как сломанная для чанков которые хранились на ней запускается процесс повышения коэффициента репликации.

Слабым местом во всей этой схеме есть и остается мастер. Если он сломается или потеряет свое состояние, то весь кластер будет в нерабочем состоянии. Поэтому регулярно требуется делать бэкапы состояния мастера. Для большей надежности рядом с мастером держат еще одну или несколько машин (*вторичные мастера*), которые сохраняют у себя состояние мастера и включаются в работу если отказал основной мастер.

![cluster](cluster.png)

## 4. Модель вычислений Map Reduce

Для обработки больших объемов данных необходимо использовать параллелизацию. Алгоритмы которые можно разбить на подзадачи выполняемые одновременно на нескольких вычислителях называют конкурентными. Написание таких алгоритмов довольно сложное занятие, поэтому нужно придумать фреймворк/библиотеку/модель вычислений которая позволяла бы писать легко распараллеливаемые алгоритмы. Такой моделью вычислений оказалась модель Map Reduce о которой мы сейчас поговорим.

Map и reduce это функции высшего порядка пришедшие из функциональных языков. Известно что программы написанные в функциональном стиле хорошо распараллеливаются. Если программисты будут писать свои алгоритмы в терминах функций map и reduce, то их программы будут автоматически обладать конкурентностью и задачу параллелизации можно будет переложить с плеч программистов на фреймворк в рамках которого они будут писать код. Разберем как устроены эти функции.

### Функция map

Функция map имеет два аргумента
 - Первый аргумент -- пользовательская функция $f:U\to List[V]$. Она получает на вход один объект типа $U$ и генерирует список элементов типа $V$
 - Второй аргумент -- список $L$ типа $List[U]$. 
 
Далее map поэлементно применяет функцию $f$ (именно тут происходит распараллеливание) к элментам списка $L$. Получающиеся результаты от каждого применения объединяются в один список типа $List[V]$.

### Функция reduce

Функция reduce имеет два аргумента
 - Первый аргумент - функция $g:Pair[K, List[V]]\to List[W]$. Она получает на вход пару из ключа типа $K$ и списка  значений типа $V$ и генерирует список элементов типа $W$.
 - Второй аргумент - список $L$ типа $List[Pair[K, List[V]]]$ состоящий из пар вида: ключ и его список значений.
 
Далее reduce применяет пользовательскую функцию $g$ (именно тут происходит распараллеливание) к элментам списка $L$. Получающиеся результаты от каждого применения объединяются в один список типа $List[W]$. 

### Протокол

На первый взгляд функция reduce является частным случаем функции map, но их стоит рассматривать как разные, потому что у них следюущий протокол работы.

 1. Фаза **map** применяет пользовательскую функцию $f$ к элементам $u$ списка $L$ и получает список  $L'$
 2. Фаза **shuffle** для каждого элемента $v$ списка $L'$ назначается ключ $k$. Обычно этот ключ - часть данных хранящися в самом элементе $v$. Элементы списка $L'$ имеющие один и тот же ключ собираются в одну группу и формируется пара $(k, [v_1,\ldots,v_{n(k)}])$. Список таких пар обозначим $L''$.
 4. Фаза **reduce** применяет пользовательскую функцию $g$ к парам из списка  $L''$ и получает результирующий список $L'''$

![mapreduce](mapreduce.png)

Теоретически доказано, что подобная модель вычислений Тьюринг полная. Другими словами на ней можно реализовать любой алгоритм, но не факт что эффективно.

Теперь код для фаз map и reduce задачи подсчета слов на гипотетическом фреймфорке для MapReduce мог бы выглядеть приблизительно так

In [3]:
def f(record):
    for word in extract_words(record['text']):
        yield {'word': word, 'count': 1}

def g(key, records):
    yield {'word': key, 'total': sum(record['count'] for record in records)}

### Детали реализации

В теории все легко и просто, но на практике появляется много нюансов о которых мы сейчас поговорим. 

В общем случае пользовательская операция может содержать только map фазу или только reduce фазу либо обе. Как мы обсудили выше данные операции легко распараллеливаются на более мелкие подзадачи. Эти подзадачи мы будем называть джобами (*jobs*). По сути джоба это процесс запускаемый на машине кластера.

#### Чтение входа и запись выхода операции

Для каждой из рассматриваемых операций на входе и на выходе находятся файлы. Модель вычислений предполагает, что входные данные состоят из элементов одинакового типа. Поэтому чтение и запись выглядят немного сложнее чем думалось.
Весь процесс выглядит так

1. На входе каждой операции должен стоять читатель данных (InputFormat), который обрабатывает чанки файла и разбивает его на части называемые сплитами (*splits*). Тут конечно могут возникнуть проблемы с несвпадением границ сплитов и чанков, но это легко решаемая задача. 

2. Далее каждый сплит сериализуется и отправляется по сети на свою машину и там запускается отдельная джоба. 

3. В джобе вычитывается сплит и разбивается на записи (*records*) c помощью *RecordsReader*, которые отправляются в пользовательскую функцию.  После того как пользовательская функция закончила обработку она возвращает другие записи. Эти записи накапливаются в памяти процесса в некотором буфере и при переполнении данные сбрасываются из буфера на диск. При сбросе запускается код (*OutputFormat* и *OutputReader*) преобразующий записи в выходной формат. Принцип их работы симметричен работе *InputFormat* и *InputReader*.

4. Если выход джобы не помещается на самой машине, то результат пересылается по сети на другие.

#### Работа джобы для map фазы

Мы уже обсудили, что вход для оперции формируется в виде записей. В случае map операции в джобе запускается простой цикл который применяет пользовательскую функцию к записям и возвращает результат. В случае map_reduce операции записи, если их не очень много, не складываются на диск а сразу из памяти джобы отправляются на шаг shuffling.

#### Организация shuffling фазы

Это самая тяжелая стадия map_reduce операций, потому что здесь будут задействованы сортировки и что еще хуже - передача данных по сети. Эта фаза начинается когда отработают все джобы родительской map фазы. Для работы shuffling фазы пользователь должен указать количество reduce джобов $M$ которые он будет в дальнейшем запускать и код (*Partitioner*) который умеет по ключу вычислять номер reduce джобы в которую отправятся данные этого ключа. Далее работа просходит следующим образом:

1. В рамках одной джобы фазы map из каждой записи $v$ мы достаем ключ $k$, далее *Partitioner* выдает номер $p$ reduce джобы в которую будет отправлена пара $(k,v)$. Совокупность пар отправляющихся в одну reduce джобу называется партицией (*partion*). По умолчанию номер партиции вычисляется как остаток от деления некоторого хэша ключа $k$ на количество reduce джобов. Трюк с вычислением $p$ нужен для того чтобы равномерно распределить выход map фазы по reduce джобам.

2. Далее, в рамках одной джобы все тройки $(p, k, v)$ сортируются по $(p, k)$.

3. Теперь со всех машин на которых идет shuffling нашей оперции начинают пересылаться записи $(p, k, v)$. Машина назначения вычисляется по номеру $p$. Каждая машина назначения делает merge sort по ключу k приходящих потоков данных. Этот процесс отработает корректно, потому что входные данные были отсортированы по $(p, k)$.

#### Работа джобы для  reduce фазы

После фазы shuffle на входе у reduce джобы будет набор чанков в которых данные организованы так что записи с одним ключом идут подряд. Reduce джоба читает каждый такой диапазон записей и запускает пользовательскую функцию $g$. Результат записывается на диск с использованием *OutputFormat* и *OutputReader*.

#### Подводим итог

Как видно для map_reduce операции нам потребуется

 - Код для чтения данных (*InputFormat*)
 - Код для map фазы (*Mapper*) 
 - Код для reduce фазы (*Reducer*) 
 - Количество reduce джоб $M$
 - Код для разбиения выхода маппера на партиции (*Partitioner*)
 - Код для записи результата (*OutputFormat*)

Это позволяет достаточно тонко настроить и оптимизировать операцию, особенно если вы запускаете ее регулярно. На практике обычно

 - Используются дефолтные форматы входа и выхода, и эти форматы обычно совпадают
 - Количество необходимых reduce джобов вычисляется эвристически
 - Используется дефолтный *Partitioner*

Таким образом от пользователя в стандартной ситуации требуется только необходымий минимум - код для map и reduce фаз.

## 5. Контроль исполнения задач и управление реурсами

Теперь когда мы разобрали в деталях работу операций поговорим о том как ее инциализировать и провести контроль исполнения. Для этих задач на мастере есть еще одна система. Будем называть ее шедулер (*scheduler*). Она  
 - Следит за доступностью машин
 - Следит за распределением вычислительных ресурсов на кластере
 - Запускает операции и следит за их выполненеим.
 
### Контроль исполнения
 
Разберем как идет работа для map_reduce операции:

1. Пользователь отправляет на кластер запрос на запуск операции
2. Шедулер создает на одной из свободных и доступных машин процесс аппмастер (*application master*). Этот процесс будет следить за исполнением операции и запрашивать ресурсы у шедулера.
3. Аппмастер скачивает себе пользовательский код в виде (.jar архивов если это java или .pyc файлов если это питон)
4. Аппмастер сообщает шедулеру пути к файлам на которых он хочет запустить map фазу. Также он сообщает какие ему нужны для этого ресурсы.
5. Шедулер пытается выделить запрашиваемые ресурсы на тех машинах где хранятся чанки запрашиваемых файлов, чтобы избежать лишней пересылки данных. Если это не удается то ресурсы выделяются где придется.
6. Аппмастер запускает джобы map фазы. Обычно это контейнеры (типа docker контейнеров), в которых будет работать код фазы и пользовательской функции. 
7. Аппмастер оценивает количество необходимых reduce джобов (или использует заданное значение) и запрашивает у шедулера машины для сохранения результатов shuffle фазы.
8. Шедулер выдает ему список машин.
11. Аппмастер запускает shuffle фазу, а потом reduce джобы.
12. После завершения операции управление возвращается в пользовательский код

### Управление ресурсами

Операции кластера используют дисковое пространство, вычислительные процессоры, оперативную память и сеть. Распределением этих ресурсов занимается шедулер. Он, зная статистику уже используемых ресурсов, пытается для вновь пришедших задач  распределить оставшиеся ресурсы наиболее честным образом. Алгоритм зависит от используемого понятия честности. Один из первых подходов - одинаковое распределение ресурсов между процессами. Но если один пользователь создаст много операций, то получит бОльшую часть вычислительных ресурсов кластера. Поэтому предпочтительнее распределение ресурсов по пользователям и группам пользователей. Для этой задачи есть алгоритм [fair share](https://en.wikipedia.org/wiki/Fair-share_scheduling).

### Обработка ошибок

Шедулер следит за доступностью машин. Если одна из машин с джобой упала, то шедулер и сообщит об этом аппмастеру. Аппмастер перезапустит джобу. Но тут есть вопрос какую. Если упала джоба map фазы то достаточно перезапустить только ее. Если упала джоба reduce фазы, то перезапускается вся операция так как вход reduce джобы может зависеть от выхода всех мапперов. Казалось бы этого можно избежать, сохраняя промежуточные результаты, но практика показывает, что проще перезапускать упавшие джобы/операции чем хранить промежуточные результаты и тратить дисковые ресурсы кластера.