In [56]:
import json
import torch
import json
import random
import numpy as np
import torch.nn as nn
import torch.optim as optim
import gensim.downloader as api
import matplotlib.pyplot as plt
import torch.nn.functional as F
from torch.nn.utils.rnn import pad_sequence
from sklearn.metrics import accuracy_score, f1_score
from torch.utils.data import Dataset, DataLoader, TensorDataset

In [2]:
# Load pre-trained word embeddings
word_vectors = api.load("word2vec-google-news-300")
glove_vectors = api.load("glove-wiki-gigaword-300")
fasttext_vectors = api.load("fasttext-wiki-news-subwords-300")

In [57]:
# Define the BiLSTM-CRF model
class BiLSTMCRFTagger(nn.Module):
    def __init__(self, embedding_dim, hidden_dim, vocab_size, tagset_size, word_embeddings):
        super(BiLSTMCRFTagger, self).__init__()
        self.embedding = nn.Embedding.from_pretrained(word_embeddings, freeze=True)
        self.lstm = nn.LSTM(embedding_dim, hidden_dim, batch_first=True, bidirectional=True)
        self.hidden2tag = nn.Linear(hidden_dim * 2, tagset_size)
        self.crf = CRF(tagset_size)
    
    def forward(self, sentence):
        embeds = self.embedding(sentence)
        lstm_out, _ = self.lstm(embeds)
        tag_space = self.hidden2tag(lstm_out)
        return tag_space

# Define the dataset
class ReviewDataset(Dataset):
    def __init__(self, data_path, word_vectors, max_seq_length):
        self.data_path = data_path
        self.word_vectors = word_vectors
        self.max_seq_length = max_seq_length
        self.data = self.load_data()
        self.word_to_idx, self.label_to_idx = self.prepare_vocab()
        self.X, self.y = self.prepare_data()

    def load_data(self):
        with open(self.data_path, 'r') as f:
            data = json.load(f)
        if isinstance(data, list):
            data = {str(idx): sample for idx, sample in enumerate(data)}
        return data

    def prepare_vocab(self):
        word_to_idx = {word: idx + 1 for idx, word in enumerate(self.word_vectors.index_to_key)}
        label_to_idx = {"O": 1, "B_RESPONDENT": 2, "I_RESPONDENT": 3, "B_DATE": 4, "I_DATE":5, "B_GPE": 6, "I_GPE": 7, "B_PROVISION": 8, 
                        "I_PROVISION": 9, "B_STATUTE": 10, "I_STATUTE": 11, "B_ORG": 12, "B_CASE_NUMBER": 13, "I_CASE_NUMBER": 14, 
                        "B_OTHER_PERSON": 15, "I_OTHER_PERSON": 16, "B_WITNESS": 17, "I_WITNESS": 18, "I_ORG": 19, "B_JUDGE": 20, "I_JUDGE": 21, 
                        "B_PETITIONER": 22, "I_PETITIONER": 23, "B_COURT": 24, "I_COURT": 25, "B_PRECEDENT": 27, "I_PRECEDENT": 0}
        return word_to_idx, label_to_idx

    def prepare_data(self):
        X, y = [], []
        max_text_length = 0
        max_label_length = 0

        for sample_id, sample_data in self.data.items():
            text = sample_data['text'].split()
            labels = sample_data['labels']

            text_indices = [self.word_to_idx.get(word, 0) for word in text]
            label_indices = [self.label_to_idx[label] for label in labels]

            X.append(torch.tensor(text_indices))
            y.append(torch.tensor(label_indices))

            max_text_length = max(max_text_length, len(text_indices))
            max_label_length = max(max_label_length, len(label_indices))

        # Pad both X and y to the maximum length
        X_padded = pad_sequence(X, batch_first=True, padding_value=0)
        y_padded = pad_sequence(y, batch_first=True, padding_value=0)

        # Truncate to max_seq_length if necessary
        X_padded = X_padded[:, :self.max_seq_length]
        y_padded = y_padded[:, :self.max_seq_length]

        # Pad or truncate X and y to the same length
        if X_padded.size(1) > y_padded.size(1):
            y_padded = torch.nn.functional.pad(y_padded, (0, X_padded.size(1) - y_padded.size(1)), value=0)
        elif y_padded.size(1) > X_padded.size(1):
            X_padded = torch.nn.functional.pad(X_padded, (0, y_padded.size(1) - X_padded.size(1)), value=0)

        print("X_padded", X_padded.shape)
        print("Y_padded", y_padded.shape)
        return X_padded, y_padded

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        return self.X[idx], self.y[idx]

