In [0]:
# Remove any installed copy of pytorch-hrvvi-ext first so the install below starts clean.
# (Presumably this is the distribution that provides the `hutil` package imported later.)
!pip3 uninstall pytorch-hrvvi-ext -y
# -U upgrades to the newest release; --no-cache-dir makes pip ignore its local cache.
# NOTE(review): the version is not pinned, so re-running later may pull a different release.
!pip3 install -U --no-cache-dir pytorch-hrvvi-ext

In [0]:
import sys
import os

import torch
import hutil
import matplotlib.pyplot as plt
print(hutil.__version__)

1.4.11


In [0]:
# Enable IPython's autoreload extension; mode 2 reloads all imported modules
# before each cell runs, so edits to library code are picked up without a restart.
%load_ext autoreload
%autoreload 2

In [0]:
# Mount Google Drive inside the Colab VM and remember the path of "My Drive".
gdrive = "/gdrive"
from google.colab import drive
# force_remount=True asks Colab to mount again even if Drive is already mounted.
drive.mount(gdrive, force_remount=True)
mydrive = os.path.join(gdrive, "My Drive")
# NOTE(review): this listing hard-codes /gdrive instead of reusing `mydrive`.
!ls /gdrive/My\ Drive

def gpath(p):
    """Return `p` joined onto the mounted "My Drive" directory (`mydrive`)."""
    drive_path = os.path.join(mydrive, p)
    return drive_path

Go to this URL in a browser: https://accounts.google.com/o/oauth2/auth?client_id=947318989803-6bn6qk8qdgf4n4g3pfee6491hc0brc4i.apps.googleusercontent.com&redirect_uri=urn%3Aietf%3Awg%3Aoauth%3A2.0%3Aoob&scope=email%20https%3A%2F%2Fwww.googleapis.com%2Fauth%2Fdocs.test%20https%3A%2F%2Fwww.googleapis.com%2Fauth%2Fdrive%20https%3A%2F%2Fwww.googleapis.com%2Fauth%2Fdrive.photos.readonly%20https%3A%2F%2Fwww.googleapis.com%2Fauth%2Fpeopleapi.readonly&response_type=code

Enter your authorization code:
··········
Mounted at /gdrive
'Colab Notebooks'   eng-fra.pt	 images   repo	   weixin.pkl
 datasets	    fonts	 models   result


In [0]:
import random

from PIL import Image
from toolz import curry

import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.optim import Adam, SGD
from torch.optim.lr_scheduler import LambdaLR, MultiStepLR
from torch.utils.data import DataLoader

from torchvision.models import resnet18

from hutil import cuda
from hutil.datasets import VOCDetection
from hutil.data import train_test_split, Fullset
from hutil.train import init_weights, Trainer
from hutil.train.metrics import TrainLoss, MeanAveragePrecision
from hutil.ext.summary import summary
from hutil.detection import BoundingBox, transform_bbox, transform_bboxes, iou_1m, non_max_suppression, box_collate_fn
from hutil.transforms import Compose, Resize, ToTensor, ToPercentCoords, CenterCrop
from hutil.inference import freeze


█

In [0]:
def iou_1m_centers(box, boxes):
    """IoU between one box and many boxes, all given in XYWH format.

    Both arguments are converted from ``BoundingBox.XYWH`` to
    ``BoundingBox.LTRB`` and the result of ``iou_1m`` on the converted
    boxes is returned.
    """
    single_ltrb = transform_bboxes(
        box, format=BoundingBox.XYWH, to=BoundingBox.LTRB)
    many_ltrb = transform_bboxes(
        boxes, format=BoundingBox.XYWH, to=BoundingBox.LTRB)
    overlaps = iou_1m(single_ltrb, many_ltrb)
    return overlaps


