**Import section**

In [None]:
import torch
import torchvision
import cv2
import glob
import numpy as np
import pandas as pd
import os
import matplotlib.pyplot as plt
import torch.nn as nn
import warnings
from tqdm import tqdm
from PIL import Image
from xml.etree import ElementTree as et
from torch.utils.data import Dataset, DataLoader, random_split
from torchvision.models.detection.faster_rcnn import FastRCNNPredictor
from torchvision.models.detection.ssd import SSDClassificationHead
from torch.optim import lr_scheduler
from torchvision.ops import nms
from collections import Counter
from torchvision import transforms
import torch.optim as optim
from torchvision.ops import nms
from torch.optim.lr_scheduler import StepLR

**Defining image size**

In [None]:
# Side length (in pixels) of the square images produced by the dataset transform;
# bounding-box coordinates are rescaled to this resolution as well.
size = 720

**Dataset class**

In [None]:
class DataSet(Dataset):
    """Car-detection dataset pairing images with XML box annotations.

    Expected layout: ``<root>/images/*`` and ``<root>/annotation/*``. Images and
    annotation files are paired by their position in the two alphabetically
    sorted listings. Every image is resized to ``size x size`` and the boxes
    are rescaled to that resolution.

    NOTE(review): the default pipeline below contains random augmentations
    (colour jitter, grayscale, blur) that are applied to every sample served by
    this dataset, whatever split it ends up in - confirm this is intended.
    NOTE(review): torchvision detection models are documented to take images in
    the [0, 1] range and to normalise them internally - confirm that the extra
    ``Normalize`` step here is wanted.
    """

    # Default preprocessing / augmentation pipeline (input: RGB uint8 HxWx3 array).
    transform = transforms.Compose([
        transforms.ToPILImage(),
        transforms.Resize((size, size)),
        transforms.ToTensor(),
        transforms.ColorJitter(brightness=0.1, contrast=0.1),
        transforms.RandomGrayscale(p=0.1),
        transforms.GaussianBlur(kernel_size=(5, 9), sigma=(0.1, 5)),
        transforms.Normalize(mean=[0.485, 0.456, 0.406],
                             std=[0.229, 0.224, 0.225]),
    ])

    def __init__(self, root, transforms=None):
        """Index the image and annotation files found under ``root``.

        Args:
            root: Dataset folder containing the ``images`` and ``annotation``
                sub-folders.
            transforms: Optional callable used instead of the default
                class-level pipeline. It receives an RGB ``HxWx3`` array and
                must return a ``size x size`` image tensor, because the boxes
                are always rescaled to ``size``.
        """
        self.root = root  # Root folder (holds 'images' and 'annotation')
        self.labels = ['bg', 'car']  # All dataset labels; index 0 is background
        self.l2i = {label: i for i, label in enumerate(self.labels)}  # label -> class index
        self.imgs = sorted([root + '/images/' + i for i in os.listdir(root + '/images')])  # Image paths
        self.xmls = sorted([root + '/annotation/' + i for i in os.listdir(root + '/annotation')])  # XML paths

        # Honour a user-supplied pipeline (it used to be silently ignored).
        if transforms is not None:
            self.transform = transforms

        # Samples are paired purely by position, so differing counts mean that
        # at least some images cannot be matched with their annotation.
        if len(self.imgs) != len(self.xmls):
            warnings.warn(
                f"Found {len(self.imgs)} images but {len(self.xmls)} annotation files in "
                f"'{root}'; image/annotation pairing may be wrong."
            )

    def __len__(self):
        """Number of samples (one per image file)."""
        return len(self.imgs)

    def __getitem__(self, index):
        """Load one sample.

        Returns:
            ``(image, target)`` where ``image`` is the transformed image tensor
            and ``target`` is a dict with ``'boxes'`` (float32, shape ``(N, 4)``,
            ``[xmin, ymin, xmax, ymax]`` in resized-image pixels) and
            ``'labels'`` (int64, shape ``(N,)``).

        Raises:
            FileNotFoundError: If the image file cannot be read.
            KeyError: If an annotation uses a label missing from ``self.labels``.
        """
        img_path = self.imgs[index]
        xml_path = self.xmls[index]

        # cv2.imread returns None instead of raising when the file is unreadable.
        image = cv2.imread(img_path)
        if image is None:
            raise FileNotFoundError(f"Could not read image: {img_path}")
        H, W = image.shape[:2]
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
        transformed_image = self.transform(image)

        # Parsing XML file
        root = et.parse(xml_path).getroot()
        boxes = []
        labels = []

        for item in root.findall('.//shapes/item'):
            label = item.find('label').text
            points = item.find('points')
            x_a, y_a = [float(point.text) for point in points[0]]
            x_b, y_b = [float(point.text) for point in points[1]]

            # The two corner points may come in any order.
            xmin, xmax = min(x_a, x_b), max(x_a, x_b)
            ymin, ymax = min(y_a, y_b), max(y_a, y_b)

            # Rescale from original-image pixels to the resized image; keep the
            # coordinates as floats instead of truncating them to integers.
            bbox = [xmin * size / W, ymin * size / H, xmax * size / W, ymax * size / H]

            # torchvision detection models reject boxes without positive
            # width and height, so degenerate annotations are skipped.
            if bbox[2] <= bbox[0] or bbox[3] <= bbox[1]:
                continue

            boxes.append(bbox)
            labels.append(self.l2i[label])

        # Explicit dtypes and shape keep images without annotations well-formed:
        # boxes become (0, 4) float32 and labels (0,) int64.
        target = {}
        target['labels'] = torch.as_tensor(labels, dtype=torch.int64)
        target['boxes'] = torch.as_tensor(boxes, dtype=torch.float32).reshape(-1, 4)

        return transformed_image, target

    def collate_fn(self, batch):
        """Collate samples into ``(images, targets)`` tuples without stacking.

        Targets hold a variable number of boxes per image, so the default
        stacking collate cannot be used.
        """
        return tuple(zip(*batch))

