# FASTROLL

TODO: write something about the project

## Data generation

We decided to gather data from 3 different sources.
1) Kaggle data set https://www.kaggle.com/milesh1/35-million-chess-games
2) Random moves
3) Random FEN

Each data set consists of 100,000 data points:
    FEN record -> Win or Lose

The Kaggle set was generated by taking pre-existing games and extracting each state of a game as a FEN record.
The random-moves set was built in a similar manner, only the games were played out randomly at generation time.
The random-FEN set is just a random placement of the pieces.

The Win or Lose label was obtained by evaluating each FEN with Stockfish.

## Imports and utility functions

In [None]:
import numpy as np
import scipy.optimize as sopt
import graphviz
import pandas as pd
import random
from typing import Iterable
import chess
from chess import Board
from matplotlib import pyplot as plt
from collections import defaultdict
from tqdm.auto import tqdm

def valign(v):
    """Reshape ``v`` into a single column (shape ``(-1, 1)``)."""
    column_shape = (-1, 1)
    return v.reshape(column_shape)


def slog(x):
    """Return ``log(x + 1e-10)``; the small offset keeps ``log(0)`` finite."""
    shifted = x + 1e-10
    return np.log(shifted)

## Model
### Logistic regression

In [None]:
def sigmoid(x):
    """Numerically stable logistic function ``1 / (1 + exp(-x))``, elementwise.

    Accepts anything ``np.asarray`` can convert (arrays of any shape, lists,
    scalars) and returns an ndarray of the same shape.  Integer input yields
    a floating-point result instead of being truncated to the input dtype.
    """
    x = np.asarray(x)

    # exp(-|x|) always lies in (0, 1], so neither branch below can overflow:
    #   x >= 0:  1 / (1 + exp(-x))
    #   x <  0:  exp(x) / (exp(x) + 1)
    decay = np.exp(-np.abs(x))
    return np.where(x >= 0, 1 / (1 + decay), decay / (decay + 1))


def logreg(X, Theta):
    """Return the logistic-regression output ``sigmoid(X @ Theta)``."""
    logits = X @ Theta
    return sigmoid(logits)


def logreg_loss(Theta, X, Y, alpha):
    """L2-regularised logistic regression cost, suitable for fmin_l_bfgs_b.

    Theta is the flat parameter vector handed over by the solver.  X is the
    design matrix and Y the label column (the caller in this file asserts
    that Y has exactly one column and as many rows as X).  alpha is the
    weight of the squared-L2 penalty on Theta.

    Returns a pair ``(loss, gradient)``: ``loss`` is a scalar and
    ``gradient`` has the same shape as ``Theta``.
    """
    # Reshape Theta into a column vector - L-BFGS gives us a flat array.
    theta_col = valign(Theta)

    hx = logreg(X, theta_col)

    # Negative log-likelihood of the labels under the current model.
    data_term = -np.sum(Y.T @ slog(hx) + (1 - Y).T @ slog(1 - hx))

    # The penalty must be the scalar Theta^T Theta (sum of squares); the
    # outer product Theta Theta^T would turn the whole loss into a matrix.
    penalty = alpha * np.sum(theta_col ** 2)
    nll = data_term + penalty

    # Gradient of both terms, so the solver sees a gradient that matches
    # the loss it is minimising.
    grad = X.T @ (hx - Y) + 2 * alpha * theta_col

    # Reshape grad into the shape of Theta, for fmin_l_bfgs_b to work.
    return nll, grad.reshape(Theta.shape)


