## Table of Contents

- [SparkSession](#SparkSession)
- [Get Timestamps](#ConvertTime)
- [Aggregations](#aggregations)

In [35]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as sf
from pyspark.sql.window import Window
import re

### <a id="SparkSession">SparkSession + config options</a>

In [2]:
spark = SparkSession\
    .builder\
    .appName("PySpark learning")\
    .config("spark.driver.memory", "2g")\
    .getOrCreate()

### Reading csv file with header

In [3]:
df = spark\
    .read\
    .option("header", "true")\
    .option("inferSchema", 'true')\
    .csv("data/chess_games.csv")

In [4]:
df.show(5)

+--------+-----+----------+------------+-----+--------------+------+--------------+-------------+------------+------------+------------+--------------------+-----------+--------------------+-----------+
|      id|rated|created_at|last_move_at|turns|victory_status|winner|increment_code|     white_id|white_rating|    black_id|black_rating|               moves|opening_eco|        opening_name|opening_ply|
+--------+-----+----------+------------+-----+--------------+------+--------------+-------------+------------+------------+------------+--------------------+-----------+--------------------+-----------+
|TZJHLljE|false|1.50421E12|  1.50421E12|   13|     outoftime| white|          15+2|     bourgris|        1500|        a-00|        1191|d4 d5 c4 c6 cxd5 ...|        D10|Slav Defense: Exc...|          5|
|l1NXvwaE| true|1.50413E12|  1.50413E12|   16|        resign| black|          5+10|         a-00|        1322|   skinnerua|        1261|d4 Nc6 e4 e5 f4 f...|        B00|Nimzowitsch Defen..

### <a id="ConvertTime">Assigning new columns and changing existing ones</a>

In [5]:
df = df\
    .withColumn("game_start", sf.to_timestamp(sf.from_unixtime(df.created_at / 1000)))\
    .withColumn("game_end", sf.to_timestamp(sf.from_unixtime(df.last_move_at / 1000)))\
    .withColumn("rating_difference", df.white_rating - df.black_rating)\
    .drop("created_at", "last_move_at")\
    .withColumnRenamed("opening_ply", "opening_move_count")

In [6]:
df.show(5)

+--------+-----+-----+--------------+------+--------------+-------------+------------+------------+------------+--------------------+-----------+--------------------+------------------+-------------------+-------------------+-----------------+
|      id|rated|turns|victory_status|winner|increment_code|     white_id|white_rating|    black_id|black_rating|               moves|opening_eco|        opening_name|opening_move_count|         game_start|           game_end|rating_difference|
+--------+-----+-----+--------------+------+--------------+-------------+------------+------------+------------+--------------------+-----------+--------------------+------------------+-------------------+-------------------+-----------------+
|TZJHLljE|false|   13|     outoftime| white|          15+2|     bourgris|        1500|        a-00|        1191|d4 d5 c4 c6 cxd5 ...|        D10|Slav Defense: Exc...|                 5|2017-08-31 22:06:40|2017-08-31 22:06:40|              309|
|l1NXvwaE| true|   16|  

### Check data types

In [7]:
df.printSchema()

root
 |-- id: string (nullable = true)
 |-- rated: boolean (nullable = true)
 |-- turns: integer (nullable = true)
 |-- victory_status: string (nullable = true)
 |-- winner: string (nullable = true)
 |-- increment_code: string (nullable = true)
 |-- white_id: string (nullable = true)
 |-- white_rating: integer (nullable = true)
 |-- black_id: string (nullable = true)
 |-- black_rating: integer (nullable = true)
 |-- moves: string (nullable = true)
 |-- opening_eco: string (nullable = true)
 |-- opening_name: string (nullable = true)
 |-- opening_move_count: integer (nullable = true)
 |-- game_start: timestamp (nullable = true)
 |-- game_end: timestamp (nullable = true)
 |-- rating_difference: integer (nullable = true)



### <a id="Aggregations">Aggregations</a>

In [8]:
# average rating for players grouped by whether the game was ranked or not

df\
    .groupBy("rated")\
    .agg(
        sf.count(df.id).alias("count"),
        sf.round(sf.mean(df.white_rating)).cast("int").alias("average white rating"),
        sf.round(sf.mean(df.black_rating)).cast("int").alias("average black rating")
    )\
    .show()

+-----+-----+--------------------+--------------------+
|rated|count|average white rating|average black rating|
+-----+-----+--------------------+--------------------+
| true|16155|                1597|                1594|
|false| 3903|                1595|                1569|
+-----+-----+--------------------+--------------------+



In [9]:
# most popular openings and median rank of the players

df\
    .groupBy('opening_name')\
    .agg(
        sf.count(df.id).alias("games played"),
        sf.median(df.white_rating).cast("int").alias("median white rating"),
        sf.median(df.black_rating).cast("int").alias("median black rating")
    )\
    .sort(sf.desc("games played"))\
    .show(10, truncate=False)

+---------------------------------------------+------------+-------------------+-------------------+
|opening_name                                 |games played|median white rating|median black rating|
+---------------------------------------------+------------+-------------------+-------------------+
|Van't Kruijs Opening                         |368         |1319               |1406               |
|Sicilian Defense                             |358         |1514               |1583               |
|Sicilian Defense: Bowdler Attack             |296         |1536               |1569               |
|French Defense: Knight Variation             |271         |1560               |1578               |
|Scotch Game                                  |271         |1535               |1499               |
|Scandinavian Defense: Mieses-Kotroc Variation|259         |1533               |1467               |
|Queen's Pawn Game: Mason Attack              |232         |1645               |1564       

In [10]:
# number of game results by reason (victory status)

df\
    .groupBy("winner", "victory_status")\
    .agg(sf.count(df.id).alias("count"))\
    .sort(sf.desc("count"))\
    .show()

+------+--------------+-----+
|winner|victory_status|count|
+------+--------------+-----+
| white|        resign| 5844|
| black|        resign| 5303|
| white|          mate| 3344|
| black|          mate| 2981|
|  draw|          draw|  906|
| black|     outoftime|  823|
| white|     outoftime|  813|
|  draw|     outoftime|   44|
+------+--------------+-----+



In [11]:
# measures of central tendency + standard deviation for the number of turns played
# grouped by victory status with no out of time category

df\
    .filter(df.victory_status != "outoftime")\
    .groupBy("victory_status")\
    .agg(
        sf.round(sf.mean(df.turns), 2).alias("mean turns"),
        sf.round(sf.median(df.turns), 2).alias("median turns"),
        sf.mode(df.turns).alias("turns mode"),
        sf.round(sf.std(df.turns), 2).alias("turns standard deviation")
    )\
    .show()

+--------------+----------+------------+----------+------------------------+
|victory_status|mean turns|median turns|turns mode|turns standard deviation|
+--------------+----------+------------+----------+------------------------+
|        resign|     53.91|        50.0|        45|                   29.67|
|          mate|     65.42|        59.0|        52|                   33.25|
|          draw|     83.78|        86.0|       107|                   45.32|
+--------------+----------+------------+----------+------------------------+



In [12]:
# most popular time controls

df\
    .groupBy("increment_code")\
    .agg(sf.count(df.id).alias("number of games"))\
    .sort(sf.desc("number of games"))\
    .show(5)

+--------------+---------------+
|increment_code|number of games|
+--------------+---------------+
|          10+0|           7721|
|          15+0|           1311|
|         15+15|            850|
|           5+5|            738|
|           5+8|            697|
+--------------+---------------+
only showing top 5 rows



### Pivoting + UDF

In [13]:
# function to get rid of variations in openings
def get_main_opening_name(opening_name):
    result_list = re.split("(\s\|)|(:\s)|(\s#)", opening_name)
    return result_list[0]

# register get_main_opening_name as a UDF (user defined function)
main_opening_name_udf = sf.udf(get_main_opening_name, "STRING")

In [14]:
# get clean opening name with average rating difference per victory status
df\
    .withColumn("opening_name", main_opening_name_udf("opening_name"))\
    .groupBy("opening_name")\
    .pivot("victory_status")\
    .agg(sf.round(sf.mean("rating_difference"), 2))\
    .sort("opening_name")\
    .show(truncate=False)

+---------------------------------+------+-------+---------+-------+
|opening_name                     |draw  |mate   |outoftime|resign |
+---------------------------------+------+-------+---------+-------+
|Alekhine Defense                 |-34.6 |104.69 |-0.06    |9.36   |
|Amar Opening                     |NULL  |-382.67|-197.25  |-178.57|
|Amazon Attack                    |483.0 |-39.0  |NULL     |-39.0  |
|Anderssen Opening                |-108.0|15.58  |-36.0    |147.36 |
|Australian Defense               |NULL  |425.0  |NULL     |NULL   |
|Barnes Defense                   |35.0  |159.57 |243.0    |185.17 |
|Barnes Opening                   |360.0 |-209.0 |NULL     |NULL   |
|Benko Gambit                     |-254.0|-103.0 |-231.0   |-47.2  |
|Benko Gambit Accepted            |NULL  |-111.0 |NULL     |37.0   |
|Benko Gambit Declined            |NULL  |NULL   |NULL     |86.33  |
|Benoni Defense                   |114.86|6.57   |-155.3   |11.91  |
|Bird Opening                     

### Rollup

In [15]:
df\
    .rollup("increment_code", "victory_status")\
    .count()\
    .orderBy("increment_code", "victory_status")\
    .withColumnsRenamed(
        {
            "increment_code": "increment_code",
            "victory_status": "victory status",
            "count": "number of games"
        }
    )\
    .show()

+--------------+--------------+---------------+
|increment_code|victory status|number of games|
+--------------+--------------+---------------+
|          NULL|          NULL|          20058|
|          0+12|          NULL|              5|
|          0+12|          draw|              1|
|          0+12|          mate|              1|
|          0+12|        resign|              3|
|          0+13|          NULL|              1|
|          0+13|        resign|              1|
|          0+15|          NULL|              8|
|          0+15|          mate|              1|
|          0+15|     outoftime|              1|
|          0+15|        resign|              6|
|          0+16|          NULL|             15|
|          0+16|          draw|              1|
|          0+16|          mate|              1|
|          0+16|     outoftime|             11|
|          0+16|        resign|              2|
|         0+180|          NULL|              9|
|         0+180|     outoftime|         

### Window functions

In [52]:
window_spec = Window\
    .partitionBy("day")\
    .orderBy("day")\
    .rowsBetween(-7, 0)

df\
    .withColumn("day", sf.to_date(df.game_start))\
    .select(
        "day",
        sf.sum("turns").over(window_spec).alias("running_total_moves_7_days"),
        sf.round(sf.mean("turns").over(window_spec), 2).alias("running_average_moves_7_days")
    )\
    .show()

+----------+--------------------------+----------------------------+
|       day|running_total_moves_7_days|running_average_moves_7_days|
+----------+--------------------------+----------------------------+
|2013-08-17|                        43|                        43.0|
|2013-08-17|                        62|                        31.0|
|2013-08-19|                        51|                        51.0|
|2013-08-19|                       135|                        67.5|
|2013-08-19|                       162|                        54.0|
|2013-08-20|                        35|                        35.0|
|2013-08-20|                        74|                        37.0|
|2013-08-21|                        29|                        29.0|
|2013-09-14|                        69|                        69.0|
|2013-09-14|                       130|                        65.0|
|2013-09-14|                       167|                       55.67|
|2013-09-14|                      

### SQL in PySpark

In [13]:
df.createOrReplaceTempView("chess_games")

In [30]:
# 10 time controls that reached the highest number of turns

query = """
    SELECT increment_code, MAX(turns) AS max_number_of_turns
    FROM chess_games
    GROUP BY increment_code
    ORDER BY "max number of turns" DESC
    LIMIT 10;
"""

result = spark.sql(query)
result\
    .withColumnsRenamed(
        {
            "increment_code": "increment code",
            "max_number_of_turns": "max number of turns"
        }
    )\
    .show(truncate=False)

+--------------+-------------------+
|increment code|max number of turns|
+--------------+-------------------+
|8+10          |146                |
|14+0          |108                |
|20+9          |28                 |
|1+13          |45                 |
|14+8          |104                |
|5+30          |87                 |
|10+6          |132                |
|20+120        |52                 |
|18+0          |130                |
|10+1          |154                |
+--------------+-------------------+



In [31]:
# 10 time controls with the highest average rating difference

query = """
    SELECT increment_code, ROUND(AVG(rating_difference), 2) AS mean_rating_difference
    FROM chess_games
    GROUP BY increment_code
    ORDER BY mean_rating_difference DESC
    LIMIT 10;
"""

result = spark.sql(query)
result\
    .withColumnsRenamed(
        {
            "increment_code": "increment code",
            "mean_rating_difference": "mean rating difference"
        }
    )\
    .show()

+--------------+----------------------+
|increment code|mean rating difference|
+--------------+----------------------+
|          4+12|                 815.0|
|         40+40|                 804.0|
|          3+35|                 659.0|
|         30+40|                 650.0|
|         18+19|                 565.0|
|         60+10|                539.33|
|         14+15|                 498.0|
|         15+25|                 477.0|
|         30+20|                476.25|
|         17+11|                 457.0|
+--------------+----------------------+



In [7]:
spark.stop()