**第一部分 导入库和设置基本参数**

In [1]:
import torch, torchvision
import torch.optim as optim
import torchvision.transforms as transforms

import matplotlib.pyplot as plt
from matplotlib.patches import Rectangle
from PIL import Image, ImageOps

import math

import warnings
warnings.filterwarnings('ignore')

  from .autonotebook import tqdm as notebook_tqdm


In [2]:
# Per-channel mean / std handed to Normalize when the transforms are built below.
imagenet_mean, imagenet_std = [0.485, 0.456, 0.406], [0.229, 0.224, 0.225]
# Bounds handed to Resize(min_size, max_size): the scale factor used by resize() is
# min(min_size / short_side, max_size / long_side).
min_size, max_size = 600, 1000

# Settings for the region proposal network. The key names mirror the constructor
# parameters of AnchorGenerator / RPNHead / RegionProposalNetwork.
# 'num_anchors' = 9 matches the 3 scales x 3 aspect ratios that
# AnchorGenerator.generate_base_anchors produces.
RPN_config = {'anchor_scale' : (128, 256, 512), 'anchor_aspect_ratio' : (0.5, 1.0, 2.0), 'downsample' : 16, 
              'in_channels' : 512, 'num_anchors' : 9,
              'bbox_reg_weights' : (1., 1., 1., 1.),
              'iou_positive_thresh' : 0.7, 'iou_negative_high' : 0.3, 'iou_negative_low' : 0,
              'batch_size_per_image' : 256, 'positive_fraction' : 0.5, 
              'min_size' : 16, 'nms_thresh' : 0.7, 
              'top_n_train' : 2000, 'top_n_test' : 300}

# Settings for the detection head; read key by key in FasterRCNN.build_FastRCNN.
# 'num_classes' = 21 equals the length of VOC_Detection.VOC_LABELS (background included).
FastRCNN_config = {'output_size' : 7, 'downsample' : 16, 
                   'out_channels' : 4096, 'num_classes' : 21,
                   'bbox_reg_weights' : (10., 10., 5., 5.),
                   'iou_positive_thresh' : 0.5, 'iou_negative_high' : 0.5, 'iou_negative_low' : 0.1,
                   'batch_size_per_image' : 128, 'positive_fraction' : 0.25, 
                   'min_size' : 1, 'nms_thresh' : 0.3, 
                   'score_thresh' : 0.05, 'top_n' : 50}

# Training hyper-parameters. NOTE(review): the training loop that reads these keys is
# not visible in this part of the file — confirm the meaning of 'clip' and the
# '*_freq' entries there.
TRAIN_config = {'epochs' : 15,
                'lr' : 0.001, 'momentum' : 0.9, 'weight_decay' : 0.0005,
                'milestones' : [10], 'clip' : 10,
                'epoch_freq' : 1, 'print_freq' : 1,
                'save' : True, 'SAVE_PATH' : './'}

# Evaluation settings. NOTE(review): consumer not visible here; 'use_07_metric'
# presumably selects the VOC2007 AP computation — TODO confirm.
TEST_config = {'num_classes' : 21, 'iou_thresh' : 0.5, 'use_07_metric' : True}

# Demo settings; reuses the resize bound and normalisation statistics defined above.
DEMO_config = {'min_size' : min_size, 'mean' : imagenet_mean, 'std' : imagenet_std, 'score_thresh' : 0.7}

# CUDA device index. Presumably passed as the ``gpu_id`` argument of the model classes — TODO confirm.
gpu_id = 0

**第二部分 数据获取及处理**

In [3]:
import torchvision.transforms.functional as Func

class Compose:
    """
    Chain several paired transforms into one callable.

    Every transform is called as ``transform(image, bboxs)`` and must return an
    ``(image, bboxs)`` pair, which is fed to the next transform in the list.
    """
    def __init__(self, transforms):
        self.transforms = transforms

    def __call__(self, image, bboxs):
        current = (image, bboxs)
        for step in self.transforms:
            # Unpack explicitly so a transform returning the wrong arity fails here.
            next_image, next_bboxs = step(*current)
            current = (next_image, next_bboxs)
        return current


class ToTensor:
    """
    Image-only transform: converts the image with ``Func.to_tensor``.

    The bounding boxes are returned untouched so the transform can sit inside
    a ``Compose`` pipeline that passes ``(image, bboxs)`` pairs around.
    """
    def __call__(self, image, bboxs):
        tensor_image = Func.to_tensor(image)
        return tensor_image, bboxs
    
    
class Normalize(torch.nn.Module):
    """
    Image-only transform: normalizes a tensor image with ``Func.normalize``.

    ``mean``, ``std`` and ``inplace`` are stored at construction time and
    forwarded on every call; the bounding boxes are returned untouched.
    """
    def __init__(self, mean, std, inplace=False):
        super().__init__()
        self.mean, self.std, self.inplace = mean, std, inplace

    def forward(self, image, bboxs):
        normalized = Func.normalize(image, mean=self.mean, std=self.std, inplace=self.inplace)
        return normalized, bboxs
    
    
class Resize(torch.nn.Module):
    """
    Rescale an image and its boxes by delegating to the module-level ``resize``.

    Boxes are expected as (min_x, min_y, max_x, max_y).
    ``min_size`` / ``max_size`` are stored here and passed through on each call.
    """
    def __init__(self, min_size, max_size):
        super().__init__()
        self.min_size, self.max_size = min_size, max_size

    def forward(self, image, bboxs):
        return resize(image, bboxs, min_size=self.min_size, max_size=self.max_size)
    
    
class Flip(torch.nn.Module):
    """
    Randomly mirror an image (and its boxes) horizontally.

    Boxes are expected as (min_x, min_y, max_x, max_y).

    Args:
        p: probability of applying the flip; a fresh ``torch.rand(1)`` sample is
           compared against it on every call.

    Returns (from ``forward``):
        ``(image, bboxs)`` — both mirrored when the flip fires, otherwise the
        inputs unchanged. ``bboxs`` may be ``None``, in which case only the image
        is mirrored.
    """
    def __init__(self, p=0.5):
        super().__init__()
        self.p = p

    def forward(self, image, bboxs):
        if torch.rand(1) < self.p:
            flip_image = ImageOps.mirror(image)
            # Identity test: `bboxs == None` on a tensor only works through the
            # comparison fallback and is fragile.
            if bboxs is None:
                return flip_image, bboxs
            return flip_image, flip(image, bboxs)
        return image, bboxs
            
            
def resize(img, bboxs, min_size, max_size):
    """
    Rescale a PIL image, keeping aspect ratio, and scale its boxes accordingly.

    The scale factor is ``min(min_size / short_side, max_size / long_side)``,
    so the short side reaches ``min_size`` unless that would push the long side
    past ``max_size``.

    Args:
        img: image exposing ``.size`` as (width, height) and ``.resize``.
        bboxs: tensor of boxes as (min_x, min_y, max_x, max_y) rows, or ``None``.
        min_size: target length of the short side.
        max_size: upper bound for the long side.

    Returns:
        ``(resized_image, resized_bboxs)``; ``resized_bboxs`` is a new tensor
        (the input is not modified), or ``None`` when ``bboxs`` is ``None``.
    """
    w, h = img.size
    ratio = min(min_size / min(w, h), max_size / max(w, h))
    resize_w, resize_h = int(ratio * w), int(ratio * h)

    resize_img = img.resize((resize_w, resize_h), resample=Image.BILINEAR)
    # Previously bboxs.clone() ran before the None check, so the None path could
    # never be reached without raising AttributeError.
    if bboxs is None:
        return resize_img, None

    # int() truncation makes the per-axis ratios differ slightly from `ratio`,
    # so boxes are scaled with the ratios actually applied to the image.
    ratio_w, ratio_h = resize_w / w, resize_h / h
    resize_bboxs = bboxs.clone()
    resize_bboxs[:, 0::2] = bboxs[:, 0::2] * ratio_w
    resize_bboxs[:, 1::2] = bboxs[:, 1::2] * ratio_h
    return resize_img, resize_bboxs
            
            
