In [None]:
# Mount Google Drive at /content/drive (needs the google.colab package, i.e. a Colab runtime).
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
import torch
import torch.nn as nn
from torchvision.datasets.voc import VOCDetection
from torchvision import transforms
from torch.utils.data import DataLoader
from torch.optim import Adam

In [None]:
# Backbone description consumed by Yolo._create_conv_layers. Entry formats:
#   tuple -> (kernel_size, out_channels, stride, padding): one conv block
#   str   -> a 2x2 max-pool with stride 2 (written as "M")
#   list  -> [conv_tuple_a, conv_tuple_b, repeat_count]: the pair a, b repeated
architecture_config = [
    (7, 64, 2, 3),
    "M",
    (3, 192, 1, 1),
    "M",
    (1, 128, 1, 0),
    (3, 256, 1, 1),
    (1, 256, 1, 0),
    (3, 512, 1, 1),
    "M",
    [(1, 256, 1, 0), (3, 512, 1, 1), 4],
    (1, 512, 1, 0),
    (3, 1024, 1, 1),
    "M",
    [(1, 512, 1, 0), (3, 1024, 1, 1), 2],
    (3, 1024, 1, 1),
    (3, 1024, 2, 1),
    (3, 1024, 1, 1),
    (3, 1024, 1, 1),
]


class block(nn.Module):
    """Conv2d -> BatchNorm2d -> LeakyReLU(0.1) building unit.

    Args:
        in_channels: number of channels of the incoming feature map.
        out_channels: number of filters of the convolution.
        **kwargs: forwarded unchanged to ``nn.Conv2d`` (kernel_size, stride,
            padding, ...).

    The convolution is created with ``bias=False``.
    """

    def __init__(self, in_channels, out_channels, **kwargs):
        super().__init__()
        self.conv = nn.Conv2d(in_channels, out_channels, bias=False, **kwargs)
        self.batchnorm = nn.BatchNorm2d(out_channels)
        self.leaky_relu = nn.LeakyReLU(0.1)

    def forward(self, x):
        """Apply convolution, batch normalisation and the activation in that order."""
        convolved = self.conv(x)
        normalised = self.batchnorm(convolved)
        return self.leaky_relu(normalised)


class Yolo(nn.Module):
    """Detector made of a convolutional backbone and a two-layer dense head.

    The backbone layout is read from the module-level ``architecture_config``.
    ``forward`` returns a tensor of shape (N, S, S, C + 5 * B) where
    S = grid_size, B = num_boxes and C = num_classes.

    Args:
        in_channels: channels of the input image.
        grid_size: number of grid cells per side (S).
        num_boxes: boxes predicted per cell (B).
        num_classes: number of class channels per cell (C).
    """

    def __init__(self, in_channels=3, grid_size=7, num_boxes=2, num_classes=20):
        super().__init__()
        self._in_channels = in_channels
        self._grid_size = grid_size
        self._num_boxes = num_boxes
        self._num_classes = num_classes
        # Adaptive pooling to a fixed output size brings every input to 448x448.
        self.avgpool = nn.AdaptiveAvgPool2d((448, 448))
        self.architecture = architecture_config
        # Backbone is built before the head, so parameter creation order is fixed.
        self.conv_part = self._create_conv_layers(
            architecture=self.architecture,
            in_channels=self._in_channels,
        )
        self.fcs = self._create_fc_layers(
            grid_size=self._grid_size,
            num_boxes=self._num_boxes,
            num_classes=self._num_classes,
        )

    def forward(self, x):
        """Run the network and reshape the head output to (N, S, S, C + 5 * B)."""
        grid = self._grid_size
        cell_depth = self._num_classes + self._num_boxes * 5
        features = self.conv_part(self.avgpool(x))
        flat = features.view(features.shape[0], -1)
        return self.fcs(flat).view(-1, grid, grid, cell_depth)

    @staticmethod
    def _create_fc_layers(grid_size, num_boxes, num_classes):
        """Build the dense head: Linear -> Dropout -> LeakyReLU -> Linear -> ReLU.

        The first Linear expects 1024 * S * S inputs, i.e. a backbone output of
        1024 channels on an S x S map. With the 448x448 input and the 64x total
        downsampling of ``architecture_config`` that map is 7x7, so this only
        lines up for grid_size == 7.
        """
        cells = grid_size * grid_size
        per_cell = num_classes + num_boxes * 5
        head = [
            nn.Linear(1024 * cells, 4096),
            nn.Dropout(p=0.5),
            nn.LeakyReLU(negative_slope=0.1),
            nn.Linear(4096, cells * per_cell),
            # NOTE(review): the trailing ReLU forces every output to be >= 0 -
            # confirm this is intended for the box/confidence regression.
            nn.ReLU(),
        ]
        return nn.Sequential(*head)

    @staticmethod
    def _create_conv_layers(architecture, in_channels):
        """Translate an architecture description into an ``nn.Sequential``.

        Tuples become one conv block, strings become a 2x2/stride-2 max-pool,
        and lists ``[conv_a, conv_b, n]`` become the pair (a, b) repeated n
        times. Entries of any other type are ignored.
        """

        def make_conv(spec, channels_in):
            # spec is (kernel_size, out_channels, stride, padding)
            kernel_size, out_channels, stride, padding = spec[:4]
            return block(in_channels=channels_in, out_channels=out_channels,
                         kernel_size=kernel_size, stride=stride, padding=padding)

        modules = []
        channels = in_channels
        for entry in architecture:
            if isinstance(entry, tuple):
                modules.append(make_conv(entry, channels))
                channels = entry[1]
            elif isinstance(entry, str):
                modules.append(nn.MaxPool2d(kernel_size=2, stride=2))
            elif isinstance(entry, list):
                first, second, repeats = entry[0], entry[1], entry[2]
                for _ in range(repeats):
                    modules.append(make_conv(first, channels))
                    modules.append(make_conv(second, first[1]))
                    channels = second[1]

        return nn.Sequential(*modules)
        

