<a href="https://colab.research.google.com/github/xiaochengJF/DeepLearning/blob/master/DetNet59.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [0]:
'''导入必要模块'''
from __future__ import print_function
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import numpy as np
import math
import torchvision
from torch.autograd import Variable

# Global switch for the normalization layer used by the modules in this file:
#   'Batch' -> nn.BatchNorm2d
#   'Group' -> nn.GroupNorm with num_groups=1
# NOTE: ResidualBlock and Dilated_BottleNeck take this value as the default of
# their ``norm`` parameter, so it is captured when those classes are defined;
# after changing it, the class definitions must be executed again.
'''
Norm = "Batch" 下面所有的正则化都是用 Batch Normalization
Norm = "Group" 下面所有的正则化都是用 Group Normalization
'''
Norm = 'Batch'  

# 
# 搭建 Residual 模块
# 可以参考 torchvision.models.resnet50() 中的实现细节
# 搭建子模块
class ResidualBlock(nn.Module):
    """Bottleneck residual block (1x1 -> 3x3 -> 1x1 convolutions).

    The main branch (``left``) reduces the input to ``outchannel`` channels
    with a 1x1 convolution, applies a 3x3 convolution that carries ``stride``,
    and expands to ``outchannel * expansion`` channels with a final 1x1
    convolution. Every convolution is followed by a normalization layer. The
    main-branch output is added to the skip branch (``right``) and passed
    through ReLU.

    Args:
        inchannel: Number of channels of the input tensor.
        outchannel: Bottleneck width; the block outputs
            ``outchannel * expansion`` channels.
        stride: Stride of the 3x3 convolution.
        shortcut: Optional module applied to the input to produce the skip
            branch. When ``None`` the input itself is added, so it must
            already match the shape of the main-branch output.
        norm: ``'Batch'`` or ``'Group'``; selects the normalization layer.

    Raises:
        ValueError: If ``norm`` is not one of the supported names.
    """
    expansion = 4

    def __init__(self, inchannel, outchannel, stride=1, shortcut=None, norm=Norm):
        super(ResidualBlock, self).__init__()
        expanded = outchannel * self.expansion

        self.conv1 = nn.Conv2d(inchannel, outchannel, kernel_size=1, bias=False)
        self.nf_1 = self._Norm_function(norm, outchannel)
        self.conv2 = nn.Conv2d(outchannel, outchannel, kernel_size=3, stride=stride,
                               padding=1, bias=False)
        self.nf_2 = self._Norm_function(norm, outchannel)
        self.conv3 = nn.Conv2d(outchannel, expanded, kernel_size=1, bias=False)
        self.nf_3 = self._Norm_function(norm, expanded)
        self.relu = nn.ReLU(inplace=True)
        self.stride = stride

        # Main branch. The layer objects are the same ones stored as
        # attributes above, so they are reachable under both names.
        self.left = nn.Sequential(self.conv1, self.nf_1, self.relu,
                                  self.conv2, self.nf_2, self.relu,
                                  self.conv3, self.nf_3,)

        # Skip connection (shortcut); None means identity.
        self.right = shortcut

    def forward(self, x):
        """Return ``relu(left(x) + skip)`` where ``skip`` is ``right(x)`` or ``x``."""
        out = self.left(x)
        residual = x if self.right is None else self.right(x)

        out += residual
        out = self.relu(out)
        return out

    def _Norm_function(self, norm, outchannel):
        """Create the normalization layer named by ``norm`` for ``outchannel`` channels.

        Raises:
            ValueError: If ``norm`` is neither ``'Batch'`` nor ``'Group'``.
                Failing here avoids storing ``None`` as a layer, which would
                only surface later as an opaque error during ``forward``.
        """
        if norm == 'Batch':
            return nn.BatchNorm2d(outchannel)
        if norm == 'Group':
            return nn.GroupNorm(num_groups=1, num_channels=outchannel)
        raise ValueError("Unsupported norm %r: expected 'Batch' or 'Group'" % (norm,))

