In [87]:
import sys
import os
import re
import string
import json
import urllib.request
import numpy as np

from tqdm import tqdm

import torch
torch.manual_seed(42)
np.random.seed(42)
import torch.autograd as autograd
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F

from torch.utils.data import Dataset, DataLoader, TensorDataset

from torchcrf import CRF
from sklearn.metrics import f1_score

# The torch and numpy RNGs are already seeded (42) right after `import torch`
# above. A second `torch.manual_seed(1)` used to sit here and silently replaced
# that torch seed; it has been dropped so the notebook has one effective seed.
# Inference runs on the CPU.
device = torch.device("cpu")

In [88]:
# Number of sentences per batch handed to the DataLoader built further down.
BATCH_SIZE = 32

In [89]:
# Load the label mapping from JSON. Its inverse (value -> key) is used later to
# turn predicted tag ids back into label strings.
# The encoding is given explicitly: JSON files are UTF-8, while open()'s default
# depends on the machine's locale.
with open('Data/labels.json', encoding='utf-8') as f:
    labels = json.load(f)

In [90]:
# Reverse lookup: maps every value of `labels` back to its key, so predicted
# tag ids can be written out as label strings.
labels_inv = dict(zip(labels.values(), labels.keys()))

In [91]:
def read_file(filename, encoding='utf-8'):
    """Read a text file and return its lines.

    Args:
        filename: Path of the file to read.
        encoding: Text encoding used to decode the file. Defaults to UTF-8 so
            the result does not depend on the machine's locale.

    Returns:
        list[str]: One entry per line, each keeping its trailing newline
        (as produced by ``file.readlines()``).
    """
    with open(filename, 'r', encoding=encoding) as file:
        return file.readlines()

In [92]:
# Raw lines of the input file. get_data() below expects one token per line and
# treats a bare newline line as the end of a sentence.
test_data = read_file('Data/sample_input')

In [94]:
# Parse the GloVe text file into {word: float32 vector}. Each line is
# "<word> <v1> <v2> ...".
# NOTE(review): neither `embeddings` nor `emb_dim` is read by any other cell
# visible here (the loaded checkpoint carries its own embedding layer) — this
# load may be removable; confirm nothing else depends on it.
embeddings = {}
emb_dim = 50
# Explicit UTF-8: the vocabulary contains non-ASCII tokens, and open()'s default
# encoding is locale-dependent.
with open('glove.6B/glove.6B.50d.txt', 'r', encoding='utf-8') as f:
    for line in f:
        values = line.split()
        if not values:
            # Blank line: nothing to parse (values[0] would raise IndexError).
            continue
        word = values[0]
        vector = np.asarray(values[1:], dtype='float32')
        embeddings[word] = vector

In [95]:
# Load the token -> index vocabulary. get_data() looks tokens up in it (falling
# back to the '<unk>' entry) and custom_collate() pads with its '<pad>' entry.
# The encoding is given explicitly because open()'s default is locale-dependent.
with open('vocab.json', encoding='utf-8') as f:
    vocab = json.load(f)

In [98]:
def get_data(data, word_to_idx=None):
    """Convert one-token-per-line text into per-sentence arrays of vocabulary ids.

    Each non-blank line is one token; a blank (or whitespace-only) line ends the
    current sentence. Tokens are stripped and lower-cased before lookup, and
    tokens missing from the mapping are replaced by the ``'<unk>'`` entry.

    Compared with a plain "flush on blank line" loop this also:
      * keeps the last sentence when the input does not end with a blank line,
      * ignores repeated blank lines instead of emitting empty sentences,
      * always returns a 1-D object array of int64 arrays, even when every
        sentence happens to have the same length.

    Args:
        data: Iterable of text lines (e.g. the result of ``file.readlines()``).
        word_to_idx: Optional token -> index mapping containing ``'<unk>'``.
            Defaults to the module-level ``vocab``.

    Returns:
        numpy.ndarray: 1-D array of dtype ``object`` whose elements are 1-D
        ``int64`` arrays, one per sentence, in input order.
    """
    if word_to_idx is None:
        word_to_idx = vocab

    sentences = []
    current = []
    for line in data:
        token = line.strip().lower()
        if token:
            if token in word_to_idx:
                current.append(word_to_idx[token])
            else:
                current.append(word_to_idx['<unk>'])
        elif current:
            sentences.append(np.array(current, dtype=np.int64))
            current = []

    # Flush a trailing sentence that is not followed by a blank line.
    if current:
        sentences.append(np.array(current, dtype=np.int64))

    # Fill element by element: np.asarray(list_of_arrays, dtype=object) would
    # build a 2-D array whenever all sentences share the same length.
    result = np.empty(len(sentences), dtype=object)
    for i, sentence in enumerate(sentences):
        result[i] = sentence
    return result

In [99]:
# Sentences of the test file as arrays of vocabulary indices (one array per sentence).
testX = get_data(test_data)

In [100]:
def custom_collate(data):
    """Collate variable-length index sequences into one padded batch.

    Every sequence is right-padded with ``vocab["<pad>"]`` up to the length of
    the longest sequence in the batch, and a boolean mask marks the real
    (non-padding) positions.

    Args:
        data: List of 1-D index arrays, one per sentence.

    Returns:
        list: ``[padded_data, seq_lengths, mask]`` where ``padded_data`` is a
        tensor of shape (len(data), max_len), ``seq_lengths`` is a list with
        the original length of each sequence, and ``mask`` is a bool tensor of
        the same shape as ``padded_data`` (True on real tokens).
    """
    seq_lengths = [len(seq) for seq in data]
    max_len = max(seq_lengths, default=-1)

    padded_rows = []
    mask_rows = []
    for seq, length in zip(data, seq_lengths):
        padded_rows.append(
            np.pad(seq, (0, max_len - length), mode='constant',
                   constant_values=vocab["<pad>"])
        )
        # True for the first `length` positions, False on the padding tail.
        mask_rows.append(np.arange(max_len) < length)

    padded_data = torch.from_numpy(np.array(padded_rows))
    mask = torch.from_numpy(np.array(mask_rows))

    return [padded_data, seq_lengths, mask]

