In [None]:
!pip install torchsummary
from torchsummary import summary

In [None]:
!sudo apt install default-jre

In [None]:
# Install the default JDK without prompting.
# NOTE(review): recent apt-get releases report --force-yes as deprecated; confirm whether --yes alone is enough.
!sudo apt-get --yes --force-yes install default-jdk 

In [None]:
import os
import pandas as pd
import numpy as np
import pandas as pd
import time
import torch
import torch.nn as nn
from torch.nn.functional import one_hot
import torch.optim as optim 
from torch.optim import lr_scheduler
from torch import Tensor
from torch.utils.data import Dataset, DataLoader
from collections import Counter 
from gensim.utils import simple_preprocess 
from tqdm import tqdm

In [None]:
# Use the first CUDA GPU when one is available, otherwise fall back to the CPU.
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
device

In [None]:
# Run-mode switches for the notebook.
TRAIN = True #from scratch
PRETRAINED = False  # when True, model and optimizer state are restored from pretrained_path
INFER = True
TEST = True  # gates the test-set evaluation cell
# Exactly one of TRAIN / PRETRAINED may be enabled at a time.
assert TRAIN != PRETRAINED, 'TRAIN and PRETRAINED should be different'

In [None]:
# Training hyper-parameters and preprocessing limits used by the cells below.
batch_size = 1  # samples per DataLoader batch
learning_rate = 1e-04  # learning rate handed to the AdamW optimizer
display_step = 2500  # print training progress every this many batches
epochs = 25  # number of passes over the training set
num_classes = 7  # emotion labels 0-6 produced by emotions_map
max_len = 75  # clean_text keeps at most this many tokens per sentence (-1 disables the cut)
pretrained_segmentation = True  # NOTE(review): not referenced in the visible cells

In [None]:
# Checkpoint file that is loaded when PRETRAINED is True.
pretrained_path = '/kaggle/input/pretrained-rnn/RNN.pth'

In [None]:
!pip install py_vncorenlp
import py_vncorenlp
import os
if not os.path.exists('/kaggle/working/vncorenlp'):
    os.makedirs('/kaggle/working/vncorenlp')
    
py_vncorenlp.download_model(save_dir='/kaggle/working/vncorenlp'); 
rdrsegmenter = py_vncorenlp.VnCoreNLP(annotators=["wseg"], save_dir='/kaggle/working/vncorenlp');

In [None]:
def get_data(path):
    """Load one dataset split from an Excel workbook.

    Parameters
    ----------
    path : str
        Location of the ``.xlsx`` file; the rows are read from its ``Sheet1`` sheet.

    Returns
    -------
    pandas.DataFrame
        Frame with the columns ``Emotion`` and ``Sentence``. The first column of the
        sheet is treated as a row index and dropped.

    Raises
    ------
    ValueError
        Raised by pandas when the sheet does not have exactly three columns.
    """
    # Request only the sheet that is needed instead of parsing every sheet in the workbook.
    data = pd.read_excel(path, sheet_name='Sheet1')
    data.columns = ['index', 'Emotion', 'Sentence']
    # Hand back a new frame rather than mutating the loaded one in place.
    return data.drop(columns=['index'])

# Load the train, validation and test splits as DataFrames with Emotion / Sentence columns.
train_data = get_data('/kaggle/input/uit-vsmec/UIT-VSMEC/train_nor_811.xlsx')
valid_data = get_data('/kaggle/input/uit-vsmec/UIT-VSMEC/valid_nor_811.xlsx')
test_data = get_data('/kaggle/input/uit-vsmec/UIT-VSMEC/test_nor_811.xlsx') 

In [None]:
word2vec_path = '/kaggle/input/word2vec-300dim/word2vec_vi_words_300dims.txt'

# Build the vocabulary lookup (word -> row index) and the embedding matrix.
# Row 0 is an all-zero vector that no vocabulary word is mapped to.
word2vec_idx = dict()
word2vec_weights = list()