class LogisticRegression:
    """Logistic-regression model whose parameters are found with L-BFGS-B.

    The fitted (or supplied) parameter vector is stored in ``self.theta``.
    Calling the instance on a design matrix returns ``logreg(X, theta)``.
    """

    def __init__(self, X, Y, alpha=0, theta: np.ndarray = None):
        # A ready-made parameter vector skips training altogether; X, Y and
        # alpha are not looked at in that case.
        if theta is not None:
            self.theta = theta
            return

        assert X.shape[0] == Y.shape[0], "shape mismatch!"
        assert Y.shape[1] == 1, "expected a vertical vector for Y!"

        def objective(Theta):
            # The solver expects a callable returning (loss, gradient).
            return logreg_loss(Theta, X, Y, alpha)

        # Start the search from the all-zero parameter vector.
        start = np.zeros(X.shape[1])
        solution = sopt.fmin_l_bfgs_b(objective, start)
        # Only the first element of the solver's result is kept.
        self.theta = solution[0]

    def error(self, X, Y):
        """Return the fraction of rows of X whose rounded prediction differs from Y."""
        assert X.shape[0] == Y.shape[0], "shape mismatch!"
        assert Y.shape[1] == 1, "expected a vertical vector for Y!"

        # Round the model outputs to hard integer labels, as a column like Y.
        hard_labels = np.round(self(X)).astype(int)
        preds = valign(hard_labels)
        mistakes = np.sum(np.abs(Y - preds))
        return mistakes / len(Y)

    def __call__(self, X):
        """Return the model output for every row of X."""
        assert X.shape[1] == self.theta.shape[0], "theta shape mismatch!"
        return logreg(X, self.theta)

    def __repr__(self):
        """Show the parameter vector."""
        return str(self.theta)

### Forests implementation

In [None]:
def entropy(counts):
    """Return the base-2 entropy of the distribution given by ``counts``.

    Counts are normalised to probabilities first; 1e-10 is added inside the
    logarithm so that zero probabilities do not produce ``-inf``.
    """
    total = np.sum(counts)
    probabilities = counts / total
    log_probabilities = np.log2(probabilities + 1e-10)
    return -np.sum(probabilities * log_probabilities)


def gini(counts):
    """Return the Gini impurity ``1 - sum(p_i ** 2)`` of ``counts``."""
    probabilities = counts / np.sum(counts)
    return 1 - np.sum(probabilities ** 2)


def mean_err_rate(counts):
    """Return the share of samples that do not belong to the largest class."""
    majority_share = np.max(counts) / np.sum(counts)
    return 1 - majority_share


class AbstractSplit:
    """Split the examples in a tree node according to a criterion.

    ``attr`` is the column the split is made on.  ``purity_gain`` starts at
    0.0 and is filled in by the code that selects the split.
    """

    def __init__(self, attr):
        self.attr = attr
        self.purity_gain = 0.0

    def mean_purity(self, df, purity_function):
        """Return the mean purity across all of the split's children.

        ``df`` is grouped by ``self.attr``; each group's purity (computed by
        ``purity_function`` from the value counts of its "target" column) is
        weighted by the group's size.
        """
        weighted_total = sum(
            purity_function(child['target'].value_counts()) * len(child)
            for _, child in df.groupby(self.attr)
        )
        return weighted_total / len(df)

    def __call__(self, x):
        """Return the subtree corresponding to x."""
        raise NotImplementedError

    def build_subtrees(self, df, subtree_kwargs):
        """Recursively build the subtrees."""
        raise NotImplementedError

    def iter_subtrees(self):
        """Return an iterator over subtrees."""
        raise NotImplementedError

    def add_to_graphviz(self, dot, parent=None, print_info=True):
        """Add the split to the graphviz visualization.

        The signature mirrors the way the tree calls it: ``dot`` is the
        graph being built, ``parent`` the node that owns this split and
        ``print_info`` is passed on to the child nodes.
        """
        raise NotImplementedError

    def __str__(self):
        return f"{self.__class__.__name__}: {self.attr}"


def get_split(df, purity_function=entropy, nattrs=None):
    """Pick the split with the lowest mean child impurity for ``df``.

    Returns None when the node already holds a single target value, when no
    attribute has more than one distinct value, or when the best split has
    a negative purity gain.  Otherwise returns the chosen split with its
    ``purity_gain`` attribute set.

    ``nattrs`` (random-forest support) limits the search to that many
    randomly chosen attributes.
    """
    target_value_counts = df["target"].value_counts()
    if len(target_value_counts) == 1:
        return None

    # Only attributes that actually vary within this node can split it.
    candidates = []
    for column in df.columns:
        if column != 'target' and len(df[column].unique()) > 1:
            candidates.append(column)
    if not candidates:
        return None

    # Random Forest support: keep a few randomly selected attributes.
    if nattrs is not None:
        random.shuffle(candidates)
        candidates = candidates[:nattrs]

    splits = [CategoricalMultivalueSplit(column) for column in candidates]

    # Score every candidate once and reuse the winner's score for the gain.
    purities = [split.mean_purity(df, purity_function) for split in splits]
    best_index = min(range(len(splits)), key=purities.__getitem__)
    best_split = splits[best_index]

    base_purity = purity_function(target_value_counts)
    best_split.purity_gain = base_purity - purities[best_index]

    return best_split if best_split.purity_gain >= 0 else None


