>### DeepLabV3

In [2]:
import os
import numpy as np
import pandas as pd
import pickle
import glob

# Plotting
import imageio
import PIL
from PIL import ImageDraw, Image
import cv2
import seaborn as sns
import matplotlib.pyplot as plt
%matplotlib inline
plt.rcParams['font.sans-serif']=['Arial Unicode MS'] 
plt.rcParams['axes.unicode_minus']=False 

# Framework
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
import tensorflow_datasets as tfds
import torch
import torch.nn as nn
from torchsummary import summary
from torchvision import transforms
import torch.nn.functional as F

#### PyTorch Model

沿用[ResNet Blocks](#resnet_block)

In [4]:
class ResNet_Atrous(nn.Module):
    """ResNet feature extractor (no FC head) with optional atrous convolutions.

    Intended to be used as a segmentation backbone: `forward` returns the
    intermediate feature maps instead of class scores.

    Args:
        name: Key of a predefined layout ('resnet18', 'resnet34', 'resnet50',
            'resnet101', 'resnet152'). Only consulted when `block` or
            `n_blocks` is not supplied.
        in_chans: Number of channels of the input image.
        block: Residual block class. It is instantiated as
            ``block(in_chans, out_chans, stride=..., dilation_rate=...)`` and
            must expose an ``expansion`` class attribute.
        n_blocks: Four integers, the number of blocks in layer2..layer5.
        dilation_rate: Per-block dilation for layer5/6/7; each entry is
            multiplied by ``16 // os``. Must contain at least ``n_blocks[3]``
            entries.
        os: Output stride, 8 or 16. With ``os=16`` and
            ``dilation_rate == [1, 1, 1]`` the conventional strided layout
            (no atrous convolution) is built instead. Note that inside this
            class the name shadows the standard-library ``os`` module.

    Raises:
        KeyError: `name` is not a predefined layout and no custom
            `block`/`n_blocks` pair was given.
        ValueError: `os` is neither 8 nor 16.
    """

    def __init__(self, name, in_chans=3, block=None, n_blocks=None,
                 dilation_rate=[1,1,1],
                 os=16):
        super(ResNet_Atrous, self).__init__()

        # Private list copy: tuples become acceptable (and compare equal to
        # [1, 1, 1] in get_strides) and the shared default is never mutated.
        dilation_rate = list(dilation_rate)

        # Layout: either a user-supplied block/n_blocks pair or a classic one.
        self.structure = name
        self.block = block          # BasicBlock or BottleNeck style class
        self.n_blocks = n_blocks    # number of blocks per stage
        self.get_structure()
        self.strides = self.get_strides(os, dilation_rate)

        # Stem: 7x7 stride-2 convolution (2x) followed by a stride-2 max-pool (4x).
        self.layer1 = nn.Sequential(
                nn.Conv2d(in_chans, 64, kernel_size=7, stride=2, padding=3, bias=False),
                nn.BatchNorm2d(64),
                nn.ReLU(inplace=True))
        self.pool = nn.MaxPool2d(3, 2, padding=1)

        # Within a stage only the first block receives the stage stride; the
        # remaining blocks use stride 1 (see build_blocks).
        self.in_chans = 64  # running input width, updated by build_blocks
        self.layer2 = self.build_blocks(64, self.n_blocks[0], stride=self.strides[0])
        self.layer3 = self.build_blocks(128, self.n_blocks[1], stride=self.strides[1])
        # layer4 uses one dilation for all of its blocks (2 when os == 8).
        self.layer4 = self.build_blocks(256, self.n_blocks[2], stride=self.strides[2],
                                        dilation_rate=16//os)

        # layer5 takes an explicit per-block dilation, scaled by 16 // os.
        last_stage_dilation = [i*16//os for i in dilation_rate]
        self.layer5 = self.build_blocks(512, self.n_blocks[3], stride=self.strides[3],
                                        dilation_rate=last_stage_dilation)

        # Two extra repetitions of the last stage.
        # NOTE(review): they also receive strides[3]; in the conventional
        # layout that value is 2, so each of them downsamples again - confirm
        # that these layers are only meant for the atrous configurations.
        self.layer6 = self.build_blocks(512, self.n_blocks[3], stride=self.strides[3],
                                        dilation_rate=last_stage_dilation)
        self.layer7 = self.build_blocks(512, self.n_blocks[3], stride=self.strides[3],
                                        dilation_rate=last_stage_dilation)

    def get_structure(self):
        """Fill in `self.block` / `self.n_blocks` from `self.structure` if either is missing."""
        if self.block and self.n_blocks:
            # Fully custom layout: nothing to look up, and the predefined
            # block classes do not even need to exist.
            return

        versions = {'resnet18':(BasicBlock,[2,2,2,2]),
                    'resnet34':(BasicBlock,[3,4,6,3]),
                    'resnet50':(BottleNeck,[3,4,6,3]),
                    'resnet101':(BottleNeck,[3,4,23,3]),
                    'resnet152':(BottleNeck,[3,8,36,3])}
        self.block, self.n_blocks = versions[self.structure]

    def get_strides(self, os, dilation_rate):
        """Return the first-block stride of layer2..layer5 for the requested output stride.

        Raises:
            ValueError: `os` is neither 8 nor 16 (previously this fell through
                and failed with an UnboundLocalError).
        """
        if os == 16:
            if list(dilation_rate) == [1, 1, 1]:
                # Conventional ResNet: every stage after layer2 halves the resolution.
                return [1, 2, 2, 2]
            # Output stride 16: layer5 keeps the resolution and relies on dilation.
            return [1, 2, 2, 1]
        if os == 8:
            # Output stride 8: layer4 and layer5 keep the resolution.
            return [1, 2, 1, 1]
        raise ValueError(f"Unsupported output stride os={os!r}; expected 8 or 16")

    def build_blocks(self, out_chans, n_blocks, stride, dilation_rate=1):
        """Stack `n_blocks` residual blocks into one `nn.Sequential` stage.

        Args:
            out_chans: Base width handed to every block.
            n_blocks: Number of blocks in the stage.
            stride: Stride of the first block; later blocks use stride 1.
            dilation_rate: A single int applied to every block, or a sequence
                indexed per block.
        """
        if isinstance(dilation_rate, int):
            dilation_rate = [dilation_rate]*n_blocks

        blocks = [self.block(self.in_chans, out_chans, stride=stride, dilation_rate=dilation_rate[0])]
        # After the first block the stage runs at its expanded width.
        self.in_chans = out_chans*self.block.expansion
        for i in range(1, n_blocks):
            blocks.append(self.block(self.in_chans, out_chans, stride=1, dilation_rate=dilation_rate[i]))
        return nn.Sequential(*blocks)

    def forward(self, inputs):
        """Return `(f1, f2, f3, f4, f5)`: stem output, layer2..layer4 outputs and the layer7 output."""
        f1 = self.layer1(inputs)  # 2x downsampled
        p = self.pool(f1)
        f2 = self.layer2(p)       # 4x
        f3 = self.layer3(f2)      # 8x
        f4 = self.layer4(f3)      # 8x or 16x, depending on the strides
        f5 = self.layer5(f4)
        f5 = self.layer6(f5)
        f5 = self.layer7(f5)
        return f1, f2, f3, f4, f5

    def load_pretrained(self, model_path):
        """Copy every compatible tensor from the checkpoint at URL `model_path` into this module.

        Only entries whose key exists in this module's state dict with an
        identical shape are copied; everything else keeps its current value.
        NOTE(review): the layer names of this class may not line up with those
        of the downloaded checkpoints, in which case little or nothing is
        loaded - inspect the matching keys before relying on this.
        """
        # Imported locally so the method does not depend on a later notebook cell.
        import torch.utils.model_zoo as model_zoo

        pretrained = model_zoo.load_url(model_path)
        # Operate on `self`; the previous code used the notebook-global `model`.
        own_state = self.state_dict()
        compatible = {k: v for k, v in pretrained.items()
                      if k in own_state and v.shape == own_state[k].shape}
        own_state.update(compatible)
        self.load_state_dict(own_state)
    
    

In [8]:
# Smoke test: build a 'resnet101' backbone with per-block dilations [1, 2, 1]
# for the last stage and print its layer summary for a 3x384x1024 input.
model = ResNet_Atrous('resnet101', dilation_rate=[1,2,1])
summary(model, (3,384,1024))

----------------------------------------------------------------
        Layer (type)               Output Shape         Param #
            Conv2d-1         [-1, 64, 192, 512]           9,408
       BatchNorm2d-2         [-1, 64, 192, 512]             128
              ReLU-3         [-1, 64, 192, 512]               0
         MaxPool2d-4          [-1, 64, 96, 256]               0
            Conv2d-5          [-1, 64, 96, 256]           4,096
       BatchNorm2d-6          [-1, 64, 96, 256]             128
              ReLU-7          [-1, 64, 96, 256]               0
            Conv2d-8          [-1, 64, 96, 256]          36,864
       BatchNorm2d-9          [-1, 64, 96, 256]             128
             ReLU-10          [-1, 64, 96, 256]               0
           Conv2d-11         [-1, 256, 96, 256]          16,384
      BatchNorm2d-12         [-1, 256, 96, 256]             512
           Conv2d-13         [-1, 256, 96, 256]          16,384
      BatchNorm2d-14         [-1, 256, 

In [7]:
# Download URLs of the pretrained ResNet checkpoints (hosted on download.pytorch.org).
import torch.utils.model_zoo as model_zoo
# NOTE(review): bn_mom is not referenced by any cell visible here; presumably a
# BatchNorm momentum setting - confirm it is still needed.
bn_mom = 0.0003
# Keyed by the same structure names that ResNet_Atrous accepts (no 'resnet152' entry).
model_paths = {
    'resnet18': 'https://download.pytorch.org/models/resnet18-5c106cde.pth',
    'resnet34': 'https://download.pytorch.org/models/resnet34-333f7ec4.pth',
    'resnet50': 'https://download.pytorch.org/models/resnet50-19c8e357.pth',
    'resnet101': 'https://download.pytorch.org/models/resnet101-5d3b4d8f.pth'}

In [None]:
# Load pretrained weights into the backbone held in `model`.
# NOTE(review): reading the notebook top to bottom, `model` was last built as a
# 'resnet101' backbone while the 'resnet50' checkpoint is requested here -
# confirm which structure is intended. This cell has no execution count.
model.load_pretrained(model_paths['resnet50'])

<img src='images/aspp.png' style='height:150px'></img>

In [105]:
# Atrous Spatial Pyramid Pooling (ASPP)
class ASPP(nn.Module):
    """Atrous Spatial Pyramid Pooling head.

    Runs four parallel convolutions with different dilations plus a global
    average pooling branch over the same input, concatenates the five results
    along the channel axis and fuses them with a 1x1 convolution.

    Args:
        in_chans: Channels of the incoming feature map.
        out_chans: Channels produced by every branch and by the fused output.
        rate: Multiplier applied to the base dilations 1, 6, 12 and 18.
    """

    def __init__(self, in_chans, out_chans, rate=1):
        super().__init__()

        # (kernel size, base dilation) of the four atrous branches.
        branch_specs = [(1, 1), (3, 6), (3, 12), (3, 18)]

        self.dilated_conv = nn.ModuleList()
        for position, (ksize, base_dilation) in enumerate(branch_specs):
            dilation = base_dilation * rate
            # The first (1x1) branch is unpadded; for the 3x3 branches a
            # padding equal to the dilation keeps the spatial size unchanged.
            pad = 0 if position == 0 else dilation
            self.dilated_conv.append(nn.Sequential(
                nn.Conv2d(in_chans, out_chans, ksize, 1, padding=pad, dilation=dilation),
                nn.BatchNorm2d(out_chans),
                nn.ReLU(inplace=True)))

        # Image-level branch: global average pool to 1x1, then a 1x1 convolution.
        self.avgpool = nn.AdaptiveAvgPool2d(output_size=1)
        self.branch5 = nn.Sequential(
            nn.Conv2d(in_chans, out_chans, 1, 1),
            nn.BatchNorm2d(out_chans),
            nn.ReLU(inplace=True))

        # Fusion: 1x1 convolution over the five concatenated branch outputs.
        self.branch6 = nn.Sequential(
            nn.Conv2d(out_chans*5, out_chans, 1, 1),
            nn.BatchNorm2d(out_chans),
            nn.ReLU(inplace=True))

    def forward(self, inputs):
        """Map a (B, in_chans, H, W) tensor to a (B, out_chans, H, W) tensor."""
        _, _, height, width = inputs.shape

        pyramid = [branch(inputs) for branch in self.dilated_conv]

        # Pool to 1x1, project, then stretch back to the input resolution
        # (bilinear with align_corners=True, as nn.UpsamplingBilinear2d does).
        pooled = self.branch5(self.avgpool(inputs))
        pooled = F.interpolate(pooled, size=(height, width),
                               mode='bilinear', align_corners=True)
        pyramid.append(pooled)

        return self.branch6(torch.cat(pyramid, dim=1))

In [106]:
# Smoke test: ASPP head mapping 2048 input channels to 256, summarised for a
# 2048x24x64 feature map. Note that this rebinds the notebook-level name `model`.
model = ASPP(2048,256)
summary(model, (2048,24,64))

----------------------------------------------------------------
        Layer (type)               Output Shape         Param #
            Conv2d-1          [-1, 256, 24, 64]         524,544
       BatchNorm2d-2          [-1, 256, 24, 64]             512
              ReLU-3          [-1, 256, 24, 64]               0
            Conv2d-4          [-1, 256, 24, 64]       4,718,848
       BatchNorm2d-5          [-1, 256, 24, 64]             512
              ReLU-6          [-1, 256, 24, 64]               0
            Conv2d-7          [-1, 256, 24, 64]       4,718,848
       BatchNorm2d-8          [-1, 256, 24, 64]             512
              ReLU-9          [-1, 256, 24, 64]               0
           Conv2d-10          [-1, 256, 24, 64]       4,718,848
      BatchNorm2d-11          [-1, 256, 24, 64]             512
             ReLU-12          [-1, 256, 24, 64]               0
AdaptiveAvgPool2d-13           [-1, 2048, 1, 1]               0
           Conv2d-14            [-1, 25

In [120]:
class DeeplabV3Plus(nn.Module):
    """DeepLabV3+ style segmentation network built on `ResNet_Atrous` and `ASPP`.

    The deepest backbone feature goes through ASPP and is upsampled by
    ``os // 4``; it is then concatenated with a 1x1-projected layer2 feature
    (4x downsampled), refined by two 3x3 convolutions, classified per pixel
    and upsampled by 4.

    Args:
        backbone: Structure name forwarded to `ResNet_Atrous`.
        in_chans: Channels of the input image.
        num_classes: Number of output classes (channels of the result).
        aspp_filters: Width of the ASPP output and of the decoder convolutions.
        shortcut_filters: Width of the projected shallow (layer2) feature.
        shortcut_kernel_size: Kernel size of the shallow-feature projection.
        os: Output stride of the backbone, forwarded to `ResNet_Atrous`.

    `forward` returns per-pixel class probabilities (softmax over dim 1), not
    logits. The spatial sizes only line up for the concatenation when the
    upsampled ASPP map matches the layer2 map, e.g. for input sizes divisible
    by 16 such as the 384x1024 input used in this notebook.
    """

    def __init__(self, backbone='resnet50', in_chans=3, num_classes=8, aspp_filters=256,
                 shortcut_filters=48, shortcut_kernel_size=1, os=16):
        super(DeeplabV3Plus, self).__init__()

        # Feature extractor.
        self.backbone = ResNet_Atrous(backbone, in_chans, dilation_rate=[1,2,1], os=os)

        # Deep branch: ASPP on the last backbone feature, then upsample to the
        # resolution of the layer2 feature (4x downsampled).
        in_chans = 512*self.backbone.block.expansion  # width of the backbone output
        # The dilation multiplier is passed positionally: the PyTorch ASPP
        # defined in this notebook names it `rate`, so the former
        # `dilation_rate=` keyword raised a TypeError.
        self.aspp = ASPP(in_chans, aspp_filters, 16//os)
        self.dropout = nn.Dropout(0.5)
        self.upsample1 = nn.UpsamplingBilinear2d(scale_factor=os//4)

        # Shallow branch: project the layer2 feature down to `shortcut_filters`.
        in_chans = 64*self.backbone.block.expansion  # width of the layer2 output
        self.shortcut = nn.Sequential(
                 nn.Conv2d(in_chans, shortcut_filters, shortcut_kernel_size, 1,
                           padding=shortcut_kernel_size//2, bias=False),
                 nn.BatchNorm2d(shortcut_filters),
                 nn.ReLU(inplace=True))

        # Decoder: fuse the concatenated deep and shallow features.
        self.concat = nn.Sequential(
                 nn.Conv2d(aspp_filters+shortcut_filters, aspp_filters, 3, 1, padding=1, bias=False),
                 nn.BatchNorm2d(aspp_filters),
                 nn.ReLU(inplace=True),
                 nn.Dropout(0.5),
                 nn.Conv2d(aspp_filters, aspp_filters, 3, 1, padding=1, bias=False),
                 nn.BatchNorm2d(aspp_filters),
                 nn.ReLU(inplace=True),
                 nn.Dropout(0.1))

        self.classifier = nn.Conv2d(aspp_filters, num_classes, 1, 1)
        self.upsample2 = nn.UpsamplingBilinear2d(scale_factor=4)

        # Weight initialisation. self.modules() also walks the backbone, so any
        # pretrained weights have to be loaded after construction.
        for m in self.modules():
            if isinstance(m, nn.Conv2d):
                nn.init.kaiming_normal_(m.weight, mode='fan_out', nonlinearity='relu')
            elif isinstance(m, nn.BatchNorm2d):
                nn.init.constant_(m.weight, 1)
                nn.init.constant_(m.bias, 0)

    def forward(self, inputs):
        """Return per-pixel class probabilities for a batch of images."""
        # Deep features.
        f1, f2, f3, f4, f5 = self.backbone(inputs)
        feature_aspp = self.aspp(f5)
        feature_aspp = self.dropout(feature_aspp)
        feature_aspp = self.upsample1(feature_aspp)

        # Shallow features.
        feature_shallow = self.shortcut(f2)

        # Fuse, classify and restore the resolution.
        feature = torch.cat([feature_aspp, feature_shallow], dim=1)
        x = self.concat(feature)
        x = self.classifier(x)
        x = self.upsample2(x)
        x = nn.Softmax(dim=1)(x)
        return x

In [121]:
# Smoke test: full segmentation network on a 'resnet50' backbone, summarised
# for a 3x384x1024 input. Rebinds the notebook-level name `model` again.
model = DeeplabV3Plus('resnet50')
summary(model, (3,384,1024))

----------------------------------------------------------------
        Layer (type)               Output Shape         Param #
            Conv2d-1         [-1, 64, 192, 512]           9,408
       BatchNorm2d-2         [-1, 64, 192, 512]             128
              ReLU-3         [-1, 64, 192, 512]               0
         MaxPool2d-4          [-1, 64, 96, 256]               0
            Conv2d-5          [-1, 64, 96, 256]           4,096
       BatchNorm2d-6          [-1, 64, 96, 256]             128
              ReLU-7          [-1, 64, 96, 256]               0
            Conv2d-8          [-1, 64, 96, 256]          36,864
       BatchNorm2d-9          [-1, 64, 96, 256]             128
             ReLU-10          [-1, 64, 96, 256]               0
           Conv2d-11         [-1, 256, 96, 256]          16,384
      BatchNorm2d-12         [-1, 256, 96, 256]             512
           Conv2d-13         [-1, 256, 96, 256]          16,384
      BatchNorm2d-14         [-1, 256, 

#### TF Model

In [3]:
# Residual block used by ResNet-18 / ResNet-34
class BasicBlock(keras.Model):
    """Two-convolution residual block.

    Args:
        filters: Number of filters of both convolutions.
        strides: Strides of the first convolution (and of the projection
            shortcut); the second convolution always uses stride 1.
        kernel_size: Kernel size of both convolutions.
        downsampling: When True the shortcut is a 1x1 convolution followed by
            batch normalisation; otherwise the input is added unchanged.
        dilation_rate: Dilation of the first convolution only.
    """
    expansion = 1

    def __init__(self, filters, strides, kernel_size=3, downsampling=False, dilation_rate=1):
        super().__init__()

        # With padding='same' a strided convolution shrinks the map by the
        # stride factor instead of preserving its size.
        self.conv1 = layers.Conv2D(
            filters, kernel_size, strides,
            padding='same', use_bias=False, dilation_rate=dilation_rate)
        self.bn1 = layers.BatchNormalization()
        self.conv2 = layers.Conv2D(
            filters*BasicBlock.expansion, kernel_size,
            strides=1, padding='same', use_bias=False)
        self.bn2 = layers.BatchNormalization()
        # Stateless layers, so one instance of each is reused inside call().
        self.relu = layers.Activation('relu')
        self.add = layers.Add()

        if not downsampling:
            # Identity shortcut.
            self.shortcut = lambda tensor: tensor
        else:
            # Projection shortcut.
            self.shortcut = keras.Sequential([
                layers.Conv2D(filters, 1, strides, use_bias=False),
                layers.BatchNormalization()])

    def call(self, inputs):
        """Apply conv-bn-relu, conv-bn, add the shortcut and apply the final ReLU."""
        out = self.relu(self.bn1(self.conv1(inputs)))
        out = self.bn2(self.conv2(out))
        merged = self.add([out, self.shortcut(inputs)])
        return self.relu(merged)

In [4]:
# Residual block used by ResNet-50 / ResNet-101 / ResNet-152
class BottleNeck(keras.Model):
    """Three-convolution bottleneck residual block (1x1, kxk, 1x1).

    Args:
        filters: Width of the first two convolutions; the last convolution
            outputs ``filters * expansion`` channels.
        strides: Strides of the middle convolution (and of the projection
            shortcut).
        kernel_size: Kernel size of the middle convolution.
        downsampling: When True the shortcut is a 1x1 convolution followed by
            batch normalisation; otherwise the input is added unchanged.
        dilation_rate: Dilation of the middle convolution.
    """
    expansion = 4

    def __init__(self, filters, strides, kernel_size=3, downsampling=False, dilation_rate=1):
        super().__init__()

        # 1x1 convolution that reduces the channel count.
        self.conv1 = layers.Conv2D(filters, 1, strides=1, padding='same', use_bias=False)
        self.bn1 = layers.BatchNormalization()
        # Spatial convolution: carries the stride and the optional dilation.
        self.conv2 = layers.Conv2D(
            filters, kernel_size, strides,
            padding='same', use_bias=False, dilation_rate=dilation_rate)
        self.bn2 = layers.BatchNormalization()
        # 1x1 convolution that expands the channel count again.
        self.conv3 = layers.Conv2D(
            filters*BottleNeck.expansion, 1,
            strides=1, padding='same', use_bias=False)
        self.bn3 = layers.BatchNormalization()
        self.relu = layers.Activation('relu')
        self.add = layers.Add()

        if not downsampling:
            # Identity shortcut.
            self.shortcut = lambda tensor: tensor
        else:
            # Projection shortcut to the expanded width.
            self.shortcut = keras.Sequential([
                layers.Conv2D(filters*BottleNeck.expansion, 1, strides, use_bias=False),
                layers.BatchNormalization(axis=3)])

    def call(self, inputs):
        """Apply the three conv-bn stages, add the shortcut and apply the final ReLU."""
        out = self.relu(self.bn1(self.conv1(inputs)))
        out = self.relu(self.bn2(self.conv2(out)))
        out = self.bn3(self.conv3(out))
        merged = self.add([out, self.shortcut(inputs)])
        return self.relu(merged)

In [5]:
class ResNet(keras.Model):
    """Keras ResNet feature extractor with optional atrous convolutions.

    Args:
        name: Keras model name and key of a predefined layout ('resnet18',
            'resnet34', 'resnet50', 'resnet101', 'resnet152'). The layout is
            only looked up when `block` or `n_blocks` is not supplied.
        block: Residual block class, instantiated as
            ``block(filters, strides, downsampling=..., dilation_rate=...)``.
        n_blocks: Four integers, the number of blocks in layer2..layer5.
        dilation_rate: Per-block dilation for layer5/6/7; each entry is
            multiplied by ``16 // os``. Must contain at least ``n_blocks[3]``
            entries.
        os: Output stride, 8 or 16. With ``os=16`` and
            ``dilation_rate == [1, 1, 1]`` the conventional strided layout is
            built. Inside this class the name shadows the ``os`` module.

    Raises:
        KeyError: `name` is not a predefined layout and no custom
            `block`/`n_blocks` pair was given.
        ValueError: `os` is neither 8 nor 16.
    """

    def __init__(self, name, block=None, n_blocks=None,
                 dilation_rate=[1,1,1],
                 os=16):
        super(ResNet, self).__init__(name=name)

        # Private list copy: tuples become acceptable (and compare equal to
        # [1, 1, 1] in get_strides) and the shared default is never mutated.
        dilation_rate = list(dilation_rate)

        # Layout: either a user-supplied block/n_blocks pair or a classic one.
        self.structure = name
        self.block = block          # BasicBlock or BottleNeck
        self.n_blocks = n_blocks    # number of blocks per stage
        self.get_structure()
        strides = self.get_strides(os, dilation_rate)

        # Stem. Conv2D cannot express an explicit padding of 3, so the input
        # is zero-padded first; conv (2x) followed by max-pool (4x).
        self.layer1 = keras.Sequential(
            [layers.ZeroPadding2D(3),
             layers.Conv2D(64, 7, 2, use_bias=False),
             layers.BatchNormalization(),
             layers.Activation('relu'),
             layers.MaxPool2D(3, 2, padding='same')],name='layer1')

        # Within a stage only the first block receives the stage stride.
        self.layer2 = self.build_blocks(2, 64, self.n_blocks[0], strides=strides[0])
        self.layer3 = self.build_blocks(3, 128, self.n_blocks[1], strides=strides[1])
        # layer4 uses one dilation for all of its blocks (2 when os == 8).
        self.layer4 = self.build_blocks(4, 256, self.n_blocks[2], strides=strides[2],
                                        dilation_rate=16//os)

        # layer5 takes an explicit per-block dilation, scaled by 16 // os.
        last_stage_dilation = [i*16//os for i in dilation_rate]
        self.layer5 = self.build_blocks(5, 512, self.n_blocks[3], strides=strides[3],
                                        dilation_rate=last_stage_dilation)
        # Two extra repetitions of the last stage.
        # NOTE(review): they also receive strides[3]; in the conventional
        # layout that value is 2, so each of them downsamples again - confirm
        # that these layers are only meant for the atrous configurations.
        self.layer6 = self.build_blocks(6, 512, self.n_blocks[3], strides=strides[3],
                                        dilation_rate=last_stage_dilation)
        self.layer7 = self.build_blocks(7, 512, self.n_blocks[3], strides=strides[3],
                                        dilation_rate=last_stage_dilation)

    def build_blocks(self, idx, filters, n_blocks, strides, dilation_rate=1):
        """Stack `n_blocks` residual blocks into a `keras.Sequential` named ``layer{idx}``.

        The first block gets `strides` and a projection shortcut; the others
        use stride 1 and an identity shortcut. `dilation_rate` is either one
        int for all blocks or a sequence indexed per block.
        """
        if isinstance(dilation_rate, int):
            dilation_rate = [dilation_rate]*n_blocks

        res_blocks = keras.Sequential(name=f'layer{idx}')
        res_blocks.add(self.block(filters, strides, downsampling=True, dilation_rate=dilation_rate[0]))
        for i in range(1, n_blocks):
            res_blocks.add(self.block(filters, strides=1, dilation_rate=dilation_rate[i]))
        return res_blocks

    def get_structure(self):
        """Fill in `self.block` / `self.n_blocks` from `self.structure` if either is missing."""
        if self.block and self.n_blocks:
            return

        versions = {'resnet18':(BasicBlock,[2,2,2,2]),
                    'resnet34':(BasicBlock,[3,4,6,3]),
                    'resnet50':(BottleNeck,[3,4,6,3]),
                    'resnet101':(BottleNeck,[3,4,23,3]),
                    'resnet152':(BottleNeck,[3,8,36,3])}
        self.block, self.n_blocks = versions[self.structure]

    def get_strides(self, os, dilation_rate):
        """Return the first-block stride of layer2..layer5 for the requested output stride.

        Raises:
            ValueError: `os` is neither 8 nor 16 (previously this fell through
                and failed with an UnboundLocalError).
        """
        if os == 16:
            if list(dilation_rate) == [1, 1, 1]:
                # Conventional ResNet: every stage after layer2 halves the resolution.
                return [1, 2, 2, 2]
            # Output stride 16: layer5 keeps the resolution and relies on dilation.
            return [1, 2, 2, 1]
        if os == 8:
            # Output stride 8: layer4 and layer5 keep the resolution.
            return [1, 2, 1, 1]
        raise ValueError(f"Unsupported output stride os={os!r}; expected 8 or 16")

    def call(self, inputs):
        """Return ``[l2, l3, l4, x]``: the layer2..layer4 outputs and the layer7 output."""
        x = self.layer1(inputs)  # 4x downsampled (conv + pool)
        l2 = self.layer2(x)      # 4x
        l3 = self.layer3(l2)     # 8x
        l4 = self.layer4(l3)     # 8x or 16x, depending on the strides
        x = self.layer5(l4)
        x = self.layer6(x)
        x = self.layer7(x)
        return [l2, l3, l4, x]

In [6]:
# Atrous Spatial Pyramid Pooling (ASPP)
class ASPP(keras.Model):
    """Keras Atrous Spatial Pyramid Pooling head.

    Four parallel convolutions with dilations 1, 6, 12 and 18 (each multiplied
    by `dilation_rate`) plus a global-average-pooling branch are concatenated
    along the channel axis and fused by a 1x1 convolution.

    Args:
        filters: Output channels of every branch and of the fused result.
        dilation_rate: Multiplier applied to the base dilations.
    """

    def __init__(self, filters, dilation_rate=1):
        super().__init__(name='ASPP')

        def conv_bn_relu(name, kernel=1, padding='valid', dilation=1):
            # Bias-free convolution -> batch norm -> ReLU, wrapped in a named Sequential.
            return keras.Sequential([
                layers.Conv2D(filters, kernel, 1, padding=padding,
                              dilation_rate=dilation, use_bias=False),
                layers.BatchNormalization(),
                layers.Activation('relu')], name=name)

        # Atrous branches: one 1x1 convolution and three dilated 3x3 convolutions.
        self.branch1 = conv_bn_relu('branch1', 1, 'valid', 1*dilation_rate)
        self.branch2 = conv_bn_relu('branch2', 3, 'same', 6*dilation_rate)
        self.branch3 = conv_bn_relu('branch3', 3, 'same', 12*dilation_rate)
        self.branch4 = conv_bn_relu('branch4', 3, 'same', 18*dilation_rate)

        # Image-level branch: global average pool (dims kept), then a 1x1 convolution.
        self.avgpool = layers.GlobalAveragePooling2D(keepdims=True)
        self.branch5 = conv_bn_relu('branch5')

        # Fusion: 1x1 convolution over the five concatenated branch outputs.
        self.branch6 = conv_bn_relu('branch6')

    def call(self, inputs):
        """Map a (B, H, W, C) tensor to a (B, H, W, filters) tensor.

        H and W are read from the static shape of `inputs` and used as the
        upsampling factors of the pooled branch.
        """
        _, height, width, _ = inputs.shape

        atrous_outputs = [
            self.branch1(inputs),
            self.branch2(inputs),
            self.branch3(inputs),
            self.branch4(inputs),
        ]

        # The pooled map is 1x1, so upsampling by (H, W) restores the input size.
        pooled = self.branch5(self.avgpool(inputs))
        pooled = layers.UpSampling2D(size=(height, width), interpolation='bilinear')(pooled)

        merged = layers.concatenate(atrous_outputs + [pooled])
        return self.branch6(merged)

In [7]:
class DeeplabV3(keras.Model):
    """Keras segmentation network: ResNet backbone, ASPP head and a shallow-feature decoder.

    The deepest backbone feature goes through ASPP, dropout and an
    ``os // 4`` bilinear upsampling; it is concatenated with a projected
    layer2 feature, refined by two 3x3 convolutions, classified with a 1x1
    convolution and upsampled by 4. `call` returns the upsampled classifier
    output directly (no softmax is applied here).

    Args:
        backbone: Structure name forwarded to `ResNet`.
        num_classes: Number of filters of the classifier convolution.
        aspp_filters: Width of the ASPP output and of the decoder convolutions.
        shortcut_filters: Width of the projected shallow feature.
        shortcut_kernel: Kernel size of the shallow-feature projection.
        os: Output stride forwarded to `ResNet`.

    NOTE(review): every default comes from a `config` object that is not
    defined in the cells visible here; it has to exist before this class
    statement runs, because defaults are evaluated at definition time.
    """
    def __init__(self, backbone=config.BACKBONE, num_classes=config.NUM_CLASSES, aspp_filters=config.ASPP_OUTDIM, 
                 shortcut_filters=config.SHORTCUT_DIM, shortcut_kernel=config.SHORTCUT_KERNEL, os=config.OUTPUT_STRIDE):
        super(DeeplabV3, self).__init__(name='DeepLabV3')
        
        # Deep branch: backbone -> ASPP -> dropout -> upsample by os // 4.
        self.backbone = ResNet(backbone, dilation_rate=[1,2,1], os=os) # e.g. (224,224,3)->(14,14,2048) for resnet50, os=16
        self.aspp = ASPP(aspp_filters, dilation_rate=16//os) # (14,14,2048)->(14,14,256)
        self.dropout = layers.Dropout(0.5)
        self.upsample1 = layers.UpSampling2D(size=(os//4,os//4),interpolation='bilinear') # (56,56,256)
        
        # Shallow branch: project the layer2 feature (4x downsampled) to shortcut_filters channels.
        self.shortcut = keras.Sequential([ 
                 layers.Conv2D(shortcut_filters, shortcut_kernel, 1, padding='same', use_bias=False), 
                 layers.BatchNormalization(), 
                 layers.Activation('relu')],name='shortcut') 
        
        # Decoder: fuse the concatenated features, (56,56,256+48)->(56,56,256).
        self.concat = keras.Sequential([ 
                 layers.Conv2D(aspp_filters, 3, 1, padding='same', use_bias=False), 
                 layers.BatchNormalization(), 
                 layers.Activation('relu'),
                 layers.Dropout(0.5),
                 layers.Conv2D(aspp_filters, 3, 1, padding='same', use_bias=False), 
                 layers.BatchNormalization(), 
                 layers.Activation('relu'),
                 layers.Dropout(0.1)],name='concat')
        # Per-pixel classifier (bias-free 1x1 convolution), then 4x upsampling.
        self.classifier = layers.Conv2D(num_classes, 1, 1, padding='same', use_bias=False,name='classifier') # (56,56,n_classes)
        self.upsample2 = layers.UpSampling2D(size=(4,4),interpolation='bilinear') # (224,224,n_classes)

    def call(self, inputs):
        """Run the network on a batch of images and return the upsampled classifier output."""
        # Deep features: only the first (layer2) and last backbone outputs are used.
        l2,l3,l4,l7 = self.backbone(inputs) 
        feature_aspp = self.aspp(l7)
        feature_aspp = self.dropout(feature_aspp)
        feature_aspp = self.upsample1(feature_aspp)

        # Shallow features.
        feature_shallow = self.shortcut(l2)
        
        # Fuse along the channel axis, classify and restore the resolution.
        feature = layers.concatenate([feature_aspp, feature_shallow], axis=-1)
        x = self.concat(feature)
        x = self.classifier(x)
        x = self.upsample2(x)
        return x

In [8]:
# Smoke test of the Keras model on a symbolic 384x1024 RGB input.
inputs = keras.Input(shape=(384,1024,3))
model = DeeplabV3(backbone='resnet50', os=16)
# Build the model for the given input shape, then invoke call() directly on
# the symbolic input.
model.build(input_shape=(None,384,1024,3))
model.call(inputs)
model.summary()

2021-12-16 15:21:35.427908: I tensorflow/core/platform/cpu_feature_guard.cc:142] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  AVX2 FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.


<KerasTensor: shape=(None, 384, 1024, 8) dtype=float32 (created by layer 'up_sampling2d_1')>

Model: "DeepLabV3"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
resnet50 (ResNet)            [(None, 96, 256, 256), (N 58778560  
_________________________________________________________________
ASPP (ASPP)                  (None, 24, 64, 256)       15538176  
_________________________________________________________________
dropout (Dropout)            (None, 24, 64, 256)       0         
_________________________________________________________________
up_sampling2d (UpSampling2D) (None, 96, 256, 256)      0         
_________________________________________________________________
shortcut (Sequential)        (None, 96, 256, 48)       12480     
_________________________________________________________________
concat (Sequential)          (None, 96, 256, 256)      1292288   
_________________________________________________________________
classifier (Conv2D)          (None, 96, 256, 8)        20