<a href="https://colab.research.google.com/github/jinyingtld/python/blob/main/MMDetection_Faster_R_CNN.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# [MMDetection Faster R-CNN 源码详解（一)](https://zhuanlan.zhihu.com/p/166248079)

本系列文章会详细剖析 MMDetection 是如何实现 Faster R-CNN 的，在本篇文章（一）中会详细的讲解 Faster R-CNN 中 backbone 相关的代码。

下图是 MMDetection 实现的 Faster R-CNN 的结果。R-50 代表的是 ResNet 50，X-101 代表的是 ResNeXt 101。所以在本篇文章中会详细的讲解这两个 backbone 的网络结构和在 MMDetection 中的源码实现。

![Image](https://pic3.zhimg.com/80/v2-a5c9e0fe5d3f2ff43e82a570eb813326_720w.jpg)


## 一、ResNet

![](https://pic4.zhimg.com/80/v2-1155f81ae526e777e575900f3e84b65b_720w.jpg)

上图是 ResNet 的网络结构，我们来详细的分析一下。

网络的输入是 224×224 的图片，首先会经过 stem 模块。stem 模块对于不同深度的 ResNet 使用的都是相同的结构，都会经过一个conv 7×7（channels：64，stride：2，padding：3） 的卷积。输出特征图的大小为 112×112。然后再次下采样，经过 Max Pooling 3×3（stride：2，padding：1） 得到 56 ×56 的特征图。当经过了 stem 模块，会完成 4 倍的下采样。(x: 224X224X3 -->conv:( 7x7 stride=2,padding=3 output=112x112)-->BN--->ReLu-->Maxpool(3x3, stride=2 padding=1 downsampling out=112/2=56)

接下来会进入 4 组堆叠的残差模块，除了经过第一组残差模块特征图大小以及通道数不发生变化以外，后面三组残差模块，每经过一组，特征图的大小都会缩小一半，通道数扩张为原来 2 倍。经过四组残差模块进行特征提取后，特征图的大小变为 7×7，然后对这 7×7 的特征图做全局均值池化(avg Pooling)。最后接上全连接层，将通道数变为类别个数进行输出。

在 ResNet 中最主要的结构就是残差模块，我们下面就详细解释一下残差模块的结构。

残差模块含有shortcut连接, 会将经过卷积操作的结果和未经过卷积操作的结果相加. (注意: 注意：这里是相加不是拼接）对于相加后的结果再用激活函数计算. 不同的深度的 ResNet，使用了不同的残差模块。深度小于50的ResNet, 使用BasicBlock, 深度大于等于50的ResNet使用Bottleneck. 下面我们就来分别看一下这两种结构。

##（一）BasicBlock（深度小于 50 的 ResNet 使用）
BasicBlock 用于深度小于 50 的 ResNet（ResNet 18、34）。它的结构如下图：
![](https://pic4.zhimg.com/80/v2-8ef93910b9b1d74f45cef442424b31fb_720w.jpg)
图三：BasicBlock 结构

对于每个BasicBlock, 有两个分支. 一个分支用来正常的卷积,另一个分支用于跳跃连接. 输入一张特征图后会经过两个3x3的卷积提取特征,然后将卷积操作后的特征图与输入的特征图相加，再用 ReLU 激活函数进行计算。在 BasicBlock 中，卷积的通道数不发生改变，可以直接将输入的特征图和卷积后的特征图相加。

上面的操作既不会改变特征图的大小，又不会改变特征图的通道数，那么我们如何进行下采样呢？换句话说，也就是上面的情况是针对同一个 stage 之内的 BasicBlock，对于不同 stage 之间的 BasicBlock 我们怎么处理呢（如：conv2_2 和 conv3_1）？

我们对每个stage中的第一个block进行更改.将第一个3x3的卷积的步长变为原来的2倍, 通过数也扩大为原来的2倍,padding不变还为1. 这样的化经过卷积后的特征图的大小变为原来的1/2,通道数会扩张为原来的2倍. 对于shortcut分支,使用步长为2的 1×1 卷积用于下采样和扩张通道数，这样特征图的大小变成原来的 1/2，通道数也会扩张为原来的 2 倍。和卷积分支的输出特征图形状相同。然后我们将两个分支的结果相加，再通过 ReLU 激活函数即可。如下图：

![](https://pic2.zhimg.com/v2-e849a9d912716e1c213fb5db3bb76a45_r.jpg)图四：stage 和 stage 之间的 BasicBlock

当然因为 stem 和第一个 stage 之间的通道数都是 64，且第一个 stage 不需要进行下采样。所以，我们使用图三的 block 即可。]

我们来看一下源码：


In [None]:
import torch.nn as nn
import torch.utils.checkpoint as cp
from mmcv.cnn import (build_conv_layer, build_norm_layer, build_plugin_layer, constant_init, kaiming_init)
from mmcv.runner import load_checkpoint
from torch.nn.modules.batchnorm import _BatchNorm

from mmdet.utils import get_root_logger
from ..builder import BACKBONES
from ..utils import ResLayer

class BasicBlock(nn.Module):
    """ResNet 18, 34 使用的block"""
    # 输出通道为输入通道的倍数.(输出通道数 == 输入通道数)
    expansion = 1

    def __init__(self,
                 inplanes,
                 planes,
                 stride=1,
                 dilation=1,
                 downsample=None
                 style='pytorch',
                 with_cp=False,
                 conf_cfg=None,
                 norm_cfg=dict(type='BN'),
                 dcn=None,
                 plugins=None):
        super(BasicBlock, self).__init__()
        assert dcn is None, 'Not implemented yet.'
        assert plugins is None, 'Not implemented yet.'
        # conv3x3 ---> bn1 --> relu --> conv3x3 ---> bn2 --> relu

        #bn1, bn2
        self.norm1_name, norm1 = build_norm_layer(norm_cfg, planes, postfix=1)
        self.norm2_name, norm2 = build_norm_layer(norm_cfg, planes, postfix=2)

        # 当conv1: 3x3 conv 
        # 当conv 为3x3 且padding = dilation时, 原特征图大小只和 stride 有关
        self.conv1 = build_conv_layer(
            conv_cfg,
            inplanes,
            planes,
            3,
            stride=stride,
            padding=dilation,
            dilation=dilation,
            bias=False)
        self.add_module(self.norm1_name,norm1)

        # conv2: 3x3 conv, stride = 1, padding = 1.
        self.conv2 = build_conv_layer(
            conv_cfg, planes, planes, 3, padding=1, bias=False)
        self.add_module(self.norm2_name, norm2)

        self.relu = nn.ReLU(inplace=True)
        self.downsample = downsample
        self.stride = stride
        self.dilation = dilation
        self.with_cp = with_cp

    @property
    def norm1(self):
        """nn.Module: normalization layer after the first convolution layer"""
        return getattr(self, self.norm1_name)
    
    @property
    def norm2(self):
        """nn.Module: normalization layer after the second convolution layer"""
        return getattr(self, self.norm2_name)

    def forward(self, x):
        """Forward function."""
        def _inner_forward(x):
            identity = x

            out = self.conv1(x)
            out = self.norm1(out)
            out = self.relu(out)

            out = self.conv2(out)
            out = self.norm2(out)

            # 需要保证identity 和 x的宽度相同.
            if self.downsample is not None:
                identity = self.downsample(x)
            
            # h(x) = f(x) + x
            out += identity

            return out

        # 加载预训练模型并且需要求导的时候, 使用checkpoint用时间换取空间.
        # 这是因为加载权重后可以训练的epoch数少, 可以考虑时间换取空间.
        if self.with_cp and x.requires_grad:
            # 使用 checkpoint 不保存中间计算的激活值, 在反向传播中重新计算一次中间激活值。
            # 即重新运行一次检查点部分的前向传播，这是一种以时间换空间（显存）的方法。
            out = cp.checkpoint(_inner_forward, x)
        # 不加载权重，从零开始训练。不使用 ckpt，因为训练慢。
        else:
            out = _inner_forward(x)

         # 注意：f(x) + x 之后再进行 relu
        out = self.relu(out)

        return out


## （二）Bottleneck（深度大于等于 50 的 ResNet 使用）

Bottleneck 用于深度大于等于50de ResNet (ResNet 50, 101, 152). 它的结构如下图：

![](https://pic2.zhimg.com/80/v2-b37d119bf676d64d30e0f36a72824999_720w.jpg)图五：Bottleneck 结构

对于 Bottleneck 来说，它会先使用 1×1 的卷积对输入的通道数进行压缩，然后再使用 3×3 的卷积进行卷积，最后使用 1×1 的卷积恢复通道数。然后将输入与卷积的结果相加并通过激活函数进行计算。

对于上面的操作，特征图的大小和输入输出的通道数不发生改变。如果需要下采样。需要将每个 stage 的第一个 Bottleneck 中的 3×3 卷积的步长和通道数设置为原来的 2 倍。对于 shortcut 分支，我们使用 1×1 步长为 2，通道数为原来 2 倍的卷积核进行下采样。如下图：

![](https://pic1.zhimg.com/80/v2-61b2c58524945a7af86b33b95c6f35ac_720w.jpg)
图六：stage 和 stage 之间的 BasicBlock

stem 和第一个 stage 之前。只需要扩大通道数不需要下采样，所以，我们只用将上图（图六）的 Bottleneck 中的 3×3 的卷积的步长设置为 1 即可。

我们来看一下源码：


In [None]:
class Bottleneck(nn.Module):
    """ResNet 50, 101, 152 使用的 block

    Args:
        style:(str) 'pytorch 或 'caffe'.
                    如果使用 'pytorch', block 中 stride 为 2 的卷积层是 3x3 conv, stride=2
                    如果使用 'caffe',   block 中 stride 为 2 的卷积层是 1x1 conv, stride=2
    """
    # 输出通道数为输入通道数的倍数. (输出通道数 == 4 × 输入通道数)
    expansion = 4

    def __init__(self,
                 inplanes,
                 planes,
                 stride=1,
                 dilation=1,
                 downsample=None,
                 style='pytorch',
                 with_cp=False,
                 conv_cfg=None,
                 norm_cfg=dict(type='BN'),
                 dcn=None,
                 plugins=None):
        super(Bottleneck, self).__init__()
        assert style in ['pytorch', 'caffe']
        assert dcn is None or isinstance(dcn, dict)
        assert plugins is None or isinstance(plugins, list)
        if plugins is not None:
            allowed_position = ['after_conv1', 'after_conv2', 'after_conv3']
            assert all(p['position'] in allowed_position for p in plugins)

        self.inplanes = inplanes
        self.planes = planes
        self.stride = stride
        self.dilation = dilation
        self.style = style
        self.with_cp = with_cp
        self.conv_cfg = conv_cfg
        self.norm_cfg = norm_cfg
        self.dcn = dcn
        self.with_dcn = dcn is not None
        self.plugins = plugins
        self.with_plugins = plugins is not None

        if self.with_plugins:
            # collect plugins for conv1/conv2/conv3
            self.after_conv1_plugins = [
                plugin['cfg'] for plugin in plugins
                if plugin['position'] == 'after_conv1'
            ]
            self.after_conv2_plugins = [
                plugin['cfg'] for plugin in plugins
                if plugin['position'] == 'after_conv2'
            ]
            self.after_conv3_plugins = [
                plugin['cfg'] for plugin in plugins
                if plugin['position'] == 'after_conv3'
            ]

        if self.style == 'pytorch':
            self.conv1_stride = 1
            self.conv2_stride = stride
        else:
            self.conv1_stride = stride
            self.conv2_stride = 1

        # conv1x1 --> bn1 --> relu
        # conv3x3 --> bn2 --> relu
        # conv1x1 --> bn3
        self.norm1_name, norm1 = build_norm_layer(norm_cfg, planes, postfix=1)
        self.norm2_name, norm2 = build_norm_layer(norm_cfg, planes, postfix=2)
        self.norm3_name, norm3 = build_norm_layer(
            norm_cfg, planes * self.expansion, postfix=3
        )

        self.conv1 = build_conv_layer(
            conv_cfg,
            inplanes,
            planes,
            kernel_size=1,
            stride=self.conv1_stride,
            bias=False)
        self.add_module(self.norm1_name, norm1)
        fallback_on_stride = False
        if self.with_dcn:
            fallback_on_stride = dcn.pop('fallback_on_stride', False)
        if not self.with_dcn or fallback_on_stride:
            self.conv2 = build_conv_layer(
                conv_cfg,
                planes,
                planes,
                kernel_size=3,
                stride=self.conv2_stride,
                padding=dilation,
                dilation=dilation,
                bias=False)
        else:
            assert self.conv_cfg is None, 'conv_cfg must be None for DCN'
            self.conv2 = build_conv_layer(
                dcn,
                planes,
                planes,
                kernel_size=3,
                stride=self.conv2_stride,
                padding=dilation,
                dilation=dilation,
                bias=False)

        self.add_module(self.norm2_name, norm2)
        self.conv3 = build_conv_layer(
            conv_cfg,
            planes,
            planes * self.expansion,
            kernel_size=1,
            bias=False)
        self.add_module(self.norm3_name, norm3)

        self.relu = nn.ReLU(inplace=True)
        self.downsample = downsample 

        if self.with_plugins:
            self.after_conv1_plugin_names = self.make_block_plugins(
                planes, self.after_conv1_plugins)
            self.after_conv2_plugin_names = self.make_block_plugins(
                planes, self.after_conv2_plugins)
            self.after_conv3_plugin_names = self.make_block_plugins(
                planes * self.expansion, self.after_conv3_plugins)

    def make_block_plugins(self, in_channels, plugins):
        """make plugins for block.

        Args:
            in_channels (int): Input channels of plugin.
            plugins (list[dict]): List of plugins cfg to build.

        Returns:
            list[str]: List of the names of plugin.
        """
        assert isinstance(plugins, list)
        plugin_names = []
        for plugin in plugins:
            plugin = plugin.copy()
            name, layer = build_plugin_layer(
                plugin,
                in_channels=in_channels,
                postfix=plugin.pop('postfix', ''))
            assert not hasattr(self, name), f'duplicate plugin {name}'
            self.add_module(name, layer)
            plugin_names.append(name)
        return plugin_names

    def forward_plugin(self, x, plugin_names):
        out = x
        for name in plugin_names:
            out = getattr(self, name)(x)
        return out

    @property
    def norm1(self):
        """nn.Module: normalization layer after the first convolution layer"""
        return getattr(self, self.norm1_name)

    @property
    def norm2(self):
        """nn.Module: normalization layer after the second convolution layer"""
        return getattr(self, self.norm2_name)

    @property
    def norm3(self):
        """nn.Module: normalization layer after the third convolution layer"""
        return getattr(self, self.norm3_name)
    
    def forward(self, x):
        """Forward function."""

        def _inner_forward(x):
            identity = x

            out = self.conv1(x)
            out = self.norm1(out)
            out = self.relu(out)

            if self.with_plugins:
                out = self.forward_plugin(out, self.after_conv1_plugin_names)

            out = self.conv2(out)
            out = self.norm2(out)
            out = self.relu(out)

            if self.with_plugins:
                out = self.forward_plugin(out, self.after_conv2_plugin_names)

            out = self.conv3(out)
            out = self.norm3(out)

            if self.with_plugins:
                out = self.forward_plugin(out, self.after_conv3_plugin_names)

            if self.downsample is not None:
                identity = self.downsample(x)

            out += identity

            return out

        if self.with_cp and x.requires_grad:
            out = cp.checkpoint(_inner_forward, x)
        else:
            out = _inner_forward(x)

        out = self.relu(out)

        return out


## 下面我们来看一下 ResNet 类的源码：

In [None]:
class ResNet(nn.Moduel):
    """ResNet backbone.

    Args:
        depth:                     (int)   ResNet 的深度, 可以是 {18, 34, 50, 101, 152}.
        in_channels:               (int)   输入图像的通道数(默认: 3).
        stem_channels:             (int)   stem 的通道数(默认: 64).
        base_channels:             (int)   ResNet 的 res layer 的基础通道数(默认: 64).
        num_stages:                (int)   使用 ResNet 的 stage 数量(默认: 4).
        strides:         (Sequence[int])   每个 stage 的第一个 block 的 stride, 如果为 2 进行 2 倍下采样.
        dilations:       (Sequence[int])   每个 stage 中所有 block 的第一个卷积层的 dilation.
        out_indices:     (Sequence[int])   需要输出的 stage 的索引.
        style:                     (str)   'pytorch' 或 'caffe'.
                                           如果使用 'pytorch', block 中 stride 为 2 的卷积层是 3x3 conv2, stride=2
                                           如果使用 'caffe',   block 中 stride 为 2 的卷积层是 1x1 conv1, stride=2
        deep_stem:                (bool)   如果为 True, 将 stem 的 7x7 conv 替换为 3 个 3x3 conv.
        avg_down:                 (bool)   在下采样的时候使用 Avg pool 2x2 stride=2 代替带步长的卷积.
        frozen_stages:             (int)   冻结的 stage 数(停止更新梯度, 并开启eval模式), -1 代表不冻结.
        conv_cfg:                 (dict)   构建 conv 的 config.
        norm_cfg:                 (dict)   构建 norm 的 config.
        norm_eval:                (bool)   是否设置 norm 层为 eval 模式. 即冻结参数状态(mean, var).
        dcn:                      (dict)   构建 DCN 的 config.
        stage_with_dcn: (Sequence[bool])   需要使用 DCN 的 stage.
        plugins:            (list[dict])   为 stage 提供插件.
        with_cp:                  (bool)   是否加载 checkpoint. 使用 checkpoint 会节省一部分内存, 同时会减少训练时间.
        zero_init_residual:       (bool)   是否使用 0 对所有 block 中的最后一个 norm 层初始化, 使其为恒等映射.

    Example:
        >>> from mmdet.models import ResNet
        >>> import torch
        >>> self = ResNet(depth=18)
        >>> self.eval()
        >>> inputs = torch.rand(1, 3, 32, 32)
        >>> level_outputs = self.forward(inputs)
        >>> for level_out in level_outputs:
        ...     print(tuple(level_out.shape))
        (1, 64, 8, 8)
        (1, 128, 4, 4)
        (1, 256, 2, 2)
        (1, 512, 1, 1)
    """
    arch_settings = {
        18: (BasicBlock, (2, 2, 2, 2)),
        34: (BasicBlock, (3, 4, 6, 3)),
        50: (Bottleneck, (3, 4, 6, 3)),
        101: (Bottleneck, (3, 4, 23, 3)),
        152: (Bottleneck, (3, 8, 36, 3))
    }

    def __init__(self,
                 depth,
                 in_channels=3,
                 stem_channels=64,
                 base_channels=64,
                 num_stages=4,
                 strides=(1, 2, 2, 2),
                 dilations=(1, 1, 1, 1),
                 out_indices=(0, 1, 2, 3),
                 style='pytorch',
                 deep_stem=False,
                 avg_down=False,
                 frozen_stages=-1,
                 conv_cfg=None,
                 norm_cfg=dict(type='BN', requires_grad=True),
                 norm_eval=True,
                 dcn=None,
                 stage_with_dcn=(False, False, False, False),
                 plugins=None,
                 with_cp=False,
                 zero_init_residual=True):
        super(ResNet, self).__init__()
        # ========================== 初始化属性 =============================
        if depth not in self.arch_settings:
            raise KeyError(f'invalid depth {depth} for resnet')
        self.depth = depth
        self.stem_channels = stem_channels
        self.base_channels = base_channels
        self.num_stages = num_stages
        assert num_stages >= 1 and num_stages <= 4
        self.strides = strides
        self.dilations = dilations
        assert len(strides) == len(dilations) == num_stages
        self.out_indices = out_indices
        assert max(out_indices) < num_stages
        self.style = style
        self.deep_stem = deep_stem
        self.avg_down = avg_down
        self.frozen_stages = frozen_stages
        self.conv_cfg = conv_cfg
        self.norm_cfg = norm_cfg
        self.with_cp = with_cp
        self.norm_eval = norm_eval
        self.dcn = dcn
        self.stage_with_dcn = stage_with_dcn
        if dcn is not None:
            assert len(stage_with_dcn) == num_stages
        self.plugins = plugins
        self.zero_init_residual = zero_init_residual
        # ===================================================================
        self.block, stage_blocks = self.arch_settings[depth]
        self.stage_blocks = stage_blocks[:num_stages]

        # stem 层
        self.inplanes = stem_channels
        self._make_stem_layer(in_channels, stem_channels)

        # res 层
        self.res_layers = []
        for i, num_blocks in enumerate(self.stage_blocks):
            stride = strides[i]
            dilation = dilations[i]
            dcn = self.dcn if self.stage_with_dcn[i] else None
            if plugins is not None:
                stage_plugins = self.make_stage_plugins(plugins, i)
            else:
                stage_plugins = None
            planes = base_channels * 2**i
            res_layer = self.make_res_layer(
                block=self.block,
                inplanes=self.inplanes,
                planes=planes,
                num_blocks=num_blocks,
                stride=stride,
                dilation=dilation,
                style=self.style,
                avg_down=self.avg_down,
                with_cp=with_cp,
                conv_cfg=conv_cfg,
                norm_cfg=norm_cfg,
                dcn=dcn,
                plugins=stage_plugins)
            self.inplanes = planes * self.block.expansion
            layer_name = f'layer{i + 1}'
            self.add_module(layer_name, res_layer)
            self.res_layers.append(layer_name)

        self._freeze_stages()

        self.feat_dim = self.block.expansion * base_channels * 2**(
            len(self.stage_blocks) - 1)

    def make_stage_plugins(self, plugins, stage_idx):
        """Make plugins for ResNet ``stage_idx`` th stage.

        Currently we support to insert ``context_block``,
        ``empirical_attention_block``, ``nonlocal_block`` into the backbone
        like ResNet/ResNeXt. They could be inserted after conv1/conv2/conv3 of
        Bottleneck.

        An example of plugins format could be:

        Examples:
            >>> plugins=[
            ...     dict(cfg=dict(type='xxx', arg1='xxx'),
            ...          stages=(False, True, True, True),
            ...          position='after_conv2'),
            ...     dict(cfg=dict(type='yyy'),
            ...          stages=(True, True, True, True),
            ...          position='after_conv3'),
            ...     dict(cfg=dict(type='zzz', postfix='1'),
            ...          stages=(True, True, True, True),
            ...          position='after_conv3'),
            ...     dict(cfg=dict(type='zzz', postfix='2'),
            ...          stages=(True, True, True, True),
            ...          position='after_conv3')
            ... ]
            >>> self = ResNet(depth=18)
            >>> stage_plugins = self.make_stage_plugins(plugins, 0)
            >>> assert len(stage_plugins) == 3

        Suppose ``stage_idx=0``, the structure of blocks in the stage would be:

        .. code-block:: none

            conv1-> conv2->conv3->yyy->zzz1->zzz2

        Suppose 'stage_idx=1', the structure of blocks in the stage would be:

        .. code-block:: none

            conv1-> conv2->xxx->conv3->yyy->zzz1->zzz2

        If stages is missing, the plugin would be applied to all stages.

        Args:
            plugins (list[dict]): List of plugins cfg to build. The postfix is
                required if multiple same type plugins are inserted.
            stage_idx (int): Index of stage to build

        Returns:
            list[dict]: Plugins for current stage
        """
        stage_plugins = []
        for plugin in plugins:
            plugin = plugin.copy()
            stages = plugin.pop('stages', None)
            assert stages is None or len(stages) == self.num_stages
            # whether to insert plugin into current stage
            if stages is None or stages[stage_idx]:
                stage_plugins.append(plugin)

        return stage_plugins

    def make_res_layer(self, **kwargs):
        """Pack all blocks in a stage into a ``ResLayer``."""
        return ResLayer(**kwargs)

    @property
    def norm1(self):
        """nn.Module: the normalization layer named "norm1" """
        return getattr(self, self.norm1_name)

     def _make_stem_layer(self, in_channels, stem_channels):
        # 使用 deep stem, 即 3 个 3x3 conv.
        if self.deep_stem:
            self.stem = nn.Sequential(
                build_conv_layer(
                    self.conv_cfg,
                    in_channels,
                    stem_channels // 2,
                    kernel_size=3,
                    stride=2,
                    padding=1,
                    bias=False),
                build_norm_layer(self.norm_cfg, stem_channels // 2)[1],
                nn.ReLU(inplace=True),
                build_conv_layer(
                    self.conv_cfg,
                    stem_channels // 2,
                    stem_channels // 2,
                    kernel_size=3,
                    stride=1,
                    padding=1,
                    bias=False),
                build_norm_layer(self.norm_cfg, stem_channels // 2)[1],
                nn.ReLU(inplace=True),
                build_conv_layer(
                    self.conv_cfg,
                    stem_channels // 2,
                    stem_channels,
                    kernel_size=3,
                    stride=1,
                    padding=1,
                    bias=False),
                build_norm_layer(self.norm_cfg, stem_channels)[1],
                nn.ReLU(inplace=True))
        # 使用原版的 stem 即 7x7 conv
        else:
            self.conv1 = build_conv_layer(
                self.conv_cfg,
                in_channels,
                stem_channels,
                kernel_size=7,
                stride=2,
                padding=3,
                bias=False)
            self.norm1_name, norm1 = build_norm_layer(
                self.norm_cfg, stem_channels, postfix=1)
            self.add_module(self.norm1_name, norm1)
            self.relu = nn.ReLU(inplace=True)
        self.maxpool = nn.MaxPool2d(kernel_size=3, stride=2, padding=1)

    def _freeze_stages(self):
        # 冻结 stem 层 --> stage 为 0
        if self.frozen_stages >= 0:
            if self.deep_stem:
                self.stem.eval()
                for param in self.stem.parameters():
                    param.requires_grad = False
            else:
                self.norm1.eval()
                for m in [self.conv1, self.norm1]:
                    for param in m.parameters():
                        param.requires_grad = False

        # 冻结 res 层  --> stage 大于 0
        for i in range(1, self.frozen_stages + 1):
            m = getattr(self, f'layer{i}')
            m.eval()
            for param in m.parameters():
                param.requires_grad = False

    def init_weights(self, pretrained=None):
        """Initialize the weights in backbone.

        Args:
            pretrained (str, optional): Path to pre-trained weights.
                Defaults to None.
        """
        if isinstance(pretrained, str):
            logger = get_root_logger()
            load_checkpoint(self, pretrained, strict=False, logger=logger)
        elif pretrained is None:
            for m in self.modules():
                if isinstance(m, nn.Conv2d):
                    kaiming_init(m)
                elif isinstance(m, (_BatchNorm, nn.GroupNorm)):
                    constant_init(m, 1)

            if self.dcn is not None:
                for m in self.modules():
                    if isinstance(m, Bottleneck) and hasattr(
                            m.conv2, 'conv_offset'):
                        constant_init(m.conv2.conv_offset, 0)

            if self.zero_init_residual:
                for m in self.modules():
                    if isinstance(m, Bottleneck):
                        constant_init(m.norm3, 0)
                    elif isinstance(m, BasicBlock):
                        constant_init(m.norm2, 0)
        else:
            raise TypeError('pretrained must be a str or None')

    def forward(self, x):
        """Forward function."""
        if self.deep_stem:
            x = self.stem(x)
        else:
            x = self.conv1(x)
            x = self.norm1(x)
            x = self.relu(x)
        x = self.maxpool(x)
        outs = []
        for i, layer_name in enumerate(self.res_layers):
            res_layer = getattr(self, layer_name)
            x = res_layer(x)
            if i in self.out_indices:
                outs.append(x)
        return tuple(outs)

    def train(self, mode=True):
        """Convert the model into training mode while keep normalization layer
        freezed."""
        super(ResNet, self).train(mode)
        self._freeze_stages()
        if mode and self.norm_eval:
            for m in self.modules():
                # trick: eval have effect on BatchNorm only
                if isinstance(m, _BatchNorm):
                    m.eval()

[更多ResNeXt](https://zhuanlan.zhihu.com/p/166248079)

# [MMDetection Faster R-CNN 源码详解（二）](https://zhuanlan.zhihu.com/p/183098688)

本篇文章中，会重点剖析 MMDetection 实现的 Faster R-CNN 中的 neck 相关的源码。

## 一、FPN 的思想(Feature Pyramid Network)

在 MMDetection 中，Faster R-CNN 的 baseline 结合了 FPN，通过 FPN 会大大的提升检测的效果。为什么 FPN 有这么强的作用呢？我们从如下两点分析

1. 提取了多尺度的特征: 相对于单尺度的特征，多尺度的特征对大中小物体都能覆盖。对于 CNN 网络，浅层特征的特征途较大, 感受野较小, 方便检测小物体. 深层特征的特征图较小, 感受野较大, 方便检测大物体. 所以使用多尺度的特征对于不同大小的物体都有很好的覆盖效果。

2. 融合了各个尺度的特征：对于浅层的特征图，对位置敏感但是语义信息较弱。深层的特征图，对位置不敏感但是语义信息较强。那么我们就可以将当前的尺度与更深层的尺度的特征图相融合。这样既有深层的语义信息又对位置敏感。


## 二、Faster R-CNN 结合 FPN
接下来我们就来看一下如何将 Faster R-CNN 与 FPN 向结合，下图是将 Faster R-CNN 与 FPN 融合的网络结构。

![](https://pic1.zhimg.com/80/v2-4aee99e6842420bf682433ff4c0ff720_720w.jpg) Faster R-CNN 结合 FPN 的网络结构


拿到 backbone 的后四个尺度的输出，也就是 C2 ～ C5。以 resnet 50 为例，通道数分别为：256、512、1024、2048。先使用 1×1 的卷积，将 resnet 输出的特征图压缩通道数到 256，得到 P2 ～ P5。然后将浅层的特征图与深层的特征图相加（需要先上采样，保证特征图大小相同再相加）。再使用 3 × 3 的卷积核进行卷积，此步骤会将相加的特征进一步融合，得到 FPN 输出的特征，也就是 P2 ～ P5。在 P5 上使用 1 × 1 步长为 2 的 max pool 进行下采样，得到 P6。所以当数据经过 FPN 后，我们会拿到五个尺度的特征，也就是 P2 ～ P6。每个特征的通道数都是 256。因为每个尺度输出的特征图的通道数固定，所以我们就可以使用相同的头部进行预测了。

![](https://pic1.zhimg.com/80/v2-c0172be282021a1029f7b72b51079ffe_1440w.jpg)


![](https://pic2.zhimg.com/v2-e49ebcf931b5cf424ed311338f9ff35d_b.jpg)


## 三、RetinaNet 中的 FPN

![](https://pic1.zhimg.com/80/v2-b5a2faa28bd62f532d4fb405159f15dc_720w.jpg)

在 RetinaNet 中，利用的 backbone 的特征是 C3 ～ C5。将 C3 ～ C5 的特征图经过 1×1 的卷积压缩通道数，再将深层特征图与浅层特征图相加。然后使用 3 × 3 的卷积进一步融合特征。这样操作后，我们提取了从 P3 ～ P5 的特征。对于 C5，我们用 3 × 3 步长为 2，padding 为 1 的卷积进行两次下采样就得到 FPN 的 P6 和 P7。



## 四、FPN 源码分析
（一）Faster R-CNN 的 FPN 配置
紧接着，将x输入到neck中,也就是fpn结构中，我这里fpn的设置参数为

In [None]:
neck=dict(
        type='FPN',
        in_channels=[256, 512, 1024, 2048],
        out_channels=256,
        num_outs=5),

（二）RetinaNet 的 FPN 配置

In [None]:
neck=dict(
        type='FPN',
        in_channels=[256, 512, 1024, 2048],
        out_channels=256,
        start_level=1,
        add_extra_convs='on_input',
        num_outs=5),

结合上面的配置文件我们来看一下源码：


In [None]:
import torch.nn as nn
import torch.nn.functional as F
from mmcv.cnn import ConvModule, xavier_init

from mmdet.core import auto_fp16
from ..builder import NECKS

@NECKS.register_module()
class FPN(nn.Module):
    """Feature Pyramid Network.  (https://arxiv.org/abs/1612.03144)

    Args:
        in_channels:                    (List[int])     每个尺度的输入通道数, 也是 backbone 的输出通道数.
        out_channels:                  (int)     fpn 的输出通道数, 所有尺度的输出通道数相同, 都是一个值.
        num_outs:                      (int)     输出 stage 的个数.(可以附加额外的层, num_outs 不一定等于 in_channels)
        start_level:                   (int)     使用 backbone 的起始 stage 索引, 默认为 0.
        end_level:                     (int)     使用 backbone 的终止 stage 索引。
                                                默认为 -1, 代表到最后一层(包括)全使用.
        add_extra_convs:        (bool | str)     可以是 bool 或 str:
                                                (bool)  bool 代表是否添加额外的层.(默认值: False)
                                                        True:   在最顶层 feature map 上添加额外的卷积层,
                                                                具体的模式需要 extra_convs_on_inputs 指定.
                                                        False:  不添加额外的卷积层
                                                (str)   str  需要指定 extra convs 的输入的 feature map 的来源
                                                        'on_input':     最高层的 feature map 作为 extra 的输入
                                                        'on_lateral':   最高层的 lateral 结果 作为 extra 的输入
                                                        'on_output':    最高层的经过 conv 的 lateral 结果作为 extra 的输入
        extra_convs_on_inputs:  (bool, deprecated)  True  等同于 `add_extra_convs='on_input'
                                                    False 等同于 `add_extra_convs='on_output'
                                                    默认值为True
        relu_before_extra_convs:      (bool)     是否在 extra conv 前使用 relu. (默认值: False)
        no_norm_on_lateral:           (bool)     是否对 lateral 使用 bn. (默认值: False)
        conv_cfg:                     (dict)     构建 conv 层的 config 字典. (默认值: None)
        norm_cfg:                     (dict)     构建  bn  层的 config 字典. (默认值: None)
        act_cfg:                      (dict)     构建 activation  层的 config 字典. (默认值: None)
        upsample_cfg:                 (dict)     构建 interpolate 层的 config 字典. (默认值: `dict(mode='nearest')`)                                                    
        
        Example:
        >>> import torch
        >>> in_channels = [2, 3, 5, 7]
        >>> scales = [340, 170, 84, 43]
        >>> inputs = [torch.rand(1, c, s, s)
        ...           for c, s in zip(in_channels, scales)]
        >>> self = FPN(in_channels, 11, len(in_channels)).eval()
        >>> outputs = self.forward(inputs)
        >>> for i in range(len(outputs)):
        ...     print(f'outputs[{i}].shape = {outputs[i].shape}')
        outputs[0].shape = torch.Size([1, 11, 340, 340])
        outputs[1].shape = torch.Size([1, 11, 170, 170])
        outputs[2].shape = torch.Size([1, 11, 84, 84])
        outputs[3].shape = torch.Size([1, 11, 43, 43])
    """

    def __init__(self,
                 in_channels,
                 out_channels,
                 num_outs,
                 start_level=0,
                 end_level=-1,
                 add_extra_convs=False,
                 extra_convs_on_inputs=True,
                 relu_before_extra_convs=False,
                 no_norm_on_lateral=False,
                 conv_cfg=None,
                 norm_cfg=None,
                 act_cfg=None,
                 upsample_cfg=dict(mode='nearest')):
        super(FPN, self).__init__()
        assert isinstance(in_channels, list)
        self.in_channels = in_channels          # [256, 512, 1024, 2048]
        self.out_channels = out_channels        # 256
        self.num_ins = len(in_channels)         # 4
        self.num_outs = num_outs                # 5
        self.relu_before_extra_convs = relu_before_extra_convs  # False
        self.no_norm_on_lateral = no_norm_on_lateral            # False
        self.fp16_enabled = False
        self.upsample_cfg = upsample_cfg.copy()

        # end_level 是对 backbone 输出的尺度中使用的最后一个尺度的索引
        # 如果是 -1 表示使用 backbone 最后一个 feature map, 作为最终的索引.
        if end_level == -1:
            self.backbone_end_level = self.num_ins      #4
            # 因为还有 extra conv 所以存在 num_outs > num_ins - start_level 的情况
            assert num_outs >= self.num_ins - start_level
        else:
            # 如果 end_level < inputs, 说明不使用 backbone 全部的尺度, 并且不会提供额外的层.
            self.backbone_end_level = end_level
            assert end_level <= len(in_channels)
            assert num_outs == end_level - start_level

        self.start_level = start_level                      # 0
        self.end_level = end_level                          # -1
        self.add_extra_convs = add_extra_convs              # False
        assert isinstance(add_extra_convs, (str, bool))
        # add_extra_convs 可以是 bool 或 str
        # 1. add_extra_convs 是 str
        if isinstance(add_extra_convs, str):
            # 确保 add_extra_convs 是 'on_input', 'on_lateral' 或 'on_output'
            assert add_extra_convs in ('on_input', 'on_lateral', 'on_output')
        # 2. add_extra_convs 是 bool, 需要看 extra_convs_on_inputs
        elif add_extra_convs:
            if extra_convs_on_inputs:
                # For compatibility with previous release
                # TODO: deprecate `extra_convs_on_inputs`
                self.add_extra_convs = 'on_input'
            else:
                self.add_extra_convs = 'on_output'

        self.lateral_convs = nn.ModuleList()
        self.fpn_convs = nn.ModuleList()

        # 构建Lateral conv 和 fpn conv 
        for i in range(self.start_level, self.backbone_end_level):
            # 水平卷积(lateral conv): 1×1, C=256,
            l_conv = ConvModule(
                in_channels[i],
                out_channels,
                1,
                conv_cfg=conv_cfg,
                norm_cfg=norm_cfg if not self.no_norm_on_lateral else None,
                act_cfg=act_cfg,
                inplace=False)
            # fpn 输出卷积: 3×3, C=256, P=1
            fpn_conv = ConvModule(
                out_channels,
                out_channels,
                3,
                padding=1,
                conv_cfg=conv_cfg,
                norm_cfg=norm_cfg,
                act_cfg=act_cfg,
                inplace=False)
            
            self.lateral_convs.append(l_conv)
            self.fpn_convs.append(fpn_conv)

        # add extra conv layers (e.g., RetinaNet)
        extra_levels = num_outs - self.backbone_end_level + self.start_level
        # 只有 add_extra_convs 为 True 或 str 时才添加 extra_convs
        if self.add_extra_convs and extra_levels >= 1:
            for i in range(extra_levels):
                if i == 0 and self.add_extra_convs == 'on_input':
                    in_channels = self.in_channels[self.backbone_end_level - 1]
                else:
                    in_channels = out_channels
                # extra conv 是3x3步长为2, padding为1的卷积
                extra_fpn_conv = ConvModule(
                    in_channels,
                    out_channels,
                    3,
                    stride=2,
                    padding=1,
                    conv_cfg=conv_cfg,
                    norm_cfg=norm_cfg,
                    act_cfg=act_cfg,
                    inplace=False)
                self.fpn_convs.append(extra_fpn_conv)
    # default init_weights for conv(msra) and norm in ConvModule
    def init_weights(self):
        """Initialize the weights of FPN module."""
        # 使用xavier初始化卷积层
        for m in self.modules():
            if isinstance(m, nn.Conv2d):
                xavier_init(m, distribution='uniform')
    
    @auto_fp16()
    def forward(self, inputs):
        """Forward function."""
        assert len(inputs) == len(self.in_channels)

        # ====================== 进行水平计算(1x1卷积) ====================
        laterals = [
            lateral_conv(inputs[i + self.start_level])
            for i, lateral_conv in enumerate(self.lateral_convs)
        ]
        # ==============================================================

        # ========================== 计算 top-down =============================
        used_backbone_levels = len(laterals)
        # 自上至下将 laterals 里面的结果更新为经过 top-down 的结果.
        for i in range(used_backbone_levels - 1, 0, -1):
           # In some cases, fixing `scale factor` (e.g. 2) is preferred, but
            #  it cannot co-exist with `size` in `F.interpolate`.
            # 有 scale 的情况
            if 'scale_factor' in self.upsample_cfg:
                 # 因为range函数不包括右边的端点, 所以可以使用 i - 1
                 laterals[i-1] += F.interpolate(laterals[i], **self.upsample_cfg)
            # 没有 scale 的情况, 需要计算下层的 feature map 大小.
            else:
                 # 计算下层 feature map 大小
                prev_shape = laterals[i - 1].shape[2:]
                laterals[i - 1] += F.interpolate(
                    laterals[i], size=prev_shape, **self.upsample_cfg)
        # =====================================================================

        # ========================== 计算输出的结果 =============================
        # part 1: 计算所有 lateral 的输出的结果
        outs = [
            self.fpn_convs[i](laterals[i]) for i in range(used_backbone_levels)
        ]
        
        # part 2: 添加 extra levels
        if self.num_outs > len(outs):
            # 使用 max pool 获得更高层的输出信息, 如: Faster R-CNN, Mask R-CNN (4 lateral + 1 max pool)
            if not self.add_extra_convs:
                for i in range(self.num_outs - used_backbone_levels):
                    outs.append(F.max_pool2d(outs[-1], 1, stride=2))
            # 添加额外的卷积层获得高层输出信息, 如: RetinaNet (3 lateral + 2 conv3x3 stride2)
            else:
                # 'on_input':   最高层的 feature map 作为 extra 的输入
                if self.add_extra_convs == 'on_input':
                    extra_source = inputs[self.backbone_end_level - 1]
                # 'on_lateral': 最高层的 lateral 结果 作为 extra 的输入
                elif self.add_extra_convs == 'on_lateral':
                    extra_source = laterals[-1]
                # 'on_output':  最高层的经过 conv 的 lateral 结果作为 extra 的输入
                elif self.add_extra_convs == 'on_output':
                    extra_source = outs[-1]
                else:
                    raise NotImplementedError
                # 计算 input extra
                outs.append(self.fpn_convs[used_backbone_levels](extra_source))
                # 计算 extra
                for i in range(used_backbone_levels + 1, self.num_outs):
                    if self.relu_before_extra_convs:
                        outs.append(self.fpn_convs[i](F.relu(outs[-1])))
                    else:
                        outs.append(self.fpn_convs[i](outs[-1]))
        return tuple(outs)


# [MMDetection Faster R-CNN 源码详解（三)](https://zhuanlan.zhihu.com/p/184618997)

在上一小节中，我们介绍了 neck 部分的源码。在接下来的两篇文章中，会详细的剖析 Faster R-CNN 中 rpn 部分的源码。在本篇文章主要讲解 RPN 中的重点概念和原理。


## RPN（原理篇）

Faster R-CNN 最突出的贡献就在于提出了 Region Proposal Network（RPN），使用了卷积神经网络生成候选区域，替代了传统的 Selective Search 方法，使网络在速度和精度上都有了显著的提升。我们会从如下五个方面详解 RPN 中的重点概念和原理：

1. Anchor
2. BBox 编码
3. RPN 网络流程
4. RPN 网络训练
5. 生成候选区域

## 一、Anchor
对于RPN来说, 一个最重要的概念就是Anchor, 那么什么是Anchor? 将feature map上的一个个点,映射到哦原始图片上的一个像素点.以这个像素点为中心生成固定大小和长宽比的窗口, 那么这个窗口,就叫做anchor.

我们看下面的图片，对原始图片（下图左侧）下采样后我们得到了feature map(下图右侧). 把 feature map 上的每个格点，映射到原图，我们就会得到锚点。也就是右侧每个相同颜色的格子映射到左侧的橘色的点。其中左侧橘色的点代表了原图的一个像素点，也就是我们得到的锚点。因为有padding, 所以锚点不会再左侧网格的中心, 而是再左侧每个网格左上角的第一个像素点.

得到锚点后, 以每个锚点为中心, 对每个锚点生成固定大小和长宽比例的錨框(Anchor), 也就是左图中红紫蓝颜色的框. 那么当所有的各自都生成了錨框后, 錨框就会作为我们的先验, 遍布整张图片. (如下图左侧，为了方便可视化，我们将大小和长宽比设置为合适的大小）


![](https://pic3.zhimg.com/80/v2-8efb16e525ab70cdac979755b7aaabbe_720w.jpg)
图一：Anchor 的概念

记scale为feature maps上每个各点映射到原图后再进行缩放的系数, 锚框真实的大小为 base_size × scale，记 ratio 为 anchor 的纵横比（H : W）。

在 MMDetection 中，Faster R-CNN 结合了 FPN。所以每个尺度只设置了一个为 8 的 scale。ratio 为 {0.5, 1, 2}（纵横比 1 : 2、1 : 1、2 : 1）。如下图：

![](https://pic3.zhimg.com/80/v2-6826119a8fe48a0e12f46cb684e7baba_720w.jpg)图二：结合 FPN 后的 Anchor 设置

在 MMDetection 中，RPN 使用 AnchorGenerator 类生成 Anchor，下面是 AnchorGenerator 的设置。

In [None]:
anchor_generator=dict(
    type='AnchorGenerator',
    scales=[8],
    ratios=[0.5, 1.0, 2.0],
    straides=[4, 8, 16,32, 64]
),

在 AnchorGenerator 中，构造函数会调用gen_base_anchors 生成各个尺度的每个格点的anchor(如图二), 形状为(尺度个数,K,4). grid_anchors 是生成所有尺度 anchor 的方法，生成的 anchor 的形状为（尺度个数，H × W × K，4）。因为在 collect_fn 中有额外的 padding 操作来保证一个 batch 的图像大小相同（不是数据预处理里面的 padding），所以需要 valid_flags 方法筛选出哪些 anchor 是在 padding 以内的，valid_flags 函数的输出的形状为（尺度个数，H × W × K）代表有效 anchor 的 mask。下面我们来开看一看源码：

In [None]:
import mmcv
import numpy as np
import torch
from torch.nn.modules.utils import _pair

from .builder import ANCHOR_GENERATORS


@ANCHOR_GENERATORS.register_module()
class AnchorGenerator(object):
    """2D anchor-base 模型的 anchor 生成器

    Args:
        strides:      (list[int] | list[tuple[int, int]])     各尺度 feature map 相对于原图的步长 (下采样率).
        ratios:              (list[float])                    单个尺度中 anchor 的高宽比 (H : W) 的列表.
        scales:              (list[int] | None)               单个尺度中 anchor 的缩放比率 (扩张率).
                                                              不能与 `octave_base_scale` 和 `scales_per_octave` 一同设置.
        base_sizes:          (list[int] | None)               各个尺度基础 anchor 大小的列表.
                                                              如果没有给定, 将使用 strides 作为 base_sizes.
                                                              如果 stride 非正方形, 则采用最短 stride.
        scale_major:         (bool)            如果为 True 先列举完所有的 scales, 如果为 False, 先列举完所有的 ratios
        octave_base_scale:   (int)             RetinaNet 中的 base scale
        scales_per_octave:   (int)             每一个 grid 的 scale 个数. 在 RetinaNet 中会根据个数自动生成 scales
                                               使用 `octave_base_scale` 和 `scales_per_octave` 时, scale 应该为 None.
        centers:(list[tuple[float, float]] | None): anchor 的中心点, 默认为 None 代表使用 center_offset 来推断.
        center_offset (float):                      一个小数, 代表 center 的偏移量.

    Examples:
        >>> from mmdet.core import AnchorGenerator
        >>> self = AnchorGenerator([16], [1.], [1.], [9])
        >>> all_anchors = self.grid_anchors([(2, 2)], device='cpu')
        >>> print(all_anchors)
        [tensor([[-4.5000, -4.5000,  4.5000,  4.5000],
                [11.5000, -4.5000, 20.5000,  4.5000],
                [-4.5000, 11.5000,  4.5000, 20.5000],
                [11.5000, 11.5000, 20.5000, 20.5000]])]
        >>> self = AnchorGenerator([16, 32], [1.], [1.], [9, 18])
        >>> all_anchors = self.grid_anchors([(2, 2), (1, 1)], device='cpu')
        >>> print(all_anchors)
        [tensor([[-4.5000, -4.5000,  4.5000,  4.5000],
                [11.5000, -4.5000, 20.5000,  4.5000],
                [-4.5000, 11.5000,  4.5000, 20.5000],
                [11.5000, 11.5000, 20.5000, 20.5000]]), \
        tensor([[-9., -9., 9., 9.]])]
    """

     def __init__(self,
                 strides,
                 ratios,
                 scales=None,
                 base_sizes=None,
                 scale_major=True,
                 octave_base_scale=None,
                 scales_per_octave=None,
                 centers=None,
                 center_offset=0.):
        # faster_rcnn_fpn 配置:
        # 对每一个 level 使用单尺度.
        # strides: [4, 8, 16, 32, 64]
        # ratios:  [0.5, 1.0, 2.0]
        # scales:  [8]
        # base_sizes: None
        # scale_major:True
        # octave_base_scale:None
        # scales_per_octave:None
        # centers: None
        # center_offset: 0.0

        # center 和 center_offset 只能使用一个

        if center_offset != 0:
            assert centers is None, 'center cannot be set when center_offset' \
                f'!=0, {centers} is given.'
        # 确保 center_offset 是一个 0～1 的小数
        if not (0 <= center_offset <= 1):
            raise ValueError('center_offset should be in range [0, 1], '
                             f'{center_offset} is given.')
        if centers is not None:
            assert len(centers) == len(strides), \
                'The number of strides should be the same as centers, got ' \
                f'{strides} and {centers}'

        # _pair 函数:    如果是数转化为(数, 数) 的元祖, 如果是元祖不变.
        self.strides = [_pair(stride) for stride in strides]
        # 如果 base_sizes 为 None, 遍历所有尺度的 stride.
        # 使用当前尺度的 stride 的长宽最小值作为当前尺度的 base_size
        self.base_sizes = [min(stride) for stride in self.strides
                           ] if base_sizes is None else base_sizes

        assert len(self.base_sizes) == len(self.strides), \
            'The number of strides should be the same as base sizes, got ' \
            f'{self.strides} and {self.base_sizes}'

        # 确保 scales 和 octave_base_scale 与 scales_per_octave 不能同时出现.
        assert ((octave_base_scale is not None
                and scales_per_octave is not None) ^ (scales is not None)), \
            'scales and octave_base_scale with scales_per_octave cannot' \
            ' be set at the same time'

        # faster rcnn fpn: 单个尺度只有一个 scale --> [8]
         if scales is not None:
            self.scales = torch.Tensor(scales)
        # retinanet: 单个尺度 scale 为 [2 ^ 0, 2 ^ (1/3), 2 ^ (2/3)]
        elif octave_base_scale is not None and scales_per_octave is not None:
            octave_scales = np.array(
                [2**(i / scales_per_octave) for i in range(scales_per_octave)])
            # octave_scales: 4
            scales = octave_scales * octave_base_scale
            self.scales = torch.Tensor(scales)
        else:
            raise ValueError('Either scales or octave_base_scale with '
                             'scales_per_octave should be set')

        self.octave_base_scale = octave_base_scale
        self.scales_per_octave = scales_per_octave
        self.ratios = torch.Tensor(ratios)
        self.scale_major = scale_major
        self.centers = centers
        self.center_offset = center_offset
        self.base_anchors = self.gen_base_anchors()

    @property
    def num_base_anchors(self):
        """list[int]: 每个尺度的 grid 生成 anchor 的数量"""
        # eg. faster rcnn fpn [3, 3, 3, 3, 3]
        return [base_anchors.size(0) for base_anchors in self.base_anchors]

    @property
    def num_levels(self):
        """int: 有多少个尺度."""
        # eg. faster rcnn fpn: 5
        return len(self.strides)

    def gen_base_anchors(self):
        """生成多个尺度的 base anchors. (base anchor 是每个 grid 生成的 anchor)

        Returns:
            list(torch.Tensor):  所有尺度的 base anchor 的列表,
                                 其中每个尺度 base anchor 的形状为 (K, 4),
        """
        # base_sizes    [4, 8, 16, 32, 64]
        multi_level_base_anchors = []
        # 遍历每个尺度
        for i, base_size in enumerate(self.base_sizes):
            center = None
            if self.centers is not None:
                center = self.centers[i]
            multi_level_base_anchors.append(
                self.gen_single_level_base_anchors(
                    base_size,
                    scales=self.scales,
                    ratios=self.ratios,
                    center=center))
        return multi_level_base_anchors

    def gen_single_level_base_anchors(self,
                                      base_size,
                                      scales,
                                      ratios,
                                      center=None):
        """生成单尺度 base_anchors

        Args:
            base_size:  (int | float):              基础 anchor 大小.
            scales:     (torch.Tensor):             单尺度 anchor 的缩放, base_size * scale 为 anchor 真实大小.
            ratios:     (torch.Tensor):             单尺度 anchor 的高宽比 (H : W)
            center:     (tuple[float], optional):   单尺度的 base_anchor 的中心点, 默认为 None, 代表自动计算.

        Returns:
            torch.Tensor: 单尺度的 base_anchor
        """
        # base_size 依次为 4, 8, 16, 32, 64, 代表基础 anchor 大小.
        # scales:   [8]
        w = base_size
        h = base_size
        # 如果 center 为 None, 根据 center_offset 自动计算中心点坐标.
        if center is None:
            x_center = self.center_offset * w   # 0.0
            y_center = self.center_offset * h   # 0.0
        # 如果 center 不是 None, 使用指定的 center.
        else:
            x_center, y_center = center

        h_ratios = torch.sqrt(ratios)
        w_ratios = 1 / h_ratios
        # [n_ratios, n_scales]
        # 展平后为 [scale1_ratio1, scale1_ratio2, ..., scale2_ratio1, scale2_ratio2...]
        if self.scale_major:
            ws = (w * w_ratios[:, None] * scales[None, :]).view(-1)
            hs = (h * h_ratios[:, None] * scales[None, :]).view(-1)
        # [n_scales, n_ratios]
        # 展平后为 [ratio1_scale1, ratio1_scale2, ..., ratio2_scale1, ratio2_scale2...]
        else:
            ws = (w * scales[:, None] * w_ratios[None, :]).view(-1)
            hs = (h * scales[:, None] * h_ratios[None, :]).view(-1)

        # use float anchor and the anchor's center is aligned with the pixel center
        base_anchors = [
            x_center - 0.5 * ws, y_center - 0.5 * hs, x_center + 0.5 * ws,
            y_center + 0.5 * hs
        ]
        base_anchors = torch.stack(base_anchors, dim=-1)
        return base_anchors

    def _meshgrid(self, x, y, row_major=True):
        """Generate mesh grid of x and y.

        Args:
            x (torch.Tensor): Grids of x dimension.
            y (torch.Tensor): Grids of y dimension.
            row_major (bool, optional): Whether to return y grids first.
                Defaults to True.

        Returns:
            tuple[torch.Tensor]: The mesh grids of x and y.
        """
        xx = x.repeat(len(y))
        yy = y.view(-1, 1).repeat(1, len(x)).view(-1)
        if row_major:
            return xx, yy
        else:
            return yy, xx

    def grid_anchors(self, featmap_sizes, device='cuda'):
        """生成多尺度的 anchor

        Args:
            featmap_sizes:  (list[tuple]):  各个尺度的 feature map 大小.
            device:         (str):          anchor 存放的设备.

        Return:
            list[torch.Tensor]: 多尺度的 anchor, 每个尺度生成的 anchor 的形状为: [N, 4],
                                其中 N = feature map 高 × feature map 宽 × 每个格点的 anchor 个数
        """
        assert self.num_levels == len(featmap_sizes)
        multi_level_anchors = []
        for i in range(self.num_levels):
            anchors = self.single_level_grid_anchors(
                self.base_anchors[i].to(device),
                featmap_sizes[i],
                self.strides[i],
                device=device)
            multi_level_anchors.append(anchors)
        return multi_level_anchors

    def single_level_grid_anchors(self,
                                  base_anchors,
                                  featmap_size,
                                  stride=(16, 16),
                                  device='cuda'):
        """生成单个尺度的 anchor

        Note:
            此方法会被 ``self.grid_anchors`` 调用

        Args:
            base_anchors:   (torch.Tensor):         单尺度的 base_anchors
            featmap_size:   (tuple[int]):           feature map 的大小
            stride:         (tuple[int], optional): feature map 的 stride, 默认为 (16, 16)
            device:         (str, optional):        存放 tensor 的设备, 默认为 'cuda'

        Returns:
            torch.Tensor: 整个 feature map 生成的 anchor, 形状为 (N, 4)
        """
        # 获取 feature map 的高和宽
        feat_h, feat_w = featmap_size
        # 生成锚点的 x, y 坐标范围
        shift_x = torch.arange(0, feat_w, device=device) * stride[0]
        shift_y = torch.arange(0, feat_h, device=device) * stride[1]
        # 生成网格
        shift_xx, shift_yy = self._meshgrid(shift_x, shift_y)
        # 生成锚点
        shifts = torch.stack([shift_xx, shift_yy, shift_xx, shift_yy], dim=-1)
        shifts = shifts.type_as(base_anchors)

        # 锚点加上偏移得到所有的 anchors
        # (K, H × W, 4)
        all_anchors = base_anchors[None, :, :] + shifts[:, None, :]
        # (K × H × W, 4)
        all_anchors = all_anchors.view(-1, 4)
        # first A rows correspond to A anchors of (0, 0) in feature map,
        # then (0, 1), (0, 2), ...
        return all_anchors

    def valid_flags(self, featmap_sizes, pad_shape, device='cuda'):
        """生成多个尺度的 valid flags
            valid flags 标记在图片内的有效的 anchor, 因为图像有 padding 操作, anchor 可能在原始图片之外.

        Args:
            featmap_sizes:   (list(tuple)): 多尺度的 feature map 的大小
            pad_shape:       (tuple):       填充后的图像的大小
            device:          (str):         存放 tensor 的设备

        Return:
            list(torch.Tensor): 多尺度的 valid flags, 形状为 (n_levels, H × W × K)
        """
        assert self.num_levels == len(featmap_sizes)
        multi_level_flags = []
        for i in range(self.num_levels):
            anchor_stride = self.strides[i]
            feat_h, feat_w = featmap_sizes[i]
            # 经过 pad 的 h 和 w.
            h, w = pad_shape[:2]
            # 有效的宽高
            valid_feat_h = min(int(np.ceil(h / anchor_stride[0])), feat_h)
            valid_feat_w = min(int(np.ceil(w / anchor_stride[1])), feat_w)
            flags = self.single_level_valid_flags((feat_h, feat_w),
                                                  (valid_feat_h, valid_feat_w),
                                                  self.num_base_anchors[i],
                                                  device=device)
            multi_level_flags.append(flags)
        return multi_level_flags

    def single_level_valid_flags(self,
                                 featmap_size,
                                 valid_size,
                                 num_base_anchors,
                                 device='cuda'):
        """Generate the valid flags of anchor in a single feature map.

        Args:
            featmap_size (tuple[int]): The size of feature maps.
            valid_size (tuple[int]): The valid size of the feature maps.
            num_base_anchors (int): The number of base anchors.
            device (str, optional): Device where the flags will be put on.
                Defaults to 'cuda'.

        Returns:
            torch.Tensor: The valid flags of each anchor in a single level
                feature map.
        """
        feat_h, feat_w = featmap_size
        valid_h, valid_w = valid_size
        assert valid_h <= feat_h and valid_w <= feat_w
        valid_x = torch.zeros(feat_w, dtype=torch.bool, device=device)
        valid_y = torch.zeros(feat_h, dtype=torch.bool, device=device)
        valid_x[:valid_w] = 1
        valid_y[:valid_h] = 1
        valid_xx, valid_yy = self._meshgrid(valid_x, valid_y)
        valid = valid_xx & valid_yy
        # (H × W, K), K 为每个格点生成的 anchor 个数. --. (H × W × K)
        valid = valid[:, None].expand(valid.size(0),
                                      num_base_anchors).contiguous().view(-1)
        return valid

    def __repr__(self):
        """str: a string that describes the module"""
        indent_str = '    '
        repr_str = self.__class__.__name__ + '(\n'
        repr_str += f'{indent_str}strides={self.strides},\n'
        repr_str += f'{indent_str}ratios={self.ratios},\n'
        repr_str += f'{indent_str}scales={self.scales},\n'
        repr_str += f'{indent_str}base_sizes={self.base_sizes},\n'
        repr_str += f'{indent_str}scale_major={self.scale_major},\n'
        repr_str += f'{indent_str}octave_base_scale='
        repr_str += f'{self.octave_base_scale},\n'
        repr_str += f'{indent_str}scales_per_octave='
        repr_str += f'{self.scales_per_octave},\n'
        repr_str += f'{indent_str}num_levels={self.num_levels}\n'
        repr_str += f'{indent_str}centers={self.centers},\n'
        repr_str += f'{indent_str}center_offset={self.center_offset})'
        return repr_str


## 二、BBox 编码

让网络直接回归边框的坐标或者宽高比较困难，所以才有了 Anchor 的概念。在训练时，对 Anchor 进行编码。编码成 Anchor 与 Ground Truth 的中心点偏移和宽高的修正量，让网络学习。这样网络学习到的实际上是如何对 Anchor 进行修正。在测试时，将网络输出的偏移量解码成 BBox 的坐标即可。

那么如何将 Anchor 编码为修正量呢？

记 t 为训练目标，x, y 代表 bbox 的中心坐标，w, h 代表 bbox 的宽和高。不带下标的代表是 ground truth，带下标 a 的代表的是 anchor。编码公式如下：

$t_x = (x-x_0)/w_a$ $\space\space\space\space\space\space\space\space$  $t_y = (y-y_0/w_a$  (1)

$t_w = log(w/w_a)$ $\space\space\space\space\space\space\space\space\space\space\space\space\space\space\space\space$  $t_h = log(h/h_a$  (2)


记 $t^*$为网络的预测值，由（1）（2）可推断出我们会得到如下的结果：

$t^*_x = (x^*-x_0)/w_a$ $\space\space\space\space\space\space\space\space$  $t^*_y = (y^*-y_0/w_a$  (3)

$t^*_w = log(w^*/w_a)$ $\space\space\space\space\space\space\space\space\space\space\space\space\space\space\space\space$  $t_h = log(h^*/h_a$  (4)

将（3）（4）式变形，可以得到解码公式如下：

$x^* = x_a + w_a + t^*_x $ $\space\space\space\space\space\space\space\space\space\space\space\space\space\space$ $y^* = y_a + h_at^*_y$  (5)

$w^* = w_ae^{t^*_w} $ $\space\space\space\space\space\space\space\space\space\space\space\space\space\space$ $h^* = h_ae^{t^*_h} $ (6)

在 RPN 中使用 DeltaXYWHBBoxCoder 编码 bbox，具体设置如下。

In [None]:
    bbox_coder=dict(
            type='DeltaXYWHBBoxCoder',
            target_means=[.0, .0, .0, .0],
            target_stds=[1.0, 1.0, 1.0, 1.0]),


源码上，我们先来看一下 encode 的核心代码，encode 的公式是上面的（1）和（2），代码如下：

In [None]:
    def bbox2delta(proposals, gt, means=(0., 0., 0., 0.), stds=(1., 1., 1., 1.)):
        """生成 bbox 与 gt 的修正量

        Args:
            proposals:      (Tensor):           需要转换的 bbox, 形状为 (N, ..., 4)
            gt:             (Tensor):           Gt bboxes, 形状为 (N, ..., 4)
            means:          (Sequence[float]):  对坐标标准化的均值
            stds:           (Sequence[float]):  对坐标标准化的方差

        Returns:
            Tensor: (dx, dy, dw, dh) 形式的偏移
        """
        assert proposals.size() == gt.size()

        proposals = proposals.float()
        gt = gt.float()

        # bbox 的中点坐标
        px = (proposals[..., 0] + proposals[..., 2]) * 0.5
        py = (proposals[..., 1] + proposals[..., 3]) * 0.5
        # box 的宽高
        pw = proposals[..., 2] - proposals[..., 0]
        ph = proposals[..., 3] - proposals[..., 1]

        # ground truth 的中点坐标
        gx = (gt[..., 0] + gt[..., 2]) * 0.5
        gy = (gt[..., 1] + gt[..., 3]) * 0.5
        # ground truth 的宽高
        gw = gt[..., 2] - gt[..., 0]
        gh = gt[..., 3] - gt[..., 1]

        # 需要预测的 x
        dx = (gx - px) / pw
        # 需要预测的 y
        dy = (gy - py) / ph
        # 需要预测的 w
        dw = torch.log(gw / pw)
        # 需要预测的 h
        dh = torch.log(gh / ph)
        deltas = torch.stack([dx, dy, dw, dh], dim=-1)

        means = deltas.new_tensor(means).unsqueeze(0)
        stds = deltas.new_tensor(stds).unsqueeze(0)
        deltas = deltas.sub_(means).div_(stds)

        return deltas

再来看一下 decode 的核心代码，decode 对应的公式是上面的（5）和（6）。注意：如果提供 max_shape，decode 会对超出 max_shape 的 bbox 裁剪，可以用 max_shape 来裁剪超出图片大小的 anchor。

In [None]:
    def delta2bbox(rois,
               deltas,
               means=(0., 0., 0., 0.),
               stds=(1., 1., 1., 1.),
               max_shape=None,
               wh_ratio_clip=16 / 1000):
        """将 (dx, dy, dw, dh) 解码为 (x1, y1, x2, y2)

        Args:
            rois:           (Tensor):           需要转换的 bbox, 形状为 (N, 4)
            deltas:         (Tensor):           bbox 的修正量, 形状为 (N, 4 * num_classes). 
                                                其中 N = num_anchors * W * H
            means:          (Sequence[float]):  对坐标标准化的均值
            stds:           (Sequence[float]):  对坐标标准化的方差
            max_shape:      (tuple[int, int]):  bbox 最大的大小(图片大小), 用于裁剪超出图片的 bbox
            wh_ratio_clip:  (float):            bbox 最大的缩放比例

        Returns:
            Tensor: Boxes with shape (N, 4), where columns represent
                tl_x, tl_y, br_x, br_y.
        """
        means = deltas.new_tensor(means).repeat(1, deltas.size(1) // 4)
        stds = deltas.new_tensor(stds).repeat(1, deltas.size(1) // 4)
        # 还原没有 norm 的 box
        denorm_deltas = deltas * stds + means
        # 获得 dx, dy, dw, dh
        dx = denorm_deltas[:, 0::4]
        dy = denorm_deltas[:, 1::4]
        dw = denorm_deltas[:, 2::4]
        dh = denorm_deltas[:, 3::4]
        # 将 |ratio| 限定到 max_ratio 以内
        max_ratio = np.abs(np.log(wh_ratio_clip))
        dw = dw.clamp(min=-max_ratio, max=max_ratio)
        dh = dh.clamp(min=-max_ratio, max=max_ratio)
        # ============= 预测的 roi 转化为：中心点+宽高 的形式。===================
        px = ((rois[:, 0] + rois[:, 2]) * 0.5).unsqueeze(1).expand_as(dx)
        py = ((rois[:, 1] + rois[:, 3]) * 0.5).unsqueeze(1).expand_as(dy)
        # Compute width/height of each roi
        pw = (rois[:, 2] - rois[:, 0]).unsqueeze(1).expand_as(dw)
        ph = (rois[:, 3] - rois[:, 1]).unsqueeze(1).expand_as(dh)
        # ===================================================================

        # =============== 计算经过网络偏移后的 roi 的中心点和宽高。================
        gw = pw * dw.exp()
        gh = ph * dh.exp()
        # Use network energy to shift the center of each roi
        gx = px + pw * dx
        gy = py + ph * dy
        # ===================================================================

        # 中心点+宽高  ==> 左上角坐标+右下角坐标。
        x1 = gx - gw * 0.5
        y1 = gy - gh * 0.5
        x2 = gx + gw * 0.5
        y2 = gy + gh * 0.5

        # 如果有最大的大小，裁剪经过偏移后的坐标
        if max_shape is not None:
            x1 = x1.clamp(min=0, max=max_shape[1])
            y1 = y1.clamp(min=0, max=max_shape[0])
            x2 = x2.clamp(min=0, max=max_shape[1])
            y2 = y2.clamp(min=0, max=max_shape[0])
        bboxes = torch.stack([x1, y1, x2, y2], dim=-1).view_as(deltas)
        return bboxes

DeltaXYWHBBoxCoder 类调用了上述两个方法。并提供了 encode 和 decode 方法，分别用来编码和解码 bbox。

In [None]:
    import numpy as np
    import torch

    from ..builder import BBOX_CODERS
    from .base_bbox_coder import BaseBBoxCoder


    @BBOX_CODERS.register_module()
    class DeltaXYWHBBoxCoder(BaseBBoxCoder):
        """Delta XYWH BBox 编码器.

        encode 将 (x1, y1, x2, y2) 编码为 (dx, dy, dw, dh)
        decode 将 (dx, dy, dw, dh) 解码为 (x1, y1, x2, y2)

        Args:
            target_means:   (Sequence[float]): 对坐标标准化的均值
            target_stds:    (Sequence[float]): 对坐标标准化的方差
        """

        def __init__(self,
                    target_means=(0., 0., 0., 0.),
                    target_stds=(1., 1., 1., 1.)):
            super(BaseBBoxCoder, self).__init__()
            self.means = target_means
            self.stds = target_stds

        def encode(self, bboxes, gt_bboxes):
            """将 (x1, y1, x2, y2) 编码为 (dx, dy, dw, dh)

            Args:
                bboxes:     (torch.Tensor): 源 bboxes, 例如: object proposals.
                gt_bboxes:  (torch.Tensor): 目标 bboxes, 例如: ground-truth boxes.

            Returns:
                torch.Tensor: (dx, dy, dw, dh) 形式的偏移
            """

            assert bboxes.size(0) == gt_bboxes.size(0)
            assert bboxes.size(-1) == gt_bboxes.size(-1) == 4
            encoded_bboxes = bbox2delta(bboxes, gt_bboxes, self.means, self.stds)
            return encoded_bboxes

        def decode(self,
                bboxes,
                pred_bboxes,
                max_shape=None,
                wh_ratio_clip=16 / 1000):
            """将 (dx, dy, dw, dh) 解码为 (x1, y1, x2, y2)

            Args:
                bboxes:         (torch.Tensor):         基准 boxes.
                pred_bboxes:    (torch.Tensor):         预测的 bboxes
                max_shape:      (tuple[int], optional): bbox 最大的大小(图片大小), 用于裁剪超出图片的 bbox
                wh_ratio_clip:  (float, optional):      The allowed ratio between
                    width and height.

            Returns:
                torch.Tensor: (x1, y1, x2, y2) 形式的坐标
            """

            assert pred_bboxes.size(0) == bboxes.size(0)
            decoded_bboxes = delta2bbox(bboxes, pred_bboxes, self.means, self.stds,
                                        max_shape, wh_ratio_clip)

            return decoded_bboxes

## 三、RPN 网络流程

MMDetection 的 Faster R-CNN 结合了 FPN。在 RPN 中我们会拿到 P2 ～ P6，总共 5 个尺度的特征图，通道数都为 256。对每个特征图，先使用 3×3 的卷积提取特征，然后再使用两个并列的 1×1 的卷积，生成前景与背景的概率预测和边框的修正量。注意：所有的特征图共享相同的卷积（因为所有的特征图的通道数相同，可以共享）。

设每个feature map 每个各点生成的anchor个数为K, 那么分类分支的输出通道数为 K(使用sigmoid为K, 如果使用softmax 为2K) 代表类别为前景或背景的置信度. 回归分支的输出通道数为: 4K, 代表bbox的中心点坐标偏移以及宽高的修正量.

每个尺度经过相同的卷积后会得到置信度与修正量的输出值。对于输出值：
1. 对每个尺度的输出值: 计算损失
2. 整合所有尺度的输出值: 提供候选区域


![](https://pic2.zhimg.com/80/v2-d2fc185742120b0e5e4ca82b2942597d_720w.jpg)

我们先来看一下配置文件中的内容，下面是所有关于 rpn 的配置。其中 rpn_head 是构建 RPN 网络的主要的配置。又因为在训练时需要分配正负样本并采样，在测试时不需要分配正负样本或采样，且筛选候选区域的设置不同。所以有两个配置，分别是 train_cfg（训练时的配置）和 test_cfg（测试时的配置），分别指定训练和测试不同的设置。

In [None]:
    # RPNHead 的配置
    rpn_head=dict(
            type='RPNHead',
            in_channels=256,
            feat_channels=256,
            anchor_generator=dict(
                type='AnchorGenerator',
                scales=[8],
                ratios=[0.5, 1.0, 2.0],
                strides=[4, 8, 16, 32, 64]),
            bbox_coder=dict(
                type='DeltaXYWHBBoxCoder',
                target_means=[.0, .0, .0, .0],
                target_stds=[1.0, 1.0, 1.0, 1.0]),
            loss_cls=dict(
                type='CrossEntropyLoss', use_sigmoid=True, loss_weight=1.0),
            loss_bbox=dict(type='L1Loss', loss_weight=1.0)),

    # 训练时 rpn 的配置
    train_cfg = dict(
        rpn=dict(
            assigner=dict(
                type='MaxIoUAssigner',
                pos_iou_thr=0.7,
                neg_iou_thr=0.3,
                min_pos_iou=0.3,
                match_low_quality=True,
                ignore_iof_thr=-1),
            sampler=dict(
                type='RandomSampler',
                num=256,
                pos_fraction=0.5,
                neg_pos_ub=-1,
                add_gt_as_proposals=False),
            allowed_border=-1,
            pos_weight=-1,
            debug=False),
        rpn_proposal=dict(
            nms_across_levels=False,
            nms_pre=2000,
            nms_post=1000,
            max_num=1000,
            nms_thr=0.7,
            min_bbox_size=0),

    # 测试时 rpn 的配置
    test_cfg = dict(
        rpn=dict(
            nms_across_levels=False,
            nms_pre=1000,
            nms_post=1000,
            max_num=1000,
            nms_thr=0.7,
            min_bbox_size=0),

此部分的具体代码，我们会在下篇文章中具体讲解。在本篇文章中只会讲解大体的思路

## 四、RPN 网络训练
那么 RPN 网络如何训练呢？既然是训练一定要有训练的样本，那么 RPN 训练的数据如何定义呢？我们一起来看一看。

（一）数据准备
对 RPN 来说会生成很多个 anchor，但是我们不会全拿过来训练。原因就是正负样本的不均衡。一整张图片的 gt bbox 可能就几个，但是 anchor 的数量实在是太多了。为什么样本不均衡就不能直接拿来训练呢？我们举个例子：假设有一个数据集，有 100 张图片，其中 99 张是猫，1 张是狗。那么网络只用对任何图片都预测为猫就可以得到 99% 的准确率。这样显然不符合实际的要求。所以对于正负样本的数量不能差距太大。

所以对于 RPN 的数据（Anchor）准备，我们需要两个步骤：

分配正负样本
采样并平衡正负样本的数量（在 RPN 中为 1 : 1）
这两个操作也刚好对应了MMDetection 中的 assigner 和 sampler。



1. 分配正负样本

分配正负样本有四个步骤，步骤如下。

（1）初始化时将每个 anchor 的 mask 设置为 -1，表示此 anchor 既不是正样本也不是负样本

（2）将 anchor 与所有 gt 的最大的 iou ＜ neg_iou_thr（0.3）的 anchor 的 mask 设置为 0，表示负样本（背景）

（3）将 anchor 与所有 gt 的最大的 iou ≥ pos_iou_thr（0.7）的 anchor 的 mask 设置为当前 gt 的类别（前景），表示正样本

（4）在（3）中的设置可能会导致有一些 gt 没有分配 anchor，所以对每个 gt 找出与它 iou 最大的 anchor 如果此 iou ≥ min_pos_iou（0.3），将此 anchor 设置为正样本。



配置如下：

In [None]:
assigner=dict(
            type='MaxIoUAssigner',
            pos_iou_thr=0.7,
            neg_iou_thr=0.3,
            min_pos_iou=0.3,
            match_low_quality=True,
            ignore_iof_thr=-1),

在 RPN 中，使用 MaxIoUAssigner，完成正负样本的分配，具体源码如下：

In [None]:
import torch

from ..builder import BBOX_ASSIGNERS
from ..iou_calculators import build_iou_calculator
from .assign_result import AssignResult
from .base_assigner import BaseAssigner


@BBOX_ASSIGNERS.register_module()
class MaxIoUAssigner(BaseAssigner):
    """根据 IOU 分配正负样本.

    - -1:         代表忽略, 表示既不是正样本也不是负样本.
    - 非负整数:    代表匹配到的类别, 可以是正样本或负样本的类别.

    Args:
        pos_iou_thr:            (float):            正样本的 IOU 阈值
        neg_iou_thr:            (float or tuple):   负样本的 IoU 阈值
        min_pos_iou:            (float):            分配正样本时最小的 IOU
        gt_max_assign_all:      (bool):             是否分配所有与 gt 的 IOU 最大的 bbox 为正样本(满足 min_pos_iou).
        ignore_iof_thr:         (float):            忽略 bboxes 的 IoF 阈值（如果 `gt_bboxes_ignore` 已指定）
                                                    负值表示不忽略任何 bboxes。
        ignore_wrt_candidates:  (bool):             是否计算 `bboxes` 和 `gt bboxes_ignore` 的 iof.
        match_low_quality:      (bool):             是否对每个 gt 找出与它 iou 最大的 anchor.
                                                    如果此 iou ≥ min_pos_iou（0.3）, 将此 anchor 设置为正样本.
                                                    此操作一般不会用在第二阶段, 如 roi_head 中.
        gpu_assign_thr:         (int):              GPU 的 GT 个数的上界分配. 当 gt 的数量超过此阈值时, 将分配在 CPU 设备上.
                                                    负值表示不在 CPU 上分配.
    """
    def __init__(self,
                 pos_iou_thr,
                 neg_iou_thr,
                 min_pos_iou=.0,
                 gt_max_assign_all=True,
                 ignore_iof_thr=-1,
                 ignore_wrt_candidates=True,
                 match_low_quality=True,
                 gpu_assign_thr=-1,
                 iou_calculator=dict(type='BboxOverlaps2D')):
        self.pos_iou_thr = pos_iou_thr
        self.neg_iou_thr = neg_iou_thr
        self.min_pos_iou = min_pos_iou
        self.gt_max_assign_all = gt_max_assign_all
        self.ignore_iof_thr = ignore_iof_thr
        self.ignore_wrt_candidates = ignore_wrt_candidates
        self.gpu_assign_thr = gpu_assign_thr
        self.match_low_quality = match_low_quality
        self.iou_calculator = build_iou_calculator(iou_calculator)

    def assign(self, bboxes, gt_bboxes, gt_bboxes_ignore=None, gt_labels=None):
        """给 bbox 分配 gt

        Args:
            bboxes:             (Tensor):               需要分配的 bboxes, 形状为(n, 4).
            gt_bboxes:          (Tensor):               Ground truth boxes, 形状为 (k, 4).
            gt_bboxes_ignore:   (Tensor, optional):     被标记为忽略的 Ground truth bboxes.
            gt_labels:          (Tensor, optional):     gt_bboxes 的标签, 形状为 (k, ).

        Returns:
            :obj:`AssignResult`: 分配的结果

        Example:
            >>> self = MaxIoUAssigner(0.5, 0.5)
            >>> bboxes = torch.Tensor([[0, 0, 10, 10], [10, 10, 20, 20]])
            >>> gt_bboxes = torch.Tensor([[0, 0, 10, 9]])
            >>> assign_result = self.assign(bboxes, gt_bboxes)
            >>> expected_gt_inds = torch.LongTensor([1, 0])
            >>> assert torch.all(assign_result.gt_inds == expected_gt_inds)
        """
        # 如果当前的 gt box 的数量大于 gpu 分配数量的阈值, 就使用 cpu 分配.
        assign_on_cpu = True if (self.gpu_assign_thr > 0) and (
            gt_bboxes.shape[0] > self.gpu_assign_thr) else False
        
        if assign_on_cpu:
            device = bboxes.device
            bboxes = bboxes.cpu()
            gt_bboxes = gt_bboxes.cpu()
            if gt_bboxes_ignore is not None:
                gt_bboxes_ignore = gt_bboxes_ignore.cpu()
            if gt_labels is not None:
                gt_labels = gt_labels.cpu()
        # 计算 gt box 和 bbox 的交并比, 形状为 (n_gts, n_bboxes)
        overlaps = self.iou_calculator(gt_bboxes, bboxes)

        if (self.ignore_iof_thr > 0 and gt_bboxes_ignore is not None
                and gt_bboxes_ignore.numel() > 0 and bboxes.numel() > 0):
            if self.ignore_wrt_candidates:
                ignore_overlaps = self.iou_calculator(
                    bboxes, gt_bboxes_ignore, mode='iof')
                ignore_max_overlaps, _ = ignore_overlaps.max(dim=1)
            else:
                ignore_overlaps = self.iou_calculator(
                    gt_bboxes_ignore, bboxes, mode='iof')
                ignore_max_overlaps, _ = ignore_overlaps.max(dim=0)
            overlaps[:, ignore_max_overlaps > self.ignore_iof_thr] = -1
        # 分配样本
        assign_result = self.assign_wrt_overlaps(overlaps, gt_labels)
        if assign_on_cpu:
            assign_result.gt_inds = assign_result.gt_inds.to(device)
            assign_result.max_overlaps = assign_result.max_overlaps.to(device)
            if assign_result.labels is not None:
                assign_result.labels = assign_result.labels.to(device)
        return assign_result

    def assign_wrt_overlaps(self, overlaps, gt_labels=None):
        """根据 overlaps 对 bbox 分配正负样本.

        Args:
            overlaps:   (Tensor):           k 个 gt_bboxes 和 n 个 bboxes 的 Overlaps,
                                            形状为 (k, n).
            gt_labels:  (Tensor, optional): k 个 gt_bboxes 的 labels, 形状为 (k,).

        Returns:
            :obj:`AssignResult`: 分配后结果
        """
        # 获取 gt 的数量和 bbox 的数量.
        num_gts, num_bboxes = overlaps.size(0), overlaps.size(1)

        # 1. 给所有的 box 初始化为 -1 代表全为负样本. (num_bboxes,)
        assigned_gt_inds = overlaps.new_full((num_bboxes, ),
                                             -1,
                                             dtype=torch.long)
        # 如果 gt 的数量或 bbox 的数量为 0, 直接返回.
        if num_gts == 0 or num_bboxes == 0:
            # No ground truth or boxes, return empty assignment
            max_overlaps = overlaps.new_zeros((num_bboxes, ))
            if num_gts == 0:
                # No truth, assign everything to background
                assigned_gt_inds[:] = 0
            if gt_labels is None:
                assigned_labels = None
            else:
                assigned_labels = overlaps.new_full((num_bboxes, ),
                                                    -1,
                                                    dtype=torch.long)
            return AssignResult(
                num_gts,
                assigned_gt_inds,
                max_overlaps,
                labels=assigned_labels)

        # 对于每个 bbox 找出和它 IOU 最大的 GT box
        max_overlaps, argmax_overlaps = overlaps.max(dim=0)
        # 对于每个 gt, 找出和他 IOU 最大的 bbox
        gt_max_overlaps, gt_argmax_overlaps = overlaps.max(dim=1)

        # 2. 最大 iou < neg_iou_thr 的设置为负样本, mask 设置为 0
        if isinstance(self.neg_iou_thr, float):
            assigned_gt_inds[(max_overlaps >= 0)
                             & (max_overlaps < self.neg_iou_thr)] = 0
        elif isinstance(self.neg_iou_thr, tuple):
            assert len(self.neg_iou_thr) == 2
            assigned_gt_inds[(max_overlaps >= self.neg_iou_thr[0])
                             & (max_overlaps < self.neg_iou_thr[1])] = 0

        # 3. 最大 iou ≥ pos_iou_thr 的设置为正样本, mask 设置为对应类别标签
        pos_inds = max_overlaps >= self.pos_iou_thr
        assigned_gt_inds[pos_inds] = argmax_overlaps[pos_inds] + 1

        # 4. 对每个 gt 找出与它 iou 最大的 bbox 如果此 iou ≥ min_pos_iou, 将此 bbox 设置为正样本
        if self.match_low_quality:
            # For example, if bbox A has 0.9 and 0.8 iou with GT bbox 1 & 2,
            # bbox 1 will be assigned as the best target for bbox A in step 3.
            # However, if GT bbox 2's gt_argmax_overlaps = A, bbox A's
            # assigned_gt_inds will be overwritten to be bbox B.
            # This might be the reason that it is not used in ROI Heads.
            for i in range(num_gts):
                if gt_max_overlaps[i] >= self.min_pos_iou:
                    if self.gt_max_assign_all:
                        max_iou_inds = overlaps[i, :] == gt_max_overlaps[i]
                        assigned_gt_inds[max_iou_inds] = i + 1
                    else:
                        assigned_gt_inds[gt_argmax_overlaps[i]] = i + 1

        if gt_labels is not None:
            assigned_labels = assigned_gt_inds.new_full((num_bboxes, ), -1)
            pos_inds = torch.nonzero(
                assigned_gt_inds > 0, as_tuple=False).squeeze()
            if pos_inds.numel() > 0:
                assigned_labels[pos_inds] = gt_labels[
                    assigned_gt_inds[pos_inds] - 1]
        else:
            assigned_labels = None

        return AssignResult(
            num_gts, assigned_gt_inds, max_overlaps, labels=assigned_labels)
        
    

2. 采样并平衡正负样本的数量

通过 assigner 我们得到了打了标签的 bbox。其中：-1 代表既不是正样本也不是负样本，0 代表负样本（背景），1 代表正样本（前景）。因为样本的数量实在是太多了，我们还需要限制训练样本的数量。在 MMDetection 中会通过 sampler 来实现。

在 RPN 中，样本总数为 256，正负样本的比值为 1 : 1。如过正样本的数量不足 128，会使用负样本填充，配置如下：

In [None]:
sampler=dict(
            type='RandomSampler',
            num=256,
            pos_fraction=0.5,
            neg_pos_ub=-1,
            add_gt_as_proposals=False),

MMDetection 中的所有 sampler 继承 BaseSampler，在 BaseSampler 中 sample 方法调用 _sample_pos，_sample_neg 完成正负样本的采样。所以子类需要重写这两个方法，BaseSampler 类如下：

In [None]:
from abc import ABCMeta, abstractmethod

import torch

from .sampling_result import SamplingResult


class BaseSampler(metaclass=ABCMeta):
    """samplers 的基类

    Args:
        num:                  (int)   采样后的总数
        pos_fraction        (float)   正样本的比例
        neg_pos_ub            (int)   负样本与正样本数量比值的上界
        add_gt_as_proposals  (bool)   是否添加 ground truth 作为 proposal.
    """

    def __init__(self,
                 num,
                 pos_fraction,
                 neg_pos_ub=-1,
                 add_gt_as_proposals=True,
                 **kwargs):
        self.num = num
        self.pos_fraction = pos_fraction
        self.neg_pos_ub = neg_pos_ub
        self.add_gt_as_proposals = add_gt_as_proposals
        self.pos_sampler = self
        self.neg_sampler = self

    @abstractmethod
    def _sample_pos(self, assign_result, num_expected, **kwargs):
        """正样本采样, 此方法需要子类实现."""
        pass

    @abstractmethod
    def _sample_neg(self, assign_result, num_expected, **kwargs):
        """负样本采样, 此方法需要子类实现."""
        pass

    def sample(self,
               assign_result,
               bboxes,
               gt_bboxes,
               gt_labels=None,
               **kwargs):
        """采样正负样本

        This is a simple implementation of bbox sampling given candidates,
        assigning results and ground truth bboxes.

        Args:
            assign_result:  (:obj:`AssignResult`):  Bbox 分配后的结果.
            bboxes:         (Tensor):               需要采样的 bbox, 形状为 (N, 4)
            gt_bboxes:      (Tensor):               Ground truth bboxes, 形状为 (n_gts, 4)
            gt_labels:      (Tensor, optional):     ground truth bboxes 的类别标签.

        Returns:
            :obj:`SamplingResult`:  采样的结果.

        Example:
            >>> from mmdet.core.bbox import RandomSampler
            >>> from mmdet.core.bbox import AssignResult
            >>> from mmdet.core.bbox.demodata import ensure_rng, random_boxes
            >>> rng = ensure_rng(None)
            >>> assign_result = AssignResult.random(rng=rng)
            >>> bboxes = random_boxes(assign_result.num_preds, rng=rng)
            >>> gt_bboxes = random_boxes(assign_result.num_gts, rng=rng)
            >>> gt_labels = None
            >>> self = RandomSampler(num=32, pos_fraction=0.5, neg_pos_ub=-1,
            >>>                      add_gt_as_proposals=False)
            >>> self = self.sample(assign_result, bboxes, gt_bboxes, gt_labels)
        """
        if len(bboxes.shape) < 2:
            bboxes = bboxes[None, :]

        bboxes = bboxes[:, :4]

        gt_flags = bboxes.new_zeros((bboxes.shape[0], ), dtype=torch.uint8)
        # 是否将 gt 添加到正样本.
        if self.add_gt_as_proposals and len(gt_bboxes) > 0:
            if gt_labels is None:
                raise ValueError(
                    'gt_labels must be given when add_gt_as_proposals is True')
            bboxes = torch.cat([gt_bboxes, bboxes], dim=0)
            assign_result.add_gt_(gt_labels)
            gt_ones = bboxes.new_ones(gt_bboxes.shape[0], dtype=torch.uint8)
            gt_flags = torch.cat([gt_ones, gt_flags])
        # 预期的正样本数量（eg, rpn 128）
        num_expected_pos = int(self.num * self.pos_fraction)
        # 采样正样本.
        pos_inds = self.pos_sampler._sample_pos(
            assign_result, num_expected_pos, bboxes=bboxes, **kwargs)
        # We found that sampled indices have duplicated items occasionally.
        # (may be a bug of PyTorch)
        pos_inds = pos_inds.unique()
        num_sampled_pos = pos_inds.numel()
        # 正样本不够的用负样本填充
        num_expected_neg = self.num - num_sampled_pos
        if self.neg_pos_ub >= 0:
            _pos = max(1, num_sampled_pos)
            neg_upper_bound = int(self.neg_pos_ub * _pos)
            if num_expected_neg > neg_upper_bound:
                num_expected_neg = neg_upper_bound
        neg_inds = self.neg_sampler._sample_neg(
            assign_result, num_expected_neg, bboxes=bboxes, **kwargs)
        neg_inds = neg_inds.unique()

        sampling_result = SamplingResult(pos_inds, neg_inds, bboxes, gt_bboxes,
                                         assign_result, gt_flags)
        return sampling_result

RPN 使用 RandomSampler，它继承了 BaseSampler。源码如下：

In [None]:
import torch

from ..builder import BBOX_SAMPLERS
from .base_sampler import BaseSampler


@BBOX_SAMPLERS.register_module()
class RandomSampler(BaseSampler):
    """Random sampler.

    Args:
        num:                    (int):              需要采样的个数
        pos_fraction:           (float):            正样本数量 : 负样本数量
        neg_pos_up:             (int, optional):    负样本与正样本数量比值的上界
        add_gt_as_proposals:    (bool, optional):   是否添加 ground truth 作为 proposal.
    """

    def __init__(self,
                 num,
                 pos_fraction,
                 neg_pos_ub=-1,
                 add_gt_as_proposals=True,
                 **kwargs):
        from mmdet.core.bbox import demodata
        super(RandomSampler, self).__init__(num, pos_fraction, neg_pos_ub,
                                            add_gt_as_proposals)
        self.rng = demodata.ensure_rng(kwargs.get('rng', None))

    def random_choice(self, gallery, num):
        """Random select some elements from the gallery.

        If `gallery` is a Tensor, the returned indices will be a Tensor;
        If `gallery` is a ndarray or list, the returned indices will be a
        ndarray.

        Args:
            gallery (Tensor | ndarray | list): indices pool.
            num (int): expected sample num.

        Returns:
            Tensor or ndarray: sampled indices.
        """
        assert len(gallery) >= num

        is_tensor = isinstance(gallery, torch.Tensor)
        if not is_tensor:
            gallery = torch.tensor(
                gallery, dtype=torch.long, device=torch.cuda.current_device())
        perm = torch.randperm(gallery.numel(), device=gallery.device)[:num]
        rand_inds = gallery[perm]
        if not is_tensor:
            rand_inds = rand_inds.cpu().numpy()
        return rand_inds

    def _sample_pos(self, assign_result, num_expected, **kwargs):
        """Randomly sample some positive samples."""
        pos_inds = torch.nonzero(assign_result.gt_inds > 0, as_tuple=False)
        if pos_inds.numel() != 0:
            pos_inds = pos_inds.squeeze(1)
        if pos_inds.numel() <= num_expected:
            return pos_inds
        else:
            return self.random_choice(pos_inds, num_expected)

    def _sample_neg(self, assign_result, num_expected, **kwargs):
        """Randomly sample some negative samples."""
        neg_inds = torch.nonzero(assign_result.gt_inds == 0, as_tuple=False)
        if neg_inds.numel() != 0:
            neg_inds = neg_inds.squeeze(1)
        if len(neg_inds) <= num_expected:
            return neg_inds
        else:
            return self.random_choice(neg_inds, num_expected)

（二）损失函数
对于前景置信度的预测，我们使用 BCELoss。对于 Anchor 的修正量，我们使用 L1 Loss，（原论文用的是 Smooth L1 Loss）。对于 L1 Loss，我们只计算正样本的损失。也就是只让网络对正样本进行修正。两个 Loss 相加作为最终的 Loss，配置如下：

In [None]:
loss_cls=dict(
            type='CrossEntropyLoss', use_sigmoid=True, loss_weight=1.0),
loss_bbox=dict(type='L1Loss', loss_weight=1.0)),

五、生成候选区域
RPN 网络不仅需要自己训练，还需要为 roi head 提供候选区域。这个过程是不需要反向传播的。首先使用网络获取置信度和修正值，然后再使用如下的 5 个步骤生成 region proposals：

1. 根据前景的置信度筛选出每个尺度前 nms_pre（2000）个预测值
2. 合并筛选后的多个尺度的预测值
3. 将 bbox 的预测值解码为左上角坐标和右下角坐标
4. 用 nms（阈值=nms_thr）合并 bbox
5. 根据置信度筛选出前 nms_post（1000）个 proposal


设置如下：

In [None]:
rpn_proposal=dict(
        nms_across_levels=False,
        nms_pre=2000,
        nms_post=1000,
        max_num=1000,
        nms_thr=0.7,
        min_bbox_size=0),

源码在 RPNHead 中的 _get_bboxes_single 方法。

In [None]:
def _get_bboxes_single(self,
                           cls_scores,
                           bbox_preds,
                           mlvl_anchors,
                           img_shape,
                           scale_factor,
                           cfg,
                           rescale=False):
        """将一张图片的输出转化为 bbox 的结果.

        Args:
            cls_scores:     (list[Tensor]):    网络输出的 confidence, list 的长度为 level 的长度(5),
                                               每个 tensor 的形状是 [K, H, W]
            bbox_preds:     (list[Tensor]):    网络输出的坐标值, list 代表每个尺度(如： 长度 5),
                                               每个 tensor 的形状是 [4K, H, W]
            mlvl_anchors:   (list[Tensor]):    每个 scale 的生成的 anchor,
                                               每个 tensor 的形状为: [H × W × K, 4]
            img_shape:      (tuple[int]):      图像的大小
            scale_factor:   (ndarray):         Scale factor of the image arange as
                (w_scale, h_scale, w_scale, h_scale).
            cfg:            (mmcv.Config):     Test / postprocessing configuration,
                if None, test_cfg would be used.
            rescale (bool): If True, return boxes in original image space.

        Returns:
            Tensor: Labeled boxes in shape (n, 5), where the first 4 columns
                are bounding box positions (tl_x, tl_y, br_x, br_y) and the
                5-th column is a score between 0 and 1.
        """
        # 1. 根据类别的置信度筛选出每个尺度 topK（K = 2000）个 bbox
        # 2. 合并筛选后的多个尺度的 bbox
        # 3. 将网络预测值解码
        # 4. 用 nms（阈值=0.7）合并 bbox
        # 5. 筛选出前 nms_post（1000）个 bbox 作为 proposal

        cfg = self.test_cfg if cfg is None else cfg
        # bboxes from different level should be independent during NMS,
        # level_ids are used as labels for batched NMS to separate them
        level_ids = []
        mlvl_scores = []
        mlvl_bbox_preds = []
        mlvl_valid_anchors = []
        # 遍历每个尺度
        for idx in range(len(cls_scores)):
            # 取到一个尺度的网络输出的类别和位置预测
            rpn_cls_score = cls_scores[idx]
            rpn_bbox_pred = bbox_preds[idx]
            # 保证后两个维度相同
            assert rpn_cls_score.size()[-2:] == rpn_bbox_pred.size()[-2:]
            # [A, H, W] --> [H, W, A]
            rpn_cls_score = rpn_cls_score.permute(1, 2, 0)
            # 将类别的数值压缩成概率.
            if self.use_sigmoid_cls:
                # [H × W × A]
                rpn_cls_score = rpn_cls_score.reshape(-1)
                scores = rpn_cls_score.sigmoid()
            else:
                # 转成 (-1, 2), 这个 2 代表是背景或不是背景。
                # 前景 label 设置为:  [0, 类别数 - 1],
                # 背景 label 设置为:  类别数
                rpn_cls_score = rpn_cls_score.reshape(-1, 2)
                # we set FG labels to [0, num_class-1] and BG label to
                # num_class in other heads since mmdet v2.0, However we
                # keep BG label as 0 and FG label as 1 in rpn head
                # 对类别维度进行 softmax, 取背景的概率
                # 形状：[2000]
                scores = rpn_cls_score.softmax(dim=1)[:, 1]
            # [A × 4, H, W] --> [H × W × A, 4]
            rpn_bbox_pred = rpn_bbox_pred.permute(1, 2, 0).reshape(-1, 4)
            # 取对应层的 anchor: [单尺度 anchor 总数, 4]
            anchors = mlvl_anchors[idx]

            # 根据类别的置信度筛选出 topK 个 box
            if cfg.nms_pre > 0 and scores.shape[0] > cfg.nms_pre:
                # sort is faster than topk
                # _, topk_inds = scores.topk(cfg.nms_pre)
                # 对 score 从高到低排序
                # [182400]
                ranked_scores, rank_inds = scores.sort(descending=True)
                # 取 topK（cfg.nms_pre） 个 index 和 score: anchor --> [2000]
                topk_inds = rank_inds[:cfg.nms_pre]
                scores = ranked_scores[:cfg.nms_pre]
                # 根据类别的预测值，取 topK 个 bbox
                rpn_bbox_pred = rpn_bbox_pred[topk_inds, :]
                # anchor 也取 topK 个
                anchors = anchors[topk_inds, :]
            mlvl_scores.append(scores)
            mlvl_bbox_preds.append(rpn_bbox_pred)
            mlvl_valid_anchors.append(anchors)
            # new_full(形状，填充值，数据类型)
            level_ids.append(
                scores.new_full((scores.size(0), ), idx, dtype=torch.long))
        # cat 的 dim 默认为 0 维
        scores = torch.cat(mlvl_scores)
        anchors = torch.cat(mlvl_valid_anchors)
        rpn_bbox_pred = torch.cat(mlvl_bbox_preds)
        proposals = self.bbox_coder.decode(
            anchors, rpn_bbox_pred, max_shape=img_shape)
        ids = torch.cat(level_ids)

        # 如多对 anchor 的大小有限定
        if cfg.min_bbox_size > 0:
            # 计算 W, H
            w = proposals[:, 2] - proposals[:, 0]
            h = proposals[:, 3] - proposals[:, 1]
            # 取长宽都 > min_bbox_size 的索引
            valid_inds = torch.nonzero(
                (w >= cfg.min_bbox_size)
                & (h >= cfg.min_bbox_size),
                as_tuple=False).squeeze()
            # 筛选目标
            if valid_inds.sum().item() != len(proposals):
                proposals = proposals[valid_inds, :]
                scores = scores[valid_inds]
                ids = ids[valid_inds]

        # TODO: remove the hard coded nms type
        nms_cfg = dict(type='nms', iou_threshold=cfg.nms_thr)
        dets, keep = batched_nms(proposals, scores, ids, nms_cfg)
        # nms 后对置信度有排序。直接取前 nms_post 个
        return dets[:cfg.nms_post]

# [MMDetection Faster R-CNN 源码详解（四)](https://zhuanlan.zhihu.com/p/194285023)

在上一小结中，我们详解了 RPN 的原理及部分代码。在本篇文章中会详解 RPN 的全部源码。


RPN（源码篇）
1. 整体结构
2. BaseDenseHead
4. RPNTestMixin
5. RPNHead

一、整体结构

在 MMDetection 中，RPN 部分是通过 RPNHead 类来实现的。其中 RPNHead 继承了 AnchorHead 和 RPNTestMixin，AnchorHead 继承了 BaseDenseHead。继承关系如下图：

![](https://pic3.zhimg.com/80/v2-8aad81cc25ee038d5f991048f4af5226_720w.jpg)图一：RPNHead 的继承关系

接下来，我们自底至上来分析源码。

二、BaseDenseHead（基类）

对于训练 RPN 来说，主要有两个流程，计算损失和生成候选区域。因为 RPN 需要训练 Anchor，所以需要采样、平衡正负样本并计算损失。RPN 又需要为 roi_head 提供候选区域，所以 RPN 也需要进行区域推荐。其中训练时计算损失的步骤需要反向传播，而生成候选区域的过程不需要反向传播。而测试时不需要计算损失只需要为后续的网络生成候选区域即可。BaseDenseHead 提供了计算损失和生成候选区域的接口，对应的方法为 loss 和 get_bboxes。

从数据上来看，图片经过 RPN 后，会生成每个 Anchor 的置信度和修正量。对于经过 RPN 卷积的输出，一方面需要与 anchor 编码，然后和 gt_bboxes 计算损失，另一方面通过 anchor 解码，筛选后得到 proposal 提供给 roi_head。

我们来看下基类 BaseDenseHead，它有 2 个抽象方法，分别是 loss（计算损失）和 get_bboxes（生成候选区域），这两个方法都需要子类来实现。

loss 方法的作用是计算 rpn 网络的损失，并将损失以字典的形式返回。get_bboxes 的作用是将网络预测的 bbox 结果转化为 proposal，为 roi head 提供候选区域。

** forward_train 方法实现了 RPN 在训练时的前向传播，其主要步骤如下：**

1）获得类别（前景）和边框回归的输出值。

2）计算类别与边框回归的损失。

3）将网络的预测转化为候选区域。

我们来看一下源码：

In [None]:
from abc import ABCMeta, abstractmethod
import torch.nn as nn


class BaseDenseHead(nn.Module, metaclass=ABCMeta):
    """DenseHeads 基类"""

    def __init__(self):
        super(BaseDenseHead, self).__init__()

    @abstractmethod
    def loss(self, **kwargs):
        """计算 head 的 loss"""
        pass

    @abstractmethod
    def get_bboxes(self, **kwargs):
        """将 bbox 预测转化为 region proposals"""
        pass

    def forward_train(self,
                      x,
                      img_metas,
                      gt_bboxes,
                      gt_labels=None,
                      gt_bboxes_ignore=None,
                      proposal_cfg=None,
                      **kwargs):
        """
        Args:
            x:                        (list[Tensor])  经过 FPN 后的 features
            img_metas:                  (list[dict])  一个 batch 的 image 的信息的 list, 如: 大小, 缩放等.
            gt_bboxes:                (list[Tensor])  一个 batch 的 Ground truth bboxes 的 list,
                                                      每个图片 gt bboxes 的形状为 (num_gts, 4).
            gt_labels:         (list[Tensor] | None)  一个 batch 的 Ground truth labels 的 list,
                                                      每个图片 gt labels 的形状为 (num_gts,).
            gt_bboxes_ignore:  (list[Tensor] | None)  一个 batch 忽略的 ground truth bboxes,
                                                      每个图片的 gt_bboxes_ignore 的形状为 (num_ignored_gts, 4).
            proposal_cfg:              (mmcv.Config)  测试 / 后处理 的配置, 如果为 None, 就会使用 test_cfg

        Returns:
            tuple:
                losses:          (dict[str, Tensor])  一个 loss 字典
                proposal_list:        (list[Tensor])  一个批次的图片的 proposal 列表,
                                                      每个 proposal 的形状为 (1000, 5)
        """
        # 前向计算, 拿到 confidence 和 bbox 坐标偏移的结果.
        # 结果为 [cls_list, reg_list]
        # cls_list 是每个尺度的分类预测结果, 形状为 [torch.Size([batch, anchor, H, W]), ...]
        # reg_list 是每个尺度的回归预测结果, 形状为 [torch.Size([batch, anchor × 4, H, W ]), ...]
        outs = self(x)
        if gt_labels is None:
            loss_inputs = outs + (gt_bboxes, img_metas)
        else:
            loss_inputs = outs + (gt_bboxes, gt_labels, img_metas)
        # 计算损失
        # 结果为 dict('loss_rpn_cls', 'loss_rpn_bbox')
        # loss_rpn_cls  是每个尺度的分类损失, 是一个 tensor 数值.
        # loss_rpn_bbox 是每个尺度的回归损失, 是一个 tensor 数值.
        losses = self.loss(*loss_inputs, gt_bboxes_ignore=gt_bboxes_ignore)
        # proposal_cfg 为 None, 只返回损失不提供 proposal, 例如: 单独训练 RPN 就不需要提供 proposal
        if proposal_cfg is None:
            return losses
        # proposal_cfg 不是 None, 返回损失且提供 proposal
        else:
            # 一个批次的图片的 proposal 列表, 每个 proposal 的形状为 (1000, 5)
            proposal_list = self.get_bboxes(*outs, img_metas, cfg=proposal_cfg)
            return losses, proposal_list

## 三、AnchorHead（继承 BaseDenseHead）
AnchorHead 继承了 BaseDenseHead，提供了所有 anchor-base 模型通用的方法。对于不同的 anchor-base 的网络模型只用继承 AnchorHead 并重写一部分方法就可以实现它的功能。下面我们一个一个方法的来看 AnchorHead 的功能。

### （一）__init__

在 构造函数 中，AnchorHead 会构建所有需要的模块，并调用 _init_layers 初始化 Head 的层。 具体 构建的模块如下：

anchor_generator：anchor 生成
bbox_coder：bbox 编码
loss_cls：分类损失
loss_bbox：bbox 损失
assigner：分配正负样本（训练时需要）
sampler：正负样本采样（训练时需要）


In [None]:
import torch
import torch.nn as nn
from mmcv.cnn import normal_init

from mmdet.core import (anchor_inside_flags, build_anchor_generator,
                        build_assigner, build_bbox_coder, build_sampler,
                        force_fp32, images_to_levels, multi_apply,
                        multiclass_nms, unmap)
from ..builder import HEADS, build_loss
from .base_dense_head import BaseDenseHead


@HEADS.register_module()
class AnchorHead(BaseDenseHead):
    """Anchor-based head (RPN, RetinaNet, SSD, etc.).

    Args:
        num_classes:              (int)   类别的个数, 不包括背景类.
        in_channels:              (int)   输入的 feature map 的通道数.
        feat_channels:            (int)   中间提取特征使用的通道数.
        anchor_generator:        (dict)   anchor generator 的配置文件字典.
        bbox_coder:              (dict)   box coder 的配置文件字典.
        reg_decoded_bbox:        (bool)   如果为 True, 将会对解码的 bbox 回归损失. (默认值: False).
        background_label:  (int | None)   背景标签的 id. 在 RPN 中为 0, 其他的 head 为 num_classes.
                                          如果为 None 会自动设置为 num_classes.
        loss_cls:                (dict)   分类 loss 的配置文件字典.
        loss_bbox:               (dict)   回归 loss 的配置文件字典.
        train_cfg:               (dict)   anchor head 的训练配置.
        test_cfg:                (dict)   anchor head 的测试配置.
    """  # noqa: W605

    def __init__(self,
                 num_classes,
                 in_channels,
                 feat_channels=256,
                 anchor_generator=dict(
                     type='AnchorGenerator',
                     scales=[8, 16, 32],
                     ratios=[0.5, 1.0, 2.0],
                     strides=[4, 8, 16, 32, 64]),
                 bbox_coder=dict(
                     type='DeltaXYWHBBoxCoder',
                     target_means=(.0, .0, .0, .0),
                     target_stds=(1.0, 1.0, 1.0, 1.0)),
                 reg_decoded_bbox=False,
                 background_label=None,
                 loss_cls=dict(
                     type='CrossEntropyLoss',
                     use_sigmoid=True,
                     loss_weight=1.0),
                 loss_bbox=dict(
                     type='SmoothL1Loss', beta=1.0 / 9.0, loss_weight=1.0),
                 train_cfg=None,
                 test_cfg=None):
        super(AnchorHead, self).__init__()
        self.in_channels = in_channels        # 256, FPN 每个尺度的输出通道数
        self.num_classes = num_classes        # 类别个数, 不包括背景类.
        self.feat_channels = feat_channels    # 256
        self.use_sigmoid_cls = loss_cls.get('use_sigmoid', False)
        # TODO better way to determine whether sample or not
        # 使用 FocalLoss 不需要对 proposal 进行采样, 所以 sampling = False
        self.sampling = loss_cls['type'] not in [
            'FocalLoss', 'GHMC', 'QualityFocalLoss'
        ]
        if self.use_sigmoid_cls:
            self.cls_out_channels = num_classes
        else:
            self.cls_out_channels = num_classes + 1

        if self.cls_out_channels <= 0:
            raise ValueError(f'num_classes={num_classes} is too small')
        self.reg_decoded_bbox = reg_decoded_bbox

        # 如果为 None 设置为 num_classes, 否则设置为 0 (RPN: 0)
        self.background_label = (
            num_classes if background_label is None else background_label)
        # background_label 必须是 0 或 num_classes
        assert (self.background_label == 0
                or self.background_label == num_classes)

        self.bbox_coder = build_bbox_coder(bbox_coder)
        self.loss_cls = build_loss(loss_cls)
        self.loss_bbox = build_loss(loss_bbox)
        self.train_cfg = train_cfg
        self.test_cfg = test_cfg
        # 只有训练时才会分配正负样本 (assigner) 和平衡正负样本的数量 (sampler)
        if self.train_cfg:
            self.assigner = build_assigner(self.train_cfg.assigner)
            # use PseudoSampler when sampling is False
            if self.sampling and hasattr(self.train_cfg, 'sampler'):
                sampler_cfg = self.train_cfg.sampler
            else:
                sampler_cfg = dict(type='PseudoSampler')
            self.sampler = build_sampler(sampler_cfg, context=self)
        self.fp16_enabled = False
        
        # anchor 的生成, 无论是训练或测试都需要.
        self.anchor_generator = build_anchor_generator(anchor_generator)

        # num_anchors 每个尺度 base_anchor 的数量.
        # 通常每个尺度 base_anchor 的数量相同, 如 RPN(3, 3, 3, 3, 3), 除了 ssd.
        self.num_anchors = self.anchor_generator.num_base_anchors[0]
        self._init_layers()

###（二）_init_layers（RPNHead 会重写）
此类的构造函数会调用此方法，完成 Head 层的创建。不过在子类 RPNHead 会对它重写。在这里使用的 Head 是两个 1×1 的卷积。

In [None]:
 def _init_layers(self):
        """初始化 Head 的 layer"""
        self.conv_cls = nn.Conv2d(self.in_channels,
                                  self.num_anchors * self.cls_out_channels, 1)
        self.conv_reg = nn.Conv2d(self.in_channels, self.num_anchors * 4, 1)

### （三）init_weights（RPNHead 会重写）
初始化 Head 的层。在 RPNHead 中也会对此方法重写，这里使用的是均值为 0，方差为 0.01 的正态分布初始化卷积。

In [None]:
    def init_weights(self):
        """初始化 head 的权重"""
        # 使用均值为 0, 方差为 0.01 初始化
        normal_init(self.conv_cls, std=0.01)
        normal_init(self.conv_reg, std=0.01)

（四）forward_single（RPNHead 会重写）
forward_single 会对单尺度的 feature map 前向传播，此方法也会在 RPNHead 重写。需要注意的是，在 RPN 中所有的尺度都使用同一个网络结构得到输出结果。

In [None]:
 def forward_single(self, x):
        """对单尺度的 feature map 前向传播.

        Args:
            x (Tensor):    单尺度的 feature map

        Returns:
            tuple:
                cls_score (Tensor): 单尺度的置信度 (通道数为: anchors 的数量 * num_classes)
                bbox_pred (Tensor): 单尺度预测的偏移量 (通道数为: anchors 的数量 * 4)
        """
        # 所有尺度使用相同的 conv_cls 和 conv_reg 进行预测.
        cls_score = self.conv_cls(x)
        bbox_pred = self.conv_reg(x)
        return cls_score, bbox_pred

（五）forward
前向传播，生成多个尺度的预测值。传入一个经过 backbone 和 neck 后得到的多尺度 feature map 的元祖。经过网络输出后，会得到两个 list，第一个 list 是所有尺度 Anchor 的置信度预测，每个元素的形状为（batch，n_anchors，H，W），第二个 list 是所有尺度 Anchor 的 bbox 回归的预测结果，每个元素的形状为（batch，n_anchors × 4，H，W）。

In [None]:
def forward(self, feats):
        """前向传播, 获得网络预测的分类和回归的结果.

        Args:
            feats:  (tuple[Tensor]): 经过 backbone 和 neck 后的 features 的元祖, 每个元素是一个尺度的 feature.

        Returns:
            tuple: 一个元祖, 包括分类分数和 bbox 回归预测的结果
                cls_scores (list[Tensor]):  每个尺度的分类分数的 list, 每个元素代表一个尺度, 数据类型为 tensor.
                                            每个元素的形状为 [batch, anchor 数量 × 类别个数, H, W]
                bbox_preds (list[Tensor]):  每个尺度的 bbox 回归的 list, 每个元素代表一个尺度, 类型为 tensor.
                                            每个元素的形状为 [batch, anchor 数量 × 4, H, W]
        """
        return multi_apply(self.forward_single, feats)

（六）get_anchors
输入各个尺度的 feature map 大小，获取一个批次中多张图片多个尺度的 anchor 和 valid flag。

In [None]:
  def get_anchors(self, featmap_sizes, img_metas, device='cuda'):
        """根据特征图的大小获取 anchor.

        Args:
            featmap_sizes: (list[tuple]):           各个尺度的 feature map 的大小.
            img_metas:     (list[dict]):            一个批次的图像属性信息
            device:        (torch.device | str):    返回的 tensor 的设备

        Returns:
            tuple:
                anchor_list:        (list[list[Tensor]]): 一个批次图片的 anchor, 每个元素是一张图片所有尺度的 anchor
                valid_flag_list:    (list[list[Tensor]]): 一个批次图片有效的 anchor, 每个元素是一张图片所有尺度的 anchor
        """
        # batch size
        num_imgs = len(img_metas)

        # 由于一个批次所有图像的特征图大小相同, 所以只用生成 anchor 一次, 就可以得到整个批次的 anchor.
        multi_level_anchors = self.anchor_generator.grid_anchors(
            featmap_sizes, device)
        # 因为同一个 batch 的图片大小相同所以这里直接循环 batch 次数次 anchor 就行.
        # anchor_list 是每个图片每个尺度生成的 anchor 的列表, 形状为 list(list(Tensor)).
        anchor_list = [multi_level_anchors for _ in range(num_imgs)]

        # 对于每个图像, 计算多尺度 anchor 的有效标志.
        # 形状为 list(list(Tensor)), 其中 Tensor 代表一张图片一个尺度的 Anchor.
        valid_flag_list = []
        for img_id, img_meta in enumerate(img_metas):
            multi_level_flags = self.anchor_generator.valid_flags(
                featmap_sizes, img_meta['pad_shape'], device)
            valid_flag_list.append(multi_level_flags)

        return anchor_list, valid_flag_list

（七）_get_targets_single
此函数会生成 RPN 单张图片训练的目标，其主要流程为如下 5 步：

筛选出有效的 anchor
anchor 分配正负样本（assigner）
anchor 正负样本采样（sampler）
构建 bbox 和 label 的目标和权重
(1) 构建 bbox 的目标和权重：

① 将正样本的 anchor 编码为中心点坐标，宽和高的偏移量。

② 将正样本对应的 indices 设置为编码后的 anchor

③ 将正样本的权重设置为 1

(2) 构建 label 的目标和权重：

① 将正样本的 label 设置为 1，代表前景

② 将正样本的权重设置为 1

5. 填充 anchor 到没有筛选 valid flag 的长度

下面我们来看看源码：

In [None]:
 def _get_targets_single(self,
                            flat_anchors,
                            valid_flags,
                            gt_bboxes,
                            gt_bboxes_ignore,
                            gt_labels,
                            img_meta,
                            label_channels=1,
                            unmap_outputs=True):
        """计算一张图片 anchor 的回归和分类的目标

        Args:
            flat_anchors:       (Tensor):   合并后的多尺度的 anchor. 形状为: (num_anchors ,4).
            valid_flags:        (Tensor):   合并后的多尺度的 anchor 的 flag, 形状为 (num_anchors,).
            gt_bboxes:          (Tensor):   图像的 ground truth bbox, 形状为 (num_gts, 4).
            gt_bboxes_ignore:   (Tensor):   需要忽略的 Ground truth bboxes 形状为: (num_ignored_gts, 4).
            img_meta:           (dict):     此图像的属性信息
            gt_labels:          (Tensor):   每个 box 的 Ground truth labels, 形状为 (num_gts,).
            label_channels:     (int):      label 所在的通道.
            unmap_outputs:      (bool):     是否将输出映射回原始 anchor 配置.

        Returns:
            tuple:
                labels:          (Tensor)     训练的标签, 形状为 (anchor 总数,)
                label_weights:   (Tensor)     训练标签的权重, 形状为 (anchor 总数,)
                bbox_targets:    (Tensor)     bbox 训练的目标值, 形状为 (anchor 总数, 4)
                bbox_weights:    (Tensor)     bbox 训练目标值的权重, 形状为 (anchor 总数, 4)
                pos_inds:        (Tensor)     正样本的索引, 形状为 (正样本总数,)
                neg_inds:        (Tensor)     负样本的索引, 形状为 (负样本总数,)
        """
        # ===================== 1. 筛选出有效的 anchor ===========================
        # 获得有效的 flag, 这里的 inside_flags 就等于 valid_flags, 形状为 (num_anchors,)
        inside_flags = anchor_inside_flags(flat_anchors, valid_flags,
                                           img_meta['img_shape'][:2],
                                           self.train_cfg.allowed_border)
        # 如果 anchor 没有一个有效, 直接返回
        if not inside_flags.any():
            return (None, ) * 7

        # 筛选有效的 anchor, 此时 Anchor 数量会减少为有效的 anchor 数量.
        anchors = flat_anchors[inside_flags, :]

        # ========================== 2. anchor 分配正负样本 ==============================
        assign_result = self.assigner.assign(
            anchors, gt_bboxes, gt_bboxes_ignore,
            None if self.sampling else gt_labels)

        # ========================== 3. anchor 正负样本采样 ==============================
        sampling_result = self.sampler.sample(assign_result, anchors,
                                              gt_bboxes)

        # ======================== 4. 构建 label 和 bbox 的目标和权重 =====================
        # 有效的 anchor 数量
        num_valid_anchors = anchors.shape[0]
        # bbox 目标, 初始化将目标设置为 0
        bbox_targets = torch.zeros_like(anchors)
        # bbox 权重, 即是否需要算入损失, 是否需要网络学习. 初始化将权重设置为 0
        bbox_weights = torch.zeros_like(anchors)
        # label 的目标, 初始化先将所有有效的 anchor 的标签标记为背景 (0)
        labels = anchors.new_full((num_valid_anchors, ),
                                  self.background_label,
                                  dtype=torch.long)
        # label 的权重, 初始化将将权重权设置为 0
        label_weights = anchors.new_zeros(num_valid_anchors, dtype=torch.float)
        # 获得正负样本的索引
        pos_inds = sampling_result.pos_inds
        neg_inds = sampling_result.neg_inds
        if len(pos_inds) > 0:
            # ================ （1）构建 bbox 的目标和权重 ====================
            # 获得所有正样本 box 的 anchor, 形状 [正样本数量, 4]
            if not self.reg_decoded_bbox:
                # 将 anchor 编码为中心点坐标，宽和高的偏移量
                pos_bbox_targets = self.bbox_coder.encode(
                    sampling_result.pos_bboxes, sampling_result.pos_gt_bboxes)
            else:
                pos_bbox_targets = sampling_result.pos_gt_bboxes
            # 将正样本对应的 indices 设置为编码后的 anchor, 将权重设置为 1
            bbox_targets[pos_inds, :] = pos_bbox_targets
            bbox_weights[pos_inds, :] = 1.0
            # ================ （2）构建 label 的目标和权重 ===================
            if gt_labels is None:
                # 只有 rpn 的 gt_labels 才设置为 None
                labels[pos_inds] = 1
            else:
                # 否则设置为对应的类别编号
                labels[pos_inds] = gt_labels[
                    sampling_result.pos_assigned_gt_inds]
            # 将正样本的权重设置为 1
            if self.train_cfg.pos_weight <= 0:
                label_weights[pos_inds] = 1.0
            else:
                label_weights[pos_inds] = self.train_cfg.pos_weight
        if len(neg_inds) > 0:
            label_weights[neg_inds] = 1.0

        # ===================== 5. 填充 anchor 到没有筛选 valid flag 的长度. ==================
        if unmap_outputs:
            num_total_anchors = flat_anchors.size(0)
            # 填充 labels
            labels = unmap(
                labels,
                num_total_anchors,
                inside_flags,
                fill=self.background_label)  # fill bg label
            # 填充 label_weights
            label_weights = unmap(label_weights, num_total_anchors,
                                  inside_flags)
            # 填充 bbox_targets
            bbox_targets = unmap(bbox_targets, num_total_anchors, inside_flags)
            # 填充 bbox_weights
            bbox_weights = unmap(bbox_weights, num_total_anchors, inside_flags)

        return (labels, label_weights, bbox_targets, bbox_weights, pos_inds,
                neg_inds, sampling_result)

（八）get_targets
此函数会接受一个批次所有 anchor 的列表和一个批次的 ground truth bbox。返回各个尺度的训练目标。

In [None]:
def get_targets(self,
                    anchor_list,
                    valid_flag_list,
                    gt_bboxes_list,
                    img_metas,
                    gt_bboxes_ignore_list=None,
                    gt_labels_list=None,
                    label_channels=1,
                    unmap_outputs=True,
                    return_sampling_results=False):
        """获得一个批次的训练和回归目标.

        Args:
            anchor_list:            (list[list[Tensor]])    所有批次所有尺度的 anchor 的列表,
                                                            每个 tensor 代表一张图片的一个尺度的 anchor.
                                                            形状为 (num_anchors, 4).
            valid_flag_list:        (list[list[Tensor]]):   所有批次所有尺度 anchor 的 valid flag,
                                                            每个 tensor 代表一张图片的一个尺度的 anchor 的 valid flag.
                                                            形状为 (num_anchors,)
            gt_bboxes_list:         (list[Tensor]):         一个 batch 的 gt bbox, 每个 tensor 的形状为 (num_gts, 4)
            img_metas:              (list[dict]):           一个 batch 的图片的属性信息.
            gt_bboxes_ignore_list:  (list[Tensor]):         需要忽略的 gt bboxes
            gt_labels_list:         (list[Tensor] | None):  一个 batch 的 gt labels.
            label_channels:         (int):                  标签的通道
            unmap_outputs:          (bool):                 是否填充 anchor 到没有筛选 valid flag 的长度

        Returns:
            tuple:
                labels_list:        (list[Tensor]):     每个尺度的 label, 每个元素的形状为 (batch, n_anchors)
                label_weights_list: (list[Tensor]):     每个尺度 label 的权重, 每个元素的形状为 (batch, n_anchors)
                bbox_targets_list:  (list[Tensor]):     每个尺度的 bbox, 每个元素的形状为 (batch, n_anchors, 4)
                bbox_weights_list:  (list[Tensor]):     每个尺度 bbox 的权重, 每个元素的形状为 (batch, n_anchors, 4)
                num_total_pos:      (int):              一个批次所有图片的正样本总数
                num_total_neg:      (int):              一个批次所有图片的负样本总数
            additional_returns: This function enables user-defined returns from
                `self._get_targets_single`. These returns are currently refined
                to properties at each feature map (i.e. having HxW dimension).
                The results will be concatenated after the end
        """
        # 计算 batch 的数量
        num_imgs = len(img_metas)
        assert len(anchor_list) == len(valid_flag_list) == num_imgs

        # 计算每个尺度 anchor 的数量 [187200, 46800, 11700, 2925, 780]
        num_level_anchors = [anchors.size(0) for anchors in anchor_list[0]]

        concat_anchor_list = []
        concat_valid_flag_list = []
        # 遍历每个图片, 合并每个图片中所有尺度的 anchor
        for i in range(num_imgs):
            assert len(anchor_list[i]) == len(valid_flag_list[i])
            # 合并所有尺度的 anchor
            concat_anchor_list.append(torch.cat(anchor_list[i]))
            # 合并所有尺度的 flag
            concat_valid_flag_list.append(torch.cat(valid_flag_list[i]))

        # compute targets for each image
        if gt_bboxes_ignore_list is None:
            # <class 'list'>: [None, None, None, None]
            gt_bboxes_ignore_list = [None for _ in range(num_imgs)]
        if gt_labels_list is None:
            # <class 'list'>: [None, None, None, None]
            gt_labels_list = [None for _ in range(num_imgs)]
        results = multi_apply(
            self._get_targets_single,
            concat_anchor_list,
            concat_valid_flag_list,
            gt_bboxes_list,
            gt_bboxes_ignore_list,
            gt_labels_list,
            img_metas,
            label_channels=label_channels,
            unmap_outputs=unmap_outputs)
        (all_labels, all_label_weights, all_bbox_targets, all_bbox_weights,
         pos_inds_list, neg_inds_list, sampling_results_list) = results[:7]
        rest_results = list(results[7:])  # user-added return values
        # no valid anchors
        if any([labels is None for labels in all_labels]):
            return None
        # 统计所有 image 的正负样本
        num_total_pos = sum([max(inds.numel(), 1) for inds in pos_inds_list])
        num_total_neg = sum([max(inds.numel(), 1) for inds in neg_inds_list])
        # split targets to a list w.r.t. multiple levels
        labels_list = images_to_levels(all_labels, num_level_anchors)
        label_weights_list = images_to_levels(all_label_weights,
                                              num_level_anchors)
        bbox_targets_list = images_to_levels(all_bbox_targets,
                                             num_level_anchors)
        bbox_weights_list = images_to_levels(all_bbox_weights,
                                             num_level_anchors)
        res = (labels_list, label_weights_list, bbox_targets_list,
               bbox_weights_list, num_total_pos, num_total_neg)
        if return_sampling_results:
            res = res + (sampling_results_list, )
        for i, r in enumerate(rest_results):  # user-added return values
            rest_results[i] = images_to_levels(r, num_level_anchors)

        return res + tuple(rest_results)

（九）loss_single
计算每个尺度分类和回归的损失。注意，对于分类损失正负样本都需要计算。对于回归损失，只计算正样本不计算负样本。因为总采样数为 num_totoal_samples（256 × batch）所以对于每一个尺度都要除以 num_totoal_samples。举个例子，假如有两个尺度，这两个尺度的采样数分别为 2 和 3。那么对于第一个尺度应该占总 loss 的 2/5，对于第二个尺度应该占总 loss 的 3/5。这样各个尺度的 loss 之和就是总 loss。下面我们看一下源码：

In [None]:
   def loss_single(self, cls_score, bbox_pred, anchors, labels, label_weights,
                    bbox_targets, bbox_weights, num_total_samples):
        """计算单个尺度的损失.

        Args:
            cls_score:      (Tensor): 单尺度的 box score, 形状 (batch, n_anchors * n_classes, H, W).
            bbox_pred:      (Tensor): 单尺度 bbox 的修正量, 形状 (batch, n_anchors * 4, H, W)
            anchors:        (Tensor): 单个尺度的 anchor, 形状为 (batch, n_anchors, 4).
            labels:         (Tensor): 每个 anchor 的标签, 形状为 (batch, n_anchors)
            label_weights:  (Tensor): label 的权重, 形状为 (batch, n_anchors)
            bbox_targets:   (Tensor): bbox 的修正量, 形状为 (batch, n_anchors, 4).
            bbox_weights:   (Tensor): bbox 修正量的权重, 形状为 (batch, n_anchors, 4).
            num_total_samples  (int): 如果采样, 则 num_total_samples 等于锚点总数, 否则为正样本数量。

        Returns:
            loss_cls:   (Tensor)    分类的损失值
            loss_bbox:  (Tensor)    回归的损失值
        """
        # 分类损失
        # torch.Size([batch, n_anchors]) --> torch.Size([batch × n_anchors])
        labels = labels.reshape(-1)
        label_weights = label_weights.reshape(-1)

        # torch.Size([batch, n_anchors × 类别数, H, W]) -->  torch.Size([batch × H × W × n_anchor, 类别数])
        cls_score = cls_score.permute(0, 2, 3,
                                      1).reshape(-1, self.cls_out_channels)
        # 对正负样本计算分类损失, 因为最后要相加, 所以这里 avg_factor=正负样本总数作为分子.
        loss_cls = self.loss_cls(
            cls_score, labels, label_weights, avg_factor=num_total_samples)
        # 回归损失
        bbox_targets = bbox_targets.reshape(-1, 4)
        bbox_weights = bbox_weights.reshape(-1, 4)
        bbox_pred = bbox_pred.permute(0, 2, 3, 1).reshape(-1, 4)
        if self.reg_decoded_bbox:
            anchors = anchors.reshape(-1, 4)
            bbox_pred = self.bbox_coder.decode(anchors, bbox_pred)
        # 只计算正样本的回归损失
        loss_bbox = self.loss_bbox(
            bbox_pred,
            bbox_targets,
            bbox_weights,
            avg_factor=num_total_samples)
        return loss_cls, loss_bbox

（十）loss
此方法为 RPN 损失的计算方法。输入经过 RPN 后的多尺度的分类和回归预测结果和 gt bbox。此函数会调用 loss_single 分别计算每个尺度的损失，输出每个尺度分类损失的列表与回归损失的列表的字典。

In [None]:
 @force_fp32(apply_to=('cls_scores', 'bbox_preds'))
    def loss(self,
             cls_scores,
             bbox_preds,
             gt_bboxes,
             gt_labels,
             img_metas,
             gt_bboxes_ignore=None):
        """计算 Head 的损失

        Args:
            cls_scores:  (list[Tensor])  多个尺度的预测的置信度 list,
                                         其中每个尺度的 tensor 的形状为 [batch, n_anchors × 类别数, H, W]
            bbox_preds:  (list[Tensor])  多个尺度位置的修正量的 list,
                                         其中每个尺度的 tensor 的形状为 [batch, n_anchors × 4, H, W]
            gt_bboxes:   (list[Tensor])  一个 batch 每张图片的 ground truth. list 的长度为 batch 长度.
                                         每个 tensor 的形状为 (num_gts, 4) 其中维度 1 代表 [tl_x, tl_y, br_x, br_y]
            gt_labels:   (list[Tensor])  每个 gt box 的类别索引.
            img_metas:  (list[dict]):    一个批次的图像的属性信息
            gt_bboxes_ignore (None | list[Tensor]): specify which bounding
                boxes can be ignored when computing the loss. Default: None

        Returns:
            dict[str, Tensor]: 损失的字典
        """
        # 获取各个尺度 feature map 大小
        featmap_sizes = [featmap.size()[-2:] for featmap in cls_scores]
        assert len(featmap_sizes) == self.anchor_generator.num_levels

        device = cls_scores[0].device
        # 获取一个批次的 anchor 和 valid flag
        anchor_list, valid_flag_list = self.get_anchors(
            featmap_sizes, img_metas, device=device)

        label_channels = self.cls_out_channels if self.use_sigmoid_cls else 1
        # 得到训练的 target
        cls_reg_targets = self.get_targets(
            anchor_list,
            valid_flag_list,
            gt_bboxes,
            img_metas,
            gt_bboxes_ignore_list=gt_bboxes_ignore,
            gt_labels_list=gt_labels,
            label_channels=label_channels)
        if cls_reg_targets is None:
            return None
        # labels_list:  多个尺度的 list, 每个尺度的 tensor 形状 [batch, n_anchors]
        # label_weights_list: 多个尺度的 list, 每个尺度的 tensor 形状 [batch, n_anchors]
        # bbox_targets_list:  多个尺度的 list, 每个尺度的 tensor 形状 [batch, n_anchors, 4]
        # bbox_weights_list:  多个尺度的 list, 每个尺度的 tensor 形状 [batch, n_anchors, 4]
        # num_total_pos: 正样本总数
        # num_total_neg: 负样本总数
        (labels_list, label_weights_list, bbox_targets_list, bbox_weights_list,
         num_total_pos, num_total_neg) = cls_reg_targets
        # 计算样本总数, 如果不采样, 总数为正样本个数. 否则为正负样本总个数
        num_total_samples = (
            num_total_pos + num_total_neg if self.sampling else num_total_pos)

        # 获得每个尺度的 anchor 数量
        num_level_anchors = [anchors.size(0) for anchors in anchor_list[0]]
        # 把 anchor 变成含多个尺度的 list
        concat_anchor_list = []
        for i in range(len(anchor_list)):
            concat_anchor_list.append(torch.cat(anchor_list[i]))
        all_anchor_list = images_to_levels(concat_anchor_list,
                                           num_level_anchors)

        losses_cls, losses_bbox = multi_apply(
            self.loss_single,
            cls_scores,
            bbox_preds,
            all_anchor_list,
            labels_list,
            label_weights_list,
            bbox_targets_list,
            bbox_weights_list,
            num_total_samples=num_total_samples)
        return dict(loss_cls=losses_cls, loss_bbox=losses_bbox

（十一）get_bboxes
输入经过 RPN 预测的各个尺度的置信度和 bbox 修正量，此函数会调用 _get_bboxes_single 输出一个批次的 proposal 的列表。每个元素是一张图片的 proposal，每个 proposal 的形状为 （nms_post, 5）代表解码后的 bbox 坐标和置信度预测。

In [None]:
@force_fp32(apply_to=('cls_scores', 'bbox_preds'))
    def get_bboxes(self,
                   cls_scores,
                   bbox_preds,
                   img_metas,
                   cfg=None,
                   rescale=False):
        """将网络的输出转化为一个批次的预测

        Args:
            cls_scores:     (list[Tensor]):         每个尺度的 bbox 分数预测, 形状为 (batch, n_anchors * n_classes, H, W)
            bbox_preds:     (list[Tensor]):         每个尺度的 bbox 修正量, 形状为 (batch, n_anchors * 4, H, W)
            img_metas:      (list[dict]):           一个批次的图像属性信息
            cfg:            (mmcv.Config | None):   Test / postprocessing 配置文件, 如果为 None, 将会使用 test_cfg
            rescale:        (bool):                 如果为 True, 则返回原始图像空间中的框, 默认值：False.

        Returns:
            list(Tensor):   一个批次每个图片的 proposal 的列表, 每个 Tensor 的形状为 (nms_post, 5), 
                            其中前四列代表解码后的 bbox 坐标, 最后一列代表置信度.

        Example:
            >>> import mmcv
            >>> self = AnchorHead(
            >>>     num_classes=9,
            >>>     in_channels=1,
            >>>     anchor_generator=dict(
            >>>         type='AnchorGenerator',
            >>>         scales=[8],
            >>>         ratios=[0.5, 1.0, 2.0],
            >>>         strides=[4,]))
            >>> img_metas = [{'img_shape': (32, 32, 3), 'scale_factor': 1}]
            >>> cfg = mmcv.Config(dict(
            >>>     score_thr=0.00,
            >>>     nms=dict(type='nms', iou_thr=1.0),
            >>>     max_per_img=10))
            >>> feat = torch.rand(1, 1, 3, 3)
            >>> cls_score, bbox_pred = self.forward_single(feat)
            >>> # note the input lists are over different levels, not images
            >>> cls_scores, bbox_preds = [cls_score], [bbox_pred]
            >>> result_list = self.get_bboxes(cls_scores, bbox_preds,
            >>>                               img_metas, cfg)
            >>> det_bboxes, det_labels = result_list[0]
            >>> assert len(result_list) == 1
            >>> assert det_bboxes.shape[1] == 5
            >>> assert len(det_bboxes) == len(det_labels) == cfg.max_per_img
        """
        assert len(cls_scores) == len(bbox_preds)
        num_levels = len(cls_scores)

        device = cls_scores[0].device
        # 获取每个尺度的 feature map 大小
        featmap_sizes = [cls_scores[i].shape[-2:] for i in range(num_levels)]
        # 生成 anchor
        mlvl_anchors = self.anchor_generator.grid_anchors(
            featmap_sizes, device=device)

        result_list = []
        # 遍历 batch 里的每个图片
        for img_id in range(len(img_metas)):
            # 获取一个图片每个尺度的 anchor 的分数的列表.
            # list(anchor × 类别数, H, W), list 代表每个尺度的列表
            cls_score_list = [
                cls_scores[i][img_id].detach() for i in range(num_levels)
            ]
            # 获取一张图片每个尺度的 anchor 的 bbox 修正量的列表.
            # list(anchor × 4, H, W), list 代表每个尺度的列表
            bbox_pred_list = [
                bbox_preds[i][img_id].detach() for i in range(num_levels)
            ]
            img_shape = img_metas[img_id]['img_shape']
            scale_factor = img_metas[img_id]['scale_factor']
            proposals = self._get_bboxes_single(cls_score_list, bbox_pred_list,
                                                mlvl_anchors, img_shape,
                                                scale_factor, cfg, rescale)
            result_list.append(proposals)
        return result_lis

（十二）_get_bboxes_single（RPNHead 会重写）
这部分的代码会在 RPNHead 中重写，和 RPN 不是很相关，所以不会详细的讲解。有兴趣的可以看看下面的代码：

def _get_bboxes_single(self,
                           cls_score_list,
                           bbox_pred_list,
                           mlvl_anchors,
                           img_shape,
                           scale_factor,
                           cfg,
                           rescale=False):
        """Transform outputs for a single batch item into bbox predictions.

        Args:
            cls_score_list (list[Tensor]): Box scores for a single scale level
                Has shape (num_anchors * num_classes, H, W).
            bbox_pred_list (list[Tensor]): Box energies / deltas for a single
                scale level with shape (num_anchors * 4, H, W).
            mlvl_anchors (list[Tensor]): Box reference for a single scale level
                with shape (num_total_anchors, 4).
            img_shape (tuple[int]): Shape of the input image,
                (height, width, 3).
            scale_factor (ndarray): Scale factor of the image arange as
                (w_scale, h_scale, w_scale, h_scale).
            cfg (mmcv.Config): Test / postprocessing configuration,
                if None, test_cfg would be used.
            rescale (bool): If True, return boxes in original image space.

        Returns:
            Tensor: Labeled boxes in shape (n, 5), where the first 4 columns
                are bounding box positions (tl_x, tl_y, br_x, br_y) and the
                5-th column is a score between 0 and 1.
        """
        cfg = self.test_cfg if cfg is None else cfg
        assert len(cls_score_list) == len(bbox_pred_list) == len(mlvl_anchors)
        mlvl_bboxes = []
        mlvl_scores = []
        for cls_score, bbox_pred, anchors in zip(cls_score_list,
                                                 bbox_pred_list, mlvl_anchors):
            assert cls_score.size()[-2:] == bbox_pred.size()[-2:]
            cls_score = cls_score.permute(1, 2,
                                          0).reshape(-1, self.cls_out_channels)
            if self.use_sigmoid_cls:
                scores = cls_score.sigmoid()
            else:
                scores = cls_score.softmax(-1)
            bbox_pred = bbox_pred.permute(1, 2, 0).reshape(-1, 4)
            nms_pre = cfg.get('nms_pre', -1)
            if nms_pre > 0 and scores.shape[0] > nms_pre:
                # Get maximum scores for foreground classes.
                if self.use_sigmoid_cls:
                    max_scores, _ = scores.max(dim=1)
                else:
                    # remind that we set FG labels to [0, num_class-1]
                    # since mmdet v2.0
                    # BG cat_id: num_class
                    max_scores, _ = scores[:, :-1].max(dim=1)
                _, topk_inds = max_scores.topk(nms_pre)
                anchors = anchors[topk_inds, :]
                bbox_pred = bbox_pred[topk_inds, :]
                scores = scores[topk_inds, :]
            bboxes = self.bbox_coder.decode(
                anchors, bbox_pred, max_shape=img_shape)
            mlvl_bboxes.append(bboxes)
            mlvl_scores.append(scores)
        mlvl_bboxes = torch.cat(mlvl_bboxes)
        if rescale:
            mlvl_bboxes /= mlvl_bboxes.new_tensor(scale_factor)
        mlvl_scores = torch.cat(mlvl_scores)
        if self.use_sigmoid_cls:
            # Add a dummy background class to the backend when using sigmoid
            # remind that we set FG labels to [0, num_class-1] since mmdet v2.0
            # BG cat_id: num_class
            padding = mlvl_scores.new_zeros(mlvl_scores.shape[0], 1)
            mlvl_scores = torch.cat([mlvl_scores, padding], dim=1)
        det_bboxes, det_labels = multiclass_nms(mlvl_bboxes, mlvl_scores,
                                                cfg.score_thr, cfg.nms,
                                                cfg.max_per_img)
        return det_bboxes, det_labels

四、RPNTestMixin
此类提供了 RPN 在测试时需要调用的方法，RPN 一般在测试时会调用 simple_test_rpn。对于经过 backbone 和 neck 生成的特征图。直接调用 forward 方法生成所有 bbox 的置信度和位置的修正量。然后不需要计算损失，直接调用 get_bboxes 为 roi_head 生成候选区域。具体源码如下：

In [None]:
import sys

from mmdet.core import merge_aug_proposals

if sys.version_info >= (3, 7):
    from mmdet.utils.contextmanagers import completed


class RPNTestMixin(object):
    """Test methods of RPN."""

    if sys.version_info >= (3, 7):

        async def async_simple_test_rpn(self, x, img_metas):
            sleep_interval = self.rpn_head.test_cfg.pop(
                'async_sleep_interval', 0.025)
            async with completed(
                    __name__, 'rpn_head_forward',
                    sleep_interval=sleep_interval):
                rpn_outs = self(x)

            proposal_list = self.get_bboxes(*rpn_outs, img_metas)
            return proposal_list

    def simple_test_rpn(self, x, img_metas):
        """Test without augmentation.

        Args:
            x:          (tuple[Tensor]):    经过 backbone 和 neck 后的 features 的元祖, 每个元素是一个尺度的 feature.
            img_metas:  (list[dict]):       每个图像的属性信息

        Returns:
            list[Tensor]:   每个图片生成的 Proposals
        """
        rpn_outs = self(x)
        proposal_list = self.get_bboxes(*rpn_outs, img_metas)
        return proposal_list

    def aug_test_rpn(self, feats, img_metas):
        samples_per_gpu = len(img_metas[0])
        aug_proposals = [[] for _ in range(samples_per_gpu)]
        for x, img_meta in zip(feats, img_metas):
            proposal_list = self.simple_test_rpn(x, img_meta)
            for i, proposals in enumerate(proposal_list):
                aug_proposals[i].append(proposals)
        # reorganize the order of 'img_metas' to match the dimensions
        # of 'aug_proposals'
        aug_img_metas = []
        for i in range(samples_per_gpu):
            aug_img_meta = []
            for j in range(len(img_metas)):
                aug_img_meta.append(img_metas[j][i])
            aug_img_metas.append(aug_img_meta)
        # after merging, proposals will be rescaled to the original image size
        merged_proposals = [
            merge_aug_proposals(proposals, aug_img_meta, self.test_cfg)
            for proposals, aug_img_meta in zip(aug_proposals, aug_img_metas)
        ]
        return merged_proposals

五、RPNHead（继承 AnchorHead 和 RPNTestMixin）
RPNHead 继承了 AnchorHead 和 RPNTestMixin。其中 AnchorHead 负责提供 Anchor 的一些通用的接口和公共方法，并完成了训练部分的代码。RPNTestMixin 负责 RPN 的预测部分。

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from mmcv.cnn import normal_init
from mmcv.ops import batched_nms

from ..builder import HEADS
from .anchor_head import AnchorHead
from .rpn_test_mixin import RPNTestMixin


@HEADS.register_module()
class RPNHead(RPNTestMixin, AnchorHead):
    """RPN head.

    Args:
        in_channels (int): feature map 的输入通道数
    """  # noqa: W605

    def __init__(self, in_channels, **kwargs):
        # RPN 的背景类为 0, 类别数为 1
        super(RPNHead, self).__init__(
            1, in_channels, background_label=0, **kwargs)

    def _init_layers(self):
        """初始化 head 的层"""
        # 先用 3 x 3, 通道数为 256 的卷积.
        self.rpn_conv = nn.Conv2d(
            self.in_channels, self.feat_channels, 3, padding=1)
        # 然后接上两个 1 x 1 的卷积核:
        # cls 分支: 通道数, anchor 的数量 × 类别个数, 因为使用 sigmoid 所以类别个数设置为 1.
        self.rpn_cls = nn.Conv2d(self.feat_channels,
                                 self.num_anchors * self.cls_out_channels, 1)
        # reg 分支: 通道数, anchor 的数量 × 4
        self.rpn_reg = nn.Conv2d(self.feat_channels, self.num_anchors * 4, 1)

    def init_weights(self):
        """初始化 head 的权重"""
        # 将所有卷积初始化为均值为 0 方差为 0.01 的正态分布
        normal_init(self.rpn_conv, std=0.01)
        normal_init(self.rpn_cls, std=0.01)
        normal_init(self.rpn_reg, std=0.01)

    def forward_single(self, x):
        """单尺度前向传播"""
        # 所有尺度都使用相同的 conv 预测.
        x = self.rpn_conv(x)
        x = F.relu(x, inplace=True)
        # 注意输出的时候不要使用 relu
        rpn_cls_score = self.rpn_cls(x)
        rpn_bbox_pred = self.rpn_reg(x)
        return rpn_cls_score, rpn_bbox_pred

    def loss(self,
             cls_scores,
             bbox_preds,
             gt_bboxes,
             img_metas,
             gt_bboxes_ignore=None):
        """计算 head 的损失

        Args:
            cls_scores:    (list[Tensor])   每个尺度预测的 bbox 的 score,
                                            每个 tensor 的形状为 (N, anchor 数量 × 类别数, H, W)
            bbox_preds:    (list[Tensor])   每个尺度预测的 bbox 的位置偏移.
                                            每个 tensor 的形状为 (N, anchor 数量 × 4, H, W)
            gt_bboxes:     (list[Tensor])   每个图片的 Ground truth bboxes,
                                            每个 tensor 的形状为 (num_gts, 4), 其中 4 为 [tl_x, tl_y, br_x, br_y]
            img_metas:     (list[dict])     每个图片的信息. 例如图片大小等
            gt_bboxes_ignore: (None | list[Tensor]): 指定哪个 bbox 在计算损失的时候会被忽略.

        Returns:
            dict[str, Tensor]: 多个损失的组成的字典.
        """
        losses = super(RPNHead, self).loss(
            cls_scores,
            bbox_preds,
            gt_bboxes,
            None,
            img_metas,
            gt_bboxes_ignore=gt_bboxes_ignore)
        return dict(
            loss_rpn_cls=losses['loss_cls'], loss_rpn_bbox=losses['loss_bbox'])

    def _get_bboxes_single(self,
                           cls_scores,
                           bbox_preds,
                           mlvl_anchors,
                           img_shape,
                           scale_factor,
                           cfg,
                           rescale=False):
        """将一张图片的输出转化为 bbox 的结果.

        Args:
            cls_scores:     (list[Tensor]):    网络输出的 confidence, list 的长度为 level 的长度(5),
                                               每个 tensor 的形状是 [K, H, W]
            bbox_preds:     (list[Tensor]):    网络输出的坐标值, list 代表每个尺度(如： 长度 5),
                                               每个 tensor 的形状是 [4K, H, W]
            mlvl_anchors:   (list[Tensor]):    每个 scale 的生成的 anchor,
                                               每个 tensor 的形状为: [H × W × K, 4]
            img_shape:      (tuple[int]):      图像的大小
            scale_factor:   (ndarray):         Scale factor of the image arange as
                (w_scale, h_scale, w_scale, h_scale).
            cfg:            (mmcv.Config):     Test / postprocessing configuration,
                if None, test_cfg would be used.
            rescale (bool): If True, return boxes in original image space.

        Returns:
            Tensor: Labeled boxes in shape (n, 5), where the first 4 columns
                are bounding box positions (tl_x, tl_y, br_x, br_y) and the
                5-th column is a score between 0 and 1.
        """
        # 1. 根据类别的置信度筛选出每个尺度 topK（K = 2000）个 bbox
        # 2. 合并筛选后的多个尺度的 bbox
        # 3. 将网络预测值解码
        # 4. 用 nms（阈值=0.7）合并 bbox
        # 5. 筛选出前 nms_post（1000）个 bbox 作为 proposal

        cfg = self.test_cfg if cfg is None else cfg
        # bboxes from different level should be independent during NMS,
        # level_ids are used as labels for batched NMS to separate them
        level_ids = []
        mlvl_scores = []
        mlvl_bbox_preds = []
        mlvl_valid_anchors = []
        # 遍历每个尺度
        for idx in range(len(cls_scores)):
            # 取到一个尺度的网络输出的类别和位置预测
            rpn_cls_score = cls_scores[idx]
            rpn_bbox_pred = bbox_preds[idx]
            # 保证后两个维度相同
            assert rpn_cls_score.size()[-2:] == rpn_bbox_pred.size()[-2:]
            # [A, H, W] --> [H, W, A]
            rpn_cls_score = rpn_cls_score.permute(1, 2, 0)
            # 将类别的数值压缩成概率.
            if self.use_sigmoid_cls:
                # [H × W × A]
                rpn_cls_score = rpn_cls_score.reshape(-1)
                scores = rpn_cls_score.sigmoid()
            else:
                # 转成 (-1, 2), 这个 2 代表是背景或不是背景。
                # 前景 label 设置为:  [0, 类别数 - 1],
                # 背景 label 设置为:  类别数
                rpn_cls_score = rpn_cls_score.reshape(-1, 2)
                # we set FG labels to [0, num_class-1] and BG label to
                # num_class in other heads since mmdet v2.0, However we
                # keep BG label as 0 and FG label as 1 in rpn head
                # 对类别维度进行 softmax, 取背景的概率
                # 形状：[2000]
                scores = rpn_cls_score.softmax(dim=1)[:, 1]
            # [A × 4, H, W] --> [H × W × A, 4]
            rpn_bbox_pred = rpn_bbox_pred.permute(1, 2, 0).reshape(-1, 4)
            # 取对应层的 anchor: [单尺度 anchor 总数, 4]
            anchors = mlvl_anchors[idx]

            # 根据类别的置信度筛选出 topK 个 box
            if cfg.nms_pre > 0 and scores.shape[0] > cfg.nms_pre:
                # sort is faster than topk
                # _, topk_inds = scores.topk(cfg.nms_pre)
                # 对 score 从高到低排序
                # [182400]
                ranked_scores, rank_inds = scores.sort(descending=True)
                # 取 topK（cfg.nms_pre） 个 index 和 score: anchor --> [2000]
                topk_inds = rank_inds[:cfg.nms_pre]
                scores = ranked_scores[:cfg.nms_pre]
                # 根据类别的预测值，取 topK 个 bbox
                rpn_bbox_pred = rpn_bbox_pred[topk_inds, :]
                # anchor 也取 topK 个
                anchors = anchors[topk_inds, :]
            mlvl_scores.append(scores)
            mlvl_bbox_preds.append(rpn_bbox_pred)
            mlvl_valid_anchors.append(anchors)
            # new_full(形状，填充值，数据类型)
            level_ids.append(
                scores.new_full((scores.size(0), ), idx, dtype=torch.long))
        # cat 的 dim 默认为 0 维
        scores = torch.cat(mlvl_scores)
        anchors = torch.cat(mlvl_valid_anchors)
        rpn_bbox_pred = torch.cat(mlvl_bbox_preds)
        proposals = self.bbox_coder.decode(
            anchors, rpn_bbox_pred, max_shape=img_shape)
        ids = torch.cat(level_ids)

        # 如多对 anchor 的大小有限定
        if cfg.min_bbox_size > 0:
            # 计算 W, H
            w = proposals[:, 2] - proposals[:, 0]
            h = proposals[:, 3] - proposals[:, 1]
            # 取长宽都 > min_bbox_size 的索引
            valid_inds = torch.nonzero(
                (w >= cfg.min_bbox_size)
                & (h >= cfg.min_bbox_size),
                as_tuple=False).squeeze()
            # 筛选目标
            if valid_inds.sum().item() != len(proposals):
                proposals = proposals[valid_inds, :]
                scores = scores[valid_inds]
                ids = ids[valid_inds]

        # TODO: remove the hard coded nms type
        nms_cfg = dict(type='nms', iou_threshold=cfg.nms_thr)
        dets, keep = batched_nms(proposals, scores, ids, nms_cfg)
        # nms 后对置信度有排序。直接取前 nms_post 个
        return dets[:cfg.nms_post]

# [源码解读：Faster RCNN的细节（一）](https://zhuanlan.zhihu.com/p/65471961)

![](https://pica.zhimg.com/v2-2a51ac64bbb4b620ab1aa90f1793b898_1440w.jpg?source=172ae18b)