In [1]:
import xml.etree.ElementTree as ET
import os
import cv2

In [2]:
classes = ['bicycle','trailer','caravan', 'truck', 'motorcycle', 'car', 'vehicle fallback', 'rider', 'bus', 'autorickshaw', 'person', 'animal', 'traffic sign', 'train', 'traffic light']
train_info_file = '/work/Assignment2/dataset_info/test.txt'
val_info_file = '/work/Assignment2/dataset_info/train.txt'
labels_path = '/work/Assignment2/dataset_info/val.txt'
annotations_path = '/work/Detection/idd-detection/IDD_Detection/Annotations/'
old_images_path = '/work/Detection/idd-detection/IDD_Detection/JPEGImages/'
images_path = '/work/Assignment2/Images/'
model_dir = "/work/Assignment2/Models" 

In [3]:
os.makedirs(model_dir, exist_ok=True)
os.makedirs(labels_path, exist_ok=True)
with open(train_info_file, 'r') as f:
    train_img_paths = [line.strip() for line in f]
f.close()
with open(val_info_file, 'r') as f:
    val_img_paths = [line.strip() for line in f]
f.close()

FileExistsError: [Errno 17] File exists: '/work/Assignment2/dataset_info/val.txt'

In [None]:
def convert(size, box, scale_x, scale_y):
    dw = (float(1.0) / (size[0])) 
    dh = (float(1.0)/ (size[1])) 
    box0 = max(float(1.0), box[0])
    box2 = max(float(1.0), box[2])
    box1 = min(size[0] - 1.0, box[1])
    box3 = min(size[1] - 1.0, box[3])
        
    x = ((box0 + box1) / float(2.0) ) 
    y = ((box2 + box3) / float(2.0) ) 
    w = max((box1 - box0), float(1.0)) 
    h = max((box3 - box2), float(1.0))  
    x *= dw
    w *= dw
    y *= dh
    h *= dh  
    return (x, y, w, h)

def convert_labels(annotations_path, labels_path, annotations):
    for annotation in annotations:
        folder_path = annotation.rsplit('/', 1)[0]
        os.makedirs(labels_path + '/' + folder_path, exist_ok=True)
        in_file = open(annotations_path + '/' + annotation + '.xml')
        out_file = open(labels_path + '/' + annotation + '.txt', 'w')

        tree = ET.parse(in_file)
        root = tree.getroot()
        size = root.find('size')
        w = int(size.find('width').text)
        h = int(size.find('height').text)
        scale_x = float(416.0) / float(w)
        scale_y = float(416.0) / float(h)
#         image =cv2.imread(old_images_path + '/' + annotation + '.jpg')
#         image = cv2.resize(src=image, dsize=(416, 416))
#         os.makedirs(images_path + '/' + folder_path, exist_ok=True)
#         cv2.imwrite(images_path + '/' + annotation + '.jpg', image)
        
        
        for obj in root.iter('object'):
            cls = obj.find('name').text

#             if cls not in classes:
#                 continue

            cls_idx = classes.index(cls)
            box = obj.find('bndbox')
            b = (float(box.find('xmin').text),
                 float(box.find('xmax').text),
                 float(box.find('ymin').text),
                 float(box.find('ymax').text))
            
            bbox = convert((w, h), b , scale_x, scale_y)
            out_file.write(str(cls_idx) + " " + 
                           " ".join([str(bb) for bb in bbox]) + '\n')

        in_file.close()
        out_file.close()
    return classes

In [None]:
annotations = train_img_paths + val_img_paths
# convert_labels(annotations_path, labels_path, annotations)

In [None]:
#   ! pip install albumentations

In [None]:
#  !pip install opencv-python==4.5.5.64

In [None]:
#  !pip install PyQt5

In [None]:
#  ! pip install torchsummary

In [None]:
import albumentations 
import cv2
from albumentations.pytorch import ToTensorV2

In [None]:
img_size = 416

