In [None]:
import json
import os
import re

from matplotlib import pyplot as plt
import numpy as np
import pandas as pd
import seaborn as sns

%matplotlib inline

In [None]:
# Input files live under DATA_ROOT, relative to the notebook's working directory.
DATA_ROOT = './data'
PGN, STOCKFISH = (
    os.path.join(DATA_ROOT, file_name)
    for file_name in ('data.pgn', 'stockfish.csv')
)

# Get to know the data

### First, let's take a look at the PGN file

In [None]:
# Preview the first 100 lines of the raw PGN file.
# `zip` simply stops if the file is shorter than that (a bare `next(f)` would
# raise StopIteration), and `end=''` avoids doubling the newline that every
# line read from the file already carries.
with open(PGN) as f:
    for _, line in zip(range(100), f):
        print(line, end='')

Very well structured text data. We need a function to parse it into a DataFrame for easier processing.

In [None]:
def pgn2dataframe(file_obj):
    """Parse an open PGN file into a `pandas.DataFrame` with one row per game.

    Every `[Tag "value"]` header line becomes a column named after the tag.
    The move text, which may span several lines, is joined with single spaces
    into a `"Moves"` column with the trailing result token removed.
    """
    tag_line = re.compile(r'\[([A-Za-z]+) "(.*)"\]')
    result_tokens = ('0-1', '1-0', '1/2-1/2')

    games = []          # completed game records, one dict per game
    current = {}        # record of the game currently being read
    pending_moves = []  # move-text lines collected for the current game

    for raw_line in file_obj:
        line = raw_line.strip()

        # Header line: store the tag and move on
        tag = tag_line.match(line)
        if tag:
            current[tag.group(1)] = tag.group(2)
            continue

        # Blank separator line
        if not line:
            continue

        # A line ending with a result token closes the game's move text
        terminator = next(
            (token for token in result_tokens if line.endswith(token)),
            None,
        )
        if terminator is None:
            # Ordinary move-text line: keep collecting
            pending_moves.append(line)
            continue

        # Strip the result token; whatever is left still belongs to the moves
        remainder = line[:-len(terminator)].strip()
        if remainder:
            pending_moves.append(remainder)
        current['Moves'] = ' '.join(pending_moves)

        # The record is complete: store it and start a fresh one
        games.append(current)
        current, pending_moves = {}, []

    return pd.DataFrame(games)

In [None]:
# Parse the PGN file into a DataFrame (one row per game).
# Might take a few seconds
with open(PGN) as pgn_file:
    all_matches = pgn2dataframe(pgn_file)

all_matches.info()

So far so good.
The only columns with null are `"BlackElo"` and `"WhiteElo"`, which are the ones we have to predict.

In [None]:
all_matches.head()

Let's see if we can drop those `"??"` columns

In [None]:
# Show how often each distinct value occurs in the candidate columns
candidate_columns = ['Black', 'Date', 'Round', 'Site', 'White']
for column_name in candidate_columns:
    print(all_matches[column_name].value_counts())

OK, we don't need those columns. 

In [None]:
# Rebind instead of mutating in place; the resulting frame is the same
all_matches = all_matches.drop(columns=['Black', 'Date', 'Round', 'Site', 'White'])

We can also use the `"Event"` column as index column. Will come in handy later.

In [None]:
# Convert "Event" to integers and promote it to the index in one chain
all_matches = (
    all_matches
    .astype({'Event': np.int64})
    .set_index('Event')
)
all_matches.info()

### Now for the Stockfish file

It's already in CSV format. Convenient!

In [None]:
# Load the Stockfish CSV into its own DataFrame and summarise its columns
move_scores = pd.read_csv(STOCKFISH)
move_scores.info()

In [None]:
move_scores.head()

We can join the 2 data frames into 1 here.
Note that it won't work if we don't set `"Event"` to be the index column in both data frames.

In [None]:
# `DataFrame.join` matches rows on the index, so the scores frame is re-indexed
# by "Event" first. The default left join keeps every game parsed from the PGN file.
all_matches = all_matches.join(move_scores.set_index('Event'))
all_matches.info()

In [None]:
all_matches.head()

Now we don't need `move_scores` anymore 

In [None]:
del move_scores

In [None]:
# The ratings were parsed as text; convert both columns to numbers
for elo_column in ('WhiteElo', 'BlackElo'):
    all_matches[elo_column] = pd.to_numeric(all_matches[elo_column])

# Feature engineering & visualization

@todo: I chose not to use the `"Moves"` column and to remove it completely.
You can try to make use of it, for example by looking up the first few moves to identify the name of the opening.

