# Inception-ResNet-V2 : Face Recognition

#### Architecture developed by Szegedy et al.

### Import Calls

In [None]:
import torch
from torch import nn
from torch.nn import functional as F
import os
import pandas
from torchvision.io import read_image   
from torch import optim
from tqdm.notebook import tqdm_notebook
from prefetch_generator import BackgroundGenerator
from torch.utils import data
from torch.utils.tensorboard import SummaryWriter
import time
from torch.utils.data import DataLoader
from torchvision import transforms
from PIL import Image

### Definition of custom LambdaScale

In [None]:
class LambdaScale(nn.Module):
    """Parameter-free layer that multiplies its input by a constant factor.

    Args:
        factor: Multiplier applied in ``forward``. Defaults to ``0.1``, the
            value this layer previously hard-coded.
    """

    def __init__(self, factor=0.1):
        super().__init__()

        # Kept as a plain number instead of a lambda: a lambda stored on the
        # module cannot be pickled, which breaks saving the whole model
        # (``torch.save(model)``) and copying it to worker processes.
        self.factor = factor

    def forward(self, X):
        """Return ``X`` scaled by ``self.factor``."""
        return X * self.factor

    def extra_repr(self):
        return f'factor={self.factor}'

### Definition of Stem

In [None]:
class InceptionResnetv2Stem(nn.Module):
    """Stem of the network: strided convolutions and pooling, then a 4-branch mixed block.

    Maps a ``(N, 3, H, W)`` image batch to a batch-normalised, ReLU-activated
    feature map with 320 channels (96 + 64 + 96 + 64 from the four branches).
    With the kernel sizes and strides used here a 299x299 input comes out as
    35x35.
    """

    def __init__(self):
        super().__init__()

        self.sub0conv1 = nn.Conv2d(3, 32, kernel_size=3, stride=2)
        self.sub0conv2 = nn.Conv2d(32, 32, kernel_size=3)
        self.sub0conv3 = nn.Conv2d(32, 64, kernel_size=3, padding='same')

        self.sub1p1_mpool1 = nn.MaxPool2d(kernel_size=3, stride=2)

        # NOTE(review): this layer is never called in forward(). It is kept so
        # that state_dict keys stay compatible with checkpoints that were
        # already saved from this class.
        self.sub1p2_conv1 = nn.Conv2d(64, 80, kernel_size=3, stride=2)

        self.sub2p1_conv1 = nn.Conv2d(64, 80, kernel_size=1, padding='same')
        self.sub2p1_conv2 = nn.Conv2d(80, 192, kernel_size=3)

        self.sub3p2_mpool1 = nn.MaxPool2d(kernel_size=3, stride=2)

        self.branch0 = nn.Conv2d(192, 96, kernel_size=1)

        self.branch1a = nn.Conv2d(192, 48, kernel_size=1)
        self.branch1b = nn.Conv2d(48, 64, kernel_size=5, padding=2)

        self.branch2a = nn.Conv2d(192, 64, kernel_size=1)
        self.branch2b = nn.Conv2d(64, 96, kernel_size=3, padding=1)
        self.branch2c = nn.Conv2d(96, 96, kernel_size=3, padding=1)

        # stride must be given explicitly: AvgPool2d defaults the stride to the
        # kernel size, which would shrink this branch and make it impossible to
        # concatenate with the other three.
        self.branch3a = nn.AvgPool2d(3, stride=1, padding=1, count_include_pad=False)
        self.branch3b = nn.Conv2d(192, 64, kernel_size=1, stride=1)

        self.batchNorm = nn.BatchNorm2d(320)

    def forward(self, X):
        """Run the stem and return the 320-channel feature map."""
        X = F.relu(self.sub0conv1(X))
        X = F.relu(self.sub0conv2(X))
        X = F.relu(self.sub0conv3(X))

        X = self.sub1p1_mpool1(X)
        X = F.relu(self.sub2p1_conv1(X))
        X = F.relu(self.sub2p1_conv2(X))

        X = self.sub3p2_mpool1(X)

        # Mixed block: four parallel branches over the same input.
        X0 = self.branch0(X)

        X1 = self.branch1a(X)
        X1 = self.branch1b(X1)

        X2 = self.branch2a(X)
        X2 = self.branch2b(X2)
        X2 = self.branch2c(X2)

        # The 1x1 conv must consume the pooled tensor; previously it was fed X
        # directly, which silently discarded the average pooling.
        X3 = self.branch3a(X)
        X3 = self.branch3b(X3)

        X = torch.cat((X0, X1, X2, X3), 1)

        X = self.batchNorm(X)
        X = F.relu(X)
        return X