def flip(img, bboxs):
    """
    Mirror boxes horizontally inside an image of the given width.

    Args:
        img: PIL image (only ``img.size`` — (width, height) — is read).
        bboxs: tensor of shape (K, 4) with rows (min_x, min_y, max_x, max_y).

    Returns:
        A new float32 tensor of shape (K, 4) whose rows are
        (w - max_x, min_y, w - min_x, max_y). The input is not modified, and an
        empty (0, 4) input yields an empty (0, 4) output.
    """
    # Read the width directly instead of converting the whole image to a tensor.
    w, _ = img.size

    flip_bboxs = bboxs.clone().to(torch.float32)
    # The x-extents swap roles after mirroring: new min comes from old max.
    flip_bboxs[:, 0] = w - bboxs[:, 2]
    flip_bboxs[:, 2] = w - bboxs[:, 0]
    return flip_bboxs

In [4]:
class VOC_Detection(torch.utils.data.Dataset):
    """
    Wrap ``torchvision.datasets.VOCDetection`` and return tensors per image.

    Args:
        root, year, image_set, download: forwarded positionally to
            ``torchvision.datasets.VOCDetection``.
        transforms: callable taking and returning ``(image, bboxs)``; skipped
            when falsy.
        use_diff: when false, objects whose ``difficult`` flag is non-zero are
            dropped.

    Each item is ``(img, labels, bboxs)`` where ``labels`` is an int64 tensor of
    shape (K,) holding indexes into ``VOC_LABELS`` and ``bboxs`` is a float32
    tensor of shape (K, 4) with rows (xmin, ymin, xmax, ymax), shifted by -1.
    """
    # Class-level constant (still reachable as ``self.VOC_LABELS``).
    VOC_LABELS = ('__background__', # always index 0
                  'aeroplane', 'bicycle', 'bird', 'boat', 'bottle', 'bus', 'car', 'cat', 'chair', 'cow',
                  'diningtable', 'dog', 'horse','motorbike', 'person', 'pottedplant', 'sheep', 'sofa',
                  'train', 'tvmonitor')

    def __init__(self, root, year, image_set, download, transforms, use_diff):
        self.dataset = torchvision.datasets.VOCDetection(root, year, image_set, download)
        self.transforms = transforms
        self.use_diff = use_diff

    def __getitem__(self, idx):
        img, target = self.dataset[idx]
        labels, bboxs = [], []
        for info in target['annotation']['object']:
            if not self.use_diff and int(info['difficult']) != 0:
                continue
            labels.append(self.VOC_LABELS.index(info['name']))
            box = info['bndbox']
            # Make pixel indexes 0-based
            bboxs.append([float(box[key]) - 1 for key in ('xmin', 'ymin', 'xmax', 'ymax')])

        labels = torch.tensor(labels, dtype=torch.long)
        # reshape keeps the (K, 4) layout when every object was filtered out (K == 0);
        # torch.stack on an empty list used to raise here.
        bboxs = torch.tensor(bboxs, dtype=torch.float32).reshape(-1, 4)
        if self.transforms: img, bboxs = self.transforms(img, bboxs)
        return img, labels, bboxs

    def __len__(self):
        return len(self.dataset)

In [5]:
# Batch size used for train_loader in the next cell.
batch_size = 16
# Root directory handed to VOC_Detection (and from there to torchvision's VOCDetection).
data_dir = 'dataset/'

In [6]:
# Training pipeline: resize -> random horizontal flip -> tensor conversion -> normalisation.
# Resize and Flip run first because they operate on the PIL image (img.size / ImageOps.mirror).
train_transform = Compose([Resize(min_size, max_size), 
                           Flip(), 
                           ToTensor(), 
                           Normalize(mean=imagenet_mean, std=imagenet_std)])
# Evaluation pipeline: same as training but without the random flip.
test_transform = Compose([Resize(min_size, max_size), 
                          ToTensor(), 
                          Normalize(mean=imagenet_mean, std=imagenet_std)])

# VOC 2012 'train' / 'val' splits; download=False, so the data must already exist under data_dir.
# use_diff=False drops objects flagged as difficult.
train_dataset = VOC_Detection(root=data_dir, year='2012', image_set='train', 
                                      download=False, transforms=train_transform, use_diff=False)
test_dataset = VOC_Detection(root=data_dir, year='2012', image_set='val', 
                                     download=False, transforms=test_transform, use_diff=False)

# NOTE(review): no collate_fn is given, so the default collation is used. Items have
# per-image sizes and box counts, which presumably cannot be batched at
# batch_size=16 — confirm whether a custom collate_fn is needed.
train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=batch_size, shuffle=True, num_workers=8)
test_loader = torch.utils.data.DataLoader(test_dataset, batch_size=1, shuffle=False, num_workers=8)

**第三部分 utils工具函数定义**

In [7]:
class Balanced_Sampler():
    """
    Randomly pick a bounded number of positive and negative entries per image.

    Entries with label >= 1 are positive candidates, entries with label == 0
    are negative candidates; any other label is never selected. At most
    ``int(batch_size_per_image * positive_fraction)`` positives are drawn and
    the rest of the ``batch_size_per_image`` budget is filled with negatives.
    """
    def __init__(self, batch_size_per_image, positive_fraction):
        self.batch_size_per_image = batch_size_per_image
        self.positive_fraction = positive_fraction

    def __call__(self, labels):
        """Return two stacked boolean masks (positives, negatives), one row per image."""
        positive_rows, negative_rows = [], []
        for image_labels in labels:
            pos_candidates = torch.nonzero(image_labels >= 1, as_tuple=True)[0]
            neg_candidates = torch.nonzero(image_labels == 0, as_tuple=True)[0]

            # Quotas are capped by how many candidates actually exist.
            pos_quota = min(pos_candidates.numel(),
                            int(self.batch_size_per_image * self.positive_fraction))
            neg_quota = min(neg_candidates.numel(), self.batch_size_per_image - pos_quota)

            # Positives are shuffled before negatives so the RNG stream is consumed in a fixed order.
            pos_pick = torch.randperm(pos_candidates.numel(), device=pos_candidates.device)[:pos_quota]
            neg_pick = torch.randperm(neg_candidates.numel(), device=neg_candidates.device)[:neg_quota]

            pos_mask = torch.zeros_like(image_labels, dtype=torch.bool)
            neg_mask = torch.zeros_like(image_labels, dtype=torch.bool)
            pos_mask[pos_candidates[pos_pick]] = True
            neg_mask[neg_candidates[neg_pick]] = True

            positive_rows.append(pos_mask)
            negative_rows.append(neg_mask)

        return torch.stack(positive_rows, dim=0), torch.stack(negative_rows, dim=0)


class BoxCoder():
    """
    Convert between corner-format boxes and (dx, dy, dw, dh) regression deltas.

    Both methods index their inputs as ``[:, :, k]`` for k in 0..3, i.e. they
    expect tensors laid out as (batch, boxes, 4) with (x1, y1, x2, y2) corners.

    Args:
        weights: four scale factors (wx, wy, ww, wh) multiplied into the deltas
            on encode and divided out on decode.
        bbox_clip: upper bound applied to dw / dh before exponentiation in decode.
    """
    def __init__(self, weights=(1., 1., 1., 1.), bbox_clip=math.log(1000. / 16)):
        self.weights = weights
        self.bbox_clip = bbox_clip

    @staticmethod
    def _center_form(boxes):
        # Corner boxes -> (width, height, center_x, center_y), one tensor per quantity.
        x1, y1 = boxes[:, :, 0], boxes[:, :, 1]
        x2, y2 = boxes[:, :, 2], boxes[:, :, 3]
        return x2 - x1, y2 - y1, (x1 + x2) / 2, (y1 + y2) / 2

    def decode(self, bbox_deltas, proposals):
        """Apply regression deltas to ``proposals`` and return corner-format boxes."""
        widths, heights, cx, cy = self._center_form(proposals)

        wx, wy, ww, wh = self.weights
        dx = bbox_deltas[:, :, 0] / wx
        dy = bbox_deltas[:, :, 1] / wy
        # Size deltas are capped so exp() cannot blow up.
        dw = (bbox_deltas[:, :, 2] / ww).clamp(max=self.bbox_clip)
        dh = (bbox_deltas[:, :, 3] / wh).clamp(max=self.bbox_clip)

        new_cx = cx + dx * widths
        new_cy = cy + dy * heights
        new_w = widths * torch.exp(dw)
        new_h = heights * torch.exp(dh)

        half_w, half_h = new_w / 2, new_h / 2
        corners = (new_cx - half_w, new_cy - half_h, new_cx + half_w, new_cy + half_h)
        return torch.stack(corners, dim=2)

    def encode(self, matched_gt_bboxs, proposals):
        """Return the weighted deltas that map ``proposals`` onto ``matched_gt_bboxs``."""
        wx, wy, ww, wh = self.weights

        prop_w, prop_h, prop_cx, prop_cy = self._center_form(proposals)
        gt_w, gt_h, gt_cx, gt_cy = self._center_form(matched_gt_bboxs)

        # Centre offsets are relative to the proposal size; size ratios are in log space.
        shift_x = wx * (gt_cx - prop_cx) / prop_w
        shift_y = wy * (gt_cy - prop_cy) / prop_h
        scale_w = ww * torch.log(gt_w / prop_w)
        scale_h = wh * torch.log(gt_h / prop_h)

        return torch.stack((shift_x, shift_y, scale_w, scale_h), dim=2)


