**Занятие первое**

Начнем с простого. Многие знают что такое map и reduce операции, но все же для закрпеления мы их тут реализуем. Ах да, не забудем и про shuffle. Делать все будем на упрощенной задаче с word count для ознакомления с самим подходом.

На самом деле мы рассмптрим все в упрощенном виде, но это даст нам понимание, как можно через hadoop streaming, например, писать самописные map и reduce операции

! mapred streaming \
  -input /wiki/sample.jsonl \
  -output /word-count \
  -mapper "/opt/conda/bin/python3.6 mapper.py" \
  -reducer "/opt/conda/bin/python3.6 reducer.py" \
  -file mapper.py \
  -file reducer.py

Выше mapper.py и reducer.py это программы, которые выполняют одноименные операции нам потоком информации из jsonl файла, записывая ответ в файл word-count

In [None]:
import nltk
nltk.download('punkt')
nltk.download('stopwords')
from nltk.corpus import stopwords
import re
import string

from joblib import Parallel, delayed

from google.colab import drive
drive.mount('/content/drive')

Давайте загрузим файл с текстом и посмотрим на него

In [None]:
with open('/content/drive/My Drive/Colab Notebooks/PySpark/spark_text.txt', 'rb') as f:
    data = f.readlines()
data = [text.decode() for text in data if text.decode() != '\r\n']

In [None]:
len(data)

In [None]:
data[1]

Как бы мы сделали..
Надо немного почистить слова, а также сделать все в парадигме MapReduce. Понятно, что можно все написать проще, но мы ведь хотим понять, как это работает=)

Загрузим стоп слова, очистим от них текст, приведем к нижнему регистру, всем раздадим ключи

In [None]:
stop_words = stopwords.words("english")
stop_words = set(stop_words)

In [None]:
stop_words

пунктуацию тоже полезно бы удалить

In [None]:
string.punctuation

In [None]:
def mapper_text(text):
    clean_text = re.sub(rf"[{string.punctuation}]", "", text)
    words = nltk.word_tokenize(clean_text)
    words_with_value = [(word.lower(), 1) for word in words
                        if word not in stop_words]
    words_with_value = sorted(words_with_value, key=lambda x:x[0])
    return words_with_value

def create_chunks(shuffled_data):
    result = {}
    for idx, data in shuffled_data:
        if idx in result:
            result[idx].append(data)
        else:
            result[idx] = [data]
    return list(result.items())

def shuffle_text(mapper_result, n_nodes=5):
    shuffled_data = []
    for key, value in mapper_result:
        shuffled_data.append((hash(key)%n_nodes, (key, value)))
    shuffled_data = sorted(shuffled_data, key=lambda x: x[0])
    chunks = create_chunks(shuffled_data)
    return chunks


# на самом деле для reduce в жизни пишут иначе..не зря мы сортируем внутри map
#данные по ключам. Это нужно для избавления от этапа проверки ключа и поиска
def reduce_text(values_to_reduce):
    result = {}
    for key, value in values_to_reduce:
        if key in result:
            result[key] += 1
        else:
            result[key] = 1
    return result

Проверим, что все работает

Сначала map

In [None]:
data[0]

In [None]:
map_stage = mapper_text(data[0])

In [None]:
map_stage

shuffle

In [None]:
shuffle_stage = shuffle_text(map_stage, 5)

In [None]:
shuffle_stage

reduce

In [None]:
reduce_text(shuffle_stage[4][1])

Итак, осталось все рассчитать параллельно и собрать результаты

In [27]:
n_nodes = 5

Обернем в 1 функциию для удобства map и shuffle

In [None]:
def map_shuffle(text, n_nodes):
    map_result = mapper_text(text)
    shuffle_result = shuffle_text(map_result, n_nodes)
    return shuffle_result

In [None]:
with Parallel(n_jobs=n_nodes, verbose=10, batch_size=5) as parallel:
    res = parallel(delayed(map_shuffle)(df, n_nodes) for df in data)

In [None]:
len(res)

In [None]:
res[0]

Сделаем что-то вроде перессылки, собирая все в словари и заодно посмотрим на сколько равномерно распределлиись наши слова

