# Imports

In [1]:
import pandas as pd
import random
import math
import sklearn
import numpy as np
from sklearn.base import TransformerMixin
from sklearn.metrics import accuracy_score
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import mean_squared_error
from statistics import mean
from sklearn.pipeline import Pipeline

# Initial Loading of Data

In [2]:
# Load the training data. Forward slashes keep this relative path valid on
# Windows as well as on POSIX systems, where a backslash is not a separator.
raw_data = pd.read_csv('../data/data_train.csv')

# Feature Engineering

In [3]:
class CustomDummifier(TransformerMixin):
    """Pipeline step that one-hot encodes columns via ``pandas.get_dummies``.

    Parameters
    ----------
    cols : list of str, optional
        Column names forwarded as ``columns=`` to ``pandas.get_dummies``.
        With ``None`` pandas chooses the columns to encode itself.
    """

    def __init__(self, cols=None):
        self.cols = cols

    def fit(self, *_):
        """Nothing is learned; return ``self`` so the step can be chained."""
        return self

    def transform(self, X):
        """Return a dummy-encoded version of ``X``."""
        encoded = pd.get_dummies(X, columns=self.cols)
        return encoded

### Imputing Missing Ratings

In [4]:
class RatingImputer(TransformerMixin):
    """Pipeline step that treats zero ratings as missing and fills them in.

    Parameters
    ----------
    cols : list of str
        Names of the rating columns to impute.
    strategy : str, default '0'
        ``'0'`` fills missing values with 0. ``'row_mean'`` fills each
        column's missing values with the mean of that column's non-missing
        values (despite its name, the mean is taken per column, not per row).
        Any other value leaves the missing entries as NaN.
    """

    def __init__(self, cols=None, strategy='0'):
        self.cols = cols
        self.strategy = strategy

    def fit(self, *_):
        """Nothing is learned; return ``self`` so the step can be chained."""
        return self

    def transform(self, X):
        """Return a copy of ``X`` with the configured columns imputed.

        The input frame is left untouched, so the same raw data can be fed
        through several differently configured pipelines.
        """
        X = X.copy()

        # Zeros are placeholders for "no rating"; turn them into NaN so they
        # are excluded from the column means and handled by fillna below.
        ratings = X[self.cols].replace(to_replace=0, value=np.nan)

        if self.strategy == '0':
            ratings = ratings.fillna(value=0)
        elif self.strategy == 'row_mean':
            # DataFrame.mean() yields one mean per column; fillna aligns the
            # resulting Series with the columns by label.
            ratings = ratings.fillna(ratings.mean())

        X[self.cols] = ratings
        return X

### Rating Differences

In [5]:
class RatingDifferences(TransformerMixin):
    """Pipeline step that replaces paired rating columns by their difference.

    Parameters
    ----------
    white_cols, black_cols : list of str
        Column names that are paired positionally: the i-th white column is
        matched with the i-th black column, so both lists must be in the same
        order. White column names are expected to start with ``"white_"``;
        that prefix is stripped to build the name of the new column.
    """

    def __init__(self, white_cols=None, black_cols=None):
        self.white_cols = white_cols
        self.black_cols = black_cols

    def fit(self, *_):
        """Nothing is learned; return ``self`` so the step can be chained."""
        return self

    def transform(self, X):
        """Return a copy of ``X`` with ``<name>_difference`` columns.

        Each new column holds ``X[white] - X[black]`` for one pair; the
        original white and black columns are dropped from the result. The
        input frame itself is not modified.

        Raises
        ------
        ValueError
            If ``white_cols`` and ``black_cols`` differ in length, since the
            surplus columns would otherwise be dropped without being used.
        """
        if len(self.white_cols) != len(self.black_cols):
            raise ValueError(
                "white_cols and black_cols must have the same length, got "
                f"{len(self.white_cols)} and {len(self.black_cols)}"
            )

        X = X.copy()
        prefix_length = len("white_")

        for white, black in zip(self.white_cols, self.black_cols):
            X[white[prefix_length:] + "_difference"] = X[white] - X[black]

        X = X.drop(columns=self.white_cols)
        X = X.drop(columns=self.black_cols)

        return X

# Initial Modeling of Data

### Load Baseline Data

In [6]:
# Load the baseline table. Forward slashes keep this relative path valid on
# Windows as well as on POSIX systems, where a backslash is not a separator.
baseline_data = pd.read_csv('../data/baseline.csv')

### Miscellaneous Methods

