# Setup Environment

In [None]:
import os
import sys
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

# Point PySpark's worker (PYSPARK_PYTHON) and driver (PYSPARK_DRIVER_PYTHON)
# interpreter settings at the Python executable running this notebook.
os.environ.update(
    PYSPARK_PYTHON=sys.executable,
    PYSPARK_DRIVER_PYTHON=sys.executable,
)

In [None]:
def init_spark():
    """Return the notebook's SparkSession, creating one if none exists yet.

    The session targets the ``local[4]`` master and is named
    "Chess Preprocess".
    """
    builder = SparkSession.builder
    builder = builder.master("local[4]")
    builder = builder.appName("Chess Preprocess")
    # Placeholder option kept exactly as in the original configuration.
    builder = builder.config("spark.some.config.option", "some-value")
    return builder.getOrCreate()


spark = init_spark()

In [None]:
# Single random seed reused by the train/test split and the Word2Vec model below.
seed = 42

# Load Dataset

In [None]:
# Read the raw games CSV: the first row is the header and column types are inferred.
csv_reader = spark.read.option('header', True).option('inferSchema', True)
df = csv_reader.csv('data/games.csv')

# Data Inspection

In [None]:
import pandas as pd

In [None]:
# Preview the first 5 rows. The limit is applied on the Spark side so that only
# those rows are converted to pandas, instead of materialising the whole dataset
# on the driver just to display its head.
df.limit(5).toPandas()

## Inspect Datatypes

In [None]:
# Show the (column name, dtype) pairs that schema inference assigned to the CSV columns.
df.dtypes

In [None]:
# Sort column names into categorical / numerical buckets by their Spark dtype
# string. A column with any other dtype ends up in neither list.
categorical, numerical = [], []
for feature, dtype in df.dtypes:
    if dtype in {'string', 'boolean'}:
        categorical.append(feature)
    elif dtype in {'double', 'int'}:
        numerical.append(feature)

### Numerical Features

In [None]:
# For every numerical column, print its distinct values when there are fewer
# than 50 of them; otherwise print only how many distinct values exist.
for feature in numerical:
    distinct_df = df.select(feature).distinct()
    distinct_count = distinct_df.count()
    if distinct_count >= 50:
        print(f'{feature:20s}:{distinct_count} unique values')
        continue
    values = [row[feature] for row in distinct_df.collect()]
    print(f'{feature:20s}:{values}')

- `created_at`: Timestamp in UTC
- `last_move_at`: Timestamp in UTC
- `turns`: Number of turns in the match
- `white_rating`: white player rating
- `black_rating`: black player rating
- `opening_ply`: Number of plies used to set up opening

### Categorical Features

In [None]:
# For every categorical column, print its distinct values when there are fewer
# than 50 of them; otherwise print only how many distinct values exist.
for feature in categorical:
    distinct_df = df.select(feature).distinct()
    distinct_count = distinct_df.count()
    if distinct_count < 50:
        summary = [row[feature] for row in distinct_df.collect()]
    else:
        summary = f'{distinct_count} unique values'
    print(f'{feature:20s}:{summary}')

- `id`: Game ID, uniquely identifies a match record
- `rated`: If rated, the game result affects player ratings
- `victory_status`: How the game ended
- `winner`: Match winner
- `increment_code`: Game time setting
- `white_id`: white player id
- `black_id`: black player id
- `moves`: Sequence of moves recorded during the match
- `opening_eco`: ECO classification code for the chess openings moves
- `opening_name`: Name of opening moves

## Overview of the Target Columns

In [96]:
# Number of games per value of the target column `winner`.
winner_counts = df.groupBy('winner').count()
winner_counts.show()

+------+-----+
|winner|count|
+------+-----+
| white|10001|
| black| 9107|
|  draw|  950|
+------+-----+



