## 07. [DL 입문] - 순환 신경망(Recurrent Neural Network)

cell : hidden layer에서 activation function을 통해 결과를 내보내는 역할


In [None]:
# 파이썬으로 RNN 구현

import numpy as np

timesteps = 10  # 시점의 수. NLP 에서 보통 문장의 길이가 된다.
input_size = 4  # 입력의 차원. NLP 에서 보통 단어 벡터의 차원이 된다.
hidden_size = 8  # 은닉 상태의 크기. 메모리 셀의 용량이다.

inputs = np.random.random((timesteps, input_size))  # 입력에 해당되는 2D 텐서

hidden_state_t = np.zeros((hidden_size,))  # 초기 은닉 상태는 0 vector로 초기화
# 은닉 상태의 크기 hidden_size 로 은닉 상태를 만듬.

print(hidden_state_t)

[0. 0. 0. 0. 0. 0. 0. 0.]


In [None]:
Wx = np.random.random(
    (hidden_size, input_size)
)  # (8,4) 크기의 2D 텐서 생성, 입력에 대한 가중치.
Wh = np.random.random(
    (hidden_size, hidden_size)
)  # (8,8) 크기의 2D 텐서 생성, 은닉 상태에 대한 가중치.
b = np.random.random((hidden_size,))  # (8,) 크기의 1D 텐서 생성. 이 값은 편향(bias)

print(np.shape(Wx))
print(np.shape(Wh))
print(np.shape(b))

(8, 4)
(8, 8)
(8,)


In [None]:
total_hidden_states = []

# 메모리 셀 동작
for input_t in inputs:  # 각 시점에 따라서 입력값이 입력됨.
    output_t = np.tanh(np.dot(Wx, input_t) + np.dot(Wh, hidden_state_t) + b)
    total_hidden_states.append(
        list(output_t)
    )  # 각 시점의 은닉 상태의 값을 계속해서 축적
    print(
        np.shape(total_hidden_states)
    )  # 각 시점 t별 메모리 셀의 출력의 크기는 (timestep, output_dim)
    hidden_state_t = output_t

total_hidden_states = np.stack(total_hidden_states, axis=0)
# 출력시 값을 깔끔하게 해준다.

print(
    total_hidden_states
)  # (timesteps, output_dim)의 크기. 이 경우 (10,8)의 크기를 가지는 메모리 셀의 2D 텐서를 출력.

(1, 8)
(2, 8)
(3, 8)
(4, 8)
(5, 8)
(6, 8)
(7, 8)
(8, 8)
(9, 8)
(10, 8)
[[0.92024635 0.93138331 0.90075012 0.69291843 0.95127308 0.70995549
  0.90558744 0.95885697]
 [0.99999054 0.99995118 0.99998381 0.99993691 0.99998537 0.99969607
  0.99998104 0.99995659]
 [0.9999919  0.99996501 0.99998543 0.99996571 0.99999187 0.9998793
  0.99999649 0.99994578]
 [0.99999185 0.99996324 0.99998922 0.99996165 0.99999088 0.99987623
  0.99999146 0.99995237]
 [0.99999539 0.99998016 0.99999108 0.99997223 0.99999731 0.9999518
  0.99999631 0.99997403]
 [0.9999909  0.99997012 0.99999303 0.99995095 0.99999382 0.99992077
  0.99999065 0.99996187]
 [0.99999505 0.99998777 0.99999629 0.99997813 0.99999762 0.99993667
  0.9999948  0.99998438]
 [0.99998703 0.99995423 0.99998959 0.99994436 0.99998636 0.99984816
  0.99999067 0.9999353 ]
 [0.99999604 0.99998171 0.99999065 0.99997586 0.99999767 0.99995476
  0.99999683 0.99997612]
 [0.99999345 0.99997514 0.99999258 0.99997637 0.99999222 0.99983736
  0.99999197 0.99996803]]


In [4]:
# 파이토치의 nn.RNN()
import torch
import torch.nn as nn

In [None]:
input_size = 5  # 입력의 크기 -> 매 시점마다 들어가는 입력의 크기
hidden_size = 8  # 은닉 상태의 크기

