In [1]:
# import libaries

import numpy as np
import torch
import random
import torch.nn as nn
import sys
import pandas as pd
from PIL import Image

from tqdm.notebook import tqdm
import os
import matplotlib.pyplot as plt
from torchvision import transforms
from torch import optim
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
import torch.utils.model_zoo as model_zoo


from torchinfo import summary

In [2]:
print('Python version:', sys.version)
print('CUDA Available:', torch.cuda.is_available())

if torch.cuda.is_available():
    print('GPU Name:', torch.cuda.get_device_name())
    print('GPU Properties:\n', torch.cuda.get_device_properties('cuda'))
    device = "cuda"
    torch.cuda.set_per_process_memory_fraction(0.95, 0)
    torch.cuda.empty_cache()
else:
    print("Cuda is not available, please use cpu instead")
    device = "cpu"
!nvidia-smi

Python version: 3.9.0 (tags/v3.9.0:9cf6752, Oct  5 2020, 15:34:40) [MSC v.1927 64 bit (AMD64)]
CUDA Available: True
GPU Name: NVIDIA GeForce RTX 2070
GPU Properties:
 _CudaDeviceProperties(name='NVIDIA GeForce RTX 2070', major=7, minor=5, total_memory=8191MB, multi_processor_count=36)
Mon Mar 20 18:31:07 2023       
+-----------------------------------------------------------------------------+
| NVIDIA-SMI 526.98       Driver Version: 526.98       CUDA Version: 12.0     |
|-------------------------------+----------------------+----------------------+
| GPU  Name            TCC/WDDM | Bus-Id        Disp.A | Volatile Uncorr. ECC |
| Fan  Temp  Perf  Pwr:Usage/Cap|         Memory-Usage | GPU-Util  Compute M. |
|                               |                      |               MIG M. |
|   0  NVIDIA GeForce ... WDDM  | 00000000:01:00.0  On |                  N/A |
|  0%   47C    P2    49W / 175W |    263MiB /  8192MiB |      0%      Default |
|                               |         

In [3]:
seed = 41
# Define custom dataset
class FaceDataset(Dataset):
    def __init__(self, training_csv = None, training_dir = None, transform = None):
        # set random seed for FaceDataset
        np.random.seed(seed)
        random.seed(seed)
        # create contructors
        self.unique_img_name = None
        self.data = dict()
        self.images = list()
        # label to indices
        self.label_to_indices = dict()
        self.labels = list()

        # read csv file
        self.train_df = pd.read_csv(training_csv)
        #get the length of entire dataset
        self.len_train = len(self.train_df)
        # set the transformation
        self.transform = transform
        # set the train directory
        self.train_dir = training_dir
        # group each identity together and create list of each identity imgs
        self.train_df = self.train_df.groupby('identity')['filename'].apply(list).reset_index().rename({'filename': 'filenames'}, axis = 1)
        # load imgs
        self.load_imgs(self.train_df)

    def __len__(self):
        return self.len_train

    # get each pair of images -> 1: same identity, 0: different identity 
    # if index is even -> same pair 
    # if index is odd -> random identity
    def __getitem__(self, idx):
        anchor_img = self.images[idx]
        anchor_label = self.labels[idx]

        pos_idx = np.random.choice(np.arange(len(self.images))[self.labels == anchor_label])
        neg_idx = np.random.choice(np.arange(len(self.images))[self.labels != anchor_label])

        pos_img = self.images[pos_idx]
        neg_img = self.images[neg_idx]

        pos_label = self.labels[pos_idx]
        neg_label = self.labels[neg_idx]

        if self.transform is None:
            img_to_tensor = transforms.ToTensor()
            anchor_img = img_to_tensor(anchor_img)
            pos_img = img_to_tensor(pos_img)
            neg_img = img_to_tensor(neg_img)
        else:
            anchor_img = self.transform(anchor_img)
            pos_img = self.transform(pos_img)
            neg_img = self.transform(neg_img)

        return anchor_img, pos_img, neg_img

    def load_imgs(self, df):
        # iterate thought each row
        for i, row in df.iterrows():
            # get identity of each row
            row_identities = row['identity']
            # append each identity to numberical value
            self.label_to_indices[row_identities] = i
            # loop imgs in each identity
            for img_name in row['filenames']:
                # concatenate the directoru and image name
                path_to_image = self.train_dir+img_name
                # open image and convert to RGB
                img = Image.open(path_to_image).convert('RGB')
                    
                self.images.append(img)
                self.labels.append(i)

        self.labels = np.array(self.labels)

