In [1]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torchvision
import torchvision.transforms as transforms
from torch.utils.data.dataset import Dataset
from torch.utils.data import DataLoader as dataloader
import torchvision.models as models

import albumentations as A
from albumentations.pytorch import ToTensorV2

import time
import os
import random
import numpy as np
import matplotlib.pyplot as plt
from tqdm.notebook import trange, tqdm
from PIL import Image, ImageOps
import copy
import pandas as pd
import cv2

In [None]:
class CUB200(Dataset):
    """
    CUB-200-2011 dataset that yields an image, its bounding box and its class label.

    The sample table is built by joining the dataset's text index files on the
    "index" column. Bounding boxes are (x_min, y_min, width, height); after the
    transform they are divided by `image_size` before being returned.
    """

    def __init__(self, root, image_size, transform, test_train=1, return_masks=False,
                 split_path="test_train_split.txt"):
        """
        Build the sample table.

        Args:
            root: dataset directory holding "image_class_labels.txt", "images.txt",
                "bounding_boxes.txt", plus the "images" and "segmentations" folders.
            image_size: value the transformed box coordinates are divided by.
            transform: callable invoked as
                transform(image=..., bboxes=..., class_labels=...[, mask=...]) that
                returns a dict with "image", "bboxes", "class_labels" (and "mask").
            test_train: split value to EXCLUDE; rows whose "split" differs from it are kept.
            return_masks: when True, __getitem__ also loads and returns the segmentation mask.
            split_path: path of the custom split file (two columns: index, split).
                The default is resolved against the current working directory,
                which is what the original hard-coded path did.
        """
        class_df = pd.read_csv(os.path.join(root, "image_class_labels.txt"),
                               sep=" ", names=["index", "class"])
        path_df = pd.read_csv(os.path.join(root, "images.txt"),
                              sep=" ", names=["index", "file_path"])
        bbox_df = pd.read_csv(os.path.join(root, "bounding_boxes.txt"),
                              sep=" ", names=["index", "x", "y", "width", "height"])
        split_df = pd.read_csv(split_path, sep=" ", names=["index", "split"])

        # Join everything on the shared image index
        data_df = (class_df
                   .merge(path_df, on="index")
                   .merge(bbox_df, on="index")
                   .merge(split_df, on="index"))

        # Final dataframe: drop the rows belonging to the excluded partition
        self.data_df = data_df[data_df["split"] != test_train]

        # Must be set: __getitem__ branches on it
        self.return_masks = return_masks
        self.image_size = image_size
        self.transform = transform

        self.root = root
        self.image_root_dir = os.path.join(self.root, "images")
        self.mask_root_dir = os.path.join(self.root, "segmentations")

    def get_bbox_list(self, data, img_size):
        """
        Return the row's box as [[x, y, width, height]], clipped to the image.

        Args:
            data: mapping with "x", "y", "width" and "height" entries.
            img_size: image shape as (height, width, ...), e.g. `image.shape`.
        """
        bbox_array = [data["x"],
                      data["y"],
                      data["width"],
                      data["height"]]

        # If the box runs past the right/bottom edge, shrink it so it ends at the edge
        if (bbox_array[0] + bbox_array[2]) > img_size[1]:
            bbox_array[2] = img_size[1] - bbox_array[0]

        if (bbox_array[1] + bbox_array[3]) > img_size[0]:
            bbox_array[3] = img_size[0] - bbox_array[1]

        return [bbox_array]

    def get_output_tensors(self, data_out):
        """
        Convert the transform output into (bbox tensor, [label]).

        If the transform dropped the box, a zero box and label -1 are returned
        so that every sample still has a 4-value target.
        """
        if len(data_out["bboxes"]) > 0:
            bbox = torch.FloatTensor(data_out["bboxes"][0]) / self.image_size
            label = data_out["class_labels"][0]
        else:
            bbox = torch.zeros(4)
            label = -1

        return bbox, [label]

    def __getitem__(self, index):
        """
        Load sample `index` (positional).

        Returns:
            (image, bbox, label) or, when return_masks is True,
            (image, mask, bbox, label).

        Raises:
            FileNotFoundError: if the image (or requested mask) cannot be read.
        """
        # Positional row lookup so the dataset can be indexed like a list
        data_series = self.data_df.iloc[index]
        file_path = data_series["file_path"]
        label = data_series["class"]

        img_path = os.path.join(self.image_root_dir, file_path)
        image = cv2.imread(img_path)
        # cv2.imread signals failure by returning None instead of raising
        if image is None:
            raise FileNotFoundError("Could not read image: " + img_path)
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)

        # Box as [[x, y, width, height]], clipped to this image's shape
        bbox_array = self.get_bbox_list(data_series, image.shape)

        if self.return_masks:
            mask_path = os.path.join(self.mask_root_dir, file_path).split(".jpg")[0] + ".png"
            mask = cv2.imread(mask_path)
            if mask is None:
                raise FileNotFoundError("Could not read mask: " + mask_path)

            data_out = self.transform(image=image, bboxes=bbox_array, mask=mask, class_labels=[label])
            bbox, label = self.get_output_tensors(data_out)
            # Binarise the first mask channel with a fixed threshold of 100
            mask = (data_out["mask"][:, :, 0] > 100).long()

            return data_out["image"], mask, bbox, label

        # No mask requested: transform image + box + label only
        data_out = self.transform(image=image, bboxes=bbox_array, class_labels=[label])
        bbox, label = self.get_output_tensors(data_out)
        return data_out["image"], bbox, label

    def __len__(self):
        """Number of samples kept after the split filter."""
        return len(self.data_df)