In [None]:
# Rebind instead of mutating in place; the resulting frame is the same
all_matches = all_matches.drop(columns=['Moves'])

Let's make a new column `"WhitePoint"` to replace `"Result"`.
We don't need black point since we know that `black point + white point = 1`.

@todo: It might be better to treat `"Result"` as categorical data as there are only 3 possible values.

In [None]:
# Map each PGN result string to White's score explicitly instead of calling
# `eval` on text that was read from the data file.
white_point_by_result = {'1-0': 1.0, '1/2-1/2': 0.5, '0-1': 0.0}

# Fail loudly on anything unexpected rather than silently producing NaN
unexpected_results = all_matches.loc[
    ~all_matches['Result'].isin(list(white_point_by_result)), 'Result'
].unique()
if len(unexpected_results):
    raise ValueError(
        'Unexpected "Result" values: {}'.format(list(unexpected_results))
    )

all_matches['WhitePoint'] = all_matches['Result'].map(white_point_by_result)
all_matches.drop(columns=['Result'], inplace=True)
all_matches.head()

In [None]:
all_matches['WhitePoint'].value_counts().plot.pie(autopct='%1.1f%%', startangle=90)

White wins 38.6% to Black's 30.3%. Let's look at the distribution of black and white's elo rating.

In [None]:
# One histogram per colour, side by side; rows without a rating are left out
fig, axes = plt.subplots(1, 2, figsize=(15,8))
for ax, elo_column in zip(axes, ('WhiteElo', 'BlackElo')):
    all_matches[elo_column].dropna().plot.hist(ax=ax, bins=20, title=elo_column)

The training set is skewed toward high-level players (rating over 2000).
This might cause some overfitting later.

Let's change the `"MoveScores"` column from strings to list of strings, and look at the length of the matches.

In [None]:
# Turn each space-separated score string into a list of tokens.
# The tokens stay strings here; `analyze_move_scores` converts them to integers.
all_matches['MoveScores'] = all_matches['MoveScores'].apply(lambda x: x.split(' '))

In [None]:
# Match length = number of entries in the game's score list
all_matches['MatchLength'] = all_matches['MoveScores'].map(len)

fig, ax = plt.subplots(figsize=(10,10))
all_matches['MatchLength'].plot.hist(bins=20, ax=ax)

In [None]:
all_matches['MatchLength'].describe()

Most games last 60-100 half-moves (`MatchLength` counts one engine score per half-move), with some extreme cases.
There is no way we can predict the rating of a player from just one or two moves,
so we can treat too-short games as anomalies and handle them accordingly.

@todo: Here I choose to remove them completely from the training set.
You might try a different strategy to handle short games

In [None]:
# All short games: fewer than 8 entries in the score list.
# Each entry is one half-move, so this means fewer than 4 full moves per side.
is_short = all_matches['MatchLength'] < 8
short_games = all_matches.loc[is_short]
len(short_games)

In [None]:
# Remove every short game by index label (rebinding instead of mutating in place)
all_matches = all_matches.drop(index=short_games.index)

Now we need to analyze the flow of the game. Here I choose to simply count the good/bad moves each player made.

In [None]:
def analyze_move_scores(
    move_scores,
    *,
    excellent_thres=300,
    good_thres=100,
    mistake_thres=-100,
    blunder_thres=-300,
):
    """
    Compute, for each player, the fraction of their moves in each category.

    The categories are:
    - Excellent: Moves that give major advantage to the player
    - Good: Moves that give minor advantage to the player
    - Mistake: Moves that give minor advantage to the opponent
    - Blunder: Moves that give major advantage to the opponent

    Moves that fall in none of these categories are not counted separately,
    because their share follows from the other four.

    Parameters
    ----------
    move_scores : sequence
        One score per half-move, White's move first. A rising score is
        counted in White's favour and a falling score in Black's. Entries
        that cannot be converted to `int` are treated as "score unchanged".
    excellent_thres, good_thres, mistake_thres, blunder_thres : int
        Score-change thresholds, from the moving player's point of view,
        that delimit the four categories.

    Returns
    -------
    numpy.ndarray
        Eight ratios: Black's excellent, good, mistake and blunder shares,
        followed by White's in the same order. A player with no moves
        gets all zeros.

    @todo:
        Currently the threshold values are just some random numbers.
        They can be changed to some better values, or use an adaptive strategy
        to better classify them.
    """
    categories = ('excellent', 'good', 'mistake', 'blunder')
    white = dict.fromkeys(categories, 0)
    black = dict.fromkeys(categories, 0)
    last_score = 0

    for i, score_str in enumerate(move_scores):
        white_to_move = i % 2 == 0
        current_player = white if white_to_move else black
        try:
            score = int(score_str)
        except (TypeError, ValueError):
            # Unparseable entry: assume the score did not change
            score = last_score
        change = score - last_score
        # Flip the sign for Black so `point` is always from the mover's side
        point = change if white_to_move else -change
        if point >= excellent_thres:
            current_player['excellent'] += 1
        elif point >= good_thres:
            current_player['good'] += 1
        elif point <= blunder_thres:
            current_player['blunder'] += 1
        elif point <= mistake_thres:
            current_player['mistake'] += 1
        last_score = score

    # White moves first, so White gets the extra move when the count is odd
    white_move_count = (len(move_scores) + 1) // 2
    black_move_count = len(move_scores) // 2
    # Guard against division by zero for games with 0 or 1 scores: the counts
    # are all zero in that case, so dividing by 1 yields clean zeros, not NaN
    white_ratio = np.array([white[c] for c in categories]) / max(white_move_count, 1)
    black_ratio = np.array([black[c] for c in categories]) / max(black_move_count, 1)
    return np.concatenate([black_ratio, white_ratio])