In [None]:
import torch
import torch.nn as nn


def _return_corner_coordinates(boxes, box_format):
    """Split a (..., 4) box tensor into corner coordinates.

    Args:
        boxes: tensor whose last dimension holds four box values.
        box_format: "midpoint" for (x_center, y_center, width, height) or
            "corners" for (x1, y1, x2, y2).

    Returns:
        Tuple ``(x1, y1, x2, y2)``; each element keeps the last dimension
        (size 1) so the results broadcast against each other.

    Raises:
        ValueError: if ``box_format`` is not one of the two supported names.
    """
    if box_format == "midpoint":
        x = boxes[..., 0:1]
        y = boxes[..., 1:2]
        half_width = boxes[..., 2:3] / 2
        half_height = boxes[..., 3:4] / 2
        box_x1 = x - half_width
        box_y1 = y - half_height
        box_x2 = x + half_width
        box_y2 = y + half_height
    elif box_format == "corners":
        box_x1 = boxes[..., 0:1]
        box_y1 = boxes[..., 1:2]
        box_x2 = boxes[..., 2:3]
        box_y2 = boxes[..., 3:4]
    else:
        # Previously this fell through to an UnboundLocalError at the return.
        raise ValueError(
            "box_format must be 'midpoint' or 'corners', got {!r}".format(box_format)
        )
    return box_x1, box_y1, box_x2, box_y2


def intersection_over_union(boxes_preds, boxes_labels, box_format="midpoint"):
    """Element-wise IoU between two box tensors of shape (..., 4).

    Args:
        boxes_preds: predicted boxes, last dimension of size 4.
        boxes_labels: reference boxes, broadcastable against ``boxes_preds``.
        box_format: "midpoint" (x, y, w, h) or "corners" (x1, y1, x2, y2).

    Returns:
        Tensor of shape (..., 1) with the IoU of each pair. Pairs whose union
        has zero area (e.g. two all-zero boxes) yield 0 instead of NaN.

    Raises:
        ValueError: if ``box_format`` is not supported.
    """
    box1_x1, box1_y1, box1_x2, box1_y2 = _return_corner_coordinates(boxes_preds, box_format=box_format)
    box2_x1, box2_y1, box2_x2, box2_y2 = _return_corner_coordinates(boxes_labels, box_format=box_format)

    # Corners of the intersection rectangle.
    x1 = torch.max(box1_x1, box2_x1)
    y1 = torch.max(box1_y1, box2_y1)
    x2 = torch.min(box1_x2, box2_x2)
    y2 = torch.min(box1_y2, box2_y2)

    # Negative extents mean the boxes do not overlap along that axis.
    intersection_area = (x2 - x1).clamp(0) * (y2 - y1).clamp(0)
    box1_area = torch.abs((box1_x1 - box1_x2) * (box1_y1 - box1_y2))
    box2_area = torch.abs((box2_x1 - box2_x2) * (box2_y1 - box2_y2))
    union_area = box1_area + box2_area - intersection_area

    # Clamp only the denominator: results are unchanged for any union >= 1e-6,
    # and a zero-area union no longer produces 0 / 0 = NaN.
    return intersection_area / union_area.clamp(min=1e-6)