def compute_default_boxes(lx, ly, scale, ars):
    """Build the grid of default (anchor) boxes for one feature map.

    Args:
        lx: number of grid cells along the first axis (x).
        ly: number of grid cells along the second axis (y).
        scale: box scale for this feature map, in the same normalised
            units as the box centres.
        ars: aspect ratios, either a tensor or any sequence of numbers
            (tuple/list); it is converted to a float tensor internally.

    Returns:
        Float tensor of shape ``(lx, ly, len(ars), 4)`` whose last axis is
        ``(cx, cy, w, h)``: centres are the cell mid-points divided by the
        grid size, ``w = scale * sqrt(ar)`` and ``h = scale / sqrt(ar)``.
    """
    # Accept plain Python sequences as well as tensors; torch.sqrt needs a
    # floating-point tensor.
    ars = torch.as_tensor(ars, dtype=torch.float)
    default_boxes = torch.zeros(lx, ly, len(ars), 4)

    # Cell mid-points, normalised to (0, 1); broadcasting fills the other axes.
    centers_x = (torch.arange(lx, dtype=torch.float) + 0.5) / lx
    centers_y = (torch.arange(ly, dtype=torch.float) + 0.5) / ly
    default_boxes[..., 0] = centers_x.view(lx, 1, 1)
    default_boxes[..., 1] = centers_y.view(1, ly, 1)

    # Width/height share the same area (scale ** 2) for every aspect ratio.
    sqrt_ars = torch.sqrt(ars)
    default_boxes[..., 2] = scale * sqrt_ars
    default_boxes[..., 3] = scale / sqrt_ars
    return default_boxes


def compute_scales(num_feature_maps, s_min, s_max):
    """Return linearly spaced box scales, one per feature map.

    Args:
        num_feature_maps: number of feature maps (length of the result).
        s_min: scale assigned to the first feature map.
        s_max: scale assigned to the last feature map.

    Returns:
        List of ``num_feature_maps`` scales going from ``s_min`` to
        ``s_max`` in equal steps. With a single feature map the result is
        ``[s_min]``; with zero feature maps it is ``[]``.
    """
    # A single map has no interval to interpolate over; the general formula
    # would divide by zero, so return the minimum scale directly.
    if num_feature_maps == 1:
        return [s_min]
    return [
        s_min + (s_max - s_min) * k / (num_feature_maps - 1)
        for k in range(num_feature_maps)
    ]


def compute_loc_target(gt_box, default_boxes):
    """Encode a ground-truth box relative to default boxes.

    ``gt_box`` is indexed as ``[:2]`` (centre) and ``[2:]`` (size);
    ``default_boxes`` is indexed the same way along its last axis.
    The result concatenates, along the last axis, the centre offset
    divided by the default-box size and the log of the size ratio.
    """
    anchor_xy = default_boxes[..., :2]
    anchor_wh = default_boxes[..., 2:]
    gt_xy, gt_wh = gt_box[:2], gt_box[2:]

    center_offset = (gt_xy - anchor_xy) / anchor_wh
    log_size_ratio = torch.log(gt_wh / anchor_wh)
    return torch.cat((center_offset, log_size_ratio), dim=-1)


