In [1]:
import os
from tqdm import tqdm
import matplotlib.pyplot as plt

import const
import utils

from sklearn.model_selection import train_test_split

import torch
import torchvision
from torch.utils.data import DataLoader
from torch.optim import SGD
from torchvision.models.detection import fasterrcnn_resnet50_fpn
from torchvision.models.detection.faster_rcnn import FastRCNNPredictor

##### Loading custom train and test datasets

In [None]:
# Dataset locations; every image is expected to have one annotation file
IMAGE_DIR = "./DATASET/IMAGES"
ANNOTATION_DIR = "./DATASET/ANNOTATIONS"

img_paths = sorted(os.listdir(IMAGE_DIR))
annotation_paths = sorted(os.listdir(ANNOTATION_DIR))

# Images and annotations are paired purely by their sorted position, so a
# count mismatch would make zip() silently drop or mis-pair samples.
# Fail loudly instead of training on wrong boxes.
if len(img_paths) != len(annotation_paths):
    raise ValueError(
        f"Found {len(img_paths)} images but {len(annotation_paths)} annotations; "
        "every image needs exactly one annotation file"
    )

# load Image objects
IMAGES = []
for img_name, annotation_name in zip(img_paths, annotation_paths):

    img_path = f"{IMAGE_DIR}/{img_name}"
    annotation_path = f"{ANNOTATION_DIR}/{annotation_name}"

    # read the raw image together with its annotated boxes and labels
    img, boxes, labels = utils.get_image_data(img_path, annotation_path)

    # preprocess() returns both a new image and new boxes, so the two stay consistent
    preprocessed_img, preprocessed_boxes = utils.preprocess(img, boxes)

    IMAGES.append(const.Image(preprocessed_img, preprocessed_boxes, labels))

# obtain images, bounding boxes and labels as Tensors
IMAGES_TENSOR = [image.get_tensor_image() for image in IMAGES]

# get_tensor_image_data() is called once per image; index 0 is used as the
# boxes and index 1 as the labels
image_data = [image.get_tensor_image_data() for image in IMAGES]
BOX_TENSOR = [data[0] for data in image_data]
LABEL_TENSOR = [data[1] for data in image_data]

print(f"Total of {len(IMAGES)} images loaded")

In [3]:
# Split images, boxes and labels together so each sample keeps its own
# annotations; 10% is held out for testing and the fixed random_state
# keeps the split reproducible between runs.
(
    train_images, test_images,
    train_boxes, test_boxes,
    train_labels, test_labels,
) = train_test_split(
    IMAGES_TENSOR,
    BOX_TENSOR,
    LABEL_TENSOR,
    test_size=0.1,
    random_state=88,
)

In [None]:
# wrap each split in the project's CustomDataset so a DataLoader can consume it
train_dataset = const.CustomDataset(train_images, train_boxes, train_labels)
test_dataset = const.CustomDataset(test_images, test_boxes, test_labels)

# report the split sizes as a quick sanity check
n_train_images = len(train_dataset.imgs)
n_test_images = len(test_dataset.imgs)
print(f"Train set : {n_train_images} images")
print(f"Test set : {n_test_images} images")

In [5]:
# create DataLoader for both train and test sets ; provide an iterable for each set
def custom_collate(data):
    """Return the batch exactly as the list of samples the DataLoader gathered.

    The default collate function would try to stack the samples into single
    tensors; the training loop instead iterates over the batch sample by
    sample, so the list is passed through untouched.

    Args:
        data: list of dataset samples making up one batch.

    Returns:
        The same list, unchanged.
    """
    return data


train_data_loader = DataLoader(
    dataset=train_dataset,
    batch_size=4,
    shuffle=True,  # reshuffle the training samples every epoch
    collate_fn=custom_collate,
    pin_memory=torch.cuda.is_available(),  # only useful when copying to a GPU
)

test_data_loader = DataLoader(
    dataset=test_dataset,  # held-out split (previously pointed at train_dataset by mistake)
    batch_size=4,
    shuffle=False,  # keep evaluation order fixed so the inspected batch is reproducible
    collate_fn=custom_collate,
    pin_memory=torch.cuda.is_available(),
)

##### Model training

In [6]:
# pick the GPU when one is available, otherwise fall back to the CPU
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Faster R-CNN with a ResNet-50 + FPN backbone, loaded with torchvision's default weights
model = fasterrcnn_resnet50_fpn(weights='DEFAULT')

