In [1]:
import torch
import torch.nn as nn
from torchvision import datasets, transforms
from torch.utils.data import DataLoader, random_split
import numpy as np
import os
from collections import namedtuple
from torch import optim
import time

# device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
device = torch.device('mps:0' if torch.backends.mps.is_available() else 'cpu')

# ResNet

## 데이터 전처리 설정

In [2]:
def transform_config(resize, phase):
    data_transform = {
        'train': transforms.Compose([
            transforms.RandomResizedCrop(resize, scale=(0.5,1.0)),
            transforms.RandomHorizontalFlip(),
            transforms.RandomRotation(10.),
            transforms.ToTensor()
        ]),
        'val': transforms.Compose([
            transforms.Resize(resize),
            # transforms.CenterCrop(resize),
            transforms.ToTensor()
        ]),
        'test': transforms.Compose([
            transforms.Resize(resize),
            transforms.ToTensor()
        ])
    }
    
    return data_transform[phase]

## param 설정

In [3]:
# ResNet required input data of (3,224,224) size,
# but it was trained using (1,112,112) size to reduce the long learning time and test with data of different sizes.


data_size = 112
num_class = 10
epochs = 10
batch_size = 32
learning_rate = 1e-3

## dataset 정의

In [4]:
all_train_data = datasets.FashionMNIST(root = './data/train', train = True, download = True, transform = transform_config(data_size,'train'))
all_test_data = datasets.FashionMNIST(root = './data/test', train = False, download = True, transform = transform_config(data_size,'test'))

all_train_data_length = len(all_train_data)
all_test_data_length = len(all_test_data)
train_data_length = int(all_train_data_length*0.8)
valid_data_length = int(all_train_data_length*0.2)
test_data_length = int(all_test_data_length)

train_data, valid_data, residual_train_data = random_split(all_train_data,[train_data_length, valid_data_length, all_train_data_length - train_data_length - valid_data_length])
test_data, residual_test_data = random_split(all_test_data,[test_data_length, all_test_data_length - test_data_length])

print('train_data_shape:{} / train_data_length:{}'.format(train_data[0][0].shape, train_data_length))
print('valid_data_shape:{} / valid_data_length:{}'.format(valid_data[0][0].shape, valid_data_length))
print('test_data_shape:{} / test_data_length:{}'.format(test_data[0][0].shape, test_data_length))

train_data_shape:torch.Size([1, 112, 112]) / train_data_length:48000
valid_data_shape:torch.Size([1, 112, 112]) / valid_data_length:12000
test_data_shape:torch.Size([1, 112, 112]) / test_data_length:10000


#### dataset의 데이터 DataLoader를 이용하여 메모리로 불러오기

In [5]:
train_iterator = DataLoader(train_data, batch_size=batch_size, shuffle=True)
valid_iterator = DataLoader(valid_data, batch_size=batch_size, shuffle=False)

## ResNet 구현

In [6]:
class BasicBlock(nn.Module):
    expansion = 1
    
    def __init__(self, in_channels, out_channels, stride):
        super().__init__()
        
        self.conv1 = nn.Conv2d(in_channels, out_channels, kernel_size=3, stride=stride, padding=1, bias=False)
        self.bn1 = nn.BatchNorm2d(out_channels)
        
        self.conv2 = nn.Conv2d(out_channels, out_channels, kernel_size=3, stride=1, padding=1, bias=False)
        self.bn2 = nn.BatchNorm2d(out_channels)
        
        self.relu = nn.ReLU(inplace=True)
        
        self.shortcut = nn.Sequential()
        
        if stride != 1:
            self.shortcut = nn.Sequential(
                nn.Conv2d(in_channels, out_channels, kernel_size=1, stride=stride, bias=False),
                nn.BatchNorm2d(out_channels)
            )
    
    def forward(self, x):
        i = x
        out = self.conv1(x)
        out = self.bn1(out)
        out = self.relu(out)
        out = self.conv2(out)
        out = self.bn2(out)
        out += self.shortcut(i)
        out = self.relu(out)
        return out

