In [63]:
%%capture

import numpy as np
import matplotlib.pyplot as plt
import pandas as pd

import chess

import torch
torch.set_default_dtype(torch.float32)

In [64]:
def fen_str_to_tensor(fen):
    # Define a mapping from pieces to integers
    piece_to_int = {
        'P': 1, 'N': 2, 'B': 3, 'R': 4, 'Q': 5, 'K': 6,
        'p': -1, 'n': -2, 'b': -3, 'r': -4, 'q': -5, 'k': -6,
        '.': 0
    }

    # Split the FEN string into parts ## 'rnbqkbnr/pppppppp/8/8/8/8/PPPPPPPP/RNBQKBNR w KQkq - 0 1'
    parts = fen.split(' ')
    ranks = parts[0].split('/') # Only process the board position (the first part)

    # Convert the ranks to a list of integers
    board = []
    for rank in ranks:
        for char in rank:
            if char.isdigit():
                # If the character is a digit, add that many zeros to the board
                board.extend([0] * int(char))
            else:
                # Otherwise, add the integer representation of the piece to the board
                board.append(piece_to_int[char])

    # Convert the board to a tensor
    board_tensor = torch.tensor(board, dtype=torch.float32).reshape(8,8)

    return board_tensor


fen = 'rnbqkbnr/pppppppp/8/8/8/8/PPPPPPPP/RNBQKBNR w KQkq - 0 1'
print(fen_str_to_tensor(fen))

tensor([[-4., -2., -3., -5., -6., -3., -2., -4.],
        [-1., -1., -1., -1., -1., -1., -1., -1.],
        [ 0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.],
        [ 0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.],
        [ 0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.],
        [ 0.,  0.,  0.,  0.,  0.,  0.,  0.,  0.],
        [ 1.,  1.,  1.,  1.,  1.,  1.,  1.,  1.],
        [ 4.,  2.,  3.,  5.,  6.,  3.,  2.,  4.]])


In [65]:
df = pd.read_csv('Chess_Jan_aa.csv')

In [66]:
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F

class EvalNet(nn.Module):
    def __init__(self):
        super(EvalNet, self).__init__()
        self.conv1 = nn.Conv2d(1, 16, kernel_size = 6, stride = 1, padding = 1)
        self.flatten = nn.Flatten()
        self.fc1 = nn.Linear(400 + 2, 128) ## Add two for scalar inputs
        self.fc2 = nn.Linear(128, 1)

    def forward(self, x, scalar_inputs):
        x = x.unsqueeze(1)
        # print(x.shape)
        x = F.leaky_relu(self.conv1(x))
        x = self.flatten(x)
        # print(f"x shape: {x.shape}, scalar_input shape: {scalar_inputs.shape}")
        x = torch.cat((x, scalar_inputs), dim=1)
        # print(x.shape)
        x = F.leaky_relu(self.fc1(x))
        x = F.leaky_relu(self.fc2(x))
        return x
    

In [67]:
''' NO BATCHING
def train(model, dataset, criterion, optimizer, num_epochs, num_samples_per_epoch=100):
    print('Begin Training!')
    model.train()  # Set the model to training mode
    for epoch in range(num_epochs):
        train_indices = torch.randperm(len(dataset))[:num_samples_per_epoch]
        val_indices = torch.randperm(len(dataset))[-num_samples_per_epoch:]
        train_running_loss = 0.0
        val_running_loss = 0.0
        try:
            ## TRAINING PHASE
            for idx in train_indices:
                print(idx.item())
                data = dataset[idx.item()] # grab int from one element tensor

                # Feature Variables
                fen           = fen_str_to_tensor( data['board'] ).to(device)
                white_active  = torch.tensor( data['white_active'] ).to(device).unsqueeze(0)
                is_check      = torch.tensor( data['is_check'] ).to(device).unsqueeze(0)
                scalar_inputs = torch.cat( (white_active, is_check), dim = 0 ).T

                # Predictor Variables
                cp = torch.tensor( data['cp'] ).to(device)

                # Zero the parameter gradients
                optimizer.zero_grad()

                # Forward pass
                train_outputs = model(fen, scalar_inputs)
                train_batch_loss = criterion(train_outputs, cp)

                # Backward pass and optimization
                train_batch_loss.backward()
                optimizer.step()

                train_running_loss += train_batch_loss.item()
            
            ## VALIDATION PHASE
            model.eval()  # Set the model to evaluation mode
        
            with torch.no_grad():
                for idx in val_indices:
                    # Extract the inputs and labels from the validation data
                    
                    val_data = dataset[idx.item()] # grab int from one element tensor

                    # Feature Variables
                    fen           = fen_str_to_tensor( val_data['board'] ).to(device)
                    white_active  = torch.tensor( val_data['white_active'] ).to(device).unsqueeze(0)
                    is_check      = torch.tensor( val_data['is_check'] ).to(device).unsqueeze(0)
                    scalar_inputs = torch.cat( (white_active, is_check), dim = 0 ).T

                    # Predictor Variables
                    cp = torch.tensor( val_data['cp'] ).to(device)
                    
                    # Forward pass
                    val_outputs = model(fen, scalar_inputs)
                    val_batch_loss = criterion(val_outputs, cp)
                    
                    val_loss += val_batch_loss.item()

        except KeyboardInterrupt:
            print("Manual Stop: Finished Training Early!")
            break
        
        print(f'Epoch {epoch+1}/{num_epochs}, Training Loss: {train_running_loss/len(train_data_loader)}, Validation Loss: {val_running_loss/len(val_data_loader)}')

    print('Finished Training!')

    torch.save(model, 'models/autosave.pth')
'''

