# Basic Project: Transfer Learning

In [20]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader
from torchvision import transforms, datasets, models
from torch.utils.data import random_split
import os
from tempfile import TemporaryDirectory
import matplotlib.pyplot as plt
import time 


`tic()` / `toc()` helper functions to track the training time

In [21]:
def tic():
    """Start (or restart) the notebook-wide stopwatch that `toc()` reads."""
    global startTime_for_tictoc
    # perf_counter() is a monotonic clock, so the measured interval cannot be
    # distorted by system clock adjustments the way time.time() can.
    startTime_for_tictoc = time.perf_counter()


def toc():
    """Return the time elapsed since the most recent `tic()` call.

    Returns:
        float | None: elapsed seconds, or None (after printing a notice)
        when `tic()` has not been called yet.
    """
    if 'startTime_for_tictoc' in globals():
        return time.perf_counter() - startTime_for_tictoc
    print("Toc: start time not set")
    return None
        

In [22]:
# Root directory passed to the OxfordIIITPet dataset constructors below.
# The datasets are opened with download=False, so the data must already be there.
data_root = "datasets"


Image preprocessing for ResNet18

In [23]:
# Preprocessing for ResNet18: resize to 224x224, convert to a tensor, then
# normalise each channel (presumably the ImageNet statistics the pretrained
# weights expect -- confirm against the torchvision weights documentation).
_norm_mean = [0.485, 0.456, 0.406]
_norm_std = [0.229, 0.224, 0.225]
_preprocessing_steps = [
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=_norm_mean, std=_norm_std),
]
transform = transforms.Compose(_preprocessing_steps)


Dataset

In [24]:
# Both splits share every setting except `split`: binary targets, the common
# preprocessing pipeline, and no download (the data must already be on disk).
_binary_pet_settings = dict(
    root=data_root,
    target_types='binary-category',
    transform=transform,
    download=False,
)
trainval_data = datasets.OxfordIIITPet(split='trainval', **_binary_pet_settings)
test_data = datasets.OxfordIIITPet(split='test', **_binary_pet_settings)


Train / Validation Dataset Split

In [25]:
val_ratio = 0.2  # 20% for validation
train_size = int((1 - val_ratio) * len(trainval_data))
val_size = len(trainval_data) - train_size

# Seed the split so that "Restart & Run All" always produces the same
# train/validation partition; otherwise validation curves from different
# runs are not comparable.
split_generator = torch.Generator().manual_seed(42)
train_data, val_data = random_split(
    trainval_data, [train_size, val_size], generator=split_generator
)


In [26]:
# Sanity check: number of samples in the validation, training and test sets.
len(val_data), len(train_data), len(test_data)


(736, 2944, 3669)

Data Loaders

In [27]:
batch_size = 32
# One loader per split; only the training split is reshuffled every epoch.
train_loader, val_loader, test_loader = (
    DataLoader(split, batch_size=batch_size, shuffle=reshuffle)
    for split, reshuffle in (
        (train_data, True),
        (val_data, False),
        (test_data, False),
    )
)


In [28]:
# Lookup tables keyed by phase name, as consumed by train_network().
dataloaders = {
    'train': train_loader,
    'val': val_loader,
    'test': test_loader,
}

# Sample counts are only recorded for the two phases used during training.
dataset_sizes = {
    'train': len(train_data),
    'val': len(val_data),
}


Initialize the Network

In [29]:
# Prefer the GPU whenever PyTorch can see one.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
print(f"Using device: {device}")
if device.type == 'cpu':
    # Hints for installing a CUDA-enabled PyTorch build.
    _gpu_hints = (
        "If GPU is available: \npip3 install torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cu118",
        "Restart the kernel and run the code again.",
        "Check with `print(torch.cuda.is_available())`",
        "Documentation: https://pytorch.org/get-started/locally/",
    )
    print(*_gpu_hints, sep="\n")

# ResNet18 with its default pretrained weights; the final fully connected
# layer is replaced by a fresh 2-output layer for the binary task.
network = models.resnet18(weights='DEFAULT')
nf = network.fc.in_features
network.fc = nn.Linear(in_features=nf, out_features=2)
network = network.to(device)


Using device: cuda


Function to Train the Network

