In [None]:
class SlowROIPool(nn.Module):
    """RoI (Region of Interest) pooling done with a plain Python loop over RoIs.

    Each RoI is cropped out of its feature map and reduced to a fixed
    ``output_size`` with adaptive max pooling. It is used in place of the
    backbone's last max-pooling layer. (Per the original author's note the
    feature map fed to this layer is 14x14x512.)
    """

    def __init__(self, output_size):
        """Create the pooling layer.

        Args:
            output_size: target spatial size, forwarded unchanged to
                ``nn.AdaptiveMaxPool2d``.
        """
        super().__init__()
        self.maxpool = nn.AdaptiveMaxPool2d(output_size)
        self.size = output_size

    def forward(self, images, rois, roi_idx):
        """Pool every RoI to the configured output size.

        Args:
            images: batch of feature maps; dimension 2 is read as height and
                dimension 3 as width.
            rois: NumPy array with one row per RoI holding (x1, y1, x2, y2)
                as fractions of the image size (expected in 0..1).
            roi_idx: for each RoI, the index into ``images`` of the feature
                map it belongs to.

        Returns:
            Tensor with one entry per RoI along dimension 0 (the pooled crops
            concatenated in RoI order).
        """
        n = rois.shape[0]  # number of RoIs (region proposals)

        h = images.size(2)
        w = images.size(3)

        # Scale relative coordinates to feature-map cells; floor the top-left
        # and ceil the bottom-right so the crop covers the whole RoI.
        x1 = np.floor(rois[:, 0] * w).astype(int)
        y1 = np.floor(rois[:, 1] * h).astype(int)
        x2 = np.ceil(rois[:, 2] * w).astype(int)
        y2 = np.ceil(rois[:, 3] * h).astype(int)

        # Keep every crop inside the feature map and at least 1x1. Without
        # this a degenerate RoI (x1 == x2 on a grid line, or a corner at 1.0)
        # yields an empty slice that AdaptiveMaxPool2d cannot pool, and a
        # negative start would be treated as a wrap-around slice index.
        x1 = np.clip(x1, 0, w - 1)
        y1 = np.clip(y1, 0, h - 1)
        x2 = np.clip(x2, x1 + 1, w)
        y2 = np.clip(y2, y1 + 1, h)

        res = []
        for i in range(n):
            # Feature map owning the i-th RoI, with a leading batch dim of 1.
            img = images[int(roi_idx[i])].unsqueeze(0)
            # Crop the RoI region, then pool it to the fixed output size.
            img = img[:, :, int(y1[i]):int(y2[i]), int(x1[i]):int(x2[i])]
            res.append(self.maxpool(img))
        return torch.cat(res, dim=0)