In [9]:
def round_to(x, to):
    """Round ``x`` to the nearest multiple of ``to``.

    The quotient is rounded with the built-in ``round``, so exact halves go
    to the even multiple (e.g. ``round_to(25, 10)`` is 20).
    """
    multiples = round(x / to)
    return to * multiples

def create_labels_data(df, label_column):
    """Split ``df`` into a feature frame and a label column.

    Returns
    -------
    tuple
        ``(data, labels)`` where ``data`` holds every column of ``df`` except
        ``label_column`` (original order kept) and ``labels`` is
        ``df[label_column]``.
    """
    feature_names = [name for name in df.columns if name != label_column]
    return df[feature_names], df[label_column]

def cross_val_accuracy(estimator, X, y, num_cuts=5):
    """Cross-validate ``estimator.score`` with folds grouped by ``X['id']``.

    The unique ids are split into ``num_cuts`` chunks. For each chunk the
    estimator is fitted on the rows whose id is outside the chunk and scored
    on the rows whose id is inside it, so all rows sharing an id always land
    in the same fold. The ``'id'`` column is never passed to the estimator.

    Parameters
    ----------
    estimator : object
        Anything exposing ``fit(X, y)`` and ``score(X, y)``.
    X : pandas.DataFrame
        Features, including an ``'id'`` column.
    y : pandas.Series
        Labels, row-aligned with ``X``.
    num_cuts : int, default 5
        Number of folds.

    Returns
    -------
    tuple
        ``(scores, mean(scores))`` with one score per fold.
    """
    features = X.drop(columns=['id'])
    chunks = np.array_split(list(X['id'].unique()), num_cuts)
    scores = []

    for ids in chunks:
        # A boolean row mask selects the fold regardless of what the frame's
        # index looks like (index labels are not valid positions in general).
        held_out = X['id'].isin(ids).to_numpy()

        estimator.fit(features[~held_out], y[~held_out])
        scores.append(estimator.score(features[held_out], y[held_out]))

    return scores, mean(scores)

def cross_val_rmse(estimator, X, y, num_cuts=5):
    """Cross-validate the RMSE of predicted probabilities, grouped by id.

    The unique values of ``X['id']`` are split into ``num_cuts`` chunks. For
    each chunk the estimator is fitted on the rows whose id is outside the
    chunk, and the second column of ``predict_proba`` on the held-out rows is
    compared with the held-out labels. The ``'id'`` column is never passed to
    the estimator.

    Parameters
    ----------
    estimator : object
        Anything exposing ``fit(X, y)`` and ``predict_proba(X)``, the latter
        returning one two-element sequence per row.
    X : pandas.DataFrame
        Features, including an ``'id'`` column.
    y : pandas.Series
        Numeric labels, row-aligned with ``X``.
    num_cuts : int, default 5
        Number of folds.

    Returns
    -------
    tuple
        ``(rmse, mean(rmse))`` with one RMSE per fold. A NaN probability
        results in a NaN RMSE for that fold.
    """
    features = X.drop(columns=['id'])
    chunks = np.array_split(list(X['id'].unique()), num_cuts)
    rmse = []

    for ids in chunks:
        # Boolean row mask for the held-out fold; works for any index.
        held_out = X['id'].isin(ids).to_numpy()

        # Fit on the training fold only so the error is measured on rows the
        # estimator has not seen.
        estimator.fit(features[~held_out], y[~held_out])

        probabilities = estimator.predict_proba(features[held_out])
        y_hat = np.array([prob[1] for prob in probabilities], dtype=float)
        y_true = np.asarray(y[held_out], dtype=float)

        rmse.append(math.sqrt(np.mean(np.square(y_hat - y_true))))

    return rmse, mean(rmse)

class BaselineClassifier:
    """Rule-based classifier driven by a single predicate.

    Parameters
    ----------
    winner_function : callable
        Called with one row (anything the callable can index, e.g. a pandas
        Series) and returning a truthy or falsy value.
    """

    def __init__(self, winner_function):
        self.winner_function = winner_function

    def fit(self, X, y):
        """Nothing is learned; present so the object can be used like an estimator."""
        return self

    def predict(self, X):
        """Return ``[1, 0]`` if ``winner_function(X)`` is truthy, else ``[0, 1]``.

        ``X`` is a single row, not a whole data set.
        """
        # Evaluate the predicate once so both entries are always consistent.
        outcome = self.winner_function(X)
        return [1, 0] if outcome else [0, 1]

    def score(self, X, y):
        """Return the accuracy over the rows of ``X`` against labels ``y``.

        The predicted label of a row is the second element of ``predict``,
        i.e. 0 when the predicate holds and 1 otherwise.
        """
        predictions = [self.predict(feature)[1] for _, feature in X.iterrows()]
        # accuracy_score is documented as (y_true, y_pred).
        return accuracy_score(y, predictions)
    