# Dataset root: must contain the 'images' and 'annotation' sub-folders.
data_folder = "/kaggle/input/cars-final-dataset/car"

dataset = DataSet(data_folder)

# 70 % of the samples go to training, the remainder to validation.
train_size = int(0.7 * len(dataset))
val_size = len(dataset) - train_size

# A seeded generator makes the split identical on every run, so validation
# scores stay comparable between runs.
train_dataset, val_dataset = random_split(
    dataset,
    [train_size, val_size],
    generator=torch.Generator().manual_seed(42),
)

train_dataloader = DataLoader(train_dataset, batch_size=8, shuffle=True, collate_fn=dataset.collate_fn, drop_last=True)
# Validation visits every sample exactly once in a fixed order: no shuffling
# and no dropped partial batch (dropping it would silently skip samples).
val_dataloader = DataLoader(val_dataset, batch_size=8, shuffle=False, collate_fn=dataset.collate_fn, drop_last=False)

**Draw batch function**

In [None]:
import matplotlib.pyplot as plt
import torchvision.transforms.functional as F
from torchvision.utils import draw_bounding_boxes
import numpy as np
import torch

def draw_batch_with_boxes(batch):
    """Show every image of a collated batch with its ground-truth boxes.

    Args:
        batch: ``(images, targets)`` pair; ``images`` is a sequence of
            ``3xHxW`` tensors normalised with the ImageNet mean/std used by
            ``DataSet.transform`` and ``targets`` a sequence of dicts holding
            ``'boxes'`` and ``'labels'`` tensors.
    """
    images, targets = batch
    n_images = len(images)
    if n_images == 0:
        return

    # Size the grid from the batch instead of assuming exactly 8 images.
    n_cols = min(4, n_images)
    n_rows = (n_images + n_cols - 1) // n_cols
    fig, axs = plt.subplots(n_rows, n_cols, figsize=(5 * n_cols, 5 * n_rows), squeeze=False)
    axs = axs.flatten()

    # Statistics of the Normalize step in DataSet.transform, shaped to
    # broadcast over a CxHxW tensor.
    mean = torch.tensor([0.485, 0.456, 0.406]).view(3, 1, 1)
    std = torch.tensor([0.229, 0.224, 0.225]).view(3, 1, 1)

    for ax, image, target in zip(axs, images, targets):
        # Undo the normalisation first; scaling the normalised values directly
        # by 255 wraps around in uint8 and produces garbled colours.
        pixels = (image.detach().cpu() * std + mean).clamp(0, 1)
        image_uint8 = (pixels * 255).to(torch.uint8)

        boxes = target['boxes'].detach().cpu()
        labels = [dataset.labels[l.item()] for l in target['labels']]

        # Images without annotations are shown unchanged.
        if boxes.numel() > 0:
            image_uint8 = draw_bounding_boxes(image_uint8, boxes, labels=labels, width=2, colors="blue")

        # CxHxW -> HxWxC for matplotlib
        ax.imshow(image_uint8.permute(1, 2, 0).numpy())

    # Hide the axes of every cell, including unused ones in the last row.
    for ax in axs:
        ax.axis('off')

    plt.tight_layout()
    plt.show()

