# 1. Import required libraries

In [None]:
import torch
import torch.nn as nn
from torch.utils.data import DataLoader
import torchvision
import torchvision.transforms as transforms
from torch.optim import SGD, Adam
from torch.nn import CrossEntropyLoss
import torchmetrics
import logging
from itertools import product
import random

# 2. Define MLP

In [None]:
# Define MLP model
class MLP(nn.Module):
    """Fully connected classifier operating on flattened images.

    Args:
        n_hidden_nodes: Width of every hidden layer.
        n_classes: Number of output logits.
        image_width: Input image width in pixels.
        image_height: Input image height in pixels.
        color_channels: Number of channels per pixel.
        n_hidden_layers: Number of hidden layers; values of 1 or less build
            a single hidden layer.
    """

    def __init__(self, n_hidden_nodes, n_classes, image_width=32, image_height=32, color_channels=3, n_hidden_layers=1):
        super().__init__()
        flat_features = image_width * image_height * color_channels

        # Input and output projections are created first, in this order.
        first_linear = nn.Linear(flat_features, n_hidden_nodes)
        first_activation = nn.ReLU()
        output_linear = nn.Linear(n_hidden_nodes, n_classes)
        self.layers = nn.Sequential(first_linear, first_activation, output_linear)

        if n_hidden_layers <= 1:
            return

        # Extra hidden Linear/ReLU pairs, named "3", "4", "5", "6", ...
        extra_hidden = nn.Sequential()
        for depth in range(1, n_hidden_layers):
            extra_hidden.add_module(str(2 * depth + 1), nn.Linear(n_hidden_nodes, n_hidden_nodes))
            extra_hidden.add_module(str(2 * depth + 2), nn.ReLU())

        # The extra stack is kept both as its own attribute and as element 2
        # of `layers`, so the same modules are reachable under both names.
        self.added_layers = extra_hidden
        self.layers = nn.Sequential(first_linear, first_activation, extra_hidden, output_linear)

    def forward(self, x):
        """Flatten each sample of the batch and return the class logits."""
        flattened = x.view(x.size(0), -1)
        return self.layers(flattened)


# 3. Define Dataloader

In [None]:
class DataLoaderFactory:
    """Build CIFAR-10 ``DataLoader`` objects for the train and test splits.

    Args:
        root: Directory holding (or receiving) the CIFAR-10 files.
        transform: Optional transform applied to every sample. When given it
            is used for both splits. When omitted, the training split gets
            flip/crop augmentation plus normalization, while the test split
            only gets the deterministic tensor conversion and normalization.
        batch_size: Number of samples per batch.
        num_workers: Number of loader worker processes.
        download: Whether the dataset may be downloaded if missing.
    """

    def __init__(self, root='./data', transform=None, batch_size=32, num_workers=2, download=True):
        self.root = root
        if transform is None:
            normalize = transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))
            self.transform = transforms.Compose([
                transforms.RandomHorizontalFlip(),
                transforms.RandomCrop(32, padding=4),
                transforms.ToTensor(),
                normalize,
            ])
            # Random augmentation must not be applied at evaluation time,
            # otherwise the reported test accuracy varies from run to run.
            self.eval_transform = transforms.Compose([
                transforms.ToTensor(),
                normalize,
            ])
        else:
            # A caller-supplied transform is used as-is for both splits.
            self.transform = transform
            self.eval_transform = transform
        self.batch_size = batch_size
        self.num_workers = num_workers
        self.download = download

    def load_data(self, train=True):
        """Return a DataLoader over the requested CIFAR-10 split.

        Args:
            train: ``True`` for the training split (shuffled), ``False`` for
                the test split (not shuffled).
        """
        dataset = torchvision.datasets.CIFAR10(
            root=self.root,
            train=train,
            download=self.download,
            transform=self.transform if train else self.eval_transform,
        )
        return DataLoader(
            dataset,
            batch_size=self.batch_size,
            shuffle=train,
            num_workers=self.num_workers,
        )

Files already downloaded and verified
Files already downloaded and verified


# 4. Define Model trainer

