<a href="https://colab.research.google.com/github/joohyung0809/Computer_Vision/blob/main/Faster%20R-CNN.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [40]:
!pip install torchnet

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [41]:
import warnings
warnings.filterwarnings(action='ignore')

import os
import six
from collections import namedtuple

import cv2
import albumentations as A
from albumentations.pytorch import ToTensorV2
# COCO format 데이터셋 사용을 돕는 라이브러리
from pycocotools.coco import COCO

import numpy as np
import pandas as pd
from tqdm import tqdm

# torchvision - computer vision용 pytorch 라이브러리
from torchvision.models import vgg16
from torchvision.ops import RoIPool
from torchvision.ops import nms

import torch
import torch.nn as nn
from torch.nn import functional as F
from torch.utils.data import Dataset
from torch.utils import data as data_

# torchnet - logging, eval, visualize 등을 돕는 라이브러리
from torchnet.meter import ConfusionMeter, AverageValueMeter

- 하이퍼 파라미터 세팅

- 데이터 불러오기

In [42]:
# TrainDataset
class TrainCustom(Dataset):
    """COCO-format detection dataset used for training.

    Each item is ``(image, boxes, labels, scale)`` where ``boxes`` is a float32
    tensor of shape (N, 4) laid out as (y_min, x_min, y_max, x_max), ``labels``
    is an int64 tensor of COCO ``category_id`` values and ``scale`` is the
    resize factor applied to the image (always 1.0 here).
    """

    def __init__(self, annotation, data_dir, transforms = False):
        """
        Args:
            annotation: path to the COCO annotation file
            data_dir: directory that contains the image files
            transforms: truthy -> use ``get_train_transform`` (resize + flip),
                falsy -> use ``no_transform`` (tensor conversion only)
        """

        super().__init__()
        self.data_dir = data_dir
        # Load the COCO annotations through the COCO API.
        self.coco = COCO(annotation)
        self.transforms = transforms
        # Map dataset positions to real image ids so that annotation files
        # whose ids are not exactly 0..N-1 still work.
        self.image_ids = sorted(self.coco.getImgIds())

    def __getitem__(self, index: int):
        # Resolve the dataset position to a COCO image record.
        image_id = self.image_ids[index]
        image_info = self.coco.loadImgs(image_id)[0]

        # Load the image as RGB float32 in [0, 1].
        image_path = os.path.join(self.data_dir, image_info['file_name'])
        image = cv2.imread(image_path)
        if image is None:
            # cv2.imread signals failure by returning None instead of raising.
            raise FileNotFoundError(f"could not read image: {image_path}")
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB).astype(np.float32)
        image /= 255.0

        # Load the annotations that belong to this image.
        ann_ids = self.coco.getAnnIds(imgIds=image_info['id'])
        anns = self.coco.loadAnns(ann_ids)

        # COCO boxes are (x_min, y_min, width, height); reshape keeps a
        # well-formed (0, 4) array when the image has no annotations.
        boxes = np.array([x['bbox'] for x in anns], dtype=np.float64).reshape(-1, 4)

        # Convert to (x_min, y_min, x_max, y_max).
        boxes[:, 2] = boxes[:, 0] + boxes[:, 2]
        boxes[:, 3] = boxes[:, 1] + boxes[:, 3]

        labels = [int(x['category_id']) for x in anns]

        # Pick the transform pipeline.
        if self.transforms:
            scale = 1.0  # resize scale
            H, W, _ = image.shape
            resize_H = int(scale * H)
            resize_W = int(scale * W)
            transforms = get_train_transform(resize_H, resize_W)
        else:
            scale = 1.0
            transforms = no_transform()

        sample = transforms(image=image, bboxes=boxes, labels=labels)
        image = sample['image']
        bboxes = torch.as_tensor(
            np.asarray(sample['bboxes'], dtype=np.float32).reshape(-1, 4))
        # Use the labels returned by the transform so they stay aligned with
        # the boxes even if the transform dropped some of them.
        labels = torch.as_tensor(np.asarray(sample['labels']), dtype=torch.int64)

        # (x_min, y_min, x_max, y_max) -> (y_min, x_min, y_max, x_max), the
        # layout the rest of the pipeline works with.
        boxes = bboxes[:, [1, 0, 3, 2]]

        return image, boxes, labels, scale

    def __len__(self) -> int:
        return len(self.image_ids)

# Test Dataset
class TestCustom(Dataset):
    """COCO-format dataset used for inference.

    Each item is ``(image, size)`` where ``image`` is a float tensor in
    (C, H, W) layout with values in [0, 1] and ``size`` is ``image.shape[1:]``,
    i.e. (H, W).
    """

    def __init__(self, annotation, data_dir):
        """
        Args:
            annotation: path to the COCO annotation file
            data_dir: directory that contains the image files
        """

        super().__init__()
        self.data_dir = data_dir
        # Load the COCO annotations through the COCO API.
        self.coco = COCO(annotation)
        # Map dataset positions to real image ids so that annotation files
        # whose ids are not exactly 0..N-1 still work.
        self.image_ids = sorted(self.coco.getImgIds())

    def __getitem__(self, index: int):
        # Resolve the dataset position to a COCO image record.
        image_id = self.image_ids[index]
        image_info = self.coco.loadImgs(image_id)[0]

        # Load the image as RGB float32 in [0, 1].
        image_path = os.path.join(self.data_dir, image_info['file_name'])
        image = cv2.imread(image_path)
        if image is None:
            # cv2.imread signals failure by returning None instead of raising.
            raise FileNotFoundError(f"could not read image: {image_path}")
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB).astype(np.float32)
        image /= 255.0
        # HWC -> CHW
        image = torch.tensor(image, dtype = torch.float).permute(2,0,1)

        return image, image.shape[1:]

    def __len__(self) -> int:
        return len(self.image_ids)


- transform

In [43]:
# Train dataset transform
def get_train_transform(h, w):
    """Build the training augmentation pipeline.

    The pipeline resizes to ``h`` x ``w``, applies ``A.Flip`` with probability
    0.5 and converts the image with ``ToTensorV2``. Boxes are expected in
    'pascal_voc' format and are tied to the ``labels`` field.
    """
    steps = [
        A.Resize(height=h, width=w),
        A.Flip(p=0.5),
        ToTensorV2(p=1.0),
    ]
    bbox_config = {'format': 'pascal_voc', 'label_fields': ['labels']}
    return A.Compose(steps, bbox_params=bbox_config)

