# Python для анализа данных

# Spark / PySpark 

#### автор: Валентин Бирюков


Spark является все более популярной кластерной вычислительной системой на основе Apache Hadoop, которая предлагает большую потенциальную ценность благодаря своей скорости и простоте использования. Мы рассмотрим его здесь, уделив особое внимание интерфейсу Python для Spark: PySpark.

In [None]:
# spark использует mapreduce техноологию
# мы будем общаться с spark через локальный API с использованием SparkContext

Подготовка
-------------

Для работы нам потребуется собствено сам Spark который можно скачать и установить с официального сайта http://spark.apache.org/downloads.html

Так же для его успешного функционирования потребуется Java8/11. И вот тут могут возникнуть сложности, поскольку сейчас последняя и поддерживаемая верся - Java11, но самостоятельная настройка может вызывать затруднение при совместимости пакетов, такие как ошибка вида:

`Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe. : java.lang.IllegalArgumentException: Unsupported class file major version 55` 

в таком случае самый простой вариант запустить данный блокнот используя **Google Colab**. 



*Замечание 1:
При локальном запуске и запуске в virtualenv мы должны указать Spark использовать текущую версию Python, иначе она будет использовать системную версию Python по умолчанию. Вставьте это в свой код: `os.environ['PYSPARK_PYTHON'] = sys.executable`.*

*Замечание 2:
Spark имеет веб-интерфейс, который показывает запущенные задачи, выполняющиеся процессы и различную статистику. Запуская локально, это может наблюдать в интерфесе http://localhost:4040/.*

*Замечание 3:
Запуская же ноутбук в **colab** чтобы получить такую ссылку раскомментируйте ячейку ниже и запустите ее. По этой ссылке будет доступен аналог локального хоста только для облачного блокнота. По этой ссылке доступ будет только у вас, залогиненных в учетной записи google. Для других же пользователей эта ссылка будет выдавать 403 ошибку - ошибку доступа к ресурсу.*

In [11]:
from google.colab.output import eval_js
print(eval_js("google.colab.kernel.proxyPort(4040)"))

https://o2gqkot9pq-496ff2e9c6d22116-4040-colab.googleusercontent.com/


## Поставим сам модуль

In [2]:
!pip install pyspark