In [30]:
def train_network(network, dataloaders, dataset_sizes, criterion, optimizer, num_epochs=25):
    """Train `network` and return it with its best-validation weights loaded.

    Every epoch runs a 'train' pass (with weight updates) followed by a 'val'
    pass (gradients disabled). Whenever the validation accuracy improves, the
    weights are checkpointed to a temporary file; that checkpoint is loaded
    back into `network` before returning.

    Args:
        network: Model to optimise. Batches are moved to the device of its
            parameters.
        dataloaders: Mapping with 'train' and 'val' entries, each an iterable
            of (inputs, labels) batches.
        dataset_sizes: Mapping with the number of samples behind the 'train'
            and 'val' loaders; used to average the epoch loss and accuracy.
        criterion: Loss, called as ``criterion(scores, labels)``.
        optimizer: Optimizer that updates the parameters of `network`.
        num_epochs: Number of train + validation epochs to run.

    Returns:
        tuple: ``(network, train_stats)`` where `train_stats` is a dict with
        the per-epoch lists 'train_losses', 'train_accuracies', 'val_losses',
        'val_accuracies' (accuracies as fractions in [0, 1]) and
        'elapsed_time' in seconds.
    """
    # Use the device the model already lives on instead of depending on a
    # notebook-level `device` variable being defined (and in sync).
    first_param = next(iter(network.parameters()), None)
    device = first_param.device if first_param is not None else torch.device("cpu")

    train_losses, train_accuracies = [], []
    val_losses, val_accuracies = [], []

    # Local stopwatch: timing here must not reset the notebook-wide tic()/toc()
    # timer that the caller may be using for something else.
    start_time = time.perf_counter()

    with TemporaryDirectory() as tempdir:
        best_network_params_path = os.path.join(tempdir, 'best_network_params.pt')

        # Checkpoint the initial weights so there is always a file to restore,
        # even if the validation accuracy never exceeds 0.
        torch.save(network.state_dict(), best_network_params_path)
        best_acc = 0.0

        for epoch in range(num_epochs):
            print(f'Epoch {epoch + 1}/{num_epochs}')
            print('-' * 10)

            for phase in ['train', 'val']:
                is_training = phase == 'train'
                network.train(is_training)  # train(False) is equivalent to eval()

                running_loss = 0.0
                running_corrects = 0

                for X, Y in dataloaders[phase]:
                    X = X.to(device)
                    Y = Y.to(device)

                    optimizer.zero_grad()

                    # forward; gradients are only tracked in the training phase
                    with torch.set_grad_enabled(is_training):
                        S = network(X)
                        _, P = torch.max(S, 1)
                        loss = criterion(S, Y)

                        # backward + optimize only if in training phase
                        if is_training:
                            loss.backward()
                            optimizer.step()

                    # statistics: weight the batch loss by the batch size so
                    # the epoch average is per sample
                    running_loss += loss.item() * X.size(0)
                    running_corrects += (P == Y).sum().item()

                epoch_loss = running_loss / dataset_sizes[phase]
                epoch_acc = running_corrects / dataset_sizes[phase]

                if is_training:
                    train_losses.append(epoch_loss)
                    train_accuracies.append(epoch_acc)
                else:
                    val_losses.append(epoch_loss)
                    val_accuracies.append(epoch_acc)

                print(f'{phase} Loss: {epoch_loss:.4f} Acc: {epoch_acc:.4f}')

                # keep the weights of the best validation epoch so far
                if phase == 'val' and epoch_acc > best_acc:
                    best_acc = epoch_acc
                    torch.save(network.state_dict(), best_network_params_path)

            print()

        print(f'Best val Acc: {best_acc:.4f}')

        # Restore the best checkpoint before the temporary directory is removed.
        network.load_state_dict(torch.load(best_network_params_path, weights_only=True))

    # Print Time for Training only
    el_time_training = time.perf_counter() - start_time
    print(f"\nTime for training: {el_time_training:.1f} sec.")

    # Return Network and Training Statistics
    train_stats = {
        'train_losses': train_losses,
        'train_accuracies': train_accuracies,
        'val_losses': val_losses,
        'val_accuracies': val_accuracies,
        'elapsed_time': el_time_training,
    }

    return network, train_stats


