In [None]:
import torch as t
import torch.nn as nn
import torch.optim as optim
from datasets import Dataset
from datasets import load_dataset
from tqdm import tqdm

In [1]:
def get_dataset(split: str) -> Dataset:
    """
    Fetch one split of the U3T state-evaluation data from the Hugging Face hub.

    Args:
        split: which split to load; must be 'train', 'validation' or 'test'

    Returns:
        the object `load_dataset` returns for the 'markstanl/u3t' repository,
        `data/state_eval` directory, restricted to `split`

    Raises:
        ValueError: if `split` is not one of the three accepted names
    """
    allowed_splits = ('train', 'validation', 'test')
    if split in allowed_splits:
        return load_dataset('markstanl/u3t', data_dir='data/state_eval', split=split)
    raise ValueError(f"Invalid split: {split}")


def get_loader(split: str, batch_size: int = 256) -> t.utils.data.DataLoader:
    """
    Wrap one dataset split in a shuffling DataLoader.

    Args:
        split: split name forwarded to `get_dataset`
        batch_size: number of samples per batch

    Returns:
        a DataLoader over the split with `shuffle=True`
    """
    loader_options = {'batch_size': batch_size, 'shuffle': True}
    return t.utils.data.DataLoader(get_dataset(split), **loader_options)


def get_loader_gpu(split: str,
                   batch_size: int = 256,
                   num_workers: int = 2,
                   pin_memory: bool = True) -> t.utils.data.DataLoader:
    """
    Wrap one dataset split in a shuffling DataLoader with worker / pinned-memory options.

    Args:
        split: split name forwarded to `get_dataset`
        batch_size: number of samples per batch
        num_workers: passed through to the DataLoader's `num_workers`
        pin_memory: passed through to the DataLoader's `pin_memory`

    Returns:
        a DataLoader over the split with `shuffle=True`
    """
    loader_options = {
        'batch_size': batch_size,
        'shuffle': True,
        'num_workers': num_workers,
        'pin_memory': pin_memory,
    }
    return t.utils.data.DataLoader(get_dataset(split), **loader_options)

NameError: name 'Dataset' is not defined

In [None]:
def evaluate(model: nn.Module,
             loader: t.utils.data.DataLoader,
             criterion: nn.Module,
             num_batches: int = 100):
    """
    Measure the model on (at most) the first `num_batches` batches of `loader`.

    Args:
        model: the model to evaluate; it is switched to eval mode for the duration of the
            call and put back into whatever mode it was in afterwards
        loader: iterable of batches; each batch is indexed with 'tensor_state' and 'score'
        criterion: the loss function, called as `criterion(output, score.unsqueeze(1))`
        num_batches: maximum number of batches to consume

    Returns:
        (loss, error): the per-sample average loss and the per-sample average absolute
        difference between output and score. Both are NaN if no sample was seen.
    """
    # Feed the batches to whichever device the model's parameters live on, so this also
    # works when the caller has moved the model to a GPU but the loader yields CPU tensors.
    first_param = next(model.parameters(), None)
    device = first_param.device if first_param is not None else None

    # A criterion with reduction='sum' already returns a batch total. Anything else
    # (including plain callables without a `reduction` attribute) is treated as a batch
    # mean and is weighted by the batch size before being averaged over all samples.
    loss_is_summed = getattr(criterion, 'reduction', 'mean') == 'sum'

    total_loss = 0.0
    total_error = 0.0
    num_data_points = 0

    was_training = model.training
    model.eval()
    try:
        with t.no_grad():
            for i, batch in enumerate(loader):
                if i >= num_batches:
                    break
                state, score = batch['tensor_state'], batch['score']
                if device is not None:
                    state, score = state.to(device), score.to(device)

                target = score.unsqueeze(1)
                output = model(state)
                batch_loss = criterion(output, target).item()

                batch_size = len(score)
                num_data_points += batch_size
                total_loss += batch_loss if loss_is_summed else batch_loss * batch_size
                total_error += t.sum(t.abs(output - target)).item()
    finally:
        # Do not leave a model that was being trained stuck in eval mode.
        model.train(was_training)

    if num_data_points == 0:
        return float('nan'), float('nan')

    return total_loss / num_data_points, total_error / num_data_points