# Example usage: fetch the first batch yielded by the validation dataloader
# and display its images together with their annotated boxes.
batch = next(iter(val_dataloader))
draw_batch_with_boxes(batch)


**mAP metric**

In [None]:
# "Helper" function
def intersection_over_union(boxes_preds, boxes_labels):
    """Intersection-over-union of corner-format boxes.

    Both arguments are tensors whose last dimension holds
    ``[x1, y1, x2, y2]``. The result keeps the leading dimensions and has a
    trailing dimension of size 1.
    """
    # Slicing with k:k+1 (rather than indexing) keeps the last dimension.
    pred_x1, pred_y1, pred_x2, pred_y2 = (boxes_preds[..., k:k + 1] for k in range(4))
    gt_x1, gt_y1, gt_x2, gt_y2 = (boxes_labels[..., k:k + 1] for k in range(4))

    # Extent of the overlap rectangle; clamped to zero for disjoint boxes.
    overlap_w = (torch.min(pred_x2, gt_x2) - torch.max(pred_x1, gt_x1)).clamp(0)
    overlap_h = (torch.min(pred_y2, gt_y2) - torch.max(pred_y1, gt_y1)).clamp(0)
    intersection = overlap_w * overlap_h

    pred_area = abs((pred_x2 - pred_x1) * (pred_y2 - pred_y1))
    gt_area = abs((gt_x2 - gt_x1) * (gt_y2 - gt_y1))

    # The small constant avoids a division by zero for two empty boxes.
    union = pred_area + gt_area - intersection + 1e-6
    return intersection / union

# mAP function
def _flatten_detections(predictions):
    """Turn per-image prediction dicts into rows [image_idx, class, score, x1, y1, x2, y2]."""
    rows = []
    for image_idx, prediction in enumerate(predictions):
        boxes = prediction["boxes"].detach().cpu().tolist()
        labels = prediction["labels"].detach().cpu().tolist()
        scores = prediction["scores"].detach().cpu().tolist()
        for box, label, score in zip(boxes, labels, scores):
            rows.append([image_idx, label, score, box[0], box[1], box[2], box[3]])
    return rows


def _flatten_ground_truths(ground_truths):
    """Turn per-image target dicts into rows [image_idx, class, x1, y1, x2, y2]."""
    rows = []
    for image_idx, ground_truth in enumerate(ground_truths):
        boxes = ground_truth["boxes"].tolist()
        labels = ground_truth["labels"].tolist()
        for box, label in zip(boxes, labels):
            rows.append([image_idx, label, box[0], box[1], box[2], box[3]])
    return rows


