In [None]:
# pytorch modules and other ML/DL modules
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import torchvision
import torchsummary
from sklearn.model_selection import train_test_split

# other modules
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import cv2
import os
import glob
from pathlib import Path
from PIL import Image
import itertools
import copy

# data augmentation module
import data_augmentation as aug

## 1. Resize raw images to processed images

In [None]:
# One-off preprocessing: write a 640 x 480 copy of every raw JPEG into
# ./dataset/processed. The whole step is skipped when that folder already exists.
if not os.path.exists('./dataset/processed'):
    os.mkdir('./dataset/processed')
    for raw_path in glob.glob('./dataset/raw/*.jpg'):
        # Image.open keeps the file handle open until the image is closed, so use
        # a context manager; resize() returns an independent in-memory copy that
        # stays valid after the source file has been closed.
        with Image.open(raw_path) as raw:
            processed = raw.resize((640,480))
        # Keep the original file name so labels.csv entries still match.
        processed.save('./dataset/processed/' + os.path.basename(raw_path))
else:
    print("640 x 480 sized images generated already in ./dataset/processed directory.")

## 2. Read in processed image path and labels into dataframe

In [None]:
# Folder holding the 640 x 480 images and the CSV with one labelled ball per row.
processed_image_path = Path('./dataset/processed')
processed_labels = pd.read_csv('./dataset/label/labels.csv')

# Colour name -> class id, and the inverse lookup (class id -> colour name).
colour_dict = {'blue':0, 'green':1, 'yellow':2, 'red':3, 'pink':4}
dict_colour = ["blue","green","yellow","red","pink"]

# Replace colour names by their class ids; an unknown name raises KeyError.
processed_labels['colour'] = np.array([colour_dict[name] for name in processed_labels['colour']])

# Turn the file names from the CSV into paths inside the processed-image folder.
processed_labels['filepath'] = processed_labels['filepath'].apply(
    lambda x: Path(f"{processed_image_path}/{x}")
)

# Pack the four bbox columns of every row into one float32 array [x, y, w, h].
bbox_columns = ['bbox_x', 'bbox_y', 'bbox_w', 'bbox_h']
bboxes = [
    np.array(values, dtype=np.float32)
    for values in zip(*(processed_labels[column] for column in bbox_columns))
]
processed_labels['bbox'] = bboxes

# The image dimensions are not used further down, so drop them.
processed_labels = processed_labels.drop(['img_width','img_height'], axis=1)
processed_labels.head()

## 3. Save 320 x 240 copies of the images and add their paths and bounding boxes as extra dataframe columns

In [None]:
# Folder that receives the smaller copies; created only when it is missing.
train_path_resized = Path('./dataset/resized')
if not train_path_resized.exists():
    train_path_resized.mkdir()

# One (path, bbox) pair per dataframe row, in row order.
# NOTE(review): aug.resize_image_bb presumably writes the resized image into
# train_path_resized and returns its path plus the rescaled box — confirm in
# data_augmentation. The trailing 240 is the target height used elsewhere in
# this notebook for the 320 x 240 images.
resized = [
    aug.resize_image_bb(row['filepath'], train_path_resized, aug.create_bb_array(row.values), 240)
    for _, row in processed_labels.iterrows()
]
new_paths = [resized_path for resized_path, _ in resized]
new_bbs = [resized_bb for _, resized_bb in resized]

processed_labels['new_path'] = new_paths
processed_labels['new_bb'] = new_bbs
processed_labels.head()

## 4. Train and validation datasets

In [None]:
# Features: image path and bounding box at both resolutions. Target: colour class id.
feature_columns = ['filepath', 'bbox', 'new_path', 'new_bb']
X = processed_labels[feature_columns]
Y = processed_labels['colour']

# Hold out 20% of the rows; the fixed random_state keeps the split identical between runs.
X_train, X_test, Y_train, Y_test = train_test_split(
    X,
    Y,
    test_size=0.2,
    random_state=42,
)

