In [1]:
import chess
import chess.pgn
import os
import re
import numpy as np
import pandas as pd
from tqdm import tqdm

In [2]:
os.chdir('..')

In [3]:
def load_pgns(file_path, num_games=None, start_index=0, encoding="utf-8"):
    games = []
    with open(file_path, "r", encoding=encoding) as file:
        for _ in tqdm(range(start_index), desc='Skipping games', unit='game', leave=False):
            game = chess.pgn.read_game(file)
            if game is None:
                break
        for _ in tqdm(range(num_games), desc='Loading games', unit='game', leave=True) if num_games else iter(int, 1):
            game = chess.pgn.read_game(file)
            if game is None:
                break
            games.append(game)
    return games

In [4]:
%%time
assets_path = os.path.join(os.getcwd(), 'asset')
single_path = os.path.join(assets_path, 'rating_split_7_rand/group_9.pgn')
games = load_pgns(single_path, 100)
game = games[0]
board = game.board()

#244780

Loading games: 100%|██████████████████████████████████████████████████████████████| 100/100 [00:00<00:00, 346.86game/s]

CPU times: total: 46.9 ms
Wall time: 309 ms





In [5]:
def get_eval_from_result(result):
    if result == '1-0':
        return 64
    elif result == '0-1':
        return -64
    else:
        return 0
        
def extract_eval_clk_from_pgn(input, result=''):
    clk_pattern = r"\[%clk\s+([0-9:]+)\]"
    eval_pattern = r"\[%eval\s+([0-9.-]+)\]"
    mate_pattern = r"\[%eval\s+#([0-9.-]+)\]"
    clk = re.search(clk_pattern, input)
    clk = clk.group(1) if clk else None
    eval = re.search(eval_pattern, input)
    eval = eval.group(1) if eval else None
    if not eval:
        eval = re.search(mate_pattern, input)
        eval = int(eval.group(1)) if eval else get_eval_from_result(result)
        if eval > 0:
            eval = 64
        else:
            eval = -64
    return eval, clk

def time_control_to_list(time_control_obj):
    time_control = [float(x) for x in time_control_obj.split('+')]
    time_control += [0] * (2 - len(time_control))
    if time_control[0] < 30:
        time_control[0] = time_control[0] * 60
    return time_control
        
def eval_to_cp(eval):
    eval = float(eval)
    return eval * 100

def clk_to_time(clk):
    h, m, s = clk.split(":")
    t = int(h) * 3600 + int(m) * 60 + int(s)
    return t

def eval_to_game_state(value, cuts = None):
    if cuts == None:
        cuts = [np.inf, 375, 250, 150, 75, 25, -25, -75, -150, -250, -375, -np.inf]
    for i in range(len(cuts) - 1):
        if cuts[i] >= value > cuts[i + 1]:
            return round(1-i/10, 2)

def fen_to_array(fen):
    piece_mapping = {'p': -1, 'n': -2, 'b': -3, 'r': -4, 'q': -5, 'k': -6,
                     'P': 1, 'N': 2, 'B': 3, 'R': 4, 'Q': 5, 'K': 6}
    board_fen, turn, castling, en_passant, halfmove, fullmove = fen.split()
    board_array = [0] * 64
    rank = 7
    file = 0
    for char in board_fen:
        if char.isdigit():
            file += int(char)
        elif char == '/':
            rank -= 1
            file = 0
        else:
            index = rank * 8 + file
            board_array[index] = piece_mapping[char]
            file += 1
    return np.array(board_array)

def determine_move_quality(df):
    subjective_state = 20 * (df['game_state'] - 0.5) * np.power(-1, np.arange(len(df))) / 10
    delta_state = -10 * pd.concat([pd.Series([0.6]), df['game_state']]).diff().iloc[1:] * np.power(-1, np.arange(len(df))) / 10
    near_equality = np.abs(subjective_state) < 0.5
    decisive_advantage = np.abs(subjective_state) > 0.7
    fate_sealed = np.abs(subjective_state) > 0.9

    move_quality = np.full(len(df), 4) # Default to 4 (Mistake)
    move_quality[(delta_state > .45) | ((delta_state > .35) & decisive_advantage) | ((delta_state > .25) & fate_sealed)] = 5 # Blunder
    move_quality[((delta_state < .25) & ~decisive_advantage) | ((delta_state < .35) & near_equality)] = 3 # Inaccuracy
    move_quality[(delta_state < .15)] = 2 # Good
    move_quality[(delta_state < .05) & (subjective_state > -1)] = 1 # Great
    return move_quality.tolist()

