In [None]:
# Load the required libraries
import numpy as np

import mxnet as mx
from mxnet import nd
from mxnet import init
from mxnet import gluon
from mxnet import autograd

from mxnet.gluon import nn

In [None]:
# Number of residual blocks in each of the four stages, per ResNet depth.
res18_blk_num = [2] * 4  # not built from bottleneck blocks
res50_blk_num = [3, 4, 6, 3]
res101_blk_num = [3, 4, 23, 3]
res152_blk_num = [3, 8, 36, 3]

# To make importing pretrained parameters easier, attribute names should
# match the ones used by gluoncv.
class BottleneckV1b(gluon.HybridBlock):
    """ResNet bottleneck residual block (1x1 conv -> 3x3 conv -> 1x1 conv).

    Parameters
    ----------
    channels : int
        Output channels of the first two convolutions. The last 1x1
        convolution (and the shortcut projection, if any) outputs
        ``channels * 4`` channels.
    strides : int or tuple of int
        Stride of the 3x3 convolution and of the shortcut projection.
    isdownsample : bool, default False
        If True, the shortcut is a strided 1x1 convolution followed by
        BatchNorm; otherwise the input is added to the residual unchanged.
    **kwargs
        Forwarded to ``gluon.HybridBlock``.
    """
    def __init__(self, channels, strides, isdownsample=False, **kwargs):
        super(BottleneckV1b, self).__init__(**kwargs)
        # Channel expansion factor of the final 1x1 convolution in a bottleneck.
        self.expansion = 4
        self.isdownsample = isdownsample
        self.strides = strides

        
        # A bottleneck is always 1x1 conv -- 3x3 conv -- 1x1 conv.
        self.conv1 = nn.Conv2D(channels=channels, kernel_size=(1,1), strides=(1,1),
                               padding=(0,0), groups=1, use_bias=False)
        self.bn1   = nn.BatchNorm() # use_global_stats defaults to False
        # A single ReLU block is reused after bn1, after bn2 and after the addition.
        self.relu  = nn.Activation('relu')
        
        # The 3x3 convolution is the one that applies the block's stride.
        self.conv2 = nn.Conv2D(channels=channels, kernel_size=3, strides=self.strides,
                               padding=(1,1), groups=1, use_bias=False)
        self.bn2   = nn.BatchNorm()
        
        self.conv3 = nn.Conv2D(channels=channels * self.expansion, kernel_size=(1,1), strides=(1,1),
                               padding=(0,0), groups=1, use_bias=False)
        self.bn3   = nn.BatchNorm()
        
        # Projection shortcut: brings the input to the same channel count and
        # stride as the residual branch so the two can be added.
        if self.isdownsample:
            self.downsample = nn.HybridSequential()
            self.downsample.add(nn.Conv2D(channels=channels * self.expansion, kernel_size=(1,1), strides=self.strides,
                                         padding=(0,0), groups=1, use_bias=False),
                                nn.BatchNorm())


    def hybrid_forward(self, F, x):
        """Return ``relu(shortcut(x) + residual(x))``."""
        residual = self.relu(self.bn1(self.conv1(x)))
        residual = self.relu(self.bn2(self.conv2(residual)))
        # No activation after bn3; ReLU is applied after the addition below.
        residual = self.bn3(self.conv3(residual))

        if self.isdownsample:
            x = self.downsample(x)

        x = x + residual
        out = self.relu(x)

        return out


In [None]:
# Smoke test: build one downsampling bottleneck (64 channels, stride 2),
# initialize it with Xavier and run a random single-channel 224x224 input through it.
net = BottleneckV1b(64, (2,2), isdownsample=True)
x = nd.random.uniform(shape=(1,1,224,224))
net.initialize(init.Xavier())
y = net(x)