def get_object_loss(target, bestbox, predictions, exists_box, s=-9):
    """Summed squared confidence error of the selected box, masked by ``exists_box``.

    Args:
        target: label tensor; its confidence is read from channel ``20 + s``.
        bestbox: 0/1 selector; 0 picks the confidence at channel ``20 + s``,
            1 picks the one at channel ``25 + s``.
        predictions: network output indexed with the same channel layout.
        exists_box: mask multiplied into both prediction and target.
        s: offset added to the channel indices 20 / 25 (the default -9 places
            the two confidences at channels 11 and 16).

    Returns:
        Scalar tensor: sum of squared differences.
    """
    first_conf = predictions[..., 20 + s:21 + s]
    second_conf = predictions[..., 25 + s:26 + s]
    chosen_conf = bestbox * second_conf + (1 - bestbox) * first_conf
    wanted_conf = target[..., 20 + s:21 + s]
    criterion = nn.MSELoss(reduction="sum")
    return criterion(exists_box * chosen_conf, exists_box * wanted_conf)

def get_no_object_loss(exists_box, predictions, target, s=-9):
    """Summed squared confidence error of both boxes where ``exists_box`` is 0.

    Args:
        exists_box: mask; the loss is weighted by ``1 - exists_box``.
        predictions: network output; confidences are read from channels
            ``20 + s`` and ``25 + s``.
        target: label tensor; its confidence is read from channel ``20 + s``
            and used as the reference for both predicted confidences.
        s: offset added to the channel indices (default -9).

    Returns:
        Scalar tensor: sum of the two masked squared-error terms.
    """
    empty_mask = 1. - exists_box
    masked_target = empty_mask * target[..., 20 + s:21 + s]
    criterion = nn.MSELoss(reduction="sum")
    first_term = criterion(empty_mask * predictions[..., 20 + s:21 + s], masked_target)
    second_term = criterion(empty_mask * predictions[..., 25 + s:26 + s], masked_target)
    return first_term + second_term

def get_class_loss(exists_box, predictions, target, s=-9):
    """Summed squared error over the first ``20 + s`` channels, masked by ``exists_box``.

    Args:
        exists_box: mask multiplied into both prediction and target.
        predictions: network output; channels ``[:20 + s]`` are compared.
        target: label tensor; channels ``[:20 + s]`` are compared.
        s: offset added to 20 to get the number of compared channels
            (default -9, i.e. the first 11 channels).

    Returns:
        Scalar tensor: sum of squared differences.
    """
    class_channels = 20 + s
    criterion = nn.MSELoss(reduction="sum")
    masked_prediction = exists_box * predictions[..., :class_channels]
    masked_target = exists_box * target[..., :class_channels]
    return criterion(masked_prediction, masked_target)



