<a href="https://colab.research.google.com/github/unknown-jun/NLP_study/blob/main/NLP_Book/B03_RNN.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>


# RNN 파이썬 구현

In [None]:
import numpy as np

timesteps = 10    # Number of timesteps; in NLP this is usually the sentence length.
input_size = 4    # Input dimension; in NLP this is usually the word-vector dimension.
hidden_size = 8   # Size of the hidden state, i.e. the capacity of the memory cell.

# 2D tensor holding the input sequence, shape (timesteps, input_size)
inputs = np.random.random((timesteps, input_size))

# Create the hidden state with hidden_size entries
hidden_state_t = np.zeros((hidden_size,)) # the initial hidden state is the zero vector

In [None]:
hidden_state_t  # Hidden state of size 8; at this point it is the initial state, so every entry is 0

array([0., 0., 0., 0., 0., 0., 0., 0.])

In [None]:
Wx = np.random.random((hidden_size, input_size))    # 2D tensor of shape (8, 4): weights applied to the input
Wh = np.random.random((hidden_size, hidden_size))   # 2D tensor of shape (8, 8): weights applied to the hidden state
b = np.random.random((hidden_size,))                # 1D tensor of shape (8,): the bias

print(np.shape(Wx))
print(np.shape(Wh))
print(np.shape(b))

(8, 4)
(8, 8)
(8,)


In [None]:
total_hidden_state = []

# Memory cell operation: one recurrence step per timestep of `inputs`
for input_t in inputs:
  output_t = np.tanh(np.dot(Wx, input_t) + np.dot(Wh, hidden_state_t) + b) # Wx * Xt + Wh * Ht-1 + bias
  total_hidden_state.append(output_t)  # keep accumulating the hidden state of every timestep
  hidden_state_t = output_t

# Stack the per-timestep states into a single array along a new leading axis
total_hidden_state = np.stack(total_hidden_state, axis=0)
total_hidden_state

