In [None]:
def train_multiple_folds(n_epochs, n_folds, batch_size, splits, writer, model_class, optimizer_class, optimizer_params, loss_fn):
    """Train and validate a freshly created model on every cross-validation fold.

    For each fold produced by ``splits`` a new ``model_class()`` and a new
    optimizer are created, trained for ``n_epochs`` epochs and validated after
    every epoch. Whenever an epoch reaches a validation loss lower than any
    seen so far (across all folds), the model weights are saved under
    ``models/``. Per-epoch losses and accuracies are averaged over the folds
    and written to TensorBoard.

    The function relies on notebook-level names defined outside of it:
    ``train_dataset``, ``device``, ``timestamp``, ``train_epoch`` and
    ``validation_epoch``.

    Args:
        n_epochs: Number of epochs to train on each fold.
        n_folds: Number of folds used as the divisor when averaging the
            metrics; it must match the number of splits ``splits`` yields.
        batch_size: Batch size used by both data loaders.
        splits: Object exposing ``split(indices)`` that yields
            ``(train_idx, val_idx)`` pairs.
        writer: TensorBoard writer exposing ``add_scalars`` and ``flush``.
        model_class: Zero-argument callable returning a new model.
        optimizer_class: Optimizer constructor called as
            ``optimizer_class(model.parameters(), **optimizer_params)``.
        optimizer_params: Keyword arguments for ``optimizer_class``.
        loss_fn: Loss function handed to the per-epoch helpers.
    """
    import os  # Local import: only needed to create the checkpoint directory

    # Lowest validation loss seen so far; infinity guarantees the first epoch
    # is always checkpointed, whatever the scale of the loss
    best_vloss = float("inf")

    # Per-epoch sums of the metrics over all folds (averaged at the end)
    epochs_tloss = [0.0] * n_epochs
    epochs_tacc = [0.0] * n_epochs
    epochs_vloss = [0.0] * n_epochs
    epochs_vacc = [0.0] * n_epochs

    # torch.save does not create missing directories
    os.makedirs("models", exist_ok=True)

    # Looping through all folds for cross validation
    for fold, (train_idx, val_idx) in enumerate(splits.split(np.arange(len(train_dataset)))):

        print(f"FOLD {fold+1}")

        # Both loaders read from the same dataset; the samplers restrict each
        # one to the indices of its own part of the fold
        train_sampler = SubsetRandomSampler(train_idx)
        val_sampler = SubsetRandomSampler(val_idx)
        train_loader = DataLoader(train_dataset, batch_size=batch_size, sampler=train_sampler)
        val_loader = DataLoader(train_dataset, batch_size=batch_size, sampler=val_sampler)

        # Resetting the model and the optimizer so that folds do not share state
        model = model_class()
        model.to(device)
        optimizer = optimizer_class(model.parameters(), **optimizer_params)

        # Sample counts of this fold, forwarded to the per-epoch helpers
        train_sampler_size = len(train_sampler)
        val_sampler_size = len(val_sampler)

        for epoch in range(n_epochs):
            train_loss, train_acc = train_epoch(model, optimizer, train_loader, loss_fn, train_sampler_size)
            val_loss, val_acc = validation_epoch(model, val_loader, loss_fn, val_sampler_size)

            # Accumulating the metrics of this fold for the cross-fold average
            epochs_tloss[epoch] += train_loss
            epochs_tacc[epoch] += train_acc
            epochs_vloss[epoch] += val_loss
            epochs_vacc[epoch] += val_acc

            # Adjacent f-strings instead of a backslash continuation, which
            # leaked the source indentation into the printed message
            print(f"Epoch: {epoch + 1} Train Loss: {train_loss}, Valid Loss: {val_loss} | "
                  f"Train Acc: {train_acc}, Valid Acc: {val_acc}")

            # Saving the model if the validation loss is the lowest seen so far
            if val_loss < best_vloss:
                best_vloss = val_loss
                model_path = f"models/piece_to_move_net_{timestamp}_{fold+1}_{epoch+1}"
                torch.save(model.state_dict(), model_path)

    # Averaging over the folds and saving the curves on TensorBoard
    for i in range(n_epochs):
        epochs_tloss[i] /= n_folds
        epochs_tacc[i] /= n_folds
        epochs_vloss[i] /= n_folds
        epochs_vacc[i] /= n_folds

        writer.add_scalars("Loss", {"Training": epochs_tloss[i], "Validation": epochs_vloss[i]}, i + 1)
        writer.add_scalars("Accuracy", {"Training": epochs_tacc[i], "Validation": epochs_vacc[i]}, i + 1)

    # One flush after all scalars are queued is enough
    writer.flush()

