## Imports

In [2]:
import sys

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, expr, when, udf,isnan, count, sum
from pyspark.ml.feature import Imputer
from pyspark.ml import Pipeline
from pyspark.sql import SparkSession
from pyspark.sql.functions import mean

## Start Spark session

In [3]:
# Create (or reuse) a SparkSession that runs against a local master.
# Extra settings can be chained in with .config("key", "value") before
# the final getOrCreate() call.
spark = (
    SparkSession.builder
    .master("local")
    .appName("Word Count")
    .getOrCreate()
)

## Basic Info

In [4]:
# Read the chess games CSV: the first row supplies the column names and
# Spark infers each column's type from the data.
df1 = (
    spark.read
    .options(header='true', inferSchema='true')
    .csv("../data_source/chess_games.csv")
)

In [5]:
# Preview the first five rows, then print column names and inferred types.
df1.show(n=5)
df1.printSchema()

+------------------+---------------+----------+------+----------+-------------------+--------+--------+---------------+---------------+---+--------------------+-----------+------------+--------------------+
|             Event|          White|     Black|Result|   UTCDate|            UTCTime|WhiteElo|BlackElo|WhiteRatingDiff|BlackRatingDiff|ECO|             Opening|TimeControl| Termination|                  AN|
+------------------+---------------+----------+------+----------+-------------------+--------+--------+---------------+---------------+---+--------------------+-----------+------------+--------------------+
|        Classical |        eisaaaa|  HAMID449|   1-0|2016.06.30|2023-11-11 22:00:01|    1901|    1896|           11.0|          -11.0|D10|        Slav Defense|      300+5|Time forfeit|1. d4 d5 2. c4 c6...|
|            Blitz |         go4jas|Sergei1973|   0-1|2016.06.30|2023-11-11 22:00:01|    1641|    1627|          -11.0|           12.0|C20|King's Pawn Openi...|      300+0|

### Check for row duplication

In [6]:
# Group on every column so identical rows fall into the same group, keep the
# groups that occur more than once, and total the rows they contain.
# `sum` here is pyspark.sql.functions.sum (imported at the top), not the builtin.
duplicate_groups = df1.groupBy(df1.columns).count().where(col("count") > 1)
duplicate_groups.select(sum("count").alias("dupliacte")).show()

+---------+
|dupliacte|
+---------+
|        2|
+---------+



### Check for NULL and NaN values

In [7]:
# Count NULL values per column; UTCTime is excluded from this pass.
columns_to_check = [name for name in df1.columns if name != 'UTCTime']

# when(...) yields a non-null value only for NULL cells, so count() tallies them.
null_counts = [
    count(when(col(name).isNull(), name)).alias(name)
    for name in columns_to_check
]
df1.select(null_counts).show()

+-----+-----+-----+------+-------+--------+--------+---------------+---------------+---+-------+-----------+-----------+---+
|Event|White|Black|Result|UTCDate|WhiteElo|BlackElo|WhiteRatingDiff|BlackRatingDiff|ECO|Opening|TimeControl|Termination| AN|
+-----+-----+-----+------+-------+--------+--------+---------------+---------------+---+-------+-----------+-----------+---+
|    0|    0|    0|     0|      0|       0|       0|           4668|           4668|  0|      0|          0|          0|  0|
+-----+-----+-----+------+-------+--------+--------+---------------+---------------+---+-------+-----------+-----------+---+



In [8]:
# Count NaN values per column; UTCTime is excluded from this pass.
columns_to_check = [name for name in df1.columns if name != 'UTCTime']

# when(...) yields a non-null value only for NaN cells, so count() tallies them.
nan_counts = [
    count(when(isnan(name), name)).alias(name)
    for name in columns_to_check
]
df1.select(nan_counts).show()

+-----+-----+-----+------+-------+--------+--------+---------------+---------------+---+-------+-----------+-----------+---+
|Event|White|Black|Result|UTCDate|WhiteElo|BlackElo|WhiteRatingDiff|BlackRatingDiff|ECO|Opening|TimeControl|Termination| AN|
+-----+-----+-----+------+-------+--------+--------+---------------+---------------+---+-------+-----------+-----------+---+
|    0|    0|    0|     0|      0|       0|       0|              0|              0|  0|      0|          0|          0|  0|
+-----+-----+-----+------+-------+--------+--------+---------------+---------------+---+-------+-----------+-----------+---+



In [9]:
# Number of rows whose UTCTime is NULL.
df1.where(col("UTCTime").isNull()).count()

0

## Preprocessing

### Drop duplicate rows and the AN column

In [10]:
# Remove exact duplicate rows, then discard the AN column.
df2 = (
    df1
    .dropDuplicates()
    .drop("AN")
)

### Filter and normalize the Event column

In [11]:
# Keep only the rows whose Event text contains Blitz, Classic or Bullet.
event_text = df2["Event"]
df3 = df2.where(
    event_text.contains("Blitz")
    | event_text.contains("Classic")
    | event_text.contains("Bullet")
)

