# Import modules

In [1]:
import os
import zipfile
import torch
import torchvision
from torchvision import datasets, transforms
import pandas as pd
from tqdm.auto import tqdm
from pathlib import Path
from typing import Tuple, Dict
from torch.utils.tensorboard import SummaryWriter
from torch.utils.data import DataLoader, random_split
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score

  from .autonotebook import tqdm as notebook_tqdm


# Download dataset

In [2]:
data_dir = Path('data/')

if data_dir.is_dir():
    print('Folder already exists')
else:
    print('Downloading subset of Food101 dataset')
    data_dir.mkdir(parents=True,exist_ok=True)
    !kaggle datasets download -d satish1v/food101subset -p data/
    zip_path = data_dir / 'food101subset.zip'
    with zipfile.ZipFile(zip_path) as zipref:
        zipref.extractall(data_dir)
    os.remove(zip_path)

Folder already exists


# Experimentation setup

In [3]:
model_names = ['alexnet','vgg11','resnet18']

batch_sizes = [10,20]

dropouts = [0.3,0.5]

# Helper functions

In [28]:
def train_step(model:torch.nn.Module,
               dataloader:torch.utils.data.DataLoader,
               loss_fn:torch.nn.Module,
               optimizer: torch.optim.Optimizer,  
               device:torch.device) -> Tuple[float,float,float,float,float]:
    # train mode
    model.train()

    # Set up metrics    
    train_loss, train_acc, train_precision, train_recall, train_f1 = 0, 0, 0, 0, 0

    # Loop through batches
    for X, y in dataloader:

        X, y = X.to(device), y.to(device)

        # 1. Forward pass
        logits = model(X)

        # 2. Compute loss
        loss = loss_fn(logits, y)

        # 3. Zero gradients
        optimizer.zero_grad()

        # 4. Backward propagation
        loss.backward()

        # 5. Update weights and biases
        optimizer.step()

        # Get the most confident predictions
        predictions = torch.argmax(logits,dim=1)

        # Calculate metrics
        train_loss += loss.item()
        train_acc += accuracy_score(predictions, y)
        train_precision += precision_score(predictions, y, average='micro')
        train_recall += recall_score(predictions, y, average='micro')
        train_f1 += f1_score(predictions, y, average='micro')
    
    # Averaging metrics
    train_loss = round(train_loss/len(dataloader),2)
    train_acc = round(train_acc/len(dataloader),2)
    train_precision = round(train_precision/len(dataloader),2)
    train_recall = round(train_recall/len(dataloder),2)
    train_f1 = round(train_f1/len(dataloader),2)
    
    return train_loss, train_acc, train_precision, train_recall, train_f1
    

def test_step(model:torch.nn.Module,
              dataloader:torch.utils.data.DataLoader,
              loss_fn:torch.nn.Module,
              device:torch.device) -> Tuple[float,float,float,float,float]:

    # Set up metrics    
    test_loss, test_acc, test_precision, test_recall, test_f1 = 0, 0, 0, 0, 0

    # Turn off updating gradients
    model.eval()
    with model.inference_mode():
        # Loop through batches
        for X, y in dataloader:

            X, y = X.to(device), y.to(device)

            # 1. Forward pass
            logits = model(X)

            # 2. Compute loss
            loss = loss_fn(logits, y)

            # Calculate metrics
            test_loss += loss.item()
            test_acc += accuracy_score(predictions, y)
            test_precision += precision_score(predictions, y, average='micro')
            test_recall += recall_score(predictions, y, average='micro')
            test_f1 += f1_score(predictions, y, average='micro')

    # Averaging metrics
    test_loss = round(test_loss/len(dataloader),2)
    test_acc = round(test_acc/len(dataloader),2)
    test_precision = round(test_precision/len(dataloader),2)
    test_recall = round(test_recall/len(dataloder),2)
    test_f1 = round(test_f1/len(dataloader),2)
    
    return test_loss, test_acc, test_precision, test_recall, test_f1


def create_writer(model_name:str, batch_size:int, dropout_rate:float) -> SummaryWriter:
    log_dir = Path("experiments") / model_name / str(batch_size) / str(dropout_rate)
    return SummaryWriter(log_dir=log_dir)
    

