### Занятие первое

Начнем с простого. Многие знают что такое map и reduce операции, но все же для закрпеления мы их тут реализуем. Ах да, не забудем и про shuffle. Делать все будем на упрощенной задаче с word count для ознакомления с самим подходом.

На самом деле мы рассмптрим все в упрощенном виде, но это даст нам понимание, как можно через hadoop streaming, например, писать самописные map и reduce операции

! mapred streaming \
  -input /wiki/sample.jsonl \
  -output /word-count \
  -mapper "/opt/conda/bin/python3.6 mapper.py" \
  -reducer "/opt/conda/bin/python3.6 reducer.py" \
  -file mapper.py \
  -file reducer.py

Выше mapper.py и reducer.py это программы, которые выполняют одноименные операции нам потоком информации из jsonl файла, записывая ответ в файл word-count

In [1]:
import nltk
nltk.download('punkt')
nltk.download('stopwords')
from nltk.corpus import stopwords
import re 
import string

[nltk_data] Downloading package punkt to
[nltk_data]     C:\Users\Admin\AppData\Roaming\nltk_data...
[nltk_data]   Package punkt is already up-to-date!
[nltk_data] Downloading package stopwords to
[nltk_data]     C:\Users\Admin\AppData\Roaming\nltk_data...
[nltk_data]   Package stopwords is already up-to-date!


Давайте загрузим файл с текстом и посмотрим на него

In [2]:
with open('spark_text.txt', 'rb') as f:
    data = f.readlines()
data = [text.decode() for text in data if text.decode() != '\r\n']    

In [3]:
len(data)

100

In [4]:
data[1]

'\n'

Как бы мы сделали..
Надо немного почистить слова, а также сделать все в парадигме MapReduce. Понятно, что можно все написать проще, но мы ведь хотим понять, как это работает=)

Загрузим стоп слова, очистим от них текст, приведем к нижнему регистру, всем раздадим ключи

In [5]:
stop_words = stopwords.words("english")
stop_words = set(stop_words)

In [6]:
stop_words

