In [None]:
# To ignore deprecated warnings
import warnings
warnings.simplefilter("ignore")
warnings.warn("deprecated", DeprecationWarning)
import os
import random
import numpy as np
from sklearn.metrics import accuracy_score
import logging
logging.basicConfig(format='%(asctime)s : %(levelname)s : %(message)s', level=logging.INFO)

from torch.utils import data # pytorch data class 
import torch.nn as nn # pytorch neural network 불러오기 
import torch.nn.utils.rnn as rnn_utils # rnn utils
import torch

import pickle

# Sentiment Classification

In [None]:
def build_dict(seqs):
    """
    Build the vocabulary and the word <-> index lookup tables.

    Every line is stripped and split on single spaces; each token that has
    not been seen before is appended to the vocabulary and receives the next
    free index. Indices 0-3 are reserved for the special tokens
    "<pad>", "<s>", "</s>" and "<unk>".

    args:
        seqs - iterable of text lines (str)
    return:
        vocab   - list of tokens, ordered by index
        word2id - dict mapping token -> index
        id2word - dict mapping index -> token
    """
    vocab = ["<pad>", "<s>", "</s>", "<unk>"]
    word2id = {token: index for index, token in enumerate(vocab)}
    id2word = {index: token for index, token in enumerate(vocab)}
    print("Building vocab and dict..")
    for line in seqs:
        for word in line.strip().split(' '):  # tokenized by space
            # Look the word up in the dict (O(1)) instead of scanning the
            # vocab list, which made the whole build quadratic.
            if word not in word2id:
                index = len(vocab)  # next free index
                vocab.append(word)
                word2id[word] = index
                id2word[index] = word

    print("Number of unique words: %d" % len(vocab))
    print("Finised building vocab and dict!")

    return vocab, word2id, id2word

In [None]:
def batch(iterable, n=1):
    """
    Group the items of `iterable` into consecutive tuples of length `n`.

    The last tuple is filled up with None when the number of items is not a
    multiple of `n`.

    args:
        iterable - any iterable
        n        - group size
    return:
        an iterator of n-tuples
    """
    # Imported here because the notebook's import cell does not provide it.
    from itertools import zip_longest

    # n references to the SAME iterator: zip_longest pulls n consecutive items per tuple.
    args = [iter(iterable)] * n
    return zip_longest(*args)


def pad_tensor(vec, pad, value=0, dim=0):
    """
    Pad a tensor with a constant (typically the pad token) along one dimension.

    args:
        vec   - 1-D or 2-D tensor to pad
        pad   - the size to pad to (target length of dimension 'dim')
        value - fill value used for the padding
        dim   - dimension to pad
    return:
        a new tensor padded to 'pad' in dimension 'dim', with the dtype and
        device of 'vec'
    raises:
        NotImplementedError - if 'vec' is neither 1-D nor 2-D
    """
    if not isinstance(vec, torch.Tensor):
        # Non-tensor array-likes are converted the same way as before.
        vec = torch.Tensor(vec)

    if len(vec.shape) not in (1, 2):
        raise NotImplementedError

    # Size the filler along the dimension that is actually padded; using
    # shape[0] unconditionally broke padding along dim=1.
    pad_shape = list(vec.shape)
    pad_shape[dim] = pad - vec.shape[dim]

    # Build the filler with the input's dtype/device so integer or
    # non-CPU inputs can be concatenated directly.
    filler = torch.full(pad_shape, value, dtype=vec.dtype, device=vec.device)
    return torch.cat([vec, filler], dim=dim)

In [None]:
def collate_fn(batch, values=(0, 0), dim=0):
    """
    Turn a list of samples into one padded batch before it reaches the
    data loader's consumer.

    args:
        batch  - list of (tensor, label)
        values - accepted for interface compatibility; not used here
        dim    - dimension of each tensor that holds the sequence length
    return:
        xs - a LongTensor of all examples in 'batch' after padding,
             ordered by decreasing sequence length
        ws - an IntTensor of the (sorted) sequence lengths
        ys - a tensor of all labels in 'batch', in the same order as xs
    """
    # Length of every sample, then the permutation that sorts them longest-first.
    lengths = torch.Tensor([int(sample[0].shape[dim]) for sample in batch])
    lengths, order = lengths.sort(descending=True)

    # Every sample is padded up to the longest sequence in the batch.
    longest = max(sample[0].shape[dim] for sample in batch)
    padded = []
    targets = []
    for (seq, label) in batch:
        padded.append(pad_tensor(seq, pad=longest, dim=dim))
        targets.append(label)

    # Stack into (batch, ...) and apply the longest-first ordering.
    stacked = torch.stack(padded, dim=0)
    stacked = stacked[order].contiguous()
    targets = [targets[position] for position in order]

    return stacked.long(), lengths.int(), torch.Tensor(targets)