In [0]:
class Dilated_BottleNeck(nn.Module):
    """Dilated bottleneck block (1x1 -> dilated 3x3 -> 1x1 convolutions).

    Unlike ``ResidualBlock`` the final 1x1 convolution does not expand the
    channel count: the block outputs ``outchannel`` channels.

    The 3x3 convolution uses ``dilation=2`` with ``padding=2``. Its effective
    kernel extent is ``2 * (3 - 1) + 1 = 5``, so with ``stride=1`` a padding
    of 2 on each side keeps the spatial size of the input unchanged.

    Args:
        inchannel: Number of channels of the input tensor.
        outchannel: Number of channels used inside and produced by the block.
        stride: Stride of the dilated 3x3 convolution.
        shortcut: Optional module applied to the input to produce the skip
            branch. When ``None`` the input itself is added, so it must
            already match the shape of the main-branch output.
        norm: ``'Batch'`` or ``'Group'``; selects the normalization layer.

    Raises:
        ValueError: If ``norm`` is not one of the supported names.
    """
    expansion = 4

    def __init__(self, inchannel, outchannel=256, stride=1, shortcut=None, norm=Norm):
        super(Dilated_BottleNeck, self).__init__()

        self.conv1 = nn.Conv2d(inchannel, outchannel, kernel_size=1, bias=False)
        self.nf_1 = self._Norm_function(norm, outchannel)

        # Dilated convolution: the green sub-module in figures A and B of the
        # paper. See the class docstring for the padding/dilation arithmetic.
        self.conv2 = nn.Conv2d(outchannel, outchannel, kernel_size=3, stride=stride,
                               padding=2, dilation=2, bias=False)

        self.nf_2 = self._Norm_function(norm, outchannel)
        self.conv3 = nn.Conv2d(outchannel, outchannel, kernel_size=1, bias=False)
        self.nf_3 = self._Norm_function(norm, outchannel)
        self.relu = nn.ReLU(inplace=True)
        self.stride = stride

        # Main branch. The layer objects are the same ones stored as
        # attributes above, so they are reachable under both names.
        self.left = nn.Sequential(self.conv1, self.nf_1, self.relu,
                                  self.conv2, self.nf_2, self.relu,
                                  self.conv3, self.nf_3, )

        # Skip connection; None means identity.
        self.right = shortcut

    def forward(self, x):
        """Return ``relu(left(x) + skip)`` where ``skip`` is ``right(x)`` or ``x``."""
        out = self.left(x)
        residual = x if self.right is None else self.right(x)

        out += residual
        out = self.relu(out)
        return out

    def _Norm_function(self, norm, outchannel):
        """Create the normalization layer named by ``norm`` for ``outchannel`` channels.

        Raises:
            ValueError: If ``norm`` is neither ``'Batch'`` nor ``'Group'``.
                Failing here avoids storing ``None`` as a layer, which would
                only surface later as an opaque error during ``forward``.
        """
        if norm == 'Group':
            return nn.GroupNorm(num_groups=1, num_channels=outchannel)
        if norm == 'Batch':
            return nn.BatchNorm2d(outchannel)
        raise ValueError("Unsupported norm %r: expected 'Batch' or 'Group'" % (norm,))