class CategoricalMultivalueSplit(AbstractSplit):
    """Split with one child subtree per distinct value of ``self.attr``."""

    def __call__(self, x):
        """Return the subtree for ``x[self.attr]``, or None for an unseen value."""
        return self.subtrees.get(x[self.attr])

    def build_subtrees(self, df, subtree_kwargs):
        """Build one Tree per group of ``df`` sharing a value of ``self.attr``."""
        self.subtrees = {}
        for value, rows in df.groupby(self.attr):
            self.subtrees[value] = Tree(rows, **subtree_kwargs)

    def iter_subtrees(self):
        """Return the child subtrees."""
        return self.subtrees.values()

    def add_to_graphviz(self, dot, parent, print_info):
        """Draw every child and an edge from ``parent`` labelled with its value."""
        parent_id = f"{id(parent)}"
        for split_name, child in self.subtrees.items():
            child.add_to_graphviz(dot, print_info)
            child_id = f"{id(child)}"
            edge_label = f"{split_name}"
            dot.edge(parent_id, child_id, label=edge_label)


class Tree:
    """Decision-tree node built recursively from a DataFrame with a "target" column.

    Keyword arguments other than ``all_targets`` are forwarded to
    ``get_split``; the complete set of keyword arguments is handed on to the
    subtrees unchanged.
    """

    def __init__(self, df, **kwargs):
        super().__init__()

        # Subtrees only see part of the data, so the full list of target
        # values travels in the keyword arguments; it is used to colour
        # nodes consistently in the visualization.
        if "all_targets" not in kwargs:
            kwargs["all_targets"] = sorted(df["target"].unique())

        # Snapshot for the children, taken before anything is popped.
        subtree_kwargs = dict(kwargs)

        # Everything that remains after this pop goes to get_split.
        self.all_targets = kwargs.pop("all_targets")

        # Class counts of this node; self.info is printed in the drawing.
        self.counts = df["target"].value_counts()
        self.info = {}
        self.info["num_samples"] = len(df)
        self.info["entropy"] = entropy(self.counts)
        self.info["gini"] = gini(self.counts)

        self.split = get_split(df, **kwargs)
        if self.split:
            self.split.build_subtrees(df, subtree_kwargs)

    def __call__(self, sample):
        """Return the most frequent target in the leaf that ``sample`` reaches."""
        distribution = self.get_target_distribution(sample)
        return distribution.idxmax()

    def prune_confidence(self):
        """Prune bottom-up and return this node's error bound (0.0 for a leaf).

        The bound is the node's error rate plus 1.96 standard errors.  The
        split is dropped as soon as one subtree reports a bound that is at
        least as large as this node's; later subtrees are then not visited.
        """
        if self.split is None:
            return 0.0

        z = 1.96  # 95% confidence level

        err = mean_err_rate(self.counts)
        node_confidence = err + z * np.sqrt(err * (1 - err) / np.sum(self.counts))
        self.info['confidence_interval'] = node_confidence

        for subtree in self.split.iter_subtrees():
            if node_confidence <= subtree.prune_confidence():
                self.split = None
                break
        return node_confidence

    def leaf(self, sample):
        """Return the deepest node reachable by ``sample``.

        Descent stops at a node without a split, or when the split has no
        subtree for the sample's attribute value.
        """
        if self.split is None:
            return self
        subtree = self.split(sample)
        if subtree:
            return subtree.leaf(sample)
        return self

    def measure_purity_gain(self, attr):
        """Return the summed purity gain of all splits on ``attr`` in this subtree."""
        if self.split is None:
            return 0.0
        gain = 0
        for subtree in self.split.iter_subtrees():
            gain += subtree.measure_purity_gain(attr)
        if self.split.attr == attr:
            gain += self.split.purity_gain
        return gain

    def get_target_distribution(self, sample):
        """Return the target counts of the leaf that ``sample`` reaches."""
        reached = self.leaf(sample)
        return reached.counts

    def draw(self, print_info=True):
        """Return a graphviz Digraph of this tree."""
        dot = graphviz.Digraph()
        self.add_to_graphviz(dot, print_info)
        return dot

    def add_to_graphviz(self, dot, print_info):
        """Add this node, and recursively its subtrees, to ``dot``."""
        freqs = dict(self.counts / self.counts.sum())

        # Keep only the targets present in this node; the position in
        # all_targets selects the colour.
        present = []
        for i, c in enumerate(self.all_targets):
            freq = freqs.get(c, 0.0)
            if freq > 0:
                present.append((i, c, freq))

        colors = ":".join(f"{i%9 + 1};{freq}" for i, _, freq in present)
        labels = [" ".join(f"{c}:{freq:.2f}" for _, c, freq in present)]

        if print_info:
            labels.extend(f"{k} = {v}" for k, v in self.info.items())
        if self.split:
            labels.append(f"split by: {self.split.attr}")

        dot.node(
            f"{id(self)}",
            label="\n".join(labels),
            shape="box",
            style="striped",
            fillcolor=colors,
            colorscheme="set39",
        )
        if self.split:
            self.split.add_to_graphviz(dot, self, print_info)

