# CNN을 통한 MNIST Classification ###

유명한 데이터셋 중 하나인 MNIST (손글씨로 표현된 숫자)를 구별하는 CNN을 학습시켜보고 정확도를 알아볼 것이다.  

In [1]:
# 기존 pytorch를 돌릴 때 항상 썼던 import 구문
import torch
import torch.nn as nn
import torch.nn.init as init
import torch.optim as optim

In [3]:
# MNIST dataset을 가져오고 처리하기 위한 import 구문
import torchvision.datasets.mnist as dset
import torchvision.transforms as transforms
from torch.utils.data import DataLoader

In [4]:
batch_size = 256
learning_rate = 0.0002
num_epoch = 10

In [5]:
# 데이터를 다운 받을 곳을 지정
# train=True면 train set, 아니면 test set
# transform은 데이터에 행해줄 transform을 지정해준다. MNIST의 경우에는 PIL의 형태로 되어 있어서 tensor로 바꾸어주어야 한다.
# target transform은 뭔지 모르겠다.
# download 여부
mnist_train = dset.MNIST("./mnist_dataset", train=True, transform=transforms.ToTensor(), target_transform=None, download=True)
mnist_test = dset.MNIST("./mnist_dataset", train=False, transform=transforms.ToTensor(), target_transform=None, download=True)

In [6]:
# Data Loader는 데이터들을 batch_size만큼씩 나누어서 image와 label로 나누어 놓는다.
# loader의 각 샘플들이 어떤 형태인지는 dataset에 따라 다른데
# MNIST 같은 경우에는 [image, label]의 형태로 sample들이 나온다.
# Shuffle은 데이터를 섞어줄 것인지에 대한 것이고
# num_workers는 data loader에 대해 iteration을 돌릴 때 multi processing을 지원한다.
# drop_last는 마지막에 batch_size씩 자르고 남은 짜투리 부분을 어떻게 할지 처리한다. (데이터가 충분하다면 drop해도 괜찮다.)
train_loader = DataLoader(mnist_train, batch_size=batch_size, shuffle=True, num_workers=2, drop_last=True)
test_loader = DataLoader(mnist_test, batch_size=batch_size, shuffle=False, num_workers=2, drop_last=True)

In [9]:
class CNN(nn.Module):
    def __init__(self):
        super(CNN, self).__init__() # nn.Module의 초기화
        self.layer = nn.Sequential(
            nn.Conv2d(1, 16, 5), # 차례대로 (input ch #, output ch #, kernel size) -> (16, 24, 24)
            nn.ReLU(),
            nn.Conv2d(16, 32, 5), # -> (32, 20, 20)
            nn.ReLU(),
            nn.MaxPool2d(2, 2), # -> (32, 10, 10)
            nn.Conv2d(32, 64, 5), # -> (64, 6, 6))
            nn.ReLU(),
            nn.MaxPool2d(2, 2), # -> (64, 3, 3)
            nn.Flatten(), # -> (64 * 3 * 3)
            nn.Linear(64*3*3, 100), # -> (100)
            nn.ReLU(),
            nn.Linear(100,10) # -> (10)
        )

        # self.fc_layer = nn.Sequential(
        #     nn.Linear(64*3*3, 100), #64 channel, w, h = 3
        #     nn.ReLU(),
        #     nn.Linear(100,10)
        # )

    def forward(self, x):
        out = self.layer(x)
        # out = out.view(batch_size, -1) # 3d array를 1차원으로 펴준다.
        # out = self.fc_layer(out)
        return out

In [10]:
device = torch.device("cuda:1" if torch.cuda.is_available() else "cpu")
model = CNN().to(device)
loss_func = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=learning_rate)

