## Identification Rate Metric

При обучении модели для распознавания лиц с помощью CE (кросс-энтропии) мы можем считать метрику accuracy как индикатор того, насколько хорошо наша модель работает. Но у accuracy тут есть недостаток: она не сможет померить, насколько хорошо наша модель работает на лицах людей, которых нет в обучающей выборке.  

Чтобы это исправить, придумали новую метрику: **identification rate**. Вот как она работает:

Создадим два набора изображений лиц: query и distractors. Никакие лица из этих наборов не должны содержаться в обучающем и валидационном датасете.

1. посчитаем косинусные расстояния между лицами, соответствующими одним и тем же людям из query части. Например, пусть одному человеку соответствуют три фото в query: 01.jpg, 02.jpg, 03.jpg. Тогда считаем три косинусных расстояния между всеми тремя парами из этих фото.
2. посчитаем косинусные расстояния между лицами, соответствующими разным людям из query части.
3. посчитаем косинусные расстояния между всеми парами лиц из query и distractors. Т.е. пара — это (лицо из query, лицо из distractors). Всего получится |query|*|distractors| пар.
4. Сложим количества пар, полученных на 2 и 3 шагах. Это количество false пар.
5. Зафиксируем **FPR** (false positive rate). Пусть, например, будет 0.01. FPR, умноженный на количество false пар из шага 4 — это разрешенное количество false positives, которые мы разрешаем нашей модели. Обозначим это количество через N.
6. Отсортируем все значения косинусных расстояний false пар. N — ое по счету значение расстояния зафиксируем как **пороговое расстояние**.
7. Посчитаем количество positive пар с шага 1, которые имеют косинусное расстояние меньше, чем пороговое расстояние. Поделим это количество на общее количество positive пар с шага 1. Это будет TPR (true positive rate) — итоговое значение нашей метрики.

Такая метрика обычно обозначается как TPR@FPR=0.01. FPR может быть разным. Приразных FPR будет получаться разное TPR.

Смысл этой метрики в том, что мы фиксируем вероятность ошибки вида false positive, т.е. когда "сеть сказала, что это один и тот же человек, но это не так", считаем порог косинусного расстояния для этого значения ошибки, потом берем все positive пары и смотрим, у скольких из них расстояние меньше этого порога. Т.е. насколько точно наша сеть ищет похожие лица при заданной вероятности ошибки вида false positive.

**Для подсчета метрик, то вам нужно разбить данные на query и distractors самим.**

Делается это примерно так:
- Выбраете несколько id, которые не использовались при тренировке моделей, и помещаете их в query set;
- Выбираете несколько id, которые не использовались при тренировке моделей и не входят в query, и помещаете их в distractors set. Обычно distractors set должен быть сильно больше, чем query set.
- Обрабатываете картинки из query и distractors тем же способом, что картинки для обучения сети.


Обратите внимание, что если картинок в query и distractors очень много, то полученных пар картинок в пунктах 1-2-3 алгоритма подсчета TPR@FPR будет очень-очень много. Чтобы код подсчета работал быстрее, ограничивайте размеры этих датасетов. Контролируйте, сколько значений расстояний вы считаете.

Ниже дан шаблон кода для реализации FPR@TPR метрики и ячейки с тестами. Тесты проверяют, что ваш код в ячейках написан правильно.

## План заданий

* Правильно разбить датасет на query и distractors
* Реализовать метрику и пройти все тесты
* Подгрузить все модели, обученные на разных лоссах и сравнить их метрики

In [1]:
import torch
import torch.nn as nn
from PIL import Image
import torchvision.transforms as transforms
import numpy as np
from itertools import combinations

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')      
transform = transforms.Compose([transforms.ToTensor()])
    
from models import get_recognition_model


### Разбить датасет загрузить данные

In [2]:
import os
import random
from collections import defaultdict

TRAIN_IDS_PATH = 'data/celeba_aligned_top_500'
DATA_ROOT = 'data/celeba_aligned_top_2000'
N_QUERY = 200
N_DISTRACTORS = 1000

