In [1]:
# Computer Vision HW6
# Author: Kai Liao
# Trained on Google Cloud

In [2]:
import os
import numpy as np
from glob import glob
from PIL import Image
from scipy.io import loadmat
from matplotlib.path import Path

import torch
import torchvision.transforms as transforms

In [3]:
class EgoHands(torch.utils.data.Dataset):
    """Detection dataset that turns per-frame outline polygons into boxes.

    ``path`` is scanned for sub-folders. Each sub-folder contributes its
    ``*.jpg`` frames (sorted by name) and the ``'polygons'`` entry of its
    ``*.mat`` file; annotation ``k`` of a folder is paired with frame ``k``.

    Each item is ``(img, target)`` where ``img`` is the frame after
    ``self.transform`` and ``target`` is a dict with

    * ``"boxes"``  -- ``float32`` tensor with one ``[xmin, ymin, xmax, ymax]``
      row per non-empty polygon (pixel coordinates, ``max`` values inclusive),
    * ``"labels"`` -- ``int64`` tensor of ones, one per box.

    Args:
        path: Root directory holding one sub-folder per recording.
        transform: Accepted for interface compatibility but currently NOT
            used -- the dataset always applies ``transforms.ToTensor()``.
            TODO: use suitable transformations (a detection-style transform
            must also update ``target``, so it cannot simply be called with
            the image alone).
    """

    def __init__(self, path, transform=None):
        self.path = path
        self.imgs = []
        self.polygons = []
        for folder in sorted(glob(os.path.join(self.path, "*"))):
            # "*" also matches plain files (README, .DS_Store, ...); they
            # carry no frames or annotations, so ignore them.
            if not os.path.isdir(folder):
                continue

            mat_files = sorted(glob(os.path.join(folder, "*.mat")))
            if not mat_files:
                raise FileNotFoundError(
                    "no .mat annotation file found in %r" % folder)

            frames = sorted(glob(os.path.join(folder, "*.jpg")))
            annotations = loadmat(mat_files[0])['polygons'][0]
            # Frames and annotations are matched purely by position, so a
            # count mismatch would silently shift every later label.
            if len(annotations) != len(frames):
                raise ValueError(
                    "%r has %d images but %d polygon annotations"
                    % (folder, len(frames), len(annotations)))

            self.imgs += frames
            self.polygons.extend(annotations)

        # TODO: use suitable transformations
        self.transform = transforms.Compose([transforms.ToTensor()])

    def _load_sample(self, index):
        """Return ``(img, target)`` for ``index``, or ``None`` if unusable.

        A frame is unusable when it has no non-empty polygon, or when one of
        its polygons covers no pixel centre (so no box can be derived).
        """
        # The context manager closes the file handle once the pixels are copied.
        with Image.open(self.imgs[index]) as handle:
            img = np.array(handle)

        # Coordinates of every pixel, as (x, y) rows, for point-in-polygon tests.
        height, width = img.shape[:2]
        xs, ys = np.meshgrid(np.arange(width), np.arange(height))
        points = np.vstack((xs.flatten(), ys.flatten())).T

        boxes = []
        for polygon in self.polygons[index]:
            if polygon.size == 0:
                continue
            mask = Path(polygon).contains_points(points).reshape(height, width)
            rows, cols = np.where(mask)
            if rows.size == 0:
                return None
            boxes.append([np.min(cols), np.min(rows), np.max(cols), np.max(rows)])

        if not boxes:
            return None

        target = {
            "boxes": torch.as_tensor(boxes, dtype=torch.float32),
            "labels": torch.ones((len(boxes),), dtype=torch.int64),
        }

        if self.transform:
            img = self.transform(img)

        return img, target

    def __getitem__(self, index):
        """Return the sample at ``index``, or the next usable one after it.

        When the requested frame has no usable annotation, the following
        frames are tried in order (wrapping around at the end of the
        dataset). An image may therefore be used more than once per epoch,
        but the effect is perhaps negligible given the size of the dataset.
        The substitute is picked by raw dataset index, so when this dataset
        is wrapped in a ``Subset`` the substitute may lie outside the subset.

        Raises:
            IndexError: ``index`` is outside ``[-len(self), len(self))``.
            RuntimeError: no frame in the dataset has a usable annotation.
        """
        count = len(self.imgs)
        # Keep normal sequence semantics: an out-of-range index must raise
        # instead of wrapping, otherwise plain iteration would never stop.
        if not -count <= index < count:
            raise IndexError("index %d out of range for %d images" % (index, count))

        for offset in range(count):
            sample = self._load_sample((index + offset) % count)
            if sample is not None:
                return sample

        raise RuntimeError("no image in the dataset has a usable annotation")

    def __len__(self):
        """Number of frames found, including those without annotations."""
        return len(self.imgs)

