In [None]:
%load_ext tensorboard
%mkdir logs
#!wget http://files.fast.ai/data/examples/dogscats.tgz  # Possibly later
#!tar xzf dogscats.tgz

In [None]:
"""
MLP on {digits or faces or wine or dogs/cats} datasets
JJV for Deep Learning course, 2022
"""
from typing import List
from datetime import datetime
import torch
from torch import nn
from torch.utils.tensorboard import SummaryWriter
from torchvision import transforms, datasets
from sklearn.datasets import fetch_lfw_people, load_digits
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import MinMaxScaler  # Possibly later
from sklearn.decomposition import PCA
from sklearn.metrics import ConfusionMatrixDisplay
from sklearn.manifold import TSNE  # Possibly later
import numpy as np
import matplotlib.pyplot as plt
from tqdm.notebook import tqdm  # Nice progress bars
import pandas as pd


# Dataset selector read by preprocess_data(): keep exactly one DATA line active.
# DATA = 'digits'
DATA = 'wine'
# DATA = 'faces'
# DATA = 'catsdogs'
N_EPOCHS = 10  # Number of training epochs; commented-out variant: 10 if DATA == 'catsdogs' else 50
LEARNING_RATE = 0.01  # Step size given to the Adam optimizer
BATCH_SIZE = 100  # Samples per batch in the training DataLoader
DEVICE = 'cuda' if torch.cuda.is_available() else 'cpu'  # Use the GPU when one is available
TIME = datetime.now().strftime("%Hh%Mm%Ss")  # Timestamp suffix for the TensorBoard log directory


def preprocess_data():
    '''
    Prepare the dataset selected by the module-level DATA constant.

    For 'faces', 'digits' and 'wine' the samples are loaded in memory as
    tensors and split 80/20 into train and test sets. For 'catsdogs' the
    images are read from the 'dogscats/train' and 'dogscats/valid' folders
    through ImageFolder, so no in-memory tensors exist.

    Returns a tuple
    (X_train, y_train, X_test, y_test, train_iter, test_iter, input_shape):
    - X_train, y_train, X_test, y_test: tensors of the split data, or None
      for 'catsdogs'
    - train_iter, test_iter: DataLoaders over the train and test datasets
    - input_shape: shape of a single sample

    Raises ValueError if DATA is not one of the supported dataset names.
    '''
    if DATA == 'catsdogs':
        data_transform = transforms.Compose([
            transforms.CenterCrop(224),
            transforms.ToTensor(),
            transforms.Normalize(mean=[0.485, 0.456, 0.406],
                                 std=[0.229, 0.224, 0.225])
        ])
        train_dataset = datasets.ImageFolder(
            root='dogscats/train', transform=data_transform)
        test_dataset = datasets.ImageFolder(
            root='dogscats/valid', transform=data_transform)
        # plt.imshow(train_dataset[0][0].permute(1, 2, 0).numpy())  # Display
        # plt.show()
        # Images are only reachable through the datasets/iterators; define the
        # tensor slots explicitly so the return statement below does not fail.
        X_train = y_train = X_test = y_test = None
        input_shape = (3, 224, 224)
    else:
        if DATA == 'faces':
            faces = fetch_lfw_people(min_faces_per_person=70, color=True)
            # plt.imshow(faces.images[0])  # Display one image
            # plt.show()
            # Move the color channel first: (N, H, W, C) -> (N, C, H, W)
            X = torch.Tensor(faces.images).permute(0, 3, 1, 2)
            y = torch.LongTensor(faces.target)
            input_shape = (3, 62, 47)
        elif DATA == 'digits':
            digits = load_digits()
            X = torch.Tensor(digits.images)
            y = torch.LongTensor(digits.target)
            input_shape = (8, 8)
        elif DATA == 'wine':
            wine = pd.read_csv('winequality-red.csv')
            X_raw = wine.drop(columns='quality').to_numpy()
            # scaler = MinMaxScaler()
            X = torch.Tensor(X_raw)
            y = torch.LongTensor(wine['quality'].to_numpy())
            input_shape = (11,)
        else:
            raise ValueError(
                f"Unknown dataset {DATA!r}: expected 'digits', 'wine', "
                "'faces' or 'catsdogs'")
        # Single split shared by every in-memory dataset
        X_train, X_test, y_train, y_test = train_test_split(
            X, y, test_size=0.2, shuffle=True)
        train_dataset = torch.utils.data.TensorDataset(X_train, y_train)
        test_dataset = torch.utils.data.TensorDataset(X_test, y_test)

    train_iter = torch.utils.data.DataLoader(
        train_dataset, batch_size=BATCH_SIZE, shuffle=True)
    test_iter = torch.utils.data.DataLoader(
        test_dataset, batch_size=BATCH_SIZE)
    return X_train, y_train, X_test, y_test, train_iter, test_iter, input_shape


