In [1]:
import torch
import torch.nn as nn
import torch.optim as optim
from pathlib import Path

# Recurrent Neural Network

## Layers

### RNN Layer

[`nn.RNN`](https://pytorch.org/docs/stable/nn.html#rnn) 합성곱 연산층은 다음과 같은 arguments를 받는다.

* `input_size`: 입력 토큰의 차원수
* `hidden_size`: 히든 뉴런 갯수
* `num_layers`: 층의 갯수
* `nonlinearity`: 활성화 함수, 기본으로 tanh 로 설정되어있다.
* `batch_first`: 미니배치 차원이 제일 앞에 있는지 여부를 체크, 기본 형태가 (T, B, input_size) 로 되어 있다. 활성화 하면 (B, T, input_size)로 입력을 넣어줘야한다.
* `bidirectional`: 양방향 RNN 활성화 여부

**single layer rnn**

In [None]:
input_size = 10
hidden_size = 20
rnn_layer = nn.RNN(input_size=input_size, 
                   hidden_size=hidden_size, 
                   batch_first=True)

In [None]:
rnn_layer.weight_ih_l0.size()

torch.Size([20, 10])

In [None]:
batch = 5
time_step = 4
inputs = torch.rand(batch, time_step, input_size)
hiddens = torch.zeros(1, batch, hidden_size)
outputs, hiddens = rnn_layer(inputs, hiddens)

# inputs: (B, T, I)
# outputs: (B, T, H)
# hiddens: (num_layer, B, H)

print(f"outputs: {outputs.size()}\nhiddens: {hiddens.size()}")

**multi-layer rnn**

In [None]:
num_layers = 2
rnn_layer = nn.RNN(input_size=input_size, 
                   hidden_size=hidden_size, 
                   num_layers=num_layers,
                   batch_first=True)

In [None]:
inputs = torch.rand(batch, time_step, input_size)
outputs, hiddens = rnn_layer(inputs)

# inputs: (B, T, I)
# outputs: (B, T, H)
# hiddens: (num_layer, B, H)

print(f"outputs: {outputs.size()}\nhiddens: {hiddens.size()}")

outputs: torch.Size([5, 4, 20])
hiddens: torch.Size([2, 5, 20])


**bidirectional rnn**

In [None]:
bidirection = True
rnn_layer = nn.RNN(input_size=input_size, 
                   hidden_size=hidden_size, 
                   num_layers=num_layers,
                   batch_first=True,
                   bidirectional=bidirection)

In [None]:
inputs = torch.rand(batch, time_step, input_size)
outputs, hiddens = rnn_layer(inputs)

# inputs: (B, T, I)
# outputs: (B, T, H*2)
# hiddens: (num_layer*2, B, H)

print(f"outputs: {outputs.size()}\nhiddens: {hiddens.size()}")

outputs: torch.Size([5, 4, 40])
hiddens: torch.Size([4, 5, 20])


### LSTM Layer

In [None]:
lstm_layer = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True)
hiddens = torch.zeros(num_layers, batch, hidden_size)
cells = torch.zeros(num_layers, batch, hidden_size)

outputs, (hiddens, cells) = lstm_layer(inputs, (hiddens, cells))

# inputs: (B, T, I)
# outputs: (B, T, H)
# hiddens: (num_layer, B, H)
# cells: (num_layer, B, H)

print(f"outputs: {outputs.size()}\nhiddens: {hiddens.size()}\ncells: {cells.size()}")

outputs: torch.Size([5, 4, 20])
hiddens: torch.Size([2, 5, 20])
cells: torch.Size([2, 5, 20])


### GRU Layer

In [None]:
gru_layer = nn.GRU(input_size, hidden_size, num_layers, batch_first=True)
hiddens = torch.zeros(num_layers, batch, hidden_size)

outputs, hiddens = gru_layer(inputs, hiddens)

# inputs: (B, T, I)
# outputs: (B, T, H)
# hiddens: (num_layer, B, H)

