In [1]:
import torch
import torch.nn as nn

In [58]:
# from models.common import *
from models.common import Conv, Concat, MP, SPPCSPC, RepConv
from models.yolo import IDetect
from utils.general import make_divisible
from copy import deepcopy

# TODO

* ELANBlock 상위 클래스 만들어서 기본 메소드 상속하는 방식

In [52]:
# from yolov7 in common.py
class Conv(nn.Module):
    # Standard convolution
    def __init__(self, c1, c2, k=1, s=1, p=None, g=1, act=True):  # ch_in, ch_out, kernel, stride, padding, groups
        super(Conv, self).__init__()
        p=k//2
        self.conv = nn.Conv2d(c1, c2, k, s, p, groups=g, bias=False)
        self.bn = nn.BatchNorm2d(c2)
        self.act = nn.SiLU() if act is True else (act if isinstance(act, nn.Module) else nn.Identity())

    def forward(self, x):
        return self.act(self.bn(self.conv(x)))

    def fuseforward(self, x):
        return self.act(self.conv(x))
    
class Concat(nn.Module):
    def __init__(self, dimension=1):
        super(Concat, self).__init__()
        self.d = dimension

    def forward(self, x):
        return torch.cat(x, self.d)

In [194]:
# ELANBlock for Backbone
class BBoneELAN(nn.Module):
    def __init__(self, c1, c2, k, depth):
        super(BBoneELAN, self).__init__()
        assert c1 % 2 == 0 and depth < 5
        c_ = int(c1 / 2)
        
        self.depth = depth
        
        # depth 1
        self.cv1 = Conv(c1, c2, 1, 1)
        self.cv2 = Conv(c1, c2, 1, 1)
        # depth 2
        self.cv3 = Conv(c2, c2, k, 1)
        self.cv4 = Conv(c2, c2, k, 1)
        # depth 3
        self.cv5 = Conv(c2, c2, k, 1)
        self.cv6 = Conv(c2, c2, k, 1)
        # depth 4
        self.cv7 = Conv(c2, c2, k, 1)
        self.cv8 = Conv(c2, c2, k, 1)
        # transition layer
        # TODO need to calculate the number of filter depending on depth at SuperNet class
        # DynamicConv2d를 사용하고 max_in_C 로 놓고, 
        # self.trans = Conv(c_, c2, 1, 1)
        
        self.act_idx = [0, 1, 3, 5, 7][:depth+1] 
    
    def forward(self, x):
        outputs = []
        # depth 1
        x1 = self.cv1(x)
        outputs.append(x1)
        x2 = self.cv2(x)    
        outputs.append(x2)
        # depth 2
        x3 = self.cv3(x2)
        outputs.append(x3)
        x4 = self.cv4(x3)
        outputs.append(x4)
        # depth 3
        x5 = self.cv5(x4)
        outputs.append(x5)
        x6 = self.cv6(x5)
        outputs.append(x6)
        # depth 4
        x7 = self.cv7(x6)
        outputs.append(x7)
        x8 = self.cv8(x7)
        outputs.append(x8)
        
        return torch.cat([outputs[i] for i in self.act_idx], dim=1)

In [195]:
input = torch.randn(1, 128, 64, 64)
block = BBoneELAN(c1=128, c2=64, k=3, depth=3)

In [196]:
block(input).shape

torch.Size([1, 256, 64, 64])

In [262]:
# ELANBlock for Head
# there are differences about cardinality(path) and channel size
class HeadELAN(nn.Module):
    def __init__(self, c1, c2, k, depth):
        super(HeadELAN, self).__init__()
        assert c1 % 2 == 0 and c2 % 2 == 0 and depth < 6
        c_ = int(c2 / 2)
        self.depth = depth
        
        # depth 1
        self.cv1 = Conv(c1, c2, 1, 1)
        self.cv2 = Conv(c1, c2, 1, 1)
        # depth 2
        self.cv3 = Conv(c2, c_, k, 1)
        # depth 3
        self.cv4 = Conv(c_, c_, k, 1)
        # depth 4
        self.cv5 = Conv(c_, c_, k, 1)
        # depth 5
        self.cv6 = Conv(c_, c_, k, 1)
        
        self.act_idx = [0, 1, 2, 3, 4, 5, 6][:depth+1] 
    
    def forward(self, x):
        outputs = []
        # depth 1
        x1 = self.cv1(x)
        outputs.append(x1)
        x2 = self.cv2(x)    
        outputs.append(x2)
        # depth 2
        x3 = self.cv3(x2)
        outputs.append(x3)
        # depth 3
        x4 = self.cv4(x3)
        outputs.append(x4)
        # depth 4
        x5 = self.cv5(x4)
        outputs.append(x5)
        # depth 5
        x6 = self.cv6(x5)
        outputs.append(x6)
        
        return torch.cat([outputs[i] for i in self.act_idx], dim=1)