# The context manager closes the file even when parsing fails. The encoding is spelled out
# so decoding does not depend on the platform default (the file is assumed to be UTF-8).
with open(word2vec_path, "r", encoding="utf-8") as file:
    # Header line: "<number of entries> <embedding dimension>".
    nlines, ebb_dim = (int(field) for field in file.readline().split())
    word2vec_weights.append([0 for j in range(ebb_dim)])
    print('getting embeddings:')
    for i in tqdm(range(1, nlines + 1)):
        # Consume lines until one introduces a word not seen before. That word gets index i,
        # which is also the row its vector occupies in word2vec_weights.
        for line in file:
            fields = line.split()
            # Everything before the last ebb_dim fields is the (possibly multi-token) word.
            word = "_".join(fields[:len(fields) - ebb_dim]).lower()
            if word not in word2vec_idx:
                word2vec_weights.append([float(value) for value in fields[len(fields) - ebb_dim:]])
                word2vec_idx[word] = i
                break
        else:
            # The file ended before nlines distinct words were found; nothing left to read.
            break

In [None]:
# Replacement table used by tone_normalize: for the vowel pairs oa / oe / uy it rewrites the
# spelling with the tone mark on the first vowel into the spelling with the tone mark on the
# second vowel, in lower-case, capitalised and upper-case variants.
dict_map = {
    "òa": "oà",
    "Òa": "Oà",
    "ÒA": "OÀ",
    "óa": "oá",
    "Óa": "Oá",
    "ÓA": "OÁ",
    "ỏa": "oả",
    "Ỏa": "Oả",
    "ỎA": "OẢ",
    "õa": "oã",
    "Õa": "Oã",
    "ÕA": "OÃ",
    "ọa": "oạ",
    "Ọa": "Oạ",
    "ỌA": "OẠ",
    "òe": "oè",
    "Òe": "Oè",
    "ÒE": "OÈ",
    "óe": "oé",
    "Óe": "Oé",
    "ÓE": "OÉ",
    "ỏe": "oẻ",
    "Ỏe": "Oẻ",
    "ỎE": "OẺ",
    "õe": "oẽ",
    "Õe": "Oẽ",
    "ÕE": "OẼ",
    "ọe": "oẹ",
    "Ọe": "Oẹ",
    "ỌE": "OẸ",
    "ùy": "uỳ",
    "Ùy": "Uỳ",
    "ÙY": "UỲ",
    "úy": "uý",
    "Úy": "Uý",
    "ÚY": "UÝ",
    "ủy": "uỷ",
    "Ủy": "Uỷ",
    "ỦY": "UỶ",
    "ũy": "uỹ",
    "Ũy": "Uỹ",
    "ŨY": "UỸ",
    "ụy": "uỵ",
    "Ụy": "Uỵ",
    "ỤY": "UỴ",
    }

def tone_normalize(text, dict_map):
    """Apply every replacement pair of ``dict_map`` to ``text``.

    The pairs are applied one after another in the dictionary's iteration order, so a
    later pair sees the result of the earlier ones.

    Parameters
    ----------
    text : str
        Text to rewrite.
    dict_map : dict
        Maps each substring to look for to the substring that replaces it.

    Returns
    -------
    str
        The text after all replacements.
    """
    normalized = text
    for old_form, new_form in dict_map.items():
        normalized = normalized.replace(old_form, new_form)
    return normalized

def preprocess_text(text):
    """Tokenize, word-segment and tone-normalize one sentence.

    The text is run through gensim's ``simple_preprocess``, re-joined with single spaces,
    passed to the VnCoreNLP word segmenter (``rdrsegmenter``) and finally rewritten with
    ``tone_normalize`` using the module-level ``dict_map``.

    Parameters
    ----------
    text : str
        Raw sentence.

    Returns
    -------
    str
        The processed sentence as one space-separated string.
    """
    basic_tokens = simple_preprocess(text)
    flattened = " ".join(basic_tokens).strip()

    # word_segment returns a sequence of strings; glue them back into one line.
    segmented = " ".join(rdrsegmenter.word_segment(flattened))

    return tone_normalize(segmented, dict_map)