print(f"outputs: {outputs.size()}\nhiddens: {hiddens.size()}")

outputs: torch.Size([5, 4, 20])
hiddens: torch.Size([2, 5, 20])


---

# 이름 분류기 만들기

RNN을 사용해 5개의 국가("German", "United States", "Spain", "Korean", "Russian" )의 이름 데이터를  입력하면 어떤 국가의 이름인지 분류하는 모델 만들기

## Sequential Data Processing

1. 이름을 한 글자씩 분해한다.
2. pad, unknown을 포함해서 사용된 모든 문자에 각각 유니크한 숫자를 부여한다.
3. 각 배치에 알맞게 길이가 맞지 않는 데이터를 동일하게 만들기 위해 "\<pad\>" 데이터를 붙인다. 


* `<unk>`: 알지 못하는 문자에 대처
* `<pad>`: 배치 데이터 길이를 맞추기 위함

```
15: 
'D', 'i', 'e', 't', 'h', 'a', 'r', 'd', ' ', 'T', 'e', 'x', 't', 'o', 'r'

11 + 4: 
'M', 'a', 'r', 'c', 'o', ' ', 'D', 'i', 'e', 't', 'z', '<pad>', '<pad>', '<pad>', '<pad>'
...
```

### Custom Dataset 만들기

In [None]:
from torch.utils.data import Dataset, DataLoader
from collections import defaultdict

class CustomDataset(Dataset):
    def __init__(self, path, fmt="\t", pad_idx=1, vocab_stoi=None, labels_stoi=None):
        # 파일에서 데이터를 불러오고 전처리 등을 진행
        with Path(path).open(mode="r", encoding="utf-8") as file:
            datas = file.readlines()
        datas = [line.strip().split(fmt) for line in datas]
        datas, labels = list(zip(*datas))
        datas = [list(d) for d in datas]
        self.pad_idx = pad_idx
        
        if (vocab_stoi is None) and (labels_stoi is None):
            self.vocab_stoi = defaultdict()           # dictionary 확장값. 값이 없어도 에러 안남
            self.vocab_stoi["<unk>"] = 0
            self.vocab_stoi["<pad>"] = self.pad_idx   # 1이들어감
        
            self.labels_stoi = defaultdict()  
            labels_unique = set(labels)
            for i, label in enumerate(labels_unique):
                self.labels_stoi[label] = i
        else:
            self.vocab_stoi = vocab_stoi
            self.labels_stoi = labels_stoi
        
        for name in datas:
            for letter in name:
                if self.vocab_stoi.get(letter) is None:
                    self.vocab_stoi[letter] = len(self.vocab_stoi)
        self.vocab_len = len(self.vocab_stoi)
        self.vector_matrix = torch.eye(self.vocab_len)
        
        self.datas = datas
        self.labels = labels
        
        
    def __getitem__(self, index):
        # 인덱스에 해당하는 데이터셋 리턴
        return self.datas[index], self.labels[index]
        
    def __len__(self):
        # 데이터셋 수
        return len(self.datas)
    
    def custom_collate_fn(self, data):
        x, y = list(zip(*data))
        max_len = max([len(name) for name in x])
        datas = [list(map(self.vocab_stoi.get, name)) for name in x]
        datas = [name + [self.pad_idx]*(max_len - len(name)) if len(name) < max_len else name for name in datas]
        datas = torch.stack([torch.stack(list(self.vector_matrix[idx] for idx in name)) for name in datas])
        labels = list(map(self.labels_stoi.get, y))
        
        return datas, torch.LongTensor(labels)

In [None]:
from pathlib import Path

path_txt = "names/names_{}.tsv"
batch_size = 64
train_dataset = CustomDataset(path_txt.format("train"), fmt="\t")
test_dataset = CustomDataset(path_txt.format("test"), fmt="\t", 
                             vocab_stoi=train_dataset.vocab_stoi,
                             labels_stoi=train_dataset.labels_stoi)

