# NameProj
The idea of this project is to implement and evaluate a multi-biometric system using Deep Learning.


# Mount drive and load data

In [1]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
!pip install faiss-cpu

In [5]:
import os
import zipfile
from google.colab import drive

source_folder = '/content/drive/MyDrive/BiometricSystems/SpeakingFaces'
destination_folder = '/content/drive/MyDrive/BiometricSystems/SpeakingFacesExt'
files_to_delete = []

for filename in os.listdir(source_folder):
    if files_to_delete != []: print(f"Please delete these files from {destination_folder}\n{files_to_delete}")

    if filename.split(".")[0] in os.listdir(destination_folder):
      print( f"Unpacked {filename} already exists in the destination folder")
      files_to_delete.append(filename)
      continue

    if filename.endswith(".zip"):
        zip_filepath = os.path.join(source_folder, filename)
        try:
            with zipfile.ZipFile(zip_filepath, 'r') as zip_ref:
                # Extract all contents to the destination folder
                zip_ref.extractall(destination_folder)
            print(f"Successfully extracted {filename} to {destination_folder}")

            # Delete the unzipped file from the source folder
            os.remove(zip_filepath)
            print(f"Deleted {filename} from {source_folder} (Also check manually)")

        except zipfile.BadZipFile:
            print(f"Error: {filename} is not a valid zip file.")
        except Exception as e:
            print(f"An error occurred while processing {filename}: {e}")

Unpacked sub_47_ia.zip already exists in the destination folder
Successfully extracted sub_50_ia.zip to /content/drive/MyDrive/BiometricSystems/SpeakingFacesExt
Deleted sub_50_ia.zip from /content/drive/MyDrive/BiometricSystems/SpeakingFaces
Please delete these files from /content/drive/MyDrive/BiometricSystems/SpeakingFacesExt
['sub_47_ia.zip']


# Load Libraries

In [6]:
import torch
import torch.nn as nn
from torch.nn import init
import torch.optim as optim
from torchvision import models
import torch.nn.functional as F
from torch.autograd import Variable
from torch.optim.lr_scheduler import StepLR
from torchvision import datasets, transforms
from torch.utils.data import DataLoader, Dataset
from torch.optim.lr_scheduler import ReduceLROnPlateau
import torchaudio

import os
import shutil
from sklearn.model_selection import train_test_split
from tqdm.auto import tqdm
import timm
import json
import numpy as np
from PIL import Image
import copy
import random
import itertools
import math

device = "cuda" if torch.cuda.is_available() else "cpu"
torch.manual_seed(404)
np.random.seed(404)
random.seed(404)

*The first step is to implement manually a visual transformer that extracts the feature from thermal images*

In [None]:
def weights_init_kaiming(m):
    classname = m.__class__.__name__
    # print(classname)
    if classname.find('Conv') != -1:
        init.kaiming_normal_(m.weight.data, a=0, mode='fan_in')
    elif classname.find('Linear') != -1:
        init.kaiming_normal_(m.weight.data, a=0, mode='fan_out')
        init.constant_(m.bias.data, 0.0)
    elif classname.find('BatchNorm1d') != -1:
        init.normal_(m.weight.data, 1.0, 0.02)
        init.constant_(m.bias.data, 0.0)

def weights_init_classifier(m):
    classname = m.__class__.__name__
    if classname.find('Linear') != -1:
        init.normal_(m.weight.data, std=0.001)
        init.constant_(m.bias.data, 0.0)

class ClassBlock(nn.Module):
    def __init__(self, input_dim, class_num, droprate, relu=False, bnorm=True, num_bottleneck=512, linear=True, return_f = False):
        super(ClassBlock, self).__init__()
        self.return_f = return_f
        add_block = []
        if linear:
            add_block += [nn.Linear(input_dim, num_bottleneck)]
        else:
            num_bottleneck = input_dim
        if bnorm:
            add_block += [nn.BatchNorm1d(num_bottleneck)]
        if relu:
            add_block += [nn.LeakyReLU(0.1)]
        if droprate>0:
            add_block += [nn.Dropout(p=droprate)]
        add_block = nn.Sequential(*add_block)
        add_block.apply(weights_init_kaiming)

        classifier = []
        classifier += [nn.Linear(num_bottleneck, class_num)]
        classifier = nn.Sequential(*classifier)
        classifier.apply(weights_init_classifier)

        self.add_block = add_block
        self.classifier = classifier
    def forward(self, x):
        x = self.add_block(x)
        if self.return_f:
            f = x
            x = self.classifier(x)
            return [x,f]
        else:
            x = self.classifier(x)
            return x