In [0]:
class DetNet(nn.Module):
    """DetNet backbone followed by a pooled linear classification head.

    ``stage_1`` is the convolution/normalization/ReLU/max-pool stem,
    ``stage_2``-``stage_4`` are stacks of ``ResidualBlock``. ``stage_5`` and
    ``stage_6`` each consist of one "B" dilated bottleneck (1x1 projection
    shortcut) followed by two "A" dilated bottlenecks (identity shortcut),
    all ``det_channel`` channels wide.

    Example:
        DetNet59: ``model = DetNet([3, 4, 6])``

    Args:
        layers: Sequence of three ints, the number of residual blocks in
            ``stage_2``, ``stage_3`` and ``stage_4``.
        num_classes: Size of the output of the final linear layer.

    Raises:
        ValueError: If the module-level ``Norm`` setting is not supported.
    """

    def __init__(self, layers, num_classes=2):
        # Initialise nn.Module before assigning any attribute.
        super(DetNet, self).__init__()

        # Channel count of the tensor entering the next residual stage;
        # _make_layer updates it after each stage.
        self.inchannel = 64
        self.det_channel = 256

        self.stage_1 = nn.Sequential(
            nn.Conv2d(3, 64, kernel_size=7, stride=2, padding=3, bias=False),
            self._Norm_function(Norm, outchannel=64),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=3, stride=2, padding=1)
        )

        # NOTE(review): the original comment says stage 1-4 match ResNet, but
        # the ResNet class in this file builds its first residual stage with
        # stride 1 while stage_2 here uses stride 2. Kept as-is to preserve
        # behaviour; confirm which is intended.
        self.stage_2 = self._make_layer(ResidualBlock, 64,  layers[0], stride=2)
        self.stage_3 = self._make_layer(ResidualBlock, 128, layers[1], stride=2)
        self.stage_4 = self._make_layer(ResidualBlock, 256, layers[2], stride=2)

        self.stage_5 = nn.Sequential(self._DetNetBlock_B(self.inchannel,   self.det_channel),
                                     self._DetNetBlock_A(self.det_channel, self.det_channel),
                                     self._DetNetBlock_A(self.det_channel, self.det_channel),)

        self.stage_6 = nn.Sequential(self._DetNetBlock_B(self.det_channel, self.det_channel),
                                     self._DetNetBlock_A(self.det_channel, self.det_channel),
                                     self._DetNetBlock_A(self.det_channel, self.det_channel),)

        # Global average pooling. For a 7x7 feature map this equals the former
        # AvgPool2d(7, stride=1); other spatial sizes now also reduce to 1x1
        # instead of breaking the linear layer below.
        self.avgpool = nn.AdaptiveAvgPool2d((1, 1))
        self.fc = nn.Linear(self.det_channel, num_classes)

    def forward(self, x):
        """Run all six stages, pool globally and apply the linear layer."""
        for stage in (self.stage_1, self.stage_2, self.stage_3,
                      self.stage_4, self.stage_5, self.stage_6):
            x = stage(x)

        x = self.avgpool(x)
        x = x.view(x.size(0), -1)
        return self.fc(x)

    def _DetNetBlock_A(self, inchannel, outchannel):
        """Build block "A": a dilated bottleneck with an identity shortcut."""
        return Dilated_BottleNeck(inchannel, outchannel)

    def _DetNetBlock_B(self, inchannel, outchannel):
        """Build block "B": a dilated bottleneck with a 1x1 projection shortcut.

        The block is wrapped in an ``nn.Sequential`` so the module hierarchy
        (and therefore parameter names) stays the same as before.
        """
        shortcut = nn.Sequential(
            nn.Conv2d(inchannel, outchannel, kernel_size=1, stride=1, padding=0, bias=False),
            self._Norm_function(Norm, outchannel))
        return nn.Sequential(
            Dilated_BottleNeck(inchannel, outchannel, stride=1, shortcut=shortcut))

    def _make_layer(self, block, outchannel, block_num, stride=1):
        """Stack ``block_num`` instances of ``block`` into one stage.

        Only the first block receives ``stride`` and, when the stride or the
        channel count changes, a 1x1 projection shortcut. ``self.inchannel``
        is updated to the stage's output channel count.
        """
        out_expanded = outchannel * block.expansion

        shortcut = None
        if stride != 1 or self.inchannel != out_expanded:
            # 1x1 projection so the skip branch matches the main branch.
            shortcut = nn.Sequential(
                nn.Conv2d(self.inchannel, out_expanded, kernel_size=1, stride=stride,
                          bias=False),
                self._Norm_function(Norm, out_expanded)
            )

        layers = [block(self.inchannel, outchannel, stride, shortcut)]

        # Subsequent blocks (and later stages) consume the expanded output.
        self.inchannel = out_expanded
        layers.extend(block(self.inchannel, outchannel) for _ in range(1, block_num))

        return nn.Sequential(*layers)

    def _Norm_function(self, norm, outchannel):
        """Create the normalization layer named by ``norm`` for ``outchannel`` channels.

        Raises:
            ValueError: If ``norm`` is neither ``'Batch'`` nor ``'Group'``.
        """
        if norm == 'Group':
            return nn.GroupNorm(num_groups=1, num_channels=outchannel)
        if norm == 'Batch':
            return nn.BatchNorm2d(outchannel)
        raise ValueError("Unsupported norm %r: expected 'Batch' or 'Group'" % (norm,))

