# Regional CNN

The regional CNN is an object detection network. The inference pipeline first peroforms selective search on the image. This identifies prospective bounding boxes. Each prospective bounding box is the cropped from the image, resized and send though a backbone convolutional feature extractor. A regression and classification is performed with an MLP from these features. The regression fine tunes the bounding box prediction and the classification identifies which object is contained in the box. 

Read more at: https://arxiv.org/abs/1311.2524v5

In [4]:
import numpy as np
import torch 
import torchvision
from torchvision import transforms
from torch import nn
import torch_snippets
from torch_snippets import *
from torchvision.datasets import VOCDetection
from torchvision.transforms import ToTensor
import matplotlib.pyplot as plt
from torch.utils.data import Dataset
from torch import optim
import selectivesearch
from IPython.display import clear_output
from torch_snippets import Report
from torchvision.ops import nms
import os
import pickle
import glob
import cv2
import pandas as pd

# Choose Device
if torch.backends.mps.is_built():
    device = torch.device("mps")
elif torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")

print(device)

# Hyperparameters

In [5]:
# Optimization
LEARNING_RATE = 1e-3

# Training params 
EPOCHS = 5
BATCH_SIZE = 128

# Reprocess Dataset
REPROCESS = False

# Utilities

In [6]:
def extract_regions(img):
    img = img.permute(1, 2, 0)
    img_lbl,regions = selectivesearch.selective_search(img, scale=200, min_size=100)
    img_area = np.prod(img.shape[:2]) 
    candidates = [] 
    for r in regions: 
        if r['rect'] in candidates: continue
        if r['size'] < (0.05*img_area): continue
        if r['size'] > (1*img_area): continue
        x, y, w, h = r['rect']
        candidates.append(list(r['rect']))
    candidates = [np.array([x, y, x+w, y+h]) for x, y , w, h in candidates]
    return torch.Tensor(candidates)

In [7]:
def IOU(box1, box2, epsilon=1e-5):
    x1 = min(box1[0], box2[0])
    y1 = min(box1[1], box2[1])
    x2 = max(box1[2], box2[2])
    y2 = max(box1[3], box2[3])
    
    if (x2-x1) <= 0 or (y2 - y1) <=0:
        return 0.0

    union_area = (x2 - x1) * (y2 - y1)
    
    x1 = max(box1[0], box2[0])
    y1 = max(box1[1], box2[1])
    x2 = min(box1[2], box2[2])
    y2 = min(box1[3], box2[3])
    
    if (x2 - x1) <=0 or (y2 - y1) <= 0: 
        return 0.0
    
    intersection_area = (x2 - x1) * (y2 - y1)
    return float(intersection_area / (union_area + epsilon))

In [8]:
normalize = transforms.Normalize(mean=[0.485, 0.456, 0.406], 
                                 std=[0.229, 0.224, 0.225])

def normalize_image(img):
    img = normalize(img)
    return img.to(device).float()

# Downloading Data

In [9]:
raw_train_dataset = VOCDetection(root='data/',year ='2012', image_set="train", download=True, transform=ToTensor())
raw_test_dataset = VOCDetection(root='data/',year ='2012', image_set="val", download=True, transform=ToTensor())

Using downloaded and verified file: data/VOCtrainval_11-May-2012.tar
Extracting data/VOCtrainval_11-May-2012.tar to data/
Using downloaded and verified file: data/VOCtrainval_11-May-2012.tar
Extracting data/VOCtrainval_11-May-2012.tar to data/


In [11]:
def get_bounding_boxes(y):
    objects = y['annotation']['object']
    bboxs = []
    for obj in objects:
        bbox = list(obj['bndbox'].values())
        bbox = [int(i) for i in bbox]
        bboxs.append(bbox)
    return torch.tensor(bboxs)

In [12]:
def get_class_labels(y, encode_labels=None):
    objects = y['annotation']['object']
    class_labels = []
    
    if encode_labels:
        for obj in objects:
            label_name = obj['name']
            label = encode_labels.index(label_name)
            class_labels.append(label)
        return torch.tensor(class_labels)
    else: 
        for obj in objects:
            label = obj['name']
            class_labels.append(str(label))
        return class_labels

In [13]:

def get_img_filepath(y, root_dir="data/VOCdevkit"):
    folder = y["annotation"]["folder"]
    filename = y["annotation"]["filename"]
    return root_dir + "/" + folder + "/JPEGImages/" + filename


