In [1]:
import os
import cv2
import numpy as np
import pandas as pd
import torch
from pathlib import Path

import xml.etree.ElementTree as ET

In [2]:
images_path = Path('./Datasets/images')
anno_path = Path('./Datasets/annotations')

In [3]:
def filelist(root, file_type):
    """Returns a fully-qualified list of filenames under root directory"""
    return [os.path.join(directory_path, f) for directory_path, directory_name, 
            files in os.walk(root) for f in files if f.endswith(file_type)]

def generate_train_df (anno_path):
    annotations = filelist(anno_path, '.xml')
    anno_list = []
    for anno_path in annotations:
        root = ET.parse(anno_path).getroot()
        anno = {}
        anno['filename'] = Path(str(images_path) + '/'+ root.find("./filename").text)
        anno['bb_info'] = [int(root.find("./object/bndbox/xmin").text),int(root.find("./object/bndbox/ymin").text),int(root.find("./object/bndbox/xmax").text),int(root.find("./object/bndbox/ymax").text)]
        anno['class'] = root.find("./object/name").text
        anno_list.append(anno)
    return pd.DataFrame(anno_list)


In [4]:
df_train = generate_train_df(anno_path)

In [5]:
df_train

Unnamed: 0,filename,bb_info,class
0,Datasets/images/road712.png,"[98, 140, 139, 182]",speedlimit
1,Datasets/images/road706.png,"[136, 92, 177, 135]",speedlimit
2,Datasets/images/road289.png,"[61, 140, 146, 227]",stop
3,Datasets/images/road538.png,"[115, 169, 149, 205]",speedlimit
4,Datasets/images/road510.png,"[89, 201, 133, 245]",speedlimit
...,...,...,...
872,Datasets/images/road535.png,"[100, 254, 180, 334]",speedlimit
873,Datasets/images/road284.png,"[111, 133, 165, 187]",speedlimit
874,Datasets/images/road290.png,"[105, 157, 171, 224]",speedlimit
875,Datasets/images/road723.png,"[115, 185, 160, 230]",speedlimit


In [6]:
#label encode target
class_dict = {'speedlimit': 0, 'stop': 1, 'crosswalk': 2, 'trafficlight': 3}
df_train['class'] = df_train['class'].apply(lambda x:  class_dict[x])

In [7]:
import albumentations as A
from albumentations.pytorch import ToTensorV2

In [8]:
transform = A.Compose([
    A.HorizontalFlip(p=0.5),
    A.ColorJitter(brightness=0.6, contrast=0.6, saturation=0.6, hue=0.6, p=0.4),
    A.Resize(height=300, width=447, p=1.0),
    ToTensorV2()
], bbox_params=A.BboxParams(format='pascal_voc', label_fields=[]))

In [9]:
y_transform = A.Compose([
    A.Resize(height=300, width=447, p=1.0),
    ToTensorV2()
], bbox_params=A.BboxParams(format='pascal_voc', label_fields=[]))

In [10]:
X = df_train[['filename', 'bb_info']]
Y = df_train['class']

In [11]:
from sklearn.model_selection import train_test_split

X_train, X_val, y_train, y_val = train_test_split(X, Y, test_size=0.2, random_state=42)

In [12]:
from torch.utils.data import Dataset, DataLoader

class RoadDataset(Dataset):
    def __init__(self, paths, bb, y, transforms=False):
        self.transforms = transforms
        self.paths = paths.values
        self.bb = bb.values
        self.y = y.values
    def __len__(self):
        return len(self.paths)
    
    def __getitem__(self, idx):
        path = self.paths[idx]
        y_class = self.y[idx]
        bb_info = self.bb[idx]
        
        image = cv2.imread(str(path))
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)

        
        if self.transforms:

            tf_result = self.transforms(image=image , bboxes=[bb_info], class_labels=y_class)
            x = tf_result['image']
            y_bb = tf_result['bboxes'][0]
            y_bb = torch.tensor(y_bb)

        
        
        return x, y_class, y_bb