In [None]:
EPOCHS = 25      # Number of epochs trained on each fold
BATCH_SIZE = 32  # Batch size (number of samples per batch)
K = 3            # Number of cross-validation folds

# Logs training statistics for TensorBoard visualization
# NOTE(review): the run directory is named "piece_to_move" although the active
# call below trains SquareToMoveToNet - confirm this is intended
writer = SummaryWriter(f"runs/piece_to_move_{timestamp}")  

# Obtaining the folds for cross validation (shuffled with a fixed seed)
splits = KFold(n_splits=K, shuffle=True, random_state=42)

# Optimizer class and the keyword arguments it is built with on every fold
optimizer_class = optim.Adam
optimizer_params = {
    "lr": 1e-4,
    "weight_decay": 1e-5
}
# Initializing the model type
# NOTE(review): model_class is only used by the commented-out call below; the
# active call passes SquareToMoveToNet directly
model_class = PieceToMoveNet

# Initializing the loss function
# Cross entropy loss is used since this is a classification task
loss_fn = torch.nn.CrossEntropyLoss()  

# Training the model with cross validation on multiple epochs
# Alternative run that trains the network selected in model_class:
# train_multiple_folds(EPOCHS, K, BATCH_SIZE, splits, writer, model_class, optimizer_class, optimizer_params, loss_fn)
train_multiple_folds(EPOCHS, K, BATCH_SIZE, splits, writer, SquareToMoveToNet, optimizer_class, optimizer_params, loss_fn)


In [32]:
import chess
import torch
import random

# Function to create a sample board
def create_board_with_one_white_piece():
    """Return an otherwise empty board holding one white pawn on a random square."""
    # Passing None as the FEN yields a board without any pieces
    sample_board = chess.Board(None)
    white_pawn = chess.Piece(chess.PAWN, chess.WHITE)

    # Keep drawing squares (0..63) until a free one is found
    placed = False
    while not placed:
        candidate = random.randint(0, 63)
        if sample_board.piece_at(candidate) is not None:
            continue
        sample_board.set_piece_at(candidate, white_pawn)
        placed = True

    return sample_board

# Function to convert the chess board to a tensor
def board_to_tensor(board) -> torch.Tensor:
    """Encode a board as a 14x8x8 tensor of zeros and ones.

    Planes 0-5 mark the white pawn, knight, bishop, rook, queen and king;
    planes 6-11 mark the same piece types for black. Plane 12 is filled with
    ones when it is white's turn, plane 13 when it is black's turn.
    A square index ``s`` is stored at ``[plane, s // 8, s % 8]``.
    """
    plane_of_type = {chess.PAWN: 0, chess.KNIGHT: 1, chess.BISHOP: 2,
                     chess.ROOK: 3, chess.QUEEN: 4, chess.KING: 5}
    planes = torch.zeros((14, 8, 8))

    for sq in chess.SQUARES:
        occupant = board.piece_at(sq)
        if not occupant:
            continue  # Empty square: nothing to mark

        row_idx, col_idx = divmod(sq, 8)
        plane = plane_of_type[occupant.piece_type]
        if occupant.color != chess.WHITE:
            plane += 6  # Black pieces use the second group of six planes
        planes[plane, row_idx, col_idx] = 1

    # Side-to-move plane: 12 for white, 13 for black
    turn_plane = 12 if board.turn == chess.WHITE else 13
    planes[turn_plane] = 1

    return planes

# Create a sample board (a single white pawn on an otherwise empty board)
# and convert it to its 14x8x8 tensor encoding
board = create_board_with_one_white_piece()
tensor2 = board_to_tensor(board)

# Run the mask generation function
# NOTE(review): this uses the one-argument generate_mask(tensor). The notebook
# also defines generate_mask(tensor, model, board); whichever cell was executed
# last is the one that gets called here - confirm the intended version
positions = generate_mask(tensor2)

# Output the results
print("Positions of movable pieces:", positions)


In [None]:

EPOCHS = 10
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
writer = SummaryWriter(f"runs/piece_to_move_{timestamp}")

# Splitting the data into a training part and a validation part.
# NOTE(review): fractional lengths are only accepted by recent PyTorch
# versions - confirm the installed version supports them.
# NOTE: train_data is rebound to the training subset, so re-running this cell
# splits the already reduced subset again.
train_data, validation_data = torch.utils.data.random_split(train_data, [1 - TEST_PERCENT, TEST_PERCENT])

# The class is spelled DataLoader (torch.utils.data has no "dataLoader").
# Validation data is not shuffled: the order does not affect the averaged loss.
training_loader = torch.utils.data.DataLoader(train_data, batch_size=BATCH_SIZE, shuffle=True, pin_memory=True)
validation_loader = torch.utils.data.DataLoader(validation_data, batch_size=BATCH_SIZE, shuffle=False, pin_memory=True)

