# K210从训练到部署实战

## 0. 导入必要的库

In [2]:
import torch
from torchvision import datasets, transforms, utils, models
from torch import nn

import numpy as np

# 若检测到有GPU则使用GPU
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

## 1. 各类辅助函数操作

In [3]:
# reorg降采样操作
class reorg_layer(nn.Module):
    def __init__(self, stride):
        super(reorg_layer, self).__init__()
        self.stride = stride

    def forward(self, x):
        batch_size, channels, height, width = x.size()
        _height, _width = height // self.stride, width // self.stride
        
        x = x.view(batch_size, channels, _height, self.stride, _width, self.stride).transpose(3, 4).contiguous()
        x = x.view(batch_size, channels, _height * _width, self.stride * self.stride).transpose(2, 3).contiguous()
        x = x.view(batch_size, channels, self.stride * self.stride, _height, _width).transpose(1, 2).contiguous()
        x = x.view(batch_size, -1, _height, _width)

        return x

# iou分数计算
def iou_score(bboxes_a, bboxes_b):
    """
        bbox_1 : [B*N, 4] = [x1, y1, x2, y2]
        bbox_2 : [B*N, 4] = [x1, y1, x2, y2]
    """
    tl = torch.max(bboxes_a[:, :2], bboxes_b[:, :2])
    br = torch.min(bboxes_a[:, 2:], bboxes_b[:, 2:])
    area_a = torch.prod(bboxes_a[:, 2:] - bboxes_a[:, :2], 1)
    area_b = torch.prod(bboxes_b[:, 2:] - bboxes_b[:, :2], 1)

    en = (tl < br).type(tl.type()).prod(dim=1)
    area_i = torch.prod(br - tl, 1) * en  # * ((tl < br).all())
    return area_i / (area_a + area_b - area_i)

# loss计算
class MSEWithLogitsLoss(nn.Module):
    def __init__(self):
        super(MSEWithLogitsLoss, self).__init__()

    def forward(self, logits, targets, mask):
        inputs = torch.clamp(torch.sigmoid(logits), min=1e-4, max=1.0 - 1e-4)

        # 被忽略的先验框的mask都是-1，不参与loss计算
        pos_id = (mask==1.0).float()
        neg_id = (mask==0.0).float()
        pos_loss = pos_id * (inputs - targets)**2
        neg_loss = neg_id * (inputs)**2
        loss = 5.0*pos_loss + 1.0*neg_loss

        return loss
def compute_loss(pred_conf, pred_cls, pred_txtytwth, targets):
    batch_size = pred_conf.size(0)
    # 损失函数
    conf_loss_function = MSEWithLogitsLoss()
    cls_loss_function = nn.CrossEntropyLoss(reduction='none')
    txty_loss_function = nn.BCEWithLogitsLoss(reduction='none')
    twth_loss_function = nn.MSELoss(reduction='none')

    # 预测
    pred_conf = pred_conf[..., 0]           # [B, HW,]
    pred_cls = pred_cls.permute(0, 2, 1)    # [B, C, HW]
    pred_txty = pred_txtytwth[..., :2]      # [B, HW, 2]
    pred_twth = pred_txtytwth[..., 2:]      # [B, HW, 2]

    # 标签  
    gt_conf = targets[..., 0].float()                 # [B, HW,]
    gt_obj = targets[..., 1].float()                  # [B, HW,]
    gt_cls = targets[..., 2].long()                   # [B, HW,]
    gt_txty = targets[..., 3:5].float()               # [B, HW, 2]
    gt_twth = targets[..., 5:7].float()               # [B, HW, 2]
    gt_box_scale_weight = targets[..., 7]             # [B, HW,]
    gt_mask = (gt_box_scale_weight > 0.).float()      # [B, HW,]

    # 置信度损失
    conf_loss = conf_loss_function(pred_conf, gt_conf, gt_obj)
    conf_loss = conf_loss.sum() / batch_size
    
    # 类别损失
    cls_loss = cls_loss_function(pred_cls, gt_cls) * gt_mask
    cls_loss = cls_loss.sum() / batch_size
    
    # 边界框txty的损失
    txty_loss = txty_loss_function(pred_txty, gt_txty).sum(-1) * gt_mask * gt_box_scale_weight
    txty_loss = txty_loss.sum() / batch_size

    # 边界框twth的损失
    twth_loss = twth_loss_function(pred_twth, gt_twth).sum(-1) * gt_mask * gt_box_scale_weight
    twth_loss = twth_loss.sum() / batch_size
    bbox_loss = txty_loss + twth_loss

    #总的损失
    total_loss = conf_loss + cls_loss + bbox_loss

    return conf_loss, cls_loss, bbox_loss, total_loss