In [6]:
def game_to_df(game):
    result = game.headers['Result']
    board = game.board()
    moves = [move for move in game.mainline_moves()]
    board = game.board()
    node = game
    ply = 0
    df = pd.DataFrame(columns=['lan', 'game_state', 'time_remain', 'time_spent', 'fen_array', 'fen_string'])
    time_controls = time_control_to_list(game.headers['TimeControl'])
    current_clock = [time_controls[0] + time_controls[1]]*2
    for move in moves:
        turn = ply%2
        current_clock[turn] += time_controls[1]
        ply += 1
        node = node.next()
        eval, clk = extract_eval_clk_from_pgn(node.comment, result)
        eval = eval_to_game_state(eval_to_cp(eval))
    
        clk = clk_to_time(clk)    
        t_spent = current_clock[turn] - clk
        current_clock[turn] = clk
        
        lan = board.lan(move)
        fen_str = board.fen()
        fen_arr = fen_to_array(fen_str)
        board.push(move)
        new_row = {'lan': lan, 'game_state': eval, 'time_remain': clk, 'time_spent':t_spent, 'fen_array': fen_arr, 'fen_string': fen_str}
        df.loc[ply] = new_row
    return df
    #df['move_quality'] = determine_move_quality(df)
    #df[df['move_quality']>2]

In [7]:
%%time
lans = [game_to_df(game)['lan'].tolist() for game in games]
len(lans)
#df.iloc[1::2]
#df['lan']

CPU times: total: 7.94 s
Wall time: 8.73 s


100

In [8]:
import torch
import torch.nn as nn
import torch.optim as optim
import math
import torchtext
from tqdm import tqdm

In [9]:
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
device

device(type='cuda')

In [10]:
train = lans[:int(len(lans) * 0.7)]
validate = lans[int(len(lans) * 0.7):int(len(lans) * 0.85)]
test = lans[int(len(lans) * 0.85):]

In [11]:
vocab = torchtext.vocab.build_vocab_from_iterator(train, min_freq=10) 
vocab.insert_token('<unk>', 0)
vocab.insert_token('<eos>', 1)
vocab.set_default_index(vocab['<unk>'])
print(len(vocab))   
print(vocab.get_itos()[:10])

95
['<unk>', '<eos>', 'O-O', 'Ng8-f6', 'Ng1-f3', 'd2-d4', 'Nb1-c3', 'e2-e4', 'd7-d5', 'e7-e6']


In [12]:
def get_data(games, vocab, batch_size):
    data = []                             
    for game in games:
        if game:
            moves = game + ['<eos>']
            moves = [vocab[move] for move in game] 
            data.extend(moves)                                    
    data = torch.LongTensor(data)                                 
    num_batches = data.shape[0] // batch_size 
    data = data[:num_batches * batch_size]                       
    data = data.view(batch_size, num_batches)                                         
    return data

In [13]:
batch_size = 128
train_data = get_data(train, vocab, batch_size)
valid_data = get_data(validate, vocab, batch_size)
test_data = get_data(test, vocab, batch_size)

In [14]:
class LSTM(nn.Module):
    def __init__(self, vocab_size, embedding_dim, hidden_dim, num_layers, dropout_rate, 
                tie_weights):
                
        super().__init__()
        self.num_layers = num_layers
        self.hidden_dim = hidden_dim
        self.embedding_dim = embedding_dim

        self.embedding = nn.Embedding(vocab_size, embedding_dim)
        self.lstm = nn.LSTM(embedding_dim, hidden_dim, num_layers=num_layers, 
                    dropout=dropout_rate, batch_first=True)
        self.dropout = nn.Dropout(dropout_rate)
        self.fc = nn.Linear(hidden_dim, vocab_size)
        
        if tie_weights:
            assert embedding_dim == hidden_dim, 'cannot tie, check dims'
            self.embedding.weight = self.fc.weight
        self.init_weights()

    def forward(self, src, hidden):
        embedding = self.dropout(self.embedding(src))
        output, hidden = self.lstm(embedding, hidden)          
        output = self.dropout(output) 
        prediction = self.fc(output)
        return prediction, hidden
        
    def init_weights(self):
        init_range_emb = 0.1
        init_range_other = 1/math.sqrt(self.hidden_dim)
        self.embedding.weight.data.uniform_(-init_range_emb, init_range_emb)
        self.fc.weight.data.uniform_(-init_range_other, init_range_other)
        self.fc.bias.data.zero_()
        for i in range(self.num_layers):
            self.lstm.all_weights[i][0] = torch.FloatTensor(self.embedding_dim,
                    self.hidden_dim).uniform_(-init_range_other, init_range_other) 
            self.lstm.all_weights[i][1] = torch.FloatTensor(self.hidden_dim, 
                    self.hidden_dim).uniform_(-init_range_other, init_range_other) 

    def init_hidden(self, batch_size, device):
        hidden = torch.zeros(self.num_layers, batch_size, self.hidden_dim).to(device)
        cell = torch.zeros(self.num_layers, batch_size, self.hidden_dim).to(device)
        return hidden, cell

    def detach_hidden(self, hidden):
        hidden, cell = hidden
        hidden = hidden.detach()
        cell = cell.detach()
        return hidden, cell