class LATransformer(nn.Module):
    def __init__(self, model, lmbd, print_verbose = False, test=False, pretraining=False):
        super(LATransformer, self).__init__()

        if print_verbose:
            self._print = print
        else:
            self._print = lambda *args, **kwargs: None
        self.class_num = 751
        self.part = 14 # We cut the pool5 to sqrt(N) parts
        self.num_blocks = 12
        self.model = model
        self.model.head.requires_grad_ = False
        self.cls_token = self.model.cls_token
        self.pos_embed = self.model.pos_embed
        self.avgpool = nn.AdaptiveAvgPool2d((self.part,768))
        self.dropout = nn.Dropout(p=0.5)
        self.lmbd = lmbd
        self.test = test
        self.pretraining = pretraining
        if not (self.test or self.pretraining):
          for i in range(self.part):
              name = 'classifier'+str(i)
              setattr(self, name, ClassBlock(768, self.class_num, droprate=0.5, relu=False, bnorm=True, num_bottleneck=256))

        if self.pretraining:
          self.fc = nn.Sequential(nn.Conv1d(14, 32, 3),
                                  nn.BatchNorm1d(32),
                                  nn.LeakyReLU(0.1),
                                  nn.Conv1d(32, 3, 3),
                                  nn.BatchNorm1d(3),
                                  nn.LeakyReLU(0.1),
                                  nn.Flatten(),
                                  nn.Linear(2292, 1024),
                                  nn.LeakyReLU(0.1),
                                  nn.Linear(1024,128))

          self.fc.apply(weights_init_kaiming)



    def forward(self,x):

        # Divide input image into patch embeddings and add position embeddings
        # cls token is a learnable parameter added to the start of sequence
        # It contains global info about the whole image
        # Used in classical Transformers like BERT and ViT to do classification with just itself
        # Here it is later combined to enrich local features with global features of the image
        self._print(f"x before pos embedding: {x.shape}")
        x = self.model.patch_embed(x)
        self._print(f"x after pos embedding: {x.shape}")
        cls_token = self.cls_token.expand(x.shape[0], -1, -1)
        self._print(f"cls token: {cls_token.shape}")
        x = torch.cat((cls_token, x), dim=1)
        self._print(f"x with concatenation with cls token: {x.shape}")
        x = self.model.pos_drop(x + self.pos_embed)
        self._print(f"x after pos drop: {x.shape}")

        # Feed forward through transformer blocks
        for i in range(self.num_blocks):
            self._print(f"x before block {i}: {x.shape}")
            x = self.model.blocks[i](x)
        x = self.model.norm(x)
        self._print(f"x after blocks: {x.shape}")

        # extract the cls token
        cls_token_out = x[:, 0].unsqueeze(1)
        self._print(f"cls token out: {cls_token_out.shape}")

        # Average pool
        x = self.avgpool(x[:, 1:])
        self._print(f"x after avgpool: {x.shape}")

        if self.test:
          return x

        # Add global cls token to each local token
        for i in range(self.part):
            self._print(f"x before mul: {x.shape}")
            out = torch.mul(x[:, i, :], self.lmbd)
            x[:,i,:] = torch.div(torch.add(cls_token_out.squeeze(),out), 1+self.lmbd)

        if self.pretraining:
          x = x.reshape(x.size(0), 14, -1)
          x = self.fc(x)
          return x

        # Locally aware network
        part = {}
        predict = {}
        for i in range(self.part):
            part[i] = x[:,i,:]
            name = 'classifier'+str(i)
            c = getattr(self,name)
            predict[i] = c(part[i])
        return predict

# Data 🗃️


The dataset has a total of 142 subjects that we can use to finetune each model. The split will be virtually assigned for each model in order to add some noise.

In [7]:
data_dir= "./drive/MyDrive/BiometricSystems/SpeakingFacesExt"

train_dir, test_dir , val_dir = "train", "test", "val"
subjects_ids = [] #[str(i) for i in range(1,142)]

Checking the sanity of the dataset

