In [None]:
import os
import sys
import csv
import time

In [None]:
import utils
import transforms
import engine
import coco_utils
import coco_eval

In [None]:
import torch
import numpy as np
import cv2
import matplotlib.pyplot as plt
from torchvision import datasets, transforms
from PIL import Image
import glob
%matplotlib inline

In [None]:
# Build two parallel lists for training/validation: image paths and the
# matching 2D bounding-box label files. Every `*_bbox2D.bin` file found one
# directory level below the data folder contributes one (image, label) pair.
images, labels = [], []
for folder in glob.glob('C:\\Users\\riyas\\Desktop\\Train_data_bbox/*'):
    for file in glob.glob('{}/*_bbox2D.bin'.format(folder)):
        # Strip the known suffix rather than splitting on the first ".":
        # a "." anywhere in the directory part of the path would otherwise
        # cut the prefix short and produce paths that do not exist.
        a = file[:-len('_bbox2D.bin')]
        images.append(f"{a}_image.jpg")
        labels.append(f"{a}_bbox2D.bin")

In [None]:
# Collect the test image paths: every `*_image.jpg` one directory level below
# the test folder. `labels_test` is created empty and is not filled here.
images_test, labels_test = [], []
for folder in glob.glob('C:\\Users\\riyas\\Desktop\\test/*'):
    for file in glob.glob('{}/*_image.jpg'.format(folder)):
        # The glob match already is the image path. Taking it apart on the
        # first "." and re-appending "image.jpg" only rebuilt the same string,
        # and produced a wrong path whenever a directory name contained a ".".
        images_test.append(file)

In [None]:
# Sanity check: how many training images, training labels and test images were found
tuple(len(paths) for paths in (images, labels, images_test))

In [None]:
# For test data loading
class ImageDataset2(torch.utils.data.Dataset):
    """Dataset of unlabeled images (used for the test split).

    Args:
        images: sequence of image file paths.
        labels: accepted so the constructor matches ``ImageDataset``; it is
            not stored or used.
        transforms: optional callable taking ``(image, target)`` and returning
            ``(image, target)``.
    """

    def __init__(self, images, labels, transforms=None):
        self.images_list = images
        self.transforms = transforms

    def __getitem__(self, idx):
        """Return ``(image, target)`` for the image at position ``idx``.

        There is no annotation for these images, so ``target`` starts as an
        empty list and is whatever the transform returns for it.
        """
        img_path = self.images_list[idx]
        img = Image.open(img_path).convert("RGB")

        # Always bind the target explicitly: the previous code returned `_`,
        # which was never assigned when no transform was configured.
        target = []
        if self.transforms is not None:
            img, target = self.transforms(img, target)

        return img, target

    def __len__(self):
        """Number of images in the dataset."""
        return len(self.images_list)

In [None]:
# For train data loading
class ImageDataset(torch.utils.data.Dataset):
    """Detection dataset pairing each image with one bounding box and class.

    Args:
        images: sequence of image file paths.
        labels: sequence of label file paths, aligned index-by-index with
            ``images``. Each file is read as a flat float32 array whose first
            four values are ``x_min, y_min, x_max, y_max`` and whose fifth
            value is the class id.
        transforms: optional callable taking ``(image, target)`` and returning
            ``(image, target)``.
    """

    def __init__(self, images, labels, transforms=None):
        self.images_list = images
        self.labels_list = labels
        self.transforms = transforms

        # Field descriptions for 11 float32 values named temp0..temp10.
        # __getitem__ does not use this (it reads the file as a flat float32
        # array); the attribute is kept for any external code that reads it.
        self.label_data_types = [(f"temp{i}", np.float32) for i in range(11)]

    def __getitem__(self, idx):
        """Return ``(image, target)`` for the sample at position ``idx``.

        ``target`` is a dict with ``boxes`` (float32, shape (1, 4)),
        ``labels`` (int64, shape (1,)), ``image_id``, ``area`` and ``iscrowd``.
        """
        img_path = self.images_list[idx]
        bbox_path = self.labels_list[idx]

        img = Image.open(img_path).convert("RGB")
        raw = np.fromfile(bbox_path, dtype=np.float32)

        # Coordinates are truncated to whole pixels before being stored as
        # float32, exactly as before.
        x_min, y_min = np.int32(raw[0]), np.int32(raw[1])
        x_max, y_max = np.int32(raw[2]), np.int32(raw[3])
        boxes = torch.as_tensor([[x_min, y_min, x_max, y_max]], dtype=torch.float32)

        # The class id must reach the model as an int64 tensor with one entry
        # per box. Previously the whole raw array was converted and then
        # discarded, and a plain Python list was stored in the target instead.
        labels = torch.as_tensor([np.int64(raw[4])], dtype=torch.int64)

        image_id = torch.tensor([idx])
        area = (boxes[:, 3] - boxes[:, 1]) * (boxes[:, 2] - boxes[:, 0])
        # One object per image; it is never marked as a crowd region.
        iscrowd = torch.zeros((boxes.shape[0],), dtype=torch.int64)

        target = {
            "boxes": boxes,
            "labels": labels,
            "image_id": image_id,
            "area": area,
            "iscrowd": iscrowd,
        }

        if self.transforms is not None:
            img, target = self.transforms(img, target)

        return img, target

    def __len__(self):
        """Number of samples in the dataset."""
        return len(self.images_list)

