# Made. Spark DataFrame API

Для начала импортируем содержимое пакета `org.apache.spark.sql`.

В нём есть `SparkSession` — точка входа, которая выполняется на драйвере и обеспечивает выполнение кода на Spark.

In [2]:
import org.apache.spark.sql._

## SparkSession
Параметры конфигурации Spark: https://spark.apache.org/docs/latest/configuration.html

In [2]:
// Create (or reuse) the session object that drives all Spark work in this notebook.
val spark = SparkSession
    .builder()
    // Name shown for this application in the Spark UI.
    .appName("made-demo")
    // Master URL: "local[*]" runs Spark in local mode.
    .master("local[*]")
    // Optional resource settings, left disabled here:
    // .config("spark.executor.memory", "2g")
    // .config("spark.executor.cores", "2")
    // .config("spark.driver.memory", "2g")
    .getOrCreate()

spark = org.apache.spark.sql.SparkSession@75d837b9


org.apache.spark.sql.SparkSession@75d837b9

Импортируем синтаксический сахар — неявные преобразования из `spark.implicits._` (нужны, например, для `.toDF` и `$"name"`).

In [3]:
import spark.implicits._

## Создание DataFrame

In [3]:
val names: Seq[String] = List("Vanya", "Petya", "Vasya") // sample data reused by the cells below

names = List(Vanya, Petya, Vasya)


List(Vanya, Petya, Vasya)

Из коллекции — функцией `.toDF` (нужен импорт `spark.implicits._`)

In [5]:
// One-column DataFrame built from a local collection (toDF comes from spark.implicits._).
val df = names.toDF("name")
df.show()

+-----+
| name|
+-----+
|Vanya|
|Petya|
|Vasya|
+-----+



df = [name: string]


[name: string]

Из коллекции кейс-классов

In [4]:
case class Person(name: String)
// Any Seq of case classes / tuples works here, e.g. List(("Vanya", 25, "Moscow")).
val df = spark.createDataFrame(names.map(name => Person(name)))
df.show()

+-----+
| name|
+-----+
|Vanya|
|Petya|
|Vasya|
+-----+



defined class Person
df = [name: string]


[name: string]

Из `RDD[Row]` и схемы (`StructType`)

In [18]:
import org.apache.spark.sql.types._

// Explicit schema: a single nullable string column "name".
val schema = StructType(Seq(StructField("name", StringType, nullable = true)))

schema = StructType(StructField(name,StringType,true))


StructType(StructField(name,StringType,true))

In [20]:
// Distribute one Row per name, then attach the explicit schema to get a DataFrame.
val rdd = spark.sparkContext.parallelize(names.map(Row(_)))

val df = spark.createDataFrame(rdd, schema)
df.show()

+-----+
| name|
+-----+
|Vanya|
|Petya|
|Vasya|
+-----+



rdd = ParallelCollectionRDD[0] at parallelize at <console>:41
df = [name: string]


[name: string]

Из RDD кейс-классов

In [21]:
// With an RDD of case classes the schema is derived from the case-class fields.
val rdd = spark.sparkContext.parallelize(names.map(Person(_)))

val df = spark.createDataFrame(rdd)
df.show()

+-----+
| name|
+-----+
|Vanya|
|Petya|
|Vasya|
+-----+



rdd = ParallelCollectionRDD[5] at parallelize at <console>:42
df = [name: string]


[name: string]

Чтением из файла

In [22]:
// Read names.csv: first line is a header, column types are inferred, fields are comma-separated.
val df = spark.read
    .format("csv")
    .option("header", "true")
    .option("inferSchema", "true")
    .option("sep", ",")
    .load("names.csv")

df.show()

+-----+
| name|
+-----+
|Vanya|
|Petya|
|Vasya|
+-----+



df = [name: string]


[name: string]

Параметры чтения можно передать одной мапой (`Map`) через `.options`