{'a',
 'about',
 'above',
 'after',
 'again',
 'against',
 'ain',
 'all',
 'am',
 'an',
 'and',
 'any',
 'are',
 'aren',
 "aren't",
 'as',
 'at',
 'be',
 'because',
 'been',
 'before',
 'being',
 'below',
 'between',
 'both',
 'but',
 'by',
 'can',
 'couldn',
 "couldn't",
 'd',
 'did',
 'didn',
 "didn't",
 'do',
 'does',
 'doesn',
 "doesn't",
 'doing',
 'don',
 "don't",
 'down',
 'during',
 'each',
 'few',
 'for',
 'from',
 'further',
 'had',
 'hadn',
 "hadn't",
 'has',
 'hasn',
 "hasn't",
 'have',
 'haven',
 "haven't",
 'having',
 'he',
 'her',
 'here',
 'hers',
 'herself',
 'him',
 'himself',
 'his',
 'how',
 'i',
 'if',
 'in',
 'into',
 'is',
 'isn',
 "isn't",
 'it',
 "it's",
 'its',
 'itself',
 'just',
 'll',
 'm',
 'ma',
 'me',
 'mightn',
 "mightn't",
 'more',
 'most',
 'mustn',
 "mustn't",
 'my',
 'myself',
 'needn',
 "needn't",
 'no',
 'nor',
 'not',
 'now',
 'o',
 'of',
 'off',
 'on',
 'once',
 'only',
 'or',
 'other',
 'our',
 'ours',
 'ourselves',
 'out',
 'over',
 'own',
 'r

пунктуацию тоже полезно бы удалить

In [7]:
string.punctuation

'!"#$%&\'()*+,-./:;<=>?@[\\]^_`{|}~'

In [8]:
def mapper_text(text):
    clean_text = re.sub(rf"[{string.punctuation}]", "", text)
    words = nltk.word_tokenize(clean_text)
    words_with_value = [(word.lower(), 1) for word in words 
                        if word not in stop_words]
    words_with_value = sorted(words_with_value, key=lambda x:x[0])
    return words_with_value

def create_chunks(shuffled_data):
    result = {}
    for idx, data in shuffled_data:
        if idx in result:
            result[idx].append(data)
        else:
            result[idx] = [data]
    return list(result.items())

def shuffle_text(mapper_result, n_nodes=5):
    shuffled_data = []
    for key, value in mapper_result:
        shuffled_data.append((hash(key)%n_nodes, (key, value)))
    shuffled_data = sorted(shuffled_data, key=lambda x: x[0])
    chunks = create_chunks(shuffled_data)
    return chunks


# на самом деле для reduce в жизни пишут иначе..не зря мы сортируем внутри map
#данные по ключам. Это нужно для избавления от этапа проверки ключа и поиска
def reduce_text(values_to_reduce):
    result = {}
    for key, value in values_to_reduce:
        if key in result:
            result[key] += 1
        else:
            result[key] = 1
    return result

Проверим, что все работает

Сначала map

In [9]:
data[0]

'Apache Spark is an open-source unified analytics engine for large-scale data processing. Spark provides an interface for programming entire clusters with implicit data parallelism and fault tolerance. Originally developed at the University of California, Berkeley AMPLab, the Spark codebase was later donated to the Apache Software Foundation, which has maintained it since.\n'

In [10]:
map_stage = mapper_text(data[0])

In [11]:
map_stage

[('amplab', 1),
 ('analytics', 1),
 ('apache', 1),
 ('apache', 1),
 ('berkeley', 1),
 ('california', 1),
 ('clusters', 1),
 ('codebase', 1),
 ('data', 1),
 ('data', 1),
 ('developed', 1),
 ('donated', 1),
 ('engine', 1),
 ('entire', 1),
 ('fault', 1),
 ('foundation', 1),
 ('implicit', 1),
 ('interface', 1),
 ('largescale', 1),
 ('later', 1),
 ('maintained', 1),
 ('opensource', 1),
 ('originally', 1),
 ('parallelism', 1),
 ('processing', 1),
 ('programming', 1),
 ('provides', 1),
 ('since', 1),
 ('software', 1),
 ('spark', 1),
 ('spark', 1),
 ('spark', 1),
 ('tolerance', 1),
 ('unified', 1),
 ('university', 1)]

Shuffle:

In [12]:
shuffle_stage = shuffle_text(map_stage, 5)

In [13]:
shuffle_stage

[(0,
  [('analytics', 1),
   ('california', 1),
   ('codebase', 1),
   ('largescale', 1),
   ('opensource', 1),
   ('originally', 1),
   ('spark', 1),
   ('spark', 1),
   ('spark', 1),
   ('university', 1)]),
 (1, [('fault', 1), ('software', 1), ('tolerance', 1)]),
 (2,
  [('berkeley', 1),
   ('clusters', 1),
   ('foundation', 1),
   ('implicit', 1),
   ('maintained', 1),
   ('parallelism', 1),
   ('programming', 1),
   ('since', 1),
   ('unified', 1)]),
 (3,
  [('data', 1),
   ('data', 1),
   ('developed', 1),
   ('donated', 1),
   ('interface', 1)]),
 (4,
  [('amplab', 1),
   ('apache', 1),
   ('apache', 1),
   ('engine', 1),
   ('entire', 1),
   ('later', 1),
   ('processing', 1),
   ('provides', 1)])]

Reduce:

In [14]:
reduce_text(shuffle_stage[4][1])

{'amplab': 1,
 'apache': 2,
 'engine': 1,
 'entire': 1,
 'later': 1,
 'processing': 1,
 'provides': 1}

Итак, осталось все рассчитать параллельно и собрать результаты

In [15]:
from joblib import Parallel, delayed

In [16]:
n_nodes = 5

Обернем в 1 функциию для удобства map и shuffle

In [17]:
def map_shuffle(text, n_nodes):
    map_result = mapper_text(text)
    shuffle_result = shuffle_text(map_result, n_nodes)
    return shuffle_result

In [18]:
with Parallel(n_jobs=n_nodes, verbose=10, batch_size=5) as parallel:
    res = parallel(delayed(map_shuffle)(df, n_nodes) for df in data)

[Parallel(n_jobs=5)]: Using backend LokyBackend with 5 concurrent workers.
[Parallel(n_jobs=5)]: Done   6 tasks      | elapsed:    0.7s
[Parallel(n_jobs=5)]: Done  25 tasks      | elapsed:    0.8s
[Parallel(n_jobs=5)]: Done  50 tasks      | elapsed:    0.8s
[Parallel(n_jobs=5)]: Done  85 tasks      | elapsed:    0.8s
[Parallel(n_jobs=5)]: Done  92 tasks      | elapsed:    0.8s
[Parallel(n_jobs=5)]: Done 100 out of 100 | elapsed:    0.8s finished


In [19]:
len(res)

100

In [20]:
res[0]

[(0,
  [('amplab', 1),
   ('berkeley', 1),
   ('clusters', 1),
   ('provides', 1),
   ('unified', 1)]),
 (1,
  [('analytics', 1),
   ('developed', 1),
   ('engine', 1),
   ('foundation', 1),
   ('maintained', 1)]),
 (2,
  [('codebase', 1),
   ('data', 1),
   ('data', 1),
   ('donated', 1),
   ('implicit', 1),
   ('later', 1),
   ('opensource', 1),
   ('parallelism', 1),
   ('programming', 1),
   ('since', 1),
   ('software', 1)]),
 (3,
  [('entire', 1),
   ('fault', 1),
   ('interface', 1),
   ('originally', 1),
   ('tolerance', 1),
   ('university', 1)]),
 (4,
  [('apache', 1),
   ('apache', 1),
   ('california', 1),
   ('largescale', 1),
   ('processing', 1),
   ('spark', 1),
   ('spark', 1),
   ('spark', 1)])]

Сделаем что-то вроде перессылки, собирая все в словари и заодно посмотрим на сколько равномерно распределлиись наши слова

In [21]:
shuffle_stage = {i:[] for i in range(5)}
for values in res:
    values = dict(values)
    for key in values.keys():
        shuffle_stage[key].extend(values[key])

In [22]:
for key in shuffle_stage.keys():
    print(f'{key}: number of words = {len(shuffle_stage[key])}')

0: number of words = 381
1: number of words = 361
2: number of words = 403
3: number of words = 320
4: number of words = 456


И последний этап - нужно сделать reduce

In [23]:
with Parallel(n_jobs=n_nodes, verbose=10, batch_size=5) as parallel:
    res = parallel(delayed(reduce_text)(shuffle_stage[key]) for key in shuffle_stage.keys())

[Parallel(n_jobs=5)]: Using backend LokyBackend with 5 concurrent workers.
[Parallel(n_jobs=5)]: Done   2 out of   5 | elapsed:    0.0s remaining:    0.0s
[Parallel(n_jobs=5)]: Done   3 out of   5 | elapsed:    0.0s remaining:    0.0s
[Parallel(n_jobs=5)]: Done   5 out of   5 | elapsed:    0.0s finished


In [24]:
len(res)

5

In [25]:
res[0]

{'amplab': 1,
 'berkeley': 1,
 'clusters': 2,
 'provides': 9,
 'unified': 2,
 'apache': 11,
 'followed': 1,
 'still': 1,
 'technology': 2,
 'underlies': 1,
 'computing': 1,
 'function': 2,
 'map': 5,
 'particular': 1,
 'read': 1,
 'response': 2,
 'results': 2,
 'distributed': 4,
 'shared': 1,
 'implementation': 1,
 'applications': 1,
 'data': 7,
 'may': 1,
 'among': 1,
 'hadoop': 5,
 'impetus': 1,
 'implementation29': 1,
 'initial': 1,
 'mapreduce': 3,
 'several': 1,
 'spark10': 1,
 'manually': 1,
 'package': 1,
 'provided': 1,
 'requires': 2,
 'scripts': 1,
 'single': 2,
 'development': 1,
 'kudu': 1,
 'lustre': 1,
 'mapr': 1,
 's3': 2,
 'swift': 1,
 'system': 3,
 'api': 1,
 'application': 1,
 'connect': 1,
 'java': 4,
 'programming': 3,
 'python': 4,
 'usable': 1,
 'additional': 1,
 'filter': 1,
 'lazy': 1,
 'lineage': 1,
 'ones': 1,
 'operations': 4,
 'produced': 1,
 'schedules': 1,
 'these': 3,
 'track': 1,
 'besides': 1,
 'forms': 1,
 'rddoriented': 1,
 'readonly': 1,
 'a': 2,
 'a

Собираем результат

In [26]:
result = {}
for partition in res:
    for key in partition.keys():
        if key in result:
            result[key] += partition[key]
        else:
            result[key] = partition[key]

In [27]:
sorted(result.items(), key=lambda x: x[1], reverse=True)

[('spark', 110),
 ('apache', 56),
 ('data', 44),
 ('streaming', 36),
 ('distributed', 23),
 ('processing', 19),
 ('sql', 17),
 ('rdd', 15),
 ('learning', 15),
 ('also', 15),
 ('api', 14),
 ('structured', 13),
 ('cluster', 12),
 ('machine', 12),
 ('hadoop', 11),
 ('provides', 10),
 ('mapreduce', 10),
 ('rdds', 10),
 ('interface', 10),
 ('programming', 9),
 ('framework', 9),
 ('the', 9),
 ('core', 9),
 ('mllib', 9),
 ('use', 9),
 ('graph', 9),
 ('application', 8),
 ('python', 8),
 ('tasks', 8),
 ('used', 8),
 ('support', 8),
 ('code', 8),
 ('operations', 7),
 ('dataset', 7),
 ('it', 7),
 ('algorithms', 7),
 ('in', 7),
 ('graphx', 7),
 ('deep', 7),
 ('run', 7),
 ('map', 6),
 ('implementation', 6),
 ('including', 6),
 ('pipelines', 6),
 ('big', 6),
 ('simple', 6),
 ('abstraction', 6),
 ('batch', 6),
 ('graphs', 6),
 ('two', 6),
 ('developers', 6),
 ('scala', 6),
 ('computing', 5),
 ('applications', 5),
 ('system', 5),
 ('java', 5),
 ('these', 5),
 ('analytics', 5),
 ('much', 5),
 ('across'

Да, было бы проще все сделать иным кодом и в один проход, но целью было разобрать, как все это примерно работает под капотом на больших данных.

### Домашнее задание

Посчитать количество рейтингов больше 4 для каждого фильма и вывести фильмы в порядке убывания количества этих оценок

In [28]:
with open('user_ratedmovies.dat', 'rb') as f:
    data = f.readlines()
headers = data[0].decode().split('\t')[:3]
data = [row.decode().split('\t')[:3] for row in data[1:]]

In [29]:
headers

['userID', 'movieID', 'rating']

In [30]:
data[0]

['75', '3', '1']

In [31]:
len(data)

855598

Пишем map, shiffle и reduce + параллелим вычисления. Лучше задавать batch_size при распараллеливании, либо даже заранее все разбить на батчи, будет быстрее

Также посмотрите на то, нет ли перекоса в данных после shuffle, можете попробовать использовать остаток от деления не простого hash, а ввести какую-то функию

In [32]:
def create_batches(data, batch_size):
    batches = [data[i:i+batch_size] for i in range(0, len(data), batch_size)]
    return batches

def map_rating(rows, movie_idx=1, rating_idx=2):
    rows_with_value = [
        (row[movie_idx], 1)
        for row in rows
        if float(row[rating_idx]) >= 4
    ]
    return sorted(rows_with_value, key=lambda x:x[0])

def create_chunks(shuffled_data):
    result = {}
    for idx, data in shuffled_data:
        if idx in result:
            result[idx].append(data)
        else:
            result[idx] = [data]
    return list(result.items())

def shuffle_rating(mapper_result, n_nodes=5):
    shuffled_data = []
    for key, value in mapper_result:
        shuffled_data.append((hash(key) % n_nodes, (key, value)))
    shuffled_data = sorted(shuffled_data, key=lambda x: x[0])
    chunks = create_chunks(shuffled_data)
    return chunks

def reduce_rating(map_row):
    result = {}
    for key, value in map_row:
        if key in result:
            result[key] += 1
        else:
            result[key] = 1
    return result

def map_shuffle(rows_batch, n_nodes):
    map_result = map_rating(rows_batch)
    shuffle_result = shuffle_rating(map_result, n_nodes)
    return shuffle_result

После reduce все можно собрать в одном цикле, считаем, что данные переслали после на 1 машину и агрегируем

#### Создадим батчи

In [33]:
BATCH_SIZE = 1024
batches = create_batches(data, BATCH_SIZE)

In [34]:
len(batches)

836

In [35]:
batches[0]

[['75', '3', '1'],
 ['75', '32', '4.5'],
 ['75', '110', '4'],
 ['75', '160', '2'],
 ['75', '163', '4'],
 ['75', '165', '4.5'],
 ['75', '173', '3.5'],
 ['75', '296', '5'],
 ['75', '353', '3.5'],
 ['75', '420', '2'],
 ['75', '589', '4'],
 ['75', '653', '3'],
 ['75', '832', '4.5'],
 ['75', '920', '0.5'],
 ['75', '996', '4.5'],
 ['75', '1036', '4'],
 ['75', '1127', '3.5'],
 ['75', '1215', '4.5'],
 ['75', '1233', '4'],
 ['75', '1304', '2.5'],
 ['75', '1370', '4'],
 ['75', '1374', '4'],
 ['75', '1485', '4'],
 ['75', '1527', '4.5'],
 ['75', '1917', '2.5'],
 ['75', '2011', '2'],
 ['75', '2054', '1.5'],
 ['75', '2058', '4'],
 ['75', '2490', '4'],
 ['75', '2571', '4.5'],
 ['75', '2640', '3'],
 ['75', '2688', '3'],
 ['75', '2700', '4.5'],
 ['75', '2762', '3.5'],
 ['75', '2959', '4.5'],
 ['75', '3258', '1.5'],
 ['75', '3793', '3'],
 ['75', '3889', '3'],
 ['75', '3994', '3.5'],
 ['75', '4993', '3.5'],
 ['75', '5107', '3'],
 ['75', '5833', '2.5'],
 ['75', '5952', '3.5'],
 ['75', '6213', '4'],
 ['75'

#### MapReduce на 1 батче

##### Map:

In [36]:
map_stage = map_rating(batches[0])

In [37]:
display(map_stage)

[('1', 1),
 ('1', 1),
 ('101', 1),
 ('1028', 1),
 ('1036', 1),
 ('1036', 1),
 ('1036', 1),
 ('104', 1),
 ('104', 1),
 ('1041', 1),
 ('105', 1),
 ('1060', 1),
 ('1077', 1),
 ('1079', 1),
 ('1080', 1),
 ('1080', 1),
 ('1084', 1),
 ('1085', 1),
 ('1089', 1),
 ('1089', 1),
 ('1090', 1),
 ('1090', 1),
 ('1094', 1),
 ('1095', 1),
 ('110', 1),
 ('110', 1),
 ('110', 1),
 ('1101', 1),
 ('111', 1),
 ('111', 1),
 ('1131', 1),
 ('1136', 1),
 ('1136', 1),
 ('1136', 1),
 ('1148', 1),
 ('1150', 1),
 ('1161', 1),
 ('1172', 1),
 ('1172', 1),
 ('1175', 1),
 ('1178', 1),
 ('1179', 1),
 ('1179', 1),
 ('1183', 1),
 ('1185', 1),
 ('1186', 1),
 ('1188', 1),
 ('1189', 1),
 ('1193', 1),
 ('1193', 1),
 ('1196', 1),
 ('1196', 1),
 ('1197', 1),
 ('1197', 1),
 ('1197', 1),
 ('1198', 1),
 ('1198', 1),
 ('1198', 1),
 ('1199', 1),
 ('1200', 1),
 ('1200', 1),
 ('1200', 1),
 ('1201', 1),
 ('1201', 1),
 ('1202', 1),
 ('1203', 1),
 ('1204', 1),
 ('1206', 1),
 ('1206', 1),
 ('1207', 1),
 ('1208', 1),
 ('1208', 1),
 ('1209

##### Shuffle:

In [38]:
shuffle_stage = shuffle_rating(map_stage, 5)

In [39]:
display(shuffle_stage)

[(0,
  [('1079', 1),
   ('1089', 1),
   ('1089', 1),
   ('1090', 1),
   ('1090', 1),
   ('1136', 1),
   ('1136', 1),
   ('1136', 1),
   ('1183', 1),
   ('1185', 1),
   ('1186', 1),
   ('1196', 1),
   ('1196', 1),
   ('1200', 1),
   ('1200', 1),
   ('1200', 1),
   ('1203', 1),
   ('1213', 1),
   ('1214', 1),
   ('1214', 1),
   ('1238', 1),
   ('1246', 1),
   ('1248', 1),
   ('1248', 1),
   ('1249', 1),
   ('1254', 1),
   ('1262', 1),
   ('1266', 1),
   ('1274', 1),
   ('1278', 1),
   ('1278', 1),
   ('1285', 1),
   ('1287', 1),
   ('1291', 1),
   ('1296', 1),
   ('1299', 1),
   ('1299', 1),
   ('1346', 1),
   ('1358', 1),
   ('1374', 1),
   ('1374', 1),
   ('1485', 1),
   ('1617', 1),
   ('1629', 1),
   ('1674', 1),
   ('17', 1),
   ('1721', 1),
   ('1834', 1),
   ('1952', 1),
   ('1965', 1),
   ('2009', 1),
   ('2076', 1),
   ('2100', 1),
   ('2161', 1),
   ('2290', 1),
   ('2346', 1),
   ('2395', 1),
   ('2529', 1),
   ('260', 1),
   ('260', 1),
   ('26203', 1),
   ('26285', 1),
   ('

##### Reduce:

In [40]:
display(reduce_rating(shuffle_stage[0][1]))

{'1079': 1,
 '1089': 2,
 '1090': 2,
 '1136': 3,
 '1183': 1,
 '1185': 1,
 '1186': 1,
 '1196': 2,
 '1200': 3,
 '1203': 1,
 '1213': 1,
 '1214': 2,
 '1238': 1,
 '1246': 1,
 '1248': 2,
 '1249': 1,
 '1254': 1,
 '1262': 1,
 '1266': 1,
 '1274': 1,
 '1278': 2,
 '1285': 1,
 '1287': 1,
 '1291': 1,
 '1296': 1,
 '1299': 2,
 '1346': 1,
 '1358': 1,
 '1374': 2,
 '1485': 1,
 '1617': 1,
 '1629': 1,
 '1674': 1,
 '17': 1,
 '1721': 1,
 '1834': 1,
 '1952': 1,
 '1965': 1,
 '2009': 1,
 '2076': 1,
 '2100': 1,
 '2161': 1,
 '2290': 1,
 '2346': 1,
 '2395': 1,
 '2529': 1,
 '260': 2,
 '26203': 1,
 '26285': 1,
 '2819': 1,
 '2919': 1,
 '2921': 1,
 '293': 1,
 '2970': 1,
 '3070': 1,
 '3072': 1,
 '3210': 1,
 '3224': 1,
 '3326': 1,
 '3328': 1,
 '3359': 2,
 '3363': 1,
 '33794': 1,
 '3448': 1,
 '3683': 2,
 '3703': 1,
 '3751': 1,
 '3783': 1,
 '3836': 1,
 '39': 1,
 '39292': 1,
 '3967': 1,
 '4019': 1,
 '4034': 1,
 '4226': 3,
 '4251': 1,
 '4278': 1,
 '4407': 1,
 '44191': 2,
 '45081': 1,
 '45726': 1,
 '46976': 1,
 '4842': 1,
 '

#### MapReduce на всех батчах

##### Map и Shuffle:

In [41]:
n_nodes = 5

In [42]:
with Parallel(n_jobs=n_nodes, verbose=10, batch_size=BATCH_SIZE) as parallel:
    res = parallel(delayed(map_shuffle)(df, n_nodes) for df in batches)

[Parallel(n_jobs=5)]: Using backend LokyBackend with 5 concurrent workers.
[Parallel(n_jobs=5)]: Done   6 tasks      | elapsed:    0.0s
[Parallel(n_jobs=5)]: Done  58 tasks      | elapsed:    0.0s
[Parallel(n_jobs=5)]: Done 138 tasks      | elapsed:    0.1s
[Parallel(n_jobs=5)]: Done 250 tasks      | elapsed:    0.2s
[Parallel(n_jobs=5)]: Done 362 tasks      | elapsed:    0.3s
[Parallel(n_jobs=5)]: Done 506 tasks      | elapsed:    0.4s
[Parallel(n_jobs=5)]: Done 650 tasks      | elapsed:    0.5s
[Parallel(n_jobs=5)]: Done 836 out of 836 | elapsed:    0.6s finished


In [43]:
len(res)

836

In [44]:
res[0]

[(0,
  [('1028', 1),
   ('1041', 1),
   ('1089', 1),
   ('1089', 1),
   ('1094', 1),
   ('1148', 1),
   ('1161', 1),
   ('1172', 1),
   ('1172', 1),
   ('1179', 1),
   ('1179', 1),
   ('1186', 1),
   ('1197', 1),
   ('1197', 1),
   ('1197', 1),
   ('1201', 1),
   ('1201', 1),
   ('1208', 1),
   ('1208', 1),
   ('1212', 1),
   ('1217', 1),
   ('1217', 1),
   ('1232', 1),
   ('1234', 1),
   ('1234', 1),
   ('1243', 1),
   ('1248', 1),
   ('1248', 1),
   ('1253', 1),
   ('1260', 1),
   ('1276', 1),
   ('1278', 1),
   ('1278', 1),
   ('1280', 1),
   ('1285', 1),
   ('1288', 1),
   ('1288', 1),
   ('1358', 1),
   ('1485', 1),
   ('1653', 1),
   ('1663', 1),
   ('1674', 1),
   ('1682', 1),
   ('1694', 1),
   ('1721', 1),
   ('1916', 1),
   ('1931', 1),
   ('1968', 1),
   ('2009', 1),
   ('2019', 1),
   ('2064', 1),
   ('2108', 1),
   ('2116', 1),
   ('2174', 1),
   ('2174', 1),
   ('2290', 1),
   ('2346', 1),
   ('2360', 1),
   ('2396', 1),
   ('2396', 1),
   ('2490', 1),
   ('2700', 1),
   

##### Пересылка:

In [45]:
shuffle_stage = {i:[] for i in range(5)}
for values in res:
    values = dict(values)
    for key in values.keys():
        shuffle_stage[key].extend(values[key])

In [46]:
for key in shuffle_stage.keys():
    print(f'Node {key}: number of movies = {len(shuffle_stage[key])}')

Node 0: number of movies = 72633
Node 1: number of movies = 76435
Node 2: number of movies = 75659
Node 3: number of movies = 76474
Node 4: number of movies = 74904


##### Reduce:

In [47]:
with Parallel(n_jobs=n_nodes, verbose=10, batch_size=BATCH_SIZE) as parallel:
    res = parallel(delayed(reduce_text)(shuffle_stage[key]) for key in shuffle_stage.keys())

[Parallel(n_jobs=5)]: Using backend LokyBackend with 5 concurrent workers.
[Parallel(n_jobs=5)]: Done   2 out of   5 | elapsed:    0.1s remaining:    0.1s
[Parallel(n_jobs=5)]: Done   3 out of   5 | elapsed:    0.1s remaining:    0.0s
[Parallel(n_jobs=5)]: Done   5 out of   5 | elapsed:    0.1s finished


In [48]:
len(res)

5

In [49]:
len(res[0])

4540

##### Собираем результат воедино:

In [50]:
result = {}
for partition in res:
    for key in partition.keys():
        if key in result:
            result[key] += partition[key]
        else:
            result[key] = partition[key]

In [51]:
sorted(result.items(), key=lambda x: x[1], reverse=True)

[('2571', 1300),
 ('296', 1249),
 ('318', 1227),
 ('4993', 1168),
 ('2959', 1145),
 ('2858', 1099),
 ('5952', 1097),
 ('7153', 1085),
 ('593', 1061),
 ('356', 1054),
 ('4226', 1053),
 ('50', 1026),
 ('260', 1014),
 ('1198', 989),
 ('1196', 986),
 ('2762', 982),
 ('858', 949),
 ('527', 921),
 ('47', 903),
 ('4973', 900),
 ('7361', 894),
 ('4306', 865),
 ('1136', 862),
 ('1210', 836),
 ('2028', 834),
 ('1270', 834),
 ('32', 818),
 ('3578', 816),
 ('6874', 813),
 ('608', 798),
 ('6539', 797),
 ('589', 785),
 ('8961', 783),
 ('33794', 778),
 ('1193', 773),
 ('6377', 771),
 ('1089', 764),
 ('110', 751),
 ('2329', 747),
 ('4995', 746),
 ('1291', 744),
 ('541', 744),
 ('1214', 741),
 ('7438', 729),
 ('3996', 724),
 ('32587', 721),
 ('1', 713),
 ('4886', 708),
 ('1704', 707),
 ('1265', 702),
 ('1206', 696),
 ('1221', 693),
 ('2997', 690),
 ('4878', 669),
 ('1240', 669),
 ('1682', 665),
 ('6711', 658),
 ('1213', 658),
 ('293', 654),
 ('750', 654),
 ('5418', 649),
 ('1036', 649),
 ('5445', 648),

##### Проверим, что все правильно отработало:

In [52]:
from collections import Counter

In [53]:
good_movies = [row[1] for row in filter(lambda row: float(row[2]) >= 4, data)]
display(Counter(good_movies).most_common(5))

[('2571', 1300), ('296', 1249), ('318', 1227), ('4993', 1168), ('2959', 1145)]

In [54]:
import IPython
IPython.Application.instance().kernel.do_shutdown(True)

{'status': 'ok', 'restart': True}