class Matcher(object):
    """
    Assign each proposal (column) of an IoU matrix to a ground-truth row or a sentinel.

    For every column the best IoU decides the outcome:
      * ``iou_negative_low <= iou < iou_negative_high`` -> ``BELOW_LOW_THRESHOLD`` (-1)
      * ``iou_negative_high <= iou < iou_positive_thresh`` or ``iou < iou_negative_low``
        -> ``BETWEEN_THRESHOLDS`` (-2)
      * otherwise the index of the best ground-truth row is kept.
    With ``low_quality_match`` enabled, every column that attains some row's
    maximum IoU gets its best-row index back regardless of the thresholds.
    """
    def __init__(self, iou_positive_thresh, iou_negative_high, iou_negative_low, low_quality_match):
        self.BELOW_LOW_THRESHOLD = -1
        self.BETWEEN_THRESHOLDS = -2

        self.iou_positive_thresh = iou_positive_thresh
        self.iou_negative_high = iou_negative_high
        self.iou_negative_low = iou_negative_low
        self.low_quality_match = low_quality_match

    def __call__(self, match_quality_matrix):
        best_iou, best_gt = match_quality_matrix.max(dim=0)
        low, high, positive = self.iou_negative_low, self.iou_negative_high, self.iou_positive_thresh

        is_negative = (best_iou >= low) & (best_iou < high)
        is_ignored = ((best_iou >= high) & (best_iou < positive)) | (best_iou < low)

        # Work on a copy so the raw arg-max indexes stay available for the rescue step.
        assignment = best_gt.clone()
        assignment[is_negative] = self.BELOW_LOW_THRESHOLD
        assignment[is_ignored] = self.BETWEEN_THRESHOLDS

        if not self.low_quality_match:
            return assignment

        # Columns that tie with a row's maximum IoU are restored to their best row.
        gt_best_iou = match_quality_matrix.max(dim=1)[0]
        rescued = torch.where(match_quality_matrix == gt_best_iou[:, None])[1]
        assignment[rescued] = best_gt[rescued]
        return assignment

**第四部分 Region Proposal网络模型定义**

In [8]:
from torch import nn
from torch.nn import functional as F


class AnchorGenerator(nn.Module):
    """
    Produce anchor boxes for every cell of each feature map.

    A set of base anchors centred on the origin is built once (one per
    aspect-ratio / scale pair) and moved to CUDA device ``gpu_id``; at call time
    it is shifted to every grid position spaced ``downsample`` pixels apart.
    """
    def __init__(self, anchor_scale=(128, 256, 512), anchor_aspect_ratio=(0.5, 1.0, 2.0), downsample=16, gpu_id=0):
        super().__init__()
        torch.cuda.set_device(gpu_id)
        self.gpu = gpu_id

        self.anchor_scale = anchor_scale
        self.anchor_aspect_ratio = anchor_aspect_ratio
        self.downsample = downsample
        origin_anchors = self.generate_base_anchors(anchor_scale, anchor_aspect_ratio)
        self.base_anchors = origin_anchors.cuda(self.gpu)

    def forward(self, features):
        return self.generate_images_anchors(features, self.downsample)

    def generate_base_anchors(self, anchor_scale, anchor_aspect_ratio):
        """Return a (len(ratios) * len(scales), 4) tensor of origin-centred (x1, y1, x2, y2) anchors."""
        scales = torch.FloatTensor(anchor_scale)
        ratios = torch.FloatTensor(anchor_aspect_ratio)

        # height factor = sqrt(ratio), width factor = its reciprocal
        h_factor = torch.sqrt(ratios)
        w_factor = 1 / h_factor

        # Outer product over (ratio, scale), flattened ratio-major.
        full_w = (w_factor.unsqueeze(1) * scales.unsqueeze(0)).view(-1)
        full_h = (h_factor.unsqueeze(1) * scales.unsqueeze(0)).view(-1)

        corners = torch.stack([-full_w, -full_h, full_w, full_h], dim=1)
        return corners / 2

    def generate_images_anchors(self, features, downsample):
        """Shift the base anchors to every grid cell of each feature map and stack the results."""
        per_image = []
        for feature in features:
            f_h, f_w = feature.shape[-2:]
            rows = torch.arange(0, f_h).cuda(self.gpu) * downsample
            cols = torch.arange(0, f_w).cuda(self.gpu) * downsample
            grid_y, grid_x = torch.meshgrid(rows, cols)
            flat_y, flat_x = grid_y.reshape(-1), grid_x.reshape(-1)

            # Same (x, y) offset is added to both corners of every base anchor.
            offsets = torch.stack((flat_x, flat_y, flat_x, flat_y), dim=1)
            shifted = offsets.view(-1, 1, 4) + self.base_anchors.view(1, -1, 4)
            per_image.append(shifted.reshape(-1, 4))

        return torch.stack(per_image, dim=0)


class RPNHead(nn.Module):
    """
    Convolutional head of the RPN.

    A 3x3 conv + ReLU is followed by two sibling 1x1 convs: one emitting
    ``num_anchors`` objectness channels and one emitting ``4 * num_anchors``
    box-delta channels. All layers are placed on CUDA device ``gpu_id``.
    """
    def __init__(self, in_channels, num_anchors, gpu_id):
        super().__init__()
        torch.cuda.set_device(gpu_id)
        self.gpu = gpu_id

        shared_conv = nn.Conv2d(in_channels, in_channels, kernel_size=3, stride=1, padding=1)
        self.conv = nn.Sequential(shared_conv, nn.ReLU()).cuda(self.gpu)
        self.classification = nn.Conv2d(in_channels, num_anchors, kernel_size=1, stride=1).cuda(self.gpu)
        self.bbox_regressor = nn.Conv2d(in_channels, 4 * num_anchors, kernel_size=1, stride=1).cuda(self.gpu)
        self._initialize_weights()

    def forward(self, features):
        hidden = self.conv(features)
        return self.classification(hidden), self.bbox_regressor(hidden)

    def _initialize_weights(self):
        # Every conv layer: weights ~ N(0, 0.01), biases zeroed.
        conv_layers = (layer for layer in self.modules() if isinstance(layer, nn.Conv2d))
        for layer in conv_layers:
            nn.init.normal_(layer.weight, mean=0, std=0.01)
            nn.init.constant_(layer.bias, 0)