In [None]:
class Sentiment_Dataset(data.Dataset):
    """
    Dataset of tokenized review lines.

    Each line of the file is a space-separated sequence whose LAST token is
    the label word; a last token of "negative" maps to label 1, anything
    else to label 0.
    """

    def __init__(self, path , word2id):
        """
        args:
            path    - path of the tokenized text file (one sample per line)
            word2id - dict mapping token -> index
        """
        # Read through a context manager so the file handle is closed.
        with open(path) as f:
            self.seqs = f.readlines()
        self.word2id = word2id

    def __getitem__(self, index):
        """Returns one data pair (source and sentiment)."""
        return self.process(self.seqs[index], self.word2id)

    def __len__(self):
        """Returns the number of lines (samples) in the file."""
        return len(self.seqs)

    def process(self, seq, word2id):
        """
        Convert one raw line into (index tensor, label).

        args:
            seq     - raw text line, label word last
            word2id - dict mapping token -> index
        return:
            sequence - float tensor: <s> id, word ids, </s> id
            label    - 1 if the last token is "negative", else 0
        """
        words = seq.strip().split(' ')
        # Decide the label once from the last token. Doing it inside the word
        # loop left label-only lines (no words before the label) at 0.
        label = 1 if words[-1] == "negative" else 0

        # Out-of-vocabulary words map to <unk>; fall back to the conventional
        # id 3 when the mapping has no explicit "<unk>" entry.
        unk_id = word2id.get("<unk>", 3)

        sequence = [word2id["<s>"]]
        sequence.extend(word2id.get(word, unk_id) for word in words[:-1])
        sequence.append(word2id["</s>"])
        return torch.Tensor(sequence), label

In [None]:
def sentiment_trainer(model, optimizer, loaders, epoch,n_epochs):
    """
    Run one epoch: optimise on the "train" loader and measure loss/accuracy
    on the "valid" loader, then print the epoch averages.

    Relies on the notebook-level `device` for tensor placement.

    args:
        model     - sentiment model
        optimizer - adam
        loaders   - dict of data loaders keyed by "train" / "valid"
        epoch     - zero-based index of the current epoch (for printing)
        n_epochs  - total number of epochs (for printing)
    return:
        model, optimizer
    """
    losses = []
    total_accuracy = []
    val_losses = []
    val_total_accuracy = []
    n_iter = 0
    # The loss module is stateless; build it once instead of once per batch.
    loss_fn = nn.BCELoss().to(device)

    for split in loaders.keys():
        if split == "train":
            model.train() # train mode
            for inputs, input_lengths, target_label in loaders[split]:
                predict_label = model(inputs.to(device), input_lengths.to(device))
                loss = loss_fn(predict_label, target_label.to(device))
                losses.append(loss.item())
                # Calculate accuracy (predictions rounded at 0.5)
                y_pred = predict_label.round().cpu().detach().numpy()
                y_true = target_label.cpu().detach().numpy()
                accuracy = accuracy_score(y_true, y_pred)
                total_accuracy.append(accuracy)
                # Reset gradients
                optimizer.zero_grad()
                # Compute gradients
                loss.backward()
                optimizer.step()
                n_iter += 1 # count number of trained batches
                if n_iter % 100 == 0: # print loss only if it's training stage
                    print ('\n [{}] current_iter_loss= {:05.3f} acc= {:05.3f}'.format(n_iter,loss.item(),accuracy))

        elif split == "valid":
            model.eval() # eval mode
            # No gradients are needed for validation; skipping autograd
            # bookkeeping saves memory and time.
            with torch.no_grad():
                for inputs, input_lengths, target_label in loaders[split]:
                    predict_label = model(inputs.to(device), input_lengths.to(device))
                    loss = loss_fn(predict_label, target_label.to(device))
                    val_losses.append(loss.item())
                    # Calculate accuracy
                    y_pred = predict_label.round().cpu().numpy()
                    y_true = target_label.cpu().numpy()
                    val_total_accuracy.append(accuracy_score(y_true, y_pred))

    print ('\n Epoch({}/{}) avg train_loss= {:05.3f} train_acc= {:05.3f} val_loss= {:05.3f} val_acc= {:05.3f}'
           .format(
               epoch+1,n_epochs,np.mean(losses), np.mean(total_accuracy),
                   np.mean(val_losses), np.mean(val_total_accuracy)
              )
          )

    return model, optimizer

In [None]:
def idx2word(idx, i2w, pad_idx):
    """
    Convert sentences given as index sequences into word strings.

    Every sentence in `idx` is decoded up to (not including) its first
    `pad_idx`; only the FIRST decoded sentence is returned.

    args:
        idx     - sequence of index sequences (e.g. a 2-D tensor)
        i2w     - dict mapping index -> word
        pad_idx - index that marks padding / end of the sentence
    return:
        the first sentence as a space-joined string
    """
    def _decode(row):
        # Collect words until the first pad index is met.
        tokens = []
        for token_id in row:
            if token_id == pad_idx:
                break
            tokens.append(i2w[int(token_id)])
        return " ".join(tokens).strip()

    decoded = [_decode(row) for row in idx]
    return decoded[0]