# No transform
def no_transform():
    """Build a pipeline that only converts the image with ``ToTensorV2``.

    Boxes are expected in 'pascal_voc' format and are tied to the ``labels``
    field, matching ``get_train_transform``.
    """
    steps = [ToTensorV2(p=1.0)]  # format for pytorch tensor
    bbox_config = {'format': 'pascal_voc', 'label_fields': ['labels']}
    return A.Compose(steps, bbox_params=bbox_config)

- backbone 생성

In [44]:
def decom_vgg16():
    """Split a pretrained VGG16 into a feature extractor and an FC head.

    Reads the module-level flag ``use_drop``: when it is falsy, classifier
    entries 2 and 5 are removed in addition to the final entry 6
    (presumably the two Dropout layers — verify against torchvision's VGG).

    Returns:
        (extractor, classifier): two ``nn.Sequential`` modules. The extractor
        holds the first 30 feature layers; the classifier holds the remaining
        fully connected layers.
    """
    model = vgg16(pretrained=True)

    # the 30th layer of features is relu of conv5_3
    conv_layers = list(model.features)[:30]

    # Entry 6 is always dropped; entries 2 and 5 only when use_drop is off.
    removed = {6} if use_drop else {2, 5, 6}
    fc_layers = [
        layer for idx, layer in enumerate(model.classifier)
        if idx not in removed
    ]

    # freeze top4 conv
    for layer in conv_layers[:10]:
        for param in layer.parameters():
            param.requires_grad = False

    extractor = nn.Sequential(*conv_layers)  # used to extract the feature map
    classifier = nn.Sequential(*fc_layers)
    return extractor, classifier

- 위에서 생긴 feature map을 RPN에 통과시켜야 함
## RPN 정의
1. Anchor box 생성
2. Proposal 생성
  - RPN에서 구한 rpn_loc와 anchor를 통해서 ROI를 생성
  - ROI 개수 줄이기 위해 미리 정해둔 크기(min_size) 에 맞는 ROI들 중에서 NMS를 통해 최종 ROI 반환(train시 2000개)
3. Region Proposal Network
  - VGG16 통과한 feature map으로부터 region proposal들 생성

In [45]:
# 1. Anchor box 생성(한 픽셀 당 몇 개를 생성할 거냐 정하는 단계)
def generate_anchor_base(base_size=16, ratios=(0.5, 1, 2), anchor_scales=(8, 16, 32)):
    """Generate the base anchor boxes centred on one ``base_size`` cell.

    Args:
        base_size: side length of the reference cell; anchors are centred at
            (base_size / 2, base_size / 2)
        ratios: height/width aspect ratios
        anchor_scales: multipliers applied to ``base_size``
    Returns:
        float32 array of shape (R, 4) with R = len(ratios) * len(anchor_scales)
        (9 with the defaults). Each row is (y_min, x_min, y_max, x_max) and
        rows are ordered ratio-major, scale-minor.
    """
    py = base_size / 2.  # center y
    px = base_size / 2.  # center x

    n_scales = len(anchor_scales)
    anchor_base = np.zeros((len(ratios) * n_scales, 4), dtype=np.float32)

    for i, ratio in enumerate(ratios):
        for j, anchor_scale in enumerate(anchor_scales):
            # Height and width share the same area for every ratio.
            h = base_size * anchor_scale * np.sqrt(ratio)
            w = base_size * anchor_scale * np.sqrt(1. / ratio)

            index = i * n_scales + j
            anchor_base[index, 0] = py - h / 2.  # y_min
            anchor_base[index, 1] = px - w / 2.  # x_min
            anchor_base[index, 2] = py + h / 2.  # y_max
            anchor_base[index, 3] = px + w / 2.  # x_max

    return anchor_base  # (R, 4)
    # (9,4) 짜리 Anchor box 생성

In [46]:
# 2. Score 정보 얻기
class ProposalCreator:
    """Turn RPN outputs into a list of region proposals (RoIs).

    The anchors are refined with the predicted offsets, clipped to the image,
    filtered by minimum size, ranked by score and passed through NMS. The
    pre/post-NMS limits depend on whether ``parent_model`` is in training mode.
    """

    def __init__(self, parent_model,
                 nms_thresh=0.7, # nms threshold
                 n_train_pre_nms=12000, # number of RoIs kept before NMS (train)
                 n_train_post_nms=2000, # number of RoIs kept after NMS (train)
                 n_test_pre_nms=6000,   # number of RoIs kept before NMS (test)
                 n_test_post_nms=300,   # number of RoIs kept after NMS (test)
                 min_size=16
                 ):
        # parent_model.training tells us whether we are training or testing.
        self.parent_model = parent_model
        self.nms_thresh = nms_thresh
        self.n_train_pre_nms = n_train_pre_nms
        self.n_train_post_nms = n_train_post_nms
        self.n_test_pre_nms = n_test_pre_nms
        self.n_test_post_nms = n_test_post_nms
        self.min_size = min_size

    def __call__(self, loc, score, anchor, img_size, scale=1.):
        """Create proposals for one image.

        Args:
            loc: predicted offsets, one row per anchor (NumPy array)
            score: foreground score per anchor (NumPy array)
            anchor: anchor boxes as (y_min, x_min, y_max, x_max) rows
            img_size: (height, width) used for clipping
            scale: factor applied to ``min_size`` before filtering
        Returns:
            NumPy array of the proposals that survive NMS.
        """
        if self.parent_model.training:
            n_pre_nms = self.n_train_pre_nms
            n_post_nms = self.n_train_post_nms
        else:
            n_pre_nms = self.n_test_pre_nms
            n_post_nms = self.n_test_post_nms

        # Refine the anchors with the predicted offsets to get box coordinates.
        roi = loc2bbox(anchor, loc)

        # Clip predicted boxes to image.
        roi[:, slice(0, 4, 2)] = np.clip(roi[:, slice(0, 4, 2)], 0, img_size[0])
        roi[:, slice(1, 4, 2)] = np.clip(roi[:, slice(1, 4, 2)], 0, img_size[1])

        # Drop boxes whose height or width is below the (scaled) minimum.
        min_size = self.min_size * scale
        hs = roi[:, 2] - roi[:, 0]
        ws = roi[:, 3] - roi[:, 1]
        keep = np.where((hs >= min_size) & (ws >= min_size))[0]
        roi = roi[keep, :]
        score = score[keep]

        # Sort all (proposal, score) pairs by score from highest to lowest.
        # Take top pre_nms_topN
        order = score.ravel().argsort()[::-1]
        if n_pre_nms > 0:
            order = order[:n_pre_nms]
        roi = roi[order, :]
        score = score[order]

        # Run NMS on the GPU when one is available, otherwise on the CPU so
        # the proposal layer also works on CPU-only hosts.
        device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
        keep = nms(
            torch.from_numpy(roi).to(device),
            torch.from_numpy(score).to(device),
            self.nms_thresh)
        if n_post_nms > 0:
            keep = keep[:n_post_nms]
        roi = roi[keep.cpu().numpy()]

        return roi

