**Занятие первое**

Начнём с простого. Многие знают, что такое map и reduce операции, но всё же для закрпеления мы их тут реализуем. Ах да, не забудем и про shuffle. Делать всё будем на упрощённой задаче с word count для ознакомления с самим подходом.

На самом деле мы рассмотрим всё в упрощённом виде, но это даст нам понимание, как можно через hadoop streaming, например, писать самописные map и reduce операции.

! mapred streaming \
  -input /wiki/sample.jsonl \
  -output /word-count \
  -mapper "/opt/conda/bin/python3.6 mapper.py" \
  -reducer "/opt/conda/bin/python3.6 reducer.py" \
  -file mapper.py \
  -file reducer.py

Выше mapper.py и reducer.py это программы, которые выполняют одноимённые операции над потоком информации из jsonl файла, записывая ответ в файл word-count

In [12]:
from collections import defaultdict
import re

import nltk
nltk.download("punkt_tab")
nltk.download("stopwords")
from nltk.corpus import stopwords

import string

[nltk_data] Downloading package punkt_tab to /root/nltk_data...
[nltk_data]   Unzipping tokenizers/punkt_tab.zip.
[nltk_data] Downloading package stopwords to /root/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!


Давайте загрузим файл с текстом и посмотрим на него

In [2]:
with open("spark_text.txt", "rb") as f:
    data = f.readlines()

data = [text.decode() for text in data if text.decode() != "\r\n"]

In [3]:
len(data)

60

In [4]:
data[1]

'Apache Spark has its architectural foundation in the resilient distributed dataset (RDD), a read-only multiset of data items distributed over a cluster of machines, that is maintained in a fault-tolerant way.[2] The Dataframe API was released as an abstraction on top of the RDD, followed by the Dataset API. In Spark 1.x, the RDD was the primary application programming interface (API), but as of Spark 2.x use of the Dataset API is encouraged[3] even though the RDD API is not deprecated.[4][5] The RDD technology still underlies the Dataset API.[6][7]\r\n'

Как бы мы сделали..
Надо немного почистить слова, а также сделать всё в парадигме MapReduce. Понятно, что можно всё написать проще, но мы ведь хотим понять, как это работает=)

Загрузим стоп слова, очистим от них текст, приведём к нижнему регистру, всем раздадим ключи

In [5]:
stop_words = stopwords.words("english")
stop_words = set(stop_words)

In [6]:
stop_words