In [64]:
# Summary statistics for both ratings and their difference (black minus white).
with_rating_diff = df.withColumn('rating_diff', df.black_rating - df.white_rating)
with_rating_diff.describe('black_rating', 'white_rating', 'rating_diff').show()

+-------+------------------+
|summary|       rating_diff|
+-------+------------------+
|  count|             19629|
|   mean| -7.76560191553314|
| stddev|248.75135625634906|
|    min|             -1499|
|    max|              1605|
+-------+------------------+



## Look for Anomalies

In [None]:
# Compare the number of distinct rows with the total row count to reveal duplicates.
distinct_rows = df.distinct().count()
total_rows = df.count()
print(f'unique samples / total samples: {distinct_rows} / {total_rows} ')

In [None]:
print('Count Null Values in each Column')
null_count_exprs = []
for name in df.columns:
    # `name` is a plain string, so F.when emits it as a literal (non-null)
    # marker for each null cell; F.count then tallies those markers.
    null_marker = F.when(F.isnull(name), name)
    null_count_exprs.append(F.count(null_marker).alias(name))
df.select(null_count_exprs).toPandas()

- There are duplicate rows, which should be dropped from the dataset.
- Luckily, this dataset does not contain missing values.

# Data Preparation (Preprocessing)
Scikit-Learn offers a range of useful methods for preprocessing and data splits. With the approval from the course instructor, we will transform the datasets into *Pandas DataFrames* in this part.

In [None]:
from sklearn.preprocessing import OneHotEncoder
from pyspark.ml.feature import StringIndexer, VectorAssembler

## Fix Anomalies

In [None]:
# Drop duplicates: keep one copy of each fully identical row found during inspection.
df = df.distinct()

## Label `winner`


In [None]:
# Encode the string target `winner` as a numeric label, display the mapping,
# then replace the original column with the encoded one under the same name.
label_indexer = StringIndexer(
    inputCol='winner',
    outputCol='winner_label',
)
label_indexer_model = label_indexer.fit(df)
indexed = label_indexer_model.transform(df)
indexed.select('winner', 'winner_label').distinct().show()
df = indexed.drop('winner').withColumnRenamed('winner_label', 'winner')

## Feature `increment_code`
As per inspection, the feature contains a string with two numbers separated by `+`. After research, we found that the first number refers to initial total clock time per player in *minutes*; the second number refers to the number of *seconds* added to the total clock time after the player makes a move. We decided to extract these two numbers as two separate features `clock` and `increment` replacing `increment_code`.

In [None]:
# Peek at the first three raw `increment_code` values.
df.select('increment_code').head(3)

In [None]:
# Split `increment_code` on the literal '+' and store the two parts as the
# integer columns `clock` and `increment`, then show the last three rows.
splits = F.split(df.increment_code, '[+]')
clock_part = splits.getItem(0).cast('Integer')
increment_part = splits.getItem(1).cast('Integer')
df = df.withColumn('clock', clock_part)
df = df.withColumn('increment', increment_part)
df.select('clock', 'increment').tail(3)

