<a href="https://colab.research.google.com/github/joosk3R/jskRprac/blob/main/efficientnet_code.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
# import
import math
import torch
import torch.nn as nn
import torch.nn.functional as F

import os
from torchsummary import summary
from torch import optim
import torchvision
import torchvision.transforms as transforms

In [2]:
se_channels = max(1, 100)


In [3]:
se_channels

100

In [4]:
# 논문에서 정한 reduction_ratio=0.25

class SEBlock(nn.Module):
    def __init__(self, in_channels, r=0.25):
        super().__init__()

        # se_channels : reduce layer out channels 계산
        se_channels = max(1, int(in_channels*r))

        self.se = nn.Sequential(# squeeze
                                nn.AdaptiveAvgPool2d(1),
                                # excitation
                                nn.Conv2d(in_channels, se_channels, kernel_size=1),
                                nn.SiLU(),
                                nn.Conv2d(se_channels, in_channels, kernel_size=1),
                                nn.Sigmoid()
                               )

    def forward(self, x):
        return x * self.se(x)

In [5]:
class MBConvBlock(nn.Module):
    def __init__(self, in_channels, out_channels, expand, kernel_size, stride=1, r=0.25, dropout_rate=0.2, bias=True):
        super().__init__()

        # 변수 설정
        self.dropout_rate = dropout_rate
        self.expand = expand

        # skip connection 사용을 위한 조건 지정
        self.use_residual = in_channels == out_channels and stride == 1

        # 논문에서 수행한 BatchNorm, SiLU 적용
        # stage1. Expansion
        expand_channels = in_channels*expand
        self.expansion = nn.Sequential(nn.Conv2d(in_channels, expand_channels, 1, bias=False),
                                       nn.BatchNorm2d(expand_channels, momentum=0.99),
                                       nn.SiLU(),
                                      )

        # stage2. Depth-wise convolution
        self.depth_wise = nn.Sequential(nn.Conv2d(expand_channels, expand_channels, kernel_size=kernel_size, stride=1, padding=1, groups=expand_channels),
                                        nn.BatchNorm2d(expand_channels, momentum=0.99),
                                        nn.SiLU(),
                                       )

        # stage3. Squeeze and Excitation
        self.se_block = SEBlock(expand_channels, r)

        # stage4. Point-wise convolution
        self.point_wise = nn.Sequential(nn.Conv2d(expand_channels, out_channels, 1, 1, bias=False),
                                        nn.BatchNorm2d(out_channels, momentum=0.99)
                                       )

    def forward(self, x):

        # stage1
        if self.expand != 1:
            x = self.expansion(x)

        # stage2
        x = self.depth_wise(x)

        # stage3
        x = self.se_block(x)

        # stage4
        x = self.point_wise(x)

        # stage5 skip connection
        res = x

        if self.use_residual:
            if self.training and (self.dropout_rate is not None):
                x = F.dropout2d(input=x, p=self.dropout_rate, training=self.training, inplace=True)

            x = x + res

        return x

