In [2]:
import os
import numpy as np # type: ignore
import time
import torch
import torch.nn as nn # type: ignore
import torch.optim as optim # type: ignore
from torch.utils.data import DataLoader # type: ignore
from chess import pgn # type: ignore
from tqdm import tqdm # type: ignore
import chess



In [3]:
def load_pgn(file_path):
    games = []
    with open(file_path, 'r') as pgn_file:
        i=0
        while i<100:
            game = pgn.read_game(pgn_file)
            if game is None:
                break
            games.append(game)
            i+=1
    return games
print(12)
files = [file for file in os.listdir("data/pgn") if file.endswith(".pgn")]
LIMIT_OF_FILES = min(len(files), 28)
games = []
i = 1
for file in tqdm(files):
    print("hey")
    games.extend(load_pgn(f"data/pgn/{file}"))
    if i >= LIMIT_OF_FILES:
        break
    i += 1



print(f"GAMES PARSED: {len(games)}")



12


  0%|          | 0/1 [00:00<?, ?it/s]

hey


  0%|          | 0/1 [00:00<?, ?it/s]

GAMES PARSED: 100





In [4]:
import numpy as np
from chess import Board, SQUARES
piece_map = {
        'p': 0, 'n': 1, 'b': 2, 'r': 3, 'q': 4, 'k': 5,
        'P': 6, 'N': 7, 'B': 8, 'R': 9, 'Q': 10, 'K': 11,
    }

def board_to_matrix(board:Board):
    #he adds 13the board for legal moves?
    matrix = np.zeros((8, 8, 13), dtype=np.int8)
    for square in SQUARES:
        piece = board.piece_at(square)
        if piece:
            row, col = divmod(square, 8)
            matrix[7 - row, col, piece_map[piece.symbol()]] = 1
    return matrix

def create_input_for_nn(games):
    X = []
    y = []
    for game in games:
        board = game.board()
        for move in game.mainline_moves():
            X.append(board_to_matrix(board))
            y.append(move.uci())
            board.push(move)
    return np.array(X, dtype=np.float32), np.array(y)


def encode_moves(moves):
    move_to_int = {move: idx for idx, move in enumerate(set(moves))}
    print(move_to_int)
    return np.array([move_to_int[move] for move in moves], dtype=np.float32), move_to_int



In [5]:
X, y = create_input_for_nn(games)

In [6]:

# from auxillary_func import create_input_for_nn, encode_moves



print(f"NUMBER OF SAMPLES: {len(y)}")   
X = X[0:2500000]
y = y[0:2500000]
y, move_to_int = encode_moves(y)
num_classes = len(move_to_int)
print(move_to_int)


X = torch.tensor(X, dtype=torch.float32)
y = torch.tensor(y, dtype=torch.long)

X = X.reshape((9615,13,8,8))


NUMBER OF SAMPLES: 9615
{np.str_('a4d4'): 0, np.str_('d5a8'): 1, np.str_('g6f5'): 2, np.str_('d2d3'): 3, np.str_('a7e7'): 4, np.str_('g1d4'): 5, np.str_('e1b1'): 6, np.str_('g7c3'): 7, np.str_('c6f6'): 8, np.str_('d4c3'): 9, np.str_('g4e5'): 10, np.str_('c4b6'): 11, np.str_('b3d4'): 12, np.str_('a1g1'): 13, np.str_('b6b8'): 14, np.str_('e3e7'): 15, np.str_('g8f7'): 16, np.str_('a5a6'): 17, np.str_('g6e5'): 18, np.str_('d5e6'): 19, np.str_('e7e5'): 20, np.str_('f5d6'): 21, np.str_('c7d6'): 22, np.str_('h1e1'): 23, np.str_('e1d3'): 24, np.str_('h2g2'): 25, np.str_('e6e3'): 26, np.str_('d7d4'): 27, np.str_('d6c8'): 28, np.str_('c8c3'): 29, np.str_('e6c7'): 30, np.str_('e7c5'): 31, np.str_('a4a1'): 32, np.str_('b4b6'): 33, np.str_('a7d7'): 34, np.str_('h8c8'): 35, np.str_('a1g7'): 36, np.str_('d2c1'): 37, np.str_('a6a7'): 38, np.str_('e3b3'): 39, np.str_('b1f5'): 40, np.str_('f5g4'): 41, np.str_('b4b3'): 42, np.str_('c5c4'): 43, np.str_('e4e5'): 44, np.str_('e5g3'): 45, np.str_('h7g8'): 46

In [None]:
import torch.nn as nn


class ChessModel(nn.Module):
    def __init__(self, num_classes):
        super(ChessModel, self).__init__()
        # conv1 -> relu -> conv2 -> relu -> flatten -> fc1 -> relu -> fc2
        self.conv1 = nn.Conv2d(13, 64, kernel_size=3, padding=1)
        self.conv2 = nn.Conv2d(64, 128, kernel_size=3, padding=1)
        self.flatten = nn.Flatten()
        self.fc1 = nn.Linear(8 * 8 * 128, 256)
        self.fc2 = nn.Linear(256, num_classes)
        self.relu = nn.ReLU()

        # Initialize weights
        nn.init.kaiming_uniform_(self.conv1.weight, nonlinearity='relu')
        nn.init.kaiming_uniform_(self.conv2.weight, nonlinearity='relu')
        nn.init.xavier_uniform_(self.fc1.weight)
        nn.init.xavier_uniform_(self.fc2.weight)

    def forward(self, x):
        x = self.relu(self.conv1(x))
        x = self.relu(self.conv2(x))
        x = self.flatten(x)
        x = self.relu(self.fc1(x))
        x = self.fc2(x)  # Output raw logits
        return x

In [None]:
from dataset import ChessDataset
# from model import ChessModel
print(X.shape, y.shape,"r")
print(X)
dataset = ChessDataset(X, y)
dataloader = DataLoader(dataset, batch_size=64, shuffle=True)

# Check for GPU
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f'Using device: {device}')

# Model Initialization
model = ChessModel(num_classes=num_classes).to(device)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.0001)
 
num_epochs = 50
for epoch in range(num_epochs):
    start_time = time.time()
    model.train()
    running_loss = 0.0
    for inputs, labels in tqdm(dataloader):
        inputs, labels = inputs.to(device), labels.to(device)  # Move data to GPU
        optimizer.zero_grad()
        print(inputs.shape,12)
        outputs = model(inputs)  # Raw logits

        # Compute loss
        loss = criterion(outputs, labels)
        loss.backward()
        
        # Gradient clipping
        torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
        
        optimizer.step()
        running_loss += loss.item()
    end_time = time.time()
    epoch_time = end_time - start_time
    minutes: int = int(epoch_time // 60)
    seconds: int = int(epoch_time) - minutes * 60
    print(f'Epoch {epoch + 1 + 50}/{num_epochs + 1 + 50}, Loss: {running_loss / len(dataloader):.4f}, Time: {minutes}m{seconds}s')




In [None]:

# Save the model
torch.save(model.state_dict(), "models/TORCH_100EPOCHS.pth")

import pickle

with open("models/heavy_move_to_int", "wb") as file:
    pickle.dump(move_to_int, file)      