# 신경망 깊게 쌓아 컬러 데이터셋에 적용하기
Convolutional Neural Network (CNN) 을 쌓아올려 딥한 러닝을 해봅시다.

In [1]:
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torchvision import transforms, datasets, models

In [2]:
USE_CUDA = torch.cuda.is_available()
DEVICE = torch.device("cuda" if USE_CUDA else "cpu")
DATA_PATH = './.data'

# 하이퍼파라미터
EPOCHS     = 300
BATCH_SIZE = 128


## 데이터셋 불러오기

In [3]:
train_loader = torch.utils.data.DataLoader(
    datasets.CIFAR10(DATA_PATH,
                   train=True,
                   download=True,
                   transform=transforms.Compose([
                       transforms.RandomCrop(32, padding=4),
                       transforms.RandomHorizontalFlip(),
                       transforms.ToTensor(),
                       transforms.Normalize((0.5, 0.5, 0.5),
                                            (0.5, 0.5, 0.5))])),
    batch_size=BATCH_SIZE, shuffle=True)

test_loader = torch.utils.data.DataLoader(
    datasets.CIFAR10(DATA_PATH,
                   train=False, 
                   transform=transforms.Compose([
                       transforms.ToTensor(),
                       transforms.Normalize((0.5, 0.5, 0.5),
                                            (0.5, 0.5, 0.5))])),
    batch_size=BATCH_SIZE, shuffle=True)

Files already downloaded and verified


In [5]:
data_tuple = iter(test_loader).next()

In [7]:
print(data_tuple[0].type())
print(data_tuple[0].shape)
print(data_tuple[1].type())
print(data_tuple[1].shape)


torch.FloatTensor
torch.Size([128, 3, 32, 32])
torch.LongTensor
torch.Size([128])


## ResNet 모델 만들기

In [8]:
test = iter(train_loader)
test = next(test)
print(test[0].shape)
print(test[1])

torch.Size([128, 3, 32, 32])
tensor([4, 3, 9, 0, 9, 3, 2, 3, 7, 0, 3, 8, 4, 6, 6, 9, 2, 3, 6, 6, 9, 5, 7, 2,
        7, 1, 8, 4, 9, 0, 9, 1, 9, 4, 1, 4, 3, 2, 9, 7, 7, 9, 2, 3, 8, 5, 0, 2,
        3, 1, 6, 4, 1, 2, 6, 1, 0, 6, 5, 0, 9, 4, 1, 1, 0, 2, 0, 6, 5, 7, 0, 4,
        0, 2, 1, 4, 1, 1, 0, 0, 9, 0, 3, 9, 8, 9, 8, 1, 2, 3, 4, 6, 1, 2, 0, 1,
        6, 3, 7, 1, 6, 3, 7, 6, 9, 0, 5, 1, 0, 1, 6, 1, 7, 1, 3, 5, 3, 7, 2, 4,
        0, 8, 7, 6, 5, 5, 4, 4])


In [31]:
import math
import torch
from torch import nn
from torch.nn import Linear
from torch.nn import Conv2d
from torch.nn import BatchNorm2d
from torch.nn import LeakyReLU

class Conv2dBlock(nn.Module):
    def __init__(self, in_channels, out_channels, kernel_size=3, stride=1):
        super(Conv2dBlock, self).__init__()
        same_padding_size = self._get_same_padded_info(kernel_size, stride)
        self.conv = Conv2d(in_channels, out_channels, kernel_size,
                               stride=stride, padding=same_padding_size, bias=False)
        self.batch_normalization = BatchNorm2d(num_features=out_channels)
        self.leakyRelu = LeakyReLU(negative_slope=0.2)
    def forward(self, x):
        x = self.conv(x)
        x = self.batch_normalization(x)
        x = self.leakyRelu(x)
        
        return x
        
    def _get_same_padded_info(self, kernel_size, stride=1):
        # solve equation (i-k+2p)/s + 1 = i / s
        same_padding_size = math.ceil((-stride + kernel_size) / 2)

        return same_padding_size