In [None]:
def evaluate(model, loader, epoch,n_epochs, id2word):
    """
    Test the model after an epoch and print one sample prediction.

    Prints the average loss/accuracy over `loader`, then the first sentence
    of the last batch together with its predicted and true label.
    Relies on the notebook-level `device` for tensor placement.

    args:
        model    - sentiment model
        loader   - data loader yielding (inputs, lengths, labels)
        epoch    - zero-based index of the current epoch (for printing)
        n_epochs - total number of epochs (for printing)
        id2word  - dict mapping index -> word
    return:
        None
    """
    losses = []
    total_accuracy = []
    last_batch = None
    loss_fn = nn.BCELoss().to(device)
    model.eval()
    # Evaluation needs no gradients.
    with torch.no_grad():
        for inputs, input_lengths, target_label in loader:
            predict_label = model(inputs.to(device), input_lengths.to(device))
            loss = loss_fn(predict_label, target_label.to(device))
            losses.append(loss.item())
            # Calculate accuracy (predictions rounded at 0.5)
            y_pred = predict_label.round().cpu().numpy()
            y_true = target_label.cpu().numpy()
            total_accuracy.append(accuracy_score(y_true, y_pred))
            last_batch = (inputs, predict_label, target_label)

    print ('\n Epoch({}/{}) avg test_loss= {:05.3f} test_acc= {:05.3f}'
        .format(epoch+1,n_epochs,np.mean(losses), np.mean(total_accuracy))
    )

    # An empty loader leaves nothing to show (previously a NameError).
    if last_batch is None:
        return

    # test inference: reuse the prediction of the last batch rather than
    # running the model on it a second time.
    inputs, predict_label, target_label = last_batch
    print (idx2word(inputs, id2word, 0))
    ans = {0: 'positive', 1:'negative'}
    print('\n Pred: {}, Ans: {}'
          .format(ans[int(predict_label[0].round().item())], ans[int(target_label[0].item())])
         )


In [None]:
# Building vocab or loading existing vocab
path = "./data/sentiment_id2word.pkl"
word2id_path = "./data/sentiment_word2id.pkl"
vocab_path = "./data/sentiment_vocab.pkl"

# Use the cache only when all three files are present; checking just one
# could fail halfway through loading.
if all(os.path.isfile(p) for p in (path, word2id_path, vocab_path)):
    # NOTE: pickle.load can execute arbitrary code - only load trusted files.
    with open(path, "rb") as f:
        id2word = pickle.load(f)
    with open(word2id_path, "rb") as f:
        word2id = pickle.load(f)
    with open(vocab_path, "rb") as f:
        vocab = pickle.load(f)

else: # cache is missing or incomplete
    # NOTE(review): `seqs` was never defined in this notebook, so this branch
    # raised NameError on a fresh kernel. Assuming the vocabulary is meant to
    # be built from the training corpus - confirm. Each line still carries its
    # trailing label word, which therefore also enters the vocabulary.
    with open("./data/train.tok") as f:
        seqs = f.readlines()
    vocab, word2id, id2word = build_dict(seqs)
    # Write through context managers so the files are flushed and closed.
    for obj, target in ((vocab, vocab_path), (word2id, word2id_path), (id2word, path)):
        with open(target, "wb") as f:
            pickle.dump(obj, f)

In [None]:
# Build the dataset and its shuffled, padded-batch loader for each split.
# NOTE: `config` (for "batch_size") must already be defined when this cell
# runs; in this file it is assigned in a later cell.
def _make_split(tok_path):
    """Return (dataset, loader) for the tokenized file at `tok_path`."""
    dataset = Sentiment_Dataset(tok_path, word2id)
    loader = torch.utils.data.DataLoader(
        dataset=dataset,
        batch_size=config["batch_size"],
        shuffle=True,
        collate_fn=collate_fn,
    )
    return dataset, loader


train_dataset, train_data_loader = _make_split("./data/train.tok")
valid_dataset, valid_data_loader = _make_split("./data/valid.tok")
test_dataset, test_data_loader = _make_split("./data/test.tok")

# Only train/valid go to the trainer; the test loader is used by evaluate().
loaders = dict(train=train_data_loader, valid=valid_data_loader)

