In [46]:
import chess.pgn
import io
from typing import List
from chess.pgn import Game
import numpy as np
import torch.nn as nn
import torch.nn.functional as F

In [2]:
def load_first_chunk_of_pgn(file_path, chunk_size) -> List[Game]:
    """
    Loads the first chunk of games from a PGN file.
    
    :param file_path: Path to the PGN file.
    :param chunk_size: Number of games to load in the chunk.
    :return: List of games in the first chunk.
    """
    games: List[Game] = []
    with open(file_path) as pgn_file:
        for _ in range(chunk_size):
            try:
                game = chess.pgn.read_game(pgn_file)
                if game is None:  # End of file
                    break
                games.append(game)
            except Exception as e:
                print(f"Error reading game: {e}")
                continue
    return games

In [4]:
# Define your file path and chunk size
pgn_file_path = "../dataset/lichess_db_standard_rated_2016-05.pgn"
chunk_size = 100  # for example, load the first 100 games

# Load the first chunk
first_chunk = load_first_chunk_of_pgn(pgn_file_path, chunk_size)
print(f"Loaded {len(first_chunk)} games in the first chunk")


Loaded 100 games in the first chunk


In [5]:
game = first_chunk[0]

In [12]:
vars(game.headers)

{'_tag_roster': {'Event': 'Rated Bullet tournament https://lichess.org/tournament/IaRkDsvp',
  'Site': 'https://lichess.org/r0cYFhsy',
  'Date': '????.??.??',
  'Round': '?',
  'White': 'GreatGig',
  'Black': 'hackattack',
  'Result': '0-1'},
 '_others': {'UTCDate': '2016.04.30',
  'UTCTime': '22:00:03',
  'WhiteElo': '1777',
  'BlackElo': '1809',
  'WhiteRatingDiff': '-11',
  'BlackRatingDiff': '+11',
  'ECO': 'B01',
  'Opening': 'Scandinavian Defense: Mieses-Kotroc Variation',
  'TimeControl': '60+0',
  'Termination': 'Time forfeit'}}

In [6]:
moves = [move.uci() for move in game.mainline_moves()]
print("Moveas:", " ".join(moves))


Moves: e2e4 d7d5 e4d5 d8d5 b1c3 d5d8 d2d4 g8f6 g1f3 c8g4 h2h3 g4f3 g2f3 c7c6 f1g2 b8d7 c1e3 e7e6 d1d2 f6d5 c3d5 c6d5 e1c1 f8e7 c2c3 d8c7 c1b1 e8c8 f3f4 c8b8 h1g1 b8a8 g2h1 g7g6 h3h4 e7h4 f2f3 h4e7 d2c2 d7f6 h1g2 f6h5 g2h3 h5f4 e3f4 c7f4 d1f1 f4d6 g1g4 d8f8 f1g1 f7f5 g4g2 e7f6 g2g3 f8g8 h3f1 g8g7 f1d3 h8g8 c2h2 d6b8 h2g2 b8c8 f3f4 c8c6 g2f2 f6h4 g3g6 h4f2 g6g7 g8g7 g1g7 a7a6 g7g8 a8a7 g8h8 c6d7 h8h7 d7h7


In [7]:
game.headers["Result"]

'0-1'

In [135]:
moves = [move.uci() for move in game.mainline_moves()]
print("Moveas:", " ".join(moves))

Moveas: d2d4 g8f6 c1f4 d7d6 e2e3 g7g6 f1d3 f8g7 h2h4 e8g8 h4h5 b8c6 h5g6 f7g6 f4h6 f6g4 h6g7 g8g7 g1f3 h7h5 c2c3 e7e5 d1b3 c6a5 b3a4 c7c6 b1d2 b7b5 a4c2 d8f6 h1f1 e5d4 d2e4 f6e7 c3d4 c8f5 f3g5 a8e8 e1c1 a5c4 d3c4 b5c4 c2c4 f5e4 g5f3 e4f3 g2f3 d6d5 c4d3 g4f6 f1g1 e7f7 g1g5 f6h7 g5g2 f7f5 d3d2 e8c8 d1g1 f8f6 f3f4 c6c5 c1d1 c5d4 d2d4 g7f7 d4a7 f7f8 a7a3 f8g7 d1e2 c8c2 e2f3 f5e4 f3g3 g6g5 g3h2 g5g4 a3b3 e4f3 g2g3 c2f2 g1g2 f2g2 g3g2 f3e3 b3d5 e3f4 h2g1 f6d6 d5b3 d6c6 b3b7 c6c7