# Preperation of dataset

In [4]:
img_size = 112
train_transform = transforms.Compose([
    transforms.Resize((256, 256)),
    transforms.CenterCrop(img_size),
    transforms.RandomHorizontalFlip(p=0.5),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.5, 0.5, 0.5],
                         std=[0.5, 0.5, 0.5])
])

val_transform = transforms.Compose([
    transforms.Resize((256, 256)),
    transforms.CenterCrop(img_size),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.5, 0.5, 0.5],
                         std=[0.5, 0.5, 0.5])
])

train_batch_size = 64
val_batch_size = 64

train_triplet_dataset = FaceDataset(training_csv = "./large_prepared_data/train/label_df.csv", training_dir="./large_prepared_data/train/", transform = train_transform)
train_triplet_dataloader = DataLoader(train_triplet_dataset, batch_size=train_batch_size, shuffle=True, pin_memory=True)

val_triplet_dataset = FaceDataset(training_csv = "./large_prepared_data/val/label_df.csv", training_dir="./large_prepared_data/val/", transform = val_transform)
val_triplet_dataloader = DataLoader(val_triplet_dataset, batch_size=val_batch_size, shuffle=True, pin_memory=True)

# Triplet Network

In [5]:
from torch.nn import Linear, Conv2d, BatchNorm1d, BatchNorm2d, PReLU, ReLU, ReLU6, Sigmoid, Dropout2d, Dropout, AvgPool2d, MaxPool2d, AdaptiveAvgPool2d, Sequential, Module, Parameter

class Flatten(Module):
    def forward(self, input):
        return input.view(input.size(0), -1)

def l2_norm(input,axis=1):
    norm = torch.norm(input,2,axis,True)
    output = torch.div(input, norm)
    return output

class h_sigmoid(Module):
    def __init__(self, inplace=True):
        super(h_sigmoid, self).__init__()
        self.relu = ReLU6(inplace=inplace)

    def forward(self, x):
        return self.relu(x + 3) / 6


class h_swish(Module):
    def __init__(self, inplace=True):
        super(h_swish, self).__init__()
        self.sigmoid = h_sigmoid(inplace=inplace)

    def forward(self, x):
        return x * self.sigmoid(x)