train_ids = set(os.listdir(TRAIN_IDS_PATH))
all_person_ids = os.listdir(DATA_ROOT)

test_ids_pool = [pid for pid in all_person_ids if pid not in train_ids]
random.shuffle(test_ids_pool)

query_ids = set(test_ids_pool[:N_QUERY])
distractor_ids = set(test_ids_pool[N_QUERY : N_QUERY + N_DISTRACTORS])

query_dict_temp = defaultdict(list)
distractors_img_names = []

for person_id in os.listdir(DATA_ROOT):
    if person_id in query_ids:
        for img_file in os.listdir(os.path.join(DATA_ROOT, person_id)):
            relative_path = os.path.join(DATA_ROOT, person_id, img_file)
            query_dict_temp[person_id].append(relative_path)
    elif person_id in distractor_ids:
        for img_file in os.listdir(os.path.join(DATA_ROOT, person_id)):
            relative_path = os.path.join(DATA_ROOT, person_id, img_file)
            distractors_img_names.append(relative_path)

query_dict = {pid: names for pid, names in query_dict_temp.items() if len(names) > 1}
query_img_names = [name for names in query_dict.values() for name in names]

print(f"Query IDs: {len(query_dict)}, Query Images: {len(query_img_names)}")
print(f"Distractor IDs: {len(distractor_ids)}, Distractor Images: {len(distractors_img_names)}")

Query IDs: 200, Query Images: 5572
Distractor IDs: 1000, Distractor Images: 27865


In [3]:
model = get_recognition_model()
model.fc = nn.Identity()

model = model.to(device)
model.eval();

## Шаблон кода для Identificaton rate metric (TPR@FPR)

In [7]:
def compute_embeddings(model, images_list):
  '''
  compute embeddings from the trained model for list of images.
  params:
    model: trained nn model that takes images and outputs embeddings
    images_list: list of images paths to compute embeddings for
  output:
    list: list of model embeddings. Each embedding corresponds to images
          names from images_list
  '''
  model.eval()
  embeddings = []
  with torch.no_grad(): 
    for img in images_list:
      img = Image.open(img)
      tensor = transform(img).unsqueeze(0).to(device)
      embedding = model(tensor)
      embeddings.append(embedding.cpu().numpy())
  return embeddings

In [21]:
def compute_cosine(emb1, emb2):
    # emb1 = emb1.flatten()
    # emb2 = emb2.flatten()
    # emb1 = emb1[0]
    # emb2 = emb2[0]
    return np.dot(emb1, emb2) / (np.linalg.norm(emb1) * np.linalg.norm(emb2))

In [13]:
query_embeddings = compute_embeddings(model, query_img_names)
distractors_embeddings = compute_embeddings(model, distractors_img_names)

In [14]:
def compute_cosine_query_pos(query_dict, query_img_names, query_embeddings):
  '''
  compute cosine similarities between positive pairs from query (stage 1)
  params:
    query_dict: dict {class: [image_name_1, image_name_2, ...]}. Key: class in
                the dataset. Value: images corresponding to that class
    query_img_names: list of images names
    query_embeddings: list of embeddings corresponding to query_img_names
  output:
    list of floats: similarities between embeddings corresponding
                    to the same people from query list
  '''
  img_to_emb = {name: emb for name, emb in zip(query_img_names, query_embeddings)}
  pos_similarities = []
  for person_id, list_of_his_images in query_dict.items():
      for img1_path, img2_path in combinations(list_of_his_images, 2):
          emb1 = img_to_emb[img1_path]
          emb2 = img_to_emb[img2_path]
          pos_similarities.append(compute_cosine(emb1, emb2))
  return pos_similarities