class BaselinePredictor:
    """Probability baseline looked up from a table keyed by rating difference.

    Parameters
    ----------
    baseline_data : pandas.DataFrame
        Table with a ``'rating_difference'`` column and a
        ``'winning_percentage'`` column.
    """

    def __init__(self, baseline_data):
        self.baseline_data = baseline_data

    def fit(self, X, y):
        """Nothing is learned; present so the object can be used like an estimator."""
        return self

    def predict_proba(self, X, rating_difference=True):
        """Return ``[p, 1 - p]`` for every row of ``X``.

        ``p`` is the mean ``winning_percentage`` of the baseline rows whose
        ``rating_difference`` equals the row's rating difference rounded to
        the nearest 10, or NaN when no baseline row matches.

        The rating difference is ``white_rating - black_rating`` when both
        columns are present. When they are absent, ``rating_difference`` is
        true and ``X`` has a ``'rating_difference'`` column, that column is
        used instead (presumably the purpose of the flag - confirm with the
        author).

        Raises
        ------
        KeyError
            If ``X`` offers neither source for the rating difference.
        """
        if 'white_rating' in X.columns and 'black_rating' in X.columns:
            differences = X['white_rating'] - X['black_rating']
        elif rating_difference and 'rating_difference' in X.columns:
            differences = X['rating_difference']
        else:
            raise KeyError(
                "X needs 'white_rating' and 'black_rating' columns "
                "or a 'rating_difference' column"
            )

        # Aggregate the table once instead of filtering it again per row.
        lookup = (
            self.baseline_data
            .groupby('rating_difference')['winning_percentage']
            .mean()
            .to_dict()
        )

        probs = []
        for difference in differences:
            prob = lookup.get(round_to(difference, 10), np.nan)
            probs.append([prob, 1 - prob])
        return probs

def model_eval(model):
    """Print cross-validated accuracy and RMSE for ``model`` and the baselines.

    Reads the notebook globals ``X``, ``y``, ``rating_baseline``,
    ``eval_baseline`` and ``prob_baseline``. ``model`` is always reported
    under the "Random Forest" label.
    """
    print("Accuracy Scores")
    model_accuracy = cross_val_accuracy(model, X, y)
    print(f"Random Forest Accuracy: {model_accuracy}")
    rating_accuracy = cross_val_accuracy(rating_baseline, X, y)
    print(f"Baseline Rating: {rating_accuracy}")
    eval_accuracy = cross_val_accuracy(eval_baseline, X, y)
    print(f"Baseline Eval: {eval_accuracy}")

    print("\nLoss Scores")
    model_rmse = cross_val_rmse(model, X, y)
    print(f"Random Forest RMSE: {model_rmse}")
    baseline_rmse = cross_val_rmse(prob_baseline, X, y)
    print(f"Baseline RMSE: {baseline_rmse}")

In [8]:
# Rule-based baselines: one keyed on the sign of the rating difference, one on
# the sign of the evaluation.
rating_baseline = BaselineClassifier(
    lambda row: row['rating_difference'] >= 0
)
eval_baseline = BaselineClassifier(
    lambda row: row['eval'] >= 0
)

# Probability baseline backed by the table loaded into baseline_data.
prob_baseline = BaselinePredictor(baseline_data)

# Random forest configuration used throughout the experiments.
rf = RandomForestClassifier(
    n_estimators=500,
    max_features="log2",
    min_samples_leaf=5,
    n_jobs=-1,
)

### Test Modeling

In [None]:
# Transformers for this experiment: dummy-encode 'perf', impute the rating
# columns with the '0' strategy, and pair up the white/black rating columns.
cd = CustomDummifier(cols=['perf'])
ri = RatingImputer(
    strategy='0',
    cols=[name for name in raw_data.columns if 'rating' in name],
)
rd = RatingDifferences(
    white_cols=[name for name in raw_data.columns if 'white' in name and 'rating' in name],
    black_cols=[name for name in raw_data.columns if 'black' in name and 'rating' in name],
)

In [None]:
# Preprocessing chain: dummy encoding, rating imputation, rating differences.
pipeline = Pipeline(steps=[
    ("dummify", cd),
    ('imputer', ri),
    ('differences', rd),
])