### Definition of ResNet Block A

In [None]:
class InceptionResnetv2A(nn.Module):
    """Inception-ResNet block A operating on 320-channel feature maps.

    Three parallel convolution branches (32, 32 and 64 output channels) are
    concatenated, projected back to 320 channels with a 1x1 convolution,
    optionally scaled, and added to the block input. The sum is
    batch-normalised and passed through ReLU. Spatial size and channel count
    are unchanged.

    Args:
        scale: When true, the projected branch output is passed through
            ``LambdaScale`` before being added to the input.
    """

    def __init__(self, scale=True):
        super().__init__()
        self.scale = scale

        self.p1_conv1 = nn.Conv2d(320, 32, kernel_size=1, padding='same')

        self.p2_conv1 = nn.Conv2d(320, 32, kernel_size=1, padding='same')
        self.p2_conv2 = nn.Conv2d(32, 32, kernel_size=3, padding='same')

        self.p3_conv1 = nn.Conv2d(320, 32, kernel_size=1, padding='same')
        self.p3_conv2 = nn.Conv2d(32, 48, kernel_size=3, padding='same')
        self.p3_conv3 = nn.Conv2d(48, 64, kernel_size=3, padding='same')

        # 32 + 32 + 64 = 128 concatenated channels projected back to 320.
        self.p_conv1 = nn.Conv2d(128, 320, kernel_size=1, padding='same')

        self.batchNorm = nn.BatchNorm2d(320, affine=True)

        if self.scale:
            self.scaleLayer = LambdaScale()

    def forward(self, X):
        """Return ``relu(batchNorm(X + branch(X)))`` with the same shape as ``X``."""
        # X is relu-activated
        shortcut = X

        X1 = F.relu(self.p1_conv1(X))

        X2 = F.relu(self.p2_conv1(X))
        X2 = F.relu(self.p2_conv2(X2))

        X3 = F.relu(self.p3_conv1(X))
        X3 = F.relu(self.p3_conv2(X3))
        X3 = F.relu(self.p3_conv3(X3))

        X = torch.cat((X1, X2, X3), dim=1)

        X = self.p_conv1(X)
        if self.scale:
            X = self.scaleLayer(X)

        # Residual connection. The input used to be stored and then never
        # used, so the block was a plain feed-forward layer.
        X = shortcut + X

        X = self.batchNorm(X)
        X = F.relu(X)

        return X

### Definition of ResNet Block B

In [None]:
class InceptionResnetv2B(nn.Module):
    """Inception-ResNet block B operating on 1088-channel feature maps.

    Two parallel branches (a 1x1 convolution, and a 1x1 -> 1x7 -> 7x1 chain),
    192 channels each, are concatenated, projected back to 1088 channels with
    a 1x1 convolution, optionally scaled, and added to the block input. The
    sum is batch-normalised and passed through ReLU.

    Args:
        scale: When true, the projected branch output is passed through
            ``LambdaScale`` before being added to the input.
    """

    def __init__(self, scale=True):
        super().__init__()
        self.scale = scale
        self.p1_conv1 = nn.Conv2d(1088, 192, kernel_size=1, stride=1, padding='same')

        self.p2_conv1 = nn.Conv2d(1088, 128, kernel_size=1, padding='same')
        self.p2_conv2 = nn.Conv2d(128, 160, kernel_size=(1,7), padding='same')
        self.p2_conv3 = nn.Conv2d(160, 192, kernel_size=(7,1), padding='same')

        # 192 + 192 = 384 concatenated channels projected back to 1088.
        self.p3_conv = nn.Conv2d(384, 1088, kernel_size=1, padding='same')

        self.batchNorm = nn.BatchNorm2d(1088, affine=True)
        if self.scale:
            self.scaleLayer = LambdaScale()

    def forward(self, X):
        """Return ``relu(batchNorm(X + branch(X)))`` with the same shape as ``X``."""
        shortcut = X
        X1 = F.relu(self.p1_conv1(X))

        X2 = F.relu(self.p2_conv1(X))
        X2 = F.relu(self.p2_conv2(X2))
        X2 = F.relu(self.p2_conv3(X2))

        X = torch.cat((X1, X2), dim=1)

        # The projection is left linear, as in block A: a ReLU here would only
        # ever let the branch increase the residual sum.
        X = self.p3_conv(X)
        if self.scale:
            X = self.scaleLayer(X)

        # Residual connection. The input used to be stored and then never
        # used, so the block was a plain feed-forward layer.
        X = shortcut + X

        X = self.batchNorm(X)
        X = F.relu(X)

        return X