In [4]:
import transforms as T
import utils

def get_transform(train):
    """Assemble the image pipeline: tensor conversion, plus flips for training.

    Args:
        train: When truthy, a ``T.RandomHorizontalFlip(0.5)`` step is added
            after the tensor conversion.

    Returns:
        A ``T.Compose`` wrapping the selected steps.
    """
    steps = [T.ToTensor()]
    if train:
        steps.append(T.RandomHorizontalFlip(0.5))
    return T.Compose(steps)


# set up model
import torchvision
from torchvision.models.detection.faster_rcnn import FastRCNNPredictor

# Start from the pretrained detector and replace its box head so that it
# predicts `num_classes` outputs (the dataset only ever emits label 1; the
# second class is presumably the implicit background -- confirm against the
# torchvision detection docs).
model = torchvision.models.detection.fasterrcnn_resnet50_fpn(pretrained=True)
num_classes = 2
in_features = model.roi_heads.box_predictor.cls_score.in_features
model.roi_heads.box_predictor = FastRCNNPredictor(in_features, num_classes)

device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')


# define dataset: 70/30 train/val
DATA_DIR = '_LABELLED_SAMPLES/'
TRAIN_FRACTION = 0.7
SPLIT_SEED = 0  # fixed so the same images stay in the validation set across kernel restarts

train_set = EgoHands(DATA_DIR, get_transform(train=True))
val_set = EgoHands(DATA_DIR, get_transform(train=False))

# A dedicated generator makes the split reproducible without touching the
# global RNG that training-time randomness relies on.
split_rng = torch.Generator().manual_seed(SPLIT_SEED)
indices = torch.randperm(len(train_set), generator=split_rng).tolist()
train_size = int(len(train_set) * TRAIN_FRACTION)

# Slice from an explicit split position: negative slicing breaks down when
# train_size is 0 (`indices[:-0]` is empty and `indices[-0:]` is everything).
val_size = len(indices) - train_size
val_set = torch.utils.data.Subset(val_set, indices[:val_size])
train_set = torch.utils.data.Subset(train_set, indices[val_size:])

# define dataloader
train_loader = torch.utils.data.DataLoader(
        train_set, batch_size=2, shuffle=True, num_workers=4,
        collate_fn=utils.collate_fn)

val_loader = torch.utils.data.DataLoader(
        val_set, batch_size=1, shuffle=False, num_workers=4,
        collate_fn=utils.collate_fn)

model.to(device)

