# Made. Spark DataFrame API

На этом семинаре мы разберем основные функции Spark DataFrame API и попробуем их применить на синтетическом примере анализа результатов A/B-теста.

Мы работаем в среде Apache Zeppelin. В этой среде по умолчанию создается сессия спарка и сделан импорт синтаксического сахара для удобства работы со спарком.

In [1]:
spark

Однако, если мы будем работать в других средах, например, собирать jar с заданием и запускать его через `spark-submit`, то нам понадобится в явном виде создать сессию спарка.

Для этого мы можем импортировать зависимость Spark SQL API (Spark DF API является частью его).

In [3]:
import org.apache.spark.sql._

Теперь мы можем создать сессию. При создании сессии нам может быть необходимо передать различные параметры внутрь нее.

Все доступные параметры спарка можно посмотреть [здесь](https://spark.apache.org/docs/latest/configuration.html).

In [5]:
val spark = SparkSession.builder()
    // адрес мастера
    .master("local[*]")
    // имя приложения в интерфейсе спарка
    .appName("made-demo")
//     .config("spark.executor.memory",  "2g")
//     .config("spark.executor.cores", "2")
//     .config("spark.driver.memory", "2g")
    .getOrCreate()

После создания сессии мы можем импортировать синтаксический сахар

In [7]:
import spark.implicits._

## Создание DataFrame

In [9]:
val names = Seq("Vanya", "Petya", "Vasya")

Из коллекций функцией .toDF (нужен импорт spark.implicits._)

In [11]:
val df = names.toDF("name")
df.show

Коллекция кейс классов

In [13]:
case class Person(name: String)
// List(("Vanya", 25, "Moscow"))
val df = spark.createDataFrame(names.map(Person))
df.show

RDD[Row] и Schema

In [15]:
import org.apache.spark.sql.types._

val schema = StructType(Seq(
    StructField("name", StringType)
))

In [16]:
val rdd = spark
    .sparkContext
    .parallelize(names.map(x => Row(x)))

val df = spark.createDataFrame(rdd, schema)
df.show

RDD от кейс классов

In [18]:
val rdd = spark
    .sparkContext
    .parallelize(names.map(x => Person(x)))

val df = spark.createDataFrame(rdd)
df.show

Чтением файла.

In [20]:
val df = spark.read
    .option("header", "true")
    .option("inferSchema", "true")
    .option("sep", ",")
    .csv("/notebook/names.csv")

df.show

Передавать параметры можно единой мапой

In [22]:
val options = Seq(
    "header" -> "true",
    "inferSchema" -> "true"
).toMap

val df = spark.read
    .options(options)
    .csv("/notebook/names.csv")

df.show

## Базовые функции

В этом и дальнейшем разделах мы будем использовать симулированные данные о реакциях пользователя на рекомендательные системы.

Данные о реакциях собраны в рамках аб теста между двумя моделями

1. `default` -- модель, которая выдает ровно одно предсказание для пользователя перед началом аб-теста. То есть в этой группе конкретному пользователю всегда показывается один и тот же баннер
2. `model` -- модель, которая учится в онлайн (контекстный бандит). Предполагается, что со временем она должна выучивать более подходящие пользователям рекоммендации.

В рамках этого ноутбука хотим построить отчет, чтобы увидеть

1. Правда ли `model` работает в среднем лучше, чем `default`?
2. Улучшается ли со временем результат работы `model`?
3. Есть ли какой-то период времени, когда `model` работает хуже `default`?


Кроме того, нужно учитывать тот факт, что в рамках одной сессии пользователь может несколько раз увидеть один и тот же баннер / кликнуть на него несколько раз. Дубли событий лучше удалить, чтобы избежать лишнего шума в итоговой метрике.


Для начала нам необходимо прочитать данные

In [24]:
val df = spark.read
    .option("header", "true")
    .option("inferSchema", "true")
    .option("sep", ",")
    .csv("/notebook/ab-test-sim-both.csv")

df.show

Превью данных можно сделать в разном виде

1. Выбрать сколько строк показать
2. Ограничить количество символов в колонке
3. Показать данные вертикально (проще читать, когда много колонок в таблице)

In [26]:
df.show(1) // top 1 row

df.show(1, 3) // truncate strings to given number

df.show(3, 3, true) // print vertical

Посмотрим сколько всего наблюдений в таблице

In [28]:
df.count

Давайте посмотрим, что есть в данных, выведя схему. В данном случае мы можем увидеть ее и из `show`, но так как `show` -- операция действия, то каждый раз ее лучше не использовать, схема может вычисляться по метаданным без рассчета всего датасета. Кроме того, данные могут иметь сложную вложенную структуру, что будет удобно отражено в схеме.

In [30]:
df.printSchema

Как видно из схемы, у нас есть пять полей

1. `user_id` -- идентификатор пользователя
2. `item_id` -- идентификатор объекта (баннера)
3. `event` -- событие, которое совершил пользователь, может быть одним из четырех типов
    1. `SHOW` -- означает, что баннер отрендерился на экране пользователя
    2. `CLICK` -- пользователь перешел по баннеру (позитивно отреагировал)
    3. `OPEN` -- пользователь перешел по баннеру (позитивно отреагировал). По факту совпадает с событием `CLICK`, поэтому чуть позже мы его поменяем в данных на `CLICK`.
    4. `HIDE` -- пользователь скрыл баннер (негативно отреагировал)
4. Время клика в секундах
5. Уверенность модели в рекомендации



Функции над Spark DataFrames принимают на вход колонки.

Посмотрим на примере фильтрации

In [33]:
import org.apache.spark.sql.functions._

df.filter(user_id === 9771).show
df.filter($"user_id" === 9771).show
df.filter(col("user_id") === 9771).show // предпочтительный вариант, ибо более явный, находится в org.apache.spark.sql.functions._ + проще переиспользовать между spark / pyspark

In [34]:
df.filter("user_id = 9771").show

 

Также можно зарегистрировать таблицу и пользоваться синтаксисом SQL

In [36]:
df.registerTempTable("df")

spark.sql("""
    select *
    from df
    where user_id = 9771
""").show

## Функции над колонками

https://spark.apache.org/docs/latest/api/java/org/apache/spark/sql/functions.html


Есть набор стандартных функций, все они принимают на вход колонку.

In [38]:
val dfWithLength = df
    .withColumn("length", length(col("event"))) // take length of each element in column `event`
    .withColumn("constant", lit(1)) // create column from constant 
dfWithLength.show

Метод `select` позволяет выбрать необходимые нам колонки (или все колонки при `"*"`) и добавить новые. Не забывайте их переименовывать, иначе будут непонятные и неудобные для дальнейшего использования имена.

In [40]:
dfWithLength
    .select(
        col("*"),
        (col("length") - 1).as("l1"),
        (col("length") % 2).alias("l2"),
        (col("length") * 3).name("l3")
    ).show

Также мы можем просто удалить колонку

In [42]:
dfWithLength.drop("length").show

## Работа с пропусками

Над spark.DataFrame доступен модуль `.na`. Он содержит несколько функций для работы с пропусками:
1. `.na.drop(colnames: Seq[String])` -- выбрасывает все пропуски, в списке колонок
2. `.na.fill(map: Map[String, Value])` -- маппит пропуски в конкретных колонках (ключ мапы) на соответствующее значение
3. `.na.replace(colname: String, map: Map[Value, Value])` -- заменяет значения в колонке в соответствии с мапой.


Посмотрим, сколько наблюдений останется, если удалить пропуски по колонке `confidence`.

In [44]:
df.na.drop(Seq("confidence")).count

Попробуем заменить пропуски средним. Для начала нам нужно его посчитать и получить значение на драйвере.

In [46]:
val meanArray: Array[Row] = df.select(mean(col("confidence"))).collect

После `collect` мы получаем массив `Row`, который является нетипизированной коллекцией. Поэтому простое взятие элемента будет иметь тип `Any`, что может привести к неприятным последствиям.

In [48]:
meanArray(0)(0)

Давайте скастим его в `Double`

In [50]:
val mn = meanArray(0)(0).asInstanceOf[Double]

И заполним пропуски, используя метод `.na.fill`

In [52]:
val dfWithMean = df.na.fill(Map(
    "confidence" -> mn
))

dfWithMean.show

Также в условии указано, что события `OPEN` и `CLICK` несут одинаковый смысл, поэтому переименуем `OPEN` в `CLICK` используя `.na.replace`

In [54]:
val dfWithClick = dfWithMean.na.replace("event", Map("OPEN" -> "CLICK"))
dfWithClick.show

## Агрегаты


Давайте посмотрим сколько было каждого из событий

In [56]:
dfWithClick
    .groupBy("event")
    .count()
    .orderBy(desc("count"))
    .show

Мы можем также считать и много разных агрегаций за раз по аналогии с pd.DataFrame

In [58]:
val aggregates = dfWithClick
    .groupBy("event")
    .agg(
        count("*").as("count"),
        min(col("confidence")).as("min_confidence"),
        mean(col("confidence")).as("mean_confidence"),
        max(col("confidence")).as("max_confidence")
    )
    .orderBy(desc("count"))

aggregates.show

## Оконные функции

Перед тем как считать статистики, давайте выделим отдельные пользовательские сессии и проведем дедупликацию событий.

Сессией мы будем считать набор событий, между каждым последовательным из которых было меньше 10 минут (600 секунд). Тогда добавить индекс сессии мы можем по следующему пайплайну

1. Взять лаг по времени для каждлого пользователя
2. Заполнить нулями полученные пропуски
3. Посчитать дельту по времени между двумя последовательными событиями пользователя
4. Сравнить дельту с 600 секундами. Если больше, то считаем, что началась новая сессия
5. Считаем кумулятивную сумму по индикаторам начала сессий

In [60]:
import org.apache.spark.sql.expressions.Window

val userWindow = Window.partitionBy("user_id").orderBy("timestamp")

val dfWithLag = dfWithClick.withColumn(
    "lag_timestamp", lag(col("timestamp"), 1).over(userWindow)
    ).na.fill(Map("lag_timestamp" -> 0))

In [61]:
dfWithLag.show

Находим индикаторы начала сессий

In [63]:
val dfWithSessionStart = dfWithLag.withColumn(
    "session_start", (col("timestamp") - col("lag_timestamp") > 600).cast("int")
)

dfWithSessionStart.show

Считаем итоговый индекс сессии. Чтобы посчитать кумулятивную сумму, нам необходимо взять сумму по всем предыдущим строкам пользователя. Для этого мы можем в окне явно указать необходимые строки для агрегации через `rowsBetween`. Для использования всех предыдущих элементов можно использовать `Window.unboundedPreceding`.

In [65]:
val sessionWindow = Window.partitionBy("user_id").orderBy("timestamp").rowsBetween(Window.unboundedPreceding, Window.currentRow)

val dfWithSession = dfWithSessionStart.withColumn("session_id", sum(col("session_start")).over(sessionWindow))

dfWithSession.show

 
## User Defined Functions

По условию, мы проводили аб-тест между двумя моделями. Вариация для пользователя выбирается по через остаток от деления хэша с солью от id пользователя.  В качестве хэша взять MurmurHash3.

Такого метода в DataFrame API по дефолту нет, поэтмоу может расширить его, используя UDF.


In [67]:
import scala.util.hashing.MurmurHash3

val defineGroup = udf((userId: Int) => {
    val hash = Math.floorMod(MurmurHash3.stringHash(s"${userId}|salt"), 100)
    if(hash < 50) {
        "default"
    } else "model"
})

In [68]:
val dfWithGroup = dfWithSession.withColumn(
    "group", defineGroup(col("user_id"))
).cache()

dfWithGroup.show

Посчитаем количество действий по разным группам

In [70]:
val pivoted = dfWithGroup
    .groupBy("group")
    .pivot("event")
    .count()

pivoted.show

Сравним конверсии

In [72]:
pivoted.select(col("group"), round(col("CLICK") / col("SHOW"), 4).as("clickRate"), round(col("HIDE") / col("SHOW"), 4).as("hideRate")).show

Видно, что на сырых событиях модель работает лучше. Давайте также дедуплицируем их по сессиям и также проверим конверсии

In [74]:
val sessionDeduplicated = dfWithGroup.groupBy("group", "user_id", "item_id", "event", "session_id").agg(min("timestamp").as("timestamp")).cache()

sessionDeduplicated
    .groupBy("group")
    .pivot("event")
    .count()
    .select(col("group"), round(col("CLICK") / col("SHOW"), 4).as("clickRate"), round(col("HIDE") / col("SHOW"), 4).as("hideRate")).show

Мы ответили на вопрос о том, что новая модель работает лучше и в терминах кликов, и в терминах скрытий баннера (на самом деле хорошо бы посчитать доверительный интервал, но вы уже можете это сделать сами) ).