{'a',
 'about',
 'above',
 'after',
 'again',
 'against',
 'ain',
 'all',
 'am',
 'an',
 'and',
 'any',
 'are',
 'aren',
 "aren't",
 'as',
 'at',
 'be',
 'because',
 'been',
 'before',
 'being',
 'below',
 'between',
 'both',
 'but',
 'by',
 'can',
 'couldn',
 "couldn't",
 'd',
 'did',
 'didn',
 "didn't",
 'do',
 'does',
 'doesn',
 "doesn't",
 'doing',
 'don',
 "don't",
 'down',
 'during',
 'each',
 'few',
 'for',
 'from',
 'further',
 'had',
 'hadn',
 "hadn't",
 'has',
 'hasn',
 "hasn't",
 'have',
 'haven',
 "haven't",
 'having',
 'he',
 'her',
 'here',
 'hers',
 'herself',
 'him',
 'himself',
 'his',
 'how',
 'i',
 'if',
 'in',
 'into',
 'is',
 'isn',
 "isn't",
 'it',
 "it's",
 'its',
 'itself',
 'just',
 'll',
 'm',
 'ma',
 'me',
 'mightn',
 "mightn't",
 'more',
 'most',
 'mustn',
 "mustn't",
 'my',
 'myself',
 'needn',
 "needn't",
 'no',
 'nor',
 'not',
 'now',
 'o',
 'of',
 'off',
 'on',
 'once',
 'only',
 'or',
 'other',
 'our',
 'ours',
 'ourselves',
 'out',
 'over',
 'own',
 'r

пунктуацию тоже полезно бы удалить

In [7]:
string.punctuation

'!"#$%&\'()*+,-./:;<=>?@[\\]^_`{|}~'

In [8]:
def mapper_text(text: str) -> list[tuple]:

    clean_text = re.sub(rf"[{string.punctuation}]", "", text)
    words = nltk.word_tokenize(clean_text)
    words_with_value = [(word.lower(), 1) for word in words if word not in stop_words]
    words_with_value = sorted(words_with_value, key=lambda x: x[0])

    return words_with_value


def shuffle_text(mapper_result: list[tuple], n_nodes: int = 5) -> list[tuple]:

    shuffled_data = []
    for key, value in mapper_result:
        shuffled_data.append((hash(key) % n_nodes, (key, value)))
    shuffled_data = sorted(shuffled_data, key=lambda x: x[0])
    chunks = create_chunks(shuffled_data)

    return chunks


def create_chunks(shuffled_data: list) -> list[tuple]:

    result = defaultdict(list)
    for idx, data in shuffled_data:
        result[idx].append(data)

    return list(result.items())


# на самом деле для reduce в жизни пишут иначе.. не зря мы сортируем внутри map
# данные по ключам. Это нужно для избавления от этапа проверки ключа и поиска
def reduce_text(values_to_reduce):

    result = defaultdict(int)
    for key, value in values_to_reduce:
        result[key] += 1

    return result

Проверим, что всё работает

Сначала map

In [9]:
data[0]

'Apache Spark is an open-source unified analytics engine for large-scale data processing. Spark provides an interface for programming entire clusters with implicit data parallelism and fault tolerance. Originally developed at the University of California, Berkeley AMPLab, the Spark codebase was later donated to the Apache Software Foundation, which has maintained it since.\r\n'

In [13]:
map_stage = mapper_text(data[0])

In [14]:
map_stage

[('amplab', 1),
 ('analytics', 1),
 ('apache', 1),
 ('apache', 1),
 ('berkeley', 1),
 ('california', 1),
 ('clusters', 1),
 ('codebase', 1),
 ('data', 1),
 ('data', 1),
 ('developed', 1),
 ('donated', 1),
 ('engine', 1),
 ('entire', 1),
 ('fault', 1),
 ('foundation', 1),
 ('implicit', 1),
 ('interface', 1),
 ('largescale', 1),
 ('later', 1),
 ('maintained', 1),
 ('opensource', 1),
 ('originally', 1),
 ('parallelism', 1),
 ('processing', 1),
 ('programming', 1),
 ('provides', 1),
 ('since', 1),
 ('software', 1),
 ('spark', 1),
 ('spark', 1),
 ('spark', 1),
 ('tolerance', 1),
 ('unified', 1),
 ('university', 1)]

shuffle

In [15]:
shuffle_stage = shuffle_text(map_stage, 5)

In [16]:
shuffle_stage

[(0,
  [('developed', 1),
   ('interface', 1),
   ('spark', 1),
   ('spark', 1),
   ('spark', 1),
   ('unified', 1)]),
 (1,
  [('apache', 1),
   ('apache', 1),
   ('california', 1),
   ('data', 1),
   ('data', 1),
   ('entire', 1),
   ('fault', 1),
   ('maintained', 1),
   ('opensource', 1),
   ('since', 1),
   ('software', 1)]),
 (2,
  [('amplab', 1),
   ('berkeley', 1),
   ('clusters', 1),
   ('codebase', 1),
   ('foundation', 1),
   ('later', 1),
   ('programming', 1),
   ('provides', 1)]),
 (3,
  [('analytics', 1),
   ('engine', 1),
   ('implicit', 1),
   ('largescale', 1),
   ('tolerance', 1),
   ('university', 1)]),
 (4,
  [('donated', 1), ('originally', 1), ('parallelism', 1), ('processing', 1)])]

reduce

In [17]:
reduce_text(shuffle_stage[4][1])

defaultdict(int,
            {'donated': 1, 'originally': 1, 'parallelism': 1, 'processing': 1})

Итак, осталось всё рассчитать параллельно и собрать результаты

In [18]:
from joblib import Parallel, delayed

In [19]:
n_nodes = 5

Обернём в 1 функциию для удобства map и shuffle

In [20]:
def map_shuffle(text, n_nodes):

    map_result = mapper_text(text)
    shuffle_result = shuffle_text(map_result, n_nodes)

    return shuffle_result

In [21]:
with Parallel(n_jobs=n_nodes, verbose=10, batch_size=5) as parallel:
    res = parallel(delayed(map_shuffle)(df, n_nodes) for df in data)

[Parallel(n_jobs=5)]: Using backend LokyBackend with 5 concurrent workers.
[Parallel(n_jobs=5)]: Done   9 tasks      | elapsed:    6.7s
[Parallel(n_jobs=5)]: Done  34 tasks      | elapsed:    6.7s
[Parallel(n_jobs=5)]: Done  58 out of  60 | elapsed:    7.0s remaining:    0.2s
[Parallel(n_jobs=5)]: Done  60 out of  60 | elapsed:    7.1s finished


In [22]:
len(res)

60

In [23]:
res[0]

[(0,
  [('codebase', 1),
   ('engine', 1),
   ('entire', 1),
   ('foundation', 1),
   ('later', 1),
   ('opensource', 1),
   ('processing', 1),
   ('tolerance', 1),
   ('unified', 1)]),
 (1,
  [('amplab', 1),
   ('california', 1),
   ('donated', 1),
   ('originally', 1),
   ('software', 1)]),
 (2,
  [('data', 1),
   ('data', 1),
   ('implicit', 1),
   ('interface', 1),
   ('maintained', 1),
   ('since', 1)]),
 (3,
  [('analytics', 1),
   ('berkeley', 1),
   ('fault', 1),
   ('largescale', 1),
   ('spark', 1),
   ('spark', 1),
   ('spark', 1),
   ('university', 1)]),
 (4,
  [('apache', 1),
   ('apache', 1),
   ('clusters', 1),
   ('developed', 1),
   ('parallelism', 1),
   ('programming', 1),
   ('provides', 1)])]

Сделаем что-то вроде перессылки, собирая всё в словари и заодно посмотрим на сколько равномерно распределлиись наши слова

In [24]:
shuffle_stage = {i:[] for i in range(5)}
for values in res:
    values = dict(values)
    for key in values.keys():
        shuffle_stage[key].extend(values[key])

In [25]:
for key in shuffle_stage.keys():
    print(f"{key}: number of words = {len(shuffle_stage[key])}")

0: number of words = 321
1: number of words = 328
2: number of words = 494
3: number of words = 384
4: number of words = 394


И последний этап - нужно сделать reduce

In [26]:
with Parallel(n_jobs=n_nodes, verbose=10, batch_size=5) as parallel:
    res = parallel(delayed(reduce_text)(shuffle_stage[key]) for key in shuffle_stage.keys())

[Parallel(n_jobs=5)]: Using backend LokyBackend with 5 concurrent workers.
[Parallel(n_jobs=5)]: Done   2 out of   5 | elapsed:    0.0s remaining:    0.0s
[Parallel(n_jobs=5)]: Done   3 out of   5 | elapsed:    0.0s remaining:    0.0s
[Parallel(n_jobs=5)]: Done   5 out of   5 | elapsed:    0.0s finished


In [27]:
len(res)

5

In [28]:
res[0]

defaultdict(int,
            {'codebase': 1,
             'engine': 1,
             'entire': 1,
             'foundation': 2,
             'later': 1,
             'opensource': 1,
             'processing': 1,
             'tolerance': 1,
             'unified': 1,
             'api': 5,
             'dataframe': 1,
             'distributed': 2,
             'readonly': 1,
             'top': 1,
             'cluster': 1,
             'disk': 3,
             'input': 2,
             'mapreduce': 10,
             'programs': 3,
             'reduction': 1,
             'results': 2,
             'restricted': 1,
             'set': 1,
             'iterative': 1,
             'ie': 1,
             'multiple': 1,
             'algorithms': 7,
             'compared': 2,
             'implementation29': 1,
             'several': 1,
             'systems': 1,
             'also': 13,
             'mesos': 2,
             'testing': 2,
             '11': 1,
             'core': 2,
     

Собираем результат

In [29]:
result = {}
for partition in res:
    for key in partition.keys():
        if key in result:
            result[key] += partition[key]
        else:
            result[key] = partition[key]

In [30]:
sorted(result.items(), key=lambda x: x[1], reverse=True)

[('spark', 110),
 ('apache', 56),
 ('data', 44),
 ('streaming', 36),
 ('distributed', 23),
 ('processing', 19),
 ('sql', 17),
 ('also', 15),
 ('rdd', 15),
 ('learning', 15),
 ('api', 14),
 ('structured', 13),
 ('cluster', 12),
 ('machine', 12),
 ('hadoop', 11),
 ('mapreduce', 10),
 ('interface', 10),
 ('rdds', 10),
 ('provides', 10),
 ('core', 9),
 ('the', 9),
 ('mllib', 9),
 ('programming', 9),
 ('use', 9),
 ('framework', 9),
 ('graph', 9),
 ('application', 8),
 ('python', 8),
 ('code', 8),
 ('tasks', 8),
 ('used', 8),
 ('support', 8),
 ('algorithms', 7),
 ('it', 7),
 ('run', 7),
 ('in', 7),
 ('dataset', 7),
 ('operations', 7),
 ('graphx', 7),
 ('deep', 7),
 ('including', 6),
 ('map', 6),
 ('abstraction', 6),
 ('implementation', 6),
 ('scala', 6),
 ('simple', 6),
 ('developers', 6),
 ('pipelines', 6),
 ('graphs', 6),
 ('big', 6),
 ('two', 6),
 ('batch', 6),
 ('top', 5),
 ('set', 5),
 ('system', 5),
 ('these', 5),
 ('analytics', 5),
 ('much', 5),
 ('across', 5),
 ('java', 5),
 ('new', 

Да, было бы проще всё сделать иным кодом и в один проход, но целью было разобрать, как всё это примерно работает под капотом на больших данных.