## Feature `opening_eco`
As per inspection, each value of this feature is a letter that denotes an [opening moves category](https://www.365chess.com/eco.php), followed by a number identifying the specific opening within that category. Although there are variations within each category, we assume that the opening moves in each category are similar enough that we can ignore the differences within a category. Therefore, we extract the first letter from `opening_eco` into a new feature `open_category`, index it numerically as `open_cat`, and ignore `opening_eco` during training.

In [None]:
# Peek at the first three raw `opening_eco` values.
df.select('opening_eco').head(3)

In [None]:
# Keep only the first character of `opening_eco` as the opening category.
# Spark string positions are 1-based, so the first character is at position 1
# (a start of 0 happens to give the same result but is not the documented form).
df = df.withColumn('open_category', df.opening_eco.substr(1, 1))
df.select('open_category').distinct().show()

In [None]:
# Map each opening-category letter to a numeric index stored in `open_cat`
# and display the resulting letter-to-index mapping.
open_cat_indexer = StringIndexer(
    inputCol='open_category',
    outputCol='open_cat',
)
open_cat_model = open_cat_indexer.fit(df)
df = open_cat_model.transform(df)
category_mapping = df.select('open_category', 'open_cat').distinct()
category_mapping.show()

## Feature `rated`
Contains boolean values only and does not require preprocessing. Rated games affect player ratings and may therefore affect how players perform. We could have separated rated games from unrated games for this reason; here we did not.

## Split into Train/Test Sets
1. Encode player ids into numerical values.
    - Ensure that each player is assigned one and only one numerical id.
2. Use encoded player ids to split dataset based on groups.
    - Each group contains the matches played by one player. Make sure that each group is sampled evenly in the training set.

### Scikit-Learn split methods cannot handle distributed datasets. Because we want to leverage the `GroupShuffleSplit` method in Scikit-Learn, we are turning the Spark DF into a Pandas DF.

In [None]:
# Convert the Spark DataFrame into a pandas DataFrame for the scikit-learn based split below.
df_pd = df.toPandas()

### Encode categorical player `white_id`s `black_id`s to split based on individual players

In [None]:
def _factorize_keeping_index(stacked):
    """Replace each value of *stacked* by an integer code, keeping its index."""
    codes, _ = pd.factorize(stacked.values)
    return pd.Series(codes, stacked.index)


# Stack both id columns into one series so a player receives the same code
# whether they appear as white or as black, then restore the two-column layout.
player_ids = pd.DataFrame(df_pd[['white_id', 'black_id']])
player_ids = player_ids.stack().pipe(_factorize_keeping_index).unstack()

for id_column in ('white_id', 'black_id'):
    df_pd[f'{id_column}_num'] = player_ids[id_column]

df_pd[['white_id', 'white_id_num', 'black_id', 'black_id_num']]

### Data Split

In [None]:
from sklearn.model_selection import GroupShuffleSplit

In [None]:
# Single shuffled split with test_size=0.2; all games that share a
# `white_id_num` are kept on the same side of the split.
# NOTE(review): groups are built from the white player only, so a player may
# still appear as black on the other side of the split — confirm this is acceptable.
gss = GroupShuffleSplit(n_splits=1, test_size=0.2, random_state=seed)
train_idx, test_idx = next(gss.split(df_pd, df_pd['winner'], groups=df_pd['white_id_num']))

# Take explicit copies: new columns are assigned to these frames later, and
# assigning into a selection of `df_pd` would otherwise raise pandas'
# SettingWithCopyWarning.
df_train = df_pd.iloc[train_idx].copy()

df_test = df_pd.iloc[test_idx].copy()

In [None]:
# Sanity check: no white player id may occur in both the training and the test split.
train_white_ids = set(df_train['white_id'].values)
test_white_ids = set(df_test['white_id'].values)

assert train_white_ids.isdisjoint(test_white_ids), 'A group is present in both training and test sets'

## Feature `victory_status`
From inspection, we observe 4 unique values for this categorical feature: `'resign', 'outoftime', 'mate', 'draw'`. We perform one-hot encoding before feeding this into the model.

In [None]:
# Build a dense, integer-valued one-hot encoder that ignores unseen categories.
# scikit-learn 1.2 renamed the `sparse` keyword to `sparse_output` and later
# removed the old name, so try the new keyword first and fall back for older
# releases that do not know it.
try:
    enc = OneHotEncoder(sparse_output=False, dtype=int, handle_unknown='ignore')
except TypeError:
    enc = OneHotEncoder(sparse=False, dtype=int, handle_unknown='ignore')

# Fit on the training split only; the learned categories name the new columns.
enc.fit(df_train['victory_status'].to_numpy().reshape(-1, 1))
encoded_feature_names = 'status_' + enc.categories_[0]
encoded_feature_names

In [None]:
# Add the one-hot `status_*` columns to the training split first, then to the
# test split, using the encoder fitted above.
for split_frame in (df_train, df_test):
    status_column = split_frame['victory_status'].to_numpy().reshape(-1, 1)
    one_hot_status = enc.transform(status_column)
    split_frame.loc[:, encoded_feature_names] = one_hot_status
df_train[encoded_feature_names].head()

## Create Spark DF from Pandas DF Train/Test Set

In [None]:
# Convert both pandas splits back into Spark DataFrames (train first, then test).
df_train, df_test = [spark.createDataFrame(frame) for frame in (df_train, df_test)]

## Feature `moves`
As per inspection, this feature records the sequence of moves played during the match. Because the sequence, if parsed correctly, actually reveals the winning player, we need to either drop this feature or obfuscate this information. We find that Spark supports the Word2Vec feature transformation, which turns the sequence into a vector. Such a vector representation is independent of the order of the moves, which obfuscates part of the information contained in this feature.

In [None]:
from pyspark.ml.feature import Word2Vec

In [None]:
# Show the raw `moves` string of the first row in the training split.
df_train.select('moves').head().moves

In [None]:
# Word2Vec estimator: reads token lists from `moves_list` and writes one
# vector of length `vectorSize` per row to `moves_vec`.
vectorSize = 100
word2Vec = Word2Vec(
    vectorSize=vectorSize,
    seed=seed,
    inputCol="moves_list",
    outputCol="moves_vec",
)

Parameters for `Word2Vec`
- `vectorSize`: size of the output vector, the choice of 100 is arbitrary here
- `minCount`: Ignores all words with total frequency lower than this (not set in the code above, so the library default applies).
- `inputCol`: `moves_list` is the input feature, a list of `moves` split by space
- `outputCol`: `moves_vec` is the output feature, the transformed vector

The Word2Vec model will be trained with the corpus gathered from the `moves` in the training set. After which, it will transform both training and testing datasets' `moves` feature.

In [None]:
# Split each `moves` string on whitespace into a list of tokens: moves -> [move, move, ...].
# The pattern is a raw string so that `\s` reaches Spark verbatim; in a normal
# string literal `\s` is an invalid escape sequence that recent Python versions warn about.
train_moves_corpus = df_train.select(F.split(df_train.moves, r'\s', -1).alias('moves_list'))
train_moves_corpus

In [None]:
# Train Word2Vec on the training-set corpus only and show five of the learned vectors.
fitted_word2Vec = word2Vec.fit(train_moves_corpus)
print('Trained Word2Vec Model')
learned_vectors = fitted_word2Vec.getVectors()
learned_vectors.show(5)

In [None]:
# Display the vector produced for the first row of the training corpus.
first_transformed_row = fitted_word2Vec.transform(train_moves_corpus).head()
'Transformed Moves for the 1st Match in Training Set: ', first_transformed_row.moves_vec

In [None]:
# Tokenise `moves` on whitespace into `moves_list`, then let the fitted model
# add `moves_vec`, for both splits. The pattern is a raw string so that `\s`
# reaches Spark verbatim; in a normal string literal `\s` is an invalid escape
# sequence that recent Python versions warn about.
df_train = df_train.withColumn('moves_list', F.split(df_train.moves, r'\s', -1))
df_train = fitted_word2Vec.transform(df_train)

df_test = df_test.withColumn('moves_list', F.split(df_test.moves, r'\s', -1))
df_test = fitted_word2Vec.transform(df_test)

# Save Datasets

In [None]:
# Write each split to CSV via pandas, without the pandas index column.
# NOTE(review): `moves_list` and `moves_vec` are not scalar columns; check how
# they are written to CSV before relying on reading them back.
for split_frame, csv_path in ((df_train, 'data/train.csv'), (df_test, 'data/test.csv')):
    split_frame.toPandas().to_csv(csv_path, index=False)