In [None]:
class ModelTrainer:
    """Train, evaluate and checkpoint a classifier.

    Args:
        model: Network to train; defaults to a single-hidden-layer ``MLP``.
        criterion: Loss function; defaults to ``CrossEntropyLoss``.
        optimizer: Optimizer bound to ``model``'s parameters; defaults to
            ``SGD`` with a learning rate of 0.005.
        dataloader: Object exposing ``load_data(train=...)``; defaults to a
            ``DataLoaderFactory`` reading from ``./data``.
    """

    def __init__(self, model=None, criterion=None, optimizer=None, dataloader=None):
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        # Explicit ``is None`` checks: an ``or`` fallback would silently
        # replace valid objects that happen to be falsy (anything defining
        # ``__len__`` that is empty).
        if model is None:
            model = MLP(n_classes=10, n_hidden_nodes=100, image_width=32, image_height=32, color_channels=3)
        self.model = model
        self.model.to(self.device)
        self.criterion = CrossEntropyLoss() if criterion is None else criterion
        self.optimizer = SGD(self.model.parameters(), lr=0.005) if optimizer is None else optimizer
        if dataloader is None:
            dataloader = DataLoaderFactory(root='./data', batch_size=32, num_workers=4, download=False)
        self.dataloader = dataloader
        self.epochs = 0

    def _open_run_log(self):
        """Attach a new per-run log file to the training logger.

        Returns:
            ``(logger, handler)``; the caller must remove and close
            ``handler`` once the run is over.
        """
        # Heuristic: hidden layers = number of distinct Linear modules minus
        # the output layer.
        linear_count = sum(isinstance(module, nn.Linear) for module in self.model.modules())
        num_layers = max(linear_count - 1, 0)
        run_id = random.randint(0, 1000)

        # A dedicated handler is used instead of logging.basicConfig, which
        # does nothing once the root logger already has a handler and would
        # therefore send every later run to the first run's file.
        handler = logging.FileHandler(f'training_mlp_{num_layers}_hidden_layers_{run_id}.log')
        handler.setFormatter(logging.Formatter(logging.BASIC_FORMAT))
        run_logger = logging.getLogger('mlp_training')
        run_logger.setLevel(logging.INFO)
        run_logger.propagate = False
        run_logger.addHandler(handler)
        return run_logger, handler

    def train(self, epochs=10, write_log=False):
        """Train for ``epochs`` epochs, evaluating on the test split after each.

        Args:
            epochs: Number of passes over the training split.
            write_log: When true, per-epoch metrics are also written to a
                ``training_mlp_<layers>_hidden_layers_<id>.log`` file.

        Returns:
            Mean training loss per batch of the last epoch, or ``nan`` when
            ``epochs`` is 0.
        """
        self.epochs = epochs
        train_loader = self.dataloader.load_data(train=True)
        train_accuracy = torchmetrics.Accuracy(task="multiclass", num_classes=10).to(self.device)
        test_loader = self.dataloader.load_data(train=False)
        test_accuracy = torchmetrics.Accuracy(task="multiclass", num_classes=10).to(self.device)

        run_logger, log_handler = self._open_run_log() if write_log else (None, None)
        # Stays NaN when no epoch runs, so the return below is always defined.
        epoch_loss = float('nan')

        try:
            if run_logger is not None:
                run_logger.info("Training started\n")

            for epoch in range(epochs):
                running_loss = 0.0
                self.model.train()
                train_accuracy.reset()

                for inputs, labels in train_loader:
                    inputs, labels = inputs.to(self.device), labels.to(self.device)
                    self.optimizer.zero_grad()
                    outputs = self.model(inputs)
                    loss = self.criterion(outputs, labels)
                    loss.backward()
                    self.optimizer.step()
                    running_loss += loss.item()
                    train_accuracy.update(outputs.argmax(dim=1), labels)

                epoch_loss = running_loss / len(train_loader)
                final_train_accuracy = train_accuracy.compute()
                print(f'Epoch [{epoch+1}/{epochs}]\n',
                      f'Loss: {epoch_loss:.4f}\n',
                      f'Train Accuracy: {final_train_accuracy * 100:.2f}\n',
                      '--------------------------------------------------\n')

                final_test_accuracy = self.evaluate(test_loader=test_loader, test_accuracy=test_accuracy)
                if run_logger is not None:
                    run_logger.info(f"Epoch: {epoch + 1}, Loss: {epoch_loss:.4f}, Train accuracy: {final_train_accuracy}\t|\t Test accuracy: {final_test_accuracy}\n")
        finally:
            # Release the log file even if training is interrupted.
            if log_handler is not None:
                run_logger.removeHandler(log_handler)
                log_handler.close()

        print('======================Finished=========================')
        return epoch_loss

    def evaluate(self, test_loader=None, test_accuracy=None):
        """Compute and print accuracy over the test split.

        Args:
            test_loader: Loader to evaluate on; built from the data factory
                when omitted.
            test_accuracy: Metric object to reuse; a new one is created when
                omitted. It is reset before use.

        Returns:
            The accuracy computed by the metric object.
        """
        if test_loader is None:
            test_loader = self.dataloader.load_data(train=False)
        if test_accuracy is None:
            test_accuracy = torchmetrics.Accuracy(task="multiclass", num_classes=10).to(self.device)
        self.model.eval()
        test_accuracy.reset()

        with torch.no_grad():
            for inputs, labels in test_loader:
                inputs, labels = inputs.to(self.device), labels.to(self.device)
                outputs = self.model(inputs)
                test_accuracy.update(outputs.argmax(dim=1), labels)

        final_test_accuracy = test_accuracy.compute()
        print(f'Test Accuracy: {final_test_accuracy * 100:.2f}\n',
              '--------------------------------------------------\n')
        return final_test_accuracy

    def predict(self, data):
        """Return the predicted class indices for each batch in ``data``.

        Args:
            data: Iterable of input batches (tensors).

        Returns:
            A list with one tensor of argmax class indices per batch.
        """
        predictions = []
        self.model.eval()

        with torch.no_grad():
            for batch in data:
                batch = batch.to(self.device)
                outputs = self.model(batch)
                predictions.append(outputs.argmax(dim=1))

        return predictions

    def save(self, name=None):
        """Write model, optimizer and criterion state to ``models/<name>.pth``.

        Args:
            name: Checkpoint file name without extension; ``None`` or an
                empty string selects ``mlp_checkpoint``.
        """
        import os  # local import: only needed to create the output directory

        parent = 'models'
        if name is None or name == '':
            checkpoint_path = 'mlp_checkpoint.pth'
        else:
            checkpoint_path = f'{name}.pth'
        # torch.save does not create missing directories.
        os.makedirs(parent, exist_ok=True)
        path = f'{parent}/{checkpoint_path}'
        checkpoint = {
            'model_state_dict': self.model.state_dict(),
            'optimizer_state_dict': self.optimizer.state_dict(),
            'criterion_state_dict': self.criterion.state_dict(),
            'epochs': self.epochs
        }
        torch.save(checkpoint, path)
        print(f"Checkpoint saved to {path}")

