**Занятие первое**

Начнем с простого. Многие знают что такое map и reduce операции, но все же для закрпеления мы их тут реализуем. Ах да, не забудем и про shuffle. Делать все будем на упрощенной задаче с word count для ознакомления с самим подходом.

На самом деле мы рассмптрим все в упрощенном виде, но это даст нам понимание, как можно через hadoop streaming, например, писать самописные map и reduce операции

! mapred streaming \
  -input /wiki/sample.jsonl \
  -output /word-count \
  -mapper "/opt/conda/bin/python3.6 mapper.py" \
  -reducer "/opt/conda/bin/python3.6 reducer.py" \
  -file mapper.py \
  -file reducer.py

Выше mapper.py и reducer.py это программы, которые выполняют одноименные операции нам потоком информации из jsonl файла, записывая ответ в файл word-count

In [None]:
import nltk
nltk.download('punkt')
nltk.download('stopwords')
from nltk.corpus import stopwords
import re 
import string

Давайте загрузим файл с текстом и посмотрим на него

In [None]:
with open('spark_text.txt', 'rb') as f:
    data = f.readlines()
data = [text.decode() for text in data if text.decode() != '\r\n']    

In [None]:
len(data)

In [None]:
data[1]

Как бы мы сделали..
Надо немного почистить слова, а также сделать все в парадигме MapReduce. Понятно, что можно все написать проще, но мы ведь хотим понять, как это работает=)

Загрузим стоп слова, очистим от них текст, приведем к нижнему регистру, всем раздадим ключи

In [None]:
stop_words = stopwords.words("english")
stop_words = set(stop_words)

In [None]:
stop_words

пунктуацию тоже полезно бы удалить

In [None]:
string.punctuation

In [None]:
def mapper_text(text):
    clean_text = re.sub(rf"[{string.punctuation}]", "", text)
    words = nltk.word_tokenize(clean_text)
    words_with_value = [(word.lower(), 1) for word in words 
                        if word not in stop_words]
    words_with_value = sorted(words_with_value, key=lambda x:x[0])
    return words_with_value

def create_chunks(shuffled_data):
    result = {}
    for idx, data in shuffled_data:
        if idx in result:
            result[idx].append(data)
        else:
            result[idx] = [data]
    return list(result.items())

def shuffle_text(mapper_result, n_nodes=5):
    shuffled_data = []
    for key, value in mapper_result:
        shuffled_data.append((hash(key)%n_nodes, (key, value)))
    shuffled_data = sorted(shuffled_data, key=lambda x: x[0])
    chunks = create_chunks(shuffled_data)
    return chunks


# на самом деле для reduce в жизни пишут иначе..не зря мы сортируем внутри map
#данные по ключам. Это нужно для избавления от этапа проверки ключа и поиска
def reduce_text(values_to_reduce):
    result = {}
    for key, value in values_to_reduce:
        if key in result:
            result[key] += 1
        else:
            result[key] = 1
    return result

Проверим, что все работает

Сначала map

In [None]:
data[0]

In [None]:
map_stage = mapper_text(data[0])

In [None]:
map_stage

shuffle

In [None]:
shuffle_stage = shuffle_text(map_stage, 5)

In [None]:
shuffle_stage

reduce

In [None]:
reduce_text(shuffle_stage[4][1])

Итак, осталось все рассчитать параллельно и собрать результаты

In [None]:
from joblib import Parallel, delayed

In [None]:
n_nodes = 5

Обернем в 1 функциию для удобства map и shuffle

In [None]:
def map_shuffle(text, n_nodes):
    map_result = mapper_text(text)
    shuffle_result = shuffle_text(map_result, n_nodes)
    return shuffle_result

In [None]:
with Parallel(n_jobs=n_nodes, verbose=10, batch_size=5) as parallel:
    res = parallel(delayed(map_shuffle)(df, n_nodes) for df in data)

In [None]:
len(res)

In [None]:
res[0]

Сделаем что-то вроде перессылки, собирая все в словари и заодно посмотрим на сколько равномерно распределлиись наши слова

In [None]:
shuffle_stage = {i:[] for i in range(5)}
for values in res:
    values = dict(values)
    for key in values.keys():
        shuffle_stage[key].extend(values[key])

In [None]:
for key in shuffle_stage.keys():
    print(f'{key}: number of words = {len(shuffle_stage[key])}')

И последний этап - нужно сделать reduce

In [None]:
with Parallel(n_jobs=n_nodes, verbose=10, batch_size=5) as parallel:
    res = parallel(delayed(reduce_text)(shuffle_stage[key]) for key in shuffle_stage.keys())

In [None]:
len(res)

In [None]:
res[0]

Собираем результат

In [None]:
result = {}
for partition in res:
    for key in partition.keys():
        if key in result:
            result[key] += partition[key]
        else:
            result[key] = partition[key]

In [None]:
sorted(result.items(), key=lambda x: x[1], reverse=True)

Да, было бы проще все сделать иным кодом и в один проход, но целью было разобрать, как все это примерно работает под капотом на больших данных.

**Домашнее задание**

Посчитать количество рейтингов больше 4 для каждого фильма и вывести фильмы в порядке убывания количества этих оценок

In [None]:
with open('user_ratedmovies.dat', 'rb') as f:
    data = f.readlines()
headers = data[0].decode().split('\t')[:3]
data = [row.decode().split('\t')[:3] for row in data[1:]]

In [None]:
headers

In [None]:
data[0]

In [None]:
len(data)

Пишем map, shiffle и reduce + параллелим вычисления. Лучше задавать batch_size при распараллеливании, либо даже заранее все разбить на батчи, будет быстрее

Также посмотрите на то, нет ли перекоса в данных после shuffle, можете попробовать использовать остаток от деления не простого hash, а ввести какую-то функию

In [None]:
def map_rating(row):
    pass

def create_chunks(shuffled_data):
    pass

def shuffle_rating(mapper_result, n_nodes=5):
    pass

def reduce_rating(map_row):
    pass

def map_shuffle(text, n_nodes):
    pass

In [None]:
with Parallel(n_jobs=n_nodes, verbose=10) as parallel:
    pass

In [None]:
with Parallel(n_jobs=n_nodes, verbose=10) as parallel:
    pass

После reduce все можно собрать в одном цикле как на семинаре

В качестве ответа вывести топ 10 фильмов с наибольшим числом оценок более 4

### Срок выполнения домашего задания - 23.09.2024