def error(tree, test: pd.DataFrame):
    """Return the fraction of rows of ``test`` that ``tree`` gets wrong.

    ``tree`` is called once per row; its answer is compared with the row's
    "target" value.
    """
    # NOTE: keep the lambda wrapper. The forest class in this file subclasses
    # list, and pandas handles list-like `func` arguments differently from
    # plain callables.
    predicted = test.apply(lambda row: tree(row), axis=1).values
    expected = test['target'].values
    mismatches = sum(guess != truth for guess, truth in zip(predicted, expected))
    return mismatches / len(test)


class RandomForest(list):
    """A list of trees that classifies by majority vote.

    Parameters
    ----------
    training_set : DataFrame whose ``target_column_name`` column holds the
        labels; a copy with that column renamed to "target" is kept in
        ``self.train``.
    target_column_name : name of the label column in the caller's frames.
    nattrs : forwarded to every tree as its ``nattrs`` option.
    size : number of trees grown immediately.
    pruning : when true, ``prune_confidence`` is run on every initial tree.
    """

    def __init__(self, training_set, target_column_name="target", nattrs=1, size=0, pruning=False):
        super().__init__()
        self.train = training_set.rename(
            columns={target_column_name: "target"}, inplace=False)
        self.targets = target_column_name
        self.nattrs = nattrs

        # Initialize with size
        for _ in range(size):
            self.add_tree()

        if pruning:
            for tree in self:
                tree.prune_confidence()

    def __call__(self, sample):
        """Return the label that most trees predict for ``sample``."""
        votes = [tree(sample) for tree in self]
        return pd.Series(data=votes).value_counts().idxmax()

    def add_tree(self):
        """Grow one more tree on a bootstrap resample of the training set."""
        # Same size as the training set, drawn with replacement.
        bootstrap = self.train.sample(frac=1.0, replace=True)
        tree = Tree(bootstrap, nattrs=self.nattrs)
        self.append(tree)

    def forest_error(self, test):
        """Return the forest's error on ``test`` (label column named as at construction)."""
        return error(self, test.rename(columns={self.targets: "target"}, inplace=False))

    def trees_agreement(self, test_df):
        """Return the mean share of trees that vote for the winning label.

        For every row of ``test_df`` the share of trees agreeing with the
        majority vote is computed; the result is the average over rows.
        """
        agreement = 0
        for _, sample in test_df.iterrows():
            votes = [tree(sample) for tree in self]
            pred = pd.Series(data=votes).value_counts().idxmax()
            # Compare vote by vote: comparing a plain (e.g. string) label
            # against the whole list would give a single False, not a
            # per-tree result.
            matching = sum(vote == pred for vote in votes)
            agreement += matching / len(votes)
        return agreement / len(test_df)

### Data and encoding

In [None]:
# Load each dataset and relabel its columns as ("FEN", "Winner").
kaggle_df = pd.read_csv('../datasets/kaggle.csv')
kaggle_df.columns = ["FEN", "Winner"]

random_df = pd.read_csv('../datasets/random.csv')
random_df.columns = ["FEN", "Winner"]

rollouts_df = pd.read_csv('../datasets/rollouts.csv')
rollouts_df.columns = ["FEN", "Winner"]

In [None]:
# Integer code of every board symbol: '.' (empty square) is 0, followed by
# the six lowercase and then the six uppercase piece letters.
chess_pieces = {symbol: code for code, symbol in enumerate(".prnbqkPRNBQK")}

