# Pyspark tutorial

Симулятор для рекомендательных систем sim4rec использует pyspark и spark-датафреймы для работы с большими объемами данных. Данные в симуляторе хранятся в формате spark-датафреймов, поэтому будет полезно уметь с ними работать. Spark хранит данные партиционированно, по частям, и выполняет вычисления сначала, если возможно, внутри каждой партции, а затем уже выполняет shuffle, т.е. перемешивание данных, например, для group by и join. В ходе работы симулятора вы можете накопить большой объем данных, который будет долго и ресурсоемко полностью выгружать в привычный pandas. Поэтому советуем выполнять простые операции (фильтрацию, просмотр, джойны, группировки) в  pyspark, а затем, при необходимости конвертировать данные в pandas. 

* Spark сессия
* Инициализация
* Чтение/запись
* Spark SQL

### Spark сессия
Для начала нужно создать spark-сессию, в контексте которой будет создан DataFrame. В заданиях этот код написан за вас.

In [1]:
import os
import sys
os.environ["PYSPARK_PYTHON"]=sys.executable
os.environ["PYSPARK_DRIVER_PYTHON"]=sys.executable

In [2]:
import pandas as pd
import numpy as np
from pyspark.sql import SparkSession
from pyspark.sql.types import (
    StructField,
    StructType,
    IntegerType,
    StringType
)

spark = (
    SparkSession
    .builder
    .master("local[*]")
    .appName('PySpark_Tutorial1')
    .getOrCreate()
)
spark

22/12/01 10:42:18 WARN Utils: Your hostname, cl1nr5sb14mq6gk2g9m8-ilyr resolves to a loopback address: 127.0.1.1; using 10.129.0.35 instead (on interface eth0)
22/12/01 10:42:18 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
22/12/01 10:42:18 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/12/01 10:42:19 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


### Инициализация

За создание объекта DataFrame отвечает метод `.createDataFrame()`, который в качестве данных может принимать:

* pandas.DataFrame
* dict
* list
* и т.д.

При создании можно явно указать схему для DataFrame, например, это может помочь, когда вам нужно задать конкретные типы для колонок.

Информация о датафрейме:
* .show(n) - просмотр DataFrame
* .count() - количество записей
* .columns - список колонок
* .printSchema() -  схема (колонки и типы)
* .toPandas() - конвертация в pandas.DataFrame

#### Инициализация c помощью Pandas

In [3]:
data_pd = pd.DataFrame({"name": ["Nikita", "Masha", "Sasha"], "age": [15, 24, 30]})
df = spark.createDataFrame(data_pd)
df.show()

                                                                                

+------+---+
|  name|age|
+------+---+
|Nikita| 15|
| Masha| 24|
| Sasha| 30|
+------+---+



#### Конвертация в Pandas

In [4]:
df.toPandas()

Unnamed: 0,name,age
0,Nikita,15
1,Masha,24
2,Sasha,30


Методом `.show()` можно просмотреть определенное количество записей.

In [5]:
df.show(2)

+------+---+
|  name|age|
+------+---+
|Nikita| 15|
| Masha| 24|
+------+---+
only showing top 2 rows



#### Инициализация c помощью dict

In [6]:
data_dct = [{"name": "Nikita", "age": 15}, 
           {"name": "Masha", "age": 24},
           {"name": "Sasha", "age": 30}]
df = spark.createDataFrame(data_dct)
df.show()

+---+------+
|age|  name|
+---+------+
| 15|Nikita|
| 24| Masha|
| 30| Sasha|
+---+------+



#### Инициализация c помощью list

In [7]:
data_list = [("Nikita", 15), ("Masha", 24), ("Sasha", 30)]
df = spark.createDataFrame(data_list)
df.show()

+------+---+
|    _1| _2|
+------+---+
|Nikita| 15|
| Masha| 24|
| Sasha| 30|
+------+---+



#### Инициализация с использованием схемы

In [8]:
data_list = [("Nikita", 15), ("Masha", 24), ("Sasha", 30)]
df = spark.createDataFrame(data_list, 
                           schema = StructType(
                                [
                                    StructField("name", StringType()),
                                    StructField("age", IntegerType()),
                                ]))

df.show()

+------+---+
|  name|age|
+------+---+
|Nikita| 15|
| Masha| 24|
| Sasha| 30|
+------+---+



