# Introduction
---
**Rhino** is a deep learning chess engine largely inspired by the [DeepChess](http://www.cs.tau.ac.il/~wolf/papers/deepchess.pdf) paper released in 2016. A CNN has been add among other modifications to optimize learning. If you wish to read more about the higher level details you can read [this article](http://) discussing these topics. Below is everything it takes to play Rhino or build it from scratch.

Enjoy!

# Imports and Google Drive Mount
---

In [2]:
import chess
import chess.svg
import chess.pgn 
import numpy as np 
import random
import h5py
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
import torch.nn.functional as F
from sklearn.model_selection import train_test_split
import matplotlib.pyplot as plt
from IPython.display import SVG, display
import time

In [3]:
from google.colab import drive
drive.mount('/content/drive')
path='/content/drive/MyDrive/Colab_Static/Rhino/'

Mounted at /content/drive


# Getting Data
---
Load/format training data from the 
[CCRL dataset](http://ccrl.chessdom.com/ccrl/4040/games.html). 
Considering only non-draw games, we select 10 random positions within the game and add them to our training set. Positions are selected such that it wasn't within the first 5 moves and the previous move was not a capture. 


An instance is a 773 long array and its associated label is the outcome of the game
(1 = White wins, 0 = Black wins).

Our end dataset contains two million positions, half from games that white won and the other half from games black won. 

In [None]:
def get_positions(game,num_pos=10):
   legal_positions = []
   board = game.board()
   for i,move in enumerate(game.mainline_moves()):
      if board.is_capture(move) or i<5:
         board.push(move)
      else: #legal
         board.push(move) 
         legal_positions.append(board)
   return random.sample(legal_positions,num_pos)

In [4]:
def to_bitboard(board):
   bb = np.zeros((2,6,64),dtype=np.uint8) # players x pieces x board-size    
   for colour in range(2):
      for piece in range(6):
         for square in range(64):
            cur_piece = board.piece_at(square)
            if cur_piece is not None:
               if cur_piece.piece_type == piece+1 and cur_piece.color == bool(colour):
                  bb[colour][piece][square] = 1
   
   info = np.zeros(5,dtype=np.uint8)
   info[0] = board.has_kingside_castling_rights(chess.WHITE)
   info[1] = board.has_kingside_castling_rights(chess.BLACK)
   info[2] = board.has_queenside_castling_rights(chess.WHITE)
   info[3] = board.has_queenside_castling_rights(chess.BLACK)   
   info[4] = board.turn

   bb = bb.flatten()
   bb = np.concatenate((bb,info))
   return bb

In [None]:
def preprocess(num_samples,fn='CCRL-4040.[1199203].pgn'):
   X_win,Y_win,X_lose,Y_lose = [],[],[],[]
   n,m = 0,0
   all_games = open(fn)
   while n<num_samples or m<num_samples:
      game = chess.pgn.read_game(all_games)
      if game == None: # EOF
         print("EOF:", n, "games obtained")
      if game.headers['Result'] != '1-1': # not a draw         
         boards = get_positions(game)
         for board in boards:
            if game.headers['Result'] == '1-0' and n<num_samples:
               n+=1
               X_win.append(to_bitboard(board)) # white win
               Y_win.append(1)
            elif m<num_samples: 
               m+=1
               X_lose.append(to_bitboard(board)) # white lose 
               Y_lose.append(0)
   return X_win,Y_win,X_lose,Y_lose

In [None]:
# Collect Data
X_win,Y_win,X_lose,Y_lose = preprocess(1000e3)
assert len(X_win) == len(Y_win) and len(X_win) == len(Y_win)
print("Number of winning games (for White)",len(X_win))
print("Number of winning games (for Black)",len(X_lose))
# Save Data
hf = h5py.File('data_1M.h5','w')
hf.create_dataset('X_win',data=X_win)
hf.create_dataset('Y_win',data=Y_win)
hf.create_dataset('X_lose',data=X_lose)
hf.create_dataset('Y_lose',data=Y_lose)
hf.close()

Number of winning games (for White) 1000000
Number of winning games (for Black) 1000000


# Building Tensor Datasets
---

In [None]:
# Load Data and Compute Train/Test Splits
hf = h5py.File(path+'data_1M.h5','r')
X_win_train,X_win_val,Y_win_train,Y_win_val = train_test_split(np.array(hf.get('X_win')), 
                                                   np.array(hf.get('Y_win')), 
                                                   test_size=0.1, random_state=2)
X_lose_train,X_lose_val,Y_lose_train,Y_lose_val = train_test_split(np.array(hf.get('X_lose')), 
                                                   np.array(hf.get('Y_lose')), 
                                                   test_size=0.1, random_state=2)

hf.close()

In [None]:
# Convert Data To Tensors 

# Train
X_win_train = torch.tensor(X_win_train).float()
Y_win_train = torch.tensor(Y_win_train)
winData_train = TensorDataset(X_win_train,Y_win_train)

X_lose_train = torch.tensor(X_lose_train).float()
Y_lose_train = torch.tensor(Y_lose_train)
loseData_train = TensorDataset(X_lose_train,Y_lose_train)

# Val
X_win_val = torch.tensor(X_win_val).float()
Y_win_val = torch.tensor(Y_win_val)
winData_val = TensorDataset(X_win_val,Y_win_val)

X_lose_val = torch.tensor(X_lose_val).float()
Y_lose_val = torch.tensor(Y_lose_val)
loseData_val = TensorDataset(X_lose_val,Y_lose_val)

# Model
---
The architecture for Rhino is largely inspired by [DeepChess](http://www.cs.tau.ac.il/~wolf/papers/deepchess.pdf) with a CNN replacing the autoencoders in the *Pos2Vec* module. The goal of this network is to determine which of two given positions is more preferable from white's perspective. There are two components:

1.   **Position Convertor**: This compares to the *Pos2Vec* module in the paper. The objective is to convert a board ($2 \times 6 \times 64$ array) into a 1D vector of length 100. In essence we are breaking down a given position into its key components. 

2.   **Position Comparator**: Given two converted boards we include some extra information carried in the last five bits (castling rights and turn). Using three fully connected layers the network deduces which of the two positions are more beneficial for white. Note that this is similar to the *DeepChess* module in the paper.



In [5]:
class RhinoNN(nn.Module):
    def __init__(self):
        super(RhinoNN, self).__init__()
        self.p2v = nn.Sequential(
            nn.Conv2d(2,32,3),
            nn.ReLU(inplace=True),

            nn.Conv2d(32,64,3),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(2,2)
        )
        self.p2v_last = nn.Linear(1920,100)
        self.comp_pos = nn.Sequential(
            nn.Linear(210,400),
            nn.ReLU(inplace=True),
            nn.Linear(400,200),
            nn.ReLU(inplace=True),
            nn.Linear(200,100),
            nn.ReLU(inplace=True),
            nn.Linear(100,2)
        )

    def forward(self,x1,x2):
        x1_b, x1_i = x1[:768], x1[768:]
        x2_b, x2_i = x2[:768], x2[768:]
        
        x1_b = self.p2v(x1_b.view(1,2,6,64))
        x1_b = self.p2v_last(x1_b.view(1,1920))

        x2_b = self.p2v(x2_b.view(1,2,6,64))
        x2_b = self.p2v_last(x2_b.view(1,1920))

        x1 = torch.cat((x1_b,x1_i.unsqueeze(0)),1)
        x2 = torch.cat((x2_b,x2_i.unsqueeze(0)),1)

        x = torch.cat((x1,x2),1)
        x = self.comp_pos(x)
        return F.softmax(x,dim=1)
rhinoNN = RhinoNN()

# Train & Test
---
Upon every epoch we select $25,000$ winning and losing positions to form a set of `(W,L)` pairs. The goal is training Rhino to recognize winning positions. We randomly reverse a subset of the board position pairs `(L,W)` to avoid a bias towards one class. So there are $2\times 10^{12}$ such pairs which avoids overfitting for relatively low `MAX_EPOCHS` values. 

At the end of every epoch we multiply the learning rate by $0.98$

In [None]:
rhinoNN = RhinoNN()
#rhinoNN.load_state_dict(torch.load(path+'rhinoNN_100epoch.pth'))
lr_h = 1e-5
criterion = nn.MSELoss()
optimizer = optim.Adam(rhinoNN.parameters(),lr=lr_h)

MAX_EPOCHS = 50
accuracy = []
DATASET_RANGE_W = range(len(winData_train))
DATASET_RANGE_L = range(len(loseData_train))
k = 25000
for epoch in range(MAX_EPOCHS):  # loop over the dataset multiple times
    running_loss = 0.0
    rhinoNN.train()

    # Selects k Random Samples
    idx_w = random.sample(DATASET_RANGE_W,k)
    idx_l = random.sample(DATASET_RANGE_L,k)
    
    for i in range(k):
        input_w,_ = winData_train[idx_w[i]]
        input_l,_ = loseData_train[idx_l[i]]
        # zero the parameter gradients
        optimizer.zero_grad()
        # forward + backward + optimize
        if random.randint(1,2) == 1:
            # Reverse wins and losses
            output = rhinoNN(input_l,input_w)
            loss = criterion(output, torch.tensor([[0,1]]).float())
        else:    
            output = rhinoNN(input_w,input_l)
            loss = criterion(output, torch.tensor([[1,0]]).float())
            
        loss.backward()
        optimizer.step()

        running_loss += loss.item()
            
    # Print Statistics    
    print('Epoch: %d of %d, loss: %.4f' %
         (epoch + 1, MAX_EPOCHS, running_loss / k))
    
    # Decrease Learning Rate
    lr_h = lr_h * 0.98
    for g in optimizer.param_groups:
        g['lr'] = lr_h


print('Finished Training')
torch.save(rhinoNN.state_dict(), path+'rhinoNN_'+str(MAX_EPOCHS)+'epoch.pth')

In [None]:
def test_on_validation(model,winData=winData_val,loseData=loseData_val):
    model.eval()
    correct = 0
    with torch.no_grad():
        for (data1,data2) in zip(winData,loseData):   
            input_w,_ = data1
            input_l,_ = data2

            if random.randint(1,2) == 1:
                # Reverse wins and losses
                output = model(input_l,input_w)
                if torch.argmax(output) == torch.tensor(1):
                    correct += 1
            else:    
                output = model(input_w,input_l)
                if torch.argmax(output) == torch.tensor(0):
                    correct += 1
            
        acc = 100 * correct / len(winData_val)
        print('Accuracy on Validation Set:', acc,'%')
        return acc

In [None]:
test_on_validation(rhinoNN)

Accuracy on Validation Set: 97.173 %


97.173

# Comparison Based Alpha-Beta Search
---
A modification of the Alpha-Beta Search Algorithm proposed in the DeepChess paper. 
> In order to incorporate DeepChess, we use a novel version of an alpha-beta algorithm that does not require any position scores for performing the search. Instead of $\alpha$ and $\beta$ values, we store positions $\alpha_{pos}$ and $\beta_{pos}$. For each new position, we compare it with the existing $\alpha_{pos}$ and $\beta_{pos}$ positions using DeepChess, and if the comparison
shows that the new position is better than $\alpha_{pos}$, it would become the new $\alpha_{pos}$, and if the new position is better than $\beta_{pos}$, the current node is pruned. Note that since DeepChess always compares the positions from White’s perspective, when using it from Black’s perspective, the predictions should be reversed.  

In [26]:
def alphabeta(node, depth, alpha, beta, isMaximizingPlayer):
    if depth == 0:
        return node
    if isMaximizingPlayer:
        best = -1
        for move in node.legal_moves:
            new_node = node.copy()
            new_node.push(move)
            val = alphabeta(new_node,depth-1,alpha,beta,False)

            # Conversion of max(best,val)
            if best == -1:
                best = val
            else:
                best = rhinoPredict(best,val)[0]
            
            # Conversion of max(alpha,best)
            if alpha == -1:
                alpha = best
            else:
                alpha = rhinoPredict(alpha,best)[0]

            if beta != 1:
                if rhinoPredict(alpha,beta)[0] == alpha:
                    break
        return best
    else: 
        best = 1
        for move in node.legal_moves:
            new_node = node.copy()
            new_node.push(move)
            val = alphabeta(new_node,depth-1,alpha,beta,True)
            
            # Conversion of min(best,val)
            if best == 1:
                best = val 
            else:
                best = rhinoPredict(best,val)[1]

            # Conversion of min(beta,best)
            if beta == 1: 
                beta = best
            else:
                beta = rhinoPredict(beta,best)[1]

            if alpha != -1:
                if rhinoPredict(alpha,beta)[0] == alpha:
                      break

        return best

def rhinoPredict(b1,b2):
    x1 = torch.tensor(to_bitboard(b1)).float()
    x2 = torch.tensor(to_bitboard(b2)).float()
    output = rhino(x1,x2)
    if torch.argmax(output) == torch.tensor(0):
        return (b1,b2)
    else:
        return (b2,b1)
  

# Game Interface
---

In [6]:
def showBoard(b,flip=False):
    display(SVG(chess.svg.board(b,size=300,flipped=flip)))

In [50]:
def findMoveInBook(b):
    opening_book = open(path+'8moves_v3.pgn')
    game = chess.pgn.read_game(opening_book)
    b_temp = game.board()
    while True:
        for move in game.main_line():
            if len(b_temp.move_stack) == len(b.move_stack):
                if b_temp == b:
                    return move
                break
            b_temp.push(move)
        game = chess.pgn.read_game(opening_book)
        if game is None: #EOF
            break
        b_temp = game.board()
    return None

In [75]:
def computerMove(board,depth,isWhite):
    # Check Opening Book
    if len(board.move_stack) < 8:
        bookMove = findMoveInBook(board)
        if bookMove is not None:
            print('Book Move Found: ', bookMove)
            board.push(bookMove)
            full_game.add_main_variation(bookMove)
            return board  
    print('No Book Moves Found, Using Engine...')
    alpha = -1 
    beta = 1
    best = -1
    if isWhite:
        for move in board.legal_moves:
            new_board = board.copy()
            new_board.push(move)
            if best == -1:
                best = alphabeta(new_board,depth-1,alpha,beta,True) 
                bestMove = move
                if beta == 1:
                    beta = best
            else:
                new_best = rhinoPredict(best,alphabeta(new_board,depth-1,alpha,beta,True))[1]
                if new_best != best:
                    bestMove = move
                    best = new_best
                beta = rhinoPredict(beta,best)[1]

    else:
        for move in board.legal_moves:
            new_board = board.copy()
            new_board.push(move)
            if best == -1:
                best = alphabeta(new_board,depth-1,alpha,beta,False) 
                bestMove = move
                if alpha == -1:
                    alpha = best
            else:
                new_best = rhinoPredict(best,alphabeta(new_board,depth-1,alpha,beta,False))[0]
                if new_best != best:
                    bestMove = move
                    best = new_best
                alpha = rhinoPredict(alpha,best)[0] 
     
    print(bestMove)
    board.push(bestMove)
    full_game.add_main_variation(bestMove)
    return board

def playerMove(board):
      while True:
          try:
              move = input('Enter a move:\n')
              board.push_san(move)
              break
          except ValueError:
              print('Illegal move, try again')
      return board      

In [76]:
def playGame(rhino_dict='rhinoNN_125epoch.pth'):
    global rhino 
    global full_game
  
    rhino = RhinoNN()
    rhino.load_state_dict(torch.load(path+rhino_dict))
    full_game = chess.pgn.Game()

    board = chess.Board()

    max_depth = int(input("Set Rhino's Maximum Search Depth\n"))
    game_type = int(input("Enter '1' for You VS. Rhino \nEnter '2' for Rhino VS. Rhino\n"))

    # ------------ USER VS. RHINO ------------
    if game_type == 1:
        game.headers["Event"] = "Human vs. Rhino"
        isPlayerTurn = bool(random.randint(0,1))
        computerColour = not isPlayerTurn
        while board.is_game_over() == False:
            showBoard(board,flip=computerColour)
            if isPlayerTurn:
                board = playerMove(board)
            else:
                board = computerMove(board,max_depth,computerColour) 

            isPlayerTurn = not isPlayerTurn  
        showBoard(board,flip=computerColour) # show final position

    # ------------ RHINO VS. RHINO ------------        
    elif game_type == 2:
        game.headers["Event"] = "Rhino vs. Rhino"
        whiteToMove = True
        while board.is_game_over() == False:
            showBoard(board)
            if whiteToMove:
                board = computerMove(board,max_depth,True)
            else:
                board = computerMove(board,max_depth,False)
            
            whiteToMove = not whiteToMove
        showBoard(board) # show final position

    # Print Results
    if board.is_checkmate():
        print("Checkmate")
    elif board.is_stalemate():
        print("Draw: Stalemate")
    elif board.is_insufficient_material():
        print("Draw: Insufficient Material")

    print('PGN:\n',full_game)


# Play Rhino
---
I have implemented two options to use Rhino. By entering `1`  you can play against Rhino. Enter `2` to watch Rhino play itself.

In [None]:
playGame()

# Final Thoughts
---
General Notes
* Rhino becomes incredibly slow for depths over 3. 
* Seems to have a low understanding of piece value 

# Resouces
---
* Model Architecture: [DeepChess]()
* Data: [CCRL]()
* Game Interface: [Oripress/DeepChess](https://github.com/oripress/DeepChess)
* Alpha-Beta Search: [Chess Programming Wiki](https://www.chessprogramming.org/Alpha-Beta)
* Opening Book: [Stockfish 8moves_v3](https://github.com/official-stockfish/books)

# To Do
---
* Larger Opening Book
* Endgame Table
* Train on longer (200+ epochs)
* Position Hashing (speed performance)
* Change bitboard representation?