In [None]:
import os
import torch
import torch.utils.data
import torchvision
from PIL import Image
from pycocotools.coco import COCO
import cv2
import matplotlib.pyplot as plt
# import albumentations as A
# from albumentations.pytorch import ToTensorV2
import numpy as np
# Device string read by train() and validate() when moving tensors; it is
# assigned again in the configuration cell before the model is created.
device = 'cuda'
from torchmetrics.detection.mean_ap import MeanAveragePrecision
from torch.optim.lr_scheduler import MultiStepLR
import time
import os
# Use the 'ggplot' matplotlib style for every figure drawn in this notebook.
plt.style.use('ggplot')
from tqdm import tqdm
# Seed PyTorch's random number generators so repeated runs start from the same state.
seed = 42
torch.manual_seed(seed)
torch.cuda.manual_seed(seed)
torch.cuda.manual_seed_all(seed)

from custom_utils import (
    Averager, 
    SaveBestModel, 
    save_model, 
    save_loss_plot,
    save_mAP
)

In [None]:
class myOwnDataset(torch.utils.data.Dataset):
    """Detection dataset backed by a COCO-format annotation file.

    Each item is ``(image, target)`` where ``target`` is a dict with the keys
    ``boxes`` (float32, ``[N, 4]`` as ``xmin, ymin, xmax, ymax``), ``labels``
    (int64, ``[N]``), ``image_id`` (int64, ``[1]``), ``area`` (float32, ``[N]``)
    and ``iscrowd`` (int64 zeros, ``[N]``).  ``N`` counts only the annotations
    that survive the degenerate-box filter, so all per-object tensors line up.

    Args:
        root: Directory that the ``file_name`` entries are relative to.
        annotation: Path to the COCO JSON annotation file.
        transforms: Optional callable applied to the PIL image only.
    """

    def __init__(self, root, annotation, transforms=None):
        self.root = root
        self.transforms = transforms
        self.coco = COCO(annotation)
        self.ids = list(sorted(self.coco.imgs.keys()))
        # COCO category id -> contiguous training label.  Ids missing from the
        # table fall back to the 'other' entry.
        # NOTE(review): torchvision detection models reserve label 0 for the
        # background, yet category 1 is mapped to 0 here -- confirm whether the
        # labels should start at 1 instead.
        self.diction = {1: 0, 2: 1, 3: 2, 4: 3, 6: 4,  7:5, 8: 6, 10: 7, 
                   11: 8, 12: 9, 17: 10, 37: 11, 73: 12, 77: 13, 79: 14, 'other':14
                   }

    def __getitem__(self, index):
        coco = self.coco
        img_id = self.ids[index]
        # All annotation records attached to this image.
        coco_annotation = coco.loadAnns(coco.getAnnIds(imgIds=img_id))
        path = coco.loadImgs(img_id)[0]['file_name']
        img = Image.open(os.path.join(self.root, path))

        boxes = []
        labels = []
        areas = []
        fallback_label = self.diction['other']
        for ann in coco_annotation:
            # COCO stores [xmin, ymin, width, height]; the target needs corners.
            xmin, ymin, width, height = ann['bbox'][:4]
            xmax = xmin + width
            ymax = ymin + height
            # Drop boxes with no usable extent.  The int() comparison also
            # rejects boxes that would collapse if coordinates were truncated.
            if width <= 0 or height <= 0:
                continue
            if int(xmax) == int(xmin) or int(ymin) == int(ymax):
                continue
            boxes.append([xmin, ymin, xmax, ymax])
            labels.append(self.diction.get(ann.get('category_id'), fallback_label))
            # Collected inside the same loop so skipped boxes are skipped here too.
            areas.append(ann['area'])

        if boxes:
            boxes = torch.as_tensor(boxes, dtype=torch.float32)
        else:
            # Keep the [0, 4] shape when the image has no usable box, whether
            # it had no annotations or all of them were filtered out.
            boxes = torch.zeros((0, 4), dtype=torch.float32)
        labels = torch.as_tensor(labels, dtype=torch.int64)
        areas = torch.as_tensor(areas, dtype=torch.float32)
        iscrowd = torch.zeros((len(labels),), dtype=torch.int64)

        my_annotation = {
            "boxes": boxes,
            "labels": labels,
            "image_id": torch.tensor([img_id]),
            "area": areas,
            "iscrowd": iscrowd,
        }

        if self.transforms is not None:
            img = self.transforms(img)

        return img, my_annotation

    def __len__(self):
        return len(self.ids)

