<a href="https://colab.research.google.com/github/youuRee/DeepLearning_Lecture/blob/main/02_MLP.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Multi-layer Perceptron (MLP)

## 외부 파일 가져오기 & requirements 설치

In [None]:
from google.colab import drive
drive.mount("/content/drive")
import os
import sys
sys.path.append("/content/drive/MyDrive/#fastcampus")
!pip install -r "/content/drive/MyDrive/#fastcampus/requirements.txt"

In [None]:
import numpy as np
from tqdm import tqdm
import matplotlib.pyplot as plt
import torch
from torch import nn
import torch.nn.functional as F
from torch import optim
from torch.utils.tensorboard import SummaryWriter
from torch.utils.data import random_split

from torchvision.datasets import FashionMNIST
from torchvision import transforms

In [None]:
from data_utils import dataset_split

Transform
- transforms.ToTensor() : 데이터를 tensor로 바꿔준다.
-transforms.Normalize(mean, std, inplace=False) : 정규화한다.
-transforms.ToPILImage() : csv 파일로 데이터셋을 받을 경우, PIL image로 바꿔준다.
- transforms.Compose : 여러 단계로 변환해야 하는 경우, Compose를 통해 여러 단계를 묶을 수 있다

In [None]:
data_root = os.path.join(os.getcwd(), "data")

# 전처리 부분 (preprocessing) & 데이터 셋 정의.
transform = transforms.Compose(
    [
        transforms.ToTensor(),
        transforms.Normalize([0.5], [0.5]), # mean, # std
    ]
)
fashion_mnist_dataset = FashionMNIST(data_root, download=True, train=True, transform=transform)


## Dataloader를 정의

In [None]:
# 훈련데이터 : 0.9, 테스트데이터: 0.1 비율로 나눔
datasets = dataset_split(fashion_mnist_dataset, split=[0.9, 0.1])

train_dataset = datasets["train"]
val_dataset = datasets["val"]

train_batch_size = 100
val_batch_size = 10

train_dataloader = torch.utils.data.DataLoader(
    train_dataset, batch_size=train_batch_size, shuffle=True, num_workers=1
)
# num_workers : 병렬 프로세싱할 때 얼마나 있으면 좋냐
val_dataloader = torch.utils.data.DataLoader(
    val_dataset, batch_size=val_batch_size, shuffle=False, num_workers=1
)


In [None]:
for sample_batch in train_dataloader:
    print(sample_batch)
    print(sample_batch[0].shape, sample_batch[1].shape)
    break

## 모델 (Multi-layer Perceptron) (MLP) ! 정의

In [None]:
# Define Model.
# nn.Module 사용 --> 사용안하면 디바이스 변경시 꼬임
class MLP(nn.Module):
    def __init__(self, in_dim: int, h1_dim: int, h2_dim: int, out_dim: int):
        super().__init__()
        self.linear1 = nn.Linear(in_dim, h1_dim)
        self.linear2 = nn.Linear(h1_dim, h2_dim)
        self.linear3 = nn.Linear(h2_dim, out_dim)
        self.relu = F.relu
        
    def forward(self, input):
        x = torch.flatten(input, start_dim=1)
        x = self.relu(self.linear1(x))
        x = self.relu(self.linear2(x))
        out = self.linear3(x)
        # out = F.softmax(out)
        return out

## 모델 선언 및 손실 함수, 최적화(Optimizer) 정의, Tensorboard Logger 정의 

In [None]:
# define model.
model = MLP(28*28, 128, 64, 10)

# define loss
loss_function = nn.CrossEntropyLoss()

# define optimizer
#  parameters() 메소드는 먼저 모든 하위 모듈들을 탐색하고(recursive=True), 각 모듈의 _parameters에 들어있는 파라미터들을 하나씩 반환해주는 함수
# 모델 a->b->c , a.parameter() : c의 파라미터 반환
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)
max_epoch = 15

# define tensorboard logger
writer = SummaryWriter()
log_interval = 100

In [None]:
%load_ext tensorboard
%tensorboard --logdir runs/