class YoloLoss(nn.Module):
    """Sum-squared-error detection loss for a two-box-per-cell grid output.

    Per-cell channel layout expected in both ``predictions`` and ``target``
    (C = num_classes):
        [0, C)          class scores
        C               confidence of box 1 (objectness flag in the target)
        [C+1, C+5)      box 1 as (x, y, w, h)
        C+5             confidence of box 2
        [C+6, C+10)     box 2 as (x, y, w, h)
    Only the first box slot of ``target`` is read.

    Args:
        grid_size: grid cells per side (stored as ``S``; not used in forward).
        num_boxes: boxes per cell (stored as ``B``). The channel slices in
            ``forward`` are written for exactly two boxes.
        num_classes: number of class channels (``C``).
    """

    def __init__(self, grid_size=7, num_boxes=2, num_classes=20):
        super(YoloLoss, self).__init__()
        self.mse = nn.MSELoss(reduction="sum")
        self.S = grid_size
        self.B = num_boxes
        self.C = num_classes
        self.lambda_coord = 5     # weight of the box-coordinate term
        self.lambda_noobj = 0.5   # weight of the no-object confidence term

    def forward(self, predictions, target):  # (N, S, S, C + B * 5)
        """Return the scalar loss for a batch.

        Args:
            predictions: network output, rank 4 with channels last.
            target: label tensor with the same leading dimensions.

        Returns:
            Scalar tensor: lambda_coord * box + object + lambda_noobj * no-object + class.
        """
        # All channel indices below are written for C == 20 and shifted by s.
        s = self.C - 20

        # Pick the "responsible" predictor per cell: the box with the higher IoU
        # against the target box. bestbox is 0 for box 1 and 1 for box 2.
        iou_b1 = intersection_over_union(predictions[..., 21+s:25+s], target[..., 21+s:25+s])
        iou_b2 = intersection_over_union(predictions[..., 26+s:30+s], target[..., 21+s:25+s])
        ious = torch.cat([iou_b1.unsqueeze(dim=0), iou_b2.unsqueeze(dim=0)], dim=0)
        iou_maxes, bestbox = torch.max(ious, dim=0)
        exists_box = target[..., 20+s].unsqueeze(3)  # I_obj (zero or one)

        # Box coordinates loss (only cells that contain an object contribute).
        box_predictions = exists_box * (
            bestbox * predictions[..., 26+s:30+s]
            + (1 - bestbox) * predictions[..., 21+s:25+s]
        )
        box_targets = exists_box * target[..., 21+s:25+s]

        # Width/height are compared in sqrt space. Predictions may be negative,
        # so take sqrt(|w|) and restore the sign; the epsilon keeps the sqrt
        # gradient finite at zero.
        box_predictions[..., 2:4] = torch.sign(box_predictions[..., 2:4]) * torch.sqrt(
            torch.abs(box_predictions[..., 2:4] + 1e-6)
        )
        box_targets[..., 2:4] = torch.sqrt(box_targets[..., 2:4])

        box_loss = self.mse(box_predictions, box_targets)

        # The helpers must use the same channel offset as the box slices above.
        # They were previously called with a hard-coded s=-9, which made the
        # confidence and class terms read channels 11/16 and [:11] instead of
        # the confidence channels (20+s / 25+s) and the full class range.
        object_loss = get_object_loss(
            target=target,
            bestbox=bestbox,
            predictions=predictions,
            exists_box=exists_box,
            s=s,
        )
        no_object_loss = get_no_object_loss(
            exists_box=exists_box,
            predictions=predictions,
            target=target,
            s=s,
        )
        class_loss = get_class_loss(
            exists_box=exists_box,
            predictions=predictions,
            target=target,
            s=s,
        )

        loss = (
            self.lambda_coord * box_loss +
            object_loss +
            self.lambda_noobj * no_object_loss +
            class_loss
        )
        return loss

In [None]:
# Image preprocessing pipeline passed to VOCDetection as `transform=`.
# The pipeline is built through a local alias of the torchvision module so this
# cell can be re-run: the name `transforms` is rebound to the Compose object
# below (the dataset cells look it up under that name), which used to make a
# second execution fail on `transforms.Compose`.
from torchvision import transforms as T

transforms = T.Compose([
        T.ToTensor(),
        T.Resize((333, 500)),
        # RandomRotation(30) and RandomHorizontalFlip() were removed: a
        # `transform=` callable only receives the image, so these geometric
        # augmentations moved the pixels without moving the bounding boxes
        # parsed in collate_fn, and they were also applied to the validation set.
        T.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])])

In [None]:
# Output layout constants used by collate_fn and when constructing the model.
NUM_CHANNELS = 3      # channels of the input images
S, B, C = 7, 2, 20    # grid cells per side, boxes per cell, number of classes

In [None]:
def class_names2classes(class_name):
    """Return the integer index (0-19) of a class name.

    Args:
        class_name: one of the 20 names listed in ``classes`` below.

    Returns:
        Position of ``class_name`` in the list.

    Raises:
        ValueError: if the name is not in the list. Previously the function
            returned None in that case, which only surfaced later as a
            TypeError in the caller's ``int(...)``.
    """
    classes = ["person", "bird", "cat", "cow", "dog", "horse", "sheep", "aeroplane", "bicycle", "boat", "bus", "car", "motorbike", "train", "bottle", "chair", "diningtable", "pottedplant", "sofa", "tvmonitor"]
    try:
        return classes.index(class_name)
    except ValueError:
        raise ValueError("Unknown class name: {!r}".format(class_name)) from None