In [None]:
# torchvision.transforms.Resize((300,300)), torchvision.transforms.Grayscale(num_output_channels=1)
def get_transform():
    """Return the image preprocessing pipeline used for both splits.

    The pipeline converts the image to single-channel grayscale and then to a
    tensor.
    """
    to_gray = torchvision.transforms.Grayscale(num_output_channels=1)
    to_tensor = torchvision.transforms.ToTensor()
    return torchvision.transforms.Compose([to_gray, to_tensor])

# Image folder and COCO annotation file for each split.
train_data_dir = 'images_thermal_train'
train_coco = 'images_thermal_train/coco.json'
val_train_dir = 'images_thermal_val'
val_coco = 'images_thermal_val/coco.json'

# One dataset per split; each gets its own instance of the same preprocessing.
train_dataset = myOwnDataset(train_data_dir, train_coco, get_transform())
val_dataset = myOwnDataset(val_train_dir, val_coco, get_transform())

def collate_fn(batch):
    """Regroup a list of ``(image, target)`` samples into per-field tuples.

    Passed to the DataLoaders so a batch arrives as
    ``(images_tuple, targets_tuple)`` instead of being stacked.
    """
    columns = zip(*batch)
    return tuple(columns)

# Samples per batch for each split.
train_batch_size = 5
val_batch_size = 5

# Training batches are reshuffled on every pass; validation keeps dataset order.
train_dataloader = torch.utils.data.DataLoader(
    train_dataset,
    batch_size=train_batch_size,
    shuffle=True,
    collate_fn=collate_fn,
)
val_dataloader = torch.utils.data.DataLoader(
    val_dataset,
    batch_size=val_batch_size,
    shuffle=False,
    collate_fn=collate_fn,
)

In [None]:
# Display the number of images in the training split.
len(train_dataset)

In [None]:
# Display the number of images in the validation split.
len(val_dataset)

In [None]:
# Preview one training sample with its ground-truth boxes and label ids.
sample_index = 8
# Fetch the sample once; every dataset access re-reads the image from disk.
img, sample_target = train_dataset[sample_index]
bbox = sample_target['boxes']
classes = sample_target['labels']

# cv2 draws in place, so use a private HxWxC copy as the canvas.  All boxes
# accumulate on it, and it still exists when the sample has no boxes.
bnd_img = img.permute(1, 2, 0).numpy().copy()
for box, label in zip(bbox, classes):
    xmin, ymin, xmax, ymax = box

    pt1 = (int(xmin), int(ymin))
    pt2 = (int(xmax), int(ymax))
    bnd_img = cv2.rectangle(bnd_img, pt1, pt2, (0, 0, 0), 1)
    bnd_img = cv2.putText(
        bnd_img,
        str(label.item()),  # plain number rather than "tensor(n)"
        (int(xmin), int(ymin) - 10),
        fontFace=cv2.FONT_HERSHEY_SIMPLEX,
        fontScale=0.3,
        color=(0, 255, 255),
        thickness=1)

# Render once, after every box has been drawn.
plt.grid(False)
plt.imshow(bnd_img, cmap='gray')

In [None]:
# device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')

# # DataLoader is iterable over Dataset
# for imgs, annotations in data_loader:
#     imgs = list(img.to(device) for img in imgs)
#     annotations = [{k: v.to(device) for k, v in t.items()} for t in annotations]
#     print(annotations)

In [None]:
# img, bbox = my_dataset[0]
# # img = img.to(torch.int8)
# for i,j in zip(bbox["boxes"], bbox["labels"]):
#     xmin, ymin, xmax, ymax = i
    
