In [2]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torchvision
import torchvision.transforms as tvt
from torch.utils.data.sampler import SubsetRandomSampler
import torch_directml
import matplotlib.pyplot as plt
from pycocotools.coco import COCO
import numpy as np
from PIL import Image
import random
import os, time
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay
import functools

# Use DirectML so the code can run on an AMD GPU.
# `dml` is the device handle returned by torch_directml; ending the cell with the
# bare name displays it, which is how the device repr below was produced.
dml = torch_directml.device()
dml

device(type='privateuseone', index=0)

In [None]:
class DataInfo:
    """Bundle the dataset location with the selection criteria for target images.

    Attributes:
        dir: root folder of the dataset.
        type: name of the split (used both as sub-folder and in the annotation
            file name).
        annFile: path of the ``instances`` annotation JSON built from ``dir``
            and ``type``.
        ctgs: the category names passed in by the caller (may be None).
        h, w: target image height and width in pixels.
        minArea: smallest annotation area that is accepted.
    """

    def __init__(self, dir='./coco', *, type='train2014', categories=None) -> None:
        # where the data lives
        self.dir, self.type = dir, type
        self.annFile = '%s/annotations/%s_%s.json' % (dir, 'instances', type)
        # what the target images look like
        self.ctgs = categories
        self.h, self.w = 256, 256
        self.minArea = 4096

    def coco_json(self):
        """Open the annotation file and return the resulting COCO object."""
        annotations = COCO(self.annFile)
        return annotations
    
# Number of grid cells along each image side. HW6_YOLO sizes its output layer
# from this value; MyDataset carries its own class attribute of the same name.
yolo_interval = 8

