In [41]:
# ***************************************************************************
# --------------------------------- Imports ---------------------------------
# ***************************************************************************

import numpy as np # linear algebra

import os # file handling
import json # file handling
import pickle # file handling
import zipfile # file handling

import torch # deep learning
import torch.nn as nn
import torchvision # deep learning for computer vision
from torch.utils.data import Dataset # shortcuts for writing dataset

import tqdm # progress bar

import matplotlib.pyplot as plt # graphing

import PIL

import random # data loader

import time # timing

import cv2 # object detection

In [29]:
# ***************************************************************************
# ------------------------------- Dataset Info ------------------------------
# ***************************************************************************
# 1) Classes are:
    # 0-background
    # 1-connected
    # 2-empty

In [30]:
# ***************************************************************************
# --------------------- Object Detection Dataset Class ----------------------
# ***************************************************************************

from torch.utils.data import Dataset, DataLoader

class PortDataset(Dataset):
    '''
    Object-detection dataset for the Port images.

    Every item is an (image, targets) pair: the image is read from disk and cast to float32,
    and targets is a dict holding the 'boxes' and 'labels' tensors of that image.

    Note that roboflow did all the transforming before we downloaded the data. If we need more
    transformations, we can go back and download the unedited version, then implement our own.
    '''
    def __init__(self,root,pkl_images,pkl_targets):
        '''
        arguments
            root: the root path of the folder where the images live
            pkl_images: the (already unpickled) list of image filenames; entry i belongs to item i
            pkl_targets: the (already unpickled) list of annotations; entry i is a dict with the
                         keys 'boxes' and 'labels' for item i
        '''
        self.root,self.filenames,self.targets=root,pkl_images,pkl_targets

    def __len__(self):
        # one item per stored filename
        return len(self.filenames)

    def __getitem__(self,idx):
        # only plain Python ints are accepted as an index
        if type(idx) is not int:
            raise ValueError(f'expected idx to be an integer, got {type(idx)}')
        # Image tensor (os.path.join keeps an absolute filename as-is)
        image_path=os.path.join(self.root,self.filenames[idx])
        image=torchvision.io.read_image(image_path).to(torch.float32)
        # Targets tensors, built from the annotation stored for this index
        annotation=self.targets[idx]
        targets={key:torch.tensor(annotation[key]) for key in ('boxes','labels')}

        return image,targets

In [32]:
# ***************************************************************************
# --------------------- Generate file lists for dataset ---------------------
# ***************************************************************************

