In [60]:
import torch

import pdb
import torch.nn as nn
import torch.autograd as autograd
import torch.nn.functional as F
from torchsummary import summary
import torch.optim as optim

## SEBlock ##

In [61]:
class SEBlock(nn.Module):
    def __init__(self, in_channels, reduction_ratio=16):
        super(SEBlock, self).__init__()
        # Squeeze operation
        self.squeeze = nn.AdaptiveAvgPool2d(1)
        # Excitation operation
        self.excitation = nn.Sequential(
            nn.Linear(in_channels, in_channels//reduction_ratio, bias=False),
            nn.ReLU(inplace=True),
            nn.Linear(in_channels//reduction_ratio, in_channels, bias=False),
            nn.Sigmoid()
        )
    def forward(self, x):
        batch_size, channels, _, _ = x.size()
        # print(x.size())
        # Squeeze and excitation
        y=self.squeeze(x).view(batch_size, channels)#squeeze하면 1,1,channel인데 이를 (batch_size, channels) 차원으로 변경
        y=self.excitation(y).view(batch_size, channels, 1, 1)
        return x*y # feature map 'x'에 대한 Attention 가중치가 적용된 결과

## SEBlock_selfattention ##

In [62]:
class SEBlock_self(nn.Module): ## GPT가 알려준 MSA 추가한 Block
    def __init__(self, in_channels, reduction_ratio=16, num_heads=4, dropout=0.1):
        super(SEBlock_self, self).__init__()
        self.squeeze = nn.AdaptiveAvgPool2d(1)
        self.excitation = nn.Sequential(
            nn.Linear(in_channels, in_channels//reduction_ratio, bias=False),
            nn.ReLU(inplace=True),
            nn.Linear(in_channels//reduction_ratio, in_channels, bias=False),
            nn.Sigmoid()
        )
        self.multihead_attn = nn.MultiheadAttention(in_channels, num_heads, dropout=dropout)

    def forward(self, x):
        # print(x.size())torch.Size([128, 256, 8, 8])

        batch_size, channels, height, width = x.size()
        y = self.squeeze(x).view(batch_size, channels)
        y = self.excitation(y).view(batch_size, channels, 1, 1)
        # apply self-attention
        x = x.flatten(2).transpose(1, 2)  # (batch_size, seq_len, hidden_dim)
        x = self.multihead_attn(x, x, x)[0]
        # print(x.size()) torch.Size([128, 64, 256])
        x = x.transpose(1, 2).reshape(batch_size, channels, height, width)  # (batch_size, hidden_dim, height, width)
        return x * y


## BasicBlock 구현 ##

In [63]:
class BasicBlock(nn.Module):
    mul = 1
    def __init__(self, in_planes, out_planes, stride=1):
        super(BasicBlock, self).__init__()
        self.conv1 = nn.Conv2d(in_planes, out_planes, kernel_size=3, stride=stride, padding=1, bias=False)
        self.bn1 = nn.BatchNorm2d(out_planes)
        
        self.conv2 = nn.Conv2d(out_planes, out_planes, kernel_size=3, stride=1, padding=1, bias=False)
        self.bn2 = nn.BatchNorm2d(out_planes)
        
        self.shortcut = nn.Sequential()
        self.SEBlock = SEBlock(in_channels=out_planes*self.mul)
        self.SEBlock_self = SEBlock_self(in_channels = out_planes*self.mul)
        if stride != 1: # stride가 1이 아니라면, identity mapping이 아닌 경우
            self.shortcut = nn.Sequential(
                nn.Conv2d(in_planes, out_planes, kernel_size=1, stride=stride, bias=False),
                nn.BatchNorm2d(out_planes) # 배치 정규화(batch normalization) : conv layer가 끝날 때마다 수행
              )
        
    def forward(self, x):
        out = F.relu(self.bn1(self.conv1(x))) 
        out = self.bn2(self.conv2(out))
        out = self.SEBlock_self(out)
        out += self.shortcut(x)
        out = F.relu(out)
        return out

## BottleNeck 구현 ##

In [14]:
class BottleNeck(nn.Module):
    mul = 4
    def __init__(self, in_planes, out_planes, stride=1):
        super(BottleNeck, self).__init__() 
        self.conv1 = nn.Conv2d(in_planes, out_planes, kernel_size=1, stride=stride, bias=False)
        self.bn1 = nn.BatchNorm2d(out_planes) 
        
        self.conv2 = nn.Conv2d(out_planes, out_planes, kernel_size=3, stride=1, padding=1, bias=False)
        self.bn2 = nn.BatchNorm2d(out_planes) # 배치 정규화(batch normalization) : conv layer가 끝날 때마다 수행
        
        
        self.conv3 = nn.Conv2d(out_planes, out_planes*self.mul, kernel_size=1, stride=1, bias=False)
        self.bn3 = nn.BatchNorm2d(out_planes*self.mul)  # 배치 정규화(batch normalization) : conv layer가 끝날 때마다 수행
        
        
        self.shortcut = nn.Sequential() 
        
        self.SEBlock = SEBlock(in_channels=out_planes * self.mul)
        self.SEBlock_self = SEBlock_self(in_channels=out_planes * self.mul)
        if stride != 1 or in_planes != out_planes*self.mul:
            self.shortcut = nn.Sequential(
                nn.Conv2d(in_planes, out_planes*self.mul, kernel_size=1, stride=stride, bias=False),
                nn.BatchNorm2d(out_planes*self.mul)
            )
    
    # 값을 도출하는 함수
    def forward(self, x):
        out = F.relu(self.bn1(self.conv1(x))) #conv1을 통과후 BN1시행하고 ReLu를 통과시킴
        
        out = F.relu(self.bn2(self.conv2(out))) #conv2을 통과후 BN2시행하고 ReLu를 통과시킴
        out = self.bn3(self.conv3(out)) #conv3을 통과후 BN3시행
        # print(out.size()) -> [128, 256, 8, 8]
        # print(SEBlock(out))
        out = self.SEBlock_self(out)
        
        out += self.shortcut(x) # (핵심) skip connection
        out = F.relu(out) #skip connection을 수행한 결과에 Relu 적용
        return out

In [64]:
class ResNet(nn.Module):
    def __init__(self, block, num_blocks, num_classes=10):
        
        super(ResNet, self).__init__()
        self.in_planes = 64 

        
        self.conv1 = nn.Conv2d(3, self.in_planes, kernel_size=7, stride=2, padding=3, bias=False)
        self.bn1 = nn.BatchNorm2d(self.in_planes) 
        self.maxpool1 = nn.MaxPool2d(kernel_size=3, stride=2, padding=1) 

       
        self.layer1 = self._make_layer(block, 64, num_blocks[0], stride=1)
        self.layer2 = self._make_layer(block, 128, num_blocks[1], stride=2)
        self.layer3 = self._make_layer(block, 256, num_blocks[2], stride=2)
        self.layer4 = self._make_layer(block, 512, num_blocks[3], stride=2)

        self.avgpool = nn.AdaptiveAvgPool2d((1,1))

        self.linear = nn.Linear(512 * block.mul, num_classes)


    def _make_layer(self, block, out_planes, num_blocks, stride):
        
        strides = [stride] + [1] * (num_blocks - 1)
          

        layers = [] 
        for i in range(num_blocks): 
            layers.append(block(self.in_planes, out_planes, strides[i]))
            self.in_planes = block.mul * out_planes 
        return nn.Sequential(*layers)

    def forward(self, x):
        #conv1
        out = F.relu(self.bn1(self.conv1(x)))
        
        #conv2_x
        out = self.maxpool1(out)
        out = self.layer1(out)

        #conv3_x
        out = self.layer2(out)

        #conv4_x
        out = self.layer3(out)

        #conv5_x
        out = self.layer4(out)

        #output을 1*1로 만드는 과정
        out = self.avgpool(out)
        out = torch.flatten(out,1)
        out = self.linear(out)
        return out

In [65]:
def SENet50():
    return ResNet(BottleNeck, [3,4,6,3])

def SENet10():
    return ResNet(BasicBlock, [1,1,1,1])

In [66]:
import torchvision
import torchvision.transforms as transforms
import os

In [67]:
transform_train = transforms.Compose([ 
    transforms.RandomCrop(32, padding=4),
    transforms.RandomHorizontalFlip(), 
    transforms.ToTensor(), 
]) 


transform_test = transforms.Compose([
    transforms.ToTensor(), 
])

train_dataset = torchvision.datasets.CIFAR10(root='./data', train=True, download=True, transform=transform_train)
test_dataset = torchvision.datasets.CIFAR10(root='./data', train=False, download=True, transform=transform_test)

train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=128, shuffle=True, num_workers=2)
test_loader = torch.utils.data.DataLoader(test_dataset, batch_size=100, shuffle=False, num_workers=2)

Files already downloaded and verified
Files already downloaded and verified


In [68]:
device = 'cuda' 
model = SENet10() 
model = model.to(device)

In [69]:
learning_rate = 0.1
loss_fn = nn.CrossEntropyLoss()
optimizer = optim.SGD(model.parameters(), lr=learning_rate, momentum=0.9, weight_decay=0.0002)

In [70]:
def train(epoch):
    autograd.set_detect_anomaly(True)
    print('\n[ Train epoch: %d ]' % epoch)
    model.train()
    correct_top1 = 0
    correct_top5 = 0
    train_loss = 0
    correct = 0
    total = 0
    for batch_idx, (inputs, targets) in enumerate(train_loader):
        #print(torch.any(torch.isnan(inputs)))

        inputs, targets = inputs.to(device), targets.to(device)
        optimizer.zero_grad()

        outputs = model(inputs)
        eps = 1e-6
        loss = loss_fn(outputs, targets)
        
        loss.backward()
        

        optimizer.step()
        train_loss += loss.item()
        
        _, predicted = outputs.max(1)

        total += targets.size(0)

        current_correct = predicted.eq(targets).sum().item()
        current_correct_top1 = predicted.eq(targets).sum().item()
        current_correct_top5 = torch.topk(outputs, k=5, dim=1)[1].eq(targets.unsqueeze(dim=1)).sum().item()
        correct += current_correct
        correct_top1 += current_correct_top1
        correct_top5 += current_correct_top5

        if batch_idx % 100 == 0:
            print('\nCurrent batch:', str(batch_idx))

            print('Current batch average train accuracy:', current_correct / targets.size(0))
            print('Current batch average train top-1 error:', (1 - current_correct_top1 / targets.size(0)))
            print('Current batch average train top-5 error:', (1 - current_correct_top5 / targets.size(0)))
            # print("Current batch loss.item() : ", loss.item())
            # print("Current batch targets.size(0) : ", targets.size())
            print('Current batch average train loss:', loss.item() / targets.size(0))

    print('\nTotal average train accuracy:', correct / total)
    print('Total average train top-1 error:', (1 - correct_top1 / total))
    print('Total average train top-5 error:', (1 - correct_top5 / total))
    print('Total average train loss:', train_loss / total)

    return [(1 - correct_top1 / total), (1 - correct_top5 / total), correct / total, train_loss / total]


In [71]:
def test(epoch):
    print('\n[ Test epoch: %d ]' % epoch)
    model.eval()

    loss = 0 
    correct = 0 
    total = 0
    top1 = 0
    top5 = 0
    for batch_idx, (inputs, targets) in enumerate(test_loader):
        inputs, targets = inputs.to(device), targets.to(device)
        total += targets.size(0)
        outputs = model(inputs)
        loss += loss_fn(outputs, targets).item()
        _, predicted = outputs.max(1)
        current_correct = predicted.eq(targets).sum().item()
        correct += predicted.eq(targets).sum().item()
        _, pred = outputs.topk(5, 1, True, True)
        pred = pred.t()
        correct_tensor = pred.eq(targets.view(1, -1).expand_as(pred))
        top1 += correct_tensor[:1].reshape(-1).float().sum(0, keepdim=True)
        top5 += correct_tensor[:5].reshape(-1).float().sum(0, keepdim=True)
    print('\nTotal average test accuarcy:', correct / total)
    print('Total average test loss:', loss / total)
    print('Top-1 error rate:', (1 - top1 / total).item())
    print('Top-5 error rate:', (1 - top5 / total).item())
    state = {
        'model': model.state_dict()
    }
    if not os.path.isdir('checkpoint'): 
        os.mkdir('checkpoint') 
    file_name = 'SENet50_cifar10.pth'
    torch.save(state, './checkpoint/' + file_name)
    print('Model Saved!')
    return [(1 - top1 / total).item(),  (1 - top5 / total).item(), correct / total, loss / total]

In [72]:
def adjust_learning_rate(optimizer, epoch):
    lr = learning_rate
    if epoch >= 50: 
        lr /= 10
    if epoch >= 100: 
        lr /= 10
    for param_group in optimizer.param_groups:
        param_group['lr'] = lr

In [None]:
import time #시간과 관련된 기능을 제공하는 모듈

start_time = time.time() #시작시간을 측정

train_losses=[]
train_accuracies=[]
train_top1_err=[]
train_top5_err=[]
test_losses=[]
test_accuracies=[]
test_top1_err=[]
test_top5_err=[]
#0번째 에폭부터 149번째 에폭까지 반복하면서
#모델학습(train)과 테스트수행
for epoch in range(0, 150):
    #optimizer의 학습률을 조정하고
    adjust_learning_rate(optimizer, epoch)
    #학습(train)
    x=train(epoch)
    train_losses.append(x[3])
    train_accuracies.append(x[2])
    train_top1_err.append(x[0])
    train_top5_err.append(x[1])
    print(train_losses, train_accuracies)
    #테스트(test)
    y=test(epoch)
    test_losses.append(y[3])
    test_accuracies.append(y[2])
    test_top1_err.append(y[0])
    test_top5_err.append(y[1])
    print(test_losses, test_accuracies)
    #매 에폭이 끝날때마다 학습에 소요된 시간을 계산해서(현재시간 - 시작시간) 출력
    print('\nTime elapsed:', time.time() - start_time)


[ Train epoch: 0 ]

Current batch: 0
Current batch average train accuracy: 0.0625
Current batch average train top-1 error: 0.9375
Current batch average train top-5 error: 0.4765625
Current batch average train loss: 0.018503447994589806

Current batch: 100
Current batch average train accuracy: 0.296875
Current batch average train top-1 error: 0.703125
Current batch average train top-5 error: 0.1640625
Current batch average train loss: 0.013955810107290745

Current batch: 200
Current batch average train accuracy: 0.3984375
Current batch average train top-1 error: 0.6015625
Current batch average train top-5 error: 0.1171875
Current batch average train loss: 0.013011811301112175

Current batch: 300
Current batch average train accuracy: 0.359375
Current batch average train top-1 error: 0.640625
Current batch average train top-5 error: 0.078125
Current batch average train loss: 0.012232858687639236

Total average train accuracy: 0.34674
Total average train top-1 error: 0.65326
Total average

In [None]:
print(train_accuracies)

In [None]:
print(train_losses)

In [None]:
print(test_accuracies)

In [None]:
print(test_losses)

In [None]:
print(train_top1_err)

In [None]:
print(train_top5_err)

In [None]:
print(test_top1_err)

In [None]:
print(test_top5_err)