# Notebook with Identification Rate Metric realization
© Реализовать Identification Rate Metric, протестировать ее на предоставленных данных

In [1]:
import os
from copy import deepcopy
from collections import defaultdict
from itertools import combinations

import numpy as np
import pandas as pd
import torch
import torch.nn as nn

from torch.utils.data import Dataset
from torchvision import models
from torchvision.transforms import v2
from torchvision.io import read_image

WORKING_DIR = '/kaggle/working'
DEVICE = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print('Using device:', DEVICE)

Using device: cpu


## Download data from gdrive

In [2]:
# celebA_ir.rar - An archive (a folder) with pictures (and csv-file)
!gdown 1LsbHPb6pbN0dtnveH1zcHhR2w0iH9Ta9

Downloading...
From: https://drive.google.com/uc?id=1LsbHPb6pbN0dtnveH1zcHhR2w0iH9Ta9
To: /kaggle/working/celebA_ir.rar
100%|██████████████████████████████████████| 7.51M/7.51M [00:00<00:00, 40.6MB/s]


In [3]:
# Package for unarchive
!pip install patool
import patoolib
patoolib.extract_archive(f"{WORKING_DIR}/celebA_ir.rar", outdir="celebA_ir")

Collecting patool
  Downloading patool-3.1.0-py2.py3-none-any.whl.metadata (4.3 kB)