In [19]:
class EfficientNet(nn.Module):
    def __init__(self, num_classes, width, depth, resolution, dropout):
        super().__init__()

        # stage1
        out_ch = int(32*width)
        self.stage1 = nn.Sequential(nn.Conv2d(in_channels=3, out_channels=out_ch, kernel_size=3, stride=2, padding=1),
                                    nn.BatchNorm2d(out_ch, momentum=0.99))

        # stage2
        self.stage2 = nn.Sequential(MBConvBlock(in_channels=out_ch, out_channels=16, expand=1, kernel_size=3, stride=1, dropout_rate=dropout))

        # stage3
        self.stage3 = nn.Sequential(MBConvBlock(in_channels=16, out_channels=24, expand=6, kernel_size=3, stride=2, dropout_rate=dropout),
                                    MBConvBlock(in_channels=24, out_channels=24, expand=6, kernel_size=3, stride=1, dropout_rate=dropout),
                                   )

        # stage4
        self.stage4 = nn.Sequential(MBConvBlock(in_channels=24, out_channels=40, expand=6, kernel_size=5, stride=2, dropout_rate=dropout),
                                    MBConvBlock(in_channels=40, out_channels=40, expand=6, kernel_size=5, stride=1, dropout_rate=dropout),
                                   )

        # stage5
        self.stage5 = nn.Sequential(MBConvBlock(in_channels=40, out_channels=80, expand=6, kernel_size=3, stride=2, dropout_rate=dropout),
                                    MBConvBlock(in_channels=80, out_channels=80, expand=6, kernel_size=3, stride=1, dropout_rate=dropout),
                                    MBConvBlock(in_channels=80, out_channels=80, expand=6, kernel_size=3, stride=1, dropout_rate=dropout),
                                   )

        # stage6
        self.stage6 = nn.Sequential(MBConvBlock(in_channels=80, out_channels=112, expand=6, kernel_size=5, stride=1, dropout_rate=dropout),
                                    MBConvBlock(in_channels=112, out_channels=112, expand=6, kernel_size=5, stride=1, dropout_rate=dropout),
                                    MBConvBlock(in_channels=112, out_channels=112, expand=6, kernel_size=5, stride=1, dropout_rate=dropout),
                                   )

        # stage7
        self.stage7 = nn.Sequential(MBConvBlock(in_channels=112, out_channels=192, expand=6, kernel_size=5, stride=2, dropout_rate=dropout),
                                    MBConvBlock(in_channels=192, out_channels=192, expand=6, kernel_size=5, stride=1, dropout_rate=dropout),
                                    MBConvBlock(in_channels=192, out_channels=192, expand=6, kernel_size=5, stride=1, dropout_rate=dropout),
                                    MBConvBlock(in_channels=192, out_channels=192, expand=6, kernel_size=5, stride=1, dropout_rate=dropout),
                                   )

        # stage8
        self.stage8 = nn.Sequential(MBConvBlock(in_channels=192, out_channels=320, expand=6, kernel_size=3, stride=1, dropout_rate=dropout))

        # stage9
        self.last_channels = math.ceil(1280*width)
        self.stage9 = nn.Conv2d(in_channels=320, out_channels=self.last_channels, kernel_size=1)

        # result
        self.out_layer = nn.Linear(self.last_channels, num_classes)

    def forward(self, x):
        x = self.stage1(x)
        x = self.stage2(x)
        x = self.stage3(x)
        x = self.stage4(x)
        x = self.stage5(x)
        x = self.stage6(x)
        x = self.stage7(x)
        x = self.stage8(x)
        x = self.stage9(x)
        x = F.adaptive_avg_pool2d(x, (1, 1)).view(-1, self.last_channels)
        x = self.out_layer(x)

        return x

