In [None]:
import cv2
import math
import glob
import inspect
import json
import mlflow
import torch
import os
import time
import sys
import datasets

import torchvision as tv
import lightning as L
import numpy as np
import torch.nn as nn
import torch.nn.functional as F
import matplotlib.pyplot as plt

from torch.utils.data import Dataset, DataLoader
from torchvision import transforms
from pytorch_lightning.loggers import MLFlowLogger
from lightning.pytorch.callbacks import ModelCheckpoint

# Build anchors and find best candidate

In [None]:
# Anchor-grid configuration for this exploratory cell.
BATCH_SIZE = 5
HEIGHT = 16  # grid rows
WIDTH = 16   # grid columns
STRIDE = 16  # image pixels per grid cell (256 px crop / 16 cells)

# Base anchors as (x1, y1, x2, y2) pixel offsets around an anchor position.
anchors = torch.tensor(
    [
        (-32, -32, 32, 32),
        (-64, -64, 64, 64),
        (-128, -128, 128, 128),
    ],
    dtype=torch.int64,
)
print(anchors.shape)
N_ANCHORS = len(anchors)

x = torch.arange(0, WIDTH)
y = torch.arange(0, HEIGHT)
# Row-major grid (y outer, x inner): the same order in which
# `permute(0, 2, 3, 1).reshape(bs, -1, ...)` flattens an (H, W) prediction map.
# `indexing` is passed explicitly because relying on the default is deprecated.
grid_y, grid_x = torch.meshgrid(y, x, indexing="ij")
# Anchor sizes are expressed in pixels, so grid indices are converted to pixel
# positions before they are used as offsets.
grid = torch.stack([grid_x, grid_y], dim=-1).reshape(-1, 2) * STRIDE
print(grid.shape)
grid = grid[None, :].repeat(BATCH_SIZE, 1, 1)
print(grid.shape)

grid_anchors = anchors[None, None, :, :].repeat(BATCH_SIZE, HEIGHT * WIDTH, 1, 1)
print(grid_anchors.shape)

# Extra axis so one (x, y) position broadcasts over the N_ANCHORS boxes.
grid = grid[:, :, None, :]

grid_anchors[..., 0::2] += grid[..., 0:1]  # shift x1 and x2
grid_anchors[..., 1::2] += grid[..., 1:2]  # shift y1 and y2

candidates = grid_anchors.view(BATCH_SIZE * WIDTH * HEIGHT * N_ANCHORS, 4)
print(candidates.shape)

gt_bboxes = []
gt_bboxes.append((0, 0, 0, 0))