In building the supervised learning part of the model, we'll focus on training a network to predict the next move in a given chess position.

We will therefore process the PGN dataset to extract board positions and corresponding moves. This will involve converting chessboard positions into a suitable numerical format for the CNN.

We will design a CNN that takes the board position as input. The output layer should predict the probability of each possible move. Since there are a limited number of legal moves in any given position, this becomes a multi-class classification problem.

### The Input Layer

The input layer should reflect the board layout (8x8 squares with channels representing different pieces).

### The Output Layer

The output should represent all possible moves. In chess, a common approach is to have a fixed-size array where each entry corresponds to a potential move.

Our first mission is to construct the input and output tensor:
X --> Y

Where X, the input is the board layout at every single game state and Y is the corresponding next move (output).

This is an example of the data for a given game:

```Moves: e2e4 d7d5 e4d5 d8d5 b1c3 d5d8 d2d4 g8f6 g1f3 c8g4 h2h3 g4f3 g2f3 c7c6 f1g2 b8d7 c1e3 e7e6 d1d2 f6d5 c3d5 c6d5 e1c1 f8e7 c2c3 d8c7 c1b1 e8c8 f3f4 c8b8 h1g1 b8a8 g2h1 g7g6 h3h4 e7h4 f2f3 h4e7 d2c2 d7f6 h1g2 f6h5 g2h3 h5f4 e3f4 c7f4 d1f1 f4d6 g1g4 d8f8 f1g1 f7f5 g4g2 e7f6 g2g3 f8g8 h3f1 g8g7 f1d3 h8g8 c2h2 d6b8 h2g2 b8c8 f3f4 c8c6 g2f2 f6h4 g3g6 h4f2 g6g7 g8g7 g1g7 a7a6 g7g8 a8a7 g8h8 c6d7 h8h7 d7h7```

To construct the input we will have to recreate the board layout for every single game state.

### X Tensor (Board State Representation)
The X tensor represents the state of the chess board.

X will be a 3-dimensional tensor, 2-dimensions representing the layout (8x8) and a third dimension represnting the type of chess piece discriminated by color. The tensor can be represented as follows:

$X \in \mathbb{R}^{8 \times 8 \times 12}$

Each element $X_{i,j,k}$ of this tensor can be defined as:

$$\ X_{i,j,k} = 
   \begin{cases} 
   1 & \text{if piece type } k \text{ is present at position } (i, j) \\
   0 & \text{otherwise}
   \end{cases}
\$$

So in essence we're dealing with a 3-dimensional tensor with binary states.

### Y Tensor (Next Move Representation)

There are 64 squares on a chessboard, so there are 64 possible starting points and 64 possible ending points for each move, leading to 64×64=4096 possible moves (including illegal ones, which the model should learn to never predict).

The Y tensor represents the next move for each given game state. We encode each move as a one-hot vector of length 64x64 (representing all possible source and destination squares), the tensor can be represented as:

$Y \in \{0, 1\}^{4096}$

Each element $Y_{l}$ of this tensor, where $l$ corresponds to a combination of source and destination squares, can be defined as:

$$Y_{l} = 
   \begin{cases} 
   1 & \text{if the move corresponds to the index } l \\
   0 & \text{otherwise}
   \end{cases}
\$$

In this representation, the index $l$ is calculated based on the source square and the destination square of the move. For instance, if you flatten the 8x8 board into a 64-element array, then a move from square $a$ to square $b$ would correspond to an index $l = 64 \times a + b$.

### Summary
- **X Tensor**: Represents the board state with a 3D tensor where the dimensions are board height, board width, and number of piece types.
- **Y Tensor**: Represents the move as a one-hot encoded vector in a flattened 2D space of source and destination squares.

In [128]:
import torch
import chess
from torch.utils.data import Dataset