' NO BATCHING\ndef train(model, dataset, criterion, optimizer, num_epochs, num_samples_per_epoch=100):\n    print(\'Begin Training!\')\n    model.train()  # Set the model to training mode\n    for epoch in range(num_epochs):\n        train_indices = torch.randperm(len(dataset))[:num_samples_per_epoch]\n        val_indices = torch.randperm(len(dataset))[-num_samples_per_epoch:]\n        train_running_loss = 0.0\n        val_running_loss = 0.0\n        try:\n            ## TRAINING PHASE\n            for idx in train_indices:\n                print(idx.item())\n                data = dataset[idx.item()] # grab int from one element tensor\n\n                # Feature Variables\n                fen           = fen_str_to_tensor( data[\'board\'] ).to(device)\n                white_active  = torch.tensor( data[\'white_active\'] ).to(device).unsqueeze(0)\n                is_check      = torch.tensor( data[\'is_check\'] ).to(device).unsqueeze(0)\n                scalar_inputs = torch.cat( (whi

In [68]:
def train(model, dataset, train_data_loader, val_data_loader, criterion, optimizer, num_epochs):
    print('Begin Training!')
    model.train()  # Set the model to training mode
    for epoch in range(num_epochs):
        train_running_loss = 0.0
        val_running_loss = 0.0
        try:
            ## TRAINING PHASE
            for i, data in enumerate(train_data_loader):

                # Feature Variables
                fen           = data['fen'].to(device)
                white_active  = data['white_active'].to(device).unsqueeze(0)
                is_check      = data['is_check'].to(device).unsqueeze(0)
                scalar_inputs = torch.cat( (white_active, is_check), dim = 0 ).T

                # Predictor Variables
                cp = (data['cp'].to(device)).unsqueeze(1)

                # Zero the parameter gradients
                optimizer.zero_grad()

                # Forward pass
                train_outputs = model(fen, scalar_inputs)

                # print(train_outputs.shape, cp.shape)

                # print(torch.isnan(train_outputs).sum(), torch.isnan(cp).sum())

                train_batch_loss = criterion(train_outputs, cp)

                # print(train_batch_loss)

                # Backward pass and optimization
                train_batch_loss.backward()
                optimizer.step()

                train_running_loss += train_batch_loss.item()
            
            ## VALIDATION PHASE
            model.eval()  # Set the model to evaluation mode
        
            with torch.no_grad():
                for i, val_data in enumerate(val_data_loader):
                    # Extract the inputs and labels from the validation data
                    
                    # Feature Variables
                    fen           = val_data['fen'].to(device)
                    white_active  = val_data['white_active'].to(device).unsqueeze(0)
                    is_check      = val_data['is_check'].to(device).unsqueeze(0)
                    scalar_inputs = torch.cat( (white_active, is_check), dim = 0 ).T

                    # Predictor Variables
                    cp = (val_data['cp'].to(device)).unsqueeze(1)
                    
                    # Forward pass
                    val_outputs = model(fen, scalar_inputs)
                    val_batch_loss = criterion(val_outputs, cp)
                    
                    val_running_loss += val_batch_loss.item()

        except KeyboardInterrupt:
            print("Manual Stop: Finished Training Early!")
            break
        
        print(f'Epoch {epoch+1}/{num_epochs}, Training Loss: {train_running_loss/len(train_data_loader)}, Validation Loss: {val_running_loss/len(val_data_loader)}')

    print('Finished Training!')

    torch.save(model, 'models/autosave.pth')


In [69]:
from torch.utils.data import IterableDataset, DataLoader, Dataset

