**Sentiment Analysis using RNN models**

In [None]:
import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)
import torch
import torch.nn as nn
import torch.nn.functional as F
import nltk
nltk.download('stopwords')
from nltk.corpus import stopwords 
from collections import Counter
import string
import re
from tqdm import tqdm
import matplotlib.pyplot as plt
from torch.utils.data import TensorDataset, DataLoader
from sklearn.model_selection import train_test_split

**Dataset loading**

In [None]:
# Mount Google Drive (Colab only) so the dataset archive is reachable
from google.colab import drive
drive.mount('/content/drive')

# Copy the zip file into the current folder; adjust the path below to match
# where the archive lives / what it is called in your Drive
!cp '/content/drive/MyDrive/IMDB Dataset.csv.zip' ./

In [None]:
# Extract the archive copied into the current folder by the previous cell
!unzip 'IMDB Dataset.csv.zip'

In [None]:
is_cuda = torch.cuda.is_available()

# Pick the GPU when one is visible to PyTorch, otherwise fall back to the CPU.
# The resulting `device` is reused by the model and training cells below.
device = torch.device("cuda" if is_cuda else "cpu")
print("GPU is available" if is_cuda else "GPU not available, CPU used")

In [None]:
# Implementation required (exercise cell)
# Load the CSV extracted above into a DataFrame and preview the first rows.
base_csv = 'IMDB Dataset.csv'
df = pd.read_csv(base_csv)
df.head()

**Pre-process the dataset**

In [None]:
# Implementation required (exercise cell)
# Pull the raw review texts and their sentiment labels out of the DataFrame.
x, y = df['review'].values, df['sentiment'].values
# Hold out 20% of the rows for testing.
# NOTE(review): no random_state is passed, so the split presumably changes
# from run to run — confirm whether reproducibility is wanted.
x_train, x_test, y_train, y_test = train_test_split(x, y, test_size=0.2)
print(f'shape of train data is {x_train.shape}')
print(f'shape of test data is {x_test.shape}')

In [None]:
def preprocess_string(s):
    """Strip punctuation, whitespace and digits from a single token.

    Three substitutions are applied in order, each replacing its matches
    with the empty string:

    1. every character that is neither a word character nor whitespace
       (note that ``\\w`` also matches ``_``, so underscores survive),
    2. every run of whitespace,
    3. every digit.

    Args:
        s: The string to clean.

    Returns:
        The cleaned string, which may be empty.
    """
    for pattern in (r"[^\w\s]", r"\s+", r"\d"):
        s = re.sub(pattern, '', s)
    return s

def _encode_reviews(sentences, vocab):
    """Map each review to the list of vocabulary ids of its in-vocabulary words."""
    encoded = []
    for sent in sentences:
        ids = []
        for word in sent.lower().split():
            # Clean each token once and look it up once.
            idx = vocab.get(preprocess_string(word))
            if idx is not None:
                ids.append(idx)
        encoded.append(ids)
    return encoded


def _to_object_array(sequences):
    """Pack variable-length lists into a 1-D object array, one list per slot."""
    arr = np.empty(len(sequences), dtype=object)
    # Element-wise assignment keeps each list intact even when all lists
    # happen to share a length (np.array would build a 2-D array then).
    for i, seq in enumerate(sequences):
        arr[i] = seq
    return arr


def tockenize(x_train,y_train,x_val,y_val,max_vocab=1000):
    """Build a vocabulary from the training reviews and encode both splits.

    Every review is lower-cased, split on whitespace, and each token is
    cleaned with ``preprocess_string``. The vocabulary is built from the
    training reviews only, skipping English stop words and tokens that
    clean to the empty string, and keeps the ``max_vocab`` most frequent
    words. Ids start at 1, leaving 0 free for padding.

    Args:
        x_train: Iterable of training review strings.
        y_train: Iterable of training labels; ``'positive'`` maps to 1,
            anything else to 0.
        x_val: Iterable of validation/test review strings.
        y_val: Iterable of validation/test labels, encoded like ``y_train``.
        max_vocab: Maximum number of words kept in the vocabulary
            (defaults to the previously hard-coded 1000).

    Returns:
        A 5-tuple ``(train_ids, train_labels, val_ids, val_labels, vocab)``.
        ``train_ids`` / ``val_ids`` are 1-D object arrays holding one list
        of word ids per review, the label arrays hold 0/1 values, and
        ``vocab`` maps each kept word to its id.
    """
    stop_words = set(stopwords.words('english'))

    # Count words review by review instead of materialising one giant list.
    corpus = Counter()
    for sent in x_train:
        cleaned = (preprocess_string(word) for word in sent.lower().split())
        corpus.update(w for w in cleaned if w != '' and w not in stop_words)

    # most_common orders ties by first occurrence, matching a stable
    # descending sort on the counts.
    onehot_dict = {w: i + 1 for i, (w, _) in enumerate(corpus.most_common(max_vocab))}

    final_list_train = _encode_reviews(x_train, onehot_dict)
    final_list_test = _encode_reviews(x_val, onehot_dict)

    encoded_train = [1 if label == 'positive' else 0 for label in y_train]
    encoded_test = [1 if label == 'positive' else 0 for label in y_val]

    # Reviews differ in length; np.array() on such ragged lists raises
    # ValueError on recent NumPy releases, so build object arrays explicitly.
    return (
        _to_object_array(final_list_train),
        np.array(encoded_train),
        _to_object_array(final_list_test),
        np.array(encoded_test),
        onehot_dict,
    )