def clean_text(text):
    """Preprocess a sentence and keep only tokens that have a word2vec entry.

    Parameters
    ----------
    text : str
        Raw sentence.

    Returns
    -------
    str
        Space-separated in-vocabulary tokens, cut to the first ``max_len`` tokens unless
        ``max_len`` is -1. The result is an empty string when no token is in the vocabulary.
    """
    known_tokens = [tok for tok in preprocess_text(text).split() if tok in word2vec_idx]
    if max_len != -1:
        known_tokens = known_tokens[:max_len]
    return ' '.join(known_tokens).strip()

In [None]:
def emotions_map(text):
    """Translate an emotion name into its integer class id.

    Parameters
    ----------
    text : object
        Emotion name; compared with ``==`` against the known names.

    Returns
    -------
    int
        0 Enjoyment, 1 Disgust, 2 Sadness, 3 Anger, 4 Surprise, 5 Fear; every other
        value maps to 6.
    """
    known_labels = ('Enjoyment', 'Disgust', 'Sadness', 'Anger', 'Surprise', 'Fear')
    for class_id, label in enumerate(known_labels):
        if text == label:
            return class_id
    return 6

In [None]:
# Clean the sentences and turn the emotion names into integer class ids (updates the frames in place).
for frame in (train_data, valid_data):
    frame['Sentence'] = frame['Sentence'].map(clean_text)
    # Guard against re-running this cell: emotions_map sends anything that is not a known
    # emotion name - including labels that were already converted to integers - to class 6.
    if not pd.api.types.is_numeric_dtype(frame['Emotion']):
        frame['Emotion'] = frame['Emotion'].map(emotions_map)

In [None]:
# Display the training frame after sentence cleaning and label encoding.
train_data

In [None]:
# Inverse-frequency class weights, normalised so that they sum to 1.
class_frequencies = train_data['Emotion'].value_counts()
counts = [1 / class_frequencies[class_id] for class_id in range(num_classes)]
inverse_total = sum(counts)
weights = [inverse / inverse_total for inverse in counts]
weights

In [None]:
class DataClass(Dataset):
    """Dataset of (word-index tensor, one-hot label) pairs.

    Parameters
    ----------
    input_list : list of str
        Space-separated sentences; tokens missing from ``word2vec_idx`` are skipped.
    output_list : list
        Class ids, one per sentence; each is converted with ``int``.
    max_len : int, optional
        Stored on the instance; not used by this class itself.
    train : bool, optional
        Stored on the instance; not used by this class itself.
    """
    def __init__(self, input_list, output_list, max_len = max_len, train=False):
        super().__init__()
        self.train = train
        self.max_len = max_len

        # Vocabulary indices are kept as int64 tensors on the target device. A float32
        # tensor cannot represent every integer above 2**24 exactly, and the embedding
        # lookup needs integer indices anyway.
        self.input_list = [
            torch.tensor(
                [word2vec_idx[word] for word in sentence.strip().split() if word in word2vec_idx],
                dtype=torch.long,
                device=device,
            )
            for sentence in input_list
        ]
        # Labels are stored one-hot encoded with num_classes entries.
        self.output_list = [
            one_hot(torch.tensor(int(y)), num_classes = num_classes) for y in output_list
        ]

    def __getitem__(self, index):
        """Return the ``(indices, one_hot_label)`` pair stored at ``index``."""
        return self.input_list[index], self.output_list[index]

    def __len__(self):
        """Return the number of stored sentences."""
        return len(self.input_list)

In [None]:
# Pair every cleaned sentence with its label, drop pairs whose sentence ended up empty,
# then split the surviving pairs back into parallel lists.
train_input_list = list(train_data["Sentence"])
train_output_list = list(train_data["Emotion"])
train = [pair for pair in zip(train_input_list, train_output_list) if len(pair[0]) > 0]
train_input_list, train_output_list = map(list, zip(*train))

valid_input_list = list(valid_data["Sentence"])
valid_output_list = list(valid_data["Emotion"])
valid = [pair for pair in zip(valid_input_list, valid_output_list) if len(pair[0]) > 0]
valid_input_list, valid_output_list = map(list, zip(*valid))