In [15]:
vocab_size = len(vocab)
embedding_dim = 1024             # 400 in the paper
hidden_dim = 1024                # 1150 in the paper
num_layers = 4                   # 3 in the paper
dropout_rate = 0.05
tie_weights = True                  
lr = 1e-3                        # They used 30 and a different optimizer

In [16]:
model = LSTM(vocab_size, embedding_dim, hidden_dim, num_layers, dropout_rate, tie_weights).to(device)
optimizer = optim.Adam(model.parameters(), lr=lr)
criterion = nn.CrossEntropyLoss()
num_params = sum(p.numel() for p in model.parameters() if p.requires_grad)
print(f'The model has {num_params:,} trainable parameters')

The model has 33,684,575 trainable parameters


In [17]:
def get_batch(data, seq_len, num_batches, idx):
    src = data[:, idx:idx+seq_len]                   
    target = data[:, idx+1:idx+seq_len+1]             
    return src, target

In [18]:
def train(model, data, optimizer, criterion, batch_size, seq_len, clip, device):
    
    epoch_loss = 0
    model.train()
    # drop all batches that are not a multiple of seq_len
    num_batches = data.shape[-1]
    data = data[:, :num_batches - (num_batches -1) % seq_len]
    num_batches = data.shape[-1]

    hidden = model.init_hidden(batch_size, device)
    
    for idx in tqdm(range(0, num_batches - 1, seq_len), desc='Training: ',leave=False):  # The last batch can't be a src
        optimizer.zero_grad()
        hidden = model.detach_hidden(hidden)

        src, target = get_batch(data, seq_len, num_batches, idx)
        src, target = src.to(device), target.to(device)
        batch_size = src.shape[0]
        prediction, hidden = model(src, hidden)               

        prediction = prediction.reshape(batch_size * seq_len, -1)   
        target = target.reshape(-1)
        loss = criterion(prediction, target)
        
        loss.backward()
        torch.nn.utils.clip_grad_norm_(model.parameters(), clip)
        optimizer.step()
        epoch_loss += loss.item() * seq_len
    return epoch_loss / num_batches

In [19]:
def evaluate(model, data, criterion, batch_size, seq_len, device):

    epoch_loss = 0
    model.eval()
    num_batches = data.shape[-1]
    data = data[:, :num_batches - (num_batches -1) % seq_len]
    num_batches = data.shape[-1]

    hidden = model.init_hidden(batch_size, device)

    with torch.no_grad():
        for idx in range(0, num_batches - 1, seq_len):
            hidden = model.detach_hidden(hidden)
            src, target = get_batch(data, seq_len, num_batches, idx)
            src, target = src.to(device), target.to(device)
            batch_size= src.shape[0]

            prediction, hidden = model(src, hidden)
            prediction = prediction.reshape(batch_size * seq_len, -1)
            target = target.reshape(-1)

            loss = criterion(prediction, target)
            epoch_loss += loss.item() * seq_len
    return epoch_loss / num_batches

In [20]:
n_epochs = 5
seq_len = 50
clip = 0.25
saved = False

lr_scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, factor=0.5, patience=0)

if saved:
    model.load_state_dict(torch.load('best-val-lstm_lm.pt',  map_location=device))
    test_loss = evaluate(model, test_data, criterion, batch_size, seq_len, device)
    print(f'Test Perplexity: {math.exp(test_loss):.3f}')