In [31]:
def compute_accuracy(network, loader, print_result=True):
    """Return the classification accuracy of `network` on `loader`, in percent.

    The network is switched to evaluation mode (and left in it), and the
    predicted class is the index of the largest output score.

    Args:
        network: Model to evaluate. Batches are moved to the device of its
            parameters.
        loader: Iterable of (inputs, labels) batches.
        print_result: When True, print the accuracy with two decimals.

    Returns:
        float: Percentage of correctly classified samples (0-100).

    Raises:
        ValueError: If `loader` yields no samples, since the accuracy is
            undefined in that case.
    """
    # Use the device the model already lives on instead of depending on a
    # notebook-level `device` variable.
    first_param = next(iter(network.parameters()), None)
    device = first_param.device if first_param is not None else torch.device("cpu")

    network.eval()
    correct = 0
    total = 0

    with torch.no_grad():
        for X, Y in loader:
            X, Y = X.to(device), Y.to(device)
            S = network(X)
            _, P = torch.max(S, 1)
            correct += (P == Y).sum().item()
            total += Y.size(0)

    if total == 0:
        raise ValueError("compute_accuracy: loader yielded no samples")

    acc = 100 * correct / total
    if print_result:
        print(f"Test Accuracy: {acc:.2f}%")

    return acc


Define the Cross-Entropy Criterion and the Optimizer

In [32]:
# Cross-entropy loss on the network's output scores; Adam updates every
# parameter of `network` with a learning rate of 1e-4.
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(params=network.parameters(), lr=1e-4)


Train the Network

In [33]:
# Set TrainYN to True to run the 25-epoch training of the binary classifier.
TrainYN = False
if TrainYN:
    network, train_stats = train_network(
        network=network,
        dataloaders=dataloaders,
        dataset_sizes=dataset_sizes,
        criterion=criterion,
        optimizer=optimizer,
        num_epochs=25,
    )


Plot the Results

In [34]:
def VisLossAccuracy(train_losses, val_losses, train_accuracies, val_accuracies):
    """Show the loss (left panel) and accuracy (right panel) curves per epoch.

    Each argument is a sequence with one value per epoch; the training and
    validation curves are drawn together in the same panel.
    """
    fig, (loss_ax, acc_ax) = plt.subplots(1, 2, facecolor='white', figsize=(12, 5))

    panels = (
        (loss_ax, train_losses, val_losses, 'Loss'),
        (acc_ax, train_accuracies, val_accuracies, 'Accuracy'),
    )
    for ax, train_curve, val_curve, quantity in panels:
        ax.plot(train_curve, label=f'Train {quantity}')
        ax.plot(val_curve, label=f'Validation {quantity}')
        ax.legend()
        ax.set_xlabel('Epoch')
        ax.set_ylabel(quantity)
        ax.set_title(f'{quantity} over epochs')
        ax.grid()

    plt.show()
    

# Visualize the loss and accuracy of the Network (only after a training run)
if TrainYN:
    _curve_keys = ('train_losses', 'val_losses', 'train_accuracies', 'val_accuracies')
    train_losses, val_losses, train_accuracies, val_accuracies = (
        train_stats[key] for key in _curve_keys
    )
    VisLossAccuracy(train_losses, val_losses, train_accuracies, val_accuracies)
    compute_accuracy(network, test_loader)
    

## Multi-Class Problem
- Identifying all 37 breeds of Cats & Dogs

In [35]:
def Load_TrainTestData(data_root, target_types, transform, val_ratio=0.2, seed=42, download=False):
    """Load the Oxford-IIIT Pet splits and carve a validation set out of 'trainval'.

    Args:
        data_root: Root directory passed to `datasets.OxfordIIITPet`.
        target_types: Target type passed through to the dataset
            (the notebook uses 'binary-category' and 'category').
        transform: Transform applied to every image of all three sets.
        val_ratio: Fraction of the 'trainval' split held out for validation.
        seed: Seed for the random train/validation split, so repeated calls
            return the same partition. Pass None for an unseeded split.
        download: Passed through to the dataset constructor.

    Returns:
        tuple: ``(train_data, val_data, test_data)``.

    Raises:
        ValueError: If `val_ratio` is not strictly between 0 and 1.
    """
    if not 0 < val_ratio < 1:
        raise ValueError(f"val_ratio must be in (0, 1), got {val_ratio}")

    def load_split(split):
        # Both splits share every setting except the split name.
        return datasets.OxfordIIITPet(
            root=data_root,
            split=split,
            target_types=target_types,
            transform=transform,
            download=download,
        )

    trainval_data = load_split('trainval')
    test_data = load_split('test')

    train_size = int((1 - val_ratio) * len(trainval_data))
    val_size = len(trainval_data) - train_size

    # A dedicated generator keeps the split reproducible without touching
    # the global RNG state used elsewhere (e.g. for shuffling).
    split_kwargs = {}
    if seed is not None:
        split_kwargs['generator'] = torch.Generator().manual_seed(seed)

    train_data, val_data = random_split(trainval_data, [train_size, val_size], **split_kwargs)

    return train_data, val_data, test_data