In [13]:
train_ds = RoadDataset(X_train['filename'],X_train['bb_info'] ,y_train, transforms=transform)
valid_ds = RoadDataset(X_val['filename'],X_val['bb_info'],y_val,transforms=y_transform)

In [14]:
batch_size = 64
train_dl = DataLoader(train_ds, batch_size=batch_size, shuffle=True)
valid_dl = DataLoader(valid_ds, batch_size=batch_size)

In [15]:
import torch

import torch.optim as optim
import torch.nn as nn
import torch.nn.functional as F
from torchvision import models

In [16]:
class BB_model(nn.Module):
    def __init__(self):
        super(BB_model, self).__init__()
        resnet = models.resnet34(pretrained=True)
        layers = list(resnet.children())[:8]
        self.features1 = nn.Sequential(*layers[:6])
        self.features2 = nn.Sequential(*layers[6:])
        self.classifier = nn.Sequential(nn.BatchNorm1d(512), nn.Linear(512, 4))
        self.bb = nn.Sequential(nn.BatchNorm1d(512), nn.Linear(512, 4))
        
    def forward(self, x):
        x = self.features1(x)
        x = self.features2(x)
        x = F.relu(x)
        x = nn.AdaptiveAvgPool2d((1,1))(x)
        x = x.view(x.shape[0], -1)
        return self.classifier(x), self.bb(x)

In [17]:
def update_optimizer(optimizer, lr):
    for i, param_group in enumerate(optimizer.param_groups):
        param_group["lr"] = lr

In [18]:
def train_epocs(model, optimizer, train_dl, val_dl, epochs=10,C=1000):
    idx = 0
    for i in range(epochs):
        model.train()
        total = 0
        sum_loss = 0
        for x, y_class, y_bb in train_dl:
            batch = y_class.shape[0]
            x = x.float() / 255
            y_class = y_class
            y_bb = y_bb
            out_class, out_bb = model(x)
            loss_class = F.cross_entropy(out_class, y_class, reduction="sum")
            
            loss_bb = F.l1_loss(out_bb, y_bb, reduction="none").sum(1)
                 
            loss_bb = loss_bb.sum()
            loss = loss_class + loss_bb/C
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            idx += 1
            total += batch
            sum_loss += loss.item()
        train_loss = sum_loss/total
        val_loss, val_acc = val_metrics(model, valid_dl, C)
        print("train_loss %.3f val_loss %.3f val_acc %.3f" % (train_loss, val_loss, val_acc))
    return sum_loss/total

In [19]:
def val_metrics(model, valid_dl, C=1000):
    model.eval()
    total = 0
    sum_loss = 0
    correct = 0 
    for x, y_class, y_bb in valid_dl:
        batch = y_class.shape[0]
        x = x.float() / 255
        y_class = y_class
        y_bb = y_bb
        out_class, out_bb = model(x)
        loss_class = F.cross_entropy(out_class, y_class, reduction="sum")
        
        loss_bb = F.l1_loss(out_bb, y_bb, reduction="none").sum(1)
        
        loss_bb = loss_bb.sum()
        loss = loss_class + loss_bb/C
        _, pred = torch.max(out_class, 1)
        correct += pred.eq(y_class).sum().item()
        sum_loss += loss.item()
        total += batch
    return sum_loss/total, correct/total

In [20]:
model = BB_model()
parameters = filter(lambda p: p.requires_grad, model.parameters())
optimizer = torch.optim.Adam(parameters, lr=0.006)

In [None]:
train_epocs(model, optimizer, train_dl, valid_dl, epochs=10)

train_loss 2.584 val_loss 364059.312 val_acc 0.057
train_loss 1.779 val_loss 396.109 val_acc 0.727
train_loss 1.573 val_loss 9.757 val_acc 0.733
train_loss 1.525 val_loss 1.383 val_acc 0.727


In [None]:
update_optimizer(optimizer, 0.001)
train_epocs(model, optimizer, train_dl, valid_dl, epochs=10)