# Setup Environment

Import required packages:

In [None]:
import copy, os, time, shutil
from tqdm import tqdm
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd

import torch
import torch.nn as nn 
import torch.optim as optim
from torch.optim import lr_scheduler
import torchvision
from torchvision import datasets, models, transforms
from torchvision.io import read_image
from torch.utils.data import sampler, Dataset

from torch.utils.tensorboard import SummaryWriter

Make PyTorch use the GPU:

In [None]:
# Fail fast with an explicit error instead of ``assert``: assertions are
# stripped when Python runs with -O, which would let the notebook continue
# and crash later with a much less obvious CUDA error.
if not torch.cuda.is_available():
    raise RuntimeError("CUDA is not available: this notebook requires a GPU.")
# Every model and batch in this notebook is moved to this device.
device = torch.device('cuda')

Global variables and settings:

In [None]:
# Root folders: everything this notebook writes goes under path_dir_output
path_dir_project = "/thecube/students/jravagli"
path_dir_output = os.path.join(path_dir_project, "outputs", "resnet50-ft")
path_dir_tensorboard = os.path.join(path_dir_output, "tensorboard")

# Dataset folders (one per split)
path_dir_datasets = os.path.join(path_dir_project, "datasets", "used-pp")
path_train_dataset = os.path.join(path_dir_datasets, "train")
path_valid_dataset = os.path.join(path_dir_datasets, "valid")
path_test_dataset = os.path.join(path_dir_datasets, "test")
# Annotation files (one per split), parsed by UsedDataset
path_train_ann = os.path.join(path_dir_datasets, "train.txt")
path_valid_ann = os.path.join(path_dir_datasets, "valid.txt")
path_test_ann = os.path.join(path_dir_datasets, "test.txt")
# Output checkpoints: the latest epoch and the best epoch (lowest validation loss)
path_models_folder = os.path.join(path_dir_output, "models")
path_model_ft = os.path.join(path_models_folder, "resnet50-ft.pth")
path_model_ft_best = os.path.join(path_models_folder, "resnet50-ft-best.pth")
# Input checkpoint: weights from the transfer-learning run, used as the
# starting point for fine tuning
model_path = os.path.join(path_dir_project, "outputs","resnet50-tl-final","models","resnet50-tl-best.pth")

# Number of target classes (size of the last linear layer, sampler weights)
n_classes = 14
# Every image is resized to img_height x img_width before entering the network
img_height = 150
img_width = 150

# Fine-tuning hyper-parameters
batch_size = 64      # samples per batch for both data loaders
lr_ft = 5e-5         # AdamW learning rate
weight_decay = 1e-3  # AdamW weight decay
ft_epochs = 32       # number of fine-tuning epochs

Clean output dir:

In [None]:
# Start from a clean slate: results of a previous run (checkpoints and
# TensorBoard logs) are deleted so that old and new runs never get mixed up.
if os.path.isdir(path_dir_output):
    shutil.rmtree(path_dir_output)

# makedirs also creates missing parent folders (e.g. "<project>/outputs" on
# the very first run), where os.mkdir would raise FileNotFoundError.
os.makedirs(path_dir_output, exist_ok=True)
os.makedirs(path_dir_tensorboard, exist_ok=True)
os.makedirs(path_models_folder, exist_ok=True)

Configure tensorboard:

In [None]:
# TensorBoard event writer; train_model logs the per-epoch loss/accuracy through it
writer = SummaryWriter(path_dir_tensorboard)

# Data Loading

Define data preprocessing operations (apply some data augmentation to the training set):

In [None]:
# Normalize the dataset using the ImageNet mean and std
# since we will use pretrained networks
mean = [0.485, 0.456, 0.406]
std = [0.229, 0.224, 0.225]
# Target spatial size, in (height, width) order
size = [img_height, img_width]
# The pipelines operate directly on tensors (UsedDataset already returns
# float tensors), which is why there is no ToTensor step.
data_transform = {
    # Training: resize, then random flip/rotation as augmentation, then normalize
    'train': transforms.Compose([
        transforms.Resize(size),
        transforms.RandomHorizontalFlip(),
        transforms.RandomRotation(30),
        transforms.Normalize(mean,std)
    ]),
    # Validation: deterministic preprocessing only (no augmentation)
    'valid': transforms.Compose([
        transforms.Resize(size),
        transforms.Normalize(mean,std)
    ])
}

