In [7]:
import torch
from torchvision.datasets import OxfordIIITPet
from torchvision import transforms
from torchvision import models
from torch import nn
from torch import optim
import wandb
import tqdm

In [8]:
# Channel-wise ImageNet statistics. Inputs have to be normalized with these
# values whenever the backbone was pre-trained on ImageNet (e.g. ResNet-18).
IMAGENET_MEAN = [0.485, 0.456, 0.406]
IMAGENET_STD = [0.229, 0.224, 0.225]

# Training pipeline: random crop / flip / rotation as augmentation, then
# tensor conversion and normalization.
train_transform = transforms.Compose(
    [
        transforms.RandomResizedCrop(224),
        transforms.RandomHorizontalFlip(),
        transforms.RandomRotation(15),
        transforms.ToTensor(),
        transforms.Normalize(mean=IMAGENET_MEAN, std=IMAGENET_STD),
    ]
)

# Evaluation pipeline: deterministic resize + center crop, same normalization.
test_transform = transforms.Compose(
    [
        transforms.Resize(256),
        transforms.CenterCrop(224),
        transforms.ToTensor(),
        transforms.Normalize(mean=IMAGENET_MEAN, std=IMAGENET_STD),
    ]
)

# Oxford-IIIT Pet splits; files are downloaded into ../data when missing.
train_data = OxfordIIITPet(root="../data", split="trainval", transform=train_transform, download=True)
test_data = OxfordIIITPet(root="../data", split="test", transform=test_transform, download=True)

# Quick sanity check of the split sizes and of one transformed sample.
print(f"Training samples: {len(train_data)}")
print(f"Test samples: {len(test_data)}")
print(f"Size of the images: {train_data[0][0].shape}")

Training samples: 3680
Test samples: 3669
Size of the images: torch.Size([3, 224, 224])


In [9]:
def load_model(model, dropout_p=0.5, num_classes=None, device=None):
    """
    Prepare a pre-trained model for transfer learning.

    Every existing parameter is frozen, then ``model.fc`` is replaced by a
    new head made of a dropout layer followed by a linear layer. The model
    is modified in place and moved to ``device``.

    Args:
        model: Network whose final classifier is stored in ``model.fc`` and
            exposes ``in_features``.
        dropout_p: Dropout probability used in the new head.
        num_classes: Number of outputs of the new linear layer. When
            ``None``, ``len(class_names)`` is used, where ``class_names``
            is looked up in the notebook's global namespace.
        device: Device the model is moved to. When ``None``, the
            notebook-level global ``device`` is used.

    Returns:
        The same ``model`` object that was passed in.

    Raises:
        NameError: If a fallback global (``class_names`` / ``device``) is
            needed but has not been defined.
    """
    # Freeze all the layers in the base model
    for param in model.parameters():
        param.requires_grad = False

    # Get the number of input features for the classifier
    num_ftrs = model.fc.in_features
    if num_classes is None:
        # Backward-compatible fallback to the notebook-level global.
        num_classes = len(class_names)

    if device is None:
        # The parameter shadows the notebook-level `device`, so the global
        # has to be looked up explicitly.
        try:
            device = globals()["device"]
        except KeyError:
            raise NameError("name 'device' is not defined") from None

    # Create a new fully-connected layer for our new classes; its freshly
    # created parameters are trainable, unlike the frozen base above.
    model.fc = nn.Sequential(
        nn.Dropout(p=dropout_p),
        nn.Linear(num_ftrs, num_classes)
    )

    # Move model to device
    model.to(device)

    return model