In [47]:
class RegionProposalNetwork(nn.Module):
    """Region Proposal Network applied to the backbone feature map.

    A 3x3 convolution feeds two 1x1 heads: one predicts two scores per anchor,
    the other four box offsets per anchor. The foreground scores and offsets
    are then handed to ``ProposalCreator`` to produce RoIs.
    """

    def __init__(self, in_channels=512, mid_channels=512, ratios=[0.5, 1, 2],
                 anchor_scales=[8, 16, 32], feat_stride=16, proposal_creator_params=dict(),):

        super(RegionProposalNetwork, self).__init__()

        # Base anchors for a single feature-map cell.
        self.anchor_base = generate_anchor_base(anchor_scales=anchor_scales, ratios=ratios)
        self.feat_stride = feat_stride
        # The proposal layer receives this module so it can read self.training.
        self.proposal_layer = ProposalCreator(self, **proposal_creator_params)
        anchors_per_cell = self.anchor_base.shape[0]
        self.conv1 = nn.Conv2d(in_channels, mid_channels, 3, 1, 1)
        self.score = nn.Conv2d(mid_channels, anchors_per_cell * 2, 1, 1, 0)  # 2 scores per anchor
        self.loc = nn.Conv2d(mid_channels, anchors_per_cell * 4, 1, 1, 0)    # 4 offsets per anchor
        # Weight initialisation.
        normal_init(self.conv1, 0, 0.01)
        normal_init(self.score, 0, 0.01)
        normal_init(self.loc, 0, 0.01)

    def forward(self, x, img_size, scale=1.):
        """Run the RPN on feature map ``x``.

        Returns:
            (rpn_locs, rpn_scores, rois, roi_indices, anchor) where
            ``rpn_locs`` is (n, A, 4), ``rpn_scores`` is (n, A, 2), ``rois``
            and ``roi_indices`` are NumPy arrays concatenated over the batch,
            and ``anchor`` holds every shifted anchor.
        """
        batch, _, feat_h, feat_w = x.shape

        # Expand the base anchors over every feature-map cell.
        anchor = _enumerate_shifted_anchor(
            np.array(self.anchor_base), self.feat_stride, feat_h, feat_w)
        anchors_per_cell = anchor.shape[0] // (feat_h * feat_w)

        hidden = F.relu(self.conv1(x))

        # Predicted box offsets, reshaped to one row of 4 per anchor.
        rpn_locs = self.loc(hidden)
        rpn_locs = rpn_locs.permute(0, 2, 3, 1).contiguous().view(batch, -1, 4)

        # Predicted scores per anchor, channels moved last.
        rpn_scores = self.score(hidden)
        rpn_scores = rpn_scores.permute(0, 2, 3, 1).contiguous()

        # Softmax over the score pair; index 1 is taken as the foreground score.
        paired_scores = rpn_scores.view(batch, feat_h, feat_w, anchors_per_cell, 2)
        rpn_softmax_scores = F.softmax(paired_scores, dim=4)
        rpn_fg_scores = rpn_softmax_scores[:, :, :, :, 1].contiguous().view(batch, -1)

        rpn_scores = rpn_scores.view(batch, -1, 2)

        # Build proposals image by image.
        rois = []         # proposal box coordinates
        roi_indices = []  # batch index each proposal belongs to
        for i in range(batch):
            roi = self.proposal_layer(
                rpn_locs[i].cpu().data.numpy(),
                rpn_fg_scores[i].cpu().data.numpy(),
                anchor, img_size, scale=scale)
            rois.append(roi)
            roi_indices.append(np.full((len(roi),), i, dtype=np.int32))
        rois = np.concatenate(rois, axis=0)
        roi_indices = np.concatenate(roi_indices, axis=0)

        return rpn_locs, rpn_scores, rois, roi_indices, anchor


def _enumerate_shifted_anchor(anchor_base, feat_stride, height, width):
    # anchor_base는 하나의 pixel에 9개 종류의 anchor box를 나타냄
    # 이것을 enumerate시켜 전체 이미지의 pixel에 각각 9개의 anchor box를 가지게 함
    # 32x32 feature map에서는 32x32x9=9216개 anchor box가짐

    shift_y = np.arange(0, height * feat_stride, feat_stride)
    shift_x = np.arange(0, width * feat_stride, feat_stride)
    shift_x, shift_y = np.meshgrid(shift_x, shift_y)
    shift = np.stack((shift_y.ravel(), shift_x.ravel(),
                      shift_y.ravel(), shift_x.ravel()), axis=1)

    A = anchor_base.shape[0]
    K = shift.shape[0]
    anchor = anchor_base.reshape((1, A, 4)) + \
             shift.reshape((1, K, 4)).transpose((1, 0, 2))
    anchor = anchor.reshape((K * A, 4)).astype(np.float32)
    return anchor # (9216, 4)


## Faster R-CNN head 정의
- ROI pool 후에 classifier, regression 통과
#### 다시 말하면 최종 목표는 <br> RPN으로 부터 나온 ROI받아서 projection으로  Pooling하고 <br> 그로부터 나온 고정된 feature vector로 부터 class 예측, 정확한 box예측하는 것