In [None]:
# --- Training hyperparameters ---
batch_size = 64          # samples per mini-batch
num_epochs = 2           # passes over the training split
learning_rate = 1e-4     # optimizer step size

# --- Data ---
root = "CUB_200_2011"    # dataset directory, relative to the notebook
image_size = 128         # side length of the square crops fed to the network

In [None]:
# --- Checkpointing / runtime ---
# When True the trainer loads <save_dir>/<model_name>.pt and raises if that file is missing
start_from_checkpoint = True
save_dir = '.'                 # directory the checkpoint is written to
model_name = 'ResNet34_CUB'    # checkpoint file stem
device = 'cpu'                 # device used for the model and batches

In [None]:
# Data augmentation pipeline
# Training pipeline: resize the short side to image_size, take a random square crop,
# apply geometric / colour jitter, normalise with the standard ImageNet mean/std
# and convert to a tensor.
train_transform = A.Compose(
    [
        A.SmallestMaxSize(max_size=image_size),
        A.RandomCrop(height=image_size, width=image_size),
        A.HorizontalFlip(p=0.5),
        A.ShiftScaleRotate(shift_limit=0.2, scale_limit=0.2, rotate_limit=30, p=0.5),
        A.RGBShift(r_shift_limit=25, g_shift_limit=25, b_shift_limit=25, p=0.5),
        A.RandomBrightnessContrast(brightness_limit=0.3, contrast_limit=0.3, p=0.5),
        A.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
        ToTensorV2(),
    ],
    # Bounding boxes are transformed together with the image:
    #   format='coco'        -> (x_min, y_min, width, height)
    #   min_area=0           -> no minimum box area
    #   min_visibility=0.0   -> no minimum visible fraction
    #   label_fields         -> labels passed alongside the boxes
    bbox_params=A.BboxParams(
        format='coco',
        min_area=0,
        min_visibility=0.0,
        label_fields=['class_labels'],
    ),
)

# Evaluation pipeline: same resize / crop / normalisation, without the jitter steps.
transform = A.Compose(
    [
        A.SmallestMaxSize(max_size=image_size),
        A.RandomCrop(height=image_size, width=image_size),
        A.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
        ToTensorV2(),
    ],
    bbox_params=A.BboxParams(
        format='coco',
        min_area=0,
        min_visibility=0.0,
        label_fields=['class_labels'],
    ),
)