class MLP(nn.Module):
    """
    Multilayer perceptron.
    Takes as argument:
    - input_shape: the shape of each sample given as input
    - dimensions: a list describing the number of neurons in each layer;
      the last entry is the size of the output (number of logits).

    The network flattens each sample, then applies one nn.Linear per entry of
    `dimensions`, with a nn.ReLU between consecutive linear layers. No
    activation follows the last layer, so forward() returns raw logits.
    """
    def __init__(self, input_shape, dimensions: List[int]):
        super().__init__()
        self.input_shape = input_shape
        # Keep a private copy so __str__ does not depend on a global variable
        self.dimensions = list(dimensions)
        layers = [nn.Flatten()]  # Flattens the input into 2-dim tensor: batches x features
        # int(...) converts the NumPy integer returned by np.prod
        sizes = [int(np.prod(self.input_shape))] + self.dimensions
        n_linear = len(sizes) - 1
        for i in range(n_linear):
            layers.append(nn.Linear(sizes[i], sizes[i + 1]))
            if i < n_linear - 1:
                # Non-linearity between hidden layers only, not after the output
                layers.append(nn.ReLU())
        self.fully_connected_layers = nn.Sequential(*layers)
        # the '*' before 'layers' is to transform a list into several arguments

    def forward(self, x):
        """Return the logits computed for the batch of samples x."""
        logits = self.fully_connected_layers(x)
        return logits

    def __str__(self):
        """Short identifier of the model, used to name TensorBoard runs."""
        return f"{DATA}-mlp-{'-'.join(map(str, self.dimensions))}"  # e.g. mlp-32-10


def train(dataloader, model, loss_function, optimizer, writer):
    """
    Train the model for one pass over dataloader.

    Arguments:
    - dataloader: iterable of (inputs, targets) batches; must expose .dataset
    - model: the network to optimize
    - loss_function: callable taking (logits, targets) and returning the loss
    - optimizer: optimizer updating the parameters of model
    - writer: TensorBoard SummaryWriter receiving the model graph

    Returns (mean batch loss, accuracy over the whole dataset).
    """
    model.train()  # Training mode
    losses = []
    correct_counts = []
    for batch_index, (inputs, targets) in enumerate(tqdm(dataloader)):
        inputs = inputs.to(DEVICE)
        targets = targets.to(DEVICE)
        if batch_index == 0:
            # The architecture does not change between batches, so tracing
            # the graph once per call is enough
            writer.add_graph(model, inputs)  # Display graph in TensorBoard
        # writer.add_images('images', inputs)  # Display images in TensorBoard

        # Compute prediction error
        logits = model(inputs)
        loss = loss_function(logits, targets)

        # Backpropagation
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        # Predicted class = index of the largest logit
        predictions = logits.argmax(dim=1)
        correct_counts.append(torch.sum(predictions == targets).item())

        losses.append(loss.item())
    return np.mean(losses), np.sum(correct_counts) / len(dataloader.dataset)


def test(dataloader, model):
    """
    Evaluate model on every batch of dataloader, without computing gradients.

    Returns the fraction of samples whose predicted class (largest logit)
    equals the target.
    """
    model.eval()  # Test mode
    hits_per_batch = []
    with torch.no_grad():  # No training
        for batch, labels in dataloader:
            batch, labels = batch.to(DEVICE), labels.to(DEVICE)
            predicted = model(batch).argmax(axis=1)
            hits_per_batch.append(torch.sum(predicted == labels).item())
    n_samples = len(dataloader.dataset)
    return np.sum(hits_per_batch) / n_samples


def plot_2d(matrix, colors):
    """
    Scatter plot of the rows of matrix projected by PCA onto its first two
    principal components, each point colored according to colors.
    """
    projected = PCA(n_components=2).fit_transform(matrix)
    first_component, second_component = projected[:, 0], projected[:, 1]
    plt.scatter(first_component, second_component, c=colors, marker='.')


