In [None]:
from IPython.display import IFrame, Image

## Мотивация создания Apache Spark

### Рассмотрим два примера приложений:
- Обучить модель на больших данных (читай итеративный алгоритм над фиксированным датасетом)
- Провести ad-hoc анализ данных из двух таблиц (читай несколько интерактивных запросов с джойнами)

## Основные недостатки классического MapReduce
- Быстроумирающие контейнеры
- Постоянное чтение/запись во внешнее хранилище
- Сложный API
- Ограниченное число источников/приемников данных
- MapReduce - это только вычислительный фреймворк

## Apache Spark - это *быстрая* распределенная вычислительная платформа *общего назначения*
1. **Быстрая** - это в памяти и с ленивыми вычислениями
2. **Общего назначения** - значит на ней можно реализовать любые вычисления (батчевые, интерактивные, итеративные, в режиме реального времени)

<img src="pics/spark_stack.png" width=1000/>

## Множество источников данных

<img src="pics/spark_data_sources.jpg" width=1000/>

## Архитектура Apache Spark

<img src="pics/cluster-overview.png" width=800/>

## Запуск PySpark

In [27]:
import os
import sys

SPARK_HOME = "/usr/hdp/current/spark2-client"
PYSPARK_PYTHON = "/opt/conda/envs/dsenv/bin/python"
os.environ["PYSPARK_PYTHON"]= PYSPARK_PYTHON
os.environ["SPARK_HOME"] = SPARK_HOME

PYSPARK_HOME = os.path.join(SPARK_HOME, "python/lib")
sys.path.insert(0, os.path.join(PYSPARK_HOME, "py4j-0.10.7-src.zip"))
sys.path.insert(0, os.path.join(PYSPARK_HOME, "pyspark.zip"))

In [28]:
import random
from pyspark import SparkContext, SparkConf

spark_ui_port = random.choice(range(10000, 11000))
print(f"Spark UI port: {spark_ui_port}")

conf = SparkConf()
conf.set("spark.ui.port", spark_ui_port)

sc = SparkContext(appName="Lecture 01", conf=conf)

Spark UI port: 10269


## SparkContext (sc) - это основной управляющий объект.

In [4]:
sc

## Для получения всех установленных опций конфигурации можно использовать `sc.getConf()`

In [5]:
sc.getConf().getAll()

