The first part of this notebook is taken from Kaggle:



In [None]:
import copy
import datetime
import pathlib
import os

import torch
from torch.utils.data import Dataset, DataLoader
import pandas as pd
from sklearn.dummy import DummyClassifier
from collections import OrderedDict

In [None]:
def current_time() -> str:
    """Return the current local time as a ``[YYYY-MM-DD HH:MM:SS]`` stamp."""
    timestamp_format = "[%Y-%m-%d %H:%M:%S]"
    now = datetime.datetime.now()
    return now.strftime(timestamp_format)

def log_print(inp: str) -> None:
    """Print ``inp`` prefixed with the current timestamp and a colon."""
    # FIXME(m-jeu): Actually log in addition to printing
    print(f"{current_time()}: {inp}")

In [None]:
# Use the first CUDA device when CUDA is available, otherwise the CPU, and
# log which of the two was chosen.
if torch.cuda.is_available():
    DEVICE = torch.device("cuda:0")
    _device_note = f"named {torch.cuda.get_device_name()}"
else:
    DEVICE = torch.device("cpu")
    _device_note = "with cuda not available"
#DEVICE = "cpu"
log_print(f"Setting device to {DEVICE} {_device_note}")

In [None]:
# Stolen from https://www.kaggle.com/code/hojjatk/read-mnist-dataset/notebook :)

import numpy as np # linear algebra
import struct
from array import array
from os.path  import join

#
# MNIST Data Loader Class
#
class MnistDataloader(object):
    """Read the MNIST training and test splits from IDX-format files.

    The constructor only stores the four file paths; nothing is read until
    ``load_data`` (or ``read_images_labels``) is called.
    """

    def __init__(self, training_images_filepath, training_labels_filepath,
                 test_images_filepath, test_labels_filepath):
        self.training_images_filepath = training_images_filepath
        self.training_labels_filepath = training_labels_filepath
        self.test_images_filepath = test_images_filepath
        self.test_labels_filepath = test_labels_filepath

    def read_images_labels(self, images_filepath, labels_filepath):
        """Read one image file and its matching label file.

        Args:
            images_filepath: Path to an IDX image file (magic number 2051).
            labels_filepath: Path to an IDX label file (magic number 2049).

        Returns:
            ``(images, labels)``. ``images`` is a list with one entry per
            image; each entry is a list of ``rows`` one-dimensional NumPy
            arrays of length ``cols`` (the layout this method has always
            produced, so ``np.array(images)`` has shape
            ``(size, rows, cols)``). ``labels`` is an ``array("B")``.

        Raises:
            ValueError: If a magic number does not match, or if the image
                file holds fewer pixels than its header announces.
        """
        with open(labels_filepath, 'rb') as file:
            magic, size = struct.unpack(">II", file.read(8))
            if magic != 2049:
                raise ValueError('Magic number mismatch, expected 2049, got {}'.format(magic))
            labels = array("B", file.read())

        with open(images_filepath, 'rb') as file:
            # The image header overrides `size` and supplies the image
            # dimensions that are used for reshaping below.
            magic, size, rows, cols = struct.unpack(">IIII", file.read(16))
            if magic != 2051:
                raise ValueError('Magic number mismatch, expected 2051, got {}'.format(magic))
            image_data = array("B", file.read())

        pixels_per_image = rows * cols
        expected_pixels = size * pixels_per_image
        if len(image_data) < expected_pixels:
            # Fail with a clear message instead of an opaque reshape error.
            raise ValueError(
                'Image file is truncated, expected {} pixels, got {}'.format(
                    expected_pixels, len(image_data)))

        images = []
        for i in range(size):
            start = i * pixels_per_image
            img = np.array(image_data[start:start + pixels_per_image])
            # Use the dimensions from the file header rather than a
            # hard-coded 28x28, so other IDX image sizes load correctly.
            img = img.reshape(rows, cols)
            # Keep the historical per-image layout: a list of row arrays.
            images.append(list(img))

        return images, labels

    def load_data(self):
        """Load both splits and return ``(x_train, y_train), (x_test, y_test)``."""
        x_train, y_train = self.read_images_labels(self.training_images_filepath, self.training_labels_filepath)
        x_test, y_test = self.read_images_labels(self.test_images_filepath, self.test_labels_filepath)
        return (x_train, y_train), (x_test, y_test)