class ChessIterableDataset(IterableDataset): # TODO! Write docstrings 

    def __init__(self, csv_files, chunksize):
        self.csv_files = csv_files
        self.chunksize = chunksize

    def process_chunk(self, chunk):
        # Process the chunk and return a list of samples
        samples = []
        for index, row in chunk.iterrows():
            
            # Feature Variables
            board = row['board']
            white_active = row['white_active']
            cp = row['cp']
            is_check = row['is_check'] # add to predictions

            # Convert data to tensors
            board_tensor = fen_str_to_tensor(board)
            white_active = torch.tensor(white_active, dtype=torch.float32)
            cp = torch.tensor(cp, dtype=torch.float32)
            is_check = torch.tensor(is_check, dtype=torch.float32)
        
            samples.append({'fen': board_tensor, 'fen_str': board, 'white_active': white_active, 'cp': cp, 'is_check': is_check})
        return samples


    def __len__(self):
        return sum(1 for _ in self.__iter__())


    def __getitem__(self, idx):
        board = self.dataframe.iloc[idx]['board']
        white_active = self.dataframe.iloc[idx]['white_active']
        cp = self.dataframe.iloc[idx]['cp']
        is_check = self.dataframe.iloc[idx]['is_check']
        
        # Convert data to tensors
        board_tensor = fen_str_to_tensor(board)
        white_active = torch.tensor(white_active, dtype=torch.float32)
        cp = torch.tensor(cp, dtype=torch.float32)

        return {'fen': board_tensor, 'fen_str': board, 'white_active': white_active, 'cp': cp, 'is_check': is_check}
    

    def __iter__(self):
        for idx, csv_file in enumerate(self.csv_files):
            chunk_iter = pd.read_csv(csv_file, chunksize=self.chunksize)#, dtype={15:str, 33:str})#, engine='pyarrow')
            
            if idx % 10 == 0:
                # print(f'Read file {idx} of {len(self.csv_files)}')
                pass

            for chunk in chunk_iter:
                
                chunk = chunk.loc[chunk['white_elo'] >= 1350] # select a subset of players with a certain rating
                chunk = chunk.loc[chunk['black_elo'] >= 1350]
                chunk = chunk.dropna(subset=['cp'])
                
                yield from self.process_chunk(chunk) # Returns a list of yielded elements
            
            del chunk_iter # clean up garbage



In [70]:
import glob

# path = "../Data/DataTrain"

path = "." ## use this for laptop

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

csv_files = glob.glob(f'{path}/*.csv')


In [71]:

# Create a dataset
dataset = ChessIterableDataset(csv_files, chunksize = 50000)

# Create a data loader
train_data_loader = DataLoader(dataset, batch_size = 150)
val_data_loader = DataLoader(dataset, batch_size = 150)

# Create a model
model = EvalNet()
model = model.to(device)

criterion = nn.L1Loss() # nn.MSELoss()
optimizer = optim.SGD(model.parameters(), lr=0.03, momentum=0.9)

# Train the model
train(model, dataset, train_data_loader, val_data_loader, criterion, optimizer, num_epochs=10)


Begin Training!
Manual Stop: Finished Training Early!
Finished Training!


RuntimeError: Parent directory models does not exist.

In [None]:
'''NO BATCHING
import pandas as pd
from torch.utils.data import Dataset, DataLoader

class CustomDataset(Dataset):
    def __init__(self, csv_files):
        self.data = pd.concat([pd.read_csv(file) for file in csv_files], ignore_index=True)
    
    def __len__(self):
        return len(self.data)
    
    def __getitem__(self, idx):
        sample = self.data.iloc[idx]
        return sample


# Create the dataset
dataset = CustomDataset(csv_files)
'''

'NO BATCHING\nimport pandas as pd\nfrom torch.utils.data import Dataset, DataLoader\n\nclass CustomDataset(Dataset):\n    def __init__(self, csv_files):\n        self.data = pd.concat([pd.read_csv(file) for file in csv_files], ignore_index=True)\n    \n    def __len__(self):\n        return len(self.data)\n    \n    def __getitem__(self, idx):\n        sample = self.data.iloc[idx]\n        return sample\n\n\n# Create the dataset\ndataset = CustomDataset(csv_files)\n'

In [None]:
model = EvalNet()
model = model.to(device)

criterion = nn.L1Loss()
optimizer = optim.SGD(model.parameters(), lr=0.003, momentum=0.9)

num_epochs = 10
train(model, dataset, criterion, optimizer, num_epochs)

TypeError: train() missing 2 required positional arguments: 'optimizer' and 'num_epochs'