In [10]:
def fine_tune(model, dataloaders, loss_fn, optimizer, epochs=10, device='cuda'):
    """
    Train ``model`` for ``epochs`` epochs, evaluating it after each epoch.

    Args:
        model: Module to optimize. It is called as ``model(X)`` and switched
            between ``train()`` and ``eval()`` modes; it is not moved to
            ``device`` here.
        dataloaders: Pair ``(train_dataloader, test_dataloader)`` yielding
            ``(X, y)`` batches. The second loader must expose ``.dataset``,
            whose length is the denominator of the accuracy.
        loss_fn: Callable ``loss_fn(predictions, targets)`` returning a
            scalar tensor.
        optimizer: Optimizer stepped once per training batch.
        epochs: Number of epochs to run.
        device: Device each batch is moved to before the forward pass.

    Returns:
        dict with keys ``'train_loss'``, ``'val_loss'`` and ``'val_acc'``,
        each a list with one float per epoch. Losses are the mean of the
        per-batch losses; accuracy is the number of correct predictions
        divided by the number of validation samples.

    Side effects:
        Prints one progress line per epoch and, when a W&B run is active,
        logs the three metrics to it.
    """
    # Create a dictionary to store training history
    history = {'train_loss': [], 'val_loss': [], 'val_acc': []}
    train_dataloader, test_dataloader = dataloaders

    # Loop through epochs
    for epoch in range(epochs):
        ### Training Phase ###
        model.train()

        train_loss = 0.0
        for X, y in train_dataloader:
            # Move data to target device
            X, y = X.to(device), y.to(device)

            # Forward pass
            y_pred = model(X)

            # Calculate loss
            loss = loss_fn(y_pred, y)
            train_loss += loss.item()

            # Optimizer zero grad
            optimizer.zero_grad()

            # Loss backward
            loss.backward()

            # Optimizer step
            optimizer.step()

        # Calculate average training loss for the epoch
        train_loss /= len(train_dataloader)

        ### Validation Phase ###
        model.eval() # Set model to evaluation mode

        val_loss, val_acc = 0.0, 0.0
        with torch.inference_mode():
            for X, y in test_dataloader:
                X, y = X.to(device), y.to(device)

                # Forward pass
                val_pred = model(X)

                # Calculate loss and accuracy
                val_loss += loss_fn(val_pred, y).item()
                val_acc += (val_pred.argmax(dim=1) == y).sum().item()

        # Loss is averaged per batch, accuracy per sample
        val_loss /= len(test_dataloader)
        val_acc /= len(test_dataloader.dataset)

        # Print progress
        print(f"Epoch: {epoch+1} | "
              f"Train loss: {train_loss:.4f} | "
              f"Val loss: {val_loss:.4f} | "
              f"Val acc: {val_acc:.4f}")

        # Store history
        history['train_loss'].append(train_loss)
        history['val_loss'].append(val_loss)
        history['val_acc'].append(val_acc)

        # wandb.log raises when no run has been initialised; only log when
        # a run is active so the loop also works outside of W&B.
        if wandb.run is not None:
            wandb.log({
                "train_loss": train_loss,
                "val_loss": val_loss,
                "val_acc": val_acc
            })

    return history
    
def fine_tune_full(model, dataloaders, loss_fn, optimizers, epochs_head=5, epochs_full=5, device='cuda'):
    """
    Run two consecutive ``fine_tune`` stages and merge their histories.

    The first stage runs with the first optimizer for ``epochs_head``
    epochs. Afterwards every parameter of ``model`` is marked trainable and
    a second stage runs with the second optimizer for ``epochs_full``
    epochs.

    Args:
        model: Model to train; all its parameters end up with
            ``requires_grad=True``.
        dataloaders: Passed through unchanged to ``fine_tune``.
        loss_fn: Passed through unchanged to ``fine_tune``.
        optimizers: Pair ``(optimizer_head, optimizer_full)``, one per stage.
        epochs_head: Number of epochs of the first stage.
        epochs_full: Number of epochs of the second stage.
        device: Passed through unchanged to ``fine_tune``.

    Returns:
        dict with keys ``'train_loss'``, ``'val_loss'`` and ``'val_acc'``;
        each value is the first stage's list followed by the second's.
    """
    head_optimizer, full_optimizer = optimizers

    # Stage 1: train with the head optimizer
    print("Training model's head...")
    head_history = fine_tune(
        model, dataloaders, loss_fn, head_optimizer,
        epochs=epochs_head, device=device,
    )

    # Stage 2: make every parameter trainable, then train with the full optimizer
    for weight in model.parameters():
        weight.requires_grad = True

    print("Training full model...")
    full_history = fine_tune(
        model, dataloaders, loss_fn, full_optimizer,
        epochs=epochs_full, device=device,
    )
    print("Finished Training")

    # Concatenate the per-epoch metrics of both stages
    return {
        metric: head_history[metric] + full_history[metric]
        for metric in ('train_loss', 'val_loss', 'val_acc')
    }