In [None]:
def collate_fn(batch):
    """Turn a list of (image, annotation) samples into model-ready tensors.

    Each annotation is read as ``ann['annotation']['size']['width'|'height']``
    and ``ann['annotation']['object']`` (an iterable of dicts with ``name`` and
    ``bndbox`` holding xmin/xmax/ymin/ymax).

    Per-cell label layout (uses the module-level S, B, C):
        [0, C)       one-hot class
        C            objectness flag
        [C+1, C+5)   box as (x, y, w, h); x, y are relative to the cell,
                     w, h are in units of cells
    Remaining channels stay zero. Only the first object that falls into a
    cell is recorded.

    Args:
        batch: list of (image_tensor, annotation_dict) pairs.

    Returns:
        (images, labels): images stacked along dim 0 and a
        (len(batch), S, S, C + 5 * B) label tensor.
    """
    images, annotations = tuple(zip(*batch))

    label_matrices = []
    for annotation in annotations:
        info = annotation['annotation']
        image_width = int(info['size']['width'])
        image_height = int(info['size']['height'])

        label_matrix = torch.zeros((S, S, C + 5 * B))
        for item in info['object']:
            class_label = int(class_names2classes(item['name']))
            bndbox = item['bndbox']

            # Normalise pixel corners to [0, 1] by the annotated image size.
            xmin = int(bndbox["xmin"]) / image_width
            xmax = int(bndbox["xmax"]) / image_width
            ymin = int(bndbox["ymin"]) / image_height
            ymax = int(bndbox["ymax"]) / image_height

            x, y = (xmax + xmin) / 2, (ymax + ymin) / 2
            width, height = xmax - xmin, ymax - ymin

            # Row i / column j of the responsible cell. Clamp so that a centre
            # lying exactly on the far image edge (x or y == 1.0) maps to the
            # last cell instead of indexing out of range.
            i = min(int(S * y), S - 1)
            j = min(int(S * x), S - 1)
            x_cell, y_cell = S * x - j, S * y - i
            width_cell, height_cell = width * S, height * S

            # Channel indices are derived from C rather than hard-coded 20.
            if label_matrix[i, j, C] == 0:
                label_matrix[i, j, C] = 1
                label_matrix[i, j, C + 1:C + 5] = torch.tensor(
                    [x_cell, y_cell, width_cell, height_cell]
                )
                label_matrix[i, j, class_label] = 1
        label_matrices.append(label_matrix)

    return torch.stack(images, dim=0), torch.stack(label_matrices, dim=0)

In [None]:
# Training split, read from the local "dataset/" directory (download disabled).
# `transforms` here is the Compose pipeline defined in an earlier cell.
train_dataset = VOCDetection(root="dataset/", image_set='train', transform=transforms, download=False)

# Shuffled batches of 16; collate_fn converts the annotations into label grids.
train_loader = DataLoader(train_dataset, batch_size=16, shuffle=True, collate_fn=collate_fn)

In [None]:

# Validation split ('val') from the same directory, using the same image pipeline.
test_dataset = VOCDetection(root="dataset/",  image_set='val', transform=transforms, download=False)

# Unshuffled batches of 16 built with the same collate_fn as the training loader.
test_loader = DataLoader(test_dataset, batch_size=16, collate_fn=collate_fn)

In [None]:
# Use the GPU when one is available.
device = "cuda" if torch.cuda.is_available() else "cpu"

# num_classes is the class count only: Yolo adds the 5 * B box channels itself,
# so passing C + 5*B produced 40 output channels against 30-channel targets.
model = Yolo(NUM_CHANNELS, S, B, C).to(device)

# Learning rate 2e-4. The previous expression `2*10-4` parsed as
# (2 * 10) - 4 == 16.
opt = Adam(model.parameters(), lr=2e-4)
# Keep the loss layout tied to the same constants as the model and the labels.
loss_fn = YoloLoss(grid_size=S, num_boxes=B, num_classes=C)

In [None]:
# Sanity check: pull a single batch and display the image / label tensor shapes.
iterator = iter(train_loader)
inputs, labels = next(iterator)
inputs.shape, labels.shape

(torch.Size([16, 3, 333, 500]), torch.Size([16, 7, 7, 30]))

In [None]:
from tqdm import tqdm
from time import sleep  # no longer used by this loop; kept in case other cells rely on it

num_epochs = 50

# Make sure Dropout / BatchNorm are in training mode even if an evaluation
# cell switched the model to eval() before this cell is (re-)run.
model.train()

for epoch in range(num_epochs):
    running_loss = 0.
    with tqdm(train_loader, unit="batch") as tepoch:
        # The description is constant for the epoch, so set it once.
        tepoch.set_description(f"Epoch {epoch}")
        for x_batch, y_batch in tepoch:
            x_batch = x_batch.to(device)
            y_batch = y_batch.to(device)

            opt.zero_grad()
            preds = model(x_batch)
            loss = loss_fn(preds, y_batch)
            loss.backward()
            opt.step()

            batch_loss = loss.item()
            running_loss += batch_loss
            tepoch.set_postfix(loss=batch_loss)
            # The per-batch sleep(0.1) was removed: it only slowed training down.
    # Sum of the batch losses over the epoch.
    print("running_loss = {}".format(running_loss))