In [None]:
class CustomDataset(Dataset):
    """Pascal VOC 2007 images served as fixed-size random crops with box targets.

    Each item is a dict with:
      * "img":    float32 tensor (3, crop_size, crop_size), pixel values / 255
      * "mask":   int64 tensor (1, crop_size, crop_size), 1 inside any visible box
      * "bboxes": float64 tensor (max_boxes, 4) of (x1, y1, x2, y2) divided by
                  crop_size, zero-padded when fewer boxes are visible

    Args:
        root: stored on the instance but not read here; the data is fetched
            through `datasets.load_dataset`.
        train: if True use the first 95% of the split, otherwise the rest.
    """

    crop_size = 256  # side length of the square crop, in pixels
    max_boxes = 4    # number of box slots in the fixed-size "bboxes" tensor

    def __init__(self, root, train=True):
        self.root = root

        dataset = datasets.load_dataset("fuliucansheng/pascal_voc", "voc2007_main", split="train")
        self.elements = np.asarray(dataset)

        n_train = int(len(self) * 0.95)

        if train:
            self.elements = self.elements[:n_train]
        else:
            self.elements = self.elements[n_train:]

    def __len__(self):
        return len(self.elements)

    def __getitem__(self, idx):
        if torch.is_tensor(idx):
            idx = idx.tolist()

        element = self.elements[idx]
        p = self.crop_size

        img = np.asarray(element["image"])
        # (n, 4) even when the image has no annotated object.
        boxes = np.asarray(element["objects"]["bboxes"], dtype=np.float64).reshape(-1, 4)

        # Upscale so that BOTH sides are at least `p`, and remember the factor
        # so the boxes can be moved into the resized image's coordinates.
        h, w = img.shape[:2]
        scale = 1.0
        if min(h, w) < p:
            scale = float(np.ceil(p / min(h, w)))
            img = cv2.resize(img, None, fx=scale, fy=scale)
            h, w = img.shape[:2]

        # `randint` excludes the upper bound; +1 keeps a side of exactly `p` valid.
        rx = np.random.randint(0, w - p + 1)
        ry = np.random.randint(0, h - p + 1)
        img = img[ry:ry + p, rx:rx + p]

        # Move boxes into crop coordinates and clip them to the crop window.
        shifted = (boxes * scale).astype(int) - np.array([rx, ry, rx, ry])
        shifted = np.clip(shifted, 0, p)

        mask = np.zeros([p, p], dtype=np.int64)
        coords = np.zeros([self.max_boxes, 4])
        n_kept = 0
        for x1, y1, x2, y2 in shifted:
            if x2 <= x1 or y2 <= y1:
                # Box lies entirely outside the crop.
                continue
            mask[y1:y2, x1:x2] = 1
            # The mask covers every visible box; only the first `max_boxes`
            # fit into the fixed-size coordinate tensor.
            if n_kept < self.max_boxes:
                coords[n_kept] = x1, y1, x2, y2
                n_kept += 1

        img = torch.from_numpy(img).permute(2, 0, 1).to(torch.float32) / 255
        mask = torch.from_numpy(mask).to(torch.long)[None, :]
        coords = torch.from_numpy(coords) / p

        sample = dict()
        sample["img"] = img
        sample["mask"] = mask
        sample["bboxes"] = coords

        return sample

In [None]:
# Notebook-wide training configuration.
ROOT_PATH = "../data"
BATCH_SIZE = 16
NUM_WORKERS = 0
# Fall back to the CPU so the notebook still runs on machines without CUDA.
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"
LR = 1e-3
N_CLASSES = 2
N_EPOCHS = 100
MODEL_IDENTIFIER = "region_proposal"

In [None]:
# Placeholder; no other cell visible in this notebook reads it.
# NOTE(review): presumably a checkpoint path to resume from — confirm or remove.
checkpoint = None

In [None]:
# Train / held-out splits of the same source data (see CustomDataset).
train_dataset = CustomDataset(ROOT_PATH, train=True)
test_dataset = CustomDataset(ROOT_PATH, train=False)

train_loader = DataLoader(
    train_dataset,
    batch_size=BATCH_SIZE,
    pin_memory=True,
    shuffle=True,
    num_workers=NUM_WORKERS,
)
# The held-out loader keeps a fixed order so evaluation runs are comparable.
test_loader = DataLoader(
    test_dataset,
    batch_size=BATCH_SIZE,
    pin_memory=True,
    shuffle=False,
    num_workers=NUM_WORKERS,
)

print("Train:", len(train_dataset))
print("Test:", len(test_dataset))

In [None]:
# Pull a single batch and report shape and value range of every entry.
element = next(iter(train_loader))

img, mask, bboxes = (element[key] for key in ("img", "mask", "bboxes"))


def _describe(label, tensor):
    """Print a tensor's label, shape, minimum and maximum on one line."""
    print(label, tensor.shape, tensor.min(), tensor.max())


_describe("img", img)
_describe("mask", mask)
_describe("bboxes", bboxes)

