In [1]:
!wget http://ai.stanford.edu/~amaas/data/sentiment/aclImdb_v1.tar.gz
!tar xf aclImdb_v1.tar.gz

--2023-04-21 11:27:12--  http://ai.stanford.edu/~amaas/data/sentiment/aclImdb_v1.tar.gz
Resolving ai.stanford.edu (ai.stanford.edu)... 171.64.68.10
Connecting to ai.stanford.edu (ai.stanford.edu)|171.64.68.10|:80... connected.
HTTP request sent, awaiting response... 200 OK
Length: 84125825 (80M) [application/x-gzip]
Saving to: ‘aclImdb_v1.tar.gz’


2023-04-21 11:27:17 (16.5 MB/s) - ‘aclImdb_v1.tar.gz’ saved [84125825/84125825]



In [5]:
import glob
import pathlib
import re
import torch

remove_marks_regex = re.compile("[,\.\(\)\[\]\*:;]|<.*?>")
shift_marks_regex = re.compile("([?!])")

In [4]:
def text2ids(text, vocab_dict):  # 긴 문자열을 토큰 ID 리스트로 변환하는 함수
  # !? 이외의 기호 삭제
  text = remove_marks_regex.sub("",text)
  # !?와 단어사이에 공백 삽입
  text = shift_marks_regex.sub(r" \1 ",text)
  tokens = text.split()
  return [ vocab_dict.get(token,0) for token in tokens]  # 용어집에 포함되지 않는 토큰은 0으로 채운다


In [6]:
#ID리스트를 int64의 텐서로 변환하는 함수 변환할때는 각 문장을 분할한 후 토근 수를 제한하고, 반대로 그 수에 
# 미치지 못하면 뒤를 0d로 채운다
def list2tensor(token_idxes, max_len=100, padding=True): 
  if len(token_idxes) > max_len:
    token_idxes = token_idxes[:max_len]
  n_tokens = len(token_idxes)
  if padding:
    token_idxes = token_idxes + [0]*(max_len - len(token_idxes))
  return torch.tensor(token_idxes, dtype = torch.int64), n_tokens

In [7]:
# Dataset 클래스 작성

In [11]:
from torch import nn, optim
from torch.utils.data import Dataset,DataLoader,TensorDataset
import tqdm

In [12]:
class IMDBDataset(Dataset):
  def __init__(self,dir_path, train = True,max_len=100, padding=True):
    self.max_len = max_len
    self.padding = padding
    path = pathlib.Path(dir_path)
    vocab_path = path.joinpath('imdb.vocab')
    # 용어집 파일을 읽어서 행 단위로 분할
    self.vocab_array = vocab_path.open().read().strip().splitlines()
    # 단어가 키, 값이 ID인 dict 
    self.vocab_dict = dict( (w, i+1) for (i,w) in enumerate(self.vocab_array) )
    if train:
      target_path = path.joinpath('train')
    else:
      target_path = path.joinpath('test')
    pos_files = sorted(glob.glob( str(target_path.joinpath('pos/*.txt')) )   )
    neg_files = sorted(glob.glob( str(target_path.joinpath('neg/*.txt')) )   )
    # pos 1, neg 0인 label을 붙여서 (file_path, label)
    self.labeled_files = list(zip([0]*len(neg_files), neg_files  )) + list(zip([1]*len(pos_files), pos_files  ))

  @property  # 함수를 변수처럼 사용해서 보다 간격하게 표현  imdb.vacab_size() -- > imdb.vacab_size
  def vocab_size(self):
    return len(self.vocab_array)
  def __len__(self):
    return len(self.labeled_files)
  def __getitem__(self, index):
    label, f =  self.labeled_files[index]
    #파일의 텍스트를 읽어서 소문자로 변환
    data = open(f).read().lower()
    # 텍스트 데이터를 ID리스트로 변환
    data = text2ids(data,self.vocab_dict)
    # ID 리스트를 Tensor로 변환
    data,n_tokens = list2tensor(data, self.max_len, self.padding)
    return data, label, n_tokens 
    

In [13]:
# 훈련용과 테스트용 DataLoader 작성
train_data = IMDBDataset('/content/aclImdb')
test_data = IMDBDataset('/content/aclImdb',train=False)
train_loader = DataLoader(train_data,batch_size=32,shuffle=True)
test_loader = DataLoader(test_data,batch_size=32,shuffle=False)

In [20]:
data = next(iter(train_loader))
data