In [None]:
shuffle_stage = {i:[] for i in range(5)}
for values in res:
    values = dict(values)
    for key in values.keys():
        shuffle_stage[key].extend(values[key])

In [None]:
for key in shuffle_stage.keys():
    print(f'{key}: number of words = {len(shuffle_stage[key])}')

И последний этап - нужно сделать reduce

In [None]:
with Parallel(n_jobs=n_nodes, verbose=10, batch_size=5) as parallel:
    res = parallel(delayed(reduce_text)(shuffle_stage[key]) for key in shuffle_stage.keys())

In [None]:
len(res)

In [None]:
res[0]

Собираем результат

In [None]:
result = {}
for partition in res:
    for key in partition.keys():
        if key in result:
            result[key] += partition[key]
        else:
            result[key] = partition[key]

In [None]:
sorted(result.items(), key=lambda x: x[1], reverse=True)

Да, было бы проще все сделать иным кодом и в один проход, но целью было разобрать, как все это примерно работает под капотом на больших данных.

**Домашнее задание**

Посчитать количество рейтингов больше 4 для каждого фильма и вывести фильмы в порядке убывания количества этих оценок

In [1]:
import nltk
nltk.download('punkt')
nltk.download('stopwords')
from nltk.corpus import stopwords
import re
import string

from joblib import Parallel, delayed

from google.colab import drive
drive.mount('/content/drive')

[nltk_data] Downloading package punkt to /root/nltk_data...
[nltk_data]   Unzipping tokenizers/punkt.zip.
[nltk_data] Downloading package stopwords to /root/nltk_data...
[nltk_data]   Unzipping corpora/stopwords.zip.


Mounted at /content/drive


In [3]:
with open('/content/drive/My Drive/Colab_Notebooks/PySpark/user_ratedmovies.dat', 'rb') as f:
    data = f.readlines()
headers = data[0].decode().split('\t')[:3]
data = [row.decode().split('\t')[:3] for row in data[1:]]

In [4]:
headers

['userID', 'movieID', 'rating']

In [5]:
data[0]

['75', '3', '1']

In [6]:
len(data)

855598

Пишем map, shiffle и reduce + параллелим вычисления. Лучше задавать batch_size при распараллеливании, либо даже заранее все разбить на батчи, будет быстрее

Также посмотрите на то, нет ли перекоса в данных после shuffle, можете попробовать использовать остаток от деления не простого hash, а ввести какую-то функию

In [7]:
def map_rating(row):
    if float(row[2]) > 4:
        return row[1], 1
    else:
        return row[1], 0

def create_chunks(shuffled_data):
    result = {}
    for idx, data in shuffled_data:
        if idx in result:
            result[idx].append(data)
        else:
            result[idx] = [data]
    return list(result.items())

def shuffle_rating(mapper_result, n_nodes=5):
    shuffled_data = []
    for key, value in mapper_result:
        shuffled_data.append((hash(key)%n_nodes, (key, value)))
    shuffled_data = sorted(shuffled_data, key=lambda x: x[0])
    chunks = create_chunks(shuffled_data)
    return chunks

def reduce_rating(values_to_reduce):
    result = {}
    for key, value in values_to_reduce:
        if key in result:
            result[key] += value
        else:
            result[key] = value
    return result

def map_shuffle(batch, n_nodes):
    map_result = list()
    for row in batch:
        map_result.append(map_rating(row))
    shuffle_result = shuffle_rating(map_result, n_nodes)
    return shuffle_result

In [8]:
n_nodes = 5

n_batches = 100
batch_size = len(data)//n_batches

data_batches = [data[i*batch_size:(i+1)*batch_size] for i in range(n_batches+1)]

In [9]:
with Parallel(n_jobs=n_nodes, verbose=10) as parallel:
    res = parallel(delayed(map_shuffle)(batch, n_nodes) for batch in data_batches)