In [7]:
class Bottleneck(nn.Module):
    expansion = 4
    
    def __init__(self, in_channels, out_channels, stride):
        super().__init__()
        
        self.conv1 = nn.Conv2d(in_channels, out_channels, kernel_size=1, stride=1, bias=False)
        self.bn1 = nn.BatchNorm2d(out_channels)
        
        self.conv2 = nn.Conv2d(out_channels, out_channels, kernel_size=3, stride=stride, padding=1, bias=False)
        self.bn2 = nn.BatchNorm2d(out_channels)
        
        self.conv3 = nn.Conv2d(out_channels, self.expansion*out_channels, kernel_size=1, stride=1, bias=False)
        self.bn3 = nn.BatchNorm2d(self.expansion*out_channels)
        
        self.relu = nn.ReLU(inplace=True)
        
        self.shortcut = nn.Sequential()
        
        if stride != 1 or in_channels != self.expansion*out_channels:
            self.shortcut = nn.Sequential(
                nn.Conv2d(in_channels, self.expansion*out_channels, kernel_size=1, stride=stride, bias=False),
                nn.BatchNorm2d(self.expansion*out_channels)
            )
        
    def forward(self, x):
        i = x
        out = self.conv1(x)
        out = self.bn1(out)
        out = self.relu(out)
        out = self.conv2(out)
        out = self.bn2(out)
        out = self.relu(out)
        out = self.conv3(out)
        out = self.bn3(out)
        out += self.shortcut(i)
        out = self.relu(out)
        return out

In [8]:
class ResNet(nn.Module):
    def __init__(self, config, num_classes, zero_init_residual=False):
        super().__init__()
        
        block, n_block, channels = config
        self.in_channels = channels[0]
        
#         self.conv1 = nn.Conv2d(3, self.in_channels, kernel_size=7, stride=2, padding=3, bias=False)
        # for using non-color photos change in_chennels 3 to 1
        self.conv1 = nn.Conv2d(1, self.in_channels, kernel_size=7, stride=2, padding=3, bias=False) 
        self.bn1 = nn.BatchNorm2d(self.in_channels)
        self.maxpool = nn.MaxPool2d(kernel_size=3, stride=2, padding=1)
        
        self.layer1 = self.get_resnet_layer(block, n_block[0], channels[0], stride=1)
        self.layer2 = self.get_resnet_layer(block, n_block[1], channels[1], stride=2)
        self.layer3 = self.get_resnet_layer(block, n_block[2], channels[2], stride=2)
        self.layer4 = self.get_resnet_layer(block, n_block[3], channels[3], stride=2)
        
        self.avgpool = nn.AdaptiveAvgPool2d((1,1))
        self.fc = nn.Linear(self.in_channels, num_classes)
        
        self.relu = nn.ReLU(inplace=True)
        
        if zero_init_residual:
            for m in self.modules():
                if isinstance(m, Bottleneck):
                    nn.init.constant_(m.bn3.weight, 0)
                elif isinstance(m, BasicBlock):
                    nn.init.constant_(m.bn2.weight, 0)
    
    def get_resnet_layer(self, block, n_blocks, channels, stride):
        layer = []
        
        layer.append(block(self.in_channels, channels, stride))
        
        for i in range(1, n_blocks):
            layer.append(block(block.expansion*channels, channels, stride=1))
            
        self.in_channels = block.expansion*channels
        
        return nn.Sequential(*layer)
    
    def forward(self, x):
        x = self.conv1(x)
        x = self.bn1(x)
        x = self.relu(x)
        x = self.maxpool(x)
        x = self.layer1(x)
        x = self.layer2(x)
        x = self.layer3(x)
        x = self.layer4(x)
        x = self.avgpool(x)
        h = x.view(x.shape[0], -1)
        x = self.fc(h)
        
        return x, h

## ResNet 함수 정의

In [9]:
ResNetConfig = namedtuple('ResNetConfig', ['block', 'n_blocks', 'channels'])

resnet18_config = ResNetConfig(block = BasicBlock,
                               n_blocks = [2,2,2,2],
                               channels = [64, 128, 256, 512])

resnet34_config = ResNetConfig(block = BasicBlock,
                               n_blocks = [3,4,6,3],
                               channels = [64, 128, 256, 512])

resnet50_config = ResNetConfig(block = Bottleneck,
                               n_blocks = [3, 4, 6, 3],
                               channels = [64, 128, 256, 512])