In [8]:
for file in os.listdir(data_dir):
  if file not in ["train", "val", "test"]:

    index = file.split("_")[1]

    sub_dir = os.listdir(os.path.join(data_dir, file))
    if "trial_1" in sub_dir and "trial_2" in sub_dir:
      subj_id = index
      subjects_ids.append(int(subj_id))
    #print(file,index)
    #shutil.move(os.path.join(data_dir, file), os.path.join(data_dir, train_dir))


print(sorted(subjects_ids))
print(len(subjects_ids))


[1, 2, 3, 4, 6, 8, 9, 10, 11, 12, 13, 14, 15, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 46, 47, 48, 50, 99, 100, 101, 102, 103, 104, 105, 106, 107, 108, 109, 110, 111, 112, 113, 114, 115, 116, 117, 118, 119, 120, 121, 122, 123, 124, 125, 126, 127, 128, 129, 130, 131, 132, 133, 134, 135, 136, 137, 138, 139, 140, 141]
87


In [None]:
def generate_split(data_path, subjects_ids):
  shuffled_data = subjects_ids.copy()
  random.shuffle(shuffled_data)
  pivot1, pivot2, n = math.ceil(0.7*len(shuffled_data)), math.ceil(0.15*len(shuffled_data)), len(shuffled_data)
  print(pivot1)
  return shuffled_data[:pivot1], shuffled_data[pivot1:n-pivot2+1], shuffled_data[n-pivot2+1:]

In [None]:
train,val,test = generate_split(data_dir, subjects_ids)
print(train)
print(val)
print(test)
print(len(train), len(val), len(test))

54
[17, 21, 138, 36, 20, 26, 115, 23, 136, 122, 106, 132, 15, 11, 25, 108, 22, 12, 14, 113, 111, 107, 29, 121, 30, 110, 35, 99, 34, 131, 100, 39, 102, 10, 133, 127, 114, 18, 31, 38, 24, 141, 130, 125, 118, 32, 19, 1, 37, 3, 109, 126, 137, 4]
[123, 140, 117, 104, 103, 112, 135, 27, 128, 40, 139]
[101, 124, 2, 119, 134, 120, 116, 129, 33, 105, 13]
54 11 11


# Custom Data Loader
*We need to define the function to extract the training, validation and test data*

In [None]:
class CustomDataset(Dataset):
    def __init__(self, root, labels, transform=None):

        self.root = root
        self.labels = labels
        self.rgb_images_names = {label: [] for label in self.labels}
        self.thrml_images_names = {label: [] for label in self.labels}
        self.audio_names = {label: [] for label in self.labels}

        self.valid_indices = list(self.rgb_images_names.keys())

        self.add_data(label_limit=1, data_limit=5)

        self.mapping = {int(label): label in self.labels}
        self.transform = transform

    def data_path_extractor(self, attribute, label, data_limit, subpath, data_folder):
      limit_data = data_limit
      for data_element in os.listdir(os.path.join(self.root, subpath, data_folder)):
          if not limit_data: break
          attribute[label] = os.path.join(self.root, subpath, data_folder, data_element)
          limit_data -= 1

    def add_data(self, label_limit, data_limit):
      limit_labels = label_limit
      for label in self.labels:
          if not limit_labels: break

          trial = random.choice([1, 2])
          mic_number = random.choice([1, 2])
          subpath = os.path.join(f"sub_{label}_ia",f"trial_{trial}")

          self.data_path_extractor(self.rgb_images_names, label, data_limit, subpath, "rgb_image_cmd_aligned")
          self.data_path_extractor(self.thrml_images_names, label, data_limit, subpath, "thr_image_cmd")
          self.data_path_extractor(self.audio_names, label, data_limit, subpath, f"mic{mic_number}_audio_cmd_trim")
          '''
          for image_name in os.listdir(os.path.join(self.root, subpath, "rgb_image_cmd_aligned")):
              if not limit_images: break
              self.images_names[label]= os.path.join(self.root, subpath, image_name)
              limit_images -= 1
          '''
          limit_labels -= 1

    def __len__(self):
        return len(self.labels)

    def __iter__(self):
      for idx in self.valid_indices:
        if len(self.rgb_images_names[idx]):
            yield self.__getitem__(idx)

    def __getitem__(self, idx):
        img = Image.open(self.rgb_images_names[idx])
        img_thr = Image.open(self.thrml_images_names[idx])
        audio_tensor, sample_rate = torchaudio.load(self.audio_names[idx])
        if self.transform:
            img = self.transform(img)
            img_thr = self.transform(img_thr)
        return img, img_thr, (audio_tensor, sample_rate), idx