In [12]:
# Replace each Event value with its category label. Rules are tried in order
# and the first match wins; a value matching none of them is left unchanged.
event_col = df3["Event"]
event_label = (
    when(event_col.contains("Blitz"), "Blitz")
    .when(event_col.contains("Classic"), "Classic")
    .when(event_col.contains("Bullet"), "Bullet")
    .otherwise(event_col)
)
df4 = df3.withColumn("Event", event_label)

In [13]:
# Keep a row when its Result is not the '*' placeholder, or when at least one
# of the two rating-diff columns is populated. Both WhiteRatingDiff and
# BlackRatingDiff are checked (one condition per column).
# NOTE(review): presumably the rating diffs are kept so Result can be derived
# from them later; confirm whether both diffs should be required (&) instead.
df5 = df4.filter(
    (df4["Result"] != '*') |
    (df4["WhiteRatingDiff"].isNotNull()) |
    (df4["BlackRatingDiff"].isNotNull())
)

In [14]:
# Rewrite Result from the signs of the two rating diffs. Rows that match none
# of the three rules (including rows with NULL diffs) keep their current Result.
white_diff = col("WhiteRatingDiff")
black_diff = col("BlackRatingDiff")

black_won = (white_diff < 0) & (black_diff > 0)
white_won = (white_diff > 0) & (black_diff < 0)
drawn = (white_diff == 0) & (black_diff == 0)

result_from_diffs = (
    when(black_won, "0-1")
    .when(white_won, "1-0")
    .when(drawn, "1/2-1/2")
    .otherwise(col("Result"))
)
df6 = df5.withColumn("Result", result_from_diffs)

In [None]:
# Average WhiteRatingDiff over rows with Result "1-0" and a positive white diff.
# Each comparison is parenthesized because Python's `&` binds tighter than `==`;
# without the parentheses the expression would evaluate `"1-0" & (...)` first.
# `mean` is the aggregate imported from pyspark.sql.functions at the top.
# NOTE(review): df6 (the latest frame, with the rewritten Result) is assumed to
# be the intended input; confirm.
white_win_rows = df6.filter(
    (col("Result") == "1-0") & (col("WhiteRatingDiff") > 0)
)
# collect() returns a single Row with a single field; the value is None when
# no rows match the filter.
mean_white_rating_diff = white_win_rows.agg(mean("WhiteRatingDiff")).collect()[0][0]

In [None]:
# Preview the first five rows of the de-duplicated frame.
df2.show(n=5)

In [None]:
# Row count per distinct Event value.
event_counts = df2.groupBy('Event').count()
event_counts.show()

In [None]:
# Row count per distinct Result value.
result_counts = df2.groupBy('Result').count()
result_counts.show()

In [None]:
# Row count per distinct Termination value.
termination_counts = df2.groupBy('Termination').count()
termination_counts.show()

In [None]:
# Show raw rows whose Termination is 'Rules infraction'.
rules_infraction_rows = df1.filter("Termination = 'Rules infraction'")
rules_infraction_rows.show()

In [None]:
# Show raw rows whose Termination is 'Abandoned'.
abandoned_rows = df1.filter("Termination = 'Abandoned'")
abandoned_rows.show()

In [None]:
# Show de-duplicated rows whose Result is the '*' placeholder.
star_result_rows = df2.filter("Result = '*'")
star_result_rows.show()

In [None]:
# For rows whose Result is '*', count how often each Termination value occurs.
(
    df2
    .filter("Result = '*'")
    .groupBy('Termination')
    .count()
    .show()
)

In [None]:
# Print the first 20 rows (show()'s default) of the de-duplicated frame.
df2.show(n=20)

In [None]:
# Count NULL values per column of df2. The column list is taken from df2
# itself: df2 was built with .drop("AN"), so a list derived from df1.columns
# would name a column that df2 cannot resolve.
columns_to_check = [c for c in df2.columns if c != 'UTCTime']

# when(...) yields a non-null value only for NULL cells, so count() tallies them.
df2.select([count(when(col(c).isNull(), c)).alias(c) for c in columns_to_check]).show()

In [None]:
# Show rows that still carry the '*' Result after the df5 filter.
df5_star_rows = df5.filter("Result = '*'")
df5_star_rows.show()

In [None]:
# df5 rows where both rating diffs are NULL and Result is '*'.
df5.where(
    col("WhiteRatingDiff").isNull()
    & col("BlackRatingDiff").isNull()
    & (col("Result") == '*')
).show()

In [None]:
# df6 rows where both rating diffs are NULL but Result is not '*'.
df6.where(
    col("WhiteRatingDiff").isNull()
    & col("BlackRatingDiff").isNull()
    & (col("Result") != '*')
).show()

In [None]:
# df5 rows where both rating diffs are present and Result is '*'.
df5.where(
    col("WhiteRatingDiff").isNotNull()
    & col("BlackRatingDiff").isNotNull()
    & (col("Result") == '*')
).show()

In [None]:
# df6 rows where both rating diffs are present and Result is still '*'.
df6.where(
    col("WhiteRatingDiff").isNotNull()
    & col("BlackRatingDiff").isNotNull()
    & (col("Result") == '*')
).show()