### Load Dataset

In [None]:
import pandas as pd

df_all = pd.read_csv('data/three_class.csv')
df_positive = df_all.copy()
df_positive["emotion"] = df_positive["emotion"].map({1:1, 0:0, -1:0})
df_neutral = df_all.copy()
df_neutral["emotion"] = df_neutral["emotion"].map({1:0, 0:1, -1:0})
df_negative = df_all.copy()
df_negative["emotion"] = df_negative["emotion"].map({1:0, 0:0, -1:1})

### Train Word Vector

In [None]:
wv_input = df_all['context'].map(lambda s: s.split(" "))   # [for w in s.split(" ") if w not in stopwords]                        

In [None]:
from gensim import models

# Word2Vec
word2vec = models.Word2Vec(wv_input, 
                           vector_size=64,
                           min_count=1,
                           epochs=1000)

### Using Three BiLSTM

In [None]:
import torch
from torch import nn
from torch.nn.utils.rnn import pad_sequence, pack_padded_sequence,pad_packed_sequence
from torch.utils.data import Dataset, DataLoader
import numpy as np

#device = "cuda:0" if torch.cuda.is_available() else "cpu"
device = "cpu"

In [None]:
# Super Character
learning_rate = 5e-4
input_size = 300
num_epoches = 8
batch_size = 10
embed_size = 64
hidden_size = 64
num_layers = 2

In [None]:
class MyDataset(Dataset):
    def __init__(self, df):
        self.data = []
        self.label = df["emotion"].tolist()
        for s in df["context"].tolist():
            vectors = []
            for w in s.split(" "):
                if w in word2vec.wv.key_to_index:
                    vectors.append(word2vec.wv[w])   # replace word to vec
            vectors = torch.Tensor(vectors)
            self.data.append(vectors)
    
    def __getitem__(self, index):
        data = self.data[index]
        label = self.label[index]
        return data, label

    def __len__(self):
        return len(self.label)


def collate_fn(data):
    """
    :param data: data, label
    :return: vec-data、data length、label
    """
    data.sort(key=lambda x: len(x[0]), reverse=True) # reverse sort
    data_length = [len(sq[0]) for sq in data]
    x = [i[0] for i in data]
    y = [i[1] for i in data]
    data = pad_sequence(x, batch_first=True, padding_value=0)   # pad sequence
    return data, torch.tensor(y, dtype=torch.float32), data_length

In [None]:
# Network Structure
class LSTM(nn.Module):
    def __init__(self, input_size, hidden_size, num_layers):
        super(LSTM, self).__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.lstm = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True, bidirectional=True)
        self.fc = nn.Linear(hidden_size * 2, 1) 
        self.sigmoid = nn.Sigmoid()

    def forward(self, x, lengths):
        h0 = torch.zeros(self.num_layers * 2, x.size(0), self.hidden_size).to(device)
        c0 = torch.zeros(self.num_layers * 2, x.size(0), self.hidden_size).to(device)
        
        packed_input = torch.nn.utils.rnn.pack_padded_sequence(input=x, lengths=lengths, batch_first=True)
        packed_out, (h_n, h_c) = self.lstm(packed_input, (h0, c0))

        lstm_out = torch.cat([h_n[-2], h_n[-1]], 1)
        out = self.fc(lstm_out)
        out = self.sigmoid(out)
        return out

In [None]:
from sklearn import metrics

# Test
def test():
    y_pos, y_neu, y_neg, y_true = [], [], [], []

    with torch.no_grad():
        for x, labels, lengths in test_loader:
            x = x.to(device)
            outputs_pos = lstm_positive(x, lengths) 
            outputs_pos = outputs_pos.view(-1)
            outputs_neu = lstm_neutral(x, lengths) 
            outputs_neu = outputs_neu.view(-1)
            outputs_neg = lstm_negative(x, lengths) 
            outputs_neg = outputs_neg.view(-1)
            y_pos.append(outputs_pos)
            y_neu.append(outputs_neu)
            y_neg.append(outputs_neg)
            y_true.append(labels)
    
    y_pos = torch.cat(y_pos)
    y_neu = torch.cat(y_neu)
    y_neg = torch.cat(y_neg)
    y_pred = torch.cat((y_pos.reshape(-1, 1), y_neu.reshape(-1, 1), y_neg.reshape(-1, 1)), 1)
    y_true = torch.cat(y_true)
    y_prob = torch.argmax(y_pred, dim=1)
    y_pred = []
    for i in y_prob:
        if i == 0:
            y_pred.append(1)
        elif i == 1:
            y_pred.append(0)
        else:
            y_pred.append(-1)
    
    print("Accuracy score:", metrics.accuracy_score(y_true, y_pred))
    print("f-1 score:", metrics.f1_score(y_true, y_pred, average='macro'))