In [None]:
# nn.Module class that will return the IoU for a batch of outputs
class BboxIOU(nn.Module):
    """
    Per-sample intersection-over-union for a batch of boxes.

    Boxes are rows of (x_min, y_min, width, height). Every method works on
    2-D tensors with one box per row and returns one column per row.
    """

    def xyhw_to_xyxy(self, bbox):
        """
        Convert (x_min, y_min, width, height) rows to (x_min, y_min, x_max, y_max).
        """
        top_left = bbox[:, 0:1], bbox[:, 1:2]
        bottom_right = bbox[:, 2:3] + bbox[:, 0:1], bbox[:, 3:4] + bbox[:, 1:2]
        return torch.cat(top_left + bottom_right, 1)

    def bb_intersection_over_union(self, pred_xyhw, target_xyhw):
        """
        IoU between matching rows of `pred_xyhw` and `target_xyhw`.

        Returns a (batch, 1) tensor. A small constant in the denominator keeps
        the division finite when both boxes are empty.
        """
        pred_corners = self.xyhw_to_xyxy(pred_xyhw)
        target_corners = self.xyhw_to_xyxy(target_xyhw)

        # Intersection rectangle: the larger of the two minima and the
        # smaller of the two maxima along each axis.
        left = torch.cat((pred_corners[:, 0:1], target_corners[:, 0:1]), 1).max(dim=1, keepdim=True).values
        top = torch.cat((pred_corners[:, 1:2], target_corners[:, 1:2]), 1).max(dim=1, keepdim=True).values
        right = torch.cat((pred_corners[:, 2:3], target_corners[:, 2:3]), 1).min(dim=1, keepdim=True).values
        bottom = torch.cat((pred_corners[:, 3:4], target_corners[:, 3:4]), 1).min(dim=1, keepdim=True).values

        # A negative extent means the boxes do not overlap; clamp it to zero
        overlap_w = F.relu(right - left)
        overlap_h = F.relu(bottom - top)
        intersection = overlap_w * overlap_h

        # Box areas come straight from the width/height columns
        pred_area = pred_xyhw[:, 2:3] * pred_xyhw[:, 3:4]
        target_area = target_xyhw[:, 2:3] * target_xyhw[:, 3:4]

        # union = area1 + area2 - intersection
        union = pred_area + target_area - intersection
        return intersection / (union + 1e-5)

    def forward(self, predictions, data):
        """
        Args:
            predictions: raw model output; the first 4 columns are treated as
                box logits and passed through a sigmoid.
            data: batch sequence whose element [1] is the target box tensor.
        """
        boxes_pred = torch.sigmoid(predictions[:, :4])
        boxes_true = data[1].to(boxes_pred.device)
        return self.bb_intersection_over_union(boxes_pred, boxes_true)

In [None]:
# Initalize our datasets # (#, idx, class, file_path, x, y, width, height, split)
# Build the two dataset partitions. Each row of data_df holds
# (index, class, file_path, x, y, width, height, split).
# test_train names the split value that is filtered OUT of the table.
train_data = CUB200(root=root, image_size=image_size, transform=train_transform, test_train=0)
test_data = CUB200(root=root, image_size=image_size, transform=transform, test_train=1)

In [None]:
# Number of samples in the training partition
len(train_data)

In [None]:
# Inspect the merged per-image table (class, file path, box, split) of the training partition
train_data.data_df

In [None]:
# Inspect the merged per-image table (class, file path, box, split) of the test partition
test_data.data_df

In [None]:
import torch.utils.data.dataloader as dataloader

In [None]:
# Split 90-10
# Fraction of the training partition kept for training (the rest is validation)
validation_split = 0.9

# Always split the ORIGINAL dataset: if this cell is re-run, train_data is already
# a Subset, so unwrap it instead of splitting the 90% subset a second time.
full_train_data = train_data.dataset if isinstance(train_data, torch.utils.data.Subset) else train_data

