<div class="alert alert-block" style="border: 2px solid #1976D2;background-color:#E3F2FD;padding:5px;font-size:0.9em;">
본 자료는 저작권법 제25조 2항에 의해 보호를 받습니다. 본 자료를 외부에 공개하지 말아주세요.<br>
<b><a href="https://school.fun-coding.org/">잔재미코딩 (https://school.fun-coding.org/)</a> 에서 본 강의를 포함하는 최적화된 로드맵도 확인하실 수 있습니다</b></div>

### PyTorch 와 RNN, LSTM

- PyTorch 에는 RNN, LSTM 을 쉽게 적용할 수 있도록, API 를 제공함

### RNN 모델 구성

### torch.nn.RNN
torch.nn.RNN(*args, **kwargs)

- input_size: Input 크기(feature의 수)
- hidden_size: hidden state 크기
   - hidden state 의 수는 DNN 의 hidden layer 의 사이즈라고 생각하면 됨
- num_layers: 순환 레이서 수 (Default: 1)
   - Multi-layer 로 RNN 을 구성할 수 있음
   > Multi-layer 로 구성시에는 Gradient Vanishing 이슈가 있을 수 있음 (4개 Multi-Layer 정도는 괜찮음)

<img src="https://miro.medium.com/max/576/1*UCiibKij5-kHP__Igb2_1Q.jpeg">

- nonlinearity – 비선형 활성화 함수 설정, 'tanh' 또는 'relu', (Default: 'tanh')
- bias: bias 값 활성화 여부 설정 (Default: True)
- batch_first: True일 시, Output 사이즈를 (batch, seq, feature) 로 출력
   - Default 로 pytorch 는 Output 사이즈를 (seq, batch, feature) 와 같이 출력함
- dropout: 드롭아웃 비율 설정 (Default: 0)
- bidirectional: True일 시, Bidirectional RNN 적용 (Default: False)

#### Inputs: input, h_0
- input: 입력 텐서 - (sequence_length, batch_size, input_size)
- h_0: hidden states 의 초기값 텐서 - (num_layers * bidirections, batch_size, hidden_size) 형태, bidirectional 이 True 이면, bidirections 는 2, False 면 1

#### Outputs: output, h_n
- output: 마지막 레이어의 출력 텐서 - (sequence_length, batch_size, bidirections * hidden_size), bidirectional 이 True 이면, bidirections 는 2,False 면 1
- h_n: 마지막 hidden state 텐서 - (num_layers * bidirections, batch_size, hidden_size), bidirectional이 True 이면, bidirections 는 2,False 면 1

### input 사이즈 및 하이퍼 파라미터 설정
<img src="https://www.fun-coding.org/00_Images/mnist-rnn.png">

### input, sequence, 하이퍼 파라미터 설정

In [76]:
sequence_length = 28 # MNIST row 를 일종의 순서(sequence) 로 다룸
feature_size = 28 # 입력 차원
hidden_size = 128 # Hidden Layer 사이즈 설정처럼 설정
num_layers = 4 # stacked RNN (최대 4개까지는 Gradient Vanishing 현상이 적을 수 있으므로)
dropout_p = 0.2 # dropout rate
output_size = 10 # 0 ~ 9 숫자 부류(클래스)
minibatch_size = 128 # minibatch_size

### LSTM 모델 구성

### torch.nn.LSTM

torch.nn.LSTM(*args, **kwargs)

- input_size: Input 크기(feature의 수)
- hidden_size: hidden state 크기
   - hidden state 의 수는 DNN 의 hidden layer 의 사이즈라고 생각하면 됨
- num_layers: 순환 레이서 수 (Default: 1)
   - Multi-layer 로 LSTM 을 구성할 수 있음
   > Multi-layer 로 구성시에는 Gradient Vanishing 이슈가 있을 수 있음 (4개 Multi-Layer 정도는 괜찮음)
   
   
<img src="https://miro.medium.com/max/576/1*UCiibKij5-kHP__Igb2_1Q.jpeg">

- bias: bias 값 활성화 여부 설정 (Default: True)
- batch_first: True일 시, Output 사이즈를 (batch, seq, feature) 로 출력
   - Default 로 pytorch 는 Output 사이즈를 (seq, batch, feature) 와 같이 출력함
- dropout: 드롭아웃 비율 설정 (Default: 0)
- bidirectional: True일 시, Bidirectional LSTM 적용 (Default: False)


#### Inputs: input, (h_0, c_0)
- input: (sequence_length, batch_size, input_size)
- h_0: (num_layers * bidirections, batch_size, hidden_size), bidirectional 이 True 이면, bidirections 는 2, False 면 1
- c_0: (num_layers * bidirections, batch_size, hidden_size), bidirectional 이 True 이면, bidirections 는 2, False 면 1

#### Outputs: output, (h_n, c_n)
- output: (sequence_length, batch_size, bidirections * hidden_size), bidirectional 이 True 이면, bidirections 는 2,False 면 1
- h_n: (num_layers * bidirections, batch_size, hidden_size), bidirectional이 True 이면, bidirections 는 2,False 면 1
- c_n: (num_layers * bidirections, batch_size, hidden_size), bidirectional이 True 이면, bidirections 는 2,False 면 1

### RNN/LSTM 모델 구현

In [77]:
import torch
import torch.nn as nn
from torch.utils.data import DataLoader, Subset
from torchvision import datasets, transforms
from sklearn.model_selection import train_test_split
import numpy as np
from copy import deepcopy

In [78]:
sequence_length = 28 # MNIST row 를 일종의 순서(sequence) 로 다룸
feature_size = 28 # 입력 차원
hidden_size = 128 # Hidden Layer 사이즈 설정처럼 설정
num_layers = 4 # stacked RNN (최대 4개까지는 Gradient Vanishing 현상이 적을 수 있으므로)
dropout_p = 0.2 # dropout rate
output_size = 10 # 0 ~ 9 숫자 부류(클래스)
minibatch_size = 128 # minibatch_size

In [79]:
class Net(nn.Module):
    def __init__(self, feature_size, hidden_size, num_layers, dropout_p, output_size, model_type):
        super().__init__()
        if model_type == 'rnn':
            self.sequenceclassifier = nn.RNN(
                input_size = feature_size,
                hidden_size = hidden_size,
                num_layers = num_layers,
                batch_first = True,
                dropout = dropout_p,
                bidirectional = True
            )
        elif model_type == 'lstm':
            self.sequenceclassifier = nn.LSTM(
                input_size = feature_size,
                hidden_size = hidden_size,
                num_layers = num_layers,
                batch_first = True,
                dropout = dropout_p,
                bidirectional = True
            )
        self.layers = nn.Sequential(
            nn.LeakyReLU(0.1),
            nn.BatchNorm1d(hidden_size * 2),
            # self.rnn() 의 출력값은 (batch_size, sequence_length, bidirections * hidden_size)
            # bidirectional 이 True 이므로, bidirections 는 2, 즉 2 * hidden_size
            nn.Linear(hidden_size * 2, output_size),
            nn.LogSoftmax(dim=-1)
        )

    def forward(self, x):
        # |x| = batch_first=True 이므로 (batch_size, sequence_length, input_size)
        out, _ = self.sequenceclassifier(x) # output, h_n 이므로, h_n 은 사용안함
        # output, h_n 이므로, h_n 은 사용안함
        # |out| = batch_first=True 이므로 (batch_size, sequence_length, 2 * hidden_size)
        # bidirectional 이 True 이면, bidirections 는 2 * hidden_size
        out = out[:, -1]
        # out[:, -1] 은 (batch_size, sequence_length, 2 * hidden_size) 에서, 
        # 전체 batch_size 를 선택한다는 의미가 :, 
        # sequence_length 인 28개의 순서가 있고, 각 순서마다 2 * hidden_size 만큼 있음
        # 이중에 최종 값은 맨 마지막  sequence_length 의 2 * hidden_size 임
        # |out| = (batch_size, hidden_size * 2)
        y = self.layers(out)
        # |y| = (batch_size, output_size)
        return y

### 참고 코드: shape 과 slicing 이해

In [80]:
import torch
import torch.nn as nn

data1 = torch.full((minibatch_size, sequence_length, 2 * hidden_size), 1) # vector 생성
data2 = data1[:, -1]
print (data1.shape, data2.shape)
data3 = torch.full((minibatch_size, 1, sequence_length, feature_size), 1) # vector 생성
data4 = data3.reshape(-1, sequence_length, feature_size)
print (data3.shape, data4.shape)

torch.Size([128, 28, 256]) torch.Size([128, 256])
torch.Size([128, 1, 28, 28]) torch.Size([128, 28, 28])


In [81]:
model = Net(feature_size, hidden_size, num_layers, dropout_p, output_size, 'rnn')
model

Net(
  (sequenceclassifier): RNN(28, 128, num_layers=4, batch_first=True, dropout=0.2, bidirectional=True)
  (layers): Sequential(
    (0): LeakyReLU(negative_slope=0.1)
    (1): BatchNorm1d(256, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (2): Linear(in_features=256, out_features=10, bias=True)
    (3): LogSoftmax(dim=-1)
  )
)

### MNIST with LSTM

In [82]:
train_rawdata = datasets.MNIST(root = 'dataset',
                            train=True,
                            download=True,
                            transform=transforms.ToTensor())
test_dataset = datasets.MNIST(root = 'dataset',
                            train=False,
                            download=True,
                            transform=transforms.ToTensor())
print('number of training data : ', len(train_rawdata))
print('number of test data : ', len(test_dataset))

number of training data :  60000
number of test data :  10000


In [83]:
VALIDATION_RATE = 0.2
train_indices, val_indices, _, _ = train_test_split(
    range(len(train_rawdata)), # X index 번호
    train_rawdata.targets, # y
    stratify=train_rawdata.targets, # 균등분포
    test_size=VALIDATION_RATE # test dataset 비율
)

In [84]:
train_dataset = Subset(train_rawdata, train_indices)
validation_dataset = Subset(train_rawdata, val_indices)

In [85]:
print (len(train_dataset), len(validation_dataset), len(test_dataset))

48000 12000 10000


In [86]:
minibatch_size = 128 # Mini-batch 사이즈는 128 로 설정
# create batches
train_batches = DataLoader(train_dataset, batch_size=minibatch_size, shuffle=True)
val_batches = DataLoader(validation_dataset, batch_size=minibatch_size, shuffle=True)
test_batches = DataLoader(test_dataset, batch_size=minibatch_size, shuffle=True)

### input, output, loss, optimizer 설정

In [87]:
loss_func = nn.NLLLoss() # log softmax 는 NLLLoss() 로 진행해야 함
optimizer = torch.optim.Adam(model.parameters()) # Adam, learning rate 필요없음

### Training & Validation

In [88]:
def train_model(model, early_stop, n_epochs, progress_interval):
    
    train_losses, valid_losses, lowest_loss = list(), list(), np.inf

    for epoch in range(n_epochs):
        # train the model
        model.train() # prep model for training
        for x_minibatch, y_minibatch in train_batches:
            x_minibatch = x_minibatch.reshape(-1, sequence_length, feature_size)
            y_minibatch_pred = model(x_minibatch)
            loss = loss_func(y_minibatch_pred, y_minibatch)

            optimizer.zero_grad()
            loss.backward()
            optimizer.step()            
            train_losses.append(loss.item())

        # validate the model
        model.eval()
        with torch.no_grad():
            for x_minibatch, y_minibatch in val_batches:
                x_minibatch = x_minibatch.reshape(-1, sequence_length, feature_size)                
                y_minibatch_pred = model(x_minibatch)
                loss = loss_func(y_minibatch_pred, y_minibatch)
                valid_losses.append(loss.item())

        if valid_losses[-1] < lowest_loss:
            lowest_loss = valid_losses[-1]
            lowest_epoch = epoch
            best_model = deepcopy(model.state_dict())
        else:
            if early_stop > 0 and lowest_epoch + early_stop < epoch:
                print ("Early Stopped", epoch, "epochs")
                model.load_state_dict(best_model)
                break
                
        if (epoch % progress_interval) == 0:
            print (train_losses[-1], valid_losses[-1], lowest_loss, lowest_epoch, epoch)
            
    model.load_state_dict(best_model)        
    return model, lowest_loss, train_losses, valid_losses

### 훈련 실행
<div class="alert alert-block" style="border: 2px solid #E65100;background-color:#FFF3E0;padding:10px">
<font size="4em" style="color:#BF360C;">CPU 만으로는 테스트에 상당한 시간이 걸림</font><br>
    <font size="4em" style="color:#BF360C;">colab 을 통한 테스트 추천 (13_LSTM_MNIST_GPU.ipynb) 파일 기반</font>
</div>

In [89]:
nb_epochs = 100 
progress_interval = 3
early_stop = 30

model, lowest_loss, train_losses, valid_losses = train_model(model, early_stop, nb_epochs, progress_interval)

0.3198077082633972 0.34681251645088196 0.34681251645088196 0 0


KeyboardInterrupt: 

### GPU 기반 훈련 방법 (코드 수정 방법, Nvidia GPU 기반)

- GPU 사용 가능 환경 설정
   - torch.cuda.is_available() 을 통해 Nvidia GPU(+ CUDA 설치) 사용 가능시, device 를 'cuda' 로 설정
   - torch.cuda.manual_seed_all(1) 을 통해, 매번 실행시 동일한 결과가 나오도록 random 값 generation seed 를 설정 (옵션)
    ```python
    device = 'cuda' if torch.cuda.is_available() else 'cpu'
    torch.manual_seed(1)
    if device == 'cuda':
        torch.cuda.manual_seed_all(1)
    ```
- model 객체 GPU 내 생성
    ```python
    model = CNNModel().to(device)
    ```
- 학습을 위한 Training 함수내 텐서를 GPU 로 보냄
    ```python
    for x_minibatch, y_minibatch in train_batches:
        x_minibatch = x_minibatch.to(device)
        y_minibatch = y_minibatch.to(device)
        y_minibatch_pred = model(x_minibatch)
    ```

### 훈련 실행
<div class="alert alert-block" style="border: 2px solid #E65100;background-color:#FFF3E0;padding:10px">
<font size="4em" style="color:#BF360C;">Overfitting 문제를 해결하기 위해, Dropout() 과 함께 적용</font><br>
    <font size="4em" style="color:#BF360C;">colab 을 통한 테스트 추천</font>
    <a href="https://colab.research.google.com/drive/17eBJuM6iQR-P1WWUUhZrf-Anbbbt4Okj?usp=sharing">colab 개선 코드</a>
</div>

```
CNNModel(
  (conv_layers): Sequential(
    (0): Conv2d(1, 32, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (1): LeakyReLU(negative_slope=0.1)
    (2): BatchNorm2d(32, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (3): Conv2d(32, 32, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (4): LeakyReLU(negative_slope=0.1)
    (5): BatchNorm2d(32, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (6): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
    (7): Dropout(p=0.25, inplace=False)
    (8): Conv2d(32, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (9): LeakyReLU(negative_slope=0.1)
    (10): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (11): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (12): LeakyReLU(negative_slope=0.1)
    (13): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (14): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
    (15): Dropout(p=0.25, inplace=False)
    (16): Conv2d(64, 128, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (17): LeakyReLU(negative_slope=0.1)
    (18): BatchNorm2d(128, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (19): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
    (20): Dropout(p=0.25, inplace=False)
  )
  (linear_layers): Sequential(
    (0): Linear(in_features=1152, out_features=128, bias=True)
    (1): LeakyReLU(negative_slope=0.1)
    (2): BatchNorm1d(128, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (3): Linear(in_features=128, out_features=10, bias=True)
    (4): LogSoftmax(dim=-1)
  )
)
```

### 테스트셋 기반 Evaluation

In [None]:
test_loss = 0
correct = 0
model.eval()

wrong_samples, wrong_preds, actual_preds = list(), list(), list()

model.eval()
with torch.no_grad():
    for x_minibatch, y_minibatch in test_batches:
        #x_minibatch = x_minibatch.to(device)
        #y_minibatch = y_minibatch.to(device)         
        y_test_pred = model(x_minibatch)
        loss = loss_func(y_test_pred, y_minibatch)            
        test_loss += loss_func(y_test_pred, y_minibatch)  
        pred = torch.argmax(y_test_pred, dim=1)
        correct += pred.eq(y_minibatch.view_as(pred)).sum().item()
        
        wrong_idx = (pred != y_minibatch.view_as(pred)).nonzero()[:, 0]
        for index in wrong_idx:
            wrong_samples.append(x_minibatch[index])
            wrong_preds.append(pred[index])
            actual_preds.append(y_minibatch.view_as(pred)[index])
test_loss /= len(test_batches.dataset)
print('\nTest set: Average loss: {:.4f}, Accuracy: {}/{} ({:.0f}%)\n'.format(test_loss, correct, len(test_batches.dataset), 100. * correct / len(test_batches.dataset)))

### incorrect data 만 확인해보기
- GPU 로 학습하였을 경우, 텐서.numpy() 는 동작하지 않음 
- 다음과 같이 텐서.cpu().numpy() 로 CPU 로 복사해서, numpy() 로 변환해야 함
    ```python
    wrong_samples[index].cpu().numpy( ).reshape(28,28)
    ```

In [None]:
# incorrect 데이터 중, 100개 이미지만 출력해보기
import matplotlib.pyplot as plt
# 주피터 노트북에서 그림을 주피터 노트북 내에 표시하도록 강제하는 명령
%matplotlib inline 

plt.figure(figsize=(18 , 20))

for index in range(len(wrong_samples)):
    plt.subplot(10, 10, index + 1)
    plt.axis('off')
    plt.imshow(wrong_samples[index].numpy( ).reshape(28,28), cmap = "gray")
    plt.title("Pred" + str(wrong_preds[index].item()) + "(" + str(actual_preds[index].item()) + ")", color='red')

<div class="alert alert-block" style="border: 2px solid #1976D2;background-color:#E3F2FD;padding:5px;font-size:0.9em;">
본 자료는 저작권법 제25조 2항에 의해 보호를 받습니다. 본 자료를 외부에 공개하지 말아주세요.<br>
<b><a href="https://school.fun-coding.org/">잔재미코딩 (https://school.fun-coding.org/)</a> 에서 본 강의를 포함하는 최적화된 로드맵도 확인하실 수 있습니다</b></div>