# WordCount, RDD, Logical plan

In [2]:
from pyspark import SparkConf
from pyspark import SparkContext

In [39]:
conf = SparkConf()
conf = conf.set('spark.driver.memory', '512m')
conf = conf.set('spark.executor.memory', '512m')

In [40]:
sc = SparkContext('spark://185739.simplecloud.ru:7077', appName="Word Count App. Dmitry", conf=conf)

In [6]:
sc._conf.getAll()

[('spark.driver.memory', '512m'),
 ('spark.executor.memory', '512m'),
 ('spark.app.id', 'app-20200514143316-0020'),
 ('spark.rdd.compress', 'True'),
 ('spark.serializer.objectStreamReset', '100'),
 ('spark.executor.id', 'driver'),
 ('spark.submit.deployMode', 'client'),
 ('spark.driver.host', '185739.simplecloud.ru'),
 ('spark.app.name', 'Word Count App. Dmitry'),
 ('spark.ui.showConsoleProgress', 'true'),
 ('spark.driver.port', '40488'),
 ('spark.master', 'spark://185739.simplecloud.ru:7077')]

In [3]:
import os
PATH_TO_FILES = '/home/dmitry/pyspark-training/files'

## Попробуем сначала для одного файла

In [8]:
# read data from text file and split each line into words
file = sc.textFile(os.path.join(PATH_TO_FILES, 'tezd.txt'))
file

/home/dmitry/pyspark-training/files/tezd.txt MapPartitionsRDD[1] at textFile at NativeMethodAccessorImpl.java:0

In [9]:
file.getNumPartitions() # почему две партиции, ведь файл всего один?

2

In [10]:
file.glom().collect() 

[['dasds dw dsa dsa sd ', 'dweqd', 'wdq', '', 'wdgtglrt'],
 ['', 'Big brown fox jumped over a lazy dog.']]

In [11]:
words = file.flatMap(lambda line: line.split(" "))

In [12]:
words.getNumPartitions()

2

In [13]:
# count the occurrence of each word
wordCounts = words.map(lambda word: (word, 1)).reduceByKey(lambda a,b: a + b)
wordCounts

PythonRDD[7] at RDD at PythonRDD.scala:53

In [14]:
wordCounts.getNumPartitions()

2

In [15]:
# save the counts to output
wordCounts.collect()

[('', 3),
 ('dog.', 1),
 ('dsa', 2),
 ('dweqd', 1),
 ('wdgtglrt', 1),
 ('a', 1),
 ('dw', 1),
 ('sd', 1),
 ('brown', 1),
 ('Big', 1),
 ('wdq', 1),
 ('over', 1),
 ('jumped', 1),
 ('fox', 1),
 ('dasds', 1),
 ('lazy', 1)]

## А теперь для нескольких
Что поменяется?

In [41]:
# read data from text file and split each line into words
files = sc.textFile(os.path.join(PATH_TO_FILES, 'multiple/*.txt'))
files