class MyDataset(torch.utils.data.Dataset):
    """Serve resized COCO images together with YOLO-style training targets.

    On construction the dataset collects every image that contains at least one
    non-crowd annotation of a requested category whose area is at least
    ``data_info.minArea`` and stores a resized RGB copy of it in ``save_dir``.

    Each item is the tuple
    ``(img_tensor, labs_tensor, yolo_vectors, cell_ids, anchor_ids)`` where the
    last four tensors have ``max_objects`` slots (first dimension). Unused slots
    keep objectness 0 in ``yolo_vectors``, ``pad_label`` in ``labs_tensor`` and
    index 0 in ``cell_ids`` / ``anchor_ids``.

    The index and label tensors are ``torch.long``: integer tensors of that
    dtype can be used directly for indexing and as class targets, whereas uint8
    tensors are interpreted as masks when used as an index.
    """
    xform = tvt.Compose([
        tvt.ToTensor(),
        # transform to range [-1, 1]:
        tvt.Normalize([0.5, 0.5, 0.5], [0.5, 0.5, 0.5]),
    ])
    yolo_interval = 8
    # Number of object slots in every target tensor.
    max_objects = 5
    # Label stored in unused slots (value kept from the original code).
    # NOTE(review): this is not a valid class index for short category lists, so
    # consumers have to drop unused slots via the objectness flag.
    pad_label = 13

    def __init__(self, data_info: DataInfo, *, save_dir="./resized", update=False):
        """Index the annotations and prepare the resized image cache.

        Args:
            data_info: dataset location and selection criteria.
            save_dir: folder that receives the resized images.
            update: when True, regenerate resized images that already exist.

        Raises:
            ValueError: if ``data_info.ctgs`` is empty or None.
        """
        super().__init__()
        catType = data_info.ctgs
        if not catType:
            raise ValueError('data_info.ctgs must list at least one category name.')
        cocoGt = data_info.coco_json()
        self.dir = save_dir
        self.coco = cocoGt
        self.data = data_info
        self.catIds = cocoGt.getCatIds(catNms=catType)
        # Each name is wrapped in a list so it is looked up as one whole name
        # rather than as a sequence of characters.
        self.catId_to_label = {
            cocoGt.getCatIds(catNms=[name])[0]: label
            for label, name in enumerate(catType)
        }
        self.label_to_cat = dict(enumerate(catType))

        # `update` is keyword-only in gen_data_id.
        self.imgIds = self.gen_data_id(update=update)

    def _is_target(self, ann):
        """Return True if the annotation is a requested category and big enough."""
        return ann['category_id'] in self.catIds \
            and ann['area'] >= self.data.minArea

    def gen_data_id(self, *, update=False):
        """Return the ids of all usable images and make sure they are resized.

        An image is usable when at least one of its non-crowd annotations
        satisfies the category and minimum-area criteria.
        """
        cocoGt = self.coco
        candidates = set()
        for catId in self.catIds:
            candidates.update(cocoGt.getImgIds(catIds=catId))
        ids = []
        # sorted() keeps the dataset order reproducible between runs
        for imgId in sorted(candidates):
            anns = cocoGt.loadAnns(cocoGt.getAnnIds(imgIds=imgId, iscrowd=False))
            if any(self._is_target(ann) for ann in anns):
                ids.append(imgId)
                self.gen_resized_image(imgId, update)
        return ids

    def resize(self, im, bbox):
        """Scale an ``[x, y, w, h]`` pixel box by the image size into [0, 1]."""
        w_ori = im['width']
        h_ori = im['height']
        # xi, yi are in range [0,1]
        new_box = [bbox[0]/w_ori, bbox[1]/h_ori, \
                   bbox[2]/w_ori, bbox[3]/h_ori]
        return new_box

    def gen_resized_image(self, imgId, update):
        """Write the resized RGB copy of one image into ``self.dir``.

        The copy is (re)written when ``update`` is set, when it does not exist
        yet, or when the source image is not RGB.
        """
        im = self.coco.loadImgs(imgId)[0]
        orig_path = '%s/%s/%s'%(self.data.dir, self.data.type, im['file_name'])
        save_path = '%s/%s'%(self.dir, im['file_name'])
        with Image.open(orig_path) as source:
            rgb = source
            if source.mode != 'RGB':
                # force update if it is not RGB
                rgb = source.convert('RGB')
                update = True
            if update or not os.path.exists(save_path):
                os.makedirs(os.path.dirname(save_path) or '.', exist_ok=True)
                resized = rgb.resize((self.data.w, self.data.h), resample=Image.Resampling.LANCZOS)
                resized.save(save_path)

    def __len__(self):
        return len(self.imgIds)

    def __getitem__(self, index):
        """Build the image tensor and the target tensors for one image."""
        imgId = self.imgIds[index]
        im   = self.coco.loadImgs(imgId)[0]
        # same annotation filter as gen_data_id, so every item has >= 1 target
        anns = self.coco.loadAnns(self.coco.getAnnIds(imgIds=imgId, iscrowd=False))
        path = '%s/%s'%(self.dir, im['file_name'])
        with Image.open(path) as pil_image:
            # convert() also guards against cached files that are not RGB
            img_tensor = self.xform(pil_image.convert('RGB'))

        slots = self.max_objects
        labs_tensor = torch.full((slots,), self.pad_label, dtype=torch.long)
        cell_ids = torch.zeros(slots, dtype=torch.long)
        anchor_ids = torch.zeros(slots, dtype=torch.long)
        # add one class for 'other object'
        yolo_vectors = torch.zeros(slots, 5 + len(self.catIds) + 1, dtype=torch.float)

        # keep at most `slots` qualifying annotations
        targets = [ann for ann in anns if self._is_target(ann)][:slots]
        for i, ann in enumerate(targets):
            bbox_renormalized = xywh_to_ccwh(self.resize(im, ann['bbox']))
            label = self.catId_to_label[ann['category_id']]
            cell_id, dx, dy = calc_cell_id(bbox_renormalized, self.yolo_interval)
            anchor_id, sw, sh = calc_anchor_id(bbox_renormalized, self.yolo_interval)
            cell_ids[i] = cell_id
            anchor_ids[i] = anchor_id
            labs_tensor[i] = label
            yolo_vectors[i, 0] = 1
            # the regression targets are fractional, so they must stay floats
            yolo_vectors[i, 1:5] = torch.tensor(
                [float(dx), float(dy), float(sw), float(sh)], dtype=torch.float)
            yolo_vectors[i, 5+label] = 1

        return img_tensor, labs_tensor, yolo_vectors, cell_ids, anchor_ids
    
