[![Open In Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/toby-htx/ONNX-Sharing-Session/blob/main/BiLSTM_in_PyTorch_W2V.ipynb)

In [None]:
import gensim.downloader as api

# Load the pretrained "word2vec-google-news-300" vectors through gensim's downloader API.
# The result is indexed by word later on to fill the embedding weight matrix.
w2v = api.load("word2vec-google-news-300") 

In [None]:
!git clone https://github.com/toby-htx/Onnx-Sharing-Session.git

In [None]:
import pandas as pd
from sklearn import preprocessing
import re

def process_text(document):
    """Normalise a raw statement into space-separated word characters.

    Every non-word character (anything outside ``[A-Za-z0-9_]`` and Unicode
    word characters) is replaced by a space, runs of whitespace are collapsed
    to a single space, and leading/trailing whitespace is removed.

    Args:
        document: The text to clean. Non-string values are coerced with
            ``str()`` first, so e.g. a float cell does not raise.

    Returns:
        The cleaned string.
    """
    # Coerce up front: re.sub raises TypeError on non-string input.
    document = str(document)

    # Replace special characters with spaces first ...
    document = re.sub(r'\W', ' ', document)

    # ... and only then collapse whitespace, so the spaces introduced by the
    # substitution above are squeezed as well.
    document = re.sub(r'\s+', ' ', document)

    return document.strip()

In [None]:
# Read the dataset CSV from the cloned repository and keep the two columns used below.
df = pd.read_csv('./Onnx-Sharing-Session/Data/Isear(Fear&Joy).csv')
# .copy() gives an independent frame, so the column assignments below do not
# write into a slice of the original (avoids pandas' SettingWithCopyWarning).
df = df[['Emotion','Statement']].copy()
df['preprocessedStatement'] = df.Statement.apply(process_text)

# Encode the emotion labels as integer class ids.
le = preprocessing.LabelEncoder()
df['Emotion'] = le.fit_transform(df['Emotion'])

X = df['preprocessedStatement'].tolist()
# Plain column access instead of df.pop(): the frame keeps its label column.
Y = df['Emotion'].tolist()

# Materialise the (label, text) pairs. A bare zip() is exhausted after a single
# pass, so re-running the vocabulary cell would otherwise see no data at all.
train_iter = list(zip(Y, X))

In [None]:
import torch
from torchtext.data.utils import get_tokenizer
from torchtext.vocab import build_vocab_from_iterator

# torchtext's built-in 'basic_english' tokenizer is used for every text in this notebook.
tokenizer = get_tokenizer('basic_english')


def yield_tokens(data_iter):
    """Lazily yield the token list of each (label, text) pair; the label is ignored."""
    yield from (tokenizer(text) for _label, text in data_iter)


# "<unk>" is the only special token (presumably index 0 under torchtext's default
# special_first=True -- confirm); tokens missing from the vocab resolve to it.
vocab = build_vocab_from_iterator(
    yield_tokens(train_iter),
    specials=["<unk>"],
)
vocab.set_default_index(vocab["<unk>"])

In [None]:
import numpy as np

# Build the embedding matrix: row i holds the word2vec vector of vocab token i.
emb_dim = 300
vocab_size = len(vocab)
weights_matrix = np.zeros((vocab_size, emb_dim))
words_found = 0  # number of vocab tokens that have a pretrained vector

for i, word in enumerate(vocab.get_itos()):
    try:
        vector = w2v[word]
    except KeyError:
        # Token has no pretrained vector: its row stays all zeros.
        continue
    weights_matrix[i] = vector
    words_found += 1

In [None]:
from sklearn.model_selection import train_test_split

# Hold out 5% of the samples as the test set, stratified on the label.
# random_state pins the shuffle so the train/val/test membership (and therefore
# every reported metric) is the same on each Restart & Run All.
x_train, x_test, y_train, y_test = train_test_split(
    X, Y, test_size=0.05, stratify=Y, random_state=42
)

# The first 100 samples of the (already shuffled) training portion become the
# validation set; the remainder is used for training.
x_val, y_val = x_train[:100], y_train[:100]
x_train, y_train = x_train[100:], y_train[100:]

# Datasets are plain lists of (label, text) pairs, the layout collate_batch unpacks.
train_data = list(zip(y_train, x_train))
valid_data = list(zip(y_val, x_val))
test_data = list(zip(y_test, x_test))

In [None]:
def text_pipeline(x):
    """Tokenize a raw text and map its tokens to vocabulary indices."""
    return vocab(tokenizer(x))


def label_pipeline(x):
    """Convert a label to a plain Python int."""
    return int(x)

In [None]:
from torch.nn.utils.rnn import pad_sequence

def collate_batch(batch):
    """Collate (label, text) samples into padded tensors for one batch.

    Args:
        batch: Iterable of ``(label, text)`` pairs.

    Returns:
        A tuple ``(labels, padded_token_ids, lengths)`` of int64 tensors:
        ``labels`` has one entry per sample, ``padded_token_ids`` is
        batch-first and right-padded with 0, and ``lengths`` holds each
        sample's unpadded token count.
    """
    label_list, text_list, text_len = [], [], []

    for (_label, _text) in batch:
        label_list.append(label_pipeline(_label))

        token_ids = text_pipeline(_text)
        if not token_ids:
            # A text that tokenizes to nothing would get length 0, which
            # pack_padded_sequence rejects. Represent it by a single index-0
            # token (the same index used for padding below) instead.
            token_ids = [0]

        processed_text = torch.tensor(token_ids, dtype=torch.int64)
        text_list.append(processed_text)
        text_len.append(len(processed_text))

    label_list = torch.tensor(label_list, dtype=torch.int64)
    text_len = torch.tensor(text_len, dtype=torch.int64)

    # Right-pad every sequence to the longest one in this batch.
    text_list_padded = pad_sequence(text_list, batch_first=True, padding_value=0)

    return label_list, text_list_padded, text_len

In [None]:
class LSTM_W2V(torch.nn.Module):
    """Bidirectional LSTM classifier on top of frozen, pretrained embeddings.

    The sequence is embedded, packed (so padded positions are skipped), run
    through a single-layer bidirectional LSTM, and the final hidden states of
    the forward and backward directions are concatenated and passed through
    two linear layers to produce 2 class logits.

    Args:
        vocab_size: Number of rows in the embedding table.
        embedding_dim: Width of each embedding vector.
        hidden_dim: Hidden size of the LSTM, per direction.
        weights: Array-like of shape ``(vocab_size, embedding_dim)`` (e.g. a
            NumPy array) copied into the embedding table, which is then frozen.
    """

    def __init__(self, vocab_size, embedding_dim, hidden_dim, weights):
        super().__init__()

        self.hidden_dim = hidden_dim
        self.embeddings = torch.nn.Embedding(vocab_size, embedding_dim, padding_idx=0)
        # copy_ casts to the parameter's dtype, so float64 NumPy input is fine.
        with torch.no_grad():
            self.embeddings.weight.copy_(torch.as_tensor(weights))
        self.embeddings.weight.requires_grad = False  # freeze embedding
        self.lstm = torch.nn.LSTM(embedding_dim, hidden_dim, batch_first=True, bidirectional=True)
        # The former MaxPool1d(1)/AvgPool1d(1) layers were identity operations
        # without parameters; dropping them leaves the state_dict keys unchanged.
        self.linear = torch.nn.Linear(hidden_dim * 2, hidden_dim * 2)
        self.linear2 = torch.nn.Linear(hidden_dim * 2, 2)

    def forward(self, x, text_len):
        """Compute class logits for a padded batch.

        Args:
            x: int64 tensor of token ids, batch-first, right-padded.
            text_len: Unpadded length of every sequence in ``x``.

        Returns:
            Tensor of shape ``(batch, 2)`` with unnormalised class scores.
        """
        embedded = self.embeddings(x)

        # pack_padded_sequence requires the lengths to live on the CPU.
        lengths = torch.as_tensor(text_len, dtype=torch.int64).cpu()
        packed = torch.nn.utils.rnn.pack_padded_sequence(
            embedded, lengths, batch_first=True, enforce_sorted=False
        )

        # No explicit (h0, c0): the LSTM defaults to zeros on the input's own
        # device, so the module also works after being moved off the CPU.
        _, (ht, _) = self.lstm(packed)

        # Use the final hidden state of each direction. For packed input these
        # are taken at every sequence's true last step. Indexing the padded
        # output at time -1 instead yields all-zero vectors for each sequence
        # shorter than the longest one in the batch.
        features = torch.cat((ht[-2], ht[-1]), dim=1)

        hidden = torch.nn.functional.relu(self.linear(features))
        preds = self.linear2(hidden)

        return preds

In [None]:
import torch.nn as nn
from torch.nn.utils.rnn import pack_padded_sequence, pad_packed_sequence
import torch.nn.functional as F

# Seed torch's global RNG before the model is built so that weight
# initialisation (and later draws from the global RNG) repeat across runs.
torch.manual_seed(42)

# The embedding width is emb_dim, defined together with weights_matrix;
# hidden_dim is the LSTM hidden size per direction.
hidden_dim = 32

model = LSTM_W2V(vocab_size, emb_dim, hidden_dim, weights_matrix)

In [None]:
from torch.utils.data import DataLoader

BATCH_SIZE = 64

# All loaders share collate_batch; only the training loader shuffles.
train_dl = DataLoader(
    train_data,
    batch_size=BATCH_SIZE,
    shuffle=True,
    collate_fn=collate_batch,
)
val_dl = DataLoader(
    valid_data,
    batch_size=BATCH_SIZE,
    collate_fn=collate_batch,
)
# The test loader yields one sample per batch.
test_dl = DataLoader(
    test_data,
    batch_size=1,
    collate_fn=collate_batch,
)

In [None]:
def train_model(model, epochs=10, lr=0.001, train_loader=None, val_loader=None):
    """Train ``model`` with Adam and cross-entropy, validating after every epoch.

    Args:
        model: Module called as ``model(x, lengths)`` that returns class logits.
        epochs: Number of passes over the training loader.
        lr: Adam learning rate.
        train_loader: Iterable of ``(labels, token_ids, lengths)`` batches.
            Defaults to the notebook-level ``train_dl``.
        val_loader: Validation batches in the same layout. Defaults to the
            notebook-level ``val_dl``.

    Prints the sample-weighted training loss plus validation loss/accuracy on
    the epochs where ``epoch_index % 5 == 1``.
    """
    # Fall back to the notebook globals so existing calls keep working.
    if train_loader is None:
        train_loader = train_dl
    if val_loader is None:
        val_loader = val_dl

    # Only parameters that still require gradients are handed to the optimizer.
    parameters = filter(lambda p: p.requires_grad, model.parameters())
    optimizer = torch.optim.Adam(parameters, lr=lr)

    for i in range(epochs):
        model.train()
        sum_loss = 0.0
        total = 0
        # `lengths` rather than `len`, which would shadow the builtin.
        for y, x, lengths in train_loader:
            y = y.long()
            x = x.long()
            optimizer.zero_grad()
            y_pred = model(x, lengths)
            loss = F.cross_entropy(y_pred, y)
            loss.backward()
            optimizer.step()
            # Weight the batch-mean loss by batch size to get a per-sample average.
            sum_loss += loss.item() * y.shape[0]
            total += y.shape[0]
        val_loss, val_acc = validation_metrics(model, val_loader)
        if i % 5 == 1:
            print("train loss %.3f, val loss %.3f, val accuracy %.3f" % (sum_loss/total, val_loss, val_acc))
        
def validation_metrics(model, valid_dl):
    """Evaluate ``model`` on ``valid_dl`` without tracking gradients.

    Args:
        model: Module called as ``model(x, lengths)`` that returns class logits.
        valid_dl: Iterable of ``(labels, token_ids, lengths)`` batches.

    Returns:
        Tuple ``(loss, accuracy)`` of Python floats: the sample-weighted mean
        cross-entropy and the fraction of correctly predicted labels.
    """
    model.eval()
    correct = 0
    total = 0
    sum_loss = 0.0
    # Evaluation needs no autograd graph; this matches predict_test_cases.
    with torch.no_grad():
        # `lengths` rather than `len`, which would shadow the builtin.
        for y, x, lengths in valid_dl:
            y = y.long()
            x = x.long()
            y_hat = model(x, lengths)
            loss = F.cross_entropy(y_hat, y)
            pred = y_hat.argmax(dim=1)
            # .item() keeps the counter a plain number, so the returned
            # accuracy is a float rather than a 0-dim tensor.
            correct += (pred == y).sum().item()
            total += y.shape[0]
            sum_loss += loss.item() * y.shape[0]
    return sum_loss / total, correct / total

def predict_test_cases(model, test_dl):
    """Predict a class index for every sample in ``test_dl``.

    Args:
        model: Module called as ``model(x, lengths)`` that returns class logits.
        test_dl: Iterable of ``(labels, token_ids, lengths)`` batches; the
            labels are ignored.

    Returns:
        A flat list of Python ints, one predicted class per sample in loader
        order. Flattening keeps the result aligned with the label list for any
        batch size, not only for batches of one.
    """
    model.eval()
    pred_list = []
    with torch.no_grad():
        # `lengths` rather than `len`, which would shadow the builtin.
        for _, x, lengths in test_dl:
            x = x.long()
            y_hat = model(x, lengths)
            pred_list.extend(y_hat.argmax(dim=1).tolist())
    return pred_list

In [None]:
# Train for 30 epochs. train_model prints metrics only on epochs whose
# zero-based index satisfies i % 5 == 1, hence the six lines of output.
train_model(model, epochs=30)

train loss 0.693, val loss 0.694, val accuracy 0.480
train loss 0.687, val loss 0.696, val accuracy 0.480
train loss 0.684, val loss 0.696, val accuracy 0.480
train loss 0.687, val loss 0.694, val accuracy 0.480
train loss 0.683, val loss 0.699, val accuracy 0.480
train loss 0.683, val loss 0.698, val accuracy 0.480


In [None]:
# Run the trained model over the test loader and collect its predicted classes.
pred_list = predict_test_cases(model, test_dl)

In [None]:
from sklearn.metrics import classification_report

# Per-class precision / recall / F1 of the test predictions against the true labels.
print(classification_report(y_test, pred_list))

              precision    recall  f1-score   support

           0       0.77      0.80      0.78        54
           1       0.79      0.76      0.78        55

    accuracy                           0.78       109
   macro avg       0.78      0.78      0.78       109
weighted avg       0.78      0.78      0.78       109



In [None]:
# Persist only the model's state_dict (not the module object) to 'saved_weights.pt'.
torch.save(model.state_dict(), 'saved_weights.pt')