def train_one_epoch(epoch_index: int, tb_writer, optimizer, training_loader, loss_fn, model=None, log_every: int = 1000):
    """Run one training pass over ``training_loader`` and return a training loss.

    Args:
        epoch_index: Zero-based epoch number, used to compute the TensorBoard
            step of the logged points.
        tb_writer: Writer exposing ``add_scalar``; only used when a logging
            window of ``log_every`` batches completes.
        optimizer: Optimizer updating the parameters of ``model``.
        training_loader: Sized iterable of batches; element 0 of a batch holds
            the inputs and element 1 the labels.
        loss_fn: Callable ``loss_fn(outputs, labels)`` returning a scalar
            tensor.
        model: Network to train. When omitted, the notebook-level
            ``piece_to_move_net`` is used, as earlier versions always did.
        log_every: Number of batches per logging window.

    Returns:
        The mean loss of the last completed window. If no window completed
        (fewer than ``log_every`` batches), the mean loss over all batches
        seen; ``0.0`` for an empty loader.
    """
    if model is None:
        # Backward compatibility with callers that relied on the global network
        model = piece_to_move_net

    running_loss = 0.0
    last_loss = 0.0
    batches_seen = 0
    window_completed = False

    for i, data in enumerate(training_loader):
        inputs = data[0]
        labels = data[1]

        optimizer.zero_grad()
        outputs = model(inputs)

        loss = loss_fn(outputs, labels)
        loss.backward()

        optimizer.step()

        running_loss += loss.item()
        batches_seen += 1
        if (i + 1) % log_every == 0:
            last_loss = running_loss / log_every
            print(f" batch {i + 1}, loss: {last_loss}")
            tb_x = epoch_index * len(training_loader) + i + 1
            tb_writer.add_scalar("Loss/train", last_loss, tb_x)
            running_loss = 0.0
            window_completed = True

    # Short loaders never complete a window; report their mean loss instead of
    # the misleading initial 0.0
    if not window_completed and batches_seen:
        last_loss = running_loss / batches_seen

    return last_loss


def train_multiple_epochs(n_epochs, model, writer, validation_loader, loss_fn, optimizer=None, training_loader=None):
    """Train ``model`` for ``n_epochs`` epochs, validating after every epoch.

    After each epoch the training and validation losses are printed and
    written to TensorBoard, and the weights are saved whenever the validation
    loss is the lowest seen so far. The checkpoint name uses the notebook-level
    ``timestamp``.

    Args:
        n_epochs: Number of epochs to run.
        model: Network to train and validate.
        writer: TensorBoard writer exposing ``add_scalar``, ``add_scalars``
            and ``flush``.
        validation_loader: Iterable of validation batches (inputs at index 0,
            labels at index 1).
        loss_fn: Callable ``loss_fn(outputs, labels)`` returning a scalar
            tensor.
        optimizer: Optimizer for ``model``. When omitted, the notebook-level
            ``optimizer`` is looked up.
        training_loader: Training batches. When omitted, the notebook-level
            ``training_loader`` is looked up.

    Raises:
        ValueError: If the optimizer or the training loader is neither passed
            nor defined at notebook level.
    """
    # The original signature carried neither object, so fall back to the
    # notebook-level names for callers using the five-argument form
    notebook_scope = globals()
    if optimizer is None:
        optimizer = notebook_scope.get("optimizer")
    if training_loader is None:
        training_loader = notebook_scope.get("training_loader")
    if optimizer is None or training_loader is None:
        raise ValueError("train_multiple_epochs needs an optimizer and a training_loader")

    # Lowest validation loss seen so far; infinity so the first epoch is saved
    best_vloss = float("inf")

    for epoch in range(n_epochs):
        print(f"EPOCH {epoch + 1}: ")

        model.train(True)
        avg_loss = train_one_epoch(epoch, writer, optimizer, training_loader, loss_fn, model=model)

        model.train(False)
        running_vloss = 0.0
        n_vbatches = 0
        # No gradients are needed for validation; .item() keeps plain floats
        # instead of accumulating tensors
        with torch.no_grad():
            for v_data in validation_loader:
                voutputs = model(v_data[0])
                running_vloss += loss_fn(voutputs, v_data[1]).item()
                n_vbatches += 1

        # NaN for an empty validation loader: it never compares as an improvement
        avg_vloss = running_vloss / n_vbatches if n_vbatches else float("nan")
        print(f"LOSS train {avg_loss}, valid {avg_vloss}")

        writer.add_scalars("Training vs Validation Loss", {
            "Training": avg_loss, "Validation": avg_vloss},
            epoch + 1)
        writer.flush()

        if avg_vloss < best_vloss:
            # Track the validation loss (not the training loss) as the best value
            best_vloss = avg_vloss
            model_path = f"model_{timestamp}_{epoch}"
            torch.save(model.state_dict(), model_path)