def pickle_data(subset,root,out_dir='/kaggle/working'):
    '''
    Turns the data from port-by-ds-on-robolab into lists of filepaths, boxes, and labels which can be used by pytorch.
    Saves the output to out_dir (the kaggle working directory by default) as
    "{subset}_images.pkl" and "{subset}_targets.pkl".

    arguments
        subset: one of "test", "train", or "valid"; selects the sub-folder of root to read
        root: folder that contains the subset folders, each with a "_annotations.coco.json" file
        out_dir: folder the two pickle files are written to (created if it does not exist)

    output files
        {subset}_images.pkl: list of image paths (root/subset/file_name), ordered by COCO image id
        {subset}_targets.pkl: list of dicts {'boxes': [[x1,y1,x2,y2],...], 'labels': [category_id,...]},
                              aligned index-by-index with the image list
        Images without any annotated box are left out of both lists.

    raises
        ValueError: if subset is not a known name, or a box has non-positive width or height
    '''
    if subset not in ['test','train','valid']:
        raise ValueError(f'please enter the string "test", "valid", or "train". Received {subset}')

    # Read in COCO json data
    fpath=os.path.join(root,subset,'_annotations.coco.json')
    with open(fpath,encoding='utf-8') as f:
        data=json.load(f)

    # Order the images by id and remember where each id landed. Looking ids up in this map
    # (instead of using the id itself as a list index) keeps ids that do not run 0..n-1 working.
    image_records=sorted(data['images'],key=lambda record:record['id'])
    position_of={record['id']:pos for pos,record in enumerate(image_records)}

    # images: a list of the image pathways; targets: a list of distinct dictionaries
    images=[os.path.join(root,subset,record['file_name']) for record in image_records]
    targets=[{'boxes':[],'labels':[]} for _ in image_records]

    # Populate targets
    for note in data['annotations']:
        target=targets[position_of[note['image_id']]]
        # change bounding box representation from (x,y,w,h) (upper-left and size) to (x1,y1,x2,y2) (upper-left and bottom-right)
        x,y,w,h=note['bbox']
        bbox=[x,y,x+w,y+h]
        if bbox[0]>=bbox[2] or bbox[1]>=bbox[3]:
            raise ValueError(f'expected x1,y1 to be less than x2,y2 respectively. Got box {bbox}')
        # boxes and labels are appended together, so they always stay in one-to-one correspondence
        target['boxes'].append(bbox)
        target['labels'].append(note['category_id'])

    # Remove images with no labeled boxes (single pass instead of repeated list.pop)
    labeled=[(path,target) for path,target in zip(images,targets) if target['boxes']]
    images=[path for path,_ in labeled]
    targets=[target for _,target in labeled]

    # Save data
    os.makedirs(out_dir,exist_ok=True)
    with open(os.path.join(out_dir,f'{subset}_images.pkl'),'wb') as f:
        pickle.dump(images, f)
    with open(os.path.join(out_dir,f'{subset}_targets.pkl'),'wb') as f:
        pickle.dump(targets, f)



# Build the pickled file lists for the test split, then load them back into a dataset
pickle_data('test','/kaggle/input/port-by-ds-on-robolab')
# read the pickles inside "with" blocks so the file handles are closed right away
# (these files were just written by pickle_data above, so they are trusted input)
with open('/kaggle/working/test_images.pkl','rb') as f:
    test_images=pickle.load(f)
with open('/kaggle/working/test_targets.pkl','rb') as f:
    test_targets=pickle.load(f)
test_dataset=PortDataset('/kaggle/input/port-by-ds-on-robolab/test',
                  test_images,
                  test_targets
                 )

In [13]:
# ***************************************************************************
# ------------------------- ResNet18 Implementation -------------------------
# ***************************************************************************

def conv3x3(in_planes:int,out_planes:int,stride:int=1,groups:int=1,dilation:int=1)->nn.Conv2d:
    """Build a bias-free 3x3 convolution whose padding is set equal to its dilation."""
    options=dict(
        kernel_size=3,
        stride=stride,
        padding=dilation, # padding follows the dilation
        dilation=dilation,
        groups=groups,
        bias=False,
    )
    return nn.Conv2d(in_planes,out_planes,**options)


def conv1x1(in_planes:int,out_planes:int,stride:int=1)->nn.Conv2d:
    """Build a bias-free 1x1 (pointwise) convolution with the given stride."""
    pointwise=nn.Conv2d(
        in_channels=in_planes,
        out_channels=out_planes,
        kernel_size=1,
        stride=stride,
        bias=False,
    )
    return pointwise
    