def train(model: nn.Module,
          loader: t.utils.data.DataLoader,
          optimizer: t.optim.Optimizer,
          criterion: nn.Module,
          num_epochs: int,
          test_loader: t.utils.data.DataLoader,
          epoch_start: int = 0,
          path_name: str = None) -> list[tuple[float, ...]]:
    """
    Train the model on the given data, writing a checkpoint after every epoch.

    Args:
        model: the model to train
        loader: iterable of training batches; each batch is indexed with
            'tensor_state' and 'score'
        optimizer: the optimizer to use
        criterion: the loss function to use
        num_epochs: number of passes over `loader`
        test_loader: batches for the per-epoch call to `evaluate`, or None to skip it
        epoch_start: offset added to the epoch index in log lines and checkpoint names
        path_name: checkpoint path template, formatted with `epoch=<number>`;
            defaults to "sloth_models/model_cnn_epoch_{epoch}.pth"

    Returns:
        one tuple per epoch: (loss, error) when `test_loader` is None, otherwise
        (loss, error, test_loss, test_error). Losses and errors are per-sample averages.

    Raises:
        ValueError: if `loader` yields no samples
    """
    import os  # local import: only needed to create the checkpoint directory

    history = []
    save_path = path_name if path_name else "sloth_models/model_cnn_epoch_{epoch}.pth"

    # A criterion with reduction='sum' already returns a batch total; anything else is
    # treated as a batch mean and weighted by the batch size before averaging.
    loss_is_summed = getattr(criterion, 'reduction', 'mean') == 'sum'

    for epoch in range(num_epochs):
        # The per-epoch evaluation may switch the model to eval mode, so training mode
        # has to be (re)selected at the start of every epoch, not just once.
        model.train()

        total_loss = 0.0
        total_error = 0.0
        num_data_points = 0

        for batch in tqdm(loader):
            optimizer.zero_grad()
            state, score = batch['tensor_state'], batch['score']
            target = score.unsqueeze(1)
            output = model(state)
            batch_loss = criterion(output, target)
            batch_loss.backward()
            optimizer.step()

            batch_size = len(score)
            num_data_points += batch_size
            loss_value = batch_loss.item()
            total_loss += loss_value if loss_is_summed else loss_value * batch_size
            total_error += t.sum(t.abs(output.detach() - target)).item()

        if num_data_points == 0:
            raise ValueError("Cannot train: the loader produced no data.")

        loss = total_loss / num_data_points
        error = total_error / num_data_points

        if test_loader is None:
            print(f"Epoch {epoch + epoch_start} Loss: {loss} Average Error {error}")
            history.append((loss, error))
        else:
            test_loss, test_error = evaluate(model, test_loader, criterion, num_batches=100)
            print(
                f"Epoch {epoch + epoch_start} Loss: {loss} Average Error {error} Test Loss: {test_loss} Test Error: {test_error}")
            history.append((loss, error, test_loss, test_error))

        epoch_number = epoch + epoch_start + 1
        checkpoint = {
            'epoch': epoch_number,
            'model_state_dict': model.state_dict(),
            'optimizer_state_dict': optimizer.state_dict(),
            'loss': loss,
        }
        try:
            checkpoint_path = save_path.format(epoch=epoch_number)
            parent_dir = os.path.dirname(checkpoint_path)
            if parent_dir:
                os.makedirs(parent_dir, exist_ok=True)
            t.save(checkpoint, checkpoint_path)
        except Exception as exc:
            # Best-effort fallback so an epoch of work is not lost to a bad path template.
            # The backup is numbered like the checkpoint itself so resumed runs do not
            # overwrite earlier backups.
            backup_path = "sloth_models/backup_model_{epoch}.pth".format(epoch=epoch_number)
            print(f"Could not save checkpoint using {save_path!r} ({exc}); saving to {backup_path}")
            os.makedirs("sloth_models", exist_ok=True)
            t.save(checkpoint, backup_path)

    return history