resnet101_config = ResNetConfig(block = Bottleneck,
                                n_blocks = [3, 4, 23, 3],
                                channels = [64, 128, 256, 512])

resnet152_config = ResNetConfig(block = Bottleneck,
                                n_blocks = [3, 8, 36, 3],
                                channels = [64, 128, 256, 512])

def ResNet50():
    return ResNet(resnet50_config, num_class, True)

#### 모델 구조

In [10]:
model = ResNet50()

print(model)

ResNet(
  (conv1): Conv2d(1, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3), bias=False)
  (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (maxpool): MaxPool2d(kernel_size=3, stride=2, padding=1, dilation=1, ceil_mode=False)
  (layer1): Sequential(
    (0): Bottleneck(
      (conv1): Conv2d(64, 64, kernel_size=(1, 1), stride=(1, 1), bias=False)
      (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (conv2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
      (bn2): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (conv3): Conv2d(64, 256, kernel_size=(1, 1), stride=(1, 1), bias=False)
      (bn3): BatchNorm2d(256, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (relu): ReLU(inplace=True)
      (shortcut): Sequential(
        (0): Conv2d(64, 256, kernel_size=(1, 1), stride=(1, 1), bias=False)
        (1): Ba

#### pytorch에서 제공하는 사전 훈련된 구조

In [11]:
import torchvision.models as models

pretrained_resnet50 = models.resnet50(pretrained = True)

print(pretrained_resnet50)



ResNet(
  (conv1): Conv2d(3, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3), bias=False)
  (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (relu): ReLU(inplace=True)
  (maxpool): MaxPool2d(kernel_size=3, stride=2, padding=1, dilation=1, ceil_mode=False)
  (layer1): Sequential(
    (0): Bottleneck(
      (conv1): Conv2d(64, 64, kernel_size=(1, 1), stride=(1, 1), bias=False)
      (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (conv2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
      (bn2): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (conv3): Conv2d(64, 256, kernel_size=(1, 1), stride=(1, 1), bias=False)
      (bn3): BatchNorm2d(256, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (relu): ReLU(inplace=True)
      (downsample): Sequential(
        (0): Conv2d(64, 256, kernel_size=(1, 1), stride=(1, 

## optimizer & loos_function 정의

In [12]:
optimizer = optim.Adam(model.parameters(), lr=learning_rate)
criterion = nn.CrossEntropyLoss()

model = model.to(device)
criterion = criterion.to(device)

## train & evaluate 함수 정의

In [13]:
def train(model, iterator, optimizer, criterion, device):
    epoch_loss = 0
    epoch_acc_1 = 0
    epoch_acc_5 = 0
    
    model.train()
    for(x, y) in iterator:
        x = x.to(device)
        y = y.to(device)
        
        optimizer.zero_grad()
        y_pred = model(x)
      
        loss = criterion(y_pred[0], y)
        
        acc_1, acc_5 = calc_topk_accuracy(y_pred[0], y)
        loss.backward()
        optimizer.step()
        
        epoch_loss += loss.item()
        epoch_acc_1 += acc_1.item()
        epoch_acc_5 += acc_5.item()
        
    epoch_loss /= len(iterator)
    epoch_acc_1 /= len(iterator)
    epoch_acc_5 /= len(iterator)
    
    return epoch_loss, epoch_acc_1, epoch_acc_5

In [14]:
def evaluate(model, iterator, criterion, device):    
    epoch_loss = 0
    epoch_acc_1 = 0
    epoch_acc_5 = 0
    
    model.eval()    
    with torch.no_grad():        
        for (x, y) in iterator:
            
            x = x.to(device)
            y = y.to(device)
            y_pred = model(x)
            
            loss = criterion(y_pred[0], y)
            
            acc_1, acc_5 = calc_topk_accuracy(y_pred[0], y)

            epoch_loss += loss.item()
            epoch_acc_1 += acc_1.item()
            epoch_acc_5 += acc_5.item()
        
    epoch_loss /= len(iterator)
    epoch_acc_1 /= len(iterator)
    epoch_acc_5 /= len(iterator)        
    
    return epoch_loss, epoch_acc_1, epoch_acc_5

In [15]:
@torch.no_grad()
def calc_topk_accuracy(y_pred, y , k=num_class):
#     with torch.no_grad(): #-> @torch.no_grad() 'can be replaced with decorator'
    batch_size = y.shape[0]
    
    _, top_pred = y_pred.topk(k,1) #return : 'k' largest value of ascending tensor, 'k' index of origin tensor -> [batchSize, k]
    
    top_pred = top_pred.t() # transposed matrix -> [k, batchSize] : [[1st],[2nd],[3rd]....]

    correct = top_pred.eq(y.view(1, -1).expand_as(top_pred))
#     y.view(1, -1).expand_as(top_pred) -> change shape [batchSize] to [1, batchSize] and expand this tensor to make [k, batchSize]

    correct_1 = correct[:1].reshape(-1).float().sum(0, keepdim = True) # result of predicted
    correct_k = correct[:k].reshape(-1).float().sum(0, keepdim = True)
    
    acc_1 = correct_1 / batch_size
    acc_k = correct_k / batch_size
    
    return acc_1, acc_k

In [16]:
def epoch_time(start_time, end_time):
    elapsed_time = end_time - start_time
    elapsed_mins = int(elapsed_time / 60)
    elapsed_secs = int(elapsed_time - (elapsed_mins * 60))
    
    return elapsed_mins, elapsed_secs

## train & evaluate

In [17]:
best_valid_loss = float('inf')

for epoch in range(epochs):
    start_time = time.monotonic()
    
    train_loss, train_acc_1, train_acc_5 = train(model, train_iterator, optimizer, criterion, device)
    valid_loss, valid_acc_1, valid_acc_5 = evaluate(model, valid_iterator, criterion, device)
        
    if valid_loss < best_valid_loss:
        best_valid_loss = valid_loss
        torch.save(model.state_dict(), './data/ResNet-FashionMNIST-model.pt')

    end_time = time.monotonic()
    epoch_mins, epoch_secs = epoch_time(start_time, end_time)
    
    print(f'Epoch: {epoch+1:02} | Epoch Time: {epoch_mins}m {epoch_secs}s')
    print(f'\tTrain Loss: {train_loss:.3f} | Train Acc @1: {train_acc_1*100:6.2f}% | Train Acc @5: {train_acc_5*100:6.2f}%')
    print(f'\tValid Loss: {valid_loss:.3f} | Valid Acc @1: {valid_acc_1*100:6.2f}% | Valid Acc @5: {valid_acc_5*100:6.2f}%')

Epoch: 01 | Epoch Time: 5m 28s
	Train Loss: 0.771 | Train Acc @1:  70.97% | Train Acc @5: 100.00%
	Valid Loss: 0.592 | Valid Acc @1:  77.74% | Valid Acc @5: 100.00%
Epoch: 02 | Epoch Time: 5m 26s
	Train Loss: 0.525 | Train Acc @1:  80.31% | Train Acc @5: 100.00%
	Valid Loss: 0.459 | Valid Acc @1:  82.86% | Valid Acc @5: 100.00%
Epoch: 03 | Epoch Time: 5m 26s
	Train Loss: 0.449 | Train Acc @1:  83.50% | Train Acc @5: 100.00%
	Valid Loss: 0.452 | Valid Acc @1:  84.13% | Valid Acc @5: 100.00%
Epoch: 04 | Epoch Time: 5m 26s
	Train Loss: 0.410 | Train Acc @1:  84.63% | Train Acc @5: 100.00%
	Valid Loss: 0.403 | Valid Acc @1:  85.23% | Valid Acc @5: 100.00%
Epoch: 05 | Epoch Time: 5m 26s
	Train Loss: 0.384 | Train Acc @1:  85.72% | Train Acc @5: 100.00%
	Valid Loss: 0.392 | Valid Acc @1:  85.47% | Valid Acc @5: 100.00%
Epoch: 06 | Epoch Time: 5m 25s
	Train Loss: 0.364 | Train Acc @1:  86.59% | Train Acc @5: 100.00%
	Valid Loss: 0.371 | Valid Acc @1:  86.39% | Valid Acc @5: 100.00%
Epoch: 07 