# Material weight of each piece type, keyed by the uppercase letter.
weights = {"P": 1, "N": 3, "B": 3, "R": 5, "Q": 9, "K": 0}


def squares(fen: str):
    """Encode a game state as a vector with one integer per board square.

    Each square holds the ``chess_pieces`` code of the piece standing on it,
    or zero when the square is empty.
    """
    # The textual board separates symbols with spaces and newlines; drop both.
    layout = str(Board(fen)).replace('\n', '').replace(' ', '')
    return np.array([chess_pieces[symbol] for symbol in layout])


def one_hot_enc_piece(piece: str):
    """One-hot encode a board symbol as a 12-dimensional vector.

    There are 6 piece types in 2 colours.  Positions 0-5 stand for the black
    pawn, rook, knight, bishop, queen and king; positions 6-11 stand for the
    white pawn, rook, knight, bishop, queen and king.  An empty square ('.')
    is encoded as the all-zero vector.
    """
    # chess_pieces also contains the empty-square symbol, hence the -1.
    dimension = len(chess_pieces) - 1
    if piece == '.':
        return np.zeros(dimension)

    # Piece codes start at 1 (0 is the empty square), so shift down by one.
    hot_position = chess_pieces[piece] - 1
    encoded = np.zeros(dimension, dtype=int)
    encoded[hot_position] = 1
    return encoded


def binary(fen: str):
    """Encode a game state as a 773-dimensional vector v of zeros and ones.

    For every i in {0, 12, ..., 756} the twelve entries v_i .. v_{i+11} are
    the one-hot encoding of square floor(i/12):

        v_{i}    black pawn      v_{i+6}   white pawn
        v_{i+1}  black rook      v_{i+7}   white rook
        v_{i+2}  black knight    v_{i+8}   white knight
        v_{i+3}  black bishop    v_{i+9}   white bishop
        v_{i+4}  black queen     v_{i+10}  white queen
        v_{i+5}  black king      v_{i+11}  white king

    Squares are ordered in row-major way from top left (A8) to bottom
    right (H1).

    v_768 is int(board.turn), the player to move (presumably 1 when White
        is to move - confirm against python-chess),
    v_769 indicates whether black has kingside castling right,
    v_770 indicates whether black has queenside castling right,
    v_771 indicates whether white has kingside castling right,
    v_772 indicates whether white has queenside castling right.

    "indicates" means "is set to 0 or 1 depending on whether the condition
    is false or true".  The result is a float array.
    """
    board = Board(fen)

    # Collect all pieces first and join them once; growing the array with
    # hstack inside the loop copies everything on every square.
    parts = [one_hot_enc_piece(p) for p in str(board) if p not in '\n ']
    parts.append(np.array([int(board.turn)]))

    # Rook home squares, in the order the castling flags are documented:
    # black kingside, black queenside, white kingside, white queenside.
    castling_rook_squares = (chess.BB_H8, chess.BB_A8, chess.BB_H1, chess.BB_A1)
    parts.append(np.array([
        int(bool(board.castling_rights & rook_square))
        for rook_square in castling_rook_squares
    ]))

    # Always return floats, whatever mix of pieces is on the board.
    return np.concatenate(parts).astype(float)


def advantage(fen: str):
    """Return ``[move_advantage, material_advantage]`` for a position.

        - move advantage is the number of legal moves counted with
        ``board.turn`` set to 1 minus the number counted with it set to 0
        - material advantage is the sum of ``weights`` of the uppercase
        pieces minus the sum of ``weights`` of the lowercase pieces

    Values are positive if White has the advantage and negative otherwise.
    """
    board = Board(fen)

    # Uppercase symbols are looked up directly and count positively;
    # anything else is looked up by its uppercase letter and counts
    # negatively.
    material_advantage = 0
    for symbol in str(board):
        if symbol in '\n .':
            continue
        if symbol in weights:
            material_advantage += weights[symbol]
        else:
            material_advantage -= weights[symbol.upper()]

    # Count the legal moves with each side to move in turn on the same board.
    board.turn = 1
    moves_with_turn_1 = len(list(board.legal_moves))
    board.turn = 0
    moves_with_turn_0 = len(list(board.legal_moves))
    move_advantage = moves_with_turn_1 - moves_with_turn_0

    return np.array([move_advantage, material_advantage])