Collecting pyspark
[?25l  Downloading https://files.pythonhosted.org/packages/8e/b0/bf9020b56492281b9c9d8aae8f44ff51e1bc91b3ef5a884385cb4e389a40/pyspark-3.0.0.tar.gz (204.7MB)
[K     |████████████████████████████████| 204.7MB 66kB/s 
[?25hCollecting py4j==0.10.9
[?25l  Downloading https://files.pythonhosted.org/packages/9e/b6/6a4fb90cd235dc8e265a6a2067f2a2c99f0d91787f06aca4bcf7c23f3f80/py4j-0.10.9-py2.py3-none-any.whl (198kB)
[K     |████████████████████████████████| 204kB 26.2MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.0.0-py2.py3-none-any.whl size=205044182 sha256=8066561adcc49e9a690831a4e60f2d7782cd30baf6e7cfce181cd07224f5013c
  Stored in directory: /root/.cache/pip/wheels/57/27/4d/ddacf7143f8d5b76c45c61ee2e43d9f8492fc5a8e78ebd7d37
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9 pyspark-3.0.0


Обращения к pyspark
---------------

Чтобы вызвать Spark из Python, нам нужно использовать интерфейс PySpark. Например, его можно вызвать интерактивной оболочкой из вашей домашней директории Spark.:

    ./bin/pyspark

Как оболочка iPython Spark:

    IPYTHON=1 ./bin/pyspark

Или как пусковая установка для скриптов:

    ./bin/pyspark --master local

Ниже мы рассмотрим, как использовать API PySpark внутри скриптов Python.

In [3]:
import os
import sys

# Spark's home directory (here it's: ~/spark-1.6.0) should be set as an environment variable.
# (Of course setting an env. variable doesn't need to be done from Python; any method will do.)
# os.environ['SPARK_HOME'] = os.path.join(os.path.expanduser('~'), 'spark-1.6.0')

# Add Spark's Python interface (PySpark) to PYTHONPATH.
# (Again: this doesn't need to be done from Python.)
# sys.path.append(os.path.join(os.environ.get('SPARK_HOME'), 'python'))

# This can be useful for running in virtualenvs:
# os.environ['PYSPARK_PYTHON'] = '/home/nico/virtualenv/bin/python'

# OK, now we can import PySpark
from pyspark import SparkContext, SparkConf

Внутри нашей *рабочей программки* соединение с Spark представлено экземпляром `SparkContext`. Для локального запуска Spark вы можете просто создать его с помощью:

    sc = SparkContext('local', 'mySparkApp')

Кроме того, вы можете использовать экземпляр `SparkConf` для управления различными свойствами конфигурации Spark, что мы и будем рассматривать ниже.

In [4]:
# создаем пустой config
conf = SparkConf()
conf.toDebugString()

''

In [5]:
# укажем что будем мы запускать все это локально
conf.setMaster('local') # задаем машину, которая раздает задания другим машинам
conf.setAppName('spark_tutorial') # некоторый alias нашего "приложения"
# SparkConf имеет методы 'set', 'setAll' и 'setIfMissing' которые могут быть использованы
# для уточнения конфигурации нашего "кластера" - той части которую мы хотим заиспользовать
# например - задействовать 4 ядра и 1Gb оперативы
conf.setIfMissing("spark.cores.max", "4")
conf.set("spark.executor.memory", "1g")

<pyspark.conf.SparkConf at 0x7fed4adf47f0>

In [6]:
# Другой вариан, задать все это разом:
conf.setAll([('spark.cores.max', '4'), (("spark.executor.memory", "1g"))])

<pyspark.conf.SparkConf at 0x7fed4adf47f0>

In [7]:
# И теперь запустим spark с такой конфигурацией
sc = SparkContext(conf=conf)

# остановить же это можно с помощью следующей команды в ручном режиме:
# sc.stop()

Как работает Spark, очень-очень вкратце
-------------------------

Spark использует *диспетчер кластеров* (например, собственный автономный менеджер Spark, YARN или Mesos) и несколько *рабочих узлов*. Менеджер задач (ака master/main) пытается получить *исполнителей* (ака slaves/secondary) на рабочих узлах, которые выполняют вычисления и хранят данные на основе кода и задач, которые им отправляются.


Основная абстракция Spark - это так называемый *Resilient Distributed Dataset (RDD)*. Spark может создавать RDD из любого источника хранения, поддерживаемого Hadoop. RDD содержит промежуточные результаты вычислений и хранится в ОЗУ или на диске на рабочих узлах. В случае сбоя узла, RDD может быть восстановлен. Многие процессы могут выполняться параллельно благодаря распределенной природе RDD, а конвейерная обработка и отложенное выполнение предотвращают необходимость сохранения промежуточных результатов для следующего шага. Важно отметить, что Spark поддерживает извлечение наборов данных в кластерный *кэш в памяти* для быстрого доступа.

Операции RDD можно разделить на 2 группы: *преобразования* (transform) и *действия* (actions). Преобразования (например, `map`) RDD всегда приводят к новым RDD, а действия (например, `reduce`) возвращают значения, которые являются результатом операций над RDD, обратно в программу драйвера.






Спарк - это надстройка, чтобы орекстировать кластерами - распределять задачи между машинами. Главный процесс раздает задания побочным, побочные выполняют задания, мастер затем все это собирает в один результат. rdd - распределенное хранилище данных. Промежуточные вычисления идут в память машины или на жесткий диск. Если одна из машин выходит из строя, мы можем восстанавливать данных, так как спарк использует механизм репликации. Реплика в оснвном создается на другом компьютере, который не используется для обработки, только для хранения реплики. 
К данным на жестком диске обращаться долго, к кешам - быстро, к оперативной памяти - быстрее, чем к жесткому диску, но медленнее, чем к кешу.  



### RDD и распределенные данные

Сейчас, когда мы запускаем все это дело локально на одной машине - они в реалиях не очень то распределенные, они лежат на физическом одном диске. Однако даже в этом случае запускаясь локально Spark будет оркестрировать всем, как будто у него маленький кластер. Настолько маленький, что ровно из одного вашего компьютера =)

In [12]:
# 'parallelize' создает RDD путем распределения данных по кластеру
rdd = sc.parallelize(range(14), numSlices=4)
# по сути создаем список из 14 элементов которой храним распределенно, на 4 "файлах"
print("Number of partitions: {}".format(rdd.getNumPartitions()))
# 'glom' перечисляет все элементы в каждом разделе
# collect - это тот самый action
print(rdd.glom().collect())

Number of partitions: 4
[[0, 1, 2], [3, 4, 5, 6], [7, 8, 9], [10, 11, 12, 13]]


### Spark - ленивый
Несмотря на любые промежуточные преобразования, Spark запускается только после выполнения *действия* на RDD. Это связано с тем, что он пытается выполнить умную конвейеризацию операций, чтобы не сохранять промежуточные результаты.

Этакий знакомый аналог `map` в питоне, который по факту еще ничего не применяет

только когда пишем collect массив разворачивается в памяти и происходит преобразование. только тогда получаем массив.

In [None]:
rddSquared = rdd.map(lambda x: x ** 2)

print(rddSquared.collect())

[0, 1, 4, 9, 16, 25, 36, 49, 64, 81, 100, 121, 144, 169]


In [None]:
# Альтернативный вариант, с созданием функции:
def squared(x):
    return x ** 2
rddSquared = rdd.map(squared)
print(rddSquared.collect())


В данных обоих случаях только `collect` инициировал работу с данными, остальные же созданные преобразования откладывались как "состояния"

Рассмотрим другие популярные преобразования

In [10]:
# Преобразования
# flatMap - если обрабатываем массив массивов, то получим один распакованный список
# -----------------------

func = lambda x: -x
rdd.map(func)
rdd.flatMap(func) # почти как map, только результат будет распакован
rdd.filter(func)
rdd.sortBy(func)

PythonRDD[10] at RDD at PythonRDD.scala:53

In [13]:
# не может сделать flatMap от функции
rdd.flatMap(func).collect()

Py4JJavaError: ignored

In [19]:
# развернули и сделали преобразование
import numpy as np
sc.parallelize(np.array([[1,2], [3,4]]), numSlices=4).flatMap(lambda x: x**2).collect()
# фильтрация
sc.parallelize(range(15), numSlices=4).filter(lambda x: x%2).collect()
# сортировка
sc.parallelize(range(15), numSlices=4).sortBy(lambda x: -x).collect()

[14, 13, 12, 11, 10, 9, 8, 7, 6, 5, 4, 3, 2, 1, 0]

In [None]:
# действия
# ---------------
# reduce получает на вход пару объектов
rdd.reduce(lambda x, y: x + y)
rdd.count()

In [22]:
print(sum(range(14))) # накопленная сумма
rdd.reduce(lambda x, y: x + y) # reduce бежит окном, считаем накопленным итогом

91


91

В обоих этих случаях операции по сути никуда не применились, можно сказать что мы выстроили процесс по которому будут выполняться узлы, однако каждый из них вел в "никуда"

In [None]:
# Действия, возвращающие полученные в данные
# в качестве действия можно использовать обращение к данным:
print(rdd.collect())                    # вернуть все эллементы
print(rdd.first())                      # вернуть первый элемент
print(rdd.take(5))                      # вернуть первые N элементов
print(rdd.top(3))                       # Вернуть первые N элементов упорядоченные по убыванию
print(rdd.takeOrdered(7, lambda x: -x)) # Вернуть N эллементов, отсортированных согласно какой то "функции"

[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13]
0
[0, 1, 2, 3, 4]
[13, 12, 11]
[13, 12, 11, 10, 9, 8, 7]


# Упражнение: Решето Эратосфена.

Напишите алгоритм просеивания простых чисел оперирую pyspark

Подсказка: все не так то просто, последовательные фильтры надо явно заставлять выполнять

In [None]:
n = 1000
primes = sc.parallelize(range(2, n), numSlices=4)
true_primes = []
# перебираем делители
for div in range(int(n**0.5)):
  # берем первое число в массиве primes
  prime = primes.first()
  true_primes.append(prime)
  # фильтруем: все, что разделилось на prime - убираем
  primes = primes.filter(lambda x: x%prime)
  # primes - это новый датасет из того, что осталось после фильтрации
  # просто primes - это ссылка на объект, для материализации нужно вызвать primes.collect()
  primes = sc.parallelize(primes.collect(), numSlices=4)


true_primes.extend(primes.collect())
print(true_primes)

[2, 3, 5, 7, 11, 13, 17, 19, 23, 29, 31, 37, 41, 43, 47, 53, 59, 61, 67, 71, 73, 79, 83, 89, 97, 101, 103, 107, 109, 113, 127, 131, 137, 139, 149, 151, 157, 163, 167, 173, 179, 181, 191, 193, 197, 199, 211, 223, 227, 229, 233, 239, 241, 251, 257, 263, 269, 271, 277, 281, 283, 293, 307, 311, 313, 317, 331, 337, 347, 349, 353, 359, 367, 373, 379, 383, 389, 397, 401, 409, 419, 421, 431, 433, 439, 443, 449, 457, 461, 463, 467, 479, 487, 491, 499, 503, 509, 521, 523, 541, 547, 557, 563, 569, 571, 577, 587, 593, 599, 601, 607, 613, 617, 619, 631, 641, 643, 647, 653, 659, 661, 673, 677, 683, 691, 701, 709, 719, 727, 733, 739, 743, 751, 757, 761, 769, 773, 787, 797, 809, 811, 821, 823, 827, 829, 839, 853, 857, 859, 863, 877, 881, 883, 887, 907, 911, 919, 929, 937, 941, 947, 953, 967, 971, 977, 983, 991, 997]


### RDD может использовать кеши
Spark позволяет пользователю контролировать, какие данные и как кэшируются. Правильное кэширование RDD может быть чрезвычайно полезным! Всякий раз, когда у вас есть RDD, который будет использоваться повторно несколько раз, вам следует рассмотреть возможность его кэширования.

 часть кеширования остается на узлах, в мастере хранятся только агреггированные результаты

In [25]:
import numpy as np

NUM_SAMPLES = int(1e6)
rddBig = sc.parallelize(np.random.random(NUM_SAMPLES))

# нет кэширования: будет пересчитываться каждый раз, когда мы проходим цикл
rddBigTrans = rddBig.map(lambda x: (x ** 2 - 0.1) ** 0.5)
print(rddBigTrans.getStorageLevel())
for threshold in (0.2, 0.4, 0.6, 0.8):
    %timeit -n 1 -r 1 rddBigTrans.filter(lambda x: x >= threshold).count()

Serialized 1x Replicated
1 loop, best of 1: 2.29 s per loop
1 loop, best of 1: 2.26 s per loop
1 loop, best of 1: 2.24 s per loop
1 loop, best of 1: 2.17 s per loop


In [26]:
# мы кешируем этот промежуточный результат, потому что он будет неоднократно вызываться
rddBigTrans_c = rddBig.map(lambda x: (x ** 2 - 0.1) ** 0.5).cache()
print(rddBigTrans_c.getStorageLevel())
for threshold in (0.2, 0.4, 0.6, 0.8):
    %timeit -n 1 -r 1 rddBigTrans_c.filter(lambda x: x >= threshold).count()

Memory Serialized 1x Replicated
1 loop, best of 1: 4.97 s per loop
1 loop, best of 1: 771 ms per loop
1 loop, best of 1: 779 ms per loop
1 loop, best of 1: 823 ms per loop


In [27]:
# используем unpersist для удаления из кэша
# Serialized 1x Replicated - данные сериализованные с 1 репликой
print(rddBigTrans_c.unpersist().getStorageLevel())
# для еще более детального управления кэшированием используйте функцию «persist» 
from pyspark import storagelevel
# явно задаем, где хранить данные 
print(rddBigTrans.persist(storagelevel.StorageLevel.MEMORY_AND_DISK).getStorageLevel())

Serialized 1x Replicated
Disk Memory Serialized 1x Replicated


### Spark: key-value хранилище
Так называемые PairRDD - это RDD, в которых хранятся пары ключ-значение. В Spark используется множество специальных операций, таких как объединение по ключу, группирование по ключу и т. д.

In [28]:
# PairRDD автоматически создаются всякий раз, когда мы представляем список кортежей ключ-значение
# Здесь мы трансформируем rddA и создаем ключ на основе четных / нечетных флагов.
rddP1 = rdd.map(lambda x: (x % 2 == 0, x))
rddP1.collect()

[(True, 0),
 (False, 1),
 (True, 2),
 (False, 3),
 (True, 4),
 (False, 5),
 (True, 6),
 (False, 7),
 (True, 8),
 (False, 9),
 (True, 10),
 (False, 11),
 (True, 12),
 (False, 13)]

In [None]:
# Более понятный вариант для этого:
rddP1 = rdd.keyBy(lambda x: x % 2 == 0)
rddP1.collect()

[(True, 0),
 (False, 1),
 (True, 2),
 (False, 3),
 (True, 4),
 (False, 5),
 (True, 6),
 (False, 7),
 (True, 8),
 (False, 9),
 (True, 10),
 (False, 11),
 (True, 12),
 (False, 13)]

In [None]:
# Другой способ создать PairRDD - это заархивировать два RDD (предполагается, что RDD одинаковой длины)
print("Zipped: {}".format(rdd.zip(rdd).collect()))

Zipped: [(0, 0), (1, 1), (2, 2), (3, 3), (4, 4), (5, 5), (6, 6), (7, 7), (8, 8), (9, 9), (10, 10), (11, 11), (12, 12), (13, 13)]


In [29]:
# Доступ к ключам и значениям
print("Keys: {}".format(rddP1.keys().collect()))
print("Values: {}".format(rddP1.values().collect()))

Keys: [True, False, True, False, True, False, True, False, True, False, True, False, True, False]
Values: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13]


In [30]:
# Другой вариант обращения к ключам-значением - через кортеж; x[0] - key, x[1] - value
print(rddP1.map(lambda x: (x[0], x[1] ** 2)).collect())

[(True, 0), (False, 1), (True, 4), (False, 9), (True, 16), (False, 25), (True, 36), (False, 49), (True, 64), (False, 81), (True, 100), (False, 121), (True, 144), (False, 169)]


In [31]:
# Лучше: mapValues / flatMapValues, который работает только со значениями и сохраняет ключи на месте
print(rddP1.mapValues(lambda x: x ** 2).collect())

[(True, 0), (False, 1), (True, 4), (False, 9), (True, 16), (False, 25), (True, 36), (False, 49), (True, 64), (False, 81), (True, 100), (False, 121), (True, 144), (False, 169)]


In [32]:
# Мы также можем вернуться от PairRDD к обычному RDD, просто опустив ключ
print(rddP1.map(lambda x: x[1] ** 2).collect())

[0, 1, 4, 9, 16, 25, 36, 49, 64, 81, 100, 121, 144, 169]


In [33]:
# Возможны различные агрегации по ключу, такие как reduceByKey, combineByKey и foldByKey
# пример с reduceByKey:
print("Sum per key: {}".format(rddP1.reduceByKey(lambda x, y: x + y).collect()))

Sum per key: [(False, 49), (True, 42)]


In [34]:
# Кроме того, некоторые общие операции доступны в форме «ByKey», например:
rddP1.sortByKey()
rddP1.countByKey()

defaultdict(int, {False: 7, True: 7})

# Группировка и соединение по ключу

In [37]:
# Существуют различные возможные способы объединения двух RDD по ключу:
rddP2 = sc.parallelize(range(0, 28, 2)).map(lambda x: (x % 2 == 0, x))
rddP2.collect()

[(True, 0),
 (True, 2),
 (True, 4),
 (True, 6),
 (True, 8),
 (True, 10),
 (True, 12),
 (True, 14),
 (True, 16),
 (True, 18),
 (True, 20),
 (True, 22),
 (True, 24),
 (True, 26)]

In [38]:
# inner join / cross join в случае наложения ключей
print("Join: {}".format(rddP1.join(rddP2).collect()))

Join: [(True, (0, 0)), (True, (0, 2)), (True, (0, 4)), (True, (0, 6)), (True, (0, 8)), (True, (0, 10)), (True, (0, 12)), (True, (0, 14)), (True, (0, 16)), (True, (0, 18)), (True, (0, 20)), (True, (0, 22)), (True, (0, 24)), (True, (0, 26)), (True, (2, 0)), (True, (2, 2)), (True, (2, 4)), (True, (2, 6)), (True, (2, 8)), (True, (2, 10)), (True, (2, 12)), (True, (2, 14)), (True, (2, 16)), (True, (2, 18)), (True, (2, 20)), (True, (2, 22)), (True, (2, 24)), (True, (2, 26)), (True, (4, 0)), (True, (4, 2)), (True, (4, 4)), (True, (4, 6)), (True, (4, 8)), (True, (4, 10)), (True, (4, 12)), (True, (4, 14)), (True, (4, 16)), (True, (4, 18)), (True, (4, 20)), (True, (4, 22)), (True, (4, 24)), (True, (4, 26)), (True, (6, 0)), (True, (6, 2)), (True, (6, 4)), (True, (6, 6)), (True, (6, 8)), (True, (6, 10)), (True, (6, 12)), (True, (6, 14)), (True, (6, 16)), (True, (6, 18)), (True, (6, 20)), (True, (6, 22)), (True, (6, 24)), (True, (6, 26)), (True, (8, 0)), (True, (8, 2)), (True, (8, 4)), (True, (8, 6)

In [39]:
# left/right outer join
rddP1.leftOuterJoin(rddP2)
rddP1.rightOuterJoin(rddP2)

PythonRDD[112] at RDD at PythonRDD.scala:53

In [None]:
# для всех ключей в rddP1 или rddP2 cogroup возвращает итерируемые значения
# 
print("Cogroup: {}".format(rddP1.cogroup(rddP2).collect()))
# Группируем вместе более двух RDD по ключу можно с помощью groupWith
rddP1.groupWith(rddP2, rddP2)

# с groupByKey мы создаем новый RDD, который сохраняет те же ключи на том же узле, где это возможно
print("After groupByKey: {}".format(rddP1.groupByKey().glom().collect()))

In [44]:
a = rddP1.cogroup(rddP2).collect()
list(a[0][1])

[<pyspark.resultiterable.ResultIterable at 0x7effbc763f60>,
 <pyspark.resultiterable.ResultIterable at 0x7effbc769048>]

### Spark: работа напряму с созданием фреймов RDD из текстовых файлов

In [45]:
# TODO: Вариант для colab, локально можно поискать другие удобные файлы
from pyspark import SparkFiles
sc.addFile(os.path.join('/content/sample_data', 'README.md'))
rddT = sc.textFile(SparkFiles.get('README.md'))
# берем первые 5 строчек
print(rddT.take(5))

['This directory includes a few sample datasets to get you started.', '', '*   `california_housing_data*.csv` is California housing data from the 1990 US', '    Census; more information is available at:', '    https://developers.google.com/machine-learning/crash-course/california-housing-data-description']


### RDDs простые статистические аггрегаторы

In [46]:
# reducers
print(rdd.stats())
print(rdd.count())
print(rdd.sum())
print(rdd.mean())
print(rdd.stdev(), rdd.sampleStdev())
print(rdd.variance(), rdd.sampleVariance())
print(rdd.min(), rdd.max())
print(rdd.histogram(5))

(count: 14, mean: 6.5, stdev: 4.031128874149275, max: 13.0, min: 0.0)
14
91
6.5
4.031128874149275 4.183300132670378
16.25 17.5
0 13
([0.0, 2.6, 5.2, 7.800000000000001, 10.4, 13], [3, 3, 2, 3, 3])


### RDDs преобразования множеств

In [47]:
rddB = sc.parallelize(range(0, 26, 2))
print(rdd.union(rddB).collect()) # or: rdd + rddB
# не сортированы
print(rdd.union(rddB).distinct().collect())
print(rdd.intersection(rddB).collect())
print(rdd.subtract(rddB).collect())
# декартово произведение: каждому элементу одного множества ставим элемент из второго множества
print(rdd.cartesian(rddB).collect())

[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 0, 2, 4, 6, 8, 10, 12, 14, 16, 18, 20, 22, 24]
[0, 5, 10, 20, 1, 6, 11, 16, 2, 7, 12, 22, 3, 8, 13, 18, 4, 9, 14, 24]
[0, 10, 6, 2, 12, 8, 4]
[5, 1, 11, 7, 3, 13, 9]
[(0, 0), (0, 2), (0, 4), (0, 6), (0, 8), (0, 10), (0, 12), (0, 14), (0, 16), (0, 18), (0, 20), (0, 22), (0, 24), (1, 0), (2, 0), (1, 2), (1, 4), (2, 2), (2, 4), (1, 6), (1, 8), (1, 10), (1, 12), (2, 6), (2, 8), (2, 10), (2, 12), (1, 14), (1, 16), (1, 18), (1, 20), (1, 22), (1, 24), (2, 14), (2, 16), (2, 18), (2, 20), (2, 22), (2, 24), (3, 0), (3, 2), (3, 4), (3, 6), (3, 8), (3, 10), (3, 12), (3, 14), (3, 16), (3, 18), (3, 20), (3, 22), (3, 24), (4, 0), (5, 0), (4, 2), (4, 4), (5, 2), (5, 4), (4, 6), (4, 8), (4, 10), (4, 12), (5, 6), (5, 8), (5, 10), (5, 12), (4, 14), (4, 16), (4, 18), (4, 20), (4, 22), (4, 24), (5, 14), (5, 16), (5, 18), (5, 20), (5, 22), (5, 24), (6, 0), (6, 2), (6, 4), (6, 6), (6, 8), (6, 10), (6, 12), (6, 14), (6, 16), (6, 18), (6, 20), (6, 22), (6, 24), (7

### Spark поддержка общих переменных

In [1]:
# Общая переменная копируется на каждую машину только один раз, эффективным образом.
# Это очень удобно, когда каждый узел использует данные в нем, и особенно, если данные
# большие и в противном случае будут отправлены по сети несколько раз.
# такая переменная отправляется на каждый узел, таким образом данные не гоняются каждый раз из мастера в узлы
broadcastVar = sc.broadcast({'CA': 'California', 'NL': 'Netherlands'})
print(broadcastVar.value)

NameError: ignored

In [49]:
# "Аккумулятор" является общей переменной, которая живет на главном узле,
# который каждая операция может просматривать.
accu = sc.accumulator(0)

In [50]:
# 'foreach' просто применяет функцию к каждому элементу RDD, ничего не возвращая
rdd.foreach(lambda x: accu.add(x))
print(accu.value)

91


Популярные баги:
--------

### Не кэшировать промежуточные результаты, которые  используются позже

Кешировать необходимо маленькие объемы данных

In [None]:
print("Not so great:")
rddBigTrans = rddBig.map(lambda x: (x ** 2 - 0.1) ** 0.5)
for threshold in (0.2, 0.4, 0.6, 0.8):
    %timeit -n 1 -r 1 rddBigTrans.filter(lambda x: x >= threshold).count()

In [None]:
print("Better:")
rddBigTrans_c = rddBig.map(lambda x: (x ** 2 - 0.1) ** 0.5).cache()
for threshold in (0.2, 0.4, 0.6, 0.8):
    %timeit -n 1 -r 1 rddBigTrans_c.filter(lambda x: x >= threshold).count()

### Не учитывать, когда и как данные передаются через кластер
Имейте в виду, что Spark является распределенной вычислительной средой и что следует избегать передачи данных по сети внутри кластера (пропускная способность сети в ~100 раз дороже пропускной способности памяти).



In [None]:
# groupByKey запускает случайное воспроизведение, поэтому по сети копируется много данных
sumPerKey = rddP1.groupByKey().mapValues(lambda x: sum(x)).collect()

In [None]:
# Лучше: reduceByKey уменьшает ту передачу локально перед "перетасовкой"
sumPerKey = rddP1.reduceByKey(lambda x, y: x + y).collect()

### Не работать с соответствующим количеством разделов

Недостаточное количество разделов (партиций)) приводит к плохому параллелизму в кластере.

Это также оказывает нагрузку на память для определенных операций.



In [None]:
# С другой стороны, предположим, что RDD распределен по 1000 разделам,
# но мы работаем только над небольшим подмножеством данных в RDD, например:
rddF = rdd.filter(lambda x: x < 0.1).map(lambda x: x ** 2)

In [None]:
# Затем мы эффективно создаем много пустых задач и используем объединение или перераспределение.
# было бы полезно создать RDD с меньшим количеством разделов
rddF = rdd.filter(lambda x: x < 3).coalesce(10).map(lambda x: x ** 2)

### Используя преобразование с высокими накладными расходами на элемент, лучше использовать mapPartitions

In [None]:
# Например, просто подключение к базе и отключени от нее уже требует расходов
def db_operation(x):
    # тут мы подключилис
    # Поделали что-то с элементом
    # завершаем действие, отключаемся от базы
    pass

# Особенно, если вы повторите это для каждого элемента:
rdd.map(db_operation)

In [None]:
# Лучше: делайте это на уровне раздела, а не на уровне элемента.
def vectorized_db_operation(x):
    # тут мы подключились
    # Поделали что-то с элементом
    # завершаем действие, отключаемся от базы
    pass

# в таком случае мы будем обрабатывать даныне целиком пачками, они все будут вычитывать в память за раз
result = rdd.mapPartitions(vectorized_db_operation)

### Отправка большого количества данных вместе с вызовом функции для каждого элемента

In [None]:
bigData = np.random.random(int(1e6)) #наши "большие данные", мы ж все же на одной машинке работаем

def myFunc(x):
    return x * np.random.choice(bigData)

# и тогда наш массив будет отправляться в каждую партицию, то есть просто гоняться по сети в холостую
rdd.map(myFunc)

# Лучше: сделать большие данные доступными только для чтения, чтобы они эффективно копировались по сети
bigDataBC = sc.broadcast(bigData)

В боевых задачах
-------------

### Поучим что-нибудь скайлерном

Рассмотрим полусинтетический пример. Создадим какую нибудь выборку данных, из которых мы захотим решить задачу регересси.

Вот только решение ее - будем делать распределенно.

In [54]:
from sklearn.model_selection import train_test_split, ShuffleSplit
from sklearn.datasets import make_regression
from sklearn import pipeline
from sklearn.linear_model import Ridge
from sklearn.preprocessing import StandardScaler

N = 10000   # number of data points
D = 100     # number of dimensions

X, y = make_regression(
    n_samples=N,
    n_features=D,
    n_informative=int(D*0.1),
    n_targets=1,
    bias=-6.,
    noise=50.,
    random_state=42
)
X_train, X_test, y_train, y_test = train_test_split(X, y)

In [None]:
# раскидаем данные случаным образом по партициям
samples = sc.parallelize(ShuffleSplit(y_train.size, n_iter=8))
reg_model = pipeline.Pipeline([("scaler", StandardScaler()), ("ridge", Ridge())])
# это кусочек обработки данных для обучения - перегоним переменные в нормальное распределение нормировкой,
# и потом будем запусать на них решение задачи гребневой регрессии

# обучим модель на каждой пачке и примеyим к выборке
mean_rsq = samples.map(
    lambda (index, _): reg_model.fit(X[index], y[index]).score(X_test, y_test)
).mean()
print(mean_rsq)

получили такой самопальный вариант нескольких моделей, которые как-то голосуют за данные

# Упражнение: "не боевая" работа "с боевыми" данными

Нам потребуется датасет с ценами на жилье https://www.kaggle.com/camnugent/california-housing-prices
попробуем пообрабатывать его не привычным пандасом, а с использованием спарка

# Задание 1

считаем датасет и приведем его в человеческий вид

In [8]:
from pyspark.sql import SparkSession
from pyspark.sql import Row

# SparkSession позволяет работать с таблицей с помощью полуSQL запросов
spark = SparkSession(sc)
# поможет нам собрать из строчек более привычный пандасовский вариант
# здесь нам лучше избавиться пока от заголовка в файле,
# зато сделать данные более удобными назначива постолбцовое хранение
rdd = sc.textFile('/content/sample_data/california_housing_train.csv')
# названия колонок
print(rdd.map(lambda x: x.split(',')).filter(lambda x: x[0][0] == '"').collect())
#  Row - преобразование, которое будет возвращать нам строку а-ля словарь
df = rdd.map(lambda x: x.split(',')).filter(lambda x: x[0][0] != '"').map(
    lambda row: Row(
        longitude=row[0],
        latitude=row[1],
        housing_median_age=row[2],
        total_rooms=row[3],
        total_bedrooms=row[4],
        population=row[5],
        households=row[6],
        median_income=row[7],
        median_house_value=row[8]
    )
)
# теперь можем запускать SQL запросы к таблице
df = df.toDF()
df.show()

[['"longitude"', '"latitude"', '"housing_median_age"', '"total_rooms"', '"total_bedrooms"', '"population"', '"households"', '"median_income"', '"median_house_value"']]
+-----------+---------+------------------+-----------+--------------+-----------+-----------+-------------+------------------+
|  longitude| latitude|housing_median_age|total_rooms|total_bedrooms| population| households|median_income|median_house_value|
+-----------+---------+------------------+-----------+--------------+-----------+-----------+-------------+------------------+
|-114.310000|34.190000|         15.000000|5612.000000|   1283.000000|1015.000000| 472.000000|     1.493600|      66900.000000|
|-114.470000|34.400000|         19.000000|7650.000000|   1901.000000|1129.000000| 463.000000|     1.820000|      80100.000000|
|-114.560000|33.690000|         17.000000| 720.000000|    174.000000| 333.000000| 117.000000|     1.650900|      85700.000000|
|-114.570000|33.640000|         14.000000|1501.000000|    337.000000| 

# Задание 2 

теперь проведем все колонки в типизированный вид, там же пока строки

In [9]:
# пока у нас все типы - это строки, нужно поменять тип на числовой
from pyspark.sql.types import *
col = list(map(lambda x: x.replace('"', ''), rdd.map(lambda x: x.split(',')).filter(lambda x: x[0][0] == '"').collect()[0]))

for name in col:
  # cast преобразует данные к определенному типу
  df = df.withColumn(name, df[name].cast(FloatType()))
  
# структура таблицы
df.printSchema()

root
 |-- longitude: float (nullable = true)
 |-- latitude: float (nullable = true)
 |-- housing_median_age: float (nullable = true)
 |-- total_rooms: float (nullable = true)
 |-- total_bedrooms: float (nullable = true)
 |-- population: float (nullable = true)
 |-- households: float (nullable = true)
 |-- median_income: float (nullable = true)
 |-- median_house_value: float (nullable = true)



# Задание 3

Добавим новых признаков:
    * комнат на домовладельцев
    * жителей на домовладение
    * доля спальных комнат относительно всех

In [10]:
from pyspark.sql.functions import *
df = df.withColumn('RoomsPerHouseHolder', col('total_rooms')/col('households'))
df.show()

+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+-------------------+
|longitude|latitude|housing_median_age|total_rooms|total_bedrooms|population|households|median_income|median_house_value|RoomsPerHouseHolder|
+---------+--------+------------------+-----------+--------------+----------+----------+-------------+------------------+-------------------+
|  -114.31|   34.19|              15.0|     5612.0|        1283.0|    1015.0|     472.0|       1.4936|           66900.0| 11.889830508474576|
|  -114.47|    34.4|              19.0|     7650.0|        1901.0|    1129.0|     463.0|         1.82|           80100.0|  16.52267818574514|
|  -114.56|   33.69|              17.0|      720.0|         174.0|     333.0|     117.0|       1.6509|           85700.0|  6.153846153846154|
|  -114.57|   33.64|              14.0|     1501.0|         337.0|     515.0|     226.0|       3.1917|           73400.0| 6.6415929203539825|
|  -11

# Стахостический градиентный спуск своими руками, как бонус

### Stochastic gradient descent using scikit-learn (from: https://gist.github.com/MLnick/4707012)
Each partition is a mini-batch for the SGD, uses average weights.

In [None]:
from sklearn import linear_model as lm
from sklearn.base import copy

N = 10000   # Number of data points
D = 10      # Numer of dimensions
ITERATIONS = 5
np.random.seed(seed=42)

def generate_data(N):
    return [[[1] if np.random.rand() < 0.5 else [0], np.random.randn(D)]
            for _ in range(N)]

def train(iterator, sgd):
    for x in iterator:
        sgd.partial_fit(x[1], x[0], classes=np.array([0, 1]))
    yield sgd

def merge(left, right):
    new = copy.deepcopy(left)
    new.coef_ += right.coef_
    new.intercept_ += right.intercept_
    return new

def avg_model(sgd, slices):
    sgd.coef_ /= slices
    sgd.intercept_ /= slices
    return sgd

slices = 4
data = generate_data(N)
print(len(data))

# init stochastic gradient descent
sgd = lm.SGDClassifier(loss='log')
# training
for ii in range(ITERATIONS):
    sgd = sc.parallelize(data, numSlices=slices) \
            .mapPartitions(lambda x: train(x, sgd)) \
            .reduce(lambda x, y: merge(x, y))
    # averaging weight vector => iterative parameter mixtures
    sgd = avg_model(sgd, slices)
    print("Iteration %d:" % (ii + 1))
    print("Model: ")
    print(sgd.coef_)
    print(sgd.intercept_)

The Spark universe
------------------

Other interesting tools for Spark:

- Spark SQL: http://spark.apache.org/docs/latest/sql-programming-guide.html
- MLlib, Spark's machine learning library: http://spark.apache.org/docs/latest/mllib-guide.html
- Spark Streaming, for streaming data applications: http://spark.apache.org/docs/latest/streaming-programming-guide.html

More information
----------------

### Documentation

Spark documentation: https://spark.apache.org/docs/latest/index.html

Spark programming guide: http://spark.apache.org/docs/latest/programming-guide.html

PySpark documentation: https://spark.apache.org/docs/latest/api/python/index.html

### Books

Learning Spark: http://shop.oreilly.com/product/0636920028512.do

(preview: https://www.safaribooksonline.com/library/view/learning-spark/9781449359034/)

### Talks (recommended to watch them in this order)

Parallel programming with Spark: https://www.youtube.com/watch?v=7k4yDKBYOcw

Advanced Spark features: https://www.youtube.com/watch?v=w0Tisli7zn4

PySpark: Python API for Spark: https://www.youtube.com/watch?v=xc7Lc8RA8wE

Understanding Spark performance: https://www.youtube.com/watch?v=NXp3oJHNM7E

A deeper understanding of Spark's internals: https://www.youtube.com/watch?v=dmL0N3qfSc8