In [None]:
train_transforms = albumentations.Compose(
    [
        albumentations.LongestMaxSize(max_size=int(img_size * 1.1)),
        albumentations.PadIfNeeded(
            min_height=int(img_size * 1.1),
            min_width=int(img_size * 1.1),
            border_mode=cv2.BORDER_CONSTANT,
        ),
        albumentations.RandomCrop(width=img_size, height=img_size),
        albumentations.ColorJitter(brightness=0.6, contrast=0.6, 
                    saturation=0.6, hue=0.6, p=0.4),
        albumentations.ShiftScaleRotate(
            rotate_limit=20, p=0.5, border_mode=cv2.BORDER_CONSTANT
        )
        ,
        albumentations.HorizontalFlip(p=0.5),
        albumentations.Blur(p=0.1),
        albumentations.CLAHE(p=0.1),
        albumentations.Posterize(p=0.1),
        albumentations.ToGray(p=0.1),
        albumentations.ChannelShuffle(p=0.05),
        albumentations.Normalize(mean=[0, 0, 0], std=[1, 1, 1], max_pixel_value=255,),
        ToTensorV2(),
    ],
    bbox_params=albumentations.BboxParams(format='yolo',
                         min_visibility=0.4, 
                         label_fields=[]),
)

test_transforms = albumentations.Compose(
    [
        albumentations.LongestMaxSize(max_size=int(img_size)), 
        albumentations.PadIfNeeded(
            min_height=int(img_size),
            min_width=int(img_size),
            border_mode=cv2.BORDER_CONSTANT,
        ),  
        albumentations.Normalize(mean=[0, 0, 0], std=[1, 1, 1], max_pixel_value=255,),
        ToTensorV2(),           
    ],
    bbox_params=albumentations.BboxParams(format='yolo',
                             min_visibility=0.4, 
                             label_fields=[]),
)


In [None]:
from torch.utils.data import TensorDataset, Dataset, DataLoader
import numpy as np
from PIL import Image
import PyQt5

In [None]:
def iou(box1, box2):
    intersection = (torch.min(box1[..., 0], box2[..., 0]) *
                  torch.min(box1[..., 1], box2[..., 1]))

    union = (box1[..., 0] * box1[..., 1] +
            box2[..., 0] * box2[..., 1] -
            intersection)

    return intersection / (union + 1e-5)

In [None]:
class IDDDataset(Dataset):
    def __init__(self, annotations, image_dir, label_dir, anchors, image_size=416,
                 strides=[13, 26, 52], classes=len(classes), transforms=None):
        self.annotations = annotations
        self.image_dir = image_dir
        self.label_dir = label_dir
        self.image_size = image_size
        self.transforms = transforms
        self.strides = strides
        self.anchors = torch.tensor(anchors[0] + anchors[1] + anchors[2])
        self.num_anchors = self.anchors.shape[0]
        self.num_anchors_per_scale = torch.div(self.num_anchors, 3, rounding_mode='floor')
        self.classes = classes
        self.ignore_iou_thresh = 0.5

    def __len__(self):
        return len(self.annotations)

    def __getitem__(self, index):
        label_path = os.path.join(self.label_dir, self.annotations[index] + '.txt')
        bboxes = np.roll(np.loadtxt(fname=label_path, delimiter=' ', ndmin=2), 4, axis=1).tolist()
        img_path = os.path.join(self.image_dir, self.annotations[index] + '.jpg')
        image = np.array(Image.open(img_path).convert('RGB'))
        if self.transforms:
            augments = self.transforms(image=image, bboxes=bboxes)
            image = augments['image']
            bboxes = augments['bboxes']

        targets = [torch.zeros((torch.div(self.num_anchors, 3, rounding_mode='floor'), S, S, 6)) for S in self.strides]
        for box in bboxes:
            
            iou_anchors = iou(torch.tensor(box[2:4]), self.anchors)
            anchor_indices = iou_anchors.argsort(descending=True, dim=0)
            x, y, w, h, c = box
            has_anchor = [False] * 3

            for anchor_idx in anchor_indices:
                scale_idx = torch.div(anchor_idx, self.num_anchors_per_scale, rounding_mode='floor')
                anchor_on_scale = anchor_idx % self.num_anchors_per_scale
                S = self.strides[scale_idx]
                i, j = int(S * y), int(S * x)
                anchor_taken = targets[scale_idx][anchor_on_scale, i, j, 0]
                if not anchor_taken and not has_anchor[scale_idx]:
                    targets[scale_idx][anchor_on_scale, i, j, 0] = 1
                    x_c, y_c = S * x - j, S * y - i
                    w_c, h_c = (w * S, h * S)
                    box_coordinates = torch.tensor([x_c, y_c, w_c, h_c])
                    targets[scale_idx][anchor_on_scale, i, j, 1:5] = box_coordinates
                    targets[scale_idx][anchor_on_scale, i, j, 5] = int(c)
                    has_anchor[scale_idx] = True
                elif not anchor_taken and iou_anchors[anchor_idx] > self.ignore_iou_thresh:
                    targets[scale_idx][anchor_on_scale, i, j, 0] = -1
        return image, tuple(targets)