def plot_tsne(matrix, colors):
    """
    Scatter plot of the rows of matrix embedded with t-SNE (t-distributed
    stochastic neighbor embedding, a non-linear projection), each point
    colored according to colors.
    """
    embedded = TSNE().fit_transform(matrix)
    first_axis, second_axis = embedded[:, 0], embedded[:, 1]
    plt.scatter(first_axis, second_axis, c=colors, marker='.')

In [None]:
# Delete the logs/fit directory, where the training cell writes its TensorBoard runs
%rm -rf logs/fit

In [None]:
# Load the dataset selected by DATA: split tensors, batch iterators, sample shape
(X_train, y_train, X_test, y_test,
 train_iter, test_iter, input_shape) = preprocess_data()

dimensions = [10]  # Number of neurons for each layer
model = MLP(input_shape, dimensions)
model = model.to(DEVICE)
print(model.fully_connected_layers)
log_dir = f'logs/fit/{model}-{TIME}'
writer = SummaryWriter(log_dir=log_dir)  # TBoard

# Report the size of each parameter tensor, then the total
n_parameters = 0
for name, parameter in model.named_parameters():
    size = parameter.numel()
    print(name, size)
    n_parameters += size
print(f'Total number of parameters of {model}: {n_parameters}')

loss_function = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=LEARNING_RATE)

# One training pass followed by one evaluation per epoch, both logged
for epoch in tqdm(range(N_EPOCHS)):
    print(f'=== Epoch {epoch} ===')
    train_loss, train_acc = train(
        train_iter, model, loss_function, optimizer, writer)
    print(f'Train loss: {train_loss:7f} / Train acc: {train_acc:.2f}')
    writer.add_scalar('Loss/train', train_loss, epoch)
    writer.add_scalar('Accuracy/train', train_acc, epoch)

    test_acc = test(test_iter, model)
    print(f'Test accuracy: {test_acc:.2f}')
    writer.add_scalar('Accuracy/test', test_acc, epoch)

writer.close()

In [None]:
# Confusion matrix of the trained model on the held-out test set.
# The inputs must live on the same device as the model, and the logits are
# brought back to the CPU so scikit-learn can convert them to NumPy arrays.
with torch.no_grad():  # Inference only: no autograd graph needed
    pred = model(X_test.to(DEVICE)).cpu()
ConfusionMatrixDisplay.from_predictions(y_test, pred.argmax(dim=1))

In [None]:
# PCA view of the raw training samples, one flattened row per sample
n_features = np.prod(input_shape)
plot_2d(matrix=X_train.reshape(-1, n_features), colors=y_train)

In [None]:
# Open TensorBoard on the directory where the training cell writes its runs
%tensorboard --logdir logs/fit

In [None]:
# Maps a layer name to the output captured during the latest forward pass
activation = {}


def get_activation(name):
    """
    Build a forward hook that stores the output of the hooked module in
    activation[name].
    """
    def hook(module, input_, output):
        # Detached and moved to the CPU so the tensor can be handed directly
        # to NumPy-based code, whatever DEVICE the model runs on
        activation[name] = output.detach().cpu()
    return hook


# Index 0 of the Sequential is nn.Flatten, so index 1 is the module right after it.
# Keep the handle: hook_handle.remove() unregisters the hook.
hook_handle = model.fully_connected_layers[1].register_forward_hook(
    get_activation('hidden'))

In [None]:
# Forward pass over the whole training set; this also fires the forward hook
# registered on the submodule. The inputs are moved to the model's device and
# the logits brought back to the CPU for plotting.
with torch.no_grad():  # Inference only: no autograd graph needed
    logits = model(X_train.to(DEVICE)).cpu()
logits.shape

In [None]:
# Display the dictionary filled by the forward hook
activation

In [None]:
# PCA view of the output logits, colored by training label
plot_2d(matrix=logits.detach(), colors=y_train)

In [None]:
# Shape of the output captured by the hook registered under the name 'hidden'
activation['hidden'].shape

In [None]:
# PCA view of the activations captured by the 'hidden' hook, colored by training label
plot_2d(matrix=activation['hidden'], colors=y_train)