In [24]:
// Reader options collected in one map and passed in a single .options call.
val options = Map(
    "header" -> "true",
    "inferSchema" -> "true"
)

val df = spark.read.options(options).csv("names.csv")

df.show()

+-----+
| name|
+-----+
|Vanya|
|Petya|
|Vasya|
+-----+



options = Map(header -> true, inferSchema -> true)
df = [name: string]


[name: string]

## Базовые функции

Достать схему

In [25]:
df.printSchema() // print the schema as a tree

root
 |-- name: string (nullable = true)



Показать сэмпл (первые строки)

In [13]:
df.show(20) // first 20 rows — the same as the no-argument default

+-----+
| name|
+-----+
|Vanya|
|Petya|
|Vasya|
+-----+



In [14]:
df.show(numRows = 1) // only the first row

+-----+
| name|
+-----+
|Vanya|
+-----+
only showing top 1 row



In [15]:
df.show(numRows = 1, truncate = 3) // cut every cell down to at most 3 characters

+----+
|name|
+----+
| Van|
+----+
only showing top 1 row



In [16]:
df.show(numRows = 3, truncate = 3, vertical = true) // one block per record instead of a table

-RECORD 0---
 name | Van 
-RECORD 1---
 name | Pet 
-RECORD 2---
 name | Vas 



Посчитать количество строк

In [26]:
df.count() // number of rows (an action, so it triggers a Spark job)

3

Функции над Spark DataFrames принимают на вход колонки.

Посмотрим на примере фильтрации

In [27]:
import org.apache.spark.sql.functions._