In [None]:
# Implementation required (exercise cell)
# Replace the raw text/label arrays with their encoded versions; `vocab`
# is the word -> id mapping built from the training reviews.
x_train, y_train, x_test, y_test, vocab = tockenize(x_train, y_train, x_test, y_test)
print(f'Length of vocabulary is {len(vocab)}')

In [None]:
# Number of in-vocabulary tokens kept for each training review
rev_len = [len(i) for i in x_train]
# Plot the distribution of review lengths, then show summary statistics
pd.Series(rev_len).hist()
plt.show()
pd.Series(rev_len).describe()

In [None]:
def padding_(sentences, seq_len):
    """Left-pad (or truncate) id sequences to a fixed length.

    Args:
        sentences: Sequence of id sequences, one per review.
        seq_len: Width of every output row.

    Returns:
        An integer array of shape ``(len(sentences), seq_len)``. Shorter
        reviews are right-aligned with leading zeros; longer reviews keep
        only their first ``seq_len`` ids; empty reviews stay all zeros.
    """
    padded = np.zeros((len(sentences), seq_len), dtype=int)
    for row, tokens in enumerate(sentences):
        if len(tokens) == 0:
            continue
        kept = np.array(tokens)[:seq_len]
        # Right-align: the kept ids occupy the trailing columns of the row.
        start = seq_len - len(kept)
        padded[row, start:] = kept
    return padded

# Implementation required (exercise cell)
# Pad/truncate every encoded review to 200 ids
x_train_pad = padding_(x_train, 200)
x_test_pad = padding_(x_test, 200) 

In [None]:
# Implementation required (exercise cell)

# create Tensor datasets pairing each padded review with its label
train_data = TensorDataset(torch.from_numpy(x_train_pad), torch.from_numpy(y_train))
test_data = TensorDataset(torch.from_numpy(x_test_pad), torch.from_numpy(y_test))
# dataloaders
batch_size = 50

# make sure to SHUFFLE your data
# (only the training loader is shuffled; the test loader keeps dataset order)
train_loader = DataLoader(train_data, shuffle=True, batch_size=batch_size)
test_loader = DataLoader(test_data, shuffle=False, batch_size=batch_size)

In [None]:
# Obtain one batch of training data to sanity-check shapes and contents
dataiter = iter(train_loader)
sample_x, sample_y = next(dataiter)

print('Sample input size: ', sample_x.size()) # batch_size, seq_length
print('Sample input: \n', sample_x)
# sample_y holds the labels, so it is labelled as such rather than as input
print('Sample label: \n', sample_y)

**GRU model code**

In [None]:
class GRU_model(nn.Module):
    """Sequence classifier: embedding -> GRU -> linear layer on the last step.

    Args:
        n_layers: Number of stacked GRU layers.
        hidden_dim: Size of the GRU hidden state.
        n_vocab: Number of rows in the embedding table.
        embed_dim: Size of each embedding vector.
        n_classes: Number of output logits.
        device: Device on which the initial hidden state is created.
    """

    def __init__(self, n_layers, hidden_dim, n_vocab, embed_dim, n_classes, device):
        super().__init__()
        self.n_layers = n_layers
        self.hidden_dim = hidden_dim
        self.device = device

        # Implementation required (exercise placeholder)
        self.embd = nn.Embedding(n_vocab, embed_dim)
        self.gru = nn.GRU(embed_dim, hidden_dim, num_layers=n_layers, batch_first=True)
        self.out = nn.Linear(hidden_dim, n_classes)

    def forward(self, x):
        """Return logits of shape (batch, n_classes) for a batch of id sequences."""
        # Implementation required (exercise placeholder)
        embedded = self.embd(x)
        # Start from an all-zero hidden state sized for this batch.
        initial_hidden = self._init_state(batch_size=embedded.size(0))
        # With batch_first=True the GRU output is (batch, seq_len, hidden_dim).
        outputs, _ = self.gru(embedded, initial_hidden)
        # Keep only the last time step: (batch, hidden_dim).
        last_step = outputs[:, -1, :]
        # (batch, hidden_dim) -> (batch, n_classes)
        return self.out(last_step)

    def _init_state(self, batch_size):
        """Return a zero hidden state of shape (n_layers, batch_size, hidden_dim)."""
        # Implementation required (exercise placeholder)
        return torch.zeros(self.n_layers, batch_size, self.hidden_dim, device=self.device)