def _average_precision(detections, ground_truths, iou_threshold, eps=1e-6):
    """Average precision of one class at one IoU threshold (area under the PR curve)."""
    # Ground-truth rows grouped by image, plus a "already matched" flag per box.
    gts_by_image = {}
    for ground_truth in ground_truths:
        gts_by_image.setdefault(ground_truth[0], []).append(ground_truth)
    matched = {image_idx: [False] * len(rows) for image_idx, rows in gts_by_image.items()}

    # Most confident detections get the first pick of ground-truth boxes.
    ranked = sorted(detections, key=lambda row: row[2], reverse=True)
    TP = torch.zeros(len(ranked))
    FP = torch.zeros(len(ranked))

    for detection_idx, detection in enumerate(ranked):
        image_idx = detection[0]
        best_iou = 0.0
        best_gt_idx = 0
        for gt_idx, ground_truth in enumerate(gts_by_image.get(image_idx, [])):
            iou = intersection_over_union(
                torch.tensor(detection[3:], dtype=torch.float32),
                torch.tensor(ground_truth[2:], dtype=torch.float32),
            ).item()
            if iou > best_iou:
                best_iou = iou
                best_gt_idx = gt_idx

        # A ground-truth box can be claimed only once; later detections of an
        # already matched box count as false positives.
        if best_iou > iou_threshold and not matched[image_idx][best_gt_idx]:
            TP[detection_idx] = 1
            matched[image_idx][best_gt_idx] = True
        else:
            FP[detection_idx] = 1

    TP_cumsum = torch.cumsum(TP, dim=0)
    FP_cumsum = torch.cumsum(FP, dim=0)
    recalls = TP_cumsum / (len(ground_truths) + eps)
    precisions = TP_cumsum / (TP_cumsum + FP_cumsum + eps)
    # Anchor the curve at (recall=0, precision=1) before integrating.
    precisions = torch.cat((torch.tensor([1.0]), precisions))
    recalls = torch.cat((torch.tensor([0.0]), recalls))
    return torch.trapz(precisions, recalls)


def mAP(predictions, ground_truths, iou_threshold=0.5, num_classes=2):
    """Mean average precision averaged over classes and over a sweep of IoU thresholds.

    Args:
        predictions: Sequence with one dict per image holding ``'boxes'``,
            ``'labels'`` and ``'scores'`` tensors.
        ground_truths: Sequence with one dict per image (same order as
            ``predictions``) holding ``'boxes'`` and ``'labels'`` tensors.
        iou_threshold: Unused; the metric always sweeps the thresholds listed
            below. Kept so existing calls keep working.
        num_classes: Class indices ``0 .. num_classes - 1`` are considered.

    Returns:
        0-dim float tensor. Classes without any ground-truth box (e.g. the
        background class) are left out of the average instead of counting as
        AP = 0; if no class has ground truth the result is 0.
    """
    detections = _flatten_detections(predictions)
    gt_rows = _flatten_ground_truths(ground_truths)

    # Group by class once; the grouping does not depend on the IoU threshold.
    per_class = []
    for c in range(num_classes):
        class_gts = [row for row in gt_rows if row[1] == c]
        if not class_gts:
            continue
        class_detections = [row for row in detections if row[1] == c]
        per_class.append((class_detections, class_gts))

    if not per_class:
        return torch.tensor(0.0)

    # NOTE(review): np.arange excludes the stop value, so this sweep is expected
    # to end at 0.90 - confirm whether 0.95 was meant to be included.
    ious = np.arange(0.5, 0.95, 0.05)

    results = []
    for IOU in ious:
        # Fresh list per threshold so every threshold is averaged on its own.
        avg_precisions = [
            _average_precision(class_detections, class_gts, IOU)
            for class_detections, class_gts in per_class
        ]
        results.append(sum(avg_precisions) / len(avg_precisions))
    return sum(results) / len(results)

**Model defining**

In [None]:
# Non pretrained
# SSD300-VGG16 built with torchvision's default arguments; its classification
# head is then replaced by one that predicts `num_classes` classes.
# NOTE(review): depending on the torchvision version the default arguments may
# still load pretrained backbone weights - confirm if a fully untrained model is needed.
# NOTE(review): `model` is reassigned by the later `model = create_model(2)`
# statement, so on a top-to-bottom run this instance is replaced before training.
model = torchvision.models.detection.ssd300_vgg16()
num_classes = 2
# Number of anchors per location, as reported by the model's anchor generator.
anchors = model.anchor_generator.num_anchors_per_location()
# Input channel counts handed to the new head - presumably the channel counts of
# the backbone's feature maps; TODO confirm against the backbone.
out_channels = [512, 1024, 512, 256, 256, 256]
model.head.classification_head = SSDClassificationHead(out_channels, anchors, num_classes)

In [None]:
from torchvision.models.detection import SSD300_VGG16_Weights