array([[0.99998369, 0.99975207, 0.99999221, 0.99923166, 0.99989322,
        0.99998675, 0.99995146, 0.99995514],
       [0.99994201, 0.99978881, 0.99999077, 0.99811641, 0.99994927,
        0.99998456, 0.9998886 , 0.9999048 ],
       [0.9999364 , 0.9996629 , 0.99997297, 0.99835878, 0.9998659 ,
        0.99995695, 0.99986765, 0.9999249 ],
       [0.99996672, 0.9998319 , 0.9999953 , 0.99867714, 0.9999604 ,
        0.99999167, 0.99993234, 0.99992524],
       [0.99996449, 0.99986346, 0.99999604, 0.99907703, 0.99996439,
        0.99999178, 0.99994428, 0.99995282],
       [0.99994476, 0.99984901, 0.99999401, 0.99864358, 0.99996485,
        0.99998851, 0.99993007, 0.9999289 ],
       [0.99994691, 0.99983931, 0.99999199, 0.99906522, 0.9999486 ,
        0.99998329, 0.99993855, 0.99995538],
       [0.99990642, 0.99979719, 0.99998575, 0.99833588, 0.99994351,
        0.99997363, 0.99989001, 0.99992455],
       [0.99998136, 0.99994319, 0.99999896, 0.99971784, 0.99998401,
        0.99999689, 0.999988

In [None]:
import torch
import torch.nn as nn

input_size = 5   # size of each input vector
hidden_size = 8  # size of the hidden state

# Shape is (batch_size, time_steps, input_size).
# torch.Tensor(1, 10, 5) only allocates memory without initializing it, so the
# values may be arbitrary (including NaN/inf); draw well-defined random values.
inputs = torch.randn(1, 10, input_size)

cell = nn.RNN(input_size, hidden_size, batch_first=True)

outputs, _status = cell(inputs)
print(outputs.shape)  # hidden states of every timestep
print(_status.shape)  # hidden state of the last timestep

torch.Size([1, 10, 8])
torch.Size([1, 1, 8])


In [None]:
import numpy as np

class RNN:
  """Single-timestep RNN cell (tanh activation) with forward and backward passes."""

  def __init__(self, Wx, Wh, b):
    # Two weight matrices and one bias are supplied by the caller.
    self.params = [Wx, Wh, b]
    # One zero-filled gradient buffer per parameter, shaped like that parameter.
    self.grads = [np.zeros_like(Wx), np.zeros_like(Wh), np.zeros_like(b)]
    # Intermediate values stored by forward() and consumed by backward().
    self.cache = None

  def forward(self, x, h_prev):
    """Return tanh(h_prev @ Wh + x @ Wx + b) and cache the values backward() needs."""
    w_in, w_rec, bias = self.params
    h_next = np.tanh(np.dot(h_prev, w_rec) + np.dot(x, w_in) + bias)
    self.cache = (x, h_prev, h_next)
    return h_next

  def backward(self, dh_next):
    """Write parameter gradients into self.grads and return (dx, dh_prev)."""
    w_in, w_rec, _ = self.params
    x, h_prev, h_next = self.cache

    # Derivative of tanh expressed through its output: 1 - tanh(t)^2
    grad_pre = dh_next * (1 - h_next ** 2)

    grad_bias = np.sum(grad_pre, axis=0)
    grad_w_rec = np.dot(h_prev.T, grad_pre)
    grad_h_prev = np.dot(grad_pre, w_rec.T)
    grad_w_in = np.dot(x.T, grad_pre)
    grad_x = np.dot(grad_pre, w_in.T)

    # Copy into the existing buffers (same order as self.params) instead of rebinding them.
    for buffer, value in zip(self.grads, (grad_w_in, grad_w_rec, grad_bias)):
      buffer[...] = value

    return grad_x, grad_h_prev

In [None]:
class TimeRNN:
  """Runs T single-step RNN cells over a (N, T, D) batch of sequences."""

  def __init__(self, Wx, Wh, b, stateful=False):
    # Weights, bias, and whether the hidden state is carried between forward() calls.
    self.params = [Wx, Wh, b]
    self.grads = [np.zeros_like(Wx), np.zeros_like(Wh), np.zeros_like(b)]
    self.layers = None  # list of per-timestep RNN cells, rebuilt on every forward()

    # h:  hidden state produced by the last RNN cell of the latest forward()
    # dh: hidden-state gradient flowing to the previous block, set by backward()
    self.h, self.dh = None, None
    # stateful=True : the hidden state is kept between forward() calls, so an
    #                 arbitrarily long sequence can be fed in consecutive chunks.
    # stateful=False: the hidden state is reset to a zero matrix on every forward().
    self.stateful = stateful

  def set_state(self, h):
    """Set the hidden state used as the starting point of the next forward()."""
    self.h = h

  def reset_state(self, h=None):
    """Clear the hidden state.

    `h` is ignored; it is accepted only so that existing callers that pass an
    argument keep working.
    """
    self.h = None

  def forward(self, xs):
    """Forward pass.

    Args:
      xs: array of shape (N, T, D) = (mini-batch size, timesteps, input dimension).

    Returns:
      Array of shape (N, T, H) holding the hidden state of every timestep.
    """
    Wx = self.params[0]
    N, T, D = xs.shape
    H = Wx.shape[1]

    self.layers = []
    hs = np.empty((N, T, H), dtype='f')  # output buffer

    if not self.stateful or self.h is None:
      self.h = np.zeros((N, H), dtype='f')

    for t in range(T):
      layer = RNN(*self.params)
      self.h = layer.forward(xs[:, t, :], self.h)
      hs[:, t, :] = self.h
      self.layers.append(layer)

    return hs

  def backward(self, dhs):
    """Backward pass through time.

    Args:
      dhs: upstream gradient of shape (N, T, H).

    Returns:
      Gradient with respect to the inputs, shape (N, T, D).
    """
    Wx = self.params[0]
    N, T, H = dhs.shape
    D = Wx.shape[0]

    dxs = np.empty((N, T, D), dtype='f')
    dh = 0
    grads = [0, 0, 0]

    for t in reversed(range(T)):
      layer = self.layers[t]
      # Each cell receives the upstream gradient plus the one from the next timestep.
      dx, dh = layer.backward(dhs[:, t, :] + dh)
      dxs[:, t, :] = dx

      # Every timestep uses the same weights, so their gradients are summed.
      for i, grad in enumerate(layer.grads):
        grads[i] += grad

    for i, grad in enumerate(grads):
      self.grads[i][...] = grad  # was `self.grad`, which raised AttributeError
    self.dh = dh

    return dxs

# RNN 셀 구현

In [None]:
!pip install torchtext==0.10.0



In [None]:
# 라이브러리 로드
import torch
import torchtext
import numpy as np
import torch.nn as nn
import torch.nn.functional as F
import time

In [None]:
# Data preprocessing: start the wall-clock timer and declare the text/label fields
start = time.time()
TEXT = torchtext.legacy.data.Field(lower=True, fix_length=200, batch_first=False)
LABEL = torchtext.legacy.data.Field(sequential=False)

In [None]:
from torchtext.legacy import datasets
# Load the IMDB train/test splits using the TEXT and LABEL fields
train_data, test_data = datasets.IMDB.splits(TEXT, LABEL)

downloading aclImdb_v1.tar.gz


aclImdb_v1.tar.gz: 100%|██████████| 84.1M/84.1M [00:02<00:00, 31.9MB/s]


**torchtext.legacy.data.Field**  
데이터 전처리를 위해 사용된다.
1. fix_length: 고정된 길이의 데이터를 얻을 수 있다.  
여기에서는 데이터의 길이를 200으로 고정했으며 200보다 짧다면 패딩 작업을 통해 200으로 맞추어 줌
2. batch_first: 신경망에 입력되는 텐서의 첫 번째 차원의 값이 배치 크기가 되도록 함.  
기본 값은 False.  
모델의 네트워크로 입력되는 데이터는 [시퀀스 길이, 배치 크기, 은닉층의 뉴런 개수]의 형태이지만 batch_first=True로 설정하면 [배치크기, 시퀀스의 길이, 은닉층의 뉴런 개수] 형태로 변경된다.
3. sequential: 데이터에 순서가 있는지 나타내며 기본값은 True. 예제의 레이블은 긍정/부정 값만 갖기 때문에 False로 설정

In [None]:
# Apply token clean-up to the datasets
import string

# Translation table that deletes every ASCII punctuation character in one pass.
_PUNCT_TABLE = str.maketrans('', '', string.punctuation)

# Clean the test split as well as the training split; otherwise test tokens keep
# punctuation and "<br" fragments that the vocabulary never saw.
for dataset in (train_data, test_data):
  for example in dataset.examples:
    tokens = (tok.lower() for tok in example.text)                 # lower-case
    tokens = (tok.replace("<br","") for tok in tokens)             # delete "<br" fragments
    tokens = (tok.translate(_PUNCT_TABLE) for tok in tokens)       # strip punctuation
    example.text = [tok for tok in tokens if tok]                  # drop tokens that became empty

In [None]:
# Split the training data into training and validation sets
import random

# random.seed() returns None, so `random_state=random.seed(0)` passed None and was
# reproducible only through the seeding side effect. Seed first, then hand the
# resulting generator state over explicitly.
random.seed(0)
train_data, valid_data = train_data.split(random_state=random.getstate(),
                                          split_ratio=0.8)

print(f'Number of training examples:   {len(train_data)}')
print(f'Number of validation examples: {len(valid_data)}')
print(f'Number of testing examples:    {len(test_data)}')

Number of training examples:   20000
Number of validation examples: 5000
Number of testing examples:    25000


In [None]:
# Build the vocabulary

# The string literal below is an explanatory note (in Korean): a vocabulary is a
# dictionary-like set built from the words of the IMDB dataset, with duplicate
# words removed.
'''
단어 집합이란 IMDB 데이터셋에 포함된 단어를 이용하여 하나의 딕셔너리와 같은 집합을 만드는 것
단어 집합을 만들 때는 단어들의 중복은 제거된 상태에서 진행함
'''
TEXT.build_vocab(train_data, max_size=10000, min_freq=10,vectors=None)
LABEL.build_vocab(train_data)

print(f'Number of TEXT vocabulary:  {len(TEXT.vocab)}')
print(f'Number of LABEL vocabulary: {len(LABEL.vocab)}')

Number of TEXT vocabulary:  10002
Number of LABEL vocabulary: 3


**build_vocab**  
1. max_size: 단어 집합의 크기로 단어 집합에 포함되는 어휘 수를 의미함
2. min_freq: 훈련 데이터셋에서 특정 단어의 최소 등장 횟수를 의미함.  
즉. min_freq= 10으로 설정했기 때문에 훈련 데이터셋에서 특정 단어가 최소 열번 이상 등장한 것만 단어 집합에 추가하겠다는 의미
3. vectors: 임베딩 벡터를 지정할 수 있다.  
임베딩 벡터는 워드 임베딩의 결과로 나온 벡터. 사전 학습된 임베딩으로는 워드투벡터(Word2Vec), 글로브(GloVe) 등이 있으며, 파이토치에서도 nn.Embedding()을 통해 단어를 랜덤한 숫자 값으로 변환한 후 가중치를 학습하는 방법을 제공함

In [None]:
# Inspect the LABEL vocabulary (string-to-index mapping)
print(LABEL.vocab.stoi)

defaultdict(<bound method Vocab._default_unk_index of <torchtext.legacy.vocab.Vocab object at 0x7f92d0fe7410>>, {'<unk>': 0, 'pos': 1, 'neg': 2})


In [None]:
# Bring the datasets into memory batch by batch
BATCH_SIZE = 64
device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')

embeding_dim = 100  # each word becomes a 100-dimensional vector (size after the embedding layer)
hidden_size = 300

train_iterator, valid_iterator, test_iterator = torchtext.legacy.data.BucketIterator.splits(
    (train_data, valid_data, test_data),
    batch_size= BATCH_SIZE,
    device= device
)

**BucketIterator**  
데이터로더(dataloader)와 쓰임새가 같다.  
즉, 배치 크기 단위로 값을 차례로 꺼내어 메모리로 가져오고 싶을 때 사용함. 특히 Field에서 fix_length를 사용하지 않았다면 BucketIterator에서 데이터의 길이를 조정할 수 있다. BucketIterator는 비슷한 길이의 데이터를 한 배치에 할당하여 패딩(padding)을 최소화시켜 줌

In [None]:
# Word embedding network and RNN cell definitions
class RNNCell_Encoder(nn.Module):
  """Feeds a sequence through an nn.RNNCell and returns the final hidden state."""

  def __init__(self, input_dim, hidden_size):
    super(RNNCell_Encoder, self).__init__()
    self.rnn = nn.RNNCell(input_dim, hidden_size)

  def forward(self, inputs):
    """Encode `inputs`, iterated along its first axis.

    Args:
      inputs: tensor laid out as (seq_len, batch, embedding).

    Returns:
      Hidden state after the last step, shape (batch, hidden_size).
    """
    batch_size = inputs.shape[1]  # batch dimension
    # Use the cell's own hidden size and the input's device/dtype rather than
    # the notebook globals `hidden_size` and `device`, which may not match the
    # values this module was built with.
    ht = torch.zeros((batch_size, self.rnn.hidden_size),
                     dtype=inputs.dtype, device=inputs.device)
    for word in inputs:
      ht = self.rnn(word, ht)
    return ht

class Net(nn.Module):
  """Embedding -> RNNCell encoder -> two fully connected layers (3 output scores)."""

  def __init__(self):
    super().__init__()
    # Sizes come from the notebook-level TEXT vocabulary, embeding_dim and hidden_size.
    self.em = nn.Embedding(len(TEXT.vocab.stoi), embeding_dim)
    self.rnn = RNNCell_Encoder(embeding_dim, hidden_size)
    self.fc1 = nn.Linear(hidden_size, 256)
    self.fc2 = nn.Linear(256,3)

  def forward(self,x):
    encoded = self.rnn(self.em(x))
    return self.fc2(F.relu(self.fc1(encoded)))

In [None]:
# Define the optimizer and the loss function
model = Net()
model.to(device)

loss_fn = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.0001)

**torch.nn.CrossEntropyLoss()**  
다중 분류에 사용됨.  
torch.nn.CrossEntropyLoss는 nn.LogSoftmax와 nn.NLLLoss 연산의 조합으로 구성.  
nn.LogSoftmax는 모델 네트워크의 마지막 계층에서 얻은 결괏값들을 확률로 해석하기 위해 소프트맥스 함수의 결과에 로그(log)를 취한 것이고, nn.NLLLoss는 다중분류에 사용된다. 신경망에서 로그 확률 값을 얻으려면 마지막에 LogSoftmax를 추가해야 함

In [None]:
# Training routine. For every batch:
# 1. fetch the data from the loader,
# 2. run the model on it,
# 3. compute the loss,
# 4. let the optimizer update the parameters (weights, biases, ...).

def training(epoch, model, trainloader, validloader):
  """Train `model` for one epoch, then evaluate it on `validloader`.

  Uses the notebook-level `loss_fn`, `optimizer` and `device`.

  Args:
    epoch: epoch number, used only in the printed summary.
    model: the network to train.
    trainloader: iterable of batches exposing `.text` and `.label`.
    validloader: iterable of batches exposing `.text` and `.label`.

  Returns:
    (epoch_loss, epoch_acc, epoch_valid_loss, epoch_valid_acc). Losses are
    per-example averages; accuracies are fractions of correct predictions.
  """
  correct = 0
  total = 0
  running_loss = 0.0

  model.train()
  for b in trainloader:
    x, y = b.text, b.label             # take text and label from the batch
    x, y = x.to(device), y.to(device)  # move them to the target device

    y_pred = model(x)
    loss = loss_fn(y_pred, y)

    optimizer.zero_grad()
    loss.backward()
    optimizer.step()

    with torch.no_grad():
      batch_size = y.size(0)
      correct += (torch.argmax(y_pred, dim=1) == y).sum().item()
      total += batch_size
      # Assumes loss_fn returns the batch mean (the nn.CrossEntropyLoss default).
      # Weight it by the batch size so the division below gives a per-example
      # average; summing batch means and dividing by the dataset size
      # under-reported the loss by roughly the batch size.
      running_loss += loss.item() * batch_size

  # max(..., 1) keeps an empty loader from raising ZeroDivisionError.
  epoch_loss = running_loss / max(total, 1)
  epoch_acc = correct / max(total, 1)

  valid_correct = 0
  valid_total = 0
  valid_running_loss = 0.0

  model.eval()
  with torch.no_grad():
    for b in validloader:
      x, y = b.text, b.label
      x, y = x.to(device), y.to(device)

      y_pred = model(x)
      batch_size = y.size(0)
      valid_running_loss += loss_fn(y_pred, y).item() * batch_size
      valid_correct += (torch.argmax(y_pred, dim=1) == y).sum().item()
      valid_total += batch_size

  epoch_valid_loss = valid_running_loss / max(valid_total, 1)
  epoch_valid_acc = valid_correct / max(valid_total, 1)

  # Print loss and accuracy once per epoch
  print('epoch: ', epoch,
        'loss: ', round(epoch_loss, 3),
        'accuracy: ', round(epoch_acc, 3),
        'valid_loss: ', round(epoch_valid_loss, 3),
        'valid_accuracy: ', round(epoch_valid_acc, 3))

  return epoch_loss, epoch_acc, epoch_valid_loss, epoch_valid_acc

In [None]:
# Train the model
epochs = 5
train_loss = []
train_acc = []
valid_loss = []
valid_acc = []

for epoch in range(epochs):
  epoch_loss, epoch_acc, epoch_valid_loss, epoch_valid_acc = training(epoch, model, train_iterator, valid_iterator)
  train_loss.append(epoch_loss)
  train_acc.append(epoch_acc)  # previously appended epoch_loss, corrupting the accuracy history
  valid_loss.append(epoch_valid_loss)
  valid_acc.append(epoch_valid_acc)

end = time.time()
print(end-start)

epoch:  0 loss:  0.011 accuracy:  0.501 valid_loss:  0.011 valid_accuracy:  0.503
epoch:  1 loss:  0.011 accuracy:  0.507 valid_loss:  0.011 valid_accuracy:  0.495
epoch:  2 loss:  0.011 accuracy:  0.516 valid_loss:  0.011 valid_accuracy:  0.496
epoch:  3 loss:  0.011 accuracy:  0.517 valid_loss:  0.011 valid_accuracy:  0.496
epoch:  4 loss:  0.011 accuracy:  0.531 valid_loss:  0.011 valid_accuracy:  0.512
131.796147108078


In [None]:
# Evaluation function
def evaluate(epoch, model, testloader):
  """Evaluate `model` on `testloader` and print/return loss and accuracy.

  Uses the notebook-level `loss_fn` and `device`.

  Args:
    epoch: label used only in the printed summary.
    model: the network to evaluate.
    testloader: iterable of batches exposing `.text` and `.label`.

  Returns:
    (epoch_test_loss, epoch_test_acc): per-example average loss and the
    fraction of correct predictions.
  """
  test_correct = 0
  test_total = 0
  test_running_loss = 0.0

  model.eval()
  with torch.no_grad():
    for b in testloader:
      x, y = b.text, b.label
      x, y = x.to(device), y.to(device)
      y_pred = model(x)
      batch_size = y.size(0)
      # Assumes loss_fn returns the batch mean; weight by batch size so the
      # final division yields a per-example average instead of mean / batch size.
      test_running_loss += loss_fn(y_pred, y).item() * batch_size
      test_correct += (torch.argmax(y_pred, dim=1) == y).sum().item()
      test_total += batch_size

  # max(..., 1) keeps an empty loader from raising ZeroDivisionError.
  epoch_test_loss = test_running_loss / max(test_total, 1)
  epoch_test_acc = test_correct / max(test_total, 1)

  print('epoch: ', epoch,
        'test_loss: ', round(epoch_test_loss,3),
        'test_accuracy: ', round(epoch_test_acc,3))

  return epoch_test_loss, epoch_test_acc

In [None]:
# Check the model's predictions on the test set
start = time.time()

epochs = 5
test_loss = []
test_acc = []

for epoch in range(epochs):
  epoch_test_loss, epoch_test_acc = evaluate(epoch, model, test_iterator)
  # Record this run's results; the old code appended the stale training
  # `epoch_loss` to both lists.
  test_loss.append(epoch_test_loss)
  test_acc.append(epoch_test_acc)


end = time.time()
print(end-start)

epoch:  0 test_loss:  0.011 test_accuracy:  0.503
epoch:  1 test_loss:  0.011 test_accuracy:  0.503
epoch:  2 test_loss:  0.011 test_accuracy:  0.503
epoch:  3 test_loss:  0.011 test_accuracy:  0.503
epoch:  4 test_loss:  0.011 test_accuracy:  0.503
31.64377474784851


# RNN 계층 구현

In [None]:
# 라이브러리 호출
import torch
import torchtext
import numpy as np
import torch.nn as nn
import torch.nn.functional as F
import time

In [None]:
# Download and preprocess the dataset
TEXT = torchtext.legacy.data.Field(sequential=True, batch_first=True, lower=True)
LABEL = torchtext.legacy.data.Field(sequential=False, batch_first=True)

from torchtext.legacy import datasets
train_data, test_data = datasets.IMDB.splits(TEXT, LABEL)
# No random_state is passed here, so the split is not explicitly seeded
train_data, valid_data = train_data.split(split_ratio=0.8)

TEXT.build_vocab(train_data, max_size=10000, min_freq=10, vectors=None)
LABEL.build_vocab(train_data)

BATCH_SIZE = 100
device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')

In [None]:
# Create batch iterators for the train/validation/test datasets
train_iterator, valid_iterator, test_iterator = torchtext.legacy.data.BucketIterator.splits((train_data, valid_data, test_data),
                                                                                            batch_size= BATCH_SIZE,
                                                                                            device= device)

In [None]:
# Set model-size constants
vocab_size = len(TEXT.vocab)
n_classes = 2

In [None]:
# RNN-layer network
class BasicRNN(nn.Module):
  """Embedding -> multi-layer nn.RNN -> dropout -> linear classifier.

  forward() returns raw, unnormalized class scores (logits), which is what
  F.cross_entropy / nn.CrossEntropyLoss expect as input.
  """

  def __init__(self, n_layers, hidden_dim, n_vocab, embed_dim, n_classes, dropout_p=0.2):
    super(BasicRNN, self).__init__()
    self.n_layers = n_layers                       # number of stacked RNN layers
    self.embed = nn.Embedding(n_vocab, embed_dim)  # word embedding
    self.hidden_dim = hidden_dim
    self.dropout = nn.Dropout(dropout_p)
    self.rnn = nn.RNN(embed_dim, self.hidden_dim, num_layers = self.n_layers, batch_first=True)
    self.out = nn.Linear(self.hidden_dim, n_classes)

  def forward(self, x):
    x = self.embed(x)                             # token ids -> embedding vectors
    h_0 = self._init_state(batch_size=x.size(0))  # initial hidden state, all zeros
    x, _ = self.rnn(x, h_0)                       # takes the input and the previous hidden state
    h_t = x[:, -1, :]                             # top-layer output at the last timestep
    # The dropout result used to be discarded, so dropout was never applied.
    h_t = self.dropout(h_t)
    # Return raw logits: the loss applies log-softmax itself, and squashing
    # with sigmoid first limits the scores to (0, 1) and flattens the gradients.
    logit = self.out(h_t)
    return logit

  def _init_state(self, batch_size=1):
    """Return zeros of shape (n_layers, batch_size, hidden_dim) matching the model's dtype/device."""
    weight = next(self.parameters())  # any parameter works; only dtype and device are read
    return torch.zeros(self.n_layers, batch_size, self.hidden_dim,
                       dtype=weight.dtype, device=weight.device)

In [None]:
# Set up the loss function and the optimizer
model = BasicRNN(n_layers=2, hidden_dim=256, n_vocab=vocab_size, embed_dim=128, n_classes=n_classes, dropout_p=0.5)
model.to(device)

# NOTE(review): the train()/evaluate() functions defined after this cell call
# F.cross_entropy directly, so loss_fn appears unused in this section.
loss_fn = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.0001)

In [None]:
# Model training function
def train(model, optimizer, train_iter, epoch=None):
  """Run one training epoch over `train_iter`, printing the loss every 50 batches.

  Uses the notebook-level `device`.

  Args:
    model: the network to train.
    optimizer: optimizer that updates `model`'s parameters.
    train_iter: iterator of batches exposing `.text` and `.label`.
    epoch: epoch number shown in the progress line. When omitted, the
      notebook-level loop variable `e` is used if present, as before.
  """
  if epoch is None:
    # Backward compatibility: existing callers rely on the global loop variable `e`.
    epoch = globals().get('e', '?')

  model.train()
  for b, batch in enumerate(train_iter):
    x = batch.text.to(device)
    # Shift the label indices down by one (converts the labels to 0 and 1).
    # Done out-of-place so the batch's own label tensor is not modified.
    y = batch.label.to(device) - 1
    optimizer.zero_grad()

    logit = model(x)
    loss = F.cross_entropy(logit, y)
    loss.backward()
    optimizer.step()

    if b % 50 == 0:
      print("Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}".format(epoch,
                                                                      b * len(x),
                                                                      len(train_iter.dataset),
                                                                      100. * b / len(train_iter),
                                                                      loss.item()))

In [None]:
# Model evaluation function
def evaluate(model, val_iter):
  """Return (average loss, accuracy) of `model` over `val_iter`.

  Uses the notebook-level `device`. Both values are plain Python floats.
  """
  model.eval()
  corrects, total, total_loss = 0, 0, 0.0

  # No gradients are needed for evaluation; skipping autograd saves memory and time.
  with torch.no_grad():
    for batch in val_iter:
      x = batch.text.to(device)
      # Shift the label indices down by one without mutating the batch in place.
      y = batch.label.to(device) - 1
      logit = model(x)
      total_loss += F.cross_entropy(logit, y, reduction='sum').item()
      total += y.size(0)
      # Index of the largest score per row is the predicted class.
      corrects += (logit.max(1)[1].view(y.size()) == y).sum().item()

  avg_loss = total_loss / len(val_iter.dataset)
  avg_accuracy = corrects / total
  return avg_loss, avg_accuracy

**`(logit.max(1)[1].view(y.size()).data == y.data).sum()`**  

1. `max(1)[1]`: `.max(dim=1)[0]`은 dim=1 방향의 최댓값(max)을 나타내고 `.max(dim=1)[1]`은 최댓값을 갖는 데이터의 인덱스를 나타냄
2. `view(y.size())`: logit.max(1)[1]의 결과를 y.size()로 크기 변경
3. `data == y.data`: 모델의 예측 결과(`logit.max(1)[1].view(y.size()).data`)가 레이블(실제 값, `y.data`)과 같은지 확인
4. `sum()`: 모델의 예측 결과와 레이블(실제 값)이 같으면 그 합을 corrects 변수에 누적하여 저장함


In [None]:
# Train and evaluate the model
BATCH_SIZE = 100  # was misspelled `BATCH_SiZE`, which created a separate, unused variable
LR = 0.001        # NOTE(review): not read by any code in this cell — confirm whether the optimizer should use it
EPOCHS = 5

# The loop variable is named `e` because train(), as written in this notebook,
# reads it as a global when printing progress.
for e in range(1, EPOCHS + 1):
  train(model, optimizer, train_iterator)
  val_loss, val_accuracy = evaluate(model, valid_iterator)
  print('[EPOCH: %d], validation Loss: %5.2f | Validation Accuracy: %5.2f' % (e, val_loss, val_accuracy))

[EPOCH: 1], validation Loss:  0.69 | Validation Accuracy:  0.50
[EPOCH: 2], validation Loss:  0.69 | Validation Accuracy:  0.50
[EPOCH: 3], validation Loss:  0.69 | Validation Accuracy:  0.50
[EPOCH: 4], validation Loss:  0.70 | Validation Accuracy:  0.50
[EPOCH: 5], validation Loss:  0.70 | Validation Accuracy:  0.51


In [None]:
# Evaluate the trained model on the test iterator
test_loss, test_acc = evaluate(model, test_iterator)
print('Test Loss: %5.2f | Test Accuracy: %5.2f' % (test_loss, test_acc))

Test Loss:  0.71 | Test Accuracy:  0.49


# 복습

In [None]:
import torch
import torchtext
import numpy as np
import torch.nn as nn
import torch.nn.functional as F
import time

In [None]:
!pip install torchtext==0.10.0

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting torchtext==0.10.0
  Downloading torchtext-0.10.0-cp37-cp37m-manylinux1_x86_64.whl (7.6 MB)
[K     |████████████████████████████████| 7.6 MB 5.9 MB/s 
Collecting torch==1.9.0
  Downloading torch-1.9.0-cp37-cp37m-manylinux1_x86_64.whl (831.4 MB)
[K     |████████████████████████████████| 831.4 MB 2.7 kB/s 
Installing collected packages: torch, torchtext
  Attempting uninstall: torch
    Found existing installation: torch 1.11.0+cu113
    Uninstalling torch-1.11.0+cu113:
      Successfully uninstalled torch-1.11.0+cu113
  Attempting uninstall: torchtext
    Found existing installation: torchtext 0.12.0
    Uninstalling torchtext-0.12.0:
      Successfully uninstalled torchtext-0.12.0
[31mERROR: pip's dependency resolver does not currently take into account all the packages that are installed. This behaviour is the source of the following dependency conflicts.
torchvision 0.12.0+

In [None]:
# Declare the text field (lower-cased, fixed length 200) and the non-sequential label field
TEXT = torchtext.legacy.data.Field(lower=True, fix_length=200, batch_first=False)
LABEL = torchtext.legacy.data.Field(sequential=False)

In [None]:
from torchtext.legacy import datasets
# Load the IMDB train/test splits using the TEXT and LABEL fields
train_data, test_data = datasets.IMDB.splits(TEXT, LABEL)

downloading aclImdb_v1.tar.gz


aclImdb_v1.tar.gz: 100%|██████████| 84.1M/84.1M [00:08<00:00, 9.38MB/s]


In [None]:
# Show the attributes (label and token list) of the first training example
vars(train_data.examples[0])

{'label': 'pos',
 'text': ['the',
  'italian',
  'is',
  'an',
  'astonishingly',
  'accomplished',
  'film',
  'for',
  'its',
  'time.',
  'stunningly',
  'shot,',
  'with',
  'lighting',
  'effects',
  'that',
  'are',
  'truly',
  'sublime,',
  'this',
  'is',
  'an',
  'early',
  'gem',
  'that',
  'clearly',
  'reveals',
  'reginald',
  'barker',
  'to',
  'be',
  'a',
  'pioneer',
  'director',
  'of',
  'equal',
  'standing',
  'to',
  'd.w.',
  'griffith',
  'and',
  'maurice',
  'tourneur.',
  'how',
  'much',
  'control',
  'thomas',
  'ince',
  'exerted',
  'over',
  'the',
  'production',
  'is',
  'hard',
  'to',
  'know,',
  'but',
  'this',
  'film',
  'still',
  'has',
  'extraordinary',
  'power.',
  'the',
  'simple',
  'story',
  'of',
  'an',
  'italian',
  'immigrant',
  'struggling',
  'to',
  'keep',
  'his',
  'family',
  'alive',
  'in',
  'new',
  'york,',
  'is',
  'very',
  'moving.',
  'the',
  'themes',
  'of',
  'social',
  'injustice,',
  'revenge',
  '

In [None]:
import string

# Translation table that deletes every ASCII punctuation character in one pass.
_PUNCT_TABLE = str.maketrans('', '', string.punctuation)

# Clean the test split as well as the training split so both go through the
# same token normalization.
for dataset in (train_data, test_data):
  for example in dataset.examples:
    cleaned = []
    for token in example.text:
      token = token.lower().replace('<br', '').translate(_PUNCT_TABLE)
      if token:  # skip tokens that became empty
        cleaned.append(token)
    example.text = cleaned

In [None]:
import random

# random.seed() returns None; seed first and pass the generator state explicitly
# so the reproducible split no longer depends on a side effect.
random.seed(0)
train_data, valid_data = train_data.split(random_state=random.getstate(), split_ratio=0.8)

In [None]:
# Report the size of each dataset split
print(f'Number of training examples: {len(train_data)}')
print(f'Number of validation examples: {len(valid_data)}')
print(f'Number of testing examples: {len(test_data)}')

In [None]:
# Build the text and label vocabularies from the training split only
TEXT.build_vocab(train_data, max_size=10000, min_freq=10, vectors=None)
LABEL.build_vocab(train_data)

print(f'Unique token in TEXT vocabulary: {len(TEXT.vocab)}')
print(f'Unique token in LABEL vocabulary: {len(LABEL.vocab)}')

In [None]:
# Batch size, target device, and model dimensions for the RNN-cell model
BATCH_SIZE = 64
device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
embedding_dim = 100
hidden_size = 300

# NOTE(review): `valid_iteraton` is misspelled, but the training loop later in
# this section uses the same spelling, so the two must be renamed together.
train_iteration, valid_iteraton, test_iteration = torchtext.legacy.data.BucketIterator.splits(
                                                                                                (train_data, valid_data, test_data),
                                                                                               batch_size = BATCH_SIZE,
                                                                                               device=device )

In [None]:
class RNNCell_Encoder(nn.Module):
  """Feeds a sequence through an nn.RNNCell and returns the final hidden state."""

  def __init__(self, input_dim, hidden_size):
    super(RNNCell_Encoder, self).__init__()
    self.rnn = nn.RNNCell(input_dim, hidden_size)

  def forward(self, inputs):
    """Return the hidden state after the last step of `inputs`.

    `inputs` is iterated along its first axis and its second axis is used as
    the batch size, i.e. the layout is (seq_len, batch, embedding).
    """
    bz = inputs.shape[1]
    # Size and placement come from the module and its input rather than from
    # the notebook globals `hidden_size` / `device`, so the encoder stays
    # correct when constructed with different settings.
    ht = inputs.new_zeros((bz, self.rnn.hidden_size))

    for word in inputs:
      ht = self.rnn(word, ht)
    return ht

class Net(nn.Module):
  """Classifier: embedding, RNNCell encoder, then two linear layers producing 3 scores."""

  def __init__(self):
    super().__init__()
    # Dimensions are read from the notebook-level TEXT vocabulary,
    # embedding_dim and hidden_size.
    vocab_len = len(TEXT.vocab.stoi)
    self.em = nn.Embedding(vocab_len, embedding_dim)
    self.rnn = RNNCell_Encoder(embedding_dim, hidden_size)
    self.fc1 = nn.Linear(hidden_size, 256)
    self.fc2 = nn.Linear(256, 3)

  def forward(self, x):
    embedded = self.em(x)
    final_hidden = self.rnn(embedded)
    activated = F.relu(self.fc1(final_hidden))
    return self.fc2(activated)

In [None]:
# Instantiate the model on the target device, then define the loss and optimizer
model = Net()
model.to(device)

loss_fn = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.0001)

In [None]:
def _run_epoch(model, loader, train):
  """Run `model` over `loader` once and return (mean loss per example, accuracy).

  When `train` is true the parameters are updated after every batch through the
  notebook-level `optimizer`; the notebook-level `loss_fn` and `device` are used
  in both modes.
  """
  correct, seen, loss_sum = 0, 0, 0.0
  model.train(train)
  with torch.set_grad_enabled(train):
    for b in loader:
      x, y = b.text.to(device), b.label.to(device)
      y_pred = model(x)  # one score per class
      loss = loss_fn(y_pred, y)

      if train:
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

      batch_size = y.size(0)
      # Assumes loss_fn returns the batch mean; weight it by the batch size so
      # the final division yields a true per-example average.
      loss_sum += loss.item() * batch_size
      # Index of the highest score is the prediction; count matches with the labels.
      correct += (torch.argmax(y_pred, dim=1) == y).sum().item()
      seen += batch_size

  # max(..., 1) keeps an empty loader from raising ZeroDivisionError.
  return loss_sum / max(seen, 1), correct / max(seen, 1)


def training(epoch, model, trainloader, validloader):
  """Train `model` for one epoch, validate it, and print a one-line summary.

  Returns:
    (epoch_loss, epoch_acc, epoch_valid_loss, epoch_valid_acc). Losses are
    per-example averages; summing batch means and dividing by the dataset
    size previously under-reported them by roughly the batch size.
  """
  epoch_loss, epoch_acc = _run_epoch(model, trainloader, train=True)
  epoch_valid_loss, epoch_valid_acc = _run_epoch(model, validloader, train=False)

  print('epoch: ', epoch,
        'loss: ', round(epoch_loss, 3),
        'accuracy: ', round(epoch_acc, 3),
        'valid_loss: ', round(epoch_valid_loss, 3),
        'valid_accuracy: ', round(epoch_valid_acc,3))

  return epoch_loss, epoch_acc, epoch_valid_loss, epoch_valid_acc

In [None]:
start = time.time()

# Was `epoch = 5`: the loop below reads `epochs`, so the cell only ran when a
# stale `epochs` happened to exist from an earlier cell.
epochs = 5
train_loss = []
train_acc = []
valid_loss = []
valid_acc = []

for epoch in range(epochs):
  epoch_loss, epoch_acc, epoch_valid_loss, epoch_valid_acc = training(epoch, model, train_iteration, valid_iteraton)
  train_loss.append(epoch_loss)
  train_acc.append(epoch_acc)
  valid_loss.append(epoch_valid_loss)
  valid_acc.append(epoch_valid_acc)

end = time.time()
print(end-start)

epoch:  0 loss:  0.011 accuracy:  0.49 valid_loss:  0.011 valid_accuracy:  0.506
epoch:  1 loss:  0.011 accuracy:  0.505 valid_loss:  0.011 valid_accuracy:  0.499
epoch:  2 loss:  0.011 accuracy:  0.511 valid_loss:  0.011 valid_accuracy:  0.497
epoch:  3 loss:  0.011 accuracy:  0.521 valid_loss:  0.011 valid_accuracy:  0.496
epoch:  4 loss:  0.011 accuracy:  0.523 valid_loss:  0.011 valid_accuracy:  0.511
92.5865728855133


In [None]:
def evaluate(epoch, model, testloader):
  """Evaluate `model` on `testloader`; print and return (loss, accuracy).

  Uses the notebook-level `loss_fn` and `device`. The loss is a per-example
  average and the accuracy is the fraction of correct predictions.
  """
  test_correct = 0
  test_total = 0
  test_running_loss = 0.0

  model.eval()
  with torch.no_grad():
    for b in testloader:
      x, y = b.text.to(device), b.label.to(device)
      y_pred = model(x)
      n_examples = y.size(0)
      # Assumes loss_fn returns the batch mean; scale by the batch size so the
      # division below averages over examples rather than shrinking the loss.
      test_running_loss += loss_fn(y_pred, y).item() * n_examples
      test_correct += (torch.argmax(y_pred, dim=1) == y).sum().item()
      test_total += n_examples

  # max(..., 1) keeps an empty loader from raising ZeroDivisionError.
  epoch_test_loss = test_running_loss / max(test_total, 1)
  epoch_test_acc = test_correct / max(test_total, 1)

  print('epoch: ', epoch,
        'test_loss: ', round(epoch_test_loss, 3),
        'test_accuracy: ', round(epoch_test_acc,3) )

  return epoch_test_loss, epoch_test_acc

In [None]:
# Time repeated evaluation passes on the test set
start = time.time()

epochs = 5
test_loss = []
test_acc = []

# No training happens in this loop; each pass re-evaluates the same model
for epoch in range(epochs):
  epoch_test_loss, epoch_test_acc = evaluate(epoch, model, test_iteration)
  test_loss.append(epoch_test_loss)
  test_acc.append(epoch_test_acc)

end = time.time()
print(end-start)

epoch:  0 test_loss:  0.011 test_accuracy:  0.5
epoch:  1 test_loss:  0.011 test_accuracy:  0.5
epoch:  2 test_loss:  0.011 test_accuracy:  0.5
epoch:  3 test_loss:  0.011 test_accuracy:  0.5
epoch:  4 test_loss:  0.011 test_accuracy:  0.5
32.00366711616516


In [None]:
# Re-declare batch-first fields, reload IMDB, split, and rebuild the vocabularies
TEXT = torchtext.legacy.data.Field(sequential=True, batch_first=True, lower=True)
LABEL = torchtext.legacy.data.Field(sequential=False, batch_first=True)

train_data, test_data = datasets.IMDB.splits(TEXT, LABEL)
# No random_state is passed here, so the split is not explicitly seeded
train_data, valid_data = train_data.split(split_ratio=0.8)

TEXT.build_vocab(train_data, max_size=10000, min_freq=10, vectors=None)
LABEL.build_vocab(train_data)

BATCH_SIZE = 100
device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')

In [None]:
# Create batch iterators for the train/validation/test datasets
train_iterator, valid_iterator, test_iterator = torchtext.legacy.data.BucketIterator.splits((train_data, valid_data, test_data),
                                                                                            batch_size=BATCH_SIZE, 
                                                                                            device = device)

In [None]:
# Model-size constants: vocabulary size and number of output classes
vocab_size = len(TEXT.vocab)
n_classes = 2

In [None]:
class BasicRNN(nn.Module):
  """Embedding -> nn.RNN -> dropout -> linear layer producing raw class logits."""

  def __init__(self, n_layers, hidden_dim,
               n_vocab, embed_dim, n_classes,
               dropout_p=0.2):
    super(BasicRNN, self).__init__()
    self.n_layers = n_layers
    self.embed = nn.Embedding(n_vocab, embed_dim)
    self.hidden_dim = hidden_dim
    self.dropout = nn.Dropout(dropout_p)
    self.rnn = nn.RNN(embed_dim, self.hidden_dim,
                      num_layers=self.n_layers, batch_first=True)
    self.out = nn.Linear(self.hidden_dim, n_classes)

  def forward(self, x):
    """Map a (batch, seq_len) tensor of token ids to (batch, n_classes) logits."""
    x = self.embed(x)
    h_0 = self._init_state(batch_size=x.size(0))
    x, _ = self.rnn(x, h_0)
    h_t = x[:, -1, :]  # top-layer output at the last timestep
    # Keep the dropout output; calling self.dropout(h_t) without using the
    # result meant dropout was never applied.
    h_t = self.dropout(h_t)
    # Return unnormalized scores. cross_entropy applies log-softmax itself, so
    # a sigmoid here would squash the scores into (0, 1) and weaken the gradients.
    return self.out(h_t)

  def _init_state(self, batch_size=1):
    """Return a zero hidden state of shape (n_layers, batch_size, hidden_dim)."""
    reference = next(self.parameters())  # supplies dtype and device only
    return torch.zeros(self.n_layers, batch_size, self.hidden_dim,
                       dtype=reference.dtype, device=reference.device)

In [None]:
# Build a single-layer BasicRNN on the target device, plus the loss and optimizer
model = BasicRNN(n_layers=1, hidden_dim=256, 
                 n_vocab=vocab_size, embed_dim=128, 
                 n_classes=n_classes, dropout_p=0.5)
model.to(device)

# NOTE(review): the train()/evaluate() functions defined after this cell call
# F.cross_entropy directly, so loss_fn appears unused in this section.
loss_fn = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(),lr=0.0001)

In [None]:
def train(model, optimizer, train_iter, epoch=None):
  """Train `model` for one pass over `train_iter`, logging every 50th batch.

  Uses the notebook-level `device`.

  Args:
    model: the network to train.
    optimizer: optimizer that updates `model`'s parameters.
    train_iter: iterator of batches exposing `.text` and `.label`.
    epoch: epoch number for the log line. When omitted, the notebook-level
      loop variable `e` is used if it exists, matching the previous behavior.
  """
  if epoch is None:
    # Backward compatibility with callers that rely on the global `e`.
    epoch = globals().get('e', '?')

  model.train()

  for b, batch in enumerate(train_iter):
    x = batch.text.to(device)
    # Shift label indices down by one, out-of-place so the batch is not mutated.
    y = batch.label.to(device) - 1
    optimizer.zero_grad()

    logit = model(x)
    loss = F.cross_entropy(logit, y)
    loss.backward()
    optimizer.step()

    if b % 50 == 0:
      print("Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}".format(epoch,
                                                                      b * len(x),
                                                                      len(train_iter.dataset),
                                                                      100. * b / len(train_iter),
                                                                      loss.item()))

In [None]:
def evaluate(model, val_iter):
  """Return (average loss, accuracy) of `model` over `val_iter` as Python floats.

  Uses the notebook-level `device`.
  """
  model.eval()
  corrects, total, total_loss = 0, 0, 0.0

  # Evaluation needs no gradients; disabling autograd avoids building graphs.
  with torch.no_grad():
    for batch in val_iter:
      x = batch.text.to(device)
      # Shift label indices down by one, out-of-place so the batch is not mutated.
      y = batch.label.to(device) - 1
      logit = model(x)
      total_loss += F.cross_entropy(logit, y, reduction='sum').item()
      total += y.size(0)
      # Predicted class = index of the largest score in each row.
      predictions = logit.max(1)[1].view(y.size())
      corrects += (predictions == y).sum().item()

  avg_loss = total_loss / len(val_iter.dataset)
  avg_accuracy = corrects / total

  return avg_loss, avg_accuracy

In [None]:
# Train for EPOCHS epochs, validating after each one
BATCH_SIZE = 100
LR = 0.001  # NOTE(review): not read by any code in this cell — confirm whether the optimizer should use it
EPOCHS = 5

# The loop variable is named `e` because train(), as written in this notebook,
# reads it as a global when printing progress.
for e in range(1, EPOCHS + 1):
  train(model, optimizer, train_iterator)
  val_loss, val_accuracy = evaluate(model, valid_iterator)
  print('[EPOCH: %d], Validation Loss: %5.2f | Validation Accuracy: %5.2f' 
        % (e, val_loss, val_accuracy))

[EPOCH: 1], Validation Loss:  0.69 | Validation Accuracy:  0.50
[EPOCH: 2], Validation Loss:  0.69 | Validation Accuracy:  0.50
[EPOCH: 3], Validation Loss:  0.69 | Validation Accuracy:  0.50
[EPOCH: 4], Validation Loss:  0.69 | Validation Accuracy:  0.49
[EPOCH: 5], Validation Loss:  0.69 | Validation Accuracy:  0.49


In [None]:
# Evaluate the trained model on the test iterator
test_loss, test_acc = evaluate(model, test_iterator)
print("Test Loss: %5.2f | Test Accuracy: %5.2f" % (test_loss, test_acc))

Test Loss:  0.69 | Test Accuracy:  0.51


In [None]:
# Start the wall-clock timer and declare the text/label fields
start = time.time()
TEXT = torchtext.legacy.data.Field(lower= True, fix_length=200, batch_first=False)
LABEL = torchtext.legacy.data.Field(sequential=False)

In [None]:
from torchtext.legacy import datasets
# Load the IMDB train/test splits using the TEXT and LABEL fields
train_data, test_data = datasets.IMDB.splits(TEXT, LABEL)

downloading aclImdb_v1.tar.gz


aclImdb_v1.tar.gz: 100%|██████████| 84.1M/84.1M [00:09<00:00, 8.75MB/s]


In [None]:
import string

# Translation table that deletes every ASCII punctuation character in one pass.
_PUNCT_TABLE = str.maketrans('', '', string.punctuation)


def _clean_tokens(tokens):
  """Lower-case tokens, delete "<br" fragments and punctuation, drop empty results."""
  stripped = (tok.lower().replace("<br", "").translate(_PUNCT_TABLE) for tok in tokens)
  return [tok for tok in stripped if tok]


# Clean the test split as well as the training split so evaluation text goes
# through the same normalization as the text the vocabulary is built from.
for dataset in (train_data, test_data):
  for example in dataset.examples:
    example.text = _clean_tokens(example.text)

In [None]:
import random

# random.seed() returns None, so passing its result as random_state relied on
# the seeding side effect. Seed explicitly and pass the generator state.
random.seed(0)
train_data, valid_data = train_data.split(random_state=random.getstate(),
                                          split_ratio=0.8)

In [None]:
# Report the size of each dataset split
print(f'Number of training examples: {len(train_data)}')
print(f'Number of validation examples: {len(valid_data)}')
print(f'Number of testing examples: {len(test_data)}')

Number of training examples: 20000
Number of validation examples: 5000
Number of testing examples: 25000


In [None]:
# Build the text and label vocabularies from the training split only
TEXT.build_vocab(train_data, max_size=10000, min_freq=10, vectors= None)
LABEL.build_vocab(train_data)

print(f'unique tokens in TEXT vocabulary: {len(TEXT.vocab)}')
print(f'unique tokens in LABEL vocabulary: {len(LABEL.vocab)}')

unique tokens in TEXT vocabulary: 10002
unique tokens in LABEL vocabulary: 3


In [None]:
# Batch size, target device, model dimensions, and batch iterators
BATCH_SIZE = 64
device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')

embedding_dim = 100
hidden_size = 300  # number of units in the hidden layer (D_h)

train_iterator, valid_iterator, test_iterator = torchtext.legacy.data.BucketIterator.splits(
    (train_data, valid_data, test_data),
    batch_size = BATCH_SIZE,
    device = device )

In [None]:
class RNNCell_Encoder(nn.Module):
  """Steps an nn.RNNCell through a sequence and returns the last hidden state."""

  def __init__(self, input_dim, hidden_size):
    super(RNNCell_Encoder, self).__init__()
    self.rnn = nn.RNNCell(input_dim, hidden_size)

  def forward(self, inputs):
    """Return the hidden state after consuming `inputs` step by step.

    `inputs` is iterated along its first axis and its second axis is used as
    the batch size, i.e. the layout is (seq_len, batch, embedding).
    """
    bz = inputs.shape[1]
    # Take the width from the cell and the dtype/device from the input; the
    # notebook globals `hidden_size` and `device` can disagree with the values
    # this module was actually built with.
    ht = torch.zeros((bz, self.rnn.hidden_size),
                     dtype=inputs.dtype, device=inputs.device)

    for word in inputs:
      ht = self.rnn(word, ht)

    return ht

class Net(nn.Module):
  """Classifier: embedding, RNNCell encoder, then two linear layers producing 3 scores."""

  def __init__(self):
    super(Net, self).__init__()
    # Was `embeding_dim` (typo): this section defines `embedding_dim`, so the
    # misspelled name only resolved if an earlier cell had left it behind.
    self.em = nn.Embedding(len(TEXT.vocab.stoi), embedding_dim)
    self.rnn = RNNCell_Encoder(embedding_dim, hidden_size)
    self.fc1 = nn.Linear(hidden_size, 256)
    self.fc2 = nn.Linear(256, 3)

  def forward(self, x):
    x = self.em(x)            # token ids -> embedding vectors
    x = self.rnn(x)           # final hidden state of the encoder
    x = F.relu(self.fc1(x))
    x = self.fc2(x)           # raw class scores
    return x