### Definition of ResNet Block C

In [None]:
class InceptionResnetv2C(nn.Module):
    """Inception-ResNet block C operating on 2080-channel feature maps.

    Two parallel branches (a 1x1 convolution with 192 channels, and a
    1x1 -> 1x3 -> 3x1 chain ending in 256 channels) are concatenated,
    projected back to 2080 channels with a 1x1 convolution, optionally scaled,
    and added to the block input. The sum is batch-normalised and, unless
    ``noRelu`` is set, passed through ReLU.

    Args:
        scale: When true, the projected branch output is passed through
            ``LambdaScale`` before being added to the input.
        noRelu: When true, the final ReLU is skipped.
    """

    def __init__(self, scale=True, noRelu=False):
        super().__init__()
        self.scale = scale

        self.noRelu = noRelu
        self.p1_conv1 = nn.Conv2d(2080, 192, kernel_size=1, padding='same')

        self.p2_conv1 = nn.Conv2d(2080, 192, kernel_size=1, padding='same')
        self.p2_conv2 = nn.Conv2d(192, 224, kernel_size=(1,3), padding='same')
        self.p2_conv3 = nn.Conv2d(224, 256, kernel_size=(3,1), padding='same')

        # 192 + 256 = 448 concatenated channels projected back to 2080.
        self.p3_conv = nn.Conv2d(448, 2080, kernel_size=1, padding='same')

        self.batchNorm = nn.BatchNorm2d(2080, affine=True)
        if self.scale:
            self.scaleLayer = LambdaScale()

    def forward(self, X):
        """Return the batch-normalised residual sum, ReLU-activated unless ``noRelu``."""
        shortcut = X
        X1 = F.relu(self.p1_conv1(X))

        X2 = F.relu(self.p2_conv1(X))
        X2 = F.relu(self.p2_conv2(X2))
        X2 = F.relu(self.p2_conv3(X2))

        X = torch.cat((X1, X2), dim=1)

        # The projection is left linear, as in block A: a ReLU here would only
        # ever let the branch increase the residual sum.
        X = self.p3_conv(X)
        if self.scale:
            X = self.scaleLayer(X)

        # Residual connection. The input used to be stored and then never
        # used, so the block was a plain feed-forward layer.
        X = shortcut + X

        X = self.batchNorm(X)
        if not self.noRelu:
            X = F.relu(X)

        return X

### Definition of ResNet Block - Reduction A

In [None]:
class InceptionResnetv2ReductionA(nn.Module):
    """Reduction block A: downsamples a 320-channel map into 1088 channels.

    Three stride-2 branches run side by side -- a max pool (320 channels), a
    single 3x3 convolution (384 channels) and a 1x1 -> 3x3 -> 3x3 chain
    (384 channels). Their outputs are concatenated along the channel axis,
    batch-normalised and ReLU-activated.
    """

    def __init__(self):
        super().__init__()

        # Branch 1: pooling only.
        self.p1_mpool1 = nn.MaxPool2d(kernel_size=3, stride=2)

        # Branch 2: one strided convolution.
        self.p2_conv1 = nn.Conv2d(320, 384, kernel_size=3, stride=2)

        # Branch 3: two size-preserving convolutions, then a strided one.
        self.p3_conv1 = nn.Conv2d(320, 256, kernel_size=1, padding='same')
        self.p3_conv2 = nn.Conv2d(256, 256, kernel_size=3, padding='same')
        self.p3_conv3 = nn.Conv2d(256, 384, kernel_size=3, stride=2)

        # 320 + 384 + 384 = 1088 channels after concatenation.
        self.batchNorm = nn.BatchNorm2d(1088, affine=True)

    def forward(self, X):
        """Return the concatenated, normalised and activated branch outputs."""
        pooled = self.p1_mpool1(X)

        strided = F.relu(self.p2_conv1(X))

        deep = X
        for conv in (self.p3_conv1, self.p3_conv2, self.p3_conv3):
            deep = F.relu(conv(deep))

        merged = torch.cat((pooled, strided, deep), dim=1)
        return F.relu(self.batchNorm(merged))

