## Dataframe API

### Dataframe:
+ структурированная колоночная структура данных
+ может быть создана на основе:
  - локальной коллекции
  - файла (файлов)
  - базы данных
+ в Python работает значительно быстрее, чем RDD
+ под капотом использует RDD
+ позволяет выполнять произвольные SQL операции с данными
+ аналогично RDD являются ленивыми и неизменяеыми

### Из чего состоит Dataframe?
+ схема [pyspsark.sql.StructType](https://spark.apache.org/docs/2.4.7/api/python/pyspark.sql.html#pyspark.sql.types.StructType)
+ колонки [pyspark.sql.Column](https://spark.apache.org/docs/2.4.7/api/python/pyspark.sql.html#pyspark.sql.Column)
+ данные [pyspark.sql.Row](https://spark.apache.org/docs/2.4.7/api/python/pyspark.sql.html#pyspark.sql.Row)

## Основной управляющий объект в Spark SQL - [SparkSession](https://spark.apache.org/docs/2.4.7/api/python/pyspark.sql.html#pyspark.sql.SparkSession)

In [None]:
from pyspark import SparkConf
from pyspark.sql import SparkSession

conf = SparkConf()

spark = SparkSession.builder.config(conf=conf).appName("Spark SQL").getOrCreate()

In [None]:
spark

In [None]:
spark.sparkContext

In [None]:
%pylab inline

## Как создать DataFrame?

### Прочитать данные из файла

In [None]:
spark.read

### Конвейер чтения данных
```python
spark.read\
     .format(...)\
     .option(key, value)\
     .option(key, value)\
     .load(path)
```

In [None]:
%%time
df = spark.read\
          .format("csv")\
          .option("sep", "|")\
          .load("/data/spark/lecture06/ml-100k/u.user")

In [None]:
%%time
rdd = spark.sparkContext.textFile("/data/spark/lecture06/ml-100k/u.user")

### Откуда такая разница?

In [None]:
df

In [None]:
df.show(1)

In [None]:
df.take(5)

### Схема!

In [None]:
from pyspark.sql.types import *

In [None]:
schema = StructType(fields=[
    StructField("user_id", IntegerType()),
    StructField("age", IntegerType()),
    StructField("gender", StringType()),
    StructField("occupation", StringType()),
    StructField("zip", IntegerType())
])

In [None]:
%%time
df = spark.read\
          .schema(schema)\
          .format("csv")\
          .option("sep", "|")\
          .load("/data/spark/lecture06/ml-100k/u.user")

In [None]:
df

In [None]:
df.printSchema()

In [None]:
df.show(5)

### Схему также можно задавать в виде SQL DDL

In [None]:
ddl_schema = """
    user_id INT,
    age INT,
    gender STRING,
    occupation STRING,
    zip INT
"""

In [None]:
%%time
df = spark.read\
          .schema(ddl_schema)\
          .format("csv")\
          .option("sep", "|")\
          .load("/data/spark/lecture06/ml-100k/u.user")

In [None]:
df.printSchema()

### Наличие схемы оборачивается еще одним приятным бонусом - правильно работает описательная статистика.

In [None]:
df.summary().show()

### Что значит `nullable`?

In [None]:
tiny_schema = StructType(fields=[
    StructField("id", IntegerType()),
    StructField("value", StringType(), nullable=False)
])

In [None]:
df = spark.createDataFrame([[1, None], [3, "Hello"]], schema=tiny_schema)

In [None]:
tiny_schema = StructType(fields=[
    StructField("id", IntegerType()),
    StructField("value", StringType(), nullable=True)
])

In [None]:
df = spark.createDataFrame([[1, None], [3, "Hello"]], schema=tiny_schema)

In [None]:
df.printSchema()

### На самом деле конвейер чтения выглядит так
```python
spark.read\
     .schema(schema)\
     .format(...)\
     .option(key, value)\
     .option(key, value)\
     .load(path)
```

### Для популярных источников есть удобные обертки

In [None]:
df = spark.read.csv("/data/spark/lecture06/ml-100k/u.user", schema=schema, sep="|")

In [None]:
df

In [None]:
df.show(5)

### А самое классное, что через один API можно работать с множеством источников!
+ CSV
+ JSON
+ Hive
+ HBase
+ Cassandra
+ MySQL
+ PostgreSQL
+ Parquet
+ ORC
+ Kafka
+ ElasticSearch
+ Amazon S3
+ ...and more through custom connectors

### Create a DataFrame from an RDD, pandas.DataFrame or a list

In [None]:
rdd = sc.textFile("/data/spark/lecture06/ml-100k/u.user").map(lambda x: x.split("|"))

In [None]:
rdd.take(5)

In [None]:
df = spark.createDataFrame(rdd)

### RDD нетипизирован и никакой схемы не имеет

In [None]:
df

In [None]:
df = spark.createDataFrame(rdd, schema=schema)

In [None]:
df

In [None]:
df.show(5)

### Type mismatch, maybe we should ignore schema verification??

In [None]:
df = spark.createDataFrame(rdd, schema=schema, verifySchema=False)

In [None]:
df.show(5)

### No chance, have to convert data to proper types

In [None]:
rdd = rdd.map(lambda x: (int(x[0]), int(x[1]), x[2], x[3], int(x[4])))

In [None]:
df = spark.createDataFrame(rdd, schema=schema)

In [None]:
df.show(5)

## Работать будем со сгенерированным логом доступа

In [None]:
!hdfs dfs -tail /data/spark/lecture06/logsM.txt

In [None]:
log_schema = StructType(fields=[
    StructField("ip", StringType()),
    StructField("timestamp", LongType()),
    StructField("url", StringType()),
    StructField("size", IntegerType()),
    StructField("code", IntegerType()),
    StructField("ua", StringType())
])

In [None]:
log = spark.read.csv("/data/spark/lecture06/logsM.txt", sep="\t", schema=log_schema).cache()

In [None]:
log

In [None]:
log.rdd.getNumPartitions()

In [None]:
log = log.repartition(4)

In [None]:
log.show(5, vertical=True, truncate=False)

## Проекции и фильтры
**Проекция** возвращает подмножество столбцов

**Фильтр** возвращает подмножество строк

In [None]:
log.select(["ip", "timestamp", "url"])

In [None]:
log.select("ip", "code").show(5)

In [None]:
log.select(log.ip, log.code).show(5)

In [None]:
log.ip

## Зачем нужны столбцы-объекты? Потому что у них есть методы!

In [None]:
log.select(log.ip,
           log.code.alias("response")).show(5)

In [None]:
import pyspark.sql.functions as f

In [None]:
log.select("ip", 
           f.col("code").alias("response")).show(5)

## Pandas-like

In [None]:
log[["ip", "code"]].show(5)

In [None]:
log[[log.ip, log.code.alias("response")]].show(5)

## Фильтрация

In [None]:
log.where("code = 200").show(5)

In [None]:
log.filter(log.code == 200).show(5, truncate=False)

In [None]:
log.filter("code == 200 AND url LIKE '%rambler%'").show(5, truncate=False, vertical=True)

In [None]:
log.filter((log.code.isin([200, 404])) & (log.url.like("%rambler%"))).show(5)

## Pandas

In [None]:
log[(log.code == 200) & (log.url.like("%rambler%"))].show(5)

## И все вместе

In [None]:
log[(log.code == 200) & (log.url.like("%rambler%"))][["ip", "code"]].show(5)

## А SQL-то можно писать?

In [None]:
query = """
SELECT ip, code FROM log_table 
WHERE code == 200 AND url LIKE '%rambler%'
"""

In [None]:
spark.sql(query).show(5)

### При запуске на YARN `SparkSession` автоматически запускается с поддержкой HIVE. Поэтому надо зарегистрировать `DataFrame` как таблицу во внутреннем каталоге Spark SQL

In [None]:
log.createOrReplaceTempView("log_table")

In [None]:
spark.catalog.listTables()

In [None]:
spark.sql(query).show(5)

### Какая разница между Temp view и Global Temp View?
`SparkSession` - не singleton в отличие от `SparkContext`

In [None]:
spark2 = spark.newSession()

In [None]:
spark2 is spark

In [None]:
spark2.sparkContext is spark.sparkContext

## Функции
Не все вычисления можно реализовать стандартным SQL. Здесь на помощь приходят функции (встроенные или пользовательские). Встроенные функции находятся в модуле [`pyspark.sql.functions`](https://spark.apache.org/docs/2.4.7/api/python/pyspark.sql.html#module-pyspark.sql.functions) 

In [None]:
log.select("ua", f.length("ua")).show(5)

### Функции возвращают объект типа `Column`

In [None]:
f.length("ua")

In [None]:
log.select("ua", f.length("ua").alias("length")).show(5)

### Довольно часто возникает ошибка с неверными именами столбцов

In [None]:
log.select(f.concat("url", "?utm_medium=email")).show(5)

### Нужно убедиться в правильности имени или типа

In [None]:
log.select(f.concat("url", f.lit("?utm_medium=email")).alias("newurl")).show(5, False)

## Взрывы!
Посчитаем word count

In [None]:
log.select("ua", f.split("ua", " ").alias("word_list")).show(5, False, True)

### Прелесть ленивых вычислений и строгой типизации

In [None]:
log.select("ua", f.split("ua", " ").alias("word_list")).printSchema()

### К элементам сложных типов можно получить доступ

In [None]:
log.select("ua", f.split("ua", " ").alias("word_list"))\
   .select(f.col("word_list")[0].alias("first_word"), f.col("word_list")[1].alias("second_word"))\
   .show(5)

In [None]:
log.select("ua", f.split("ua", " ").alias("word_list"))\
   .select(f.explode("word_list").alias("word"))\
   .groupby("word").count()\
   .orderBy("count", ascending=False)\
   .show(5)

### Самое время посмотреть в Spark UI!

## Joins

In [None]:
!hdfs dfs -tail /data/spark/lecture06/ipDataM.txt

In [None]:
ip_schema = StructType(fields=[
    StructField("ip", StringType()),
    StructField("region", StringType())
])

In [None]:
ips = spark.read.csv("/data/spark/lecture06/ipDataM.txt", schema=ip_schema, sep="\t").cache()

In [None]:
ips.show(5)

### Трюк для отключения автоматической оптимизации

In [None]:
spark.sql("SET spark.sql.autoBroadcastJoinThreshold = 100500")

In [None]:
log_with_regions = log.join(ips, on="ip", how="inner")

In [None]:
log_with_regions

In [None]:
log_with_regions.show(5)

### Можно делать не только equi-join, но и по произвольному выражению

In [None]:
ips_with_ts = ips.withColumn("timestamp", f.lit(20140127041332).cast("bigint"))

In [None]:
log.join(ips_with_ts, on=((log.ip == ips_with_ts.ip) & (log.timestamp > ips_with_ts.timestamp))).count()

In [None]:
log_with_regions.count()

### А что там с партициями?

In [None]:
(log.rdd.getNumPartitions(),
 ips.rdd.getNumPartitions(),
 log_with_regions.rdd.getNumPartitions())

In [None]:
spark.conf.get("spark.sql.shuffle.partitions")

In [None]:
log_with_regions = log_with_regions.coalesce(4).cache()

### По-умолчанию Spark SQL использует алгоритм SortMergeJoin

In [None]:
log_with_regions.explain(extended=True)

### Если одна из таблиц мала, то можно реализовать map-side join через broadcast

In [None]:
log_with_regions = log.join(f.broadcast(ips), on="ip", how="inner")

In [None]:
log_with_regions.explain()

## Аггрегация
```python
df.groupBy(*cols)\
  .agg(*expressions)
```

In [None]:
log_with_regions.groupBy("region")\
                .agg(f.count("ip").alias("count"))\
                .orderBy("count", ascending=False)\
                .show(10)

In [None]:
log_with_regions.groupBy("region")\
                .count()\
                .withColumnRenamed("count", "row_count")\
                .orderBy("row_count", ascending=False)\
                .show(10)

In [None]:
length_stat = log_with_regions.groupBy(f.length("url").alias("url_length"))\
                              .agg(f.count("*").alias("row_count"))\
                              .orderBy("row_count", ascending=False)\
                              .toPandas()

In [None]:
length_stat

## UDF (User Defined Functions)
The function type of the UDF can be one of the following:
+ **SCALAR**. A scalar UDF defines a transformation: One or more `pandas.Series` -> A `pandas.Series`. calar UDFs are used with `pyspark.sql.DataFrame.withColumn()` and `pyspark.sql.DataFrame.select()`
+ **GROUPED_MAP**. A grouped map UDF defines transformation: A `pandas.DataFrame` -> A `pandas.DataFrame`. Grouped map UDFs are used with `pyspark.sql.GroupedData.apply()`

In [None]:
@f.pandas_udf(StringType())
def encode_http_status(codes):
    mapping = {
        1: "info",
        2: "success",
        3: "redirect",
        4: "client error",
        5: "server error"
    }
    return (codes // 100).replace(mapping)

In [None]:
log.withColumn("http_status", encode_http_status("code"))\
   .groupBy("http_status").count().show()

In [None]:
spark.stop()