class ChessDataset(Dataset):
    def __init__(self, games):
        self.positions = np.ndarray
        self.moves = np.ndarray
        self.process_games(games)

    def process_games(self, games: list[list[str]]) -> None:
        pos = []
        movs = []
        
        for game in games:
            board = chess.Board()
            for move_san in game:
                try:
                    move = board.parse_san(move_san)
                    if board.is_legal(move):
                        # Add current board state to positions
                        pos.append(self.board_to_tensor(board))
                        # Make move and add to moves
                        movs.append(self.move_to_tensor(move))
                        board.push(move)
                except ValueError:
                    # Handle errors or illegal moves
                    break
        
        self.positions = np.array(pos)
        self.moves = np.array(movs)
                    
    def board_to_tensor(self, board: chess.Board) -> np.ndarray:
        # Initialize the tensor
        tensor = np.zeros((8, 8, 12))

        piece_idx = {'P': 0, 'N': 1, 'B': 2, 'R': 3, 'Q': 4, 'K': 5,
                     'p': 6, 'n': 7, 'b': 8, 'r': 9, 'q': 10, 'k': 11}

        for square in chess.SQUARES:
            piece = board.piece_at(square)
            if piece:
                # Get the piece type and color
                piece_type = piece.symbol()
                row, col = divmod(square, 8)
                tensor[row, col, piece_idx[piece_type]] = 1

        return tensor

    def move_to_tensor(_self, move: chess.Move) -> np.ndarray:
        # Initialize a one-hot encoded vector of length 4096
        one_hot_move = np.zeros(64 * 64, dtype=np.float32)
    
        # Calculate the index for the one-hot vector
        source = move.from_square
        destination = move.to_square
        # We multiply the source by 64, given that for each source there are 64 possible moves
        # (the tensor dimensions assumes that a move can have the same source and destination - though
        # the network will learn that those moves are illegal)
        index = source * 64 + destination
    
        # Set the appropriate position to 1
        one_hot_move[index] = 1
    
        return one_hot_move

    def __len__(self):
        return len(self.positions)

    def __getitem__(self, idx):
        return self.positions[idx], self.moves[idx]

In [129]:
# Preprocessing
game_moves = []

for game in first_chunk:
    moves = [move.uci() for move in game.mainline_moves()]

    game_moves.append(moves)

In [130]:
dataset = ChessDataset(game_moves)

In [131]:
# Checksum
for game_moves in dataset.moves:
    assert(sum(game_moves) == 1.0)

In [132]:
dataset.positions[0].shape

(8, 8, 12)

In [133]:
## Checking the positions input dataset
for game_layout in dataset.positions:
    # Ensure the game is a numpy array and then transpose
    game_layout_array = np.array(game_layout)
    game_layout_transposed = np.transpose(game_layout_array, (2, 0, 1))  # Reorder to (12, 8, 8)
    
    for piece_type_coordinates in game_layout_transposed:
        print(piece_type_coordinates)
    break

[[0. 0. 0. 0. 0. 0. 0. 0.]
 [1. 1. 1. 1. 1. 1. 1. 1.]
 [0. 0. 0. 0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 0. 0. 0.]]
[[0. 1. 0. 0. 0. 0. 1. 0.]
 [0. 0. 0. 0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 0. 0. 0.]]
[[0. 0. 1. 0. 0. 1. 0. 0.]
 [0. 0. 0. 0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 0. 0. 0.]]
[[1. 0. 0. 0. 0. 0. 0. 1.]
 [0. 0. 0. 0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 0. 0. 0.]]
