### 1. Load and Preprocess Data 

In [None]:
import os
from PIL import Image
import torch
import torchvision.transforms as transforms
from torch.utils.data import Dataset, DataLoader

class CUB_ImageFolder(Dataset):
    def __init__(self, path: str, transform: transforms.Compose, train: bool=True) -> None:
        """
        Initialize an ImageFolder like the one provided in `torchvision.datasets.ImageFolder`.
        
        Args:
        - path: The path of the root directory of the dataset.
        - transform: The transform applied to the dataset.
        - train: Boolean, return the train dataset if True else test dataset, default is True. 
        """
        super(CUB_ImageFolder, self).__init__()
        self.root = path
        self.transform = transform
        self.train = train
        self.images = []
        self.labels = []
        self.train_idx = []
        self.test_idx = []
        
        self._load_dataset()
        self._get_train_test()
        
        self.idx = self.train_idx if self.train else self.test_idx
        
    def _load_dataset(self):
        """
        Load the image path and corresponding labels from the 'images.txt'
        and 'image_class_labels.txt'. 
        """
        # load image paths
        with open(os.path.join(self.root, 'images.txt')) as f:
            for line in f:
                self.images.append(line.strip().split()[1])
        # load image labels
        with open(os.path.join(self.root, 'image_class_labels.txt')) as f:
            for line in f:
                self.labels.append(line.strip().split()[1])
        
    def _get_train_test(self):
        """
        Get the indices of the training and testing dataset from the 'train_test_split.txt'.
        """
        with open(os.path.join(self.root, 'train_test_split.txt')) as f:
            for line in f: 
                idx, is_train = map(int, line.strip().split())
                self.train_idx.append(idx) if is_train == 1 else self.test_idx.append(idx)
                
    def __len__(self):
        return len(self.idx) 

    def __getitem__(self, index):
        image_id = self.idx[index] - 1
        image_path, image_label = self.images[image_id], self.labels[image_id]
        # get raw images and apply transformation
        image_matrix = Image.open(os.path.join(self.root, 'images', image_path)).convert('RGB')
        if self.transform:
            image_matrix = self.transform(image_matrix)
        # convert the returned label into a tensor, here we need "minius one" to align with the 
        # custom that Python's index starts from 0
        image_label = torch.tensor(int(image_label) - 1)
        return image_matrix, image_label

def preprocess_data(batch_size: int=64) -> tuple[DataLoader, DataLoader]:
    """
    Preprocess the CUB-200-2011 dataset and return the train and test 'DataLoader'.
    
    Args:
    - batch_size: The number of samples in one batch, default is 64.
    """
    # resize and normalize the images. Apply data augumentation to the training dataset.
    train_transform = transforms.Compose([
        transforms.Resize((224, 224)),
        transforms.RandomRotation(degrees=(-15, 15)),
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
    ])
    test_transform = transforms.Compose([
        transforms.Resize((224, 224)),
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
    ])

    data_dir = '/kaggle/input/cub2002011/CUB_200_2011'

    # load the dataset and extract the train/test Dataloader
    train_dataset = CUB_ImageFolder(data_dir, transform=train_transform)
    train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
    
    test_dataset = CUB_ImageFolder(data_dir, transform=test_transform, train=False)
    test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)
    
    return train_loader, test_loader


### 2. Define `ResNet18` Model

In [2]:
import torch
import torch.nn as nn
import torch.nn.init as init
import torchvision.models as models

class CUB_ResNet_18(nn.Module):
    def __init__(self, num_classes: int=200, pretrain: bool=True):
        """
        Create a neural network with the same architecture as ResNet-18. The output layer is 
        resized to (`in_features`, `num_classes`) to fit into the specific dataset.
        
        Args:
        - num_classes: Number of classes(labels), default is 200.
        - pretrain: Boolean, whether the paramters of ResNet-18 is pretrained or not. Default
        is True.
        """
        super(CUB_ResNet_18, self).__init__()
        # initialize the parameters
        if pretrain:
            self.resnet18 = models.resnet18(weights="ResNet18_Weights.IMAGENET1K_V1")
        else:
            self.resnet18 = models.resnet18(weights=None)
            
        # change the output layer
        self.resnet18.fc = nn.Linear(self.resnet18.fc.in_features, num_classes)
        
        # apply Kaiming initialization to the fully connected layer
        init.kaiming_normal_(self.resnet18.fc.weight, mode='fan_out', nonlinearity='relu')
        if self.resnet18.fc.bias is not None:
            nn.init.constant_(self.resnet18.fc.bias, 0)
        
    def forward(self, x: torch.Tensor):
        """
        Forward pass of the network.
        """
        return self.resnet18(x)

### 3. Define Solver

