In [36]:
import chess
import chess.engine
import torch
import torch.nn as nn
import torch.nn.functional as F
import random
import math
import numpy as np
from evaluation.evaluation import Evaluation

In [37]:
NUM_GAMES = 100
INPUT_DIM = 774
POLICY_DIM = 64 * 64
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")
STOCKFISH_PATH = "C:/Users/Windows/Downloads/stockfish-windows-x86-64-avx2/stockfish/stockfish.exe"
eval = Evaluation()

In [38]:
piece_to_index = {
    'P': 0, 'N': 1, 'B': 2, 'R': 3, 'Q': 4, 'K': 5,
    'p': 6, 'n': 7, 'b': 8, 'r': 9, 'q': 10, 'k': 11
}

In [39]:
# FEN encoding function for the chess board
def encode_fen(fen: str) -> torch.Tensor:
    board = chess.Board(fen)
    tensor = torch.zeros(774, dtype=torch.float)  # Increased tensor size to 774
    for square in chess.SQUARES:
        piece = board.piece_at(square)
        if piece:
            idx = piece_to_index[piece.symbol()]
            tensor[idx * 64 + square] = 1.0
    offset = 768
    tensor[offset] = 1.0 if board.turn == chess.WHITE else 0.0
    offset += 1
    tensor[offset] = 1.0 if board.has_kingside_castling_rights(chess.WHITE) else 0.0
    tensor[offset + 1] = 1.0 if board.has_queenside_castling_rights(chess.WHITE) else 0.0
    tensor[offset + 2] = 1.0 if board.has_kingside_castling_rights(chess.BLACK) else 0.0
    tensor[offset + 3] = 1.0 if board.has_queenside_castling_rights(chess.BLACK) else 0.0
    offset += 4
    tensor[offset] = board.ep_square % 8 / 7.0 if board.ep_square else -1.0
    return tensor

In [40]:
# Neural Network for Policy and Value Function
class PolicyValueNet(nn.Module):
    def __init__(self, input_dim, policy_output_dim):
        super().__init__()
        self.fc1 = nn.Linear(input_dim, 256)
        self.fc2 = nn.Linear(256, 256)
        self.policy_head = nn.Linear(256, policy_output_dim)
        self.value_head = nn.Linear(256, 1)

    def forward(self, x):
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        policy_logits = self.policy_head(x)
        value = self.value_head(x)
        return policy_logits, value


In [41]:
def move_to_index(move):
    return move.from_square * 64 + move.to_square