# Sizes of the two parts
n_train_examples = int(len(full_train_data) * validation_split)
n_valid_examples = len(full_train_data) - n_train_examples

# Seeded generator -> the same split on every run
# NOTE(review): both subsets wrap the same CUB200 object, so the validation
# samples also pass through train_transform — confirm this is intended.
train_data_split, valid_data_split = torch.utils.data.random_split(
    full_train_data,
    [n_train_examples, n_valid_examples],
    generator=torch.Generator().manual_seed(42),
)
test_data_split = test_data

# Names used by the loader cell below
train_data, valid_data = train_data_split, valid_data_split

In [None]:
# One shuffled loader per partition, all using the shared batch_size
train_loader = dataloader.DataLoader(train_data, batch_size=batch_size, shuffle=True)
valid_loader = dataloader.DataLoader(valid_data, batch_size=batch_size, shuffle=True)
test_loader = dataloader.DataLoader(test_data, batch_size=batch_size, shuffle=True)

In [None]:
# Pull a single batch to sanity-check what the loader yields.
# Order follows CUB200.__getitem__ without masks: x = images, y = bounding boxes, z = labels.
x, y,z = next(iter(train_loader))
x.shape, y.shape, z

In [None]:
import torch.utils.data.dataloader as dataloader

class NetworkConstructor(nn.Module):
    """
    Wraps a backbone network with training, evaluation and checkpoint helpers.

    The wrapped network's final `fc` layer is replaced so that it emits
    `output_size` values. Training minimises `loss_function(output, batch[1])`
    and `eval_metric(output, batch)` is summed per sample to score each split.
    """

    def __init__(self, model, output_size, device, loss_function, batch_size, learning_rate,
                 save_dir, eval_metric, train_loader, test_loader, valid_loader, start_from_checkpoint=True,
                 model_name=None, optimizer=None, lr_schedule=None):
        """
        Args:
            model: backbone network; if it has an `fc` attribute and output_size > 0
                that layer is replaced by a new linear layer.
            output_size: number of outputs of the replaced head (<= 0 keeps the head).
            device: device the model and every batch are moved to.
            loss_function: callable `loss(prediction, target)`.
            batch_size: batch size used when `set_data` builds loaders.
            learning_rate: learning rate of the default Adam optimizer.
            save_dir: directory that holds the checkpoint file (created if missing).
            eval_metric: callable `metric(prediction, batch)` returning per-sample scores.
            train_loader, test_loader, valid_loader: iterables of batches whose
                element [0] is the input and element [1] the regression target.
            start_from_checkpoint: True -> load the checkpoint (error if missing);
                False -> start fresh (error if a checkpoint already exists).
            model_name: checkpoint file stem. None falls back to a module-level
                `model_name` variable if one exists, otherwise "model".
            optimizer: optional ready-made optimizer. It must already reference the
                parameters to be trained; by default Adam over the wrapped model is used.
            lr_schedule: optional scheduler stepped once per epoch.

        Raises:
            ValueError: on the checkpoint conditions described above.
        """
        super(NetworkConstructor, self).__init__()

        self.device = device
        self.loss_fun = loss_function
        self.batch_size = batch_size
        self.learning_rate = learning_rate
        self.eval_metric = eval_metric

        # Training history and resume state
        self.train_loss_logger = []
        self.train_acc_logger = []
        self.val_acc_logger = []
        self.start_epoch = 0
        self.best_valid_acc = 0

        # Dataloaders
        self.train_loader = train_loader
        self.valid_loader = valid_loader
        self.test_loader = test_loader

        # Swap the head for one with `output_size` outputs (needs self.device, set above)
        self.output_size = output_size
        self.model = self.change_output(model, output_size=output_size).to(self.device)

        # The optimizer is built after the head swap so it sees the new layer's parameters
        if optimizer is None:
            optimizer = optim.Adam(self.model.parameters(), lr=learning_rate)
        self.optimizer = optimizer
        self.lr_schedule = lr_schedule

        # Checkpoint location
        if model_name is None:
            model_name = globals().get("model_name", "model")
        self.model_name = model_name
        self.save_dir = save_dir
        self.save_path = os.path.join(save_dir, model_name + ".pt")

        # Create the save directory if it does not exist
        if not os.path.isdir(self.save_dir):
            os.makedirs(self.save_dir)

        if start_from_checkpoint:
            self.load_checkpoint()
        else:
            # Refuse to silently overwrite an existing checkpoint
            if os.path.isfile(self.save_path):
                raise ValueError("Warning Checkpoint exists")
            else:
                print("Initalized model")

    # Aliases for the shorter attribute names, kept in sync with the loggers
    @property
    def train_loss(self):
        return self.train_loss_logger

    @property
    def train_acc(self):
        return self.train_acc_logger

    @property
    def val_acc(self):
        return self.val_acc_logger

    def forward(self, x):
        """Run the wrapped network on a batch."""
        return self.model(x)

    # Helper function
    def __get_layer__(self, num_ftrs, output_size):
        """Create the replacement linear head on the configured device."""
        layer = nn.Linear(num_ftrs, output_size).to(self.device)
        return layer

    def change_output(self, model, output_size):
        """Replace `model.fc` with a head emitting `output_size` values (no-op otherwise)."""
        if output_size > 0:
            if hasattr(model, "fc"):
                num_ftrs = model.fc.in_features
                model.fc = self.__get_layer__(num_ftrs, output_size)
        return model

    def set_data(self, train_set, test_set, val_set):
        """Build the three loaders from datasets using the configured batch size."""
        self.train_loader = torch.utils.data.DataLoader(train_set, batch_size=self.batch_size, shuffle=True)
        self.valid_loader = torch.utils.data.DataLoader(val_set, batch_size=self.batch_size, shuffle=False)
        self.test_loader = torch.utils.data.DataLoader(test_set, batch_size=self.batch_size, shuffle=False)

    def load_checkpoint(self):
        """
        Restore model, optimizer and history from `self.save_path`.

        Raises:
            ValueError: if the checkpoint file does not exist.
        """
        if os.path.isfile(self.save_path):
            check_point = torch.load(self.save_path, map_location=self.device)

            # The checkpoint is a plain dictionary; unpack the previous training state
            self.model.load_state_dict(check_point['model_state_dict'])
            self.optimizer.load_state_dict(check_point['optimizer_state_dict'])

            # The stored epoch finished, so resume with the following one
            self.start_epoch = check_point['epoch'] + 1
            self.best_valid_acc = check_point['best_valid_acc']

            self.train_loss_logger = check_point['train_loss_logger']
            self.train_acc_logger = check_point['train_acc_logger']
            self.val_acc_logger = check_point['val_acc_logger']

            print("Checkpoint loaded, starting from epoch:", self.start_epoch)
        else:
            raise ValueError("Checkpoint Does not exist")

    def save_checkpoint(self, epoch, valid_acc):
        """Record `valid_acc` as the best score and write the full training state."""
        self.best_valid_acc = valid_acc

        torch.save({
            'epoch': epoch,
            'model_state_dict': self.model.state_dict(),
            'optimizer_state_dict': self.optimizer.state_dict(),
            'best_valid_acc': valid_acc,
            'train_loss_logger': self.train_loss_logger,
            'train_acc_logger': self.train_acc_logger,
            'val_acc_logger': self.val_acc_logger,
        }, self.save_path)

    def train_model(self, num_epochs):
        """
        Train until `num_epochs` epochs have been completed in total.

        After each epoch the train and validation metrics are logged, the model
        is checkpointed when the validation metric improves, and the optional
        scheduler is stepped. Resumed runs continue from `self.start_epoch`.
        """
        for epoch in range(self.start_epoch, num_epochs):
            self.train()
            for data in self.train_loader:
                inputs = data[0].to(self.device)
                targets = data[1].to(self.device)

                fx = self.forward(inputs)
                loss = self.loss_fun(fx, targets)

                self.optimizer.zero_grad()
                loss.backward()
                # Clip the gradient norm to 5 to keep updates bounded
                torch.nn.utils.clip_grad_norm_(self.parameters(), 5)
                self.optimizer.step()

                self.train_loss_logger.append(loss.item())

            # Evaluation (also appends to the accuracy loggers)
            self.evaluate_model(train_test_val="train")
            valid_acc = self.evaluate_model(train_test_val="val")

            # Keep only the best model according to the validation metric
            if valid_acc > self.best_valid_acc:
                self.save_checkpoint(epoch, valid_acc)

            if self.lr_schedule is not None:
                self.lr_schedule.step()

    def run_training(self, num_epochs):
        """Alias of `train_model`."""
        return self.train_model(num_epochs)

    def evaluate_model(self, train_test_val="test"):
        """
        Run one evaluation pass; never updates the weights.

        Args:
            train_test_val: "train", "val" or "test" — which loader to score.
                "train" and "val" results are appended to their loggers.

        Returns:
            Mean metric value per sample over the chosen split.

        Raises:
            ValueError: for an unknown split name or a missing loader.
        """
        loaders = {"train": self.train_loader,
                   "val": self.valid_loader,
                   "test": self.test_loader}
        if train_test_val not in loaders:
            raise ValueError("train_test_val must be one of 'train', 'val' or 'test'")
        loader = loaders[train_test_val]
        if loader is None:
            raise ValueError("No data loader set for split: " + train_test_val)

        epoch_acc = 0
        self.eval()
        with torch.no_grad():
            for data in loader:
                fx = self.forward(data[0].to(self.device))
                # Accumulate the per-sample metric over the whole split
                epoch_acc += self.eval_metric(fx, data).sum().cpu().item()

        n_samples = len(loader.dataset)
        mean_acc = epoch_acc / n_samples if n_samples else 0.0

        if train_test_val == "train":
            self.train_acc_logger.append(mean_acc)
        elif train_test_val == "val":
            self.val_acc_logger.append(mean_acc)

        return mean_acc

