<a href="https://colab.research.google.com/github/omidkhalafbeigi/CBRCMN_WordBased/blob/main/BiLSTM_Word.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
from google.colab import drive
import os
drive.mount('/content/drive')
os.environ['CUDA_LAUNCH_BLOCKING'] = "1"
!pip install torchinfo

Mounted at /content/drive
Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting torchinfo
  Downloading torchinfo-1.7.0-py3-none-any.whl (22 kB)
Installing collected packages: torchinfo
Successfully installed torchinfo-1.7.0


In [None]:
import torch
from torch import nn
from torch import optim
import numpy as np
from gc import collect
import pickle
import pandas as pd
from sklearn.metrics import f1_score, accuracy_score
from random import shuffle
from torchinfo import summary
from IPython.display import clear_output

In [None]:
# Run on the GPU when one is visible to PyTorch, otherwise fall back to the CPU.
device = 'cuda' if torch.cuda.is_available() else 'cpu'

In [None]:
# Load the word vocabulary saved on Drive.
# NOTE(review): pickle.load executes arbitrary code embedded in the file, so
# only load a words_dict.pkl that you produced yourself.
with open('/content/drive/MyDrive/elmo_word+char_based/words_dict.pkl', mode='rb') as words_file:
  words = pickle.load(words_file)
# One extra embedding row is reserved for padding in the word-based input.
words_count = len(words) + 1

In [None]:
class Model(nn.Module):
  """Word-level BiLSTM classifier.

  Pipeline: embedding -> single-layer bidirectional LSTM -> max over the
  sequence dimension -> linear (300 units) -> linear (class logits).

  Args:
    embed_dim: size of each word-embedding vector.
    rnn_hidden_size: hidden size of each LSTM direction; the LSTM output
      therefore has ``2 * rnn_hidden_size`` features.
    vocab_size: number of rows in the embedding table. Defaults to the
      module-level ``words_count`` when omitted.
    num_classes: number of output logits. Defaults to 3.
    target_device: device on which all parameters are created. Defaults to
      the module-level ``device`` when omitted.
  """
  def __init__(self, embed_dim, rnn_hidden_size, vocab_size=None, num_classes=3, target_device=None):
    super(Model, self).__init__()
    # Fall back to the notebook-level globals so Model(embed_dim, rnn_hidden_size)
    # keeps building exactly the same network as before.
    if vocab_size is None: vocab_size = words_count
    if target_device is None: target_device = device
    self.embed = nn.Embedding(num_embeddings=vocab_size, embedding_dim=embed_dim, device=target_device)
    self.bi_lstm = nn.LSTM(input_size=embed_dim, hidden_size=rnn_hidden_size, num_layers=1, bias=True,
                           batch_first=True, dropout=0, bidirectional=True, device=target_device)
    self.linear_after_rnn = nn.Linear(in_features=rnn_hidden_size * 2, out_features=300, bias=True, device=target_device)
    self.output_layer = nn.Linear(in_features=300, out_features=num_classes, bias=True, device=target_device)
  def forward(self, X):
    """Return un-normalised class logits for a batch of word-index sequences.

    Args:
      X: integer tensor of word indices laid out batch-first (the LSTM is
        built with ``batch_first=True`` and pooling runs over ``dim=1``).
        It must already live on the same device as the model.

    Returns:
      Tensor with one row of logits per input sequence.
    """
    # The embedding output is already on the module's device, so no extra
    # device transfer is needed here.
    output = self.embed(X)
    # [0] keeps the per-timestep outputs and drops the (h_n, c_n) state tuple.
    output = self.bi_lstm(output)[0]
    # Max-pool over the sequence dimension.
    output = torch.max(output, dim=1).values
    # NOTE(review): there is no non-linearity between the two linear layers.
    output = self.linear_after_rnn(output)
    output = self.output_layer(output)
    return output

In [None]:
# Model hyper-parameters.
embed_dim = 100
rnn_hidden_size = 256
# Training data is stored on Drive as nine .npz slices, slice1 .. slice9.
slices = [f'/content/drive/MyDrive/elmo_word+char_based/slice{slice_number}.npz'
          for slice_number in range(1, 10)]

In [None]:
# Train the BiLSTM classifier over all training slices and, after every epoch,
# evaluate it on the held-out test slice. Per-epoch metrics are appended to a
# log file on Drive and kept in `metrics_epoch` for the live progress display.
model = Model(embed_dim, rnn_hidden_size)
optimizer = optim.Adam(params=model.parameters(), lr=1e-4)
print(summary(model))
model.train()

batch_size = 32
epochs = 10
ce = nn.CrossEntropyLoss()

batch_size_test = 1024
metrics_epoch = list()


def _to_class_indices(raw_labels):
  """Map raw labels -1/0/1 to class indices 0/1/2; any other value is left unchanged."""
  in_range = (raw_labels >= -1) & (raw_labels <= 1)
  return np.where(in_range, raw_labels + 1, raw_labels)


def _predict_classes(net, inputs, step):
  """Return the argmax class of every row of `inputs`, evaluated `step` rows at a time."""
  chunks = list()
  # Inference only: skip autograd bookkeeping to save memory and time.
  with torch.no_grad():
    for start in range(0, inputs.shape[0], step):
      logits = net(inputs[start:start + step].to(device))
      chunks.append(torch.argmax(logits, dim=1).cpu().numpy())
  return np.concatenate(chunks) if chunks else np.empty(0, dtype=np.int64)