else:
    best_valid_loss = float('inf')

    for epoch in range(n_epochs):
        train_loss = train(model, train_data, optimizer, criterion, 
                    batch_size, seq_len, clip, device)
        valid_loss = evaluate(model, valid_data, criterion, batch_size, 
                    seq_len, device)
        
        lr_scheduler.step(valid_loss)

        if valid_loss < best_valid_loss:
            best_valid_loss = valid_loss
            torch.save(model.state_dict(), 'best-val-lstm_lm.pt')

        print(f'\tTrain Perplexity: {math.exp(train_loss):.3f}')
        print(f'\tValid Perplexity: {math.exp(valid_loss):.3f}')

                              

	Train Perplexity: 1.000
	Valid Perplexity: 1.000


                              

	Train Perplexity: 1.000
	Valid Perplexity: 1.000


                              

	Train Perplexity: 1.000
	Valid Perplexity: 1.000


                              

	Train Perplexity: 1.000
	Valid Perplexity: 1.000


                              

	Train Perplexity: 1.000
	Valid Perplexity: 1.000




In [21]:
def generate(prompt, max_seq_len, temperature, model, vocab, device, real_move, seed=None):
    board = chess.Board()
    for move in prompt:
        board.push(board.parse_san(move))
    legal_moves = [board.lan(move) for move in board.legal_moves]
    legal_moves_index = [vocab[move] for move in legal_moves]
    legal_moves_index = torch.LongTensor(legal_moves_index)

    real_ind = vocab[board.lan(board.parse_san(real_move))]
    
    if seed is not None:
        torch.manual_seed(seed)
    model.eval()
    indices = [vocab[t] for t in prompt]
    batch_size = 1
    hidden = model.init_hidden(batch_size, device)
    with torch.no_grad():
        for i in range(max_seq_len):
            src = torch.LongTensor([indices]).to(device)
            prediction, hidden = model(src, hidden)
            
            probs = torch.softmax(prediction[:, -1] / temperature, dim=-1)
            if legal_moves_index is not None:
                mask = torch.zeros_like(probs)
                mask[:, legal_moves_index] = 1
                probs = probs * mask
                probs_sum = probs.sum(dim=-1, keepdim=True)
                probs = probs / probs_sum
            
            #prediction_r = torch.multinomial(probs, num_samples=1).item()
            prediction = torch.max(probs, dim=-1)[1].item()         

            while prediction == vocab['<unk>']:
                prediction = torch.multinomial(probs, num_samples=1).item()

            if prediction == vocab['<eos>']:
                break
            print(probs[0, [real_ind, prediction]], real_ind == prediction)
            indices.append(prediction)

    itos = vocab.get_itos()
    tokens = [itos[i] for i in indices]
    return tokens

In [22]:
max_seq_len = 1
seed = 0
temperature = 0.5

game = load_pgns(os.path.join(assets_path, 'qgdm.pgn'))[0]
lans_ = game_to_df(game).lan.tolist()

for i, move in enumerate(lans_):
    if i < 1:
        continue
    moves = lans_[:i]
    generation = generate(moves, max_seq_len, temperature, model, vocab, device, move, seed)
    print('prediction:', generation[-1])
    print('reality:', move)
    print()

                                      

tensor([0.0562, 0.0563], device='cuda:0') False
prediction: g7-g6
reality: Ng8-f6

tensor([0.0475, 0.0490], device='cuda:0') False
prediction: Nb1-c3
reality: c2-c4

tensor([0.0605, 0.0607], device='cuda:0') False
prediction: g7-g6
reality: e7-e6

tensor([0.0545, 0.0545], device='cuda:0') True
prediction: Nb1-c3
reality: Nb1-c3

tensor([0.0522, 0.0546], device='cuda:0') False
prediction: g7-g6
reality: d7-d5

tensor([0.0554, 0.0564], device='cuda:0') False
prediction: a2-a3
reality: c4xd5

tensor([0.0506, 0.0518], device='cuda:0') False
prediction: g7-g6
reality: e6xd5

tensor([0.0590, 0.0598], device='cuda:0') False
prediction: a2-a3
reality: Bc1-g5

tensor([0.0539, 0.0545], device='cuda:0') False
prediction: g7-g6
reality: c7-c6

tensor([0.0659, 0.0676], device='cuda:0') False
prediction: a2-a3
reality: e2-e3

tensor([0.0557, 0.0575], device='cuda:0') False
prediction: g7-g6
reality: Bf8-e7

tensor([0.0617, 0.0636], device='cuda:0') False
prediction: Bf1-e2
reality: Bf1-d3

tensor([0