class SSDTransform:
    """Convert ``(img, anns)`` into per-feature-map SSD training targets.

    Each default-box tensor is flattened to ``(-1, 4)``. Anchors start as
    background (class id ``num_classes - 1``) with a zero location target.
    For every annotation, all anchors with IoU above 0.5 are assigned to it,
    and the single anchor with the highest IoU over all feature maps is
    assigned to it as well.
    """

    def __init__(self, default_boxes, num_classes, label_field="category_id", bbox_field="bbox"):
        self.default_boxes = default_boxes
        self.num_classes = num_classes
        self.label_field = label_field
        self.bbox_field = bbox_field

    def __call__(self, img, anns):
        """Return ``img`` unchanged together with ``[loc_target, cls_target]``.

        Both targets are lists with one tensor per feature map.
        """
        background = self.num_classes - 1

        # One flat (num_anchors, 4) view per feature map, plus blank targets.
        default_boxes = [level.view(-1, 4) for level in self.default_boxes]
        loc_target = [
            torch.zeros(level.size(0), 4) for level in default_boxes
        ]
        cls_target = [
            torch.full((level.size(0),), background, dtype=torch.long)
            for level in default_boxes
        ]

        for ann in anns:
            label = ann[self.label_field]
            # Annotation boxes are converted from LTWH to XYWH.
            bbox = torch.tensor(
                transform_bbox(
                    ann[self.bbox_field],
                    BoundingBox.LTWH,
                    BoundingBox.XYWH))

            best_per_level = []
            levels = zip(default_boxes, cls_target, loc_target)
            for level_boxes, level_cls, level_loc in levels:
                overlaps = iou_1m_centers(bbox, level_boxes)
                best_per_level.append(overlaps.max(dim=0))

                matched = overlaps > 0.5
                if matched.sum() == 0:
                    continue
                level_cls[matched] = label
                level_loc[matched] = compute_loc_target(
                    bbox, level_boxes[matched])

            # Always match the best-overlapping anchor, even below the threshold.
            best_level, (_, best_anchor) = max(
                enumerate(best_per_level), key=lambda entry: entry[1][0])
            loc_target[best_level][best_anchor] = compute_loc_target(
                bbox, default_boxes[best_level][best_anchor])
            cls_target[best_level][best_anchor] = label

        return img, [loc_target, cls_target]


class SSDLoss(nn.Module):
    """SSD training loss: smooth-L1 localisation plus cross-entropy
    classification with hard negative mining.

    Args:
        num_classes: number of classes; the last id (``num_classes - 1``)
            is treated as background.
        neg_pos_ratio: at most this many negatives per positive are kept
            (the ones with the largest background loss).
        p: probability, per forward call, of printing the loss components.
    """

    def __init__(self, num_classes, neg_pos_ratio=3, p=0.01):
        super().__init__()
        self.num_classes = num_classes
        self.neg_pos_ratio = neg_pos_ratio
        self.p = p

    def forward(self, loc_preds, cls_preds, loc_target, cls_target):
        """Compute the loss over all feature maps.

        Each argument is a sequence with one tensor per feature map. Feature
        maps without any positive anchor are skipped. The summed loss is
        divided by the total number of positive anchors.

        Returns:
            Scalar loss tensor. If no feature map contains a positive anchor,
            a zero tensor connected to the predictions is returned instead of
            dividing by zero.
        """
        # Materialise so the predictions can be revisited for the
        # no-positives fallback below.
        loc_preds = list(loc_preds)
        cls_preds = list(cls_preds)

        BACKGROUND_CLASS = self.num_classes - 1
        total_pos = 0
        loc_loss = 0
        cls_loss_pos = 0
        cls_loss_neg = 0
        for loc_p, cls_p, loc_t, cls_t in zip(loc_preds, cls_preds, loc_target, cls_target):

            pos = cls_t != BACKGROUND_CLASS
            num_pos = pos.sum().item()
            if num_pos == 0:
                continue

            total_pos += num_pos

            cls_loss_pos += F.cross_entropy(
                cls_p[pos], cls_t[pos], reduction='sum')

            # Background loss of every negative anchor; only the hardest
            # `num_neg` of them contribute (hard negative mining).
            cls_loss_neg_all = -F.log_softmax(
                cls_p[~pos], dim=1)[..., BACKGROUND_CLASS]
            num_neg = min(self.neg_pos_ratio * num_pos, len(cls_loss_neg_all))
            if num_neg != 0:
                cls_loss_neg += torch.topk(
                    cls_loss_neg_all, num_neg, sorted=False)[0].sum()
            else:
                # Keep the accumulator a tensor so `.item()` below works.
                cls_loss_neg += torch.zeros_like(cls_loss_pos)

            loc_loss += F.smooth_l1_loss(
                loc_p[pos], loc_t[pos], reduction='sum')

        if total_pos == 0:
            # No positive anchors anywhere: the accumulators are still plain
            # ints and the normaliser is zero. Return a zero loss that stays
            # attached to the graph so a following `.backward()` is valid.
            preds = loc_preds + cls_preds
            if not preds:
                return torch.zeros(())
            return sum(t.sum() for t in preds) * 0

        if random.random() < self.p:
            print("loc: %.4f  cls_pos: %.4f cls_neg: %.4f" %
                  (loc_loss.item() / total_pos,
                   cls_loss_pos.item() / total_pos,
                   cls_loss_neg.item() / total_pos))

        loss = (loc_loss + cls_loss_pos + cls_loss_neg) / total_pos
        return loss