def DataLoaderFnc(train_data, val_data, test_data, batch_size=32):
    """Wrap the three datasets in DataLoaders and record the train/val sizes.

    Only the training loader shuffles its samples.

    Returns:
        tuple: ``(dataloaders, dataset_sizes)``. `dataloaders` maps 'train',
        'val' and 'test' to their loaders; `dataset_sizes` maps 'train' and
        'val' to the number of samples in the corresponding dataset.
    """
    def make_loader(dataset, shuffle):
        return DataLoader(dataset, batch_size=batch_size, shuffle=shuffle)

    dataloaders = {
        'train': make_loader(train_data, True),
        'val': make_loader(val_data, False),
        'test': make_loader(test_data, False),
    }
    dataset_sizes = {
        'train': len(train_data),
        'val': len(val_data),
    }

    return dataloaders, dataset_sizes


def Initialize_ResNet18(no_target_classes=2):
    """Build a ResNet18 with default pretrained weights and a new output layer.

    The final fully connected layer is replaced by a freshly initialised
    linear layer with `no_target_classes` outputs, and the model is moved to
    the notebook-level `device`.
    """
    resnet = models.resnet18(weights='DEFAULT')
    # Keep the input width of the original head, change only the output size.
    resnet.fc = nn.Linear(resnet.fc.in_features, no_target_classes)
    return resnet.to(device)



In [36]:
TrainYN = False
if TrainYN:

    # Load Train, Validation and Test Data (37 breed labels)
    train_data, val_data, test_data = Load_TrainTestData(data_root, 'category', transform)

    # Create DataLoaders
    dataloaders, dataset_sizes = DataLoaderFnc(train_data, val_data, test_data, batch_size=32)

    # Initialize ResNet18
    init_network = Initialize_ResNet18(no_target_classes=37)

    # Define the loss function and optimizer
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(init_network.parameters(), lr=1e-4)

    # Train the network
    network, train_stats = train_network(
        init_network, dataloaders, dataset_sizes, criterion, optimizer, num_epochs=10
    )

    # Visualize the loss and accuracy of the Network
    train_losses, val_losses, train_accuracies, val_accuracies = \
                train_stats['train_losses'], train_stats['val_losses'], \
                train_stats['train_accuracies'], train_stats['val_accuracies']
    VisLossAccuracy(train_losses, val_losses, train_accuracies, val_accuracies)

    # Print the Accuracy.
    # Evaluate on the loader built in this cell: the notebook-level
    # `test_loader` still serves the binary labels and would score the
    # 37-class model against the wrong targets.
    final_acc = compute_accuracy(network, dataloaders['test'], print_result=True)


Build one big Training Function