In [0]:
class ResNet(nn.Module):
    """ResNet built from a residual ``block`` class and per-stage block counts.

    Residual stages are assembled by ``_make_layer``.

    Examples:
        ResNet50:  ``model = ResNet(ResidualBlock, [3, 4, 6, 3])``
        ResNet101: ``model = ResNet(ResidualBlock, [3, 4, 23, 3])``

    Args:
        block: Residual block class. It is called as
            ``block(inchannel, outchannel, stride, shortcut)`` and must expose
            an ``expansion`` class attribute.
        layers: Sequence of four ints, the number of blocks in
            ``Conv2_x`` ... ``Conv5_x``.
        num_classes: Size of the output of the final linear layer.

    Raises:
        ValueError: If the module-level ``Norm`` setting is not supported.
    """

    def __init__(self, block, layers, num_classes=1000):
        # Initialise nn.Module before assigning any attribute.
        super(ResNet, self).__init__()

        # Channel count of the tensor entering the next residual stage;
        # _make_layer updates it after each stage.
        self.inchannel = 64

        self.Conv1 = nn.Sequential(
            nn.Conv2d(3, 64, kernel_size=7, stride=2, padding=3, bias=False),
            self._Norm_function(Norm, outchannel=64),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=3, stride=2, padding=1)
        )

        self.Conv2_x = self._make_layer(block, 64, layers[0])
        self.Conv3_x = self._make_layer(block, 128, layers[1], stride=2)
        self.Conv4_x = self._make_layer(block, 256, layers[2], stride=2)
        self.Conv5_x = self._make_layer(block, 512, layers[3], stride=2)

        # Global average pooling. For a 7x7 feature map this equals the former
        # AvgPool2d(7, stride=1); other spatial sizes now also reduce to 1x1
        # instead of breaking the linear layer below.
        self.avgpool = nn.AdaptiveAvgPool2d((1, 1))
        self.fc = nn.Linear(512 * block.expansion, num_classes)

        self._init_weights()

    def _init_weights(self):
        """Initialise convolution and normalization parameters.

        Convolutions get zero-mean normal weights with
        ``std = sqrt(2 / (kH * kW * out_channels))``, which is what
        ``kaiming_normal_`` computes for ``mode='fan_out'`` with ReLU gain.
        Normalization layers get weight 1 and bias 0.
        """
        for m in self.modules():
            if isinstance(m, nn.Conv2d):
                nn.init.kaiming_normal_(m.weight, mode='fan_out', nonlinearity='relu')
            elif isinstance(m, (nn.BatchNorm2d, nn.GroupNorm)):
                nn.init.constant_(m.weight, 1)
                nn.init.constant_(m.bias, 0)

    def forward(self, x):
        """Run the stem and the four residual stages, pool and classify."""
        x = self.Conv1(x)

        for stage in (self.Conv2_x, self.Conv3_x, self.Conv4_x, self.Conv5_x):
            x = stage(x)

        x = self.avgpool(x)
        x = x.view(x.size(0), -1)
        return self.fc(x)

    def _make_layer(self, block, outchannel, block_num, stride=1):
        """Stack ``block_num`` instances of ``block`` into one stage.

        Only the first block receives ``stride`` and, when the stride or the
        channel count changes, a 1x1 projection shortcut. ``self.inchannel``
        is updated to the stage's output channel count.
        """
        out_expanded = outchannel * block.expansion

        shortcut = None
        if stride != 1 or self.inchannel != out_expanded:
            # 1x1 projection so the skip branch matches the main branch.
            shortcut = nn.Sequential(
                nn.Conv2d(self.inchannel, out_expanded,
                          kernel_size=1, stride=stride, bias=False),
                self._Norm_function(Norm, out_expanded)
            )

        layers = [block(self.inchannel, outchannel, stride, shortcut)]
        self.inchannel = out_expanded
        layers.extend(block(self.inchannel, outchannel) for _ in range(1, block_num))

        return nn.Sequential(*layers)

    def _Norm_function(self, norm, outchannel):
        """Create the normalization layer named by ``norm`` for ``outchannel`` channels.

        Raises:
            ValueError: If ``norm`` is neither ``'Batch'`` nor ``'Group'``.
        """
        if norm == 'Batch':
            return nn.BatchNorm2d(outchannel)
        if norm == 'Group':
            return nn.GroupNorm(num_groups=1, num_channels=outchannel)
        raise ValueError("Unsupported norm %r: expected 'Batch' or 'Group'" % (norm,))

In [0]:
if __name__ == '__main__':
    # Smoke test: run one random 3x224x224 input through the network and
    # print the module hierarchy.
    # A plain tensor is passed directly; the deprecated Variable wrapper is
    # not needed.
    x = torch.randn(1, 3, 224, 224)
    # Alternative models to try:
    # model = ResNet(ResidualBlock, [3, 4, 6, 3])
    # model = torchvision.models.resnet50()

    model = DetNet([3,4,6])
    y = model(x)
    print(model)