In [None]:
class RandomHorizontalFlip(object):
    """Flip an image tensor and its target horizontally with probability ``prob``.

    Args:
        prob: probability in [0, 1] that a given call flips its input.
    """

    def __init__(self, prob):
        self.prob = prob

    def __call__(self, image, target):
        """Return ``(image, target)``, flipped left-right when the draw succeeds.

        ``image`` is flipped along its last dimension. The x-coordinates
        (columns 0 and 2) of ``target["boxes"]`` are mirrored in place, and
        ``target["masks"]`` is flipped when present.
        """
        # Imported here because none of the notebook's import cells bring
        # `random` into scope; without it this method raised NameError.
        import random

        if random.random() < self.prob:
            width = image.shape[-1]
            image = image.flip(-1)
            bbox = target["boxes"]
            # Mirror x_min/x_max and swap them so x_min stays the smaller one.
            bbox[:, [0, 2]] = width - bbox[:, [2, 0]]
            target["boxes"] = bbox
            if "masks" in target:
                target["masks"] = target["masks"].flip(-1)
            if "keypoints" in target:
                # NOTE(review): `_flip_coco_person_keypoints` is not defined or
                # imported anywhere in this notebook, so this branch fails for
                # keypoint targets — confirm where the helper should come from.
                keypoints = target["keypoints"]
                keypoints = _flip_coco_person_keypoints(keypoints, width)
                target["keypoints"] = keypoints
        return image, target

In [None]:
import utils
import transforms as T
from engine import train_one_epoch, evaluate

@torch.no_grad()
def evaluate(model, data_loader, device):
    """Run the model over ``data_loader`` and report COCO-style metrics.

    Args:
        model: detection model; it is switched to eval mode.
        data_loader: loader yielding ``(images, targets)`` batches, where each
            target carries an ``image_id`` tensor.
        device: device the images are moved to before the forward pass.

    Returns:
        The evaluator object after ``accumulate()`` and ``summarize()``.
    """
    # These helpers were referenced without ever being imported in this
    # notebook. They are presumably the ones shipped in the detection
    # reference modules imported at the top (coco_utils, coco_eval, engine) —
    # TODO confirm against the local copies of those files.
    from coco_eval import CocoEvaluator
    from coco_utils import get_coco_api_from_dataset
    from engine import _get_iou_types

    n_threads = torch.get_num_threads()
    torch.set_num_threads(1)
    try:
        cpu_device = torch.device("cpu")
        model.eval()
        metric_logger = utils.MetricLogger(delimiter="  ")
        header = "Test:"

        coco = get_coco_api_from_dataset(data_loader.dataset)
        iou_types = _get_iou_types(model)
        coco_evaluator = CocoEvaluator(coco, iou_types)

        for images, targets in metric_logger.log_every(data_loader, 100, header):
            images = list(img.to(device) for img in images)

            # Wait for pending GPU work so the timing below covers only the
            # forward pass.
            if torch.cuda.is_available():
                torch.cuda.synchronize()
            model_time = time.time()
            outputs = model(images)

            outputs = [{k: v.to(cpu_device) for k, v in t.items()} for t in outputs]
            model_time = time.time() - model_time

            # Key each prediction by the image id of its ground-truth target.
            res = {target["image_id"].item(): output for target, output in zip(targets, outputs)}
            evaluator_time = time.time()
            coco_evaluator.update(res)
            evaluator_time = time.time() - evaluator_time
            metric_logger.update(model_time=model_time, evaluator_time=evaluator_time)

        # gather the stats from all processes
        metric_logger.synchronize_between_processes()
        print("Averaged stats:", metric_logger)
        coco_evaluator.synchronize_between_processes()

        # accumulate predictions from all images
        coco_evaluator.accumulate()
        coco_evaluator.summarize()
    finally:
        # Restore the caller's thread setting even if evaluation fails.
        torch.set_num_threads(n_threads)
    return coco_evaluator