In [20]:
class EfficientNet2(nn.Module):
    def __init__(self, num_classes, width, depth, resolution, dropout):
        super().__init__()

        # stage1
        out_ch = int(32*width)
        self.stage1 = nn.Sequential(nn.Conv2d(in_channels=3, out_channels=out_ch, kernel_size=3, stride=2, padding=1),
                                    nn.BatchNorm2d(out_ch, momentum=0.99))

        # stage2
        self.stage2 = nn.Sequential(MBConvBlock(in_channels=out_ch, out_channels=16, expand=1, kernel_size=3, stride=1, dropout_rate=dropout))

        # stage3
        self.stage3 = nn.Sequential(MBConvBlock(in_channels=16, out_channels=24, expand=6, kernel_size=3, stride=2, dropout_rate=dropout),
                                    MBConvBlock(in_channels=24, out_channels=24, expand=6, kernel_size=3, stride=1, dropout_rate=dropout),
                                   )

        # stage4
        self.stage4 = nn.Sequential(MBConvBlock(in_channels=24, out_channels=40, expand=6, kernel_size=5, stride=2, dropout_rate=dropout),
                                    MBConvBlock(in_channels=40, out_channels=40, expand=6, kernel_size=5, stride=1, dropout_rate=dropout),
                                   )

        # stage5
        self.stage5 = nn.Sequential(MBConvBlock(in_channels=40, out_channels=80, expand=6, kernel_size=3, stride=2, dropout_rate=dropout),
                                    MBConvBlock(in_channels=80, out_channels=80, expand=6, kernel_size=3, stride=1, dropout_rate=dropout),
                                    MBConvBlock(in_channels=80, out_channels=80, expand=6, kernel_size=3, stride=1, dropout_rate=dropout),
                                   )

        # stage6
        self.stage6 = nn.Sequential(MBConvBlock(in_channels=80, out_channels=112, expand=6, kernel_size=5, stride=1, dropout_rate=dropout),
                                    MBConvBlock(in_channels=112, out_channels=112, expand=6, kernel_size=5, stride=1, dropout_rate=dropout),
                                    MBConvBlock(in_channels=112, out_channels=112, expand=6, kernel_size=5, stride=1, dropout_rate=dropout),
                                   )

        # stage7
        self.stage7 = nn.Sequential(MBConvBlock(in_channels=112, out_channels=192, expand=6, kernel_size=5, stride=2, dropout_rate=dropout),
                                    MBConvBlock(in_channels=192, out_channels=192, expand=6, kernel_size=5, stride=1, dropout_rate=dropout),
                                    MBConvBlock(in_channels=192, out_channels=192, expand=6, kernel_size=5, stride=1, dropout_rate=dropout),
                                    MBConvBlock(in_channels=192, out_channels=192, expand=6, kernel_size=5, stride=1, dropout_rate=dropout),
                                   )

        # stage8
        self.stage8 = nn.Sequential(MBConvBlock(in_channels=192, out_channels=320, expand=6, kernel_size=3, stride=1, dropout_rate=dropout))

        # stage9
        self.last_channels = math.ceil(1280*width)
        self.stage9 = nn.Conv2d(in_channels=320, out_channels=self.last_channels, kernel_size=1)

        # result
        self.out_layer = nn.Linear(self.last_channels, num_classes)

    def forward(self, x):
        x = self.stage1(x)
        x = self.stage2(x)
        x = self.stage3(x)
        x = self.stage4(x)
        x = self.stage5(x)
        x = self.stage6(x)
        x = self.stage7(x)
        x = self.stage8(x)
        x = self.stage9(x)
        x = F.adaptive_avg_pool2d(x, (1, 1)).view(-1, self.last_channels)
        x = self.out_layer(x)

        return x

In [33]:
# efficientnet 모델 b0 ~ b7
def efficientnet_b0(num_classes=10):
    return EfficientNet(num_classes=num_classes, width=1.0, depth=1.0, resolution=224, dropout=0.2)

def efficientnet_b1(num_classes=10):
    return EfficientNet(num_classes=num_classes, width=1.0, depth=1.1, resolution=240, dropout=0.2)

def efficientnet_b2(num_classes=10):
    return EfficientNet(num_classes=num_classes, width=1.1, depth=1.2, resolution=260, dropout=0.3)

def efficientnet_b3(num_classes=10):
    return EfficientNet(num_classes=num_classes, width=1.2, depth=1.4, resolution=300, dropout=0.3)

def efficientnet_b4(num_classes=10):
    return EfficientNet(num_classes=num_classes, width=1.4, depth=1.8, resolution=380, dropout=0.4)

def efficientnet_b5(num_classes=10):
    return EfficientNet(num_classes=num_classes, width=1.6, depth=2.2, resolution=456, dropout=0.4)

def efficientnet_b6(num_classes=10):
    return EfficientNet(num_classes=num_classes, width=1.8, depth=2.6, resolution=528, dropout=0.5)