### Definition of ResNet Block - Reduction B

In [None]:
class InceptionResnetv2ReductionB(nn.Module):
    """Reduction block B: downsamples a 1088-channel map into 2080 channels.

    Four stride-2 branches run side by side -- a max pool (1088 channels), two
    1x1 -> strided-3x3 chains (384 and 288 channels) and a
    1x1 -> 3x3 -> strided-3x3 chain (320 channels). Their outputs are
    concatenated along the channel axis, batch-normalised and ReLU-activated.
    """

    def __init__(self):
        super().__init__()

        # Branch 1: pooling only.
        self.p1_mpool1 = nn.MaxPool2d(kernel_size=3, stride=2)

        # Branch 2.
        self.p2_conv1 = nn.Conv2d(1088, 256, kernel_size=1, padding='same')
        self.p2_conv2 = nn.Conv2d(256, 384, kernel_size=3, stride=2)

        # Branch 3.
        self.p3_conv1 = nn.Conv2d(1088, 256, kernel_size=1, padding='same')
        self.p3_conv2 = nn.Conv2d(256, 288, kernel_size=3, stride=2)

        # Branch 4.
        self.p4_conv1 = nn.Conv2d(1088, 256, kernel_size=1, padding='same')
        self.p4_conv2 = nn.Conv2d(256, 288, kernel_size=3, padding=1)
        self.p4_conv3 = nn.Conv2d(288, 320, kernel_size=3, stride=2)

        # 1088 + 384 + 288 + 320 = 2080 channels after concatenation.
        self.batchNorm = nn.BatchNorm2d(2080, affine=True)

    def forward(self, X):
        """Return the concatenated, normalised and activated branch outputs."""
        conv_chains = (
            (self.p2_conv1, self.p2_conv2),
            (self.p3_conv1, self.p3_conv2),
            (self.p4_conv1, self.p4_conv2, self.p4_conv3),
        )

        branches = [self.p1_mpool1(X)]
        for chain in conv_chains:
            feat = X
            for conv in chain:
                feat = F.relu(conv(feat))
            branches.append(feat)

        merged = torch.cat(branches, dim=1)
        return F.relu(self.batchNorm(merged))

### Definition of final Model

In [None]:
class InceptionResnetV2(nn.Module):
    """Full classifier: stem, A/B/C block stages with reductions, and a linear head.

    Args:
        scale: Forwarded to every A/B/C block; controls whether their branch
            output is scaled before the residual addition.
        feature_list_size: Number of output logits.
        dropout_rate: Probability of zeroing an element in the dropout layer
            before the linear head. ``nn.Dropout`` takes the *drop*
            probability; the previous hard-coded 0.8 was presumably the
            paper's keep-probability, so the default is now 0.2.

    NOTE(review): each stage re-applies one block instance (``self.a`` 10
    times, ``self.b`` 20 times, ``self.c`` 9 times), so the repeats share
    weights. The published architecture presumably uses independent blocks;
    this is left as-is because changing it would alter the state_dict keys of
    existing checkpoints.
    """

    def __init__(self, scale=True, feature_list_size=1001, dropout_rate=0.2):
        super().__init__()

        self.scale = scale
        self.stem = InceptionResnetv2Stem()
        # The constructor argument is now honoured; the blocks used to be
        # built with scale=True regardless of what the caller passed.
        self.a = InceptionResnetv2A(scale=scale)
        self.b = InceptionResnetv2B(scale=scale)
        self.c = InceptionResnetv2C(scale=scale)
        self.noreluc = InceptionResnetv2C(scale=scale, noRelu=True)
        self.red_a = InceptionResnetv2ReductionA()
        self.red_b = InceptionResnetv2ReductionB()

        # Global average pooling. For a 299x299 input the final map is 8x8, so
        # this matches the former AvgPool2d(8); other input sizes no longer
        # break the linear layer.
        self.avgpool = nn.AdaptiveAvgPool2d(1)

        self.conv2d = nn.Conv2d(2080, 1536, kernel_size=1,)

        self.dropout = nn.Dropout(dropout_rate)
        self.flatten = nn.Flatten()

        self.linear = nn.Linear(in_features=1536, out_features=feature_list_size)

    def forward(self, X):
        """Map an image batch ``(N, 3, H, W)`` to logits ``(N, feature_list_size)``."""
        X = self.stem(X)

        for _ in range(10):
            X = self.a(X)

        X = self.red_a(X)

        for _ in range(20):
            X = self.b(X)

        X = self.red_b(X)

        for _ in range(9):
            X = self.c(X)

        X = self.noreluc(X)

        X = self.conv2d(X)

        # Pool first, then drop out on the pooled feature vector. Dropping
        # individual spatial positions before averaging mostly cancels out.
        X = self.avgpool(X)
        X = self.flatten(X)
        X = self.dropout(X)

        X = self.linear(X)

        return X
        