class SELayer(Module):
    def __init__(self, channel, reduction=4):
        super(SELayer, self).__init__()
        self.avg_pool = AdaptiveAvgPool2d(1)
        self.fc = Sequential(
            Linear(channel, channel // reduction),
            ReLU(inplace=True),
            Linear(channel // reduction, channel),
            h_sigmoid()
        )

    def forward(self, x):
        b, c, _, _ = x.size()
        y = self.avg_pool(x).view(b, c)
        y = self.fc(y).view(b, c, 1, 1)
        return x * y

class PermutationBlock(Module):
    def __init__(self, groups):
        super(PermutationBlock, self).__init__()
        self.groups = groups

    def forward(self, input):
        n, c, h, w = input.size()
        G = self.groups
        output = input.view(n, G, c // G, h, w).permute(0, 2, 1, 3, 4).contiguous().view(n, c, h, w)
        return output

class Conv_block(Module):
    def __init__(self, in_c, out_c, kernel=(1, 1), stride=(1, 1), padding=(0, 0), groups=1):
        super(Conv_block, self).__init__()
        self.conv = Conv2d(in_c, out_channels=out_c, kernel_size=kernel, groups=groups, stride=stride, padding=padding, bias=False)
        self.bn = BatchNorm2d(out_c)
        self.prelu = PReLU(out_c)
    def forward(self, x):
        x = self.conv(x)
        x = self.bn(x)
        x = self.prelu(x)
        return x

class Linear_block(Module):
    def __init__(self, in_c, out_c, kernel=(1, 1), stride=(1, 1), padding=(0, 0), groups=1):
        super(Linear_block, self).__init__()
        self.conv = Conv2d(in_c, out_channels=out_c, kernel_size=kernel, groups=groups, stride=stride, padding=padding, bias=False)
        self.bn = BatchNorm2d(out_c)
    def forward(self, x):
        x = self.conv(x)
        x = self.bn(x)
        return x

class Depth_Wise(Module):
    def __init__(self, in_c, out_c, residual = False, kernel=(3, 3), stride=(2, 2), padding=(1, 1), groups=1):
        super(Depth_Wise, self).__init__()
        self.conv = Conv_block(in_c, out_c=groups, kernel=(1, 1), padding=(0, 0), stride=(1, 1))
        self.conv_dw = Conv_block(groups, groups, groups=groups, kernel=kernel, padding=padding, stride=stride)
        self.project = Linear_block(groups, out_c, kernel=(1, 1), padding=(0, 0), stride=(1, 1))
        self.residual = residual
    def forward(self, x):
        if self.residual:
            short_cut = x
        x = self.conv(x)
        x = self.conv_dw(x)
        x = self.project(x)
        if self.residual:
            output = short_cut + x
        else:
            output = x
        return output

class Residual(Module):
    def __init__(self, c, num_block, groups, kernel=(3, 3), stride=(1, 1), padding=(1, 1)):
        super(Residual, self).__init__()
        modules = []
        for _ in range(num_block):
            modules.append(Depth_Wise(c, c, residual=True, kernel=kernel, padding=padding, stride=stride, groups=groups))
        self.model = Sequential(*modules)
    def forward(self, x):
        return self.model(x)

In [6]:

class MobileFaceNet(Module):
    def __init__(self, embedding_size=512):
        super(MobileFaceNet, self).__init__()
        self.conv1 = Conv_block(3, 64, kernel=(3, 3), stride=(2, 2), padding=(1, 1))
        self.conv2_dw = Conv_block(64, 64, kernel=(3, 3), stride=(1, 1), padding=(1, 1), groups=64)
        self.conv_23 = Depth_Wise(64, 64, kernel=(3, 3), stride=(2, 2), padding=(1, 1), groups=128)
        self.conv_3 = Residual(64, num_block=4, groups=128, kernel=(3, 3), stride=(1, 1), padding=(1, 1))
        self.conv_34 = Depth_Wise(64, 128, kernel=(3, 3), stride=(2, 2), padding=(1, 1), groups=256)
        self.conv_4 = Residual(128, num_block=6, groups=256, kernel=(3, 3), stride=(1, 1), padding=(1, 1))
        self.conv_45 = Depth_Wise(128, 128, kernel=(3, 3), stride=(2, 2), padding=(1, 1), groups=512)
        self.conv_5 = Residual(128, num_block=2, groups=256, kernel=(3, 3), stride=(1, 1), padding=(1, 1))
        self.conv_6_sep = Conv_block(128, 512, kernel=(1, 1), stride=(1, 1), padding=(0, 0))
        self.conv_6_dw = Linear_block(512, 512, groups=512, kernel=(7,7), stride=(1, 1), padding=(0, 0))
        self.conv_6_flatten = Flatten()
        self.linear = Linear(512, embedding_size, bias=False)
        self.bn = BatchNorm1d(embedding_size)
    def forward_once(self,x):
        out = self.conv1(x)

        out = self.conv2_dw(out)

        out = self.conv_23(out)

        out = self.conv_3(out)

        out = self.conv_34(out)

        out = self.conv_4(out)

        out = self.conv_45(out)

        out = self.conv_5(out)

        out = self.conv_6_sep(out)

        out = self.conv_6_dw(out)

        out = self.conv_6_flatten(out)

        out = self.linear(out)

        out = self.bn(out)
        return l2_norm(out)

    def forward(self, anchor_img, positive_img, negative_img):
        anchor = self.forward_once(anchor_img)
        positive = self.forward_once(positive_img)
        negative = self.forward_once(negative_img)
        return anchor, positive, negative


In [7]:
# from torchvision.models import resnet34

triplet_model = MobileFaceNet()
# resnet18 = list(resnet18)[:-1]

In [8]:
# class TripletNetwork(nn.Module):
#     def __init__(self):
#         super().__init__()
#         self.layers = nn.Sequential(
#             *(resnet18),
#             nn.Flatten(),
#             nn.ReLU(),
#             nn.Dropout(0.5),
#             nn.Linear(512, 256),
#             nn.ReLU(),
#             nn.Dropout(0.5),
#             nn.Linear(256, 128),
#             nn.ReLU()
#         )
#         self.fc1 = nn.Sequential(
#
#         )
#     def forward_once(self, input):
#         output = self.layers(input)
#         return output
#
#     def forward(self, anchor_img, positive_img, negative_img):
#         anchor = self.forward_once(anchor_img)
#         positive = self.forward_once(positive_img)
#         negative = self.forward_once(negative_img)
#         return anchor, positive, negative

In [9]:
# triplet_model = TripletNetwork()
print(summary(triplet_model, input_size=[(32,3,112,112),(32,3,112,112),(32,3,112,112)]))

Layer (type:depth-idx)                        Output Shape              Param #
MobileFaceNet                                 [32, 512]                 --
├─Conv_block: 1-1                             [32, 64, 56, 56]          --
│    └─Conv2d: 2-1                            [32, 64, 56, 56]          1,728
│    └─BatchNorm2d: 2-2                       [32, 64, 56, 56]          128
│    └─PReLU: 2-3                             [32, 64, 56, 56]          64
├─Conv_block: 1-2                             [32, 64, 56, 56]          --
│    └─Conv2d: 2-4                            [32, 64, 56, 56]          576
│    └─BatchNorm2d: 2-5                       [32, 64, 56, 56]          128
│    └─PReLU: 2-6                             [32, 64, 56, 56]          64
├─Depth_Wise: 1-3                             [32, 64, 28, 28]          --
│    └─Conv_block: 2-7                        [32, 128, 56, 56]         --
│    │    └─Conv2d: 3-1                       [32, 128, 56, 56]         8,192
│    │    └

In [10]:
import math
class Arcface(Module):
    # implementation of additive margin softmax loss in https://arxiv.org/abs/1801.05599
    def __init__(self, embedding_size=512,  classnum=51332,  s=64., m=0.5):
        super(Arcface, self).__init__()
        self.classnum = classnum
        self.kernel = Parameter(torch.Tensor(embedding_size,classnum))
        # initial kernel
        self.kernel.data.uniform_(-1, 1).renorm_(2,1,1e-5).mul_(1e5)
        self.m = m # the margin value, default is 0.5
        self.s = s # scalar value default is 64, see normface https://arxiv.org/abs/1704.06369
        self.cos_m = math.cos(m)
        self.sin_m = math.sin(m)
        self.mm = self.sin_m * m  # issue 1
        self.threshold = math.cos(math.pi - m)
        print('Arcface head')

    def forward(self, embbedings, label):
        # weights norm
        nB = len(embbedings)
        kernel_norm = l2_norm(self.kernel,axis=0)
        # cos(theta+m)
        cos_theta = torch.mm(embbedings,kernel_norm)
#         output = torch.mm(embbedings,kernel_norm)
        cos_theta = cos_theta.clamp(-1,1) # for numerical stability
        cos_theta_2 = torch.pow(cos_theta, 2)
        sin_theta_2 = 1 - cos_theta_2
        sin_theta = torch.sqrt(sin_theta_2)
        cos_theta_m = (cos_theta * self.cos_m - sin_theta * self.sin_m)
        # this condition controls the theta+m should in range [0, pi]
        #      0<=theta+m<=pi
        #     -m<=theta<=pi-m
        cond_v = cos_theta - self.threshold
        cond_mask = cond_v <= 0
        keep_val = (cos_theta - self.mm) # when theta not in [0,pi], use cosface instead
        cos_theta_m[cond_mask] = keep_val[cond_mask]
        output = cos_theta * 1.0 # a little bit hacky way to prevent in_place operation on cos_theta
        idx_ = torch.arange(0, nB, dtype=torch.long)
        output[idx_, label] = cos_theta_m[idx_, label]
        output *= self.s # scale up in order to make softmax work, first introduced in normface
        return output

In [11]:
class TripletLoss(nn.Module):
    def __init__(self, margin):
        super(TripletLoss, self).__init__()
        self.margin = margin

    def forward(self, anchor, positive, negative, size_average=True):
        distance_positive = (anchor - positive).pow(2).sum(1)
        distance_negative = (anchor - negative).pow(2).sum(1)  
        losses = F.relu(distance_positive - distance_negative + self.margin)
        return losses.mean() if size_average else losses.sum()

In [12]:
model_config = {
    'margin': 0.534,
    'lr': 1e-4,
    'patience': 5,
    'factor': 0.1,
    'min_lr': 1e-8,
    'threshold': 1e-2
}
criterion = TripletLoss(margin = model_config['margin'])

In [13]:

optimizer = optim.Adam(triplet_model.parameters(), lr=model_config['lr'])
scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, 'min', patience=model_config['patience'], factor=model_config['factor'], min_lr=model_config['min_lr'], threshold=model_config['threshold'], verbose=True)


In [14]:
load_weight_path = './weights/triplet_net_resnet18.pth'
save_weight_path = './weights/triplet_net_resnet18.pth'
load_weight = False
# set epochs rounds
num_epochs = 200
# throw model to compute in the device
triplet_model.to(device=device)
total_train_loss = list()
total_val_loss = list()
best_val_loss = 0
best_train_loss = 0
previous_lr = 0

# load weight
if bool(load_weight_path) & load_weight:
    triplet_model.load_state_dict(torch.load(load_weight_path))
    print('Congratulations, Weight has been loaded!')

    
# iterate though each epoch
for epoch in tqdm(range(num_epochs)):
    print(f'Epoch:{epoch+1}/{num_epochs}')
    epoch_train_loss = list()
    train_loss = 0
    # switch to train mode
    triplet_model.train()
    with torch.set_grad_enabled(True):
        # loop though each item in dataloader
        for anchor_img, pos_img, neg_img in tqdm(train_triplet_dataloader):
            # throw img to compute in the device
            anchor_img = anchor_img.to(device)
            pos_img = pos_img.to(device)
            neg_img = neg_img.to(device)
            # clear gradient to prevent gradient vanish
            optimizer.zero_grad()
            # train model
            output1, output2, output3 = triplet_model(anchor_img, pos_img, neg_img)
            # compute loss from criterion
            loss = criterion(output1, output2, output3)
            print(loss)
            # backward propagate
            loss.backward()
            # update parameters
            optimizer.step()
            # append train loss to epoch_train_loss
            train_loss += loss.item() * anchor_img.size(0)

    # calculate loss
    current_train_loss = train_loss / len(train_triplet_dataloader.sampler)
    total_train_loss.append(current_train_loss)

    # switch mode to eval
    triplet_model.eval()
    with torch.no_grad():
        val_loss = 0
        for anchor_img, pos_img, neg_img in tqdm(val_triplet_dataloader):
            # throw img to compute in the device
            anchor_img = anchor_img.to(device)
            pos_img = pos_img.to(device)
            neg_img = neg_img.to(device)
            # clear gradient to prevent gradient vanish
            optimizer.zero_grad()
            # compute the out in eval mode
            output1, output2, output3 = triplet_model(anchor_img, pos_img, neg_img)
            # compute loss in eval mode
            loss = criterion(output1, output2, output3)
            # calculate loss for each feature
            val_loss += loss.item() * anchor_img.size(0)
            # calculate loss for each batch
    current_val_loss = val_loss / len(val_triplet_dataloader.sampler)
    # get learning rate from model
    optim_lr = optimizer.param_groups[0]['lr']
    # append current validation loss to list
    total_val_loss.append(current_val_loss)
    if (best_val_loss == 0) | (best_train_loss == 0):
        best_val_loss = current_val_loss
        best_train_loss = current_train_loss
    # find the best val loss
    if best_val_loss >= current_val_loss:
        best_val_loss = current_val_loss
    # find the best train loss
    if best_train_loss >= current_train_loss:
        best_train_loss = current_train_loss

    print(f'train loss: {current_train_loss}')
    print(f'val loss: {current_val_loss}')
    print(f'Learning rate: {optim_lr:.8f}')
    if (optim_lr < previous_lr) | (optim_lr > previous_lr):
        print('LEARNING RATE HAS CHANGED!')
    print('-------------------------------------------------------------')
    # replace the previous lr with the current lr
    previous_lr = optim_lr
    scheduler.step(current_train_loss)

print('Best Validation loss',best_val_loss)
print('Best Train loss', best_train_loss)

# save model weights
torch.save(triplet_model.state_dict(), save_weight_path)

  0%|          | 0/200 [00:00<?, ?it/s]

Epoch:1/200


  0%|          | 0/39 [00:00<?, ?it/s]

tensor(0.4691, device='cuda:0', grad_fn=<MeanBackward0>)
tensor(0.4631, device='cuda:0', grad_fn=<MeanBackward0>)
tensor(0.4838, device='cuda:0', grad_fn=<MeanBackward0>)
tensor(0.4707, device='cuda:0', grad_fn=<MeanBackward0>)
tensor(0.4669, device='cuda:0', grad_fn=<MeanBackward0>)
tensor(0.4575, device='cuda:0', grad_fn=<MeanBackward0>)
tensor(0.4869, device='cuda:0', grad_fn=<MeanBackward0>)
tensor(0.4950, device='cuda:0', grad_fn=<MeanBackward0>)
tensor(0.4972, device='cuda:0', grad_fn=<MeanBackward0>)
tensor(0.4368, device='cuda:0', grad_fn=<MeanBackward0>)


KeyboardInterrupt: 

In [None]:
plt.plot(np.arange(1,len(total_train_loss)+1), total_train_loss,label = 'Train Loss')
plt.plot(np.arange(1,len(total_val_loss)+1),total_val_loss, label= 'Validation loss')
plt.legend()
plt.show()

##### 