# Setup

In [None]:
!git clone -q https://github.com/timesler/facenet-pytorch.git
!mv facenet-pytorch/models ./
!rm -rf facenet-pytorch/

In [None]:
import os
import pickle
import random
from copy import deepcopy
from pathlib import Path

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import plotly.express as px
import plotly.graph_objects as go
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torchvision.transforms as tts
from google.colab import drive, output
from IPython import display
from models import inception_resnet_v1
from PIL import Image
from plotly.subplots import make_subplots
from torch.optim.lr_scheduler import StepLR
from torch.utils.data import (
    DataLoader,
    Dataset,
    WeightedRandomSampler,
)
from tqdm.notebook import tqdm

In [None]:
device = "cuda" if torch.cuda.is_available() else "cpu"

In [None]:
output.enable_custom_widget_manager()

# Data Preparation

In [None]:
drive.mount("drive")

In [None]:
!unzip -q 'drive/MyDrive/Datasets/CelebAFaces.zip'
!unzip -q 'drive/MyDrive/Datasets/CelebA_IR.zip'

In [None]:
images_root = "celebA_train_1k/celebA_imgs"
labels_file = "celebA_train_1k/celebA_anno.txt"
train_split_file = "celebA_train_1k/celebA_train_split.txt"

query_images_root = "celebA_ir/celebA_query"
distractors_images_root = "celebA_ir/celebA_distractors"
query_labels_file = "celebA_ir/celebA_anno_query.txt"

In [None]:
def denormalize(tensor, mean=(0.485, 0.456, 0.406), std=(0.229, 0.224, 0.225)):
    return torch.stack([tensor[i] * std[i] + mean[i] for i in range(3)])