# Pretrained - CHOOSE IT
def create_model(num_classes=2):
    """Build a COCO-pretrained SSD300-VGG16 with a fresh classification head.

    Args:
        num_classes: Number of classes the new head predicts; the background
            class counts as one of them.

    Returns:
        The torchvision SSD model with its classification head replaced.
    """
    # Load the Torchvision pretrained model.
    model = torchvision.models.detection.ssd300_vgg16(
        weights=SSD300_VGG16_Weights.COCO_V1
    )

    # The `num_classes` argument is used as passed; it is no longer
    # overwritten with a hard-coded 2.
    anchors = model.anchor_generator.num_anchors_per_location()
    # Input channel counts handed to the new head - presumably the channel
    # counts of the backbone's feature maps; TODO confirm against the backbone.
    out_channels = [512, 1024, 512, 256, 256, 256]
    model.head.classification_head = SSDClassificationHead(out_channels, anchors, num_classes)
    return model

# Build the pretrained detector, requesting two classes; this replaces any
# `model` created earlier.
model = create_model(2)

**Train/Eval loop**

In [None]:
# Pick the compute device once: CUDA when available, otherwise the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
if device.type == "cuda":
    print("GPU доступен. Используем GPU.")
else:
    print("GPU не доступен. Используем CPU.")

# Move the model parameters to the chosen device.
model.to(device)

In [None]:

# optimizer = torch.optim.SGD(model.parameters(), lr=0.001,weight_decay=5e-4, momentum=0.9)
optimizer = optim.Adam(model.parameters(), lr=0.0001, betas=(0.9, 0.999), eps=1e-08) # - CHOOSE IT
# NOTE(review): the scheduler is stepped once per epoch, so with step_size=30
# and n_epochs=13 the learning rate is never decayed during this run.
scheduler = StepLR(optimizer, step_size=30, gamma=0.1)
n_epochs = 13

# Histories and best score; l_t_history is filled by the loop below.
l_t_history = []
l_v_history = []
mAP_history = []
best_mAP_SSD = 0
for epoch in range(n_epochs):
    # Loss accumulators, reset at the start of every epoch.
    loss_train_history = 0
    loss_train_regression = 0
    loss_train_classification = 0

    model.train()
    with tqdm(train_dataloader, desc=f"Epoch {epoch+1}") as pbar:
        for i, data in enumerate(pbar):
            # Move the images and every target tensor to the training device.
            images, targets = data
            images = [image.to(device) for image in images]
            targets = [{k: v.to(device) for k, v in t.items()} for t in targets]

            optimizer.zero_grad()
            # Called with targets, the model returns a dict of losses; the two
            # entries read below are summed into the optimised objective.
            output = model(images, targets)
            bbox_loss = output['bbox_regression']
            classification_loss = output['classification']
            losses = bbox_loss + classification_loss
            losses.backward()
            optimizer.step()

            loss_train_history += losses.cpu().item()
            loss_train_regression += bbox_loss.cpu().item()
            loss_train_classification += classification_loss.cpu().item()

        # One scheduler step per epoch.
        scheduler.step()
        # The history stores the summed epoch loss, while the values printed
        # below are averaged over the number of batches.
        l_t_history.append(loss_train_history)
        print(f"Classifier training loss: {loss_train_classification / len(train_dataloader)}")
        print(f"Box regression training loss: {loss_train_regression / len(train_dataloader)}")


    
# Evaluate the trained model on the validation set (runs once, after training).
model.eval()
map_per_batch = []
with tqdm(val_dataloader, desc=f"mAP evaluating") as pbar:
    # Inference only: no gradients are needed, so nothing has to be zeroed.
    with torch.no_grad():
        for images, targets in pbar:
            images = [image.to(device) for image in images]

            predictions = model(images)
            # Store plain floats so the history does not hold tensors.
            map_this_batch = float(mAP(predictions, targets))
            if map_this_batch > best_mAP_SSD:
                best_mAP_SSD = map_this_batch
            map_per_batch.append(map_this_batch)

# Report and record the mean of the per-batch scores; guard against an empty
# validation dataloader.
if map_per_batch:
    mean_mAP = sum(map_per_batch) / len(map_per_batch)
    mAP_history.append(mean_mAP)
    print(f"Custom mAP on the validation set is: {mean_mAP}")
else:
    print("Validation dataloader produced no batches; mAP was not computed.")