def compute_cosine_query_neg(query_dict, query_img_names, query_embeddings):
  '''
  compute cosine similarities between negative pairs from query (stage 2)
  params:
    query_dict: dict {class: [image_name_1, image_name_2, ...]}. Key: class in
                the dataset. Value: images corresponding to that class
    query_img_names: list of images names
    query_embeddings: list of embeddings corresponding to query_img_names
  output:
    list of floats: similarities between embeddings corresponding
                    to different people from query list
  '''
  img_to_emb = {name: emb for name, emb in zip(query_img_names, query_embeddings)}
  neg_similarities = []
  person_ids = list(query_dict.keys())
  for id1, id2 in combinations(person_ids, 2):
      for img1_path in query_dict[id1]:
          for img2_path in query_dict[id2]:
              emb1 = img_to_emb[img1_path]
              emb2 = img_to_emb[img2_path]
              similarity = compute_cosine(emb1, emb2)
              neg_similarities.append(similarity)
  return neg_similarities

def compute_cosine_query_distractors(query_embeddings, distractors_embeddings):
  '''
  compute cosine similarities between negative pairs from query and distractors
  (stage 3)
  params:
    query_embeddings: list of embeddings corresponding to query_img_names
    distractors_embeddings: list of embeddings corresponding to distractors_img_names
  output:
    list of floats: similarities between pairs of people (q, d), where q is
                    embedding corresponding to photo from query, d —
                    embedding corresponding to photo from distractors
  '''
  cross_similarities = []
  for q_emb in query_embeddings:
      for d_emb in distractors_embeddings:
          similarity = compute_cosine(q_emb, d_emb) 
          cross_similarities.append(similarity)
  return cross_similarities

In [15]:
cosine_query_pos = compute_cosine_query_pos(query_dict, query_img_names,
                                            query_embeddings)
cosine_query_neg = compute_cosine_query_neg(query_dict, query_img_names,
                                            query_embeddings)
cosine_query_distractors = compute_cosine_query_distractors(query_embeddings,
                                                            distractors_embeddings)


Ячейка ниже проверяет, что код работает верно:

In [22]:
test_query_dict = {
    2876: ['1.jpg', '2.jpg', '3.jpg'],
    5674: ['5.jpg'],
    864:  ['9.jpg', '10.jpg'],
}
test_query_img_names = ['1.jpg', '2.jpg', '3.jpg', '5.jpg', '9.jpg', '10.jpg']
test_query_embeddings = [
                    [1.56, 6.45,  -7.68],
                    [-1.1 , 6.11,  -3.0],
                    [-0.06,-0.98,-1.29],
                    [8.56, 1.45,  1.11],
                    [0.7,  1.1,   -7.56],
                    [0.05, 0.9,   -2.56],
]

test_distractors_img_names = ['11.jpg', '12.jpg', '13.jpg', '14.jpg', '15.jpg']

test_distractors_embeddings = [
                    [0.12, -3.23, -5.55],
                    [-1,   -0.01, 1.22],
                    [0.06, -0.23, 1.34],
                    [-6.6, 1.45,  -1.45],
                    [0.89,  1.98, 1.45],
]

test_cosine_query_pos = compute_cosine_query_pos(test_query_dict, test_query_img_names,
                                            test_query_embeddings)
test_cosine_query_neg = compute_cosine_query_neg(test_query_dict, test_query_img_names,
                                            test_query_embeddings)
test_cosine_query_distractors = compute_cosine_query_distractors(test_query_embeddings,
                                                            test_distractors_embeddings)

In [None]:
true_cosine_query_pos = [0.8678237233650096, 0.21226104378511604,
                         -0.18355866977496182, 0.9787437979250561]
# print(test_cosine_query_pos)
assert np.allclose(sorted(test_cosine_query_pos), sorted(true_cosine_query_pos)), \
      "A mistake in compute_cosine_query_pos function"

true_cosine_query_neg = [0.15963231223161822, 0.8507997093616965, 0.9272761484302097,
                         -0.0643994061127092, 0.5412660901220571, 0.701307100338029,
                         -0.2372575528216902, 0.6941032794522218, 0.549425446066643,
                         -0.011982733001947084, -0.0466679194884999]
assert np.allclose(sorted(test_cosine_query_neg), sorted(true_cosine_query_neg)), \
      "A mistake in compute_cosine_query_neg function"