In [None]:
def show_images(images_root, n_images=16, ncols=4):
    images_files = list(Path(images_root).glob("**/*.jpg"))
    fig, axes = plt.subplots(n_images // ncols, ncols, figsize=(4 * n_images // ncols, 4 * ncols))

    for i in range(n_images):
        image = Image.open(images_files[i])
        axes[i // ncols][i % ncols].imshow(np.array(image))
        axes[i // ncols][i % ncols].set_title(str(images_files[i]).split("/")[-1])
        axes[i // ncols][i % ncols].axis("off")
    plt.show()

In [None]:
def show_images_by_labels(dataset, *labels, n):
    if type(labels[0]) == list:
        labels = labels[0]
    for label in labels:
        if not (dataset.labels == label).any():
            return "No images with the given label are found in dataset"
        images = get_images_by_label(dataset, label, n)
        fig, axes = plt.subplots(1, n, figsize=(12, 12))
        for i in range(n):
            axes[i].imshow(denormalize(images[i]).permute(1, 2, 0))
            axes[i].axis("off")
    plt.show()

In [None]:
def get_images_by_label(dataset, label, n=None):
    if (dataset.labels == label).any():
        if n:
            return torch.stack([dataset[i][0] for i in (dataset.labels == label).nonzero()[:n]])
        return torch.stack([dataset[i][0] for i in (dataset.labels == label).nonzero()])

In [None]:
class CelebADataset(Dataset):
    def __init__(self, images_root, train_split_file, labels_file, mod, transforms, aug=None):
        super().__init__()

        mods = {"train": 0, "val": 1, "test": 2}

        self.mod = mod
        self.transforms = transforms
        self.aug = aug

        with open(train_split_file, "r") as train_test_f:
            images = train_test_f.readlines()
        self.images = [
            os.path.join(images_root, image.strip().split()[0])
            for image in images
            if int(image.strip().split()[1]) == mods[mod]
        ]

        with open(labels_file, "r") as labels_f:
            labels = labels_f.readlines()
        labels_by_images = {label.strip().split()[0]: int(label.strip().split()[-1]) for label in labels}
        self.labels = torch.LongTensor([labels_by_images[image.split("/")[-1]] for image in self.images])

    def __len__(self):
        return len(self.images)

    def __getitem__(self, idx):
        image = self.images[idx]
        image = Image.open(image)
        if self.aug:
            image = self.aug(image)
        image = self.transforms(image)
        label = self.labels[idx]

        return image, label

In [None]:
def dataset_distribution(dataset, sorted=True, sampler=None):
    if sampler:
        classes_count = {i: 0 for i in range(1000)}
        loader = DataLoader(dataset, batch_size=1, sampler=sampler)
        for X_batch, y_batch in tqdm(loader):
            for y in y_batch:
                classes_count[y.item()] += 1
        classes_count = np.array(list(classes_count.values()))
        display.clear_output()
    else:
        classes_count = np.unique(dataset.labels, return_counts=True)[1]

    histogram = px.histogram(x=classes_count)
    histogram.update_layout(
        xaxis_title="images per class",
        yaxis_title="classes count",
        title=f"Classes by images. Total images: {len(dataset)}",
    )

    bar_xaxis_title = "classes"

    if sorted:
        classes_count.sort()
        bar_xaxis_title = "sorted classes"

    bar = px.bar(
        y=classes_count,
        title=f"Images per class. Mean: {classes_count.mean():.2f} / "
        f"Std: {classes_count.std():.2f} ({classes_count.std() / classes_count.mean():.2f})",
    )
    bar.update_layout(xaxis_title=bar_xaxis_title, yaxis_title="images count")

    histogram.show()
    bar.show()

In [None]:
def uniform_sampler(dataset):
    classes_weights = 1 / np.unique(dataset.labels, return_counts=True)[1]
    samples_weights = classes_weights[dataset.labels]
    uniform_sampler = WeightedRandomSampler(samples_weights, len(samples_weights), replacement=True)
    return uniform_sampler

In [None]:
show_images(images_root)

In [None]:
transforms = tts.Compose([tts.Resize(160), tts.ToTensor(), tts.Normalize((0.485, 0.456, 0.406), (0.229, 0.224, 0.225))])

aug = None

## Training Data

In [None]:
train_dataset = CelebADataset(images_root, train_split_file, labels_file, "train", transforms, aug)

In [None]:
dataset_distribution(train_dataset)

In [None]:
train_sampler = uniform_sampler(train_dataset)
dataset_distribution(train_dataset, sampler=train_sampler)

## Validation Data

In [None]:
val_dataset = CelebADataset(images_root, train_split_file, labels_file, "val", transforms, aug)

In [None]:
missing_val_labels = 1000 - len(np.unique(val_dataset.labels))
missing_val_labels

In [None]:
dataset_distribution(val_dataset)

## Test Data

In [None]:
test_dataset = CelebADataset(images_root, train_split_file, labels_file, "test", transforms, aug)

In [None]:
missing_test_labels = 1000 - len(np.unique(test_dataset.labels))
missing_test_labels

In [None]:
dataset_distribution(test_dataset)

# Training

## Training Functions

In [None]:
class TrainingPlot:
    def __init__(self, epochs, log_score, score_name="score", short_score_name="score"):
        self.display = display.display(display_id=True)
        self.log_score = log_score
        self.score_name = score_name
        self.short_score_name = short_score_name

        if log_score:
            fig = make_subplots(rows=1, cols=2, column_widths=[0.5, 0.5])
            fig.add_trace(go.Scatter(name="train", marker_color="blue", showlegend=False), row=1, col=2)
            fig.add_trace(go.Scatter(name="val", marker_color="red", showlegend=False), row=1, col=2)
            fig.update_xaxes(title="epoch", range=[0, epochs], tickformat=",d", row=1, col=2)
            fig.update_yaxes(title=score_name, range=[-0.1, 1.1], row=1, col=2)
        else:
            fig = make_subplots(rows=1, cols=1)
            fig.update_layout(width=600)
        fig.add_trace(go.Scatter(name="train", marker_color="blue"), row=1, col=1)
        fig.add_trace(go.Scatter(name="val", marker_color="red"), row=1, col=1)
        fig.update_xaxes(title="epoch", range=[0, epochs], tickformat=",d", row=1, col=1)
        fig.update_yaxes(title="loss", row=1, col=1)
        fig.update_layout(title_text="Training Progress")

        self.fig = go.FigureWidget(fig)

    def show(self):
        self.display.update(self.fig)

    def update(self, log):
        self.fig.data[2].y = log["train_loss"]
        self.fig.data[3].y = log["val_loss"]
        if self.log_score:
            self.fig.data[0].y = log[f"train_{self.short_score_name}"]
            self.fig.data[1].y = log[f"val_{self.short_score_name}"]

    def close(self, train_log, test_log=None):
        display.clear_output()
        self.fig.show()
        for epoch in range(len(train_log["train_loss"])):
            if self.log_score:
                print(
                    f"epoch {epoch+1}: "
                    f"train loss: {train_log['train_loss'][epoch]:.3f} val loss: {train_log['val_loss'][epoch]:.3f} "
                    f"train {self.short_score_name}: {train_log[f'train_{self.short_score_name}'][epoch]:.3f} "
                    f"val {self.short_score_name}: {train_log[f'val_{self.short_score_name}'][epoch]:.3f}"
                )
            else:
                print(
                    f"epoch {epoch+1}: "
                    f"train loss: {train_log['train_loss'][epoch]:.3f} val loss: {train_log['val_loss'][epoch]:.3f}"
                )
        if test_log:
            if self.log_score:
                print(
                    f"\ncurrent test loss: {test_log['curr_test_loss']:.3f}, "
                    f"current test {self.short_score_name}: {test_log[f'curr_test_{self.short_score_name}']:.3f}"
                )
                print(
                    f"best test loss: {test_log['best_test_loss']:.3f}, "
                    f"best test {self.short_score_name}: {test_log[f'best_test_{self.short_score_name}']:.3f}"
                )
            else:
                print(f"\ncurrent test loss: {test_log['curr_test_loss']:.3f}")
                print(f"best test loss: {test_log['best_test_loss']:.3f}")

In [None]:
class Trainer:
    def __init__(
        self,
        model,
        criterion,
        optimizer,
        train_dataset,
        val_dataset,
        test_dataset,
        epochs=10,
        batch_size=64,
        sampler=None,
        scheduler=None,
    ):
        self.model = model
        self.criterion = criterion
        self.optimizer = optimizer
        self.epochs = epochs
        self.batch_size = batch_size
        self.sampler = sampler
        self.scheduler = scheduler
        self.loss = None
        self.score = 0
        self.best_model = deepcopy(model)
        self.best_loss = None
        self.best_score = 0
        self.train_log = {"train_loss": [], "val_loss": [], "train_acc": [], "val_acc": []}
        self.test_log = {
            "curr_test_loss": None,
            "best_test_loss": None,
            "curr_test_acc": None,
            "best_test_acc": None,
        }
        self.training_plot = None

        self.train_dataset = train_dataset
        self.val_dataset = val_dataset
        self.test_dataset = test_dataset
        self.datasets = {"train": self.train_dataset, "val": self.val_dataset, "test": self.test_dataset}

        train_shuffle = True if not sampler else False
        self.train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=train_shuffle, sampler=sampler)
        self.val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)
        self.test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)
        self.loaders = {"train": self.train_loader, "val": self.val_loader, "test": self.test_loader}

    def train(self, epochs=None, scheduler=None, log_score=False, plot=True):
        if epochs:
            self.epochs = epochs
        if scheduler:
            self.scheduler = scheduler
        if plot:
            self.training_plot = TrainingPlot(self.epochs, log_score, "accuracy", "acc")
            self.training_plot.show()

        for epoch in tqdm(range(self.epochs), leave=False, desc="Epoch"):
            train_loss, train_score = self.train_epoch()
            val_loss, val_score = self.evaluate(mode="val")
            if self.scheduler:
                self.scheduler.step()

            if epoch == 0:
                self.loss = val_loss
                self.best_loss = val_loss

            self.update_log(epoch, train_loss, train_score, val_loss, val_score, log_score)
            if plot:
                self.training_plot.update(self.train_log)

        self.loss, self.score = self.evaluate(self.model)
        if log_score:
            print(f"\ncurrent model test loss: {self.loss:.3f}, current model test acc: {self.score:.3f}")
        else:
            print(f"\ncurrent model test loss: {self.loss:.3f}")

        self.best_loss, self.best_score = self.evaluate(self.best_model)
        if log_score:
            print(f"best model test loss: {self.best_loss:.3f}, best model test acc: {self.best_score:.3f}")
        else:
            print(f"best model test loss: {self.best_loss:.3f}")

        self.test_log = {
            "curr_test_loss": self.loss,
            "best_test_loss": self.best_loss,
            "curr_test_acc": self.score,
            "best_test_acc": self.best_score,
        }

        if plot:
            self.training_plot.close(self.train_log, self.test_log)

    def train_epoch(self):
        self.model.train()
        train_loss = 0
        train_score = 0

        for X_batch, y_batch in tqdm(self.train_loader, leave=False, desc="Batch"):
            X_batch = X_batch.to(device)
            y_batch = y_batch.to(device)

            y_logits = self.model(X_batch)
            y_pred = torch.argmax(y_logits, -1)

            self.optimizer.zero_grad()
            loss = self.criterion(y_logits, y_batch)
            loss.backward()
            self.optimizer.step()

            train_loss += loss.item()
            train_score += (y_pred == y_batch).float().mean().item()

        train_loss /= len(self.train_loader)
        train_score /= len(self.train_loader)

        return train_loss, train_score

    def evaluate(self, model=None, mode="test", stats=False):
        if not model:
            model = self.model
        model.eval()
        test_loss = 0
        test_score = 0
        false_labels = []

        for X_batch, y_batch in tqdm(self.loaders[mode], leave=False, desc="Batch"):
            X_batch = X_batch.to(device)
            y_batch = y_batch.to(device)

            with torch.no_grad():
                y_logits = model(X_batch)
                y_pred = torch.argmax(y_logits, -1)
                loss = self.criterion(y_logits, y_batch)

                test_loss += loss.item()
                test_score += (y_pred == y_batch).float().mean().item()
                if stats:
                    false_labels.append(y_batch[y_pred != y_batch])

        test_loss /= len(self.loaders[mode])
        test_score /= len(self.loaders[mode])

        if stats:
            false_labels = torch.concat(false_labels).detach().cpu().numpy()
            false_labels, labels_count = np.unique(false_labels, return_counts=True)
            display.clear_output()
            print(f"Accuracy: {test_score:.3f}")
            print(f"Loss: {test_loss:.3f}\n")
            print(f"Unique false classes: {len(false_labels)}")
            print(f"False labels per class mean: {labels_count.mean():.3f}")
            print(f"False labels per class std: {labels_count.std():.3f}")
            print(f"Max false labels per class: {labels_count.max()}")
        else:
            return test_loss, test_score

    def update_log(self, epoch, train_loss, train_score, val_loss, val_score, log_score):
        self.train_log["train_loss"].append(train_loss)
        self.train_log["val_loss"].append(val_loss)

        if log_score:
            self.train_log["train_acc"].append(train_score)
            self.train_log["val_acc"].append(val_score)
            print(
                f"epoch {epoch+1}: "
                f"train loss: {train_loss:.3f} val loss: {val_loss:.3f} "
                f"train acc: {train_score:.3f} val acc: {val_score:.3f}"
            )
        else:
            print(f"epoch {epoch+1}: " f"train loss: {train_loss:.3f} val loss: {val_loss:.3f} ")

        if val_loss < self.best_loss:
            self.best_model = deepcopy(self.model)
            self.best_loss = val_loss
            if log_score:
                self.best_score = val_score

    def load_model(self, path):
        return self.model.load_state_dict(torch.load(path, map_location=device))

    def save_model(self, model=None, path=None):
        if not model:
            model = self.model
        if not path:
            if not os.path.exists("model.pt"):
                torch.save(model.state_dict(), "model.pt")
            else:
                i = 1
                while os.path.exists(f"model {i}.pt"):
                    i += 1
                torch.save(model.state_dict(), f"model {i}.pt")
        else:
            if os.path.exists(path):
                user = input("Do you want to replace the existing model? [Y/N]: ")
                if user == "Y":
                    torch.save(model.state_dict(), path)
                else:
                    self.save_model(model)
            torch.save(model.state_dict(), path)

## Model & Hyperparameters

In [None]:
model = inception_resnet_v1.InceptionResnetV1(pretrained="vggface2", classify=False, num_classes=1000)

In [None]:
for param in model.parameters():
    param.requires_grad = False
model.repeat_3.requires_grad = True
model.block8.requires_grad = True

In [None]:
model.last_linear = nn.Sequential(nn.Linear(1792, 1024), nn.ReLU(inplace=True), nn.Linear(1024, 1000))
model.last_bn = nn.Identity()

In [None]:
model = model.to(device)
criterion = nn.CrossEntropyLoss()
optimizer = optim.AdamW(
    list(model.repeat_3.parameters()) + list(model.block8.parameters()) + list(model.last_linear.parameters()),
    lr=3e-4,
)
scheduler = StepLR(optimizer, step_size=10, gamma=0.1)
sampler = train_sampler

## Training Process

In [None]:
trainer = Trainer(
    model,
    criterion,
    optimizer,
    train_dataset,
    val_dataset,
    test_dataset,
    sampler=sampler,
    scheduler=scheduler,
)

In [None]:
trainer.train(epochs=20, log_score=True)

## Evaluating Model

In [None]:
trainer.evaluate(model, stats=True)

**False labels per class - only for false classes**

In [None]:
trainer.evaluate(trainer.best_model, stats=True)

In [None]:
trainer.save_model(path="drive/MyDrive/Models/CE_model_906.pt")

In [None]:
trainer.load_model(path="drive/MyDrive/Models/CE_model_906.pt")

# Comparing embeddings

In [None]:
model.last_linear = nn.Identity()
model.last_bn = nn.Identity()

In [None]:
def cossim_compare(model, tensors, *other_tensors, plot=False):
    """Compute cosine similarities of given tensors
    If only tensors are provided, compute elementwise cosine similarities
    If other tensors are provied, compute cosine similarities
    between tensors and other tensors
    """

    model.eval()
    scores = []

    if type(tensors) == list:
        tensors = torch.stack(tensors)
    embeddings = model(tensors.to(device))

    if other_tensors:

        # check if images are passed as a list
        for i in range(len(other_tensors)):
            if type(other_tensors[i]) == list:
                other_tensors[i] = torch.stack(other_tensors[i])

        # iterate over other_tensors
        for i in tqdm(range(len(other_tensors))):
            embeddings_i = model(other_tensors[i].to(device))
            scores_i = []
            for j in range(len(tensors)):
                scores_i.append(F.cosine_similarity(embeddings[j], embeddings_i))
            scores.append(torch.concat(scores_i))
        display.clear_output()

        if plot:
            fig, axes = plt.subplots(1, len(tensors), figsize=(12, 12))

            # plot main tensors
            for i in range(len(tensors)):
                axes[i].imshow(denormalize(tensors[i]).permute(1, 2, 0))
                axes[i].axis("off")
            plt.text(200, 100, f"Mean Cosine Similarities:", fontsize=14)

            # plot other tensors
            for i in range(len(other_tensors)):
                fig, axes = plt.subplots(1, len(other_tensors[i]), figsize=(12, 12))
                mean_score = round(scores[i].mean().item(), 3)
                if len(other_tensors[i]) == 1:
                    axes.imshow(denormalize(other_tensors[i][0]).permute(1, 2, 0))
                    axes.axis("off")
                else:
                    for j in range(len(other_tensors[i])):
                        axes[j].imshow(denormalize(other_tensors[i][j]).permute(1, 2, 0))
                        axes[j].axis("off")
                plt.text(300, 100, f"{mean_score}", fontsize=14)
            return plt.show()

    else:

        for i in range(len(tensors)):
            for j in range(len(tensors) - i - 1):
                scores.append(F.cosine_similarity(embeddings[i], embeddings[i + j + 1], dim=0).item())

        if plot:
            mean_score = round(torch.tensor(scores).mean().item(), 3)
            fig, axes = plt.subplots(1, len(tensors), figsize=(12, 12))
            for i in range(len(tensors)):
                axes[i].imshow(denormalize(tensors[i]).permute(1, 2, 0))
                axes[i].axis("off")
            plt.text(200, 100, f"Mean Cosine Similarity: {mean_score}", fontsize=14)
            return plt.show()

    return scores

In [None]:
# similarity of diffirent people
# the first row compared to the next ones
cossim_compare(
    model,
    get_images_by_label(train_dataset, np.random.randint(0, 500), 5),
    get_images_by_label(train_dataset, np.random.randint(0, 500), 5),
    get_images_by_label(train_dataset, np.random.randint(0, 500), 5),
    plot=True,
)

In [None]:
cossim_compare(
    model,
    get_images_by_label(train_dataset, np.random.randint(0, 500), 5),
    get_images_by_label(train_dataset, np.random.randint(0, 500), 5),
    get_images_by_label(train_dataset, np.random.randint(0, 500), 5),
    plot=True,
)

In [None]:
# similarity of the same people
for i in range(5):
    cossim_compare(model, get_images_by_label(train_dataset, np.random.randint(0, 500), 5), plot=True)

## Cosine Similarity Distribution

In [None]:
# the same person from the train and test datasets (only 5 images)
label = np.random.randint(0, 500)
cossim_compare(
    model,
    get_images_by_label(train_dataset, label, 5),
    get_images_by_label(test_dataset, label, 5),
    plot=True,
)

In [None]:
# cosine similarity of all images of this person from train and test datasets
# every photo from train dataset is compared to every photo from test dataset
train_test_cossim = cossim_compare(
    model, get_images_by_label(train_dataset, label), get_images_by_label(test_dataset, label)
)
train_test_cossim = np.sort(train_test_cossim[0].detach().cpu())

In [None]:
fig = px.line(y=train_test_cossim)
fig.update_layout(
    xaxis_title="sorted images pairs",
    yaxis_title="cosine similarity",
    title=f"Person {label}. Mean: {train_test_cossim.mean():.2f} / "
    f"Std: {train_test_cossim.std():.2f} ({train_test_cossim.std()/train_test_cossim.mean():.2f})",
)

In [None]:
# cosine similarities between this person and others
others_cossim = cossim_compare(
    model,
    get_images_by_label(test_dataset, label),
    *(get_images_by_label(train_dataset, i, 5) for i in range(1000)),
)

In [None]:
others_mean_cossim = np.array([np.array(person_cossim.detach().cpu()).mean() for person_cossim in others_cossim])
fig = px.line(y=others_mean_cossim)
fig.update_layout(
    xaxis_title="people",
    yaxis_title="mean cosine similarity",
    title=f"Person {label} and others. Mean: {others_mean_cossim.mean():.2f} / "
    f"Std: {others_mean_cossim.std():.2f} ({others_mean_cossim.std()/others_mean_cossim.mean():.2f})",
)

**We can see that there is a peak value on the chart. It is the person himself. Now we will sort the scores to find the most similar people to this person.**

In [None]:
others_max_cossim = np.array([np.array(person_cossim.detach().cpu()).max() for person_cossim in others_cossim])
fig = px.line(y=np.sort(others_max_cossim))
fig.update_layout(
    xaxis_title="sorted people",
    yaxis_title="max cosine similarity",
    title=f"Person {label} and others. Mean: {others_max_cossim.mean():.2f} / "
    f"Std: {others_max_cossim.std():.2f} ({others_max_cossim.std()/others_max_cossim.mean():.2f})",
)

In [None]:
similar_people = np.argsort(others_max_cossim)[-5:]
print(f"Similar people: {similar_people[:-1]}")

In [None]:
cossim_compare(
    model,
    get_images_by_label(train_dataset, similar_people[-1], 4),
    *(get_images_by_label(train_dataset, label, 4) for label in similar_people[:-1]),
    plot=True,
)

**As we can see, they are pretty similar.**

## Identification Rate

In [None]:
class IRDataset(Dataset):
    def __init__(self, images_root, transforms, labels_file=None):
        super().__init__()

        self.transforms = transforms
        self.images = list(Path(images_root).glob("**/*.jpg"))
        self.labels = None

        if labels_file:
            with open(labels_file, "r") as labels_f:
                labels = labels_f.readlines()
            labels_by_images = {label.strip().split()[0]: int(label.strip().split()[-1]) for label in labels}
            self.labels = torch.LongTensor([labels_by_images[str(image).split("/")[-1]] for image in self.images])

    def __len__(self):
        return len(self.images)

    def __getitem__(self, idx):
        image = self.images[idx]
        image = Image.open(image)
        image = self.transforms(image)
        if self.labels is not None:
            return image, self.labels[idx]
        return image, 0

In [None]:
query_dataset = IRDataset(query_images_root, transforms, query_labels_file)
distractors_dataset = IRDataset(distractors_images_root, transforms)

In [None]:
dataset_distribution(query_dataset)

In [None]:
len(distractors_dataset)

In [None]:
def save_pickle(obj, filename):
    with open(filename, "wb") as pickle_file:
        pickle.dump(obj, pickle_file)

In [None]:
def load_pickle(filename):
    with open(filename, "rb") as pickle_file:
        obj = pickle.load(pickle_file)
    return obj

### True Pairs

In [None]:
# cosine similarities of all persons from the query dataset
true_pairs = []
for i in tqdm(query_dataset.labels.unique()):
    true_pairs += cossim_compare(model, get_images_by_label(query_dataset, i))
true_pairs = np.sort(true_pairs)
save_pickle(true_pairs, "drive/MyDrive/Datasets/true_pairs.pickle")

In [None]:
true_pairs = load_pickle("drive/MyDrive/Datasets/true_pairs.pickle")
print(f"Number of all true pairs: {len(true_pairs)}")

In [None]:
not_fine_tuned_true_pairs = load_pickle("drive/MyDrive/Datasets/not_fine_tuned_true_pairs.pickle")

In [None]:
fig = go.Figure()
fig.add_trace(go.Scatter(y=true_pairs, name="Fine-tuned"))
fig.add_trace(go.Scatter(y=not_fine_tuned_true_pairs, name="Not Fine-tuned"))
fig.update_layout(
    xaxis_title="sorted pairs",
    yaxis_title="cosine similarity",
    title=f"True pairs. Mean: {true_pairs.mean():.2f} / "
    f"Std: {true_pairs.std():.2f} ({true_pairs.std() / true_pairs.mean():.2f})",
)

**On the not fine-tuned model the mean is 0.54, the std: 0.18**

### False Pairs

In [None]:
def cossim_compare_elementwise(model, *tensors):
    """Elementwise compare images in tensors
    Each tensor in tensors should contain several images
    """

    model.eval()
    scores = []

    # check if tensors are passed as a list
    if len(tensors) == 1 and type(tensors[0]) == list:
        tensors = tensors[0]
    # check if images are passed as a list
    for i in range(len(tensors)):
        if type(tensors[i]) == list:
            tensors[i] = torch.stack(tensors[i])
    # compute embeddings
    embeddings = []
    for i in range(len(tensors)):
        embeddings.append(model(tensors[i].to(device)).cpu())

    # iterate over classes
    for i in tqdm(range(len(tensors))):
        # iterate over other classes
        for j in range(len(tensors) - i - 1):
            # iterate over images of the first class
            for k in range(len(tensors[i])):
                scores.append(F.cosine_similarity(embeddings[i][k], embeddings[i + j + 1]))

    return torch.concat(scores)

In [None]:
# cosine similarities of all pairs of different people from the query dataset
false_pairs = cossim_compare_elementwise(
    model, [get_images_by_label(query_dataset, label) for label in query_dataset.labels.unique()]
)
false_pairs = np.sort(false_pairs.cpu())
print(f"Number of false query pairs: {len(false_pairs)}")

In [None]:
def cossim_compare_datasets(model, dataset_1, dataset_2):
    model.eval()
    scores = []

    small_dataset = dataset_1 if len(dataset_1) < len(dataset_2) else dataset_2
    big_dataset = dataset_2 if len(dataset_1) < len(dataset_2) else dataset_1

    small_loader = DataLoader(small_dataset, batch_size=64)
    big_loader = DataLoader(big_dataset, batch_size=64)

    small_embeddings = []
    for X_batch, _ in tqdm(small_loader, desc="First dataset"):
        small_embeddings.append(model(X_batch.to(device)))
    small_embeddings = torch.concat(small_embeddings)

    for X_batch, _ in tqdm(big_loader, desc="Second dataset"):
        embeddings_batch = model(X_batch.to(device))
        for embedding in embeddings_batch:
            scores.append(F.cosine_similarity(embedding, small_embeddings))

    return torch.concat(scores)

In [None]:
# cosine similarities of all pairs between query and distractors images
distractors_pairs = cossim_compare_datasets(model, query_dataset, distractors_dataset)
distractors_pairs = np.array(distractors_pairs.cpu())
print(f"Number of distractors and query pairs: {len(distractors_pairs)}")

In [None]:
false_pairs = np.sort(np.append(false_pairs, distractors_pairs))
save_pickle(false_pairs, "drive/MyDrive/Datasets/false_pairs.pickle")

In [None]:
false_pairs = load_pickle("drive/MyDrive/Datasets/false_pairs.pickle")
print(f"Number of all false pairs: {len(false_pairs)}")

In [None]:
not_fine_tuned_false_pairs = load_pickle("drive/MyDrive/Datasets/not_fine_tuned_false_pairs.pickle")

In [None]:
# only 1/2500 of all false pairs are plotted
fig = go.Figure()
fig.add_trace(go.Scatter(y=false_pairs[[i % 2500 == 0 for i in range(len(false_pairs))]], name="Fine-tuned"))
fig.add_trace(
    go.Scatter(y=not_fine_tuned_false_pairs[[i % 2500 == 0 for i in range(len(false_pairs))]], name="Not Fine-tuned")
)
fig.update_layout(
    xaxis_title="sorted pairs",
    yaxis_title="cosine similarity",
    title=f"False pairs. Mean: {false_pairs.mean():.2f} / "
    f"Std: {false_pairs.std():.2f} ({false_pairs.std() / false_pairs.mean():.2f})",
)

**For not fine-tuned model the mean is 0.11, the std: 0.15**

### Metric

In [None]:
FPR = 0.01
N = int(len(false_pairs) * FPR)
threshold = false_pairs[-N]
threshold

In [None]:
TPR = sum(true_pairs > threshold) / len(true_pairs)
TPR

In [None]:
def IR(true_pairs, false_pairs, fpr=0.01):
    N = int(len(false_pairs) * fpr)
    threshold = false_pairs[-N]
    tpr = sum(true_pairs > threshold) / len(true_pairs)
    return tpr

In [None]:
for fpr in [0.5, 0.2, 0.1, 0.05, 0.01]:
    print(f"FPR: {fpr} IR: {round(IR(true_pairs, false_pairs, fpr), 3)}")

**For not fine-tuned model**

FPR: 0.5 IR: 0.98

FPR: 0.2 IR: 0.934

FPR: 0.1 IR: 0.885

FPR: 0.05 IR: 0.825

FPR: 0.01 IR: 0.658

# Loss Functions

## Triplet Loss

### Data preparation

In [None]:
class TripletDataset(CelebADataset):
    def __init__(self, images_root, train_split_file, labels_file, transforms, n_images=1):
        super().__init__(images_root, train_split_file, labels_file, "train", transforms)

        self.n_images = n_images
        self.unique_labels = self.labels.unique().tolist()
        self.images_by_labels = {}
        with open(labels_file, "r") as labels_f:
            labels = labels_f.readlines()
        for label in labels:
            self.images_by_labels.setdefault(int(label.strip().split()[-1]), [])
            self.images_by_labels[int(label.strip().split()[-1])].append(
                os.path.join(images_root, label.strip().split()[0])
            )

    def __getitem__(self, idx):
        anchor_image = Image.open(self.images[idx])
        anchor_image = self.transforms(anchor_image)
        anchor_images = torch.stack([anchor_image for i in range(self.n_images)])
        label = int(self.labels[idx])

        # get paths of all positive images and remove the anchor
        positive_paths = self.images_by_labels[label].copy()
        positive_paths.remove(self.images[idx])
        # select random paths
        positive_paths = random.choices(positive_paths, k=self.n_images)
        positive_images = []
        for path in positive_paths:
            image = Image.open(path)
            image = self.transforms(image)
            positive_images.append(image)
        if len(positive_images) > 1:
            positive_images = torch.stack(positive_images)
        else:
            positive_images = positive_images[0].unsqueeze(0)

        # get all labels and remove the anchor label
        negative_labels = self.unique_labels.copy()
        negative_labels.remove(label)
        # select random labels
        negative_labels = random.choices(negative_labels, k=self.n_images)
        # select random paths
        negative_paths = [random.choice(self.images_by_labels[label]) for label in negative_labels]
        negative_images = []
        for path in negative_paths:
            image = Image.open(path)
            image = self.transforms(image)
            negative_images.append(image)
        if len(negative_images) > 1:
            negative_images = torch.stack(negative_images)
        else:
            negative_images = negative_images[0].unsqueeze(0)

        return anchor_images, positive_images, negative_images

In [None]:
triplet_dataset = TripletDataset(images_root, train_split_file, labels_file, transforms, 1)

### Train Functions

In [None]:
class TripletPlot(TrainingPlot):
    def __init__(self, epochs, log_score):
        super().__init__(epochs, log_score, "cosine similarity", "cossim")

        if log_score:
            self.fig.data = [self.fig.data[0], self.fig.data[2]]
        else:
            self.fig.data = [self.fig.data[0]]

    def update(self, log):
        self.fig.data[0].y = log["train_loss"]
        if self.log_score:
            self.fig.data[1].y = log["val_cossim"]

    def close(self, train_log, test_log=None):
        display.clear_output()
        self.fig.show()
        for epoch in range(len(train_log["train_loss"])):
            if self.log_score:
                print(
                    f"epoch {epoch+1}: "
                    f"train loss: {train_log['train_loss'][epoch]:.3f} "
                    f"val cossim: {train_log[f'val_cossim'][epoch]:.3f}"
                )
            else:
                print(f"epoch {epoch+1}: " f"train loss: {train_log['train_loss'][epoch]:.3f}")
        if test_log:
            print(f"\ncurrent test cossim: {test_log[f'curr_test_cossim']:.3f}")
            print(f"best test cossim: {test_log[f'best_test_cossim']:.3f}")

In [None]:
class TripletTrainer(Trainer):
    def __init__(
        self,
        model,
        criterion,
        optimizer,
        base_dataset,
        train_dataset,
        val_dataset,
        test_dataset,
        epochs=10,
        batch_size=64,
        sampler=None,
        scheduler=None,
    ):

        super().__init__(
            model,
            criterion,
            optimizer,
            train_dataset,
            val_dataset,
            test_dataset,
            epochs=epochs,
            batch_size=batch_size,
            sampler=sampler,
            scheduler=scheduler,
        )

        self.std = 0
        self.best_std = 0
        self.base_dataset = base_dataset
        self.base_loader = DataLoader(base_dataset, 64)
        self.train_log = {"train_loss": [], "val_cossim": []}
        self.test_log = {"curr_test_cossim": None, "best_test_cossim": None}

    def train(self, epochs=None, scheduler=None, log_score=True, plot=True):
        if epochs:
            self.epochs = epochs
        if scheduler:
            self.scheduler = scheduler
        if plot:
            self.training_plot = TripletPlot(self.epochs, log_score)
            self.training_plot.show()

        for epoch in tqdm(range(self.epochs), leave=False, desc="Epoch"):
            train_loss = self.train_epoch()
            val_score = 0
            if log_score:
                val_score, val_std = self.evaluate(mode="val")
            if self.scheduler:
                self.scheduler.step()

            if epoch == 0:
                self.loss = train_loss
                self.best_loss = train_loss

            self.update_log(epoch, train_loss, val_score, log_score)
            if plot:
                self.training_plot.update(self.train_log)

        self.score, self.std = self.evaluate(self.model)
        print(f"\ncurrent model test cossim: {self.score:.3f}")

        self.best_score, self.best_std = self.evaluate(self.best_model)
        print(f"best model test cossim: {self.best_score:.3f}")

        self.test_log = {"curr_test_cossim": self.score, "best_test_cossim": self.best_score}

        if plot:
            self.training_plot.close(self.train_log, self.test_log)

    def train_epoch(self):
        self.model.train()
        train_loss = 0

        for anchor, positive, negative in tqdm(self.train_loader, leave=False, desc="Batch"):

            anchor = anchor.reshape(-1, 3, 195, 160)
            positive = positive.reshape(-1, 3, 195, 160)
            negative = negative.reshape(-1, 3, 195, 160)

            embeddings = model(torch.concat([anchor, positive, negative]).to(device))

            anchor_embedding = embeddings[: len(anchor)]
            positive_embedding = embeddings[len(anchor) : 2 * len(anchor)]
            negative_embedding = embeddings[2 * len(anchor) :]

            self.optimizer.zero_grad()
            loss = self.criterion(anchor_embedding, positive_embedding, negative_embedding)
            loss.backward()
            self.optimizer.step()

            train_loss += loss.item()

        train_loss /= len(self.train_loader)

        return train_loss

    def evaluate(self, model=None, mode="test", stats=False):
        if not model:
            model = self.model
        model.eval()
        test_score = 0
        test_std = 0

        mean_embeddings = []
        for label in tqdm(self.train_dataset.unique_labels, leave=False, desc="Embeddings"):
            mean_embeddings.append(model(get_images_by_label(self.base_dataset, label).to(device)).mean(axis=0))
        mean_embeddings = torch.stack(mean_embeddings)

        for X_batch, y_batch in tqdm(self.loaders[mode], leave=False, desc="Batch"):
            X_batch = X_batch.to(device)

            with torch.no_grad():
                embeddings = model(X_batch)
                score = F.cosine_similarity(embeddings, mean_embeddings[y_batch])
                test_score += score.mean().item()
                test_std += score.std().item()

        test_score /= len(self.loaders[mode])
        test_std /= len(self.loaders[mode])

        if stats:
            display.clear_output()
            print(f"Cosine Similarity mean: {test_score:.3f}")
            print(f"Cosine Similarity std: {test_std:.3f}")
        else:
            return test_score, test_std

    def update_log(self, epoch, train_loss, val_score, log_score):
        self.train_log["train_loss"].append(train_loss)
        if log_score:
            self.train_log["val_cossim"].append(val_score)
            print(f"epoch {epoch+1}: train loss: {train_loss:.3f} val cossim: {val_score:.3f}")
        else:
            print(f"epoch {epoch+1}: train loss:{train_loss:.3f}")

        if train_loss < self.best_loss:
            self.best_model = deepcopy(self.model)
            self.best_loss = train_loss
            if log_score:
                self.best_score = val_score

In [None]:
def cosine_distance(x1, x2):
    return 1 - F.cosine_similarity(x1, x2)

### Model & Hyperparameters

In [None]:
model = inception_resnet_v1.InceptionResnetV1(pretrained="vggface2", classify=False, num_classes=1000)

In [None]:
for param in model.parameters():
    param.requires_grad = False

In [None]:
model.last_linear = nn.Linear(1792, 1792)
model.last_bn = nn.BatchNorm1d(1792)

In [None]:
model = model.to(device)
criterion = nn.TripletMarginWithDistanceLoss(distance_function=cosine_distance)
optimizer = optim.AdamW(list(model.last_linear.parameters()) + list(model.last_bn.parameters()), lr=3e-4)
scheduler = StepLR(optimizer, step_size=10, gamma=0.1)
sampler = train_sampler

### Training Process

In [None]:
trainer = TripletTrainer(
    model,
    criterion,
    optimizer,
    train_dataset,
    triplet_dataset,
    val_dataset,
    test_dataset,
    sampler=sampler,
    scheduler=scheduler,
)

In [None]:
trainer.train(epochs=20, log_score=False)

### Evaluating Model

In [None]:
trainer.evaluate(model, stats=True)

In [None]:
trainer.evaluate(trainer.best_model, stats=True)

In [None]:
trainer.save_model(path="drive/MyDrive/Models/triplet_model_872.pt")

In [None]:
trainer.load_model(path="drive/MyDrive/Models/triplet_model_872.pt")

### Comparing Embeddings

In [None]:
# similarities of diffirent people
cossim_compare(
    model,
    get_images_by_label(train_dataset, np.random.randint(0, 500), 5),
    get_images_by_label(train_dataset, np.random.randint(0, 500), 5),
    get_images_by_label(train_dataset, np.random.randint(0, 500), 5),
    plot=True,
)

In [None]:
# similarity of the same people
for i in range(5):
    cossim_compare(model, get_images_by_label(train_dataset, np.random.randint(0, 500), 5), plot=True)

In [None]:
# the same person from the train and test datasets (only 5 images)
label = np.random.randint(0, 500)
cossim_compare(
    model,
    get_images_by_label(train_dataset, label, 5),
    get_images_by_label(test_dataset, label, 5),
    plot=True,
)

In [None]:
# cosine similarity of all images of this person from train and test datasets
train_test_cossim = cossim_compare(
    model, get_images_by_label(train_dataset, label), get_images_by_label(test_dataset, label)
)
train_test_cossim = np.sort(train_test_cossim[0].detach().cpu())

In [None]:
fig = px.line(y=train_test_cossim)
fig.update_layout(
    xaxis_title="sorted images pairs",
    yaxis_title="cosine similarity",
    title=f"Person {label}. Mean: {train_test_cossim.mean():.2f} / "
    f"Std: {train_test_cossim.std():.2f} ({train_test_cossim.std()/train_test_cossim.mean():.2f})",
)

In [None]:
# cosine similarities between this person and others
others_cossim = cossim_compare(
    model,
    get_images_by_label(test_dataset, label),
    *(get_images_by_label(train_dataset, i, 5) for i in range(1000)),
)

In [None]:
others_mean_cossim = np.array([np.array(person_cossim.detach().cpu()).mean() for person_cossim in others_cossim])
fig = px.line(y=others_mean_cossim)
fig.update_layout(
    xaxis_title="people",
    yaxis_title="mean cosine similarity",
    title=f"Person {label} and others. Mean: {others_mean_cossim.mean():.2f} / "
    f"Std: {others_mean_cossim.std():.2f} ({others_mean_cossim.std()/others_mean_cossim.mean():.2f})",
)

In [None]:
others_max_cossim = np.array([np.array(person_cossim.detach().cpu()).max() for person_cossim in others_cossim])
fig = px.line(y=np.sort(others_max_cossim))
fig.update_layout(
    xaxis_title="sorted people",
    yaxis_title="max cosine similarity",
    title=f"Person {label} and others. Mean: {others_max_cossim.mean():.2f} / "
    f"Std: {others_max_cossim.std():.2f} ({others_max_cossim.std()/others_max_cossim.mean():.2f})",
)

In [None]:
similar_people = np.argsort(others_max_cossim)[-5:]
print(f"Similar people: {similar_people[:-1]}")

In [None]:
cossim_compare(
    model,
    get_images_by_label(train_dataset, similar_people[-1], 4),
    *(get_images_by_label(train_dataset, label, 4) for label in similar_people[:-1]),
    plot=True,
)

## ArcFace Loss

### Train Functions

In [None]:
class ArcFaceTrainer(Trainer):
    def __init__(
        self,
        model,
        criterion,
        optimizer,
        train_dataset,
        val_dataset,
        test_dataset,
        epochs=10,
        batch_size=64,
        sampler=None,
        scheduler=None,
    ):

        super().__init__(
            model,
            criterion,
            optimizer,
            train_dataset,
            val_dataset,
            test_dataset,
            epochs=epochs,
            batch_size=batch_size,
            sampler=sampler,
            scheduler=scheduler,
        )

    def train_epoch(self):
        self.model.train()
        train_loss = 0
        train_score = 0

        for X_batch, y_batch in tqdm(self.train_loader, leave=False, desc="Batch"):
            X_batch = X_batch.to(device)
            y_batch = y_batch.to(device)

            self.optimizer.zero_grad()
            loss, y_pred = self.criterion(self.model(X_batch), y_batch, y_pred=True)
            loss.backward()
            self.optimizer.step()

            train_loss += loss.item()
            train_score += (y_pred == y_batch).float().mean().item()

        train_loss /= len(self.train_loader)
        train_score /= len(self.train_loader)

        return train_loss, train_score

    def evaluate(self, model=None, mode="test", stats=False):
        if not model:
            model = self.model
        model.eval()
        test_loss = 0
        test_score = 0
        false_labels = []

        for X_batch, y_batch in tqdm(self.loaders[mode], leave=False, desc="Batch"):
            X_batch = X_batch.to(device)
            y_batch = y_batch.to(device)

            with torch.no_grad():
                loss, y_pred = self.criterion(model(X_batch), y_batch, y_pred=True)
                test_loss += loss.item()
                test_score += (y_pred == y_batch).float().mean().item()
                if stats:
                    false_labels.append(y_batch[y_pred != y_batch])

        test_loss /= len(self.loaders[mode])
        test_score /= len(self.loaders[mode])

        if stats:
            false_labels = torch.concat(false_labels).detach().cpu().numpy()
            false_labels, labels_count = np.unique(false_labels, return_counts=True)
            display.clear_output()
            print(f"Accuracy: {test_score:.3f}")
            print(f"Loss: {test_loss:.3f}\n")
            print(f"Unique false classes: {len(false_labels)}")
            print(f"False labels per class mean: {labels_count.mean():.3f}")
            print(f"False labels per class std: {labels_count.std():.3f}")
            print(f"Max false labels per class: {labels_count.max()}")
        else:
            return test_loss, test_score

In [None]:
# reimplementation of https://github.com/cvqluu/Angular-Penalty-Softmax-Losses-Pytorch/blob/master/loss_functions.py
class ArcFaceLoss(nn.Module):
    def __init__(self, in_features, out_features, eps=1e-7, s=None, m=None):
        super().__init__()
        self.s = 64.0 if not s else s
        self.m = 0.5 if not m else m

        self.in_features = in_features
        self.out_features = out_features
        self.fc = nn.Linear(in_features, out_features, bias=False).to(device)
        self.eps = eps

    def forward(self, x, labels, y_pred=False):
        for param in self.fc.parameters():
            param = F.normalize(param, p=2, dim=1)
        x = F.normalize(x, p=2, dim=1)
        y_logits = self.fc(x)

        cos = torch.clamp(torch.diagonal(torch.transpose(y_logits, 0, 1)[labels]), -1.0 + self.eps, 1 - self.eps)
        numerator = self.s * torch.cos(torch.acos(cos) + self.m)
        excluded = torch.cat(
            [torch.cat((y_logits[i, :y], y_logits[i, y + 1 :])).unsqueeze(0) for i, y in enumerate(labels)],
            dim=0,
        )
        denominator = torch.exp(numerator) + torch.sum(torch.exp(self.s * excluded), dim=1)
        loss = numerator - torch.log(denominator)

        if y_pred:
            return -torch.mean(loss), torch.argmax(y_logits, -1)
        return -torch.mean(loss)

### Model & Hyperparameters

In [None]:
model = inception_resnet_v1.InceptionResnetV1(pretrained="vggface2", classify=False, num_classes=1000)

In [None]:
for param in model.parameters():
    param.requires_grad = False
model.repeat_3.requires_grad = True
model.block8.requires_grad = True

In [None]:
model.last_linear = nn.Sequential(nn.Linear(1792, 1024), nn.ReLU(inplace=True))
model.last_bn = nn.Identity()

In [None]:
model = model.to(device)
criterion = ArcFaceLoss(1024, 1000, s=16, m=0.25)
optimizer = optim.AdamW(
    list(model.repeat_3.parameters())
    + list(model.block8.parameters())
    + list(model.last_linear.parameters())
    + list(model.last_bn.parameters())
    + list(criterion.fc.parameters()),
    lr=3e-4,
)
scheduler = StepLR(optimizer, step_size=10, gamma=0.1)
sampler = train_sampler

### Training Process

In [None]:
trainer = ArcFaceTrainer(
    model,
    criterion,
    optimizer,
    train_dataset,
    val_dataset,
    test_dataset,
    sampler=sampler,
    scheduler=scheduler,
)

In [None]:
trainer.train(epochs=20, log_score=True)

### Evaluating Model

In [None]:
trainer.evaluate(model, stats=True)

In [None]:
trainer.evaluate(trainer.best_model, stats=True)

In [None]:
trainer.save_model(path="drive/MyDrive/Models/arcface_model_930.pt", model=trainer.best_model)

In [None]:
trainer.load_model(path="drive/MyDrive/Models/arcface_model_932.pt")

### Comparing Embeddings

In [None]:
# similarities of diffirent people
cossim_compare(
    model,
    get_images_by_label(train_dataset, np.random.randint(0, 500), 5),
    get_images_by_label(train_dataset, np.random.randint(0, 500), 5),
    get_images_by_label(train_dataset, np.random.randint(0, 500), 5),
    plot=True,
)

In [None]:
# similarity of the same people
for i in range(5):
    cossim_compare(model, get_images_by_label(train_dataset, np.random.randint(0, 500), 5), plot=True)

In [None]:
# the same person from the train and test datasets (only 5 images)
label = np.random.randint(0, 500)
cossim_compare(
    model,
    get_images_by_label(train_dataset, label, 5),
    get_images_by_label(test_dataset, label, 5),
    plot=True,
)

In [None]:
# cosine similarity of all images of this person from train and test datasets
train_test_cossim = cossim_compare(
    model, get_images_by_label(train_dataset, label), get_images_by_label(test_dataset, label)
)
train_test_cossim = np.sort(train_test_cossim[0].detach().cpu())

In [None]:
fig = px.line(y=train_test_cossim)
fig.update_layout(
    xaxis_title="sorted images pairs",
    yaxis_title="cosine similarity",
    title=f"Person {label}. Mean: {train_test_cossim.mean():.2f} / "
    f"Std: {train_test_cossim.std():.2f} ({train_test_cossim.std()/train_test_cossim.mean():.2f})",
)

In [None]:
# cosine similarities between this person and others
others_cossim = cossim_compare(
    model,
    get_images_by_label(test_dataset, label),
    *(get_images_by_label(train_dataset, i, 5) for i in range(1000)),
)

In [None]:
others_mean_cossim = np.array([np.array(person_cossim.detach().cpu()).mean() for person_cossim in others_cossim])
fig = px.line(y=others_mean_cossim)
fig.update_layout(
    xaxis_title="people",
    yaxis_title="mean cosine similarity",
    title=f"Person {label} and others. Mean: {others_mean_cossim.mean():.2f} / "
    f"Std: {others_mean_cossim.std():.2f} ({others_mean_cossim.std()/others_mean_cossim.mean():.2f})",
)

In [None]:
others_max_cossim = np.array([np.array(person_cossim.detach().cpu()).max() for person_cossim in others_cossim])
fig = px.line(y=np.sort(others_max_cossim))
fig.update_layout(
    xaxis_title="sorted people",
    yaxis_title="max cosine similarity",
    title=f"Person {label} and others. Mean: {others_max_cossim.mean():.2f} / "
    f"Std: {others_max_cossim.std():.2f} ({others_max_cossim.std()/others_max_cossim.mean():.2f})",
)

In [None]:
similar_people = np.argsort(others_max_cossim)[-5:]
print(f"Similar people: {similar_people[:-1]}")

In [None]:
cossim_compare(
    model,
    get_images_by_label(train_dataset, similar_people[-1], 4),
    *(get_images_by_label(train_dataset, label, 4) for label in similar_people[:-1]),
    plot=True,
)

# Trash Photos

In [None]:
class NewInceptionResnetV1(inception_resnet_v1.InceptionResnetV1):
    def forward(self, x):
        """Calculate embeddings or logits given a batch of input image tensors.
        Arguments:
            x {torch.tensor} -- Batch of image tensors representing faces.
        Returns:
            torch.tensor -- Batch of embedding vectors or multinomial logits.
        """
        x = self.conv2d_1a(x)
        x = self.conv2d_2a(x)
        x = self.conv2d_2b(x)
        x = self.maxpool_3a(x)
        x = self.conv2d_3b(x)
        x = self.conv2d_4a(x)
        x = self.conv2d_4b(x)
        x = self.repeat_1(x)
        x = self.mixed_6a(x)
        x = self.repeat_2(x)
        x = self.mixed_7a(x)
        x = self.repeat_3(x)
        x = self.block8(x)
        x = self.avgpool_1a(x)
        x = self.dropout(x)
        x = self.last_linear(x.view(x.shape[0], -1))
        x = self.last_bn(x)
        # if self.classify:
        #     x = self.logits(x)
        # else:
        #     x = F.normalize(x, p=2, dim=1)
        return x

In [None]:
model = NewInceptionResnetV1(pretrained="vggface2", classify=False, num_classes=1000)

In [None]:
model.last_linear = nn.Sequential(nn.Linear(1792, 1024), nn.ReLU(inplace=True), nn.Linear(1024, 1000))
model.last_bn = nn.Identity()

In [None]:
model = model.to(device)
model.load_state_dict(torch.load("drive/MyDrive/Models/CE_model_906.pt", map_location=device))

In [None]:
def get_embeddings_from_dataset(model, dataset):
    model.eval()

    loader = DataLoader(dataset, batch_size=64)

    with torch.no_grad():
        embeddings = []
        for X_batch, _ in tqdm(loader, desc="batch"):
            embeddings.append(model(X_batch.to(device)))
        embeddings = torch.concat(embeddings)

    return embeddings

In [None]:
test_embeddings = get_embeddings_from_dataset(model, test_dataset)
query_embeddings = get_embeddings_from_dataset(model, query_dataset)
distractors_embeddings = get_embeddings_from_dataset(model, distractors_dataset)

In [None]:
test_norms = torch.norm(test_embeddings, dim=1).sort()
query_norms = torch.norm(query_embeddings, dim=1).sort()
distractors_norms = torch.norm(distractors_embeddings, dim=1).sort()

In [None]:
def show_images_by_indexes(dataset, indexes):
    fig, ax = plt.subplots(1, len(indexes), figsize=(16, 16))
    for i, index in enumerate(indexes):
        ax[i].imshow(denormalize(dataset[index][0]).permute(1, 2, 0))
        ax[i].axis("off")
    plt.show()

In [None]:
show_images_by_indexes(query_dataset, query_norms[1][-10:])

In [None]:
show_images_by_indexes(query_dataset, query_norms[1][:10])

**The images of the first row (with the higest norms) do not really look like something that would be considered "trash photos", however, they look worse than the images of the second row (with the lowest norms)**

In [None]:
show_images_by_indexes(test_dataset, test_norms[1][-10:])

In [None]:
show_images_by_indexes(test_dataset, test_norms[1][:10])

**Test images do not really differ**

In [None]:
show_images_by_indexes(distractors_dataset, distractors_norms[1][-10:])

In [None]:
show_images_by_indexes(distractors_dataset, distractors_norms[1][:10])

**The biggest diffirence is seen in the distractors dataset. Now some images of the first row can be categorized as trash images**