<a href="https://colab.research.google.com/github/kikatuso/ChessTransformer/blob/main/Colab_Train_Chess.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>



[google Palm](https://arxiv.org/pdf/2204.02311.pdf)

TODO:
1. Add an embedding layer to the model, with weights initialised from the gensim fastText model:
```
weights = torch.FloatTensor(embedding.embed_layer.wv)
nn.Embedding.from_pretrained(weights)
```
You should first train the model with the embedding parameters frozen, wait until training converges, and then unfreeze the weights and train some more ([ref here](https://stackoverflow.com/questions/58630101/using-torch-nn-embedding-for-glove-should-we-fine-tune-the-embeddings-or-just-u))
```
embeddings = nn.Embedding.from_pretrained(fasttext_vectors, freeze=True) # freezing weights
embeddings.weight.requires_grad = True # unfreezing weights
```







In [None]:
! pip install chess
import chess
from random import choice
import torch
from torch.utils.data import Dataset,DataLoader,SubsetRandomSampler
from sklearn.model_selection import train_test_split
from torch.autograd import Variable
import numpy as np 
from tqdm import tqdm
from torch import nn 
device = 'cuda' if torch.cuda.is_available() else 'cpu'

In [None]:
!git clone https://github.com/kikatuso/ChessTransformer

In [None]:
from ChessTransformer.model import *

In [None]:
! pip install -q kaggle
! pip install sklearn

In [None]:
from google.colab import files
files.upload()

In [None]:
! mkdir ~/.kaggle
! cp kaggle.json ~/.kaggle/
! chmod 600 ~/.kaggle/kaggle.json

In [None]:
!kaggle datasets download -d zuzannaskorniewska/fasttext-for-chess-move-modelling

In [None]:
! unzip fasttext-for-chess-move-modelling.zip -d embedding

In [None]:
!pip install -r ChessTransformer/requirements.txt 

In [None]:
class LegalMovesBase(object):
    """Sliding-window dataset over a stream of randomly played chess games.

    ``generate_games`` plays ``num_games`` games by picking uniformly random
    legal moves and stores every move (long algebraic notation with the
    promotion ``=`` stripped) in one flat token list, each game terminated
    by an ``'<END>'`` token. Item ``i`` is the pair
    ``(tokens[i:i + max_len], tokens[i + 1:i + max_len + 1])``, i.e. an input
    window and the same window shifted by one token as the target.
    """

    def __init__(self, num_games: int = int(1e3), max_len: int = 30):
        """Store the configuration; no games are generated here.

        Args:
            num_games: Number of random games ``generate_games`` will play.
            max_len: Length (in tokens) of every input/target window.
        """
        self.num_games = num_games
        self.max_len = max_len
        self.games_arr = []

    def generate_games(self):
        """Play ``num_games`` random games and store their tokens.

        A game stops on checkmate, insufficient material or stalemate.
        ``self.games_arr`` is replaced only after all games were generated.
        """
        games_arr = []
        print('This may take a while...Please wait.')
        for _ in tqdm(range(self.num_games)):
            board = chess.Board()
            while not (
                board.is_checkmate()
                or board.is_insufficient_material()
                or board.is_stalemate()
            ):
                move = choice(list(board.legal_moves))
                # The notation must be taken before the move is played,
                # because it is rendered relative to the current position.
                token = board.lan(move).replace('=', '')
                # Push the move object itself instead of re-parsing its text.
                board.push(move)
                games_arr.append(token)
            games_arr.append('<END>')
        self.games_arr = games_arr

    def __getitem__(self, i):
        """Return the ``(input, target)`` token windows starting at ``i``.

        Negative indices count from the end.

        Raises:
            IndexError: If ``i`` is outside ``[-len(self), len(self))``. This
                also lets plain iteration over the dataset terminate.
        """
        size = len(self)
        if i < 0:
            i += size
        if not 0 <= i < size:
            raise IndexError('dataset index out of range')
        x = self.games_arr[i: i + self.max_len]
        y = self.games_arr[i + 1: i + self.max_len + 1]
        return x, y

    def __len__(self):
        """Number of start positions that yield two full-length windows."""
        return max(len(self.games_arr) - self.max_len, 0)


In [None]:
# Configure the dataset for 1000 games and 30-token windows, then populate
# its token list by actually playing the random games.
dataset = LegalMovesBase(num_games=1000,max_len=30)
dataset.generate_games()

In [None]:
valid_test_split = 0.4
random_seed= 31
batch_size = 128
data_size = len(dataset)

# Keep 0.6 of the window indices for training. The split is seeded with
# random_seed so that it is reproducible across runs.
# NOTE(review): dataset items are overlapping stride-1 windows, so a random
# split over window indices places near-identical windows in different
# subsets; consider splitting by game if held-out metrics matter.
train_idx, valid_test_idx = train_test_split(
    np.arange(data_size),
    test_size=valid_test_split,
    shuffle=True,
    random_state=random_seed,
)

# Halve the held-out indices into validation and test sets, i.e. each 0.2 of
# the total dataset.
valid_idx, test_idx = train_test_split(
    valid_test_idx,
    test_size=0.5,
    shuffle=True,
    random_state=random_seed,
)

train_sampler = SubsetRandomSampler(train_idx)
valid_sampler = SubsetRandomSampler(valid_idx)
test_sampler = SubsetRandomSampler(test_idx)

# All three loaders read from the same dataset object; the samplers restrict
# each loader to its own subset of indices.
train_loader = DataLoader(dataset, batch_size=batch_size, sampler=train_sampler)
validation_loader = DataLoader(dataset, batch_size=batch_size, sampler=valid_sampler)
test_loader = DataLoader(dataset, batch_size=batch_size, sampler=test_sampler)



In [None]:
def accuracy_score(predictions, labels,thresh=0.5):
    """Return the fraction of labels matched by the arg-max prediction.

    Args:
        predictions: Score tensor whose last dimension indexes the classes;
            the remaining dimensions must match ``labels``.
        labels: Integer tensor of target class indices.
        thresh: Unused; kept so existing callers keep working.

    Returns:
        A zero-dimensional NumPy array holding the accuracy.
    """
    pred_labels = predictions.argmax(dim=-1)
    corrects = (pred_labels == labels)
    # Normalise by the total number of labels, not only the first dimension:
    # for (batch, seq) labels the correct count spans every position, so
    # dividing by the batch size alone can give values above 1.
    accuracy = corrects.sum().float() / float(labels.numel())
    return accuracy.cpu().detach().numpy()


def run_epoch(train_mode,loader,epoch, model, optimizer, loss_fnc):
    """Run one pass over ``loader`` and return the averaged metrics.

    Args:
        train_mode: If true, the model is put in training mode and the
            optimizer is stepped on every batch; otherwise the model is put
            in eval mode and gradients are disabled.
        loader: Iterable of ``(X, target)`` batches that also supports
            ``len()``.
        epoch: Epoch number, used for the progress bar and stored in the
            result.
        model: Callable module exposing ``embedding.translate_wti``.
        optimizer: Optimizer stepped when ``train_mode`` is true.
        loss_fnc: Loss called as ``loss_fnc(output.transpose(1, 2), target)``.

    Returns:
        Dict with keys ``epoch``, ``loss``, ``n_batches``,
        ``running_accuracy`` and ``perplexity`` (``exp`` of the mean loss).
        The averaged values are NaN when the loader yields no batches.
    """
    num_batches = len(loader)
    epoch_metrics = {
        'epoch': epoch,
        'loss': 0.0,
        'n_batches': num_batches,
        'running_accuracy':0.0,
        'perplexity':0.0
    }

    if train_mode:
        model.train()
    else:
        model.eval()
        # Release cached GPU blocks once per validation pass rather than on
        # every batch.
        torch.cuda.empty_cache()

    msg = 'Training' if train_mode else 'Validation'

    for (X, target) in tqdm(loader, desc=f'{msg} epoch {epoch}', total=num_batches, position=0, leave=True):

        # Transpose the collated batch before handing it to the model.
        # NOTE(review): presumably this turns (seq, batch) into (batch, seq);
        # confirm against the collate function in use.
        X, target = np.array(X).T, np.array(target).T
        target = model.embedding.translate_wti(target)
        target = torch.from_numpy(target).long().to(device)

        # One code path for both modes: gradients are tracked only when
        # training.
        with torch.set_grad_enabled(train_mode):
            # Call the module rather than .forward() so registered hooks run.
            output = model(X)
            loss = loss_fnc(output.transpose(1, 2), target)

        if train_mode:
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

        epoch_metrics['running_accuracy'] += float(accuracy_score(output, target))
        epoch_metrics['loss'] += loss.item()

    if num_batches:
        epoch_metrics['loss'] = epoch_metrics['loss'] / num_batches
        epoch_metrics['running_accuracy'] = epoch_metrics['running_accuracy'] / num_batches
    else:
        # Nothing was evaluated: report NaN instead of dividing by zero.
        epoch_metrics['loss'] = float('nan')
        epoch_metrics['running_accuracy'] = float('nan')
    epoch_metrics['perplexity'] = np.exp(epoch_metrics['loss'])

    return epoch_metrics

In [None]:
# Number of epochs the training loop runs for.
N_epochs=10
# Location of the fastText model file.
# NOTE(review): presumably this is the archive unzipped into ./embedding
# with /content as the working directory — confirm.
embed_path = '/content/embedding/fasttext_chess2vec.model'

# NOTE(review): ChessTransformer is presumably provided by the star import
# from ChessTransformer.model — confirm.
model = ChessTransformer(embed_path=embed_path)
model = model.to(device)
# NOTE(review): lr=0.1 is two orders of magnitude above Adam's default of
# 1e-3 — confirm this is intentional.
optim = torch.optim.Adam(params=model.parameters(),lr=0.1)
loss_fnc = nn.CrossEntropyLoss()

In [None]:
def metrics_message(name,metrics):
    """Format one line summarising loss, accuracy and perplexity.

    Each value is read from ``metrics`` and rounded to two decimals; the
    line starts with a newline and is labelled with ``name``.
    """
    template = '\n {}: loss: {}; accuracy: {}; perplexity: {}'
    rounded = [
        round(metrics[key], 2)
        for key in ('loss', 'running_accuracy', 'perplexity')
    ]
    return template.format(name, *rounded)


In [None]:
# One row per epoch; columns are loss, accuracy and perplexity.
train_log = np.zeros((N_epochs, 3))
valid_log = np.zeros((N_epochs, 3))
train_acc_curve = []
valid_acc_curve = []

for epoch in range(N_epochs):
    # Train on the training split, then evaluate on the validation split.
    train_metrics = run_epoch(
        train_mode=True,
        loader=train_loader,
        epoch=epoch,
        model=model,
        optimizer=optim,
        loss_fnc=loss_fnc,
    )
    valid_metrics = run_epoch(
        train_mode=False,
        loader=validation_loader,
        epoch=epoch,
        model=model,
        optimizer=optim,
        loss_fnc=loss_fnc,
    )

    print("\n Metrics after epoch:{}".format(epoch))
    for split_name, split_metrics in (('train', train_metrics), ('valid', valid_metrics)):
        print(metrics_message(split_name, split_metrics))

    # Record this epoch's summary values in the log arrays.
    for log_array, split_metrics in ((train_log, train_metrics), (valid_log, valid_metrics)):
        log_array[epoch, :] = (
            split_metrics['loss'],
            split_metrics['running_accuracy'],
            split_metrics['perplexity'],
        )
 