In [14]:
def process_img(x, y):
    _, H, W = x.shape
    candidate_regions = extract_regions(x)
    
    rois, classes, deltas, crops, paths = [], [], [], [], []
    gtbbs = get_bounding_boxes(y)
    img_path = get_img_filepath(y)
    # calculate IOU for each of the region candidates and the ground truth bounding boxes
    ious = np.array([[IOU(candidate, _bb_) for candidate in candidate_regions] for _bb_ in gtbbs]).T
    
    # calculate candidate class labels
    # calculate candidate bounding box offsets
    for ix, candidate in enumerate(candidate_regions):
        lx, ly, hx, hy = candidate.type(torch.int16)
        candidate_ious = ious[ix]
        best_iou_at = np.argmax(candidate_ious)
        best_iou = candidate_ious[best_iou_at]
        best_bb = _lx, _ly, _hx, _hy = gtbbs[best_iou_at]
        
        #calculate offsets 
        delta = torch.tensor([_lx-lx, _ly-ly, _hx-hx, _hy-hy])/torch.tensor([W, H, W, H])
        
        #calculate targets
        if best_iou > 0.3:
            clss = get_class_labels(y)[best_iou_at]
        else: 
            clss = 'background'
            
        # calculate candidate offset from image
        roi = candidate/torch.tensor([W, H, W, H])
        
        # calculate the image cropped by the region candidate
        crop = x[:, ly:hy, lx:hx]
        
        crops.append(crop)
        classes.append(clss)
        deltas.append(delta)
        rois.append(roi)
        paths.append(img_path)
        
    paths = np.array(paths)
    rois = torch.stack(rois).numpy()
    classes = np.array(classes)
    deltas = torch.stack(deltas).numpy()
    ious = torch.tensor(ious).numpy()

    img_dict = {"paths":paths, "rois0":rois[:, 0], "rois1":rois[:, 1], "rois2":rois[:, 2], "rois3":rois[:, 3],
                "classes":classes, "deltas0":deltas[:, 0], "deltas1":deltas[:, 1],  "deltas2":deltas[:, 2],  "deltas3":deltas[:, 3],
               "gtbbs0":_lx.item(), "gtbbs1":_ly.item(), "gtbbs2":_hx.item(), "gtbbs3":_hy.item(),}
        
    img_df = pd.DataFrame(data=img_dict, index=None)
    
    return img_df



In [15]:
def preprocess_dataset(raw_dataset, root_dir='VOC_data/', set_type='train', max_datasize=200):
    n_images = raw_dataset.__len__()
    total_datapoints = 0
    x, y = raw_dataset.__getitem__(0)
    df = process_img(x, y)
    total_datapoints += len(df)
    for i in range(1, n_images):
        clear_output(wait=True)
        print(i, " Raw images processed producing ", total_datapoints, " data points ----- ", (total_datapoints*100)/max_datasize, "% Complete")
        x, y = raw_dataset.__getitem__(i)
        new_df = process_img(x,y)
        df = pd.concat([df, new_df])
        total_datapoints += len(new_df)
        if total_datapoints > max_datasize:
            break
    df.to_csv("VOC_data/df_" + set_type + ".csv", index=False)
   



In [16]:
if REPROCESS:
    preprocess_dataset(raw_train_dataset, set_type='train', max_datasize=200000)

In [17]:
if REPROCESS:
    preprocess_dataset(raw_test_dataset, set_type='test', max_datasize=10000)

# Prepare Final Dataset

In [18]:
class RCNNDataset(Dataset):
    def __init__(self, root_dir='VOC_data/', set_type='train'):
        self.root = root_dir
        self.ds = pd.read_csv(root_dir + "df_" + set_type + ".csv")
            
        self.labels = ["background", "person", "bird", "cat", "cow", "dog", "horse", "sheep", 
                       "aeroplane", "bicycle", "boat", "bus", "car", "motorbike", "train", 
                       "bottle", "chair", "diningtable", "pottedplant", "sofa", "tvmonitor"]
        
        
    def __len__(self):
        return len(self.ds)-1
        
        
    def __getitem__(self, idx):

        # Get the Data point and split into it's labels
        data_point = self.ds.iloc[idx].to_numpy()
        path = data_point[0]
        rois = data_point[1:5]
        delta = data_point[6:10]
        label = data_point[5]
        gtbbs = data_point[10:14]
        
        # Read in teh image and perform the crop and boudning box transform
        img = cv2.cvtColor(cv2.imread(data_point[0]), cv2.COLOR_BGR2RGB)
        H, W, _= img.shape
        sh = np.array([W, H, W, H])
        bbox = (rois*sh).astype(int)
        crop = img[bbox[1]:bbox[3], bbox[0]:bbox[2]]
        return img, crop, bbox, label, delta, gtbbs
            
    
    def collate_fn(self, batch):
        inputs, rois, rixs, labels, deltas, gtbbs = [],[],[],[],[],[]
        
        # Normalize crop and send labels to devices ready for running
        # through the model
        for ix in range(len(batch)):
            img, crop, bbox, label, delta, gtbb = batch[ix]
            crop = torch.tensor(cv2.resize(img, (224,224))).permute(2, 0, 1)
            crop = normalize_image(crop/255.)
            
            inputs.append(crop)
            labels.append(self.labels.index(label))
            gtbbs.append(gtbb)
            deltas.append(torch.tensor(delta.astype(np.float32)))
        
        inputs = torch.stack(inputs).to(device)
        labels = torch.tensor(labels).long().to(device)
        deltas = torch.stack(deltas).float().to(device)
        return inputs, labels, deltas
            