#### Методы просмотра информации о DataFrame

In [10]:
df.columns

['name', 'age']

In [9]:
df.count()

3

In [10]:
df.printSchema()

root
 |-- name: string (nullable = true)
 |-- age: integer (nullable = true)



### Чтение/запись

Для **чтения** нужно обраться к модулю `spark.DataFrame.read`. <br>
Для **записи** к модулю `spark.write`. <br>
Так же для записи понадобиться метод `.mode()` с указанием следующих режимов сохранения:

* overwrite - режим используется для перезаписи существующего файла.
* append - Добавить данные в существующий файл.

In [11]:
df.show()

+------+---+
|  name|age|
+------+---+
|Nikita| 15|
| Masha| 24|
| Sasha| 30|
+------+---+



In [12]:
df.write.mode("overwrite").parquet("path_to_save")

pyspark сохраняет данные отдельных партиций в отдельные файлы. Данные в таком формате не получится прочитать pandas-ом.

In [13]:
!cd path_to_save && ls

_SUCCESS
part-00000-ca289f3c-9aa7-4b0c-b7a4-c68e44988eb9-c000.snappy.parquet
part-00003-ca289f3c-9aa7-4b0c-b7a4-c68e44988eb9-c000.snappy.parquet
part-00007-ca289f3c-9aa7-4b0c-b7a4-c68e44988eb9-c000.snappy.parquet
part-00011-ca289f3c-9aa7-4b0c-b7a4-c68e44988eb9-c000.snappy.parquet


In [14]:
new_df = spark.read.parquet("path_to_save")

In [15]:
new_df.show()

+------+---+
|  name|age|
+------+---+
|Nikita| 15|
| Masha| 24|
| Sasha| 30|
+------+---+



### Spark SQL

Для запросов в Spark используется SQL-like синтаксис.

Основные методы:
* .select() - просмотр столбцов
* .filter(), .where() - фильтрация записей
* .join() - объединение нескольких spark.DataFrame
* .distinct() - уникальные значения
* .withColumn() - создание/преобразование столбца
* .withColumnRename() - переименование столбца
* .orderBy() - сортировка
* .groupBy().agg() - группировка
* и т.д.

Существуют несколько способов обращения к столбцу:

* строка - "column_name"
* модуль pyspark.sql.functions - sf.col("column_name")
* поле объекта - df.column_name

Помимо метода `.withColumnRename()`, вызываемого от DataFrame  может использоваться метод  `.alias()`, вызываемый от определенного столбца.

Модуль pyspark.sql.functions используется для преобразования данных в столбцах с помощью функций. [Список функций](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/functions.html)

Основные методы:
* .col() - без преобразований
* .max(), .min(), .mean(), .count()  - арифметические функции
* .lit() - столбец-константа
* .when().otherwise() - условное выражение
* и т.д.

In [13]:
from pyspark.sql import functions as sf

data_dct_info = [{"name": "Nikita", "age": 20, "height": 170}, 
              {"name": "Masha", "age": 21, "height": 180},
              {"name": "Sasha", "age": 20, "height": 170},
              {"name": "Lera", "age": 30, "height": 175},
              {"name": "Vika", "age": 30, "height": 170},
              {"name": "Max", "age": 21, "height": 175},
              {"name": "Misha", "age": 30, "height": 175}]

data_dct_city = [{"name": "Nikita", "city": "St. Peterburg"}, 
              {"name": "Sasha", "city": "Moscoow"},
              {"name": "Vika", "city": "St. Peterburg"},
              {"name": "Tanya", "city": "St. Peterburg"},
              {"name": "Misha", "city": "Moscoow"}]

names_info = spark.createDataFrame(data_dct_info)

names_city = spark.createDataFrame(data_dct_city)

names_info.show()
names_city.show()

+---+------+------+
|age|height|  name|
+---+------+------+
| 20|   170|Nikita|
| 21|   180| Masha|
| 20|   170| Sasha|
| 30|   175|  Lera|
| 30|   170|  Vika|
| 21|   175|   Max|
| 30|   175| Misha|
+---+------+------+

+-------------+------+
|         city|  name|
+-------------+------+
|St. Peterburg|Nikita|
|      Moscoow| Sasha|
|St. Peterburg|  Vika|
|St. Peterburg| Tanya|
|      Moscoow| Misha|
+-------------+------+