In [42]:
def index_to_move(index):
    return chess.Move(index // 64, index % 64)

In [43]:
def evaluation(fen: str) -> float:
    board = chess.Board(fen)
    return eval.evaluate(board)

In [44]:
class TreeNode:
    def __init__(self, move=None, parent=None):
        self.move = move
        self.parent = parent
        self.children = {}
        self.N = 0  # Visit count
        self.W = 0  # Win count
        self.Q = 0  # Action value (Q-value)
        self.P = 0  # Prior probability from the policy head

    def is_fully_expanded(self, legal_moves):
        return len(self.children) == len(legal_moves)

    def best_child(self, c_param=1.0):
        if len(self.children) == 0:
            return None  # or raise an exception if no children are available
        
        best_child = None
        best_value = -float('inf')
        for child in self.children.values():
            u = child.Q + c_param * child.P * math.sqrt(self.N) / (1 + child.N)
            if u > best_value:
                best_child = child
                best_value = u
        return best_child

In [45]:
class MCTS:
    def __init__(self, model, num_simulations=1000, c_param=1.0):
        self.model = model
        self.num_simulations = num_simulations
        self.c_param = c_param

    def search(self, board, encode_fn):
        random_move = random.choice(list(chess.Board().legal_moves))
        board.push(random_move)
        root = TreeNode(move = random_move)
        root.P = self.get_policy_from_nn(board, encode_fn)
        
        # Ensure that root.move is valid before using it
        if root.move is None:
            print("Error: Root move is None!")
            return None  # Handle error appropriately
        
        for _ in range(self.num_simulations):
            value = self.simulate(root, board)
            self.backpropagate(root, value)
        
        best_move_node = root.best_child(self.c_param)
        return best_move_node.move

    def select(self, node):
        while not node.is_fully_expanded():
            node = node.best_child(self.c_param)
        return node

    def expand(self, node, board):
        legal_moves = list(board.legal_moves)
        if not legal_moves:
            return  # No legal moves to expand, do nothing
        for move in legal_moves:
            if move not in node.children:
                new_node = TreeNode(move=move, parent=node)
                new_node.P = self.get_policy_from_nn(board, move)
                node.children[move] = new_node

    def simulate(self, node, board):
        move = node.move
        if move is None:
            print("Error: Invalid move!")
            return 0  # Or handle as needed, like returning a default value

        board.push(move)
        if board.is_game_over():
            return evaluation(board.fen())
        return 0

    def backpropagate(self, node, value):
        while node is not None:
            node.N += 1
            node.W += value
            node.Q = node.W / node.N
            node = node.parent

    def get_policy_from_nn(self, board, encode_fn):
        input_tensor = encode_fn(board.fen()).to(DEVICE).unsqueeze(0)
        policy_logits, _ = self.model(input_tensor)
        policy_probs = F.softmax(policy_logits, dim=1).detach().cpu().numpy().squeeze()
        legal_moves = list(board.legal_moves)
        legal_indices = [move_to_index(m) for m in legal_moves]
        legal_probs = np.array([policy_probs[i] for i in legal_indices])
        total_prob = legal_probs.sum()
        if total_prob == 0:
            legal_probs += 1e-8  # Avoid divide by zero
            total_prob = legal_probs.sum()
        return legal_probs / total_prob  # Normalize

In [46]:
board = chess.Board()
print(list(board.legal_moves))

[Move.from_uci('g1h3'), Move.from_uci('g1f3'), Move.from_uci('b1c3'), Move.from_uci('b1a3'), Move.from_uci('h2h3'), Move.from_uci('g2g3'), Move.from_uci('f2f3'), Move.from_uci('e2e3'), Move.from_uci('d2d3'), Move.from_uci('c2c3'), Move.from_uci('b2b3'), Move.from_uci('a2a3'), Move.from_uci('h2h4'), Move.from_uci('g2g4'), Move.from_uci('f2f4'), Move.from_uci('e2e4'), Move.from_uci('d2d4'), Move.from_uci('c2c4'), Move.from_uci('b2b4'), Move.from_uci('a2a4')]


In [47]:
def play_game_with_mcts(model, mcts, encode_fn):
    board = chess.Board()
    game_data = []

    while not board.is_game_over():
        if board.turn == chess.WHITE:
            move = mcts.search(board, encode_fn)
        else:
            move = chess.engine.SimpleEngine.popen_uci(STOCKFISH_PATH).play(board, chess.engine.Limit(time=0.1)).move

        if move is None:
            print("Warning: move is None.")
            break

        game_data.append((board.fen(), move))
        board.push(move)

    return game_data, board.result()

In [48]:
def result_to_value(result, side='white'):
    if result == '1-0': return 1 if side == 'white' else -1
    if result == '0-1': return -1 if side == 'white' else 1
    return 0


In [49]:
def train_model(model, optimizer, data, encode_fn):
    model.train()
    for fen, move, reward in data:
        input_tensor = encode_fn(fen).to(DEVICE).unsqueeze(0)
        move_index = move_to_index(move)
        target_policy = torch.zeros(1, POLICY_DIM).to(DEVICE)
        target_policy[0, move_index] = 1.0
        target_value = torch.tensor([[reward]], dtype=torch.float).to(DEVICE)

        policy_logits, value = model(input_tensor)
        loss_policy = F.cross_entropy(policy_logits, target_policy)
        loss_value = F.mse_loss(value, target_value)

        loss = loss_policy + loss_value
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

In [None]:
import chess

# Initialize the board
board = chess.Board()

# Check for legal moves
legal_moves = list(board.legal_moves)
print("Legal moves:", legal_moves)

# Attempt to apply a move and check if it is legal
move = chess.Move.from_uci('d2d4')
if move in legal_moves:
    board.push(move)
else:
    print("Illegal move:", move)


In [50]:
def self_play_training_loop(evaluation_fn, encode_fn, num_games=NUM_GAMES):
    model = PolicyValueNet(INPUT_DIM, POLICY_DIM).to(DEVICE)
    optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)
    mcts = MCTS(model)

    for game_num in range(num_games):
        print(f"[Game {game_num + 1}/{num_games}] Playing...")
        game_data, result = play_game_with_mcts(model, mcts, encode_fn)
        reward = result_to_value(result)
        training_data = [(fen, move, reward) for fen, move in game_data]
        train_model(model, optimizer, training_data, encode_fn)
        print(f"Game result: {result} | Reward: {reward}")

    return model

In [51]:
# Run self-play training loop
trained_model = self_play_training_loop(evaluation, encode_fen, num_games=100)

[Game 1/100] Playing...


AssertionError: push() expects move to be pseudo-legal, but got d2d4 in rnbqkbnr/pppppppp/8/8/3P4/8/PPP1PPPP/RNBQKBNR

In [16]:
# Save trained model
torch.save(trained_model.state_dict(), "self_trained_bot_with_mcts.pth")