class SSDInference:
    """Turn raw SSD outputs into a flat list of ``BoundingBox`` detections.

    Stores the image size, the per-feature-map default boxes and the
    filtering parameters, and decodes predictions when called.
    """

    def __init__(self, width, height, default_boxes, num_classes, conf_threshold=0.01, max_boxes=0, iou_threshold=0.45):
        # width/height scale the decoded boxes in __call__.
        self.width = width
        self.height = height
        # One default-box tensor per feature map; flattened to (-1, 4) in __call__.
        self.default_boxes = default_boxes
        self.num_classes = num_classes
        # Detections whose best class probability is not above this are dropped.
        self.conf_threshold = conf_threshold
        # max_boxes and iou_threshold are forwarded to non_max_suppression.
        # NOTE(review): the meaning of max_boxes=0 depends on that helper — confirm.
        self.max_boxes = max_boxes
        self.iou_threshold = iou_threshold

    def __call__(self, loc_preds, cls_preds):
        """Decode predictions for every image in the batch.

        Args:
            loc_preds: sequence of location predictions, one per feature map,
                indexed by image along the first axis.
            cls_preds: sequence of class predictions, one per feature map,
                indexed by image along the first axis.

        Returns:
            List of ``BoundingBox`` objects for the whole batch; each one
            carries the image's index within the batch as ``image_name``.
        """
        detections = []
        batch_size = cls_preds[0].size(0)
        for i in range(batch_size):
            boxes = []
            confs = []
            labels = []
            for loc_p, cls_p, d_boxes in zip(loc_preds, cls_preds, self.default_boxes):
                loc_p = loc_p[i]
                cls_p = cls_p[i]
                d_boxes = d_boxes.view(-1, 4)

                # Softmax over classes, then drop the last column so the
                # last class (used as background by the loss) cannot win.
                conf = torch.softmax(cls_p, dim=1)[..., :-1]
                conf, label = torch.max(conf, dim=1)

                mask = conf > self.conf_threshold
                conf = conf[mask]
                label = label[mask]
                # Boolean-mask indexing yields a new tensor, so the in-place
                # ops below do not write into loc_p.
                box = loc_p[mask]
                d_boxes = d_boxes[mask]

                # Inverse of the target encoding: centre = t * size + centre,
                # size = exp(t) * size, relative to the default box.
                box[:, :2].mul_(d_boxes[:, 2:]).add_(d_boxes[:, :2])
                box[:, 2:].exp_().mul_(d_boxes[:, 2:])
                # Scale x/w by the image width and y/h by the image height.
                box[:, [0, 2]] *= self.width
                box[:, [1, 3]] *= self.height

                boxes.append(box)
                confs.append(conf)
                labels.append(label)

            # Merge the detections of all feature maps for this image.
            boxes = torch.cat(boxes, dim=0)
            confs = torch.cat(confs, dim=0)
            labels = torch.cat(labels, dim=0)

            boxes = transform_bboxes(
                boxes, format=BoundingBox.XYWH, to=BoundingBox.LTRB, inplace=True)
            # NMS is run over all boxes together, not separately per class.
            indices = non_max_suppression(
                boxes, confs, self.max_boxes, self.iou_threshold)
            dets = [
                BoundingBox(
                    image_name=i,
                    class_id=labels[ind].item(),
                    box=boxes[ind].tolist(),
                    confidence=confs[ind].item(),
                    box_format=BoundingBox.LTRB,
                ) for ind in indices
            ]
            detections += dets
        return detections