In [None]:
# Wrap the filtered sentence / label lists in datasets.
train_set = DataClass(train_input_list, train_output_list)
valid_set = DataClass(valid_input_list, valid_output_list)

# Both loaders shuffle and use the notebook-wide batch_size.
train_dataloader = DataLoader(train_set, batch_size=batch_size, shuffle=True)
valid_dataloader = DataLoader(valid_set, batch_size=batch_size, shuffle=True)

In [None]:
def save_model(model, optimizer, path):
    """Write the model and optimizer state to ``path`` as a single checkpoint.

    The checkpoint is a dict with the keys ``"model"`` and ``"optimizer"``, each holding
    the corresponding ``state_dict()``. Prints a confirmation once the file is written.
    """
    torch.save(
        {"model": model.state_dict(), "optimizer": optimizer.state_dict()},
        path,
    )
    print('model saved')

def load_model(model, optimizer, path, map_location=None):
    """Restore model and optimizer state from a checkpoint written by ``save_model``.

    Parameters
    ----------
    model : torch.nn.Module
        Module whose parameters are overwritten with the checkpoint's ``"model"`` entry.
    optimizer : torch.optim.Optimizer
        Optimizer whose state is overwritten with the checkpoint's ``"optimizer"`` entry.
    path : str
        Checkpoint file.
    map_location : optional
        Forwarded to ``torch.load``; pass e.g. ``"cpu"`` to open a checkpoint that was
        saved on a GPU on a machine without one. ``None`` keeps torch's default.

    Returns
    -------
    tuple
        The same ``(model, optimizer)`` objects, updated in place.
    """
    checkpoint = torch.load(path, map_location=map_location)
    model.load_state_dict(checkpoint["model"])
    optimizer.load_state_dict(checkpoint['optimizer'])
    # Report success before returning so the message is actually printed.
    print("model loaded")
    return model, optimizer

In [None]:
class Rnn(nn.Module):
    """Embedding -> single RNN layer -> linear classifier.

    The embedding table is built from the module-level ``word2vec_weights`` and moved to
    ``device``. The logits are computed from the RNN output at the last time step.

    Parameters
    ----------
    input_size : int
        Feature size fed to the RNN; it has to match the width of the embedding vectors.
    hidden_size : int
        Size of the RNN hidden state.
    num_classes : int
        Number of output logits.
    """
    def __init__(self, input_size, hidden_size, num_classes):
        super().__init__()
        self.hidden_size = hidden_size
        pretrained_vectors = torch.Tensor(word2vec_weights).to(device)
        self.embedding = nn.Embedding.from_pretrained(pretrained_vectors)
        self.rnn = nn.RNN(input_size, hidden_size, batch_first=True)
        self.fc = nn.Linear(hidden_size, num_classes)

    def forward(self, x):
        """Return class logits for a batch of index sequences laid out batch-first."""
        token_ids = x.long()  # the embedding lookup needs integer indices
        embedded = self.embedding(token_ids)
        hidden_states, _ = self.rnn(embedded)
        final_step = hidden_states[:, -1, :]
        return self.fc(final_step)

In [None]:
from collections import OrderedDict

# Build the classifier: embedding width from the word2vec header, 50 hidden units and
# one logit per emotion class.
model = Rnn(ebb_dim, 50, num_classes)

if PRETRAINED:
    # map_location lets a checkpoint that was saved on a GPU be restored on whatever
    # device this session runs on. `checkpoint` is kept because the optimizer cell reads it.
    checkpoint = torch.load(pretrained_path, map_location=device)
    new_state_dict = OrderedDict(checkpoint['model'])
    model.load_state_dict(new_state_dict)

model.to(device)
# Print a layer summary for an input of max_len tokens.
summary(model, torch.Size([max_len]))