In [None]:
# Small throw-away loader over the training split, used to inspect one batch below
x = dataloader.DataLoader(train_data, shuffle=True, batch_size=16)

In [None]:
# Each batch is (images, box coordinates, labels); look at the first one only
for images, coords, label in x:
    print(images.shape)
    break

In [None]:
# Create an instance of the ResNet34 Model
# resnet = models.resnet34(pretrained=True)
resnet = models.resnet34(weights="IMAGENET1K_V1")

In [None]:
# Loss applied to the raw (pre-sigmoid) box outputs
loss_fun = nn.BCEWithLogitsLoss()


In [None]:
# Wrap the ResNet in the trainer class defined in this notebook.
# output_size=4 -> one output per box value (x_min, y_min, width, height).
# The checkpoint file name comes from the notebook-level `model_name`.
model_trainer = NetworkConstructor(model=resnet.to(device), output_size=4, device=device,
                                   loss_function=loss_fun, batch_size=batch_size,
                                   learning_rate=learning_rate, save_dir=save_dir,
                                   eval_metric=BboxIOU(), train_loader=train_loader,
                                   test_loader=test_loader, valid_loader=valid_loader,
                                   start_from_checkpoint=start_from_checkpoint)

In [None]:
# Hand the three dataset partitions to the trainer so it can build its loaders.
# NOTE(review): relies on the trainer exposing set_data(train_set, test_set, val_set) — confirm.
model_trainer.set_data(train_set=train_data, test_set=test_data, val_set=valid_data)