In [None]:
# Load a pre-trained VGG16 model and adapt the network to the detection task
class RCNN(nn.Module):
    """Fast R-CNN style detector built on a pre-trained VGG16-BN network.

    The VGG convolutional stack (minus its last max-pool) produces the feature
    map, ``SlowROIPool`` pools each RoI to 7x7, the VGG fully connected layers
    (minus the final one) produce an RoI feature vector, and two sibling
    linear heads predict class scores and per-class box regression values.
    """

    def __init__(self):
        super().__init__()

        # VGG16 with batch normalisation, pre-trained weights.
        rawnet = torchvision.models.vgg16_bn(pretrained=True)
        # Convolutional backbone without the final max-pooling layer ...
        self.seq = nn.Sequential(*list(rawnet.features.children())[:-1])
        # ... which is replaced by RoI pooling to a fixed 7x7 grid.
        self.roipool = SlowROIPool(output_size=(7, 7))

        # Fully connected layers without the final classification layer.
        classifier_layers = list(rawnet.classifier.children())
        self.feature = nn.Sequential(*classifier_layers[:-1])

        # The removed final layer consumed exactly what `self.feature` emits,
        # so its `in_features` is the RoI feature size. Reading it directly
        # avoids pushing an uninitialised dummy tensor through the backbone in
        # training mode, which would fold garbage values into the pre-trained
        # BatchNorm running statistics.
        feature_dim = classifier_layers[-1].in_features

        # One extra output class for background (label 0).
        self.cls_score = nn.Linear(feature_dim, N_CLASS+1)  # classifier head
        self.bbox = nn.Linear(feature_dim, 4*(N_CLASS+1))  # bounding box regressor head

        self.cel = nn.CrossEntropyLoss()  # classification loss
        self.sl1 = nn.SmoothL1Loss()  # bounding box regression loss

    def forward(self, inp, rois, ridx):
        """Run detection on a batch of images and their RoIs.

        Args:
            inp: batch of input images.
            rois: RoI coordinates, passed through to ``SlowROIPool``.
            ridx: per-RoI index of the image in ``inp`` it belongs to.

        Returns:
            Tuple ``(cls_score, bbox)``: the class scores per RoI and the
            regression output reshaped to ``(-1, N_CLASS+1, 4)``.
        """
        res = inp
        res = self.seq(res)  # VGG convolutional backbone (last max-pool removed)
        res = self.roipool(res, rois, ridx)  # RoI pooling on the feature map
        # Cut the graph here: no gradient flows back into the backbone, so only
        # the fully connected layers and the two heads receive gradients.
        res = res.detach()
        res = res.view(res.size(0), -1)  # flatten each pooled RoI
        feat = self.feature(res)  # fully connected layers

        cls_score = self.cls_score(feat)  # classification result
        bbox = self.bbox(feat).view(-1, N_CLASS+1, 4)  # per-class box regression result
        return cls_score, bbox

    def calc_loss(self, probs, bbox, labels, gt_bbox):
        """Multi-task loss: classification plus box regression.

        Args:
            probs: class scores as returned by ``forward``.
            bbox: per-class regression output as returned by ``forward``.
            labels: integer class label per RoI; 0 is treated as background.
            gt_bbox: regression targets per RoI (presumably 4 values per
                row — confirm against the caller).

        Returns:
            Tuple ``(loss, loss_sc, loss_loc)``: the total, the classification
            term and the localisation term.
        """
        loss_sc = self.cel(probs, labels)
        # Pick, for every RoI, the 4 regression values of its labelled class.
        lbl = labels.view(-1, 1, 1).expand(labels.size(0), 1, 4)
        # Zero out background RoIs (label 0) in both prediction and target so
        # they add nothing to the localisation error.
        mask = (labels != 0).float().view(-1, 1).expand(labels.size(0), 4)
        loss_loc = self.sl1(bbox.gather(1, lbl).squeeze(1) * mask, gt_bbox * mask)
        lmb = 1.0  # weight of the localisation term
        loss = loss_sc + lmb * loss_loc
        return loss, loss_sc, loss_loc

In [None]:
# Training procedure of the Fast R-CNN model, one batch at a time
def _loss_to_scalar(loss):
    """Return the single value stored in a loss tensor as a NumPy scalar."""
    # Flatten first so this works both for a 1-element loss and for the 0-dim
    # loss returned by newer PyTorch releases, where `numpy()[0]` raises
    # IndexError.
    return loss.data.cpu().numpy().reshape(-1)[0]


def train_batch(img, rois, ridx, gt_cls, gt_tbbox, is_val=False):
    """Run one forward pass and, unless validating, one optimiser step.

    Uses the module-level ``rcnn`` model and ``optimizer``.

    Args:
        img: batch of input images.
        rois: RoI coordinates for the batch.
        ridx: per-RoI index of the image it belongs to.
        gt_cls: ground-truth class label per RoI.
        gt_tbbox: ground-truth regression target per RoI.
        is_val: when true, only the losses are computed; no parameters are
            updated.

    Returns:
        Tuple ``(loss, loss_sc, loss_loc)`` of scalar values: total,
        classification and localisation loss.
    """
    sc, r_bbox = rcnn(img, rois, ridx)
    loss, loss_sc, loss_loc = rcnn.calc_loss(sc, r_bbox, gt_cls, gt_tbbox)
    fl = _loss_to_scalar(loss)
    fl_sc = _loss_to_scalar(loss_sc)
    fl_loc = _loss_to_scalar(loss_loc)

    if not is_val:
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
    return fl, fl_sc, fl_loc