class _BallDataset(Dataset):
    """Shared implementation of the two ball datasets.

    Each sample is loaded through ``aug.transformsXY`` and returned as
    ``(image, bbox, colour)``. Subclasses only choose ``image_height``.

    Args:
        path: pandas Series of image paths.
        bbox: pandas Series of bounding boxes, one per image.
        colour: pandas Series of colour class ids.
        transforms: flag forwarded to ``aug.transformsXY``; the plotting cell
            of this notebook uses True for the augmented image and False for
            the unaugmented one.
    """

    # Extra height argument for aug.transformsXY. None means the argument is
    # omitted, so the helper's own default applies.
    image_height = None

    def __init__(self, path, bbox, colour, transforms=False):
        self.path = path.values
        self.colour = colour.values
        self.bbox = bbox.values
        self.transforms = transforms

    def __len__(self):
        """Number of samples."""
        return len(self.path)

    def __getitem__(self, index):
        """Return ``(image tensor, bbox, colour as 0-d numpy array)`` for one sample."""
        size_args = () if self.image_height is None else (self.image_height,)
        x, bbox = aug.transformsXY(self.path[index], self.bbox[index], self.transforms, *size_args)
        # Move axis 2 to the front (assumes x is height x width x channels, so the
        # result is channels-first as nn.Conv2d expects — TODO confirm in aug).
        image = torch.from_numpy(np.rollaxis(x, 2))
        return image, bbox, np.array(self.colour[index])


class Ball_Dataset_640x480(_BallDataset):
    """Dataset over the 640 x 480 images; calls ``aug.transformsXY`` without a size argument."""


class Ball_Dataset_320x240(_BallDataset):
    """Dataset over the 320 x 240 images; passes height 240 to ``aug.transformsXY``."""

    image_height = 240  # image size specified by height

batch_size = 16

# 640 x 480 data: transforms are switched on for training only, and only the
# training loader shuffles.
train_dataset = Ball_Dataset_640x480(
    path=X_train['filepath'],
    bbox=X_train['bbox'],
    colour=Y_train,
    transforms=True,
)
test_dataset = Ball_Dataset_640x480(
    path=X_test['filepath'],
    bbox=X_test['bbox'],
    colour=Y_test,
)
train_dataloader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
test_dataloader = DataLoader(test_dataset, batch_size=batch_size)

# 320 x 240 data: same layout, built from the resized paths and boxes.
train_dataset_320x240 = Ball_Dataset_320x240(
    path=X_train['new_path'],
    bbox=X_train['new_bb'],
    colour=Y_train,
    transforms=True,
)
test_dataset_320x240 = Ball_Dataset_320x240(
    path=X_test['new_path'],
    bbox=X_test['new_bb'],
    colour=Y_test,
)
train_dataloader_320x240 = DataLoader(train_dataset_320x240, batch_size=batch_size, shuffle=True)
test_dataloader_320x240 = DataLoader(test_dataset_320x240, batch_size=batch_size)

## 5. Different ways of plotting images by accessing dataframe values

In [None]:
# Five panels in one row, each filled through a different way of reading the
# training split. Every panel takes path, bbox and colour from the SAME row:
# mixing a label lookup (X_train[col][3]) with a positional one (Y_train.iloc[3])
# would pair an image with another row's colour.
fig = plt.figure(figsize=(30, 5))
axes = []

same_image_label = 3  # dataframe index label shown at both resolutions (panels 2 and 3)
augment_label = 6     # dataframe index label shown augmented / unaugmented (panels 4 and 5)
# Both labels must be present in X_train, otherwise .loc raises KeyError.

for i in range(1, 6):
    # add_subplot makes the new axes the current one.
    # NOTE(review): aug.draw_bbox presumably draws onto the current axes — confirm.
    axes.append(fig.add_subplot(1, 5, i))
    if i == 1:
        # Positional access: first row of the split, fields read by column name.
        first_row = X_train.iloc[0]
        aug.draw_bbox(aug.read_image(first_row['filepath']), first_row['bbox'], dict_colour[Y_train.iloc[0]])
    elif i in (2, 3):
        # Label access: the resized image (panel 2) and the same image at full size (panel 3).
        path_column, bbox_column = ('new_path', 'new_bb') if i == 2 else ('filepath', 'bbox')
        aug.draw_bbox(
            aug.read_image(str(X_train.loc[same_image_label, path_column])),
            X_train.loc[same_image_label, bbox_column],
            dict_colour[Y_train.loc[same_image_label]],
        )
    else:
        # Panel 4: augmented image; panel 5: the same image without augmentation.
        augment = (i == 4)
        image_path = X_train.loc[augment_label, 'new_path']
        im, bb = aug.transformsXY(str(image_path), X_train.loc[augment_label, 'new_bb'], augment, 240)
        print(image_path)
        print(im.shape)
        aug.draw_bbox(im, bb, dict_colour[Y_train.loc[augment_label]])