In [48]:
class VGG16RoIHead(nn.Module):
    """
    Faster R-CNN head.

    RoI-pools the feature map for each proposal, runs the pooled features
    through the fully connected ``classifier`` and predicts per-class box
    offsets and class scores.
    """

    def __init__(self, n_class, roi_size, spatial_scale, classifier):
        super(VGG16RoIHead, self).__init__()

        self.classifier = classifier
        self.cls_loc = nn.Linear(4096, n_class * 4)  # bounding box regressor
        self.score = nn.Linear(4096, n_class)        # classifier

        # Weight initialisation.
        normal_init(self.cls_loc, 0, 0.001)
        normal_init(self.score, 0, 0.01)

        self.n_class = n_class              # number of classes, background included
        self.roi_size = roi_size            # height and width of the pooled output
        self.spatial_scale = spatial_scale  # image-to-feature-map scale passed to RoIPool
        pooled_shape = (self.roi_size, self.roi_size)
        self.roi = RoIPool(pooled_shape, self.spatial_scale)

    def forward(self, x, rois, roi_indices):
        """Return ``(roi_cls_locs, roi_scores)`` for the given proposals.

        ``roi_scores`` are the raw outputs of the score layer; no softmax is
        applied here.
        """
        # in case roi_indices is  ndarray
        roi_indices = totensor(roi_indices).float()
        rois = totensor(rois).float()
        indexed_rois = torch.cat([roi_indices[:, None], rois], dim=1)
        # NOTE: important: yx->xy
        column_order = [0, 2, 1, 4, 3]
        indexed_rois = indexed_rois[:, column_order].contiguous()

        # RoI pooling, then flatten to one feature vector per RoI.
        pooled = self.roi(x, indexed_rois)
        flat = pooled.view(pooled.size(0), -1)
        # Fully connected layers.
        fc7 = self.classifier(flat)
        # Box regression and class scores.
        roi_cls_locs = self.cls_loc(fc7)
        roi_scores = self.score(fc7)

        return roi_cls_locs, roi_scores

### Faster R-CNN 정의
- 지금까지 선언한 걸 통합하면 됨
- Feature Extraction : image로부터 feature map 생성
- Region Proposal Networks : Region of Interest 생성
- Localization and Classification Head : ROI에 해당하는 feature map을 최종 detect

In [49]:
def nograd(f):
    """Decorator that runs ``f`` inside ``torch.no_grad()``.

    The wrapper keeps the wrapped function's name and docstring.
    """
    import functools

    @functools.wraps(f)
    def new_f(*args, **kwargs):
        with torch.no_grad():
            return f(*args, **kwargs)
    return new_f

class FasterRCNN(nn.Module):
    """Faster R-CNN composed of a feature extractor, an RPN and an RoI head.

    ``forward`` is the inference path used by ``predict``. Training goes
    through ``FasterRCNNTrainer``, which calls the extractor, RPN and head
    individually.
    """

    def __init__(self, extractor, rpn, head,
                loc_normalize_mean = (0., 0., 0., 0.),
                loc_normalize_std = (0.1, 0.1, 0.2, 0.2)):
        super(FasterRCNN, self).__init__()
        self.extractor = extractor  # backbone (VGG)
        self.rpn = rpn              # region proposal network
        self.head = head            # RoI head

        # Mean and std used to de-normalise the predicted box offsets.
        self.loc_normalize_mean = loc_normalize_mean
        self.loc_normalize_std = loc_normalize_std
        self.use_preset()

    @property
    def n_class(self):
        """Total number of classes, background included."""
        return self.head.n_class

    def forward(self, x, scale=1.):
        """Run extractor -> RPN -> head on a batch of images.

        Returns:
            (roi_cls_locs, roi_scores, rois, roi_indices)
        """
        img_size = x.shape[2:]

        h = self.extractor(x)
        rpn_locs, rpn_scores, rois, roi_indices, anchor = self.rpn(h, img_size, scale)
        roi_cls_locs, roi_scores = self.head(h, rois, roi_indices)
        return roi_cls_locs, roi_scores, rois, roi_indices

    def use_preset(self):
        """Set the NMS and score thresholds used during prediction."""
        self.nms_thresh = 0.3
        self.score_thresh = 0.05

    def _suppress(self, raw_cls_bbox, raw_prob):
        """Apply the score threshold and per-class NMS to raw predictions.

        Returns:
            (bbox, label, score) as float32, int32 and float32 NumPy arrays.
            Labels are zero-based foreground labels.
        """
        bbox = list()
        label = list()
        score = list()

        # Loop-invariant: one (n_roi, n_class, 4) view of the boxes.
        per_class_bbox = raw_cls_bbox.reshape((-1, self.n_class, 4))

        # skip cls_id = 0 because it is the background class
        for l in range(1, self.n_class):
            cls_bbox_l = per_class_bbox[:, l, :]
            prob_l = raw_prob[:, l]
            mask = prob_l > self.score_thresh
            cls_bbox_l = cls_bbox_l[mask]
            prob_l = prob_l[mask]
            keep = nms(cls_bbox_l, prob_l,self.nms_thresh)
            bbox.append(cls_bbox_l[keep].cpu().numpy())
            # The labels are in [0, self.n_class - 2].
            label.append((l - 1) * np.ones((len(keep),)))
            score.append(prob_l[keep].cpu().numpy())

        if not bbox:
            # No foreground classes: np.concatenate would raise on empty lists.
            return (np.zeros((0, 4), dtype=np.float32),
                    np.zeros((0,), dtype=np.int32),
                    np.zeros((0,), dtype=np.float32))

        bbox = np.concatenate(bbox, axis=0).astype(np.float32)
        label = np.concatenate(label, axis=0).astype(np.int32)
        score = np.concatenate(score, axis=0).astype(np.float32)
        return bbox, label, score

    @nograd
    def predict(self, imgs,sizes=None):
        """
        Detect objects in images.

        Args:
            imgs: iterable of single images; a batch dimension is added to
                each one before it is passed to the network
            sizes: iterable of (height, width) per image, used for rescaling
                and clipping. Defaults to each image's own ``shape[1:]``.
        Returns:
            (bboxes, labels, scores): three lists with one NumPy array per
            image.

        The module is switched to eval mode for the duration of the call and
        put back into whatever mode it was in before.
        """
        was_training = self.training
        self.eval()
        if sizes is None:
            sizes = [img.shape[1:] for img in imgs]

        bboxes = list()
        labels = list()
        scores = list()
        try:
            for img, size in zip(imgs, sizes):
                img = totensor(img[None]).float()
                scale = img.shape[3] / size[1]
                roi_cls_loc, roi_scores, rois, _ = self(img, scale=scale)
                # We are assuming that batch size is 1.
                roi_score = roi_scores.data
                roi_cls_loc = roi_cls_loc.data
                roi = totensor(rois) / scale

                # Convert predictions to bounding boxes in image coordinates.
                # Bounding boxes are scaled to the scale of the input images.
                device = roi_cls_loc.device
                mean = torch.tensor(self.loc_normalize_mean, dtype=torch.float32,
                                    device=device).repeat(self.n_class)[None]
                std = torch.tensor(self.loc_normalize_std, dtype=torch.float32,
                                   device=device).repeat(self.n_class)[None]

                roi_cls_loc = (roi_cls_loc * std + mean)
                roi_cls_loc = roi_cls_loc.view(-1, self.n_class, 4)
                roi = roi.view(-1, 1, 4).expand_as(roi_cls_loc)
                cls_bbox = loc2bbox(tonumpy(roi).reshape((-1, 4)),tonumpy(roi_cls_loc).reshape((-1, 4)))
                cls_bbox = totensor(cls_bbox)
                cls_bbox = cls_bbox.view(-1, self.n_class * 4)
                # clip bounding box
                cls_bbox[:, 0::2] = (cls_bbox[:, 0::2]).clamp(min=0, max=size[0])
                cls_bbox[:, 1::2] = (cls_bbox[:, 1::2]).clamp(min=0, max=size[1])

                prob = (F.softmax(totensor(roi_score), dim=1))

                bbox, label, score = self._suppress(cls_bbox, prob)
                bboxes.append(bbox)
                labels.append(label)
                scores.append(score)
        finally:
            self.use_preset()
            # Restore the caller's mode instead of always forcing train mode.
            self.train(was_training)
        return bboxes, labels, scores

    def get_optimizer(self):
        '''
        Create the SGD optimizer and store it on ``self.optimizer``.

        Reads the module-level ``learning_rate`` and ``weight_decay``. Bias
        parameters get twice the base learning rate and no weight decay;
        parameters with ``requires_grad`` False are skipped.
        '''
        lr = learning_rate
        params = []
        for key, value in self.named_parameters():
            if not value.requires_grad:
                continue
            if 'bias' in key:
                params.append({'params': [value], 'lr': lr * 2, 'weight_decay': 0})
            else:
                params.append({'params': [value], 'lr': lr, 'weight_decay': weight_decay})
        self.optimizer = torch.optim.SGD(params, momentum=0.9)
        return self.optimizer

    def scale_lr(self, decay=0.1):
        """Multiply the learning rate of every param group by ``decay``."""
        for param_group in self.optimizer.param_groups:
            param_group['lr'] *= decay
        return self.optimizer