In [43]:
def TrainResNet18_S1(data_root, target_types, transform, TrainParams):
    """Strategy 1: fine-tune the output-side layers of a pretrained ResNet18.

    Loads the data, builds the network, freezes everything except the layers
    selected by `TrainParams`, trains, plots the learning curves and
    evaluates on the test split that belongs to `target_types`.

    Args:
        data_root: Dataset root directory.
        target_types: Target type passed to the dataset (e.g. 'category').
        transform: Image transform applied to all splits.
        TrainParams: Dict with the optional keys 'batch_size' (32),
            'num_epochs' (25), 'no_target_classes' (2), 'lr' (1e-4),
            'L' (0), 'curr_layer' (0) and the required key 'strategy':

            * 'fine-tune': train the classification head plus the `L`
              parameter-holding layers directly before it (L=0 trains only
              the head); everything else is frozen.
            * 'un-freeze': train only the parameter tensor at position
              `curr_layer` of ``network.parameters()``.
            * any other value: nothing is frozen.

    Returns:
        tuple: ``(network, train_stats)``; `train_stats` is the dict returned
        by `train_network` plus 'final_accuracy' (test accuracy in percent).

    Raises:
        KeyError: If `TrainParams` has no 'strategy' entry.
    """
    # Extract Training Parameters
    batch_size = TrainParams.get('batch_size', 32)
    num_epochs = TrainParams.get('num_epochs', 25)
    no_target_classes = TrainParams.get('no_target_classes', 2)
    lr = TrainParams.get('lr', 1e-4)
    L = TrainParams.get('L', 0)  # Number of layers before the head to fine-tune
    strategy = TrainParams['strategy']  # 'fine-tune' or 'un-freeze'
    curr_layer = TrainParams.get('curr_layer', 0)  # Parameter tensor to unfreeze

    # Load Train, Validation and Test Data
    train_data, val_data, test_data = Load_TrainTestData(data_root, target_types, transform)

    # Create DataLoaders
    dataloaders, dataset_sizes = DataLoaderFnc(train_data, val_data, test_data, batch_size)

    # Initialize ResNet18
    init_network = Initialize_ResNet18(no_target_classes)

    # Freeze/Unfreeze Layers
    if strategy == 'fine-tune':
        # A "layer" is a module that directly owns parameters. modules() walks
        # them in registration order, which for ResNet18 runs from the input
        # convolution to the replaced `fc` head, so after reversing, index 0
        # is the head. Counting from the input side instead would leave the
        # randomly initialised head frozen.
        layers = [m for m in init_network.modules() if list(m.parameters(recurse=False))]
        layers.reverse()

        for param in init_network.parameters():
            param.requires_grad = False
        for layer in layers[:L + 1]:
            for param in layer.parameters(recurse=False):
                param.requires_grad = True
    elif strategy == 'un-freeze':
        # NOTE(review): this branch indexes individual parameter tensors from
        # the input side (index 0 is the first tensor of the network), unlike
        # the 'fine-tune' branch -- confirm this is the intended meaning.
        for l, param in enumerate(init_network.parameters()):
            param.requires_grad = (l == curr_layer)

    # Define the loss function and optimizer
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(init_network.parameters(), lr)

    # Train the network
    network, train_stats = train_network(
        init_network, dataloaders, dataset_sizes, criterion, optimizer, num_epochs
    )

    # Visualize the loss and accuracy of the Network
    train_losses, val_losses, train_accuracies, val_accuracies = \
                train_stats['train_losses'], train_stats['val_losses'], \
                train_stats['train_accuracies'], train_stats['val_accuracies']
    VisLossAccuracy(train_losses, val_losses, train_accuracies, val_accuracies)

    # Print the Accuracy on the test split loaded above. The notebook-level
    # `test_loader` carries binary labels and must not be used here.
    final_acc = compute_accuracy(network, dataloaders['test'], print_result=True)

    # Add the final accuracy to the training statistics
    train_stats['final_accuracy'] = final_acc

    return network, train_stats
    

### Strategy 1: Fine-tune $l$ layers simultaneously

In [44]:
# Values of 'L' to try: one full training run per entry
layers = [0, 3, 5, 10]

# Training Parameters
TrainParams = {
    'batch_size': 32,
    'num_epochs': 10,
    'no_target_classes': 37,
    'lr': 1e-4,
    'L': 0,  # Placeholder; overwritten for every run in the loop below
    'strategy': 'fine-tune',  # 'fine-tune' or 'un-freeze'
}


# Loop through the layers and train the network
train_stats_list = []
TrainYN = False
if TrainYN:
    for num_layers in layers:
        print(f"\nTraining with fine-tuning layers up to layer {num_layers}...")
        TrainParams['L'] = num_layers  # Set the number of layers to fine-tune
        _, train_stats_S1 = TrainResNet18_S1(data_root, 'category', transform, TrainParams)
        print(f"Finished training with fine-tuning layers up to layer {num_layers}.")
        # Store the statistics of THIS run; `train_stats` would be a stale
        # (or undefined) variable left over from an earlier cell.
        train_stats_list.append(train_stats_S1)
    