#     pt1 = (int(xmin), int(ymin))
#     pt2 = (int(xmax), int(ymax))
    
#     bnd_img = cv2.rectangle(img.permute(1, 2, 0).numpy(),pt1, pt2,(0,0,0),2)
#     cv2.putText(bnd_img, str(j), (int(xmin-5), int(ymin-2)), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0,255,12), 2)

# plt.imshow(bnd_img, cmap = 'gray')

In [None]:
def validate(valid_data_loader, model):
    """Run the model over a validation loader and compute mean average precision.

    Args:
        valid_data_loader: Iterable yielding ``(images, targets)`` batches,
            where each target is a dict with at least ``boxes`` and ``labels``.
        model: Detection model that, in eval mode, returns one dict per image
            with ``boxes``, ``scores`` and ``labels``.

    Returns:
        The dictionary produced by ``MeanAveragePrecision.compute()``.
    """
    print('Validating')
    model.eval()

    # Initialize tqdm progress bar.
    prog_bar = tqdm(valid_data_loader, total=len(valid_data_loader))
    target = []
    preds = []
    for images, targets in prog_bar:
        images = [image.to(device) for image in images]

        # Inference needs only the images; withholding the targets also keeps
        # the model's ground-truth box checks out of the evaluation path.
        with torch.no_grad():
            outputs = model(images)

        # Collect per-image predictions and ground truth on the CPU for
        # torchmetrics.  Ground-truth boxes stay floating point so their
        # coordinates are not truncated before IoU is computed.
        for output, truth in zip(outputs, targets):
            preds.append({
                'boxes': output['boxes'].detach().cpu(),
                'scores': output['scores'].detach().cpu(),
                'labels': output['labels'].detach().cpu(),
            })
            target.append({
                'boxes': truth['boxes'].detach().cpu().float(),
                'labels': truth['labels'].detach().cpu().long(),
            })

    metric = MeanAveragePrecision(iou_type="bbox")
    metric.update(preds, target)
    metric_summary = metric.compute()
    print(metric_summary)
    return metric_summary

In [None]:
from torchvision.models.detection.faster_rcnn import FastRCNNPredictor, RPNHead
from tqdm import tqdm
from torchvision.models.detection.anchor_utils import AnchorGenerator


def get_model_instance_segmentation(num_classes):
    """Build a Faster R-CNN (ResNet-50 FPN) detector with a new box head.

    Despite the name, this returns an object-detection model, not a
    segmentation model.

    Args:
        num_classes: Number of outputs of the replacement classification head.

    Returns:
        The model initialised from the ``COCO_V1`` weights, with its box
        predictor replaced by a freshly initialised ``FastRCNNPredictor``.
    """
    # Select the checkpoint through `weights` only; the legacy `pretrained`
    # flag is deprecated and redundant once `weights` is given.
    model = torchvision.models.detection.fasterrcnn_resnet50_fpn(weights='COCO_V1')

    # Size of the feature vector feeding the existing classifier.
    in_features = model.roi_heads.box_predictor.cls_score.in_features
    # Replace the pre-trained head with one sized for our classes.
    model.roi_heads.box_predictor = FastRCNNPredictor(in_features, num_classes)

    return model
    

# NOTE(review): stale note from a 2-class setup (target vs. background); the
# configuration cell builds the model with 16 classes.

# Number of training batches per epoch; no visible cell reads this value.
len_dataloader = len(train_dataloader)

