## 다중분류
- 소프트맥스 함수 <-> 교차엔트로피 손실함수(CE)
- 로그소프트맥스 함수 <-> 음의 가능도 손실함수(NLL)

In [None]:
import numpy as np
from copy import deepcopy
import matplotlib.pyplot as plt

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim

#### 다중분류 심층신경망

In [None]:
from torchvision import datasets, transforms

# Preprocessing pipeline handed to both dataset objects (a single ToTensor step).
to_tensor = transforms.Compose([transforms.ToTensor()])

# Training split; download is requested into ../data.
train = datasets.MNIST(root='../data', train=True, download=True, transform=to_tensor)

# Test split, read from the same directory (no download requested here).
test = datasets.MNIST(root='../data', train=False, transform=to_tensor)

# 이미지 시각화
# def plot(x):
#     img = (np.array(x.detach().cpu(), dtype='float')).reshape(28, 28)

#     plt.imshow(img, cmap='gray')
#     plt.show()

# plot(train.data[0])

# Turn the training images into flat vectors.
# Dividing by 255 presumably maps 8-bit pixel values into [0, 1] -- TODO confirm.
x = train.data.float() / 255.
y = train.targets   # labels (ground-truth classes)
print(y)
x = x.view(x.size(0), -1)   # keep dim 0 (samples), flatten everything else
print(x.shape, y.shape)

# Input / output sizes of the network.
input_size = x.size(-1)
# Largest label + 1 gives the number of classes. Tensor.max() reduces in a
# single vectorized call; the builtin max() would walk the tensor element by
# element in Python.
output_size = int(y.max()) + 1

print(input_size, output_size)

print('input_size: %d, output_size: %d' % (input_size, output_size))

# Split the data into training / validation subsets.
ratios = [.8, .2]   # train: 80%, valid: 20%

train_cnt = int(x.size(0) * ratios[0])
# The validation subset takes whatever is left. Truncating both ratios
# independently can lose a sample, and Tensor.split() with a list of sizes
# requires the sizes to add up to the full length of the dimension.
valid_cnt = x.size(0) - train_cnt
test_cnt = len(test.data)
cnts = [train_cnt, valid_cnt]

print("Train %d / Valid %d / Test %d samples." % (train_cnt, valid_cnt, test_cnt))

# Shuffle inputs and labels with the same permutation before splitting.
indices = torch.randperm(x.size(0))

x = torch.index_select(x, dim=0, index=indices)
y = torch.index_select(y, dim=0, index=indices)

# After this, x / y are lists: [train, valid].
x = list(x.split(cnts, dim=0))
y = list(y.split(cnts, dim=0))

# Append the test split (scaled and flattened the same way) as the last entry,
# giving [train, valid, test].
x += [(test.data.float() / 255.).view(test_cnt, -1)]
y += [test.targets]
print(x)

for x_i, y_i in zip(x, y):
    print(x_i.size(), y_i.size())

# Build the network: Linear + LeakyReLU for each hidden layer, then a final
# Linear projection to the classes followed by log-softmax.
layer_sizes = [input_size, 400, 200, 50]

layers = []
for n_in, n_out in zip(layer_sizes[:-1], layer_sizes[1:]):
    layers += [nn.Linear(n_in, n_out), nn.LeakyReLU()]
layers += [
    nn.Linear(layer_sizes[-1], output_size),
    nn.LogSoftmax(dim=-1),  # log-softmax output -> paired with the NLL loss
]
model = nn.Sequential(*layers)

# Optimizer and loss function (LogSoftmax - NLL pairing).
optimizer = optim.Adam(model.parameters())
crit = nn.NLLLoss()

# Use the GPU when one is available, otherwise stay on the CPU.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Move the model and every data split to the selected device.
model = model.to(device)

x = [split.to(device) for split in x]
y = [split.to(device) for split in y]

# Hyperparameters.
n_epochs = 1000
batch_size = 256
print_interval = 10
early_stop = 50

# Best-so-far bookkeeping: both start at +infinity with no saved weights.
lowest_loss = float('inf')
lowest_epoch = float('inf')
best_model = None

# Training (forward pass, back-propagation, gradient-descent step) with a
# per-epoch validation pass, best-model tracking and early stopping.
train_history, valid_history = [], []