# 5. Hyperparameter tuning

In [1]:
def hyper_tuning(model, epochs=3, write_log=False, download=True):
    """Grid-search batch size, learning rate and optimizer for ``model``.

    Every combination is trained on its own deep copy of ``model``, so all
    trials start from the same initial weights and the passed-in model is
    left untouched. Trials are ranked by the training loss returned by
    ``ModelTrainer.train`` (not by a validation metric).

    Args:
        model: Network providing the architecture and initial weights.
        epochs: Number of epochs to train each combination.
        write_log: Forwarded to ``ModelTrainer.train``.
        download: Forwarded to ``DataLoaderFactory``.

    Returns:
        ``(best_params, best_loss, best_model)``: the winning hyperparameter
        dict, its loss, and the model trained with it. If no trial yields a
        loss below infinity, ``best_params`` is ``None`` and the original
        ``model`` is returned.
    """
    import copy  # local import: only needed to clone the model per trial

    param_grid = {
        'batch_size': [16, 32, 64],
        'learning_rate': [1e-3, 5e-3],
        'optimizer': ['SGD', 'Adam']
    }

    best_params = None
    best_loss = float('inf')
    best_model = model
    for batch_size, lr, opt in product(*param_grid.values()):
        print(f"\nTesting with batch size={batch_size}, learning rate={lr}, optimizer={opt}")

        # Reusing one model instance would let later trials start from the
        # weights earlier trials already trained, biasing the comparison.
        candidate = copy.deepcopy(model)

        dataloader = DataLoaderFactory(root='./data', batch_size=batch_size, download=download)

        optimizer_cls = SGD if opt == 'SGD' else Adam
        optimizer = optimizer_cls(candidate.parameters(), lr=lr)

        model_trainer = ModelTrainer(model=candidate, dataloader=dataloader, optimizer=optimizer)
        avg_loss = model_trainer.train(epochs=epochs, write_log=write_log)

        if avg_loss < best_loss:
            best_loss = avg_loss
            best_model = candidate
            best_params = {
                'batch_size': batch_size,
                'learning_rate': lr,
                'optimizer': opt
            }

    return best_params, best_loss, best_model

