## Import module

In [1]:
import os
import torch
import torch.nn as nn
import torchvision
import torchvision.transforms as transforms
from torchsummary import summary

from math import ceil
import pdb
import tqdm
import time

## EfficientNet

In [2]:
base_model = [
    # expand_ratio, channels, repeats, stride, kernel_size
    [1, 16, 1, 1, 3],
    [6, 24, 2, 2, 3],
    [6, 40, 2, 2, 5],
    [6, 80, 3, 2, 3],
    [6, 112, 3, 1, 5],
    [6, 192, 4, 2, 5],
    [6, 320, 1, 1, 3],
]

phi_values = {
    # tuple of: (phi_value, resolution, drop_rate)
    'b0': (0, 224, 0.2),  # alpha, beta, gamma, depth = alpha ** phi
    'b1': (0.5, 240, 0.2),
    'b2': (1, 260, 0.3),
    'b3': (2, 300, 0.3),
    'b4': (3, 380, 0.4),
    'b5': (4, 456, 0.4),
    'b6': (5, 528, 0.5),
    'b7': (6, 600, 0.5),
}

class CNNBlock(nn.Module):
    def __init__(self, in_channels, out_channels, kernel_size, stride, padding, groups=1):
        super(CNNBlock, self).__init__()
        self.cnn = nn.Conv2d(
            in_channels,
            out_channels,
            kernel_size,
            stride,
            padding,
            groups=groups,  # groups=1이면 일반적인 Conv, groups=in_channels 일때만 Depthwise Conv 수행
            bias=False
        )
        self.bn = nn.BatchNorm2d(out_channels)
        self.silu = nn.SiLU()  # SiLU <-> Swish

    def forward(self, x):
        return self.silu(self.bn(self.cnn(x)))

class SqueezeExcitation(nn.Module):
    def __init__(self, in_channels, reduced_dim):
        super(SqueezeExcitation, self).__init__()
        self.se = nn.Sequential(
            nn.AdaptiveAvgPool2d(1),  # C x H x W -> C x 1 x 1
            nn.Conv2d(in_channels, reduced_dim, 1),
            nn.SiLU(),
            nn.Conv2d(reduced_dim, in_channels, 1),
            nn.Sigmoid(),  # 각 채널에 대한 score (0~1)
        )

    def forward(self, x):
        # pdb.set_trace()
        return x * self.se(x)  # input channel x 채널의 중요도


class InvertedResidualBlock(nn.Module):
    def __init__(
            self,
            in_channels,
            out_channels,
            kernel_size,
            stride,
            padding,
            expand_ratio,
            reduction=4,  # squeeze excitation
            survival_prob=0.8,  # for stochastic depth
    ):
        super(InvertedResidualBlock, self).__init__()
        self.survival_prob = survival_prob
        self.use_residual = in_channels == out_channels and stride == 1
        hidden_dim = in_channels * expand_ratio
        self.expand = in_channels != hidden_dim
        reduced_dim = int(in_channels / reduction)

        if self.expand:
            self.expand_conv = CNNBlock(
                in_channels, hidden_dim, kernel_size=3, stride=1, padding=1,
            )

        self.conv = nn.Sequential(
            CNNBlock(
                hidden_dim, hidden_dim, kernel_size, stride, padding, groups=hidden_dim,  # Depthwise Conv
            ),
            SqueezeExcitation(hidden_dim, reduced_dim),
            nn.Conv2d(hidden_dim, out_channels, 1, bias=False),  # point wise conv
            nn.BatchNorm2d(out_channels),
        )

    def stochastic_depth(self, x):
        '''
        vanishing gradient로 인해 학습이 느리게 되는 문제를 완화시키고자 stochastic depth 라는 randomness에 기반한 학습 방법
        Stochastic depth란 network의 depth를 학습 단계에 random하게 줄이는 것을 의미
        복잡하고 큰 데이터 셋에서는 별다를 효과를 보지는 못한다고 함
        '''
        if not self.training:
            return x

        binary_tensor = torch.rand(x.shape[0], 1, 1, 1, device=x.device) < self.survival_prob
        return torch.div(x, self.survival_prob) * binary_tensor  # torch.div으로 감싼 연산은 stochastic_depth 논문에 나와있음.

    def forward(self, inputs):
        x = self.expand_conv(inputs) if self.expand else inputs

        if self.use_residual:
            return self.stochastic_depth(self.conv(x)) + inputs
        else:
            return self.conv(x)