In [None]:
def train(train_dataloader, 
          valid_dataloader,
          epoch, display_step):
    """Run one training epoch followed by one validation pass.

    Relies on the notebook-level ``model``, ``optimizer``, ``loss_function`` and ``device``.

    Parameters
    ----------
    train_dataloader : DataLoader
        Yields ``(indices, one_hot_label)`` batches used for the optimisation steps.
    valid_dataloader : DataLoader
        Yields batches of the same form; evaluated without gradient tracking.
    epoch : int
        Zero-based epoch number, only used in the progress messages.
    display_step : int
        A progress line is printed every ``display_step`` training batches.

    Returns
    -------
    tuple of float
        ``(train_loss, valid_loss, train_accuracy, valid_accuracy)``, each averaged over
        the number of samples in the corresponding dataset. The training loss comes from
        ``loss_function``; the validation loss from an unweighted cross-entropy.
    """
    start_time = time.time()
    # Sizes come from the loaders that were passed in, not from notebook globals.
    n_train = len(train_dataloader.dataset)
    n_valid = len(valid_dataloader.dataset)
    train_loss_epoch = 0.0
    valid_loss_epoch = 0.0
    train_correct = 0
    valid_correct = 0

    model.train()
    for i, (data, targets) in enumerate(train_dataloader):
        data, targets = data.to(device), targets.to(device)
        # Labels arrive one-hot encoded; the loss and the accuracy need class indices.
        target_classes = targets.argmax(dim=1)

        optimizer.zero_grad()
        outputs = model(data)
        loss = loss_function(outputs, target_classes)
        loss.backward()
        optimizer.step()

        train_correct += (outputs.argmax(dim=1) == target_classes).sum().item()
        # The loss is a per-batch mean: weight it by the batch size so that dividing by
        # the dataset size below yields a per-sample average for any batch size.
        train_loss_epoch += loss.item() * len(data)

        if (i+1) % display_step == 0:
            print('Train Epoch: {} [{}/{} ({}%)] '.format(
                epoch + 1, (i+1) * len(data), len(train_dataloader.dataset), 100 * (i+1) * len(data) / len(train_dataloader.dataset)  
            ))

    # max(..., 1) keeps an empty dataset from raising ZeroDivisionError.
    train_loss_epoch /= max(n_train, 1)
    train_accuracy = train_correct / max(n_train, 1)

    # Built once instead of once per batch; takes the one-hot labels as class probabilities.
    valid_criterion = nn.CrossEntropyLoss()
    model.eval()
    with torch.no_grad():
        for data, target in valid_dataloader:
            data, target = data.to(device), target.to(device)
            valid_output = model(data)
            valid_loss = valid_criterion(valid_output, target.float())
            valid_loss_epoch += valid_loss.item() * len(data)
            valid_correct += (valid_output.argmax(dim=1) == target.argmax(dim=1)).sum().item()

    valid_loss_epoch /= max(n_valid, 1)
    valid_accuracy = valid_correct / max(n_valid, 1)

    print(f"Done epoch #{epoch+1}, time for this epoch: {time.time()-start_time}s") 

    return train_loss_epoch, valid_loss_epoch, train_accuracy, valid_accuracy

In [None]:
# Training criterion: cross-entropy with the inverse-frequency class weights computed above.
# NOTE(review): with the default reduction='mean', PyTorch divides the weighted loss by the
# summed weights of the batch targets, so with batch_size = 1 the class weights appear to
# cancel out — confirm against the CrossEntropyLoss documentation.
loss_function = nn.CrossEntropyLoss(torch.Tensor(weights).to(device)) 
optimizer = optim.AdamW(params=model.parameters(), lr=learning_rate )

# When resuming from a checkpoint, restore the optimizer state saved with it
# (`checkpoint` is created by the model-construction cell).
if PRETRAINED == True:
    optimizer.load_state_dict(checkpoint['optimizer'])

In [None]:
# Train from scratch for the configured number of epochs and log the metrics of each epoch.
if TRAIN:
    for epoch in range(epochs):
        train_loss_epoch, valid_loss_epoch, train_accuracy, valid_accuracy = train(
            train_dataloader, valid_dataloader, epoch, display_step
        )
        print(
            f'Epoch: {epoch + 1}\tTrain_loss: {train_loss_epoch}\tTrain_acc: {train_accuracy}'
            f'\tVal_loss: {valid_loss_epoch}\tVal_acc: {valid_accuracy}'
        )