## Визуализация

Давайте визуализируем динамику конверсий, чтобы ответить на остальные вопросы

In [77]:
// давайте агрегируем количества по 12 часов
val dfWithHour = sessionDeduplicated.withColumn("hour", floor(col("timestamp") / 43200))
val pivotedDynamics = dfWithHour
    .groupBy("group", "hour")
    .pivot("event")
    .count()
    
// посчитаем кумулятивные счетчики по времени
val cumStatWindow = Window.partitionBy("group").orderBy("hour").rowsBetween(Window.unboundedPreceding, Window.currentRow)

val dynamicStats = pivotedDynamics.na.fill(Map(
    "CLICK" -> 0,
    "HIDE" -> 0,
    "SHOW" -> 0
))
.withColumn("cumClicks", sum(col("CLICK")).over(cumStatWindow))
.withColumn("cumHides", sum(col("HIDE")).over(cumStatWindow))
.withColumn("cumShows", sum(col("SHOW")).over(cumStatWindow))
.select(
    col("group"),
    col("hour"),
    round(col("cumClicks") / col("cumShows"), 4).as("cumClickRate"),
    round(col("cumHides") / col("cumShows"), 4).as("cumHideRate")
)

 

Построим графики, используя стандартные средства Zeppelin.


Видно, что новая модель примерно через месяц начинает в общем работать лучше, чем исходная.

In [79]:
z.show(dynamicStats)