In [None]:
#
# Verify Reading Dataset via MnistDataloader class
#
%matplotlib inline
import random
import matplotlib.pyplot as plt
from os.path  import join


#
# Set file paths based on added MNIST Datasets
#
# Root folder of the extracted MNIST archives; the four IDX files are
# addressed relative to it.
input_path = 'dataset/'
(
    training_images_filepath,
    training_labels_filepath,
    test_images_filepath,
    test_labels_filepath,
) = (
    join(input_path, relative_path)
    for relative_path in (
        'train-images-idx3-ubyte/train-images-idx3-ubyte',
        'train-labels-idx1-ubyte/train-labels-idx1-ubyte',
        't10k-images-idx3-ubyte/t10k-images-idx3-ubyte',
        't10k-labels-idx1-ubyte/t10k-labels-idx1-ubyte',
    )
)

#
# Helper function to show a list of images with their relating titles
#
def show_images(images, title_texts, cols=5):
    """Draw ``images`` in a grid on a new 30x20 figure, with optional titles.

    Args:
        images: Sequence of images accepted by ``plt.imshow``.
        title_texts: One title per image; an empty string means no title.
            Images are paired with titles via ``zip``, so surplus entries
            on either side are not drawn.
        cols: Number of grid columns (defaults to the former fixed 5).
    """
    # Ceiling division: exactly as many rows as needed. The previous
    # ``int(len / cols) + 1`` reserved an empty row whenever the image count
    # was a multiple of ``cols``. Keep at least one row for an empty input.
    rows = max(1, -(-len(images) // cols))
    plt.figure(figsize=(30,20))
    for index, (image, title_text) in enumerate(zip(images, title_texts), start=1):
        plt.subplot(rows, cols, index)
        plt.imshow(image, cmap=plt.cm.gray)
        if title_text != '':
            plt.title(title_text, fontsize = 15)

#
# Load MNIST dataset
#
mnist_dataloader = MnistDataloader(training_images_filepath, training_labels_filepath, test_images_filepath, test_labels_filepath)
(x_train, y_train), (x_test, y_test) = mnist_dataloader.load_data()

#
# Show some random training and test images
#
images_2_show = []
titles_2_show = []
for i in range(0, 10):
    # randrange(n) draws from 0..n-1. The former randint(1, 60000) is
    # inclusive on both ends, so it could index one past the last sample
    # and never selected sample 0.
    r = random.randrange(len(x_train))
    images_2_show.append(x_train[r])
    titles_2_show.append('training image [' + str(r) + '] = ' + str(y_train[r]))

for i in range(0, 5):
    r = random.randrange(len(x_test))
    images_2_show.append(x_test[r])
    titles_2_show.append('test image [' + str(r) + '] = ' + str(y_test[r]))

show_images(images_2_show, titles_2_show)

Converting everything to PyTorch tensors

In [None]:
x_test[0][0].dtype

In [None]:
np.array(x_test).shape

In [None]:
def list_list_np_to_torch(data):
    """Stack array-like ``data`` with ``np.array`` and wrap it as a torch tensor."""
    return torch.from_numpy(np.array(data))

# Replace each split by its tensor version, converting in the same order
# as before (test images, test labels, train images, train labels).
x_test, y_test, x_train, y_train = map(
    list_list_np_to_torch, (x_test, y_test, x_train, y_train)
)

In [None]:
x_train.shape

In [None]:
np.unique(y_train, return_counts=True)

In [None]:
# Count how often each distinct label occurs in the training labels and draw
# one bar per label (the trailing semicolon suppresses the cell's text output).
train_labels, train_counts = np.unique(y_train, return_counts=True)
plt.bar(train_labels, train_counts, tick_label=train_labels);

In [None]:
# Same label-frequency bar chart as for the training split, for the test labels.
test_labels, test_counts = np.unique(y_test, return_counts=True)
plt.bar(test_labels, test_counts, tick_label=test_labels);

In [None]:
# Reference score: fit scikit-learn's DummyClassifier with its default
# settings on the training split and score it on the test split.
# NOTE(review): presumably this is the majority-class accuracy the networks
# below have to beat — confirm the default strategy of the installed
# scikit-learn version.
baseline = DummyClassifier()
baseline.fit(x_train, y_train)
baseline.score(x_test, y_test)

In [None]:
x_test.shape[0]

Scale input data from 0-255 to 0-1

In [None]:
torch.min(x_train).item(), torch.max(x_train).item()

In [None]:
def scale_to_float(t):
    """Divide ``t`` by 255, mapping 8-bit pixel values 0-255 onto 0-1."""
    max_pixel_value = 255
    return t / max_pixel_value


# Rescale the pixel values of both image tensors (divides by 255).
x_train = scale_to_float(x_train)
x_test = scale_to_float(x_test)

Change output tensors to correct format

In [None]:
y_train[0]

In [None]:
#new = torch.nn.functional.one_hot(y_train.long())
#i = 0
#for o, n in zip(y_train, new):
#    print(o)
#    print(n)
#    print("\n\n##############\n\n")
#    i += 1
#    if i > 100:
#        break

In [None]:
def y_to_one_hot(t, num_classes=-1):
    """One-hot encode the integer class labels in ``t``.

    Args:
        t: Tensor of class indices; it is cast to ``long`` before encoding.
        num_classes: Width of the encoding. The default ``-1`` keeps the
            previous behaviour of inferring the width from the largest
            label present in ``t``.

    Returns:
        Integer tensor with one extra trailing dimension of size
        ``num_classes``.
    """
    return torch.nn.functional.one_hot(t.long(), num_classes=num_classes)

# Fix the width to the ten digit classes so both splits are guaranteed to
# share the same label width, even if one split lacked the highest label.
y_train = y_to_one_hot(y_train, num_classes=10)
y_test = y_to_one_hot(y_test, num_classes=10)

In [None]:
# Cast the label tensors to float64.
# NOTE(review): presumably needed so the loss function accepts the one-hot
# labels as floating-point targets — confirm float64 (rather than float32)
# is what the model/loss combination expects.
y_train = y_train.double()
y_test = y_test.double()

In [None]:
class CustomDataset(Dataset):
    """In-memory dataset that pairs an image tensor with a label tensor.

    Sample ``idx`` is the tuple ``(images[idx, :, :], labels[idx])``. The
    tensors stay wherever they were created until ``to_device`` is called.
    """

    def __init__(self, images, labels, device=DEVICE):
        """Store both tensors and the target device.

        Raises:
            ValueError: If the first dimensions of ``images`` and ``labels``
                differ.
        """
        image_count, label_count = images.shape[0], labels.shape[0]
        if image_count != label_count:
            raise ValueError("Amount of images and labels do not align")

        self.images = images
        self.labels = labels
        self.device = device

    def __len__(self):
        """Return the number of samples (first dimension of ``images``)."""
        sample_count = self.images.shape[0]
        return sample_count

    def __getitem__(self, idx):
        """Return the ``(image, label)`` pair stored at ``idx``."""
        image = self.images[idx, :, :]
        label = self.labels[idx]
        return image, label

    def to_device(self):
        """Move both tensors to ``self.device`` and return ``self`` for chaining."""
        target = self.device
        self.images, self.labels = self.images.to(target), self.labels.to(target)
        return self
    

# Wrap both splits, then display one arbitrary training sample together with
# its label as a quick sanity check.
train_dataset, test_dataset = CustomDataset(x_train, y_train), CustomDataset(x_test, y_test)
i, l = train_dataset[13456]
plt.imshow(i, cmap=plt.cm.gray)
print(l)

In [None]:
# Smoke test of batching: draw one shuffled batch of four samples from the
# test dataset, keep only its image part and display that tensor's shape.
dl = DataLoader(test_dataset, batch_size=4, shuffle=True)
batch = next(iter(dl))[0]
batch.shape

In [None]:
len(dl.dataset)

In [None]:
class TinyLinear(torch.nn.Module):
    """Small fully connected classifier for 28x28 inputs.

    Layout: flatten -> Linear(784, 100) + ReLU -> Linear(100, 100) + ReLU
    -> Linear(100, 10) + softmax over the ten outputs.
    """

    def __init__(self):
        super().__init__()

        self.f = torch.nn.Flatten()
        self.l1 = torch.nn.Sequential(
            torch.nn.Linear(28*28, 100),
            torch.nn.ReLU()
        )
        self.l2 = torch.nn.Sequential(
            torch.nn.Linear(100, 100),
            torch.nn.ReLU()
        )
        self.l3 = torch.nn.Sequential(
            torch.nn.Linear(100, 10),
            # Normalise over the class axis of the (batch, 10) output. Naming
            # the dimension gives the same result as the implicit choice did
            # for this 2-D input, without the deprecation warning that
            # Softmax() emits on every forward pass.
            # NOTE(review): this notebook trains the model with
            # torch.nn.CrossEntropyLoss, which expects raw logits; feeding it
            # softmax outputs applies softmax twice. Consider returning
            # logits here (that would change what callers receive).
            torch.nn.Softmax(dim=1)
        )

    def forward(self, x):
        """Return per-class probabilities of shape ``(batch, 10)`` for a batch ``x``."""
        x = self.f(x)
        x = self.l1(x)
        x = self.l2(x)
        x = self.l3(x)

        return x

model = TinyLinear()
f"Amount of trainable parameters: {sum(p.numel() for p in model.parameters() if p.requires_grad)}"


In [None]:
test_dataset.images.dtype

In [None]:
torch.zeros(2) == torch.zeros(2)

In [None]:
def accuracy_sum(outputs, labels):
    """Count the rows whose arg-max column agrees between ``outputs`` and ``labels``.

    Both arguments are reduced with ``argmax`` along dimension 1; the result
    is a zero-dimensional tensor holding the number of matching rows.
    """
    predicted_classes = torch.argmax(outputs, dim=1)
    target_classes = torch.argmax(labels, dim=1)
    matches = predicted_classes == target_classes
    return matches.sum()

In [None]:
def train_classifier(
        model,
        train_loader,
        test_loader,
        optimizer,
        loss_fn,
        epochs,
        report_ever_epochs: int = 1,
        return_lowest_test_loss_model: bool = False,
        device = DEVICE):
    """Train ``model`` and evaluate it on ``test_loader`` after every epoch.

    Args:
        model: Module to optimise; it is updated in place.
        train_loader: Loader yielding ``(inputs, labels)`` training batches.
        test_loader: Loader yielding ``(inputs, labels)`` evaluation batches.
        optimizer: Optimizer already bound to the model's parameters.
        loss_fn: Callable ``loss_fn(outputs, labels)`` returning a scalar
            loss. If it exposes ``reduction == "sum"`` its value is taken as
            a batch total; otherwise it is treated as a per-example mean.
        epochs: Number of passes over ``train_loader``.
        report_ever_epochs: Log the latest metrics every this many epochs.
        return_lowest_test_loss_model: When true, restore the weights of the
            epoch with the lowest test loss before returning.
        device: Not used by this function; batches are consumed exactly as
            the loaders deliver them. Kept for interface compatibility.

    Returns:
        ``(model, metrics)`` where ``metrics`` is a DataFrame with one row per
        epoch and the columns ``train_loss``, ``test_loss``,
        ``train_accuracy`` and ``test_accuracy`` (all per-example averages).
    """
    training_example_amount = len(train_loader.dataset)
    testing_example_amount = len(test_loader.dataset)

    # A mean-reduced batch loss must be weighted by its batch size before it
    # is summed, otherwise dividing by the number of examples below
    # under-reports the loss by roughly a factor of the batch size.
    loss_is_batch_total = getattr(loss_fn, "reduction", "mean") == "sum"

    def batch_loss_total(loss, batch_size):
        """Return the summed loss over one batch as a Python float."""
        value = loss.item()
        return value if loss_is_batch_total else value * batch_size

    train_losses = []
    test_losses = []

    min_test_loss = float("inf")
    model_checkpoint = None

    train_acc = []
    test_acc = []

    for epoch in range(epochs):
        running_loss = 0.
        running_acc = 0.

        # Train
        model.train()
        for inputs, labels in train_loader:

            optimizer.zero_grad()

            outputs = model(inputs)

            loss = loss_fn(outputs, labels)
            loss.backward()
            optimizer.step()

            running_loss += batch_loss_total(loss, outputs.shape[0])
            running_acc += accuracy_sum(outputs, labels).item()

        train_losses.append(running_loss / training_example_amount)
        train_acc.append(running_acc / training_example_amount)

        # Test
        model.eval()
        with torch.no_grad():
            running_loss = 0.
            running_acc = 0.

            for inputs, labels in test_loader:

                outputs = model(inputs)
                loss = loss_fn(outputs, labels)
                running_loss += batch_loss_total(loss, outputs.shape[0])
                running_acc += accuracy_sum(outputs, labels).item()

            test_loss = running_loss / testing_example_amount
            test_losses.append(test_loss)
            test_acc.append(running_acc / testing_example_amount)

        if (epoch % report_ever_epochs) == 0:
            log_print(f"{epoch}: {train_losses[-1]} | {test_losses[-1]} | {train_acc[-1]} | {test_acc[-1]}")

        if return_lowest_test_loss_model and test_loss < min_test_loss:
            # Remember the new best loss; without this update every epoch
            # compared against infinity and the "best" checkpoint was simply
            # the last epoch.
            min_test_loss = test_loss
            model_checkpoint = OrderedDict({k: v.to('cpu', copy=True) for k, v in model.state_dict().items()})  # https://discuss.pytorch.org/t/copy-best-model-from-gpu-to-cpu/38683/4

    if model_checkpoint is not None:
        model.load_state_dict(model_checkpoint)

    return model, pd.DataFrame({
        "train_loss": train_losses,
        "test_loss": test_losses,
        "train_accuracy": train_acc,
        "test_accuracy": test_acc
    })

In [None]:
MODEL_FOLDER = pathlib.Path("models/")


def train_or_load_classifier(title: str, model, *args, **kwargs):
    """Return a trained ``model`` and its metrics, using a cache on disk.

    The cache lives in ``MODEL_FOLDER / title`` as ``model.pt`` (state dict)
    and ``metrics.csv``. When both files exist they are loaded; otherwise the
    model is trained with ``train_classifier(model, *args, **kwargs)`` and
    the results are written to that folder.

    Args:
        title: Name of the sub-folder used for this model.
        model: Module to load the weights into, or to train.
        *args: Positional arguments forwarded to ``train_classifier``.
        **kwargs: Keyword arguments forwarded to ``train_classifier``.

    Returns:
        ``(model, metrics)`` with ``metrics`` as a DataFrame.
    """
    model_folder = MODEL_FOLDER / title
    model_path = model_folder / "model.pt"
    metrics_path = model_folder / "metrics.csv"

    # Require both files: a folder left behind by an interrupted save is not
    # a usable cache and triggers retraining instead of a load error.
    if model_path.exists() and metrics_path.exists():
        log_print(f"Loading model from {model_folder}")

        # Deserialize onto the CPU so a checkpoint saved from a GPU also
        # loads on a machine without one; load_state_dict then copies the
        # values into the model's existing parameters.
        model.load_state_dict(torch.load(model_path, map_location="cpu"))
        model.eval()

        metrics = pd.read_csv(metrics_path, index_col=0)

    else:
        log_print(f"Training {title} from scratch")

        model, metrics = train_classifier(model, *args, **kwargs)

        log_print(f"Saving model to {model_folder}")

        # Create missing parents as well, so a finished training run is not
        # lost just because MODEL_FOLDER does not exist yet.
        model_folder.mkdir(parents=True, exist_ok=True)

        torch.save(model.state_dict(), model_path)

        metrics.to_csv(metrics_path)

    return model, metrics


In [None]:
def plot_metrics(metrics: pd.DataFrame) -> None:
    """Plot every ``train_<name>`` / ``test_<name>`` column pair per epoch.

    One subplot is drawn per metric, with the training curve in blue and the
    test curve in red.

    Args:
        metrics: Frame with one row per epoch and matching ``train_<name>``
            and ``test_<name>`` columns.
    """
    prefix = "train_"
    # Match the prefix at the start of the name only, and strip exactly it.
    metric_names = [
        col_name[len(prefix):] for col_name in metrics.columns if col_name.startswith(prefix)
    ]

    col_amount = metrics.columns.shape[0]

    # squeeze=False always yields a 2-D array of axes, so a frame with a
    # single metric pair no longer fails on ``axes[i]``.
    _, axes = plt.subplots(
        nrows=len(metric_names),  # One row per train/test pair
        ncols=1,
        figsize=(15, 3 * col_amount),
        squeeze=False
    )

    # Could clean with multiindexed column names (train/test)
    for ax, metric_name in zip(axes[:, 0], metric_names):
        ax.plot(metrics[f"train_{metric_name}"], color="blue")
        ax.plot(metrics[f"test_{metric_name}"], color="red")
        ax.legend(["Train", "Test"])
        ax.set_title(f"{metric_name} per epoch")
        ax.set_xlabel("Epoch")
        ax.set_ylabel(f"{metric_name}")
        ax.grid(visible=True)

        
    

In [None]:
# Train the small fully connected network, or load it from the model cache
# when one exists, then plot the per-epoch metrics.
model = TinyLinear()
train_loader = DataLoader(train_dataset.to_device(), batch_size=100, shuffle=True)
test_loader = DataLoader(test_dataset.to_device(), batch_size=100, shuffle=False)
optimizer = torch.optim.Adam(model.parameters(), lr=0.0001)
loss_fn = torch.nn.CrossEntropyLoss()
epochs = 200

model.to(DEVICE)

model, metrics = train_or_load_classifier(
    "tiny_linear",
    model,
    train_loader,
    test_loader,
    optimizer,
    loss_fn,
    epochs,
    return_lowest_test_loss_model=True,
)
plot_metrics(metrics)


In [None]:
metrics

In [None]:
metrics["train_loss"].is_monotonic_increasing

In [None]:
# No `testacc` list is created in the cells above, so the former conversion
# of its items raised a NameError. The per-epoch test accuracies are in the
# `test_accuracy` column of `metrics`, so take them from there.
testacc = metrics["test_accuracy"].tolist()

In [None]:
plt.plot(testacc)

In [None]:
f"Amount of trainable parameters: {sum(p.numel() for p in model.parameters() if p.requires_grad)}"


In [None]:
class TinyCNN(torch.nn.Module):
    """Skeleton for a small CNN: no layers are defined yet.

    Until layers are added, ``forward`` hands its input back unchanged.
    """

    def __init__(self):
        super().__init__()

    def forward(self, x):
        """Return ``x`` unchanged (placeholder)."""
        return x


tiny_cnn = TinyCNN()