print("Saving weights...")
torch.save(model.state_dict(), 'model.pth') # Save our trained weights



**Image printing**

In [None]:
# Class index -> label name ("decoder")
i2l = { i: label for i, label in enumerate(['bg', 'car'])}
def decode_print(image, prediction, iou_threshold=0.1, score_threshold=0.2):
    """Draw the confident, NMS-filtered detections of one image and show it.

    Args:
        image: ``HxWx3`` numpy image to annotate. The caller's array is left
            untouched; a copy is drawn on.
        prediction: List of dicts holding ``'boxes'``, ``'labels'`` and
            ``'scores'`` tensors; only ``prediction[0]`` is used.
        iou_threshold: IoU threshold passed to non-maximum suppression.
        score_threshold: Detections scoring at or below this value are skipped.
    """
    detections = prediction[0]
    boxes = detections["boxes"].detach().cpu()
    scores = detections["scores"].detach().cpu()
    labels = [i2l[int(i)] for i in detections["labels"].detach().cpu().tolist()]

    # Indexes of the boxes kept by non-maximum suppression
    indexes = nms(boxes, scores, iou_threshold).tolist()

    # Work on a contiguous copy: OpenCV draws in place.
    canvas = np.ascontiguousarray(image).copy()

    # Float images whose values stay within [0, 1] are assumed to use the unit
    # range, so colours are given in that range too; this avoids out-of-range
    # values when the result is shown. Everything else uses the 0-255 range.
    unit_range = (
        np.issubdtype(canvas.dtype, np.floating)
        and canvas.size > 0
        and canvas.max() <= 1.0
    )
    full = 1.0 if unit_range else 255
    box_color = (0, full, 0)
    text_color = (full, 0, 0)

    for index in indexes:
        score = scores[index].item()
        if score > score_threshold:
            x1, y1, x2, y2 = (int(v) for v in boxes[index].tolist())

            cv2.rectangle(canvas, (x1, y1), (x2, y2), box_color, 3)
            cv2.putText(canvas, labels[index] + " " + "{:.3f}".format(score), (x1, y1),
                        cv2.FONT_HERSHEY_SIMPLEX, 0.5, text_color, 1)

    plt.imshow(canvas)

**Loading Weights + Inference**

In [None]:
# map_location lets weights saved on a GPU load on a CPU-only machine as well.
model.load_state_dict(torch.load('/kaggle/input/baseline-weights/FirstBse.pth', map_location=device))

# Resize to the same side length the dataset uses and scale pixels to [0, 1].
# NOTE(review): training samples additionally pass through the Normalize step
# of DataSet.transform, whereas this image is only scaled to [0, 1] - confirm
# this matches how the loaded weights were trained.
img = Image.open("/kaggle/input/test-files/motorway.jpg").convert("RGB")
img_numpy = np.array(img.resize((size, size), resample=Image.Resampling.BILINEAR)) / 255.
img = torch.from_numpy(img_numpy.astype('float32')).permute(2,0,1) # converting np_array, H,W,C -> torch_tensor, C,H,W
img = img[None, :] # add the batch dimension

model.eval()
img = img.to(device)
# Inference only: skip building the autograd graph.
with torch.no_grad():
    prediction = model(img)

decode_print(img_numpy, prediction)

**Saving model to ONNX format**

In [None]:
# Switch to evaluation mode before exporting.
model.eval()

# Example input for the exporter: the test image resized to 720x720 and
# scaled to [0, 1].
dummy_input = Image.open("/kaggle/input/test-files/motorway.jpg").convert("RGB")
img_numpy = np.array(dummy_input.resize((720, 720), resample=Image.Resampling.BILINEAR)) / 255.
dummy_input = (
    torch.from_numpy(img_numpy.astype('float32'))
    .permute(2, 0, 1)  # H,W,C -> C,H,W
    .unsqueeze(0)      # add the batch dimension
    .to(device)
)

# NOTE(review): a single output name is supplied; confirm how the exporter
# names any further outputs the detection model produces.
torch.onnx.export(
    model,
    dummy_input,
    "/kaggle/working/onnx_model.onnx",
    export_params=True,
    do_constant_folding=True,
    input_names=['input'],
    output_names=['output'],
)