In [None]:
import os
import random
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.tensorboard import SummaryWriter

def seed_everything(seed: int=None):
    """
    Set the random seed for the whole neural network.
    """
    random.seed(seed)
    os.environ['PYTHONHASHSEED'] = str(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed(seed)
    torch.backends.cudnn.deterministic = True

def get_data_model_criterion(pretrain: bool=True) -> tuple:
    """
    Get the DataLoader, model and loss criterion.
    """
    # load the dataset
    train_loader, test_loader = preprocess_data()

    # get the pretrained model
    model = CUB_ResNet_18(pretrain=pretrain)

    # define loss function
    criterion = nn.CrossEntropyLoss()
    
    return train_loader, test_loader, model, criterion

def calculate_topk_correct(output: torch.Tensor, target: torch.Tensor, topk=(1, 5)) -> list[int]:
    """
    Computes the top-k correct samples for the specified values of k.

    Args:
    - output (torch.Tensor): The model predictions with shape (batch_size, num_classes).
    - target (torch.Tensor): The true labels with shape (batch_size, ).
    - topk (tuple): A tuple of integers specifying the top-k values to compute.

    Returns:
    - List of top-k correct samples for each value in topk.
    """
    maxk = max(topk)

    # get the indices of the top k predictions
    _, pred = output.topk(maxk, 1, True, True)
    pred = pred.t()
    correct = pred.eq(target.view(1, -1).expand_as(pred))

    res = []
    for k in topk:
        correct_k = correct[:k].reshape(-1).float().sum(0, keepdim=True)
        res.append(correct_k.item())
    return res

def train_resnet_with_cub(
    num_epochs: list[int], 
    fine_tuning_lr: float=0.0001, 
    output_lr: float=0.001, 
    pretrain: bool=True, 
    save: bool=False,
    **kwargs: dict
) -> list[float]:
    """
    Train the modified ResNet-18 model using the CUB-200-2011 dataset and return the best accuracy.
    Some hyper-parameters can be modified here.
    
    Args:
    - num_epochs: A list of number of training epochs.
    - fine_tuning_lr: Learning rate of the parameters outside the output layer, default is 0.0001.
    - output_lr: Learning rate of the parameters inside the output layer, default is 0.001.
    - pretrain: Boolean, whether the ResNet-18 model is pretrained or not. Default is True.
    - save: Boolean, whether the parameters of the best model will be save. Default is False.
    
    Return:
    - best_acc: The best validation accuracy list during the training process.
    """
    # set the random seed
    seed_everything(kwargs.pop('seed', 42))
    
    # get the dataset, model and loss criterion
    train_loader, test_loader, model, criterion = get_data_model_criterion(pretrain)
    
    # move the model to CUDA (GPU)
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model.to(device)
    
    # get the parameters of the model expect the last layer
    former_params = [p for name, p in model.resnet18.named_parameters() if 'fc' not in name]
    
    # pop the hyper-parameters from the kwargs dict
    momentum = kwargs.pop('momentum', 0.9)
    weight_decay = kwargs.pop('weight_decay', 1e-4)
        
    # define optimizer
    optimizer = optim.SGD([
                {'params': former_params, 'lr': fine_tuning_lr, 'weight_decay': weight_decay},
                {'params': model.resnet18.fc.parameters(), 'lr': output_lr, 'weight_decay': weight_decay}
            ], momentum=momentum
        )
    
    # scheduler step size and gamma
    step_size = kwargs.pop('step', 30)
    gamma = kwargs.pop('gamma', 0.1)

    # custom step scheduler
    def custom_step_scheduler(optimizer: optim, epoch: int, step_size: int, gamma: float):
        """
        Decay the learning rate of the second parameter group by gamma every step_size epochs.
        """
        if epoch % step_size == 0 and epoch > 0:
            for param_group in optimizer.param_groups:
                param_group['lr'] *= gamma
    
    # init the tensorboard
    tensorboard_name = "/kaggle/working/Fine_Tuning_With_Pretrain"
    if len(num_epochs) != 1:
        tensorboard_name = "/kaggle/working/Full_Train"
    writer = SummaryWriter(tensorboard_name, comment="-{}-{}".format(fine_tuning_lr, output_lr))
        
    # best accuracy
    best_acc = 0.0
    store_best_acc, count = [0 for _ in range(len(num_epochs))], 0
    max_num_epoch = max(num_epochs)

    print("=" * 70)
    print("Training with configuration ({:>7.5f}, {:>7.5f})".format(fine_tuning_lr, output_lr))
    
    # iterate
    for epoch in range(max_num_epoch):
        # train
        model.train()
        samples = 0
        running_loss = 0.0
        for inputs, labels in train_loader:
            inputs, labels = inputs.to(device), labels.to(device)
            outputs = model(inputs)
            
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()
            optimizer.zero_grad()
            
            samples += inputs.size(0)
            running_loss += loss.item() * inputs.size(0)
        
        # learning rate decay
        custom_step_scheduler(optimizer, epoch, step_size, gamma)
        
        train_loss = running_loss / samples
        print("[Epoch {:>2} / {:>2}], Training loss is {:>8.6f}".format(epoch + 1, max_num_epoch, train_loss))

        # test
        model.eval()
        correct_top1 = 0
        correct_top5 = 0
        samples = 0
        running_loss = 0.0
        with torch.no_grad():
            for inputs, labels in test_loader:
                inputs, labels = inputs.to(device), labels.to(device)
                outputs = model(inputs)
                
                top1, top5 = calculate_topk_correct(outputs, labels, topk=(1, 5))
                correct_top1 += top1
                correct_top5 += top5
                samples += labels.size(0)
                
                running_loss += criterion(outputs, labels).item() * inputs.size(0)

        # add loss and accuracy to tensorboard
        test_loss = running_loss / samples
        writer.add_scalars('Loss', {'Train': train_loss, 'Valid': test_loss}, epoch + 1)
        accuracy_top1 = correct_top1 / samples
        accuracy_top5 = correct_top5 / samples
        writer.add_scalars(
            'Valid Accuracy', 
            {
                'Top1': accuracy_top1,
                'Top5': accuracy_top5,
            },
            epoch + 1
            
        )
        
        print("[Epoch {:>2} / {:>2}], Validation loss is {:>8.6f}, Top-5 accuracy is {:>8.6f}, Top-1 accuracy is {:>8.6f}".format(
            epoch + 1, max_num_epoch, test_loss, accuracy_top5, accuracy_top1
        ))
        
        # update the best accuracy and save the model if it improves
        if accuracy_top1 > best_acc:
            best_acc = accuracy_top1
            if save:
                torch.save(model.state_dict(), 'resnet18_cub.pth')
            
        if epoch + 1 == num_epochs[count]:
            store_best_acc[count] = best_acc
            count += 1

    # close the tensorboard
    writer.close()
    
    return store_best_acc

### 4. Set Environment Variables

In [None]:
import os
from itertools import product

KAGGLE_INPUT_DIR = "cub2002011/CUB_200_2011"
KAGGLE_PATH = os.path.join("/kaggle", "input", KAGGLE_INPUT_DIR)
os.environ['TF_ENABLE_ONEDNN_OPTS'] = '0'

### 5. Grid-Search Optimal Learning Rate

#### 5.1 Set the range of hyper-parameters 

In [None]:
# set hyper-parameters here
num_epochs = [15]
fine_tuning_lrs = [5e-5, 1e-4, 5e-4]
output_lrs = [1e-3, 2e-3, 5e-3, 8e-3, 1e-2]

configurations = list(product(fine_tuning_lrs, output_lrs))

#### 5.2 Train

In [None]:
best_accs = []

# train with the pretrained model
for config in configurations:
    curr_best_acc = train_resnet_with_cub(num_epochs, fine_tuning_lr=config[0], output_lr=config[1])
    best_accs.extend(curr_best_acc)

best_acc = max(best_accs)    
best_fine_tune_lr, best_output_lr = None, None

# write the results into a txt file
with open('best_accuracy_lr.txt', 'w') as f:
    f.write("Configuration        Accuracy\n")
    f.write("=" * 30 + "\n")
    for config, accuracy in zip(configurations, best_accs):
        # find the best config
        if accuracy == best_acc and best_fine_tune_lr is None:
            best_fine_tune_lr, best_output_lr = config
        # write the training record into file 
        f.write("({:>7.5f}, {:>7.5f})   {:>8.6f}\n".format(config[0], config[1], accuracy))

#### 5.3 Inspect the Accuracy

In [None]:
# !more best_accuracy_lr.txt

### 6. Find the Optimal Training Epoch

In [None]:
num_epochs = [15, 30, 45]

# init the configurations
best_accs = []

# train with the pretrained model
curr_best_acc = train_resnet_with_cub(num_epochs, best_fine_tune_lr, best_output_lr, save=True)
best_accs.extend(curr_best_acc)
    
# write the results into a txt file
with open('best_accuracy_ep.txt', 'w') as f:
    f.write("Epoch  Accuracy\n")
    f.write("=" * 50 + "\n")
    for num_epoch, accuracy in zip(num_epochs, best_accs):
        f.write("{:>2}     {:>8.6f}\n".format(num_epoch, accuracy))

### 7. Train with Random Initialized Weights

In [None]:
curr_best_acc = train_resnet_with_cub(num_epochs, best_fine_tune_lr, best_output_lr, pretrain=False)