# The context manager closes the .npz archive once the arrays have been read.
# The path is absolute, like every other Drive path in this notebook.
with np.load('/content/drive/MyDrive/elmo_word+char_based/slice_test.npz') as test_archive:
  labels_test = _to_class_indices(np.array(test_archive['labels'], dtype=np.int32))
  # Cast in NumPy so word ids never pass through float32.
  test_set = torch.from_numpy(test_archive['word_based'].astype(np.int32))

for epoch in range(epochs):
  shuffle(slices)  # visit the slices in a different order every epoch
  epoch_loss = list()
  for slice_idx in range(len(slices)):
    with np.load(slices[slice_idx]) as dataset:
      labels = torch.from_numpy(_to_class_indices(dataset['labels'].astype(np.int64)))
      dataset_word_based = torch.from_numpy(dataset['word_based'].astype(np.int32))
    dataset_size = dataset_word_based.shape[0]
    idx = torch.randperm(dataset_size)
    dataset_word_based = dataset_word_based[idx] # Randomization
    labels = labels[idx] # Randomization
    # `batch` is the exclusive end index of the current mini-batch.
    for batch in range(batch_size, dataset_size + batch_size, batch_size):
      optimizer.zero_grad()
      X_word_based = dataset_word_based[batch - batch_size:batch].to(device)
      y = labels[batch - batch_size:batch].to(device)
      # No squeeze(): the logits must keep their batch dimension even when the
      # final mini-batch contains a single sample.
      output = model(X_word_based)
      loss = ce(output, y)
      loss.backward()
      optimizer.step()
      loss_value = loss.item()
      epoch_loss.append(loss_value)
      # Re-print the finished epochs, because clear_output wipes the cell on
      # the next write.
      for metrics in metrics_epoch:
        print(f'Epoch: {metrics[0]} - Accuracy: {metrics[1]} - F1: {metrics[2]} - Training Loss: {metrics[3]}')
      print(f'Epoch: {epoch + 1} - Slice index: {slice_idx + 1} - Batch: {batch} - Loss: {loss_value} - Dataset size: {dataset_size}')
      clear_output(wait=True)
    # Drop this slice before the next one is loaded. Rebinding instead of
    # `del` keeps this safe when a slice produced no batches at all.
    X_word_based = y = output = None
    del dataset_word_based, labels
    collect()

  model.eval()
  y_pred = _predict_classes(model, test_set, batch_size_test)
  accuracy = accuracy_score(labels_test, y_pred)
  f1 = f1_score(labels_test, y_pred, average='weighted')
  with open('/content/drive/MyDrive/elmo_word+char_based/DL_KerasEmb_BiLSTM_WordBased_Log.txt', mode='a') as writer:
    writer.write(f'Epoch: {epoch+1} - Test Accuracy: {accuracy} - Test F1: {f1} - Train Loss (mean): {np.mean(epoch_loss)}\n')
  metrics_epoch.append([epoch+1, accuracy, f1, np.mean(epoch_loss)])
  model.train()
  del y_pred, accuracy, f1
  collect()

  print('---------------------')
  print(f'Epoch: {epoch + 1} - Loss: {np.mean(epoch_loss)}')
  print('---------------------')

In [None]:
# Final evaluation of the trained model on the held-out test slice:
# weighted F1 and accuracy over the three classes.
model.eval()
batch_size = 1024
# The context manager closes the .npz archive once the arrays have been read.
# The path is absolute, like the other Drive paths in this notebook.
with np.load('/content/drive/MyDrive/elmo_word+char_based/slice_test.npz') as test_archive:
  raw_labels = test_archive['labels'].astype(np.int64)
  # Cast in NumPy so word ids never pass through float32.
  test_word_based = torch.from_numpy(test_archive['word_based'].astype(np.int32))

# Map raw labels -1/0/1 to class indices 0/1/2; other values stay unchanged.
labels = torch.from_numpy(np.where((raw_labels >= -1) & (raw_labels <= 1), raw_labels + 1, raw_labels))

pred_chunks = list()
# Inference only: skip autograd bookkeeping to save memory and time.
with torch.no_grad():
  # `batch` is the exclusive end index of the current chunk.
  for batch in range(batch_size, test_word_based.shape[0] + batch_size, batch_size):
    X_word_based = test_word_based[batch - batch_size:batch].to(device)
    # No squeeze(): argmax over dim=1 needs the batch dimension even when the
    # last chunk holds a single sample.
    pred_chunks.append(torch.argmax(model(X_word_based), dim=1).cpu().numpy())
    print(batch)
y_pred = np.concatenate(pred_chunks) if pred_chunks else np.empty(0, dtype=np.int64)

f1 = f1_score(labels.numpy(), y_pred, average='weighted')
accuracy = accuracy_score(labels.numpy(), y_pred)
print(f'F1 Score: {f1}')
print(f'Accuracy: {accuracy}')

1024
2048
3072
4096
5120
6144
7168
8192
9216
10240
F1 Score: 0.7498991637053165
Accuracy: 0.7497