In [None]:
import utils
import transforms as T
from engine import train_one_epoch, evaluate
 
def get_transform(train):
    """Build the image/target transform pipeline.

    Args:
        train: when truthy, a random horizontal flip (probability 0.5) is
            appended after the tensor conversion.

    Returns:
        The composed transform produced by ``T.Compose``.
    """
    # Every sample is first converted to a tensor.
    steps = [T.ToTensor()]
    if train:
        # Training-only augmentation: flip image and ground truth half the time.
        steps.append(T.RandomHorizontalFlip(0.5))
    return T.Compose(steps)

In [None]:
import torchvision
from engine import train_one_epoch, evaluate
import utils
start = time.time()

# Use the GPU when CUDA is available, otherwise fall back to the CPU.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Number of classes the detector is built for
num_classes = 3

dataset = ImageDataset(images, labels, get_transform(train=True))
dataset_test = ImageDataset2(images_test, labels_test, get_transform(train=False))

# Random ordering over the complete training set (nothing is held out).
indices = torch.randperm(len(dataset)).tolist()

train_dataset = torch.utils.data.Subset(dataset, indices)
val_dataset = torch.utils.data.Subset(dataset_test, list(range(len(dataset_test))))

# num_workers is left at its default for both loaders (num_workers=4 is disabled).
train_data_loader = torch.utils.data.DataLoader(
    train_dataset,
    batch_size=4,
    shuffle=True,
    collate_fn=utils.collate_fn,
)

val_data_loader = torch.utils.data.DataLoader(
    val_dataset,
    batch_size=1,
    shuffle=False,
    collate_fn=utils.collate_fn,
)

# SSD300 with a pretrained VGG16 backbone; detection heads start untrained.
# (Alternative: get_object_detection_model(num_classes).)
model = torchvision.models.detection.ssd300_vgg16(
    pretrained=False,
    progress=True,
    num_classes=num_classes,
    pretrained_backbone=True,
)

# Replacement head layers, one row per feature map:
# (in_channels, classification out_channels, regression out_channels, kernel, padding)
head_layout = [
    (512, 12, 16, 5, 2),
    (1024, 18, 24, 3, 1),
    (512, 18, 24, 3, 1),
    (256, 18, 24, 3, 1),
    (256, 12, 16, 3, 1),
    (256, 12, 16, 3, 1),
]

# Classification layers are created first, then regression layers, in row order.
model.head.classification_head.module_list = torch.nn.ModuleList([
    torch.nn.Conv2d(c_in, cls_out, kernel_size=(k, k), stride=(1, 1), padding=(pad, pad))
    for c_in, cls_out, _reg_out, k, pad in head_layout
])

model.head.regression_head.module_list = torch.nn.ModuleList([
    torch.nn.Conv2d(c_in, reg_out, kernel_size=(k, k), stride=(1, 1), padding=(pad, pad))
    for c_in, _cls_out, reg_out, k, pad in head_layout
])

# Move the model to the selected device.
model.to(device)