# 입력 텐서
# (batch_size, time_steps, input_size)
inputs = torch.Tensor(1, 10, 5)

cell = nn.RNN(
    input_size, hidden_size, batch_first=True
)  # 입력 텐서의 첫번째 차원이 배치 크기

outputs, _status = cell(
    inputs
)  # outputs : 모든 시점의 은닉 상태, _status : 마지막 시점 은닉 상태

print(outputs.shape)  # 모든 time-step 의 hidden_state
print(_status.shape)  # 최종 time-step의 hidden_state

torch.Size([1, 10, 8])
torch.Size([1, 1, 8])


In [None]:
# Deep RNN

# (batch_size, time_steps, input_size)
inputs = torch.Tensor(1, 10, 5)

cell = nn.RNN(input_size=5, hidden_size=8, num_layers=2, batch_first=True)
outputs, _status = cell(inputs)

print(outputs.shape)
print(_status.shape)  # (층의 갯수, 배치 크기, 은닉 상태의 크기)

torch.Size([1, 10, 8])
torch.Size([2, 1, 8])


In [None]:
# Bidirectional RNN

# (batch_size, time_steps, input_size)
inputs = torch.Tensor(1, 10, 5)

cell = nn.RNN(
    input_size=5, hidden_size=8, num_layers=2, batch_first=True, bidirectional=True
)
outputs, _status = cell(inputs)

print(outputs.shape)  # 배치 크기, 시퀀스 길이, 은닉 상태의 크기 x 2
print(_status.shape)  # (층의 개수 x2, 배치 크기, 은닉 상태의 크기)

torch.Size([1, 10, 16])
torch.Size([4, 1, 8])


## LSTM과 GRU
- 바닐라 RNN의 한계 : time-step이 길어질수록 앞의 정보가 뒤로 충분히 전달되지 못함
- 장기 의존성 문제(the problem of Long-Term Dependencies)

### LSTM
- Long SHort-Term Memory
1. 입력 게이트
    - 현재의 정보를 기억하기 위한 게이트
2. 삭제 게이트
    - 기억을 삭제하기 위한 게이트
3. 셀 상태(장기 상태)
    - 입력 게이트와 삭제 게이트의 조합
4. 출력 게이트와 은닉 상태(단기 상태)
    
### GRU
1. 업데이트 게이트
2. 리셋 게이트

LSTM, GRU 성능은 비슷하다고 알려짐
경험적으로 데이터 양이 ㅈ거을 때는 매개변수의 양이 적은 GRU가 조금 낫고
데이터 양이 많으면 LSTM이 더 낫다고도 함.

In [None]:
# 파이토치의 nn.LSTM()
# nn.LSTM(input_dim, hidden_size, batch_first = True)

# 파이토치의 nn.GRU()
# nn.GRU(input_dim, hidden_size, batch_first = True)

In [12]:
# 문자 단위 RNN (Char RNN)

import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np

In [None]:
# 1.훈련 데이터 전처리하기
# apple -> pple!

input_str = "apple"
label_str = "pple!"
char_vocab = sorted(list(set(input_str + label_str)))
vocab_size = len(char_vocab)

print(f"문자 집합의 크기 : {vocab_size}")  # !,a,e,l,p

문자 집합의 크기 : 5


In [None]:
input_size = vocab_size  # 입력의 크기는 문자 집합의 크기, 원핫벡터사용
hidden_size = 5
output_size = 5
learning_rate = 0.1

In [17]:
char_to_index = dict((c,i) for i,c in enumerate(char_vocab)) # 문자에 고유한 정수 인덱스 부여
print(char_to_index)

{'!': 0, 'a': 1, 'e': 2, 'l': 3, 'p': 4}


In [18]:
# 예측 결과를 다시 문자 시퀀스로 보기 위해 정수로부터 문자를 얻을 수 있는 index_to_char
index_to_char = {}
for key, value in char_to_index.items():
    index_to_char[value] = key
print(index_to_char)

{0: '!', 1: 'a', 2: 'e', 3: 'l', 4: 'p'}


