### Spark

In [1]:
# ссылка на документацию : https://spark.apache.org/docs/latest/api/python/reference/pyspark.html#rdd-apis
from pyspark.sql import SparkSession

In [29]:
# Локально создаем в Jupiter
spark = SparkSession\
    .builder\
    .master("local")\
    .appName("Python Spark SQL basic example")\
    .getOrCreate()
    # создаём объект Spark-сессии, обращаясь к объекту builder, который создаёт сессию, учитывая параметры конфигурации
# явно указываем, что хотим запустить Spark в локальном режиме
# задаём название нашего Spark-приложения
# функция инициализации объекта сессии


In [20]:
rdd = spark.sparkContext.parallelize([1,2,3,4,5,6,7])
print(rdd)

ParallelCollectionRDD[0] at readRDDFromFile at PythonRDD.scala:274


In [23]:
# Закрытие сессии
spark.stop()

### Создание коллекции RDD

💡 RDD(англ. resilient distributed dataset) — отказоустойчивый распределённый набор данных, или коллекция. Тип хранения данных, представляющий собой набор элементов, разделённых по узлам кластера, с которыми можно работать параллельно.


```python
 # Создание пустой коллекции Вариант 1
rdd2 = spark.sparkContext.emptyRDD()
 # Создание пустой коллекции Вариант 2
rdd2 = spark.sparkContext.parallelize([])
 # Создание коллекции из csv
rdd3 = spark.sparkContext.textFile('C:/tmp/files/text01.csv')


### Партиционирование

In [32]:
# посмотреть количество партиций
rdd = spark.sparkContext.parallelize([1, 2, 3, 4, 5, 6, 7])
rdd.getNumPartitions()

1

In [33]:
# repartition(val:int) Изменить количество партиций
rdd.repartition(5)
rdd.getNumPartitions()

1

In [34]:
# coalesce() Только уменьшает
rdd.coalesce(2)

CoalescedRDD[6] at coalesce at NativeMethodAccessorImpl.java:0

### Типы операций

In [36]:
rdd = spark.sparkContext.parallelize([1, 2, 3, 4, 5, 6, 7])
# map и в африке map
rdd2 = rdd.map(lambda x: x + 2)
rdd2.collect()

                                                                                

[3, 4, 5, 6, 7, 8, 9]

In [37]:
rdd = spark.sparkContext.parallelize('hello world')
# Может вернуть несколь объектов
rdd3 = rdd.flatMap(lambda x: x.split(','))
for i in rdd3.collect():
    print(i, end=' | ')

[Stage 1:>                                                          (0 + 1) / 1]

h | e | l | l | o |   | w | o | r | l | d | 

                                                                                

In [38]:
# Фильтрация
rdd = spark.sparkContext.parallelize([1,2,34,5,6,6,7,8,9,10])
rdd4 = rdd.filter(lambda x: x%2 == 0)
rdd4.collect()

[2, 34, 6, 6, 8, 10]

In [42]:
# Пересечение объектом
rdd = spark.sparkContext.parallelize(['hello','vs','vasya','world'])
rdd2 = spark.sparkContext.parallelize(['jopa','spark', 'hello','world'])
intersection = rdd.intersection(rdd2)
intersection.collect()

                                                                                

['world', 'hello']

In [47]:
# distinct() - уникальные значения
# union() - создание новой rdd из двух других
# sortByKey() - сортировка по ключу
# sortBy(function) - сортировка по ф-ции
# join()
# reduceByKey(func) - аналог groupby
# groupByKey() - возвращает интерируемый объект(похож на reduceByKey)
rdd = spark.sparkContext.parallelize(
    [
        ('beer',1),
        ('vodka',2),
        ('beer',5),
        ('milk',0),
        ('vodka',8)
    ]
)
rdd_reduce = rdd.reduceByKey(lambda x, y: x * y)
rdd_group = rdd.groupByKey()
print(f'{rdd_reduce.collect() = }')
print(f'{rdd_group.map(lambda x: (x[0], list(x[1]))).collect() = }')


                                                                                

rdd_reduce.collect() = [('beer', 5), ('vodka', 16), ('milk', 0)]
rdd_group.map(lambda x: (x[0], list(x[1]))).collect() = [('beer', [1, 5]), ('vodka', [2, 8]), ('milk', [0])]


In [49]:
spark.stop()

### Действия (actions)

|метод|что делает|последствия|
|--|--|--|
|`collect()`|Явно указывает собрать все части RDD|см.выше|
|`count()`|Считает количество элементов в дочерней RDD.|`count(rdd2)` -> 5|
|`take(n)`|Метод аналогичный `head()` в пандас|`rdd.take(2)`|
|`countByValue()`|Метод похож на reduceByKey(), но применяется, когда RDD состоит только из значений, которые нужно посчитать. Метод аналогичен value_counts()в библиотеке pandas.|`rdd.countByValue()`->`[(1,8),(2,3),(3,15)]`|
|`countByKey()`|Это действие аналогично методу reduceByKey(), так как суммирует по ключу. Отличие в том, что reduceByKey() — это трансформация, которая создаёт новую RDD, а countByKey() — это действие, которое запускает выполнение логического плана.||
|`reduce(function)`|явная инструкция на совершение агрегации. Он принимает агрегирующую функцию и применяет её ко всей RDD целиком.|`sc.parallelize([1, 2, 3, 4, 5]).reduce(add)` -> 15|
|``|||

## О DataFrame API

Минус RDD API: 
- нет механизма оптимизации процесса вычислений
- Структура кода напоминает `MapReduce` и не особо понятна начинающим
- Хорошо работает со слабоструктурированными данными, но с таблицами: плохо.
От RDD DataFrame унаследовал много полезных функций, например:
- Процесс вычисления конечного DataFrame происходит прямо в оперативной памяти.
- Как и RDD, DataFrame обрабатывается в распределённой манере за счёт механизма партиционирования.
- Вычисления в DataFrame API тоже основаны на концепции ленивых вычислений, а методы также делятся на трансформации и действия.
- DataFrame как структура данных основана на RDD, наследуя возможность параллельного хранения и обработки данных.

Как и в случае с RDD API, операции в DataFrame API делятся на два типа:
- Трансформации (англ. transformations) создают объекты DataFrame из источника, но не запускают вычисления.
- Действия (англ. actions) — явная инструкция, чтобы начать процесс вычислений финальной таблицы.

### Создание df 

In [None]:
spark = SparkSession.builder \
                    .master("local") \
                    .appName("Learning DataFrames") \
                    .getOrCreate()

известные мне на данный момент 
- Прочитать из источника `spark.read.csv('datasets/cal_housing_data.csv', schema=schema)`
- создать с помощью метода `createDataFrame()`
- из pd.DataFrame: `spark.createDataFrame(pandas_df) `

In [5]:
data = [('val_1_1', 1, 123.22),
        ('val_2_1', 2, 124.22),
        ('val_3_1', 3, 125.22),]
columns = ['column_1', 'column_2', 'column_3']
# schema - передает название колонок
df = spark.createDataFrame(data=data, schema=columns) 
df.printSchema() # Вывести схему

root
 |-- column_1: string (nullable = true)
 |-- column_2: long (nullable = true)
 |-- column_3: double (nullable = true)



### Задать schema явно!

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *
schema = StructType([
    StructField("longitude", FloatType(), nullable=True),
    StructField("latitude", FloatType(), nullable=True),
    StructField("median_age", FloatType(), nullable=True),
    StructField("total_rooms", FloatType(), nullable=True),
    StructField("total_bdrms", FloatType(), nullable=True),
    StructField("population", FloatType(), nullable=True),
    StructField("households", FloatType(), nullable=True),
    StructField("median_income", FloatType(), nullable=True),
    StructField("median_house_value", FloatType(), nullable=True)]
)
data = spark.read.csv('datasets/cal_housing_data.csv', schema=schema)
data.printSchema()


### Действия Spark DataFrame

|Метод|Что делает|Примечание|
|--|--|--|
|`.show(5)`|аналог .head(5)||
|`take(n)`|запускает вычисление всех трансформаций|запуск с DataFrame и возвращает n-ое количество строк.|
|`toPandas()`|позволяет перевести в pd.DataFrame|Обратите внимание, что пользоваться методом toPandas() нужно осторожно. Он запускает вычисление всего плана трансформаций, и если попробовать вычислить результат, который весит больше, чем позволяют настройки Spark-сессии spark.driver.maxResultSize и spark.driver.memory, то Spark-приложение завершится по причине нехватки памяти. Так можно потерять результаты предыдущих расчётов, которые хранятся в оперативной памяти.|
|`collect()`|запускает вычисление всего плана |трансформаций и собирает результат в одну таблицу на драйвере. В результате создаётся Python-список с объектами типа Row — внутренним типом данных Spark, который хранит значения строк.|
|`count()`|запускает вычисление всего плана трансформаций |и считает количество строк в таблице на драйвере.|

### Трансформации Spark DataFrame

**Трансформации** — это операции, которые изменяют таблицу-источник, но не запускают процесс вычислений. После применения трансформации операция появится в планах запроса.  

**Для более продвинутых трансформаций** с колонками в Spark SQL есть отдельный модуль — `Spark Functions`. В нём реализовано много математических, строковых, логических и статистических функций для разных задач. 

|Метод|Что делает|Примечание|
|--|--|--|
|`select()`|Базовый метод трансформации, аналогичный оператору SELECT в SQL||
|`distinct()`|аналогичный оператору DISTINCT в SQL|1*Чтобы получить уникальные значения, нужно добавить операцию действия collect() или toPandas()     |
|`withColumn()`|Метод, который создаёт новую колонку. |2*В таблице-примере появится новый столбец|
|`lit()`|создаёт столбец с фиксированным значением|3*из модуля Spark Functions|
|`withColumnRenamed()`|Метод, который используют для переименования колонки. |4*Новое имя указывают вторым в аргументе.|
|`toDF()`|Метод, который возвращает новый объект типа DataFrame.|5*Его часто используют в случаях, когда нужно переименовать несколько колонок сразу.|
|`cast()`|аналогичен оператору CAST в SQL|6*меняет тип данных в выбранной колонке на указанный.|
|`filter()`|отбор по условию|7*Его сочетают вместе с функцией col() из модуля Spark Functions:|
|||8**Чтобы выбрать данные, не соответствующие условию, можно использовать привычный синтаксис pandas|
|`like()`|Аналог `LIKE` in SQL||
|`rlike()`|||
|`startswith()`|||
|`endswith()`|||
|`contains()`|||
|` orderBy()`|||
|`join()`|Метод аналогичен join() в pandas||
|`broadcast()`||Представьте, что вам нужно соединить две таблицы. Размер первой — несколько миллионов строк, а второй — всего 100–200 строк.|

```python
# 1* 
casesDist = cases.select('province').distinct()
# 2* 
casesNew = cases.withColumn('confirmedNew', F.col('confirmed') + 99)
# 3* 
casesPred = cases.withColumn('prediction', F.lit(1))
# 4*
cases = cases.withColumnRenamed('infection_case', 'infection_source')
# 5*
cases = cases.toDF(*['case_id', 'province', 'city', 'group', 'infection_case', 'latitude', 'longitude'])
# 6*
from pyspark.sql.types import DoubleType, IntegerType, StringType 
cases = cases.withColumn('group', F.col('group').cast(IntegerType()))
# 7*
cases.filter(F.col('province') == 'Seoul').count() 
# 8*
cases.filter(F.col('province') != 'Seoul').count() 
cases.filter(~(F.col('province') == 'Seoul')).count()
cases.filter((cases.province  == 'Seoul') & (cases.city  == 'Guro-gu')).toPandas() 
```
Больше методов см. в [документации](https://spark.apache.org/docs/3.1.1/api/python/reference/pyspark.sql.html#dataframe-apis)