### Test run of a random Tensor through the model

In [None]:
# Smoke test: push one random 299x299 RGB image through a 7-class model.
X = torch.randn(1, 3, 299, 299)
model = InceptionResnetV2(feature_list_size=7)
# Call the module itself rather than .forward() so that nn.Module.__call__
# runs (this is what dispatches registered hooks).
model(X)

### Getting details of GPU present on machine and defining helpers to load previous models

In [None]:
# IPython shell escape: runs the `nvidia-smi` command-line tool.
!nvidia-smi

In [None]:
# Number of CUDA devices visible to PyTorch.
torch.cuda.device_count()

In [None]:
def try_gpu_else_cpu():
    """Return a list with every visible CUDA device, or ``[cpu]`` if there is none.

    Returns:
        list[torch.device]: ``cuda:0 .. cuda:N-1`` when ``N`` CUDA devices are
        visible, otherwise a one-element list holding the CPU device.
    """
    gpus = [torch.device(f'cuda:{i}') for i in range(torch.cuda.device_count())]
    return gpus or [torch.device('cpu')]


# The helper returns a list, but APIs that take a `device` argument expect a
# single device, so keep only the first (preferred) one here.
device = try_gpu_else_cpu()[0]

In [None]:
def load_model_from_checkpoint(path, map_location=None):
    """Rebuild the 7-class model and its Adam optimizer from a saved checkpoint.

    The checkpoint is expected to be a dict with the keys
    ``'model.state_dict'``, ``'optimizer.state_dict'`` and ``'epoch'``.

    Args:
        path: Location of the checkpoint file.
        map_location: Passed to ``torch.load``. When omitted and CUDA is not
            available, tensors are mapped to the CPU so that a checkpoint
            written on a GPU machine can still be read.

    Returns:
        tuple: ``(model, optimizer, epoch)`` where ``epoch`` is the value
        stored in the checkpoint.
    """
    if map_location is None and not torch.cuda.is_available():
        map_location = 'cpu'

    # NOTE: torch.load unpickles the file -- only load checkpoints you trust.
    res = torch.load(path, map_location=map_location)
    model = InceptionResnetV2(feature_list_size=7)
    model.load_state_dict(res['model.state_dict'])
    # The optimizer must be bound to the parameters of the model built above;
    # this used to reference an undefined name `net`.
    optimizer = optim.Adam(model.parameters(), weight_decay=0.009, amsgrad=True)
    optimizer.load_state_dict(res['optimizer.state_dict'])
    epoch = res['epoch']
    return model, optimizer, epoch

### The original paper specifies the following training parameters

In [None]:
# RMSprop's "decay" is the smoothing constant `alpha`, not `weight_decay`
# (which is an L2 penalty; 0.9 there would dominate the loss).
optimizer = optim.RMSprop(model.parameters(), lr=0.045, alpha=0.9, eps=1.0)
# The model emits raw logits, so use CrossEntropyLoss (log-softmax + NLL);
# NLLLoss alone expects log-probabilities as input.
loss_fn = nn.CrossEntropyLoss()

### Dataset preprocessing and Model training

In [None]:
import os
from torchvision import datasets, transforms

PATH = '/Users/suvad/Python Works/images' ## Set this to your own folder which stores the dataset images
print(os.listdir(PATH))