In [11]:
# Config for W&B sweep to find optimal hyperparameters
sweep_config = {
    'method': 'bayes',           # The search strategy: bayes, random, or grid
    'metric': {
        'name': 'val_acc',
        'goal': 'maximize'
    },
    'parameters': {
        'lr_head': {
            # `log_uniform_values` takes min/max as the actual values.
            # Plain `log_uniform` treats them as exponents, which would
            # sample learning rates around exp(1e-5) ~ 1.0.
            'distribution': 'log_uniform_values',
            'min': 0.00001,
            'max': 0.0005
        },
        'lr_full': {
            'distribution': 'uniform',
            'min': 0.00001,
            'max': 0.0005
        },
        'weight_decay_full': {
            'distribution': 'log_uniform_values',
            # A log scale cannot include 0, so use a small positive floor.
            'min': 0.000001,
            'max': 0.01
        },
        'dropout_p': {
            'values': [0.25, 0.5, 0.75]
        },
        # Epoch counts must be integers (they feed `range`), so sample them
        # with `int_uniform` rather than the continuous `uniform`.
        'epochs_head': {
            'distribution': 'int_uniform',
            'min': 5,
            'max': 15
        },
        'epochs_full': {
            'distribution': 'int_uniform',
            'min': 10,
            'max': 30
        }
    }
}

def sweep(sweep_config, model, dataloaders, loss_fn, device='cuda'):
    """
    Execute one W&B run: build the classifier head, then fine-tune.

    Hyperparameters are read from ``wandb.config`` (``dropout_p``,
    ``lr_head``, ``lr_full``, ``weight_decay_full``, ``epochs_head``,
    ``epochs_full``).

    Args:
        sweep_config: Passed as ``config`` to ``wandb.init``.
            NOTE(review): the hyperparameters read below are presumably
            injected by the sweep agent rather than taken from this dict;
            confirm against how the sweep is launched.
        model: Pre-trained model. It is deep-copied, so the caller's object
            is left untouched and can be reused for the next run.
        dataloaders: Forwarded to ``fine_tune_full``.
        loss_fn: Forwarded to ``fine_tune_full``.
        device: Forwarded to ``fine_tune_full``.

    Returns:
        The history dict returned by ``fine_tune_full``.
    """
    import copy  # local import: only needed here

    with wandb.init(entity = 'nicdeluc-learning',
                    project = 'pet-breed-classification',
                    config = sweep_config):
        config = wandb.config

        # load_model replaces `fc` in place and training updates the
        # weights, so work on a copy: otherwise a second run on the same
        # object would fail on `model.fc.in_features` and would start from
        # the previous run's weights.
        run_model = load_model(copy.deepcopy(model), dropout_p=config.dropout_p)

        optim_head = optim.Adam(run_model.fc.parameters(), lr=config.lr_head)
        optim_full = optim.Adam(run_model.parameters(),
                                lr=config.lr_full,
                                weight_decay=config.weight_decay_full)
        optimizers = optim_head, optim_full

        history = fine_tune_full(run_model,
                                 dataloaders,
                                 loss_fn,
                                 optimizers,
                                 # Sweeps may sample floats; `range` needs ints.
                                 epochs_head=int(config.epochs_head),
                                 epochs_full=int(config.epochs_full),
                                 device=device)

    return history
        