## Faster R-CNN 생성



In [50]:
class FasterRCNNVGG16(FasterRCNN):
    """Faster R-CNN that uses VGG16 as its backbone."""

    feat_stride = 16  # downsample 16x for output of conv5 in vgg16

    def __init__(self, n_fg_class=10, ratios=[0.5, 1, 2], anchor_scales=[8, 16, 32] ):
        """
        Args:
            n_fg_class: number of classes, background NOT included
            ratios: anchor aspect ratios forwarded to the RPN
            anchor_scales: anchor scales forwarded to the RPN
        """
        extractor, classifier = decom_vgg16()

        rpn = RegionProposalNetwork(
            512, 512,
            ratios=ratios,
            anchor_scales=anchor_scales,
            feat_stride=self.feat_stride,
        )

        # The head also predicts the background class, hence the + 1.
        head = VGG16RoIHead(
            n_class=n_fg_class + 1,
            roi_size=7,
            spatial_scale=(1. / self.feat_stride),
            classifier=classifier
        )

        # Hand the three parts to the FasterRCNN base class.
        super(FasterRCNNVGG16, self).__init__(extractor, rpn, head)

- trainer 정의
  - 지금까지는 모델만 정의
  - 우린 RPN, ROI head를 훈련시켜야 함

## RPN과 RoI head 학습에 필요한 target 생성기
1. Anchor Target Creator
  - Anchor box에 해당하는 ground truth bounding box match
  - Region Proposal Network loss 구할 때 ground truth로 사용
2. positive, negative sampling(Proposal target Creator)
  - RPN에서 NMS를 거친 roi들을 ground truth와의 IOU비교
  - Positive/Negative sampling 수행(총 128개)
  - sample ROI와 gt_bbox regression에서 regression 해야할 ground truth loc값(t_x, t_y, t_w, t_h) 구함



0. util 함수 정의

In [51]:
def bbox_iou(bbox_a, bbox_b):
    """Compute the pairwise IoU between two sets of boxes.

    Args:
        bbox_a: array of shape (N, 4)
        bbox_b: array of shape (K, 4)
    Returns:
        array of shape (N, K) holding the IoU of every (a, b) pair.
    Raises:
        IndexError: if either input does not have 4 columns.
    """
    if bbox_a.shape[1] != 4 or bbox_b.shape[1] != 4:
        raise IndexError

    # Add an axis to bbox_a so every box in it is compared with every box in
    # bbox_b through broadcasting.
    a_min, a_max = bbox_a[:, None, :2], bbox_a[:, None, 2:]
    b_min, b_max = bbox_b[:, :2], bbox_b[:, 2:]

    inter_min = np.maximum(a_min, b_min)  # top left of the intersection
    inter_max = np.minimum(a_max, b_max)  # bottom right of the intersection

    # Pairs that do not overlap get an intersection area of zero.
    overlaps = (inter_min < inter_max).all(axis=2)
    area_i = np.prod(inter_max - inter_min, axis=2) * overlaps

    area_a = np.prod(bbox_a[:, 2:] - bbox_a[:, :2], axis=1)
    area_b = np.prod(bbox_b[:, 2:] - bbox_b[:, :2], axis=1)
    union = area_a[:, None] + area_b - area_i
    return area_i / union

1. Anchor Target Creator