In [0]:
import torch
import torch.nn as nn
import torch.nn.functional as F


def conv1x1(in_channels, out_channels):
    """Create a 1x1 ``nn.Conv2d`` with all other arguments left at their defaults."""
    pointwise = nn.Conv2d(
        in_channels=in_channels,
        out_channels=out_channels,
        kernel_size=(1, 1),
    )
    return pointwise


def conv3x3(in_channels, out_channels, stride=1, padding=1):
    """Create a 3x3 ``nn.Conv2d`` with the given stride and padding."""
    spatial = nn.Conv2d(
        in_channels=in_channels,
        out_channels=out_channels,
        kernel_size=(3, 3),
        stride=stride,
        padding=padding,
    )
    return spatial


class PredTransition(nn.Module):
    """Extra feature stage appended after the backbone.

    Applies a 1x1 convolution, batch norm and ReLU, followed by a
    stride-2 3x3 convolution, batch norm and ReLU.
    """

    def __init__(self, in_channels, out_channels):
        super().__init__()
        # Stage 1: 1x1 convolution changing the channel count.
        self.conv1 = conv1x1(in_channels, out_channels)
        self.bn1 = nn.BatchNorm2d(out_channels)
        self.relu1 = nn.ReLU(inplace=True)
        # Stage 2: 3x3 convolution with stride 2.
        self.conv2 = conv3x3(out_channels, out_channels, stride=2)
        self.bn2 = nn.BatchNorm2d(out_channels)
        self.relu2 = nn.ReLU(inplace=True)

    def forward(self, x):
        """Run both conv-bn-relu stages in sequence and return the result."""
        squeezed = self.relu1(self.bn1(self.conv1(x)))
        downsampled = self.relu2(self.bn2(self.conv2(squeezed)))
        return downsampled


class SSD(nn.Module):
    """SSD detector built on a ResNet-style backbone.

    The backbone must expose ``conv1``, ``bn1``, ``relu``, ``maxpool`` and
    ``layer1`` .. ``layer4``. Three ``PredTransition`` stages extend it, and
    six 1x1 prediction heads read the outputs of ``layer2``, ``layer3``,
    ``layer4`` and the three transitions. The head input widths
    (128/256/512/512/256/256) are hard-coded.
    """

    def __init__(self, backbone, out_channels, num_classes):
        super().__init__()
        self.num_classes = num_classes

        # Backbone stem.
        self.conv1 = backbone.conv1
        self.bn1 = backbone.bn1
        self.relu = backbone.relu
        self.maxpool = backbone.maxpool

        # Backbone residual stages.
        self.layer1 = backbone.layer1
        self.layer2 = backbone.layer2
        self.layer3 = backbone.layer3
        self.layer4 = backbone.layer4

        # Extra stages producing the additional feature maps.
        self.t1 = PredTransition(512, 512)
        self.t2 = PredTransition(512, 256)
        self.t3 = PredTransition(256, 256)

        # One prediction head per feature map; out_channels[k] is its width.
        self.pred1 = conv1x1(128, out_channels[0])
        self.pred2 = conv1x1(256, out_channels[1])
        self.pred3 = conv1x1(512, out_channels[2])
        self.pred4 = conv1x1(512, out_channels[3])
        self.pred5 = conv1x1(256, out_channels[4])
        self.pred6 = conv1x1(256, out_channels[5])

    def forward(self, x):
        """Return ``(loc_preds, cls_preds)``, each a list with one tensor per head.

        Every head output is permuted with ``(0, 3, 2, 1)`` and reshaped to
        ``(batch, -1, 4 + num_classes)``; the first 4 entries of the last
        axis go to ``loc_preds`` and the rest to ``cls_preds``.
        """
        batch = x.size(0)

        # Stem followed by the residual stages.
        stem = self.maxpool(self.relu(self.bn1(self.conv1(x))))
        c2 = self.layer1(stem)
        c3 = self.layer2(c2)
        c4 = self.layer3(c3)
        c5 = self.layer4(c4)

        # Extra feature maps.
        c6 = self.t1(c5)
        c7 = self.t2(c6)
        c8 = self.t3(c7)

        heads = (self.pred1, self.pred2, self.pred3,
                 self.pred4, self.pred5, self.pred6)
        features = (c3, c4, c5, c6, c7, c8)
        raw_outputs = [head(feat) for head, feat in zip(heads, features)]

        loc_preds, cls_preds = [], []
        for raw in raw_outputs:
            flat = raw.permute(0, 3, 2, 1).contiguous().view(
                batch, -1, 4 + self.num_classes)
            loc_preds.append(flat[..., :4])
            cls_preds.append(flat[..., 4:])
        return loc_preds, cls_preds