In [None]:
@torch.no_grad
def test_model_config():
    batch = next(iter(train_loader))
    
    img = batch["img"].to(DEVICE)
    mask = batch["mask"].to(DEVICE)

    print(f"input")
    print(f"  {img.shape}")
    print(f"  {mask.shape}")
        
    pred = model(img, mask)
    
    # print(f"output")
    # print(f"  {pred.shape}")
    # print(f"  {mask.shape}")
    # print()
    
    # pred = pred.permute(0, 2, 3, 1)
    # pred = pred.reshape(-1, N_CLASSES)

    # mask = mask.reshape(-1)
    
    # loss = F.cross_entropy(pred, mask)
    # print(loss)

In [None]:
class DownBaseBlock(nn.Module):
    """Three densely connected 3x3 conv stages followed by a stride-2 conv.

    Stage i receives the block input concatenated with the outputs of all
    previous stages. The third stage's normalised output is summed with the
    input and the two earlier stage outputs before the ReLU. A final
    4x4 / stride-2 convolution maps `f` channels to `f_out` channels.

    Args:
        f: number of input channels (also the width of the three stages).
        f_out: number of output channels.
    """

    def __init__(self, f, f_out):
        super().__init__()

        groups_in = f // 4
        groups_out = f_out // 4

        self.conv1 = nn.Conv2d(f, f, kernel_size=3, stride=1, padding=1)
        self.norm1 = nn.GroupNorm(groups_in, f)

        self.conv2 = nn.Conv2d(2 * f, f, kernel_size=3, stride=1, padding=1)
        self.norm2 = nn.GroupNorm(groups_in, f)

        self.conv3 = nn.Conv2d(3 * f, f, kernel_size=3, stride=1, padding=1)
        self.norm3 = nn.GroupNorm(groups_in, f)

        self.conv4 = nn.Conv2d(f, f_out, kernel_size=4, stride=2, padding=1)
        self.norm4 = nn.GroupNorm(groups_out, f_out)

    def forward(self, x0):
        # Stage 1 sees only the block input.
        feat1 = F.relu(self.norm1(self.conv1(x0)))

        # Stage 2 sees the input and stage 1.
        feat2 = F.relu(self.norm2(self.conv2(torch.cat((x0, feat1), dim=1))))

        # Stage 3 sees everything so far; its ReLU is applied after the sum.
        feat3 = self.norm3(self.conv3(torch.cat((x0, feat1, feat2), dim=1)))
        fused = F.relu(x0 + feat1 + feat2 + feat3)

        # Stride-2 convolution to the output channel count.
        return F.relu(self.norm4(self.conv4(fused)))