class BottleneckBlock(nn.Module): # size halves iff we multiply channels by 2 (architecture decision, not code enforced)
    """
    Residual block made of three conv+batchnorm stages plus a shortcut connection.

    Main path: 1x1 conv (in_channels -> in_channels), 3x3 conv (in_channels -> out_channels,
    carries the stride), 1x1 conv (out_channels -> out_channels). ReLU follows the first two
    stages; the third is added to the shortcut and then passed through ReLU.

    Args:
        -in_channels (int): number of channels of the input.
        -out_channels (int): number of channels of the output.
        -stride (int): stride of the 3x3 convolution (and of the shortcut projection, if any).
    """
    def __init__(self,in_channels,out_channels,stride):
        super().__init__()
        self.conv1=conv1x1(in_channels,in_channels)
        self.bn1=nn.BatchNorm2d(in_channels)
        self.conv2=conv3x3(in_channels,out_channels,stride) # Use the Bottleneck approach (downsample input on 3x3 conv)
        self.bn2=nn.BatchNorm2d(out_channels)
        self.conv3=conv1x1(out_channels,out_channels)
        self.bn3=nn.BatchNorm2d(out_channels)
        self.relu=nn.ReLU(inplace=True)
        self.stride=stride
        if stride!=1 or in_channels!=out_channels:
            # reshape the residual connection to match the main path's output (projection shortcut)
            self.identity=nn.Sequential(
                conv1x1(in_channels,out_channels,stride),
                nn.BatchNorm2d(out_channels)
            )
        else:
            # shapes already match: pass the input through unchanged. nn.Identity has no
            # parameters (state_dict keys are unaffected) and, unlike a lambda attribute,
            # keeps the module picklable.
            self.identity=nn.Identity()


    def forward(self,x:torch.Tensor)->torch.Tensor:
        """Run the three conv stages on x and add the (possibly projected) shortcut."""
        out=self.conv1(x)
        out=self.bn1(out)
        out=self.relu(out)

        out=self.conv2(out)
        out=self.bn2(out)
        out=self.relu(out)

        out=self.conv3(out)
        out=self.bn3(out)
        # residual connection, followed by the final activation
        out+=self.identity(x)
        out=self.relu(out)

        return out


class ResNet18(nn.Module):
    """
    Residual image classifier: a conv/batchnorm/ReLU/max-pool stem, four stages of two
    BottleneckBlock modules each, global average pooling, and a linear classification head.
    """
    def __init__(self,num_classes:int):
        """
        Creates a ResNet18 module with num_classes classes.
        """
        super().__init__()
        self.num_classes=num_classes
        # running channel count: _make_layer reads it as the stage input and then updates it
        self.in_channels=64

        # stem
        self.conv1=nn.Conv2d(in_channels=3,out_channels=self.in_channels,kernel_size=(7,7),stride=2)
        self.bn1=nn.BatchNorm2d(self.in_channels)
        self.relu=nn.ReLU(inplace=True)
        self.maxpool=nn.MaxPool2d(kernel_size=3,stride=2,padding=1)

        # residual stages: layer1 keeps the width, layer2-4 each double it (64->128->256->512)
        self.layer1=self._make_layer(out_channels=self.in_channels,stride=1)
        for stage_name in ('layer2','layer3','layer4'):
            setattr(self,stage_name,self._make_layer(out_channels=self.in_channels*2,stride=2))

        # head
        self.avgpool=nn.AdaptiveAvgPool2d(output_size=(1,1))
        self.fc=nn.Linear(in_features=self.in_channels,out_features=self.num_classes)

        self.softmax=nn.Softmax(dim=1)

        # initialize weights using Kaiming initialization
        self.apply(self._init_weights)


    def _init_weights(self,m):
        """Kaiming-normal init for conv/linear weights, zeros for their biases; other modules are skipped."""
        if not isinstance(m,(nn.Conv2d,nn.Linear)):
            return
        # Kaiming initialization (good for ReLU-based nets)
        nn.init.kaiming_normal_(m.weight, nonlinearity="relu")
        if m.bias is not None:
            nn.init.constant_(m.bias,0.0)


    def _make_layer(self,out_channels:int,stride:int)->nn.Sequential:
        """
        Makes a stage of two blocks and records out_channels as the next stage's input width.
        The first block maps self.in_channels to out_channels with stride 1; the second block
        keeps out_channels and applies "stride".
        """
        entry_block=BottleneckBlock(self.in_channels,out_channels,1)
        exit_block=BottleneckBlock(out_channels,out_channels,stride)
        self.in_channels=out_channels
        return nn.Sequential(entry_block,exit_block)


    def forward(self,x:torch.Tensor)->torch.Tensor:
        """Returns the raw class scores (no softmax applied) for a batch x."""
        feature_extractor=(
            self.conv1,self.bn1,self.relu,self.maxpool,
            self.layer1,self.layer2,self.layer3,self.layer4,
            self.avgpool,
        )
        for module in feature_extractor:
            x=module(x)
        # collapse everything but the batch dimension before the linear head
        return self.fc(torch.flatten(x,1))


    def predict(self,x):
        """Returns, for each sample in the batch, the index of the most probable class."""
        scores=self.forward(x)
        probs=self.softmax(scores)
        return torch.argmax(probs,dim=1)