DetNet(
  (stage_1): Sequential(
    (0): Conv2d(3, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3), bias=False)
    (1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (2): ReLU(inplace=True)
    (3): MaxPool2d(kernel_size=3, stride=2, padding=1, dilation=1, ceil_mode=False)
  )
  (stage_2): Sequential(
    (0): ResidualBlock(
      (conv1): Conv2d(64, 64, kernel_size=(1, 1), stride=(1, 1), bias=False)
      (nf_1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (conv2): Conv2d(64, 64, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1), bias=False)
      (nf_2): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (conv3): Conv2d(64, 256, kernel_size=(1, 1), stride=(1, 1), bias=False)
      (nf_3): BatchNorm2d(256, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (relu): ReLU(inplace=True)
      (left): Sequential(
        (0): Conv2d(64, 64, kernel_si

## 可视化网络

In [0]:
!pip install graphviz



In [0]:
from graphviz import Digraph
import torch
from torch.autograd import Variable

def make_dot(var, params=None):
    """Build a Graphviz ``Digraph`` of the autograd graph that produced ``var``.

    Traversal starts at ``var.grad_fn`` and follows ``next_functions`` and,
    where present, ``saved_tensors``. Node colours:

    * light blue: graph nodes exposing a ``variable`` attribute, labelled
      with the name from ``params`` (if known) and the tensor size;
    * orange: tensors reached through ``saved_tensors``, labelled with size;
    * default: every other node, labelled with its type name.

    Edges point from an input node to the node that consumes it.

    Args:
        var: Output tensor whose ``grad_fn`` is the root of the graph.
        params: Optional mapping of name -> tensor used to label the
            light-blue nodes. Tensors missing from the mapping get an empty
            name.

    Returns:
        The populated ``Digraph``.

    Raises:
        TypeError: If a value in ``params`` is not a tensor/Variable.
    """
    param_map = {}
    if params is not None:
        # dict views are not subscriptable on Python 3, so iterate instead of
        # indexing ``params.values()[0]``.
        for value in params.values():
            if not isinstance(value, Variable):
                raise TypeError(
                    'params values must be tensors, got %s' % type(value).__name__)
        param_map = {id(v): k for k, v in params.items()}

    node_attr = dict(style='filled',
                     shape='box',
                     align='left',
                     fontsize='12',
                     ranksep='0.1',
                     height='0.2')
    dot = Digraph(node_attr=node_attr, graph_attr=dict(size="12,12"))
    seen = set()

    def size_to_str(size):
        """Format a size as ``(d0, d1, ...)``."""
        return '(' + ', '.join('%d' % v for v in size) + ')'

    def add_nodes(node):
        """Add ``node`` and everything reachable from it, once each.

        Recursive, so extremely deep graphs may reach Python's recursion
        limit.
        """
        if node in seen:
            return

        node_id = str(id(node))
        if torch.is_tensor(node):
            dot.node(node_id, size_to_str(node.size()), fillcolor='orange')
        elif hasattr(node, 'variable'):
            leaf = node.variable
            label = '%s\n %s' % (param_map.get(id(leaf), ''), size_to_str(leaf.size()))
            dot.node(node_id, label, fillcolor='lightblue')
        else:
            dot.node(node_id, str(type(node).__name__))
        seen.add(node)

        if hasattr(node, 'next_functions'):
            for entry in node.next_functions:
                parent = entry[0]
                if parent is not None:
                    dot.edge(str(id(parent)), node_id)
                    add_nodes(parent)
        if hasattr(node, 'saved_tensors'):
            for saved in node.saved_tensors:
                dot.edge(str(id(saved)), node_id)
                add_nodes(saved)

    add_nodes(var.grad_fn)
    return dot

In [0]:
import sys

sys.path.append('./')

if __name__ == '__main__':
    # Render the autograd graph of one forward pass through DetNet59.
    # A plain tensor is passed directly; the deprecated Variable wrapper is
    # not needed. The output still has a grad_fn because the model parameters
    # require gradients.
    x = torch.randn(1, 3, 224, 224)
    model = DetNet([3,4,6])
    y = model(x)
    g = make_dot(y)
    g.view()

    #print(model)