def train_gpu(model: nn.Module,
              loader: t.utils.data.DataLoader,
              optimizer: t.optim.Optimizer,
              criterion: nn.Module,
              device: t.device,
              num_epochs: int,
              test_loader: t.utils.data.DataLoader,
              epoch_start: int = 0,
              path_name: str = None) -> list[tuple[float, ...]]:
    """
    Train the model on `device` with mixed precision, writing a checkpoint after every epoch.

    Args:
        model: the model to train; it is moved to `device`
        loader: iterable of training batches; each batch is indexed with
            'tensor_state' and 'score'
        optimizer: the optimizer to use
        criterion: the loss function to use
        device: the device to train on; autocast and gradient scaling are only enabled
            when this is a CUDA device
        num_epochs: number of passes over `loader`
        test_loader: batches for the per-epoch call to `evaluate`, or None to skip it
        epoch_start: offset added to the epoch index in log lines and checkpoint names
        path_name: checkpoint path template, formatted with `epoch=<number>`;
            defaults to "sloth_models/model_cnn_epoch_{epoch}.pth"

    Returns:
        one tuple per epoch: (loss, error) when `test_loader` is None, otherwise
        (loss, error, test_loss, test_error). Losses and errors are per-sample averages.

    Raises:
        ValueError: if `loader` yields no samples
    """
    import os  # local import: only needed to create the checkpoint directory

    device = t.device(device)
    use_amp = device.type == 'cuda'

    model.to(device)
    scaler = t.cuda.amp.GradScaler(enabled=use_amp)
    history = []

    save_path = path_name if path_name else "sloth_models/model_cnn_epoch_{epoch}.pth"

    # A criterion with reduction='sum' already returns a batch total; anything else is
    # treated as a batch mean and weighted by the batch size before averaging.
    loss_is_summed = getattr(criterion, 'reduction', 'mean') == 'sum'

    def _on_device(batches):
        # The model lives on `device`, so validation batches must be moved there before
        # they are handed to evaluate(); otherwise the forward pass mixes devices.
        for raw_batch in batches:
            yield {
                **raw_batch,
                'tensor_state': raw_batch['tensor_state'].to(device, non_blocking=True),
                'score': raw_batch['score'].to(device, non_blocking=True),
            }

    for epoch in range(num_epochs):
        # The per-epoch evaluation may switch the model to eval mode, so training mode
        # has to be (re)selected at the start of every epoch, not just once.
        model.train()

        total_loss = 0.0
        total_error = 0.0
        num_data_points = 0

        for batch in tqdm(loader):
            state = batch['tensor_state'].to(device, non_blocking=True, memory_format=t.channels_last)
            score = batch['score'].to(device, non_blocking=True)
            target = score.unsqueeze(1)

            optimizer.zero_grad()

            with t.cuda.amp.autocast(enabled=use_amp):
                output = model(state)
                batch_loss = criterion(output, target)

            scaler.scale(batch_loss).backward()
            scaler.step(optimizer)
            scaler.update()

            batch_size = len(score)
            num_data_points += batch_size
            loss_value = batch_loss.item()
            total_loss += loss_value if loss_is_summed else loss_value * batch_size
            # Under autocast the output may be reduced precision; accumulate the error in float32.
            total_error += t.sum(t.abs(output.detach().float() - target)).item()

        if num_data_points == 0:
            raise ValueError("Cannot train: the loader produced no data.")

        loss = total_loss / num_data_points
        error = total_error / num_data_points

        if test_loader is None:
            print(f"Epoch {epoch + epoch_start} Loss: {loss} Average Error {error}")
            history.append((loss, error))
        else:
            test_loss, test_error = evaluate(model, _on_device(test_loader), criterion, num_batches=100)
            print(
                f"Epoch {epoch + epoch_start} Loss: {loss} Average Error {error} Test Loss: {test_loss} Test Error: {test_error}")
            history.append((loss, error, test_loss, test_error))

        epoch_number = epoch + epoch_start + 1
        checkpoint = {
            'epoch': epoch_number,
            'model_state_dict': model.state_dict(),
            'optimizer_state_dict': optimizer.state_dict(),
            'loss': loss,
        }
        try:
            checkpoint_path = save_path.format(epoch=epoch_number)
            parent_dir = os.path.dirname(checkpoint_path)
            if parent_dir:
                os.makedirs(parent_dir, exist_ok=True)
            t.save(checkpoint, checkpoint_path)
        except Exception as exc:
            # Best-effort fallback so an epoch of work is not lost to a bad path template.
            # The backup is numbered like the checkpoint itself so resumed runs do not
            # overwrite earlier backups.
            backup_path = "sloth_models/backup_model_{epoch}.pth".format(epoch=epoch_number)
            print(f"Could not save checkpoint using {save_path!r} ({exc}); saving to {backup_path}")
            os.makedirs("sloth_models", exist_ok=True)
            t.save(checkpoint, backup_path)

    return history