train_loader = DataLoader(dataset=train_dataset,
                          collate_fn=train_dataset.custom_collate_fn,
                          batch_size=batch_size, 
                          shuffle=True)

test_loader = DataLoader(dataset=test_dataset,
                         collate_fn=test_dataset.custom_collate_fn,
                         batch_size=batch_size, 
                         shuffle=True)

In [None]:
for names, labels in train_loader:
    break
print(names.size(), labels.size())

torch.Size([64, 41, 222]) torch.Size([64])


In [None]:
names[0][0]

tensor([0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
        0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
        0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
        0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
        0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
        0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
        0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
        0., 0., 0., 0., 0., 1., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
        0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
        0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
        0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
        0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
        0., 0., 0., 0., 0., 0.])

In [None]:
names[0][0].argmax()

tensor(131)

## RNN 모델

### 네트워크 설계

* Input Size = (B, T, vocab_size)
* Output Size = (B, 5)
* Loss Function(`nn.CrossEntropyLoss`): Cross Entropy Loss
* Optimizer(`optim.Adam`): Adam
* RNN Layer: `nn.LSTM`을 사용해서 `self.lstm_layer` 변수에 구현

In [None]:
class Network(nn.Module):
    def __init__(self, input_size, hidden_size, output_size, num_layers):
        super(Network, self).__init__()
        # 층을 구성
        self.lstm_layer = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True)
        self.linear = nn.Linear(hidden_size, output_size)
        
    def forward(self, x):
        # forward propagation 수행
        outputs, (hiddens, cells) = self.lstm_layer(x)
        # x: (B, T, I)
        # outputs: (B, T, H)
        # hiddens: (num_layer, B, H)
        # cells: (num_layer, B, H)
        last_hidden = hiddens[-1]  # (B, H)
        o = self.linear(last_hidden)
        
        return o

In [None]:
device = "cuda" if torch.cuda.is_available() else "cpu"
input_size = len(train_dataset.vocab_stoi)
hidden_size = 200
output_size = len(train_dataset.labels_stoi)
num_layers = 3
# 커스텀 모듈 호출
model = Network(input_size, hidden_size, output_size, num_layers).to(device)

# loss_function = 
# optimizer = 
loss_function = nn.CrossEntropyLoss(ignore_index=1)
optimizer = optim.Adam(model.parameters(), lr=0.001)

## 모델훈련

### Train 함수

In [None]:
def train(model, train_loader, loss_function, optimizer, n_train, print_step, device):
    # Training
    model.train()
    for batch_idx, (data, target) in enumerate(train_loader):
        # 입력과 타겟 텐서에 GPU 를 사용여부 전달
        data, target = data.to(device), target.to(device)
        # 경사 초기화
        model.zero_grad()
        # 순방향 전파
        output = model(data)
        # 손실값 계산
        loss = loss_function(output, target)
        # 역방향 전파
        loss.backward()
        # 매개변수 업데이트
        optimizer.step()
        # 중간 과정 print
        if batch_idx % print_step == 0:
            percentage = (batch_idx*train_loader.batch_size / n_train) * 100
            print(f" - [{percentage:.2f}%] train loss: {loss:.4f}")

### Validation 함수

In [None]:
def validation(model, test_loader, loss_function, n_test, device):
    # Validation
    model.eval()
    test_loss = 0
    correct = 0
    # torch.no_grad 를 사용하면 requires_grad 를 꺼두게 된다.
    with torch.no_grad():
        for data, target in test_loader:
            # 입력과 타겟 텐서에 GPU 를 사용여부 전달
            data, target = data.to(device), target.to(device)
            # 순방향전파
            output = model(data)
            # 손실값 계산
            test_loss += loss_function(output, target).item()
            # 예측 값에 해당하는 클래스 번호 반환
            pred = output.softmax(1).argmax(dim=1, keepdim=True)
            # 정확하게 예측한 개수를 기록한다
            correct += pred.eq(target.view_as(pred)).sum().item()
    test_accuracy = correct / n_test
    
    return test_loss, test_accuracy