# ToTensor -> resize to the 299x299 network input -> scale each channel to [-1, 1].
train_transforms = transforms.Compose([transforms.ToTensor(), transforms.Resize(size=(299,299), interpolation=transforms.InterpolationMode.BILINEAR), transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))])
# Derive the training folder from PATH so that changing PATH above is enough.
train_dataset = datasets.ImageFolder(os.path.join(PATH, 'train'), transform=train_transforms)

data_train = DataLoader(train_dataset, shuffle=True, batch_size=5)

images, labels = next(iter(data_train))

# Preview one batch. Undo the Normalize step first: ToPILImage converts float
# tensors to bytes, so the negative values of a normalised image would wrap.
img_tr = transforms.ToPILImage()
for img in images:
    img_tr((img * 0.5 + 0.5).clamp(0, 1)).show()
print(labels)

### Getting CUDA Memory summary and usage diagnostics

In [None]:
# memory_summary needs a single CUDA device; `device` may hold a list of
# devices, so pick the first entry in that case. Skip entirely without CUDA.
if torch.cuda.is_available():
    _summary_device = device[0] if isinstance(device, (list, tuple)) else device
    print(torch.cuda.memory_summary(device=_summary_device, abbreviated=False))
else:
    print('CUDA is not available; no GPU memory summary to show.')

### Cleaning all previous cache before using GPU

In [None]:
# Ask PyTorch to release the cached GPU memory it is not currently using.
torch.cuda.empty_cache()

### Setting all seeds and options required to maintain reproducibility

In [None]:
# cuDNN autotuning (benchmark=True) may pick a different algorithm on each
# run, which works against reproducibility; disable it and request
# deterministic kernels instead.
torch.backends.cudnn.benchmark = False
torch.backends.cudnn.deterministic = True
torch.manual_seed(1)
# Seed every CUDA device, not only the current one.
torch.cuda.manual_seed_all(1)

### Training function

In [None]:
def train_net(train_loader, epochs=2, resume_from=None):
    """Train a 7-class ``InceptionResnetV2`` and write one checkpoint per epoch.

    Checkpoints go to ``<cwd>/resnet-v2-epochs/model_ckpt_epoch<N>.pkl`` and
    hold the model state, the optimizer state and the zero-based epoch index.
    Per-batch loss and compute-time efficiency are logged through a
    ``SummaryWriter``.

    Args:
        train_loader: Iterable with a length that yields ``(images, labels)``
            batches.
        epochs: Total number of epochs to reach.
        resume_from: Optional path of a checkpoint written by this function.
            When given, model and optimizer state are restored and training
            continues with the epoch after the stored one.

    Returns:
        tuple: ``(net, optimizer)`` after training.
    """
    epoch_directory = os.path.join(os.getcwd(), 'resnet-v2-epochs')
    os.makedirs(epoch_directory, exist_ok=True)

    # Use the GPU when present, otherwise fall back to the CPU instead of
    # failing on an unconditional .cuda() call.
    train_device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

    net = InceptionResnetV2(feature_list_size=7).to(train_device)
    loss_fn = nn.CrossEntropyLoss()
    optimizer = optim.Adam(net.parameters(), weight_decay=0.009, amsgrad=True)
    scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='min', factor=0.2, threshold=0.01, patience=5)

    first_epoch = 0
    if resume_from is not None:
        # NOTE: torch.load unpickles the file -- only load checkpoints you trust.
        state = torch.load(resume_from, map_location=train_device)
        net.load_state_dict(state['model.state_dict'])
        optimizer.load_state_dict(state['optimizer.state_dict'])
        first_epoch = state['epoch'] + 1

    writer = SummaryWriter()
    global_step = 0  # monotonically increasing x-axis for TensorBoard

    try:
        for epoch in range(first_epoch, epochs):
            net.train()

            pbar = tqdm_notebook(enumerate(BackgroundGenerator(train_loader), 0),
                        total=len(train_loader))
            checkpoint_path = os.path.join(epoch_directory, f'model_ckpt_epoch{epoch+1}.pkl')

            epoch_loss = 0.0
            batches = 0
            start_time = time.time()

            for _, (images, labels) in pbar:
                inp = images.to(train_device)
                targs = labels.to(train_device)

                # Time spent waiting for and moving this batch.
                prepare_time = time.time() - start_time

                optimizer.zero_grad()

                output = net(inp)
                loss = loss_fn(output, targs)
                loss.backward()
                optimizer.step()

                loss_value = loss.item()
                epoch_loss += loss_value
                batches += 1

                # Time spent computing on this batch.
                process_time = time.time() - start_time - prepare_time
                total_time = process_time + prepare_time
                efficiency = process_time / total_time if total_time > 0 else 0.0

                pbar.set_description(f'Efficiency = {efficiency}\nEpochs: {epoch+1}/{epochs}')

                writer.add_scalar('Compute Time efficiency (per mini-batch)', efficiency,
                                 global_step)
                writer.add_scalar('Training Loss', loss_value, global_step)
                global_step += 1

                # Restart the clock so the next measurement covers one batch only.
                start_time = time.time()

            if batches:
                # Drive the plateau scheduler with the mean loss of the epoch
                # rather than the (noisy) loss of whichever batch came last.
                scheduler.step(epoch_loss / batches)

            torch.save({
                "model.state_dict" : net.state_dict(),
                "optimizer.state_dict" : optimizer.state_dict(),
                "epoch":epoch
            }, checkpoint_path)
    finally:
        # Make sure buffered events reach disk even if training is interrupted.
        writer.flush()
        writer.close()

    return net, optimizer