In [None]:
# ***************************************************************************
# --------------------- Sliding Window Object Detector ----------------------
# ***************************************************************************

def sliding_window(image,step,ws):
    """
    Generator that yields a sub-image in a sliding-window iteration scheme.

    Windows are visited row by row, and only windows that fit entirely inside the image are
    produced. This includes the window that sits flush against the right/bottom edge whenever
    the step lands on it, so an image exactly the size of the window yields one window.

    Args:
        -image (torch.Tensor): the image to slide the window across. The shape is (C,H,W).
        -step (int): the number of pixels to step when sliding (both horizontally and vertically when reaching the end of a row).
        -ws (tuple[int]): window size (W,H), i.e. width first.

    Yields:
        -tuple: (x,y,window) where (x,y) is the upper-left corner of the window in the image
         and window is the slice image[:,y:y+H,x:x+W].
    """
    win_w,win_h=ws[0],ws[1]
    # the last valid upper-left corner is (W_img-win_w, H_img-win_h); "+1" makes range() include it
    last_y=image.shape[1]-win_h
    last_x=image.shape[2]-win_w
    # slide a window across the image
    for y in range(0,last_y+1,step):
        for x in range(0,last_x+1,step):
            # yield the current window
            yield (x,y,image[:,y:y+win_h,x:x+win_w])


def image_pyramid(image,scale=1.5,minSize=(224, 224)):
    """
    Generator that yields images progressively scaled down by a factor of "scale" until the next level would fall below "minSize".

    The original image is always yielded first, whatever its size.

    Args:
        -image (torch.Tensor): the image to construct the image pyramid from. The shape is (C,H,W).
        -scale: (float): the factor by which the image is scaled down each iteration. Must be greater than 1.
        -minSize (tuple[int]): the dimensions of the smallest permitted image (the "top of the pyramid"). The entries are (min_rows,min_cols).

    Raises:
        -ValueError: if scale is not greater than 1 (the pyramid would never shrink, so the
         generator would never terminate). Raised when the generator is first advanced.
    """
    if scale<=1:
        raise ValueError(f'expected scale to be greater than 1, got {scale}')
    # yield the original image
    yield image
    # keep looping over the image pyramid
    while True:
        # compute the dimensions of the next image in the pyramid
        w=int(image.shape[2]/scale)
        h=int(image.shape[1]/scale)
        # if the next level does not meet the supplied minimum size (or has collapsed to
        # nothing), stop before paying for a resize whose result would be thrown away
        if h<minSize[0] or w<minSize[1] or h<1 or w<1:
            break

        resizer=torchvision.transforms.Resize((h,w)) # images are C,H,W
        image=resizer(image)
        # yield the next image in the pyramid
        yield image