/home/dmitry/pyspark-training/files/multiple/*.txt MapPartitionsRDD[1] at textFile at NativeMethodAccessorImpl.java:0

In [17]:
files.getNumPartitions()

4

In [18]:
files.glom().collect() 

[['Aenean leo ligula, porttitor eu, consequat vitae, eleifend ac, enim. Aliquam lorem ante, dapibus in, viverra quis, feugiat a, tellus. Phasellus viverra nulla ut metus varius laoreet. Quisque rutrum. Aenean imperdiet. Etiam ultricies nisi vel augue. Curabitur ullamcorper ultricies nisi. Nam eget dui. Etiam rhoncus. Maecenas tempus, tellus eget condimentum rhoncus, sem quam semper libero, sit amet adipiscing sem neque sed ipsum. Nam quam nunc, blandit vel, luctus pulvinar, hendrerit id, lorem. Maecenas nec odio et ante tincidunt tempus. Donec vitae sapien ut libero venenatis faucibus. Nullam quis ante. Etiam sit amet orci eget eros faucibus tincidunt. Duis leo. Sed fringilla mauris sit amet nibh. Donec sodales sagittis magna. Sed consequat, leo eget bibendum sodales, augue velit cursus nunc,'],
 ['Donec pede justo, fringilla vel, aliquet nec, vulputate eget, arcu. In enim justo, rhoncus ut, imperdiet a, venenatis vitae, justo. Nullam dictum felis eu pede mollis pretium. Integer tincid

In [42]:
words = files.flatMap(lambda line: line.split(" "))
words

PythonRDD[2] at RDD at PythonRDD.scala:53

In [20]:
words.getNumPartitions()

4

In [21]:
words.map(lambda word: (word, 1)).glom().collect()

[[('Aenean', 1),
  ('leo', 1),
  ('ligula,', 1),
  ('porttitor', 1),
  ('eu,', 1),
  ('consequat', 1),
  ('vitae,', 1),
  ('eleifend', 1),
  ('ac,', 1),
  ('enim.', 1),
  ('Aliquam', 1),
  ('lorem', 1),
  ('ante,', 1),
  ('dapibus', 1),
  ('in,', 1),
  ('viverra', 1),
  ('quis,', 1),
  ('feugiat', 1),
  ('a,', 1),
  ('tellus.', 1),
  ('Phasellus', 1),
  ('viverra', 1),
  ('nulla', 1),
  ('ut', 1),
  ('metus', 1),
  ('varius', 1),
  ('laoreet.', 1),
  ('Quisque', 1),
  ('rutrum.', 1),
  ('Aenean', 1),
  ('imperdiet.', 1),
  ('Etiam', 1),
  ('ultricies', 1),
  ('nisi', 1),
  ('vel', 1),
  ('augue.', 1),
  ('Curabitur', 1),
  ('ullamcorper', 1),
  ('ultricies', 1),
  ('nisi.', 1),
  ('Nam', 1),
  ('eget', 1),
  ('dui.', 1),
  ('Etiam', 1),
  ('rhoncus.', 1),
  ('Maecenas', 1),
  ('tempus,', 1),
  ('tellus', 1),
  ('eget', 1),
  ('condimentum', 1),
  ('rhoncus,', 1),
  ('sem', 1),
  ('quam', 1),
  ('semper', 1),
  ('libero,', 1),
  ('sit', 1),
  ('amet', 1),
  ('adipiscing', 1),
  ('sem', 

In [43]:
# count the occurrence of each word
wordCounts = words.map(lambda word: (word, 1)).reduceByKey(lambda a,b: a + b)
wordCounts

PythonRDD[7] at RDD at PythonRDD.scala:53

In [23]:
wordCounts.getNumPartitions()

4

In [44]:
# save the counts to output
wordCounts.collect()

[('', 1),
 ('Maecenas', 2),
 ('eget,', 3),
 ('tincidunt.', 2),
 ('lorem.', 1),
 ('metus', 1),
 ('porttitor', 1),
 ('vel,', 4),
 ('tellus', 1),
 ('ut,', 1),
 ('varius', 1),
 ('Lorem', 2),
 ('sapien', 1),
 ('justo.', 1),
 ('mus.', 2),
 ('mollis', 1),
 ('ipsum', 2),
 ('libero,', 1),
 ('sodales', 1),
 ('pellentesque', 2),
 ('quam', 4),
 ('enim.', 3),
 ('Aliquam', 1),
 ('vitae', 1),
 ('feugiat', 1),
 ('Quisque', 1),
 ('sagittis', 1),
 ('condimentum', 1),
 ('elit.', 2),
 ('nascetur', 2),
 ('ipsum.', 1),
 ('rutrum.', 1),
 ('fringilla', 4),
 ('Vivamus', 1),
 ('pede', 4),
 ('ullamcorper', 1),
 ('dapibus', 1),
 ('faucibus', 1),
 ('nulla', 1),
 ('Nam', 2),
 ('nisi', 1),
 ('dictum', 1),
 ('ante', 1),
 ('leo', 2),
 ('ligula', 2),
 ('libero', 1),
 ('adipiscing', 3),
 ('eleifend', 2),
 ('sit', 5),
 ('eu,', 3),
 ('lorem', 1),
 ('amet,', 2),
 ('penatibus', 2),
 ('consequat', 3),
 ('magnis', 2),
 ('ut', 2),
 ('justo,', 4),
 ('eros', 1),
 ('luctus', 1),
 ('id,', 1),
 ('montes,', 2),
 ('enim', 1),
 ('part

In [30]:
sc.stop()

### Небольшое упражнение
Усовершенствуем нашу программу, приведите все слова в нижний регистр, и удалите точки и запятые. 

Создайте новый SparkContext. Задайте память для экзекьютора в 512m, а также ограничьте количество ядер на каждый экзекьютор одним ядром. 

Посмотрите в интерфейс на приложение (Application UI)

In [14]:
conf = SparkConf()
conf = conf.set('spark.executor.memory', '512m')
conf = conf.set('spark.executor.cores', '1')

In [15]:
sc = SparkContext('spark://185739.simplecloud.ru:7077', appName="Word Count App. Modified. Dmitry", conf=conf)

In [37]:
# ваш код тут
files = sc.textFile(os.path.join(PATH_TO_FILES, 'multiple/*.txt'))
words = files.flatMap(lambda line: line.lower().replace('.','').replace(',', '').split(" "))
wordCounts = words.map(lambda word: (word, 1)).reduceByKey(lambda a,b: a + b)
wordCounts.collect()

[('', 1),
 ('sapien', 1),
 ('ullamcorper', 1),
 ('rutrum', 1),
 ('vitae', 3),
 ('pede', 4),
 ('porttitor', 1),
 ('leo', 3),
 ('nunc', 2),
 ('etiam', 3),
 ('ipsum', 3),
 ('sodales', 2),
 ('dictum', 1),
 ('duis', 1),
 ('nascetur', 2),
 ('quam', 4),
 ('tellus', 3),
 ('varius', 1),
 ('integer', 1),
 ('maecenas', 2),
 ('fringilla', 4),
 ('sagittis', 1),
 ('justo', 5),
 ('phasellus', 1),
 ('dapibus', 2),
 ('faucibus', 2),
 ('feugiat', 1),
 ('nibh', 1),
 ('condimentum', 1),
 ('nulla', 3),
 ('mollis', 1),
 ('in', 2),
 ('metus', 1),
 ('nisi', 3),
 ('aenean', 7),
 ('ante', 3),
 ('pellentesque', 2),
 ('quisque', 1),
 ('libero', 2),
 ('luctus', 1),
 ('consectetuer', 2),
 ('a', 2),
 ('eleifend', 2),
 ('lorem', 4),
 ('arcu', 3),
 ('parturient', 2),
 ('ac', 1),
 ('consequat', 4),
 ('magnis', 2),
 ('ut', 3),
 ('sem', 4),
 ('eros', 1),
 ('adipiscing', 3),
 ('laoreet', 1),
 ('sit', 5),
 ('cum', 2),
 ('dui', 1),
 ('penatibus', 2),
 ('et', 3),
 ('id', 1),
 ('vivamus', 1),
 ('massa', 4),
 ('enim', 4),
 ('d

In [38]:
sc.stop()

## План выполнения и кэширование

In [45]:
print(wordCounts.toDebugString().decode('utf-8'))

(4) PythonRDD[7] at RDD at PythonRDD.scala:53 []
 |  MapPartitionsRDD[6] at mapPartitions at PythonRDD.scala:133 []
 |  ShuffledRDD[5] at partitionBy at NativeMethodAccessorImpl.java:0 []
 +-(4) PairwiseRDD[4] at reduceByKey at <ipython-input-43-4e803b7ed2a2>:2 []
    |  PythonRDD[3] at reduceByKey at <ipython-input-43-4e803b7ed2a2>:2 []
    |  /home/dmitry/pyspark-training/files/multiple/*.txt MapPartitionsRDD[1] at textFile at NativeMethodAccessorImpl.java:0 []
    |  /home/dmitry/pyspark-training/files/multiple/*.txt HadoopRDD[0] at textFile at NativeMethodAccessorImpl.java:0 []


In [46]:
wordCounts.persist()

PythonRDD[7] at RDD at PythonRDD.scala:53

In [47]:
print(wordCounts.toDebugString().decode('utf-8'))

(4) PythonRDD[7] at RDD at PythonRDD.scala:53 [Memory Serialized 1x Replicated]
 |  MapPartitionsRDD[6] at mapPartitions at PythonRDD.scala:133 [Memory Serialized 1x Replicated]
 |  ShuffledRDD[5] at partitionBy at NativeMethodAccessorImpl.java:0 [Memory Serialized 1x Replicated]
 +-(4) PairwiseRDD[4] at reduceByKey at <ipython-input-43-4e803b7ed2a2>:2 [Memory Serialized 1x Replicated]
    |  PythonRDD[3] at reduceByKey at <ipython-input-43-4e803b7ed2a2>:2 [Memory Serialized 1x Replicated]
    |  /home/dmitry/pyspark-training/files/multiple/*.txt MapPartitionsRDD[1] at textFile at NativeMethodAccessorImpl.java:0 [Memory Serialized 1x Replicated]
    |  /home/dmitry/pyspark-training/files/multiple/*.txt HadoopRDD[0] at textFile at NativeMethodAccessorImpl.java:0 [Memory Serialized 1x Replicated]


In [76]:
wordCounts.collect() # посмотрим в UI

[('', 1),
 ('Maecenas', 2),
 ('eget,', 3),
 ('tincidunt.', 2),
 ('lorem.', 1),
 ('metus', 1),
 ('porttitor', 1),
 ('vel,', 4),
 ('tellus', 1),
 ('ut,', 1),
 ('varius', 1),
 ('Lorem', 2),
 ('sapien', 1),
 ('justo.', 1),
 ('mus.', 2),
 ('mollis', 1),
 ('ipsum', 2),
 ('libero,', 1),
 ('sodales', 1),
 ('pellentesque', 2),
 ('quam', 4),
 ('enim.', 3),
 ('Aliquam', 1),
 ('vitae', 1),
 ('feugiat', 1),
 ('Quisque', 1),
 ('sagittis', 1),
 ('condimentum', 1),
 ('elit.', 2),
 ('nascetur', 2),
 ('ipsum.', 1),
 ('rutrum.', 1),
 ('fringilla', 4),
 ('Vivamus', 1),
 ('pede', 4),
 ('ullamcorper', 1),
 ('dapibus', 1),
 ('faucibus', 1),
 ('nulla', 1),
 ('Nam', 2),
 ('nisi', 1),
 ('dictum', 1),
 ('ante', 1),
 ('leo', 2),
 ('ligula', 2),
 ('libero', 1),
 ('adipiscing', 3),
 ('eleifend', 2),
 ('sit', 5),
 ('eu,', 3),
 ('lorem', 1),
 ('amet,', 2),
 ('penatibus', 2),
 ('consequat', 3),
 ('magnis', 2),
 ('ut', 2),
 ('justo,', 4),
 ('eros', 1),
 ('luctus', 1),
 ('id,', 1),
 ('montes,', 2),
 ('enim', 1),
 ('part

## RDD. Лучшие практики

### Не вызывайте .collect() на больших RDD
Часто в общем-то и не требуется полное копирование на драйвер. Для этого есть .take(), .takeSample()

In [4]:
conf = SparkConf()
conf = conf.set('spark.driver.memory', '512m')
conf = conf.set('spark.executor.memory', '512m')
conf = conf.set('spark.executor.cores', '2')

In [5]:
sc = SparkContext('spark://185739.simplecloud.ru:7077', appName="RDD practice. Dmitry", conf=conf)

In [5]:
parallel = sc.parallelize(list(range(100)))

In [6]:
parallel.getNumPartitions()

2

In [7]:
parallel.glom().collect()

[[0,
  1,
  2,
  3,
  4,
  5,
  6,
  7,
  8,
  9,
  10,
  11,
  12,
  13,
  14,
  15,
  16,
  17,
  18,
  19,
  20,
  21,
  22,
  23,
  24,
  25,
  26,
  27,
  28,
  29,
  30,
  31,
  32,
  33,
  34,
  35,
  36,
  37,
  38,
  39,
  40,
  41,
  42,
  43,
  44,
  45,
  46,
  47,
  48,
  49],
 [50,
  51,
  52,
  53,
  54,
  55,
  56,
  57,
  58,
  59,
  60,
  61,
  62,
  63,
  64,
  65,
  66,
  67,
  68,
  69,
  70,
  71,
  72,
  73,
  74,
  75,
  76,
  77,
  78,
  79,
  80,
  81,
  82,
  83,
  84,
  85,
  86,
  87,
  88,
  89,
  90,
  91,
  92,
  93,
  94,
  95,
  96,
  97,
  98,
  99]]

In [8]:
parallel.take(10)

[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

In [9]:
parallel.takeSample(num=10, withReplacement=True)

[46, 25, 83, 15, 72, 73, 5, 6, 37, 14]

Можно также взять только одну партицию. Здесь нам понадобится .mapPartitionsWithIndex()

In [10]:
from functools import partial

def filter_by_idx(idx, iterator, idx_to_get):
    if idx == idx_to_get:
        return list(iterator)
    else:
        return []

projector = partial(filter_by_idx, idx_to_get=0)
part = parallel.mapPartitionsWithIndex(projector, preservesPartitioning=True)

In [11]:
part.collect()

[0,
 1,
 2,
 3,
 4,
 5,
 6,
 7,
 8,
 9,
 10,
 11,
 12,
 13,
 14,
 15,
 16,
 17,
 18,
 19,
 20,
 21,
 22,
 23,
 24,
 25,
 26,
 27,
 28,
 29,
 30,
 31,
 32,
 33,
 34,
 35,
 36,
 37,
 38,
 39,
 40,
 41,
 42,
 43,
 44,
 45,
 46,
 47,
 48,
 49]

Конечно, это если только одна партиция имеет адекватные размеры. Но мы всегда можем искуственно увеличить количество партиций с помощью .repartition()

In [12]:
parallel.repartition(4).mapPartitionsWithIndex(projector, preservesPartitioning=True).collect()

[10,
 11,
 12,
 13,
 14,
 15,
 16,
 17,
 18,
 19,
 80,
 81,
 82,
 83,
 84,
 85,
 86,
 87,
 88,
 89]

### Используйте .count() только тогда, когда вам правда нужно посчитать точное число
Реализация использует .take(1) ==> https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/RDD.scala

In [66]:
# для всего остального есть .isEmpty()
parallel.isEmpty()

False

### Используйте правильные операторы

#### Избегайте списки итераторов
.map() -> .flatMap()

In [67]:
inp = sc.parallelize (["oh,my,parallel", "execution,go,wild", "me,too,by the way"]).cache()

mapped = inp.map(lambda l: l.split(','))
flat = inp.flatMap(lambda l: l.split(','))

mapped_res = mapped.collect()
flat_res = flat.collect()

print("Mapped выдает")
print(mapped_res)

print("FlatMapped выдает")
print(flat_res)

Mapped выдает
[['oh', 'my', 'parallel'], ['execution', 'go', 'wild'], ['me', 'too', 'by the way']]
FlatMapped выдает
['oh', 'my', 'parallel', 'execution', 'go', 'wild', 'me', 'too', 'by the way']


In [13]:
sc.stop()

#### Избегайте groupByKey, когда делаете группу (например, собираете список значений) по ключу
##### Небольшое упражнение
Прочитайте с помощью sc.textFile() файлик csv_burp.csv там должно быть 3 колонки, но это ж .csv и в благородство он играть не будет проигнорируйте все строки, где число колонок меньше трех, и значения нельзя интерпретировать как целое число

разделитель запятая 

преобразуйте файл к 10 партициям

In [5]:
# небольшое упражнение, прочитайте с помощью sc.textFile() файлик csv_burp.csv
# там должно быть 3 колонки, но это ж .csv и в благородство он играть не будет
# проигнорируйте все строки, где число колонок меньше трех, и значения нельзя интерпретировать как целое число
# разделитель запятая
# преобразуйте файл к 10 партициям
# ваш код тут
def convert(line):
    try:
        return list(map(int, line))
    except Exception as e:
        return []

data = sc.textFile("files/csv_burp.csv") \
        .map(lambda line: line.split(",")) \
        .map(convert) \
        .filter(lambda line: len(line)==3) \
        .map(lambda line: (line[0],line[1],line[2])).repartition(10)

In [6]:
# это что-то вроде id, возраст, значение
data.collect()

[(100878, 78, 161),
 (105463, 65, 791),
 (108623, 69, 129),
 (101245, 75, 302),
 (108636, 70, 240),
 (100955, 20, 359),
 (109876, 82, 898),
 (102939, 79, 308),
 (104466, 95, 897),
 (106392, 17, 642),
 (106878, 73, 793),
 (101102, 90, 147),
 (109475, 60, 737),
 (106499, 83, 655),
 (100015, 71, 971),
 (100678, 42, 419),
 (103578, 60, 376),
 (105542, 92, 313),
 (107732, 20, 559),
 (107061, 9, 851),
 (105888, 67, 307),
 (105340, 10, 590),
 (101727, 95, 369),
 (109666, 38, 122),
 (104596, 32, 535),
 (103082, 69, 266),
 (109644, 7, 400),
 (105957, 14, 144),
 (100045, 54, 213),
 (107230, 95, 587),
 (104017, 29, 376),
 (100279, 39, 650),
 (102143, 27, 322),
 (105679, 82, 172),
 (106370, 81, 400),
 (106422, 54, 333),
 (100561, 80, 632),
 (102889, 44, 120),
 (100131, 95, 157),
 (104865, 91, 533),
 (106506, 35, 732),
 (100040, 26, 878),
 (100495, 65, 672),
 (108109, 1, 269),
 (100816, 8, 853),
 (105722, 34, 750),
 (105197, 86, 589),
 (107490, 43, 738),
 (107187, 96, 952),
 (109854, 73, 907),
 (10

In [70]:
# мы хотим собрать (id, ((возраст,  значение), (возраст,  значение), ... ))
# значения при этом уже просумированны по возрасту

In [7]:
data.map(lambda t: ((t[0], t[1]), t[2])).take(10)

[((100878, 78), 161),
 ((105463, 65), 791),
 ((108623, 69), 129),
 ((101245, 75), 302),
 ((108636, 70), 240),
 ((100955, 20), 359),
 ((109876, 82), 898),
 ((102939, 79), 308),
 ((104466, 95), 897),
 ((106392, 17), 642)]

In [8]:
data.map(lambda t: ((t[0], t[1]), t[2])).reduceByKey(lambda a,b: a+b).take(10)

[((109017, 29), 632),
 ((102263, 69), 940),
 ((101636, 74), 422),
 ((103151, 91), 955),
 ((104122, 32), 876),
 ((106645, 87), 286),
 ((107295, 13), 1065),
 ((104531, 35), 3085),
 ((107228, 32), 514),
 ((105232, 14), 636)]

In [9]:
pre_calculated = data.map(lambda t: ((t[0], t[1]), t[2])).reduceByKey(lambda a,b: a+b)

In [10]:
print(pre_calculated.toDebugString().decode('utf-8'))

(10) PythonRDD[17] at RDD at PythonRDD.scala:53 []
 |   MapPartitionsRDD[16] at mapPartitions at PythonRDD.scala:133 []
 |   ShuffledRDD[15] at partitionBy at NativeMethodAccessorImpl.java:0 []
 +-(10) PairwiseRDD[14] at reduceByKey at <ipython-input-9-264184b71dda>:1 []
    |   PythonRDD[13] at reduceByKey at <ipython-input-9-264184b71dda>:1 []
    |   MapPartitionsRDD[6] at coalesce at NativeMethodAccessorImpl.java:0 []
    |   CoalescedRDD[5] at coalesce at NativeMethodAccessorImpl.java:0 []
    |   ShuffledRDD[4] at coalesce at NativeMethodAccessorImpl.java:0 []
    +-(2) MapPartitionsRDD[3] at coalesce at NativeMethodAccessorImpl.java:0 []
       |  PythonRDD[2] at RDD at PythonRDD.scala:53 []
       |  files/csv_burp.csv MapPartitionsRDD[1] at textFile at NativeMethodAccessorImpl.java:0 []
       |  files/csv_burp.csv HadoopRDD[0] at textFile at NativeMethodAccessorImpl.java:0 []


In [11]:
reorganized = pre_calculated.map(lambda val: (val[0][0], (val[0][1],val[1])))

In [12]:
reorganized.take(10)

[(109017, (29, 632)),
 (102263, (69, 940)),
 (101636, (74, 422)),
 (103151, (91, 955)),
 (104122, (32, 876)),
 (106645, (87, 286)),
 (107295, (13, 1065)),
 (104531, (35, 3085)),
 (107228, (32, 514)),
 (105232, (14, 636))]

In [13]:
res = reorganized.groupByKey().map(lambda v: (v[0],list(v[1])))
res.collect()

[(102400,
  [(52, 1838),
   (92, 481),
   (46, 1477),
   (50, 368),
   (98, 1494),
   (32, 107),
   (100, 659),
   (83, 612),
   (37, 1650),
   (9, 562),
   (31, 154),
   (11, 300),
   (39, 1324),
   (77, 363),
   (94, 762),
   (80, 522),
   (54, 608),
   (26, 343),
   (19, 766),
   (85, 126),
   (67, 1271),
   (13, 1033),
   (87, 1323),
   (59, 645),
   (65, 1081),
   (16, 129),
   (38, 100),
   (78, 460),
   (10, 1825),
   (84, 288),
   (30, 986),
   (56, 302),
   (41, 978),
   (23, 848),
   (69, 2245),
   (71, 482),
   (66, 787),
   (86, 1082),
   (14, 335),
   (68, 417),
   (18, 580),
   (60, 1402),
   (99, 1230),
   (91, 501),
   (5, 1772),
   (47, 587),
   (51, 455),
   (89, 377),
   (70, 588),
   (42, 2039),
   (2, 715),
   (48, 607),
   (44, 758),
   (96, 959),
   (22, 668),
   (27, 350),
   (75, 1425),
   (53, 966),
   (25, 988),
   (35, 548),
   (93, 271),
   (55, 428),
   (95, 368)]),
 (104450,
  [(54, 675),
   (42, 808),
   (64, 957),
   (4, 570),
   (56, 271),
   (16, 241)

In [14]:
print(res.toDebugString().decode('utf-8'))

(10) PythonRDD[23] at collect at <ipython-input-13-d7887bdd68d8>:2 []
 |   MapPartitionsRDD[22] at mapPartitions at PythonRDD.scala:133 []
 |   ShuffledRDD[21] at partitionBy at NativeMethodAccessorImpl.java:0 []
 +-(10) PairwiseRDD[20] at groupByKey at <ipython-input-13-d7887bdd68d8>:1 []
    |   PythonRDD[19] at groupByKey at <ipython-input-13-d7887bdd68d8>:1 []
    |   MapPartitionsRDD[16] at mapPartitions at PythonRDD.scala:133 []
    |   ShuffledRDD[15] at partitionBy at NativeMethodAccessorImpl.java:0 []
    +-(10) PairwiseRDD[14] at reduceByKey at <ipython-input-9-264184b71dda>:1 []
       |   PythonRDD[13] at reduceByKey at <ipython-input-9-264184b71dda>:1 []
       |   MapPartitionsRDD[6] at coalesce at NativeMethodAccessorImpl.java:0 []
       |   CoalescedRDD[5] at coalesce at NativeMethodAccessorImpl.java:0 []
       |   ShuffledRDD[4] at coalesce at NativeMethodAccessorImpl.java:0 []
       +-(2) MapPartitionsRDD[3] at coalesce at NativeMethodAccessorImpl.java:0 []
       

use case: после группировки, у нас получается набор данных другого типа
aggregateByKey 3 параметра:

*   Начальное (нулевое значение)
*   Sequence operation функция, которая трансформирует/мерджит новый тип [U] и старый тип [V]
*   Combination operation мерджит 2 объекта нового типа [U] 

In [15]:
def add_to_list(l, val):
    l.append(val)
    return l

def merge_2_lists(l1, l2):
    return l1 + l2
    

alternative_res = reorganized.aggregateByKey([], add_to_list, merge_2_lists)
alternative_res.collect()

[(102400,
  [(52, 1838),
   (92, 481),
   (46, 1477),
   (50, 368),
   (98, 1494),
   (32, 107),
   (100, 659),
   (83, 612),
   (37, 1650),
   (9, 562),
   (31, 154),
   (11, 300),
   (39, 1324),
   (77, 363),
   (94, 762),
   (80, 522),
   (54, 608),
   (26, 343),
   (19, 766),
   (85, 126),
   (67, 1271),
   (13, 1033),
   (87, 1323),
   (59, 645),
   (65, 1081),
   (16, 129),
   (38, 100),
   (78, 460),
   (10, 1825),
   (84, 288),
   (30, 986),
   (56, 302),
   (41, 978),
   (23, 848),
   (69, 2245),
   (71, 482),
   (66, 787),
   (86, 1082),
   (14, 335),
   (68, 417),
   (18, 580),
   (60, 1402),
   (99, 1230),
   (91, 501),
   (5, 1772),
   (47, 587),
   (51, 455),
   (89, 377),
   (70, 588),
   (42, 2039),
   (2, 715),
   (48, 607),
   (44, 758),
   (96, 959),
   (22, 668),
   (27, 350),
   (75, 1425),
   (53, 966),
   (25, 988),
   (35, 548),
   (93, 271),
   (55, 428),
   (95, 368)]),
 (104450,
  [(54, 675),
   (42, 808),
   (64, 957),
   (4, 570),
   (56, 271),
   (16, 241)

In [16]:
print(alternative_res.toDebugString().decode('utf-8'))

(10) PythonRDD[29] at collect at <ipython-input-15-751b2e59da4e>:10 []
 |   MapPartitionsRDD[28] at mapPartitions at PythonRDD.scala:133 []
 |   ShuffledRDD[27] at partitionBy at NativeMethodAccessorImpl.java:0 []
 +-(10) PairwiseRDD[26] at aggregateByKey at <ipython-input-15-751b2e59da4e>:9 []
    |   PythonRDD[25] at aggregateByKey at <ipython-input-15-751b2e59da4e>:9 []
    |   MapPartitionsRDD[16] at mapPartitions at PythonRDD.scala:133 []
    |   ShuffledRDD[15] at partitionBy at NativeMethodAccessorImpl.java:0 []
    +-(10) PairwiseRDD[14] at reduceByKey at <ipython-input-9-264184b71dda>:1 []
       |   PythonRDD[13] at reduceByKey at <ipython-input-9-264184b71dda>:1 []
       |   MapPartitionsRDD[6] at coalesce at NativeMethodAccessorImpl.java:0 []
       |   CoalescedRDD[5] at coalesce at NativeMethodAccessorImpl.java:0 []
       |   ShuffledRDD[4] at coalesce at NativeMethodAccessorImpl.java:0 []
       +-(2) MapPartitionsRDD[3] at coalesce at NativeMethodAccessorImpl.java:0 [

Штука в том, что .aggregateByKey() использует этап "combine", прямо как в map reduce, .groupByKey() нет. Поэтому потенциально shuffle в первом случае займет больше времени

#### Избегайте groupByKey, когда производите ассоциативную редуцирующую операцию
Причина в общем-то та же, что и в предыдущем случае, используя вместо этого reduceByKey мы не будем пересылать весь набор данных по сети

In [17]:
# вернемся к word count
file = sc.textFile(os.path.join(PATH_TO_FILES, 'multiple/*.txt'))
words = file.flatMap(lambda line: line.lower().replace(',','').replace('.','').split(" "))
paired_rdd = words.map(lambda word: (word, 1))

Теперь есть два варианта

Первый (после выполнения не забудьте открыть интерфейс с этой джобой)

In [20]:
paired_rdd.groupByKey().map(lambda pair: (pair[0], sum(pair[1]))).collect()

[('nunc', 2),
 ('maecenas', 2),
 ('phasellus', 1),
 ('rutrum', 1),
 ('', 1),
 ('nascetur', 2),
 ('vitae', 3),
 ('feugiat', 1),
 ('pede', 4),
 ('ullamcorper', 1),
 ('etiam', 3),
 ('ipsum', 3),
 ('porttitor', 1),
 ('aenean', 7),
 ('duis', 1),
 ('metus', 1),
 ('justo', 5),
 ('tellus', 3),
 ('pellentesque', 2),
 ('varius', 1),
 ('integer', 1),
 ('sapien', 1),
 ('sagittis', 1),
 ('quam', 4),
 ('dapibus', 2),
 ('leo', 3),
 ('dictum', 1),
 ('nibh', 1),
 ('nulla', 3),
 ('fringilla', 4),
 ('sodales', 2),
 ('mollis', 1),
 ('condimentum', 1),
 ('in', 2),
 ('ante', 3),
 ('faucibus', 2),
 ('nisi', 3),
 ('quisque', 1),
 ('libero', 2),
 ('luctus', 1),
 ('eleifend', 2),
 ('a', 2),
 ('lorem', 4),
 ('et', 3),
 ('parturient', 2),
 ('ac', 1),
 ('consequat', 4),
 ('magnis', 2),
 ('ut', 3),
 ('sem', 4),
 ('eros', 1),
 ('adipiscing', 3),
 ('laoreet', 1),
 ('sit', 5),
 ('cum', 2),
 ('dui', 1),
 ('penatibus', 2),
 ('consectetuer', 2),
 ('arcu', 3),
 ('id', 1),
 ('vivamus', 1),
 ('massa', 4),
 ('enim', 4),
 ('d

Второй (и внимание на интерфейс)

In [22]:
paired_rdd.reduceByKey(lambda a,b: a+b).collect()

[('nunc', 2),
 ('maecenas', 2),
 ('phasellus', 1),
 ('rutrum', 1),
 ('', 1),
 ('nascetur', 2),
 ('vitae', 3),
 ('feugiat', 1),
 ('pede', 4),
 ('ullamcorper', 1),
 ('etiam', 3),
 ('ipsum', 3),
 ('porttitor', 1),
 ('aenean', 7),
 ('duis', 1),
 ('metus', 1),
 ('justo', 5),
 ('tellus', 3),
 ('pellentesque', 2),
 ('varius', 1),
 ('integer', 1),
 ('sapien', 1),
 ('sagittis', 1),
 ('quam', 4),
 ('dapibus', 2),
 ('leo', 3),
 ('dictum', 1),
 ('nibh', 1),
 ('nulla', 3),
 ('fringilla', 4),
 ('sodales', 2),
 ('mollis', 1),
 ('condimentum', 1),
 ('in', 2),
 ('ante', 3),
 ('faucibus', 2),
 ('nisi', 3),
 ('quisque', 1),
 ('libero', 2),
 ('luctus', 1),
 ('eleifend', 2),
 ('a', 2),
 ('lorem', 4),
 ('et', 3),
 ('parturient', 2),
 ('ac', 1),
 ('consequat', 4),
 ('magnis', 2),
 ('ut', 3),
 ('sem', 4),
 ('eros', 1),
 ('adipiscing', 3),
 ('laoreet', 1),
 ('sit', 5),
 ('cum', 2),
 ('dui', 1),
 ('penatibus', 2),
 ('consectetuer', 2),
 ('arcu', 3),
 ('id', 1),
 ('vivamus', 1),
 ('massa', 4),
 ('enim', 4),
 ('d

#### Избегайте reduceByKey в тех случаях, когда входной и выходной тип данных различается
Снова вспоминаем про aggregateByKey

In [84]:
# плохо, куча не нужных инициализаций множеств
reorganized.map(lambda val: (val[0], set([val[1],]))).reduceByKey(lambda a,b: a | b).collect()

[(102400,
  {(2, 715),
   (5, 1772),
   (9, 562),
   (10, 1825),
   (11, 300),
   (13, 1033),
   (14, 335),
   (16, 129),
   (18, 580),
   (19, 766),
   (22, 668),
   (23, 848),
   (25, 988),
   (26, 343),
   (27, 350),
   (30, 986),
   (31, 154),
   (32, 107),
   (35, 548),
   (37, 1650),
   (38, 100),
   (39, 1324),
   (41, 978),
   (42, 2039),
   (44, 758),
   (46, 1477),
   (47, 587),
   (48, 607),
   (50, 368),
   (51, 455),
   (52, 1838),
   (53, 966),
   (54, 608),
   (55, 428),
   (56, 302),
   (59, 645),
   (60, 1402),
   (65, 1081),
   (66, 787),
   (67, 1271),
   (68, 417),
   (69, 2245),
   (70, 588),
   (71, 482),
   (75, 1425),
   (77, 363),
   (78, 460),
   (80, 522),
   (83, 612),
   (84, 288),
   (85, 126),
   (86, 1082),
   (87, 1323),
   (89, 377),
   (91, 501),
   (92, 481),
   (93, 271),
   (94, 762),
   (95, 368),
   (96, 959),
   (98, 1494),
   (99, 1230),
   (100, 659)}),
 (104450,
  {(1, 302),
   (3, 611),
   (4, 570),
   (6, 788),
   (7, 362),
   (8, 317),
   

In [85]:
# хороший пример см выше

#### Избегайте последовательности flatMap-join-groupBy
Если есть два набора данных, уже сгруппированных по ключу, и нужно их сджоинить (соединить), можно использовать .cogroup(), чтобы избежать оверхеда на распаковку

In [23]:
# возьмем 2 партиции
from functools import partial

def filter_by_idx(idx, iterator, idx_to_get):
    if idx == idx_to_get:
        return list(iterator)
    else:
        return []

projector = partial(filter_by_idx, idx_to_get=0)
part0 = paired_rdd.mapPartitionsWithIndex(projector, preservesPartitioning=True)

projector = partial(filter_by_idx, idx_to_get=1)
part1 = paired_rdd.mapPartitionsWithIndex(projector, preservesPartitioning=True)

In [24]:
part0.collect()

[('aenean', 1),
 ('leo', 1),
 ('ligula', 1),
 ('porttitor', 1),
 ('eu', 1),
 ('consequat', 1),
 ('vitae', 1),
 ('eleifend', 1),
 ('ac', 1),
 ('enim', 1),
 ('aliquam', 1),
 ('lorem', 1),
 ('ante', 1),
 ('dapibus', 1),
 ('in', 1),
 ('viverra', 1),
 ('quis', 1),
 ('feugiat', 1),
 ('a', 1),
 ('tellus', 1),
 ('phasellus', 1),
 ('viverra', 1),
 ('nulla', 1),
 ('ut', 1),
 ('metus', 1),
 ('varius', 1),
 ('laoreet', 1),
 ('quisque', 1),
 ('rutrum', 1),
 ('aenean', 1),
 ('imperdiet', 1),
 ('etiam', 1),
 ('ultricies', 1),
 ('nisi', 1),
 ('vel', 1),
 ('augue', 1),
 ('curabitur', 1),
 ('ullamcorper', 1),
 ('ultricies', 1),
 ('nisi', 1),
 ('nam', 1),
 ('eget', 1),
 ('dui', 1),
 ('etiam', 1),
 ('rhoncus', 1),
 ('maecenas', 1),
 ('tempus', 1),
 ('tellus', 1),
 ('eget', 1),
 ('condimentum', 1),
 ('rhoncus', 1),
 ('sem', 1),
 ('quam', 1),
 ('semper', 1),
 ('libero', 1),
 ('sit', 1),
 ('amet', 1),
 ('adipiscing', 1),
 ('sem', 1),
 ('neque', 1),
 ('sed', 1),
 ('ipsum', 1),
 ('nam', 1),
 ('quam', 1),
 ('nu

In [25]:
part1.collect()

[('donec', 1),
 ('pede', 1),
 ('justo', 1),
 ('fringilla', 1),
 ('vel', 1),
 ('aliquet', 1),
 ('nec', 1),
 ('vulputate', 1),
 ('eget', 1),
 ('arcu', 1),
 ('in', 1),
 ('enim', 1),
 ('justo', 1),
 ('rhoncus', 1),
 ('ut', 1),
 ('imperdiet', 1),
 ('a', 1),
 ('venenatis', 1),
 ('vitae', 1),
 ('justo', 1),
 ('nullam', 1),
 ('dictum', 1),
 ('felis', 1),
 ('eu', 1),
 ('pede', 1),
 ('mollis', 1),
 ('pretium', 1),
 ('integer', 1),
 ('tincidunt', 1),
 ('cras', 1),
 ('dapibus', 1),
 ('vivamus', 1),
 ('elementum', 1),
 ('semper', 1),
 ('nisi', 1),
 ('aenean', 1),
 ('vulputate', 1),
 ('eleifend', 1),
 ('tellus', 1),
 ('', 1)]

In [26]:
part0.cogroup(part1).collect()

[('sapien',
  (<pyspark.resultiterable.ResultIterable at 0x7fce580b7ac8>,
   <pyspark.resultiterable.ResultIterable at 0x7fce580b7668>)),
 ('sagittis',
  (<pyspark.resultiterable.ResultIterable at 0x7fce580b7828>,
   <pyspark.resultiterable.ResultIterable at 0x7fce580b7748>)),
 ('nulla',
  (<pyspark.resultiterable.ResultIterable at 0x7fce580b75f8>,
   <pyspark.resultiterable.ResultIterable at 0x7fce580b7a90>)),
 ('mollis',
  (<pyspark.resultiterable.ResultIterable at 0x7fce580b76a0>,
   <pyspark.resultiterable.ResultIterable at 0x7fce580b72e8>)),
 ('phasellus',
  (<pyspark.resultiterable.ResultIterable at 0x7fce580b7390>,
   <pyspark.resultiterable.ResultIterable at 0x7fce580b72b0>)),
 ('dapibus',
  (<pyspark.resultiterable.ResultIterable at 0x7fce580b7240>,
   <pyspark.resultiterable.ResultIterable at 0x7fce580b7160>)),
 ('',
  (<pyspark.resultiterable.ResultIterable at 0x7fce580b7128>,
   <pyspark.resultiterable.ResultIterable at 0x7fce580b70b8>)),
 ('vitae',
  (<pyspark.resultiterab

In [27]:
part0.cogroup(part1).map(lambda val: (val[0], tuple(map(list,val[1])))).collect()

[('sapien', ([1], [])),
 ('sagittis', ([1], [])),
 ('nulla', ([1], [])),
 ('mollis', ([], [1])),
 ('phasellus', ([1], [])),
 ('dapibus', ([1], [1])),
 ('', ([], [1])),
 ('vitae', ([1, 1], [1])),
 ('nunc', ([1, 1], [])),
 ('quisque', ([1], [])),
 ('condimentum', ([1], [])),
 ('etiam', ([1, 1, 1], [])),
 ('metus', ([1], [])),
 ('fringilla', ([1], [1])),
 ('ipsum', ([1], [])),
 ('tellus', ([1, 1], [1])),
 ('ante', ([1, 1, 1], [])),
 ('nisi', ([1, 1], [1])),
 ('maecenas', ([1, 1], [])),
 ('nullam', ([1], [1])),
 ('libero', ([1, 1], [])),
 ('ac', ([1], [])),
 ('ligula', ([1], [])),
 ('dui', ([1], [])),
 ('a', ([1], [1])),
 ('eleifend', ([1], [1])),
 ('id', ([1], [])),
 ('velit', ([1], [])),
 ('laoreet', ([1], [])),
 ('ut', ([1, 1], [1])),
 ('rhoncus', ([1, 1], [1])),
 ('odio', ([1], [])),
 ('felis', ([], [1])),
 ('venenatis', ([1], [1])),
 ('eget', ([1, 1, 1, 1], [1])),
 ('amet', ([1, 1, 1], [])),
 ('tempus', ([1, 1], [])),
 ('semper', ([1], [1])),
 ('pulvinar', ([1], [])),
 ('nec', ([1], [

#### Используйте treeReduce/treeAggregate вместо reduce/aggregate
treeReduce и treeAggregate более эффективны, чем reduce и aggregate. См. презентацию

#### Перед цепочкой трансформаций по ключу, следует сделать reaprtitionBy по HashPartition

.partitionBy() позволяет нам определить произвольную логику партиционирования

**Замечание**

Чтобы использовать partitionBy() RDD должен состоять из кортежа (хотя бы пар) объектов.

In [56]:
def word_partitioner(word):
    return hash(word)


def word_partitioner_bad(word):
    return hash(word[:1])

In [43]:
list(map(lambda elem: len(elem), words.map(lambda word: (word, 1)).partitionBy(10, word_partitioner).glom().collect()))

[1, 24, 0, 73, 54, 11, 33, 0, 18, 49]

In [53]:
repartitioned = words.map(lambda word: (word, 1)).partitionBy(10, word_partitioner)

In [54]:
list(map(lambda elem: len(elem), repartitioned.glom().collect()))

[21, 34, 29, 21, 32, 24, 22, 29, 29, 22]

In [55]:
repartitioned.reduceByKey(lambda a,b:a+b).collect()

[('', 1),
 ('mollis', 1),
 ('feugiat', 1),
 ('natoque', 2),
 ('ullamcorper', 1),
 ('condimentum', 1),
 ('dis', 2),
 ('aenean', 7),
 ('cras', 1),
 ('nascetur', 2),
 ('rutrum', 1),
 ('odio', 1),
 ('viverra', 2),
 ('consectetuer', 2),
 ('blandit', 1),
 ('enim', 4),
 ('ligula', 3),
 ('penatibus', 2),
 ('pulvinar', 1),
 ('arcu', 3),
 ('imperdiet', 2),
 ('parturient', 2),
 ('et', 3),
 ('nam', 2),
 ('ut', 3),
 ('velit', 1),
 ('adipiscing', 3),
 ('rhoncus', 3),
 ('leo', 3),
 ('felis', 3),
 ('duis', 1),
 ('augue', 2),
 ('dapibus', 2),
 ('amet', 5),
 ('in', 2),
 ('magna', 1),
 ('cursus', 1),
 ('metus', 1),
 ('integer', 1),
 ('pellentesque', 2),
 ('commodo', 2),
 ('libero', 2),
 ('tempus', 2),
 ('consequat', 4),
 ('cum', 2),
 ('hendrerit', 1),
 ('sit', 5),
 ('eros', 1),
 ('massa', 4),
 ('sapien', 1),
 ('maecenas', 2),
 ('justo', 5),
 ('vitae', 3),
 ('venenatis', 2),
 ('pede', 4),
 ('etiam', 3),
 ('tellus', 3),
 ('sociis', 2),
 ('dictum', 1),
 ('phasellus', 1),
 ('varius', 1),
 ('mus', 2),
 ('bibe

#### Используйте coalesce, чтобы понижать кол-во партиций, и repartition, чтобы повышать
coalesce не делает полный shuffle, а использует существующие партиции

In [57]:
repartitioned_bad = words.map(lambda word: (word, 1)).partitionBy(10, word_partitioner_bad)

In [58]:
list(map(lambda elem: len(elem), repartitioned_bad.glom().collect()))

[1, 24, 0, 73, 54, 11, 33, 0, 18, 49]

In [59]:
coalesce_res = repartitioned_bad.coalesce(8)

In [61]:
list(map(lambda elem: len(elem), coalesce_res.glom().collect()))

[1, 24, 54, 11, 73, 33, 18, 49]

In [60]:
repartition_res = repartitioned_bad.repartition(8)

In [62]:
list(map(lambda elem: len(elem), repartition_res.glom().collect()))

[43, 28, 40, 34, 34, 30, 24, 30]

#### Используйте mapPartitions вместо map, если для вызываемой функции есть тяжелая инициализация

In [70]:
def very_time_consuming_function(r):
    import time
    time.sleep(1) # типо долгая инициализация
    return r

In [63]:
parallel = sc.parallelize(list(range(10))).repartition(5)

In [69]:
%%time
parallel.map(very_time_consuming_function).collect()

CPU times: user 8 ms, sys: 4 ms, total: 12 ms
Wall time: 5.2 s


[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

In [71]:
%%time
parallel.mapPartitions(very_time_consuming_function).collect()

CPU times: user 8 ms, sys: 8 ms, total: 16 ms
Wall time: 3.19 s


[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

### Используйте broadcast и accumulator
broadcast и accumaulator это так называемые shared variables
* **broadcast**: позволяет распределить значения по воркерам (а не по всем таскам!!!)
* **accumulator**: счетчик между всеми нодами

http://spark.apache.org/docs/latest/rdd-programming-guide.html#shared-variables

In [None]:
def f(r):
    dictionary = { # bad
        ...
    }

In [72]:
# можно сделать и так
collection = ['ligula', 'aliquam', 'lorem']
words.map(lambda val: val in collection).collect()

[False,
 False,
 True,
 False,
 False,
 False,
 False,
 False,
 False,
 False,
 True,
 True,
 False,
 False,
 False,
 False,
 False,
 False,
 False,
 False,
 False,
 False,
 False,
 False,
 False,
 False,
 False,
 False,
 False,
 False,
 False,
 False,
 False,
 False,
 False,
 False,
 False,
 False,
 False,
 False,
 False,
 False,
 False,
 False,
 False,
 False,
 False,
 False,
 False,
 False,
 False,
 False,
 False,
 False,
 False,
 False,
 False,
 False,
 False,
 False,
 False,
 False,
 False,
 False,
 False,
 False,
 False,
 False,
 False,
 False,
 False,
 True,
 False,
 False,
 False,
 False,
 False,
 False,
 False,
 False,
 False,
 False,
 False,
 False,
 False,
 False,
 False,
 False,
 False,
 False,
 False,
 False,
 False,
 False,
 False,
 False,
 False,
 False,
 False,
 False,
 False,
 False,
 False,
 False,
 False,
 False,
 False,
 False,
 False,
 False,
 False,
 False,
 False,
 False,
 False,
 False,
 False,
 False,
 False,
 False,
 False,
 False,
 False,
 False,
 False,
 Fal

In [73]:
# но лучше вот так, поскольку collection скопируется кол-во раз, равное кол-ву нод, но не задач!
broad_collection = sc.broadcast(collection)
words.map(lambda val: val in broad_collection.value).collect()

[False,
 False,
 True,
 False,
 False,
 False,
 False,
 False,
 False,
 False,
 True,
 True,
 False,
 False,
 False,
 False,
 False,
 False,
 False,
 False,
 False,
 False,
 False,
 False,
 False,
 False,
 False,
 False,
 False,
 False,
 False,
 False,
 False,
 False,
 False,
 False,
 False,
 False,
 False,
 False,
 False,
 False,
 False,
 False,
 False,
 False,
 False,
 False,
 False,
 False,
 False,
 False,
 False,
 False,
 False,
 False,
 False,
 False,
 False,
 False,
 False,
 False,
 False,
 False,
 False,
 False,
 False,
 False,
 False,
 False,
 False,
 True,
 False,
 False,
 False,
 False,
 False,
 False,
 False,
 False,
 False,
 False,
 False,
 False,
 False,
 False,
 False,
 False,
 False,
 False,
 False,
 False,
 False,
 False,
 False,
 False,
 False,
 False,
 False,
 False,
 False,
 False,
 False,
 False,
 False,
 False,
 False,
 False,
 False,
 False,
 False,
 False,
 False,
 False,
 False,
 False,
 False,
 False,
 False,
 False,
 False,
 False,
 False,
 False,
 False,
 Fal

In [74]:
accum = sc.accumulator(0)
accum

Accumulator<id=0, value=0>

In [75]:
sc.parallelize([1, 2, 3, 4]).foreach(lambda x: accum.add(x))

In [76]:
accum.value

10

#### Ещё один интересный пример использования broadcast: мы можем делать джоин маленького RDD с большим. Так называемый map-side join

In [6]:
small = sc.parallelize([(i,i**2) for i in range(1000)], 4)
big = sc.parallelize([(i,i) for i in range(900,100000)],100)

In [82]:
small.join(big).collect() # должно получиться

[(936, (876096, 936)),
 (937, (877969, 937)),
 (938, (879844, 938)),
 (939, (881721, 939)),
 (940, (883600, 940)),
 (941, (885481, 941)),
 (942, (887364, 942)),
 (943, (889249, 943)),
 (944, (891136, 944)),
 (945, (893025, 945)),
 (946, (894916, 946)),
 (947, (896809, 947)),
 (948, (898704, 948)),
 (949, (900601, 949)),
 (950, (902500, 950)),
 (951, (904401, 951)),
 (952, (906304, 952)),
 (953, (908209, 953)),
 (954, (910116, 954)),
 (955, (912025, 955)),
 (956, (913936, 956)),
 (957, (915849, 957)),
 (958, (917764, 958)),
 (959, (919681, 959)),
 (960, (921600, 960)),
 (961, (923521, 961)),
 (962, (925444, 962)),
 (963, (927369, 963)),
 (964, (929296, 964)),
 (965, (931225, 965)),
 (966, (933156, 966)),
 (967, (935089, 967)),
 (968, (937024, 968)),
 (969, (938961, 969)),
 (970, (940900, 970)),
 (971, (942841, 971)),
 (972, (944784, 972)),
 (973, (946729, 973)),
 (974, (948676, 974)),
 (975, (950625, 975)),
 (976, (952576, 976)),
 (977, (954529, 977)),
 (978, (956484, 978)),
 (979, (958

In [83]:
sc.stop()

##### Реализуйте map-side join

In [8]:
# ваш код тут
def add_to_list(l, val):
    l.append(val)
    return l

def merge_2_lists(l1, l2):
    return l1 + l2

small_broad = sc.broadcast(dict(small.aggregateByKey([], add_to_list, merge_2_lists).collect()))

In [9]:
def j(pair):
    k, v = pair
    res = []
    if k in small_broad.value:
        for elem in small_broad.value[k]:
            res.append((k, (v, elem)))
    
    return res


big.flatMap(j).collect()

[(900, (900, 810000)),
 (901, (901, 811801)),
 (902, (902, 813604)),
 (903, (903, 815409)),
 (904, (904, 817216)),
 (905, (905, 819025)),
 (906, (906, 820836)),
 (907, (907, 822649)),
 (908, (908, 824464)),
 (909, (909, 826281)),
 (910, (910, 828100)),
 (911, (911, 829921)),
 (912, (912, 831744)),
 (913, (913, 833569)),
 (914, (914, 835396)),
 (915, (915, 837225)),
 (916, (916, 839056)),
 (917, (917, 840889)),
 (918, (918, 842724)),
 (919, (919, 844561)),
 (920, (920, 846400)),
 (921, (921, 848241)),
 (922, (922, 850084)),
 (923, (923, 851929)),
 (924, (924, 853776)),
 (925, (925, 855625)),
 (926, (926, 857476)),
 (927, (927, 859329)),
 (928, (928, 861184)),
 (929, (929, 863041)),
 (930, (930, 864900)),
 (931, (931, 866761)),
 (932, (932, 868624)),
 (933, (933, 870489)),
 (934, (934, 872356)),
 (935, (935, 874225)),
 (936, (936, 876096)),
 (937, (937, 877969)),
 (938, (938, 879844)),
 (939, (939, 881721)),
 (940, (940, 883600)),
 (941, (941, 885481)),
 (942, (942, 887364)),
 (943, (943

#### Если же мы имеем дело со средних размеров RDD (который не помещается в память) и с большим RDD, то этого все равно можно получить выгоду, существенно упростив shuffle

##### Сделайте join двух таких RDD с предварительной фильтрацией ключей по первому RDD

In [10]:
medium = sc.parallelize([(i,i**2) for i in range(10000)], 4)
big = sc.parallelize([(i,i) for i in range(9000,100000)],100)

In [11]:
set(medium.map(lambda pair: pair[0]).collect())

[0,
 1,
 2,
 3,
 4,
 5,
 6,
 7,
 8,
 9,
 10,
 11,
 12,
 13,
 14,
 15,
 16,
 17,
 18,
 19,
 20,
 21,
 22,
 23,
 24,
 25,
 26,
 27,
 28,
 29,
 30,
 31,
 32,
 33,
 34,
 35,
 36,
 37,
 38,
 39,
 40,
 41,
 42,
 43,
 44,
 45,
 46,
 47,
 48,
 49,
 50,
 51,
 52,
 53,
 54,
 55,
 56,
 57,
 58,
 59,
 60,
 61,
 62,
 63,
 64,
 65,
 66,
 67,
 68,
 69,
 70,
 71,
 72,
 73,
 74,
 75,
 76,
 77,
 78,
 79,
 80,
 81,
 82,
 83,
 84,
 85,
 86,
 87,
 88,
 89,
 90,
 91,
 92,
 93,
 94,
 95,
 96,
 97,
 98,
 99,
 100,
 101,
 102,
 103,
 104,
 105,
 106,
 107,
 108,
 109,
 110,
 111,
 112,
 113,
 114,
 115,
 116,
 117,
 118,
 119,
 120,
 121,
 122,
 123,
 124,
 125,
 126,
 127,
 128,
 129,
 130,
 131,
 132,
 133,
 134,
 135,
 136,
 137,
 138,
 139,
 140,
 141,
 142,
 143,
 144,
 145,
 146,
 147,
 148,
 149,
 150,
 151,
 152,
 153,
 154,
 155,
 156,
 157,
 158,
 159,
 160,
 161,
 162,
 163,
 164,
 165,
 166,
 167,
 168,
 169,
 170,
 171,
 172,
 173,
 174,
 175,
 176,
 177,
 178,
 179,
 180,
 181,
 182,
 183,
 184,


In [15]:
# ваш код тут
storage = sc.broadcast(set(medium.map(lambda pair: pair[0]).collect()))
reduced_big = big.filter(lambda pair: pair[0] in storage.value)
reduced_big.join(medium).collect()

[(9360, (9360, 87609600)),
 (9048, (9048, 81866304)),
 (9776, (9776, 95570176)),
 (9256, (9256, 85673536)),
 (9672, (9672, 93547584)),
 (9984, (9984, 99680256)),
 (9568, (9568, 91546624)),
 (9880, (9880, 97614400)),
 (9152, (9152, 83759104)),
 (9464, (9464, 89567296)),
 (9361, (9361, 87628321)),
 (9569, (9569, 91565761)),
 (9049, (9049, 81884401)),
 (9465, (9465, 89586225)),
 (9985, (9985, 99700225)),
 (9881, (9881, 97634161)),
 (9153, (9153, 83777409)),
 (9673, (9673, 93566929)),
 (9777, (9777, 95589729)),
 (9257, (9257, 85692049)),
 (9466, (9466, 89605156)),
 (9986, (9986, 99720196)),
 (9050, (9050, 81902500)),
 (9778, (9778, 95609284)),
 (9882, (9882, 97653924)),
 (9570, (9570, 91584900)),
 (9258, (9258, 85710564)),
 (9154, (9154, 83795716)),
 (9362, (9362, 87647044)),
 (9674, (9674, 93586276)),
 (9987, (9987, 99740169)),
 (9363, (9363, 87665769)),
 (9259, (9259, 85729081)),
 (9051, (9051, 81920601)),
 (9571, (9571, 91604041)),
 (9155, (9155, 83814025)),
 (9467, (9467, 89624089)),
 

In [16]:
sc.stop()