[('spark.history.kerberos.keytab', 'none'),
 ('spark.eventLog.enabled', 'true'),
 ('spark.app.id', 'application_1613052795737_4264'),
 ('spark.history.ui.port', '18081'),
 ('spark.driver.extraLibraryPath',
  '/usr/hdp/current/hadoop-client/lib/native:/usr/hdp/current/hadoop-client/lib/native/Linux-amd64-64'),
 ('spark.history.fs.cleaner.interval', '7d'),
 ('spark.shuffle.service.port', '7447'),
 ('spark.shuffle.io.serverThreads', '128'),
 ('spark.sql.streaming.streamingQueryListeners', ''),
 ('spark.executor.extraLibraryPath',
  '/usr/hdp/current/hadoop-client/lib/native:/usr/hdp/current/hadoop-client/lib/native/Linux-amd64-64'),
 ('spark.executorEnv.PYTHONPATH',
  '{{PWD}}/pyspark.zip<CPS>{{PWD}}/py4j-0.10.7-src.zip'),
 ('spark.shuffle.file.buffer', '1m'),
 ('spark.sql.hive.convertMetastoreOrc', 'true'),
 ('spark.yarn.dist.files', ''),
 ('spark.sql.autoBroadcastJoinThreshold', '26214400'),
 ('spark.ui.filters',
  'org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter'),
 ('spark.

## Существует два способа создать RDD
- распределить коллекцию объектов с драйвера
- загрузить внешний датасет

## 1. Распределить коллекцию с драйвера

In [6]:
import numpy as np
vocabulary = ("Apache", "Spark", "Hadoop")
numbers = np.random.randint(10, size=10000)
words = np.random.choice(vocabulary, size=10000)
collection = zip(numbers, words)

In [7]:
rdd = sc.parallelize(collection)

In [8]:
rdd

ParallelCollectionRDD[0] at parallelize at PythonRDD.scala:195

In [9]:
rdd.count()

10000

In [10]:
rdd.take(10)

[(1, 'Apache'),
 (8, 'Apache'),
 (7, 'Spark'),
 (2, 'Spark'),
 (8, 'Hadoop'),
 (7, 'Apache'),
 (2, 'Hadoop'),
 (8, 'Hadoop'),
 (9, 'Spark'),
 (5, 'Hadoop')]

## 2. Загрузить внешний датасет (датасет загружается из HDFS)

In [11]:
!hdfs dfs -ls /datasets/spark/ips.txt

-rw-r--r--   3 hdfs hdfs      19462 2021-03-11 13:43 /datasets/spark/ips.txt


In [3]:
rdd2 = sc.textFile("/datasets/spark/ips.txt")

In [4]:
rdd2.take(10)

['192.168.0.1\tCHINA',
 '192.168.0.2\tCHINA',
 '192.168.0.3\tCHINA',
 '192.168.0.4\tCHINA',
 '192.168.0.5\tCHINA',
 '192.168.0.6\tCHINA',
 '192.168.0.7\tCHINA',
 '192.168.0.8\tCHINA',
 '192.168.0.9\tCHINA',
 '192.168.0.10\tCHINA']

In [5]:
rdd2.count()

1000

## RDD API состоит из операции двух типов:
- action
- transformation

### Трансформация преобразовывает RDD в другой RDD и не приводит к вычислениям

In [20]:
rdd = sc.parallelize(range(100))

In [21]:
rdd

PythonRDD[14] at RDD at PythonRDD.scala:53

### Action заставляет Spark произвести вычисления и вернуть результат либо на драйвер, либо во внешнее хранилище

In [22]:
rdd.count()

100

In [23]:
rdd.take(10)

[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

### Трансформации можно применять одну за другой, никаких вычислений не будет сделано, пока не будет вызван action

In [24]:
rdd

PythonRDD[14] at RDD at PythonRDD.scala:53

In [26]:
print(rdd.toDebugString().decode())

(2) PythonRDD[14] at RDD at PythonRDD.scala:53 []
 |  ParallelCollectionRDD[13] at parallelize at PythonRDD.scala:195 []


In [27]:
rdd2 = rdd.filter(lambda x: x % 2)
rdd2

PythonRDD[17] at RDD at PythonRDD.scala:53

In [28]:
print(rdd2.toDebugString().decode())

(2) PythonRDD[17] at RDD at PythonRDD.scala:53 []
 |  ParallelCollectionRDD[13] at parallelize at PythonRDD.scala:195 []


In [29]:
rdd3 = rdd2.map(lambda x: x * 2)
rdd3

PythonRDD[18] at RDD at PythonRDD.scala:53

In [30]:
print(rdd3.toDebugString().decode())

(2) PythonRDD[18] at RDD at PythonRDD.scala:53 []
 |  ParallelCollectionRDD[13] at parallelize at PythonRDD.scala:195 []


In [31]:
rdd3.collect()

[2,
 6,
 10,
 14,
 18,
 22,
 26,
 30,
 34,
 38,
 42,
 46,
 50,
 54,
 58,
 62,
 66,
 70,
 74,
 78,
 82,
 86,
 90,
 94,
 98,
 102,
 106,
 110,
 114,
 118,
 122,
 126,
 130,
 134,
 138,
 142,
 146,
 150,
 154,
 158,
 162,
 166,
 170,
 174,
 178,
 182,
 186,
 190,
 194,
 198]

## Как получить данные на драйвер и не выстрелить себе в ногу?

### `take()` пытается минимизировать число обращений к партициям, поэтому может возвращать смещенные результаты

In [32]:
rdd.take(10)

[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

### Будьте аккуратны с `collect()`, потому что он загружает все данные из RDD на драйвер. Это может легко привести к `Out of Memory exception`

In [None]:
rdd.collect()[:20]

### Если нужно получить небольшое число записей на драйвер и, при этом, сохранить распределение, то лучше сделать выборку

In [33]:
rdd.takeSample(withReplacement=False, num=20, seed=5757)

[36, 75, 57, 18, 19, 96, 11, 55, 39, 40, 79, 26, 15, 13, 47, 7, 66, 86, 51, 12]

## Познакомимся с данными. Будем работать с двумя таблицами

![](pics/data_table1.png)

![](pics/data_table2.png)

### Примеры трансформаций

In [29]:
rdd = sc.textFile("/datasets/spark/ips.txt")

In [30]:
rdd.take(5)

['192.168.0.1\tCHINA',
 '192.168.0.2\tCHINA',
 '192.168.0.3\tCHINA',
 '192.168.0.4\tCHINA',
 '192.168.0.5\tCHINA']

In [31]:
ips = rdd.map(lambda x: x.split("\t"))

In [32]:
ips.take(5)

[['192.168.0.1', 'CHINA'],
 ['192.168.0.2', 'CHINA'],
 ['192.168.0.3', 'CHINA'],
 ['192.168.0.4', 'CHINA'],
 ['192.168.0.5', 'CHINA']]

In [10]:
ips_filtered = ips.filter(lambda x: x[1] != "CHINA")

In [11]:
ips_filtered.take(5)

[['192.168.0.201', 'RUSSIA'],
 ['192.168.0.202', 'RUSSIA'],
 ['192.168.0.203', 'RUSSIA'],
 ['192.168.0.204', 'RUSSIA'],
 ['192.168.0.205', 'RUSSIA']]

In [33]:
raw_logs = sc.textFile("/datasets/spark/log.txt")

In [34]:
raw_logs.take(5)

['192.168.0.10\tERROR\tWhen production fails in dipsair, whom you gonna call?',
 '192.168.0.39\tINFO\tJust an info message passing by',
 '192.168.0.35\tINFO\tJust an info message passing by',
 '192.168.0.19\tINFO\tJust an info message passing by',
 '192.168.0.23\tERROR\tWhen production fails in dipsair, whom you gonna call?']

In [35]:
logs = raw_logs.map(lambda x: x.split("\t"))

In [36]:
logs.take(5)

[['192.168.0.10',
  'ERROR',
  'When production fails in dipsair, whom you gonna call?'],
 ['192.168.0.39', 'INFO', 'Just an info message passing by'],
 ['192.168.0.35', 'INFO', 'Just an info message passing by'],
 ['192.168.0.19', 'INFO', 'Just an info message passing by'],
 ['192.168.0.23',
  'ERROR',
  'When production fails in dipsair, whom you gonna call?']]

In [16]:
logs.flatMap(lambda x: x[2].split()).take(20)

['When',
 'production',
 'fails',
 'in',
 'dipsair,',
 'whom',
 'you',
 'gonna',
 'call?',
 'Just',
 'an',
 'info',
 'message',
 'passing',
 'by',
 'Just',
 'an',
 'info',
 'message',
 'passing']

In [17]:
words = logs.flatMap(lambda x: x[2].split())

In [18]:
words.take(10)

['When',
 'production',
 'fails',
 'in',
 'dipsair,',
 'whom',
 'you',
 'gonna',
 'call?',
 'Just']

## Зачем нужны отдельные трансформации и отдельные action?

![](pics/dag1.png)

![](pics/dag2.png)

### Последовательность трансформаций определяет граф вычислений (DAG - direct acyclic graph). В нем есть партиции и зависимости между партициями. Таким образом Spark имеет всю необходимую информацию для вычилсения графа в любой точке и возможных оптимизаций

![](pics/dag3.png)

### Трансформации бывают *узкими*

![](pics/narrow_transformation.png)

### И *широкими*

![](pics/wide_transformation.png)

### Широкие трансформации разделяют джоб на стейджи. Между стейджами происходит shuffle данных, которого надо избегать

## Персистентность и кэширование

### RDD вычисляются лениво, когда вызывается action. Часто мы хотим вызвать несколько actions для одного и тоге же RDD. Если мы просто сделаем это, то граф будет полностью перевычисляться каждый раз.

In [45]:
ips.count()

1000

In [46]:
ips.top(10)

[['192.168.3.99', 'USA'],
 ['192.168.3.98', 'USA'],
 ['192.168.3.97', 'USA'],
 ['192.168.3.96', 'USA'],
 ['192.168.3.95', 'USA'],
 ['192.168.3.94', 'USA'],
 ['192.168.3.93', 'USA'],
 ['192.168.3.92', 'USA'],
 ['192.168.3.91', 'USA'],
 ['192.168.3.90', 'USA']]

### Чтобы этого избежать, мы можем закэшировать RDD в памяти. Кэширование произойдет при вызове первого action.

In [47]:
ips_cached = ips.cache()

In [48]:
ips_cached

PythonRDD[34] at RDD at PythonRDD.scala:53

In [49]:
ips_cached.count()

1000

In [50]:
ips_cached.top(20)

[['192.168.3.99', 'USA'],
 ['192.168.3.98', 'USA'],
 ['192.168.3.97', 'USA'],
 ['192.168.3.96', 'USA'],
 ['192.168.3.95', 'USA'],
 ['192.168.3.94', 'USA'],
 ['192.168.3.93', 'USA'],
 ['192.168.3.92', 'USA'],
 ['192.168.3.91', 'USA'],
 ['192.168.3.90', 'USA'],
 ['192.168.3.9', 'USA'],
 ['192.168.3.89', 'USA'],
 ['192.168.3.88', 'USA'],
 ['192.168.3.87', 'USA'],
 ['192.168.3.86', 'USA'],
 ['192.168.3.85', 'USA'],
 ['192.168.3.84', 'USA'],
 ['192.168.3.83', 'USA'],
 ['192.168.3.82', 'USA'],
 ['192.168.3.81', 'USA']]

In [51]:
ips_cached

PythonRDD[34] at RDD at PythonRDD.scala:53

### `cache()` сохраняет RDD в памяти. Для большего контроля можно использовать `persist(storage_level)`:
+ MEMORY_ONLY
+ MEMORY_AND_DISK
+ DISK_ONLY
+ MEMORY_ONLY_2
+ MEMORY_AND_DISK_2

### Все сохраненные RDD можно увидеть во вкладке "Storage" Spark UI
### Или более программатичным способом

In [52]:
from pyspark import StorageLevel

In [53]:
StorageLevel(False, True, False, False, 1)

StorageLevel(False, True, False, False, 1)

In [54]:
ips.getStorageLevel()

StorageLevel(False, True, False, False, 1)

In [55]:
ips.unpersist()

PythonRDD[34] at RDD at PythonRDD.scala:53

In [57]:
ips.persist(StorageLevel.DISK_ONLY_2)

PythonRDD[34] at RDD at PythonRDD.scala:53

In [58]:
ips.getStorageLevel()

StorageLevel(True, False, False, False, 2)

In [59]:
ips.count()

1000

## PairRDD (ключ-значение)

### PairRDD - это RDD для работы с парами ключ-значение. Spark предполагает, что PairRDD содержить в себе объекты, состящие ровно из двух элементов! PairRDD предоставляют методы группировки, аггрегации и объединения (join) двух RDD

### Пусть есть задача подсчитать распределение кодов ERROR и WARNING в лог-файле

In [60]:
raw_logs.take(5)

['192.168.0.10\tERROR\tWhen production fails in dipsair, whom you gonna call?',
 '192.168.0.39\tINFO\tJust an info message passing by',
 '192.168.0.35\tINFO\tJust an info message passing by',
 '192.168.0.19\tINFO\tJust an info message passing by',
 '192.168.0.23\tERROR\tWhen production fails in dipsair, whom you gonna call?']

In [61]:
(raw_logs.filter(lambda x: "INFO" not in x)
         .map(lambda x: (x.split("\t")[1], 1))\
         .groupByKey()
         .collect())

 ('ERROR', <pyspark.resultiterable.ResultIterable at 0x7fa5203dd410>)]

In [62]:
(raw_logs.filter(lambda x: "INFO" not in x)
         .map(lambda x: (x.split("\t")[1], 1))\
         .groupByKey()
         .map(lambda x: (x[0], len(x[1])))
         .collect())



### Или немного проще

In [63]:
(raw_logs.filter(lambda x: "INFO" not in x)
         .map(lambda x: (x.split("\t")[1], 1))
         .countByKey()
         .items())



### Стоит заметить, что `groupByKey()` предполагает перемещение всех записей с одним ключом на один экзекьютор. В случае очень скоршенных распределений это может привести к падению экзекьютора с OOM. Поэтому всегда при группировках стоит подумать об использовании `reduceByKey()`.

In [None]:
(raw_logs.filter(lambda x: "INFO" not in x)
         .map(lambda x: (x.split("\t")[1], 1))\
         .reduceByKey(lambda x, y: x + y)
         .collect())

## Join

### Два PairRDD можно объединить по ключу
### Поддерживаются inner join, left outer join, right outer join и full outer join

In [37]:
logs.take(5)

[['192.168.0.10',
  'ERROR',
  'When production fails in dipsair, whom you gonna call?'],
 ['192.168.0.39', 'INFO', 'Just an info message passing by'],
 ['192.168.0.35', 'INFO', 'Just an info message passing by'],
 ['192.168.0.19', 'INFO', 'Just an info message passing by'],
 ['192.168.0.23',
  'ERROR',
  'When production fails in dipsair, whom you gonna call?']]

In [38]:
ips.take(5)

[['192.168.0.1', 'CHINA'],
 ['192.168.0.2', 'CHINA'],
 ['192.168.0.3', 'CHINA'],
 ['192.168.0.4', 'CHINA'],
 ['192.168.0.5', 'CHINA']]

In [42]:
ips.filter(lambda x: x[0] == '192.168.0.23').take(5)

[['192.168.0.23', 'CHINA']]

In [43]:
logs.join(ips).take(5)

[('192.168.0.23', ('ERROR', 'CHINA')),
 ('192.168.0.23', ('INFO', 'CHINA')),
 ('192.168.0.23', ('INFO', 'CHINA')),
 ('192.168.0.23', ('ERROR', 'CHINA')),
 ('192.168.0.23', ('INFO', 'CHINA'))]

![](pics/Jackie-Chan-WTF.jpg)

### Не стоит забывать, что Spark предполагает, что PairRDD состоит ровно! из двух элементов, поэтому все остальные элементы просто отбрасываются!

In [65]:
def split_logs(line):
    split = line.split("\t")
    return split[0], split[1:]

In [66]:
logs_cached = raw_logs.map(split_logs).cache()

In [67]:
logs_cached.take(5)

[('192.168.0.10',
  ['ERROR', 'When production fails in dipsair, whom you gonna call?']),
 ('192.168.0.39', ['INFO', 'Just an info message passing by']),
 ('192.168.0.35', ['INFO', 'Just an info message passing by']),
 ('192.168.0.19', ['INFO', 'Just an info message passing by']),
 ('192.168.0.23',
  ['ERROR', 'When production fails in dipsair, whom you gonna call?'])]

In [68]:
logs_cached.join(ips).take(5)

[('192.168.0.4', (['INFO', 'Just an info message passing by'], 'CHINA')),
 ('192.168.0.4', (['INFO', 'Just an info message passing by'], 'CHINA')),

## Управление параллелизмом.

### Вспомним, что атомарным уровнем параллелизма в Spark является партиция. Об этом всегда стоит помнить, когда есть проблемы с производительностью приложения

In [69]:
logs.getNumPartitions()

4

### Метод `repartition()` может быть использован для изменения числа партиций.

In [70]:
logs = logs.repartition(8)

In [71]:
logs.getNumPartitions()

8

### `repartition()` всегда приводит к равномерному перераспределению данных, что ведет к shuffle. Если Вы уменьшаете число партиций, то стоит использовать `coalesce()`, который может избежать shuffle

In [72]:
logs = logs.coalesce(10)

In [73]:
logs.getNumPartitions()

8

In [74]:
logs = logs.coalesce(4)

In [75]:
logs.getNumPartitions()

4

### Узнать дефолтный уровень параллелизма можно из конфига. По-умолчанию, при работе с YARN, использукется общее число ядер, выделенных этому SparkContext на всех экзекьюторах, либо 2. Что больше.

In [76]:
sc.getConf().get("spark.default.parallelism")

In [None]:
sc.parallelize(range(100)).getNumPartitions()

## Broadcast

### Broadcast-объект - это неизменяемый объект, которая разделяется между всеми экзекьюторами
### Дистрибуция broadcast-объекта производится быстро и эффективно p2p-протоколом

### Реализуем map-side join с помощью broadcast-объекта

In [77]:
ips_local = dict(ips.collect())

In [78]:
ips_local['192.168.0.10']

'CHINA'

In [79]:
ips_broadcasted = sc.broadcast(ips_local)

In [80]:
ips_broadcasted

<pyspark.broadcast.Broadcast at 0x7fa503c53c10>

In [81]:
ips_broadcasted.value['192.168.0.10']

'CHINA'

In [None]:
logs_cached.take(5)

In [82]:
def resolve_ip(row):


In [83]:
logs_cached.map(resolve_ip).take(10)

[('CHINA',
  (['ERROR', 'When production fails in dipsair, whom you gonna call?'],)),
 ('CHINA', (['INFO', 'Just an info message passing by'],)),
 ('CHINA', (['INFO', 'Just an info message passing by'],)),
 ('CHINA', (['INFO', 'Just an info message passing by'],)),
 ('CHINA',
  (['ERROR', 'When production fails in dipsair, whom you gonna call?'],)),
 ('CHINA',
  (['ERROR', 'When production fails in dipsair, whom you gonna call?'],)),
 ('CHINA', (['INFO', 'Just an info message passing by'],)),
 ('CHINA', (['INFO', 'Just an info message passing by'],)),
 ('CHINA', (['INFO', 'Just an info message passing by'],))]

## Не забудьте погасить SparkContext!

In [44]:
sc.stop()