# 6. Practice

## 6.1. MLP with 1 hidden layer

### 6.1.1. Random choice

In [None]:
# Baseline run: single-hidden-layer MLP trained with fixed hyperparameters
# (SGD, learning rate 0.005) for 20 epochs, then checkpointed.
one_hlayer_model = MLP(
    n_hidden_nodes=100,
    n_classes=10,
    image_width=32,
    image_height=32,
    color_channels=3,
)

criterion = CrossEntropyLoss()
optimizer = SGD(one_hlayer_model.parameters(), lr=0.005)

model_trainer = ModelTrainer(
    model=one_hlayer_model,
    criterion=criterion,
    optimizer=optimizer,
)
model_trainer.train(epochs=20, write_log=True)
model_trainer.save('one_hidden_model_1')

### 6.1.2. Hyperparameter tuning

In [None]:
# Grid-search hyperparameters for the single-hidden-layer MLP, report the
# winning combination and persist it to disk.
one_hlayer_model_tuning = MLP(
    n_hidden_nodes=100,
    n_classes=10,
    image_width=32,
    image_height=32,
    color_channels=3,
)

best_params, best_lost, one_hlayer_model_tuning = hyper_tuning(
    model=one_hlayer_model_tuning,
    epochs=10,
    write_log=True,
    download=False,
)

print('Best hyperparameters: ', best_params)
print('Best loss: ', best_lost)

torch.save(best_params, '1h_best_hparams.pth')

## 6.2. MLP with 2 hidden layers

### 6.2.1. Random choice

In [None]:
# Baseline run: two-hidden-layer MLP trained with fixed hyperparameters
# (SGD, learning rate 0.005) for 20 epochs, then checkpointed.
two_hlayer_model = MLP(n_classes=10, n_hidden_nodes=100, image_width=32, image_height=32, color_channels=3, n_hidden_layers=2)

criterion = CrossEntropyLoss()
# The optimizer and trainer must be bound to the two-layer model built above,
# not to the single-layer model from section 6.1.
optimizer = SGD(two_hlayer_model.parameters(), lr=0.005)

model_trainer = ModelTrainer(model=two_hlayer_model, criterion=criterion, optimizer=optimizer)

model_trainer.train(epochs=20, write_log=True)

model_trainer.save('two_hidden_model_1')

### 6.2.2. Hyperparameter tuning

In [None]:
# Grid-search hyperparameters for the two-hidden-layer MLP, report the
# winning combination and persist it to disk.
two_hlayer_model_tuning = MLP(n_classes=10, n_hidden_nodes=100, image_width=32, image_height=32, color_channels=3, n_hidden_layers=2)

# Tune the two-layer model built above and rebind that same name, instead of
# re-tuning (and overwriting) the single-layer model from section 6.1.2.
best_params, best_lost, two_hlayer_model_tuning = hyper_tuning(model=two_hlayer_model_tuning, epochs=10, write_log=True, download=False)

print('Best hyperparameters: ', best_params)
print('Best loss: ', best_lost)

torch.save(best_params, '2h_best_hparams.pth')