In [None]:
# Column order mirrors the array returned by `analyze_move_scores`:
# Black's four ratios first, then White's.
ratio_columns = [
    player + '_' + category
    for player in ('black', 'white')
    for category in ('excellent', 'good', 'mistake', 'blunder')
]

# One row per game, one column per ratio
ratio_matrix = np.array(
    all_matches['MoveScores'].apply(analyze_move_scores).tolist()
)
for position, column_name in enumerate(ratio_columns):
    all_matches[column_name] = ratio_matrix[:, position]

all_matches.head()

In [None]:
all_matches.describe()

We have no more use for `"MoveScores"`, so let's delete it

In [None]:
# Rebind instead of mutating in place; the resulting frame is the same
all_matches = all_matches.drop(columns=['MoveScores'])

# Correlation

In [None]:
# Rows with a known White rating form the training set; the rest is the test set
has_rating = all_matches['WhiteElo'].notna()
train_data = all_matches.loc[has_rating]
test_data = all_matches.loc[~has_rating]

In [None]:
# Pairwise correlation of the training columns, drawn on an explicit axes object
corrmat = train_data.corr()
fig, ax = plt.subplots(figsize=(12,9))
sns.heatmap(corrmat, vmax=0.9, square=True, ax=ax)

`"WhitePoint"` correlates strongly with many other features.
We can remove it, but I choose to leave it for now.

# Building models

In [None]:
from sklearn.linear_model import ElasticNet, Lasso, BayesianRidge, LassoLarsIC
from sklearn.ensemble import RandomForestRegressor, GradientBoostingRegressor
from sklearn.kernel_ridge import KernelRidge
from sklearn.pipeline import make_pipeline
from sklearn.preprocessing import RobustScaler
from sklearn.base import BaseEstimator, TransformerMixin, RegressorMixin, clone
from sklearn.model_selection import KFold, cross_val_score, train_test_split
from sklearn.metrics import mean_squared_error

In [None]:
def train_and_evaluate(model, x_train, y_train, n_folds=5, random_state=42):
    """Cross-validate `model` and return the mean absolute error of each fold.

    Parameters
    ----------
    model : estimator
        Unfitted scikit-learn style estimator; `cross_val_score` fits a
        clone of it on every fold.
    x_train, y_train : pandas objects
        Features and targets; their `.values` arrays are passed on.
    n_folds : int
        Number of cross-validation folds.
    random_state : int or None
        Seed for shuffling the rows before splitting. Pass `None` for a
        different shuffle on every call.

    Returns
    -------
    numpy.ndarray
        One mean absolute error per fold (positive, lower is better).
    """
    # Pass the splitter itself as `cv`. Calling `.get_n_splits()` on it only
    # returns the integer fold count, which would discard the shuffling.
    kf = KFold(n_splits=n_folds, shuffle=True, random_state=random_state)
    # The scorer returns negated errors, so flip the sign back
    score = -cross_val_score(
        model,
        x_train.values,
        y_train.values,
        scoring='neg_mean_absolute_error',
        cv=kf,
    )
    return score

In [None]:
# The two rating columns are the targets; every other column is a feature
target_columns = ['BlackElo', 'WhiteElo']
x_train = train_data.drop(columns=target_columns)
y_train = train_data[target_columns]

In [None]:
ridge = KernelRidge(alpha=0.6, kernel='polynomial', degree=2, coef0=2.5)

In [None]:
score = train_and_evaluate(ridge, x_train, y_train)