In [None]:
# Clean the test sentences and encode the labels (updates test_data in place).
test_data['Sentence'] = test_data['Sentence'].map(clean_text)
# Guard against re-running this cell: emotions_map sends anything that is not a known
# emotion name - including labels that were already converted to integers - to class 6.
if not pd.api.types.is_numeric_dtype(test_data['Emotion']):
    test_data['Emotion'] = test_data['Emotion'].map(emotions_map)

# Drop rows whose sentence ended up empty, then split back into parallel lists.
test_output_list = list(test_data["Emotion"])
test_input_list = list(test_data["Sentence"])
test = [pair for pair in zip(test_input_list, test_output_list) if len(pair[0]) > 0]
test_input_list, test_output_list = [list(column) for column in zip(*test)]

# Keep the original order for evaluation.
test_set = DataClass(test_input_list, test_output_list)
test_dataloader = DataLoader(test_set, batch_size=batch_size, shuffle=False)

In [None]:
# Evaluate the model on the test set and print per-class precision / recall / F1.
if TEST:
    from sklearn.metrics import accuracy_score, f1_score, precision_score, recall_score, classification_report, confusion_matrix

    model.eval()
    predicts = list()  # predicted class ids
    outputs = list()   # ground-truth class ids
    with torch.no_grad():
        for data, target in test_dataloader:
            data, target = data.to(device), target.to(device)
            test_output = model(data)
            # extend + tolist handles any batch size, not only batches of one sample.
            predicts.extend(test_output.argmax(dim=1).tolist())
            outputs.extend(target.argmax(dim=1).tolist())

    print()
    # classification_report takes (y_true, y_pred); swapping them exchanges precision and recall.
    print(classification_report(outputs, predicts, digits = 4))

In [None]:
from string import punctuation 
def reverse_emotion_map(emotion):
    """Translate an integer class id back into its emotion name.

    Parameters
    ----------
    emotion : object
        Class id; compared with ``==`` against 0..5.

    Returns
    -------
    str
        ``'Enjoyment'``, ``'Disgust'``, ``'Sadness'``, ``'Anger'``, ``'Surprise'`` or
        ``'Fear'`` for ids 0-5, and ``'Other'`` for everything else.
    """
    names = ('Enjoyment', 'Disgust', 'Sadness', 'Anger', 'Surprise', "Fear")
    for class_id, name in enumerate(names):
        if emotion == class_id:
            return name
    return "Other"
        
def infer(string):
    """Predict the emotion name for one raw sentence.

    Relies on the notebook-level ``model``, ``device``, ``word2vec_idx``, ``clean_text``
    and ``reverse_emotion_map``.

    Parameters
    ----------
    string : str
        Raw input sentence.

    Returns
    -------
    str
        The predicted emotion name. When none of the cleaned tokens is in the word2vec
        vocabulary the model is not called and ``"Other"`` is returned.
    """
    model.eval()
    # Surround punctuation with spaces so it never stays glued to a neighbouring word.
    for punct in punctuation:
        string = string.replace(punct, " " + punct + " ")
    # Collapse every run of whitespace into a single space.
    string = " ".join(string.split())
    string = string.lower()
    string = clean_text(string)

    token_ids = [word2vec_idx[word] for word in string.strip().split() if word in word2vec_idx]
    if not token_ids:
        # An empty sequence has no last time step for the model to read; use the catch-all class.
        return reverse_emotion_map(6)

    # Integer indices with a leading batch dimension of 1.
    encoded = torch.tensor(token_ids, dtype=torch.long, device=device).unsqueeze(0)
    # Inference only: no gradients are needed.
    with torch.no_grad():
        logits = model(encoded)
    return reverse_emotion_map(logits.argmax(dim=1).item())

In [None]:
# Run the inference helper on a sample sentence, honouring the INFER switch in the same
# way the TRAIN and TEST switches gate their cells.
if INFER:
    infer_str = "Hê"
    print(infer(infer_str))