Define the custom dataset that reads the samples (image path, label, clothes) from a CSV annotation file:

In [None]:
class UsedDataset(Dataset):
    """Dataset backed by an annotation file parsed with ``pandas.read_csv``.

    Each row of the file describes one sample: column 0 holds the image
    path, column 1 the label and column 2 the "clothes" value.
    """

    def __init__(self, csv_file, transform=None):
        """Load the annotation table.

        Args:
            csv_file: path (or buffer) accepted by ``pandas.read_csv``.
            transform: optional callable applied to the image tensor.
        """
        annotations = pd.read_csv(csv_file)
        self.np_data = annotations.to_numpy()
        self.transform = transform

    def __len__(self):
        """Return the number of annotated samples."""
        return len(self.np_data)

    def __getitem__(self, idx):
        """Return the sample at ``idx`` as a dict with "image", "label" and "clothes"."""
        # Samplers may hand over tensor indices; numpy wants plain Python values
        index = idx.tolist() if torch.is_tensor(idx) else idx

        # Read the image as a PyTorch tensor and scale its values to [0, 1]
        image = read_image(self.np_data[index, 0]) / 255.
        label, clothes = self.np_data[index, 1], self.np_data[index, 2]

        if self.transform:
            image = self.transform(image)

        return {"image": image, "label": label, "clothes": clothes}

In [None]:
# One dataset per split; only the training pipeline contains data augmentation
train_dataset = UsedDataset(path_train_ann, transform=data_transform["train"])
valid_dataset = UsedDataset(path_valid_ann, transform=data_transform["valid"])

Function that computes per-sample weights (inversely proportional to the class frequency) to balance the dataset:

In [None]:
def make_weights_for_balanced_classes(images, nclasses):
    """Compute one sampling weight per sample, inversely proportional to class frequency.

    Args:
        images: sequence of records whose element at index 1 is the integer
            class label in ``[0, nclasses)``. It is iterated twice, so it
            must not be a one-shot generator.
        nclasses: total number of classes.

    Returns:
        A list with ``len(images)`` floats. The weight of a sample is
        ``N / count[label]``, where ``N`` is the total number of samples.
    """
    count = [0] * nclasses
    for item in images:
        count[item[1]] += 1

    total = float(sum(count))
    # A class without samples gets weight 0.0 instead of dividing by zero;
    # that entry is never looked up because no sample carries its label.
    weight_per_class = [total / n if n else 0.0 for n in count]

    return [weight_per_class[item[1]] for item in images]

Create the data loaders. Use the WeightedRandomSampler to balance the classes.

In [None]:
# For the unbalanced dataset we create a weighted sampler
# (one weight per sample, computed from the label column of each split)
weights_train = make_weights_for_balanced_classes(train_dataset.np_data, n_classes)
weights_valid = make_weights_for_balanced_classes(valid_dataset.np_data, n_classes)
weights_train = torch.DoubleTensor(weights_train)
weights_valid = torch.DoubleTensor(weights_valid)

# Each epoch draws as many samples as the split contains.
# NOTE(review): WeightedRandomSampler samples with replacement by default, so
# the validation metrics are computed on a class-balanced random resample
# rather than on every validation image exactly once - confirm this is intended.
sampler_train = sampler.WeightedRandomSampler(weights_train, len(weights_train))
sampler_valid = sampler.WeightedRandomSampler(weights_valid, len(weights_valid))

# Loaders indexed by phase name, as expected by do_epoch
dataloaders = {
    "train": torch.utils.data.DataLoader(train_dataset, batch_size=batch_size, 
                                         num_workers=8, sampler=sampler_train),
    "valid": torch.utils.data.DataLoader(valid_dataset, batch_size=batch_size, 
                                         num_workers=8, sampler=sampler_valid),
}