for i in range(n_epochs):
    # Re-shuffle the training split every epoch before cutting mini-batches.
    indices = torch.randperm(x[0].size(0)).to(device)
    x_ = torch.index_select(x[0], dim=0, index=indices)
    y_ = torch.index_select(y[0], dim=0, index=indices)

    x_ = x_.split(batch_size, dim=0)
    y_ = y_.split(batch_size, dim=0)

    train_loss, valid_loss = 0, 0

    for x_i, y_i in zip(x_, y_):
        y_hat_i = model(x_i)
        # Targets are passed unchanged: squeeze() would collapse a trailing
        # batch of one sample into a 0-d tensor.
        # NOTE(review): assumes the label tensors are already 1-D -- confirm.
        loss = crit(y_hat_i, y_i)

        optimizer.zero_grad()
        loss.backward()

        optimizer.step()
        # float() keeps only the scalar value for the running sum.
        train_loss += float(loss)

    # Mean of the per-batch losses.
    train_loss = train_loss / len(x_)

    # Validation (forward pass only, no gradients).
    with torch.no_grad():
        x_ = x[1].split(batch_size, dim=0)
        y_ = y[1].split(batch_size, dim=0)

        for x_i, y_i in zip(x_, y_):
            y_hat_i = model(x_i)
            loss = crit(y_hat_i, y_i)

            valid_loss += float(loss)

    valid_loss = valid_loss / len(x_)

    train_history += [train_loss]
    valid_history += [valid_loss]

    # Keep a copy of the weights whenever the validation loss is the best so far.
    # deepcopy is needed because state_dict() references the live parameters.
    if valid_loss <= lowest_loss:
        lowest_loss = valid_loss
        lowest_epoch = i

        best_model = deepcopy(model.state_dict())

    # Progress is reported after the update above so that lowest_loss already
    # accounts for the current epoch.
    if (i + 1) % print_interval == 0:
        print('Epoch %d: train loss=%.4e  valid_loss=%.4e  lowest_loss=%.4e' % (
            i + 1,
            train_loss,
            valid_loss,
            lowest_loss,
        ))

    # Early stopping: give up once `early_stop` epochs have passed without a
    # new best. This can never fire on an epoch that just improved, because
    # lowest_epoch == i there.
    if early_stop > 0 and lowest_epoch + early_stop < i + 1:
        print("There is no improvement during last %d epochs." % early_stop)
        break

# Restore the best weights seen during training.
print("The best validation loss from epoch %d: %.4e" % (lowest_epoch + 1, lowest_loss))
model.load_state_dict(best_model)

#### 평가 (손실)

In [None]:
# Inspect the loss curves.
plot_from = 0   # index of the first epoch to draw

train_curve = train_history[plot_from:]
valid_curve = valid_history[plot_from:]

plt.figure(figsize=(20, 10))
plt.grid(True)
plt.title("Train / Valid Loss History")
# Two (x, y) pairs in one call: training curve first, validation curve second.
plt.plot(
    range(plot_from, len(train_history)), train_curve,
    range(plot_from, len(valid_history)), valid_curve,
)
plt.yscale('log')
plt.show()

# Evaluation on the test split (last entry of x / y), forward pass only.
test_loss = 0
y_hat = []

with torch.no_grad():
    x_ = x[-1].split(batch_size, dim=0)
    y_ = y[-1].split(batch_size, dim=0)

    for x_i, y_i in zip(x_, y_):
        y_hat_i = model(x_i)
        # Targets are passed unchanged: squeeze() would collapse a trailing
        # batch of one sample into a 0-d tensor.
        # NOTE(review): assumes the label tensors are already 1-D -- confirm.
        loss = crit(y_hat_i, y_i)

        # Accumulate a plain float, the same way the train/valid losses are
        # accumulated, rather than a tensor.
        test_loss += float(loss)

        y_hat += [y_hat_i]

# Mean of the per-batch losses, and all batch outputs joined into one tensor.
test_loss = test_loss / len(x_)
y_hat = torch.cat(y_hat, dim=0)

print("Test loss: %.4e" % test_loss)

#### 평가 (정확도)

In [None]:
# Classification accuracy: the predicted class is the arg-max over the last
# dimension of the model output.
predicted = torch.argmax(y_hat, dim=-1)
total_cnt = float(y[-1].size(0))
correct_cnt = (y[-1].squeeze() == predicted).sum()

print("Test Accuracy: %.4f" % (correct_cnt / total_cnt))

#### 평가 (혼동 행렬)

In [None]:
# 혼동 행렬 출력
import pandas as pd
from sklearn.metrics import confusion_matrix

# Confusion matrix labelled true_* on the rows and pred_* on the columns.
# The class list comes from output_size instead of a hard-coded 10, and is
# passed as `labels` so the matrix always has one row/column per class, even
# if a class is absent from the evaluated data.
class_ids = list(range(output_size))
y_true = y[-1].detach().cpu().numpy()
y_pred = torch.argmax(y_hat, dim=-1).detach().cpu().numpy()

print(pd.DataFrame(
    confusion_matrix(y_true, y_pred, labels=class_ids),
    index=['true_%d' % i for i in class_ids],
    columns=['pred_%d' % i for i in class_ids],
))