In [None]:
train_labels, val_labels, test_labels = generate_split(data_dir, subjects_ids)

transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor() # 256 -> 0.0 ~ 1.0
])

train_dataset = CustomDataset(data_dir, train_labels, transform=transform)
train_dataloader = DataLoader(train_dataset, batch_size=64, shuffle=True)


54


In [None]:
for img, img_thr, audio, label in train_dataset:
  a,b,c,d = img, img_thr, audio, label
  print(a,b,c,d)

tensor([[[0.6863, 0.6863, 0.6902,  ..., 0.6824, 0.6824, 0.6824],
         [0.6902, 0.6902, 0.6863,  ..., 0.6863, 0.6824, 0.6784],
         [0.6902, 0.6902, 0.6902,  ..., 0.6863, 0.6863, 0.6863],
         ...,
         [0.0000, 0.0000, 0.0000,  ..., 0.0000, 0.0000, 0.0000],
         [0.0000, 0.0000, 0.0000,  ..., 0.0000, 0.0000, 0.0000],
         [0.0000, 0.0000, 0.0000,  ..., 0.0000, 0.0000, 0.0000]],

        [[0.6902, 0.6902, 0.6941,  ..., 0.6902, 0.6902, 0.6902],
         [0.6941, 0.6941, 0.6902,  ..., 0.6941, 0.6902, 0.6863],
         [0.6941, 0.6941, 0.6941,  ..., 0.6941, 0.6902, 0.6902],
         ...,
         [0.0000, 0.0000, 0.0000,  ..., 0.0000, 0.0000, 0.0000],
         [0.0000, 0.0000, 0.0000,  ..., 0.0000, 0.0000, 0.0000],
         [0.0000, 0.0000, 0.0000,  ..., 0.0000, 0.0000, 0.0000]],

        [[0.6941, 0.6941, 0.6980,  ..., 0.6824, 0.6784, 0.6784],
         [0.6980, 0.6980, 0.6941,  ..., 0.6784, 0.6745, 0.6706],
         [0.6980, 0.7020, 0.7020,  ..., 0.6784, 0.6745, 0.

In [None]:
'''
def get_data(data_dir="/content/BiometricsSystems/data"):
  batch_size = 32

  transform_train_list = transforms.Compose([
      transforms.Resize((224,224), interpolation=3),
      transforms.RandomHorizontalFlip(),
      transforms.ToTensor(),
      transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
      ])
  transform_val_list = transforms.Compose([
      transforms.Resize(size=(224,224),interpolation=3),
      transforms.ToTensor(),
      transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
      ])

  dataset_train = datasets.ImageFolder(os.path.join(data_dir, 'train'),transform=transform_train_list)
  dataset_val = datasets.ImageFolder(os.path.join(data_dir, 'val'),transform=transform_val_list)
  train_loader = DataLoader(dataset = dataset_train, batch_size=batch_size, shuffle=True)
  val_loader = DataLoader(dataset = dataset_val, batch_size=batch_size, shuffle=True)

  return train_loader, val_loader
  '''

# Freezing Layers ❄️
*In order to fine-tune the Visual Transformer we need to keep training the model on new data, without losing the knowledge gained from the pre-trained part. This implies freezing the initial layers and gradually unfreeze the next ones without completely overriding the gained knowledge. This requires lower epochs, dropout and a small learning rate*

In [None]:
def freeze_all_blocks(model):
    frozen_blocks = 12
    for block in model.model.blocks[:frozen_blocks]:
        for param in block.parameters():
            param.requires_grad=False

def unfreeze_blocks(model, amount= 1):
    for block in model.model.blocks[11-amount:]:
        for param in block.parameters():
            param.requires_grad=True
    return model

def get_training_objects(pretraining_path = None):
  backbone = timm.create_model('vit_base_patch16_224', pretrained=True, num_classes=751).to(device)
  lmbd = 8 #Weight in averaging with CLS token
  model = LATransformer(backbone, lmbd).to(device)

  if pretraining_path:
    model.load_state_dict(torch.load(pretraining_path), strict=False)

  freeze_all_blocks(model)

  criterion = nn.CrossEntropyLoss()

  optimizer = optim.AdamW(model.parameters(),weight_decay=5e-4, lr=3e-4)

  return model, criterion, optimizer

# Training ⏳⚙️
...

In [None]:
def train(num_epochs, model, train_loader, val_loader, optimizer, loss_fn, save_model_path, log_file):

    os.makedirs(save_model_path, exist_ok=True)
    log_data = []
    unfrozen_blocks = 0
    unfreeze_after = 2
    lr_decay = .8

    with tqdm(total=num_epochs, desc='Total Progress', unit='epoch') as epoch_pbar:
      for epoch in range(num_epochs):

          if epoch%unfreeze_after==0:
            unfrozen_blocks += 1
            model = unfreeze_blocks(model, unfrozen_blocks)
            optimizer.param_groups[0]['lr'] *= lr_decay
            trainable_params = sum(p.numel() for p in model.parameters() if p.requires_grad)
            print("Unfrozen Blocks: {}, Current lr: {}, Trainable Params: {}".format(unfrozen_blocks,
                                                                                optimizer.param_groups[0]['lr'],
                                                                                trainable_params))

          model.train()
          train_loss = 0.0
          train_accuracy = 0.0

          with tqdm(train_loader, unit="batch") as pbar:
              for data, target in pbar:
                  data, target = data.to(device), target.to(device)

                  optimizer.zero_grad()
                  output = model(data)

                  score = sum(nn.Softmax(dim=1)(v) for v in output.values())
                  _, preds = torch.max(score, 1)

                  loss = sum(loss_fn(v, target) for v in output.values())
                  loss.backward()
                  optimizer.step()

                  train_loss += loss.item()
                  train_accuracy += (preds == target).float().mean().item()

                  current_loss = train_loss / (pbar.n + 1)
                  current_accuracy = train_accuracy / (pbar.n + 1)

                  pbar.set_description(f"Epoch {epoch+1}/{num_epochs}")
                  pbar.set_postfix(loss=current_loss, accuracy=current_accuracy)

          avg_train_loss = train_loss / len(train_loader)
          avg_train_accuracy = train_accuracy / len(train_loader)

          model.eval()
          val_loss = 0.0
          val_accuracy = 0.0

          with torch.no_grad():
              with tqdm(val_loader, unit="batch") as val_pbar:
                  for data, target in val_pbar:
                      data, target = data.to(device), target.to(device)

                      output = model(data)

                      score = sum(nn.Softmax(dim=1)(v) for v in output.values())
                      _, preds = torch.max(score, 1)

                      loss = sum(loss_fn(v, target) for v in output.values())

                      val_loss += loss.item()
                      val_accuracy += (preds == target).float().mean().item()

                      current_loss = val_loss / (val_pbar.n + 1)
                      current_accuracy = val_accuracy / (val_pbar.n + 1)

                      val_pbar.set_description(f"Epoch {epoch+1}/{num_epochs} [Val]")
                      val_pbar.set_postfix(loss=current_loss, accuracy=current_accuracy)

          avg_val_loss = val_loss / len(val_loader)
          avg_val_accuracy = val_accuracy / len(val_loader)

          model_save_path = os.path.join(save_model_path, f'model_epoch_{epoch+1}.pth')
          torch.save(model.state_dict(), model_save_path)

          epoch_log = {
              'epoch': epoch + 1,
              'training_loss': avg_train_loss,
              'training_accuracy': avg_train_accuracy,
              'val_loss': avg_val_loss,
              'val_accuracy': avg_val_accuracy
          }
          log_data.append(epoch_log)

          with open(log_file, 'w') as f:
              json.dump(log_data, f, indent=4)

          print(f"Epoch {epoch+1}/{num_epochs} - Training Loss: {avg_train_loss:.4f} - Validation Loss: {avg_val_loss:.4f} - Training Accuracy: {avg_train_accuracy:.4f} - Validation Accuracy: {avg_val_accuracy:.4f}")

          epoch_pbar.update(1)
          epoch_pbar.set_postfix(
                train_loss=avg_train_loss,
                train_accuracy=avg_train_accuracy,
                val_loss=avg_val_loss,
                val_accuracy=avg_val_accuracy
          )
          print("==================================================================================")


In [None]:
def train_model(save_path = "/content/drive/MyDrive/BiometricSystems/LA-Transformers(Vanilla)/", pretraining_path = None):
  train_loader, val_loader = get_data()
  model, criterion, optimizer = get_training_objects(pretraining_path)
  train(30, model, train_loader, val_loader, optimizer, criterion, save_path, os.path.join(save_path, "training_results.json"))

In [None]:
train_model()