In [None]:
class ResNet(gluon.HybridBlock):
    """ResNet classifier assembled from ``BottleneckV1b`` blocks.

    The network is a stem (7x7 stride-2 convolution with 64 channels,
    BatchNorm, ReLU, 3x3 stride-2 max pooling), followed by one stage per
    entry of ``block_nums``, global average pooling and a dense layer.

    Parameters
    ----------
    block_nums : list of int
        Number of bottleneck blocks in each stage, e.g. ``[3, 4, 6, 3]``.
    channels : list of int
        Bottleneck width of each stage, e.g. ``[64, 128, 256, 512]``.
        Indexed in parallel with ``block_nums``.
    strides : list of tuple of int
        Stride of the first block of each stage, e.g.
        ``[(1,1), (2,2), (2,2), (2,2)]``. Indexed in parallel with ``block_nums``.
    classes : int, default 1000
        Number of output units of the final dense layer.
    **kwargs
        Forwarded to ``gluon.HybridBlock``.
    """
    # pylint: disable=unused-variable
    def __init__(self, block_nums, channels, strides, classes=1000, **kwargs):
        super(ResNet, self).__init__(**kwargs)

        # Stem: 7x7/2 conv -> BN -> ReLU -> 3x3/2 max pool.
        self.features = nn.HybridSequential()
        self.features.add(nn.Conv2D(channels=64, kernel_size=(7,7), strides=(2,2),
                                    padding=(3,3), groups=1, use_bias=False))
        self.features.add(nn.BatchNorm())
        self.features.add(nn.Activation('relu'))
        self.features.add(nn.MaxPool2D(pool_size=(3,3), strides=(2,2), padding=(1,1)))
        
        # Example configuration:
        # block_nums = [3, 4, 6, 3]
        # channels = [64, 128, 256, 512]
        # strides = [(1,1), (2,2), (2,2), (2,2)]
        for i in range(len(block_nums)):
            blk = nn.HybridSequential()
            for num in range(block_nums[i]) :
                # Only the first block of a stage applies the stage stride and
                # uses a projection shortcut; the remaining blocks keep the
                # resolution and add the input directly.
                if num == 0:
                    bottleneck = BottleneckV1b(channels[i], strides[i], isdownsample=True)
                else:
                    bottleneck = BottleneckV1b(channels[i], (1,1), isdownsample=False)
                blk.add(bottleneck)
            self.features.add(blk)

        self.avgpool = nn.GlobalAvgPool2D()
        self.out = nn.Dense(classes)

    def hybrid_forward(self, F, x):
        """Run the feature extractor, global average pooling and the dense layer."""
        feature = self.features(x)
        out = self.avgpool(feature)
        out = self.out(out)
        return out

# Stage configuration passed to ResNet in the next cell:
# blocks per stage, bottleneck width per stage, stride of each stage's first block.
block_nums = [3, 4, 6, 3]
channels = [64, 128, 256, 512]
strides = [(1, 1)] + [(2, 2)] * 3

In [None]:
# Smoke test: build the network from the stage configuration above,
# initialize it with Xavier and run a random single-channel 224x224 input through it.
net = ResNet(block_nums, channels, strides)
x = nd.random.uniform(shape=(1,1,224,224))
net.initialize(init.Xavier())
y = net(x)

In [None]:
#print(net)

In [None]:
def anchor_gen(ratios, scales, width, height, stride):
    """Generate anchor boxes for every cell of a ``height`` x ``width`` grid.

    Parameters
    ----------
    ratios : iterable of float
        Aspect ratios; for a ratio ``r`` the anchor height is ``s * sqrt(r)``
        and the width is ``s / sqrt(r)``.
    scales : iterable of float
        Anchor sizes ``s``.
    width, height : int
        Number of grid cells along x and y.
    stride : int or float
        Distance between neighbouring grid cells; cell ``(x, y)`` is centred at
        ``(x * stride, y * stride)``.

    Returns
    -------
    numpy.ndarray
        Array of shape ``(height * width * len(scales) * len(ratios), 4)`` in
        ``(x1, y1, x2, y2)`` form. Rows are ordered grid cell first (row-major
        over y, then x), then scale, then ratio.
    """
    # Base anchors centred on the origin, stored as rounded half extents.
    base = []
    for s in scales:
        for r in ratios:
            half_w = np.round(s / np.sqrt(r) * 0.5)
            half_h = np.round(s * np.sqrt(r) * 0.5)
            base.append([-half_w, -half_h, half_w, half_h])
    base = np.array(base)

    xs, ys = np.meshgrid(np.arange(width), np.arange(height))
    # Out-of-place multiply: the grid is an integer array, so an in-place
    # ``*=`` would raise a casting error for a non-integer stride.
    offsets = np.stack((xs, ys, xs, ys), axis=-1) * stride

    # Broadcast (1, A, 4) + (H*W, 1, 4) -> (H*W, A, 4), then flatten.
    anchors = base.reshape(1, -1, 4) + offsets.reshape(-1, 1, 4)
    return anchors.reshape(-1, 4)