class ResidualBlock(nn.Module):
    def __init__(self, in_channels, out_channels, kernel_size=3, sample_mode = "same"):
        super(ResidualBlock, self).__init__()
        self.sample_mode = sample_mode
        self.conv_stride_1 = Conv2dBlock(out_channels, out_channels, kernel_size, stride=1)
        self.conv_stride_2 = Conv2dBlock(in_channels, out_channels, kernel_size, stride=2)
        self.batch_normalization = BatchNorm2d(num_features=out_channels)
        self.leakyRelu = LeakyReLU(negative_slope=0.2)
    def forward(self, x):
        stack_layer = []
        
        residual_x = x if self.sample_mode == "same" else self.conv_stride_2(x)
                    
        for _ in range(32):
            if self.sample_mode == "down":
                conv_x = self.conv_stride_2(x)
            else :
                conv_x = x
                
            conv_x = self.conv_stride_1(conv_x)
            out = torch.add(conv_x, residual_x) 
            out = self.batch_normalization(out)
            out = self.leakyRelu(out)
            stack_layer.append(out)
        stack_layer = torch.stack(stack_layer)
        stack_layer = stack_layer.sum(dim=0)
        stack_layer = self.batch_normalization(stack_layer)
        stack_layer = self.leakyRelu(stack_layer)
        
        return stack_layer

In [32]:
from torch.nn import Softmax

class ResNet(nn.Module):
    def __init__(self, class_num, size, depth, input_shape, device):
        super(ResNet, self).__init__()
        
        input_dim = input_shape[-1]
        size_coefficient = 8
        
        output_width = input_shape[0] // (2 ** depth) 
        output_height = input_shape[1] // (2 ** depth)
        output_dim = size_coefficient * (depth+1) * size
        self.output_flatten_length = output_width * output_height * output_dim
        
        self.conv2d_block = Conv2dBlock(in_channels=input_dim, out_channels=size_coefficient*size,
                                        kernel_size=3, stride=1).to(device)
        self.residual_block_1_same = ResidualBlock(in_channels=size_coefficient*size, out_channels=size_coefficient*size, 
                                                   kernel_size=3, sample_mode="same").to(device)
        
        self.residual_block_down_list = []
        self.residual_block_same_list = []
        
        for index in range(1,depth+1):
            residual_block_down = ResidualBlock(in_channels=size_coefficient*index*size, out_channels=size_coefficient*(index+1)*size, 
                                                kernel_size=3, sample_mode="down").to(device)    
            residual_block_same = ResidualBlock(in_channels=size_coefficient*(index+1)*size, out_channels=size_coefficient*(index+1)*size, 
                                                kernel_size=3, sample_mode="same").to(device)
            self.residual_block_down_list.append(residual_block_down)
            self.residual_block_same_list.append(residual_block_same)        
        
        self.linear = Linear(self.output_flatten_length, class_num).to(device)
        self.softmax = Softmax(1)
    def forward(self, x):
        x = self.conv2d_block(x)
        x = self.residual_block_1_same(x)
        for (residual_block_down, residual_block_same) in zip(self.residual_block_down_list, self.residual_block_same_list):
            x = residual_block_down(x)
            x = residual_block_same(x)
        # reshape for linearLayer
        x = x.view(-1,self.output_flatten_length)
        x = self.linear(x)
        x = self.softmax(x)
        return x

## 준비

In [33]:
model = ResNet(class_num=10, size=1, depth=2, input_shape=(32,32,3), device=DEVICE)
optimizer = optim.SGD(model.parameters(), lr=0.1,
                      momentum=0.9, weight_decay=0.0005)
scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=50, gamma=0.1)

In [34]:
#print(model)

## 학습하기