# Optimize only the parameters that require gradients.
params = [param for param in model.parameters() if param.requires_grad]

# SGD with momentum and weight decay
optimizer = torch.optim.SGD(
    params,
    lr=0.0003,
    momentum=0.9,
    weight_decay=0.0005,
)

# Cosine learning-rate schedule with warm restarts, stepped once per epoch.
lr_scheduler = torch.optim.lr_scheduler.CosineAnnealingWarmRestarts(
    optimizer, T_0=1, T_mult=2
)

# Number of training epochs
num_epochs = 40

for epoch in range(num_epochs):
    # One pass over the training data; progress is logged every 500 iterations.
    # engine.train_one_epoch moves both images and targets to `device`.
    train_one_epoch(model, optimizer, train_data_loader, device, epoch, print_freq=500)

    print("Epoch done!")

    # Advance the learning-rate schedule.
    lr_scheduler.step()

    # Per-epoch evaluation on val_data_loader is currently disabled.

    # Save the complete model object after every epoch.
    torch.save(model, f'this_model_epoch{epoch}.pth')

    print('', '==================================================', '', sep='\n')
end = time.time()
print("That's it!")
print("The time of execution of above program is :", end-start)


In [None]:
@torch.inference_mode()
def new_evaluate(model, data_loader, device):
    """Predict one class per test image and write the results to a CSV file.

    For every image the label of the highest-scoring detection is written to
    ``outputs_final.csv`` as ``<parent folder>/<file stem without "_image">,<label>``.
    File names are taken from the global ``images_test`` list in order, so the
    loader must iterate the images in that same order (no shuffling).

    Args:
        model: detection model returning, per image, a dict with ``labels``
            and ``scores``.
        data_loader: loader yielding ``(images, targets)`` batches.
        device: device the model and images are moved to.

    Returns:
        1 once every batch has been processed.
    """
    n_threads = torch.get_num_threads()
    torch.set_num_threads(1)
    model.to(device)
    model.eval()
    metric_logger = utils.MetricLogger(delimiter="  ")
    header = "Test:"

    csv_name = 'outputs_final.csv'
    j = 0  # position in the global `images_test` list

    try:
        # The context manager closes the file even if inference fails midway.
        with open(csv_name, 'w', newline='') as f:
            f.write('guid/image,label\n')
            writer = csv.writer(f)

            for images, target in metric_logger.log_every(data_loader, 100, header):
                images = list(img.to(device) for img in images)
                if torch.cuda.is_available():
                    torch.cuda.synchronize()

                outputs = model(images)

                # Handle every image of the batch, not only the first one, so
                # the path index stays aligned for batch sizes above 1.
                for output in outputs:
                    a1 = output['labels'].cpu().detach().numpy()
                    a2 = output['scores'].cpu().detach().numpy()

                    # Accept both "\" and "/" as path separators.
                    parts = images_test[j].replace('\\', '/').split('/')
                    text12 = parts[-2] + '/' + parts[-1].split('.')[0]

                    if a2.size > 0:
                        predicted = str(a1[np.argmax(a2)])
                    else:
                        # No detection for this image: write 0, the background
                        # class of torchvision detection models, instead of
                        # crashing in argmax on an empty array.
                        predicted = '0'

                    # Drop the trailing "_image" (6 characters) from the stem.
                    row = [text12[:-6], predicted]
                    writer.writerow(row)
                    print("Predicted Class", row[1])

                    j += 1
    finally:
        # Restore the thread setting changed above; it used to stay at 1.
        torch.set_num_threads(n_threads)
    return 1

In [None]:
# Load the model saved after the last training epoch (epoch index 39) and
# write its predictions for the test images.
# map_location keeps the load working when the checkpoint was saved from a GPU
# session but this session runs on a different device (e.g. CPU only).
# NOTE(review): the file holds a fully pickled model, and unpickling executes
# code — only load checkpoints produced by this notebook. Recent PyTorch
# releases may also require weights_only=False for such files; confirm against
# the installed version.
model39 = torch.load('this_model_epoch39.pth', map_location=device)
new_evaluate(model39, val_data_loader, device=device)