class RegionProposalNetwork(nn.Module):
    """
    Region proposal network: anchors + RPN head -> filtered proposals (+ losses).

    ``forward`` generates anchors, runs ``rpn_head`` on the features, decodes the
    predicted deltas against the anchors and filters the result (clip, drop
    small boxes, NMS, top-n). In training mode it also labels the anchors
    against the ground truth and returns the classification and localisation
    losses; otherwise both losses are ``None``.

    Args:
        anchor_generator: callable mapping features to anchors.
        rpn_head: callable mapping features to (objectness, bbox deltas).
        bbox_reg_weights: forwarded to ``BoxCoder``.
        iou_positive_thresh, iou_negative_high, iou_negative_low: forwarded to
            ``Matcher`` (created with ``low_quality_match=True``).
        batch_size_per_image, positive_fraction: forwarded to ``Balanced_Sampler``.
        min_size: minimum box side passed to ``remove_small_boxes``.
        nms_thresh: IoU threshold passed to ``nms``.
        top_n_train, top_n_test: number of proposals kept per image, chosen by
            ``self.training``.
    """
    def __init__(self, anchor_generator, rpn_head,
                 bbox_reg_weights, 
                 iou_positive_thresh, iou_negative_high, iou_negative_low,
                 batch_size_per_image, positive_fraction,
                 min_size, nms_thresh, 
                 top_n_train, top_n_test):
        super(RegionProposalNetwork, self).__init__()
        self.anchor_generator = anchor_generator
        self.rpn_head = rpn_head
        
        self.box_coder = BoxCoder(bbox_reg_weights)
        self.proposal_matcher = Matcher(iou_positive_thresh, iou_negative_high, iou_negative_low, low_quality_match=True)
        self.sampler = Balanced_Sampler(batch_size_per_image, positive_fraction)
        
        self.min_size = min_size
        self.nms_thresh = nms_thresh
        self.top_n_train = top_n_train
        self.top_n_test = top_n_test

    def assign_gt_to_anchors(self, anchors, gt_labels, gt_bboxs):
        """
        Label every anchor and pick the ground-truth box it is matched to.

        Returns ``(labels, matched_gt_bboxs)`` stacked over images. Labels are
        1.0 for matched anchors, 0.0 for negatives and -1.0 for anchors the
        matcher marks as "between". ``gt_labels`` is accepted but not used here.
        """
        labels, matched_gt_bboxs = [], []
        for anchors_per_img, gt_bboxs_per_img in zip(anchors, gt_bboxs):
            # IoU matrix with one row per ground-truth box and one column per anchor.
            match_quality_matrix = torchvision.ops.box_iou(gt_bboxs_per_img, anchors_per_img)
            matched_idxs_per_img = self.proposal_matcher(match_quality_matrix)
            
            # Sentinel values (-1 / -2) are clamped to 0 so the gather stays in range;
            # the boxes gathered for those anchors are placeholders.
            matched_gt_bboxs_per_img = gt_bboxs_per_img[torch.clamp(matched_idxs_per_img, min=0)]
            labels_per_img = (matched_idxs_per_img >= 0).float()

            # Negative
            negative_idxs = matched_idxs_per_img == self.proposal_matcher.BELOW_LOW_THRESHOLD
            labels_per_img[negative_idxs] = 0.0

            # Between
            between_idxs = matched_idxs_per_img == self.proposal_matcher.BETWEEN_THRESHOLDS
            labels_per_img[between_idxs] = -1.0

            labels.append(labels_per_img)
            matched_gt_bboxs.append(matched_gt_bboxs_per_img)
        
        labels, matched_gt_bboxs = torch.stack(labels, dim=0), torch.stack(matched_gt_bboxs, dim=0)
        return labels, matched_gt_bboxs
    
    def calculate_loss(self, objectness, pred_bbox_deltas, labels, regression_targets):
        """
        Compute the RPN losses on a sampled subset of anchors.

        Classification: binary cross-entropy with logits over the sampled
        positives and negatives. Localisation: smooth L1 (beta = 1/9) over the
        sampled positives only.
        """
        sampled_positive_masks, sampled_negative_masks = self.sampler(labels)
        sampled_masks = sampled_positive_masks | sampled_negative_masks

        sampled_objectness, sampled_labels = objectness[sampled_masks], labels[sampled_masks]
        sampled_deltas, sampled_regression_targets = (pred_bbox_deltas[sampled_positive_masks], 
                                                      regression_targets[sampled_positive_masks])

        rpn_cls_loss = F.binary_cross_entropy_with_logits(sampled_objectness, sampled_labels)
        rpn_loc_loss = F.smooth_l1_loss(sampled_deltas, sampled_regression_targets, beta=1/9)
        
        return rpn_cls_loss, rpn_loc_loss
    
    def convert(self, bbox_cls, bbox_regression):
        """
        Flatten the head outputs to per-anchor layout.

        ``bbox_regression`` of shape (N, A*4, H, W) becomes (N, H*W*A, 4) and
        ``bbox_cls`` is viewed as (N, A, 1, H, W) and becomes (N, H*W*A).
        """
        N, Ax4, H, W = bbox_regression.shape
        A = Ax4 // 4
        
        bbox_cls, bbox_regression = bbox_cls.view(N, A, 1, H, W), bbox_regression.view(N, A, 4, H, W)
        # Move the spatial axes in front of the anchor axis before flattening.
        bbox_cls, bbox_regression = bbox_cls.permute(0, 3, 4, 1, 2), bbox_regression.permute(0, 3, 4, 1, 2)
        bbox_cls, bbox_regression = bbox_cls.reshape(N, -1), bbox_regression.reshape(N, -1, 4)
        return bbox_cls, bbox_regression
    
    def filter_proposals(self, images, objectness, proposals):
        """
        Clip, prune and rank the decoded proposals of every image.

        NOTE(review): the per-image results are combined with ``torch.stack``,
        which needs the same number of surviving proposals for every image —
        confirm the batch size used by callers.
        """
        objectness_prob = torch.sigmoid(objectness)
        filtered_proposals = []
        for img, objectness_prob_per_img, proposals_per_img in zip(images, objectness_prob, proposals):
            # clip to image size
            proposals_per_img = torchvision.ops.clip_boxes_to_image(proposals_per_img, tuple(img.shape[-2:]))

            # remove small proposals
            keep_idx = torchvision.ops.remove_small_boxes(proposals_per_img, self.min_size)
            objectness_prob_per_img, proposals_per_img = objectness_prob_per_img[keep_idx], proposals_per_img[keep_idx]

            # NMS
            keep_idx = torchvision.ops.nms(proposals_per_img, objectness_prob_per_img, self.nms_thresh)
            objectness_prob_per_img, proposals_per_img = objectness_prob_per_img[keep_idx], proposals_per_img[keep_idx]
            
            # sort by objectness and select top-n
            top_idx = torch.argsort(objectness_prob_per_img, descending=True)[:self.top_n()]
            proposals_per_img = proposals_per_img[top_idx]
            
            filtered_proposals.append(proposals_per_img)
        return torch.stack(filtered_proposals, dim=0)

    def forward(self, images, features, gt_labels=None, gt_bboxs=None):
        """
        Return ``(filtered_proposals, rpn_cls_loss, rpn_loc_loss)``.

        Both losses are ``None`` unless the module is in training mode, in which
        case ``gt_bboxs`` must be provided.
        """
        anchors = self.anchor_generator(features.detach())
        objectness, pred_bbox_deltas = self.rpn_head(features)
        objectness, pred_bbox_deltas = self.convert(objectness, pred_bbox_deltas)
        
        # Proposals are built from detached predictions, so no gradient flows
        # back through the proposal coordinates.
        proposals = self.box_coder.decode(pred_bbox_deltas.detach(), anchors)
        filtered_proposals = self.filter_proposals(images, objectness.detach(), proposals)
        
        rpn_cls_loss, rpn_loc_loss = None, None
        if self.training:
            labels, matched_gt_bboxs = self.assign_gt_to_anchors(anchors, gt_labels, gt_bboxs)
            regression_targets = self.box_coder.encode(matched_gt_bboxs, anchors)
            rpn_cls_loss, rpn_loc_loss = self.calculate_loss(objectness, pred_bbox_deltas, labels, regression_targets)
            
        return filtered_proposals, rpn_cls_loss, rpn_loc_loss
    
    def top_n(self):
        """Return the proposal budget for the current mode (train vs. eval)."""
        if self.training: return self.top_n_train
        return self.top_n_test

**第五部分 Fast-R-CNN和Faster-R-CNN网络模型定义**

In [9]:
class RoIHead(nn.Module):
    """
    Per-proposal head: RoI pooling, shared fully connected layers, then a
    classification layer (``num_classes`` outputs) and a box-regression layer
    (``4 * num_classes`` outputs). The two output layers live on CUDA device ``gpu_id``.
    """
    def __init__(self, output_size, downsample, backbone_fc, out_channels, num_classes, gpu_id):
        super().__init__()
        torch.cuda.set_device(gpu_id)
        self.gpu = gpu_id

        self.output_size, self.downsample = output_size, downsample

        self.num_classes = num_classes
        self.fc = backbone_fc
        self.classification = nn.Linear(out_channels, num_classes).cuda(self.gpu)
        self.bbox_regressor = nn.Linear(out_channels, 4 * num_classes).cuda(self.gpu)

        self._initialize_weights()

    def forward(self, images, features, proposals):
        N, C, _, _ = features.shape

        # roi_pool takes one box tensor per image.
        boxes_per_image = list(proposals)
        pooled = torchvision.ops.roi_pool(features, boxes_per_image,
                                          output_size=self.output_size,
                                          spatial_scale=1 / self.downsample)
        # Regroup the pooled RoIs by image, then flatten each RoI to a vector.
        pooled = pooled.view(N, -1, C, self.output_size, self.output_size)
        roi_vectors = self.fc(pooled.flatten(start_dim=2))

        return self.classification(roi_vectors), self.bbox_regressor(roi_vectors)

    def _initialize_weights(self):
        # Weights: N(0, 0.01) for the classifier, N(0, 0.001) for the regressor.
        for layer, std in ((self.classification, 0.01), (self.bbox_regressor, 0.001)):
            nn.init.normal_(layer.weight, 0, std)

        for layer in (self.classification, self.bbox_regressor):
            nn.init.constant_(layer.bias, 0)

        