In [19]:
# 입력 데이터와 레이블 데이터의 각 문자들을 정수로 맵핑
x_data = [char_to_index[c] for c in input_str]
y_data = [char_to_index[c] for c in label_str]
print(x_data)
print(y_data)

[1, 4, 4, 3, 2]
[4, 4, 3, 2, 0]


In [20]:
# 파이토치 nn.RNN()은 기본적으로 3차원 텐서 받음
# 배치 차원 추가
# 텐서 연산인 unsqueeze(0)을 통해 해결할 수도 있었음.
x_data = [x_data]
y_data =[y_data]
print(x_data)
print(y_data)

[[1, 4, 4, 3, 2]]
[[4, 4, 3, 2, 0]]


In [21]:
# 입력 시퀀스의 각 문자들을 원-핫 벡터로 바꿔줌
x_one_hot = [np.eye(vocab_size)[x] for x in x_data]
print(x_one_hot)

[array([[0., 1., 0., 0., 0.],
       [0., 0., 0., 0., 1.],
       [0., 0., 0., 0., 1.],
       [0., 0., 0., 1., 0.],
       [0., 0., 1., 0., 0.]])]


In [23]:
# 입력 데이터와 레이블 데이터를 텐서로 바꿔줌
X = torch.FloatTensor(x_one_hot)
y = torch.LongTensor(y_data)

# 각 텐서 크기 확인
print(f'훈련 데이터의 크기 : {X.shape}')
print(f'레이블의 크기 : {y.shape}')

훈련 데이터의 크기 : torch.Size([1, 5, 5])
레이블의 크기 : torch.Size([1, 5])


In [28]:
# 모델 구현
class Net(torch.nn.Module):
    def __init__(self, input_size, hidden_size, output_size):
        super(Net, self).__init__()
        self.rnn = torch.nn.RNN(input_size, hidden_size, batch_first = True) # RNN 셀 구현
        self.fc = torch.nn.Linear(hidden_size, output_size, bias=True) # 출력층 구현
    
    def forward(self, x): # 구현한 RNN 셀과 출력층을 연결
        x, _status = self.rnn(x)
        x = self.fc(x)
        return x
    
net = Net(input_size, hidden_size, output_size)

outputs = net(X)

print(outputs.shape) # 3차원 텐서
print(outputs.view(-1, input_size).shape) # 2차원 텐서로 변환
print(y.shape)
print(y.view(-1).shape)

torch.Size([1, 5, 5])
torch.Size([5, 5])
torch.Size([1, 5])
torch.Size([5])


In [30]:
criterion = torch.nn.CrossEntropyLoss()
optimizer = optim.Adam(net.parameters(), learning_rate)

for i in range(100):
    optimizer.zero_grad()
    outputs = net(X)
    loss = criterion(outputs.view(-1, input_size), y.view(-1)) # view 하는 이유는 batch 차원 제거 위해
    loss.backward() # 기울기 계산
    optimizer.step()

    # 모델이 실제로 어떻게 예측했는지를 확인하기 위한 코드
    result = outputs.data.numpy().argmax(axis=2) # 최종 예측값인 각 time-step 별 5차원 벡터에 대해서 가장 높은 값의 인덱스를 선택
    result_str = ''.join([index_to_char[c] for c in np.squeeze(result)])
    print(i, "loss: ", loss.item(), "prediction: ", result, "true y: ", y_data, "prediction str: ", result_str)

