# Обучение и оценка качества детектора автомобилей
![car_detection_frames.jpg](car_detection_frames.jpg)

В данном задании реализован простой нейросетевой детектор машин на основе
полносверточной нейросети, а также функции для подсчета
метрики качества и подавления множественных обнаружений.

Fully convolutional классификатор `Classifier` обучается на изображениях машин (40, 100, 1), затем от его выхода берётся значение Softmax для класса, соответствующего обнаружжению машины. Такой детектор `DetectorFromClassifier` можно применять к любым изображениям больше исходного (по сути свёртка с ядром (40, 100)). В пикселе полученного изображения будет уверенность детектора в том, что машина обнаружена в рамке размера (40, 100), для которой пиксель - верхний левый угол.

Реализованы следующие методы:  
`get_detections`: применяет детектор к набору изображений и возвращает в формате bbox все обнаружения с *confidence* больше фискированного порога  
`calc_iou`: подсчитывает *IoU* между двумя bbox  
`calc_auc`: по предсказаниям и gt bboxes подсчитывает PR AUC (вариация по confidence threshold)  
`nms`: реализует простую версию алгоритма подавления немаксимумов

Алгоритм `calc_auc`:
1. Для каждого изображения составляется список из tp (true positive) и fp (false positive)
обнаружений. Для этого:  
(a) Сортируются обнаружения в порядке убывания соответствующих мер уверенности классификатора. Подготавливается список gt прямоугольников из разметки данного изображения.  
(b) Для каждого обнаружения находится соответствующий ему прямоугольник из разметки, для
которого мера IoU максимальна и > iou_thr.  
(c) Если такой прямоугольник найден, то обнаружение добавляется в tp, иначе — в fp.  
(d) Чтобы не сопоставлять один и тот же прямоугольник из разметки двум обнаружениями
детектора, после добавления обнаружения в tp соответствующий ему прямоугольник
удаляется из gt.  
2. Объединяются списки tp и fp всех изображений.  
3. Теперь нужны два списка — все обнаружения pp (объединение tp и fp) и tp. 
Эти списки сортируются по возрастанию мер уверенности классификатора.  
4. Проход по списку всех обнаружений: пусть сейчас рассматривается обнаружение
с мерой уверенности c, тогда подсчитывается количество всех обнаружений с мерой уверенности *> c* и
количество tp обнаружений с уверенностью *> c*.  
5. Из полученных данных и общего количество прямоугольников в разметке рассчитываются recall
и precision для каждого порога уверенности c.  
6. Собираются полученные тройки (recall, precision, c), задающие PR кривую, и подсчитывается AUC.

Алгоритм `nms`:
1. Обнаружения сортируются по убыванию меры уверенности.  
2. Для каждого обнаружения удаляются все следующие за ним обнаружения (те, у которых мера
уверенности меньше), которые пересекаются с данным по мере IoU больше, чем на iou_thr.

Все `bbox` в формате `[row, col, n_rows, n_cols (, confidence)]`, где `row` - строка, с которой начинается `bbox`, `col` - соответствующая колонка, `n_row` и `n_col` - количество строк и колонок, занимаемых `bbox`, `confidence` - (опционально) уверенность детектора в том, что в данной рамке находится машина. 

In [None]:
import torch
import torch.nn as nn
from torch.nn import functional as F
from torchvision import transforms

from tqdm import tqdm

import numpy as np
from sklearn.metrics import accuracy_score, auc

if torch.cuda.is_available():
    device = torch.device('cuda:0')
else:
    device = torch.device('cpu')


BATCH_SIZE = 256
EPOCHS = 20

train_transform = nn.Sequential(*[
    transforms.Normalize((0.508), (0.291)),
    transforms.RandomHorizontalFlip(),
    transforms.RandomAutocontrast(),
    transforms.RandomAdjustSharpness(2),
]).to(device)

test_transform = transforms.Normalize((0.508), (0.291)).to(device)