def train(train_data_loader, model, optimizer):
    """Train the model for one pass over the loader.

    Every batch loss is also sent to the module-level ``train_loss_hist``
    accumulator, and tensors are moved to the module-level ``device``.

    Args:
        train_data_loader: Iterable yielding ``(images, targets)`` batches.
        model: Detection model that returns a dict of losses in train mode.
        optimizer: Optimizer updating the model's parameters.

    Returns:
        The summed loss of the last batch as a float, or ``nan`` if the loader
        produced no batches.
    """
    print('Training')
    model.train()
    # initialize tqdm progress bar
    prog_bar = tqdm(train_data_loader, total=len(train_data_loader))
    # Fields that must stay floating point; casting them to int64 would
    # truncate box coordinates and areas.
    float_keys = ('boxes', 'area')
    # Keeps the return value defined when the loader is empty.
    loss_value = float('nan')
    for images, targets in prog_bar:
        optimizer.zero_grad()

        images = [image.to(device) for image in images]
        targets = [
            {
                k: (v.float() if k in float_keys else v.long()).to(device)
                for k, v in t.items()
            }
            for t in targets
        ]

        loss_dict = model(images, targets)
        # Drop the batch references early so their memory can be reclaimed.
        del images
        del targets

        losses = sum(loss for loss in loss_dict.values())
        loss_value = losses.item()

        train_loss_hist.send(loss_value)

        losses.backward()
        optimizer.step()

        # update the loss value beside the progress bar for each iteration
        prog_bar.set_description(desc=f"Loss: {loss_value:.4f}")
    return loss_value

In [None]:
# Number of passes over the training set; this is what the epoch loop uses.
epochs = 60
# NOTE(review): MultiStepLR milestones, but no scheduler is created in the
# visible cells -- confirm whether one was intended.
multistep = [15,30, 45]
# NOTE(review): not passed to the optimizer below, which uses lr=0.005.
learning_rate = 0.0005
momentum = 0.9
# Fall back to the CPU when CUDA is unavailable instead of failing on .to().
device = 'cuda' if torch.cuda.is_available() else 'cpu'
# Outputs of the classification head; matches the 16 the model was built with.
num_classes = 16
# NOTE(review): not referenced by the visible cells (`epochs` is used instead).
num_epochs = 10
model = get_model_instance_segmentation(num_classes)


def count_parameters(model):
    """Return the number of trainable parameters in ``model``."""
    return sum(p.numel() for p in model.parameters() if p.requires_grad)


print(count_parameters(model))
# move model to the right device
model.to(device)

# Optimise only the parameters that require gradients.
params = [p for p in model.parameters() if p.requires_grad]
optimizer = torch.optim.SGD(params, lr=0.005, momentum=momentum, weight_decay=0.0005)

len_dataloader = len(train_dataloader)

# Accumulates batch losses within an epoch.
train_loss_hist = Averager()
# To store training loss and mAP values.
train_loss_list = []
map_50_list = []
map_list = []

# Name to save the trained model with.
MODEL_NAME = 'model'

# To save best model.
save_best_model = SaveBestModel()

In [None]:
# Directory that receives the loss and mAP plots.
OUT_DIR = "./"
for epoch in range(epochs):
    print(f"\nEPOCH {epoch+1} of {epochs}")

    # Reset the training loss histories for the current epoch.
    train_loss_hist.reset()

    # Start timer and carry out training and validation.
    start = time.time()
    train_loss = train(train_dataloader, model, optimizer)
    metric_summary = validate(val_dataloader, model)
    print(f"Epoch #{epoch+1} train loss: {train_loss_hist.value:.3f}")
    print(f"Epoch #{epoch+1} mAP@0.50:0.95: {metric_summary['map']}")
    print(f"Epoch #{epoch+1} mAP@0.50: {metric_summary['map_50']}")
    end = time.time()
    # Report the same 1-based epoch number as the messages above.
    print(f"Took {((end - start) / 60):.3f} minutes for epoch {epoch+1}")

    # train() returns the loss of the last batch, so that is what gets plotted.
    train_loss_list.append(train_loss)
    map_50_list.append(metric_summary['map_50'])
    map_list.append(metric_summary['map'])

    # save the best model till now.
    # NOTE(review): 'outputs' is presumably a directory that must already
    # exist -- confirm against custom_utils.SaveBestModel.
    save_best_model(
        model, float(metric_summary['map']), epoch, 'outputs'
    )
    # Save the current epoch model.
    save_model(epoch, model, optimizer)

    # Save loss plot.
    save_loss_plot(OUT_DIR, train_loss_list)

    # Save mAP plot.
    save_mAP(OUT_DIR, map_50_list, map_list)
