In [2]:
import torch 
from torch.utils.data import Dataset, DataLoader

from PIL import Image
import numpy as np
import albumentations as A
import cv2
from torchvision import transforms

import os
import json

from sklearn.model_selection import train_test_split

class PlatesDataset(Dataset):
    """Detection dataset built from a JSON list of annotated images.

    Every JSON record is read for a ``'file'`` entry (image path relative to
    ``root_path``) and a ``'nums'`` list whose items carry a ``'box'`` polygon
    (a sequence of ``(x, y)`` points).  Each polygon is reduced to an
    axis-aligned ``[x_min, y_min, x_max, y_max]`` box with class label ``1``.

    The record list is split with ``train_test_split`` and only the part
    selected by ``sample_type`` is kept.

    Loading is deliberately best-effort so that one bad record does not stop
    an epoch:

    * an image that cannot be read is replaced by record ``0``;
    * if the configured transform (or building the annotation array) fails,
      the *original* image and boxes are passed through a plain 640x640 resize;
    * if that fails as well, record ``0`` is returned instead.

    Args:
        root_path: Directory that image paths in the JSON are relative to.
        json_path: Path to the JSON annotation file.
        sample_type: ``'train'`` or ``'val'`` - which side of the split to keep.
        val_size: Fraction of records assigned to the validation split.
        random_state: Seed forwarded to ``train_test_split``.
        transform: Optional albumentations pipeline called with
            ``image=``, ``bboxes=`` and ``labels=``.

    Raises:
        ValueError: If ``sample_type`` is neither ``'train'`` nor ``'val'``.
    """

    def __init__(self, root_path: str, json_path: str, sample_type: str='train', val_size: float=0.2, 
                random_state: int=42, transform: A.Compose=None):

        # Fail fast: previously an unknown value left ``self.img_list`` unset and
        # the error only surfaced later as an AttributeError in ``__len__``.
        if sample_type not in ('train', 'val'):
            raise ValueError("sample_type must be 'train' or 'val', got %r" % (sample_type,))

        self.root_path = root_path
        # NOTE: the attribute name is misspelled; it is kept as-is so that code
        # reading ``dataset.transfrom`` keeps working.
        self.transfrom = transform

        with open(json_path, 'r') as f:
            img_list = json.load(f)

        train_part, val_part = train_test_split(img_list, test_size=val_size, random_state=random_state)
        self.img_list = train_part if sample_type == 'train' else val_part

    def __len__(self):
        return len(self.img_list)

    def _load_image(self, idx):
        """Read record ``idx`` from disk and return it as an RGB array."""
        img = cv2.imread(os.path.join(self.root_path, self.img_list[idx]['file']))
        # cv2.imread returns None for unreadable files; cvtColor then raises,
        # which is what triggers the caller's fallback.
        return cv2.cvtColor(img, cv2.COLOR_BGR2RGB)

    def _load_boxes(self, idx):
        """Return ``(bboxes, labels)`` for record ``idx`` (boxes as x_min, y_min, x_max, y_max)."""
        objects = self.img_list[idx]['nums']
        bboxes = np.array([[min(p[0] for p in b['box']), min(p[1] for p in b['box']),
                            max(p[0] for p in b['box']), max(p[1] for p in b['box'])] for b in objects])
        labels = np.ones(shape=(bboxes.shape[0]), dtype=np.int64)
        return bboxes, labels

    @staticmethod
    def _apply_transform(transform, img, bboxes, labels):
        """Run ``transform`` (if any) and stack boxes with labels into an ``(N, 5)`` array."""
        if transform is not None:
            sample = transform(image=img, bboxes=bboxes, labels=labels)
            img, bboxes = sample['image'], sample['bboxes']
        annot = np.hstack((bboxes, labels.reshape(labels.shape[0], 1)))
        return img, annot

    def __getitem__(self, idx):
        """Return ``{'img': image, 'annot': boxes_with_label}`` for record ``idx``."""
        try:
            img = self._load_image(idx)
        except Exception:
            # Unreadable image: substitute the first record.
            idx = 0
            img = self._load_image(idx)
            print('Cant open image')

        bboxes, labels = self._load_boxes(idx)

        try:
            out_img, annot = self._apply_transform(self.transfrom, img, bboxes, labels)
        except Exception:
            print('Cant do augmention')
            try:
                # The helper never rebinds the caller's variables, so this resize
                # sees the untouched image/boxes rather than half-augmented state.
                plain_resize = A.Compose([A.Resize(640, 640)],
                                         bbox_params={
                                             'format': 'pascal_voc',
                                             'label_fields': ['labels']
                                         })
                out_img, annot = self._apply_transform(plain_resize, img, bboxes, labels)
            except Exception as e:
                print(e)
                # Last resort: return the first record; errors here propagate.
                idx = 0
                img = self._load_image(idx)
                bboxes, labels = self._load_boxes(idx)
                out_img, annot = self._apply_transform(self.transfrom, img, bboxes, labels)

        return {'img': out_img, 'annot': annot}
        
        
        
def collate_fn(batch):
    """Identity collate: hand the list of samples back without stacking or padding."""
    return batch



def collater(data):
    """Collate dataset samples into one image tensor and one padded annotation tensor.

    Args:
        data: Sequence of dicts with an ``'img'`` array (stacked along a new
            first axis, so all must share a shape) and an ``'annot'`` numpy
            array with 5 columns.

    Returns:
        Dict with ``'img'`` as a float32 tensor whose axes are permuted
        ``(0, 3, 1, 2)`` and ``'annot'`` of shape
        ``(len(data), max(rows, 1), 5)`` where unused rows are filled with -1.
    """
    images = torch.from_numpy(np.stack([item['img'] for item in data], axis=0)).to(torch.float32)
    per_sample = [torch.from_numpy(item['annot']) for item in data]

    # Keep at least one (all -1) row so the padded tensor is never empty.
    longest = max(rows.shape[0] for rows in per_sample)
    annot_padded = torch.ones((len(per_sample), max(longest, 1), 5)) * -1

    for position, rows in enumerate(per_sample):
        count = rows.shape[0]
        if count > 0:
            annot_padded[position, :count, :] = rows

    return {'img': images.permute(0, 3, 1, 2), 'annot': annot_padded}

In [3]:
import torch
import torch.nn as nn