In [52]:
class AnchorTargetCreator(object):
    """Assign ground-truth boxes to anchors to build RPN training targets.

    Labels are 1 (positive), 0 (negative) or -1 (ignored). At most
    ``n_sample`` anchors keep a 0/1 label, of which at most
    ``pos_ratio * n_sample`` are positive.
    """

    def __init__(self,
                 n_sample=256,
                 pos_iou_thresh=0.7, neg_iou_thresh=0.3,
                 pos_ratio=0.5):
        self.n_sample = n_sample
        self.pos_iou_thresh = pos_iou_thresh
        self.neg_iou_thresh = neg_iou_thresh
        self.pos_ratio = pos_ratio

    def __call__(self, bbox, anchor, img_size):
        """Compute regression targets and labels for every anchor.

        Args:
            bbox: ground-truth boxes
            anchor: all anchors
            img_size: (height, width) of the image
        Returns:
            (loc, label): one row / entry per input anchor. Anchors that are
            not returned by ``get_inside_index`` get loc 0 and label -1.
        """
        img_H, img_W = img_size

        n_anchor = len(anchor)
        # Only anchors selected by get_inside_index take part in the matching.
        inside_index = get_inside_index(anchor, img_H, img_W)
        anchor = anchor[inside_index]
        argmax_ious, label = self._create_label(
            inside_index, anchor, bbox)

        # compute bounding box regression targets
        loc = bbox2loc(anchor, bbox[argmax_ious])

        # map up to original set of anchors
        label = unmap(label, n_anchor, inside_index, fill=-1)
        loc = unmap(loc, n_anchor, inside_index, fill=0)

        return loc, label

    def _create_label(self, inside_index, anchor, bbox):
        """Label the inside anchors and subsample them to ``n_sample``.

        Returns:
            (argmax_ious, label): best-matching ground-truth index per anchor
            and the 1 / 0 / -1 label per anchor.
        """
        # label) 1: positive, 0: negative, -1: don't care
        label = np.empty((len(inside_index),), dtype=np.int32)
        label.fill(-1)

        argmax_ious, max_ious, gt_argmax_ious = self._calc_ious(anchor, bbox, inside_index)

        # Anchors whose best IoU is below the negative threshold.
        label[max_ious < self.neg_iou_thresh] = 0

        # The anchor(s) with the highest IoU for each ground-truth box.
        label[gt_argmax_ious] = 1

        # Anchors whose best IoU reaches the positive threshold.
        label[max_ious >= self.pos_iou_thresh] = 1

        # subsample positive labels if we have too many
        n_pos = int(self.pos_ratio * self.n_sample)
        pos_index = np.where(label == 1)[0]
        if len(pos_index) > n_pos:
            disable_index = np.random.choice(
                pos_index, size=(len(pos_index) - n_pos), replace=False)
            label[disable_index] = -1

        # subsample negative labels if we have too many
        n_neg = self.n_sample - np.sum(label == 1)
        neg_index = np.where(label == 0)[0]
        if len(neg_index) > n_neg:
            disable_index = np.random.choice(
                neg_index, size=(len(neg_index) - n_neg), replace=False)
            label[disable_index] = -1

        return argmax_ious, label

    def _calc_ious(self, anchor, bbox, inside_index):
        """Return per-anchor and per-ground-truth IoU matches.

        Returns:
            (argmax_ious, max_ious, gt_argmax_ious): best ground-truth index
            per anchor, the corresponding IoU, and the indices of the anchors
            that achieve the best (non-zero) IoU for some ground-truth box.
        """
        # ious between the anchors and the gt boxes
        ious = bbox_iou(anchor, bbox)
        argmax_ious = ious.argmax(axis=1)
        max_ious = ious[np.arange(len(inside_index)), argmax_ious]
        gt_argmax_ious = ious.argmax(axis=0)
        gt_max_ious = ious[gt_argmax_ious, np.arange(ious.shape[1])]
        # Require a non-zero IoU: when a ground-truth box overlaps no anchor
        # its best IoU is 0, and a plain equality test would match (and mark
        # positive) every anchor that does not overlap it.
        gt_argmax_ious = np.where((ious == gt_max_ious) & (gt_max_ious > 0))[0]

        return argmax_ious, max_ious, gt_argmax_ious

2. positive, negative sampling(Proposal target Creator)
- ROI들을 또 Positve / Negative 나누어야 함

In [53]:
class ProposalTargetCreator:
    """Sample RoIs and build the targets used to train the RoI head.

    Proposals (plus the ground-truth boxes themselves) are split into
    positives and negatives by their best IoU with a ground-truth box, and up
    to ``n_sample`` of them are drawn at random.
    """

    def __init__(self,
                 n_sample=128,
                 pos_ratio=0.25, pos_iou_thresh=0.5,
                 neg_iou_thresh_hi=0.5, neg_iou_thresh_lo=0.0
                 ):
        self.n_sample = n_sample
        self.pos_ratio = pos_ratio
        self.pos_iou_thresh = pos_iou_thresh  # IoU at or above this is positive
        # Negatives have an IoU in [neg_iou_thresh_lo, neg_iou_thresh_hi).
        self.neg_iou_thresh_hi = neg_iou_thresh_hi
        self.neg_iou_thresh_lo = neg_iou_thresh_lo

    def __call__(self, roi, bbox, label,
                 loc_normalize_mean=(0., 0., 0., 0.),
                 loc_normalize_std=(0.1, 0.1, 0.2, 0.2)):
        """Sample RoIs and compute their regression targets and labels.

        Returns:
            (sample_roi, gt_roi_loc, gt_roi_label): the sampled RoIs, their
            normalised regression targets, and their labels (0 for negatives,
            ``label + 1`` of the matched ground truth for positives).
        """
        # Unused count; the unpacking also requires bbox to be 2-D.
        n_bbox, _ = bbox.shape

        # Ground-truth boxes are added to the candidate pool.
        candidates = np.concatenate((roi, bbox), axis=0)

        max_positives = np.round(self.n_sample * self.pos_ratio)
        overlaps = bbox_iou(candidates, bbox)
        gt_assignment = overlaps.argmax(axis=1)
        best_overlap = overlaps.max(axis=1)
        # Shift labels by one so that 0 can stand for the background.
        gt_roi_label = label[gt_assignment] + 1

        # Positive samples: best IoU >= pos_iou_thresh.
        pos_index = np.where(best_overlap >= self.pos_iou_thresh)[0]
        n_pos = int(min(max_positives, pos_index.size))
        if pos_index.size > 0:
            pos_index = np.random.choice(pos_index, size=n_pos, replace=False)

        # Negative samples: best IoU in [neg_iou_thresh_lo, neg_iou_thresh_hi).
        in_negative_band = ((best_overlap < self.neg_iou_thresh_hi) &
                            (best_overlap >= self.neg_iou_thresh_lo))
        neg_index = np.where(in_negative_band)[0]
        n_neg = int(min(self.n_sample - n_pos, neg_index.size))
        if neg_index.size > 0:
            neg_index = np.random.choice(neg_index, size=n_neg, replace=False)

        # The indices that we're selecting (both positive and negative).
        keep_index = np.append(pos_index, neg_index)
        gt_roi_label = gt_roi_label[keep_index]
        gt_roi_label[n_pos:] = 0  # negatives are labelled 0
        sample_roi = candidates[keep_index]

        # Regression targets from each sampled RoI to its matched ground truth,
        # normalised with the given mean and std.
        matched_bbox = bbox[gt_assignment[keep_index]]
        gt_roi_loc = bbox2loc(sample_roi, matched_bbox)
        mean = np.array(loc_normalize_mean, np.float32)
        std = np.array(loc_normalize_std, np.float32)
        gt_roi_loc = ((gt_roi_loc - mean) / std)

        return sample_roi, gt_roi_loc, gt_roi_label