# ============================== 1 Classifier model ============================
# simple FCN classifier to check if there's a car on the image
# outputs logits
class Classifier(nn.Module):
    def __init__(self):
        super(Classifier_, self).__init__()
        self.layers = []
        for i in range(10):
            self.layers.extend([
                nn.Conv2d( 2**((i+1)//2), 2**((i+2)//2), 3),
                nn.BatchNorm2d(2**((i+2)//2)),
                nn.ReLU()
            ])
        
        self.layers.extend([
            nn.Conv2d(32, 128, (20, 80)),
            nn.Dropout(0.5),
            nn.BatchNorm2d(128),
            nn.ReLU(),
            nn.Conv2d(128, 2, 1),
        ])
        self.process = nn.Sequential(*self.layers)

    def forward(self, x):
        return self.process(x)


# train Classifier on available data
# (simple procedure without validation as this isn't the aim of the task)
def fit_cls_model(X, y):
    """
    :param X: 4-dim tensor with training images
    :param y: 1-dim tensor with labels for training
    :return: trained nn model
    """
    model = Classifier().to(device)

    dataset = torch.utils.data.TensorDataset(X, y)
    dataloader = torch.utils.data.DataLoader(dataset, batch_size=BATCH_SIZE, shuffle=True, num_workers=1)

    criterion = torch.nn.CrossEntropyLoss()
    opt = torch.optim.AdamW(model.parameters(), lr=1e-4)

    # return best model according to accuracy on train dataset
    best_model = None
    history = {'test_acc': []}
    
    for epoch in tqdm(range(EPOCHS)):
        model.train()
        test_acc = []
        for sample in dataloader:
            X_batch, y_batch = sample
            X_batch = train_transform(X_batch.to(device))
            y_batch = y_batch.to(device)
            output = model(X_batch).reshape(-1, 2)

            loss = criterion(output, y_batch)
            loss.backward()

            opt.step()
            opt.zero_grad()

            preds = torch.argmax(F.softmax(output.detach(), dim=1), dim=1)
            test_acc.append(accuracy_score(y_batch.cpu().numpy(), preds.cpu().numpy()))

        history['test_acc'].append(np.mean(test_acc))

        if np.argmax(history['test_acc']) == epoch:
            best_model = model

    return best_model


# ============================ 2 Classifier -> Detector =============================
# detection wrapper for classifier to return only logits corresponding to the class 'car',
# normalized with Softmax
class DetectorFromClassifier(nn.Module):
    def __init__(self, cls_model):
        super(DetectorFromClassifier, self).__init__()
        self.base_model = cls_model

    def forward(self, x):
        return self.base_model(x)[:,1,:,:]
    
def get_detection_model(cls_model):
    """
    :param cls_model: trained cls model
    :return: fully convolutional nn model with weights initialized from cls
             model
    """
    detection_model = DetectorFromClassifier(nn.Sequential(test_transform, cls_model, nn.Softmax(dim=1)))
    model.eval()
    return model


# ============================ 3 Simple detector ===============================
# returns all bboxes where detector's confidence is higher than conf_thru
def get_detections(detection_model, dictionary_of_images, conf_thru=0.7):
    """
    :param detection_model: trained fully convolutional detector model
    :param dictionary_of_images: dictionary of images in format
        {filename: ndarray}
    :return: detections in format {filename: detections}. detections is a N x 5
        array, where N is number of detections. Each detection is described
        using 5 numbers: [row, col, n_rows, n_cols, confidence].
    """

    detection_model = detection_model.to(device)
    detections = {}
    with torch.inference_mode():
        for filename, image in tqdm(dictionary_of_images.items()):
            heatmap = detection_model(torch.from_numpy(image)[None, None, ...].to(device)).cpu().numpy()[0]
            rows, cols = np.where(heatmap > conf_thru)
            detections[filename] = np.dstack([
                rows, cols, np.full_like(rows, 40), np.full_like(rows, 100), heatmap[rows, cols]
            ])[0]
    
    return detections


# =============================== 5 IoU ========================================
# calculates IoU of 2 bboxes
def calc_iou(first_bbox, second_bbox):
    """
    :param first bbox: bbox in format (row, col, n_rows, n_cols)
    :param second_bbox: bbox in format (row, col, n_rows, n_cols)
    :return: iou measure for two given bboxes
    """
    area1 = first_bbox[2] * first_bbox[3]
    area2 = second_bbox[2] * second_bbox[3]

    left = max(first_bbox[1], second_bbox[1])
    right = min(first_bbox[1] + first_bbox[3], second_bbox[1] + second_bbox[3])
    top = max(first_bbox[0], second_bbox[0])
    bottom = min(first_bbox[0] + first_bbox[2], second_bbox[0] + second_bbox[2])

    intersection_area = max(0, right - left) * max(0, bottom - top)

    return intersection_area / float(area1 + area2 - intersection_area)



# =============================== 6 AUC ========================================
# calculates PR AUC for given predicted and gt bboxes
def calc_auc(pred_bboxes, gt_bboxes):
    """
    :param pred_bboxes: dict of bboxes in format {filename: detections}
        detections is a N x 5 array, where N is number of detections. Each
        detection is described using 5 numbers: [row, col, n_rows, n_cols,
        confidence].
    :param gt_bboxes: dict of bboxes in format {filenames: bboxes}. bboxes is a
        list of tuples in format (row, col, n_rows, n_cols)
    :return: auc measure for given detections and gt
    """
    
    # only bboxes whose IoU with gt is >=IOU_THR are considered 
    IOU_THR = 0.5

    # tp - pred_bboxes matched with some gt
    # fp - pred_bboxes that are either worse than matches from TP or don't have any match with gt at all
    tp, fp = [], []
    # number of all positives (sum of sizes of gt) for PR calculations
    number_p = 0

    # iterate over each picture
    for filename, detections in pred_bboxes.items():
        # sort detections with confidence descending
        detections = detections[np.argsort(detections[:, -1])][::-1]
        gt = np.array(gt_bboxes[filename])
        
        # discovered gt bboxes
        inspected = set()
        # inspect detections with higher confidence first
        for detection in detections:
            iou_best = 0
            idx_best = None
            
            # find the best match among gt according to IoU
            for i in range(len(gt)):
                if i in inspected:
                    continue
                else:
                    iou = calc_iou(detection[:4], gt[i])
                    if iou >= IOU_THR and iou > iou_best:
                        idx_best = i
                        iou_best = iou
            
            # found new tp
            if iou_best >= IOU_THR:
                inspected.add(idx_best)
                tp.append(detection)
            else:
                fp.append(detection)
                
        number_p += len(gt)
    

    # sort all TP and PP (predicted positives) detections with confidence ascending
    tp = np.array(tp)
    tp = tp[np.argsort(tp[:, -1])]
    if fp:
        pp = np.concatenate([tp, fp])
        pp = p[np.argsort(p[:, -1])]
    else:
        pp = tp

    N = len(pp)
    pr_curve = [(0,1)]
    
    # we'll iterate over all confidence thresholds in PP bboxes
    # given c^, we assume the detector decides bboxes with conf>=c^ are true
    old_c = 0
    for i in range(N):
        c = pp[i, -1]
        
        # if we reach a new conf threshold, we'll calculate new P and R
        if old_c == c:
            continue
        else:
            old_c = c
        
        # Npp = number of predicted positives if conf>=c^
        # Ntp = -----     true      ---
        Npp = N - i
        Ntp = (tp[:, -1] >= c).sum()
        # append new (recall, precision)
        # number_p = number of all gt bboxes
        pr_curve.append( (Ntp / number_p, Ntp / Npp) )

    points = np.array(pr_curve)
    points = points[np.argsort(points[:, 0])]
    points = points[np.lexsort([1 - points[:,1], points[:,0]])]

    return auc(points[:,0], points[:,1])


# =============================== 7 NMS ========================================
# use Non-maximum Suppression algorithm to get rid of detections thata are too close to each other
def nms(detections_dictionary, iou_thr=0.05):
    """
    :param detections_dictionary: dict of bboxes in format {filename: detections}
        detections is a N x 5 array, where N is number of detections. Each
        detection is described using 5 numbers: [row, col, n_rows, n_cols,
        confidence].
    :param iou_thr: IoU threshold for nearby detections
    :return: dict in same format as detections_dictionary where close detections
        are deleted
    """
    processed_detections = {}
    # iterate over each image and its bboxes
    for filename, detections in tqdm(detections_dictionary.items()):
        detections = np.array(detections)
        
        # we'll iterate over detections with confidence descending
        detections = detections[np.argsort(detections[:, -1])][::-1]
        # set of rejected detections
        deleted = set()
        for i in range(len(detections)):
            if i not in deleted:
                for j in range(i + 1, len(detections)):
                    if j not in deleted and calc_iou(detections[i,:4], detections[j,:4]) > iou_thr:
                        deleted.add(j)

        # get indices of all accepted detections
        mask = list(set(range(len(detections))) - deleted)
        processed_detections[filename] = detections[mask]
    
    return processed_detections