In [7]:
# Replace the box-prediction head with a fresh one sized for 3 classes.
# NOTE(review): torchvision detection heads normally count the background as
# one of the classes — confirm that 3 matches the label mapping in const.
num_classes = 3

# the new head must accept the same feature width the existing classifier layer takes
in_features = model.roi_heads.box_predictor.cls_score.in_features
model.roi_heads.box_predictor = FastRCNNPredictor(
    in_channels=in_features,
    num_classes=num_classes,
)

In [None]:
# move the model's parameters and buffers onto the selected device
# (Module.to returns the module itself, so rebinding keeps the same object)
model = model.to(device)
print("Model shifted to device:", device)

In [9]:
# Stochastic Gradient Descent over the trainable parameters only:
# a parameter with requires_grad == False is frozen and is left out
parameters = [param for param in model.parameters() if param.requires_grad]

optimizer = SGD(
    params=parameters,
    lr=1e-5,
    momentum=0.9,
    weight_decay=0.0005,
)

In [None]:
num_epochs = 5

for epoch in range(num_epochs):

    # The detection model only returns its loss dict in training mode; set the
    # mode explicitly so this cell still works when re-run after model.eval().
    model.train()

    # running sum of the batch losses for this epoch
    epoch_loss = 0.0

    for data in tqdm(train_data_loader, desc=f"Epoch {epoch+1}"):

        # each sample d is (image, target dict); move everything to the model's device
        imgs = [d[0].float().to(device) for d in data]
        targets = [
            {'boxes': d[1]['boxes'].to(device), 'labels': d[1]['labels'].to(device)}
            for d in data
        ]

        # in training mode the model returns a dict of loss terms; optimise their sum
        loss_dict = model(imgs, targets)
        loss = sum(loss_dict.values())

        # .item() yields a plain Python float, detached from the autograd graph
        epoch_loss += loss.item()

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

    print(epoch_loss)

In [None]:
# put the model in evaluation mode before running inference
# (Module.eval returns the module itself, so rebinding keeps the same object)
model = model.eval()
print("Model switched to evaluation mode")

In [39]:
# pull a single batch from the test loader
test_data = next(iter(test_data_loader))

# use the first sample of that batch: index 0 is the image, index 1 its target dict
first_sample = test_data[0]
test_img = first_sample[0]
test_target = first_sample[1]
test_box = test_target["boxes"]
test_label = test_target["labels"]

In [40]:
# no_grad: this is inference only, so skip building the autograd graph.
# .float(): apply the same dtype conversion the training loop applies to its inputs.
with torch.no_grad():
    output = model([test_img.float().to(device)])

# one image was passed in, so read the predictions of the single result dict
output_box = output[0]['boxes']
output_score = output[0]['scores']
output_label = output[0]['labels']

In [None]:
# number of boxes the model predicted for the test image
output_box.shape[0]

In [None]:
# IoU threshold for non-maximum suppression: a box overlapping a
# higher-scoring box by more than this is discarded
NMS_IOU_THRESHOLD = 0.1
keep = torchvision.ops.nms(
    boxes=output_box,
    scores=output_score,
    iou_threshold=NMS_IOU_THRESHOLD,
)

# keep only the surviving detections and bring them to NumPy on the CPU
kept_boxes = output_box[keep]
kept_labels = output_label[keep]
keep_box = kept_boxes.detach().cpu().numpy().astype('int32')
keep_label = kept_labels.detach().cpu().numpy()

# translate each predicted label through the project's reverse label mapping
keep_label_mapped = [const.REVERSE_LABEL_MAPPING[label_id] for label_id in keep_label]

print(len(keep), "objects kept")

In [43]:
# permute(1, 2, 0) moves the first axis to the end — presumably CHW -> HWC so the
# array can be displayed as an image; confirm against const.Image
test_img_array = test_img.permute(1, 2, 0).cpu().numpy().astype('uint8')

# bundle the image with the NMS-filtered boxes and their mapped labels
TEST = const.Image(test_img_array, keep_box, keep_label_mapped)

In [44]:
# presumably renders the test image with the kept boxes and labels drawn on it;
# show() is defined on const.Image — confirm its behaviour there
TEST.show()