In [None]:
# Hyperparameters shared by the models built in this notebook
n_layers = 1
vocab_size = len(vocab) + 1  # extra 1 for <pad>
hidden_dim = 128
embed_dim = 100
n_classes = 2

# Implementation required (exercise cell)
# Build the GRU classifier and move its parameters to the selected device
model = GRU_model(n_layers, hidden_dim, vocab_size, embed_dim, n_classes, device).to(device)

**Train and evaluation**

In [None]:
def train(model, criterion, optimizer, data_loader):
    """Run one training epoch and return the mean per-sample loss.

    Args:
        model: The ``nn.Module`` to optimise; it is switched to train mode.
        criterion: Loss callable invoked as ``criterion(logits, targets)``.
        optimizer: Optimizer stepping the model's parameters.
        data_loader: Iterable of ``(inputs, targets)`` batches that exposes
            a ``dataset`` attribute supporting ``len()``.

    Returns:
        Sum of ``batch_loss * batch_size`` over all batches divided by
        ``len(data_loader.dataset)``.
    """
    model.train()

    # Send batches to wherever the model's parameters live instead of relying
    # on a notebook-level `device` variable being defined and in sync.
    first_param = next(model.parameters(), None)
    target_device = first_param.device if first_param is not None else None

    train_loss = 0.0
    for x, y in data_loader:
        if target_device is not None:
            x, y = x.to(target_device), y.to(target_device)

        optimizer.zero_grad()
        logit = model(x)
        loss = criterion(logit, y)
        loss.backward()
        optimizer.step()

        # Weight each batch loss by its size so a smaller final batch does
        # not skew the epoch average.
        train_loss += loss.item() * x.size(0)

    return train_loss / len(data_loader.dataset)

def evaluate(model, data_loader):
    """Return the classification accuracy of ``model`` in percent.

    Args:
        model: The ``nn.Module`` to evaluate; it is switched to eval mode.
        data_loader: Iterable of ``(inputs, targets)`` batches that exposes
            a ``dataset`` attribute supporting ``len()``.

    Returns:
        ``100 * correct / len(data_loader.dataset)``, where a prediction is
        the index of the largest logit along dimension 1. The value is a
        0-dim tensor whenever at least one batch was processed.
    """
    model.eval()

    # Send batches to wherever the model's parameters live instead of relying
    # on a notebook-level `device` variable being defined and in sync.
    first_param = next(model.parameters(), None)
    target_device = first_param.device if first_param is not None else None

    corrects = 0
    # Inference needs no gradients; disabling autograd avoids building a
    # graph for every batch.
    with torch.no_grad():
        for x, y in data_loader:
            if target_device is not None:
                x, y = x.to(target_device), y.to(target_device)

            predictions = model(x).argmax(dim=1)
            corrects += (predictions.view(y.size()) == y).sum()

    size = len(data_loader.dataset)
    avg_accuracy = 100.0 * corrects / size
    return avg_accuracy

In [None]:
# Training settings reused by the later LSTM and RNN cells
num_epochs = 10
lr = 0.001

criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=lr)

for e in range(1, num_epochs+1):
    # Implementation required (exercise cell)
    # One pass over the training set, then accuracy on the test set
    train_loss = train(model, criterion, optimizer, train_loader)
    test_accuracy = evaluate(model, test_loader)
    print("[Epoch: %2d] train loss : %5.2f | test accuracy : %5.2f" % (e, train_loss, test_accuracy))

**LSTM model code**