class EfficientNet(nn.Module):
    def __init__(self, version, num_classes):
        super(EfficientNet, self).__init__()
        width_factor, depth_factor, dropout_rate = self.calculate_factors(version)
        last_channels = ceil(1280 * width_factor)
        self.features = self.create_features(width_factor, depth_factor, last_channels)
        self.pool = nn.AdaptiveAvgPool2d(1)  # stage9 pool
        self.classifier = nn.Sequential(  # stage9 FC
            nn.Dropout(dropout_rate),
            nn.Linear(last_channels, num_classes),
        )

    def calculate_factors(self, version, alpha=1.2, beta=1.1):
        phi, res, drop_rate = phi_values[version]
        depth_factor = alpha ** phi
        width_factor = beta ** phi
        return width_factor, depth_factor, drop_rate

    def create_features(self, width_factor, depth_factor, last_channels):
        channels = int(32 * width_factor)  # B0의 32는 첫레이어의 channel
        features = [CNNBlock(3, channels, 3, stride=2, padding=1)]  # stage 1
        in_channels = channels

        for expand_ratio, chanels, repeats, stride, kernel_size in base_model:
            out_channels = 4 * ceil(int(channels*width_factor) / 4)
            layers_repeats = ceil(repeats * depth_factor)

            for layer in range(layers_repeats):  # stage 2~8
                features.append(
                    InvertedResidualBlock(
                        in_channels,
                        out_channels,
                        kernel_size=kernel_size,
                        stride=stride if layer == 0 else 1,
                        padding=kernel_size // 2,  # if k=1:pad=0, k=3:pad=1, k=5:pad=2
                        expand_ratio=expand_ratio
                    )
                )
                in_channels = out_channels


        features.append(
            CNNBlock(in_channels, last_channels, kernel_size=1, stride=1, padding=0)  # stage9 Conv 1x1
        )

        return nn.Sequential(*features)

    def forward(self, x):
        x = self.pool(self.features(x))
        return self.classifier(x.view(x.shape[0], -1))  # flatten

## Data Loader

In [3]:
transform_train = transforms.Compose([
    transforms.RandomCrop(32, padding=4),
    transforms.RandomHorizontalFlip(),
    transforms.ToTensor(),
    transforms.Normalize((0.4914, 0.4822, 0.4465), (0.2470, 0.2434, 0.2615)),
])

transform_test = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize((0.4914, 0.4822, 0.4465), (0.2023, 0.1994, 0.2010)),
])

train_dataset = torchvision.datasets.CIFAR10(root='./', train=True, download=True, transform=transform_train)
test_dataset = torchvision.datasets.CIFAR10(root='./', train=False, download=True, transform=transform_test)

train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=128, shuffle=True, num_workers=4)
test_loader = torch.utils.data.DataLoader(test_dataset, batch_size=32, shuffle=False, num_workers=4)

Downloading https://www.cs.toronto.edu/~kriz/cifar-10-python.tar.gz to ./cifar-10-python.tar.gz


HBox(children=(FloatProgress(value=1.0, bar_style='info', max=1.0), HTML(value='')))

Extracting ./cifar-10-python.tar.gz to ./
Files already downloaded and verified


## Set hyper parameters

In [4]:
device = 'cuda' if torch.cuda.is_available() else 'cpu'
num_classes = 10
version = 'b5'
epochs = 100
learning_rate = 0.001
best_acc = 0
start_epoch = 0