class FastRCNN(nn.Module):
    """
    Detection stage: classifies proposals and refines their boxes.

    In training mode ``forward`` labels the proposals against the ground truth
    and returns the classification / localisation losses. In eval mode it
    returns filtered detections (labels, scores, boxes).

    Args:
        roi_head: module returning (class logits, per-class box deltas); its
            ``num_classes`` attribute is read here.
        bbox_reg_weights: forwarded to ``BoxCoder``.
        iou_positive_thresh, iou_negative_high, iou_negative_low: forwarded to
            ``Matcher`` (created with ``low_quality_match=False``).
        batch_size_per_image, positive_fraction: forwarded to ``Balanced_Sampler``.
        min_size: minimum box side passed to ``remove_small_boxes``.
        nms_thresh: IoU threshold passed to ``batched_nms``.
        score_thresh: detections with a score not above this value are dropped.
        top_n: maximum number of detections kept per image.
    """
    def __init__(self, roi_head,
                 bbox_reg_weights,
                 iou_positive_thresh, iou_negative_high, iou_negative_low,
                 batch_size_per_image, positive_fraction,
                 min_size, nms_thresh, 
                 score_thresh, top_n):
        super(FastRCNN, self).__init__()
        self.roi_head = roi_head
        self.num_classes = roi_head.num_classes
        
        self.box_coder = BoxCoder(bbox_reg_weights)
        self.proposal_matcher = Matcher(iou_positive_thresh, iou_negative_high, iou_negative_low, low_quality_match=False)
        self.sampler = Balanced_Sampler(batch_size_per_image, positive_fraction)
        
        self.min_size = min_size
        self.nms_thresh = nms_thresh
        self.score_thresh = score_thresh
        self.top_n = top_n

    def assign_gt_to_proposals(self, proposals, gt_labels, gt_bboxs):
        """
        Give every proposal a class label and a matched ground-truth box.

        Matched proposals take the label of their ground-truth box; negatives
        get 0 and proposals the matcher marks as "between" get -1. Results are
        stacked over images.
        """
        labels, matched_gt_bboxs = [], []
        for proposals_per_image, gt_labels_per_image, gt_bboxs_per_image in zip(proposals, gt_labels, gt_bboxs):
            # IoU matrix with one row per ground-truth box and one column per proposal.
            match_quality_matrix = torchvision.ops.box_iou(gt_bboxs_per_image, proposals_per_image)
            matched_idxs_per_image = self.proposal_matcher(match_quality_matrix)
            
            # Sentinel values (-1 / -2) are clamped to 0 so the gathers stay in range;
            # the labels of those proposals are overwritten just below.
            clamped_matched_idxs_per_image = torch.clamp(matched_idxs_per_image, min=0)
            
            labels_per_image = gt_labels_per_image[clamped_matched_idxs_per_image]
            matched_gt_bboxs_per_image = gt_bboxs_per_image[clamped_matched_idxs_per_image]
            
            # Negative
            negative_idxs = matched_idxs_per_image == self.proposal_matcher.BELOW_LOW_THRESHOLD
            labels_per_image[negative_idxs] = 0.0

            # Between
            between_idxs = matched_idxs_per_image == self.proposal_matcher.BETWEEN_THRESHOLDS
            labels_per_image[between_idxs] = -1.0

            labels.append(labels_per_image)
            matched_gt_bboxs.append(matched_gt_bboxs_per_image)
        return torch.stack(labels, dim=0), torch.stack(matched_gt_bboxs, dim=0)
    
    def calculate_loss(self, class_logits, pred_bbox_deltas, labels, regression_targets):
        """
        Compute the detection losses on a sampled subset of proposals.

        Classification: cross-entropy over the sampled positives and negatives.
        Localisation: smooth L1 over the sampled positives, using only the
        deltas predicted for each proposal's own class. ``roi_loc_loss`` is
        ``None`` when no positive was sampled.
        """
        N, P, N_Cx4 = pred_bbox_deltas.shape
        # Split the last axis into (class, 4) so one delta set can be picked per class.
        pred_bbox_deltas = pred_bbox_deltas.view(N, P, N_Cx4 // 4, 4)
        
        sampled_positive_masks, sampled_negative_masks = self.sampler(labels)
        sampled_masks = sampled_positive_masks | sampled_negative_masks

        sampled_class_logits, sampled_labels = class_logits[sampled_masks], labels[sampled_masks]
        roi_cls_loss = F.cross_entropy(sampled_class_logits, sampled_labels)
        
        sampled_deltas, sampled_regression_targets = (pred_bbox_deltas[sampled_positive_masks], 
                                                      regression_targets[sampled_positive_masks])
        sampled_positive_labels = labels[sampled_positive_masks]
        sampled_regression = []
        for sampled_positive_label, sampled_delta in zip(sampled_positive_labels, sampled_deltas):
            # Keep only the 4 deltas predicted for this proposal's target class.
            sampled_regression.append(sampled_delta[sampled_positive_label])
        
        if len(sampled_regression) == 0:
            roi_loc_loss = None
        else:
            sampled_regression = torch.stack(sampled_regression, dim=0)
            roi_loc_loss = F.smooth_l1_loss(sampled_regression, sampled_regression_targets)
        return roi_cls_loss, roi_loc_loss
    
    def convert(self, class_logits, pred_bbox_deltas, proposals):
        """
        Pick the best non-background class per proposal and decode its box.

        Returns ``(pred_scores, pred_labels, detections)`` stacked over images.
        """
        # convert class logits and pred_bbox_deltas and remove background class
        # (N, P, num_classes), (N, P, num_classes * 4) -> (N, P_without_background), (N, P_without_background, 4)
        N, P, N_Cx4 = pred_bbox_deltas.shape
        pred_bbox_deltas = pred_bbox_deltas.view(N, P, N_Cx4 // 4, 4)
        probs = F.softmax(class_logits, dim=-1)
        
        pred_scores, pred_labels, pred_deltas, pred_proposals = [], [], [], []
        for probs_per_img, pred_bbox_deltas_per_img, proposals_per_img in zip(probs, pred_bbox_deltas, proposals):
            # Arg-max over columns 1.. (background column 0 excluded); +1 restores the class index.
            pred_scores_per_img, pred_labels_per_img = torch.max(probs_per_img[:,1:], dim=-1)
            pred_labels_per_img += 1
            label_map = torch.arange(self.num_classes, device=probs_per_img.device).expand_as(probs_per_img)
            # Boolean mask selecting, for every proposal, the delta row of its predicted class.
            mask = label_map == pred_labels_per_img[:, None]
            # NOTE(review): labels are >= 1 after the += 1 above, so this mask is
            # all True and filters nothing — confirm whether that is intended.
            class_idx = pred_labels_per_img > 0
            
            pred_scores.append(pred_scores_per_img[class_idx])
            pred_labels.append(pred_labels_per_img[class_idx])
            pred_deltas.append(pred_bbox_deltas_per_img[mask][class_idx])
            pred_proposals.append(proposals_per_img[class_idx])
        
        pred_scores, pred_labels = torch.stack(pred_scores, dim=0), torch.stack(pred_labels, dim=0)
        pred_deltas, pred_proposals = torch.stack(pred_deltas, dim=0), torch.stack(pred_proposals, dim=0)
        detections = self.box_coder.decode(pred_deltas, pred_proposals)
        return pred_scores, pred_labels, detections
    
    def filter_detections(self, images, class_logits, pred_bbox_deltas, proposals):
        """
        Post-process detections per image: clip, drop small and low-score boxes,
        per-class NMS, then keep the ``top_n`` highest scores.

        NOTE(review): the per-image results are combined with ``torch.stack``,
        which needs the same number of surviving detections for every image —
        confirm the batch size used by callers.
        """
        pred_scores, pred_labels, detections = self.convert(class_logits, pred_bbox_deltas, proposals)
        
        filtered_scores, filtered_labels, filtered_detections = [], [], []
        for img, scores_per_img, labels_per_img, detections_per_img in zip(images, pred_scores, pred_labels, detections):
            # clip to image size
            detections_per_img = torchvision.ops.clip_boxes_to_image(detections_per_img, tuple(img.shape[-2:]))
            
            # remove small proposals
            keep_idx = torchvision.ops.remove_small_boxes(detections_per_img, self.min_size)
            scores_per_img, labels_per_img, detections_per_img = (scores_per_img[keep_idx], 
                                                                  labels_per_img[keep_idx], 
                                                                  detections_per_img[keep_idx])
            
            # remove low score proposals
            keep_idx = scores_per_img > self.score_thresh
            scores_per_img, labels_per_img, detections_per_img = (scores_per_img[keep_idx], 
                                                                  labels_per_img[keep_idx], 
                                                                  detections_per_img[keep_idx])
            
            # NMS
            keep_idx = torchvision.ops.batched_nms(detections_per_img, scores_per_img, labels_per_img, self.nms_thresh)
            scores_per_img, labels_per_img, detections_per_img = (scores_per_img[keep_idx], 
                                                                  labels_per_img[keep_idx], 
                                                                  detections_per_img[keep_idx])
            
            # sort by scores and select top-n
            top_idx = torch.argsort(scores_per_img, descending=True)[:self.top_n]
            scores_per_img, labels_per_img, detections_per_img = (scores_per_img[top_idx], 
                                                                  labels_per_img[top_idx], 
                                                                  detections_per_img[top_idx])

            filtered_scores.append(scores_per_img)
            filtered_labels.append(labels_per_img)
            filtered_detections.append(detections_per_img)
            
        filtered_scores = torch.stack(filtered_scores, dim=0)
        filtered_labels = torch.stack(filtered_labels, dim=0)
        filtered_detections = torch.stack(filtered_detections, dim=0)
        return filtered_scores, filtered_labels, filtered_detections
    
    def forward(self, images, features, proposals, gt_labels=None, gt_bboxs=None):
        """
        Return a 5-tuple ``(labels, scores, detections, cls_loss, loc_loss)``.

        Training mode: the first three entries are ``None`` and the losses are
        filled in. Eval mode: the detections are filled in and both losses are
        ``None``.
        """
        class_logits, pred_bbox_deltas = self.roi_head(images, features, proposals)
        if self.training:
            labels, matched_gt_bboxs = self.assign_gt_to_proposals(proposals, gt_labels, gt_bboxs)
            regression_targets = self.box_coder.encode(matched_gt_bboxs, proposals)
            roi_cls_loss, roi_loc_loss = self.calculate_loss(class_logits, pred_bbox_deltas, labels, regression_targets)
            return None, None, None, roi_cls_loss, roi_loc_loss
        else:
            pred_scores, pred_labels, pred_detections = self.filter_detections(images, 
                                                                               class_logits, pred_bbox_deltas, proposals)
            # Note the order: labels come before scores in the returned tuple.
            return pred_labels, pred_scores, pred_detections, None, None

In [10]:
class FasterRCNN(nn.Module):
    """
    Faster R-CNN detector assembled from three parts: a truncated VGG16 feature
    extractor (``backbone``), a region proposal network (``RPN``) and a Fast R-CNN
    detection stage (``FastRCNN``). All sub-modules are placed on GPU ``gpu_id``.
    """
    def __init__(self, RPN_config, FastRCNN_config, gpu_id):
        super().__init__()
        torch.cuda.set_device(gpu_id)
        self.gpu = gpu_id

        # Registration order (backbone, RPN, FastRCNN) determines the state_dict key order.
        self.backbone = self.build_backbone(gpu_id)
        self.RPN = self.build_RPN(RPN_config, gpu_id)
        self.FastRCNN = self.build_FastRCNN(FastRCNN_config, gpu_id)

    def build_backbone(self, gpu_id):
        """
        Return the first 30 modules of the pretrained VGG16 ``features`` stack on the GPU,
        with the parameters of its first 10 modules (indices 0-9) frozen.
        """
        vgg_features = torchvision.models.vgg16(pretrained=True).features
        backbone = vgg_features[:30].cuda(gpu_id)
        for layer_idx, layer in enumerate(backbone.children()):
            if layer_idx > 9:
                break
            for weight in layer.parameters():
                weight.requires_grad = False
        return backbone

    def build_FastRCNN(self, FastRCNN_config, gpu_id):
        """
        Build the Fast R-CNN stage. The RoI head reuses entries 0, 1, 3 and 4 of the
        pretrained VGG16 ``classifier`` (entries 2 and 5 onwards are dropped).
        """
        cfg = FastRCNN_config
        vgg_fc = list(torchvision.models.vgg16(pretrained=True).classifier)
        kept_layers = [*vgg_fc[:2], *vgg_fc[3:5]]
        backbone_fc = nn.Sequential(*kept_layers).cuda(self.gpu)

        roi_head = RoIHead(cfg['output_size'], cfg['downsample'],
                           backbone_fc, cfg['out_channels'], cfg['num_classes'], gpu_id)

        # Remaining constructor arguments, in the positional order FastRCNN expects.
        option_keys = ('bbox_reg_weights',
                       'iou_positive_thresh', 'iou_negative_high', 'iou_negative_low',
                       'batch_size_per_image', 'positive_fraction',
                       'min_size', 'nms_thresh',
                       'score_thresh', 'top_n')
        return FastRCNN(roi_head, *[cfg[key] for key in option_keys])

    def build_RPN(self, RPN_config, gpu_id):
        """
        Build the region proposal network from an anchor generator and an RPN head.
        """
        cfg = RPN_config
        anchor_generator = AnchorGenerator(cfg['anchor_scale'], cfg['anchor_aspect_ratio'],
                                           cfg['downsample'], gpu_id)
        rpn_head = RPNHead(cfg['in_channels'], cfg['num_anchors'], gpu_id)

        # Remaining constructor arguments, in the positional order RegionProposalNetwork expects.
        option_keys = ('bbox_reg_weights',
                       'iou_positive_thresh', 'iou_negative_high', 'iou_negative_low',
                       'batch_size_per_image', 'positive_fraction',
                       'min_size', 'nms_thresh',
                       'top_n_train', 'top_n_test')
        return RegionProposalNetwork(anchor_generator, rpn_head, *[cfg[key] for key in option_keys])

    def forward(self, images, gt_labels=None, gt_bboxs=None):
        """
        Run backbone -> RPN -> Fast R-CNN.

        Ground truth is moved to the GPU only in training mode. The proposals are
        detached before entering the Fast R-CNN stage.

        Returns:
            ``(rpn_cls_loss, rpn_loc_loss, roi_cls_loss, roi_loc_loss, labels, scores, detections)``
        """
        if self.training:
            gt_labels = gt_labels.cuda(self.gpu)
            gt_bboxs = gt_bboxs.cuda(self.gpu)
        images = images.cuda(self.gpu)

        features = self.backbone(images)
        proposals, rpn_cls_loss, rpn_loc_loss = self.RPN(images, features, gt_labels, gt_bboxs)

        roi_outputs = self.FastRCNN(images, features, proposals.detach(), gt_labels, gt_bboxs)
        labels, scores, detections, roi_cls_loss, roi_loc_loss = roi_outputs

        return rpn_cls_loss, rpn_loc_loss, roi_cls_loss, roi_loc_loss, labels, scores, detections

**第六部分 验证函数定义**

In [11]:
def voc_ap(recall, precision, use_07_metric):
    """
    Compute the PASCAL VOC average precision from cumulative recall / precision curves.

    Args:
        recall (Tensor): 1-D recall values, one per detection in ranked order.
        precision (Tensor): 1-D precision values aligned with ``recall``.
        use_07_metric (bool): if True, use the VOC2007 11-point interpolation
            (mean of the best precision at recall >= 0.0, 0.1, ..., 1.0);
            otherwise integrate the monotone precision envelope over recall.

    Returns:
        float: the average precision (0.0 for empty inputs).
    """
    if use_07_metric:
        ap = 0.
        for thresh in torch.arange(0., 1.1, 0.1):
            reached = recall >= thresh
            # No operating point reaches this recall level -> it contributes precision 0.
            p = float(precision[reached].max()) if bool(reached.any()) else 0.
            ap = ap + p / 11.
        return ap

    # Sentinels are created with the inputs' dtype and device; bare torch.tensor([0])
    # lives on the CPU as int64 and cannot be concatenated with CUDA tensors.
    rec = torch.cat((torch.tensor([0.], dtype=recall.dtype, device=recall.device),
                     recall,
                     torch.tensor([1.], dtype=recall.dtype, device=recall.device)))
    pre = torch.cat((torch.tensor([0.], dtype=precision.dtype, device=precision.device),
                     precision,
                     torch.tensor([0.], dtype=precision.dtype, device=precision.device)))

    # Precision envelope: make precision non-increasing when walking from right to left.
    for i in range(pre.shape[0] - 1, 0, -1):
        pre[i-1] = torch.max(pre[i-1], pre[i])

    # Sum rectangle areas at the points where recall changes.
    i = torch.where(rec[1:] != rec[:-1])[0]
    ap = float(torch.sum((rec[i+1] - rec[i]) * pre[i+1]).cpu())
    return ap


def voc_eval(pred_bboxs, pred_labels, pred_scores, gt_bboxs, gt_labels, num_classes, iou_thresh, use_07_metric):
    """
    Compute the PASCAL VOC mean average precision (mAP) over a set of images.

    Args:
        pred_bboxs (list[Tensor]): per-image predicted boxes, each of shape (N_i, 4).
        pred_labels (list[Tensor]): per-image predicted class indices, each of shape (N_i,).
        pred_scores (list[Tensor]): per-image prediction scores, each of shape (N_i,).
        gt_bboxs (list[Tensor]): per-image ground-truth boxes, each of shape (M_i, 4).
        gt_labels (list[Tensor]): per-image ground-truth class indices, each of shape (M_i,).
        num_classes (int): number of classes; class indices 1 .. num_classes-1 are evaluated
            (index 0 is skipped).
        iou_thresh (float): a detection matches a ground-truth box only if IoU > iou_thresh.
        use_07_metric (bool): forwarded to ``voc_ap``.

    Returns:
        float: mean of the per-class APs over the classes that have at least one
        ground-truth box; 0.0 if no evaluated class has ground truth.
    """
    pred_bboxs_concat = torch.cat(pred_bboxs)
    pred_labels_concat = torch.cat(pred_labels)
    pred_scores_concat = torch.cat(pred_scores)

    # .to() works for any target device; .cuda(device) raises when predictions live on the CPU.
    gt_bboxs_concat = torch.cat(gt_bboxs).to(pred_bboxs_concat.device)
    gt_labels_concat = torch.cat(gt_labels).to(pred_labels_concat.device)

    # Remember which image every (flattened) ground-truth box / prediction came from.
    gt_image_ids, pred_image_ids = [], []
    for image_id, labels_per_img in enumerate(gt_labels):
        gt_image_ids += [image_id] * labels_per_img.shape[0]
    for image_id, labels_per_img in enumerate(pred_labels):
        pred_image_ids += [image_id] * labels_per_img.shape[0]

    gt_image_ids = torch.tensor(gt_image_ids, device=gt_labels_concat.device)
    pred_image_ids = torch.tensor(pred_image_ids, device=pred_labels_concat.device)

    aps = []
    for class_idx in range(1, num_classes):
        gt_masks_per_class = gt_labels_concat == class_idx
        gt_num = int(torch.sum(gt_masks_per_class))
        if gt_num == 0:
            # Recall is undefined (0 / 0) without ground truth, so the class is left out of the mean.
            continue
        gt_bboxs_per_class = gt_bboxs_concat[gt_masks_per_class]
        gt_image_ids_per_class = gt_image_ids[gt_masks_per_class]

        # check[j] becomes True once ground-truth box j has been claimed by a detection.
        check = torch.zeros_like(gt_image_ids_per_class, dtype=torch.bool)

        pred_masks_per_class = pred_labels_concat == class_idx
        pred_bboxs_per_class = pred_bboxs_concat[pred_masks_per_class]
        pred_scores_per_class = pred_scores_concat[pred_masks_per_class]
        pred_image_ids_per_class = pred_image_ids[pred_masks_per_class]

        # Process detections from the highest to the lowest score.
        sort_idx_per_class = torch.argsort(-pred_scores_per_class)
        sort_pred_bboxs_per_class = pred_bboxs_per_class[sort_idx_per_class]
        sort_pred_image_ids_per_class = pred_image_ids_per_class[sort_idx_per_class]

        pred_num = int(torch.sum(pred_masks_per_class))
        tp = torch.zeros(pred_num, device=pred_labels_concat.device)
        fp = torch.zeros(pred_num, device=pred_labels_concat.device)

        for i in range(pred_num):
            match_idx = torch.where(gt_image_ids_per_class == sort_pred_image_ids_per_class[i])[0]
            if match_idx.nelement() == 0:
                # No ground truth of this class in the detection's image.
                fp[i] = 1.
                continue

            gt_bboxs_target = gt_bboxs_per_class[match_idx].clone()
            pred_bboxs_target = sort_pred_bboxs_per_class[i].clone().view(-1, 4)
            # Widen the max corner by one pixel on both sides before computing the IoU.
            gt_bboxs_target[:, 2:] += 1
            pred_bboxs_target[:, 2:] += 1
            IoUs = torchvision.ops.box_iou(gt_bboxs_target, pred_bboxs_target).view(-1)
            IoU_idx = torch.argmax(IoUs)

            # Index `check` directly: check[match_idx] is a copy, so writing through it
            # would never record the match and duplicates would all count as true positives.
            gt_idx = int(match_idx[IoU_idx])
            if (IoUs[IoU_idx] > iou_thresh) and (not bool(check[gt_idx])):
                tp[i] = 1.
                check[gt_idx] = True
            else:
                fp[i] = 1.

        tp, fp = torch.cumsum(tp, dim=0), torch.cumsum(fp, dim=0)
        recall = tp / gt_num
        precision = tp / (tp + fp)
        aps.append(voc_ap(recall, precision, use_07_metric))

    if not aps:
        return 0.0
    return sum(aps) / len(aps)

**第七部分 模型训练、测试部分**

In [12]:
class FasterRCNN_Model():
    """
    Training / validation / demo wrapper around a ``FasterRCNN`` network.

    Attributes set by ``__init__``:
        model: the FasterRCNN network built from ``RPN_config`` and ``FastRCNN_config``.
        rpn_cls_losses, rpn_loc_losses, roi_cls_losses, roi_loc_losses: loss values
            appended by ``train`` at every logged step.
        best_mAP: best validation mAP seen so far by ``train``.
    """
    def __init__(self, RPN_config, FastRCNN_config, TRAIN_config, TEST_config, DEMO_config, gpu_id):
        self.RPN_config = RPN_config
        self.FastRCNN_config = FastRCNN_config
        self.TRAIN_config = TRAIN_config
        self.TEST_config = TEST_config
        self.DEMO_config = DEMO_config
        self.gpu_id = gpu_id

        self.model = FasterRCNN(RPN_config, FastRCNN_config, gpu_id)
        self.rpn_cls_losses = []
        self.rpn_loc_losses = []
        self.roi_cls_losses = []
        self.roi_loc_losses = []
        self.best_mAP = 0

    def train(self, train_loader, test_loader):
        """
        Train with SGD + MultiStepLR, validating (and optionally checkpointing) per epoch.

        Args:
            train_loader: iterable yielding ``(images, labels, bboxs)`` batches.
            test_loader: iterable passed to ``val`` for the periodic mAP evaluation.
        """
        # One parameter group per trainable tensor: biases use twice the base
        # learning rate and no weight decay.
        params = []
        for key, value in self.model.named_parameters():
            if not value.requires_grad:
                continue
            if 'bias' in key:
                params.append({'params': [value],
                               'lr': self.TRAIN_config['lr'] * 2,
                               'weight_decay': 0})
            else:
                params.append({'params': [value],
                               'lr': self.TRAIN_config['lr'],
                               'weight_decay': self.TRAIN_config['weight_decay']})

        optimizer = optim.SGD(params, momentum=self.TRAIN_config['momentum'])
        scheduler = optim.lr_scheduler.MultiStepLR(optimizer, self.TRAIN_config['milestones'])

        # NOTE(review): TRAIN_config['clip'] is never applied here - confirm whether
        # gradient clipping was intended.
        self.model.train()
        for epoch in range(self.TRAIN_config['epochs']):
            print('Epoch {} Started...'.format(epoch+1))
            for i, (images, labels, bboxs) in enumerate(train_loader):
                rpn_cls_loss, rpn_loc_loss, roi_cls_loss, roi_loc_loss, _, _, _ = self.model(images, labels, bboxs)

                # The RoI localisation loss may be None; it is then left out of the total.
                train_loss = rpn_cls_loss + rpn_loc_loss + roi_cls_loss
                if roi_loc_loss is not None:
                    train_loss = train_loss + roi_loc_loss

                optimizer.zero_grad()
                train_loss.backward()
                optimizer.step()

                # Only steps that produced all four losses are logged.
                if ((i+1) % self.TRAIN_config['print_freq'] == 0) and (roi_loc_loss is not None):
                    self.rpn_cls_losses.append(rpn_cls_loss.item())
                    self.rpn_loc_losses.append(rpn_loc_loss.item())
                    self.roi_cls_losses.append(roi_cls_loss.item())
                    self.roi_loc_losses.append(roi_loc_loss.item())

            scheduler.step()

            if epoch % self.TRAIN_config['epoch_freq'] == 0:
                mAP = self.val(test_loader)
                print('Epoch {} mAP : {:.4f}'.format(epoch+1, 100 * mAP))
                if (mAP > self.best_mAP) and (self.TRAIN_config['save']):
                    self.best_mAP = mAP
                    torch.save(self.model.state_dict(),
                               self.TRAIN_config['SAVE_PATH'] + 'epoch_{}.pt'.format(str(epoch+1).zfill(3)))
                    print('Saved Best Model')
            print()

    def val(self, test_loader):
        """
        Run the model over ``test_loader`` in eval mode and return the VOC mAP.

        The model's previous train/eval mode is restored afterwards, also on error.
        """
        was_training = self.model.training
        self.model.eval()
        try:
            with torch.no_grad():
                pred_bboxs, pred_labels, pred_scores = [], [], []
                gt_labels, gt_bboxs = [], []

                for images, gt_labels_, gt_bboxs_ in test_loader:
                    _, _, _, _, pred_labels_, pred_scores_, pred_detections_ = self.model(images)
                    # Flatten to one entry per box before handing the lists to voc_eval.
                    gt_labels.append(gt_labels_.view(-1))
                    gt_bboxs.append(gt_bboxs_.view(-1, 4))

                    pred_labels.append(pred_labels_.view(-1))
                    pred_scores.append(pred_scores_.view(-1))
                    pred_bboxs.append(pred_detections_.view(-1, 4))
        finally:
            self.model.train(was_training)

        mAP = voc_eval(pred_bboxs, pred_labels, pred_scores, gt_bboxs, gt_labels,
                       self.TEST_config['num_classes'], self.TEST_config['iou_thresh'], self.TEST_config['use_07_metric'])
        return mAP

    def demo(self, image_dir):
        """
        Detect objects in a single image file.

        Args:
            image_dir: path of the image to load.

        Returns:
            ``(image, labels, scores, detections)``: the resized PIL image and the
            first batch entry of the model's labels / scores / boxes as NumPy arrays.
        """
        self.model.eval()

        # Close the file handle as soon as the pixels have been converted.
        with Image.open(image_dir) as image_file:
            image = image_file.convert('RGB')
        image = transforms.Resize(size=self.DEMO_config['min_size'])(image)
        image_tensor = transforms.ToTensor()(image)
        image_norm_tensor = transforms.Normalize(mean=self.DEMO_config['mean'], std=self.DEMO_config['std'])(image_tensor)

        # Use the demo score threshold only for this call, so that a later val() is
        # not evaluated with the stricter demo threshold.
        detector = self.model.FastRCNN
        previous_thresh = getattr(detector, 'score_thresh', None)
        detector.score_thresh = self.DEMO_config['score_thresh']
        try:
            with torch.no_grad():
                _, _, _, _, pred_labels_, pred_scores_, pred_detections_ = self.model(image_norm_tensor[None, :, :, :])
        finally:
            if previous_thresh is not None:
                detector.score_thresh = previous_thresh

        return (image, pred_labels_[0].cpu().numpy(), pred_scores_[0].cpu().numpy(), pred_detections_[0].cpu().numpy())

In [13]:
FasterRCNN = FasterRCNN_Model(RPN_config, FastRCNN_config, TRAIN_config, TEST_config, DEMO_config, gpu_id)

In [None]:
FasterRCNN.train(train_loader, test_loader)

Epoch 1 Started...


**第八部分 绘制loss曲线**

In [None]:
# 绘制RPN Train Loss曲线
plt.figure(figsize=(20, 10))
plt.rcParams['axes.titlesize'] = 30

label_fontsize = 25

cls_lossline, = plt.plot(rpn_cls_losses, label='CLS')
loc_lossline, = plt.plot(rpn_loc_losses, color='red', label='LOC')
plt.legend(handles=[cls_lossline, loc_lossline], fontsize=20)
plt.xlabel('Step', fontsize=label_fontsize)
plt.ylabel('RPN Train Loss', fontsize=label_fontsize)

In [None]:
# 绘制Fast R-CNN Train Loss曲线
plt.figure(figsize=(20, 10))
plt.rcParams['axes.titlesize'] = 30

label_fontsize = 25

cls_lossline, = plt.plot(roi_cls_losses, label='CLS')
loc_lossline, = plt.plot(roi_loc_losses, color='red', label='LOC')
plt.legend(handles=[cls_lossline, loc_lossline], fontsize=20)
plt.xlabel('Step', fontsize=label_fontsize)
plt.ylabel('Fast R-CNN Train Loss', fontsize=label_fontsize)

**第九部分 测试图片**

In [None]:
Model = FasterRCNN_Model(RPN_config, FastRCNN_config, TRAIN_config, TEST_config, DEMO_config, gpu_id)
# To run the demo with trained weights, load them first, e.g.:
# Model.model.load_state_dict(torch.load('best_model_vgg.pth'))

from torchsummary import summary
# summary() needs an nn.Module and an input size, and feeds a random image batch
# through the module. `Model` is a plain wrapper (the network is `Model.model`) and
# the full detector's forward returns tuples containing None, so only the
# convolutional backbone is summarised; the complete module tree is printed instead.
summary(Model.model.backbone, input_size=(3, min_size, min_size))
print(Model.model)

In [None]:
# Class names indexed by predicted label; index 0 is the background class.
VOC_LABELS = ('__background__', 
              'aeroplane', 'bicycle', 'bird', 'boat', 'bottle', 'bus', 'car', 'cat', 'chair', 'cow', 'diningtable', 'dog', 
              'horse','motorbike', 'person', 'pottedplant', 'sheep', 'sofa', 'train', 'tvmonitor')

demo_img = 'test.png'

# demo() is a method of FasterRCNN_Model, so it has to be called on the wrapper instance.
img, pred_labels, pred_scores, pred_detections = Model.demo(demo_img)

In [None]:
# Show up to six detections, each in its own panel of a 3 x 2 grid
plt.figure(figsize=(20, 20))
plt.rcParams['axes.titlesize'] = 20
plt.axis('off')

num_panels = min(6, pred_labels.shape[0])
for panel in range(num_panels):
    ax = plt.subplot(3, 2, panel + 1)
    ax.imshow(img)
    ax.get_xaxis().set_visible(False)
    ax.get_yaxis().set_visible(False)

    # Title: class name followed by the score as a percentage
    caption = VOC_LABELS[int(pred_labels[panel])] + ' ' + str(round(100 * pred_scores[panel], 2)) + '%'
    ax.set_title(caption)

    # Box corners -> (x, y, width, height) rectangle outline
    left, top, right, bottom = pred_detections[panel]
    ax.add_patch(Rectangle((left, top), right - left, bottom - top, edgecolor='r', facecolor='none'))