def alt_NMS(boxes:list[tuple],proba:list[float],IoU_thresh:float):
    """
    An alternate version of non-maximum suppression that sorts the boxes by their lower right y coordinate.

    Boxes are picked from the largest bottom edge (y2) downwards; the probabilities are not
    used for ordering, they are only carried along with the picked boxes. Note that the
    overlap measure is (intersection area) / (area of the box being tested) rather than a
    true IoU, and that widths/heights are counted pixel-inclusively (x2-x1+1).

    Args:
        -boxes (list[tuple]): boxes as (x1,y1,x2,y2).
        -proba (list[float]): one probability per box.
        -IoU_thresh (float): a box is suppressed when its overlap with a picked box exceeds this value.

    Returns:
        -np.ndarray: the picked boxes, one row per box (shape (0,4) for empty input).
        -np.ndarray: the probabilities of the picked boxes.
    """
    # convert the lists to arrays for slicing
    boxes_pick=np.array(boxes)
    proba_pick=np.array(proba)
    # nothing to suppress: return empty results instead of failing on the column slicing below
    if boxes_pick.size==0:
        return np.empty((0,4)),np.empty((0,))
    # list to store picked indices
    picks=[]
    # get box coordinates (as arrays)
    x1=boxes_pick[:,0]
    y1=boxes_pick[:,1]
    x2=boxes_pick[:,2]
    y2=boxes_pick[:,3]
    # compute area of each box and sort by bottom right y coordinate
    area=(x2-x1+1)*(y2-y1+1)
    idxs=np.argsort(y2)
    # perform nms
    while len(idxs)>0:
        # pick the last index
        last=len(idxs)-1
        i=idxs[last]
        picks.append(i)
        # every other remaining box is compared against the pick in one vectorized step
        rest=idxs[:last]
        # find the overlapping region of the boxes
        xx1=np.maximum(x1[i],x1[rest])
        yy1=np.maximum(y1[i],y1[rest])
        xx2=np.minimum(x2[i],x2[rest])
        yy2=np.minimum(y2[i],y2[rest])
        # compute width and height of overlapping region (zero when the boxes are disjoint)
        w=np.maximum(0,xx2-xx1+1)
        h=np.maximum(0,yy2-yy1+1)
        # compute overlap relative to the area of each tested box
        overlap=(w*h)/area[rest]
        # delete the pick itself and the boxes with too much overlap
        suppress=np.concatenate(([last],np.where(overlap>IoU_thresh)[0]))
        idxs=np.delete(idxs,suppress)

    return boxes_pick[picks],proba_pick[picks]
    

def non_maximum_suppression(boxes:list[tuple],probs:list[float],IoU_thresh:float):
    """
    Runs non maximum suppression on a single class after sorting the boxes by probability.

    The most probable remaining box is picked, every remaining box whose IoU with that pick
    exceeds IoU_thresh is discarded, and the process repeats until no boxes remain.

    Args:
        -boxes (list[tuple]): a list of the bounding boxes detected (all for the same class and image), each as (x1,y1,x2,y2).
        -probs (list[float]): a list of the confidences (probabilities) of each box.
        -IoU_thresh (float): the minimum IoU to suppress a box (a box is suppressed when its IoU is strictly greater).

    Returns:
        -list[tuple]: a list of the NMS boxes, most probable first.
        -list[float]: the probabilities of the NMS boxes.

    Raises:
        -ZeroDivisionError: if a pick and an overlapping box both have zero area, so their union is zero.
    """
    # lists of picked boxes and their probabilities
    picks=[]
    prob_picks=[]
    # sort boxes and probs by probabilities (small-->large).
    sorted_idx=np.argsort(probs)
    sorted_boxes=[boxes[i] for i in sorted_idx]
    sorted_probs=[probs[i] for i in sorted_idx]

    while sorted_boxes:
        # take the "last" (most likely) box as the next pick
        last=sorted_boxes.pop()
        picks.append(last)
        prob_picks.append(sorted_probs.pop())
        x1,y1,x2,y2=last
        A1=(x2-x1)*(y2-y1)

        # Collect the survivors in fresh lists. Removing items from the list being iterated
        # would shift the following box into the freed slot and skip its IoU check.
        kept_boxes=[]
        kept_probs=[]
        for i,(box,prob) in enumerate(zip(sorted_boxes,sorted_probs)):
            x3,y3,x4,y4=box
            suppressed=False
            if x1<=x4 and y1<=y4 and x2>=x3 and y2>=y3: # overlapping boxes
                # compute IoU
                I=(min(x2,x4)-max(x1,x3))*(min(y2,y4)-max(y1,y3))
                A2=(x4-x3)*(y4-y3)
                U=A1+A2-I
                if U==0:
                    raise ZeroDivisionError(f'divide by zero encountered at box {box} compared against pick {last}; iter {i} of inner loop. I={I},U={U}.')
                # suppress boxes with lots of overlap
                suppressed=I/U>IoU_thresh
            if not suppressed:
                kept_boxes.append(box)
                kept_probs.append(prob)
        sorted_boxes,sorted_probs=kept_boxes,kept_probs

    return picks,prob_picks