In [None]:
# Show one test batch as an image grid (make_grid rescales values for display)
plt.figure(figsize=(20, 10))
images, bbox, labels = next(iter(model_trainer.test_loader))
out = torchvision.utils.make_grid(images, normalize=True)
# CHW tensor -> HWC array, the layout imshow expects
_ = plt.imshow(out.permute(1, 2, 0).numpy())

In [None]:
# Pick one sample from the batch
example_indx = 3
ex_img = images[example_indx]

# Stored boxes are (x_min, y_min, width, height) divided by image_size.
# draw_bounding_boxes expects pixel (x_min, y_min, x_max, y_max), i.e. the
# top-left and bottom-right corners, so scale back up and add the origin
# to the width/height columns.
ex_label = bbox[example_indx].unsqueeze(0) * image_size
ex_label[:, 2:4] += ex_label[:, 0:2]

# Min-max rescale the normalised image to 0..255 and cast to uint8 for drawing
img_out = (ex_img - ex_img.min()) / (ex_img.max() - ex_img.min())
img_out = (img_out * 255).to(torch.uint8)
img_box = torchvision.utils.draw_bounding_boxes(img_out, ex_label, colors=(0, 255, 0))

In [None]:
# Display the image with its ground-truth box
plt.figure(figsize=(5, 5))
out = torchvision.utils.make_grid(img_box.unsqueeze(0).float(), normalize=True)
_ = plt.imshow(out.permute(1, 2, 0).numpy())