class Model(L.LightningModule):
    """Small convolutional region-proposal network.

    A stem convolution and four stride-2 `DownBaseBlock`s reduce the input
    resolution by 16. A 3x3 convolution then feeds two 1x1 heads that emit,
    per feature-map location and for each of `k` anchors, 2 class logits and
    4 box-regression values.

    Args:
        f: channel width of the stem and of the down-sampling blocks.
    """

    def __init__(self, f=64):
        super().__init__()

        # torchvision is already a dependency of this notebook; the class it
        # needs is imported here because no top-level import provides it.
        from torchvision.models.detection.anchor_utils import AnchorGenerator

        self.k = 3  # anchors predicted per feature-map location

        anchor_sizes = ((32,), (64,), (128,),)
        aspect_ratios = ((0.5, 1.0, 2.0),) * len(anchor_sizes)
        # NOTE(review): the generator is not used by forward()/training_step()
        # yet, and this sizes/ratios layout looks like a three-level setup —
        # confirm before wiring it to the single feature map produced here.
        self.anchor_generator = AnchorGenerator(anchor_sizes, aspect_ratios)

        self.in_conv = nn.Conv2d(3, f, 3, 1, padding=1)
        self.norm1 = nn.GroupNorm(f // 4, f)

        self.down_block1 = DownBaseBlock(f, f)
        self.down_block2 = DownBaseBlock(f, f)
        self.down_block3 = DownBaseBlock(f, f)
        self.down_block4 = DownBaseBlock(f, f)

        # padding=1 keeps the spatial size, so a 256 px input yields the
        # 16 x 16 grid assumed by the anchor-building cell.
        self.cnn = nn.Conv2d(f, 256, 3, 1, padding=1)

        self.cls_head = nn.Conv2d(256, 2 * self.k, 1, 1)
        self.reg_head = nn.Conv2d(256, 4 * self.k, 1, 1)

        # Re-initialise convolutions only. Applying normal_(std=0.02) to every
        # parameter would also overwrite the GroupNorm scales (default 1) with
        # near-zero values.
        for module in self.modules():
            if isinstance(module, nn.Conv2d):
                nn.init.normal_(module.weight, mean=0.0, std=0.02)
                if module.bias is not None:
                    nn.init.zeros_(module.bias)

    def _head_maps(self, x):
        """Return the raw head outputs as (bs, 2k, h, w) and (bs, 4k, h, w) maps."""
        x = F.relu(self.norm1(self.in_conv(x)))

        x = self.down_block1(x)
        x = self.down_block2(x)
        x = self.down_block3(x)
        x = self.down_block4(x)

        features = self.cnn(x)

        return self.cls_head(features), self.reg_head(features)

    def forward(self, x, mask=None):
        """Predict per-anchor class logits and box-regression values.

        Args:
            x: image batch of shape (bs, 3, H, W).
            mask: accepted for call compatibility; not used.

        Returns:
            Tuple `(x_cls, x_reg)` with shapes (bs, h*w*k, 2) and
            (bs, h*w*k, 4), flattened row-major over the feature map with the
            anchor index varying fastest.
        """
        cls_map, reg_map = self._head_maps(x)

        bs = cls_map.shape[0]
        x_cls = cls_map.permute(0, 2, 3, 1).reshape(bs, -1, 2)
        x_reg = reg_map.permute(0, 2, 3, 1).reshape(bs, -1, 4)

        return x_cls, x_reg

    def configure_optimizers(self):
        optimizer = torch.optim.Adam(self.parameters(), lr=LR)
        return optimizer

    def training_step(self, batch, batch_idx):
        """Cross-entropy between anchor class logits and the box mask.

        The mask is down-sampled (nearest neighbour) to the feature-map
        resolution, and every anchor at a location shares that location's
        label.

        TODO: the regression head is not supervised yet; this needs
        anchor-to-ground-truth matching.
        """
        img = batch["img"]
        mask = batch["mask"]

        cls_map, _ = self._head_maps(img)
        bs, _, h, w = cls_map.shape

        # (bs, 2k, h, w) -> one row of 2 logits per (location, anchor).
        logits = cls_map.permute(0, 2, 3, 1).reshape(-1, 2)

        # (bs, 1, H, W) -> (bs, h, w, k) labels in the same flattening order.
        target = F.interpolate(mask.float(), size=(h, w), mode="nearest")
        target = target.permute(0, 2, 3, 1).expand(bs, h, w, self.k)
        target = target.reshape(-1).long()

        loss = F.cross_entropy(logits, target)

        self.log("train_loss", loss, prog_bar=True, on_step=True, on_epoch=True)

        return loss

    def on_train_epoch_end(self):
        # Emit a blank line at the end of every training epoch.
        print()

In [None]:
# Build the network with its default width and move it to the configured device.
model = Model().to(DEVICE)

In [None]:
# Smoke test: run one training batch through the model before starting training.
test_model_config()

In [None]:
# accelerator="auto" lets Lightning pick the available hardware, so the
# notebook also runs on machines without a GPU.
trainer = L.Trainer(
    devices=1,
    accelerator="auto",
    max_epochs=N_EPOCHS,
    enable_checkpointing=True,
    log_every_n_steps=10,
)

In [None]:
# Train on the training loader only; no validation dataloader is passed.
trainer.fit(model=model, train_dataloaders=train_loader)