def calc_iou(a, b):
    """Pairwise intersection-over-union between two sets of boxes.

    Args:
        a: Tensor of boxes, one per row, columns ``x1, y1, x2, y2``.
        b: Tensor of boxes in the same layout.

    Returns:
        Tensor with one row per box of ``a`` and one column per box of ``b``.
    """
    # Column vectors of a's coordinates broadcast against b's row vectors.
    a_x1, a_y1, a_x2, a_y2 = (torch.unsqueeze(a[:, k], dim=1) for k in range(4))

    overlap_w = torch.clamp(torch.min(a_x2, b[:, 2]) - torch.max(a_x1, b[:, 0]), min=0)
    overlap_h = torch.clamp(torch.min(a_y2, b[:, 3]) - torch.max(a_y1, b[:, 1]), min=0)
    intersection = overlap_w * overlap_h

    area_a = (a_x2 - a_x1) * (a_y2 - a_y1)
    area_b = (b[:, 2] - b[:, 0]) * (b[:, 3] - b[:, 1])

    # Lower bound on the union avoids division by zero for degenerate boxes.
    union = torch.clamp(area_a + area_b - intersection, min=1e-8)

    return intersection / union


class FocalLoss(nn.Module):
    """Focal classification loss plus smooth-L1 box regression loss over anchors.

    Anchors with IoU >= 0.5 to some annotation are positives, anchors with
    IoU < 0.4 are negatives, and the rest are ignored by the classification
    term.  Regression is computed on positive anchors only.
    """

    def __init__(self):
        super(FocalLoss, self).__init__()

    def forward(self, classifications, regressions, anchors, annotations):
        """Compute the batch-averaged classification and regression losses.

        Args:
            classifications: Per-image, per-anchor class probabilities, indexed
                as ``[image, anchor, class]``.
            regressions: Per-image, per-anchor box deltas, indexed as
                ``[image, anchor, 4]``.
            anchors: Anchor boxes; only ``anchors[0]`` is used, with columns
                ``x1, y1, x2, y2``.
            annotations: Per-image ground truth, indexed as ``[image, box, 5]``
                with the class index in column 4; rows whose class is ``-1``
                are treated as padding.

        Returns:
            Tuple ``(classification_loss, regression_loss)``, each a tensor of
            shape ``(1,)``.
        """
        alpha = 0.25
        gamma = 2.0
        # Helper tensors follow the predictions' device.  Choosing CUDA whenever
        # it is merely *available* breaks CPU runs on machines that have a GPU.
        device = classifications.device
        batch_size = classifications.shape[0]
        classification_losses = []
        regression_losses = []

        anchor = anchors[0, :, :]

        anchor_widths = anchor[:, 2] - anchor[:, 0]
        anchor_heights = anchor[:, 3] - anchor[:, 1]
        anchor_ctr_x = anchor[:, 0] + 0.5 * anchor_widths
        anchor_ctr_y = anchor[:, 1] + 0.5 * anchor_heights

        for j in range(batch_size):

            classification = classifications[j, :, :]
            regression = regressions[j, :, :]

            # Drop padding rows (class == -1) added when batching.
            bbox_annotation = annotations[j, :, :]
            bbox_annotation = bbox_annotation[bbox_annotation[:, 4] != -1]

            if bbox_annotation.shape[0] == 0:
                # Images without annotations contribute zero to both losses.
                regression_losses.append(torch.zeros((), device=device))
                classification_losses.append(torch.zeros((), device=device))
                continue

            # Keep probabilities away from 0/1 so the logs below stay finite.
            classification = torch.clamp(classification, 1e-4, 1.0 - 1e-4)

            IoU = calc_iou(anchor, bbox_annotation[:, :4])

            IoU_max, IoU_argmax = torch.max(IoU, dim=1)

            # compute the loss for classification
            # -1 marks "ignore"; negatives get 0 and positives a one-hot row.
            targets = torch.ones(classification.shape, device=device) * -1

            targets[torch.lt(IoU_max, 0.4), :] = 0

            positive_indices = torch.ge(IoU_max, 0.5)

            num_positive_anchors = positive_indices.sum()

            assigned_annotations = bbox_annotation[IoU_argmax, :]

            targets[positive_indices, :] = 0
            targets[positive_indices, assigned_annotations[positive_indices, 4].long()] = 1

            alpha_factor = torch.ones(targets.shape, device=device) * alpha

            alpha_factor = torch.where(torch.eq(targets, 1.), alpha_factor, 1. - alpha_factor)
            focal_weight = torch.where(torch.eq(targets, 1.), 1. - classification, classification)
            focal_weight = alpha_factor * torch.pow(focal_weight, gamma)

            bce = -(targets * torch.log(classification) + (1.0 - targets) * torch.log(1.0 - classification))

            cls_loss = focal_weight * bce

            # Zero out the entries of ignored anchors.
            cls_loss = torch.where(torch.ne(targets, -1.0), cls_loss, torch.zeros_like(cls_loss))

            classification_losses.append(cls_loss.sum() / torch.clamp(num_positive_anchors.float(), min=1.0))

            if positive_indices.sum() > 0:
                assigned_annotations = assigned_annotations[positive_indices, :]

                anchor_widths_pi = anchor_widths[positive_indices]
                anchor_heights_pi = anchor_heights[positive_indices]
                anchor_ctr_x_pi = anchor_ctr_x[positive_indices]
                anchor_ctr_y_pi = anchor_ctr_y[positive_indices]

                gt_widths = assigned_annotations[:, 2] - assigned_annotations[:, 0]
                gt_heights = assigned_annotations[:, 3] - assigned_annotations[:, 1]
                gt_ctr_x = assigned_annotations[:, 0] + 0.5 * gt_widths
                gt_ctr_y = assigned_annotations[:, 1] + 0.5 * gt_heights

                # Avoid log(0) for degenerate ground-truth boxes.
                gt_widths = torch.clamp(gt_widths, min=1)
                gt_heights = torch.clamp(gt_heights, min=1)

                targets_dx = (gt_ctr_x - anchor_ctr_x_pi) / anchor_widths_pi
                targets_dy = (gt_ctr_y - anchor_ctr_y_pi) / anchor_heights_pi
                targets_dw = torch.log(gt_widths / anchor_widths_pi)
                targets_dh = torch.log(gt_heights / anchor_heights_pi)

                targets = torch.stack((targets_dx, targets_dy, targets_dw, targets_dh))
                targets = targets.t()

                # Same scaling constants that BBoxTransform uses as its default std.
                norm = torch.tensor([[0.1, 0.1, 0.2, 0.2]], device=device)
                targets = targets / norm

                regression_diff = torch.abs(targets - regression[positive_indices, :])

                # Smooth L1: quadratic below 1/9, linear above.
                regression_loss = torch.where(
                    torch.le(regression_diff, 1.0 / 9.0),
                    0.5 * 9.0 * torch.pow(regression_diff, 2),
                    regression_diff - 0.5 / 9.0
                )
                regression_losses.append(regression_loss.mean())
            else:
                regression_losses.append(torch.zeros((), device=device))

        return (torch.stack(classification_losses).mean(dim=0, keepdim=True),
                torch.stack(regression_losses).mean(dim=0, keepdim=True))