In [None]:
import torch.nn as nn
import math

In [None]:
mse = nn.MSELoss()
bce = nn.BCEWithLogitsLoss()
entropy = nn.CrossEntropyLoss()

In [None]:
def intersection_over_union(box_pred, box_true, ciou=False):

    box1_x1 = box_pred[..., 0:1] - box_pred[..., 2:3] / 2
    box1_y1 = box_pred[..., 1:2] - box_pred[..., 3:4] / 2
    box1_x2 = box_pred[..., 0:1] + box_pred[..., 2:3] / 2
    box1_y2 = box_pred[..., 1:2] + box_pred[..., 3:4] / 2
    box2_x1 = box_true[..., 0:1] - box_true[..., 2:3] / 2
    box2_y1 = box_true[..., 1:2] - box_true[..., 3:4] / 2
    box2_x2 = box_true[..., 0:1] + box_true[..., 2:3] / 2
    box2_y2 = box_true[..., 1:2] + box_true[..., 3:4] / 2

    x1 = torch.max(box1_x1, box2_x1)
    y1 = torch.max(box1_y1, box2_y1)
    x2 = torch.min(box1_x2, box2_x2)
    y2 = torch.min(box1_y2, box2_y2)

    intersection = (x2 - x1).clamp(0) * (y2 - y1).clamp(0)
    box1_area = abs((box1_x2 - box1_x1) * (box1_y2 - box1_y1))
    box2_area = abs((box2_x2 - box2_x1) * (box2_y2 - box2_y1))

    return intersection / (box1_area + box2_area - intersection + 1e-6)

In [None]:
def Loss(predictions, target, anchors):
    obj = target[..., 0] == 1
    noobj = target[..., 0] == 0 

    no_object_loss = bce(
        (predictions[..., 0:1][noobj]), (target[..., 0:1][noobj]),
    )

    anchors = anchors.reshape(1, 3, 1, 1, 2)
    box_preds = torch.cat([torch.sigmoid(predictions[..., 1:3]), torch.exp(predictions[..., 3:5]) * anchors], dim=-1)
    ious = intersection_over_union(box_preds[obj], target[..., 1:5][obj]).detach()
    object_loss = mse(torch.sigmoid(predictions[..., 0:1][obj]), ious * target[..., 0:1][obj])

    predictions[..., 1:3] = torch.sigmoid(predictions[..., 1:3])
    target[..., 3:5] = torch.log(
        (1e-16 + target[..., 3:5] / anchors)
    )
    box_loss = mse(predictions[..., 1:5][obj], target[..., 1:5][obj])


    class_loss = entropy(
        (predictions[..., 5:][obj]), (target[..., 5][obj].long()),
    )

    return (10 * box_loss + 1 * object_loss + 10 * no_object_loss + 1 * class_loss)