In [0]:


# Input image size used by the transforms, SSDInference and summary().
WIDTH = 300
HEIGHT = 300
# Grid size (lx, ly) of each of the six prediction feature maps.
LOCATIONS = [
    (38, 38),
    (19, 19),
    (10, 10),
    (5, 5),
    (3, 3),
    (2, 2),
]
# Aspect ratios of the default boxes on each feature map
# (three on the first and last maps, five on the others).
ASPECT_RATIOS = [
    (1, 2, 1/2),
    (1, 2, 3, 1/2, 1/3),
    (1, 2, 3, 1/2, 1/3),
    (1, 2, 3, 1/2, 1/3),
    (1, 2, 3, 1/2, 1/3),
    (1, 2, 1/2),
]
# Rebinds the name: from here on ASPECT_RATIOS is a list of tensors,
# which compute_default_boxes needs for torch.sqrt.
ASPECT_RATIOS = [torch.tensor(ars) for ars in ASPECT_RATIOS]
NUM_FEATURE_MAPS = len(ASPECT_RATIOS)
# Linearly spaced box scales from 0.2 to 0.9, one per feature map.
SCALES = compute_scales(NUM_FEATURE_MAPS, 0.2, 0.9)
# One (lx, ly, num_ratios, 4) tensor of default boxes per feature map.
DEFAULT_BOXES = [
    compute_default_boxes(lx, ly, scale, ars)
    for (lx, ly), scale, ars in zip(LOCATIONS, SCALES, ASPECT_RATIOS)
]

# Class count including background; SSDTransform and SSDLoss use the last
# id (NUM_CLASSES - 1) as the background class.
NUM_CLASSES = 21


In [0]:

# Training pipeline: resize/crop to HEIGHT, then build the SSD targets.
# NOTE(review): Resize and CenterCrop both use HEIGHT; this only matches
# WIDTH because the two constants are equal.
train_transform = Compose([
    Resize(HEIGHT),
    CenterCrop(HEIGHT),
    ToPercentCoords(),
    ToTensor(),
    SSDTransform(DEFAULT_BOXES, NUM_CLASSES),
])

# Evaluation pipeline: same resize/crop, but no ToPercentCoords and no
# target encoding.
test_transform = Compose([
    Resize(HEIGHT),
    CenterCrop(HEIGHT),
    ToTensor(),
])

data_home = "./VOC"
ds = VOCDetection(data_home, year='2012', image_set='trainval', download=True)
# Keep only the second part of the split (presumably 0.3% of the data,
# reading 0.003 as a test ratio — confirm against hutil.data.train_test_split).
rest, ds = train_test_split(ds, 0.003)
# NOTE(review): ds_train and ds_val wrap the SAME subset, so the evaluation
# below runs on the training images. Looks like a deliberate overfit sanity
# check — confirm before reporting the resulting mAP.
ds_train = Fullset(ds, train_transform)
ds_val = Fullset(ds, test_transform)
# Alternative kept for reference: a real train/validation split.
# ds_train, ds_val = train_test_split(
#     ds, test_ratio=0.05,
#     transform=train_transform,
#     test_transform=test_transform)