In [4]:
import torch
import torch.nn as nn
import numpy as np


class BBoxTransform(nn.Module):
    """Decode regression deltas into absolute boxes relative to reference boxes.

    Args:
        mean: Optional tensor of 4 offsets added to the scaled deltas
            (defaults to zeros).
        std: Optional tensor of 4 scale factors applied to the deltas
            (defaults to ``[0.1, 0.1, 0.2, 0.2]``).
    """

    def __init__(self, mean=None, std=None):
        super(BBoxTransform, self).__init__()
        if mean is None:
            self.mean = torch.from_numpy(np.array([0, 0, 0, 0]).astype(np.float32))
        else:
            self.mean = mean
        if std is None:
            self.std = torch.from_numpy(np.array([0.1, 0.1, 0.2, 0.2]).astype(np.float32))
        else:
            self.std = std
        # Kept for backward compatibility; forward() re-aligns the device anyway.
        if torch.cuda.is_available():
            self.mean = self.mean.cuda()
            self.std = self.std.cuda()

    def forward(self, boxes, deltas):
        """Apply ``deltas`` to ``boxes``.

        Args:
            boxes: Reference boxes indexed ``[image, box, 4]`` as ``x1, y1, x2, y2``.
            deltas: Predicted ``dx, dy, dw, dh`` with the same leading shape.

        Returns:
            Tensor of decoded boxes ``x1, y1, x2, y2`` stacked on the last axis.
        """
        # mean/std are plain attributes, not buffers, so module.to(...) never
        # moves them; align them with the incoming deltas to avoid a
        # CPU/CUDA mismatch when CUDA is available but the model runs on CPU.
        mean = self.mean.to(deltas.device)
        std = self.std.to(deltas.device)

        widths = boxes[:, :, 2] - boxes[:, :, 0]
        heights = boxes[:, :, 3] - boxes[:, :, 1]
        ctr_x = boxes[:, :, 0] + 0.5 * widths
        ctr_y = boxes[:, :, 1] + 0.5 * heights

        dx = deltas[:, :, 0] * std[0] + mean[0]
        dy = deltas[:, :, 1] * std[1] + mean[1]
        dw = deltas[:, :, 2] * std[2] + mean[2]
        dh = deltas[:, :, 3] * std[3] + mean[3]

        # Centre shifts are relative to box size; sizes are scaled exponentially.
        pred_ctr_x = ctr_x + dx * widths
        pred_ctr_y = ctr_y + dy * heights
        pred_w = torch.exp(dw) * widths
        pred_h = torch.exp(dh) * heights

        pred_boxes_x1 = pred_ctr_x - 0.5 * pred_w
        pred_boxes_y1 = pred_ctr_y - 0.5 * pred_h
        pred_boxes_x2 = pred_ctr_x + 0.5 * pred_w
        pred_boxes_y2 = pred_ctr_y + 0.5 * pred_h

        pred_boxes = torch.stack([pred_boxes_x1, pred_boxes_y1, pred_boxes_x2, pred_boxes_y2], dim=2)

        return pred_boxes


class ClipBoxes(nn.Module):
    """Clamp box corners to the image extent, modifying ``boxes`` in place."""

    def __init__(self):
        super(ClipBoxes, self).__init__()

    def forward(self, boxes, img):
        """Clip ``boxes`` (``[image, box, x1 y1 x2 y2]``) against ``img``'s height/width.

        Only the lower bound of ``x1``/``y1`` and the upper bound of
        ``x2``/``y2`` are enforced.  The same tensor object is returned.
        """
        _, _, height, width = img.shape

        # Top-left corner may not go below zero.
        for column in (0, 1):
            boxes[:, :, column] = torch.clamp(boxes[:, :, column], min=0)

        # Bottom-right corner may not exceed the image size.
        for column, upper in ((2, width), (3, height)):
            boxes[:, :, column] = torch.clamp(boxes[:, :, column], max=upper)

        return boxes