In [101]:
# Batches of padded test sentences. shuffle=False keeps the batches in input
# order, so the predictions collected later line up with the sentences in testX.
testDataLoader = DataLoader(testX, batch_size=BATCH_SIZE, shuffle=False, collate_fn=custom_collate)

In [102]:
class BiLSTMCRF(nn.Module):
    """Bidirectional-LSTM encoder with a CRF output layer for sequence tagging.

    Pipeline: token ids -> embedding -> BiLSTM -> (dropout, training only) ->
    linear projection to per-tag scores -> CRF.

    NOTE(review): ``nn.LSTM`` is created with its default ``batch_first=False``
    while the CRF is created with ``batch_first=True``. When this module is fed
    ``(batch, seq)`` id tensors (which is what ``custom_collate`` in this file
    produces), the LSTM therefore treats dim 0 as the time axis. This is left
    unchanged because existing checkpoints were presumably trained with this
    layout — confirm before switching the LSTM to ``batch_first=True``.
    """

    def __init__(self, weights_matrix, hidden_dim, tagset_size):
        """Build the layers.

        Args:
            weights_matrix: 2-D tensor of pretrained embeddings; its second
                dimension is used as the embedding size. The embedding is
                created with ``freeze=False``, i.e. it stays trainable.
            hidden_dim: Hidden size of each LSTM direction.
            tagset_size: Number of tags scored by the linear layer and the CRF.
        """
        super().__init__()
        self.hidden_dim = hidden_dim
        self.embedding = nn.Embedding.from_pretrained(weights_matrix, freeze=False)
        embedding_dim = weights_matrix.shape[1]
        self.lstm = nn.LSTM(embedding_dim, hidden_dim, bidirectional=True)
        self.dropout_layer = nn.Dropout(p=0.5)
        # Forward and backward LSTM states are concatenated, hence hidden_dim * 2.
        self.hidden2tag = nn.Linear(hidden_dim * 2, tagset_size)
        self.crf = CRF(tagset_size, batch_first=True)

    def _emissions(self, sentence, use_dropout):
        """Return the per-token tag scores for a batch of token-id tensors."""
        embeds = self.embedding(sentence)
        lstm_out, _ = self.lstm(embeds)
        if use_dropout:
            lstm_out = self.dropout_layer(lstm_out)
        return self.hidden2tag(lstm_out)

    def forward(self, sentence, labels, mask):
        """Return the training loss: the negated output of the CRF layer.

        Args:
            sentence: Tensor of token ids.
            labels: Tensor of gold tag ids, same shape as ``sentence``.
            mask: Bool tensor, True on real (non-padding) positions.
        """
        emissions = self._emissions(sentence, use_dropout=True)
        return -self.crf(emissions, labels, mask=mask)

    def predict(self, sentence, mask):
        """Decode the best tag sequence for every sentence in the batch.

        Runs without gradient tracking and never applies dropout, so the result
        is deterministic even if the module was left in training mode.

        Args:
            sentence: Tensor of token ids.
            mask: Bool tensor, True on real (non-padding) positions.

        Returns:
            Whatever ``self.crf.decode`` returns (for torchcrf: one list of tag
            ids per sentence, with padding positions removed).
        """
        with torch.no_grad():
            scores = self._emissions(sentence, use_dropout=False)
            return self.crf.decode(scores, mask=mask)

In [103]:
# Load the saved model onto the CPU. The loaded object is used directly as a
# model below (model.eval(), model.predict), and its printed repr is a BiLSTMCRF,
# so the BiLSTMCRF class must already be defined when this cell runs.
# NOTE(review): torch.load unpickles the file — only load model.pt from a trusted source.
# NOTE(review): presumably needs weights_only=False on recent PyTorch releases,
# where torch.load defaults to weights-only loading — confirm against the installed version.
model = torch.load('model.pt', map_location=torch.device('cpu'))

In [104]:
# Switch to evaluation mode so the model's nn.Dropout layer is a no-op during prediction.
model.eval()

BiLSTMCRF(
  (embedding): Embedding(7400, 50)
  (lstm): LSTM(50, 256, bidirectional=True)
  (dropout_layer): Dropout(p=0.5, inplace=False)
  (hidden2tag): Linear(in_features=512, out_features=38, bias=True)
  (crf): CRF(num_tags=38)
)

In [105]:
# Predict tags for the whole test set, batch by batch. `test_preds` ends up with
# one list of predicted tag ids per sentence, in input order.
test_preds = []
# Inference only: disable gradient tracking so no autograd graph is built.
with torch.no_grad():
    for batch in testDataLoader:
        X, seq_lens, mask = batch
        predictions = model.predict(X, mask)
        test_preds.extend(predictions)
# NOTE(review): despite its name this holds the *test* predictions, and no other
# visible cell reads it; kept in case a later cell depends on it.
val_preds = np.array(test_preds, dtype=object)

In [106]:
# Write the predictions: one label per line, with an empty line after each
# sentence. The encoding is explicit so the output does not depend on the locale.
with open('test_preds.txt', 'w', encoding='utf-8') as f:
    for sentence_tags in test_preds:
        f.writelines(labels_inv[tag] + '\n' for tag in sentence_tags)
        f.write("\n")