#### Training, loss 계산, checkpoint 저장 및 불러오기

In [54]:
# Container for the four individual losses and their sum.
LossTuple = namedtuple('LossTuple', ['rpn_loc_loss', 'rpn_cls_loss',
                                     'roi_loc_loss', 'roi_cls_loss',
                                     'total_loss'])
class FasterRCNNTrainer(nn.Module):
    """Wrap a ``FasterRCNN`` with loss computation, optimisation and checkpoints.

    Reads the module-level ``rpn_sigma`` and ``roi_sigma`` at construction
    time. Only batch size 1 is supported by ``forward``.
    """

    def __init__(self, faster_rcnn):
        super(FasterRCNNTrainer, self).__init__()

        self.faster_rcnn = faster_rcnn
        self.rpn_sigma = rpn_sigma
        self.roi_sigma = roi_sigma

        # target creator create gt_bbox gt_label etc as training targets.
        self.anchor_target_creator = AnchorTargetCreator()
        self.proposal_target_creator = ProposalTargetCreator()

        self.loc_normalize_mean = faster_rcnn.loc_normalize_mean
        self.loc_normalize_std = faster_rcnn.loc_normalize_std

        self.optimizer = self.faster_rcnn.get_optimizer()

        # Training-progress indicators.
        self.rpn_cm = ConfusionMeter(2)  # confusion matrix for classification
        # Size the RoI confusion matrix from the model rather than a fixed 11.
        self.roi_cm = ConfusionMeter(faster_rcnn.n_class)
        self.meters = {k: AverageValueMeter() for k in LossTuple._fields}  # average loss

    def forward(self, imgs, bboxes, labels, scale):
        """Compute the RPN and RoI-head losses for one image.

        Args:
            imgs: image batch of shape (1, C, H, W)
            bboxes: ground-truth boxes with a leading batch dimension of 1
            labels: ground-truth labels with a leading batch dimension of 1
            scale: scale factor forwarded to the RPN
        Returns:
            LossTuple with the four losses and their sum.
        Raises:
            ValueError: if the batch size is not 1.
        """
        n = bboxes.shape[0]

        if n != 1:
            raise ValueError('Currently only batch size 1 is supported.')

        _, _, H, W = imgs.shape
        img_size = (H, W)

        # VGG (features extractor)
        features = self.faster_rcnn.extractor(imgs)

        # RPN (region proposal)
        rpn_locs, rpn_scores, rois, roi_indices, anchor = self.faster_rcnn.rpn(features, img_size, scale)

        # Since batch size is one, convert variables to singular form
        bbox = bboxes[0]
        label = labels[0]
        rpn_score = rpn_scores[0]
        rpn_loc = rpn_locs[0]
        roi = rois

        # Sample RoIs from the RPN proposals, balancing positives and negatives.
        sample_roi, gt_roi_loc, gt_roi_label = self.proposal_target_creator(
            roi,
            tonumpy(bbox),
            tonumpy(label),
            self.loc_normalize_mean,
            self.loc_normalize_std)

        # NOTE it's all zero because now it only support for batch=1 now
        # Faster R-CNN head (prediction head)
        sample_roi_index = torch.zeros(len(sample_roi))
        roi_cls_loc, roi_score = self.faster_rcnn.head(features,sample_roi,sample_roi_index)

        # ------------------ RPN losses -------------------#
        gt_rpn_loc, gt_rpn_label = self.anchor_target_creator(tonumpy(bbox),anchor,img_size)
        gt_rpn_label = totensor(gt_rpn_label).long()
        gt_rpn_loc = totensor(gt_rpn_loc)

        # rpn bounding box regression loss
        rpn_loc_loss = _fast_rcnn_loc_loss(rpn_loc,gt_rpn_loc,gt_rpn_label.data,self.rpn_sigma)
        # rpn classification loss; anchors labelled -1 are ignored
        rpn_cls_loss = F.cross_entropy(rpn_score, gt_rpn_label.cuda(), ignore_index=-1)

        _gt_rpn_label = gt_rpn_label[gt_rpn_label > -1]
        _rpn_score = tonumpy(rpn_score)[tonumpy(gt_rpn_label) > -1]
        self.rpn_cm.add(totensor(_rpn_score, False), _gt_rpn_label.data.long())

        # ------------------ ROI losses (fast rcnn loss) -------------------#
        n_sample = roi_cls_loc.shape[0]
        roi_cls_loc = roi_cls_loc.view(n_sample, -1, 4)
        # Pick, for every sample, the offsets predicted for its target class.
        roi_loc = roi_cls_loc[torch.arange(0, n_sample).long().cuda(), \
                              totensor(gt_roi_label).long()]
        gt_roi_label = totensor(gt_roi_label).long()
        gt_roi_loc = totensor(gt_roi_loc)

        # faster rcnn bounding box regression loss
        roi_loc_loss = _fast_rcnn_loc_loss(
            roi_loc.contiguous(),
            gt_roi_loc,
            gt_roi_label.data,
            self.roi_sigma)

        # faster rcnn classification loss
        roi_cls_loss = nn.CrossEntropyLoss()(roi_score, gt_roi_label.cuda())

        self.roi_cm.add(totensor(roi_score, False), gt_roi_label.data.long())

        losses = [rpn_loc_loss, rpn_cls_loss, roi_loc_loss, roi_cls_loss]
        losses = losses + [sum(losses)] # total_loss == sum(losses)

        return LossTuple(*losses)

    def train_step(self, imgs, bboxes, labels, scale):
        """Run one optimisation step and update the loss meters."""
        self.optimizer.zero_grad()
        losses = self.forward(imgs, bboxes, labels, scale)
        losses.total_loss.backward()
        self.optimizer.step()
        self.update_meters(losses)
        return losses

    def save(self, save_optimizer=False, save_path=None):
        """Write a checkpoint and return the path it was written to.

        Args:
            save_optimizer: also store the optimizer state
            save_path: target file; defaults to
                './checkpoints/faster_rcnn_scratch_checkpoints.pth'
        """
        save_dict = dict()

        save_dict['model'] = self.faster_rcnn.state_dict()

        if save_optimizer:
            save_dict['optimizer'] = self.optimizer.state_dict()

        if save_path is None:
            save_path = './checkpoints/faster_rcnn_scratch_checkpoints.pth'

        save_dir = os.path.dirname(save_path)
        # A bare file name has an empty dirname, which os.makedirs rejects.
        if save_dir:
            os.makedirs(save_dir, exist_ok=True)

        torch.save(save_dict, save_path)
        return save_path

    def load(self, path, load_optimizer=True, parse_opt=False, ):
        """Load a checkpoint written by ``save`` (or a bare state dict).

        ``parse_opt`` is accepted but not used.
        """
        state_dict = torch.load(path)
        if 'model' in state_dict:
            self.faster_rcnn.load_state_dict(state_dict['model'])
        else:  # legacy way, for backward compatibility
            self.faster_rcnn.load_state_dict(state_dict)
            return self
        if 'optimizer' in state_dict and load_optimizer:
            self.optimizer.load_state_dict(state_dict['optimizer'])
        return self

    def update_meters(self, losses):
        """Add the scalar value of every loss to its running-average meter."""
        loss_d = {k: scalar(v) for k, v in losses._asdict().items()}
        for key, meter in self.meters.items():
            meter.add(loss_d[key])

    def reset_meters(self):
        """Reset the loss meters and both confusion matrices."""
        for meter in self.meters.values():
            meter.reset()
        self.roi_cm.reset()
        self.rpn_cm.reset()

    def get_meter_data(self):
        """Return the first element of each meter's ``value()``, keyed by loss name."""
        return {k: v.value()[0] for k, v in self.meters.items()}