train_step = 0
for epoch in range(1, max_epoch+1):
    # valid step
    # valid data에 최적화x --> no_grad()
    with torch.no_grad():
        val_loss = 0.0
        val_corrects = 0

        for val_batch_idx, (val_images, val_labels) in enumerate(
            tqdm(val_dataloader, position=0, leave=True, desc="validation")
        ):
            # forward
            val_outputs = model(val_images)
            _, val_preds = torch.max(val_outputs, 1)
            
            # loss & acc
            val_loss += loss_function(val_outputs, val_labels) / val_outputs.shape[0]
            val_corrects += torch.sum(val_preds == val_labels.data) / val_outputs.shape[0]
    
    # valid step logging
    val_epoch_loss = val_loss / len(val_dataloader)
    val_epoch_acc = val_corrects / len(val_dataloader)
    
    print(
        f"{epoch} epoch, {train_step} step: val_loss: {val_epoch_loss}, val_acc: {val_epoch_acc}"
    )
    writer.add_scalar("Loss/val", val_epoch_loss, train_step)
    writer.add_scalar("Acc/val", val_epoch_acc, train_step)
    writer.add_images("Images/val", val_images, train_step)
    
    # train step
    current_loss = 0
    current_corrects = 0

    for batch_idx, (images, labels) in enumerate(
         tqdm(train_dataloader, position=0, leave=True, desc="training")
    ):
        current_loss = 0.0
        current_corrects = 0

        # Forward
        # get predictions
        outputs = model(images)
        # 가장 높은 값(energy)를 갖는 분류(class)를 정답으로 선택
        _, preds = torch.max(outputs, 1)
        
        # get loss (Loss 계산)
        loss = loss_function(outputs, labels)

        # Backpropagation
        # optimizer 초기화 (zero화)
        optimizer.zero_grad()

        # Perform backward pass
        loss.backward()

        # Perform Optimization
        optimizer.step()

        current_loss += loss.item()
        current_corrects += torch.sum(preds == labels.data)

        if train_step % log_interval == 0:
            train_loss = current_loss / log_interval
            train_acc = current_corrects / log_interval

            print(
                f"{train_step}: train_loss: {train_loss}, train_acc: {train_acc}"
            )
            writer.add_scalar("Loss/train", train_loss, train_step)
            writer.add_scalar("Acc/train", train_acc, train_step)
            writer.add_images("Images/train", images, train_step)
            writer.add_graph(model, images)
            current_loss = 0
            current_corrects = 0

        train_step += 1

In [None]:
# save model
os.makedirs("./logs/models", exist_ok=True)
torch.save(model, "./logs/models/mlp.ckpt")

In [None]:
# load model
loaded_model = torch.load("./logs/models/mlp.ckpt")
loaded_model.eval()
print(loaded_model)

In [None]:
def softmax(x, axis=0):
    "numpy softmax" # numpy 형태로 저장
    max = np.max(x, axis=axis, keepdims=True)
    e_x = np.exp(x - max)
    sum = np.sum(e_x, axis=axis, keepdims=True)
    f_x = e_x / sum
    return f_x

In [None]:
# Test
test_batch_size = 100
test_dataset = FashionMNIST(data_root, download=True, train=False, transform=transforms.ToTensor())
test_dataloader = torch.utils.data.DataLoader(test_dataset, batch_size=test_batch_size, shuffle=False, num_workers=1)

test_labels_list = []
test_preds_list = []
test_outputs_list = []

for i, (test_images, test_labels) in enumerate(tqdm(test_dataloader, position=0, leave=True, desc="testing")):
    # forward
    test_outputs = loaded_model(test_images)
    _, test_preds = torch.max(test_outputs, 1)

    # 분석 -> numpy 사용하는게 좋음
    # GPU -> CPU detach()
    final_outs = softmax(test_outputs.detach().numpy(), axis=1)
    test_outputs_list.extend(final_outs)
    test_preds_list.extend(test_preds.detach().numpy())
    test_labels_list.extend(test_labels.detach().numpy())

test_preds_list = np.array(test_preds_list)
test_labels_list = np.array(test_labels_list)

print(f"\nacc: {np.mean(test_preds_list == test_labels_list)*100}%")

In [None]:
# ROC Curve
from sklearn.metrics import roc_curve
from sklearn.metrics import roc_auc_score

fpr = {}
tpr = {}
thresh = {}
n_class = 10

for i in range(n_class):
    fpr[i], tpr[i], thresh[i] = roc_curve(test_labels_list, np.array(test_outputs_list)[:, i], pos_label=i)

# plot.
for i in range(n_class):
    plt.plot(fpr[i], tpr[i], linestyle="--", label=f"Class {i} vs Rest")
plt.title("Multi-class ROC Curve")
plt.xlabel("False Positive Rate")
plt.ylabel("True Positive Rate")
plt.legend(loc="best")
plt.show()

print("auc_score", roc_auc_score(test_labels_list, test_outputs_list, multi_class="ovo", average="macro"))