# Anchor aspect ratios and scales used by the demo cells below.
ratios = [0.5, 1, 2]
scales = [32, 64, 128, 256, 512]
# This assignment overrides the full scale list above, leaving a single scale.
scales = [32]

In [None]:
# Display the anchors generated for a 2x2 grid with stride 2.
anchor_gen(ratios, scales, 2, 2, 2)

In [None]:

def get_realbbox(bbox_pred, anchors, wh_max=4.42):
    """Decode predicted box offsets against anchors (batched version).

    ``bbox_pred`` and ``anchors`` are numpy arrays in (x, y, w, h) form with
    shape (B, N, 4). The x/y offsets are scaled by the anchor's w/h and added
    to the anchor's x/y; the anchor's w/h are multiplied by ``exp`` of the
    predicted w/h offsets, with that factor capped at ``wh_max``.

    Returns an array of shape (B, N, 4) in (x, y, w, h) form.
    """
    n_batch = bbox_pred.shape[0]

    anchor_x, anchor_y = anchors[:, :, 0], anchors[:, :, 1]
    anchor_w, anchor_h = anchors[:, :, 2], anchors[:, :, 3]

    decoded = (
        bbox_pred[:, :, 0] * anchor_w + anchor_x,
        bbox_pred[:, :, 1] * anchor_h + anchor_y,
        anchor_w * np.minimum(np.exp(bbox_pred[:, :, 2]), wh_max),
        anchor_h * np.minimum(np.exp(bbox_pred[:, :, 3]), wh_max),
    )

    # Give every component a trailing axis of length 1 and join them on it.
    columns = [component.reshape(n_batch, -1, 1) for component in decoded]
    return np.concatenate(columns, axis=-1)

In [None]:
# Box decoder that accepts both un-batched (N, 4) and batched (B, N, 4) input.
def get_realbbox(bbox_pred, anchors, wh_max=4.42):
    """Decode predicted box offsets against anchors.

    Parameters
    ----------
    bbox_pred : numpy.ndarray
        Predicted offsets in (x, y, w, h) form, shape (N, 4) or (B, N, 4).
    anchors : numpy.ndarray
        Anchors in (x, y, w, h) form, broadcastable against ``bbox_pred``.
    wh_max : float, default 4.42
        Upper bound on the multiplicative factors ``exp(dw)`` / ``exp(dh)``
        applied to the anchor width and height.

    Returns
    -------
    numpy.ndarray
        Decoded boxes in (x, y, w, h) form, with the broadcast shape of the
        two inputs: (N, 4) or (B, N, 4).
    """
    # Index the last axis with ``...`` so any number of leading axes is accepted;
    # plain ``[:, k]`` indexing would break on input with a batch axis.
    anchor_w = anchors[..., 2]
    anchor_h = anchors[..., 3]

    x = bbox_pred[..., 0] * anchor_w + anchors[..., 0]
    y = bbox_pred[..., 1] * anchor_h + anchors[..., 1]
    w = anchor_w * np.minimum(np.exp(bbox_pred[..., 2]), wh_max)
    h = anchor_h * np.minimum(np.exp(bbox_pred[..., 3]), wh_max)

    return np.stack((x, y, w, h), axis=-1)

In [None]:
# Demo: decode two predicted boxes against two anchors; both arrays carry a
# leading batch axis of size 1, i.e. shape (1, 2, 4).
box = np.array([[[1,2,3,4],[2,3,4,5]]])
anchor = np.array([[[6,7,8,9],[1,2,5,6]]])
get_realbbox(box, anchor)