def _smooth_l1_loss(x, t, in_weight, sigma):
    sigma2 = sigma ** 2
    diff = in_weight * (x - t)
    abs_diff = diff.abs()
    flag = (abs_diff.data < (1. / sigma2)).float()
    y = (flag * (sigma2 / 2.) * (diff ** 2) +
         (1 - flag) * (abs_diff - 0.5 / sigma2))
    return y.sum()


def _fast_rcnn_loc_loss(pred_loc, gt_loc, gt_label, sigma):
    """Return the localisation loss, counting only positive samples.

    Rows whose label is greater than 0 get weight 1 and all other rows get
    weight 0, so only positive examples contribute to the smooth-L1 sum. The
    sum is then divided by the number of labels that are ``>= 0``.

    The weight tensor is created on ``gt_loc``'s device instead of on a
    hard-coded CUDA device, so the function also works on CPU tensors.

    Args:
        pred_loc: predicted localisation values, same shape as ``gt_loc``.
        gt_loc: target localisation values.
        gt_label: one label per row of ``gt_loc``.
        sigma: forwarded to ``_smooth_l1_loss``.

    Returns:
        The normalised localisation loss.
    """
    device = gt_loc.device
    in_weight = torch.zeros(gt_loc.shape, device=device)
    # One boolean per row, broadcast across that row's columns.
    positive = (gt_label > 0).view(-1, 1).expand_as(in_weight).to(device)
    in_weight[positive] = 1
    loc_loss = _smooth_l1_loss(pred_loc, gt_loc, in_weight.detach(), sigma)
    # Normalise by every sample with a label >= 0, not only the positives.
    loc_loss /= ((gt_label >= 0).sum().float())
    return loc_loss

## Train 진행
- 데이터셋 불러오고 trainer 불러오고, trainer step 과정 거치고, faster_rcnn에 대해 학습 진행

In [55]:
def train():
    """Train Faster R-CNN on the training set and checkpoint the best epoch.

    Configuration is read from module-level names: ``data_dir`` (folder that
    holds the images and ``train.json``), ``train_load_path`` (optional
    checkpoint to resume from), ``epochs`` and ``lr_decay``.

    After every epoch the averaged losses are printed, and a checkpoint is
    written whenever the epoch's ``total_loss`` beats the best value seen so
    far. After the epoch with index 9 the learning rate is scaled by
    ``lr_decay``; the loop stops after the epoch with index 13 even when
    ``epochs`` is larger.
    """
    # Load the training dataset.
    annotation = os.path.join(data_dir, 'train.json')
    dataset = TrainCustom(annotation, data_dir, transforms=True)
    print('load data')
    dataloader = data_.DataLoader(dataset,
                                  batch_size=1,     # only batch_size=1 is supported
                                  shuffle=True,
                                  pin_memory=False,
                                  num_workers=4)

    # Build the Faster R-CNN model.
    faster_rcnn = FasterRCNNVGG16().cuda()
    print('model construct completed')

    # Wrap the model in its trainer.
    trainer = FasterRCNNTrainer(faster_rcnn).cuda()

    # Optionally resume from a checkpoint.
    if train_load_path:
        trainer.load(train_load_path)
        print('load pretrained model from %s' % train_load_path)

    best_loss = 1000
    for epoch in range(epochs):
        trainer.reset_meters()
        for img, bbox_, label_, scale in tqdm(dataloader):
            img, bbox, label = img.cuda().float(), bbox_.cuda(), label_.cuda()
            trainer.train_step(img, bbox, label, float(scale))

        losses = trainer.get_meter_data()
        print(f"Epoch #{epoch+1} loss: {losses}")
        if losses['total_loss'] < best_loss:
            # Remember the new best value; otherwise every epoch below the
            # initial threshold would overwrite the "best" checkpoint.
            best_loss = losses['total_loss']
            trainer.save()

        if epoch == 9:
            trainer.faster_rcnn.scale_lr(lr_decay)

        if epoch == 13:
            break

In [56]:
# Run the training loop defined above using the module-level configuration.
train()

loading annotations into memory...


FileNotFoundError: ignored