0 loss:  1.4957239627838135 prediction:  [[4 4 2 0 0]] true y:  [[4, 4, 3, 2, 0]] prediction str:  ppe!!
1 loss:  1.2863942384719849 prediction:  [[4 3 3 2 2]] true y:  [[4, 4, 3, 2, 0]] prediction str:  pllee
2 loss:  1.0699931383132935 prediction:  [[4 4 3 2 2]] true y:  [[4, 4, 3, 2, 0]] prediction str:  pplee
3 loss:  0.856244683265686 prediction:  [[4 4 4 2 0]] true y:  [[4, 4, 3, 2, 0]] prediction str:  pppe!
4 loss:  0.6447590589523315 prediction:  [[4 4 3 2 0]] true y:  [[4, 4, 3, 2, 0]] prediction str:  pple!
5 loss:  0.4812183976173401 prediction:  [[4 4 3 2 0]] true y:  [[4, 4, 3, 2, 0]] prediction str:  pple!
6 loss:  0.37004247307777405 prediction:  [[4 4 3 2 0]] true y:  [[4, 4, 3, 2, 0]] prediction str:  pple!
7 loss:  0.2730952799320221 prediction:  [[4 4 3 2 0]] true y:  [[4, 4, 3, 2, 0]] prediction str:  pple!
8 loss:  0.1985623836517334 prediction:  [[4 4 3 2 0]] true y:  [[4, 4, 3, 2, 0]] prediction str:  pple!
9 loss:  0.14216120541095734 prediction:  [[4 4 3 2 0]]

In [35]:
# 더 많은 데이터로 학습한 문자 단위 RNN

# 훈련 데이터 전처리

sentence = ("if you want to build a ship, don't drum up people together to "
            "collect wood and don't assign them tasks and work, but rather "
            "teach them to long for the endless immensity of the sea.")

char_set = list(set(sentence)) # 중복 제거한 문자 집합 생성
char_dic = {c : i for i, c in enumerate(char_set)} # 각 문자에 정수 인코딩
print(char_dic)

dic_size = len(char_dic)
print(f'문자 집합의 크기 : {dic_size}')

{',': 0, 'l': 1, 'o': 2, 'g': 3, 'h': 4, 'n': 5, 'u': 6, 'm': 7, 's': 8, 'w': 9, 'a': 10, 'i': 11, 'c': 12, 'p': 13, 'e': 14, 'y': 15, "'": 16, 'd': 17, '.': 18, 'f': 19, 't': 20, ' ': 21, 'k': 22, 'r': 23, 'b': 24}
문자 집합의 크기 : 25


In [36]:
# 하이퍼 파라미터 설정
hidden_size = dic_size
sequence_length = 10 # 임의 숫자 가정
learning_rate = 0.1

In [38]:
# 임의의 sequence_length값 단위로 샘플들을 잘라서 데이터 만드는 모습

# 데이터 구성
x_data = []
y_data = []

for i in range(0, len(sentence) - sequence_length):
    x_str = sentence[i:i + sequence_length]
    y_str = sentence[i+1: i + sequence_length + 1]
    print(i, x_str, '->', y_str)

    x_data.append([char_dic[c] for c in x_str]) # x str to index
    y_data.append([char_dic[c] for c in y_str]) # y str to index

0 if you wan -> f you want
1 f you want ->  you want 
2  you want  -> you want t
3 you want t -> ou want to
4 ou want to -> u want to 
5 u want to  ->  want to b
6  want to b -> want to bu
7 want to bu -> ant to bui
8 ant to bui -> nt to buil
9 nt to buil -> t to build
10 t to build ->  to build 
11  to build  -> to build a
12 to build a -> o build a 
13 o build a  ->  build a s
14  build a s -> build a sh
15 build a sh -> uild a shi
16 uild a shi -> ild a ship
17 ild a ship -> ld a ship,
18 ld a ship, -> d a ship, 
19 d a ship,  ->  a ship, d
20  a ship, d -> a ship, do
21 a ship, do ->  ship, don
22  ship, don -> ship, don'
23 ship, don' -> hip, don't
24 hip, don't -> ip, don't 
25 ip, don't  -> p, don't d
26 p, don't d -> , don't dr
27 , don't dr ->  don't dru
28  don't dru -> don't drum
29 don't drum -> on't drum 
30 on't drum  -> n't drum u
31 n't drum u -> 't drum up
32 't drum up -> t drum up 
33 t drum up  ->  drum up p
34  drum up p -> drum up pe
35 drum up pe -> rum up peo
36

In [39]:
print(x_data[0])
print(y_data[0])

[11, 19, 21, 15, 2, 6, 21, 9, 10, 5]
[19, 21, 15, 2, 6, 21, 9, 10, 5, 20]