true_cosine_query_distractors = [0.3371426578637511, -0.6866465610863652, -0.8456563512871669,
                                 0.14530087113136106, 0.11410510307646118, -0.07265097629002357,
                                 -0.24097699660707042,-0.5851992679925766, 0.4295494455718534,
                                 0.37604478596058194, 0.9909483738948858, -0.5881093317868022,
                                 -0.6829712976642919, 0.07546364489032083, -0.9130970963915521,
                                 -0.17463101988684684, -0.5229363015558941, 0.1399896725311533,
                                 -0.9258034013399499, 0.5295114163723346, 0.7811585442749943,
                                 -0.8208760031249596, -0.9905139680301821, 0.14969764653247228,
                                 -0.40749654525418444, 0.648660814944824, -0.7432584300096284,
                                 -0.9839696492435877, 0.2498741082804709, -0.2661183373780491]
assert np.allclose(sorted(test_cosine_query_distractors), sorted(true_cosine_query_distractors)), \
      "A mistake in compute_cosine_query_distractors function"

[0.8678237233650096, 0.21226104378511595, -0.1835586697749618, 0.9787437979250561]


И, наконец, финальная функция, которая считает IR metric:

In [24]:
def compute_ir(cosine_query_pos, cosine_query_neg, cosine_query_distractors,
               fpr=0.1):
  '''
  compute identification rate using precomputer cosine similarities between pairs
  at given fpr
  params:
    cosine_query_pos: cosine similarities between positive pairs from query
    cosine_query_neg: cosine similarities between negative pairs from query
    cosine_query_distractors: cosine similarities between negative pairs
                              from query and distractors
    fpr: false positive rate at which to compute TPR
  output:
    float: threshold for given fpr
    float: TPR at given FPR
  '''
  false_sims = cosine_query_neg + cosine_query_distractors
  false_sims.sort(reverse=True)
  num_false_pairs = len(false_sims)
  threshold_idx = int(fpr * num_false_pairs)
  threshold_idx = min(threshold_idx, num_false_pairs - 1)
  threshold = false_sims[threshold_idx]
  num_positive_pairs = len(cosine_query_pos)
  true_positives = sum(1 for sim in cosine_query_pos if sim >= threshold)
  tpr = true_positives / num_positive_pairs
  return threshold, tpr

И ячейки для ее проверки:

In [25]:
test_thr = []
test_tpr = []
for fpr in [0.5, 0.3, 0.1]:
  x, y = compute_ir(test_cosine_query_pos, test_cosine_query_neg,
                    test_cosine_query_distractors, fpr=fpr)
  test_thr.append(x)
  test_tpr.append(y)

In [26]:
true_thr = [-0.011982733001947084, 0.3371426578637511, 0.701307100338029]
assert np.allclose(np.array(test_thr), np.array(true_thr)), "A mistake in computing threshold"

true_tpr = [0.75, 0.5, 0.5]
assert np.allclose(np.array(test_tpr), np.array(true_tpr)), "A mistake in computing tpr"

А в ячейке ниже вы можете посчитать TPR@FPR для датасета с лицами. Давайте, например, посчитаем для значений fpr = [0.5, 0.2, 0.1, 0.05].

In [None]:
fpr_values = [0.5, 0.2, 0.1, 0.05]

print("Результаты для реального датасета:")
for fpr in fpr_values:
    threshold, tpr = compute_ir(cosine_query_pos, cosine_query_neg,
                                cosine_query_distractors, fpr=fpr)
    print(f"TPR @ FPR={fpr:.2f} = {tpr:.4f} (Порог: {threshold:.4f})")

Результаты для реального датасета:
TPR @ FPR=0.50: 0.7367 (Порог: 0.8480)
TPR @ FPR=0.20: 0.4456 (Порог: 0.8892)
TPR @ FPR=0.10: 0.2962 (Порог: 0.9061)
TPR @ FPR=0.05: 0.1956 (Порог: 0.9181)