# Run the raw data through the chain and separate the 'winner' label.
data = pipeline.fit_transform(raw_data)
X, y = create_labels_data(data, 'winner')

# Report cross-validated scores for the forest and the baselines.
model_eval(rf)

In [None]:
# Transformers for this experiment: dummy-encode 'perf', impute the rating
# columns with the 'row_mean' strategy, and pair up the white/black rating
# columns.
cd = CustomDummifier(cols=['perf'])
ri = RatingImputer(
    strategy='row_mean',
    cols=[name for name in raw_data.columns if 'rating' in name],
)
rd = RatingDifferences(
    white_cols=[name for name in raw_data.columns if 'white' in name and 'rating' in name],
    black_cols=[name for name in raw_data.columns if 'black' in name and 'rating' in name],
)

In [None]:
# Preprocessing chain: dummy encoding, rating imputation, rating differences.
pipeline = Pipeline(steps=[
    ("dummify", cd),
    ('imputer', ri),
    ('differences', rd),
])

# Run the raw data through the chain and separate the 'winner' label.
data = pipeline.fit_transform(raw_data)
X, y = create_labels_data(data, 'winner')

# Report cross-validated scores for the forest and the baselines.
model_eval(rf)

In [None]:
# Transformers for this experiment: dummy-encode 'perf' and impute the rating
# columns with the 'row_mean' strategy; no rating differences are computed.
cd = CustomDummifier(cols=['perf'])
ri = RatingImputer(cols=[column for column in raw_data.columns if 'rating' in column], strategy='row_mean')

# Without a 'rating_difference' column the rating baseline compares the two
# overall ratings directly. The column names are all lowercase
# ('white_rating' / 'black_rating'), matching their use elsewhere in this file.
rating_baseline = BaselineClassifier(lambda row: row['white_rating'] >= row['black_rating'])

In [None]:
# Preprocessing chain: dummy encoding followed by rating imputation.
pipeline = Pipeline(steps=[
    ("dummify", cd),
    ('imputer', ri),
])

# Run the raw data through the chain and separate the 'winner' label.
data = pipeline.fit_transform(raw_data)
X, y = create_labels_data(data, 'winner')

# Report cross-validated scores for the forest and the baselines.
model_eval(rf)

### Final Model

In [7]:
# Transformers for the final model: dummy-encode 'perf' and impute the rating
# columns with the 'row_mean' strategy.
cd = CustomDummifier(cols=['perf'])
ri = RatingImputer(
    strategy='row_mean',
    cols=[name for name in raw_data.columns if 'rating' in name],
)

In [10]:
# Preprocessing chain for the final model: dummy encoding, then imputation.
pipeline = Pipeline(steps=[
    ("dummify", cd),
    ('imputer', ri),
])

# Run the raw data through the chain and separate the 'winner' label.
data = pipeline.fit_transform(raw_data)
X, y = create_labels_data(data, 'winner')

In [11]:
# Fixed feature order for the final model, so the columns always reach the
# estimator in the same positions.
column_order = [
    'id',
    # dummy columns for 'perf'
    'perf_blitz', 'perf_bullet', 'perf_classical', 'perf_rapid', 'perf_ultrabullet',
    # white ratings
    'white_rating', 'white_ultrabullet_rating', 'white_bullet_rating',
    'white_blitz_rating', 'white_rapid_rating', 'white_classical_rating',
    'white_correspondence_rating',
    # black ratings
    'black_rating', 'black_ultrabullet_rating', 'black_bullet_rating',
    'black_blitz_rating', 'black_rapid_rating', 'black_classical_rating',
    'black_correspondence_rating',
    # clock, evaluation and mate flag
    'clock_initial', 'clock_increment', 'white_clock', 'black_clock',
    'eval', 'is_mate',
]

X = X[column_order]

In [12]:
# Final model: fitted on all rows, with the 'id' column excluded from the
# features.
rf = RandomForestClassifier(
    n_estimators=500,
    max_features="log2",
    min_samples_leaf=5,
    n_jobs=-1,
)
rf.fit(X.drop(columns=['id']), y)

RandomForestClassifier(max_features='log2', min_samples_leaf=5,
                       n_estimators=500, n_jobs=-1)

### Save Model

In [13]:
from joblib import dump, load

# Persist the fitted forest. Forward slashes keep this relative path valid on
# Windows as well as on POSIX systems, where a backslash is not a separator.
dump(rf, '../models/rf.joblib')

['..\\models\\rf.joblib']