In [42]:
x_one_hot = [np.eye(dic_size)[x] for x in x_data] # x 데이터는 원-핫 인코딩
X = torch.FloatTensor(x_one_hot)
y = torch.LongTensor(y_data)

print(f"훈련 데이터의 크기 : {X.shape}")
print(f"레이블의 크기 : {y.shape}")
print(X[0])
print(y[0])

훈련 데이터의 크기 : torch.Size([170, 10, 25])
레이블의 크기 : torch.Size([170, 10])
tensor([[0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 1., 0., 0., 0., 0., 0., 0.,
         0., 0., 0., 0., 0., 0., 0.],
        [0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
         0., 1., 0., 0., 0., 0., 0.],
        [0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
         0., 0., 0., 1., 0., 0., 0.],
        [0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 1., 0., 0.,
         0., 0., 0., 0., 0., 0., 0.],
        [0., 0., 1., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
         0., 0., 0., 0., 0., 0., 0.],
        [0., 0., 0., 0., 0., 0., 1., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
         0., 0., 0., 0., 0., 0., 0.],
        [0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
         0., 0., 0., 1., 0., 0., 0.],
        [0., 0., 0., 0., 0., 0., 0., 0., 0., 1., 0., 0., 0., 0., 0., 0., 0., 0.,
         0., 0.

In [47]:
# 모델 구현
class Net(torch.nn.Module):
    def __init__(self, input_dim, hidden_size, layers): # 현재 hidden_size는 dic_size와 같음.
        super(Net, self).__init__()
        self.rnn = torch.nn.RNN(input_dim, hidden_size, num_layers=layers, batch_first = True)
        self.fc = torch.nn.Linear(hidden_size, hidden_size, bias=True)
    
    def forward(self, x):
        x, _status = self.rnn(x)
        x = self.fc(x)
        return x 

In [48]:
net = Net(dic_size, hidden_size, 2) # 이번에는 층 2개

criterion = torch.nn.CrossEntropyLoss()
optimizer = optim.Adam(net.parameters(), learning_rate)

outputs = net(X)
print(outputs.shape) # 3차원 텐서
print(outputs.view(-1, dic_size).shape) # 2차원 텐서로 변환
print(y.shape)
print(y.view(-1).shape)


torch.Size([170, 10, 25])
torch.Size([1700, 25])
torch.Size([170, 10])
torch.Size([1700])


In [49]:
for i in range(100):
    optimizer.zero_grad()
    outputs = net(X) # (170, 10, 25) 크기를 가진 텐서를 매 에포크 마다 모델의 입력으로 사용
    loss = criterion(outputs.view(-1, dic_size), y.view(-1))
    loss.backward()
    optimizer.step()

    # results의 텐서 크기는 (170, 10)
    results = outputs.argmax(dim=2)
    predict_str = ""
    for j, result in enumerate(results):
        if j == 0 : # 처음에는 예측 결과를 전부 가져오지만
            predict_str += ''.join([char_set[t] for t in result])
        else : # 그 다음에는 마지막 글자만 반복 추가
            predict_str += char_set[result[-1]]
    print(predict_str)


lmmllmmllllllllllllllllldlmllllmlllllolllmllllmmmlllmllllomlmllmolllllllllllllllmmlllllmlmllllllllllmlmlllllllmllllllloldwomlllldlllllllmllmlmlwwlllllllllomllmlllllllllllmmlllllml
       tp  eh  up   euepepp       e pe    ehp  e  u    e  eu  u e         up  h     eupppe  he  eup e ep  hh     up   ppe  eu   eehe  eu  u       ehe        ue      pp     ue  up 
ttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttttt
       re                                                                                                                                                                          
g..eo'ee'.h.'.......'....'.s........'.'le...g'....e.'l....e...eg.e..h.....l......e'.h..s'.......e..le'.s..'...s.l.'....l....'..e...e.'ee'.eg.e............l..sh.se'..e..f'.eh....e.
.s eoeeoeeioeiiiiiwioeooiieieiioeeieeieioieoeiaioeaoeaoeiooioeoeeoioeooioeeoeioiiieoioeoeioiaoiieiei