In [None]:
# Training
n = df_all.shape[0]
for k in range(5):
    df_test = df_all[round(n/5*k):round(n/5*(k+1))]
    df_train_positive = pd.concat([df_positive[0:round(n/5*k)], df_positive[round(n/5*(k+1)):n+1]])
    df_train_neutral = pd.concat([df_neutral[0:round(n/5*k)], df_neutral[round(n/5*(k+1)):n+1]])
    df_train_negative = pd.concat([df_negative[0:round(n/5*k)], df_negative[round(n/5*(k+1)):n+1]])

    # training data
    train_data_positive = MyDataset(df_train_positive)
    train_loader_positive = DataLoader(train_data_positive, batch_size=batch_size, collate_fn=collate_fn, shuffle=True)
    train_data_neutral = MyDataset(df_train_neutral)
    train_loader_neutral = DataLoader(train_data_neutral, batch_size=batch_size, collate_fn=collate_fn, shuffle=True)
    train_data_negative = MyDataset(df_train_negative)
    train_loader_negative = DataLoader(train_data_negative, batch_size=batch_size, collate_fn=collate_fn, shuffle=True)

    # test data
    test_data = MyDataset(df_test)
    test_loader = DataLoader(test_data, batch_size=batch_size, collate_fn=collate_fn, shuffle=True)

    lstm_positive = LSTM(embed_size, hidden_size, num_layers)
    lstm_neutral = LSTM(embed_size, hidden_size, num_layers)
    lstm_negative = LSTM(embed_size, hidden_size, num_layers)

    # Loss and Optimizer
    criterion = nn.BCELoss()
    optimizer_positive = torch.optim.Adam(lstm_positive.parameters(), lr=learning_rate)
    optimizer_neutral = torch.optim.Adam(lstm_neutral.parameters(), lr=learning_rate)
    optimizer_negative = torch.optim.Adam(lstm_negative.parameters(), lr=learning_rate)

    for epoch in range(num_epoches):
        total_loss = 0
        for i, (x, labels, lengths) in enumerate(train_loader_positive):
            x = x.to(device)
            labels = labels.to(device)
            outputs = lstm_positive(x, lengths) 
            logits = outputs.view(-1)  
            loss = criterion(logits, labels)   
            total_loss += loss
            optimizer_positive.zero_grad()              
            loss.backward(retain_graph=True)    
            optimizer_positive.step()                    
            if (i+1) % 10 == 0:
                total_loss = 0
    
        total_loss = 0
        for i, (x, labels, lengths) in enumerate(train_loader_neutral):
            x = x.to(device)
            labels = labels.to(device)
            outputs = lstm_neutral(x, lengths) 
            logits = outputs.view(-1)  
            loss = criterion(logits, labels)   
            total_loss += loss
            optimizer_neutral.zero_grad()              
            loss.backward(retain_graph=True)    
            optimizer_neutral.step()                    
            if (i+1) % 10 == 0:
                total_loss = 0
    
        total_loss = 0
        for i, (x, labels, lengths) in enumerate(train_loader_negative):
            x = x.to(device)
            labels = labels.to(device)
            outputs = lstm_negative(x, lengths) 
            logits = outputs.view(-1)  
            loss = criterion(logits, labels)   
            total_loss += loss
            optimizer_negative.zero_grad()              
            loss.backward(retain_graph=True)    
            optimizer_negative.step()                    
            if (i+1) % 10 == 0:
                total_loss = 0
    
        test()