def efficientnet_b7(num_classes=10):
    return EfficientNet(num_classes=num_classes, width=2.0, depth=3.1, resolution=600, dropout=0.5)

In [None]:
if __name__ == '__main__':
    device = 'cuda' if torch.cuda.is_available() else 'cpu'

    x = torch.randn(4, 3, 224, 224).to(device)
    model = efficientnet_b0().to(device)
    output = model(x)
    print('output size:', output.size())

    summary(model, input_size=(3, 224, 224))

In [14]:
# 변수설정
batch_size = 128
n_epochs = 10
lr = 0.1
data_dir = '/data/cifa10'

In [27]:
!pip install efficientnet_pytorch

Collecting efficientnet_pytorch
  Downloading efficientnet_pytorch-0.7.1.tar.gz (21 kB)
  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: efficientnet_pytorch
  Building wheel for efficientnet_pytorch (setup.py) ... [?25l[?25hdone
  Created wheel for efficientnet_pytorch: filename=efficientnet_pytorch-0.7.1-py3-none-any.whl size=16429 sha256=9108c63e829c95526a7f11fa0a7ff627a4c3a0a1db8c4fd2b1d3617a957bf531
  Stored in directory: /root/.cache/pip/wheels/03/3f/e9/911b1bc46869644912bda90a56bcf7b960f20b5187feea3baf
Successfully built efficientnet_pytorch
Installing collected packages: efficientnet_pytorch
Successfully installed efficientnet_pytorch-0.7.1


In [35]:
# 모델호출
# 위에서 내가 만든 모델 사용


# 모델 패키지 사용
from efficientnet_pytorch import EfficientNet
model = EfficientNet.from_pretrained('efficientnet-b0').to(device)

# 비용함수, 옵티마이저 정의
cost = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=lr)

Loaded pretrained weights for efficientnet-b0


In [29]:
# 데이터 transform
transform = transforms.Compose([transforms.Resize((224, 224)),
                                transforms.ToTensor(),
                                transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
                               ])

# 데이터 불러오기
trainset = torchvision.datasets.CIFAR10(root=data_dir+'/train', train=True, download=True, transform=transform)
valset = torchvision.datasets.CIFAR10(root=data_dir+'/test', train=False, download=True, transform=transform)

# DataLoader
trainloader = torch.utils.data.DataLoader(trainset, batch_size=batch_size, shuffle=True, num_workers=2)
valloader = torch.utils.data.DataLoader(valset, batch_size=batch_size, shuffle=False, num_workers=2)

Downloading https://www.cs.toronto.edu/~kriz/cifar-10-python.tar.gz to /data/cifa10/train/cifar-10-python.tar.gz


100%|██████████| 170498071/170498071 [00:13<00:00, 12850397.52it/s]


Extracting /data/cifa10/train/cifar-10-python.tar.gz to /data/cifa10/train
Downloading https://www.cs.toronto.edu/~kriz/cifar-10-python.tar.gz to /data/cifa10/test/cifar-10-python.tar.gz


100%|██████████| 170498071/170498071 [00:13<00:00, 13095237.91it/s]


Extracting /data/cifa10/test/cifar-10-python.tar.gz to /data/cifa10/test


In [None]:
# 학습
for epoch in range(n_epochs):
    avg_cost = 0
    accuracy = 0

    for X, Y in trainloader:
        X = X.to(device)
        Y = Y.to(device)

        # 가설
        hypothesis = model(X)

        # loss
        loss = cost(hypothesis, Y)

        # update
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        # avg_cost
        avg_cost += loss / len(trainloader)
        correct = torch.argmax(hypothesis, 1) == Y
        accuracy = correct.float().mean()

    print(f'Epoch {epoch}/{n_epochs}\tLoss {avg_cost:.4f}\tAcc {accuracy:.4f}')