In [None]:
# Train for 10 epochs on the training DataLoader; keep the returned network and optimizer.
net, opt = train_net(data_train, epochs=10)

### Using Tensorboard. Navigate to http://localhost:6006/ while cell is executing

In [None]:
# View Tensorboard
%pip install tensorboard
%tensorboard --logdir=runs

### Defining functions to generate predictions from model

In [None]:
# Models already loaded by predict_class, keyed by checkpoint path, so that
# classifying many images does not re-read the checkpoint for each one.
_PREDICT_MODEL_CACHE = {}


def predict_class(img, transform_func, checkpoint_path=None, model=None):
    """Classify one image batch and return the predicted emotion label.

    Args:
        img: Image tensor that is passed to the model as-is; only the
            prediction for the first item of the batch is returned.
        transform_func: Accepted for backward compatibility; not used.
        checkpoint_path: Checkpoint to load the model from when ``model`` is
            not given. Defaults to the path that was previously hard-coded.
        model: Optional ready-to-use model; when given, no checkpoint is read.

    Returns:
        str: One of 'Anger', 'Disgust', 'Fear', 'Happy', 'Neutral', 'Sad',
        'Surprise'.
    """
    classes = ['Anger', 'Disgust', 'Fear', 'Happy', 'Neutral', 'Sad', 'Surprise']

    if model is None:
        if checkpoint_path is None:
            # Use latest model epoch by changing path
            checkpoint_path = "C:\\Users\\suvad\\Python Works\\resnet-v2-epochs\\model_ckpt_epoch2.pkl"
        if checkpoint_path not in _PREDICT_MODEL_CACHE:
            loaded, _, _ = load_model_from_checkpoint(checkpoint_path)
            _PREDICT_MODEL_CACHE[checkpoint_path] = loaded
        model = _PREDICT_MODEL_CACHE[checkpoint_path]

    # Inference mode: dropout off, batch norm uses its running statistics.
    model.eval()

    # Run the input on whatever device the model lives on.
    first_param = next(model.parameters(), None)
    if first_param is not None:
        img = img.to(first_param.device)

    with torch.no_grad():
        res = model(img)

    # Arg-max over the class axis of the first sample (a flat arg-max would
    # give an out-of-range index for batches larger than one).
    clsf = int(res[0].argmax().cpu())
    print(clsf)
    pred = classes[clsf]
    return pred

In [None]:
# Same preprocessing as used for training (including bilinear resizing), so
# the model sees validation images prepared the same way as training images.
test_transforms = transforms.Compose([transforms.ToTensor(), transforms.Resize(size=(299,299), interpolation=transforms.InterpolationMode.BILINEAR), transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))])
test_dataset = datasets.ImageFolder('C:/Users/suvad/Python Works/images/validation', transform=test_transforms)

# Shuffling serves no purpose for evaluation; keep a stable order.
data_test = DataLoader(test_dataset, shuffle=False, batch_size=1)

# Collect the predicted labels instead of discarding them. The loop variable
# is no longer called `data`, which shadowed the imported torch.utils.data.
predictions = []
for images, labels in data_test:
    predictions.append(predict_class(images, test_transforms))