def train(model:torch.nn.Module,
            train_dataloader:torch.utils.data.DataLoader,
            test_dataloader:torch.utils.data.DataLoader,
            loss_fn:torch.nn.Module,
            optimizer: torch.optim.Optimizer,  
            device:torch.device,
            model_name:str,
            batch_size:int,
            dropout_rate:float,
            epochs:int,
            ):
    
    lowest_test_loss = float('inf')
    for epoch in tqdm(range(epochs)):
        # Train step
        train_loss, train_acc, train_precision, train_recall, train_f1 = train_step(model,train_dataloader,loss_fn,optimizer,device)
        print(f'EPOCH_{epoch+1}:\n \
            Train metrics: loss = {train_loss}; acc = {train_acc}; precision = {train_precision}; recall = {train_recall}; f1 = {train_f1}')
        
        # Test step
        test_loss, test_acc, test_precision, test_recall, test_f1 = test_step(model,test_dataloader,loss_fn,device)
        print(f'EPOCH_{epoch+1}:\n \
            Test metrics: loss = {test_loss}; acc = {test_acc}; precision = {test_precision}; recall = {test_recall}; f1 = {test_f1}')

        # Add results to SummaryWriter
        writer.add_scalars(main_tag='CrossEntropyLoss',
                           tag_scalar_dict={'train_loss':train_loss,
                                             'test_loss':test_loss})
        writer.add_scalars(main_tag='Accuracy',
                           tag_scalar_dict={'train_acc':train_acc,
                                             'test_acc':test_acc})
        writer.add_scalars(main_tag='Precision',
                           tag_scalar_dict={'train_precision':train_precision,
                                             'test_precision':test_precision})
        writer.add_scalars(main_tag='Recall',
                           tag_scalar_dict={'train_recall':train_recall,
                                             'test_recall':test_recall})
        writer.add_scalars(main_tag='F1',
                            tag_scalar_dict={'train_f1':train_f1,
                                                'test_f1':test_f1})
        writer.close()

        # Checking if test_loss decreased
        if test_loss<lowest_test_loss:
            model_path = Path('models/') / f'{model_name}_BS{batch_size}_DR{dropout_rate}.pt'
            torch.save(model, model_path)

# Settings

In [29]:
# Set the seed for reproducibility
torch.manual_seed(42)
torch.cuda.manual_seed(42)

train_threshold = 0.8
out_features = len(os.listdir(data_dir))
loss_fn = torch.nn.CrossEntropyLoss()
learning_rate = 0.001
epochs = 5

experiment_dir = Path('experiments/')
experiment_dir.mkdir(parents=True,exist_ok=True)

device = 'cuda' if torch.cuda.is_available() else 'cpu'
device

'cpu'

# Experimenting by iterating through "experimentation setup"

In [31]:
for model_name in model_names:

    if model_name=='alexnet':
        weights = torchvision.models.AlexNet_Weights.DEFAULT
        model = torchvision.models.alexnet(weights=weights)
    
    elif model_name=='vgg11':
        weights = torchvision.models.VGG11_Weights.DEFAULT
        model = torchvision.models.vgg11(weights=weights)
    
    elif model_name=='resnet18':
        weights = torchvision.models.ResNet18_Weights.DEFAULT
        model = torchvision.models.resnet18(weights=weights)
        # Making the last layer similar to first two models for purposes of experiment
        model.classifier = model.fc
        del model.fc
        model.classifier = torch.nn.Sequential(
            torch.nn.Dropout(p=random.random()),
            model.classifier
        )

    model.to(device)

    model.classifier[-1] = torch.nn.Linear(in_features=model.classifier[-1].in_features, 
                                            out_features=out_features, bias=True)

    # Get model's default transform
    auto_transforms = weights.transforms()

    # Create datasets
    dataset = datasets.ImageFolder(root=data_dir, transform=auto_transforms)
    train_size = int(train_threshold * len(dataset))
    test_size = int(len(dataset) - train_size)
    train_dataset, test_dataset = random_split(dataset, [train_size, test_size])

    for batch_size in batch_sizes:
        # Create dataloaders
        train_dataloader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
        test_dataloader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

        for dropout in dropouts:
            # Changing dropout rate
            for i, layer in enumerate(model.classifier):
                if type(layer)==torch.nn.modules.dropout.Dropout:
                    model.classifier[i] = torch.nn.Dropout(p=dropout, inplace=False)
            
            # Putting only classifier params equal to freezing feature extraction layers
            optimizer = torch.optim.Adam(model.classifier.parameters(), lr=learning_rate)
            
            # Train, save the best model, track metrics
            train(model,train_dataloader,test_dataloader,
                loss_fn,optimizer,device,model_name,
                batch_size,dropout,epochs)

  0%|          | 0/5 [00:01<?, ?it/s]


KeyboardInterrupt: 