## 2. YOLO网络定义

In [4]:
class YOLOv2(nn.Module):
    def __init__(self,
                 cfg,
                 device,
                 input_size=416,
                 num_classes=20,
                 trainable=False,
                 conf_thresh=0.001, 
                 nms_thresh=0.6, 
                 topk=100,
                 anchor_size=None):
        super(YOLOv2, self).__init__()
        self.device = device              # cuda或是cpu 
        self.input_size = input_size      # 输入图像大小
        self.num_classes = num_classes    # 类别数量
        self.trainable = trainable        # 训练时标记
        self.conf_thresh = conf_thresh    # 置信度阈值
        self.nms_thresh = nms_thresh      # NMS阈值
        self.stride = cfg['stride']       # 网络最大降采样倍数
        self.topk = topk
        
        # Anchor box config
        self.anchor_size = torch.tensor(anchor_size)  # [KA, 2]
        self.num_anchors = len(anchor_size)
        self.anchor_boxes = self.create_grid(input_size)  # 用于得到最终bbox的参数
        
        # CBR block
        def conv(in_channels, out_channels, kernel_size, strides=1, padding=0):
            return nn.Sequential(
                nn.Conv2d(in_channels, out_channels, kernel_size, strides, padding, bias=False),
                nn.BatchNorm2d(out_channels), nn.ReLU(inplace=True))
        
        # backbone
        self.backbone = models.resnet50(pretrained=trainable)
        c5 = 2048
        
        # neck
        self.convsets_1 = nn.Sequential(
            conv(c5, 1024, kernel_size=1),
            conv(1024, 1024, kernel_size=3, strides=1, padding=1),
            conv(1024, 1024, kernel_size=3, strides=1, padding=1))
        
        # 融合高分辨率的特征信息c4
        self.route_layer = conv(1024, 128, kernel_size=1)
        self.reorg = reorg_layer(stride=2)
        
        # head 
        self.convsets_2 = conv(1024+128*4, 1024, kernel_size=3, strides=1, padding=1)
        
        # prediction 
        self.pred = nn.Conv2d(1024, self.num_anchors*(1+4+self.num_classes), kernel_size=1)
        
        if self.trainable:
            self.init_bias()


    def init_bias(self):
        # init bias
        init_prob = 0.01
        bias_value = -torch.log(torch.tensor((1. - init_prob) / init_prob))
        nn.init.constant_(self.pred.bias[..., :self.num_anchors], bias_value)
        nn.init.constant_(self.pred.bias[..., 1*self.num_anchors:(1+self.num_classes)*self.num_anchors], bias_value)

    def create_grid(self, input_size):
        # 生成一个tensor：grid_xy，每个位置的元素是网格的坐标，
        # 这一tensor将在获得边界框参数的时候会用到。
        w, h = input_size, input_size
        # 生成G矩阵
        fmp_w, fmp_h = w // self.stride, h // self.stride
        grid_y, grid_x = torch.meshgrid([torch.arange(fmp_h), torch.arange(fmp_w)])
        # [H, W, 2] -> [HW, 2]
        grid_xy = torch.stack([grid_x, grid_y], dim=-1).float().view(-1, 2)
        # [HW, 2] -> [HW, 1, 2] -> [HW, KA, 2]
        grid_xy = grid_xy[:, None, :].repeat(1, self.num_anchors, 1)

        # [KA, 2] -> [1, KA, 2] -> [HW, KA, 2]
        anchor_wh = self.anchor_size[None, :, :].repeat(fmp_h*fmp_w, 1, 1)

        # [HW, KA, 4] -> [M, 4]
        anchor_boxes = torch.cat([grid_xy, anchor_wh], dim=-1)
        anchor_boxes = anchor_boxes.view(-1, 4).to(self.device)

        return anchor_boxes        


    def set_grid(self, input_size):
        # 用于重置grid_xy
        self.input_size = input_size
        self.anchor_boxes = self.create_grid(input_size)

    def decode_boxes(self, anchors, txtytwth_pred):
        # 将网络输出的tx,ty,tw,th四个量转换成bbox的(x1,y1),(x2,y2)
        """将txtytwth预测换算成边界框的左上角点坐标和右下角点坐标 \n
            Input: \n
                txtytwth_pred : [B, H*W*KA, 4] \n
            Output: \n
                x1y1x2y2_pred : [B, H*W*KA, 4] \n
        """
        # 获得边界框的中心点坐标和宽高
        # b_x = sigmoid(tx) + gride_x
        # b_y = sigmoid(ty) + gride_y
        xy_pred = torch.sigmoid(txtytwth_pred[..., :2]) + anchors[..., :2]
        # b_w = anchor_w * exp(tw)
        # b_h = anchor_h * exp(th)
        wh_pred = torch.exp(txtytwth_pred[..., 2:]) * anchors[..., 2:]

        # [B, H*W*KA, 4]
        xywh_pred = torch.cat([xy_pred, wh_pred], -1) * self.stride

        # 将中心点坐标和宽高换算成边界框的左上角点坐标和右下角点坐标
        x1y1x2y2_pred = torch.zeros_like(xywh_pred)
        x1y1x2y2_pred[..., :2] = xywh_pred[..., :2] - xywh_pred[..., 2:] * 0.5
        x1y1x2y2_pred[..., 2:] = xywh_pred[..., :2] + xywh_pred[..., 2:] * 0.5
        
        return x1y1x2y2_pred

    def nms(self, dets, scores):
        # 这是一个最基本的基于python语言的nms操作
        # 这一代码来源于Faster RCNN项目
        """"Pure Python NMS baseline."""
        x1 = dets[:, 0]  #xmin
        y1 = dets[:, 1]  #ymin
        x2 = dets[:, 2]  #xmax
        y2 = dets[:, 3]  #ymax

        areas = (x2 - x1) * (y2 - y1)                    # bbox的宽w和高h
        order = scores.argsort()[::-1]                   # 按照降序对bbox的得分进行排序

        keep = []                                        # 用于保存经过筛的最终bbox结果
        while order.size > 0:
            i = order[0]                                 # 得到最高的那个bbox
            keep.append(i)                               
            xx1 = np.maximum(x1[i], x1[order[1:]])
            yy1 = np.maximum(y1[i], y1[order[1:]])
            xx2 = np.minimum(x2[i], x2[order[1:]])
            yy2 = np.minimum(y2[i], y2[order[1:]])

            w = np.maximum(1e-28, xx2 - xx1)
            h = np.maximum(1e-28, yy2 - yy1)
            inter = w * h

            # Cross Area / (bbox + particular area - Cross Area)
            ovr = inter / (areas[i] + areas[order[1:]] - inter)
            #reserve all the boundingbox whose ovr less than thresh
            inds = np.where(ovr <= self.nms_thresh)[0]
            order = order[inds + 1]

        return keep


    def postprocess(self, all_local, all_conf):
        # 后处理代码
        """
        Input:
            conf_pred: (Tensor) [H*W*KA, 1]
            cls_pred:  (Tensor) [H*W*KA, C]
            reg_pred:  (Tensor) [H*W*KA, 4]
        """
        anchors = self.anchor_boxes

        # (H x W x KA x C,)
        scores = (torch.sigmoid(conf_pred) * torch.softmax(cls_pred, dim=-1)).flatten()

        # Keep top k top scoring indices only.
        num_topk = min(self.topk, reg_pred.size(0))

        # torch.sort is actually faster than .topk (at least on GPUs)
        predicted_prob, topk_idxs = scores.sort(descending=True)
        topk_scores = predicted_prob[:num_topk]
        topk_idxs = topk_idxs[:num_topk]

        # filter out the proposals with low confidence score
        keep_idxs = topk_scores > self.conf_thresh
        scores = topk_scores[keep_idxs]
        topk_idxs = topk_idxs[keep_idxs]

        anchor_idxs = torch.div(topk_idxs, self.num_classes, rounding_mode='floor')
        labels = topk_idxs % self.num_classes

        reg_pred = reg_pred[anchor_idxs]
        anchors = anchors[anchor_idxs]

        # 解算边界框, 并归一化边界框: [H*W*KA, 4]
        bboxes = self.decode_boxes(anchors, reg_pred)
        
        # to cpu
        scores = scores.cpu().numpy()
        labels = labels.cpu().numpy()
        bboxes = bboxes.cpu().numpy()

        # NMS
        keep = np.zeros(len(bboxes), dtype=np.int)
        for i in range(self.num_classes):
            inds = np.where(labels == i)[0]
            if len(inds) == 0:
                continue
            c_bboxes = bboxes[inds]
            c_scores = scores[inds]
            c_keep = self.nms(c_bboxes, c_scores)
            keep[inds[c_keep]] = 1

        keep = np.where(keep > 0)
        bboxes = bboxes[keep]
        scores = scores[keep]
        labels = labels[keep]

        # 归一化边界框
        bboxes = bboxes / self.input_size
        bboxes = np.clip(bboxes, 0., 1.)

        return bboxes, scores, labels
    
    @torch.no_grad()
    def inference(self, x):
        # backbone
        _, c4, c5 = self.backbone(x)

        # neck
        p5 = self.convsets_1(c5)
        ## 处理c4特征
        p4 = self.reorg(self.route_layer(c4))
        ## 融合c4特征
        p5 = torch.cat([p4,p5], dim=1)

        # head
        p5 = self.convsets_2(p5)

        # prediction
        prediction = self.pred(p5)

        B, abC, H, W = prediction.size()
        KA = self.num_anchors
        NC = self.num_classes

        # [B, KA * C, H, W] -> [B, H, W, KA * C] -> [B, H*W, KA*C]
        prediction = prediction.permute(0, 2, 3, 1).contiguous().view(B, H*W, abC)

        # 从pred中分离出objectness预测、类别class预测、bbox的txtytwth预测  
        # [B, H*W, KA*C] -> [B, H*W, KA] -> [B, H*W*KA, 1]
        conf_pred = prediction[..., :KA].contiguous().view(B, -1, 1)
        # [B, H*W, KA*C] -> [B, H*W, KA*NC] -> [B, H*W*KA, NC]
        cls_pred = prediction[..., 1*KA : (1+NC)*KA].contiguous().view(B, -1, NC)
        # [B, H*W, KA*C] -> [B, H*W, KA*4] -> [B, H*W*KA, 4]
        txtytwth_pred = prediction[..., (1+NC)*KA:].contiguous().view(B, -1, 4) 
        
        # 测试时，默认batch为1，
        # 因此，我们不需要用batch这个维度，用[0]将其取走。
        conf_pred = conf_pred[0]            #[H*W*KA, 1]
        cls_pred = cls_pred[0]              #[H*W*KA, NC]
        txtytwth_pred = txtytwth_pred[0]    #[H*W*KA, 4]

        # 后处理
        bboxes, scores, labels = self.postprocess(conf_pred, cls_pred, txtytwth_pred)

        return bboxes, scores, labels
    
    def forward(self, x, target=None):
        # 前向推理的代码，主要分为两部分：
        # 训练部分：网络得到obj、cls和txtytwth三个分支的预测，然后计算loss；
        # 推理部分：输出经过后处理得到的bbox、cls和每个bbox的预测得分。
        if not self.trainable:
            return self.inference(x)
        else:
            # backbone
            _, c4, c5 = self.backbone(x)

            # neck
            p5 = self.convsets_1(c5)
            ## 处理c4特征
            p4 = self.reorg(self.route_layer(c4))
            ## 融合c4特征
            p5 = torch.cat([p4,p5], dim=1)
            
            # head
            p5 = self.convsets_2(p5)
            
            # prediction
            prediction = self.pred(p5)
            
            B, abC, H, W = prediction.size()
            KA = self.num_anchors
            NC = self.num_classes

            # [B, KA * C, H, W] -> [B, H, W, KA * C] -> [B, H*W, KA*C]
            prediction = prediction.permute(0, 2, 3, 1).contiguous().view(B, H*W, abC)

            # 从pred中分离出objectness预测、类别class预测、bbox的txtytwth预测  
            # [B, H*W, KA*C] -> [B, H*W, KA] -> [B, H*W*KA, 1]
            conf_pred = prediction[..., :KA].contiguous().view(B, -1, 1)
            # [B, H*W, KA*C] -> [B, H*W, KA*NC] -> [B, H*W*KA, NC]
            cls_pred = prediction[..., 1*KA : (1+NC)*KA].contiguous().view(B, -1, NC)
            # [B, H*W, KA*C] -> [B, H*W, KA*4] -> [B, H*W*KA, 4]
            txtytwth_pred = prediction[..., (1+NC)*KA:].contiguous().view(B, -1, 4) 
            
            # 添加IoU Loss进入计算
            ## 解算边界框
            x1y1x2y2_pred = (self.decode_boxes(self.anchor_boxes, txtytwth_pred) / self.input_size).view(-1, 4)
            x1y1x2y2_gt = targets[:, :, 7:].view(-1, 4)
            ## 计算预测框和真实框之间的IoU
            iou_pred = iou_score(x1y1x2y2_pred, x1y1x2y2_gt).view(B, -1, 1)
            ## 将IoU作为置信度的学习目标
            with torch.no_grad():
                gt_conf = iou_pred.clone()
            ## 将IoU作为置信度的学习目标 
            ## [obj, cls, txtytwth, x1y1x2y2] -> [conf, obj, cls, txtytwth]
            targets = torch.cat([gt_conf, targets[:, :, :7]], dim=2)
            
            # 计算损失
            (
                conf_loss,
                cls_loss,
                bbox_loss,
                total_loss
            ) = compute_loss(
                pred_conf=conf_pred, 
                pred_cls=cls_pred,
                pred_txtytwth=txtytwth_pred,
                targets=targets,
                )

            return conf_loss, cls_loss, bbox_loss, total_loss