[[0. 0. 0. 1. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 0. 0. 0

In [None]:
class ChessCNN(nn.Module):
    def __init__(self):
        # x = self.conv1(x) # First conv layer
        # x = F# .relu(x)
        # x = self.conv2(x) # Second conv layer
        # x = F.relu(x)
        # x = x.view(x.size(0), -1) # Flatten the tensor for the dense layer
        # x = self.fc(x) # Dense layer
        
        super(ChessCNN, self).__init__()
        
        self.conv1 = nn.Conv2d(12, 64, kernel_size=2, padding=0)
        self.conv2 = nn.Conv2d(64, 128, kernel_size=3, padding=1)
        self.pool = nn.MaxPool2d(2, 2)
        self.fc1 = nn.Linear(128 * 4 * 4, 1024)
        self.fc2 = nn.Linear(1024, 4096)

    def forward(self, x):
        x = F.relu(self.conv1(x))
        x = self.pool(F.relu(self.conv2(x)))
        x = x.view(-1, 128 * 4 * 4)
        x = F.relu(self.fc1(x))
        x = self.fc2(x)
        return F.softmax(x, dim=1)


In [136]:
# We do not add padding. This approach respects the inherent structure of the
# chessboard and avoids introducing artificial concepts that are not present in
# the actual game. It allows the model to focus on learning meaningful spatial
# relationships and patterns that are genuinely indicative of chess strategies.

# Since we do not add padding, our spatial dimensions will be squashed to
# a 7x7 versus the original 8x8 layout
conv_layer_1 = nn.Conv2d(12, 64, kernel_size=2, padding=0)

In [51]:
# We then apply a non-linearity which does not change the dimensions of our 7x7x64 tensor
relu1 = nn.ReLU()

In [52]:
# In the second layer we have a bigger kernel size to capture higher-level patterns
# and add padding of 1 to keep the spatial resolution from being further decreased
conv_layer_2 = nn.Conv2d(64, 124, kernel_size = 3, padding=1)

In [53]:
# We then apply a non-linearity which does not change the timensions of our 7x7x124 tensor
relu2 = nn.ReLU()

In [163]:
# We now move to the fully connected layer. We do not apply a pooling because there is no
# need for downsampling as our tensors are very reasonable in size
# Our tensor currently has the following shape: 7x7x124

# So we will need to flatten the tensor into one dimensional tensor with a size of: 7 * 7 * 124 = 6076
# This hidden layer will output 2048 neurons. We will afterwards test this number to balance it against under and overfitting
fc1 = nn.Linear(7 * 7 * 124, 2048)

In [55]:
relu3 = nn.ReLU()

In [56]:
fc2 = nn.Linear(2048, 4096)

In [117]:
# We then apply the sofmax activation function to convert the output to a probability distribution over predicted output classes.
#F.softmax(x, dim=1)

In [137]:
dataset.positions.shape

(7202, 8, 8, 12)

To run your input dataset through the first convolutional layer in PyTorch, we need to make sure that the input tensor is in the correct shape expected by `nn.Conv2d`. The `nn.Conv2d` layer expects the input tensor to have the shape \([N, C, H, W]\), where:

- \( N \) is the batch size,
- \( C \) is the number of channels,
- \( H \) is the height of the image (or in your case, the chessboard),
- \( W \) is the width of the image (or the chessboard).

Given your dataset shape \((7202, 8, 8, 12)\), it seems you have the channels as the last dimension, but they should be the second dimension for `nn.Conv2d`. You will need to rearrange the dimensions of your input tensor to \((N, C, H, W)\), which in your case would be \((7202, 12, 8, 8)\).

In [138]:
# Assuming `data` is your input dataset
data = np.transpose(dataset.positions, (0, 3, 1, 2)) 

In [139]:
data.shape

(7202, 12, 8, 8)

In [141]:
data_tensor = torch.tensor(data, dtype=torch.float)  # Ensure the correct data type (float)

In [142]:
with torch.no_grad():
    # Assuming `data` is a torch tensor and is in the shape (N, C, H, W)
    output = conv_layer_1(data_tensor)

In [144]:
output.shape

torch.Size([7202, 64, 7, 7])

In [147]:
output2 = F.relu(output)

In [149]:
output2.shape

torch.Size([7202, 64, 7, 7])

In [150]:
with torch.no_grad():
    # Assuming `data` is a torch tensor and is in the shape (N, C, H, W)
    output3 = conv_layer_2(output2)

In [151]:
output3.shape

torch.Size([7202, 124, 7, 7])

In [152]:
output4 = F.relu(output3)

In [153]:
output4.shape

torch.Size([7202, 124, 7, 7])

In [161]:
# We now flatten the layer from (124, 7, 7) to: (7 * 7 * 124 = 6076)
output5 = output4.view(7202, -1)  # -1 tells PyTorch to infer the correct size

# Now, output5 has the shape [7202, 124*7*7]
print(output5.shape)

torch.Size([7202, 6076])


In [164]:
with torch.no_grad():
    output6 = fc1(output5)

In [166]:
output6.shape

torch.Size([7202, 2048])

In [167]:
output7 = F.relu(output6)

In [168]:
output7.shape

torch.Size([7202, 2048])

In [169]:
with torch.no_grad():
    output8 = fc2(output7)

In [171]:
output8.shape

torch.Size([7202, 4096])

In [172]:
# We then apply the sofmax activation function to convert the output to a probability distribution over predicted output classes.
output9 = F.softmax(output8, dim=1)

In [174]:
output9.shape

torch.Size([7202, 4096])