In [None]:
# Train for num_epochs epochs; the trainer class in this notebook exposes this as train_model
model_trainer.train_model(num_epochs=num_epochs)

In [None]:
# Best validation score recorded by the trainer, shown to two decimals
print("The highest validation IoU was %.2f" %(model_trainer.best_valid_acc))

In [None]:
# Training loss per optimisation step, with the x axis rescaled to epochs
_ = plt.figure(figsize=(10, 5))
train_x = np.linspace(0, num_epochs, len(model_trainer.train_loss_logger))
_ = plt.plot(train_x, model_trainer.train_loss_logger)
_ = plt.title("Training Loss")
_ = plt.xlabel("Epoch")
_ = plt.ylabel("Loss")

In [None]:
# Shape of the most recently loaded image batch
images.shape

In [None]:
# Select an image to test
example_indx = 4
ex_img = images[example_indx]
img_out = (((ex_img - ex_img.min())/(ex_img.max() - ex_img.min())) * 255).to(torch.uint8)

real_label = bbox[example_indx].unsqueeze(0) * image_size
real_label[:, 2] += real_label[:, 0]
real_label[:, 3] += real_label[:, 1]

# Get the model's prediction for the Bounding Box
model_trainer.eval()
with torch.no_grad():
    pred_out = torch.sigmoid(model_trainer(ex_img.unsqueeze(0).to(device)))
    pred_label = (pred_out * image_size).cpu()
    pred_label[:, 2] += pred_label[:, 0]
    pred_label[:, 3] += pred_label[:, 1]
    
# Draw the box on the image
img_box = torchvision.utils.draw_bounding_boxes(img_out, real_label, colors=(0, 255, 0))
img_box = torchvision.utils.draw_bounding_boxes(img_box, pred_label, colors=(255, 0, 0))

plt.figure(figsize = (5,5))
out = torchvision.utils.make_grid(img_box.unsqueeze(0).float(), normalize=True)
_ = plt.imshow(out.numpy().transpose((1, 2, 0)))

In [None]:
# Mean IoU per epoch on the training and validation splits
_ = plt.figure(figsize=(10, 5))
train_x = np.linspace(0, num_epochs, len(model_trainer.train_acc_logger))
_ = plt.plot(train_x, model_trainer.train_acc_logger, c="y")
valid_x = np.linspace(0, num_epochs, len(model_trainer.val_acc_logger))
_ = plt.plot(valid_x, model_trainer.val_acc_logger, c="k")

_ = plt.title("Average IoU")
_ = plt.xlabel("Epoch")
_ = plt.ylabel("IoU")
_ = plt.legend(["Training IoU", "Validation IoU"])

In [None]:
# Call the evaluate function and pass the evaluation/test dataloader etc
test_acc = model_trainer.evaluate_model(train_test_val="test")
print("The Test Average IoU is: %.2f" %(test_acc))