In [16]:
import torch
from torch.masked import masked_tensor
import aux_functions
import importlib

importlib.reload(aux_functions)
from aux_functions import *



def generate_mask(tensor) -> list:
    """Return the board positions of the pieces of the side to move.

    ``tensor`` is indexed as ``[layer, row, col]``: layers 0-5 are read as the
    white pieces, layers 6-11 as the black pieces, and a one anywhere in layer
    12 / layer 13 marks white's / black's turn. Every entry equal to one in
    the piece layers of the side to move is converted to the flat index
    ``row * 8 + col``.

    Returns:
        The positions ordered by layer, then row, then column, padded with
        ``-1`` up to a length of 16 (longer lists are returned as they are).

    Raises:
        ValueError: If neither layer 12 nor layer 13 contains a one, so the
            side to move cannot be determined.
    """
    if torch.any(tensor[12] == 1):
        # White's turn: white pieces are in layers 0 to 5
        piece_layers = tensor[0:6]
        print("Whites turn")
    elif torch.any(tensor[13] == 1):
        # Black's turn: black pieces are in layers 6 to 11
        piece_layers = tensor[6:12]
        print("Blacks turn")
    else:
        raise ValueError("Cannot determine the side to move: layers 12 and 13 contain no ones")

    # nonzero() lists the (layer, row, col) index of every occupied entry in
    # row-major order; the layer is not needed for the board position
    occupied = (piece_layers == 1).nonzero().tolist()
    positions = [row * 8 + col for _, row, col in occupied]

    # Padding with -1 so that the list has at least 16 entries
    positions.extend([-1] * (16 - len(positions)))

    return positions


In [None]:
def generate_mask(tensor, model, board: chess.Board) -> list:
    """Return a flattened 8x8 mask for the side to move, chosen by model type.

    A one anywhere in layer 12 of ``tensor`` is read as white's turn;
    otherwise black is assumed to be on move.

    * ``PieceToMoveNet``: the six piece layers of the side to move (0-5 for
      white, 6-11 for black) are summed into one integer 8x8 mask.
    * ``SquareToMoveToNet``: layer 12 (white) or layer 13 (black) is returned
      as it is. NOTE(review): presumably those layers hold the reachable
      squares in the encoding used for this network - confirm.

    Args:
        tensor: Board encoding indexed as ``[layer, row, col]``.
        model: Network instance that selects the kind of mask.
        board: Currently unused; kept so existing callers keep working.
            NOTE(review): move legality is not checked against the board here.

    Returns:
        The mask flattened row by row into a list of 64 values.

    Raises:
        TypeError: If ``model`` is neither a ``PieceToMoveNet`` nor a
            ``SquareToMoveToNet``.
    """
    whites_turn = bool(torch.any(tensor[12] == 1))

    if isinstance(model, PieceToMoveNet):
        # Ones on the squares occupied by a piece of the side to move
        piece_layers = tensor[0:6] if whites_turn else tensor[6:12]
        mask = torch.sum(piece_layers, dim=0).int()
    elif isinstance(model, SquareToMoveToNet):
        # The layer of the side to move is used directly as the mask
        mask = tensor[12] if whites_turn else tensor[13]
    else:
        raise TypeError(f"Unsupported model type for mask generation: {type(model).__name__}")

    # Returning the mask in a list format
    return mask.flatten().tolist()


In [10]:
# Small experiment with torch.masked: keep only the even entries of a 2x3x4 tensor
data = torch.arange(24).reshape(2, 3, 4)
mask = data % 2 == 0
mt = masked_tensor(data.float(), mask)

# Inspect the underlying data of the masked tensor. The accessor lives on the
# MaskedTensor instance, not on the masked_tensor factory function; the
# original line was a truncated attribute access that raised AttributeError.
mt.get_data()



In [None]:
def legal_moves_masking(board: chess.Board) -> torch.Tensor:
    """Return an 8x8 integer mask with ones on the squares of pieces that can move.

    A square is marked when at least one legal move of ``board`` starts from
    it. Square index ``s`` maps to ``mask[s // 8, s % 8]``.

    Args:
        board: Position whose legal moves are inspected.

    Returns:
        An ``8x8`` tensor of dtype ``torch.int`` containing zeros and ones.
    """
    mask = torch.zeros(8, 8, dtype=torch.int)

    # Legal moves belong to the board instance, not to the chess module
    for move in board.legal_moves:
        # Origin square of the move, converted to its row and column
        row, col = divmod(move.from_square, 8)
        mask[row, col] = 1

    return mask