def _show_detections(image:torch.Tensor,boxes,title:str)->None:
    """Draw every (x1,y1,x2,y2) box in green on a uint8 copy of the (C,H,W) image and show it."""
    # permute() only changes the strides, so make a C-contiguous uint8 HWC copy for OpenCV to draw on
    canvas=np.ascontiguousarray(image.detach().cpu().permute(1,2,0).numpy().astype(np.uint8))
    for startX,startY,endX,endY in boxes:
        # plain Python ints: after NMS the coordinates are numpy integers
        cv2.rectangle(canvas,(int(startX),int(startY)),(int(endX),int(endY)),(0, 255, 0),2)
    plt.imshow(canvas)
    plt.title(title)
    plt.show()


def detect_ports(model:torch.nn.Module,orig_image:torch.Tensor,pyr_scale:float=1.5,min_pyr_size:tuple[int]=(300,300),win_step:int=24,roi_size:tuple[int]=(64,64),input_size:tuple[int]=(64,64),min_confidence:float=0.95,verbose=False,iou_thresh:float=0.8):
    """
    Sliding-window object detector: classifies windows taken from an image pyramid and keeps,
    per class, the confident windows that survive non maximum suppression.

    Args:
        -model (torch.nn.Module): classifier applied to the batch of windows; its outputs are passed through a softmax over dim 1.
        -orig_image (torch.Tensor): the image to search. The shape is (C,H,W).
        -pyr_scale (float): downscaling factor between pyramid levels.
        -min_pyr_size (tuple[int]): smallest pyramid level, handed to image_pyramid as minSize.
        -win_step (int): step of the sliding window, in pixels of the current pyramid level.
        -roi_size (tuple[int]): window size (W,H) in pixels of the current pyramid level.
        -input_size (tuple[int]): size every window is resized to before classification.
        -min_confidence (float): minimum top-class probability for a window to be kept.
        -verbose (bool): print timings and show the boxes before and after NMS.
        -iou_thresh (float): IoU above which NMS suppresses a box.

    Returns:
        -dict: maps each predicted class index to a list of (box,probability) pairs, where box
         is (x1,y1,x2,y2) in coordinates of orig_image. Empty when no window fits in the image.
    """
    # original image dimensions (used for getting coordinates of sliding windows); images are C,H,W
    orig_height,orig_width=orig_image.shape[1:3]
    # lists to store the ROIs (images) and the coordinates
    rois=[]
    locs=[]
    # the same resizer serves every window
    resizer=torchvision.transforms.Resize(input_size)
    # runtime tracker
    start=time.time()
    # extract ROIs and coordinates
    pyramid=image_pyramid(orig_image,scale=pyr_scale,minSize=min_pyr_size)
    for image in pyramid:
        # per-axis factors that map this pyramid level back onto the original image
        scale_y=orig_height/float(image.shape[1])
        scale_x=orig_width/float(image.shape[2])
        w=int(roi_size[0]*scale_x)
        h=int(roi_size[1]*scale_y)
        # run sliding window
        for (x,y,roiOrig) in sliding_window(image,win_step,roi_size):
            x=int(x*scale_x)
            y=int(y*scale_y)
            rois.append(resizer(roiOrig))
            locs.append((x,y,x+w,y+h))
    # runtime tracker
    end=time.time()
    if verbose:
        print(f'[INFO] looping over pyramid/windows took {end-start:.5f} seconds')

    # no window fits inside the image: there is nothing to classify
    if not rois:
        return {}

    if verbose:
        print('[INFO] classifying ROIs...')
    # run the batch on whatever device the model lives on (instead of relying on a global)
    try:
        model_device=next(model.parameters()).device
    except StopIteration: # parameter-free model
        model_device=torch.device('cpu')
    # aggregate ROIs into one tensor
    inputs=torch.stack(rois,dim=0).to(model_device)
    # track runtime
    start=time.time()
    # get class probabilities for all ROIs; inference only, so skip building the autograd graph
    with torch.no_grad():
        probs=torch.nn.Softmax(dim=1)(model(inputs))
    # track runtime
    end=time.time()
    if verbose:
        print(f'[INFO] classifying ROIs took {end-start:.5f} seconds')
    del inputs

    # organize predicted boxes by label
    labels = {}
    for i,row in enumerate(probs):
        label=torch.argmax(row).item()
        prob=row[label].item()
        if prob>=min_confidence:
            labels.setdefault(label,[]).append((locs[i],prob))

    label_names=['background','connected','empty']
    # run non maximum suppression
    for label in labels.keys():
        # visualization before NMS
        if verbose:
            print(f'[INFO] showing results for "{label_names[label]}"')
            _show_detections(orig_image,[p[0] for p in labels[label]],f'Before NMS ({label_names[label]})')

        # run NMS
        boxes=np.array([p[0] for p in labels[label]])
        proba=np.array([p[1] for p in labels[label]])
        boxes,proba=non_maximum_suppression(boxes,proba,IoU_thresh=iou_thresh)
        # restructure labels to be the same structure as before
        labels[label]=list(zip(boxes,proba))

        # visualization after NMS
        if verbose:
            _show_detections(orig_image,[p[0] for p in labels[label]],f'After NMS ({label_names[label]})')

    return labels