In [None]:
class LSTM_model(nn.Module):
    """Sequence classifier: embedding -> LSTM -> linear layer on the last step.

    Args:
        n_layers: Number of stacked LSTM layers.
        hidden_dim: Size of the LSTM hidden and cell states.
        n_vocab: Number of rows in the embedding table.
        embed_dim: Size of each embedding vector.
        n_classes: Number of output logits.
        device: Device on which the initial states are created.
    """

    def __init__(self, n_layers, hidden_dim, n_vocab, embed_dim, n_classes, device):
        super().__init__()
        self.n_layers = n_layers
        self.hidden_dim = hidden_dim
        self.device = device

        # Implementation required (exercise placeholder)
        self.embed = nn.Embedding(n_vocab, embed_dim)
        self.lstm = nn.LSTM(embed_dim, hidden_dim, num_layers=n_layers, batch_first=True)
        self.out = nn.Linear(hidden_dim, n_classes)

    def forward(self, x):
        """Return logits of shape (batch, n_classes) for a batch of id sequences."""
        # Implementation required (exercise placeholder)
        embedded = self.embed(x)
        # Start from all-zero hidden and cell states sized for this batch.
        initial_states = self._init_state(batch_size=embedded.size(0))
        # With batch_first=True the LSTM output is (batch, seq_len, hidden_dim).
        outputs, _ = self.lstm(embedded, initial_states)
        # Keep only the last time step: (batch, hidden_dim).
        last_step = outputs[:, -1, :]
        # (batch, hidden_dim) -> (batch, n_classes)
        return self.out(last_step)

    def _init_state(self, batch_size):
        """Return zero ``(hidden, cell)`` states, each (n_layers, batch_size, hidden_dim)."""
        # Implementation required (exercise placeholder)
        state_shape = (self.n_layers, batch_size, self.hidden_dim)
        hidden = torch.zeros(*state_shape, device=self.device)
        cell = torch.zeros(*state_shape, device=self.device)
        return (hidden, cell)

**LSTM training and evaluation**

In [None]:
# Implementation required (exercise cell)
# Rebind `model` to a fresh LSTM classifier, with a new loss/optimizer pair
model = LSTM_model(n_layers, hidden_dim, vocab_size, embed_dim, n_classes, device).to(device)
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=lr)
for e in range(1, num_epochs+1):
    # Implementation required (exercise cell)
    # One pass over the training set, then accuracy on the test set
    train_loss = train(model, criterion, optimizer, train_loader)
    test_accuracy = evaluate(model, test_loader)
    print("[Epoch: %2d] train loss : %5.2f | test accuracy : %5.2f" % (e, train_loss, test_accuracy))

**Vanilla RNN model code**

In [None]:
class RNN_model(nn.Module):
    """Sequence classifier: embedding -> vanilla RNN -> linear layer on the last step.

    Args:
        n_layers: Number of stacked RNN layers.
        hidden_dim: Size of the RNN hidden state.
        n_vocab: Number of rows in the embedding table.
        embed_dim: Size of each embedding vector.
        n_classes: Number of output logits.
        device: Device on which the initial hidden state is created.
    """

    def __init__(self, n_layers, hidden_dim, n_vocab, embed_dim, n_classes, device):
        super().__init__()
        self.n_layers = n_layers
        self.hidden_dim = hidden_dim
        self.device = device

        # Implementation required (exercise placeholder)
        self.embed = nn.Embedding(n_vocab, embed_dim)
        self.rnn = nn.RNN(embed_dim, hidden_dim, num_layers=n_layers, batch_first=True)
        self.out = nn.Linear(hidden_dim, n_classes)

    def forward(self, x):
        """Return logits of shape (batch, n_classes) for a batch of id sequences."""
        # Implementation required (exercise placeholder)
        embedded = self.embed(x)
        # Start from an all-zero hidden state sized for this batch.
        initial_hidden = self._init_state(batch_size=embedded.size(0))
        # With batch_first=True the RNN output is (batch, seq_len, hidden_dim).
        outputs, _ = self.rnn(embedded, initial_hidden)
        # Keep only the last time step: (batch, hidden_dim).
        last_step = outputs[:, -1, :]
        # (batch, hidden_dim) -> (batch, n_classes)
        return self.out(last_step)

    def _init_state(self, batch_size):
        """Return a zero hidden state of shape (n_layers, batch_size, hidden_dim)."""
        # Implementation required (exercise placeholder)
        return torch.zeros(self.n_layers, batch_size, self.hidden_dim, device=self.device)

In [None]:
# Implementation required (exercise cell)
# Rebind `model` to a fresh vanilla-RNN classifier
model = RNN_model(n_layers, hidden_dim, vocab_size, embed_dim, n_classes, device).to(device)

criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=lr)
for e in range(1, num_epochs+1):
    # Implementation required (exercise cell)
    # One pass over the training set, then accuracy on the test set
    train_loss = train(model, criterion, optimizer, train_loader)
    test_accuracy = evaluate(model, test_loader)

    print("[Epoch: %2d] train loss : %5.2f | test accuracy : %5.2f" % (e, train_loss, test_accuracy))