FasterRCNN(
  (transform): GeneralizedRCNNTransform()
  (backbone): BackboneWithFPN(
    (body): IntermediateLayerGetter(
      (conv1): Conv2d(3, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3), bias=False)
      (bn1): FrozenBatchNorm2d()
      (relu): ReLU(inplace=True)
      (maxpool): MaxPool2d(kernel_size=3, stride=2, padding=1, dilation=1, ceil_mode=False)
      (layer1): Sequential(
        (0): Bottleneck(
          (conv1): Conv2d(64, 64, kernel_size=(1, 1), stride=(1, 1), bias=False)
          (bn1): FrozenBatchNorm2d()
          (conv2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
          (bn2): FrozenBatchNorm2d()
          (conv3): Conv2d(64, 256, kernel_size=(1, 1), stride=(1, 1), bias=False)
          (bn3): FrozenBatchNorm2d()
          (relu): ReLU(inplace=True)
          (downsample): Sequential(
            (0): Conv2d(64, 256, kernel_size=(1, 1), stride=(1, 1), bias=False)
            (1): FrozenBatchNorm2d()
          )
  

In [5]:
from engine import train_one_epoch, evaluate
import utils

def training(num_epochs=1, print_freq=120):
    """Fine-tune the notebook-level ``model`` on ``train_loader``.

    Uses SGD (lr 0.005, momentum 0.9, weight decay 0.0005) over the trainable
    parameters, with a StepLR schedule that multiplies the learning rate by
    0.1 every 3 epochs. With the default single epoch the schedule therefore
    never takes effect.

    Relies on the notebook globals ``model``, ``train_loader`` and ``device``.

    Args:
        num_epochs: Number of passes over ``train_loader``.
        print_freq: Forwarded to ``train_one_epoch`` as its ``print_freq``
            argument.
    """
    # Only optimise parameters that are not frozen.
    trainable = [p for p in model.parameters() if p.requires_grad]
    optimizer = torch.optim.SGD(trainable, lr=0.005,
                                momentum=0.9, weight_decay=0.0005)

    lr_scheduler = torch.optim.lr_scheduler.StepLR(optimizer,
                                                   step_size=3,
                                                   gamma=0.1)

    for epoch in range(num_epochs):
        train_one_epoch(model, optimizer, train_loader, device, epoch,
                        print_freq=print_freq)
        lr_scheduler.step()

    print("Finished Training")

In [6]:
# Run the fine-tuning loop; progress is logged by train_one_epoch.
training()

Epoch: [0]  [   0/1680]  eta: 12:44:04  lr: 0.000010  loss: 1.4050 (1.4050)  loss_classifier: 0.8627 (0.8627)  loss_box_reg: 0.2130 (0.2130)  loss_objectness: 0.2278 (0.2278)  loss_rpn_box_reg: 0.1014 (0.1014)  time: 27.2886  data: 25.8937  max mem: 2478
Epoch: [0]  [ 120/1680]  eta: 1:36:20  lr: 0.000609  loss: 0.3071 (0.5678)  loss_classifier: 0.1325 (0.2856)  loss_box_reg: 0.1612 (0.2091)  loss_objectness: 0.0128 (0.0497)  loss_rpn_box_reg: 0.0151 (0.0234)  time: 3.8936  data: 3.4316  max mem: 2745
Epoch: [0]  [ 240/1680]  eta: 1:24:28  lr: 0.001209  loss: 0.1665 (0.3933)  loss_classifier: 0.0692 (0.1893)  loss_box_reg: 0.0781 (0.1520)  loss_objectness: 0.0017 (0.0317)  loss_rpn_box_reg: 0.0140 (0.0203)  time: 3.2510  data: 2.7759  max mem: 2745
Epoch: [0]  [ 360/1680]  eta: 1:16:34  lr: 0.001808  loss: 0.1254 (0.3219)  loss_classifier: 0.0587 (0.1535)  loss_box_reg: 0.0520 (0.1256)  loss_objectness: 0.0016 (0.0242)  loss_rpn_box_reg: 0.0111 (0.0186)  time: 3.4863  data: 3.0436  max

In [125]:
# test
# Run the fine-tuned detector on five randomly chosen validation images.
model.eval()
# `high` is exclusive in np.random.randint, so pass len(val_set) itself;
# subtracting 1 would make the last validation sample unreachable.
sample_inds = np.random.randint(low=0, high=len(val_set), size=5)
predictions = []

with torch.no_grad():
    for ind in sample_inds:
        # The model takes a list of image tensors and returns one result per image.
        predictions.append(model([val_set[ind][0].to(device)]))

In [126]:
from PIL import ImageDraw, Image

# Draw every predicted box in red on its source image and write the result
# to "<position>.png" in the working directory.
to_pil = transforms.ToPILImage()

for idx, prediction in enumerate(predictions):
    tensor_img = val_set[sample_inds[idx]][0]
    canvas = to_pil(tensor_img).convert("RGB")
    pen = ImageDraw.Draw(canvas)
    # Each prediction is a one-element list; its "boxes" rows are drawn as-is.
    for box in prediction[0]["boxes"].tolist():
        pen.rectangle(box, outline="red")
    canvas.save(str(idx) + ".png")

In [127]:
# save
# NOTE(review): this pickles the whole model object rather than only its
# state_dict, so loading the file later needs the same class definitions to be
# importable. Only load such files from trusted sources.
torch.save(model, "./model.pth")