def calc_cell_id(bbox, interval) -> "tuple[int, float, float]":
    """Locate the grid cell that contains the centre of a box.

    Args:
        bbox: ``[xc, yc, w, h]``; the centre coordinates are multiplied by
            ``interval`` to obtain grid units, so they are expected to be
            fractions of the image size.
        interval: number of grid cells along each image side.

    Returns:
        ``(cell_id, dx, dy)`` where ``cell_id = row * interval + column`` and
        ``dx`` / ``dy`` are the offsets of the box centre from the centre of
        that cell, measured in cell units. Centres outside the image are
        assigned to the nearest border cell.
    """
    def _clamped_index(center):
        # floor of the position in grid units, clamped to a valid cell
        return int(min(max(center * interval // 1, 0), interval - 1))

    # y-axis
    nh = _clamped_index(bbox[1])
    # x-axis
    nw = _clamped_index(bbox[0])
    # The centre of cell n lies at n + 0.5 in grid units.
    dx = bbox[0] * interval - nw - 0.5
    dy = bbox[1] * interval - nh - 0.5
    return nh * interval + nw, dx, dy

def calc_anchor_id(bbox, interval) -> "tuple[int, float, float]":
    """Pick the anchor box for a box and express the box size relative to it.

    The anchor is chosen from the width/height ratio of the box:

        ratio <= 0.25 -> anchor 0 (1 x 5)
        ratio <= 0.5  -> anchor 1 (1 x 3)
        ratio <= 2.0  -> anchor 2 (1 x 1)
        ratio <= 4.0  -> anchor 3 (3 x 1)
        otherwise     -> anchor 4 (5 x 1)

    Args:
        bbox: ``[xc, yc, w, h]``; only ``w`` and ``h`` are read.
        interval: factor applied to ``w`` and ``h`` before the logarithm.

    Returns:
        ``(anchor_box_index, rw, rh)`` with
        ``rw = log(w / anchor_w * interval)`` and
        ``rh = log(h / anchor_h * interval)``.
    """
    # upper ratio bound of anchors 0..3; anything above the last bound is anchor 4
    ratio_bounds = (0.25, 0.5, 2.0, 4.0)
    # (width, height) of each anchor
    anchor_shapes = ((1, 5), (1, 3), (1, 1), (3, 1), (5, 1))

    wh_ratio = bbox[2] / bbox[3]
    anchor_box_index = len(ratio_bounds)
    for candidate, bound in enumerate(ratio_bounds):
        if wh_ratio <= bound:
            anchor_box_index = candidate
            break

    anchor_w, anchor_h = anchor_shapes[anchor_box_index]
    rw = np.log(bbox[2] / anchor_w * interval)
    rh = np.log(bbox[3] / anchor_h * interval)
    return anchor_box_index, rw, rh

def xywh_to_ccwh(bbox):
    """Turn a corner-anchored ``[x, y, w, h]`` box into ``[xc, yc, w, h]``.

    The first two entries of the result are the box centre; width and height
    are passed through unchanged.
    """
    left, top, width, height = bbox
    center_x = left + width / 2
    center_y = top + height / 2
    return [center_x, center_y, width, height]

class SkipBlock(nn.Module):
    """Two-convolution residual block with an optional projection shortcut.

    The main path is conv -> batch-norm -> ReLU -> conv -> batch-norm. Its
    result is added to the shortcut and passed through a final ReLU. The
    shortcut is the input itself when the block keeps both resolution and
    channel count; otherwise it is a 1x1 convolution (with the block's stride)
    followed by batch-norm so that the shapes match.

    Args:
        in_ch: number of input channels.
        out_ch: number of output channels.
        ker: kernel size of the first convolution.
        stride: stride of the first convolution and of the shortcut; 1 or 2.
        padding: padding of the first convolution.

    Raises:
        ValueError: if ``stride`` is neither 1 nor 2.
    """

    def __init__(self, in_ch, out_ch, ker=3, *, stride=1, padding=1) -> None:
        super().__init__()
        # Validate first, so that a channel change with stride 1 is not
        # rejected with a misleading message about the stride.
        if stride not in (1, 2):
            raise ValueError('Currently stride must be 1 or 2.')
        self.in_ch = in_ch
        self.out_ch = out_ch
        self.stride = stride
        self.conv1 = nn.Conv2d(in_ch, out_ch, ker, stride=stride, padding=padding)
        self.conv2 = nn.Conv2d(out_ch, out_ch, 3, stride=1, padding=1)
        self.bn1 = nn.BatchNorm2d(out_ch)
        self.bn2 = nn.BatchNorm2d(out_ch)
        self.relu = nn.ReLU(inplace=True)
        if stride != 1 or in_ch != out_ch:
            # would have bugs if ker!=3: the 1x1 shortcut only reproduces the
            # spatial size of conv1 for the default kernel/padding combination.
            self.downsampler = nn.Sequential(
                nn.Conv2d(in_ch, out_ch, 1, stride=stride, padding=0, bias=False),
                nn.BatchNorm2d(out_ch),
            )
        else:
            self.downsampler = None

    def forward(self, x):
        """Apply the block to ``x`` (a batch of feature maps)."""
        identity = x

        out = self.conv1(x)
        out = self.bn1(out)
        out = self.relu(out)

        out = self.conv2(out)
        out = self.bn2(out)
        if self.downsampler is not None:
            identity = self.downsampler(x)
        out = out + identity
        out = self.relu(out)

        return out
    
def calc(x, ker=3, *, stride=1, padding=1):
    """Return the output length of a convolution/pooling layer along one axis.

    Computes ``floor((x + 2*padding - ker) / stride) + 1`` and returns it as an
    ``int`` so the result can be used directly as a layer dimension.

    Args:
        x: input length.
        ker: kernel size.
        stride: step between two kernel positions.
        padding: padding added on each side of the input.
    """
    return int((x + 2 * padding - ker) // stride) + 1

class HW6_YOLO(nn.Module):
    """Convolutional backbone followed by a fully connected prediction head.

    The backbone consists of two plain convolution stages, three groups of
    SkipBlocks (each group halves the resolution in its first block) and a
    final max-pooling layer. The head maps the flattened feature map to
    ``yolo_interval**2 * 5 * 9`` values per image.

    Args:
        ngf: number of channels produced by the first convolution.
        size: height/width of the (square) input images; used to size the
            first linear layer.
    """

    def __init__(self, ngf=32, size=256) -> None:
        super().__init__()
        # The first convolution layer. Assuming (B, 3, 256, 256) as the input.
        model = nn.ModuleList([
            nn.ReflectionPad2d(3),
            nn.Conv2d(3, ngf, 7, padding=0),
            nn.BatchNorm2d(ngf),
            nn.ReLU(inplace=True),
        ])
        # out_size: 256 (the reflection padding of 3 counts as the padding)
        new_size = int(calc(size, 7, padding=3))

        # The second convolution layer, downsample only once before skip-block
        model.extend([
            nn.ReflectionPad2d(2),
            nn.Conv2d(ngf, ngf * 2, 5, stride=3, padding=0),
            nn.BatchNorm2d(ngf * 2),
            nn.ReLU(inplace=True),
        ])
        # out_size: 86 (reflection padding of 2)
        new_size = int(calc(new_size, 5, stride=3, padding=2))

        # The skip-blocks
        new_in_ch = ngf * 2
        num_blocks = [6, 6, 4]
        new_out_chs = [64, 128, 256]
        for new_out_ch, num_block in zip(new_out_chs, num_blocks):
            model.extend(
                self._gen_skip_blocks(new_in_ch, new_out_ch, num_block, stride=2, padding=1)
            )
            new_in_ch = new_out_ch
            new_size = int(calc(new_size, 3, stride=2, padding=1))
        # out_size: 11

        model.append(nn.MaxPool2d(3, stride=2, padding=0))
        new_size = int(calc(new_size, 3, stride=2, padding=0))
        # out_size: 256x5x5

        self.model = nn.Sequential(*model)

        self.fc = nn.Sequential(
            # in_features has to be an int, hence the int() conversions above
            nn.Linear(new_out_ch*new_size*new_size, 4096),
            nn.ReLU(inplace=True),
            nn.Linear(4096, 2048),
            nn.Sigmoid(),
            # 8 * 8 * 5 * 9 = 2880
            nn.Linear(2048, yolo_interval**2 * 5 * 9),
        )

    @staticmethod
    def _gen_skip_blocks(in_ch, out_ch, num_layer, *, stride=1, padding=1):
        """Return ``num_layer`` SkipBlocks; only the first may change the shape."""
        # the first skip-block will downsample the input if necessary.
        layers = [SkipBlock(in_ch, out_ch, stride=stride, padding=padding),]
        for _ in range(1, num_layer):
            # the following skip-blocks will keep the input size unchanged.
            layers.append(SkipBlock(out_ch, out_ch, stride=1, padding=1))
        return layers

    def forward(self, x):
        """Map a batch of images to one flat prediction vector per image."""
        out = self.model(x)
        # keep the batch dimension and flatten channels and spatial positions
        out = torch.flatten(out, 1)
        out = self.fc(out)
        return out
    
def train_yolo(net, data_set, batch_size=8, epoch_size=10, *, device="cpu", log_interval=1024):
    """Train ``net`` on ``data_set`` with a three-part YOLO-style loss.

    Every batch is expected to unpack into
    ``(images, labels, yolo_vectors, cell_ids, anchor_ids)``. The last four
    hold one row per object slot, and ``yolo_vectors[..., 0]`` flags the slots
    that contain an object.

    The network output is reshaped to ``(batch, cells, anchors, vector_length)``
    where ``vector_length`` is taken from ``yolo_vectors``, and the number of
    cells follows from the output size. The loss is the sum of

    * binary cross-entropy on the objectness entry of every cell/anchor pair
      (the prediction is treated as a raw score, i.e. a logit),
    * mean squared error on entries 1..4 of the vectors that hold an object,
    * cross-entropy on the remaining entries of those vectors against the
      slot labels.

    Args:
        net: the model to train.
        data_set: dataset yielding the five tensors described above.
        batch_size: number of images per batch (the last batch may be smaller).
        epoch_size: number of passes over the data.
        device: device used for the model and the batches.
        log_interval: number of iterations between two progress reports.

    Returns:
        List with the mean loss of every reported interval.
    """
    net = net.to(device)
    net.train()
    train_data_loader = torch.utils.data.DataLoader(data_set, batch_size=batch_size, shuffle=True)
    # objectness predictions are unbounded, so the sigmoid is part of the loss
    criterion1 = nn.BCEWithLogitsLoss()
    criterion2 = nn.MSELoss()
    criterion3 = nn.CrossEntropyLoss()

    optimizer = torch.optim.SGD(net.parameters(), lr=1e-5, momentum=0.9)
    start_time = time.perf_counter()
    loss_record = []
    num_anchor_box = 5

    for epoch in range(epoch_size):
        running_loss = 0.
        for step, data in enumerate(train_data_loader):
            im_tensor, labs_tensor, yolo_vectors, cell_ids, anchor_ids = data
            im_tensor = im_tensor.to(device)
            yolo_vectors = yolo_vectors.to(device)
            # Integer indices and class targets have to be int64; a uint8
            # tensor used as an index would be interpreted as a mask.
            labs_tensor = labs_tensor.long().to(device)
            cell_ids = cell_ids.long().to(device)
            anchor_ids = anchor_ids.long().to(device)
            optimizer.zero_grad()

            out = net(im_tensor)
            # use the real batch size: the last batch can be shorter
            cur_batch = im_tensor.size(0)
            vec_len = yolo_vectors.size(-1)
            pred_yolo_tensor = out.view(cur_batch, -1, num_anchor_box, vec_len)

            # (image, slot) coordinates of the slots that hold an object
            img_idx, slot_idx = (yolo_vectors[:, :, 0] > 0).nonzero(as_tuple=True)
            obj_cells = cell_ids[img_idx, slot_idx]
            obj_anchors = anchor_ids[img_idx, slot_idx]

            pred_objectness = pred_yolo_tensor[:, :, :, 0]
            ground_objectness = torch.zeros_like(pred_objectness)
            # Only occupied slots are written, so padded slots (cell 0,
            # anchor 0) cannot erase a real object.
            ground_objectness[img_idx, obj_cells, obj_anchors] = 1.0
            loss = criterion1(pred_objectness, ground_objectness)

            if img_idx.numel() > 0:
                pred_yolo_vectors = pred_yolo_tensor[img_idx, obj_cells, obj_anchors]
                ground_yolo_vectors = yolo_vectors[img_idx, slot_idx]

                pred_regression = pred_yolo_vectors[:, 1:5]
                ground_regression = ground_yolo_vectors[:, 1:5]
                loss = loss + criterion2(pred_regression, ground_regression)

                pred_classification = pred_yolo_vectors[:, 5:]
                ground_classification = labs_tensor[img_idx, slot_idx]
                loss = loss + criterion3(pred_classification, ground_classification)

            loss.backward()
            optimizer.step()
            running_loss += loss.item()
            if step % log_interval == log_interval - 1:
                current_time = time.perf_counter()
                elapsed_time = current_time - start_time
                avg_loss = running_loss / log_interval
                loss_record.append(avg_loss)
                running_loss = 0.
                print("\n[epoch %d/%d] [iter %4d] [elapsed time %5d secs] [mean loss: %7.4f]"\
                      %(epoch, epoch_size, step, elapsed_time, avg_loss))

    return loss_record




         

In [7]:
# Scratch experiment: assignment through two nested index lists. The two 2x3
# lists are paired element-wise into (row, column) coordinates and the 2x3
# tensor on the right supplies the value written at each coordinate.
a = torch.zeros(4,4)
a[[[1,2,3],[2,3,0]],[[1,2,3],[1,2,3]]] = torch.tensor([[2,3,4],[1,1,1]]).float()

In [40]:
# Scratch experiment: select rows by the non-zero entries of channel 0.
# NOTE(review): the statements below index `a` with three indices, so they
# presumably ran against a 3-D tensor such as the one in the commented-out line;
# the only visible assignment to `a` creates a 4x4 tensor.
# a = torch.zeros(5,5,3).float()
a[list(range(0,3)),list(range(1,4)),0] = torch.tensor(list(range(2,5))).float()
# The next expression nests the tuple returned by nonzero() inside another
# index; the cell output records a TypeError for this cell.
a[a[:,:,0].nonzero(as_tuple=True),tuple(range(0,1))]
# a.nonzero()

TypeError: only integer tensors of a single element can be converted to an index

In [47]:
# Scratch experiment: index `a` directly with the tuple returned by
# nonzero(as_tuple=True), which selects the entries of `a` at the positions
# where channel 0 is non-zero.
# a
# a[:,:,0].nonzero(as_tuple=True)
a[a[:,:,0].nonzero(as_tuple=True)]

tensor([[2., 2., 0.],
        [3., 3., 0.],
        [4., 4., 0.]])