In [39]:
# ***************************************************************************
# ----------------------------- Load Best Model -----------------------------
# ***************************************************************************

device='cpu'
# build the classifier with 3 output classes (0-background, 1-connected, 2-empty) and place it on the device
model=ResNet18(num_classes=3)
model=model.to(device)
# restore the best weights, remapping the stored tensors onto the chosen device
# NOTE(review): torch.load unpickles the file, so only load checkpoints from a trusted source
weights_path='/kaggle/input/port-classifier/best_weights.pth'
state_dict=torch.load(weights_path,map_location=device)
model.load_state_dict(state_dict)
# switch to eval mode; being the last expression of the cell, this also displays the architecture
model.eval()

ResNet18(
  (conv1): Conv2d(3, 64, kernel_size=(7, 7), stride=(2, 2))
  (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (relu): ReLU(inplace=True)
  (maxpool): MaxPool2d(kernel_size=3, stride=2, padding=1, dilation=1, ceil_mode=False)
  (layer1): Sequential(
    (0): BottleneckBlock(
      (conv1): Conv2d(64, 64, kernel_size=(1, 1), stride=(1, 1), bias=False)
      (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (conv2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
      (bn2): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (conv3): Conv2d(64, 64, kernel_size=(1, 1), stride=(1, 1), bias=False)
      (bn3): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (relu): ReLU(inplace=True)
    )
    (1): BottleneckBlock(
      (conv1): Conv2d(64, 64, kernel_size=(1, 1), stride=(1, 1), bias=False)
      

In [40]:
# ***************************************************************************
# ------------------------ Run Test on Sample Image -------------------------
# ***************************************************************************
# The testing stage never progressed beyond manually inspecting outputs from the object detector.
# This is because the results were obviously inadequate, so quantitative testing was not necessary.

# which test sample to inspect
idx=17
# a dataset item is (image, targets); only the image is needed here
sample=test_dataset[idx]
image=sample[0]
# detector settings for this run; set "verbose" to True to display the outputs
detector_options=dict(
    min_pyr_size=(200,200),
    win_step=16,
    roi_size=(64,64),
    min_confidence=0.8,
    verbose=False,
)
# run object detection
labels=detect_ports(model,image,**detector_options)