## Models

In [2]:
class MLP(nn.Module):
    """
    Simple MLP for the U3T dataset

    The input is flattened to 324 features per sample (presumably 4 x 9 x 9, matching the
    4-channel input the CNN models in this file take — confirm against the dataset) and
    passed through three hidden ReLU layers down to a single output value.
    """

    def __init__(self):
        super().__init__()
        self.flatten = nn.Flatten()
        self.linear_relu_stack = nn.Sequential(
            nn.Linear(324, 512),
            nn.ReLU(),
            nn.Linear(512, 256),
            nn.ReLU(),
            nn.Linear(256, 128),
            nn.ReLU(),
            nn.Linear(128, 1)
        )

    def forward(self, x):
        """
        Args:
            x: a batch whose non-batch dimensions flatten to 324 values

        Returns:
            a tensor with one value per sample, shape (batch, 1)
        """
        # Cast like the CNN models do, so integer-typed state tensors are accepted
        # by the linear layers; this is a no-op for float32 input.
        x = x.float()
        x = self.flatten(x)
        return self.linear_relu_stack(x)


class CNN_small(nn.Module):
    """
    CNN whose four convolutions all use a 3x3 kernel with stride 1 and padding 1,
    followed by a three-layer fully connected head that produces one value per sample.
    """

    def __init__(self):
        super().__init__()

        # Convolutional trunk: channel count doubles at each step, 4 -> 32 -> 64 -> 128 -> 256.
        self.conv1 = nn.Conv2d(in_channels=4, out_channels=32, kernel_size=3, stride=1, padding=1)
        self.relu1 = nn.ReLU()
        self.conv2 = nn.Conv2d(in_channels=32, out_channels=64, kernel_size=3, stride=1, padding=1)
        self.relu2 = nn.ReLU()
        self.conv3 = nn.Conv2d(in_channels=64, out_channels=128, kernel_size=3, stride=1, padding=1)
        self.relu3 = nn.ReLU()
        self.conv4 = nn.Conv2d(in_channels=128, out_channels=256, kernel_size=3, stride=1, padding=1)
        self.relu4 = nn.ReLU()

        # Fully connected head; the first layer expects 256 channels on a 9x9 grid.
        self.fc1 = nn.Linear(in_features=256 * 9 * 9, out_features=512)
        self.relu5 = nn.ReLU()
        self.fc2 = nn.Linear(in_features=512, out_features=256)
        self.relu6 = nn.ReLU()
        self.fc3 = nn.Linear(in_features=256, out_features=1)

    def forward(self, x):
        """Cast the input to float, run the conv trunk, flatten, and apply the dense head."""
        features = x.float()
        conv_stages = (
            (self.conv1, self.relu1),
            (self.conv2, self.relu2),
            (self.conv3, self.relu3),
            (self.conv4, self.relu4),
        )
        for conv, activation in conv_stages:
            features = activation(conv(features))

        hidden = t.flatten(features, start_dim=1)
        hidden = self.relu5(self.fc1(hidden))
        hidden = self.relu6(self.fc2(hidden))
        return self.fc3(hidden)