In [264]:
input = torch.randn(1, 512, 64, 64)
block = HeadELAN(c1=512, c2=256, k=3, depth=5)

In [265]:
block(input).shape

torch.Size([1, 1024, 64, 64])

# YOLOSuperNet

* parse_supernet 함수 정의
: BBoneELAN과 HeadELAN에 대한 c2 계산 방식이 상이함.

In [266]:
def parse_supernet(d, ch):  # model_dict, input_channels(3)
    print('\n%3s%18s%3s%10s  %-40s%-30s' % ('', 'from', 'n', 'params', 'module', 'arguments'))
    anchors, nc, gd, gw = d['anchors'], d['nc'], d['depth_multiple'], d['width_multiple']
    na = (len(anchors[0]) // 2) if isinstance(anchors, list) else anchors  # number of anchors
    no = na * (nc + 5)  # number of outputs = anchors * (classes + 5)

    layers, save, c2 = [], [], ch[-1]  # layers, savelist, ch out
    for i, (f, n, m, args) in enumerate(d['backbone'] + d['head']):  # from, number, module, args
        m = eval(m) if isinstance(m, str) else m  # eval strings
        for j, a in enumerate(args):
            try:
                args[j] = eval(a) if isinstance(a, str) else a  # eval strings
            except:
                pass

        n = max(round(n * gd), 1) if n > 1 else n  # depth gain
        if m in [nn.Conv2d, Conv, RepConv, SPPCSPC]:
            c1, c2 = ch[f], args[0]
            if c2 != no:  # if not output
                c2 = make_divisible(c2 * gw, 8)

            args = [c1, c2, *args[1:]]
            if m in [SPPCSPC]:
                args.insert(2, n)  # number of repeats
                n = 1
        elif m is BBoneELAN:
            c1, c2 = ch[f], int(args[0]*(args[-1]+1))
            args = [c1, *args]
        elif m is HeadELAN:
            c1, c2 = ch[f], int((args[0]*2) + (args[0]/2 * (args[-1]-1)))
            args = [c1, *args]
        elif m is nn.BatchNorm2d:
            args = [ch[f]]
        elif m is Concat:
            c2 = sum([ch[x] for x in f])
        # elif m is Chuncat:
        #     c2 = sum([ch[x] for x in f])
        # elif m is Shortcut:
        #     c2 = ch[f[0]]
        # elif m is Foldcut:
        #     c2 = ch[f] // 2
        elif m in [IDetect]:
            args.append([ch[x] for x in f])
            if isinstance(args[1], int):  # number of anchors
                args[1] = [list(range(args[1] * 2))] * len(f)
        # elif m is ReOrg:
        #     c2 = ch[f] * 4
        # elif m is Contract:
        #     c2 = ch[f] * args[0] ** 2
        # elif m is Expand:
        #     c2 = ch[f] // args[0] ** 2
        else:
            c2 = ch[f]
        print(m)
        m_ = nn.Sequential(*[m(*args) for _ in range(n)]) if n > 1 else m(*args)  # module
        t = str(m)[8:-2].replace('__main__.', '')  # module type
        np = sum([x.numel() for x in m_.parameters()])  # number params
        m_.i, m_.f, m_.type, m_.np = i, f, t, np  # attach index, 'from' index, type, number params
        print('%3s%18s%3s%10.0f  %-40s%-30s' % (i, f, n, np, t, args))  # print
        save.extend(x % i for x in ([f] if isinstance(f, int) else f) if x != -1)  # append to savelist
        layers.append(m_)
        if i == 0:
            ch = []
        ch.append(c2)
    return nn.Sequential(*layers), sorted(save)

In [267]:
import yaml
yaml_file = '../yaml/yolov7_elan_sample.yml'
with open(yaml_file) as f:
    yaml = yaml.load(f, Loader=yaml.SafeLoader)

In [268]:
# Define model
ch, nc, anchors = 3, None, None

ch = yaml['ch'] = yaml.get('ch', ch)  # input channels
if nc and nc != yaml['nc']:
    print(f"Overriding model.yaml nc={yaml['nc']} with nc={nc}")
    yaml['nc'] = nc  # override yaml value
if anchors:
    print(f'Overriding model.yaml anchors with anchors={anchors}')
    yaml['anchors'] = round(anchors)  # override yaml value

In [269]:
model, save = parse_supernet(deepcopy(yaml), ch=[ch])


                 from  n    params  module                                  arguments                     
<class 'models.common.Conv'>
  0                -1  1       928  models.common.Conv                      [3, 32, 3, 1]                 
<class 'models.common.Conv'>
  1                -1  1     18560  models.common.Conv                      [32, 64, 3, 2]                
<class 'models.common.Conv'>
  2                -1  1     36992  models.common.Conv                      [64, 64, 3, 1]                
<class 'models.common.Conv'>
  3                -1  1     73984  models.common.Conv                      [64, 128, 3, 2]               
<class '__main__.BBoneELAN'>
  4                -1  1    238592  BBoneELAN                               [128, 64, 3, 3]               
<class 'models.common.Conv'>
  5                -1  1     66048  models.common.Conv                      [256, 256, 1, 1]              
<class 'models.common.MP'>
  6                -1  1         0  models.common

In [270]:
m = model[-1]  # Detect()
# s = 256  # 2x min stride
# m.stride = torch.tensor([s / x.shape[-2] for x in forward(torch.zeros(1, ch, s, s))])  # forward
# check_anchor_order(m)
# m.anchors /= m.stride.view(-1, 1, 1)
# self.stride = m.stride
# self._initialize_biases()  # only run once

In [271]:
def forward_once(x, model, profile=False):
    y, dt = [], []  # outputs
    for m in model:
        if m.f != -1:  # if not from previous layer
            x = y[m.f] if isinstance(m.f, int) else [x if j == -1 else y[j] for j in m.f]  # from earlier layers

        # if not hasattr(self, 'traced'):
        #     self.traced=False

        # if self.traced:
        #     if isinstance(m, Detect) or isinstance(m, IDetect) or isinstance(m, IAuxDetect) or isinstance(m, IKeypoint):
        #         break

        # if profile:
        #     c = isinstance(m, (Detect, IDetect, IAuxDetect, IBin))
        #     o = thop.profile(m, inputs=(x.copy() if c else x,), verbose=False)[0] / 1E9 * 2 if thop else 0  # FLOPS
        #     for _ in range(10):
        #         m(x.copy() if c else x)
        #     t = time_synchronized()
        #     for _ in range(10):
        #         m(x.copy() if c else x)
        #     dt.append((time_synchronized() - t) * 100)
        #     print('%10.1f%10.0f%10.1fms %-40s' % (o, m.np, dt[-1], m.type))

        x = m(x)  # run
        
        y.append(x if m.i in save else None)  # save output

    if profile:
        print('%.1fms total' % sum(dt))
    return x

In [272]:
input = torch.randn(1, 3, 256, 256)

forward_once(input, model)

[tensor([[[[[ 9.62405e-02, -2.82034e-01,  8.82211e-02,  ...,  1.43535e-01, -5.01884e-02, -1.83264e-01],
            [ 3.33772e-01,  6.69793e-02, -3.94132e-02,  ...,  2.77099e-01, -2.46048e-01,  4.75415e-01],
            [ 8.72103e-02,  1.05546e-01, -5.34077e-01,  ...,  1.82255e-01, -5.55680e-02, -2.81438e-02],
            ...,
            [-1.57657e-01,  2.25387e-02,  2.96150e-04,  ...,  2.02318e-01, -2.57377e-01,  8.22329e-02],
            [-5.02242e-03,  1.58540e-01,  2.54187e-01,  ...,  1.61377e-01, -1.44317e-01,  4.12106e-02],
            [ 2.48228e-01,  1.30747e-01, -8.35850e-02,  ..., -3.30460e-02, -1.14832e-01,  8.35851e-02]],
 
           [[ 3.29135e-01,  5.09496e-02, -1.76339e-01,  ...,  2.55737e-01, -2.92476e-01, -4.21397e-02],
            [ 8.27574e-01, -9.81518e-03, -3.61783e-01,  ...,  3.60753e-01, -1.80278e-01,  2.56541e-01],
            [ 2.58602e-01,  4.00327e-01, -4.22274e-01,  ...,  4.45446e-01, -4.04309e-01, -1.68436e-01],
            ...,
            [-2.30726e-01, 

In [273]:
x = torch.randn(1, 3, 256, 256)

y, dt = [], []  # outputs
for i, m in enumerate(model):
    if m.f != -1:  # if not from previous layer
        x = y[m.f] if isinstance(m.f, int) else [x if j == -1 else y[j] for j in m.f]  # from earlier layers
        
    x = m(x)  # run
    
    y.append(x if m.i in save else None)  # save output

In [274]:
len(x)

3

## ETC

In [51]:
# ELANBlock for Backbone
class ELANBlock(nn.Module):
    def __init__(self, c1, k, depth):
        super(ELANBlock, self).__init__()
        assert c1 % 2 == 0 and depth < 5
        c_ = int(c1 / 2)
        c2 = int(c1 / 2 * (depth+1))
        self.depth = depth
        # depth 1
        self.cv1 = Conv(c1, c_, 1, 1)
        self.cv2 = Conv(c1, c_, 1, 1)
        # depth 2
        self.cv3 = Conv(c_, c_, 3, 1)
        self.cv4 = Conv(c_, c_, 3, 1)
        # depth 3
        self.cv5 = Conv(c_, c_, 3, 1)
        self.cv6 = Conv(c_, c_, 3, 1)
        # depth 4
        self.cv7 = Conv(c_, c_, 3, 1)
        self.cv8 = Conv(c_, c_, 3, 1)
        
        self.cat = Concat(dimension=1)
        
        self.blocks, self.act_idx = self.set_blocks(c1, k, s, depth)
    
    def set_blocks(self, c1, k, s, depth):
        c_ = int(c1 / 2)
        layers = []
        # depth에 따라 from 이 결정됨.
        # depth = 3
        act_idx = [0, 1, 3, 5, 7][:depth] 
        f = [-1, -3, -5, -6]
        for i in range(depth):
            if i == 0:
                # depth 1
                m1, m2 = Conv(c1, c_, 1, 1), Conv(c1, c_, 1, 1)
                m1.i, m1.f = i, 0
                m2.i, m2.f = i+1, 0
                layers.append(m1)
                layers.append(m2)
                # layers.append(Conv(c1, c_, 1, 1))
                # layers.append(Conv(c1, c_, 1, 1))
            else:
                # another depth
                m3, m4 = Conv(c_, c_, k, 1), Conv(c_, c_, k, 1)
                m3.i, m3.f = 2*i, 2*i-1
                m4.i, m4.f = 2*i+1, 2*i
                layers.append(m3)
                layers.append(m4)
                # layers.append(Conv(c_, c_, k, 1))
                # layers.append(Conv(c_, c_, k, 1))
                m = nn.Sequential(
                    Conv(c_, c_, k, 1),
                    Conv(c_, c_, k, 1)
                )
                
        
        layers.append(Concat(1))
        return nn.Sequential(*layers), act_idx
    
    # def forward(self, x):
    #     x1 = self.cv1(x)
    #     x2 = self.cv2(x)    
    #     # depth 2
    #     x3 = self.cv3(x2)
    #     x4 = self.cv4(x3)
    #     # depth 3
    #     x5 = self.cv5(x4)
    #     x6 = self.cv6(x5)
        
    #     x = self.depth
    #     return torch.cat([x1, x2, x4, x6], dim=1)
    
    def forward(self, x):
        outputs = []
        x1 = self.cv1(x)
        outputs.append(x1)
        x2 = self.cv2(x)    
        outputs.append(x2)
        # depth 2
        x3 = self.cv3(x2)
        outputs.append(x3)
        x4 = self.cv4(x3)
        outputs.append(x4)
        # depth 3
        x5 = self.cv5(x4)
        outputs.append(x5)
        x6 = self.cv6(x5)
        outputs.append(x6)
        # depth 4
        x7 = self.cv7(x6)
        outputs.append(x7)
        x8 = self.cv8(x7)
        outputs.append(x8)
        
        return torch.cat([outputs[i] for i in self.act_idx], dim=1) # 0, 1, 3, 5 
        # return torch.cat([x1, x2, x4, x6], dim=1)
    
    def forward_blocks(self, x, depth=3):
        self.saved = []
        output = []
        
        for i, block in enumerate(self.blocks):
            if isinstance(block.f, int):
                output.append(block(x))
            
        return self.cat(output[self.act_idx])            

        
    
        

In [79]:
input = torch.randn(1, 128, 64, 64)
block = ELANBlock(c1=128, k=3, depth=4)

In [80]:
block(input).shape

torch.Size([1, 320, 64, 64])

5.0