IndentationError: expected an indented block (22223102.py, line 59)

In [None]:
# 保存模型的路径
path_to_save = os.path.join(args.save_folder, args.dataset, args.version)
os.makedirs(path_to_save, exist_ok=True)

print('start trainig, training on', device)

# 使用多尺度训练
print('use the multi-scale trick ...')
train_size = 640
val_size = 416

# 构建yolov2的配置文件
cfg = {
        # model
        'backbone': 'darknet19',
        'pretrained': True,
        'stride': 32,  # P5
        'reorg_dim': 64,
        'head_dim': 1024,
        # anchor size
        'anchor_size': {
            'voc': [[1.19, 1.98], [2.79, 4.59], [4.53, 8.92], [8.06, 5.29], [10.32, 10.65]],
            'coco': [[0.53, 0.79], [1.71, 2.36], [2.89, 6.44], [6.33, 3.79], [9.03, 9.74]]
            },
        # matcher
        'ignore_thresh': 0.5,
        }

# 构建dataset类
train_datasets = datasets.VOCDetection(root='../data', year='2012', image_set='train', download=True,
                     transform=transforms.Compose([
                         transforms.Resize(224),
                         transforms.ToTensor(),]))
test_dataloader = DataLoader(
    datasets.MNIST('../data', train=False, download=True,
                  transform=transforms.Compose([
                      transforms.Resize(224),
                      transforms.ToTensor(),
                  ])),
    batch_size=batch_size_test, shuffle=True, num_workers=num_workers
)