[tensor([[  154,   462,   752,  ..., 62413, 16257, 15393],
         [    9,   101,    11,  ...,     0,     0,     0],
         [    9,   214,    10,  ...,     5,  2778,    29],
         ...,
         [   72,    20,   239,  ...,     0,     0,     0],
         [   10,    18,    12,  ...,     0,     0,     0],
         [   10,     6,     3,  ...,     0,     0,     0]]),
 tensor([0, 0, 0, 0, 1, 1, 0, 0, 0, 1, 0, 0, 1, 1, 0, 0, 1, 1, 1, 1, 0, 0, 0, 1,
         0, 0, 1, 0, 0, 0, 0, 0]),
 tensor([100,  76, 100, 100, 100, 100, 100, 100,  49, 100, 100, 100, 100, 100,
         100,  77,  91, 100, 100, 100, 100, 100, 100, 100, 100, 100, 100, 100,
         100,  32,  77,  57])]

In [21]:
# 신경망..  x가 입력되었을때 0또는 1일 출력되는 2진 분류 문제
# x를 Embedding 으로 벡터 시계열로 변환하고, RNN에 넣어서 마지막 출력을 1차원 선형 계층과 연결

In [22]:
class SequenceTaggingNet(nn.Module):
  def __init__(self, num_embeddings, embedding_dim=50, hidden_size=50,num_layers=1, dropout=0.2):
    super(SequenceTaggingNet,self).__init__()
    # num_embeddings, embedding_dim 행렬의 크기 
    # padding_idx 패딩 토큰 0을 설정하면 입력데이터의 길이를 맞추기 위해 사용
    self.emb = nn.Embedding(num_embeddings, embedding_dim,padding_idx=0)
    # batch_first=True  입력텐서의 첫번째 차원이 배치크기인지 시퀀스 크기인지 지정
    self.lstm = nn.LSTM(embedding_dim,hidden_size,num_layers,batch_first=True,dropout=dropout)
    self.linear = nn.Linear(hidden_size, 1)
  def forward(self, x, h0=None, l=None):
    # ID를 Embedding으로 다차원 벡터로 변환
    # X는 (batch_size, step_size)
    #-> (batch_size, step_size, embedding_dim)
    x = self.emb(x)
    # 초기상태 h0와 함께 RNN에 x를 전달
    # x (batch_size, step_size, embedding_dim)
    #-> (batch_size, step_size, hidden_dim)
    x, h = self.lstm(x, h0)
    #마지막 단계만 추출
    # x (batch_size, step_size, hidden_dim)
    # -> (bath_size, 1)
    if l is not None:
      # 입력의 원래 길이가 있으면 사용
      x = x[list(range(len(x))), l-1, :]
    else:
      #없으면 단순히 마지막 것을 사용
      x = x[:, -1, :]
    # 추출한 마지막 단계에 선형 계층을 넣는다
    x = self.linear(x)
    # 불필요한 차원을 삭제
    # (batch_size, 1) - >(batch_size, )
    x = x.squeeze()
    return x


In [23]:
device = 'cuda' if torch.cuda.is_available() else 'cpu'
# 훈련 및 평가
def eval_net(net, data_loader, device='gpu'):
  net.eval()
  y_list = []
  y_pred_list = []
  for x, y, l in data_loader:
    x = x.to(device)
    y = y.to(device)
    l = l.to(device)
    with torch.no_grad():
      y_pred = net(x, l=l)
      y_pred = (y_pred > 0).long()
      y_list.append(y)
      y_pred_list.append(y_pred)
  y_list = torch.cat(y_list)
  y_pred_list = torch.cat(y_pred_list)
  acc = (y_list == y_pred_list).float().sum() / len(y_list)
  return acc.item()

In [None]:
net = SequenceTaggingNet(train_data.vocab_size+1, num_layers=2)
net.to(device)
opt = optim.Adam(net.parameters())
loss_f = nn.BCEWithLogitsLoss()
for epoch in range(10):
  losses=[]
  net.train()
  for x,y,l in tqdm.tqdm(train_loader):
    x = x.to(device)
    y = y.to(device)
    l = l.to(device)
    y_pred = net(x, l=l)
    loss = loss_f(y_pred, y.float())
    opt.zero_grad()
    loss.backward()
    opt.step()
    losses.append(loss.item())
  train_acc = eval_net(net, train_loader, device)
  val_acc = eval_net(net, test_loader, device)
  print(f"epoch:{epoch}, loss:{np.mean(losses)} train_acc:{train_acc} val_acc:{val_acc}")