plt.show()

## 6. CNN architecture, train and evaluate functions

In [None]:
class CNN(nn.Module):
    """Small two-stage CNN with a colour-classification head and a bbox-regression head.

    Layout: two conv / ReLU / max-pool stages, one 256-unit dense layer, then
    two linear heads (5 colour logits, 4 bbox values).

    Args:
        input_size: ``(height, width)`` of the input images. The default
            ``(240, 320)`` gives a flattened feature size of 972, which is the
            value this network was originally hard-coded for, so ``CNN()``
            and checkpoints saved from it are unchanged.
    """

    def __init__(self, input_size=(240, 320)):
        super().__init__()
        self.conv1 = nn.Sequential(
            nn.Conv2d(in_channels=3,out_channels=9,kernel_size=11,stride=4),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2,stride=2)
        )
        self.conv2 = nn.Sequential(
            nn.Conv2d(in_channels=9,out_channels=18,kernel_size=5,stride=2),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2,stride=2)
        )
        # Size the dense layer from the conv stack itself by pushing one blank
        # image through it. No gradients are recorded and no random numbers are
        # drawn, so weight initialisation is not affected.
        with torch.no_grad():
            flat_features = self.conv2(self.conv1(torch.zeros(1, 3, *input_size))).numel()
        self.dense = nn.Sequential(
            nn.Linear(flat_features,256,bias=True),
            nn.ReLU()
        )
        self.colour = nn.Linear(256,5)
        self.bbox = nn.Linear(256,4)

    def forward(self,x):
        """Return ``(bbox, colour_logits)`` for a batch ``x`` of shape (batch, 3, height, width).

        ``bbox`` has shape (batch, 4) and ``colour_logits`` has shape (batch, 5).
        Note the order: bbox first, colour second.
        """
        output_conv = self.conv2(self.conv1(x))
        # Flatten everything except the batch dimension.
        output_flat = torch.flatten(output_conv, 1)
        output_dense = self.dense(output_flat)
        output_colour = self.colour(output_dense)
        output_bbox = self.bbox(output_dense)
        return output_bbox, output_colour

def train(model, optimizer, train_dataloader, test_dataloader, epochs=30, C=1000):
    """Train ``model`` and print train/test metrics after every epoch.

    The per-batch loss is the summed cross-entropy of the colour logits plus
    the summed L1 error of the bbox outputs divided by ``C``.

    Args:
        model: module returning ``(bbox, colour_logits)`` for an image batch.
        optimizer: optimizer over the model's parameters.
        train_dataloader: iterable of ``(image, bbox, colour)`` training batches.
        test_dataloader: iterable of batches passed to ``evaluate`` each epoch.
        epochs: number of passes over ``train_dataloader``.
        C: divisor applied to the bbox loss term.

    Returns:
        Mean per-sample training loss of the last epoch. This is NOT scaled by
        100, unlike the printed value. Returns ``nan`` when no sample was
        processed (``epochs == 0`` or an empty training loader).
    """
    mean_loss = float("nan")
    for epoch in range(epochs):
        model.train()  # evaluate() below leaves the model in eval mode
        total = 0
        sum_loss = 0
        for image, bbox, colour in train_dataloader:
            batch = colour.shape[0]
            output_bbox, output_colour = model(image)
            loss_colour = nn.functional.cross_entropy(output_colour, colour, reduction="sum")
            loss_bbox = nn.functional.l1_loss(output_bbox, bbox, reduction="none").sum(1)
            loss_bbox = loss_bbox.sum()
            loss = loss_colour + loss_bbox/C
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            total += batch
            sum_loss += loss.item()
        # Guard against an empty loader instead of dividing by zero.
        mean_loss = sum_loss/total if total else float("nan")
        test_loss, test_accuracy = evaluate(model, test_dataloader, C)
        print("EPOCH %d | train loss %.3f test loss %.3f colour accuracy %.3f" % (epoch, mean_loss*100, test_loss, test_accuracy))
    return mean_loss

