# Apache Spark SQL + DataFrames

## Модель вычислений

### DAG

### Master-slave принцип работы

- Master - Driver:
    + запускается отдельно, запрашивает ресурсы у менеджера,
    + создает SparkContext/SparkSession,
    + распределяет задания между exectutor-ами,
    + планирует и отслеживает прогрусс выполняемых стадий.
- Slaves - Executors:
    + запущены для отказоустойчивости на разных узлах (если возможно,
    + выполняют основную работу по вычислениям.

## Spark SQL

Позволяет обрабатывать структурированные и полуструктурированные данные.  
**Интерфейсы.**
- DataFrame:
  + абстракция поверх RDD,
  + имеет схему данных - 
    - имена полей,
    - типы полей,
  + включает оптимизатор вычислений,
  + поддерживает SQL-like запросы,
  + поддерживает интеграции со множеством БД.
- Dataset:
  + Как DF, только со строгой типизацией,
  + Не поддерживает Python (и никогда не будет).

## Давайте приступим!

In [None]:
!pip install pyarrow

In [None]:
import os
JAVA_HOME = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["JAVA_HOME"]= JAVA_HOME

In [None]:
from pyspark.sql import SparkSession

In [None]:
spark = SparkSession.builder \
    .appName("my_spark") \
    .master("local[*]") \
    .getOrCreate()

In [None]:
spark

## Базовый SQL
+ схема [pyspsark.sql.StructType](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.types.StructType.html)
+ колонки [pyspark.sql.Column](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.Column.html)
+ данные [pyspark.sql.Row](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.Row.html)

## Работа с чтением данных

В нашем примере будем использовать датасет MovieLens, который содержит информацию о фильмах. Файл `movies.csv` должен располагаться в каталоге (например, "movielens/"). Мы используем метод `spark.read.csv` для чтения данных, указывая, что в файле присутствует заголовок и типы данных могут быть автоматически определены.

### Как создать DataFrame?

### Прочитать данные из файла!

In [None]:
help(spark.read)

In [None]:
!curl -fSL https://files.grouplens.org/datasets/movielens/ml-latest.zip -o ml-latest.zip

In [None]:
!unzip ml-latest.zip

In [None]:
!mv ml-latest movielens

### Конвейер чтения данных
```python
spark.read\
     .format(...)\
     .option(key, value)\
     .option(key, value)\
     .load(path)
```

In [None]:
%%time
movies_df = spark.read\
          .format("csv")\
          .option("sep", ",")\
          .load("movielens/movies.csv")

In [None]:
movies_df

In [None]:
movies_df.show(10)

In [None]:
movies_df.printSchema()

### Схема!

In [None]:
from pyspark.sql.types import *

In [None]:
schema = StructType(fields=[
    StructField("movieId", IntegerType()),
    StructField("title", StringType()),
    StructField("genres", StringType())
])

In [None]:
movies_df = spark.read\
          .schema(schema)\
          .format("csv")\
          .option("sep", ",")\
          .option("header", "True")\
          .load("movielens/movies.csv")

In [None]:
movies_df.printSchema()

In [None]:
movies_df.show(1)

### Схему также можно задавать в виде SQL DDL

In [None]:
ddl_schema = """
    movieId INT,
    title STRING,
    genres STRING
"""

In [None]:
movies_df = spark.read\
          .schema(ddl_schema)\
          .format("csv")\
          .option("sep", ",")\
          .load("movielens/movies.csv")

In [None]:
movies_df.printSchema()

In [None]:
movies_df.show(10)

### Наличие схемы оборачивается еще одним приятным бонусом - правильно работает описательная статистика.

In [None]:
movies_df.summary().show()

Давайте посмотрим на наш датафрейм

In [None]:
movies_df?

In [None]:
movies_df.printSchema()

### Что значит `nullable`?

In [None]:
tiny_schema = StructType(fields=[
    StructField("id", IntegerType()),
    StructField("value", StringType(), nullable=False)
])

In [None]:
df = spark.createDataFrame([[1, None], [3, "Hello"]], schema=tiny_schema)

In [None]:
tiny_schema = StructType(fields=[
    StructField("id", IntegerType()),
    StructField("value", StringType(), nullable=True)
])

In [None]:
spark.createDataFrame?

In [None]:
df = spark.createDataFrame([[1, None], [3, "Hello"]], schema=tiny_schema)

In [None]:
df.printSchema()

### Для популярных источников есть удобные обертки

In [None]:
movies_df = spark.read.csv("movielens/movies.csv", header=True, inferSchema=True)

In [None]:
spark.read?

In [None]:
movies_df.printSchema()

In [None]:
movies_df.show(10)

### А самое классное, что через один API можно работать с множеством источников!
+ CSV
+ JSON
+ Hive
+ HBase
+ Cassandra
+ MySQL
+ PostgreSQL
+ Parquet
+ ORC
+ Kafka
+ ElasticSearch
+ Amazon S3
+ ...and more through custom connectors

### Создание DataFrame из RDD, pandas.DataFrame или списка

In [None]:
rdd = spark.sparkContext.textFile("movielens/movies.csv").map(lambda x: x.split(","))

In [None]:
rdd.take(5)

In [None]:
df = spark.createDataFrame(rdd)

### RDD нетипизирован и никакой схемы не имеет

In [None]:
df

In [None]:
df = spark.createDataFrame(rdd, schema=schema)

In [None]:
df

In [None]:
df.show(5)

### Может быть, если мы это проигнорируем, оно само уйдет?

In [None]:
df = spark.createDataFrame(rdd, schema=schema, verifySchema=False)

In [None]:
df.show(5)

### %(

In [None]:
clean_rdd = rdd.map(lambda x: (int(x[0]), x[1], x[2]))

In [None]:
clean_rdd.take(5)

### %(

In [None]:
header = rdd.first()
clean_rdd = rdd.filter(lambda x: x != header).map(lambda x: (int(x[0]), x[1], x[2]))

In [None]:
df = spark.createDataFrame(clean_rdd, schema=schema)

In [None]:
clean_rdd.take(5)

In [None]:
df.show(5)

### Работать будем со сгенерированным логом доступа

In [None]:
!tail logsM.txt

In [None]:
log_schema = StructType(fields=[
    StructField("ip", StringType()),
    StructField("timestamp", LongType()),
    StructField("url", StringType()),
    StructField("size", IntegerType()),
    StructField("code", IntegerType()),
    StructField("ua", StringType())
])

In [None]:
log = spark.read.csv("logsM.txt", sep="\t", schema=log_schema)

In [None]:
log

In [None]:
log.rdd.getNumPartitions()

In [None]:
log = log.repartition(4)

In [None]:
log.show(10)

In [None]:
log.show(5, truncate=False, vertical=True)

### Проекции и фильтры
**Проекция** возвращает подмножество столбцов

**Фильтр** возвращает подмножество строк

In [None]:
log.select(["ip", "timestamp", "url"])

In [None]:
log.select("ip", "code").show(5)

In [None]:
log.select(log.ip, log.code).show(5)

In [None]:
log.ip

### Зачем нужны столбцы-объекты? Потому что у них есть методы!

In [None]:
log.select(log.ip,
           log.code.alias("response")).show(5)

In [None]:
import pyspark.sql.functions as f

In [None]:
log.select("ip", 
           f.col("code").alias("response")).show(5)

### Pandas-like

In [None]:
log[["ip", "code"]].show(5)

In [None]:
log[[log.ip, log.code.alias("response")]].show(5)

### Фильтрация

In [None]:
log.where("code = 200").show(5)

In [None]:
log.filter(log.code == 200).show(5, truncate=False)

In [None]:
log.filter("code == 200 AND url LIKE '%ozon%'").show(5, truncate=False, vertical=True)

In [None]:
log.filter((log.code.isin([200, 404])) & (log.url.like("%ozon%"))).show(5)

In [None]:
log.code?

### Pandas

In [None]:
log[(log.code == 200) & (log.url.like("%ozon%"))].show(5)

### И все вместе

In [None]:
log[(log.code == 200) & (log.url.like("%ozon%"))][["ip", "code"]].show(5)

### А SQL-то можно писать?

In [None]:
query = """
SELECT ip, code FROM log_table 
WHERE code == 200 AND url LIKE '%ozon%'
"""

In [None]:
spark.sql(query).show(5)

#### Нужно зарегистрировать `DataFrame` как таблицу во внутреннем каталоге Spark SQL

In [None]:
log.createOrReplaceTempView("log_table")

In [None]:
spark.catalog.listTables()

In [None]:
spark.sql(query).show(5)

### Функции
Не все вычисления можно реализовать стандартным SQL. Здесь на помощь приходят функции (встроенные или пользовательские). Встроенные функции находятся в модуле [`pyspark.sql.functions`](https://spark.apache.org/docs/2.4.7/api/python/pyspark.sql.html#module-pyspark.sql.functions) 

In [None]:
log.select("ua", f.length("ua")).show(5)

#### Функции возвращают объект типа `Column`

In [None]:
f.length("ua")

In [None]:
log.select("ua", f.length("ua").alias("length")).show(5)

### Довольно часто возникает ошибка с неверными именами столбцов

In [None]:
log.select(f.concat("url", "?utm_medium=email")).show(5)

### Нужно убедиться в правильности имени или типа

In [None]:
log.select(f.concat("url", f.lit("?utm_medium=email")).alias("newurl")).show(5, False)

### Посчитаем word count


In [None]:
log.select("ua", f.split("ua", " ").alias("word_list")).show(5, False, True)

### Прелесть ленивых вычислений и строгой типизации

In [None]:
log.select("ua", f.split("ua", " ").alias("word_list")).printSchema()

### К элементам сложных типов можно получить доступ

In [None]:
log.select("ua", f.split("ua", " ").alias("word_list"))\
   .select(f.col("word_list")[0].alias("first_word"), f.col("word_list")[1].alias("second_word"))\
   .show(5)

In [None]:
f.explode?

In [None]:
log.select("ua", f.split("ua", " ").alias("word_list"))\
   .select(f.explode("word_list").alias("word")).show()

In [None]:
log.select("ua", f.split("ua", " ").alias("word_list"))\
   .select(f.explode("word_list").alias("word"))\
   .groupby("word").count()\
   .orderBy("count", ascending=False)\
   .show(5)

### Самое время посмотреть в Spark UI!

### Joins

In [None]:
!tail ipDataM.txt

In [None]:
ip_schema = StructType(fields=[
    StructField("ip", StringType()),
    StructField("region", StringType())
])

In [None]:
ips = spark.read.csv("ipDataM.txt", schema=ip_schema, sep="\t").cache()

In [None]:
ips.show(5)

In [None]:
log.show(5)

#### Трюк для отключения автоматической оптимизации

In [None]:
spark.sql("SET spark.sql.autoBroadcastJoinThreshold = 100500")

In [None]:
log_with_regions = log.join(ips, on="ip", how="inner")

In [None]:
log_with_regions

In [None]:
log_with_regions.show(5)

In [None]:
log.explain(extended=True)

In [None]:
log_with_regions.explain(True)

#### Можно делать не только equi-join, но и по произвольному выражению

In [None]:
ips_with_ts = ips.withColumn("timestamp", f.lit(20220901000000).cast("bigint"))

In [None]:
ips_with_ts.show(5)

In [None]:
log.join(ips_with_ts, on=((log.ip == ips_with_ts.ip) & (log.timestamp > ips_with_ts.timestamp))).count()

In [None]:
log_with_regions.count()

#### А что там с партициями?

In [None]:
(log.rdd.getNumPartitions(),
 ips.rdd.getNumPartitions(),
 log_with_regions.rdd.getNumPartitions())

In [None]:
spark.conf.get("spark.sql.shuffle.partitions")

In [None]:
log_with_regions = log_with_regions.coalesce(2).cache()

#### По-умолчанию Spark SQL использует алгоритм SortMergeJoin

In [None]:
log_with_regions.explain(extended=True)

#### Если одна из таблиц мала, то можно реализовать map-side join через broadcast

In [None]:
log_with_regions = log.join(f.broadcast(ips), on="ip", how="inner")

In [None]:
log_with_regions.explain(extended=True)

### Аггрегация
```python
df.groupBy(*cols)\
  .agg(*expressions)
```

In [None]:
log_with_regions.groupBy("region")\
                .agg(f.count("ip").alias("count"))\
                .orderBy("count", ascending=False)\
                .show(10)

In [None]:
log_with_regions.groupBy("region")\
                .count()\
                .withColumnRenamed("count", "row_count")\
                .orderBy("row_count", ascending=False)\
                .show(10)

In [None]:
!pip install pandas

In [None]:
length_stat = log_with_regions.groupBy(f.length("url").alias("url_length"))\
                              .agg(f.count("*").alias("row_count"))\
                              .orderBy("row_count", ascending=False)\
                              .toPandas()

In [None]:
length_stat

### UDF (пользовательские функции)
Тип функции UDF может быть одним из следующих:
SCALAR. Скалярная UDF: один или несколько pandas.Series -> A pandas.Series. Скалярные UDF используются с pyspark.sql.DataFrame.withColumn() и pyspark.sql.DataFrame.select()
GROUPED_MAP. Группированная UDF: A pandas.DataFrame -> A pandas.DataFrame. Группированные UDF используются с pyspark.sql.GroupedData.apply()

In [None]:
@f.pandas_udf(StringType())
def encode_http_status(codes):
    mapping = {
        1: "info",
        2: "success",
        3: "redirect",
        4: "client error",
        5: "server error"
    }
    return (codes // 100).replace(mapping)

In [None]:
log.withColumn("http_status", encode_http_status("code"))\
   .groupBy("http_status").count().show()

In [None]:
log_filtered = log.filter("size > 400").select("ip", "size", "code").cache()

In [None]:
log_filtered

In [None]:
log_filtered.show()

In [None]:
log_filtered.count()

In [None]:
log_filtered.rdd.getNumPartitions()

## Прибираемся!

In [None]:
spark.stop()