In [None]:
layer_shapes = [
    ('convolution', 32, 3, 1),      
    ('convolution', 64, 3, 2), 
    ('residual', 1),
    ('convolution', 128, 3, 2),
    ('residual', 2),
    ('convolution', 256, 3, 2),
    ('residual', 8),
    ('convolution', 512, 3, 2),
    ('residual', 8),
    ('convolution', 1024, 3, 2),
    ('residual', 4),
    ('convolution', 512, 1, 1),
    ('convolution', 1024, 3, 1),
    ('prediction'),
    ('convolution', 256, 1, 1),
    ('upsample'),
    ('convolution', 256, 1, 1),
    ('convolution', 512, 3, 1),
    ('prediction'),
    ('convolution', 128, 1, 1),
    ('upsample'),
    ('convolution', 128, 1, 1),
    ('convolution', 256, 3, 1),
    ('predicton'),
]

In [None]:
import torch
from torchsummary import summary

In [None]:
class Conv(nn.Module):
    def __init__(self, in_channels, out_channels, batch=True, act=True, **kwargs):
        super().__init__()
        layers = []

        layers.append(nn.Conv2d(in_channels, out_channels, bias=not batch, **kwargs))

        if batch:
            layers.append(nn.BatchNorm2d(out_channels))
        if act:
            layers.append(nn.LeakyReLU(0.1, inplace=True))

        self.block = nn.Sequential(*layers)

    def forward(self, x):
        return self.block(x)

class Prediction(nn.Module):
    def __init__(self, in_channels, classes):
        super().__init__()

        self.layer = nn.Sequential(
            Conv(in_channels, 2*in_channels, kernel_size=3, padding=1),
            Conv(2*in_channels, (classes+5)*3,  batch=False, act=False, kernel_size=1),
        )
        self.classes = classes

    def forward(self, x):
        x = self.layer(x)
        x = x.reshape(x.shape[0], 3, self.classes+5, x.shape[2], x.shape[3])
        x = x.permute(0, 1, 3, 4, 2)
        return x


class Residual(nn.Module):
    def __init__(self, channels, num_blocks, res=True):
        super().__init__()
        self.layers = nn.ModuleList()
        self.num_blocks = num_blocks

        for _ in range(num_blocks):
            self.layers.append(
                nn.Sequential(
                    Conv(channels, torch.div(channels, 2, rounding_mode='floor'), kernel_size=1),
                    Conv(torch.div(channels, 2, rounding_mode='floor'), channels, kernel_size=3, padding=1)
                )
            )
        self.res = res
    
    def forward(self, x):
        for layer in self.layers:
            if self.res:
                x = x + layer(x)
            else:
                x = layer(x)
        return x
    
class Model(nn.Module):
    def __init__(self, in_channels, num_of_classes ):
        super().__init__()

        self.in_channels = in_channels
        self.num_of_classes  = num_of_classes 

        self.layers = self.create_model()

    def forward(self, x):
        outs = []
        route_conn = []

        for layer in self.layers:
            if isinstance(layer, Prediction):
                outs.append(layer(x))
                continue
            x = layer(x)

            if isinstance(layer, Residual) and layer.num_blocks == 8:
                route_conn.append(x)
            elif isinstance(layer, nn.Upsample):
                x = torch.cat([x, route_conn.pop()], dim=1)
        return outs

    def create_model(self):
        layers = nn.ModuleList()
        in_channels = self.in_channels

        for shape in layer_shapes:
            if shape[0] == 'convolution':
                _, out_channels, kernel_size, stride = shape
                padding = 0
                if kernel_size == 3:
                    padding = 1
                layers.append(
                    Conv(
                        in_channels,
                        out_channels,
                        kernel_size=kernel_size,
                        stride=stride,
                        padding=padding
                    )
                )
                in_channels = out_channels

            if shape[0] == 'residual':
                _, blocks = shape
                layers.append(Residual(in_channels, blocks))

            if shape[0] == 'u':
                layers.append(nn.Upsample(scale_factor=2),)
                in_channels = in_channels * 3
            
            if shape[0] == 'p':
                layers += [
                    Residual(in_channels, 1, res=False),
                    Conv(in_channels, torch.div(in_channels, 2, rounding_mode='floor'), kernel_size=1),
                    Prediction(torch.div(in_channels, 2, rounding_mode='floor'), self.num_of_classes )
                ]
                in_channels = torch.div(in_channels, 2, rounding_mode='floor')
        return layers