model = EfficientNet(version, num_classes)
model = model.to(device)
scaler = torch.cuda.amp.GradScaler()

criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate, weight_decay=5e-4)
scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=200)





In [5]:
def train(epoch):
    model.train()
    train_loss = 0
    correct = 0
    total = 0
    for idx, (inputs, targets) in enumerate(train_loader):
        inputs, targets = inputs.to(device), targets.to(device)

        # forward
        with torch.cuda.amp.autocast():    
            outputs = model(inputs)
            loss = criterion(outputs, targets)
        
        # backward
        optimizer.zero_grad()
        scaler.scale(loss).backward()
        scaler.step(optimizer)
        scaler.update()
        

        train_loss += loss.item()
        _, predicted = outputs.max(1)

        total += targets.size(0)
        correct += (predicted == targets).sum().item()

        if (idx+1) % 50 == 0:
            print(f'EPOCH: {epoch+1}/{epochs} idx: {idx+1}/{len(train_loader)} Loss: {loss.item():.3f} accuracy: {predicted.eq(targets).sum().item() / targets.size(0) * 100:.3f}')

In [6]:
def test(epoch):
    global best_acc
    model.eval()
    loss = 0
    correct = 0
    total = 0

    with torch.no_grad():
        for idx, (inputs, targets) in enumerate(test_loader):
            inputs, targets = inputs.to(device), targets.to(device)

            outputs = model(inputs)

            loss += criterion(outputs, targets).item()
            total += targets.size(0)

            _, predicted = outputs.max(1)
            correct += (predicted == targets).sum().item()

    print(f'Test accuarcy:', 100. * correct / total)
    print(f'Test average loss:', loss / total)

    acc = correct / total * 100
    if acc > best_acc:
        print('Saving')
        state = {
            'model': model.state_dict(),
            'acc': acc,
            'epoch': epoch,
        }
        if not os.path.isdir('checkpoint'):
            os.mkdir('checkpoint')
        torch.save(state, 'checkpoint/ckpt.pth')
        best_acc = acc 

In [7]:
# fp32 - 194.15641498565674

In [9]:
for epoch in range(start_epoch, epochs): 
    train(epoch)
    test(epoch)
    scheduler.step()

EPOCH: 1/100 idx: 50/391 Loss: 2.278 accuracy: 16.406
EPOCH: 1/100 idx: 100/391 Loss: 2.012 accuracy: 23.438
EPOCH: 1/100 idx: 150/391 Loss: 1.911 accuracy: 22.656
EPOCH: 1/100 idx: 200/391 Loss: 1.957 accuracy: 24.219
EPOCH: 1/100 idx: 250/391 Loss: 1.730 accuracy: 27.344
EPOCH: 1/100 idx: 300/391 Loss: 1.965 accuracy: 23.438
EPOCH: 1/100 idx: 350/391 Loss: 1.905 accuracy: 27.344
Test accuarcy: 32.26
Test average loss: 0.05307036824226379
Saving
EPOCH: 2/100 idx: 50/391 Loss: 1.633 accuracy: 42.969
EPOCH: 2/100 idx: 100/391 Loss: 1.541 accuracy: 41.406
EPOCH: 2/100 idx: 150/391 Loss: 1.630 accuracy: 41.406
EPOCH: 2/100 idx: 200/391 Loss: 1.565 accuracy: 38.281
EPOCH: 2/100 idx: 250/391 Loss: 1.721 accuracy: 42.969
EPOCH: 2/100 idx: 300/391 Loss: 1.228 accuracy: 50.781
EPOCH: 2/100 idx: 350/391 Loss: 1.416 accuracy: 43.750
Test accuarcy: 49.84
Test average loss: 0.042384281659126284
Saving
EPOCH: 3/100 idx: 50/391 Loss: 1.445 accuracy: 47.656
EPOCH: 3/100 idx: 100/391 Loss: 1.464 accur

In [10]:
while True:
    continue

KeyboardInterrupt: ignored