def evaluate(model, test_dataloader, C=1000):
    """Compute loss and colour accuracy of ``model`` over ``test_dataloader``.

    The model is switched to eval mode (and left there) and the forward passes
    run under ``torch.no_grad()``, so no autograd graph is built.

    Args:
        model: module returning ``(bbox, colour_logits)`` for an image batch.
        test_dataloader: iterable of ``(image, bbox, colour)`` batches.
        C: divisor applied to the bbox L1 loss term.

    Returns:
        ``(loss, accuracy)``: the mean per-sample loss multiplied by 100 and
        the percentage of correctly predicted colours. Both are ``nan`` when
        the loader yields no samples.
    """
    model.eval()
    total = 0
    sum_loss = 0
    correct = 0
    with torch.no_grad():
        for image, bbox, colour in test_dataloader:
            batch = colour.shape[0]
            output_bbox, output_colour = model(image)
            # Index of the largest logit per sample = predicted colour class.
            _, output_colour_num = torch.max(output_colour,1)
            loss_colour = nn.functional.cross_entropy(output_colour, colour, reduction="sum")
            loss_bbox = nn.functional.l1_loss(output_bbox, bbox, reduction="none").sum(1)
            loss_bbox = loss_bbox.sum()
            loss = loss_colour + loss_bbox/C
            correct += output_colour_num.eq(colour).sum().item()
            sum_loss += loss.item()
            total += batch
    if total == 0:
        # Nothing was evaluated; report that instead of dividing by zero.
        return float("nan"), float("nan")
    return sum_loss/total*100, correct/total*100

## 7. Train CNN model

In [None]:
# Create the model here so this cell runs on a fresh kernel (Restart & Run All);
# otherwise `model` only exists after the "Load trained model" cell has been run.
model = CNN()
# Hand only the trainable parameters to the optimizer.
parameters = (p for p in model.parameters() if p.requires_grad)
optimizer = optim.Adam(parameters, lr=0.0003)
train(model,optimizer,train_dataloader_320x240,test_dataloader_320x240,epochs=1000,C=1)

## 8. Save trained model

In [None]:
# torch.save does not create missing folders, so make sure the target exists first.
os.makedirs("./trained_model", exist_ok=True)
# Store only the weights (state_dict), not the pickled module object.
torch.save(model.state_dict(), "./trained_model/model_cnn_320x240.pt")

## 9. Load trained model

In [None]:
# Build a fresh network (a torch.nn.Module) and fill it with the saved weights.
model = CNN()
state_dict = torch.load("./trained_model/model_cnn_320x240.pt")
model.load_state_dict(state_dict)

## 10. Evaluate trained model

In [None]:
def _bbox_mse(true_bbox, pred_bbox):
    """Return the mean squared coordinate difference of two boxes, truncated to int.

    Both boxes are widened to int64 first: squaring int16 differences wraps
    around once a coordinate is off by 182 or more (182**2 > 32767), which
    would make badly wrong boxes look accurate.
    """
    diff = true_bbox.astype(np.int64) - pred_bbox.astype(np.int64)
    return int(np.square(diff).mean(axis=None))


def _report_failures(model, frame, colours, start=0, max_bbox_loss=200):
    """Print every sample whose colour is wrong or whose bbox error is too large.

    Args:
        model: trained network returning ``(bbox, colour_logits)``.
        frame: dataframe with ``new_path`` and ``new_bb`` columns.
        colours: series of true colour class ids, aligned row by row with ``frame``.
        start: running sample number given to the first row (used in the printout).
        max_bbox_loss: a sample is reported when its bbox error reaches this value.

    Returns:
        ``(failures, next_number)``: the number of reported samples and the
        running sample number following the last row.
    """
    failures = 0
    position = start
    with torch.no_grad():  # inference only, no autograd graph needed
        # Iterating the Series directly yields their values in row order
        # (Series.iteritems() no longer exists in pandas 2.x).
        for filepath, bbox, colour in zip(frame['new_path'], frame['new_bb'], colours):
            # true
            bbox = bbox.astype(np.int16)
            colour = dict_colour[colour]

            # predicted: read the image as RGB floats in [0, 1], channels first, batch of one
            x = cv2.cvtColor(cv2.imread(str(filepath)).astype(np.float32), cv2.COLOR_BGR2RGB) / 255
            img = torch.from_numpy(np.rollaxis(x, 2))[None,]
            pred_bbox, pred_logits = model(img)
            pred_bbox = pred_bbox[0].numpy().astype(np.int16)
            _, pred_class = torch.max(pred_logits,1)
            pred_colour = dict_colour[pred_class.item()]

            loss_bbox = _bbox_mse(bbox, pred_bbox)

            if colour != pred_colour or loss_bbox >= max_bbox_loss:
                print(f"{position} [{filepath}] =>", "T:", bbox, colour, "|| P:", pred_bbox, pred_colour, "|| loss:", loss_bbox)
                failures += 1
            position += 1
    return failures, position