In [11]:
loss_arr = []
for i in range(num_epoch):
    
    for j, [image, label] in enumerate(train_loader):

        # image, label은 이미 tensor이긴 하지만 속도를 위해 device로 옮겨줘야 한다.
        x = image.to(device)
        y_ = label.to(device)

        # 지난번 예제에서도 했던 zero grad, forward prop, back prop, 그리고 parameter update
        optimizer.zero_grad()
        output = model.forward(x)
        loss = loss_func(output, y_)
        loss.backward()
        optimizer.step()

        # training data는 총 60,000개이고,
        # batch size가 256이므로
        # batch 갯수는 234개 밖에 안되서 아래 print 구문은 epoch당 1번 밖에 출력되지 않는다.
        if j % 1000 == 0:
            print(loss)
            loss_arr.append(loss.cpu().detach().numpy())


tensor(2.3044, device='cuda:1', grad_fn=<NllLossBackward>)
tensor(0.2139, device='cuda:1', grad_fn=<NllLossBackward>)
tensor(0.1123, device='cuda:1', grad_fn=<NllLossBackward>)
tensor(0.0826, device='cuda:1', grad_fn=<NllLossBackward>)
tensor(0.1223, device='cuda:1', grad_fn=<NllLossBackward>)
tensor(0.0614, device='cuda:1', grad_fn=<NllLossBackward>)
tensor(0.0343, device='cuda:1', grad_fn=<NllLossBackward>)
tensor(0.0657, device='cuda:1', grad_fn=<NllLossBackward>)
tensor(0.0228, device='cuda:1', grad_fn=<NllLossBackward>)
tensor(0.0344, device='cuda:1', grad_fn=<NllLossBackward>)


In [14]:
correct = 0
total = 0
# no_grad를 걸어주는 이유는 test 시에는 back propagation이 필요 없기 때문이다.
with torch.no_grad():
    for image, label in test_loader:
        x = image.to(device)
        y_ = label.to(device)

        output = model.forward(x)
        # 아래 1은 column 방향으로 max를 찾는다는 의미.
        # 즉 여러개의 row 중에서 하나를 고른다.
        # return 값 중 앞의 값은 max value이다.
        _, output_index = torch.max(output, 1)

        total += label.size(0)
        # output_index == y_ 는 True, False로 이루어진 어레이를 만들고
        # 뒤의 sum이 True인 부분들만 구해서 합쳐준다.
        correct += (output_index == y_).sum()
    
    print("Accuracy of Test Data: {}".format(100* correct / total))

Accuracy of Test Data: 98.8581771850586


## 별책부록 ##
pytorch 모델은 coreml로 변환하기 위해 ONNX로 저장하기.

In [19]:
dummy_input = torch.rand(1, 1, 28,28).to(device)
torch.onnx.export(model, dummy_input, "mnist.onnx", verbose=True, input_names=['input image'], output_names=['label'])

graph(%input image : Float(1, 1, 28, 28, strides=[784, 784, 28, 1], requires_grad=0, device=cuda:1),
      %layer.0.weight : Float(16, 1, 5, 5, strides=[25, 25, 5, 1], requires_grad=1, device=cuda:1),
      %layer.0.bias : Float(16, strides=[1], requires_grad=1, device=cuda:1),
      %layer.2.weight : Float(32, 16, 5, 5, strides=[400, 25, 5, 1], requires_grad=1, device=cuda:1),
      %layer.2.bias : Float(32, strides=[1], requires_grad=1, device=cuda:1),
      %layer.5.weight : Float(64, 32, 5, 5, strides=[800, 25, 5, 1], requires_grad=1, device=cuda:1),
      %layer.5.bias : Float(64, strides=[1], requires_grad=1, device=cuda:1),
      %layer.9.weight : Float(100, 576, strides=[576, 1], requires_grad=1, device=cuda:1),
      %layer.9.bias : Float(100, strides=[1], requires_grad=1, device=cuda:1),
      %layer.11.weight : Float(10, 100, strides=[100, 1], requires_grad=1, device=cuda:1),
      %layer.11.bias : Float(10, strides=[1], requires_grad=1, device=cuda:1)):
  %11 : Float(1, 16,