In [19]:
train_ds = RCNNDataset(set_type='train')
test_ds = RCNNDataset(set_type='test')

# Create Data Loader

In [20]:
train_loader = torch.utils.data.DataLoader(train_ds, batch_size=BATCH_SIZE, collate_fn=train_ds.collate_fn, drop_last=True)
test_loader = torch.utils.data.DataLoader(test_ds, batch_size=BATCH_SIZE, collate_fn=train_ds.collate_fn, drop_last=True)

# Build the Model

In [21]:
vgg_backbone = torchvision.models.vgg16(pretrained=True)
vgg_backbone.classifier = nn.Sequential()
for param in vgg_backbone.parameters():
    param.requires_grad = False
vgg_backbone.eval().to(device)



VGG(
  (features): Sequential(
    (0): Conv2d(3, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (1): ReLU(inplace=True)
    (2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (3): ReLU(inplace=True)
    (4): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
    (5): Conv2d(64, 128, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (6): ReLU(inplace=True)
    (7): Conv2d(128, 128, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (8): ReLU(inplace=True)
    (9): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
    (10): Conv2d(128, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (11): ReLU(inplace=True)
    (12): Conv2d(256, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (13): ReLU(inplace=True)
    (14): Conv2d(256, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (15): ReLU(inplace=True)
    (16): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1

In [22]:
class RCNN(nn.Module):
    def __init__(self):
        super().__init__()
        feature_dim = 25088
        self.backbone = vgg_backbone
        self.cls_score = nn.Linear(feature_dim, 21)
        
        self.bbox = nn.Sequential(nn.Linear(feature_dim, 512),
                                 nn.ReLU(),
                                 nn.Linear(512, 4),
                                 nn.Tanh())
        
        self.cel = nn.CrossEntropyLoss()
        self.l1 = nn.L1Loss()
        
    def forward(self, input):
        feat = self.backbone(input)
        cls_score = self.cls_score(feat)
        bbox = self.bbox(feat)
        return cls_score, bbox
    
    def calc_loss(self, probs, _deltas, labels, deltas):
        # Classification loss
        detection_loss = self.cel(probs, labels)
        
        # Regression Loss
        ixs, = torch.where(labels !=0)
        _deltas = _deltas[ixs]
        deltas = deltas[ixs]
        self.lmb = 10.0
        if len(ixs) > 0:
            regression_loss = self.l1(_deltas, deltas)
            return detection_loss + self.lmb * regression_loss, detection_loss.detach(), regression_loss.detach()
        else:
            regression_loss = 0.0
            return detection_loss + self.lmb * regression_loss, detection_loss.detach(), regression_loss
            

# Training Loop

In [23]:
def decode(_y): 
    _, preds = _y.max(-1) 
    return preds

def train_batch(inputs, model, optimizer, criterion):
    inputs, clss, deltas = inputs
    model.train()
    optimizer.zero_grad()
    _clss, _deltas = model(inputs)
    loss, loc_loss, regr_loss = criterion(_clss, _deltas, clss, deltas)
    accs = clss == decode(_clss)
    loss.backward()
    optimizer.step()
    return loss.detach(), loc_loss, regr_loss, accs.cpu().numpy()

In [24]:
@torch.no_grad()
def validate_batch(inputs, model, criterion):
    input, clss, deltas = inputs
    with torch.no_grad():
        model.eval()
        _clss,_deltas = model(input)
        loss, loc_loss, regr_loss = criterion(_clss, _deltas, clss, deltas)
        _, _clss = _clss.max(-1)
        accs = clss == _clss
    return _clss, _deltas, loss.detach(), loc_loss, regr_loss, accs.cpu().numpy()

In [25]:


rcnn = RCNN().to(device)
criterion = rcnn.calc_loss
optimizer = optim.SGD(rcnn.parameters(), lr=LEARNING_RATE)
log = Report(EPOCHS)

for epoch in range(EPOCHS):
    _n = len(train_loader)
    for ix, inputs in enumerate(train_loader):
        loss, loc_loss, regr_loss, accs = train_batch(inputs, rcnn, optimizer, criterion)
        pos = (epoch + ((ix+1)/_n))

        log.record(pos, trn_loss=loss.item(), trn_loc_loss=loc_loss, trn_regr_loss=regr_loss, 
                   trn_acc=accs.mean(), end='\r')
               
    _n = len(test_loader)
    PATH = "saved_models/RCNN_EPOCH_" + str(epoch) + "_accuracy_" + "{0:.4g}".format(accs.mean())
    torch.save(rcnn.state_dict(), PATH)
    for ix, inputs in enumerate(test_loader):
        _clss, _deltas, loss, loc_loss, regr_loss, accs = validate_batch(inputs, rcnn, criterion)
        pos = (epoch + ((ix+1)/_n))
        log.record(pos, val_loss=loss.item(), val_loc_loss=loc_loss, 
            val_regr_loss=regr_loss, 
            val_acc=accs.mean(), end='\r')
        # plot training and validation metrics
        
        
log.plot_epochs('trn_loss,val_loss'.split(','))

EPOCH: 0.273  trn_loss: 2.340  trn_loc_loss: 1.388  trn_regr_loss: 0.095  trn_acc: 0.547  (269.20s - 4666.05s remaining))

KeyboardInterrupt: 

# Test Model

In [None]:


def test_model(idx, show_output=True):
    img, y = raw_train_dataset.__getitem__(idx)
    gtbbs = get_bounding_boxes(y)
    gt_labels = get_class_labels(y)
    candidates = extract_regions(img)
    inputs = []
    for candidate in candidates:
        x,y,X,Y = np.array(candidate).astype(int)
        if img.shape[2] == 3:
            img = torch.tensor(img)
            img = img.permute(2, 0, 1)
        else:
            img = torch.tensor(img)

        img = img.permute(1, 2, 0).numpy()
        crop = cv2.resize(img[y:Y,x:X], (224,224))
        crop = torch.tensor(crop).permute(2, 0, 1)
        inputs.append(normalize_image(crop/255.)[None])
    inputs = torch.cat(inputs).to(device)

    with torch.no_grad():
        rcnn.eval()
        probs, deltas = rcnn(inputs)
        probs = torch.nn.functional.softmax(probs, -1)
        confs, clss = torch.max(probs, -1)
    candidates = np.array(candidates)
    confs, clss, probs, deltas = [tensor.detach().cpu().numpy() for tensor in [confs, clss, probs, deltas]]

    ixs = clss!=train_ds.labels.index('background')
    confs, clss, probs, deltas, candidates = [tensor[ixs] for tensor in [confs, clss, probs, deltas, candidates]]
    bbs = (candidates + deltas).astype(np.uint16)
    ixs = nms(torch.tensor(bbs.astype(np.float32)), torch.tensor(confs), 0.05)
    confs, clss, probs, deltas, candidates, bbs = [tensor[ixs] for tensor in [confs, clss, probs, deltas, candidates, bbs]]
    if len(ixs) == 1:
        confs, clss, probs, deltas, candidates, bbs = [tensor[None] for tensor in [confs, clss, probs, deltas, candidates, bbs]]
    if len(confs) == 0 and not show_output:
        return (0,0,224,224), 'background', 0
    if len(confs) > 0:
        best_pred = np.argmax(confs)
        best_conf = np.max(confs)
        best_bb = bbs[best_pred]
        x,y,X,Y = best_bb
    _, ax = plt.subplots(1, 2, figsize=(20,10))
    
    gtbbs = gtbbs.numpy().tolist()
    img = np.ascontiguousarray(img, dtype=np.float32)
    torch_snippets.show(img, bbs=gtbbs, texts=gt_labels, ax=ax[0])
    ax[0].grid(False)
    ax[0].set_title('Original image')
    if len(confs) == 0:
        ax[1].imshow(img)
        ax[1].set_title('No objects')
        plt.show()
        return
    ax[1].set_title(test_ds.labels[clss[best_pred]])
    bbs = bbs.tolist()
    texts = [train_ds.labels[c] for c in clss.tolist()]
    img2 = np.ascontiguousarray(img, dtype=np.float32)
    bbx = np.copy(bbs)
    torch_snippets.show(img2, bbs=bbx, texts=texts,ax=ax[1])
    return (x,y,X,Y), train_ds.labels[clss[best_pred]], best_conf



In [None]:
bbox, label, confidence = test_model(0)

In [None]:

for input, bbox, label in train_loader:
    break

In [None]:
plt.imshow(input[0].permute(1,2,0).to("cpu").numpy())