Downloading patool-3.1.0-py2.py3-none-any.whl (98 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m98.4/98.4 kB[0m [31m3.9 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: patool
Successfully installed patool-3.1.0


INFO patool: Extracting /kaggle/working/celebA_ir.rar ...
INFO patool: ... creating output directory `celebA_ir'.
INFO patool: running /usr/bin/unrar x -kb -or -- /kaggle/working/celebA_ir.rar
INFO patool: ... /kaggle/working/celebA_ir.rar extracted to `celebA_ir'.


'celebA_ir'

In [4]:
celeba_anno_query = pd.read_csv(f"{WORKING_DIR}/celebA_ir/celebA_ir/celebA_anno_query.csv")

In [5]:
class CelebaImageDataset(Dataset):
    def __init__(
        self,
        annotations_file=None,
        img_dir=None,
        transform=None,
        target_transform=None
    ):
        self.img_labels = annotations_file
        self.img_dir = img_dir
        self.transform = transform
        self.target_transform = target_transform

        if self.img_labels is None:
            self.img_labels = []
            for _, _, filenames in os.walk(f"{self.img_dir}"):
                for filename in filenames:
                    self.img_labels.append(filename)
            self.img_labels = pd.DataFrame({'img': self.img_labels})
            self.img_labels['id'] = -1
    
    def __len__(self):
        return len(self.img_labels)

    def __getitem__(self, idx):
        img_path = os.path.join(self.img_dir, self.img_labels.iloc[idx, 0])
        image = read_image(img_path).to(DEVICE)
        label = torch.tensor(self.img_labels.iloc[idx, 1]).to(DEVICE)
        if self.transform:
            image = self.transform(image)
        if self.target_transform:
            label = self.target_transform(label)
        return image, label

    def group_path_by_id(self):
        self.grouped_img_labels = (
            self.img_labels
            .groupby('id')
            .agg(path=pd.NamedAgg(column="img", aggfunc=lambda x: x.tolist()))
            .reset_index()
        )

In [6]:
celeba_distractors = CelebaImageDataset(
    img_dir=f"{WORKING_DIR}/celebA_ir/celebA_ir/celebA_distractors"
)

celeba_query = CelebaImageDataset(
    annotations_file=celeba_anno_query,
    img_dir=f"{WORKING_DIR}/celebA_ir/celebA_ir/celebA_query"
)

celeba_query.group_path_by_id()

## Load model

In [7]:
# Download weights
!gdown 1dMUXx3C5w_cVt30YNCXI6yrx9COUUmqc

Downloading...
From (original): https://drive.google.com/uc?id=1dMUXx3C5w_cVt30YNCXI6yrx9COUUmqc
From (redirected): https://drive.google.com/uc?id=1dMUXx3C5w_cVt30YNCXI6yrx9COUUmqc&confirm=t&uuid=718e38dd-d8b4-4faf-a99f-308797f30ee2
To: /kaggle/working/base_trained_efnet2.pt
100%|██████████████████████████████████████| 34.1M/34.1M [00:00<00:00, 34.2MB/s]


In [8]:
class EfficientNet2(nn.Module):
    def __init__(self, pretrained=False, num_classes=500, bias=True):
        super(EfficientNet2, self).__init__()
        self.model = models.efficientnet_b2(weights=pretrained)
        # Change out_features of last FC-layer
        dim_feats = self.model.classifier[1].in_features
        self.model.classifier[1] = nn.Linear(dim_feats, num_classes, bias=bias)

    def forward(self, x):
        embeddings = self.model.avgpool(self.model.features(x))
        embeddings = embeddings.squeeze()
        output = self.model.classifier(embeddings)
        last_layer_weight = self.model.classifier[1].weight

        return output, embeddings, last_layer_weight

# _____ Define model _____

efnet2 = EfficientNet2().model
efnet2.load_state_dict(torch.load(
    f"{WORKING_DIR}/base_trained_efnet2.pt",
    weights_only=False,
    map_location=torch.device('cpu')
))
efnet2.eval()
print('Everything is good!')



Everything is good!


## Function to compute embeddings

In [9]:
from tqdm import tqdm

def custom_norm(x):
    return x/255

normalize_transformer = v2.Compose([
    custom_norm,
    v2.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
    v2.Resize((224, 224)),
])

def compute_embeddings(model, images_list, batch=64, prefix=''):
    """
    compute embeddings from the trained model for list of images.
    params:
        model: trained nn-model that takes images and outputs embeddings
        images_list: list of images paths to compute embeddings for
    output:
        list: list of model embeddings. Each embedding corresponds to images
          names from images_list
    """
    model.eval()
    embeddings = []
    with torch.no_grad():
        for path in tqdm(images_list):
            image = read_image(prefix+path).to(DEVICE).unsqueeze(0)
            normalized_image = normalize_transformer(image)
            embedding = model.avgpool(model.features(normalized_image))
            embedding = embedding.squeeze(0).cpu().numpy()
            embeddings.append(embedding)

    embeddings = np.array(embeddings)
    embeddings = torch.Tensor(embeddings).squeeze()
    
    return embeddings

## Check it works...

In [10]:
# file with query part annotations: which image belongs to which class
# format:
#     image_name_1.jpg 2678
#     image_name_2.jpg 2679
f = open(f"{WORKING_DIR}/celebA_ir/celebA_ir/celebA_anno_query.csv", 'r')
query_lines = f.readlines()[1:]
f.close()
query_lines = [x.strip().split(',') for x in query_lines]
# lain list of image names from query, needed to compute embeddings for query
query_img_names = [x[0] for x in query_lines]

# dictionary with info of which images from query belong to which class
# format:
#     {class: [image_1, image_2, ...]}
query_dict = defaultdict(list)
for img_name, img_class in query_lines:
  query_dict[img_class].append(img_name)

# list of distractor images
distractors_img_names = os.listdir(f"{WORKING_DIR}/celebA_ir/celebA_ir/celebA_distractors")

In [11]:
query_embeddings = compute_embeddings(efnet2, query_img_names, prefix=f"{WORKING_DIR}/celebA_ir/celebA_ir/celebA_query/")
query_embeddings.shape

100%|██████████| 1222/1222 [01:11<00:00, 17.11it/s]


torch.Size([1222, 1408])

In [12]:
distractors_embeddings = compute_embeddings(efnet2, distractors_img_names, prefix=f"{WORKING_DIR}/celebA_ir/celebA_ir/celebA_distractors/")
distractors_embeddings.shape

100%|██████████| 2001/2001 [01:57<00:00, 17.02it/s]


torch.Size([2001, 1408])

## Function to compute cosine similarities between all *so called* positive pairs from query part
Positive pairs are all such pairs that belong to one person

In [13]:
def compute_cosine_query_pos(query_dict, query_img_names, query_embeddings):
    """
    compute cosine similarities between positive pairs from query (stage 1)
    params:
    query_dict: dict {class: [image_name_1, image_name_2, ...]}. Key: class in
                the dataset. Value: images corresponding to that class
    query_img_names: list of images names
    query_embeddings: list of embeddings corresponding to query_img_names
    output:
    list of floats: similarities between embeddings corresponding
                    to the same people from query list
    """
    pairs_dict = deepcopy(query_dict)
    # pairs_dict is like {class: [(image_name_1, image_name_2), (image_name_1, image_name_3), ...all pairs...]}
    for key in pairs_dict.keys():
        pairs_dict[key] = list(combinations(pairs_dict[key], 2))

    # now compute cosine similarities between pairs in pairs_dict
    list_of_coses = []
    cos = nn.CosineSimilarity(dim=0, eps=1e-12)
    for key in tqdm(pairs_dict.keys()):
        for pair in pairs_dict[key]:
            img1_name = pair[0]
            img2_name = pair[1]

            img1_idx = query_img_names.index(img1_name)
            img2_idx = query_img_names.index(img2_name)

            img1_embedding = query_embeddings[img1_idx]
            img2_embedding = query_embeddings[img2_idx]

            cos_similarity = cos(img1_embedding, img2_embedding).item()
            list_of_coses.append(cos_similarity)
            
    return list_of_coses

# pos_query_pairs_similarities = compute_cosine_query_pos(query_dict, query_img_names, query_embeddings)

## Function to compute cosine similarities between all *so called* negative pairs from query part
Negative pairs are all such pairs that belong to different person

In [14]:
# _ = {
#     '1': ['10', '11', '12', '13'],
#     '2': ['20', '21', '22', '23', '24'],
#     '3': ['30', '31'],
#     '4': ['40']
#     }

def create_negative_pairs_from_special_dict(d):
    pairs = []
    
    keys = list(d.keys())
    for i in range(len(keys)-1):
        for j in range(i+1, len(keys)):
            key1 = keys[i]
            key2 = keys[j]

            for img1 in d[key1]:
                for img2 in d[key2]:
                    pairs.append((img1, img2))

    return pairs

# create_negative_pairs(_)

In [15]:
def compute_cosine_query_neg(query_dict, query_img_names, query_embeddings):
    """
    compute cosine similarities between negative pairs from query (stage 2)
    params:
    query_dict: dict {class: [image_name_1, image_name_2, ...]}. Key: class in
                the dataset. Value: images corresponding to that class
    query_img_names: list of images names
    query_embeddings: list of embeddings corresponding to query_img_names
    output:
    list of floats: similarities between embeddings corresponding
                    to different people from query list
    """
    negative_pairs = create_negative_pairs_from_special_dict(query_dict)

    list_of_coses = []
    cos = nn.CosineSimilarity(dim=0, eps=1e-12)
    for pair in tqdm(negative_pairs):
        img1_name = pair[0]
        img2_name = pair[1]

        img1_idx = query_img_names.index(img1_name)
        img2_idx = query_img_names.index(img2_name)

        img1_embedding = query_embeddings[img1_idx]
        img2_embedding = query_embeddings[img2_idx]

        cos_similarity = cos(img1_embedding, img2_embedding).item()
        list_of_coses.append(cos_similarity)
    
    return list_of_coses
    
# neg_query_pairs_similarities = compute_cosine_query_neg(query_dict, query_img_names, query_embeddings)

## Function to compute cosine similarities between all negative pairs from query and distractors parts

In [16]:
def compute_cosine_query_distractors(query_embeddings, distractors_embeddings):
    """
    compute cosine similarities between negative pairs from query and distractors
    (stage 3)
    params:
    query_embeddings: list of embeddings corresponding to query_img_names
    distractors_embeddings: list of embeddings corresponding to distractors_img_names
    output:
    list of floats: similarities between pairs of people (q, d), where q is
                    embedding corresponding to photo from query, d —
                    embedding corresponding to photo from distractors
    """
    list_of_coses = []
    cos = nn.CosineSimilarity(dim=0, eps=1e-12)
    for q in tqdm(query_embeddings):
        for d in distractors_embeddings:
            cos_similarity = cos(q, d).item()
            list_of_coses.append(cos_similarity)
            
    return list_of_coses

In [17]:
cosine_query_pos = compute_cosine_query_pos(
    query_dict,
    query_img_names,
    query_embeddings
)

cosine_query_neg = compute_cosine_query_neg(
    query_dict,
    query_img_names,
    query_embeddings
)

cosine_query_distractors = compute_cosine_query_distractors(
    query_embeddings,
    distractors_embeddings
)

100%|██████████| 51/51 [00:01<00:00, 49.25it/s]
100%|██████████| 731310/731310 [00:52<00:00, 13909.60it/s]
100%|██████████| 1222/1222 [01:26<00:00, 14.18it/s]


## IdentificationRate implementation

In [18]:
def compute_ir(cosine_query_pos, cosine_query_neg, cosine_query_distractors,
               fpr=0.1):
    """
    compute identification rate using precomputer cosine similarities between pairs
    at given fpr
    params:
    cosine_query_pos: cosine similarities between positive pairs from query
    cosine_query_neg: cosine similarities between negative pairs from query
    cosine_query_distractors: cosine similarities between negative pairs
                              from query and distractors
    fpr: false positive rate at which to compute TPR
    output:
    float: threshold for given fpr
    float: TPR at given FPR
    """
    neg_pairs = cosine_query_neg + cosine_query_distractors
    neg_pairs = sorted(neg_pairs, reverse=True)
    
    N = int(fpr * len(neg_pairs))
    th = neg_pairs[N]
    
    pos_pairs_after_th = [x for x in cosine_query_pos if x>th]
    tpr = len(pos_pairs_after_th) / len(cosine_query_pos)
    
    return th, tpr


## Test block (PASSED)

In [19]:
test_query_dict = {
    2876: ['1.jpg', '2.jpg', '3.jpg'],
    5674: ['5.jpg'],
    864:  ['9.jpg', '10.jpg'],
}
test_query_img_names = ['1.jpg', '2.jpg', '3.jpg', '5.jpg', '9.jpg', '10.jpg']
test_query_embeddings = torch.Tensor([
                    [1.56, 6.45,  -7.68],
                    [-1.1 , 6.11,  -3.0],
                    [-0.06,-0.98,-1.29],
                    [8.56, 1.45,  1.11],
                    [0.7,  1.1,   -7.56],
                    [0.05, 0.9,   -2.56],
])

test_distractors_img_names = ['11.jpg', '12.jpg', '13.jpg', '14.jpg', '15.jpg']

test_distractors_embeddings = torch.Tensor([
                    [0.12, -3.23, -5.55],
                    [-1,   -0.01, 1.22],
                    [0.06, -0.23, 1.34],
                    [-6.6, 1.45,  -1.45],
                    [0.89,  1.98, 1.45],
])

test_cosine_query_pos = compute_cosine_query_pos(test_query_dict, test_query_img_names,
                                            test_query_embeddings)
test_cosine_query_neg = compute_cosine_query_neg(test_query_dict, test_query_img_names,
                                            test_query_embeddings)
test_cosine_query_distractors = compute_cosine_query_distractors(test_query_embeddings,
                                                            test_distractors_embeddings)

100%|██████████| 3/3 [00:00<00:00, 2321.14it/s]
100%|██████████| 11/11 [00:00<00:00, 14950.53it/s]
100%|██████████| 6/6 [00:00<00:00, 3020.74it/s]


In [20]:
true_cosine_query_pos = [0.8678237233650096, 0.21226104378511604,
                         -0.18355866977496182, 0.9787437979250561]
assert np.allclose(sorted(test_cosine_query_pos), sorted(true_cosine_query_pos)), \
      "A mistake in compute_cosine_query_pos function"

true_cosine_query_neg = [0.15963231223161822, 0.8507997093616965, 0.9272761484302097,
                         -0.0643994061127092, 0.5412660901220571, 0.701307100338029,
                         -0.2372575528216902, 0.6941032794522218, 0.549425446066643,
                         -0.011982733001947084, -0.0466679194884999]
assert np.allclose(sorted(test_cosine_query_neg), sorted(true_cosine_query_neg)), \
      "A mistake in compute_cosine_query_neg function"

true_cosine_query_distractors = [0.3371426578637511, -0.6866465610863652, -0.8456563512871669,
                                 0.14530087113136106, 0.11410510307646118, -0.07265097629002357,
                                 -0.24097699660707042,-0.5851992679925766, 0.4295494455718534,
                                 0.37604478596058194, 0.9909483738948858, -0.5881093317868022,
                                 -0.6829712976642919, 0.07546364489032083, -0.9130970963915521,
                                 -0.17463101988684684, -0.5229363015558941, 0.1399896725311533,
                                 -0.9258034013399499, 0.5295114163723346, 0.7811585442749943,
                                 -0.8208760031249596, -0.9905139680301821, 0.14969764653247228,
                                 -0.40749654525418444, 0.648660814944824, -0.7432584300096284,
                                 -0.9839696492435877, 0.2498741082804709, -0.2661183373780491]
assert np.allclose(sorted(test_cosine_query_distractors), sorted(true_cosine_query_distractors)), \
      "A mistake in compute_cosine_query_distractors function"

In [21]:
test_thr = []
test_tpr = []
for fpr in [0.5, 0.3, 0.1]:
    x, y = compute_ir(test_cosine_query_pos, test_cosine_query_neg,
                    test_cosine_query_distractors, fpr=fpr)
    test_thr.append(x)
    test_tpr.append(y)

true_thr = [-0.011982733001947084, 0.3371426578637511, 0.701307100338029]
assert np.allclose(np.array(test_thr), np.array(true_thr)), "A mistake in computing threshold"

true_tpr = [0.75, 0.5, 0.5]
assert np.allclose(np.array(test_tpr), np.array(true_tpr)), "A mistake in computing tpr"

## Metrics on baseline

In [22]:
for fpr in [0.001, 0.01, 0.02, 0.05, 0.1]:
    print(f"TPR@FPR={fpr} is {compute_ir(cosine_query_pos, cosine_query_neg, cosine_query_distractors, fpr=fpr)[1]}")

TPR@FPR=0.001 is 0.2570477549079546
TPR@FPR=0.01 is 0.46613681135792406
TPR@FPR=0.02 is 0.5525439847836424
TPR@FPR=0.05 is 0.6830378371034577
TPR@FPR=0.1 is 0.7858841111337544