### 훈련하기

In [None]:
n_train = len(train_dataset)
n_test = len(test_dataset)
n_step = 10
print_step = 300
best_accuracy = 0

for step in range(n_step):
    print(f"[Step] {step+1}/{n_step}\n [Training Step]")
    train(model, train_loader, loss_function, optimizer, n_train, print_step, device)
    test_loss, test_accuracy = validation(model, test_loader, loss_function, n_test, device)
    print(f" [Validation Step]")
    print(f" - test loss: {test_loss:.4f} test accuracy: {test_accuracy*100:.2f} %")
    # 제일 성능을 보인 좋은 모델 저장하기
    if test_accuracy > best_accuracy:
        best_accuracy = test_accuracy
        torch.save(model.state_dict(), "best_model-namecls.pt")

[Step] 1/10
 [Training Step]
 - [0.00%] train loss: 1.6023
 - [38.40%] train loss: 0.6040
 - [76.80%] train loss: 0.5116
 [Validation Step]
 - test loss: 3.6281 test accuracy: 63.80 %
[Step] 2/10
 [Training Step]
 - [0.00%] train loss: 0.6085
 - [38.40%] train loss: 0.3668
 - [76.80%] train loss: 0.3733
 [Validation Step]
 - test loss: 2.2209 test accuracy: 70.60 %
[Step] 3/10
 [Training Step]
 - [0.00%] train loss: 0.2361
 - [38.40%] train loss: 0.1283
 - [76.80%] train loss: 0.2790
 [Validation Step]
 - test loss: 2.1050 test accuracy: 71.80 %
[Step] 4/10
 [Training Step]
 - [0.00%] train loss: 0.2194
 - [38.40%] train loss: 0.1288
 - [76.80%] train loss: 0.1697
 [Validation Step]
 - test loss: 1.2318 test accuracy: 74.60 %
[Step] 5/10
 [Training Step]
 - [0.00%] train loss: 0.0788
 - [38.40%] train loss: 0.2678
 - [76.80%] train loss: 0.0864
 [Validation Step]
 - test loss: 1.3280 test accuracy: 74.20 %
[Step] 6/10
 [Training Step]
 - [0.00%] train loss: 0.1341
 - [38.40%] train los

In [None]:
model = Network(input_size, hidden_size, output_size, num_layers)
model.load_state_dict(torch.load("best_model-namecls.pt", map_location="cpu"))
classes = {v: k for k, v in train_dataset.labels_stoi.items()}
vocab = train_dataset.vocab_stoi

def preprocessing(text, vocab):
    temp = []
    for x in list(text):
        if vocab.get(x) is None:
            temp.append(vocab["<unk>"])
        else:
            temp.append(vocab[x])

    idx = torch.LongTensor(temp)
    input_tensor = torch.zeros(len(idx), len(vocab)).scatter(1, idx.unsqueeze(1), 1)
    return input_tensor.unsqueeze(0)
    
def predict(model, vocab, classes):
    model.eval()
    text = input("이름을 입력: ")
    input_tensor = preprocessing(text, vocab)
    o = model(input_tensor).detach()
    pred = classes.get(o.argmax(1).item())
    probs = o.softmax(1).squeeze(0).numpy()
    probs_dict = {label: p for label, p in zip(classes.values(), probs)}
    print(f"예측: {pred}\nProbabilities: ")
    for k, v in probs_dict.items():
        print(f"{k} = {v:.4f}")

In [None]:
model.eval()
text = "장지수"
input_tensor = preprocessing(text, vocab)

In [None]:
vocab.get("현")

80

In [None]:
predict(model, vocab, classes)

이름을 입력: 최락현
예측: Korean
Probabilities: 
Korean = 0.8934
Russian = 0.0023
German = 0.0321
United States = 0.0697
Spain = 0.0025