[Parallel(n_jobs=5)]: Using backend LokyBackend with 5 concurrent workers.
[Parallel(n_jobs=5)]: Done   3 tasks      | elapsed:    2.6s
[Parallel(n_jobs=5)]: Done   8 tasks      | elapsed:    2.8s
[Parallel(n_jobs=5)]: Done  15 tasks      | elapsed:    3.3s
[Parallel(n_jobs=5)]: Done  22 tasks      | elapsed:    3.8s
[Parallel(n_jobs=5)]: Done  31 tasks      | elapsed:    4.3s
[Parallel(n_jobs=5)]: Done  40 tasks      | elapsed:    4.7s
[Parallel(n_jobs=5)]: Done  51 tasks      | elapsed:    5.3s
[Parallel(n_jobs=5)]: Done  62 tasks      | elapsed:    5.8s
[Parallel(n_jobs=5)]: Done  75 tasks      | elapsed:    6.3s
[Parallel(n_jobs=5)]: Done  88 tasks      | elapsed:    6.8s
[Parallel(n_jobs=5)]: Done 101 out of 101 | elapsed:    7.3s finished


In [10]:
len(res)

101

In [20]:
res[0][0][1][:10]

[('1036', 0),
 ('1127', 0),
 ('1233', 0),
 ('1304', 0),
 ('1485', 0),
 ('1917', 0),
 ('2640', 0),
 ('2959', 1),
 ('3258', 0),
 ('5952', 0)]

In [21]:
shuffle_stage = {i:[] for i in range(n_nodes)}
for values in res:
    values = dict(values)
    for key in values.keys():
        shuffle_stage[key].extend(values[key])

In [22]:
shuffle_stage[0][:10]

[('1036', 0),
 ('1127', 0),
 ('1233', 0),
 ('1304', 0),
 ('1485', 0),
 ('1917', 0),
 ('2640', 0),
 ('2959', 1),
 ('3258', 0),
 ('5952', 0)]

In [23]:
for key in shuffle_stage.keys():
    print(f'{key}: number of films = {len(shuffle_stage[key])}')

0: number of films = 168354
1: number of films = 177666
2: number of films = 168259
3: number of films = 172311
4: number of films = 169008


In [24]:
with Parallel(n_jobs=n_nodes, verbose=10, batch_size=5) as parallel:
    res = parallel(delayed(reduce_rating)(shuffle_stage[key]) for key in shuffle_stage.keys())

[Parallel(n_jobs=5)]: Using backend LokyBackend with 5 concurrent workers.
[Parallel(n_jobs=5)]: Done   2 out of   5 | elapsed:    1.0s remaining:    1.5s
[Parallel(n_jobs=5)]: Done   3 out of   5 | elapsed:    1.0s remaining:    0.7s
[Parallel(n_jobs=5)]: Done   5 out of   5 | elapsed:    1.5s finished


In [31]:
list(res[0].items())[:10]

[('1036', 61),
 ('1127', 38),
 ('1233', 80),
 ('1304', 29),
 ('1485', 8),
 ('1917', 31),
 ('2640', 23),
 ('2959', 169),
 ('3258', 2),
 ('5952', 125)]

После reduce все можно собрать в одном цикле, считаем, что данные переслали после на 1 машину и агрегируем

In [34]:
result = {}
for partition in res:
    for key in partition.keys():
        if key in result:
            result[key] += partition[key]
        else:
            result[key] = partition[key]

In [35]:
result = sorted(result.items(), key=lambda x: x[1], reverse=True)
result[:10]

[('2571', 900),
 ('318', 887),
 ('296', 878),
 ('2959', 828),
 ('4993', 756),
 ('7153', 719),
 ('5952', 697),
 ('858', 690),
 ('50', 688),
 ('2858', 680)]

In [36]:
# наивная проверка

films = dict()
for row in data:
  if row[1] not in films:
    films[row[1]] = int(float(row[2]) > 4)
  else:
    films[row[1]] += int(float(row[2]) > 4)

films = sorted(films.items(), key=lambda x: x[1], reverse=True)
films[:10]

[('2571', 900),
 ('318', 887),
 ('296', 878),
 ('2959', 828),
 ('4993', 756),
 ('7153', 719),
 ('5952', 697),
 ('858', 690),
 ('50', 688),
 ('2858', 680)]