In [None]:
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
input_channels  = 3
num_of_classes = len(classes)

In [None]:
model = Model(input_channels, num_of_classes ).to(device)
summary(model, (input_channels, 416, 416))

In [None]:
from tqdm import tqdm

In [None]:
conf_threshold = 0.05
def get_accuracy(true, pred, device):

    total_c, correct_c = 0, 0
    total_n, correct_n = 0, 0
    total_o, correct_o = 0, 0

    for i in range(3):
        true[i] = true[i].to(device)
        obj = true[i][..., 0] == 1
        noobj = true[i][..., 0] == 0
        correct_c += torch.sum(
                torch.argmax(pred[i][..., 5][obj], dim=-1) == true[i][..., 5][obj]
            )
        total_c += torch.sum(obj)

        obj_preds = torch.sigmoid(pred[i][..., 0]) > conf_threshold
        correct_o += torch.sum(obj_preds[obj] == true[i][..., 0][obj])
        total_o += torch.sum(obj)
        correct_n += torch.sum(obj_preds[noobj] == true[i][..., 0][noobj])
        total_n += torch.sum(noobj)

    acc_c = correct_c / total_c * 100
    acc_o = correct_o / total_o * 100
    acc_n = correct_n / total_n * 100

    return acc_c.item(), acc_o.item(), acc_n.item()

In [None]:
def train_step(model, train_loader, optimizer, loss, scaled_anchors, scaler, device='cuda'):
    tq = tqdm(train_loader, leave=True, desc="Train")
    losses = []
    accuracy = []

    model.train()

    for i, (data, target) in enumerate(tq):
        data = data.to(device)
        t0, t1, t2 = target[0].to(device), target[1].to(device), target[2].to(device)

        with torch.cuda.amp.autocast():
            out = model(data)

            l = (loss(out[0], t0, scaled_anchors[0]) +
                 loss(out[1], t1, scaled_anchors[1]) +
                 loss(out[2], t2, scaled_anchors[2]))

        losses.append(l.item())
        optimizer.zero_grad()
        scaler.scale(l).backward()
        scaler.step(optimizer)
        scaler.update()
        l.detach()

        mean_loss = sum(losses) / len(losses)
        tq.set_postfix(loss=mean_loss)

        acc = get_accuracy(target, out, device)
        accuracy.append(acc)

    mean_loss = sum(losses) / len(losses)
    avg_acc = np.array(accuracy).mean(axis=0)
    return mean_loss, avg_acc

In [None]:

def val(model, val_loader, optimizer, loss, scaled_anchors, scaler, device='cuda'):
    tq = tqdm(val_loader, leave=True, desc="Val")
    losses = []
    accuracy = []

    model.eval()

    with torch.no_grad():
        for i, (data, target) in enumerate(tq):
            data = data.to(device)
            t0, t1, t2 = target[0].to(device), target[1].to(device), target[2].to(device)
            out = model(data)

            l = (loss(out[0], t0, scaled_anchors[0]) +
                 loss(out[1], t1, scaled_anchors[1]) +
                 loss(out[2], t2, scaled_anchors[2]))

            losses.append(l.detach().item())

            mean_loss = sum(losses) / len(losses)
            tq.set_postfix(loss=mean_loss)

            acc = get_accuracy(target, out, device)
            accuracy.append(acc)

    mean_loss = sum(losses) / len(losses)
    avg_acc = np.array(accuracy).mean(axis=0)
    return mean_loss, avg_acc