In [None]:
def bbox_clip_by_img(bbox, img):
    """Clip boxes so that every coordinate lies inside the image bounds.

    Parameters
    ----------
    bbox : numpy.ndarray
        Boxes in (x1, y1, x2, y2) form, shape (N, 4).
    img : sequence
        Its last two entries are used as the upper bound for the x and the y
        coordinates respectively (``img[-2]`` for x, ``img[-1]`` for y).
        NOTE(review): if ``img`` is meant to be (height, width) these two
        bounds are swapped — confirm against the caller.

    Returns
    -------
    numpy.ndarray
        A new (N, 4) array with x in ``[0, img[-2]]`` and y in ``[0, img[-1]]``.
        The input array is left untouched.
    """
    limits = img[-2:]
    x_max, y_max = limits[0], limits[1]

    # Work on a copy so the caller's array is not modified as a side effect.
    clipped = np.asarray(bbox).copy()
    # Clamp both corners on both sides; a box lying fully outside the image
    # collapses to zero size instead of keeping out-of-range coordinates.
    clipped[:, 0] = np.clip(clipped[:, 0], 0, x_max)
    clipped[:, 1] = np.clip(clipped[:, 1], 0, y_max)
    clipped[:, 2] = np.clip(clipped[:, 2], 0, x_max)
    clipped[:, 3] = np.clip(clipped[:, 3], 0, y_max)
    return clipped

In [None]:
def compute_iou(bbox_a, bbox_b):
    """Pairwise Intersection-over-Union (IoU) between two sets of boxes.

    Parameters
    ----------
    bbox_a : numpy.ndarray
        Shape (M, 4), boxes in (xmin, ymin, xmax, ymax) form.
    bbox_b : numpy.ndarray
        Shape (N, 4), boxes in (xmin, ymin, xmax, ymax) form.

    Returns
    -------
    numpy.ndarray
        Shape (M, N); element (i, j) is the IoU of ``bbox_a[i]`` and ``bbox_b[j]``.

    Raises
    ------
    IndexError
        If either input has fewer than 4 columns.
    """
    if min(bbox_a.shape[1], bbox_b.shape[1]) < 4:
        raise IndexError("Bounding boxes axis 1 must have at least length 4")

    # Corners of the pairwise intersection rectangles, shape (M, N, 2).
    a_min, a_max = bbox_a[:, None, :2], bbox_a[:, None, 2:4]
    b_min, b_max = bbox_b[:, :2], bbox_b[:, 2:4]
    inter_min = np.maximum(a_min, b_min)
    inter_max = np.minimum(a_max, b_max)

    # Pairs that do not overlap on both axes get an intersection area of 0.
    overlaps = (inter_min < inter_max).all(axis=2)
    inter_area = np.prod(inter_max - inter_min, axis=2) * overlaps

    area_of_a = np.prod(bbox_a[:, 2:4] - bbox_a[:, :2], axis=1)
    area_of_b = np.prod(bbox_b[:, 2:4] - bbox_b[:, :2], axis=1)
    union_area = area_of_a[:, None] + area_of_b - inter_area
    return inter_area / union_area


# Sort by score. Starting from the highest-scoring box, every box whose IoU
# with it exceeds the threshold is discarded; the box itself is moved to the
# result list and removed from the queue. The procedure then repeats with the
# highest-scoring box that is still in the queue.

# Low-scoring boxes can be dropped up front (see ``topk``).
def non_max_suppression(boxes, scores, topk=None, threshold=0.7):
    """Run non-maximum suppression and return the indices of the kept boxes.

    Parameters
    ----------
    boxes : numpy.ndarray
        Shape (N, 4), corner-format boxes. They are passed to ``compute_iou``
        unchanged, so the coordinate order only has to be consistent.
    scores : numpy.ndarray
        1-D array with one score per box.
    topk : int, optional
        If truthy, only the ``topk`` highest-scoring boxes are considered.
    threshold : float, default 0.7
        A box is suppressed when its IoU with an already picked box is
        greater than this value.

    Returns
    -------
    numpy.ndarray
        int32 indices into ``boxes``, ordered by decreasing score. Empty when
        ``boxes`` is empty.
    """
    # Nothing to suppress: return an empty selection instead of failing, so
    # callers that filtered out every box keep working.
    if boxes.shape[0] == 0:
        return np.array([], dtype=np.int32)
    if boxes.dtype.kind != "f":
        boxes = boxes.astype(np.float32)

    # Candidate indices, highest score first.
    order = scores.argsort()[::-1]
    if topk:
        order = order[:topk]

    pick = []
    while len(order) > 0:
        # Take the best remaining box and use it to suppress the others.
        best, rest = order[0], order[1:]
        pick.append(best)
        # IoU of the picked box with every remaining candidate, shape (1, len(rest)).
        iou = compute_iou(boxes[best][np.newaxis, :], boxes[rest])
        # Keep only the candidates that do not overlap the picked box too much.
        order = rest[~(iou[0] > threshold)]
    return np.array(pick, dtype=np.int32)