In [28]:
// Three equivalent ways to reference a column in a filter condition.
// 1) A Scala Symbol, converted to a Column via spark.implicits._:
df.filter('name === "Vanya").show
// 2) The $"..." string interpolator, also from spark.implicits._:
df.filter($"name" === "Vanya").show
// 3) col(...) from org.apache.spark.sql.functions._ — the preferred variant, because it is the most explicit.
df.filter(col("name") === "Vanya").show

+-----+
| name|
+-----+
|Vanya|
+-----+

+-----+
| name|
+-----+
|Vanya|
+-----+

+-----+
| name|
+-----+
|Vanya|
+-----+



In [20]:
df.where("name = 'Vanya'").show() // `where` is an alias of `filter`; the condition is a SQL expression string

+-----+
| name|
+-----+
|Vanya|
+-----+



In [29]:
// Expose `df` to SQL as a session-scoped temporary view named "df" (queried via spark.sql below).
// registerTempTable has been deprecated since Spark 2.0; createOrReplaceTempView replaces it.
df.createOrReplaceTempView("df")



In [32]:
// Query the temporary view with plain SQL.
// The string literal uses single quotes (standard SQL). With Spark's ANSI mode and
// double-quoted identifiers enabled, "Vanya" would be read as a column name instead.
spark.sql("""
    select *
    from df
    where name = 'Vanya'
""").show

+-----+
| name|
+-----+
|Vanya|
+-----+



Функции над колонками

https://spark.apache.org/docs/latest/api/java/org/apache/spark/sql/functions.html

In [34]:
// Add derived columns with withColumn:
//   "length"   — number of characters in "name" (functions.length)
//   "constant" — the literal value 1 on every row (functions.lit)
val dfWithLength = df
    .withColumn("length", length(col("name")))
    .withColumn("constant", lit(1))
dfWithLength.show

+-----+------+--------+
| name|length|constant|
+-----+------+--------+
|Vanya|     5|       1|
|Petya|     5|       1|
|Vasya|     5|       1|
+-----+------+--------+



dfWithLength = [name: string, length: int ... 1 more field]


[name: string, length: int ... 1 more field]

In [35]:
// Arithmetic on columns inside select; `as`, `alias` and `name` are three ways to name the result.
dfWithLength
    .select(
        col("*"),                              // keep all existing columns
        (col("length") - 1).as("l1"),
        (col("length") % 2).alias("l2"),
        (col("length") * 3).name("l3")
    ).show

+-----+------+--------+---+---+---+
| name|length|constant| l1| l2| l3|
+-----+------+--------+---+---+---+
|Vanya|     5|       1|  4|  1| 15|
|Petya|     5|       1|  4|  1| 15|
|Vasya|     5|       1|  4|  1| 15|
+-----+------+--------+---+---+---+



In [36]:
dfWithLength.drop(col("length")).show() // everything except the "length" column

+-----+--------+
| name|constant|
+-----+--------+
|Vanya|       1|
|Petya|       1|
|Vasya|       1|
+-----+--------+



## Пропуски

Посмотрим на датасет со статистикой игроков NBA

https://www.kaggle.com/drgilermo/nba-players-stats

In [37]:
// NBA players table: header row present, column types inferred.
val players = spark.read
    .option("header", "true")
    .option("inferSchema", "true")
    .csv("data/Players.csv")

players = [_c0: int, Player: string ... 6 more fields]


[_c0: int, Player: string ... 6 more fields]

In [38]:
players.show(numRows = 20, truncate = false) // first 20 rows, full cell contents

+---+----------------+------+------+-------------------------------+----+-------------+------------+
|_c0|Player          |height|weight|collage                        |born|birth_city   |birth_state |
+---+----------------+------+------+-------------------------------+----+-------------+------------+
|0  |Curly Armstrong |180   |77    |Indiana University             |1918|null         |null        |
|1  |Cliff Barker    |188   |83    |University of Kentucky         |1921|Yorktown     |Indiana     |
|2  |Leo Barnhorst   |193   |86    |University of Notre Dame       |1924|null         |null        |
|3  |Ed Bartels      |196   |88    |North Carolina State University|1925|null         |null        |
|4  |Ralph Beard     |178   |79    |University of Kentucky         |1927|Hardinsburg  |Kentucky    |
|5  |Gene Berce      |180   |79    |Marquette University           |1926|null         |null        |
|6  |Charlie Black   |196   |90    |University of Kansas           |1921|Arco         |Idah

In [39]:
players.count() // total number of player rows

3922

In [44]:
// Replacement values for nulls, per column. Values mix Int and String, which is why the map is Map[String, Any].
val filler: Map[String, Any] = Map(
    "height" -> 0,
    "weight" -> 0,
    "collage" -> ""
)

// First drop rows whose "born" is null, then fill the remaining nulls from `filler`.
val filledPlayers = players.na
    .drop(Seq("born"))
    .na
    .fill(filler)

filler = Map(height -> 0, weight -> 0, collage -> "")
filledPlayers = [_c0: int, Player: string ... 6 more fields]


lastException: Throwable = null


[_c0: int, Player: string ... 6 more fields]

In [45]:
// UDF that derives a short college key from a college name,
// e.g. "University of Kentucky" -> "Kentucky", "North Carolina State University" -> "Northcarolina".
// - Lower-cases the name (locale-independent).
// - Removes the whole words "state", "university" and "of". Word boundaries matter: plain substring
//   replacement would also mangle names that merely contain them, e.g. "Wofford" -> "wford".
// - Strips spaces and capitalizes the first letter.
// A null input yields null instead of throwing NullPointerException on an executor.
val findState = udf((collage: String) => {
    Option(collage)
        .map(c => c.toLowerCase(java.util.Locale.ROOT)
            .replaceAll("\\b(state|university|of)\\b", "")
            .replace(" ", "")
            .capitalize)
        .orNull
})

findState = UserDefinedFunction(<function1>,StringType,Some(List(StringType)))


UserDefinedFunction(<function1>,StringType,Some(List(StringType)))

In [63]:
// Attach the college key computed by `findState` from the "collage" column.
// NOTE(review): "fromUniverity" is misspelled, but later cells refer to it by this exact name.
val fromUniversity = filledPlayers.withColumn(
    "fromUniverity",
    findState(col("collage"))
)

fromUniversity = [_c0: int, Player: string ... 7 more fields]


[_c0: int, Player: string ... 7 more fields]

In [65]:
// findState strips spaces, so restore the readable form of this particular key.
val northCarolina = fromUniversity.na.replace(
    "fromUniverity",
    Map("Northcarolina" -> "North Carolina")
)

northCarolina = [_c0: int, Player: string ... 7 more fields]


[_c0: int, Player: string ... 7 more fields]

In [66]:
northCarolina.show() // first 20 rows, long cells truncated

+---+----------------+------+------+--------------------+----+-------------+------------+-----------------+
|_c0|          Player|height|weight|             collage|born|   birth_city| birth_state|    fromUniverity|
+---+----------------+------+------+--------------------+----+-------------+------------+-----------------+
|  0| Curly Armstrong|   180|    77|  Indiana University|1918|         null|        null|          Indiana|
|  1|    Cliff Barker|   188|    83|University of Ken...|1921|     Yorktown|     Indiana|         Kentucky|
|  2|   Leo Barnhorst|   193|    86|University of Not...|1924|         null|        null|        Notredame|
|  3|      Ed Bartels|   196|    88|North Carolina St...|1925|         null|        null|   North Carolina|
|  4|     Ralph Beard|   178|    79|University of Ken...|1927|  Hardinsburg|    Kentucky|         Kentucky|
|  5|      Gene Berce|   180|    79|Marquette University|1926|         null|        null|        Marquette|
|  6|   Charlie Black|   196

In [67]:
// "new_city": the birth city when it is not null, otherwise the college key from "fromUniverity".
val withNewCity = northCarolina.withColumn(
    "new_city",
    coalesce(col("birth_city"), col("fromUniverity"))
)

withNewCity.show()

+---+----------------+------+------+--------------------+----+-------------+------------+-----------------+--------------+
|_c0|          Player|height|weight|             collage|born|   birth_city| birth_state|    fromUniverity|      new_city|
+---+----------------+------+------+--------------------+----+-------------+------------+-----------------+--------------+
|  0| Curly Armstrong|   180|    77|  Indiana University|1918|         null|        null|          Indiana|       Indiana|
|  1|    Cliff Barker|   188|    83|University of Ken...|1921|     Yorktown|     Indiana|         Kentucky|      Yorktown|
|  2|   Leo Barnhorst|   193|    86|University of Not...|1924|         null|        null|        Notredame|     Notredame|
|  3|      Ed Bartels|   196|    88|North Carolina St...|1925|         null|        null|   North Carolina|North Carolina|
|  4|     Ralph Beard|   178|    79|University of Ken...|1927|  Hardinsburg|    Kentucky|         Kentucky|   Hardinsburg|
|  5|      Gene 

withNewCity = [_c0: int, Player: string ... 8 more fields]


[_c0: int, Player: string ... 8 more fields]

## Агрегаты

In [68]:
// Number of players per "new_city", largest groups first.
withNewCity
    .groupBy(col("new_city"))
    .count()
    .orderBy(col("count").desc)
    .show()

+------------+-----+
|    new_city|count|
+------------+-----+
|     Chicago|  114|
|            |   93|
| Los Angeles|   88|
|Philadelphia|   83|
|    Brooklyn|   76|
|    New York|   71|
|  Washington|   65|
|     Detroit|   58|
|   Baltimore|   37|
|     Atlanta|   37|
|      Dallas|   33|
|     Houston|   33|
|     Memphis|   31|
|  Louisville|   28|
|     Seattle|   25|
|     Oakland|   25|
|  Birmingham|   24|
|    Columbus|   24|
| New Orleans|   23|
|   St. Louis|   22|
+------------+-----+
only showing top 20 rows



In [69]:
// Per city: number of players plus min / mean / max weight, ordered by player count (descending).
val aggregates = withNewCity
    .groupBy(col("new_city"))
    .agg(
        count("*").as("count"),
        min("weight").as("min_weight"),
        mean("weight").as("mean_weight"),
        max("weight").as("max_weight")
    )
    .orderBy(col("count").desc)

aggregates.show()

+------------+-----+----------+-----------------+----------+
|    new_city|count|min_weight|      mean_weight|max_weight|
+------------+-----+----------+-----------------+----------+
|     Chicago|  114|        72|94.16666666666667|       149|
|            |   93|        70|95.03225806451613|       124|
| Los Angeles|   88|        74|92.81818181818181|       122|
|Philadelphia|   83|        73|90.81927710843374|       125|
|    Brooklyn|   76|        74| 90.6842105263158|       117|
|    New York|   71|        68|88.78873239436619|       121|
|  Washington|   65|        74|92.56923076923077|       113|
|     Detroit|   58|        68|94.08620689655173|       128|
|   Baltimore|   37|        61| 94.8108108108108|       124|
|     Atlanta|   37|        79|98.89189189189189|       122|
|     Houston|   33|        77|97.81818181818181|       120|
|      Dallas|   33|        60| 95.3030303030303|       127|
|     Memphis|   31|        68| 90.3225806451613|       119|
|  Louisville|   28|    

aggregates = [new_city: string, count: bigint ... 3 more fields]


[new_city: string, count: bigint ... 3 more fields]

## Оконные функции

In [70]:
import org.apache.spark.sql.expressions.Window

In [71]:
// Per-city window ordered by birth year. The frame is spelled out explicitly; it is the same as Spark's
// default once orderBy is given (RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW).
// count("*") over it is therefore a running count by "born" inside each city: rows sharing a "born"
// value get the same count (114, 114, 114, then 111 in the output).
// NOTE(review): if the per-city total was intended, drop the orderBy and the frame.
val window = Window
    .partitionBy("new_city")
    .orderBy("born")
    .rangeBetween(Window.unboundedPreceding, Window.currentRow)

withNewCity
    .withColumn("count_new_city", count("*").over(window))
    .orderBy(col("count_new_city").desc)
    .show()


+----+----------------+------+------+--------------------+----+----------+-----------+--------------------+--------+--------------+
| _c0|          Player|height|weight|             collage|born|birth_city|birth_state|       fromUniverity|new_city|count_new_city|
+----+----------------+------+------+--------------------+----+----------+-----------+--------------------+--------+--------------+
|3741|   Jabari Parker|   203|   113|     Duke University|1995|   Chicago|   Illinois|                Duke| Chicago|           114|
|3761| Cliff Alexander|   203|   111|University of Kansas|1995|   Chicago|   Illinois|              Kansas| Chicago|           114|
|3808|   Jahlil Okafor|   211|   124|     Duke University|1995|   Chicago|   Illinois|                Duke| Chicago|           114|
|3576|   Quincy Miller|   206|    95|   Baylor University|1992|   Chicago|   Illinois|              Baylor| Chicago|           111|
|3445|     Evan Turner|   201|    99|Ohio State Univer...|1988|   Chicago|  

window = org.apache.spark.sql.expressions.WindowSpec@545da373


org.apache.spark.sql.expressions.WindowSpec@545da373

## Joins

In [72]:
// Per-season player statistics: header row present, column types inferred.
val seasonStats = spark.read
    .option("header", "true")
    .option("inferSchema", "true")
    .csv("data/Seasons_Stats.csv")

seasonStats = [_c0: int, Year: int ... 51 more fields]


[_c0: int, Year: int ... 51 more fields]

Посчитаем долю игр, сыгранных игроком, относительно наибольшего числа игр среди игроков его команды в том же сезоне (это число используем как оценку количества игр команды)

In [73]:
// Highest games-played value "G" per (Year, team), used as the estimate of the team's game count.
val maxGames = seasonStats
    .groupBy("Year", "Tm")
    .agg(max(col("G")).as("maxGames"))

maxGames = [Year: int, Tm: string ... 1 more field]


[Year: int, Tm: string ... 1 more field]

In [74]:
// Share (in %, 2 decimals) of the team's estimated games that each player appeared in.
// NOTE(review): "TOT" rows look like a player's combined totals across teams
// (e.g. Ed Bartels 1950: TOT 15 = DNN 13 + NYK 2), so their "pct" is not a real per-team share —
// consider filtering them out before the join.
seasonStats
    .select("Player", "Year", "Tm", "G")
    .join(maxGames, Seq("Tm", "Year"), "left")
    .withColumn("pct", round(col("G") / col("maxGames") * lit(100), 2))
    .show()

+---+----+---------------+---+--------+-----+
| Tm|Year|         Player|  G|maxGames|  pct|
+---+----+---------------+---+--------+-----+
|FTW|1950|Curly Armstrong| 63|      68|92.65|
|INO|1950|   Cliff Barker| 49|      64|76.56|
|CHS|1950|  Leo Barnhorst| 67|      68|98.53|
|TOT|1950|     Ed Bartels| 15|      69|21.74|
|DNN|1950|     Ed Bartels| 13|      62|20.97|
|NYK|1950|     Ed Bartels|  2|      68| 2.94|
|INO|1950|    Ralph Beard| 60|      64|93.75|
|TRI|1950|     Gene Berce|  3|      64| 4.69|
|TOT|1950|  Charlie Black| 65|      69| 94.2|
|FTW|1950|  Charlie Black| 36|      68|52.94|
|AND|1950|  Charlie Black| 29|      64|45.31|
|PHW|1950|    Nelson Bobb| 57|      68|83.82|
|PHW|1950|Jake Bornheimer| 60|      68|88.24|
|NYK|1950|   Vince Boryla| 59|      68|86.76|
|WAT|1950|      Don Boven| 62|      62|100.0|
|WAT|1950|  Harry Boykoff| 61|      62|98.39|
|CHS|1950|    Joe Bradley| 46|      68|67.65|
|SHE|1950|    Bob Brannum| 59|      62|95.16|
|NYK|1950|     Carl Braun| 67|    

## Репартиционирование

In [75]:
/** Prints how many rows each partition of `ds` holds, one output row per partition.
  *
  * @tparam T      unused; kept only so existing calls such as
  *                `printItemPerPartition[java.lang.Long](df)` keep compiling
  * @param ds      the DataFrame to inspect
  * @param numRows maximum number of partitions to display (defaults to 50, the previously hard-coded value)
  */
def printItemPerPartition[T](ds: DataFrame, numRows: Int = 50): Unit = {
    ds.mapPartitions(rows => Iterator(rows.size))
        // mapPartitions to Int yields a single column named "value"; give it a descriptive name
        .withColumnRenamed("value", "itemPerPartition")
        .show(numRows, truncate = false)
}

printItemPerPartition[java.lang.Long](seasonStats)

printItemPerPartition: [T](ds: org.apache.spark.sql.DataFrame)Unit


+----------------+
|itemPerPartition|
+----------------+
|20463           |
|4228            |
+----------------+



In [76]:
// Hash-partition by "Player" into 10 partitions, then show how evenly the rows were spread.
val repartStats = seasonStats.repartition(10, $"Player")
printItemPerPartition[java.lang.Long](repartStats)

+----------------+
|itemPerPartition|
+----------------+
|2340            |
|2344            |
|2553            |
|2409            |
|2465            |
|2394            |
|2662            |
|2422            |
|2503            |
|2599            |
+----------------+



repartStats = [_c0: int, Year: int ... 51 more fields]


[_c0: int, Year: int ... 51 more fields]