In [35]:
def train(model, train_loader, optimizer, epoch):
    model.train()
    for batch_idx, (data, target) in enumerate(train_loader):
        data, target = data.to(DEVICE), target.to(DEVICE)
        optimizer.zero_grad()
        output = model(data)
#         print(output[0])
#         print(output.shape)
#         print(target[0])
#         print(target.shape)
        loss = F.cross_entropy(output, target)

        loss.backward()
        optimizer.step()

        if batch_idx % 200 == 0:
            print('Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}'.format(
                epoch, batch_idx * len(data), len(train_loader.dataset),
                100. * batch_idx / len(train_loader), loss.item()))

## 테스트하기

In [36]:
def evaluate(model, test_loader):
    model.eval()
    test_loss = 0
    correct = 0
    with torch.no_grad():
        for data, target in test_loader:
            data, target = data.to(DEVICE), target.to(DEVICE)
            output = model(data)

            # 배치 오차를 합산
            test_loss += F.cross_entropy(output, target,
                                         reduction='sum').item()

            # 가장 높은 값을 가진 인덱스가 바로 예측값
            pred = output.max(1, keepdim=True)[1]
            correct += pred.eq(target.view_as(pred)).sum().item()

    test_loss /= len(test_loader.dataset)
    test_accuracy = 100. * correct / len(test_loader.dataset)
    return test_loss, test_accuracy

## 코드 돌려보기

자, 이제 모든 준비가 끝났습니다. 코드를 돌려서 실제로 훈련이 되는지 확인해봅시다!

In [41]:
print(output.type())
print(output.shape)

torch.cuda.FloatTensor
torch.Size([80, 10])


In [42]:
print(target.type())
print(target.shape)

torch.cuda.LongTensor
torch.Size([80])


In [38]:
model.train()
for batch_idx, (data, target) in enumerate(train_loader):
    data, target = data.to(DEVICE), target.to(DEVICE)
    optimizer.zero_grad()
    output = model(data)
#         print(output[0])
#         print(output.shape)
#         print(target[0])
#         print(target.shape)
    loss = F.cross_entropy(output, target)

    loss.backward()
    optimizer.step()

    if batch_idx % 200 == 0:
        print('Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}'.format(
            epoch, batch_idx * len(data), len(train_loader.dataset),
            100. * batch_idx / len(train_loader), loss.item()))



In [37]:
for epoch in range(1, EPOCHS + 1):
    scheduler.step()
    train(model, train_loader, optimizer, epoch)
    test_loss, test_accuracy = evaluate(model, test_loader)
    
    print('[{}] Test Loss: {:.4f}, Accuracy: {:.2f}%'.format(
          epoch, test_loss, test_accuracy))

[1] Test Loss: 2.1620, Accuracy: 28.98%
[2] Test Loss: 2.1733, Accuracy: 27.48%
[3] Test Loss: 2.1850, Accuracy: 25.88%
[4] Test Loss: 2.2221, Accuracy: 22.48%
[5] Test Loss: 2.2331, Accuracy: 21.14%
[6] Test Loss: 2.2165, Accuracy: 23.21%
[7] Test Loss: 2.2951, Accuracy: 16.02%
[8] Test Loss: 2.1885, Accuracy: 26.19%
[9] Test Loss: 2.1911, Accuracy: 25.50%


KeyboardInterrupt: 

In [31]:
import math
def _get_same_padded_info(kernel_size, stride=1):
    # solve equation (i-k+2p)/s + 1 = i / s
    same_padding_size = math.ceil((-stride + kernel_size) / 2)

    return same_padding_size


In [49]:
in_channels = 3
out_channels = 10
kernel_size = 3
stride = 2
same_padding_size = _get_same_padded_info(kernel_size, stride)
conv = Conv2d(in_channels, out_channels, kernel_size,
            stride=stride, padding=same_padding_size, bias=False)

x = torch.randn(3,7,7).unsqueeze(0)
print(x.shape)
x = conv(x)
print(x.shape)

NameError: name '_get_same_padded_info' is not defined