model.eval()
# The running number continues from the training rows into the test rows.
train_failures, i = _report_failures(model, X_train, Y_train)
test_failures, i = _report_failures(model, X_test, Y_test, start=i)
count = train_failures + test_failures
print(count)

## 11. Actual performance of trained model on image

In [None]:
model.eval()
with torch.no_grad():
    index = 16
    # A dataframe index label belongs to exactly one split. Pick the split up
    # front rather than wrapping the whole inference in try/except KeyError,
    # which would also hide unrelated KeyErrors. A label found in neither split
    # still raises KeyError on the lookup below.
    split = X_test if index in X_test.index else X_train
    image_path = split.loc[index, 'new_path']

    # Unaugmented image, used as the background for the predicted box.
    im, bb = aug.transformsXY(str(image_path), split.loc[index, 'new_bb'], False, 240)
    print(image_path)

    # Model input: RGB floats in [0, 1], channels first, batch of one.
    x = cv2.cvtColor(cv2.imread(str(image_path)).astype(np.float32), cv2.COLOR_BGR2RGB) / 255
    img = torch.from_numpy(np.rollaxis(x, 2))[None,]
    pred_bbox, pred_colour = model(img)
    _, pred_colour = torch.max(pred_colour,1)
    aug.draw_bbox(im, pred_bbox.data[0], dict_colour[pred_colour])

## 12. Display full dataframe of train and validation sets

In [None]:
# Lift pandas' row/column display limits for this cell only, then show both
# splits ordered by their original dataframe index.
with pd.option_context('display.max_rows', None, 'display.max_columns', None):
    for split in (X_train, X_test):
        display(split.sort_index())

## 13. Visualise trained convolutional layer filters (learned kernel weights)

In [None]:
# Work on a detached copy of the first conv layer's weights so the rescaling
# below cannot touch the model.
kernels = model.conv1[0].weight.detach().clone()
print(kernels.size())

# Min-max rescale into [0, 1] so the weights can be shown as colours.
kernels -= kernels.min()
kernels /= kernels.max()

# Tile the kernels three per row; imshow wants channels last.
filter_img = torchvision.utils.make_grid(kernels, nrow=3)
filter_img_hwc = filter_img.permute(1, 2, 0)
plt.imshow(filter_img_hwc)

In [None]:
# Detached copy of the second conv layer's weights.
kernels = model.conv2[0].weight.detach().clone()
print(kernels.size())

# Keep only input channel 0 of every filter, preserving the channel axis so
# each filter is a single-channel image.
kernels = kernels[:, :1]
print(kernels.size())

# Min-max rescale into [0, 1] for display.
shifted = kernels - kernels.min()
kernels = shifted / shifted.max()

# Tile four kernels per row, then move channels last for imshow.
filter_img = torchvision.utils.make_grid(kernels, nrow=4)
print(filter_img.shape)
filter_img_hwc = filter_img.permute(1, 2, 0)
print(filter_img_hwc.shape)
plt.imshow(filter_img_hwc)

## 14. Print out overall CNN architecture

In [None]:
# Print the module tree: every registered layer with its constructor arguments.
print(model)

## 15. Summary of CNN architecture regarding its parameters and size

In [None]:
# Per-layer output shapes and parameter counts for one 3 x 240 x 320 input.
# torchsummary defaults to device="cuda"; the model in this notebook stays on
# the CPU, so request a CPU input explicitly to avoid a device mismatch on
# machines that have a GPU.
torchsummary.summary(model,(3,240,320),device="cpu")