# Number of samples per phase, used to average the loss and accuracy over an epoch
dataset_sizes = {
    "train": len(train_dataset),
    "valid": len(valid_dataset)
}

# Model

Load a ResNet-50, connect a custom classifier part and load the learned weights obtained by the transfer learning procedure:

In [None]:
# Build the architecture without ImageNet weights: the actual weights come
# from the transfer-learning checkpoint loaded at the end of this cell
model = models.resnet50(pretrained=False)

# Custom classifier
# NOTE(review): torchvision's ResNet.forward() ends with ``self.fc(x)`` and
# does not call extra attributes, so ``do`` and ``fc_2`` appear to be
# registered (and stored in the state_dict) without being applied in the
# forward pass; the network would then output 512 values instead of n_classes.
# Assigning ``model.relu`` also replaces the ReLU module used by the backbone.
# Confirm against the transfer-learning notebook before changing anything:
# the checkpoint keys depend on these exact attribute names.
model.fc = nn.Linear(model.fc.in_features, 512)
model.relu = nn.ReLU()
model.do = nn.Dropout(p=0.5)
model.fc_2 = nn.Linear(in_features=512, out_features=n_classes)

# Restore the weights learned during transfer learning (keys must match the
# attribute names defined above)
model.load_state_dict(torch.load(model_path))

Keep the first three stages of ResNet frozen during fine tuning:

In [None]:
def count_trainable_params(model):
    """Return the total number of scalar parameters of ``model`` that require gradients."""
    total = 0
    for param in model.parameters():
        if param.requires_grad:
            total += param.numel()
    return total

def freeze_resnet_stages(model):
    """Disable gradients for the stem and the first three stages of a ResNet.

    The parameters of ``conv1``, ``bn1``, ``relu``, ``maxpool``, ``layer1``,
    ``layer2`` and ``layer3`` get ``requires_grad = False``; every other
    sub-module of ``model`` is left untouched. The model is modified in place.
    """
    frozen_names = ("conv1", "bn1", "relu", "maxpool",
                    "layer1", "layer2", "layer3")
    for name in frozen_names:
        for param in getattr(model, name).parameters():
            param.requires_grad = False

# Freeze the stem and the first three stages (layer1-layer3) of ResNet50
# before fine tuning; the two prints show how many parameters stay trainable
print(f"Trainable params before freeze: {count_trainable_params(model)}")
freeze_resnet_stages(model)
print(f"Trainable params after freeze: {count_trainable_params(model)}")

Display the model:

In [None]:
# Print the module tree to inspect the backbone and the custom layers attached above
print(model)

# Training

Define the training loop:

In [None]:
def do_epoch(model, dataloaders, dataset_sizes, criterion, epoch, optimizer=None):
    """Run one training pass followed by one validation pass.

    Relies on the notebook-level ``device`` and on ``tqdm`` for the progress bar.

    Args:
        model: network to train; switched to train/eval mode for each phase.
        dataloaders: mapping with the "train" and "valid" loaders. Each batch
            is a dict with an "image" and a "label" entry.
        dataset_sizes: mapping with the number of samples of each phase, used
            to average the accumulated loss and the number of correct predictions.
        criterion: loss callable invoked as ``criterion(outputs, labels)``.
        epoch: index of the current epoch (currently unused, kept for the
            existing call signature).
        optimizer: optimizer stepped during the training phase. When ``None``
            the notebook-level ``optimizer`` variable is used, which is what
            this function always did before the parameter existed.

    Returns:
        Tuple ``(train_loss, train_acc, val_loss, val_acc)``; losses are
        Python floats, accuracies are 0-dim tensors.
    """
    if optimizer is None:
        # Backward compatibility for callers that rely on the global optimizer
        optimizer = globals()["optimizer"]

    train_loss = train_acc = val_loss = val_acc = 0

    for phase in ['train', 'valid']:
        is_train = phase == 'train'
        if is_train:
            model.train()
        else:
            model.eval()

        running_loss = 0.0
        running_corrects = 0
        for batch_data in tqdm(dataloaders[phase]):
            images = batch_data["image"].to(device)
            labels = batch_data["label"].to(device)

            # Gradients are tracked only while training
            with torch.set_grad_enabled(is_train):
                outputs = model(images)
                _, preds = torch.max(outputs, 1)
                loss = criterion(outputs, labels)

                if is_train:
                    optimizer.zero_grad()
                    loss.backward()
                    optimizer.step()

            # criterion returns the batch mean: weight it by the batch size so
            # that the epoch average is correct even with a smaller last batch
            running_loss += loss.item() * images.size(0)
            running_corrects += torch.sum(preds == labels.data)

        epoch_loss = running_loss / dataset_sizes[phase]
        epoch_acc = running_corrects.double() / dataset_sizes[phase]

        if is_train:
            train_loss = epoch_loss
            train_acc = epoch_acc
        else:
            val_loss = epoch_loss
            val_acc = epoch_acc

    return train_loss, train_acc, val_loss, val_acc