In [None]:
def proposal(anchor, score, bbox_pred, img=(600, 800), nms_threshold=0.3,
             pre_nms=6000, post_nms=300, min_size=60):
    """Generate region proposals. Limited to batch size 1.

    Parameters
    ----------
    anchor : numpy.ndarray
        Anchors, shape (N, 4).
    score : numpy.ndarray
        One score per anchor, N values.
    bbox_pred : numpy.ndarray
        Predicted box offsets, shape (N, 4).
    img : sequence, default (600, 800)
        Image size handed to ``bbox_clip_by_img``, which reads its last two entries.
    nms_threshold : float, default 0.3
        IoU threshold used by non-maximum suppression.
    pre_nms : int, default 6000
        Number of highest-scoring boxes that enter non-maximum suppression.
    post_nms : int, default 300
        Maximum number of boxes kept after non-maximum suppression; a falsy
        value keeps all of them.
    min_size : float, default 60
        Boxes narrower or shorter than this are discarded.

    Returns
    -------
    tuple of numpy.ndarray
        ``(rpn_scores, rpn_bbox)`` with shapes (K,) and (K, 4). Both are empty
        when no box survives the size filter.

    Notes
    -----
    NOTE(review): ``get_realbbox`` derives columns 2 and 3 of its result by
    scaling the anchor's columns 2 and 3 (i.e. treats them as sizes), while
    the clipping and the width/height computation here treat columns 2 and 3
    as max corners. Confirm the intended box format.
    """
    # Decode the predicted offsets into boxes relative to the anchors.
    roi = get_realbbox(bbox_pred, anchor)

    # Boxes must not extend beyond the image.
    roi = bbox_clip_by_img(roi, img)

    # Drop boxes that do not meet the min_size constraint.
    width = roi[:, 2] - roi[:, 0]
    height = roi[:, 3] - roi[:, 1]
    too_small = (width < min_size) | (height < min_size)
    keep = ~too_small
    score = np.ravel(score)[keep]
    roi = roi[keep]

    # Every box was filtered out: return empty results rather than handing
    # empty input to non-maximum suppression.
    if roi.shape[0] == 0:
        return score, roi

    # Non-maximum suppression over the pre_nms best-scoring boxes.
    keepindex = non_max_suppression(roi, score, pre_nms, nms_threshold)

    # Keep at most post_nms boxes.
    if post_nms:
        keepindex = keepindex[:post_nms]

    rpn_scores = score[keepindex]
    rpn_bbox = roi[keepindex, :]

    return rpn_scores, rpn_bbox


In [None]:
# Demo inputs for proposal(): 12 anchors in (x1, y1, x2, y2) form
# (presumably the output of anchor_gen(ratios, scales, 2, 2, 2) — verify),
# one score per anchor, and synthetic predictions.
anchor = np.array([[-23., -11.,  23.,  11.],                   
                [-16., -16.,  16.,  16.],
                [-11., -23.,  11.,  23.],
                [-21., -11.,  25.,  11.],
                [-14., -16.,  18.,  16.],
                [ -9., -23.,  13.,  23.],
                [-23.,  -9.,  23.,  13.],
                [-16., -14.,  16.,  18.],
                [-11., -21.,  11.,  25.],
                [-21.,  -9.,  25.,  13.],
                [-14., -14.,  18.,  18.],
                [ -9., -21.,  13.,  25.]])
print(anchor.shape)
score = np.array([0.8,0.2,0.4, 0.1,0.45,0.23, 0.45,0.21,0.93, 0.34,0.45,0.42])
print(score.shape)
# Synthetic predictions: every anchor row shifted by [1, 2, 3, 4].
bbox_pred = anchor + [1,2,3,4]

In [None]:
# Run the proposal pipeline on the demo inputs with the default image size and NMS threshold.
proposal(anchor, score, bbox_pred)

In [None]:
# Show the synthetic predictions that were passed to proposal().
print(bbox_pred)