### Strategy 2: Gradual un-freezing

In [None]:
def TrainResNet18_S2(data_root, target_types, transform, TrainParams):
    """Strategy 2: gradual un-freezing of a pretrained ResNet18.

    Training proceeds in stages. Stage 0 trains only the classification
    head; every following stage additionally unfreezes the next
    parameter-holding layer towards the input while keeping the layers
    unfrozen so far trainable. Each stage runs `train_network` for
    'num_epochs' epochs with the same optimizer.

    Args:
        data_root: Dataset root directory.
        target_types: Target type passed to the dataset (e.g. 'category').
        transform: Image transform applied to all splits.
        TrainParams: Dict with the optional keys 'batch_size' (32),
            'num_epochs' (25, per stage), 'no_target_classes' (2) and
            'lr' (1e-4). Other keys are ignored by this function.

    Returns:
        tuple: ``(network, train_stats, list_train_stats)``. `train_stats`
        holds the statistics of the last stage plus 'final_accuracy' (test
        accuracy in percent); `list_train_stats` has one entry per stage and
        its last element is that same `train_stats` dict.
    """
    # Extract Training Parameters
    batch_size = TrainParams.get('batch_size', 32)
    num_epochs = TrainParams.get('num_epochs', 25)
    no_target_classes = TrainParams.get('no_target_classes', 2)
    lr = TrainParams.get('lr', 1e-4)

    # Load Train, Validation and Test Data
    train_data, val_data, test_data = Load_TrainTestData(data_root, target_types, transform)

    # Create DataLoaders
    dataloaders, dataset_sizes = DataLoaderFnc(train_data, val_data, test_data, batch_size)

    # Initialize ResNet18
    init_network = Initialize_ResNet18(no_target_classes)

    # Define the loss function and optimizer. The optimizer is given every
    # parameter up front; frozen ones receive no gradient and are skipped.
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(init_network.parameters(), lr)

    network = init_network
    list_train_stats = []

    # A "layer" is a module that directly owns parameters. modules() walks
    # them in registration order, which for ResNet18 runs from the input
    # convolution to the replaced `fc` head, so after reversing, index 0 is
    # the head.
    layers = [m for m in network.modules() if list(m.parameters(recurse=False))]
    layers.reverse()

    # Start fully frozen; the loop below only ever unfreezes.
    for param in network.parameters():
        param.requires_grad = False

    print("\nStart Training Network (Strategy: gradually unfreeze layers) ...")
    # Loop around the layers and train the network
    for layer, module in enumerate(layers):
        # Unfreeze one more layer. Earlier stages stay trainable so the head
        # keeps adapting together with the newly released layer.
        for param in module.parameters(recurse=False):
            param.requires_grad = True
        # Train the network (only the unfrozen layers receive updates)
        print(f"Training with unfreezing layer {layer}...")
        network, train_stats = train_network(
            network, dataloaders, dataset_sizes, criterion, optimizer, num_epochs
        )
        list_train_stats.append(train_stats)

    # Visualize the loss and accuracy of the last stage
    train_losses, val_losses, train_accuracies, val_accuracies = \
                train_stats['train_losses'], train_stats['val_losses'], \
                train_stats['train_accuracies'], train_stats['val_accuracies']
    VisLossAccuracy(train_losses, val_losses, train_accuracies, val_accuracies)

    # Print the Accuracy on the test split loaded above. The notebook-level
    # `test_loader` carries binary labels and must not be used here.
    final_acc = compute_accuracy(network, dataloaders['test'], print_result=True)

    # Add the final accuracy to the training statistics
    train_stats['final_accuracy'] = final_acc

    return network, train_stats, list_train_stats
    
    

In [None]:
# Training Parameters: reuse the shared dict, switching only the strategy
TrainParams.update(strategy='un-freeze')

# Train the network with gradually unfreezing layers
TrainYN = True
if TrainYN:
    (
        trained_net_S2,
        train_stats_S2,
        list_train_stats,
    ) = TrainResNet18_S2(data_root, 'category', transform, TrainParams)


## Fine-Tuning with imbalanced classes
- Check the Training behavior with class-imbalance
- Try a strategy (e.g. weighted cross-entropy and/or over-sampling of minority classes) to compensate for the imbalanced training set