# Define the CRF layer
class CRF(nn.Module):
    def __init__(self, num_tags):
        super(CRF, self).__init__()
        self.num_tags = num_tags
        self.transitions = nn.Parameter(torch.randn(num_tags, num_tags))

    def forward(self, feats):
        return feats

In [58]:
# Define hyperparameters
max_seq_length = 100
hidden_dim_word2vec = 125
hidden_dim_glove = 256
hidden_dim_fasttext = 125
tagset_size = 28

# Create data loaders
test_dataset_word2vec = ReviewDataset('NER_test.json', word_vectors, max_seq_length)
test_loader_word2vec = DataLoader(test_dataset_word2vec, batch_size=32, drop_last=True)
test_dataset_glove = ReviewDataset('NER_test.json', glove_vectors, max_seq_length)
test_loader_glove = DataLoader(test_dataset_glove, batch_size=32, drop_last=True)
test_dataset_fasttext = ReviewDataset('NER_test.json', fasttext_vectors, max_seq_length)
test_loader_fasttext = DataLoader(test_dataset_fasttext, batch_size=32, drop_last=True)

X_padded torch.Size([949, 100])
Y_padded torch.Size([949, 100])
X_padded torch.Size([949, 100])
Y_padded torch.Size([949, 100])
X_padded torch.Size([949, 100])
Y_padded torch.Size([949, 100])


In [59]:
word2vec_state_dict = torch.load('t1_model4_word2vec.pt')

In [60]:
glove_state_dict = torch.load('t1_model4_GloVe.pt')

In [61]:
fasttext_state_dict = torch.load('t1_model4_fasttext.pt')

In [62]:
vocab_size_word2vec = len(word_vectors.index_to_key)
vocab_size_glove = len(glove_vectors.index_to_key)
vocab_size_fasttext = len(fasttext_vectors.index_to_key)

In [63]:
word2vec_model = BiLSTMCRFTagger(word_vectors.vector_size, hidden_dim_word2vec, vocab_size_word2vec, tagset_size, torch.FloatTensor(word_vectors.vectors))

In [64]:
glove_model = BiLSTMCRFTagger(glove_vectors.vector_size, hidden_dim_glove, vocab_size_glove, tagset_size, torch.FloatTensor(glove_vectors.vectors))

In [65]:
fasttext_model = BiLSTMCRFTagger(fasttext_vectors.vector_size, hidden_dim_fasttext, vocab_size_fasttext, tagset_size, torch.FloatTensor(fasttext_vectors.vectors))

In [66]:
word2vec_model.load_state_dict(word2vec_state_dict)

<All keys matched successfully>

In [67]:
glove_model.load_state_dict(glove_state_dict)

<All keys matched successfully>

In [68]:
fasttext_model.load_state_dict(fasttext_state_dict)

<All keys matched successfully>

In [85]:
def evaluate_model(model, test_loader):
    test_loss = 0
    test_all_preds = []
    test_all_labels = []
    criterion = nn.CrossEntropyLoss()
    with torch.no_grad():
        model.eval()
        for text, labels in test_loader:
            outputs = model(text)
            loss = criterion(outputs.view(-1, tagset_size), labels.view(-1))
            test_loss += loss.item()
            _, predicted = torch.max(outputs, 2)
            test_all_preds.extend(predicted.view(-1).cpu().numpy().tolist())
            test_all_labels.extend(labels.view(-1).cpu().numpy().tolist())

    # Calculate the accuracy and F1 score
    test_f1 = f1_score(test_all_labels, test_all_preds, average='macro')
    accuracy = accuracy_score(test_all_labels, test_all_preds)

    return accuracy, test_f1


In [86]:
word2vec_accuracy, word2vec_f1 = evaluate_model(word2vec_model, test_loader_word2vec)
glove_accuracy, glove_f1 = evaluate_model(glove_model, test_loader_glove)
fasttext_accuracy, fasttext_f1 = evaluate_model(fasttext_model, test_loader_fasttext)

# Print the results
print("T2 Model 4 Word2Vec Test Accuracy:", word2vec_accuracy, " Test F1 Score:", word2vec_f1)
print("T2 Model 4 GloVe Test Accuracy:", glove_accuracy, " Test F1 Score:", glove_f1)
print("T2 Model 4 Fasttext Test Accuracy:", fasttext_accuracy, " Test F1 Score:", fasttext_f1)

T2 Model 4 Word2Vec Test Accuracy: 0.9566702586206897  Test F1 Score: 0.4625821460029647
T2 Model 4 GloVe Test Accuracy: 0.9494827586206896  Test F1 Score: 0.3455169452643622
T2 Model 4 Fasttext Test Accuracy: 0.9597198275862069  Test F1 Score: 0.4277152762183393