Dataset found. Skip download or extract


In [0]:

# ImageNet-pretrained ResNet-18 backbone; its fc classifier attribute is
# removed because SSD never reads it.
backbone = resnet18(pretrained=True)
del backbone.fc
# Presumably disables gradients for the backbone (the optimizer below only
# takes parameters with requires_grad=True) — confirm in hutil.inference.freeze.
freeze(backbone)

# Each head predicts (4 box offsets + NUM_CLASSES scores) per aspect ratio.
out_channels = [
    (4 + NUM_CLASSES) * len(ars)
    for ars in ASPECT_RATIOS
]
net = SSD(backbone, out_channels, NUM_CLASSES)
# net.apply(init_weights(nonlinearity='relu'))
# p=1 makes SSDLoss print its loss components on every forward call.
criterion = SSDLoss(NUM_CLASSES, p=1)
# optimizer = SGD(filter(lambda x: x.requires_grad, net.parameters()),
#                 lr=1e-2, momentum=0.9, dampening=0.9, weight_decay=5e-4)
optimizer = Adam(filter(lambda x: x.requires_grad,
                        net.parameters()), lr=1e-2, weight_decay=5e-4, amsgrad=True)
# The milestone list is empty, so this scheduler never changes the learning rate.
lr_scheduler = MultiStepLR(optimizer, [], gamma=0.1)
# lr_scheduler = LambdaLR(optimizer, lambda x: 0.99 ** x)


In [0]:

# Metric tracked during training.
metrics = {
    'loss': TrainLoss(),
}
# Decoder used for evaluation; keeps at most 10 boxes per image via max_boxes.
# cuda(...) presumably moves the default boxes to the GPU — confirm in hutil.
inference = SSDInference(
    WIDTH, HEIGHT, cuda(DEFAULT_BOXES), NUM_CLASSES, max_boxes=10,
)
# Metric tracked during evaluation, computed from the decoded detections.
test_metrics = {
    'mAP': MeanAveragePrecision(inference)
}


# The trainer ties model, loss, optimizer and scheduler together.
# save_path/name presumably control where checkpoints are written — confirm.
trainer = Trainer(net, criterion, optimizer, lr_scheduler,
                  metrics=metrics, evaluate_metrics=test_metrics,
                  save_path="./checkpoints", name="SSD-VOC")


In [0]:
# Print a per-layer summary of the network for a 3 x HEIGHT x WIDTH input.
summary(net, (3,HEIGHT, WIDTH))

In [0]:
# Training batches use the default collate function: SSDTransform emits
# fixed-size target tensors, so samples can be stacked directly.
train_loader = DataLoader(
    ds_train, batch_size=64, shuffle=True, num_workers=1, pin_memory=True)
# Validation samples go through box_collate_fn instead of the default
# (presumably because each image has a variable number of boxes — confirm).
val_loader = DataLoader(
    ds_val, batch_size=64, collate_fn=box_collate_fn)


In [0]:
# Train on train_loader; the second argument is presumably the number of
# epochs — confirm against hutil.train.Trainer.fit.
hist = trainer.fit(train_loader, 20)
# NOTE(review): plot_history is not imported or defined in any cell visible
# in this notebook; on a fresh kernel this line raises NameError unless it
# is provided elsewhere.
plot_history(hist)

In [0]:
# Raise the per-image box limit to 100 for the final evaluation.
# NOTE(review): assumes `.predict` on the metric is the SSDInference object
# built earlier with max_boxes=10; this mutates it in place, so re-running
# earlier evaluation cells afterwards also uses 100 — confirm.
test_metrics['mAP'].predict.max_boxes = 100
# Time a full evaluation pass over the validation loader.
%time trainer.evaluate(val_loader)