def train_model(model, criterion, optimizer, dataloaders, dataset_sizes, num_epochs=100):
    """Fine-tune ``model`` for ``num_epochs`` epochs, logging and checkpointing each one.

    Relies on the notebook-level ``writer`` (TensorBoard), ``path_model_ft``
    (checkpoint overwritten at every epoch) and ``path_model_ft_best``
    (checkpoint of the epoch with the lowest validation loss).

    Args:
        model: network to train.
        criterion: loss callable.
        optimizer: optimizer that updates the model during the training phase.
        dataloaders: mapping with the "train" and "valid" loaders.
        dataset_sizes: mapping with the number of samples of each phase.
        num_epochs: number of epochs to run.

    Returns:
        The same ``model`` object, holding the weights of the LAST epoch; the
        best weights are only available from ``path_model_ft_best``.
    """
    since = time.time()

    # float("inf") instead of np.Inf, which was removed in NumPy 2.0
    best_val_loss = float("inf")
    best_val_acc = 0

    for epoch in range(num_epochs):
        print(f'Epoch {epoch + 1}/{num_epochs}')
        print('-' * 10)

        # Pass the optimizer explicitly so the argument of this function is
        # the one actually used, not whatever global happens to be defined
        loss, acc, val_loss, val_acc = do_epoch(model, dataloaders, dataset_sizes,
                                                criterion, epoch, optimizer=optimizer)

        print(f"Train Loss: {loss:.4f} - Train Acc: {acc:.4f}")
        print(f"Valid Loss: {val_loss:.4f} - Valid Acc: {val_acc:.4f}")

        writer.add_scalar('Training loss', loss, epoch)
        writer.add_scalar('Training accuracy', acc, epoch)
        writer.add_scalar('Valid loss', val_loss, epoch)
        writer.add_scalar('Valid accuracy', val_acc, epoch)

        # Save the model at each epoch
        torch.save(model.state_dict(), path_model_ft)

        # Save best model monitoring validation loss
        if val_loss < best_val_loss:
            best_val_loss = val_loss
            best_val_acc = val_acc
            torch.save(model.state_dict(), path_model_ft_best)

    # Whole seconds split into h/m/s so the summary reads "1h 2m 3s"
    # instead of "1.0h 2.0m 3.123456s"
    h, remainder = divmod(int(time.time() - since), 3600)
    m, s = divmod(remainder, 60)
    print(f'Training complete in {h}h {m}m {s}s')
    # Accuracy of the epoch with the lowest validation loss
    print(f'Best Valid Acc: {best_val_acc:.4f}')

    return model

Train the model:

In [None]:
# Move the model to the GPU before building the optimizer
model.to(device)

# Multi-class classification loss; AdamW with the fine-tuning hyper-parameters.
# model.parameters() also contains the frozen parameters (requires_grad=False).
criterion = nn.CrossEntropyLoss()
optimizer = optim.AdamW(model.parameters(), lr=lr_ft, weight_decay=weight_decay)

# Run the fine tuning; checkpoints and TensorBoard logs are written by train_model
model = train_model(model, criterion, optimizer, dataloaders,
                    dataset_sizes, num_epochs=ft_epochs)