def train_epoch(run_set, is_val=False):
    """Run one pass over ``run_set`` using hierarchical RoI sampling.

    Images are shuffled and processed two at a time; from each image 64 RoIs
    are drawn (25% positives, the rest negatives) and the combined RoIs are
    handed to ``train_batch``. Reads the module-level ``train_imgs``,
    ``train_img_info``, ``train_roi``, ``train_cls`` and ``train_tbbox``.

    Args:
        run_set: array of image indices to process (indexed with a
            permutation, so presumably a NumPy array — confirm at call site).
        is_val: forwarded to ``train_batch``; also passed as ``volatile`` when
            wrapping the inputs.

    Returns:
        Tuple ``(losses, losses_sc, losses_loc)`` of per-batch loss lists.
    """
    imgs_per_batch = 2   # images per mini-batch
    rois_per_img = 64    # RoIs sampled from each image
    n_pos = int(rois_per_img * 0.25)  # positive samples per image
    n_neg = rois_per_img - n_pos      # negative samples per image

    # Shuffle the image order for this epoch.
    total = len(run_set)
    shuffled = run_set[np.random.permutation(total)]

    losses, losses_sc, losses_loc = [], [], []

    for start in trange(0, total, imgs_per_batch):
        stop = min(start + imgs_per_batch, total)
        batch_ids = shuffled[start:stop]
        # NOTE(review): `volatile` comes from the pre-0.4 PyTorch API; newer
        # releases appear to ignore it — confirm against the installed version.
        img = Variable(train_imgs[torch.from_numpy(batch_ids)], volatile=is_val).cuda()

        sampled = []  # RoI indices drawn for each image of this batch
        ridx = []     # position within the batch of the image owning each RoI

        for slot, img_id in enumerate(batch_ids):
            info = train_img_info[img_id]
            pos_idx = info['pos_idx']
            neg_idx = info['neg_idx']

            # Draw (with replacement) from whichever pools are non-empty.
            draws = []
            if len(pos_idx) > 0:
                draws.append(np.random.choice(pos_idx, size=n_pos))
            if len(neg_idx) > 0:
                draws.append(np.random.choice(neg_idx, size=n_neg))
            if not draws:
                # Image has no usable RoIs at all.
                continue

            ids = np.concatenate(draws, axis=0)
            sampled.append(ids)
            ridx.extend([slot] * ids.shape[0])

        if not ridx:
            # Nothing was sampled for this batch; skip it.
            continue

        glo_ids = np.concatenate(sampled, axis=0)
        ridx = np.array(ridx)

        rois = train_roi[glo_ids]
        gt_cls = Variable(torch.from_numpy(train_cls[glo_ids]), volatile=is_val).cuda()
        gt_tbbox = Variable(torch.from_numpy(train_tbbox[glo_ids]), volatile=is_val).cuda()

        batch_loss, batch_sc, batch_loc = train_batch(
            img, rois, ridx, gt_cls, gt_tbbox, is_val=is_val
        )
        losses.append(batch_loss)
        losses_sc.append(batch_sc)
        losses_loc.append(batch_loc)

    avg_loss = np.mean(losses)
    avg_loss_sc = np.mean(losses_sc)
    avg_loss_loc = np.mean(losses_loc)
    print(f'Avg loss = {avg_loss:.4f}; loss_sc = {avg_loss_sc:.4f}, loss_loc = {avg_loss_loc:.4f}')

    return losses, losses_sc, losses_loc

In [None]:
def reg_to_bbox(img_size, reg, box):
    """Decode regression outputs into box corners clipped to the image.

    Args:
        img_size: ``(width, height)`` of the image.
        reg: array indexed as ``reg[i, k, :] = (dx, dy, dw, dh)`` for box ``i``
            and slot ``k`` along the second axis.
        box: array whose rows start with ``(x1, y1, x2, y2)`` of the reference
            box; width and height are taken as ``x2 - x1 + 1`` / ``y2 - y1 + 1``.

    Returns:
        Array of shape ``reg.shape[:2] + (4,)`` holding ``(x1, y1, x2, y2)``,
        with the minimum corner floored at 0 and the maximum corner capped at
        the image width / height.
    """
    max_x, max_y = img_size

    # Reference box geometry, kept as columns so it broadcasts over axis 1 of reg.
    left, top = box[:, 0], box[:, 1]
    ref_w = box[:, 2] - left + 1.0
    ref_h = box[:, 3] - top + 1.0
    ref_cx = (left + 0.5 * ref_w)[:, np.newaxis]
    ref_cy = (top + 0.5 * ref_h)[:, np.newaxis]
    ref_w = ref_w[:, np.newaxis]
    ref_h = ref_h[:, np.newaxis]

    # Centre offsets are scaled by the reference size; sizes are log-scale factors.
    ctr_x = reg[:, :, 0] * ref_w + ref_cx
    ctr_y = reg[:, :, 1] * ref_h + ref_cy
    half_w = 0.5 * (ref_w * np.exp(reg[:, :, 2]))
    half_h = 0.5 * (ref_h * np.exp(reg[:, :, 3]))

    corners = np.array([
        np.maximum(0, ctr_x - half_w),
        np.maximum(0, ctr_y - half_h),
        np.minimum(max_x, ctr_x + half_w),
        np.minimum(max_y, ctr_y + half_h),
    ])
    # Move the coordinate axis last: (4, N, K) -> (N, K, 4).
    return corners.transpose([1, 2, 0])