In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader
from torchsummary import summary
import torchvision #This library is used for image-based operations (Augmentations)

import os
import gc
from tqdm import tqdm
import math
from PIL import Image
import numpy as np
import pandas as pd
from sklearn.metrics import accuracy_score
import glob

# Prefer the GPU whenever PyTorch can see one; otherwise fall back to the CPU.
if torch.cuda.is_available():
    DEVICE = 'cuda'
else:
    DEVICE = 'cpu'
print("Device: ", DEVICE)

Device:  cpu


# ArcFace Loss

[ArcFace: Additive Angular Margin Loss for Deep
Face Recognition](https://arxiv.org/pdf/1801.07698.pdf) [equation 3]

ArcFace Loss maximizes the geodesic distance on the hypersphere between features of different classes, which makes the features more separable. Here is a blog post that explains ArcFace Loss in detail: [link](https://medium.com/analytics-vidhya/face-recognition-and-arcface-additive-angular-margin-loss-for-deep-face-recognition-44abc56916c#:~:text=The%20ArcFace%20loss%20maximizes%20the,implemented%20with%20negligible%20computational%20overhead)

$$L_{afl} = - log \frac{e^{scos(\theta_{y_i} + m)}}{e^{s cos(\theta_{y_i} + m)} + \sum_{j=1,j \neq y_i}^N e^{s cos(\theta_j)}}$$

Play around with the `margin` and `scaler` hyperparameters as they are instrumental to the performance of this loss in fine tuning your model.



In [None]:
class ArcFaceModel(torch.nn.Module):
    '''
    Wraps a trained classification model with an ArcFace (additive angular margin) head.

    To train in a standard training loop make sure to modify the train function so you pass in the inputs and the labels
    i.e. output = model(images, labels)
    Experiment with different values of margin and scaler

    Args:
        model: backbone called as model(x, return_feats=True) to obtain embeddings; it must expose
            a `cls_layer` whose weight has shape (num_classes, embedding_size).
        margin: additive angular margin m (radians) applied to the target-class angle.
        scaler: multiplicative scale s applied to the final cosine logits.
        embedding_size: feature dimension; inferred from model.cls_layer.weight when left as NotImplemented.
        num_classes: number of classes; inferred from model.cls_layer.weight when left as NotImplemented.
    '''
    def __init__(self, model, margin=0.5, scaler=64, embedding_size=NotImplemented, num_classes=NotImplemented):
        super(ArcFaceModel, self).__init__()

        # The classifier weight is copied into the ArcFace layer below, so its shape
        # (num_classes, embedding_size) is the natural fallback for unspecified sizes.
        cls_weight = model.cls_layer.weight
        if embedding_size is NotImplemented:
            embedding_size = cls_weight.shape[1]
        if num_classes is NotImplemented:
            num_classes = cls_weight.shape[0]
        self.embedding_size = embedding_size
        self.num_classes = num_classes

        # small number to avoid invalid arcCos values
        self.eps = 1e-7

        # hyperparameters
        self.margin = margin
        self.scaler = scaler

        # load classification model
        self.model = model

        # Initializing the arcface linear layer with the weights of the classifier from the trained CNN
        self.AFL_linear = torch.nn.Linear(embedding_size, num_classes, bias=False) # Why set bias=False? Check out the paper.
        with torch.no_grad():
            self.AFL_linear.weight.copy_(cls_weight)

        # Initializing utility functions for normalization, arcCos, cos and onehot encoding
        self.normalizer = torch.nn.functional.normalize
        self.arcCos = torch.acos
        self.cos = torch.cos
        self.one_hot = torch.nn.functional.one_hot

    def forward(self, x, labels):
        '''
        Compute ArcFace logits s * cos(theta + m * one_hot(labels)).

        Args:
            x: input batch forwarded to the backbone.
            labels: integer class indices, one per sample (assumed shape (batch,)).

        Returns:
            Tensor of shape (batch, num_classes) to be passed to a cross-entropy loss.
        '''
        # Get face embedding. Note that we pass return_feats=True to get the image's features and not the final logits.
        embedding = self.model(x, return_feats=True)

        # Normalize each embedding to unit length (assumes embeddings are (batch, embedding_size)).
        embedding = self.normalizer(embedding, dim=1)

        # Normalize each class weight vector to unit length. This is done IN PLACE so that
        # AFL_linear.weight stays the same Parameter object: re-wrapping it in a new
        # torch.nn.Parameter on every call would orphan it from any optimizer built earlier.
        with torch.no_grad():
            self.AFL_linear.weight.copy_(self.normalizer(self.AFL_linear.weight, dim=1))

        # With ||W|| = 1 and ||x|| = 1 the dot product Wx equals cos(theta).
        # Cast to float32 so the eps clamp below is representable: in float16,
        # 1 - 1e-7 rounds to exactly 1 and arccos would then have an infinite gradient.
        cosine = self.AFL_linear(embedding).float()

        # We clamp the values to be a little higher than -1 and a little lower than one so we don't get nan values when we call arccos
        cosine = torch.clamp(cosine, min=-1.0+self.eps, max=1.0-self.eps)

        # theta = arccos(cos(theta))
        theta = self.arcCos(cosine)

        # One-hot encode the labels: 1 at the target class, 0 elsewhere.
        one_hot_labels = self.one_hot(labels.long(), num_classes=self.num_classes)
        # Mask holding m at the target-class position and 0 everywhere else.
        margin_mask = self.margin * one_hot_labels.to(theta.dtype)
        # Add the margin to the target-class angle only.
        # NOTE: theta + m may exceed pi for very dissimilar pairs; this basic version does not special-case that.
        theta_m = theta + margin_mask

        # calculate the cosine value for theta with margin added and scale with self.scaler
        logits = self.scaler * self.cos(theta_m) # this value is then passed to crossEntropyLoss in train loop to calculate arcface loss

        return logits

# SphereFace Loss
[SphereFace: Deep Hypersphere Embedding for Face Recognition](https://arxiv.org/pdf/1704.08063.pdf)

[SphereFace Revived:
Unifying Hyperspherical Face Recognition](https://arxiv.org/pdf/2109.05565.pdf)

$$L_{sfl} = - log \frac{e^{scos(m\theta_{y_i})}}{e^{s cos(m\theta_{y_i})} + \sum_{j=1,j \neq y_i}^N e^{s cos(\theta_j)}}$$

Notice that the only difference between ArcFace loss and SphereFace loss is that the target-class term $e^{s\cos(\theta_{y_i} + m)}$ is replaced by $e^{s\cos(m\theta_{y_i})}$: the margin multiplies the angle instead of being added to it. You should be able to implement this based on the comments in ArcFace loss by updating the `margin_mask` variable accordingly.

Play around with the `margin` and `scaler` hyperparameters as they are instrumental to the performance of this loss in fine tuning your model.

Please note that this is a basic version of SphereFace loss. As you can read in the above listed papers, there are several modifications you can make to it.


In [None]:
class SphereFaceModel(torch.nn.Module):
    '''
    Wraps a trained classification model with a basic SphereFace (multiplicative angular margin) head.

    To train in a standard training loop make sure to modify the train function so you pass in the inputs and the labels
    i.e. output = model(images, labels)
    Experiment with different values of margin and scaler

    Args:
        model: backbone called as model(x, return_feats=True) to obtain embeddings; it must expose
            a `cls_layer` whose weight has shape (num_classes, embedding_size).
        margin: multiplicative angular margin m applied to the target-class angle.
        scaler: multiplicative scale s applied to the final cosine logits.
        embedding_size: feature dimension; inferred from model.cls_layer.weight when left as NotImplemented.
        num_classes: number of classes; inferred from model.cls_layer.weight when left as NotImplemented.
    '''
    def __init__(self, model, margin=0.5, scaler=64, embedding_size=NotImplemented, num_classes=NotImplemented):
        super(SphereFaceModel, self).__init__()

        # The classifier weight is copied into the SphereFace layer below, so its shape
        # (num_classes, embedding_size) is the natural fallback for unspecified sizes.
        cls_weight = model.cls_layer.weight
        if embedding_size is NotImplemented:
            embedding_size = cls_weight.shape[1]
        if num_classes is NotImplemented:
            num_classes = cls_weight.shape[0]
        self.embedding_size = embedding_size
        self.num_classes = num_classes

        # small number to avoid invalid arcCos values
        self.eps = 1e-7

        # hyperparameters
        self.margin = margin
        self.scaler = scaler

        # load classification model
        self.model = model

        # Initializing the sphereface linear layer with the weights of the classifier from the trained CNN
        self.SFL_linear = torch.nn.Linear(embedding_size, num_classes, bias=False) # Why set bias=False? Check out the paper.
        with torch.no_grad():
            self.SFL_linear.weight.copy_(cls_weight)

        # Initializing utility functions for normalization, arcCos, cos and onehot encoding
        self.normalizer = torch.nn.functional.normalize
        self.arcCos = torch.acos
        self.cos = torch.cos
        self.one_hot = torch.nn.functional.one_hot

    def forward(self, x, labels):
        '''
        Compute SphereFace logits: s * cos(m * theta) for the target class, s * cos(theta) otherwise.

        Args:
            x: input batch forwarded to the backbone.
            labels: integer class indices, one per sample (assumed shape (batch,)).

        Returns:
            Tensor of shape (batch, num_classes) to be passed to a cross-entropy loss.
        '''
        # Get face embedding. Note that we pass return_feats=True to get the image's features and not the final logits.
        embedding = self.model(x, return_feats=True)

        # Normalize each embedding to unit length (assumes embeddings are (batch, embedding_size)).
        embedding = self.normalizer(embedding, dim=1)

        # Normalize each class weight vector to unit length. This is done IN PLACE so that
        # SFL_linear.weight stays the same Parameter object: re-wrapping it in a new
        # torch.nn.Parameter on every call would orphan it from any optimizer built earlier.
        with torch.no_grad():
            self.SFL_linear.weight.copy_(self.normalizer(self.SFL_linear.weight, dim=1))

        # With ||W|| = 1 and ||x|| = 1 the dot product Wx equals cos(theta).
        # Cast to float32 so the eps clamp below is representable: in float16,
        # 1 - 1e-7 rounds to exactly 1 and arccos would then have an infinite gradient.
        cosine = self.SFL_linear(embedding).float()

        # We clamp the values to be a little higher than -1 and a little lower than one so we don't get nan values when we call arccos
        cosine = torch.clamp(cosine, min=-1.0+self.eps, max=1.0-self.eps)

        # theta = arccos(cos(theta))
        theta = self.arcCos(cosine)

        # One-hot encode the labels: 1 at the target class, 0 elsewhere.
        one_hot_labels = self.one_hot(labels.long(), num_classes=self.num_classes).to(theta.dtype)
        # Multiplicative mask: m at the target-class position and 1 (not 0) everywhere else,
        # so that non-target angles pass through unchanged instead of collapsing to zero.
        margin_mask = 1.0 + (self.margin - 1.0) * one_hot_labels
        # Multiply the target-class angle by the margin.
        # NOTE: m * theta may exceed pi for large m; this basic version does not special-case that.
        theta_m = theta * margin_mask

        # calculate the cosine value for theta with margin multiplied and scale with self.scaler
        logits = self.scaler * self.cos(theta_m) # this value is then passed to crossEntropyLoss in train loop to calculate sphereface loss

        return logits


# Example of Training Procedure for Fine Tuning with ArcFace or SphereFace

There are small changes you will need to implement in your train function of the code (We recommend you make a new cell block or function for fine tune train). An example of the train function is given here.


In [None]:
def finetune_train(model, dataloader, optimizer, criterion, scaler=None, device=None):
    """Run one fine-tuning epoch for a margin-based model (ArcFace / SphereFace).

    Args:
        model: module called as model(images, labels); the labels are needed so the
            margin can be applied to the target-class angle.
        dataloader: iterable yielding (images, labels) batches.
        optimizer: optimizer updating the model's parameters.
        criterion: loss taking (outputs, labels), e.g. cross entropy.
        scaler: optional torch.cuda.amp.GradScaler. Pass one persistent instance across
            epochs; when omitted a fresh scaler is created for this call.
        device: optional device for the batches; defaults to the device of the model's
            first parameter (or CUDA if available when the model has no parameters).

    Returns:
        (acc, total_loss): accuracy in percent over all samples seen, and mean loss per batch.
    """
    if device is None:
        first_param = next(model.parameters(), None)
        if first_param is not None:
            device = first_param.device
        else:
            device = 'cuda' if torch.cuda.is_available() else 'cpu'

    # Mixed precision and gradient scaling are only meaningful on CUDA.
    use_amp = torch.device(device).type == 'cuda'
    if scaler is None:
        scaler = torch.cuda.amp.GradScaler(enabled=use_amp)

    model.train()

    # Progress Bar
    batch_bar   = tqdm(total=len(dataloader), dynamic_ncols=True, leave=False, position=0, desc='Train', ncols=5)

    num_correct = 0
    num_samples = 0
    total_loss  = 0.0
    num_batches = 0

    for images, labels in dataloader:

        optimizer.zero_grad() # Zero gradients

        images, labels = images.to(device), labels.to(device)

        with torch.cuda.amp.autocast(enabled=use_amp): # This implements mixed precision. Thats it!
            outputs = model(images, labels) # labels are required to apply the angular margin
            loss    = criterion(outputs, labels)

        # Update no. of correct predictions & loss as we iterate.
        # Count the actual samples so a smaller final batch does not skew the accuracy.
        num_correct += int((torch.argmax(outputs.detach(), axis=1) == labels).sum())
        num_samples += int(labels.size(0))
        total_loss  += float(loss.item())
        num_batches += 1

        # tqdm lets you add some details so you can monitor training as you train.
        batch_bar.set_postfix(
            acc         = "{:.04f}%".format(100 * num_correct / max(num_samples, 1)),
            loss        = "{:.04f}".format(float(total_loss / num_batches)),
            num_correct = num_correct,
            lr          = "{:.04f}".format(float(optimizer.param_groups[0]['lr'])))

        scaler.scale(loss).backward() # This is a replacement for loss.backward()
        scaler.step(optimizer) # This is a replacement for optimizer.step()
        scaler.update()

        # TODO? Depending on your choice of scheduler,
        # You may want to call some schedulers inside the train function. What are these?

        batch_bar.update() # Update tqdm bar

    batch_bar.close() # You need this to close the tqdm bar

    # Guard against an empty dataloader instead of dividing by zero.
    acc         = 100 * num_correct / max(num_samples, 1)
    total_loss  = float(total_loss / max(num_batches, 1))

    return acc, total_loss

# Center Loss

## Description:
Briefly speaking, Center Loss will decrease the variation of the feature cluster of each class.

In other words, the objective of Center Loss is to minimize the intra-class variance of the output feature (the output of the model before being passed to the final classification layer).

$$\mathcal{L}_C = \frac{1}{2}\sum_{i=1}^{m}||\pmb{x}_i-\pmb{c}_{y_i}||_2^2$$

Here $\mathcal{L}_C$ denotes Center Loss, $\pmb{x}_i$ denotes the feature vector of the $i$-th sample, $y_i$ is that sample's class label, $\pmb{c}_{y_i}$ denotes the center of the feature vectors belonging to class $y_i$, and $m$ is the number of ($\pmb{x}_i$,$y_i$) pairs.

What we will actually implement here will be the mean of the loss, so that the scale of loss matches with cross entropy loss.

$$\mathcal{L}_C = \frac{1}{2m}\sum_{i=1}^{m}||\pmb{x}_i-\pmb{c}_{y_i}||_2^2$$

However, it is too expensive to recompute the intra-class centers over ALL the data in every epoch. Therefore, Wen et al. update the centers batch by batch. "In each iteration, the centers are computed by
averaging the features of the corresponding classes (In this case, some of the
centers may not update)."

The centers are updated by a learning rate $\alpha$ .

$$\frac{\partial\mathcal{L}_C}{\partial\pmb{x}_i} = \pmb{x}_i-\pmb{c}_{y_i}$$

$$\Delta\pmb{c}_j = \frac{\sum_{i=1}^{m}\delta(y_i=j)\cdot(\pmb{c}_j-\pmb{x}_i)}{1+\sum_{i=1}^{m}\delta(y_i=j)}$$

$$\pmb{c}_{j}^{t+1}=\pmb{c}_{j}^{t}-\alpha\cdot\Delta\pmb{c}_j$$

Inside the class of Center Loss, you do not need to implement the update part. Update is handled by the optimizer, which means that you only need to calculate the loss.

In [None]:
class CenterLoss(nn.Module):
    """Center Loss: penalises the squared distance between each feature and its class center.

        Center Loss Paper:
        https://ydwen.github.io/papers/WenECCV16.pdf

    The class centers are a learnable parameter; they are updated by whichever
    optimizer is given this module's parameters, not inside this class.

    Args:
        num_classes: number of classes, i.e. the number of centers.
        feat_dim: dimension of the feature vectors passed to forward().
        device: optional device for the centers; defaults to CUDA when available, else CPU.

    Raises:
        TypeError: if num_classes or feat_dim is left as the NotImplemented placeholder.
    """
    def __init__(self,
                 num_classes=NotImplemented, # TODO: What is the number of classes for our model?
                 feat_dim=NotImplemented, # TODO: What is the dimension of your output feature?
                 device=None,
                 ) -> None:
        super(CenterLoss, self).__init__()
        if num_classes is NotImplemented or feat_dim is NotImplemented:
            raise TypeError("CenterLoss requires explicit num_classes and feat_dim")
        self.num_classes = num_classes
        self.feat_dim = feat_dim

        # Do not hard-code .cuda(): fall back to the CPU when no GPU is visible.
        if device is None:
            device = 'cuda' if torch.cuda.is_available() else 'cpu'

        # One center per class, each living in feature space: shape (num_classes, feat_dim).
        self.centers = nn.Parameter(torch.randn(self.num_classes, self.feat_dim).to(device))

    def forward(self, x, labels):
        """
        Args:
            x: feature matrix with shape (batch_size, feat_dim).
            labels: ground truth labels with shape (batch_size).

        Returns:
            Scalar tensor: 1/(2m) * sum_i ||x_i - c_{y_i}||^2 over the batch of size m.
        """
        # Gather, for every sample, the center of its true class: centers[i] pairs with x[i].
        centers = self.centers[labels.long()]

        # Squared Euclidean distance of each sample to its class center.
        # Features are cast to the centers' dtype so half-precision inputs are squared in full precision.
        dist = (x.to(centers.dtype) - centers).pow(2).sum(dim=1)

        # Each element in dist is actually the Center Loss of each input

        # Bound the distances away from 0 and from overflow to keep the loss numerically stable.
        dist = torch.clamp(dist, min=1e-12, max=1e+12)

        # Mean over the batch, with the 1/2 factor from the Center Loss definition.
        loss = 0.5 * dist.mean()

        return loss

# Example in Training Procedure for Center Loss

When you use FP16 in your training, there is a specific usage you have to follow if you use multiple losses in your training. Here is the example code for multiple loss training when you use Center Loss

More detailed information in this link:
[link](https://pytorch.org/docs/stable/notes/amp_examples.html#working-with-multiple-models-losses-and-optimizers)

The hyperparameters you need to tune: loss weight $\lambda$, loss learning rate $\alpha$

In [None]:
# Build the Center Loss module; its learnable class centers get their own optimizer.
# TODO: replace the NotImplemented placeholders with your class count and feature dimension.
center_loss = CenterLoss(
    num_classes=NotImplemented,
    feat_dim=NotImplemented,
)
# TODO: select a learning rate (the loss learning rate alpha for the centers).
optimizer_center_loss = optim.SGD(
    params=center_loss.parameters(),
    lr=NotImplemented,
)

In [None]:
def train(model: nn.Module,
          train_loader: DataLoader,
          optimizer: optim.Optimizer,
          optimizer_center_loss: optim.Optimizer,
          criterion: nn.Module,
          fine_tuning_loss: nn.Module, # here we are using Center Loss as our fine_tuning_loss
          loss_weight,
          scheduler: optim.lr_scheduler._LRScheduler,
          scaler: torch.cuda.amp.GradScaler,
          device):
    """Run one training epoch with cross entropy plus a weighted fine-tuning (center) loss.

    Args:
        model: module called as model(images, return_feats=...) returning logits
            (return_feats=False) or features (return_feats=True).
        train_loader: iterable yielding (images, labels) batches.
        optimizer: optimizer for the model's parameters.
        optimizer_center_loss: optimizer for the parameters of fine_tuning_loss.
        criterion: classification loss taking (outputs, labels).
        fine_tuning_loss: auxiliary loss taking (feats, labels).
        loss_weight: weight applied to the auxiliary loss in the total loss.
        scheduler: accepted for interface compatibility; it is not stepped inside this function.
        scaler: gradient scaler used for mixed-precision training.
        device: device the batches are moved to.

    Returns:
        (acc, total_loss): accuracy in percent over all samples seen, and mean total loss per batch.
    """
    num_correct = 0
    num_samples = 0
    num_batches = 0
    total_loss = 0.0
    model.train()

    # Progress Bar
    batch_bar = tqdm(total=len(train_loader), dynamic_ncols=True, leave=False, position=0, desc='Train', ncols=5)

    for images, labels in train_loader:
        optimizer.zero_grad()
        optimizer_center_loss.zero_grad()

        images, labels = images.to(device), labels.to(device)

        with torch.cuda.amp.autocast():
            # NOTE(review): the model is run twice per batch (logits, then features).
            # If the model can return both from one pass, that would halve the forward cost.
            outputs = model(images, return_feats=False)
            feats = model(images, return_feats=True)
            loss0 = criterion(outputs, labels) # calculate cross entropy loss from outputs and labels
            loss1 = loss_weight * fine_tuning_loss(feats, labels) # calculate weighted fine_tuning_loss (center loss) from feats and labels
            loss = loss0 + loss1

        # Update no. of correct predictions & loss as we iterate.
        # Count the actual samples so a smaller final batch does not skew the accuracy.
        num_correct += int((torch.argmax(outputs.detach(), axis=1) == labels).sum())
        num_samples += int(labels.size(0))
        num_batches += 1
        total_loss += float(loss.item())

        # tqdm lets you add some details so you can monitor training as you train.
        batch_bar.set_postfix(
            acc="{:.04f}%".format(100 * num_correct / max(num_samples, 1)),
            loss="{:.04f}".format(float(total_loss / num_batches)),
            num_correct=num_correct,
            lr="{:.04f}".format(float(optimizer.param_groups[0]['lr'])))

        # A single backward pass over the summed loss yields the same gradients as
        # back-propagating loss0 and loss1 separately, without retaining the graph.
        scaler.scale(loss).backward()

        # The fine-tuning loss' own parameters should be updated as if the loss were
        # unweighted, so undo the loss_weight factor on their gradients.
        # Skip when loss_weight is 0 (nothing to undo) or a parameter received no gradient.
        if loss_weight != 0:
            inverse_weight = 1.0 / loss_weight
            for parameter in fine_tuning_loss.parameters():
                if parameter.grad is not None:
                    parameter.grad.mul_(inverse_weight)

        scaler.step(optimizer_center_loss)
        scaler.step(optimizer)
        scaler.update()
        batch_bar.update() # Update tqdm bar

        # if you use a scheduler to schedule your learning rate for Center Loss
        # scheduler_center_loss.step()

        del images, labels, outputs, feats, loss0, loss1, loss
        torch.cuda.empty_cache()

    batch_bar.close() # You need this to close the tqdm bar

    # Guard against an empty loader instead of dividing by zero.
    acc = 100 * num_correct / max(num_samples, 1)
    total_loss = float(total_loss / max(num_batches, 1))

    return acc, total_loss