**Разные способы обращения к столбцу**

In [14]:
names_info.select("height").show()

+------+
|height|
+------+
|   170|
|   180|
|   170|
|   175|
|   170|
|   175|
|   175|
+------+



In [15]:
names_info.select(sf.col("height")).show()

+------+
|height|
+------+
|   170|
|   180|
|   170|
|   175|
|   170|
|   175|
|   175|
+------+



#### Основные методы  Spark SQL

Фильтрация по одному столбцу

In [20]:
names_info.filter("age > 25").show()

+---+------+-----+
|age|height| name|
+---+------+-----+
| 30|   175| Lera|
| 30|   170| Vika|
| 30|   175|Misha|
+---+------+-----+



Фильтрация по нескольким столбцам

In [21]:
names_info.filter("age > 25 and height > 170").show()

+---+------+-----+
|age|height| name|
+---+------+-----+
| 30|   175| Lera|
| 30|   175|Misha|
+---+------+-----+



Уникальные значения столбца 

In [19]:
names_info.select("height").distinct().show()

                                                                                

+------+
|height|
+------+
|   170|
|   175|
|   180|
+------+



С использованием `pyspark.sql.functions`

In [20]:
names_info.select(sf.max("height")).show()

+-----------+
|max(height)|
+-----------+
|        180|
+-----------+



In [21]:
names_info.select(sf.mean("height").alias("mean height")).show()

+------------------+
|       mean height|
+------------------+
|173.57142857142858|
+------------------+



##### Сложные преобразования

In [22]:
(
    names_info
    .select("age")
    .withColumn("age_over_25", sf.when(sf.col("age") > 25, True).otherwise(False))
    .orderBy("age")
).show()

+---+-----------+
|age|age_over_25|
+---+-----------+
| 20|      false|
| 20|      false|
| 21|      false|
| 21|      false|
| 30|       true|
| 30|       true|
| 30|       true|
+---+-----------+



In [23]:
(
    names_info.
    select("age")
    .filter("age > 15")
    .withColumn("age_over_25", sf.col("age") > 25)
    .orderBy("age")
).show()

+---+-----------+
|age|age_over_25|
+---+-----------+
| 20|      false|
| 20|      false|
| 21|      false|
| 21|      false|
| 30|       true|
| 30|       true|
| 30|       true|
+---+-----------+



#### Join датафреймов

Inner join

In [25]:
names_info.join(names_city, on="name").show()

+------+---+------+-------------+
|  name|age|height|         city|
+------+---+------+-------------+
|  Vika| 30|   170|St. Peterburg|
| Sasha| 20|   170|      Moscoow|
| Misha| 30|   175|      Moscoow|
|Nikita| 20|   170|St. Peterburg|
+------+---+------+-------------+



Left join

In [26]:
names_info.join(names_city, on="name", how="left").show()

+------+---+------+-------------+
|  name|age|height|         city|
+------+---+------+-------------+
|  Vika| 30|   170|St. Peterburg|
| Sasha| 20|   170|      Moscoow|
|  Lera| 30|   175|         null|
| Misha| 30|   175|      Moscoow|
|Nikita| 20|   170|St. Peterburg|
|   Max| 21|   175|         null|
| Masha| 21|   180|         null|
+------+---+------+-------------+



 Right join

In [27]:
names_info.join(names_city, on="name", how="right").show()

+------+----+------+-------------+
|  name| age|height|         city|
+------+----+------+-------------+
|  Vika|  30|   170|St. Peterburg|
| Sasha|  20|   170|      Moscoow|
| Tanya|null|  null|St. Peterburg|
| Misha|  30|   175|      Moscoow|
|Nikita|  20|   170|St. Peterburg|
+------+----+------+-------------+



Комбинация join и запросов к DataFrame

In [28]:
names_city.join(
    names_info.where(
        sf.col("height") > 170
    ),
    on="name",
    how="left").show()

+------+-------------+----+------+
|  name|         city| age|height|
+------+-------------+----+------+
|  Vika|St. Peterburg|null|  null|
| Sasha|      Moscoow|null|  null|
| Tanya|St. Peterburg|null|  null|
| Misha|      Moscoow|  30|   175|
|Nikita|St. Peterburg|null|  null|
+------+-------------+----+------+