In [None]:
def train(model, train_loader, val_loader, optimizer, 
          loss, anchors, strides, start_epoch, epochs, best_loss, name="test_name", device='cuda'):
    scaled_anchors = (torch.tensor(anchors) * 
                     torch.tensor(strides).unsqueeze(1).unsqueeze(1).repeat(1, 3, 2))

    scaled_anchors = scaled_anchors.to(device)

    best_model = model.state_dict()
    scaler = torch.cuda.amp.GradScaler()

    history = {}
    history['train_loss'] = []
    history['train_acc'] = []
    history['val_loss'] = []
    history['val_acc'] = []

    history['test_loss'] = 0
    history['test_acc'] = 0

    for epoch in range(1 + start_epoch, 1+epochs):
        train_loss, train_acc = train_step(model, train_loader, 
                                           optimizer, loss, scaled_anchors,
                                           scaler, device)
        history['train_loss'].append(train_loss)
        history['train_acc'].append(train_acc)

        val_loss, val_acc = val(model, val_loader, 
                                optimizer, loss, 
                                scaled_anchors, scaler, device)

        history['val_loss'].append(val_loss)
        history['val_acc'].append(val_acc)

        if best_loss == best_loss and (best_loss is None or val_loss < best_loss):
            best_model = model.state_dict()
            best_loss = val_loss
    
            path = model_dir + "/"+ str(epoch) +"_epoch_model_{:.2f}".format(val_loss) +'.ph'
            torch.save({
                'epoch': epoch,
                'model_state_dict': model.state_dict(),
                'optimizer_state_dict': optimizer.state_dict(),
                'loss': best_loss,
            }, path)
    
        last_weight_path = model_dir + '/'+ 'last_weight.ph'
        torch.save({
            'epoch': epoch,
            'model_state_dict': model.state_dict(),
            'optimizer_state_dict': optimizer.state_dict(),
            'loss': best_loss,
        }, last_weight_path)


#     model.load_state_dict(best_model)

    return history

In [None]:
print(len(train_img_paths))
print(len(val_img_paths))

In [None]:
import torch.optim as optim

In [None]:
batch_size = 16
learning_rate = 1e-5
weight_decay = 1e-4
num_of_epochs = 150
anchors = [
    [(0.28, 0.22), (0.38, 0.48), (0.9, 0.78)],
    [(0.07, 0.15), (0.15, 0.11), (0.14, 0.29)],
    [(0.02, 0.03), (0.04, 0.07), (0.08, 0.06)],
]
strides=[13, 26, 52]

In [None]:
train_dataset = IDDDataset(train_img_paths, images_path, labels_path,
                             anchors, image_size=img_size,
                             strides=[13, 26, 52], classes=len(classes),
                             transforms=train_transforms)
train_loader = DataLoader(train_dataset, shuffle=True, batch_size=batch_size)

val_dataset = IDDDataset(val_img_paths, images_path, labels_path,
                             anchors, image_size=img_size,
                             strides=[13, 26, 52], classes=len(classes),
                             transforms=test_transforms)
val_loader = DataLoader(val_dataset, shuffle=True, batch_size=batch_size)

device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

optimizer = optim.Adam(model.parameters(), lr=learning_rate, weight_decay=weight_decay)

start_epoch = 0
best_loss = None

load =True

if load:
    checkpoint = torch.load("/work/Detection/idd-detection/IDD_Detection/Models/last_weight.ph")
    start_epoch =  checkpoint['epoch']
    model.load_state_dict(checkpoint['model_state_dict'])
    optimizer.load_state_dict(checkpoint['optimizer_state_dict'])
    best_loss = checkpoint['loss']
    

history = train(model, train_loader, val_loader, optimizer, 
          Loss, anchors, strides, start_epoch, num_of_epochs, best_loss, 'w', device)