class Anchors(nn.Module):
    """Generate the full set of anchor boxes for an image across pyramid levels.

    Args:
        pyramid_levels: Pyramid level indices (default ``[3, 4, 5, 6, 7]``).
        strides: Per-level stride (default ``2 ** level``).
        sizes: Per-level base anchor size (default ``2 ** (level + 2)``).
        ratios: Aspect ratios passed to ``generate_anchors``
            (default ``[0.5, 1, 2]``).
        scales: Scales passed to ``generate_anchors``
            (default ``[2**0, 2**(1/3), 2**(2/3)]``).
    """

    def __init__(self, pyramid_levels=None, strides=None, sizes=None, ratios=None, scales=None):
        super(Anchors, self).__init__()

        # Store caller-supplied values too; previously only the defaults were
        # assigned, so passing any argument left the attribute undefined.
        self.pyramid_levels = [3, 4, 5, 6, 7] if pyramid_levels is None else pyramid_levels
        self.strides = [2 ** x for x in self.pyramid_levels] if strides is None else strides
        self.sizes = [2 ** (x + 2) for x in self.pyramid_levels] if sizes is None else sizes
        self.ratios = np.array([0.5, 1, 2]) if ratios is None else ratios
        self.scales = (np.array([2 ** 0, 2 ** (1.0 / 3.0), 2 ** (2.0 / 3.0)])
                       if scales is None else scales)

    def forward(self, image):
        """Return anchors of shape ``(1, total_anchors, 4)`` for ``image``.

        ``image.shape[2:]`` is taken as the spatial size.
        """
        image_shape = image.shape[2:]
        image_shape = np.array(image_shape)
        # Feature-map size per level: ceil(image_size / 2**level).
        image_shapes = [(image_shape + 2 ** x - 1) // (2 ** x) for x in self.pyramid_levels]

        all_anchors = np.zeros((0, 4)).astype(np.float32)

        for idx, p in enumerate(self.pyramid_levels):
            anchors = generate_anchors(base_size=self.sizes[idx], ratios=self.ratios, scales=self.scales)
            shifted_anchors = shift(image_shapes[idx], self.strides[idx], anchors)
            all_anchors = np.append(all_anchors, shifted_anchors, axis=0)

        all_anchors = np.expand_dims(all_anchors, axis=0)

        anchors = torch.from_numpy(all_anchors.astype(np.float32))
        # Follow the input's device instead of forcing CUDA whenever it exists.
        if isinstance(image, torch.Tensor):
            anchors = anchors.to(image.device)
        return anchors


def generate_anchors(base_size=16, ratios=None, scales=None):
    """Build the reference anchors centred on the origin.

    One anchor is produced per ``(ratio, scale)`` pair, ordered ratio-major.
    Each anchor has area ``(base_size * scale) ** 2`` and height/width equal
    to ``ratio``.

    Returns:
        Float array of shape ``(len(ratios) * len(scales), 4)`` with columns
        ``x1, y1, x2, y2``.
    """
    if ratios is None:
        ratios = np.array([0.5, 1, 2])

    if scales is None:
        scales = np.array([2 ** 0, 2 ** (1.0 / 3.0), 2 ** (2.0 / 3.0)])

    num_anchors = len(ratios) * len(scales)

    # Scales cycle fastest, each ratio is held for a full cycle of scales.
    side = np.asarray(base_size * np.tile(scales, len(ratios)), dtype=np.float64)
    ratio_per_anchor = np.repeat(ratios, len(scales))

    areas = side * side
    widths = np.sqrt(areas / ratio_per_anchor)
    heights = widths * ratio_per_anchor

    anchors = np.zeros((num_anchors, 4))
    half_w = widths * 0.5
    half_h = heights * 0.5
    anchors[:, 0] = 0 - half_w
    anchors[:, 1] = 0 - half_h
    anchors[:, 2] = widths - half_w
    anchors[:, 3] = heights - half_h

    return anchors


def compute_shape(image_shape, pyramid_levels):
    """Return, per pyramid level, the first two image dimensions divided by ``2 ** level`` (rounded up)."""
    spatial = np.array(image_shape[:2])
    level_shapes = []
    for level in pyramid_levels:
        stride = 2 ** level
        # Adding stride - 1 before floor division rounds up.
        level_shapes.append((spatial + stride - 1) // stride)
    return level_shapes


def shift(shape, stride, anchors):
    """Replicate ``anchors`` at the centre of every cell of a feature grid.

    Args:
        shape: Grid size as ``(rows, cols)``.
        stride: Distance between neighbouring cell centres.
        anchors: Array of shape ``(A, 4)`` with reference boxes.

    Returns:
        Array of shape ``(rows * cols * A, 4)``; cells vary slowest (row-major),
        anchors fastest.
    """
    centers_x = (np.arange(0, shape[1]) + 0.5) * stride
    centers_y = (np.arange(0, shape[0]) + 0.5) * stride
    grid_x, grid_y = np.meshgrid(centers_x, centers_y)
    flat_x = grid_x.ravel()
    flat_y = grid_y.ravel()

    # One (x, y, x, y) offset per grid cell.
    offsets = np.stack((flat_x, flat_y, flat_x, flat_y), axis=1)

    num_anchors = anchors.shape[0]
    num_cells = offsets.shape[0]
    placed = anchors.reshape((1, num_anchors, 4)) + offsets.reshape((num_cells, 1, 4))

    return placed.reshape((num_cells * num_anchors, 4))

In [5]:
import torch.nn as nn
import torch
import math
from efficientnet_pytorch import EfficientNet as EffNet
from torchvision.ops.boxes import nms as nms_torch


def nms(dets, thresh):
    """Run torchvision NMS on ``dets`` whose columns are ``x1, y1, x2, y2, score``.

    Returns whatever ``torchvision.ops.boxes.nms`` returns for the given IoU
    threshold ``thresh``.
    """
    boxes = dets[:, :4]
    scores = dets[:, 4]
    return nms_torch(boxes, scores, thresh)


class ConvBlock(nn.Module):
    """Depthwise 3x3 conv -> pointwise 1x1 conv -> BatchNorm -> ReLU, channel count unchanged."""

    def __init__(self, num_channels):
        super(ConvBlock, self).__init__()
        # groups == channels makes the 3x3 convolution depthwise.
        depthwise = nn.Conv2d(num_channels, num_channels, kernel_size=3, stride=1, padding=1,
                              groups=num_channels)
        pointwise = nn.Conv2d(num_channels, num_channels, kernel_size=1, stride=1, padding=0)
        # NOTE(review): in PyTorch, momentum is the weight given to the *new*
        # batch statistic; 0.9997 looks like a decay-style value - confirm intent.
        norm = nn.BatchNorm2d(num_features=num_channels, momentum=0.9997, eps=4e-5)
        self.conv = nn.Sequential(depthwise, pointwise, norm, nn.ReLU())

    def forward(self, input):
        return self.conv(input)


class BiFPN(nn.Module):
    """One bidirectional feature-pyramid layer over five levels (P3..P7).

    Features are first fused top-down (P7 -> P3) and then bottom-up
    (P3 -> P7).  Every fusion is a weighted sum whose learnable weights are
    passed through ReLU and normalised by their sum plus ``epsilon``.
    """

    def __init__(self, num_channels, epsilon=1e-4):
        super(BiFPN, self).__init__()
        self.epsilon = epsilon

        # Conv layers (registered in the same order as the attribute list).
        for conv_name in ('conv6_up', 'conv5_up', 'conv4_up', 'conv3_up',
                          'conv4_down', 'conv5_down', 'conv6_down', 'conv7_down'):
            setattr(self, conv_name, ConvBlock(num_channels))

        # Feature scaling layers
        for level in (6, 5, 4, 3):
            setattr(self, 'p%d_upsample' % level, nn.Upsample(scale_factor=2, mode='nearest'))
        for level in (4, 5, 6, 7):
            setattr(self, 'p%d_downsample' % level, nn.MaxPool2d(kernel_size=2))

        # Fusion weights: one scalar per fused input, each with its own ReLU.
        for weight_name, num_inputs in (('p6_w1', 2), ('p5_w1', 2), ('p4_w1', 2), ('p3_w1', 2),
                                        ('p4_w2', 3), ('p5_w2', 3), ('p6_w2', 3), ('p7_w2', 2)):
            setattr(self, weight_name, nn.Parameter(torch.ones(num_inputs)))
            setattr(self, weight_name + '_relu', nn.ReLU())

    def _fusion_weights(self, raw_weight, relu):
        """Return the ReLU-ed weights normalised by their sum plus epsilon."""
        positive = relu(raw_weight)
        return positive / (torch.sum(positive, dim=0) + self.epsilon)

    def forward(self, inputs):
        """
            P7_0 -------------------------- P7_2 -------->
            P6_0 ---------- P6_1 ---------- P6_2 -------->
            P5_0 ---------- P5_1 ---------- P5_2 -------->
            P4_0 ---------- P4_1 ---------- P4_2 -------->
            P3_0 -------------------------- P3_2 -------->
        """
        p3_in, p4_in, p5_in, p6_in, p7_in = inputs

        # ---- top-down pass: each level mixes its input with the upsampled level above ----
        w = self._fusion_weights(self.p6_w1, self.p6_w1_relu)
        p6_up = self.conv6_up(w[0] * p6_in + w[1] * self.p6_upsample(p7_in))

        w = self._fusion_weights(self.p5_w1, self.p5_w1_relu)
        p5_up = self.conv5_up(w[0] * p5_in + w[1] * self.p5_upsample(p6_up))

        w = self._fusion_weights(self.p4_w1, self.p4_w1_relu)
        p4_up = self.conv4_up(w[0] * p4_in + w[1] * self.p4_upsample(p5_up))

        w = self._fusion_weights(self.p3_w1, self.p3_w1_relu)
        p3_out = self.conv3_up(w[0] * p3_in + w[1] * self.p3_upsample(p4_up))

        # ---- bottom-up pass: input + top-down result + downsampled level below ----
        w = self._fusion_weights(self.p4_w2, self.p4_w2_relu)
        p4_out = self.conv4_down(w[0] * p4_in + w[1] * p4_up + w[2] * self.p4_downsample(p3_out))

        w = self._fusion_weights(self.p5_w2, self.p5_w2_relu)
        p5_out = self.conv5_down(w[0] * p5_in + w[1] * p5_up + w[2] * self.p5_downsample(p4_out))

        w = self._fusion_weights(self.p6_w2, self.p6_w2_relu)
        p6_out = self.conv6_down(w[0] * p6_in + w[1] * p6_up + w[2] * self.p6_downsample(p5_out))

        # P7 has no top-down intermediate, so only two inputs are fused.
        w = self._fusion_weights(self.p7_w2, self.p7_w2_relu)
        p7_out = self.conv7_down(w[0] * p7_in + w[1] * self.p7_downsample(p6_out))

        return p3_out, p4_out, p5_out, p6_out, p7_out


class Regressor(nn.Module):
    """Box-regression head: a stack of 3x3 conv + ReLU followed by a 3x3 output conv.

    Args:
        in_channels: Channel count of the incoming feature map (kept through the stack).
        num_anchors: Anchors per spatial location; the header emits ``num_anchors * 4`` channels.
        num_layers: Number of conv + ReLU pairs before the header.
    """

    def __init__(self, in_channels, num_anchors, num_layers):
        super(Regressor, self).__init__()
        tower = []
        for _ in range(num_layers):
            tower += [nn.Conv2d(in_channels, in_channels, kernel_size=3, stride=1, padding=1),
                      nn.ReLU(True)]
        self.layers = nn.Sequential(*tower)
        self.header = nn.Conv2d(in_channels, num_anchors * 4, kernel_size=3, stride=1, padding=1)

    def forward(self, inputs):
        """Return deltas of shape ``(batch, height * width * num_anchors, 4)``."""
        raw = self.header(self.layers(inputs))
        # Channels-last so that each location's anchors are contiguous before flattening.
        channels_last = raw.permute(0, 2, 3, 1)
        batch = channels_last.shape[0]
        return channels_last.contiguous().view(batch, -1, 4)


class Classifier(nn.Module):
    """Classification head: a stack of 3x3 conv + ReLU, a 3x3 output conv and a sigmoid.

    Args:
        in_channels: Channel count of the incoming feature map (kept through the stack).
        num_anchors: Anchors per spatial location.
        num_classes: Classes per anchor; the header emits ``num_anchors * num_classes`` channels.
        num_layers: Number of conv + ReLU pairs before the header.
    """

    def __init__(self, in_channels, num_anchors, num_classes, num_layers):
        super(Classifier, self).__init__()
        self.num_anchors = num_anchors
        self.num_classes = num_classes
        tower = []
        for _ in range(num_layers):
            tower += [nn.Conv2d(in_channels, in_channels, kernel_size=3, stride=1, padding=1),
                      nn.ReLU(True)]
        self.layers = nn.Sequential(*tower)
        self.header = nn.Conv2d(in_channels, num_anchors * num_classes, kernel_size=3, stride=1, padding=1)
        self.act = nn.Sigmoid()

    def forward(self, inputs):
        """Return probabilities of shape ``(batch, height * width * num_anchors, num_classes)``."""
        probabilities = self.act(self.header(self.layers(inputs)))
        channels_last = probabilities.permute(0, 2, 3, 1)
        batch, rows, cols, _ = channels_last.shape
        # Split the channel axis into (anchor, class) before flattening locations and anchors.
        per_anchor = channels_last.contiguous().view(batch, rows, cols, self.num_anchors, self.num_classes)
        return per_anchor.contiguous().view(batch, -1, self.num_classes)


class EfficientNet(nn.Module):
    """Feature-extractor wrapper around a pretrained ``efficientnet_pytorch`` model.

    The classification head attributes are deleted in the constructor;
    ``forward`` returns intermediate feature maps instead of logits.
    """

    def __init__(self, backbone_name):
        super(EfficientNet, self).__init__()
        model = EffNet.from_pretrained(backbone_name)
        # Drop the parts that are only needed for classification.
        for head_attr in ('_conv_head', '_bn1', '_avg_pooling', '_dropout', '_fc'):
            delattr(model, head_attr)
        self.model = model

    def forward(self, x):
        """Return the outputs of all stride-2 blocks except the first one."""
        net = self.model
        x = net._swish(net._bn0(net._conv_stem(x)))

        total_blocks = len(net._blocks)
        stage_outputs = []
        for position, block in enumerate(net._blocks):
            # Drop-connect rate grows linearly with block depth.
            rate = net._global_params.drop_connect_rate
            if rate:
                rate *= float(position) / total_blocks
            x = block(x, drop_connect_rate=rate)
            # NOTE(review): relies on the library storing stride as the list [2, 2].
            if block._depthwise_conv.stride == [2, 2]:
                stage_outputs.append(x)

        return stage_outputs[1:]


class EfficientDet(nn.Module):
    """EfficientDet-style detector: EfficientNet backbone, BiFPN neck, shared box/class heads.

    Args:
        num_anchors: Anchors per feature-map location.
        num_classes: Number of object classes predicted per anchor.
        compound_coef: Scaling coefficient (0..7) selecting the BiFPN width,
            BiFPN depth and head depth.
        backbone_name: Name passed to the ``EfficientNet`` backbone wrapper.

    ``forward`` has two modes:

    * ``model((images, annotations))`` - a 2-element tuple/list - returns the
      ``(classification_loss, regression_loss)`` pair from ``FocalLoss``;
    * ``model(images)`` returns ``[scores, class_indices, boxes]`` for the
      **first** image of the batch after score thresholding and NMS.
    """

    def __init__(self, num_anchors=9, num_classes=20, compound_coef=0, backbone_name='efficientnet-b0'):
        super(EfficientDet, self).__init__()
        self.compound_coef = compound_coef

        self.num_channels = [64, 88, 112, 160, 224, 288, 384, 384][self.compound_coef]

        # Input widths are hard-coded - presumably the backbone's stage widths for
        # the default 'efficientnet-b0'; other backbones need different values.
        self.conv3 = nn.Conv2d(40, self.num_channels, kernel_size=1, stride=1, padding=0)
        self.conv4 = nn.Conv2d(80, self.num_channels, kernel_size=1, stride=1, padding=0)
        self.conv5 = nn.Conv2d(192, self.num_channels, kernel_size=1, stride=1, padding=0)
        self.conv6 = nn.Conv2d(192, self.num_channels, kernel_size=3, stride=2, padding=1)
        
#         self.conv3 = nn.Conv2d(48, self.num_channels, kernel_size=1, stride=1, padding=0)
#         self.conv4 = nn.Conv2d(96, self.num_channels, kernel_size=1, stride=1, padding=0)
#         self.conv5 = nn.Conv2d(232, self.num_channels, kernel_size=1, stride=1, padding=0)
#         self.conv6 = nn.Conv2d(232, self.num_channels, kernel_size=3, stride=2, padding=1)
        
        
        self.conv7 = nn.Sequential(nn.ReLU(),
                                   nn.Conv2d(self.num_channels, self.num_channels, kernel_size=3, stride=2, padding=1))

        self.bifpn = nn.Sequential(*[BiFPN(self.num_channels) for _ in range(min(2 + self.compound_coef, 8))])

        self.num_classes = num_classes
        self.regressor = Regressor(in_channels=self.num_channels, num_anchors=num_anchors,
                                   num_layers=3 + self.compound_coef // 3)
        self.classifier = Classifier(in_channels=self.num_channels, num_anchors=num_anchors, num_classes=num_classes,
                                     num_layers=3 + self.compound_coef // 3)

        self.anchors = Anchors()
        self.regressBoxes = BBoxTransform()
        self.clipBoxes = ClipBoxes()
        self.focalLoss = FocalLoss()

        # Custom init for every module registered so far.  The backbone is
        # attached only after this loop, so its pretrained weights are untouched.
        for m in self.modules():
            if isinstance(m, nn.Conv2d):
                n = m.kernel_size[0] * m.kernel_size[1] * m.out_channels
                m.weight.data.normal_(0, math.sqrt(2. / n))
            elif isinstance(m, nn.BatchNorm2d):
                m.weight.data.fill_(1)
                m.bias.data.zero_()

        # Bias the classifier so that its initial sigmoid output equals `prior`.
        prior = 0.01

        self.classifier.header.weight.data.fill_(0)
        self.classifier.header.bias.data.fill_(-math.log((1.0 - prior) / prior))

        self.regressor.header.weight.data.fill_(0)
        self.regressor.header.bias.data.fill_(0)

        self.backbone_net = EfficientNet(backbone_name=backbone_name)

    def freeze_bn(self):
        """Put every BatchNorm2d layer into eval mode (running statistics are no longer updated)."""
        for m in self.modules():
            if isinstance(m, nn.BatchNorm2d):
                m.eval()

    def forward(self, inputs):
        """Run the detector; see the class docstring for the two calling modes."""
        # Training mode is signalled by an (images, annotations) pair.  The type
        # check matters: len() of an image tensor is its batch size, so a bare
        # batch of two images used to be mistaken for such a pair.
        if isinstance(inputs, (tuple, list)) and len(inputs) == 2:
            is_training = True
            img_batch, annotations = inputs
        else:
            is_training = False
            img_batch = inputs

        c3, c4, c5 = self.backbone_net(img_batch)
        
        p3 = self.conv3(c3)
        p4 = self.conv4(c4)
        p5 = self.conv5(c5)
        p6 = self.conv6(c5)
        p7 = self.conv7(p6)

        features = [p3, p4, p5, p6, p7]
        features = self.bifpn(features)

        # The same heads are applied to every pyramid level and concatenated along the anchor axis.
        regression = torch.cat([self.regressor(feature) for feature in features], dim=1)
        classification = torch.cat([self.classifier(feature) for feature in features], dim=1)
        anchors = self.anchors(img_batch)

        if is_training:
            return self.focalLoss(classification, regression, anchors, annotations)
        else:
            transformed_anchors = self.regressBoxes(anchors, regression)
            transformed_anchors = self.clipBoxes(transformed_anchors, img_batch)

            scores = torch.max(classification, dim=2, keepdim=True)[0]

            # Only the first image of the batch is post-processed from here on.
            scores_over_thresh = (scores > 0.05)[0, :, 0]

            if scores_over_thresh.sum() == 0:
                return [torch.zeros(0), torch.zeros(0), torch.zeros(0, 4)]

            classification = classification[:, scores_over_thresh, :]
            transformed_anchors = transformed_anchors[:, scores_over_thresh, :]
            scores = scores[:, scores_over_thresh, :]

            anchors_nms_idx = nms(torch.cat([transformed_anchors, scores], dim=2)[0, :, :], 0.5)

            nms_scores, nms_class = classification[0, anchors_nms_idx, :].max(dim=1)

            return [nms_scores, nms_class, transformed_anchors[0, anchors_nms_idx, :]]

In [6]:
# Dataset location and the fraction of records held out for validation.
root_path = 'vkcv2022-contest-02-carplates/data/'
json_path = 'vkcv2022-contest-02-carplates/data/train.json'
val_size = 0.2

# Training pipeline: fixed 640x640 resize followed by light random augmentation.
train_transform = A.Compose(
    [
        A.Resize(640, 640),
        A.HorizontalFlip(p=0.25),
        A.Rotate(limit=(-20, 20), p=0.25),
        A.ColorJitter(brightness=0.3, contrast=0.2, saturation=0.2, hue=0.05, p=0.25),
    ],
    bbox_params=dict(format='pascal_voc', label_fields=['labels']),
)

# Validation pipeline: resize only.
val_transform = A.Compose(
    [A.Resize(640, 640)],
    bbox_params=dict(format='pascal_voc', label_fields=['labels']),
)

# Both datasets read the same JSON; the split side is chosen by sample_type.
trainset = PlatesDataset(
    root_path=root_path,
    json_path=json_path,
    sample_type='train',
    val_size=val_size,
    transform=train_transform,
)
valset = PlatesDataset(
    root_path=root_path,
    json_path=json_path,
    sample_type='val',
    val_size=val_size,
    transform=val_transform,
)

print('Train size:', len(trainset))
print('Val size:', len(valset))

Train size: 20506
Val size: 5127


In [7]:
from tqdm import tqdm
import pandas as pd

from sklearn.metrics import auc


def get_iou(true_box, pred_box):
    """Intersection-over-union of two ``[x1, y1, x2, y2]`` boxes.

    Widths and heights are computed as ``max - min + 1`` (inclusive
    coordinates), so a box with equal corners still has area 1.
    """
    left = np.maximum(true_box[0], pred_box[0])
    top = np.maximum(true_box[1], pred_box[1])
    right = np.minimum(true_box[2], pred_box[2])
    bottom = np.minimum(true_box[3], pred_box[3])

    # Negative extents mean the boxes do not overlap along that axis.
    overlap_h = np.maximum(bottom - top + 1, np.array(0.))
    overlap_w = np.maximum(right - left + 1, np.array(0.))
    area_of_intersection = overlap_h * overlap_w

    true_area = (true_box[3] - true_box[1] + 1) * (true_box[2] - true_box[0] + 1)
    pred_area = (pred_box[3] - pred_box[1] + 1) * (pred_box[2] - pred_box[0] + 1)

    area_of_union = true_area + pred_area - area_of_intersection

    return area_of_intersection / area_of_union


class ModelTrainer():
    """Train / validate loop for a detection model with mAP-style reporting.

    The model is called as `model((imgs, targets))` in training mode and is
    expected to return an indexable pair of losses (index 0 is logged as the
    classification loss, index 1 as the regression loss). In eval mode it is
    called as `model(imgs)` and its output is indexed as
    `(scores, labels, boxes)` by `eval_sample`.

    Args:
        model: the detection network.
        model_name: used to name checkpoint and log files.
        train_loader / val_loader: iterables yielding dicts with `'img'` and
            `'annot'` tensors. Evaluation reads only `annot[0]`, i.e. the first
            sample of each validation batch.
        optimizer: optimizer stepped once per training batch.
        device: device the batches are moved to.
        scheduller: optional LR scheduler, stepped once per training batch.
        conf_thresh: confidence around which precision/recall are reported.
        val_classes: iterable of class ids to evaluate; must be provided
            before `compute_metrics` / `eval_epoch` are called.
    """

    # Half-width of the score window around `conf_thresh` used to pick the
    # operating point for the reported precision / recall.
    _CONF_WINDOW = 0.02

    def __init__(self, model, model_name, train_loader, val_loader, optimizer, device, scheduller=None,
                 conf_thresh=0.5, val_classes=None):

        self.model = model
        self.model_name = model_name
        self.device = device
        self.train_loader = train_loader
        self.val_loader = val_loader
        self.optimizer = optimizer
        self.scheduller = scheduller

        self.conf_thresh = conf_thresh
        self.val_classes = val_classes

    def fit_epoch(self):
        """Run one pass over `train_loader` and return the mean total loss.

        A batch whose forward pass raises is reported and skipped (best-effort
        training); no optimizer or scheduler step is taken for it.
        """
        obj_loss = []
        reg_loss = []
        global_loss = []

        self.model.train()
        # Wrap the loader (not the enumerate) so tqdm can show the total.
        for idx, data in enumerate(tqdm(self.train_loader)):

            # Use the device given to the trainer, not a notebook-level global.
            imgs = data['img'].to(self.device)
            targets = data['annot'].to(self.device)
            self.optimizer.zero_grad()

            try:
                losses = self.model((imgs, targets))
            except Exception as e:
                print('Error in training step')
                print(e)
                continue

            loss = sum(losses)
            loss.backward()

            self.optimizer.step()
            if self.scheduller is not None:
                self.scheduller.step()

            global_loss.append(loss.item())
            obj_loss.append(losses[0].item())
            reg_loss.append(losses[1].item())

            if idx % 200 == 0:
                print('Global Loss:', np.mean(global_loss))
                print('Cls Loss:', np.mean(obj_loss))
                print('Reg Loss:', np.mean(reg_loss))

        return np.mean(global_loss)

    def eval_sample(self, pred, target, iou_thresh, result_dict):
        """Match one sample's predictions to its ground truth and record the result.

        Args:
            pred: indexable with scores at [0], labels at [1] and boxes at [2].
            target: batched annotation tensor; only `target[0]` is used, with
                columns 0..3 read as the box and column 4 as the label.
            iou_thresh: minimum IoU for a prediction to count as a true positive.
            result_dict: dict with list values under 'pred_label', 'score' and
                'TP'; one entry per prediction is appended to each.

        Returns:
            The same `result_dict`, extended in place.
        """
        pred_bboxes = pred[2].cpu().numpy()
        scores = pred[0].cpu().numpy()
        pred_labels = pred[1].cpu().numpy()

        true_bboxes = target[0][:, :4].cpu().numpy()
        true_labels = target[0][:, 4].cpu().numpy()

        for i in range(len(pred_bboxes)):

            pred_box = pred_bboxes[i]
            pred_label = pred_labels[i]

            result_dict['pred_label'].append(pred_label)
            result_dict['score'].append(scores[i])

            # Best still-unmatched ground-truth box of the same class.
            max_iou = 0
            max_id = -1
            for j in range(len(true_bboxes)):

                if true_labels[j] != pred_label:
                    continue

                IoU = get_iou(true_bboxes[j], pred_box)
                if (IoU >= iou_thresh) and (IoU > max_iou):
                    max_iou = IoU
                    max_id = j

            if max_id >= 0:
                result_dict['TP'].append(1)
                # A ground-truth box may be matched only once.
                true_bboxes = np.delete(true_bboxes, max_id, axis=0)
                true_labels = np.delete(true_labels, max_id, axis=0)
            else:
                result_dict['TP'].append(0)

        return result_dict

    @staticmethod
    def _average_precision(recall_list, precision_list):
        """Area under the precision-recall points, or 0.0 if there are fewer than two.

        `sklearn.metrics.auc` raises `ValueError` for fewer than two points,
        which happens whenever a class has zero or one prediction.
        """
        if len(recall_list) < 2:
            return 0.0
        return auc(recall_list, precision_list)

    def compute_metrics(self, predictions, targets):
        """Compute mAP over IoU 0.50..0.95 plus precision/recall at IoU 0.5.

        Args:
            predictions: list of per-batch model outputs (see `eval_sample`).
            targets: list of per-batch annotation tensors, aligned with
                `predictions`.

        Returns:
            dict with keys:
                'map_list'   - mean AP over classes for each of the 10 IoU thresholds,
                'map@50'     - first entry of 'map_list' (IoU 0.5),
                'map@50_95'  - mean of 'map_list',
                'precision' / 'recall' - class-averaged values at IoU 0.5, taken
                    at the lowest-ranked prediction whose score lies within
                    +-0.02 of `conf_thresh` (0 for a class with no such
                    prediction).
        """
        iou_keys = [str(round(iou, 2)) for iou in np.linspace(0.5, 0.95, 10)]
        result_metrics = {'map_list': [], 'map@50': 0, 'map@50_95': 0, 'precision': 0, 'recall': 0}

        # Ground-truth count per class does not depend on the IoU threshold,
        # so compute it once. Labels outside `val_classes` are not evaluated.
        n_boxes = {label: 0 for label in self.val_classes}
        for true in targets:
            for row in true[0]:
                label = row[4].item()
                if label in n_boxes:
                    n_boxes[label] += 1

        conf_low = self.conf_thresh - self._CONF_WINDOW
        conf_high = self.conf_thresh + self._CONF_WINDOW

        precision_per_class = []
        recall_per_class = []

        for iou in tqdm(iou_keys):
            result_dict = {'pred_label': [], 'score': [], 'TP': []}
            for pred, true in zip(predictions, targets):
                result_dict = self.eval_sample(pred, true, float(iou), result_dict)

            df = pd.DataFrame(result_dict)
            ap_list = []

            for class_id in self.val_classes:
                df_per_class = df[df['pred_label'] == class_id]
                df_per_class = df_per_class.sort_values(by='score', ascending=False)

                n_gt = n_boxes[class_id]
                precision_list = []
                recall_list = []
                precision_at_conf = 0
                recall_at_conf = 0

                # Walk predictions from most to least confident, accumulating TPs.
                tp_count = 0
                ranked = zip(df_per_class['TP'].tolist(), df_per_class['score'].tolist())
                for preds_count, (is_tp, score) in enumerate(ranked, start=1):
                    tp_count += is_tp

                    precision = tp_count / preds_count
                    # No ground truth for this class -> recall is defined as 0.
                    recall = tp_count / n_gt if n_gt > 0 else 0.0

                    precision_list.append(precision)
                    recall_list.append(recall)

                    if (iou == '0.5') and (conf_low <= score <= conf_high):
                        precision_at_conf = precision
                        recall_at_conf = recall

                ap_list.append(self._average_precision(recall_list, precision_list))

                if iou == '0.5':
                    precision_per_class.append(precision_at_conf)
                    recall_per_class.append(recall_at_conf)

            result_metrics['map_list'].append(np.mean(ap_list))

        result_metrics['precision'] = np.mean(precision_per_class)
        result_metrics['recall'] = np.mean(recall_per_class)
        result_metrics['map@50'] = result_metrics['map_list'][0]
        result_metrics['map@50_95'] = np.mean(result_metrics['map_list'])

        return result_metrics

    @torch.no_grad()
    def eval_epoch(self):
        """Run the model over `val_loader` and return `compute_metrics` on the results."""
        self.model.eval()

        predictions = []
        answers = []
        for data in tqdm(self.val_loader):

            imgs = data['img'].to(self.device)
            targets = data['annot'].to(self.device)

            outputs = self.model(imgs)

            predictions.append(outputs)
            answers.append(targets)

        return self.compute_metrics(predictions, answers)

    def train_net(self, num_epochs):
        """Train for `num_epochs`, validating, logging and checkpointing each epoch.

        The full model object is saved to `SaveModels/` whenever mAP@50:95 is at
        least as good as the best value so far; one metrics line per epoch is
        appended to `Logs/<model_name>.txt`.
        """
        # Make sure the output directories exist before the first write.
        os.makedirs('SaveModels', exist_ok=True)
        os.makedirs('Logs', exist_ok=True)

        best_map = 0
        for epoch in range(1, num_epochs+1):

            self.fit_epoch()
            result_metrics = self.eval_epoch()

            if result_metrics['map@50_95'] >= best_map:
                best_map = result_metrics['map@50_95']
                torch.save(self.model, f'SaveModels/{self.model_name}_{epoch}.pth')

            with open(f'Logs/{self.model_name}.txt', 'a') as f:
                string = f"Precision={result_metrics['precision']} Recall={result_metrics['recall']} MAP50={result_metrics['map@50']} MAP50_95={result_metrics['map@50_95']}\n"
                f.write(string)

            print('Epoch:', epoch)
            print('Precsion:', result_metrics['precision'])
            print('Recall:', result_metrics['recall'])
            print('MAP@50:', result_metrics['map@50'])
            print('MAP@50_95:', result_metrics['map@50_95'])
            
                        
            

    

In [None]:
from torch import nn
import torch

# NOTE(review): the variable is called `efficient_det_b3` but the backbone
# requested here is 'efficientnet-b0' - confirm which one is intended.
efficient_det_b3 = EfficientDet(num_classes=2, backbone_name='efficientnet-b0')

# Freeze the whole backbone first ...
for param in efficient_det_b3.backbone_net.parameters():
    param.requires_grad = False

# ... then make backbone blocks 15, 14 and 13 trainable again.
for block_idx in (15, 14, 13):
    for param in efficient_det_b3.backbone_net.model._blocks[block_idx].parameters():
        param.requires_grad = True

# Show which parameters will receive gradients.
for name, param in efficient_det_b3.named_parameters():
    print(name, param.requires_grad)


# Run configuration.
model_name = 'efficientnet_b0_v2'
device = 'cuda'

num_epochs = 20
lr = 3e-4
batch_size = 16
num_workers = 8

#efficient_det_b3 = nn.DataParallel(efficient_det_b3, device_ids=[0, 2, 4, 6])

efficient_det_b3.to(device)
optimizer = torch.optim.AdamW(efficient_det_b3.parameters(), lr=lr)

# Validation uses batch_size=1: ModelTrainer.eval_sample reads only the first
# sample of each batch.
train_loader = DataLoader(
    trainset,
    batch_size=batch_size,
    shuffle=True,
    collate_fn=collater,
    num_workers=num_workers,
)
val_loader = DataLoader(
    valset,
    batch_size=1,
    shuffle=False,
    collate_fn=collater,
    num_workers=num_workers,
)

# The trainer steps the scheduler once per training batch, so the one-cycle
# schedule is sized as len(train_loader) steps for each of num_epochs epochs.
scheduller = torch.optim.lr_scheduler.OneCycleLR(
    optimizer=optimizer,
    max_lr=lr,
    steps_per_epoch=len(train_loader),
    epochs=num_epochs,
    pct_start=0.05,
    anneal_strategy='cos',
)

model_trainer = ModelTrainer(
    model=efficient_det_b3,
    model_name=model_name,
    train_loader=train_loader,
    val_loader=val_loader,
    optimizer=optimizer,
    scheduller=scheduller,
    device=device,
    val_classes=[1],
)

model_trainer.train_net(num_epochs=num_epochs)