In [None]:
class Sentiment_Classification(nn.Module):
    """
    IMDb movie-review sentiment classification model.

    Embedding -> (optionally bidirectional, multi-layer) GRU -> MLP with a
    sigmoid output, producing one probability per sequence.
    """
    def __init__(self, config, vocab_size):
        """
        args:
            config     - hyperparameters
            vocab_size - vocab_size
        return:
            None
        """
        super(Sentiment_Classification, self).__init__()
        self.vocab_size = vocab_size
        self.embedding = nn.Embedding(self.vocab_size, config["embedding_size"])
        self.rnn = nn.GRU(input_size=config["embedding_size"], hidden_size=config["hidden_size"], 
                          num_layers=config["num_layers"],dropout=config["dropout"],bidirectional = config['bidirectional'] , batch_first=True)
        self.bidirectional = config['bidirectional']
        self.num_layers = config['num_layers']
        # Width of the sequence representation fed to the classifier:
        # both directions are concatenated when the GRU is bidirectional.
        if self.bidirectional:
            self.hidden_size = 2*config['hidden_size']
        else:
            self.hidden_size = config['hidden_size']
        self.outputs = nn.Sequential(
                            nn.Linear(self.hidden_size, self.hidden_size),
                            nn.LeakyReLU(0.2),
                            nn.Linear(self.hidden_size, self.hidden_size),
                            nn.LeakyReLU(0.2),
                            nn.Linear(self.hidden_size, 1),
                            nn.Sigmoid()
                        )
    def forward(self,input, lengths):
        """
        args:
            input   - LongTensor (batch, max_len) of token ids, padded
            lengths - true length of every sequence in the batch
        return:
            tensor (batch,) with values in [0, 1]; this is the shape the
            BCELoss call in the trainer needs to match the label tensor
        """
        embedded = self.embedding(input)  # (batch, max_len, embedding_size)

        # pack_padded_sequence wants the lengths as a CPU int64 tensor, while
        # the training loop moves them to the model's device.
        cpu_lengths = torch.as_tensor(lengths, dtype=torch.int64, device="cpu")
        # Packing makes the GRU stop at each sequence's real end, so padding
        # never influences the final hidden state.
        packed = rnn_utils.pack_padded_sequence(
            embedded, cpu_lengths, batch_first=True, enforce_sorted=False
        )
        _, hidden = self.rnn(packed)  # (num_layers * num_directions, batch, hidden)

        if self.bidirectional:
            # The last two slices are the top layer's forward and backward states.
            final_state = torch.cat([hidden[-2], hidden[-1]], dim=-1)
        else:
            final_state = hidden[-1]

        # (batch, 1) -> (batch,)
        return self.outputs(final_state).squeeze(-1)

In [None]:
# Hyperparameters. The template shipped with `???` placeholders, which are not
# valid Python; the values below are ordinary starting points for a small GRU
# text classifier and are meant to be tuned.
config = {
  "epochs": 5,              # passes over the training set
  "hidden_size": 128,       # GRU hidden units per direction
  "bidirectional": True,    # read each sequence in both directions
  "num_layers": 2,          # stacked GRU layers
  "dropout": 0.3,           # dropout between GRU layers
  "batch_size": 64,         # samples per batch for every data loader
  "embedding_size": 128,    # word-embedding dimension
  "learning_rate": 1e-3,    # Adam learning rate
}

In [None]:
# Pick the GPU when one is available, otherwise fall back to the CPU.
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')

# Build the sentiment model on that device and attach an Adam optimizer
# (learning rate defaults to .001 when the config has no "learning_rate").
model = Sentiment_Classification(config, vocab_size=len(vocab)).to(device)
optimizer = torch.optim.Adam(
    model.parameters(),
    lr=config.get("learning_rate", .001),
)

In [None]:
# Train for the configured number of epochs; after each epoch the model is
# scored on the test loader and one sample prediction is printed.
n_epochs = config["epochs"]
for epoch in range(n_epochs):
    model, optimizer = sentiment_trainer(model, optimizer, loaders, epoch, n_epochs)
    evaluate(model, test_data_loader, epoch, n_epochs, id2word)

### 모델제출 - 학습된 모델 저장 및 테스트

In [None]:
# Save the model - submit (./trained_model + ./config.pkl)
# Written through a context manager so the pickle is flushed and closed
# before it is read back below.
with open("./config.pkl", "wb") as f:
    pickle.dump(config, f)
torch.save(model.state_dict(), "./trained_model")

# Check the saved model: rebuild it from the saved config and weights
if os.path.isfile("./config.pkl"):
    with open("./config.pkl","rb") as f:
        config = pickle.load(f)
model = Sentiment_Classification(config, vocab_size=len(vocab))
model = model.to(device)
# map_location lets weights saved on a GPU be restored on a CPU-only machine.
model.load_state_dict(torch.load("./trained_model", map_location=device))
model.eval()
# Report as the final epoch without depending on the loop variable `epoch`
# leaked from the training cell.
evaluate(model, test_data_loader, config["epochs"] - 1, config["epochs"], id2word)