class Deep_CNN(nn.Module):
    """
    A deep CNN model for the U3T dataset. The first convolution has a stride of 3, the argument being that this
    ensures the kernel only sees the subgames, and not any of the overlap. The second convolution has a stride of 1
    because the first one turns the 9x9 board into a 3x3 board, in which seeing the overlap may be valuable.

    Shape walk-through for a 9x9 input: conv1 (kernel 3, stride 3) gives 128 x 3 x 3, conv2 (kernel 2, stride 1)
    gives 256 x 2 x 2, so the flattened vector fed to the first linear layer has 256 * 2 * 2 = 1024 features.
    """

    def __init__(self):
        super(Deep_CNN, self).__init__()

        self.conv1 = nn.Conv2d(4, 128, kernel_size=3, stride=3, padding=0)
        self.relu1 = nn.ReLU()
        self.conv2 = nn.Conv2d(128, 256, kernel_size=2, stride=1, padding=0)
        self.relu2 = nn.ReLU()

        # conv2 leaves a 2x2 spatial map for a 9x9 board, so the flattened size is
        # 256 * 2 * 2 rather than 256; with 256 the forward pass fails on a shape mismatch.
        self.fc1 = nn.Linear(256 * 2 * 2, 512)
        self.relu4 = nn.ReLU()
        self.fc2 = nn.Linear(512, 256)
        self.relu5 = nn.ReLU()
        self.fc3 = nn.Linear(256, 128)
        self.relu6 = nn.ReLU()
        self.fc4 = nn.Linear(128, 1)

    def forward(self, x):
        """
        Args:
            x: a batch of 4-channel 9x9 inputs; cast to float before the first convolution

        Returns:
            a tensor with one value per sample, shape (batch, 1)
        """
        x = x.float()
        x = self.conv1(x)
        x = self.relu1(x)
        x = self.conv2(x)
        x = self.relu2(x)

        x = t.flatten(x, start_dim=1)
        x = self.fc1(x)
        x = self.relu4(x)
        x = self.fc2(x)
        x = self.relu5(x)
        x = self.fc3(x)
        x = self.relu6(x)
        x = self.fc4(x)

        return x


NameError: name 'nn' is not defined

In [None]:
def train_on_cpu(model: nn.Module,
                 num_epochs: int,
                 batch_size: int = 256,
                 learning_rate: float = 0.01,
                 ) -> list[tuple[float, float, float, float]]:
    """
    Train `model` with Adam and an MSE loss using the plain (CPU) data loaders.

    Args:
        model: the model to train
        num_epochs: number of epochs handed to `train`
        batch_size: batch size for both the 'train' and 'validation' loaders
        learning_rate: learning rate for the Adam optimizer

    Returns:
        the per-epoch history returned by `train`
    """
    loaders = {name: get_loader(name, batch_size) for name in ('train', 'validation')}

    return train(
        model,
        loaders['train'],
        optim.Adam(model.parameters(), lr=learning_rate),
        nn.MSELoss(),
        num_epochs,
        loaders['validation'],
    )


def train_on_gpu(model: nn.Module,
                 num_epochs: int,
                 batch_size: int = 256,
                 learning_rate: float = 0.01,
                 num_workers: int = 2,
                 pin_memory: bool = True,
                 ) -> list[tuple[float, float, float, float]]:
    """
    Function to train a given model on the gpu

    Args:
        model: the model to train
        num_epochs: number of epochs handed to `train_gpu`
        batch_size: batch size for both the 'train' and 'validation' loaders
        learning_rate: learning rate for the Adam optimizer
        num_workers: forwarded to `get_loader_gpu`
        pin_memory: forwarded to `get_loader_gpu`

    Returns:
        the per-epoch history returned by `train_gpu`

    Raises:
        ValueError: if CUDA is not available
    """
    # Check CUDA availability directly: comparing a torch.device against the string 'cpu'
    # is not a reliable test, so the guard must not depend on that comparison.
    # Failing here also avoids loading any data when there is no GPU to train on.
    if not t.cuda.is_available():
        raise ValueError("No GPU available. Dial in.")
    device = t.device('cuda')

    train_loader = get_loader_gpu('train', batch_size, num_workers, pin_memory)
    test_loader = get_loader_gpu('validation', batch_size, num_workers, pin_memory)

    optimizer = optim.Adam(model.parameters(), lr=learning_rate)
    criterion = nn.MSELoss()

    return train_gpu(model, train_loader, optimizer, criterion, device, num_epochs, test_loader)
