Notes:
- Implement multiclass to handle draws? Currently win = 1, loss = 0, draw = .5; alternative: win = 1, loss = -1, draw = 0?
- Ideas to create NN value function
    - against outcome: gross generalization, can't handle blunders
    - some f(outcome, current move #): more representative, but can't handle blunders, f?
    - encode piece and positional value? balance...
- (CNN for Reward, Transformer for Move Prediction) or (Transformer for Both)?

To Do:
- Value function, looking deeper into multiple states
- Move prediction
    - Transformer architecture, fit
    - Ensure only valid moves in transformer, adjust probability distribution, Masking

Tuning:
- Update scoring function, f(Center, King Safety, Coverage, material, pawns?), weights
- Backprop of scoring, weights, discount factor
- CNN
- Transformer
- Optimization of architecture
- Optimizers/Hyperparameters/Architectures
- Promotions?

Plan:
- Let the engine self play, for each state map to intermediate reward + discount*result
- Intermediate reward is f(material score, positional score)
- Only use heuristic for nn training, not value function
- Tuning is a nightmare

In [352]:
import chess
import math
import random
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import TensorDataset, DataLoader
import torch.optim as optim

piece values

In [353]:
# Per-piece-type weights summed by material_score() to get each side's total.
piece_values = {
    chess.PAWN: 1,
    chess.KNIGHT: 3,
    chess.BISHOP: 3,
    chess.ROOK: 5,
    chess.QUEEN: 9,
    # Tiny non-zero weight: material_score() divides by a side's total, so a
    # lone king must not sum to zero.
    chess.KING: .01
}

move tokenization, encoding, masking

In [354]:
# Vocabulary: special tokens first, then move strings in order of first use.
move_tokens = {"<SOS>": 0, "<EOS>": 1, "<PAD>": 2}

# Next free id; advanced by update_tokens() whenever a new move is registered.
token_id = 3


def update_tokens(move):
    """Assign the next free token id to ``move`` unless it already has one.

    Mutates the module-level ``move_tokens`` mapping and ``token_id`` counter.
    """
    global token_id
    if move in move_tokens:
        return
    move_tokens[move] = token_id
    token_id += 1

In [None]:
def update_seqs(past_seqs, next_move, max_length):
    """Finalize the newest sequence in ``past_seqs`` and return its extension.

    The last entry of ``past_seqs`` is a raw list of move tokens. It is
    replaced in place by a fixed-length sequence of exactly ``max_length``
    tokens: ``<SOS>``, the most recent ``max_length - 2`` raw tokens,
    ``<EOS>``, then ``<PAD>`` filling whatever room is left.

    Args:
        past_seqs: List of token sequences; the last one is overwritten.
        next_move: Token id of the move that was just chosen.
        max_length: Length of every finalized sequence; must be at least 2
            so that ``<SOS>`` and ``<EOS>`` fit.

    Returns:
        A new raw list made of the previous raw sequence followed by
        ``next_move``. It is not added to ``past_seqs`` here.

    Raises:
        ValueError: If ``max_length`` is smaller than 2.
        IndexError: If ``past_seqs`` is empty.
    """
    if max_length < 2:
        raise ValueError(f"max_length must be >= 2 to fit <SOS> and <EOS>, got {max_length}")

    prev_seq = past_seqs[-1]
    curr_seq = prev_seq + [next_move]

    window = max_length - 2
    # Use an explicit start index: prev_seq[-window:] would return the WHOLE
    # list when window == 0 and overflow the fixed length.
    recent = prev_seq[max(len(prev_seq) - window, 0):]
    padding = [move_tokens["<PAD>"]] * (window - len(recent))
    past_seqs[-1] = [move_tokens["<SOS>"]] + recent + [move_tokens["<EOS>"]] + padding
    return curr_seq

In [None]:
def get_mask(board, vocab_size):
    """Build a boolean vector flagging the tokens of ``board``'s legal moves.

    Args:
        board: Object exposing ``legal_moves``, whose items provide ``uci()``.
        vocab_size: Length of the returned vector.

    Returns:
        A ``torch.bool`` tensor of shape ``(vocab_size,)`` that is True at the
        token id of every legal move already present in ``move_tokens``;
        legal moves without a token are ignored.
    """
    legal = torch.zeros(vocab_size, dtype=torch.bool)
    for candidate in board.legal_moves:
        token = move_tokens.get(candidate.uci())
        if token is not None:
            legal[token] = True
    return legal

board scoring

In [356]:
def material_score(board):
    """Return the material balance of ``board`` relative to ``board.turn``.

    Each side's total is the sum of ``piece_values`` over its pieces. The
    result is ``(own - opponent) / own``, where "own" is white when
    ``board.turn`` is truthy and black otherwise. Because the difference is
    normalized by the mover's own total, the value never exceeds 1 but grows
    without bound in the negative direction as the mover's material shrinks.
    """
    white_total = 0
    black_total = 0
    for piece_type, value in piece_values.items():
        white_total += len(board.pieces(piece_type, chess.WHITE)) * value
        black_total += len(board.pieces(piece_type, chess.BLACK)) * value

    if board.turn:
        own, opponent = white_total, black_total
    else:
        own, opponent = black_total, white_total
    return (own - opponent) / own

In [357]:
def positional_score(board):
    """Return the positional score of ``board``.

    At present this is only ``board_coverage(board)``, i.e. the number of
    entries in ``board.legal_moves``.
    """
    return board_coverage(board)

def get_pieces_locs(board):
    """Map each piece symbol of the ``board.turn`` colour to its square names.

    Piece types with no pieces on the board are left out of the result.
    """
    side = board.turn
    locations = {}
    for kind in chess.PIECE_TYPES:
        names = [chess.square_name(sq) for sq in board.pieces(kind, side)]
        if not names:
            continue
        locations[chess.Piece(kind, side).symbol()] = names
    return locations

def board_coverage(board):
    """Return how many moves ``board.legal_moves`` yields."""
    return sum(1 for _ in board.legal_moves)

In [358]:
def total_board_score(board, mat_w, pos_w):
    """Return the heuristic score of ``board`` relative to ``board.turn``.

    Only the material term is used right now, so the result is exactly
    ``material_score(board)``.

    Args:
        board: Position to score.
        mat_w: Weight intended for the material term; currently unused.
        pos_w: Weight intended for the positional term; currently unused.
    """
    # The positional term used to be computed here and then thrown away, which
    # cost a full legal-move enumeration per call without affecting the result.
    # TODO: re-enable a weighted blend; the disabled draft was
    #   (mat_score*mat_w + pos_score*pos_w)/(mat_score + pos_score)
    return material_score(board)

encode board state as tensor

In [359]:
def encode_board(board):
    """Encode ``board`` as a float32 tensor of shape ``(13, 8, 8)``.

    Channels 0-5 hold white pawn/knight/bishop/rook/queen/king occupancy,
    channels 6-11 hold the same piece types for black, and channel 12 is
    filled with ones when it is white's turn (zeros otherwise). A square
    index ``s`` lands at row ``s // 8``, column ``s % 8``.
    """
    base_layer = {
        chess.PAWN: 0,
        chess.KNIGHT: 1,
        chess.BISHOP: 2,
        chess.ROOK: 3,
        chess.QUEEN: 4,
        chess.KING: 5,
    }

    # Built channels-last first; permuted to channels-first on return.
    planes = np.zeros((8, 8, 13), dtype=np.uint8)
    for square in chess.SQUARES:
        occupant = board.piece_at(square)
        if not occupant:
            continue
        colour_offset = 6 if occupant.color == chess.BLACK else 0
        row, col = square // 8, square % 8
        planes[row, col, base_layer[occupant.piece_type] + colour_offset] = 1

    # Side-to-move plane.
    planes[:, :, 12] = int(board.turn == chess.WHITE)
    return torch.tensor(planes, dtype=torch.float32).permute(2, 0, 1)


Reward CNN

In [None]:
class Board_Value(nn.Module):
    """Convolutional regressor that maps an encoded board to ``output_units`` values.

    Three conv -> batch-norm -> ReLU stages feed two fully connected layers.
    ``fc1`` is sized for 8x8 feature maps, so the convolutions are expected
    to keep the spatial size (true for the default kernel_size=3, padding=1).
    """

    def __init__(self, input_channels=13, conv1_out_channels=64, conv2_out_channels=128, conv3_out_channels=256,
                 kernel_size=3, padding=1, fc1_units=1024, output_units=1, batch_size=10, learning_rate=0.001):
        super().__init__()

        # Training hyperparameters read by fit().
        self.batch_size = batch_size
        self.learning_rate = learning_rate

        # Feature extractor: every convolution shares kernel size and padding.
        conv_kwargs = dict(kernel_size=kernel_size, padding=padding)
        self.conv1 = nn.Conv2d(input_channels, conv1_out_channels, **conv_kwargs)
        self.conv2 = nn.Conv2d(conv1_out_channels, conv2_out_channels, **conv_kwargs)
        self.conv3 = nn.Conv2d(conv2_out_channels, conv3_out_channels, **conv_kwargs)

        # One batch-norm per convolution.
        self.bn1 = nn.BatchNorm2d(conv1_out_channels)
        self.bn2 = nn.BatchNorm2d(conv2_out_channels)
        self.bn3 = nn.BatchNorm2d(conv3_out_channels)

        # Regression head over the flattened 8x8 feature maps.
        self.fc1 = nn.Linear(conv3_out_channels * 8 * 8, fc1_units)
        self.fc2 = nn.Linear(fc1_units, output_units)

    def forward(self, x):
        """Return the network output for a batch of encoded boards."""
        stages = (
            (self.conv1, self.bn1),
            (self.conv2, self.bn2),
            (self.conv3, self.bn3),
        )
        for conv, norm in stages:
            x = F.relu(norm(conv(x)))

        flat = x.view(-1, self.fc1.in_features)
        hidden = F.relu(self.fc1(flat))
        return self.fc2(hidden)

    def _train_one_epoch(self, dataloader, criterion, optimizer):
        """Run one optimization pass over ``dataloader`` in training mode."""
        self.train()
        for data, targets in dataloader:
            optimizer.zero_grad()
            loss = criterion(self(data), targets)
            loss.backward()
            optimizer.step()

    def _summed_loss(self, dataloader, criterion):
        """Return the sum of per-batch losses over ``dataloader`` in eval mode."""
        self.eval()
        test_loss = 0
        with torch.no_grad():
            for data, targets in dataloader:
                test_loss += criterion(self(data), targets).item()
        return test_loss

    def fit(self, x, y, reward_max_epochs):
        """Train with Adam on an MSE loss for ``reward_max_epochs`` epochs.

        Args:
            x: Sequence of equally shaped tensors; stacked into one batch tensor.
            y: Sequence of numeric targets, one per entry of ``x``.
            reward_max_epochs: Number of passes over the data.

        After every epoch the mean per-batch loss over the same data is
        printed; the model is left in eval mode afterwards.
        """
        x_tensor = torch.stack(x)
        y_tensor = torch.tensor(y, dtype=torch.float32).view(-1, 1)
        print(x_tensor.shape, y_tensor.shape)

        # Shuffled mini-batches over the full data set.
        dataloader = DataLoader(TensorDataset(x_tensor, y_tensor), batch_size=self.batch_size, shuffle=True)
        optimizer = optim.Adam(self.parameters(), lr=self.learning_rate)
        criterion = nn.MSELoss()

        for epoch in range(reward_max_epochs):
            self._train_one_epoch(dataloader, criterion, optimizer)
            test_loss = self._summed_loss(dataloader, criterion)
            print(f'Reward Epoch {epoch+1}, Test Loss: {test_loss / len(dataloader)}')


NameError: name 'nn' is not defined

Move-Prediction Transformer

In [None]:
class Prediction(nn.Module):
    """Transformer encoder that scores every vocabulary token as the next move.

    Input is a batch of token-id sequences of shape ``(batch, seq_len)``.
    The encoder output is mean-pooled over the sequence axis and projected
    to ``output_size`` logits. An optional boolean mask restricts the logits
    to allowed tokens.
    """

    def __init__(self, vocab_size=5000, embed_dim=128, num_heads=8,num_layers=6,
                 ff_dim=512, max_seq_len=50,output_size=5000, dropout=0.1, batch_size = 64,
                 learning_rate = .001):
        super(Prediction, self).__init__()
        # Training hyperparameters read by fit().
        self.batch_size = batch_size
        self.learning_rate = learning_rate

        # Token embedding plus a learned positional table of max_seq_len rows.
        self.token_embedding = nn.Embedding(vocab_size, embed_dim)
        self.positional_encoding = nn.Parameter(torch.zeros(1, max_seq_len, embed_dim))

        # batch_first=True is required: forward() feeds (batch, seq, embed).
        # With the default layout the encoder would attend across the batch
        # axis instead of across the sequence.
        encoder_layer = nn.TransformerEncoderLayer(
            d_model=embed_dim,
            nhead=num_heads,
            dim_feedforward=ff_dim,
            dropout=dropout,
            batch_first=True,
        )
        self.transformer_encoder = nn.TransformerEncoder(encoder_layer, num_layers)

        # Output head
        self.output_head = nn.Linear(embed_dim, output_size)

    def forward(self, x, masks = None):
        """Return next-token logits of shape ``(batch, output_size)``.

        Args:
            x: Integer tensor of token ids, shape ``(batch, seq_len)`` with
                ``seq_len`` no larger than the positional table.
            masks: Optional boolean tensor broadcastable to the logits; False
                entries are set to ``-inf``. Rows with no True entry are left
                unmasked so they never become all ``-inf``.

        Raises:
            ValueError: If ``seq_len`` exceeds the positional table length.
        """
        seq_len = x.size(1)
        max_len = self.positional_encoding.size(1)
        if seq_len > max_len:
            raise ValueError(f"sequence length {seq_len} exceeds max_seq_len {max_len}")

        # Token embedding + positional encoding
        x = self.token_embedding(x) + self.positional_encoding[:, :seq_len, :]

        # Pass through transformer encoder
        x = self.transformer_encoder(x)

        # Average pooling over the sequence axis -> (batch, embed_dim)
        x = x.mean(dim=1)

        # Map to output space -> (batch, output_size)
        output = self.output_head(x)

        if masks is not None:
            masks = masks.to(device=output.device, dtype=torch.bool)
            has_allowed = masks.any(dim=-1, keepdim=True)
            # Keep fully-False rows untouched to avoid NaNs downstream.
            output = output.masked_fill(~(masks | ~has_allowed), float("-inf"))
        return output

    def fit(self, x, y, masks, prediction_max_epochs):
        """Train with Adam on a cross-entropy loss over next-move tokens.

        Args:
            x: Equal-length token-id sequences, one per sample.
            y: Target token id for each sample.
            masks: One boolean tensor of length ``output_size`` per sample,
                or None to train without masking.
            prediction_max_epochs: Number of passes over the data.

        After every epoch the mean per-batch loss over the same data is
        printed; the model is left in eval mode afterwards.
        """
        # Embedding lookups and class-index targets both need integer tensors,
        # and CrossEntropyLoss expects class indices with shape (N,).
        x_tensor = torch.tensor(x, dtype=torch.long)
        y_tensor = torch.tensor(y, dtype=torch.long).view(-1)
        if masks is None:
            mask_tensor = torch.ones(len(y_tensor), self.output_head.out_features, dtype=torch.bool)
        else:
            # torch.stack takes no dtype argument; convert after stacking.
            mask_tensor = torch.stack(list(masks)).to(torch.bool)
            if len(y_tensor) > 0:
                # The target itself must stay selectable, otherwise the loss is inf.
                mask_tensor[torch.arange(len(y_tensor)), y_tensor] = True
        print(x_tensor.shape, y_tensor.shape)
        dataset = TensorDataset(x_tensor, y_tensor, mask_tensor)

        # Create a DataLoader for handling batching and shuffling
        dataloader = DataLoader(dataset, batch_size= self.batch_size, shuffle=True)
        optimizer = optim.Adam(self.parameters(), lr=self.learning_rate)
        criterion = nn.CrossEntropyLoss()  # Cross Entropy Loss

        for epoch in range(prediction_max_epochs):
            self.train()
            for data, targets, batch_masks in dataloader:
                optimizer.zero_grad()
                outputs = self(data, batch_masks)
                loss = criterion(outputs, targets)
                loss.backward()
                optimizer.step()

            self.eval()
            test_loss = 0
            with torch.no_grad():
                for data, targets, batch_masks in dataloader:
                    outputs = self(data, batch_masks)
                    test_loss += criterion(outputs, targets).item()

            print(f'Prediction Epoch {epoch+1}, Test Loss: {test_loss / len(dataloader)}')

Engine

In [None]:
class Engine:
    """Self-play driver that trains the value network and the move predictor.

    Each batch plays ``game_batch_size`` games with an epsilon-greedy policy,
    labels the visited positions with discounted rewards, and then fits
    ``self.reward`` (Board_Value) and ``self.prediction`` (Prediction).
    """

    def __init__(self, total_game_batches = 10000, discount_factor = .95,
                material_weight = .8, positional_weight = .3, reward_max_epochs = 20,
                board_score_weight = .1, game_batch_size = 100, epsilon = .2,
                sequence_length = 20, prediction_max_epochs = 20, max_depth = 10):
        """Store hyperparameters and create the two networks.

        Args:
            total_game_batches: Number of self-play/fit rounds in self_play().
            discount_factor: Multiplier applied to the following state's score
                in backpropagate_rewards().
            material_weight: Forwarded to total_board_score().
            positional_weight: Forwarded to total_board_score().
            reward_max_epochs: Epochs per Board_Value.fit() call.
            board_score_weight: Weight of a state's own heuristic score in
                backpropagate_rewards().
            game_batch_size: Games played per batch.
            epsilon: Probability that policy() plays a random legal move.
            sequence_length: Fixed length of the framed move sequences.
            prediction_max_epochs: Epochs per Prediction.fit() call.
            max_depth: Stored as ``self.depth``; not used by any method yet.
        """
        self.total_game_batches = total_game_batches
        self.game_batch_size = game_batch_size

        self.discount = discount_factor
        self.material_w = material_weight
        self.positional_w = positional_weight
        self.board_score_weight = board_score_weight
        self.epsilon = epsilon

        self.reward_max_epochs = reward_max_epochs
        self.prediction_max_epochs = prediction_max_epochs

        self.reward = Board_Value()
        self.sequence_length = sequence_length
        self.prediction = Prediction()

        self.depth = max_depth

    def self_play(self):
        """Play batches of games against itself and fit both networks per batch."""
        # Legal-move masks must be as wide as the predictor's logits.
        vocab_size = self.prediction.output_head.out_features

        for _ in range(self.total_game_batches):
            states, scores, move_seqs, next_moves, masks = [], [], [], [], []

            for _ in range(self.game_batch_size):
                board = chess.Board()

                # Positions are split by side to move; the start position
                # (white to move) is seeded with a score of 0.0.
                w_game_states = [encode_board(board)]
                b_game_states = []

                w_game_score = [0.0]
                b_game_score = []

                # The last entry is always the raw (unframed) move history.
                game_move_seqs = [[]]
                game_next_moves = []
                game_masks = []

                while not board.is_game_over():
                    move = self.policy(board, game_move_seqs[-1])
                    update_tokens(move)

                    next_move_token = move_tokens[move]
                    game_next_moves.append(next_move_token)

                    # Frame the history seen BEFORE this move in place, then
                    # append the extended raw history for the next iteration.
                    # This must act on this game's list, not the batch list.
                    curr_seq = update_seqs(game_move_seqs, next_move_token, self.sequence_length)
                    game_move_seqs.append(curr_seq)

                    # Mask is taken before the move so it describes the
                    # choices that were available when the move was made.
                    game_masks.append(get_mask(board, vocab_size))

                    # policy() returns a UCI string, so use the UCI parser.
                    board.push_uci(move)

                    # Score the position after the move.
                    current_score = total_board_score(board, self.material_w, self.positional_w)

                    if board.turn:
                        w_game_states.append(encode_board(board))
                        w_game_score.append(current_score)
                    else:
                        b_game_states.append(encode_board(board))
                        b_game_score.append(current_score)

                result = board.result()
                if result == '1-0':
                    result_w, result_b = 1, 0
                elif result == '0-1':
                    result_b, result_w = 1, 0
                else:
                    result_b, result_w = .5, .5

                w_state_scores = self.backpropagate_rewards(w_game_score, result_w)
                b_state_scores = self.backpropagate_rewards(b_game_score, result_b)

                states += w_game_states
                scores += w_state_scores
                states += b_game_states
                scores += b_state_scores

                # Drop the trailing raw history: it was never framed and has
                # no matching next move, so keeping it would misalign
                # sequences and targets.
                move_seqs += game_move_seqs[:-1]
                next_moves += game_next_moves
                masks += game_masks

            self.reward.fit(states, scores, self.reward_max_epochs)
            self.prediction.fit(move_seqs, next_moves, masks, self.prediction_max_epochs)

    def backpropagate_rewards(self, scores, result):
        """Turn per-state heuristic scores into discounted training targets.

        The last score becomes ``board_score_weight * score + result``; every
        earlier score becomes ``discount * next_target + board_score_weight *
        score``. The list is modified in place and also returned. An empty
        list is returned unchanged.
        """
        if not scores:
            return scores

        # Seed the recursion with the final game result.
        scores[-1] = self.board_score_weight * scores[-1] + result

        # Walk backwards so each state sees its successor's finished target.
        for index in range(len(scores) - 2, -1, -1):
            scores[index] = self.discount * scores[index + 1] + self.board_score_weight * scores[index]
        return scores

    def policy(self, board, past_moves):
        """Choose a move for ``board`` and return it as a UCI string.

        With probability ``epsilon`` a uniformly random legal move is played;
        otherwise the move whose resulting position has the highest
        ``value()`` is chosen.

        Raises:
            ValueError: If the position has no legal moves.
        """
        legal = list(board.legal_moves)
        if not legal:
            raise ValueError("policy() called on a position with no legal moves")

        if random.random() < self.epsilon:
            return random.choice(legal).uci()

        # Default to the first move so a NaN evaluation cannot leave us
        # without a choice.
        best_move = legal[0]
        best_move_value = -math.inf

        # NOTE(review): after the push it is the opponent's turn. The
        # self_play labelling suggests the reward network scores a position
        # for the side to move, in which case this should probably minimise
        # rather than maximise -- confirm before changing.
        for move in legal:
            board.push(move)
            new_board_value = self.value(board, past_moves)
            board.pop()

            if new_board_value > best_move_value:
                best_move = move
                best_move_value = new_board_value
        return best_move.uci()

    def value(self, board, past_moves):
        """Return the reward network's estimate for ``board`` as a float.

        ``past_moves`` is accepted for the planned predictor-weighted
        lookahead but is not used yet.
        """
        # Inference only: eval mode keeps batch-norm statistics untouched.
        self.reward.eval()
        with torch.no_grad():
            estimate = self.reward(encode_board(board).unsqueeze(0))
        # TODO: weight a lookahead over board.legal_moves by the move
        # probabilities from self.prediction (the draft of this loop was left
        # unfinished and referenced an undefined name).
        return estimate.item()

Test

In [363]:
# Smoke test: a single self-play batch of one game, with 10 reward-network
# epochs per fit (all other Engine settings keep their defaults).
my_engine = Engine(total_game_batches = 1, game_batch_size = 1, reward_max_epochs = 10)
my_engine.self_play()

[0, 1, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2]
[3]
1 curr
[0, 3, 1, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2]
[3, 4]
2 curr
[0, 3, 4, 1, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2]
[3, 4, 5]
3 curr
[0, 3, 4, 5, 1, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2]
[3, 4, 5, 6]
4 curr
[0, 3, 4, 5, 6, 1, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2]
[3, 4, 5, 6, 7]
5 curr
[0, 3, 4, 5, 6, 7, 1, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2]
[3, 4, 5, 6, 7, 8]
6 curr
[0, 3, 4, 5, 6, 7, 8, 1, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2]
[3, 4, 5, 6, 7, 8, 9]
7 curr
[0, 3, 4, 5, 6, 7, 8, 9, 1, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2]
[3, 4, 5, 6, 7, 8, 9, 10]
8 curr
[0, 3, 4, 5, 6, 7, 8, 9, 10, 1, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2]
[3, 4, 5, 6, 7, 8, 9, 10, 11]
9 curr
[0, 3, 4, 5, 6, 7, 8, 9, 10, 11, 1, 2, 2, 2, 2, 2, 2, 2, 2, 2]
[3, 4, 5, 6, 7, 8, 9, 10, 11, 12]
10 curr
[0, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 1, 2, 2, 2, 2, 2, 2, 2, 2]
[3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13]
11 curr
[0, 3, 4, 5, 6, 7, 8, 9, 