In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
from sklearn.metrics import f1_score,accuracy_score
import numpy as np
import matplotlib.pyplot as plt
from gensim.models import KeyedVectors
from tqdm import tqdm
import json
import fasttext

In [None]:
# Load the ATE and NER test splits from JSON. Both are later indexed as
# data[key]['text'] and data[key]['labels'].
with open(r'data/ATE_Test.json', 'r') as f:
    ATE_test_data = json.load(f)
with open(r'data/NER_Test.json', 'r') as f:
    NER_test_data = json.load(f)

## word2vec

In [None]:
# Load the pre-trained word2vec vectors from the binary GoogleNews file
# (path is resolved relative to the working directory).
word_vectors = KeyedVectors.load_word2vec_format('GoogleNews-vectors-negative300.bin', binary=True)
def convert_data_to_tensors_word2vec(data, word_vectors, max_len=83):
    """Embed every text in ``data`` with word2vec and pad to a fixed length.

    Args:
        data: Mapping whose values are dicts holding a whitespace-separated
            string under the ``'text'`` key.
        word_vectors: Word-to-vector lookup supporting ``word in word_vectors``
            and ``word_vectors[word]``. It must contain ``'hello'``, which is
            used to read the embedding dimension.
        max_len: Number of token positions per text. Shorter texts are padded
            with zero vectors; longer texts are truncated so that every row
            has the same length.

    Returns:
        A ``float32`` tensor of shape ``(len(data), max_len, dim)``.
    """
    dim = len(word_vectors['hello'])

    # Fill a pre-allocated array instead of handing torch a nested list of
    # ndarrays, which is very slow and triggers a UserWarning.
    batch = np.zeros((len(data), max_len, dim), dtype=np.float32)
    for row, key in enumerate(data):
        tokens = data[key]['text'].split()[:max_len]
        for col, word in enumerate(tokens):
            # Unknown words keep the all-zero vector.
            if word in word_vectors:
                batch[row, col] = word_vectors[word]

    return torch.from_numpy(batch)
# Embed both test sets with word2vec; each text becomes a fixed-length
# sequence of word vectors.
x_test_ATE_word2vec = convert_data_to_tensors_word2vec(ATE_test_data, word_vectors)
x_test_NER_word2vec = convert_data_to_tensors_word2vec(NER_test_data, word_vectors)

  input_ids = torch.tensor(padded_embeddings)


## GloVe

In [None]:
def load_glove_model(File):
    """Read a GloVe text file into a ``{word: vector}`` dictionary.

    Each non-empty line is expected to hold a token followed by its
    whitespace-separated vector components.

    Args:
        File: Path to the GloVe ``.txt`` embeddings file.

    Returns:
        dict mapping each word to a ``float64`` NumPy vector.
    """
    print("Loading Glove Model")
    glove_model = {}
    # Read as UTF-8 explicitly so loading does not depend on the platform's
    # default encoding.
    with open(File, 'r', encoding='utf-8') as f:
        for line in f:
            split_line = line.split()
            if not split_line:
                # Tolerate blank lines (e.g. a trailing newline at EOF).
                continue
            glove_model[split_line[0]] = np.array(split_line[1:], dtype=np.float64)
    print(f"{len(glove_model)} words loaded!")
    return glove_model

# Path to the GloVe pre-trained embeddings file (relative to the working directory)
glove_file_path = 'glove.6B.300d.txt'

# Load GloVe embeddings into an in-memory {word: vector} dict
model_glove = load_glove_model(glove_file_path)

def convert_data_to_tensors_glove(data, model, max_len=83):
    """Embed every text in ``data`` with GloVe and pad to a fixed length.

    Args:
        data: Mapping whose values are dicts holding a whitespace-separated
            string under the ``'text'`` key.
        model: ``{word: ndarray}`` lookup as returned by ``load_glove_model``.
            It must contain ``"hello"``, which is used to read the embedding
            dimension.
        max_len: Number of token positions per text. Shorter texts are padded
            with zero vectors; longer texts are truncated so that every row
            has the same length.

    Returns:
        A ``float32`` tensor of shape ``(len(data), max_len, dim)``.
    """
    dim = model["hello"].size

    # Pre-allocate the padded batch; positions that are never written stay as
    # zero vectors (both padding and out-of-vocabulary words).
    batch = np.zeros((len(data), max_len, dim), dtype=np.float32)
    for row, key in enumerate(data):
        tokens = data[key]['text'].split()[:max_len]
        for col, word in enumerate(tokens):
            if word in model:
                batch[row, col] = model[word]

    return torch.from_numpy(batch)

# Embed both test sets with the GloVe lookup loaded above.
x_test_ATE_glove = convert_data_to_tensors_glove(ATE_test_data, model_glove)
x_test_NER_glove = convert_data_to_tensors_glove(NER_test_data, model_glove)

Loading Glove Model
400000 words loaded!


## FastText

In [None]:
# Load the pre-trained fastText binary model (path is relative to the working directory).
model_fasttext = fasttext.load_model("cc.en.300.bin")
def convert_data_to_tensors_fasttext(data, model, max_len=83):
    """Embed every text in ``data`` with fastText and pad to a fixed length.

    Args:
        data: Mapping whose values are dicts holding a whitespace-separated
            string under the ``'text'`` key.
        model: Object exposing ``get_word_vector(word)``; every token is looked
            up through it, so there is no out-of-vocabulary branch.
        max_len: Number of token positions per text. Shorter texts are padded
            with zero vectors; longer texts are truncated so that every row
            has the same length.

    Returns:
        A ``float32`` tensor of shape ``(len(data), max_len, dim)``.
    """
    dim = len(model.get_word_vector("Hello"))

    # Fill a pre-allocated array rather than building a tensor from nested
    # lists of ndarrays, which is very slow.
    batch = np.zeros((len(data), max_len, dim), dtype=np.float32)
    for row, key in enumerate(data):
        tokens = data[key]['text'].split()[:max_len]
        for col, word in enumerate(tokens):
            batch[row, col] = model.get_word_vector(word)

    return torch.from_numpy(batch)

# Embed both test sets with the fastText model loaded above.
x_test_ATE_fasttext = convert_data_to_tensors_fasttext(ATE_test_data, model_fasttext)
x_test_NER_fasttext = convert_data_to_tensors_fasttext(NER_test_data, model_fasttext)



## label encoding

In [None]:
def convert_labels_to_fixed_length_ATE(labels, max_length):
    """Encode ATE tag sequences as a padded ``(n, max_length)`` float matrix.

    Args:
        labels: List of tag sequences using the tags ``'B'``, ``'I'``, ``'O'``.
        max_length: Number of columns; longer sequences are truncated and
            shorter ones are filled with the ``'<pad>'`` id (3).

    Returns:
        Tuple ``(encoded, lengths)``: the ``float64`` id matrix and the list
        of (truncated) sequence lengths.
    """
    tag_ids = {'B': 0, 'I': 1, 'O': 2,'<pad>':3}
    # Start from an all-padding matrix, then overwrite the real positions.
    encoded = np.full((len(labels), max_length), tag_ids['<pad>'], dtype=np.float64)
    lengths = []
    for row, sequence in enumerate(labels):
        kept = sequence[:max_length]
        for col in range(len(kept)):
            encoded[row, col] = tag_ids[kept[col]]
        lengths.append(len(kept))
    return encoded, lengths

def convert_labels_to_fixed_length_NER(labels, max_length):
    """Encode NER tag sequences as a padded ``(n, max_length)`` float matrix.

    Args:
        labels: List of tag sequences using the NER tag names listed below.
        max_length: Number of columns; longer sequences are truncated and
            shorter ones are filled with the ``'<pad>'`` id (27).

    Returns:
        Tuple ``(encoded, lengths)``: the ``float64`` id matrix and the list
        of (truncated) sequence lengths.
    """
    tag_ids = {'I_WITNESS': 0, 'B_JUDGE': 1, 'I_CASE_NUMBER': 2, 'B_CASE_NUMBER': 3, 'I_PROVISION': 4, 'B_STATUTE': 5, 'I_DATE': 6, 'I_STATUTE': 7, 'B_WITNESS': 8, 'B_DATE': 9, 'I_RESPONDENT': 10, 'B_PRECEDENT': 11, 'B_GPE': 12, 'I_ORG': 13, 'I_PETITIONER': 14, 'B_PROVISION': 15, 'B_ORG': 16, 'I_JUDGE': 17, 'I_OTHER_PERSON': 18, 'B_COURT': 19, 'B_PETITIONER': 20, 'B_RESPONDENT': 21, 'I_PRECEDENT': 22, 'I_COURT': 23, 'I_GPE': 24, 'B_OTHER_PERSON': 25, 'O': 26, '<pad>':27}
    # Every cell starts as padding; real tags overwrite the row prefix.
    encoded = np.full((len(labels), max_length), tag_ids['<pad>'], dtype=np.float64)
    lengths = []
    for row, sequence in enumerate(labels):
        ids = [tag_ids[tag] for tag in sequence[:max_length]]
        encoded[row, :len(ids)] = ids
        lengths.append(len(ids))
    return encoded, lengths

# Encode the ATE gold labels into a padded id matrix plus per-sentence lengths.
max_length_ATE = 83
test_labels_ATE = [ATE_test_data[key]['labels'] for key in ATE_test_data]
test_lab_ATE,length_test_ATE = convert_labels_to_fixed_length_ATE(test_labels_ATE, max_length_ATE)
y_test_ATE = torch.tensor(test_lab_ATE)

# Same for NER.
# NOTE(review): labels are padded to 70 here while the embedding converters
# pad inputs to 83; evaluation slices by sentence length so the shapes never
# need to match, but confirm no NER sentence exceeds 70 tokens.
max_length_NER = 70
test_labels_NER = [NER_test_data[key]['labels'] for key in NER_test_data]
test_lab_NER,length_test_NER = convert_labels_to_fixed_length_NER(test_labels_NER, max_length_NER)
y_test_NER = torch.tensor(test_lab_NER)

In [None]:
# Pick the compute device once: CUDA when present, otherwise CPU.
is_cuda = torch.cuda.is_available()
device = torch.device("cuda" if is_cuda else "cpu")
print("GPU is available" if is_cuda else "GPU not available, CPU used")

GPU is available


In [None]:
# The embedding/label arrays are already tensors, so copy-construct them with
# clone().detach() and cast, instead of torch.tensor(tensor), which emits a
# UserWarning for every call.
x_test_ATE_word2vec_tensor = x_test_ATE_word2vec.clone().detach().to(torch.float32)
x_test_ATE_glove_tensor = x_test_ATE_glove.clone().detach().to(torch.float32)
x_test_ATE_fasttext_tensor = x_test_ATE_fasttext.clone().detach().to(torch.float32)

x_test_NER_word2vec_tensor = x_test_NER_word2vec.clone().detach().to(torch.float32)
x_test_NER_glove_tensor = x_test_NER_glove.clone().detach().to(torch.float32)
x_test_NER_fasttext_tensor = x_test_NER_fasttext.clone().detach().to(torch.float32)

# Label ids are stored as floats by the label encoders; cast them to integer ids.
y_test_ATE_tensor = y_test_ATE.clone().detach().to(torch.long)
y_test_NER_tensor = y_test_NER.clone().detach().to(torch.long)

# Per-sentence lengths, used to strip padding positions during evaluation.
length_test_ATE_tensor = torch.tensor(length_test_ATE)
length_test_NER_tensor = torch.tensor(length_test_NER)

  x_test_ATE_word2vec_tensor = torch.tensor(x_test_ATE_word2vec, dtype=torch.float32)
  x_test_ATE_glove_tensor = torch.tensor(x_test_ATE_glove, dtype=torch.float32)
  x_test_ATE_fasttext_tensor = torch.tensor(x_test_ATE_fasttext, dtype=torch.float32)
  x_test_NER_word2vec_tensor = torch.tensor(x_test_NER_word2vec, dtype=torch.float32)
  x_test_NER_glove_tensor = torch.tensor(x_test_NER_glove, dtype=torch.float32)
  x_test_NER_fasttext_tensor = torch.tensor(x_test_NER_fasttext, dtype=torch.float32)
  y_test_ATE_tensor = torch.tensor(y_test_ATE, dtype=torch.long)
  y_test_NER_tensor = torch.tensor(y_test_NER, dtype=torch.long)


## Model

In [None]:
# Define the RNN model
class RNNTagger(nn.Module):
    """Single-layer vanilla RNN followed by a per-token linear classifier.

    ``forward`` takes ``x`` of shape ``(batch, seq, input_size)`` (the RNN is
    built with ``batch_first=True``) and returns per-token scores of shape
    ``(batch, seq, output_size)``.
    """

    def __init__(self, input_size, hidden_size, output_size):
        super().__init__()
        self.rnn = nn.RNN(input_size, hidden_size, batch_first=True)
        self.fc = nn.Linear(hidden_size, output_size)

    def forward(self, x):
        # Explicit zero initial hidden state, created on the input's device.
        initial_hidden = torch.zeros(1, x.size(0), self.rnn.hidden_size, device=x.device)
        hidden_seq, _ = self.rnn(x, initial_hidden)
        return self.fc(hidden_seq)

#define the LSTM model
class LSTMTagger(nn.Module):
    """Single-layer LSTM followed by a per-token linear classifier.

    ``forward`` takes ``x`` of shape ``(batch, seq, input_size)`` (the LSTM is
    built with ``batch_first=True``) and returns per-token scores of shape
    ``(batch, seq, output_size)``.
    """

    def __init__(self, input_size, hidden_size, output_size):
        super().__init__()
        self.lstm = nn.LSTM(input_size, hidden_size, batch_first=True)
        self.fc = nn.Linear(hidden_size, output_size)

    def forward(self, x):
        # Explicit zero hidden and cell states, created on the input's device.
        state_shape = (1, x.size(0), self.lstm.hidden_size)
        initial_hidden = torch.zeros(*state_shape, device=x.device)
        initial_cell = torch.zeros(*state_shape, device=x.device)
        hidden_seq, _ = self.lstm(x, (initial_hidden, initial_cell))
        return self.fc(hidden_seq)

#define the GRU model
class GRUTagger(nn.Module):
    """Single-layer GRU followed by a per-token linear classifier.

    ``forward`` takes ``x`` of shape ``(batch, seq, input_size)`` (the GRU is
    built with ``batch_first=True``) and returns per-token scores of shape
    ``(batch, seq, output_size)``.
    """

    def __init__(self, input_size, hidden_size, output_size):
        super().__init__()
        self.gru = nn.GRU(input_size, hidden_size, batch_first=True)
        self.fc = nn.Linear(hidden_size, output_size)

    def forward(self, x):
        # Explicit zero initial hidden state, created on the input's device.
        initial_hidden = torch.zeros(1, x.size(0), self.gru.hidden_size, device=x.device)
        hidden_seq, _ = self.gru(x, initial_hidden)
        return self.fc(hidden_seq)

# Define hyperparameters
input_size = 300  # passed to the recurrent layers as their input feature size
hidden_size = 128
output_size_ATE = 4  # one score per ATE label id (B, I, O, <pad>)
output_size_NER = 28  # one score per NER label id (27 tags + <pad>)

In [None]:
def _load_tagger(tagger_cls, output_size, checkpoint_path):
    """Build a tagger and restore its weights from ``checkpoint_path``.

    ``map_location="cpu"`` makes checkpoints saved from a GPU loadable on
    CPU-only machines; the taggers are evaluated on CPU tensors below.
    """
    tagger = tagger_cls(input_size, hidden_size, output_size)
    tagger.load_state_dict(torch.load(checkpoint_path, map_location="cpu"))
    return tagger


# Naming scheme: t1 = NER, t2 = ATE; model1 = RNN, model2 = LSTM, model3 = GRU.
model_t1_model1_word2vec = _load_tagger(RNNTagger, output_size_NER, "t1_model1_word2vec.pth")
model_t2_model1_word2vec = _load_tagger(RNNTagger, output_size_ATE, "t2_model1_word2vec.pth")
model_t1_model1_glove = _load_tagger(RNNTagger, output_size_NER, "t1_model1_glove.pth")
model_t2_model1_glove = _load_tagger(RNNTagger, output_size_ATE, "t2_model1_glove.pth")
model_t1_model1_fasttext = _load_tagger(RNNTagger, output_size_NER, "t1_model1_fasttext.pth")
model_t2_model1_fasttext = _load_tagger(RNNTagger, output_size_ATE, "t2_model1_fasttext.pth")

model_t1_model2_word2vec = _load_tagger(LSTMTagger, output_size_NER, "t1_model2_word2vec.pth")
model_t2_model2_word2vec = _load_tagger(LSTMTagger, output_size_ATE, "t2_model2_word2vec.pth")
model_t1_model2_glove = _load_tagger(LSTMTagger, output_size_NER, "t1_model2_glove.pth")
model_t2_model2_glove = _load_tagger(LSTMTagger, output_size_ATE, "t2_model2_glove.pth")
model_t1_model2_fasttext = _load_tagger(LSTMTagger, output_size_NER, "t1_model2_fasttext.pth")
model_t2_model2_fasttext = _load_tagger(LSTMTagger, output_size_ATE, "t2_model2_fasttext.pth")

model_t1_model3_word2vec = _load_tagger(GRUTagger, output_size_NER, "t1_model3_word2vec.pth")
model_t2_model3_word2vec = _load_tagger(GRUTagger, output_size_ATE, "t2_model3_word2vec.pth")
model_t1_model3_glove = _load_tagger(GRUTagger, output_size_NER, "t1_model3_glove.pth")
model_t2_model3_glove = _load_tagger(GRUTagger, output_size_ATE, "t2_model3_glove.pth")
model_t1_model3_fasttext = _load_tagger(GRUTagger, output_size_NER, "t1_model3_fasttext.pth")
model_t2_model3_fasttext = _load_tagger(GRUTagger, output_size_ATE, "t2_model3_fasttext.pth")

<All keys matched successfully>

## Evaluation

In [None]:
def eval_score(model,test_input,test_output,length_tensor,name):
    """Print macro-F1 and accuracy of ``model`` on one padded test set.

    Args:
        model: Tagger whose output has one score per class on dim 2.
        test_input: Padded input batch fed to the model in a single call.
        test_output: Padded gold label ids, one row per sentence.
        length_tensor: Per-sentence lengths; only the first ``length``
            positions of each row are scored, so padding is ignored.
        name: Label printed alongside the scores.
    """
    model.eval()
    with torch.no_grad():
        logits = model(test_input)
    predicted_ids = logits.argmax(dim=2)

    # Drop padding positions row by row, then flatten to one long sequence.
    pred_rows = [row[:n_tokens] for row, n_tokens in zip(predicted_ids, length_tensor)]
    gold_rows = [row[:n_tokens] for row, n_tokens in zip(test_output, length_tensor)]
    flat_pred = torch.cat(pred_rows)
    flat_gold = torch.cat(gold_rows)

    f1 = f1_score(flat_gold, flat_pred, average='macro')
    accuracy = accuracy_score(flat_gold, flat_pred)
    print(f"Name of model:-{name}, \t f1 score: {f1},\t accuracy score: {accuracy} \n" )


In [None]:
# Banner separating the NER results from the rest of the notebook output.
print("Dataset 1 NER")

Dataset 1 NER


In [None]:
# Score every NER tagger (model1 = RNN, model2 = LSTM, model3 = GRU) on the
# test inputs built from its own embedding type.
for ner_tagger, ner_inputs, ner_name in (
    (model_t1_model1_word2vec, x_test_NER_word2vec_tensor, "t1_model1_word2vec"),
    (model_t1_model1_glove, x_test_NER_glove_tensor, "t1_model1_glove"),
    (model_t1_model1_fasttext, x_test_NER_fasttext_tensor, "t1_model1_fasttext"),
    (model_t1_model2_word2vec, x_test_NER_word2vec_tensor, "t1_model2_word2vec"),
    (model_t1_model2_glove, x_test_NER_glove_tensor, "t1_model2_glove"),
    (model_t1_model2_fasttext, x_test_NER_fasttext_tensor, "t1_model2_fasttext"),
    (model_t1_model3_word2vec, x_test_NER_word2vec_tensor, "t1_model3_word2vec"),
    (model_t1_model3_glove, x_test_NER_glove_tensor, "t1_model3_glove"),
    (model_t1_model3_fasttext, x_test_NER_fasttext_tensor, "t1_model3_fasttext"),
):
    eval_score(ner_tagger, ner_inputs, y_test_NER_tensor, length_test_NER_tensor, ner_name)

Name of model:-t1_model1_word2vec, 	 f1 score: 0.3558189079041109,	 accuracy score: 0.861878287002254 

Name of model:-t1_model1_glove, 	 f1 score: 0.23224886162702643,	 accuracy score: 0.8491960931630353 

Name of model:-t1_model1_fasttext, 	 f1 score: 0.4028700936231574,	 accuracy score: 0.8789181066867018 

Name of model:-t1_model2_word2vec, 	 f1 score: 0.3547655557172286,	 accuracy score: 0.8634410217881292 

Name of model:-t1_model2_glove, 	 f1 score: 0.2362976720430671,	 accuracy score: 0.8332381667918858 

Name of model:-t1_model2_fasttext, 	 f1 score: 0.3948665838914022,	 accuracy score: 0.8756423741547709 

Name of model:-t1_model3_word2vec, 	 f1 score: 0.32748062060098565,	 accuracy score: 0.8446882043576258 

Name of model:-t1_model3_glove, 	 f1 score: 0.21406890878378082,	 accuracy score: 0.8307738542449287 

Name of model:-t1_model3_fasttext, 	 f1 score: 0.3819970956544048,	 accuracy score: 0.8627798647633358 



In [None]:
# Banner separating the ATE results from the rest of the notebook output.
print("Dataset 2 ATE")

Dataset 2 ATE


In [None]:
# Score every ATE tagger (model1 = RNN, model2 = LSTM, model3 = GRU) on the
# test inputs built from its own embedding type.
for ate_tagger, ate_inputs, ate_name in (
    (model_t2_model1_word2vec, x_test_ATE_word2vec_tensor, "t2_model1_word2vec"),
    (model_t2_model1_glove, x_test_ATE_glove_tensor, "t2_model1_glove"),
    (model_t2_model1_fasttext, x_test_ATE_fasttext_tensor, "t2_model1_fasttext"),
    (model_t2_model2_word2vec, x_test_ATE_word2vec_tensor, "t2_model2_word2vec"),
    (model_t2_model2_glove, x_test_ATE_glove_tensor, "t2_model2_glove"),
    (model_t2_model2_fasttext, x_test_ATE_fasttext_tensor, "t2_model2_fasttext"),
    (model_t2_model3_word2vec, x_test_ATE_word2vec_tensor, "t2_model3_word2vec"),
    (model_t2_model3_glove, x_test_ATE_glove_tensor, "t2_model3_glove"),
    (model_t2_model3_fasttext, x_test_ATE_fasttext_tensor, "t2_model3_fasttext"),
):
    eval_score(ate_tagger, ate_inputs, y_test_ATE_tensor, length_test_ATE_tensor, ate_name)

Name of model:-t2_model1_word2vec, 	 f1 score: 0.6999402532919099,	 accuracy score: 0.9062076967704505 

Name of model:-t2_model1_glove, 	 f1 score: 0.6017197129354663,	 accuracy score: 0.8849352156256043 

Name of model:-t2_model1_fasttext, 	 f1 score: 0.688587030489776,	 accuracy score: 0.9098820344227422 

Name of model:-t2_model2_word2vec, 	 f1 score: 0.709729054763831,	 accuracy score: 0.914910075420615 

Name of model:-t2_model2_glove, 	 f1 score: 0.6708463379432797,	 accuracy score: 0.9021465867337072 

Name of model:-t2_model2_fasttext, 	 f1 score: 0.7154978149353443,	 accuracy score: 0.9143299168439374 

Name of model:-t2_model3_word2vec, 	 f1 score: 0.7199227729549523,	 accuracy score: 0.9135563720750338 

Name of model:-t2_model3_glove, 	 f1 score: 0.6610621281734312,	 accuracy score: 0.9038870624637401 

Name of model:-t2_model3_fasttext, 	 f1 score: 0.7335586703984592,	 accuracy score: 0.9191645716495842 



# T2-model4-fasttext

In [None]:
import torch
import torch.nn as nn
from torch.nn.utils.rnn import pack_padded_sequence, pad_packed_sequence


class BiLSTMCRF(nn.Module):
    """BiLSTM encoder followed by a linear-chain CRF over tag sequences.

    The model consumes pre-computed word embeddings (it has no embedding
    layer), projects the BiLSTM states to per-tag emission scores and combines
    them with a learned tag-to-tag transition matrix.
    """

    def __init__(self, tag_vocab, dropout_rate=0.5, embed_size=300, hidden_size=256):
        """Initialize the model.

        Args:
            tag_vocab: sized collection of tags; ``len(tag_vocab)`` is the
                number of CRF states K
            dropout_rate (float): dropout probability applied to emission scores
            embed_size (int): size of the input word embeddings
            hidden_size (int): hidden state size of each LSTM direction
        """
        super(BiLSTMCRF, self).__init__()

        self.dropout_rate = dropout_rate
        self.embed_size = embed_size
        self.hidden_size = hidden_size
        self.tag_vocab = tag_vocab
        self.dropout = nn.Dropout(dropout_rate)
        self.encoder = nn.LSTM(input_size=embed_size, hidden_size=hidden_size, bidirectional=True)
        self.hidden2emit_score = nn.Linear(hidden_size * 2, len(self.tag_vocab))
        # transition[p, c] scores moving from tag p to tag c.
        self.transition = nn.Parameter(torch.randn(len(self.tag_vocab), len(self.tag_vocab)))  # shape: (K, K)

    def forward(self, sentences, mask, tags, sen_lengths):
        """Compute the CRF negative log-likelihood of ``tags``.

        Args:
            sentences (tensor): word embeddings, shape (b, len, e)
            mask (tensor): shape (b, len), truthy at positions that count
            tags (tensor): gold tag ids, shape (b, len)
            sen_lengths: sentence lengths (forwarded to ``encode``, which
                does not use them)
        Returns:
            loss (tensor): loss per sentence, shape (b,)
        """
        sentences = sentences.transpose(0, 1)  # shape: (len, b, e)
        emit_score = self.encode(sentences, sen_lengths)  # shape: (b, len, K)
        loss = self.cal_loss(tags, mask, emit_score)  # shape: (b,)
        return loss

    def encode(self, sentences, sent_lengths):
        """Run the BiLSTM and project its states to emission scores.

        Args:
            sentences (tensor): word embeddings, shape (len, b, e)
            sent_lengths: unused; sequences are not packed, so padded
                positions are encoded like any other position
        Returns:
            emit_score (tensor): emission scores, shape (b, len, K)
        """
        hidden_states, _ = self.encoder(sentences)  # shape: (len, b, 2h)
        hidden_states = hidden_states.permute(1, 0, 2)  # shape: (b, len, 2h)
        emit_score = self.hidden2emit_score(hidden_states)  # shape: (b, len, K)
        emit_score = self.dropout(emit_score)  # shape: (b, len, K)
        return emit_score

    def cal_loss(self, tags, mask, emit_score):
        """Calculate the CRF loss.

        Args:
            tags (tensor): a batch of tags, shape (b, len)
            mask (tensor): mask for the tags, shape (b, len), values in PAD position is 0
            emit_score (tensor): emit matrix, shape (b, len, K)
        Returns:
            loss (tensor): loss of the batch, shape (b,)
        """
        sent_len = tags.shape[1]
        # Score of the gold path: emissions plus transitions between
        # consecutive gold tags, summed over unmasked positions.
        score = torch.gather(emit_score, dim=2, index=tags.unsqueeze(dim=2)).squeeze(dim=2)  # shape: (b, len)
        score[:, 1:] += self.transition[tags[:, :-1], tags[:, 1:]]
        total_score = (score * mask.type(torch.float)).sum(dim=1)  # shape: (b,)

        # Log-partition function via the forward algorithm. Iterate over the
        # real sequence length rather than a hard-coded 100, so shorter
        # batches do not index out of range and longer ones are not cut off.
        d = torch.unsqueeze(emit_score[:, 0], dim=1)  # shape: (b, 1, K)
        for i in range(1, sent_len):
            # Only the first n_unfinished rows are advanced, so this assumes
            # unfinished sentences come first in the batch (lengths in
            # decreasing order) whenever the mask is not all ones.
            n_unfinished = int(mask[:, i].sum())
            d_uf = d[: n_unfinished]  # shape: (uf, 1, K)
            emit_and_transition = emit_score[: n_unfinished, i].unsqueeze(dim=1) + self.transition  # shape: (uf, K, K)
            log_sum = d_uf.transpose(1, 2) + emit_and_transition  # shape: (uf, K, K)
            max_v = log_sum.max(dim=1)[0].unsqueeze(dim=1)  # shape: (uf, 1, K)
            log_sum = log_sum - max_v  # shape: (uf, K, K)
            d_uf = max_v + torch.logsumexp(log_sum, dim=1).unsqueeze(dim=1)  # shape: (uf, 1, K)
            d = torch.cat((d_uf, d[n_unfinished:]), dim=0)
        d = d.squeeze(dim=1)  # shape: (b, K)
        max_d = d.max(dim=-1)[0]  # shape: (b,)
        d = max_d + torch.logsumexp(d - max_d.unsqueeze(dim=1), dim=1)  # shape: (b,)
        llk = total_score - d  # shape: (b,)
        loss = -llk  # shape: (b,)
        return loss

    def predict(self, sentences, mask, sen_lengths):
        """Viterbi-decode the best tag sequence for every sentence.

        Args:
            sentences (tensor): word embeddings, shape (b, len, e)
            mask (tensor): shape (b, len), truthy at positions to decode
            sen_lengths: sentence lengths (forwarded to ``encode``, which
                does not use them)
        Returns:
            tags (list[list[int]]): predicted tag indices for the batch
        """
        batch_size = sentences.shape[0]
        num_tags = len(self.tag_vocab)

        sentences = sentences.transpose(0, 1)  # shape: (len, b, e)
        emit_score = self.encode(sentences, sen_lengths)  # shape: (b, len, K)
        seq_len = emit_score.shape[1]

        # tags[b][k] is the best path for sentence b that ends in tag k.
        tags = [[[k] for k in range(num_tags)] for _ in range(batch_size)]  # list, shape: (b, K, 1)

        # Best score of any path ending in each tag at position 0.
        d = torch.unsqueeze(emit_score[:, 0], dim=1)  # shape: (b, 1, K)

        # Walk the real sequence length instead of a hard-coded 100.
        for i in range(1, seq_len):
            # As in cal_loss, only the first n_unfinished rows are advanced.
            n_unfinished = int(mask[:, i].sum())
            d_uf = d[: n_unfinished]  # shape: (uf, 1, K)

            # [b, p, c] = score of arriving at tag c from tag p at position i.
            emit_and_transition = self.transition + emit_score[: n_unfinished, i].unsqueeze(dim=1)  # shape: (uf, K, K)
            new_d_uf = d_uf.transpose(1, 2) + emit_and_transition  # shape: (uf, K, K)

            # Keep, for every current tag, the best previous tag and its score.
            d_uf, max_idx = torch.max(new_d_uf, dim=1)
            max_idx = max_idx.tolist()  # list, shape: (uf, K)

            # Extend each best path with the current tag j.
            tags[: n_unfinished] = [[tags[b][k] + [j] for j, k in enumerate(max_idx[b])] for b in range(n_unfinished)]

            d = torch.cat((torch.unsqueeze(d_uf, dim=1), d[n_unfinished:]), dim=0)  # shape: (b, 1, K)

        d = d.squeeze(dim=1)  # shape: (b, K)

        # Pick the best final tag and return the path that leads to it.
        _, max_idx = torch.max(d, dim=1)  # shape: (b,)
        max_idx = max_idx.tolist()
        tags = [tags[b][k] for b, k in enumerate(max_idx)]

        return tags

# Tag inventory for the CRF: the three ATE tags plus START/STOP/PAD markers.
tag_to_ix ={'O': 0, 'B': 1, 'I':2,'<START>':3,'<STOP>':4,'<PAD>':5}
# tag_vocab, dropout_rate=0.5, embed_size=300, hidden_size=256)
model  = BiLSTMCRF(tag_to_ix,dropout_rate=0.5, embed_size=300, hidden_size=256)
# Run on the GPU when one is available, otherwise on the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)

# Function to calculate accuracy
import torch

def calculate_accuracy(predictions, targets, sen_lengths):
    """Token-level accuracy over the unpadded part of every sentence.

    Args:
        predictions: Predicted tag ids, one equal-length sequence per sentence
            (nested list or tensor).
        targets (tensor): Gold tag ids, shape (b, len).
        sen_lengths: Number of positions to score for each sentence.

    Returns:
        0-dim float tensor: correct positions divided by scored positions
        (0.0 when there is nothing to score).
    """
    target = targets.cpu()
    predictions = torch.as_tensor(predictions).cpu()

    correct = 0
    total = 0
    for i in range(target.shape[0]):
        # Score exactly the first `length` positions: no extra padding slot
        # in the numerator and no fudge term in the denominator.
        length = int(sen_lengths[i])
        correct += int((predictions[i][:length] == target[i][:length]).sum())
        total += length

    if total == 0:
        return torch.tensor(0.0)
    return torch.tensor(correct / total)

def aggregater(predictions, targets, sen_lengths):
    """Flatten predictions and targets, keeping only unpadded positions.

    Args:
        predictions: Predicted tag ids, one equal-length sequence per sentence
            (nested list or tensor).
        targets (tensor): Gold tag ids, shape (b, len).
        sen_lengths: Number of leading positions to keep for each sentence.

    Returns:
        Tuple ``(aggr_pred, aggr_targ)`` of flat lists of plain Python ints,
        aligned position by position.
    """
    target = targets.cpu()
    predictions = torch.as_tensor(predictions).cpu()
    aggr_pred = []
    aggr_targ = []
    for i in range(target.shape[0]):
        length = int(sen_lengths[i])
        # tolist() yields plain ints instead of one 0-dim tensor per token.
        aggr_pred.extend(predictions[i][:length].tolist())
        aggr_targ.extend(target[i][:length].tolist())
    return aggr_pred,aggr_targ
import torch
from torch.utils.data import Dataset,DataLoader
import gensim.downloader as api
from torchtext.vocab import GloVe
import fasttext
import numpy as np
from torchtext.vocab import GloVe,FastText
import fasttext.util
import json
class SentimentAnalysisDataset(Dataset):
    """Dataset yielding fixed-length embedded sentences and tag ids for the CRF.

    The JSON file maps string indices (``"0"``, ``"1"``, ...) to records with a
    ``"text"`` string and a ``"labels"`` list. Every item is wrapped in
    START/STOP sentinels and padded to ``MAX_LENGTH`` positions.
    """

    # Tag-to-index mapping, including the sentinel and padding tags.
    LABEL_MAPPING = {'O': 0, 'B': 1, 'I':2,'<START>':3,'<STOP>':4,'<PAD>':5}
    # Number of positions every item is padded to.
    MAX_LENGTH = 100

    def __init__(self, json_path, embedding_type='word2vec',load=True):
        """Read the JSON file and, if ``load`` is true, the embedding model.

        Args:
            json_path: Path to the JSON data file.
            embedding_type: ``'word2vec'``, ``'glove'`` or ``'fasttext'``.
            load: When false, ``embedding_model`` is left as ``None`` so the
                caller can assign an already-loaded model.
        """
        with open(json_path, 'r') as file:
            self.data = json.load(file)

        self.embedding_type = embedding_type
        if load:
            self.embedding_model = self.load_embedding_model()
        else:
            self.embedding_model = None

    def load_embedding_model(self):
        """Return the pre-trained embedding model for ``self.embedding_type``.

        Raises:
            ValueError: if the embedding type is not supported.
        """
        if self.embedding_type == 'word2vec':
            # Download the pre-trained Word2Vec model
            return api.load('word2vec-google-news-300')
        elif self.embedding_type == 'glove':
            # Download the pre-trained GloVe model (6B tokens, 300d)
            return GloVe(name='6B', dim=300)
        elif self.embedding_type == 'fasttext':
            # Load the pre-trained FastText model
            return FastText(language='en')
        else:
            raise ValueError(f"Unsupported embedding type: {self.embedding_type}")

    def text_to_embeddings(self, text):
        """Embed ``text`` word by word and pad to ``MAX_LENGTH`` rows.

        The sequence is framed by a START row filled with -1000.0 and a STOP
        row filled with +1000.0; padding rows are filled with -1.0.

        Returns:
            NumPy array with one embedding row per position.

        Raises:
            ValueError: if the embedding type is not supported.
        """
        words = text.split()
        # Always read the model from self rather than from a notebook-level
        # global, so the dataset works regardless of which variables exist.
        if self.embedding_type == 'word2vec':
            dim = self.embedding_model.vector_size
            embeddings = [self.embedding_model[word] if word in self.embedding_model else torch.zeros(dim) for word in words]
        elif self.embedding_type == 'glove':
            dim = self.embedding_model['a'].shape[0]
            embeddings = [self.embedding_model[word] if word in self.embedding_model else torch.zeros(dim) for word in words]
        elif self.embedding_type == 'fasttext':
            dim = self.embedding_model['a'].shape[0]
            embeddings = [self.embedding_model[word] for word in words]
        else:
            raise ValueError(f"Unsupported embedding type: {self.embedding_type}")

        embeddings = [torch.full((dim,), -1000.0)] + embeddings + [torch.full((dim,), +1000.0)]
        for _ in range(self.MAX_LENGTH - len(embeddings)):
            embeddings.append(torch.full((dim,), -1.0))
        return np.stack(embeddings)

    def __len__(self):
        return len(self.data)

    def __getitem__(self, index):
        """Return ``(embeddings, label_ids, mask, length)`` for one record.

        ``length`` counts the real tokens plus the START and STOP sentinels.
        """
        record = self.data[str(index)]
        text = record["text"]
        labels = record["labels"]

        # Convert text to embeddings
        text_embeddings = torch.tensor(self.text_to_embeddings(text))

        labels = ['<START>'] + labels + ['<STOP>']
        sent_lengths = torch.tensor(len(labels))
        labels = labels + ['<PAD>'] * (self.MAX_LENGTH - len(labels))

        # Convert labels to numerical format and then to a tensor
        labels_tensor = torch.tensor([self.LABEL_MAPPING[label] for label in labels])

        # NOTE(review): the embeddings are already padded to MAX_LENGTH rows,
        # so this mask is all True and padding positions are not masked out.
        # Left unchanged because a different mask would change what the saved
        # checkpoint is evaluated on.
        mask = torch.hstack([torch.full((text_embeddings.shape[0],),True),torch.full((self.MAX_LENGTH-text_embeddings.shape[0],),False)])
        return text_embeddings, labels_tensor, mask,sent_lengths

# Use the GPU when present and fall back to CPU otherwise, instead of
# hard-coding 'cuda'; map_location lets a GPU-saved checkpoint load on either.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model_state_dict = torch.load('t2_model4_fasttext.pt', map_location=device)

# Load the state dictionary into the model
model.load_state_dict(model_state_dict)
model.to(device)

# NOTE(review): the first cells read 'data/ATE_Test.json'; confirm this
# relative path points at the same test split.
json_path = 'ATE_test.json'
embedding_type = 'fasttext'
sentiment_dataset_test = SentimentAnalysisDataset(json_path, embedding_type)
sentiment_dataset =sentiment_dataset_test
# sentiment_dataset_test.embedding_model = sentiment_dataset.embedding_model
batch_size  = 512
# Evaluation does not need shuffling; a fixed order keeps the per-batch
# accuracy average reproducible.
dataloader_test = DataLoader(sentiment_dataset_test, batch_size=batch_size, shuffle=False)
from tqdm import tqdm
from sklearn.metrics import f1_score
model.eval()
correct_predictions_val = 0
total_sentences_val = 0
predictions_r = []
traget_r = []
epoch=1
with torch.no_grad():
    for sentence_in, targets, mask, sen_lengths in tqdm(dataloader_test, desc=f'Test Epoch {epoch + 1}/{300}', leave=False):
        sentence_in, targets, mask, sen_lengths = sentence_in.to(device), targets.to(device), mask.to(device), sen_lengths.to(device)

        # Viterbi-decode the batch, then accumulate accuracy and the flat
        # prediction/target lists used for the F1 score.
        predictions_val = model.predict(sentence_in, mask, sen_lengths)
        correct_predictions_val += calculate_accuracy(predictions_val, targets, sen_lengths)
        temp_pred,temp_trag = aggregater(predictions_val, targets, sen_lengths)
        predictions_r.extend(temp_pred)
        traget_r.extend(temp_trag)

# Mean of the per-batch accuracies (len(dataloader_test) is the batch count).
accuracy_val = correct_predictions_val / len(dataloader_test)
print()
print(f'Test Accuracy: {accuracy_val:.4f}')
print(f'Test F1:  {f1_score(traget_r, predictions_r, average="macro")}')


#T2-model4-Glove

In [None]:
import torch
import torch.nn as nn
from torch.nn.utils.rnn import pack_padded_sequence, pad_packed_sequence


class BiLSTMCRF(nn.Module):
    """BiLSTM encoder with a linear-chain CRF on top, fed with pre-computed word vectors.

    There is no embedding layer: callers pass already embedded sentences. The BiLSTM
    produces per-token emission scores and ``transition`` holds the tag-to-tag scores
    used both by the CRF loss (``cal_loss``) and by Viterbi decoding (``predict``).

    Both ``cal_loss`` and ``predict`` slice the batch as ``[:n_unfinished]``, i.e. they
    treat the first ``n`` rows as the sentences still active at a position. Batches whose
    masks differ between rows therefore have to be ordered by decreasing length.
    """

    def __init__(self, tag_vocab, dropout_rate=0.5, embed_size=300, hidden_size=256):
        """ Initialize the model
        Args:
            tag_vocab: sized collection of tags; only ``len(tag_vocab)`` (K) is used
            dropout_rate (float): dropout probability applied to the emission scores
            embed_size (int): size of the input word vectors
            hidden_size (int): hidden state size of each LSTM direction
        """
        super(BiLSTMCRF, self).__init__()

        self.dropout_rate = dropout_rate
        self.embed_size = embed_size
        self.hidden_size = hidden_size
        self.tag_vocab = tag_vocab
        self.dropout = nn.Dropout(dropout_rate)
        self.encoder = nn.LSTM(input_size=embed_size, hidden_size=hidden_size, bidirectional=True)
        self.hidden2emit_score = nn.Linear(hidden_size * 2, len(self.tag_vocab))
        self.transition = nn.Parameter(torch.randn(len(self.tag_vocab), len(self.tag_vocab)))  # shape: (K, K)

    def forward(self, sentences, mask, tags, sen_lengths):
        """ Compute the CRF negative log-likelihood of ``tags``
        Args:
            sentences (tensor): embedded sentences, shape (b, len, e)
            mask (tensor): mask over positions, shape (b, len); falsy entries are ignored
            tags (tensor): corresponding tag ids, shape (b, len)
            sen_lengths: sentence lengths; forwarded to ``encode``, which does not use them
        Returns:
            loss (tensor): loss on the batch, shape (b,)
        """
        # The LSTM is not batch_first, so move the time axis to the front.
        sentences = sentences.transpose(0, 1)  # shape: (len, b, e)
        emit_score = self.encode(sentences, sen_lengths)  # shape: (b, len, K)
        loss = self.cal_loss(tags, mask, emit_score)  # shape: (b,)
        return loss

    def encode(self, sentences, sent_lengths):
        """ BiLSTM Encoder
        Args:
            sentences (tensor): sentences with word embeddings, shape (len, b, e)
            sent_lengths: sentence lengths; currently unused because the sequences
                are not packed, so the LSTM also runs over padded positions
        Returns:
            emit_score (tensor): emit score, shape (b, len, K)
        """
        hidden_states, _ = self.encoder(sentences)  # shape: (len, b, 2h)
        hidden_states = hidden_states.permute(1, 0, 2)  # shape: (b, len, 2h)
        emit_score = self.hidden2emit_score(hidden_states)  # shape: (b, len, K)
        emit_score = self.dropout(emit_score)  # shape: (b, len, K)
        return emit_score

    def cal_loss(self, tags, mask, emit_score):
        """ Calculate CRF loss
        Args:
            tags (tensor): a batch of tags, shape (b, len)
            mask (tensor): mask for the tags, shape (b, len), values in PAD position is 0
            emit_score (tensor): emit matrix, shape (b, len, K)
        Returns:
            loss (tensor): loss of the batch, shape (b,)
        """
        # Use the real sequence length instead of a hard-coded 100 so that inputs
        # padded to any length work.
        sent_len = tags.shape[1]
        # calculate score for the tags
        score = torch.gather(emit_score, dim=2, index=tags.unsqueeze(dim=2)).squeeze(dim=2)  # shape: (b, len)
        score[:, 1:] += self.transition[tags[:, :-1], tags[:, 1:]]
        total_score = (score * mask.type(torch.float)).sum(dim=1)  # shape: (b,)
        # calculate the scaling factor (log partition function) with the forward algorithm
        d = torch.unsqueeze(emit_score[:, 0], dim=1)  # shape: (b, 1, K)
        for i in range(1, sent_len):
            n_unfinished = int(mask[:, i].sum())
            d_uf = d[: n_unfinished]  # shape: (uf, 1, K)
            emit_and_transition = emit_score[: n_unfinished, i].unsqueeze(dim=1) + self.transition  # shape: (uf, K, K)
            log_sum = d_uf.transpose(1, 2) + emit_and_transition  # shape: (uf, K, K)
            max_v = log_sum.max(dim=1)[0].unsqueeze(dim=1)  # shape: (uf, 1, K)
            log_sum = log_sum - max_v  # shape: (uf, K, K)
            d_uf = max_v + torch.logsumexp(log_sum, dim=1).unsqueeze(dim=1)  # shape: (uf, 1, K)
            d = torch.cat((d_uf, d[n_unfinished:]), dim=0)
        d = d.squeeze(dim=1)  # shape: (b, K)
        max_d = d.max(dim=-1)[0]  # shape: (b,)
        d = max_d + torch.logsumexp(d - max_d.unsqueeze(dim=1), dim=1)  # shape: (b,)
        llk = total_score - d  # shape: (b,)
        loss = -llk  # shape: (b,)
        return loss

    def predict(self, sentences, mask, sen_lengths):
        """ Viterbi-decode the best tag sequence for every sentence
        Args:
            sentences (tensor): embedded sentences, shape (b, len, e)
            mask (tensor): mask over positions, shape (b, len)
            sen_lengths: sentence lengths; forwarded to ``encode``, which does not use them
        Returns:
            tags (list[list[int]]): predicted tag ids for the batch, one list per sentence
        """
        batch_size = sentences.shape[0]
        num_tags = len(self.tag_vocab)

        sentences = sentences.transpose(0, 1)  # shape: (len, b, e)
        emit_score = self.encode(sentences, sen_lengths)  # shape: (b, len, K)
        # Decode over the real sequence length instead of a hard-coded 100.
        seq_len = emit_score.shape[1]

        # tags[b][k] is the best path for sentence b that ends in tag k; every
        # sentence gets its own list objects (no aliasing between rows).
        tags = [[[k] for k in range(num_tags)] for _ in range(batch_size)]  # list, shape: (b, K, 1)

        # Scores of the best paths ending in each tag at position 0
        d = torch.unsqueeze(emit_score[:, 0], dim=1)  # shape: (b, 1, K)

        for i in range(1, seq_len):
            # Number of sentences that still have a token at position i
            n_unfinished = int(mask[:, i].sum())
            d_uf = d[: n_unfinished]  # shape: (uf, 1, K)

            # new_d_uf[b, prev, cur] = best score ending in prev + transition(prev, cur) + emission(cur)
            emit_and_transition = self.transition + emit_score[: n_unfinished, i].unsqueeze(dim=1)  # shape: (uf, K, K)
            new_d_uf = d_uf.transpose(1, 2) + emit_and_transition  # shape: (uf, K, K)

            # For every current tag keep the best previous tag and its score
            d_uf, max_idx = torch.max(new_d_uf, dim=1)
            max_idx = max_idx.tolist()  # list, shape: (uf, K)

            # Extend the best path into each current tag j from its best predecessor k
            tags[: n_unfinished] = [[tags[b][k] + [j] for j, k in enumerate(max_idx[b])] for b in range(n_unfinished)]

            d = torch.cat((torch.unsqueeze(d_uf, dim=1), d[n_unfinished:]), dim=0)  # shape: (b, 1, K)

        d = d.squeeze(dim=1)  # shape: (b, K)

        # Pick, per sentence, the path whose final score is highest
        _, max_idx = torch.max(d, dim=1)  # shape: (b,)
        max_idx = max_idx.tolist()
        tags = [tags[b][k] for b, k in enumerate(max_idx)]

        return tags

# Tag vocabulary: ids are assigned in list order (O=0, B=1, I=2, <START>=3, <STOP>=4, <PAD>=5).
tag_to_ix = {tag: ix for ix, tag in enumerate(['O', 'B', 'I', '<START>', '<STOP>', '<PAD>'])}
# Prefer the GPU when one is available, otherwise stay on the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# BiLSTM-CRF over 300-dimensional word vectors with 256 hidden units per direction.
model = BiLSTMCRF(tag_to_ix, dropout_rate=0.5, embed_size=300, hidden_size=256)
model.to(device)

# Function to calculate accuracy
import torch

def calculate_accuracy(predictions, targets, sen_lengths):
    """Token-level accuracy of one batch.

    For every sentence only the first ``sen_lengths[i]`` positions are compared,
    the same span ``aggregater`` uses, and the number of matches is divided by the
    total number of compared positions, so the result always lies in [0, 1].

    Args:
        predictions (list[list[int]]): predicted tag ids, one equally long list per sentence.
        targets (tensor): gold tag ids, shape (b, len).
        sen_lengths: per-sentence lengths (tensor or sequence of ints), length b.
    Returns:
        tensor: 0-d float tensor with the accuracy; ``0.0`` when there is nothing to compare.
    """
    gold = targets.cpu()
    predicted = torch.tensor(predictions).cpu()

    correct = 0
    total = 0
    for row in range(gold.shape[0]):
        length = int(sen_lengths[row])
        correct += int((predicted[row][:length] == gold[row][:length]).sum())
        total += length

    # Guard against an empty batch / all-zero lengths instead of dividing by zero.
    if total == 0:
        return torch.tensor(0.0)
    return torch.tensor(correct / total)

def aggregater(predictions, targets, sen_lengths):
    """Flatten a batch into two parallel lists of per-token predictions and targets.

    Only the first ``sen_lengths[i]`` positions of sentence ``i`` are kept.

    Args:
        predictions (list[list[int]]): predicted tag ids, one equally long list per sentence.
        targets (tensor): gold tag ids, shape (b, len).
        sen_lengths: per-sentence lengths, indexable by sentence position.
    Returns:
        tuple[list, list]: ``(flat_predictions, flat_targets)``; the elements are
        0-d tensors taken from the CPU copies of the inputs.
    """
    n_sentences = targets.shape[0]
    gold = targets.cpu()
    predicted = torch.tensor(predictions).cpu()

    flat_predictions, flat_targets = [], []
    for row in range(n_sentences):
        keep = sen_lengths[row]
        flat_predictions.extend(predicted[row][:keep])
        flat_targets.extend(gold[row][:keep])
    return flat_predictions, flat_targets

import torch
from torch.utils.data import Dataset,DataLoader
import gensim.downloader as api
from torchtext.vocab import GloVe
# import fasttext
import numpy as np
# import fasttext.util
import json

class SentimentAnalysisDataset(Dataset):
    """Token-tagging dataset that pairs pre-trained word vectors with tag ids.

    ``self.data`` is the parsed JSON file: a mapping from string indices
    ("0", "1", ...) to records with a ``"text"`` string and a ``"labels"`` list.
    Every sample is framed by a start and an end sentinel and padded to
    ``MAX_LENGTH`` positions. Longer texts are not truncated.
    """

    # Number of positions every sample is padded to (tokens plus the two sentinels).
    MAX_LENGTH = 100
    # Tag string -> integer id.
    LABEL_MAPPING = {'O': 0, 'B': 1, 'I': 2, '<START>': 3, '<STOP>': 4, '<PAD>': 5}

    def __init__(self, json_path, embedding_type='word2vec', load=True):
        """Read the JSON file and, unless ``load`` is False, load the embedding model.

        Args:
            json_path (str): path of the JSON file holding the samples.
            embedding_type (str): ``'word2vec'``, ``'glove'`` or ``'fasttext'``.
            load (bool): when False, ``self.embedding_model`` is left as ``None`` and
                must be assigned by the caller before samples are requested.
        """
        with open(json_path, 'r') as file:
            self.data = json.load(file)

        self.embedding_type = embedding_type
        self.embedding_model = self.load_embedding_model() if load else None

    def load_embedding_model(self):
        """Load and return the pre-trained vectors selected by ``self.embedding_type``.

        Raises:
            ValueError: if ``self.embedding_type`` is not a supported value.
        """
        if self.embedding_type == 'word2vec':
            # Download the pre-trained Word2Vec model
            return api.load('word2vec-google-news-300')
        elif self.embedding_type == 'glove':
            # Download the pre-trained GloVe model (6B tokens, 300d)
            return GloVe(name='6B', dim=300)
        elif self.embedding_type == 'fasttext':
            # `import fasttext` alone does not expose the `util` sub-module.
            import fasttext.util
            fasttext.util.download_model('en', if_exists='ignore')  # English
            # Load the model once (it was previously loaded twice).
            return fasttext.load_model('cc.en.300.bin')  # Adjust the path based on your downloaded model
        else:
            raise ValueError(f"Unsupported embedding type: {self.embedding_type}")

    def text_to_embeddings(self, text):
        """Turn a whitespace-tokenised text into a ``(MAX_LENGTH, dim)`` array of vectors.

        Layout: one start sentinel (all -1000.0), one vector per token (zeros for
        tokens unknown to word2vec/fastText), one end sentinel (all +1000.0), then
        padding rows (all -1.0) up to ``MAX_LENGTH`` rows.

        Only ``self.embedding_model`` is consulted, so the method no longer depends
        on a module-level ``sentiment_dataset`` variable being defined.

        Raises:
            ValueError: if ``self.embedding_type`` is not a supported value.
        """
        model = self.embedding_model
        tokens = text.split()

        if self.embedding_type == 'word2vec':
            dim = model.vector_size
            embeddings = [model[word] if word in model else torch.zeros(dim) for word in tokens]
        elif self.embedding_type == 'glove':
            dim = model['a'].shape[0]
            # No membership test here: the vectors are indexed directly for every token.
            embeddings = [model[word] for word in tokens]
        elif self.embedding_type == 'fasttext':
            dim = model['a'].shape[0]
            embeddings = [model[word] if word in model else torch.zeros(dim) for word in tokens]
        else:
            raise ValueError(f"Unsupported embedding type: {self.embedding_type}")

        # Frame the sentence with the start/end sentinels.
        embeddings = [torch.full((dim,), -1000.0)] + embeddings + [torch.full((dim,), +1000.0)]

        # Pad with constant rows up to MAX_LENGTH.
        n_pad = self.MAX_LENGTH - len(embeddings)
        embeddings.extend(torch.full((dim,), -1.0) for _ in range(n_pad))

        return np.stack(embeddings)

    def __len__(self):
        """Return the number of samples."""
        return len(self.data)

    def __getitem__(self, index):
        """Build one sample.

        Args:
            index: position of the sample; converted to ``str`` because the JSON keys are strings.
        Returns:
            tuple: ``(text_embeddings, labels_tensor, mask, sent_lengths)`` where
            ``labels_tensor`` holds ``<START>`` + labels + ``<STOP>`` padded with
            ``<PAD>`` to ``MAX_LENGTH`` ids and ``sent_lengths`` counts the labels
            including the two sentinels.
        """
        index = str(index)
        text = self.data[index]["text"]
        labels = self.data[index]["labels"]

        # Convert text to embeddings
        text_embeddings = torch.tensor(self.text_to_embeddings(text))

        labels = ['<START>'] + labels + ['<STOP>']
        sent_lengths = torch.tensor(len(labels))

        # Pad the tag sequence to MAX_LENGTH and convert it to ids.
        labels = labels + ['<PAD>'] * (self.MAX_LENGTH - len(labels))
        labels_tensor = torch.tensor([self.LABEL_MAPPING[label] for label in labels])

        # NOTE: text_to_embeddings already pads to MAX_LENGTH rows, so this mask is
        # all True for such samples, i.e. padded positions are not masked out.
        n_rows = text_embeddings.shape[0]
        mask = torch.hstack([torch.full((n_rows,), True), torch.full((self.MAX_LENGTH - n_rows,), False)])
        return text_embeddings, labels_tensor, mask, sent_lengths

# Evaluate the saved GloVe BiLSTM-CRF checkpoint on the ATE test split.
# Relies on `model`, `SentimentAnalysisDataset`, `calculate_accuracy` and `aggregater`
# having been defined by earlier cells.
from tqdm import tqdm
from sklearn.metrics import f1_score

# Use the GPU only when one is present; a hard-coded 'cuda' fails on CPU-only hosts.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# map_location lets a checkpoint that was saved on a GPU be restored on any device.
model_state_dict = torch.load('t2_model4_glove.pt', map_location=device)

# Load the state dictionary into the model
model.load_state_dict(model_state_dict)
model.to(device)

json_path = 'ATE_test.json'
embedding_type = 'glove'
sentiment_dataset_test = SentimentAnalysisDataset(json_path, embedding_type)
# Alias kept for code that refers to the dataset through the module-level name `sentiment_dataset`.
sentiment_dataset = sentiment_dataset_test
batch_size = 512
# No shuffling at test time: the reported accuracy is a mean of per-batch values,
# so a fixed batch composition keeps it reproducible between runs.
dataloader_test = DataLoader(sentiment_dataset_test, batch_size=batch_size, shuffle=False)

model.eval()
correct_predictions_val = 0
total_sentences_val = 0
predictions_r = []
traget_r = []
epoch = 1  # only used for the progress-bar caption below
with torch.no_grad():
    for sentence_in, targets, mask, sen_lengths in tqdm(dataloader_test, desc=f'Test Epoch {epoch + 1}/{300}', leave=False):
        sentence_in, targets, mask, sen_lengths = sentence_in.to(device), targets.to(device), mask.to(device), sen_lengths.to(device)

        # Prediction
        predictions_val = model.predict(sentence_in, mask, sen_lengths)
        # calculate_accuracy returns the accuracy of this batch.
        correct_predictions_val += calculate_accuracy(predictions_val, targets, sen_lengths)
        # Collect the per-token predictions/targets for the F1 score.
        temp_pred, temp_trag = aggregater(predictions_val, targets, sen_lengths)
        predictions_r.extend(temp_pred)
        traget_r.extend(temp_trag)

# len(dataloader_test) is the number of batches, so this is the mean of the per-batch accuracies.
accuracy_val = correct_predictions_val / len(dataloader_test)
print()
print(f'Test Accuracy: {accuracy_val:.4f}')
print(f'Test F1:  {f1_score(traget_r, predictions_r, average="macro")}')


#T2-model4-word2vec

In [None]:
import torch
import torch.nn as nn
from torch.nn.utils.rnn import pack_padded_sequence, pad_packed_sequence


class BiLSTMCRF(nn.Module):
    """BiLSTM encoder with a linear-chain CRF on top, fed with pre-computed word vectors.

    There is no embedding layer: callers pass already embedded sentences. The BiLSTM
    produces per-token emission scores and ``transition`` holds the tag-to-tag scores
    used both by the CRF loss (``cal_loss``) and by Viterbi decoding (``predict``).

    Both ``cal_loss`` and ``predict`` slice the batch as ``[:n_unfinished]``, i.e. they
    treat the first ``n`` rows as the sentences still active at a position. Batches whose
    masks differ between rows therefore have to be ordered by decreasing length.
    """

    def __init__(self, tag_vocab, dropout_rate=0.5, embed_size=300, hidden_size=256):
        """ Initialize the model
        Args:
            tag_vocab: sized collection of tags; only ``len(tag_vocab)`` (K) is used
            dropout_rate (float): dropout probability applied to the emission scores
            embed_size (int): size of the input word vectors
            hidden_size (int): hidden state size of each LSTM direction
        """
        super(BiLSTMCRF, self).__init__()

        self.dropout_rate = dropout_rate
        self.embed_size = embed_size
        self.hidden_size = hidden_size
        self.tag_vocab = tag_vocab
        self.dropout = nn.Dropout(dropout_rate)
        self.encoder = nn.LSTM(input_size=embed_size, hidden_size=hidden_size, bidirectional=True)
        self.hidden2emit_score = nn.Linear(hidden_size * 2, len(self.tag_vocab))
        self.transition = nn.Parameter(torch.randn(len(self.tag_vocab), len(self.tag_vocab)))  # shape: (K, K)

    def forward(self, sentences, mask, tags, sen_lengths):
        """ Compute the CRF negative log-likelihood of ``tags``
        Args:
            sentences (tensor): embedded sentences, shape (b, len, e)
            mask (tensor): mask over positions, shape (b, len); falsy entries are ignored
            tags (tensor): corresponding tag ids, shape (b, len)
            sen_lengths: sentence lengths; forwarded to ``encode``, which does not use them
        Returns:
            loss (tensor): loss on the batch, shape (b,)
        """
        # The LSTM is not batch_first, so move the time axis to the front.
        sentences = sentences.transpose(0, 1)  # shape: (len, b, e)
        emit_score = self.encode(sentences, sen_lengths)  # shape: (b, len, K)
        loss = self.cal_loss(tags, mask, emit_score)  # shape: (b,)
        return loss

    def encode(self, sentences, sent_lengths):
        """ BiLSTM Encoder
        Args:
            sentences (tensor): sentences with word embeddings, shape (len, b, e)
            sent_lengths: sentence lengths; currently unused because the sequences
                are not packed, so the LSTM also runs over padded positions
        Returns:
            emit_score (tensor): emit score, shape (b, len, K)
        """
        hidden_states, _ = self.encoder(sentences)  # shape: (len, b, 2h)
        hidden_states = hidden_states.permute(1, 0, 2)  # shape: (b, len, 2h)
        emit_score = self.hidden2emit_score(hidden_states)  # shape: (b, len, K)
        emit_score = self.dropout(emit_score)  # shape: (b, len, K)
        return emit_score

    def cal_loss(self, tags, mask, emit_score):
        """ Calculate CRF loss
        Args:
            tags (tensor): a batch of tags, shape (b, len)
            mask (tensor): mask for the tags, shape (b, len), values in PAD position is 0
            emit_score (tensor): emit matrix, shape (b, len, K)
        Returns:
            loss (tensor): loss of the batch, shape (b,)
        """
        # Use the real sequence length instead of a hard-coded 100 so that inputs
        # padded to any length work.
        sent_len = tags.shape[1]
        # calculate score for the tags
        score = torch.gather(emit_score, dim=2, index=tags.unsqueeze(dim=2)).squeeze(dim=2)  # shape: (b, len)
        score[:, 1:] += self.transition[tags[:, :-1], tags[:, 1:]]
        total_score = (score * mask.type(torch.float)).sum(dim=1)  # shape: (b,)
        # calculate the scaling factor (log partition function) with the forward algorithm
        d = torch.unsqueeze(emit_score[:, 0], dim=1)  # shape: (b, 1, K)
        for i in range(1, sent_len):
            n_unfinished = int(mask[:, i].sum())
            d_uf = d[: n_unfinished]  # shape: (uf, 1, K)
            emit_and_transition = emit_score[: n_unfinished, i].unsqueeze(dim=1) + self.transition  # shape: (uf, K, K)
            log_sum = d_uf.transpose(1, 2) + emit_and_transition  # shape: (uf, K, K)
            max_v = log_sum.max(dim=1)[0].unsqueeze(dim=1)  # shape: (uf, 1, K)
            log_sum = log_sum - max_v  # shape: (uf, K, K)
            d_uf = max_v + torch.logsumexp(log_sum, dim=1).unsqueeze(dim=1)  # shape: (uf, 1, K)
            d = torch.cat((d_uf, d[n_unfinished:]), dim=0)
        d = d.squeeze(dim=1)  # shape: (b, K)
        max_d = d.max(dim=-1)[0]  # shape: (b,)
        d = max_d + torch.logsumexp(d - max_d.unsqueeze(dim=1), dim=1)  # shape: (b,)
        llk = total_score - d  # shape: (b,)
        loss = -llk  # shape: (b,)
        return loss

    def predict(self, sentences, mask, sen_lengths):
        """ Viterbi-decode the best tag sequence for every sentence
        Args:
            sentences (tensor): embedded sentences, shape (b, len, e)
            mask (tensor): mask over positions, shape (b, len)
            sen_lengths: sentence lengths; forwarded to ``encode``, which does not use them
        Returns:
            tags (list[list[int]]): predicted tag ids for the batch, one list per sentence
        """
        batch_size = sentences.shape[0]
        num_tags = len(self.tag_vocab)

        sentences = sentences.transpose(0, 1)  # shape: (len, b, e)
        emit_score = self.encode(sentences, sen_lengths)  # shape: (b, len, K)
        # Decode over the real sequence length instead of a hard-coded 100.
        seq_len = emit_score.shape[1]

        # tags[b][k] is the best path for sentence b that ends in tag k; every
        # sentence gets its own list objects (no aliasing between rows).
        tags = [[[k] for k in range(num_tags)] for _ in range(batch_size)]  # list, shape: (b, K, 1)

        # Scores of the best paths ending in each tag at position 0
        d = torch.unsqueeze(emit_score[:, 0], dim=1)  # shape: (b, 1, K)

        for i in range(1, seq_len):
            # Number of sentences that still have a token at position i
            n_unfinished = int(mask[:, i].sum())
            d_uf = d[: n_unfinished]  # shape: (uf, 1, K)

            # new_d_uf[b, prev, cur] = best score ending in prev + transition(prev, cur) + emission(cur)
            emit_and_transition = self.transition + emit_score[: n_unfinished, i].unsqueeze(dim=1)  # shape: (uf, K, K)
            new_d_uf = d_uf.transpose(1, 2) + emit_and_transition  # shape: (uf, K, K)

            # For every current tag keep the best previous tag and its score
            d_uf, max_idx = torch.max(new_d_uf, dim=1)
            max_idx = max_idx.tolist()  # list, shape: (uf, K)

            # Extend the best path into each current tag j from its best predecessor k
            tags[: n_unfinished] = [[tags[b][k] + [j] for j, k in enumerate(max_idx[b])] for b in range(n_unfinished)]

            d = torch.cat((torch.unsqueeze(d_uf, dim=1), d[n_unfinished:]), dim=0)  # shape: (b, 1, K)

        d = d.squeeze(dim=1)  # shape: (b, K)

        # Pick, per sentence, the path whose final score is highest
        _, max_idx = torch.max(d, dim=1)  # shape: (b,)
        max_idx = max_idx.tolist()
        tags = [tags[b][k] for b, k in enumerate(max_idx)]

        return tags

# Tag vocabulary: ids are assigned in list order (O=0, B=1, I=2, <START>=3, <STOP>=4, <PAD>=5).
tag_to_ix = {tag: ix for ix, tag in enumerate(['O', 'B', 'I', '<START>', '<STOP>', '<PAD>'])}
# Prefer the GPU when one is available, otherwise stay on the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# BiLSTM-CRF over 300-dimensional word vectors with 256 hidden units per direction.
model = BiLSTMCRF(tag_to_ix, dropout_rate=0.5, embed_size=300, hidden_size=256)
model.to(device)

# Function to calculate accuracy
import torch

def calculate_accuracy(predictions, targets, sen_lengths):
    """Token-level accuracy of one batch.

    For every sentence only the first ``sen_lengths[i]`` positions are compared,
    the same span ``aggregater`` uses, and the number of matches is divided by the
    total number of compared positions, so the result always lies in [0, 1].

    Args:
        predictions (list[list[int]]): predicted tag ids, one equally long list per sentence.
        targets (tensor): gold tag ids, shape (b, len).
        sen_lengths: per-sentence lengths (tensor or sequence of ints), length b.
    Returns:
        tensor: 0-d float tensor with the accuracy; ``0.0`` when there is nothing to compare.
    """
    gold = targets.cpu()
    predicted = torch.tensor(predictions).cpu()

    correct = 0
    total = 0
    for row in range(gold.shape[0]):
        length = int(sen_lengths[row])
        correct += int((predicted[row][:length] == gold[row][:length]).sum())
        total += length

    # Guard against an empty batch / all-zero lengths instead of dividing by zero.
    if total == 0:
        return torch.tensor(0.0)
    return torch.tensor(correct / total)

def aggregater(predictions, targets, sen_lengths):
    """Flatten a batch into two parallel lists of per-token predictions and targets.

    Only the first ``sen_lengths[i]`` positions of sentence ``i`` are kept.

    Args:
        predictions (list[list[int]]): predicted tag ids, one equally long list per sentence.
        targets (tensor): gold tag ids, shape (b, len).
        sen_lengths: per-sentence lengths, indexable by sentence position.
    Returns:
        tuple[list, list]: ``(flat_predictions, flat_targets)``; the elements are
        0-d tensors taken from the CPU copies of the inputs.
    """
    n_sentences = targets.shape[0]
    gold = targets.cpu()
    predicted = torch.tensor(predictions).cpu()

    flat_predictions, flat_targets = [], []
    for row in range(n_sentences):
        keep = sen_lengths[row]
        flat_predictions.extend(predicted[row][:keep])
        flat_targets.extend(gold[row][:keep])
    return flat_predictions, flat_targets

import torch
from torch.utils.data import Dataset,DataLoader
import gensim.downloader as api
from torchtext.vocab import GloVe
# import fasttext
import numpy as np
# import fasttext.util
import json
class SentimentAnalysisDataset(Dataset):
    """Token-tagging dataset that pairs pre-trained word vectors with tag ids.

    ``self.data`` is the parsed JSON file: a mapping from string indices
    ("0", "1", ...) to records with a ``"text"`` string and a ``"labels"`` list.
    Every sample is framed by a start and an end sentinel and padded to
    ``MAX_LENGTH`` positions. Longer texts are not truncated.
    """

    # Number of positions every sample is padded to (tokens plus the two sentinels).
    MAX_LENGTH = 100
    # Tag string -> integer id.
    LABEL_MAPPING = {'O': 0, 'B': 1, 'I': 2, '<START>': 3, '<STOP>': 4, '<PAD>': 5}

    def __init__(self, json_path, embedding_type='word2vec', load=True):
        """Read the JSON file and, unless ``load`` is False, load the embedding model.

        Args:
            json_path (str): path of the JSON file holding the samples.
            embedding_type (str): ``'word2vec'``, ``'glove'`` or ``'fasttext'``.
            load (bool): when False, ``self.embedding_model`` is left as ``None`` and
                must be assigned by the caller before samples are requested.
        """
        with open(json_path, 'r') as file:
            self.data = json.load(file)

        self.embedding_type = embedding_type
        self.embedding_model = self.load_embedding_model() if load else None

    def load_embedding_model(self):
        """Load and return the pre-trained vectors selected by ``self.embedding_type``.

        Raises:
            ValueError: if ``self.embedding_type`` is not a supported value.
        """
        if self.embedding_type == 'word2vec':
            # Download the pre-trained Word2Vec model
            return api.load('word2vec-google-news-300')
        elif self.embedding_type == 'glove':
            # Download the pre-trained GloVe model (6B tokens, 300d)
            return GloVe(name='6B', dim=300)
        elif self.embedding_type == 'fasttext':
            # `import fasttext` alone does not expose the `util` sub-module.
            import fasttext.util
            fasttext.util.download_model('en', if_exists='ignore')  # English
            # Load the model once (it was previously loaded twice).
            return fasttext.load_model('cc.en.300.bin')  # Adjust the path based on your downloaded model
        else:
            raise ValueError(f"Unsupported embedding type: {self.embedding_type}")

    def text_to_embeddings(self, text):
        """Turn a whitespace-tokenised text into a ``(MAX_LENGTH, dim)`` array of vectors.

        Layout: one start sentinel (all -1000.0), one vector per token (zeros for
        tokens unknown to word2vec/fastText), one end sentinel (all +1000.0), then
        padding rows (all -1.0) up to ``MAX_LENGTH`` rows.

        Only ``self.embedding_model`` is consulted, so the method no longer depends
        on a module-level ``sentiment_dataset`` variable being defined.

        Raises:
            ValueError: if ``self.embedding_type`` is not a supported value.
        """
        model = self.embedding_model
        tokens = text.split()

        if self.embedding_type == 'word2vec':
            dim = model.vector_size
            embeddings = [model[word] if word in model else torch.zeros(dim) for word in tokens]
        elif self.embedding_type == 'glove':
            dim = model['a'].shape[0]
            # Index the vectors directly, as the GloVe cell of this notebook does,
            # rather than testing `word in model` first.
            # NOTE(review): torchtext's Vectors appears to define no __contains__ and to
            # return its unknown-word vector from __getitem__ - confirm against the docs.
            embeddings = [model[word] for word in tokens]
        elif self.embedding_type == 'fasttext':
            dim = model['a'].shape[0]
            embeddings = [model[word] if word in model else torch.zeros(dim) for word in tokens]
        else:
            raise ValueError(f"Unsupported embedding type: {self.embedding_type}")

        # Frame the sentence with the start/end sentinels.
        embeddings = [torch.full((dim,), -1000.0)] + embeddings + [torch.full((dim,), +1000.0)]

        # Pad with constant rows up to MAX_LENGTH.
        n_pad = self.MAX_LENGTH - len(embeddings)
        embeddings.extend(torch.full((dim,), -1.0) for _ in range(n_pad))

        return np.stack(embeddings)

    def __len__(self):
        """Return the number of samples."""
        return len(self.data)

    def __getitem__(self, index):
        """Build one sample.

        Args:
            index: position of the sample; converted to ``str`` because the JSON keys are strings.
        Returns:
            tuple: ``(text_embeddings, labels_tensor, mask, sent_lengths)`` where
            ``labels_tensor`` holds ``<START>`` + labels + ``<STOP>`` padded with
            ``<PAD>`` to ``MAX_LENGTH`` ids and ``sent_lengths`` counts the labels
            including the two sentinels.
        """
        index = str(index)
        text = self.data[index]["text"]
        labels = self.data[index]["labels"]

        # Convert text to embeddings
        text_embeddings = torch.tensor(self.text_to_embeddings(text))

        labels = ['<START>'] + labels + ['<STOP>']
        sent_lengths = torch.tensor(len(labels))

        # Pad the tag sequence to MAX_LENGTH and convert it to ids.
        labels = labels + ['<PAD>'] * (self.MAX_LENGTH - len(labels))
        labels_tensor = torch.tensor([self.LABEL_MAPPING[label] for label in labels])

        # NOTE: text_to_embeddings already pads to MAX_LENGTH rows, so this mask is
        # all True for such samples, i.e. padded positions are not masked out.
        n_rows = text_embeddings.shape[0]
        mask = torch.hstack([torch.full((n_rows,), True), torch.full((self.MAX_LENGTH - n_rows,), False)])
        return text_embeddings, labels_tensor, mask, sent_lengths

# Evaluate the saved word2vec BiLSTM-CRF checkpoint on the ATE test split.
# Relies on `model`, `SentimentAnalysisDataset`, `calculate_accuracy` and `aggregater`
# having been defined by earlier cells.
from tqdm import tqdm
from sklearn.metrics import f1_score

# Use the GPU only when one is present; a hard-coded 'cuda' fails on CPU-only hosts.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# map_location lets a checkpoint that was saved on a GPU be restored on any device.
model_state_dict = torch.load('t2_model4_word2vec.pt', map_location=device)

# Load the state dictionary into the model
model.load_state_dict(model_state_dict)
model.to(device)

json_path = 'ATE_test.json'
embedding_type = 'word2vec'
sentiment_dataset_test = SentimentAnalysisDataset(json_path, embedding_type)
# Alias kept for code that refers to the dataset through the module-level name `sentiment_dataset`.
sentiment_dataset = sentiment_dataset_test
batch_size = 512
# No shuffling at test time: the reported accuracy is a mean of per-batch values,
# so a fixed batch composition keeps it reproducible between runs.
dataloader_test = DataLoader(sentiment_dataset_test, batch_size=batch_size, shuffle=False)

model.eval()
correct_predictions_val = 0
total_sentences_val = 0
predictions_r = []
traget_r = []
epoch = 1  # only used for the progress-bar caption below
with torch.no_grad():
    for sentence_in, targets, mask, sen_lengths in tqdm(dataloader_test, desc=f'Test Epoch {epoch + 1}/{300}', leave=False):
        sentence_in, targets, mask, sen_lengths = sentence_in.to(device), targets.to(device), mask.to(device), sen_lengths.to(device)

        # Prediction
        predictions_val = model.predict(sentence_in, mask, sen_lengths)
        # calculate_accuracy returns the accuracy of this batch.
        correct_predictions_val += calculate_accuracy(predictions_val, targets, sen_lengths)
        # Collect the per-token predictions/targets for the F1 score.
        temp_pred, temp_trag = aggregater(predictions_val, targets, sen_lengths)
        predictions_r.extend(temp_pred)
        traget_r.extend(temp_trag)

# len(dataloader_test) is the number of batches, so this is the mean of the per-batch accuracies.
accuracy_val = correct_predictions_val / len(dataloader_test)
print()
print(f'Test Accuracy: {accuracy_val:.4f}')
print(f'Test F1:  {f1_score(traget_r, predictions_r, average="macro")}')


#t1_model4_fasttext

In [None]:
import torch
from torch.utils.data import Dataset,DataLoader
import gensim.downloader as api
from torchtext.vocab import GloVe,FastText
import fasttext
import numpy as np
import fasttext.util
import json
import torch
from torch.utils.data import Dataset,DataLoader
import gensim.downloader as api
from torchtext.vocab import GloVe,FastText
import fasttext
import numpy as np
import fasttext.util
import json

class SentimentAnalysisDataset(Dataset):
    """Token-tagging dataset that serves fixed-length, pre-embedded sentences.

    The JSON file is indexed by the stringified sample index and every entry
    provides a "text" string and a "labels" list.  Each item is turned into a
    ``(MAX_LENGTH, dim)`` embedding matrix plus the matching padded tag ids.
    """

    # Number of positions every sample is padded to (including <START>/<STOP>).
    MAX_LENGTH = 100

    # Tag name -> class index.  Built once here instead of on every __getitem__ call.
    LABEL_MAPPING = {
        'O': 0,
        'B_COURT': 1, 'I_COURT': 2,
        'B_PETITIONER': 3, 'I_PETITIONER': 4,
        'B_RESPONDENT': 5, 'I_RESPONDENT': 6,
        'B_JUDGE': 7, 'I_JUDGE': 8,
        'B_LAWYER': 9, 'I_LAWYER': 10,
        'B_DATE': 11, 'I_DATE': 12,
        'B_ORG': 13, 'I_ORG': 14,
        'B_GPE': 15, 'I_GPE': 16,
        'B_STATUTE': 17, 'I_STATUTE': 18,
        'B_PROVISION': 19, 'I_PROVISION': 20,
        'B_PRECEDENT': 21, 'I_PRECEDENT': 22,
        'B_CASE_NUMBER': 23, 'I_CASE_NUMBER': 24,
        'B_WITNESS': 25, 'I_WITNESS': 26,
        'B_OTHER_PERSON': 27, 'I_OTHER_PERSON': 28,
        '<START>': 29, '<STOP>': 30, '<PAD>': 31,
    }

    def __init__(self, json_path, embedding_type='word2vec',load=True):
        """Read the samples and (optionally) load the embedding model.

        Args:
            json_path: path of the JSON file holding the samples.
            embedding_type: 'word2vec', 'glove' or 'fasttext'.
            load: when False the embedding model is left as ``None`` so the
                caller can attach an already loaded one.
        """
        with open(json_path, 'r') as file:
            self.data = json.load(file)

        self.embedding_type = embedding_type
        self.embedding_model = self.load_embedding_model() if load else None

    def load_embedding_model(self):
        """Return the pre-trained embedding model selected by ``self.embedding_type``.

        Raises:
            ValueError: if the embedding type is not supported.
        """
        if self.embedding_type == 'word2vec':
            # Pre-trained Word2Vec model fetched through gensim's downloader
            return api.load('word2vec-google-news-300')
        if self.embedding_type == 'glove':
            # torchtext GloVe vectors (6B tokens, 300d)
            return GloVe(name='6B', dim=300)
        if self.embedding_type == 'fasttext':
            # torchtext FastText vectors for English
            return FastText(language='en')
        raise ValueError(f"Unsupported embedding type: {self.embedding_type}")

    @staticmethod
    def _to_numpy(vector):
        """Convert one embedding vector (torch tensor or array-like) to a numpy array."""
        if isinstance(vector, torch.Tensor):
            return vector.detach().cpu().numpy()
        return np.asarray(vector)

    def text_to_embeddings(self, text):
        """Embed a whitespace-tokenised sentence into a padded matrix.

        Row layout: one sentinel row filled with -1000.0, one row per token,
        one sentinel row filled with +1000.0, then rows of -1.0 until
        ``MAX_LENGTH`` rows are reached (no padding when already longer).

        The embedding model is always read from ``self.embedding_model``; the
        previous version reached for the global ``sentiment_dataset`` in the
        glove/fasttext branches, which tied the class to notebook state.

        Raises:
            ValueError: if the embedding type is not supported.
        """
        model = self.embedding_model
        words = text.split()
        if self.embedding_type == 'word2vec':
            dim = model.vector_size
            # Out-of-vocabulary words become all-zero vectors.
            vectors = [model[word] if word in model else np.zeros(dim, dtype=np.float32)
                       for word in words]
        elif self.embedding_type in ('glove', 'fasttext'):
            # Vector width is taken from a known word, as before.
            dim = model['a'].shape[0]
            vectors = [model[word] for word in words]
        else:
            raise ValueError(f"Unsupported embedding type: {self.embedding_type}")

        rows = [np.full(dim, -1000.0, dtype=np.float32)]
        rows.extend(self._to_numpy(vector) for vector in vectors)
        rows.append(np.full(dim, 1000.0, dtype=np.float32))
        while len(rows) < self.MAX_LENGTH:
            rows.append(np.full(dim, -1.0, dtype=np.float32))
        return np.stack(rows)

    def __len__(self):
        """Number of samples in the JSON file."""
        return len(self.data)

    def __getitem__(self, index):
        """Return ``(text_embeddings, labels_tensor, mask, sent_lengths)`` for one sample.

        * text_embeddings -- ``(MAX_LENGTH, dim)`` tensor from ``text_to_embeddings``
        * labels_tensor   -- ``MAX_LENGTH`` tag ids: <START>, tags, <STOP>, then <PAD>
        * mask            -- boolean vector with one entry per embedding row
        * sent_lengths    -- 0-dim tensor, number of tags including <START>/<STOP>

        Raises:
            ValueError: if the sample does not fit into ``MAX_LENGTH`` positions.
        """
        sample = self.data[str(index)]
        text, labels = preprocess_text(sample["text"], sample["labels"])

        # Convert text to embeddings
        text_embeddings = torch.tensor(self.text_to_embeddings(text))
        if text_embeddings.shape[0] > self.MAX_LENGTH:
            # Previously this surfaced as an opaque negative-dimension error
            # while the mask was being built.
            raise ValueError(
                f"Sample {index} needs {text_embeddings.shape[0]} positions, "
                f"more than MAX_LENGTH={self.MAX_LENGTH}"
            )

        labels = ['<START>'] + labels + ['<STOP>']
        sent_lengths = torch.tensor(len(labels))
        labels = labels + ['<PAD>'] * (self.MAX_LENGTH - len(labels))
        labels_tensor = torch.tensor([self.LABEL_MAPPING[label] for label in labels])

        # NOTE(review): text_to_embeddings already pads to MAX_LENGTH rows, so
        # this mask is True everywhere and the real length is only carried by
        # sent_lengths.  Left untouched because it changes what the CRF decodes;
        # confirm whether a length-based mask was intended.
        mask = torch.hstack([
            torch.full((text_embeddings.shape[0],), True),
            torch.full((self.MAX_LENGTH - text_embeddings.shape[0],), False),
        ])
        return text_embeddings, labels_tensor, mask, sent_lengths

import torch
import torch.nn as nn
from torch.nn.utils.rnn import pack_padded_sequence, pad_packed_sequence


class BiLSTMCRF(nn.Module):
    """Bidirectional LSTM encoder followed by a linear-chain CRF.

    The model consumes pre-computed word embeddings (there is no embedding
    layer) and scores K = ``len(tag_vocab)`` tags per position.
    """

    def __init__(self, tag_vocab, dropout_rate=0.5, embed_size=300, hidden_size=256):
        """ Initialize the model
        Args:
            tag_vocab: tag vocabulary; only its length (number of tags K) is used here
            dropout_rate (float): dropout probability applied to the emission scores
            embed_size (int): embedding size
            hidden_size (int): hidden state size of each LSTM direction
        """
        super(BiLSTMCRF, self).__init__()

        self.dropout_rate = dropout_rate
        self.embed_size = embed_size
        self.hidden_size = hidden_size
        self.tag_vocab = tag_vocab
        # The attribute names below are the state_dict keys of saved checkpoints.
        self.dropout = nn.Dropout(dropout_rate)
        self.encoder = nn.LSTM(input_size=embed_size, hidden_size=hidden_size, bidirectional=True)
        self.hidden2emit_score = nn.Linear(hidden_size * 2, len(self.tag_vocab))
        self.transition = nn.Parameter(torch.randn(len(self.tag_vocab), len(self.tag_vocab)))  # shape: (K, K)

    def forward(self, sentences,mask, tags, sen_lengths):
        """
        Args:
            sentences (tensor): embedded sentences, shape (b, len, e)
            mask (tensor): shape (b, len), non-zero/True on the positions that count
            tags (tensor): corresponding tags, shape (b, len)
            sen_lengths: sentence lengths (only forwarded to ``encode``)
        Returns:
            loss (tensor): negative log-likelihood per sentence, shape (b,)
        """
        sentences = sentences.transpose(0, 1)  # shape: (len, b, e)
        emit_score = self.encode(sentences, sen_lengths)  # shape: (b, len, K)
        loss = self.cal_loss(tags, mask, emit_score)  # shape: (b,)
        return loss

    def encode(self, sentences, sent_lengths):
        """ BiLSTM Encoder
        Args:
            sentences (tensor): sentences with word embeddings, shape (len, b, e)
            sent_lengths: sentence lengths; currently unused because the
                sequences are not packed, so the LSTM also reads the padded steps
        Returns:
            emit_score (tensor): emit score, shape (b, len, K)
        """
        hidden_states, _ = self.encoder(sentences)  # shape: (len, b, 2h)
        hidden_states = hidden_states.permute(1,0,2)  # shape: (b, len, 2h)
        emit_score = self.hidden2emit_score(hidden_states)  # shape: (b, len, K)
        emit_score = self.dropout(emit_score)  # shape: (b, len, K)
        return emit_score

    def cal_loss(self, tags, mask, emit_score):
        """ Calculate CRF loss
        Args:
            tags (tensor): a batch of tags, shape (b, len)
            mask (tensor): mask for the tags, shape (b, len), values in PAD position is 0
            emit_score (tensor): emit matrix, shape (b, len, K)
        Returns:
            loss (tensor): loss of the batch, shape (b,)

        The recursion runs over the actual sequence length of ``emit_score``
        (it used to be hard-coded to 100 steps) and applies the mask per
        sentence, so the batch does not have to be sorted by length.
        """
        # score of the gold tag sequence: emissions plus transitions
        score = torch.gather(emit_score, dim=2, index=tags.unsqueeze(dim=2)).squeeze(dim=2)  # shape: (b, len)
        score[:, 1:] += self.transition[tags[:, :-1], tags[:, 1:]]
        total_score = (score * mask.type(torch.float)).sum(dim=1)  # shape: (b,)

        # log partition function via the forward algorithm
        active = mask.bool()
        d = emit_score[:, 0]  # shape: (b, K)
        for i in range(1, emit_score.shape[1]):
            # candidates[b, prev, cur] = d[b, prev] + emit[b, i, cur] + transition[prev, cur]
            emit_and_transition = emit_score[:, i].unsqueeze(dim=1) + self.transition  # shape: (b, K, K)
            candidates = d.unsqueeze(dim=2) + emit_and_transition  # shape: (b, K, K)
            stepped = torch.logsumexp(candidates, dim=1)  # shape: (b, K)
            # sentences that are masked out at this step keep their previous value
            d = torch.where(active[:, i].unsqueeze(dim=1), stepped, d)
        log_partition = torch.logsumexp(d, dim=1)  # shape: (b,)
        llk = total_score - log_partition  # shape: (b,)
        loss = -llk  # shape: (b,)
        return loss


    def predict(self, sentences, mask, sen_lengths):
        """ Viterbi decoding
        Args:
            sentences (tensor): embedded sentences, shape (b, len, e)
            mask (tensor): shape (b, len), non-zero/True on the positions to decode
            sen_lengths: sentence lengths (only forwarded to ``encode``)
        Returns:
            tags (list[list[int]]): best tag-index sequence per sentence; position 0
                is always decoded, later positions only where the mask is set

        The loop covers the actual sequence length (previously fixed to 100)
        and honours the mask per sentence instead of assuming that the batch
        is sorted by decreasing length.
        """
        batch_size = sentences.shape[0]
        num_tags = len(self.tag_vocab)

        emit_score = self.encode(sentences.transpose(0, 1), sen_lengths)  # shape: (b, len, K)

        # paths[b][k]: best tag sequence of sentence b that ends in tag k.
        # One independent list per sentence (no shared references).
        paths = [[[k] for k in range(num_tags)] for _ in range(batch_size)]

        active = mask.bool()
        d = emit_score[:, 0]  # shape: (b, K)
        for i in range(1, emit_score.shape[1]):
            # candidates[b, prev, cur] = d[b, prev] + transition[prev, cur] + emit[b, i, cur]
            emit_and_transition = self.transition + emit_score[:, i].unsqueeze(dim=1)  # shape: (b, K, K)
            candidates = d.unsqueeze(dim=2) + emit_and_transition  # shape: (b, K, K)
            best_score, best_prev = torch.max(candidates, dim=1)  # shape: (b, K) each

            step_active = active[:, i]
            d = torch.where(step_active.unsqueeze(dim=1), best_score, d)

            best_prev = best_prev.tolist()
            for b, is_active in enumerate(step_active.tolist()):
                if is_active:
                    # extend, for every current tag, the path of its best predecessor
                    paths[b] = [paths[b][prev] + [cur] for cur, prev in enumerate(best_prev[b])]

        # pick the best final tag of every sentence and return its path
        final_tag = torch.max(d, dim=1)[1].tolist()
        return [paths[b][k] for b, k in enumerate(final_tag)]


# Function to calculate accuracy
import torch

def calculate_accuracy(predictions, targets, sen_lengths):
    """Token-level accuracy of one batch.

    Only the first ``sen_lengths[i]`` positions of sentence ``i`` are scored,
    the same window ``aggregater`` uses.  The previous version compared one
    extra position per sentence and divided by ``sum(sen_lengths) + 10``, so a
    perfect prediction never reached 1.0.

    Args:
        predictions: per-sentence sequences of predicted tag indices
            (list of lists or tensor rows; rows may differ in length).
        targets (tensor): gold tag indices, shape (b, len).
        sen_lengths: number of valid positions per sentence (tensor or sequence).

    Returns:
        tensor: 0-dim float tensor with correct / total, 0.0 for an empty batch.
    """
    gold = targets.cpu()
    correct = 0
    total = 0
    for row in range(targets.shape[0]):
        length = int(sen_lengths[row])
        predicted = torch.as_tensor(predictions[row])[:length].cpu()
        expected = gold[row, :length]
        # A prediction shorter than the sentence counts its missing positions as wrong.
        overlap = min(predicted.shape[0], expected.shape[0])
        correct += int((predicted[:overlap] == expected[:overlap]).sum())
        total += length

    if total == 0:
        return torch.tensor(0.0)
    return torch.tensor(correct / total)

def aggregater(predictions, targets, sen_lengths):
    """Flatten the valid positions of a batch into two parallel lists.

    Args:
        predictions: per-sentence sequences of predicted tag indices
            (list of lists or tensor rows; rows may differ in length).
        targets (tensor): gold tag indices, shape (b, len).
        sen_lengths: number of valid positions per sentence (tensor or sequence).

    Returns:
        tuple[list[int], list[int]]: predicted and gold tag indices of the
        first ``sen_lengths[i]`` positions of every sentence.  Plain Python
        ints are returned (not 0-dim tensors) so the lists can be handed
        straight to scikit-learn metrics.
    """
    gold = targets.cpu()
    aggr_pred = []
    aggr_targ = []
    for row in range(targets.shape[0]):
        length = int(sen_lengths[row])
        predicted = [int(tag) for tag in predictions[row][:length]]
        # Keep both lists aligned even if a prediction is shorter than the sentence.
        expected = gold[row, :len(predicted)].tolist()
        aggr_pred.extend(predicted)
        aggr_targ.extend(expected)
    return aggr_pred,aggr_targ
# Dependencies of the text preprocessing helper defined below.
import json
import string
import nltk
from nltk.corpus import stopwords
from nltk.tokenize import word_tokenize
# Make sure the NLTK stop-word corpus read by preprocess_text is available;
# this call is issued every time the cell runs.
nltk.download('stopwords')

def preprocess_text(text,label, stop_words=None):
    """Clean a whitespace-tokenised sentence while keeping tokens and labels aligned.

    Each token is stripped of punctuation and lower-cased.  Tokens that become
    empty are dropped together with their label, and stop words are dropped
    only when their label is 'O'.

    Two defects of the previous version are fixed:
      * punctuation was removed from the whole string before splitting, so a
        punctuation-only token vanished and every later token received the
        label of its predecessor;
      * when nothing was left after removing punctuation only ``text`` was
        returned although callers unpack ``(text, labels)``.

    Args:
        text (str): sentence with whitespace-separated tokens.
        label (list): one label per whitespace token of ``text``.
        stop_words (set, optional): stop words to drop; defaults to NLTK's
            English list.  Pass a prebuilt set to avoid reloading it per call.

    Returns:
        tuple[str, list]: the cleaned sentence (tokens joined by single spaces)
        and the labels of the surviving tokens.  If the sentence contains
        nothing but punctuation, the original text and labels are returned.

    Raises:
        ValueError: if there are fewer labels than tokens.
    """
    tokens = text.split()
    if len(label) < len(tokens):
        raise ValueError(
            f"Got {len(label)} labels for {len(tokens)} tokens; expected one label per token"
        )

    punctuation = set(string.punctuation)
    cleaned = []
    for token, tag in zip(tokens, label):
        stripped = ''.join(char for char in token if char not in punctuation)
        if stripped:
            cleaned.append((stripped.lower(), tag))

    # Nothing but punctuation: hand the sample back unchanged.
    if not cleaned:
        return text, list(label)

    if stop_words is None:
        stop_words = set(stopwords.words('english'))

    kept_tokens = []
    labels = []
    for token, tag in cleaned:
        # Stop words that carry an entity label are part of the entity and stay.
        if token in stop_words and tag == 'O':
            continue
        kept_tokens.append(token)
        labels.append(tag)

    return ' '.join(kept_tokens),labels

# Evaluate the saved fastText checkpoint on the NER test split.
# Tag name -> class index, including the <START>/<STOP>/<PAD> sentinels.
tag_to_ix = {
    'O': 0,
    'B_COURT': 1, 'I_COURT': 2,
    'B_PETITIONER': 3, 'I_PETITIONER': 4,
    'B_RESPONDENT': 5, 'I_RESPONDENT': 6,
    'B_JUDGE': 7, 'I_JUDGE': 8,
    'B_LAWYER': 9, 'I_LAWYER': 10,
    'B_DATE': 11, 'I_DATE': 12,
    'B_ORG': 13, 'I_ORG': 14,
    'B_GPE': 15, 'I_GPE': 16,
    'B_STATUTE': 17, 'I_STATUTE': 18,
    'B_PROVISION': 19, 'I_PROVISION': 20,
    'B_PRECEDENT': 21, 'I_PRECEDENT': 22,
    'B_CASE_NUMBER': 23, 'I_CASE_NUMBER': 24,
    'B_WITNESS': 25, 'I_WITNESS': 26,
    'B_OTHER_PERSON': 27, 'I_OTHER_PERSON': 28,
    '<START>': 29, '<STOP>': 30, '<PAD>': 31,
}
model  = BiLSTMCRF(tag_to_ix,dropout_rate=0.5, embed_size=300, hidden_size=256)
# Pick the device once and use it for the model, the checkpoint and the
# batches; the later unconditional `device='cuda'` broke CPU-only runs.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)
model_state_dict = torch.load('t1_model4_fasttext.pt', map_location=device)

# Load the state dictionary into the model
model.load_state_dict(model_state_dict)

json_path = 'NER_test.json'
embedding_type = 'fasttext'
sentiment_dataset_test = SentimentAnalysisDataset(json_path, embedding_type)
# Alias kept for code that looks the dataset up through the global name `sentiment_dataset`.
sentiment_dataset = sentiment_dataset_test
batch_size = 512
# Evaluation does not need shuffling; a fixed order keeps the run reproducible.
dataloader_test = DataLoader(sentiment_dataset_test, batch_size=batch_size, shuffle=False)
from tqdm import tqdm
from sklearn.metrics import f1_score
model.eval()
correct_predictions_val = 0
total_sentences_val = 0
predictions_r = []
traget_r = []
epoch = 1  # only used for the progress-bar caption
with torch.no_grad():
    for sentence_in, targets, mask, sen_lengths in tqdm(dataloader_test, desc=f'Test Epoch {epoch + 1}/{300}', leave=False):
        sentence_in, targets, mask, sen_lengths = sentence_in.to(device), targets.to(device), mask.to(device), sen_lengths.to(device)

        # Viterbi decoding for the batch
        predictions_val = model.predict(sentence_in, mask, sen_lengths)
        correct_predictions_val += calculate_accuracy(predictions_val, targets, sen_lengths)
        # Collect the per-token predictions/targets for the F1 scores
        temp_pred, temp_trag = aggregater(predictions_val, targets, sen_lengths)
        predictions_r.extend(temp_pred)
        traget_r.extend(temp_trag)

# Mean of the per-batch accuracies (every batch weighs the same, whatever its size).
accuracy_val = correct_predictions_val / len(dataloader_test)
print()
print(f'Test Accuracy: {accuracy_val:.4f}')
print(f'Test F1:  {f1_score(traget_r, predictions_r, average="macro")}')
# Per-class F1 scores
print(f1_score(traget_r, predictions_r, average=None))

#t1_model4_Glove

In [None]:
import torch
from torch.utils.data import Dataset,DataLoader
import gensim.downloader as api
from torchtext.vocab import GloVe,FastText
import fasttext
import numpy as np
import fasttext.util
import json
import torch
from torch.utils.data import Dataset,DataLoader
import gensim.downloader as api
from torchtext.vocab import GloVe,FastText
import fasttext
import numpy as np
import fasttext.util
import json

import torch
from torch.utils.data import Dataset,DataLoader
# import gensim.downloader as api
from torchtext.vocab import GloVe
# import fasttext
import numpy as np
#import fasttext.util
import json
class SentimentAnalysisDataset(Dataset):
    """Token-tagging dataset that serves fixed-length, pre-embedded sentences.

    The JSON file is indexed by the stringified sample index and every entry
    provides a "text" string and a "labels" list.  Each item is turned into a
    ``(MAX_LENGTH, dim)`` embedding matrix plus the matching padded tag ids.
    """

    # Number of positions every sample is padded to (including <START>/<STOP>).
    MAX_LENGTH = 100

    # Tag name -> class index.  Built once here instead of on every __getitem__ call.
    LABEL_MAPPING = {
        'O': 0,
        'B_COURT': 1, 'I_COURT': 2,
        'B_PETITIONER': 3, 'I_PETITIONER': 4,
        'B_RESPONDENT': 5, 'I_RESPONDENT': 6,
        'B_JUDGE': 7, 'I_JUDGE': 8,
        'B_LAWYER': 9, 'I_LAWYER': 10,
        'B_DATE': 11, 'I_DATE': 12,
        'B_ORG': 13, 'I_ORG': 14,
        'B_GPE': 15, 'I_GPE': 16,
        'B_STATUTE': 17, 'I_STATUTE': 18,
        'B_PROVISION': 19, 'I_PROVISION': 20,
        'B_PRECEDENT': 21, 'I_PRECEDENT': 22,
        'B_CASE_NUMBER': 23, 'I_CASE_NUMBER': 24,
        'B_WITNESS': 25, 'I_WITNESS': 26,
        'B_OTHER_PERSON': 27, 'I_OTHER_PERSON': 28,
        '<START>': 29, '<STOP>': 30, '<PAD>': 31,
    }

    def __init__(self, json_path, embedding_type='word2vec',load=True):
        """Read the samples and (optionally) load the embedding model.

        Args:
            json_path: path of the JSON file holding the samples.
            embedding_type: 'word2vec', 'glove' or 'fasttext'.
            load: when False the embedding model is left as ``None`` so the
                caller can attach an already loaded one.
        """
        with open(json_path, 'r') as file:
            self.data = json.load(file)

        self.embedding_type = embedding_type
        self.embedding_model = self.load_embedding_model() if load else None

    def load_embedding_model(self):
        """Return the pre-trained embedding model selected by ``self.embedding_type``.

        Raises:
            ValueError: if the embedding type is not supported.
        """
        if self.embedding_type == 'word2vec':
            # Pre-trained Word2Vec model fetched through gensim's downloader
            return api.load('word2vec-google-news-300')
        if self.embedding_type == 'glove':
            # torchtext GloVe vectors (6B tokens, 300d)
            return GloVe(name='6B', dim=300)
        if self.embedding_type == 'fasttext':
            # Official fastText English model; downloaded only if missing.
            fasttext.util.download_model('en', if_exists='ignore')  # English
            # Load the binary once (it used to be loaded twice in a row).
            return fasttext.load_model('cc.en.300.bin')  # Adjust the path based on your downloaded model
        raise ValueError(f"Unsupported embedding type: {self.embedding_type}")

    @staticmethod
    def _to_numpy(vector):
        """Convert one embedding vector (torch tensor or array-like) to a numpy array."""
        if isinstance(vector, torch.Tensor):
            return vector.detach().cpu().numpy()
        return np.asarray(vector)

    def text_to_embeddings(self, text):
        """Embed a whitespace-tokenised sentence into a padded matrix.

        Row layout: one sentinel row filled with -1000.0, one row per token,
        one sentinel row filled with +1000.0, then rows of -1.0 until
        ``MAX_LENGTH`` rows are reached (no padding when already longer).

        The embedding model is always read from ``self.embedding_model``; the
        previous version reached for the global ``sentiment_dataset`` in the
        glove/fasttext branches, which tied the class to notebook state.

        Raises:
            ValueError: if the embedding type is not supported.
        """
        model = self.embedding_model
        words = text.split()
        if self.embedding_type == 'word2vec':
            dim = model.vector_size
            # Out-of-vocabulary words become all-zero vectors.
            vectors = [model[word] if word in model else np.zeros(dim, dtype=np.float32)
                       for word in words]
        elif self.embedding_type == 'glove':
            # Vector width is taken from a known word, as before.
            dim = model['a'].shape[0]
            vectors = [model[word] for word in words]
        elif self.embedding_type == 'fasttext':
            dim = model['a'].shape[0]
            # Words the model does not contain become all-zero vectors.
            vectors = [model[word] if word in model else np.zeros(dim, dtype=np.float32)
                       for word in words]
        else:
            raise ValueError(f"Unsupported embedding type: {self.embedding_type}")

        rows = [np.full(dim, -1000.0, dtype=np.float32)]
        rows.extend(self._to_numpy(vector) for vector in vectors)
        rows.append(np.full(dim, 1000.0, dtype=np.float32))
        while len(rows) < self.MAX_LENGTH:
            rows.append(np.full(dim, -1.0, dtype=np.float32))
        return np.stack(rows)

    def __len__(self):
        """Number of samples in the JSON file."""
        return len(self.data)

    def __getitem__(self, index):
        """Return ``(text_embeddings, labels_tensor, mask, sent_lengths)`` for one sample.

        * text_embeddings -- ``(MAX_LENGTH, dim)`` tensor from ``text_to_embeddings``
        * labels_tensor   -- ``MAX_LENGTH`` tag ids: <START>, tags, <STOP>, then <PAD>
        * mask            -- boolean vector with one entry per embedding row
        * sent_lengths    -- 0-dim tensor, number of tags including <START>/<STOP>

        Raises:
            ValueError: if the sample does not fit into ``MAX_LENGTH`` positions.
        """
        sample = self.data[str(index)]
        text, labels = preprocess_text(sample["text"], sample["labels"])

        # Convert text to embeddings
        text_embeddings = torch.tensor(self.text_to_embeddings(text))
        if text_embeddings.shape[0] > self.MAX_LENGTH:
            # Previously this surfaced as an opaque negative-dimension error
            # while the mask was being built.
            raise ValueError(
                f"Sample {index} needs {text_embeddings.shape[0]} positions, "
                f"more than MAX_LENGTH={self.MAX_LENGTH}"
            )

        labels = ['<START>'] + labels + ['<STOP>']
        sent_lengths = torch.tensor(len(labels))
        labels = labels + ['<PAD>'] * (self.MAX_LENGTH - len(labels))
        labels_tensor = torch.tensor([self.LABEL_MAPPING[label] for label in labels])

        # NOTE(review): text_to_embeddings already pads to MAX_LENGTH rows, so
        # this mask is True everywhere and the real length is only carried by
        # sent_lengths.  Left untouched because it changes what the CRF decodes;
        # confirm whether a length-based mask was intended.
        mask = torch.hstack([
            torch.full((text_embeddings.shape[0],), True),
            torch.full((self.MAX_LENGTH - text_embeddings.shape[0],), False),
        ])
        return text_embeddings, labels_tensor, mask, sent_lengths

import torch
import torch.nn as nn
from torch.nn.utils.rnn import pack_padded_sequence, pad_packed_sequence


class BiLSTMCRF(nn.Module):
    """Bidirectional LSTM encoder followed by a linear-chain CRF.

    The model consumes pre-computed word embeddings (there is no embedding
    layer) and scores K = ``len(tag_vocab)`` tags per position.
    """

    def __init__(self, tag_vocab, dropout_rate=0.5, embed_size=300, hidden_size=256):
        """ Initialize the model
        Args:
            tag_vocab: tag vocabulary; only its length (number of tags K) is used here
            dropout_rate (float): dropout probability applied to the emission scores
            embed_size (int): embedding size
            hidden_size (int): hidden state size of each LSTM direction
        """
        super(BiLSTMCRF, self).__init__()

        self.dropout_rate = dropout_rate
        self.embed_size = embed_size
        self.hidden_size = hidden_size
        self.tag_vocab = tag_vocab
        # The attribute names below are the state_dict keys of saved checkpoints.
        self.dropout = nn.Dropout(dropout_rate)
        self.encoder = nn.LSTM(input_size=embed_size, hidden_size=hidden_size, bidirectional=True)
        self.hidden2emit_score = nn.Linear(hidden_size * 2, len(self.tag_vocab))
        self.transition = nn.Parameter(torch.randn(len(self.tag_vocab), len(self.tag_vocab)))  # shape: (K, K)

    def forward(self, sentences,mask, tags, sen_lengths):
        """
        Args:
            sentences (tensor): embedded sentences, shape (b, len, e)
            mask (tensor): shape (b, len), non-zero/True on the positions that count
            tags (tensor): corresponding tags, shape (b, len)
            sen_lengths: sentence lengths (only forwarded to ``encode``)
        Returns:
            loss (tensor): negative log-likelihood per sentence, shape (b,)
        """
        sentences = sentences.transpose(0, 1)  # shape: (len, b, e)
        emit_score = self.encode(sentences, sen_lengths)  # shape: (b, len, K)
        loss = self.cal_loss(tags, mask, emit_score)  # shape: (b,)
        return loss

    def encode(self, sentences, sent_lengths):
        """ BiLSTM Encoder
        Args:
            sentences (tensor): sentences with word embeddings, shape (len, b, e)
            sent_lengths: sentence lengths; currently unused because the
                sequences are not packed, so the LSTM also reads the padded steps
        Returns:
            emit_score (tensor): emit score, shape (b, len, K)
        """
        hidden_states, _ = self.encoder(sentences)  # shape: (len, b, 2h)
        hidden_states = hidden_states.permute(1,0,2)  # shape: (b, len, 2h)
        emit_score = self.hidden2emit_score(hidden_states)  # shape: (b, len, K)
        emit_score = self.dropout(emit_score)  # shape: (b, len, K)
        return emit_score

    def cal_loss(self, tags, mask, emit_score):
        """ Calculate CRF loss
        Args:
            tags (tensor): a batch of tags, shape (b, len)
            mask (tensor): mask for the tags, shape (b, len), values in PAD position is 0
            emit_score (tensor): emit matrix, shape (b, len, K)
        Returns:
            loss (tensor): loss of the batch, shape (b,)

        The recursion runs over the actual sequence length of ``emit_score``
        (it used to be hard-coded to 100 steps) and applies the mask per
        sentence, so the batch does not have to be sorted by length.
        """
        # score of the gold tag sequence: emissions plus transitions
        score = torch.gather(emit_score, dim=2, index=tags.unsqueeze(dim=2)).squeeze(dim=2)  # shape: (b, len)
        score[:, 1:] += self.transition[tags[:, :-1], tags[:, 1:]]
        total_score = (score * mask.type(torch.float)).sum(dim=1)  # shape: (b,)

        # log partition function via the forward algorithm
        active = mask.bool()
        d = emit_score[:, 0]  # shape: (b, K)
        for i in range(1, emit_score.shape[1]):
            # candidates[b, prev, cur] = d[b, prev] + emit[b, i, cur] + transition[prev, cur]
            emit_and_transition = emit_score[:, i].unsqueeze(dim=1) + self.transition  # shape: (b, K, K)
            candidates = d.unsqueeze(dim=2) + emit_and_transition  # shape: (b, K, K)
            stepped = torch.logsumexp(candidates, dim=1)  # shape: (b, K)
            # sentences that are masked out at this step keep their previous value
            d = torch.where(active[:, i].unsqueeze(dim=1), stepped, d)
        log_partition = torch.logsumexp(d, dim=1)  # shape: (b,)
        llk = total_score - log_partition  # shape: (b,)
        loss = -llk  # shape: (b,)
        return loss


    def predict(self, sentences, mask, sen_lengths):
        """ Viterbi decoding
        Args:
            sentences (tensor): embedded sentences, shape (b, len, e)
            mask (tensor): shape (b, len), non-zero/True on the positions to decode
            sen_lengths: sentence lengths (only forwarded to ``encode``)
        Returns:
            tags (list[list[int]]): best tag-index sequence per sentence; position 0
                is always decoded, later positions only where the mask is set

        The loop covers the actual sequence length (previously fixed to 100)
        and honours the mask per sentence instead of assuming that the batch
        is sorted by decreasing length.
        """
        batch_size = sentences.shape[0]
        num_tags = len(self.tag_vocab)

        emit_score = self.encode(sentences.transpose(0, 1), sen_lengths)  # shape: (b, len, K)

        # paths[b][k]: best tag sequence of sentence b that ends in tag k.
        # One independent list per sentence (no shared references).
        paths = [[[k] for k in range(num_tags)] for _ in range(batch_size)]

        active = mask.bool()
        d = emit_score[:, 0]  # shape: (b, K)
        for i in range(1, emit_score.shape[1]):
            # candidates[b, prev, cur] = d[b, prev] + transition[prev, cur] + emit[b, i, cur]
            emit_and_transition = self.transition + emit_score[:, i].unsqueeze(dim=1)  # shape: (b, K, K)
            candidates = d.unsqueeze(dim=2) + emit_and_transition  # shape: (b, K, K)
            best_score, best_prev = torch.max(candidates, dim=1)  # shape: (b, K) each

            step_active = active[:, i]
            d = torch.where(step_active.unsqueeze(dim=1), best_score, d)

            best_prev = best_prev.tolist()
            for b, is_active in enumerate(step_active.tolist()):
                if is_active:
                    # extend, for every current tag, the path of its best predecessor
                    paths[b] = [paths[b][prev] + [cur] for cur, prev in enumerate(best_prev[b])]

        # pick the best final tag of every sentence and return its path
        final_tag = torch.max(d, dim=1)[1].tolist()
        return [paths[b][k] for b, k in enumerate(final_tag)]


# Function to calculate accuracy
import torch

def calculate_accuracy(predictions, targets, sen_lengths):
    """Token-level accuracy of one batch, scored on valid positions only.

    Args:
        predictions: rectangular list of predicted tag-index sequences, one
            per sentence (anything ``torch.tensor`` accepts).
        targets (Tensor): gold tag indices, one row per sentence.
        sen_lengths: per-sentence number of valid positions; row ``i`` is
            scored on its first ``sen_lengths[i]`` positions.

    Returns:
        Tensor: 0-dim float tensor holding ``matches / scored positions``
        (``0.0`` when there is nothing to score).
    """
    gold = targets.cpu()
    predicted = torch.tensor(predictions).cpu()

    n_correct = 0
    n_scored = 0
    for row in range(gold.shape[0]):
        length = int(sen_lengths[row])
        # Score exactly the valid prefix -- the same window ``aggregater``
        # slices -- so a trailing padding position is never counted as a hit.
        hits = predicted[row][:length] == gold[row][:length]
        n_correct += int(hits.sum())
        n_scored += hits.numel()

    # Divide by the number of positions actually compared; an empty batch
    # yields 0.0 instead of raising.
    return torch.tensor(n_correct / n_scored if n_scored else 0.0)

def aggregater(predictions, targets, sen_lengths):
    """Flatten a batch into two parallel lists of per-token predictions and gold tags.

    Only the first ``sen_lengths[i]`` positions of sentence ``i`` are kept.

    Args:
        predictions: rectangular list of predicted tag-index sequences.
        targets (Tensor): gold tag indices, one row per sentence.
        sen_lengths: per-sentence number of positions to keep.

    Returns:
        tuple[list, list]: ``(flat_predictions, flat_targets)``; the elements
        are 0-dim tensors, in sentence order.
    """
    gold = targets.cpu()
    predicted = torch.tensor(predictions).cpu()

    flat_pred, flat_gold = [], []
    for row in range(targets.shape[0]):
        keep = sen_lengths[row]
        flat_pred += list(predicted[row][:keep])
        flat_gold += list(gold[row][:keep])
    return flat_pred, flat_gold
import json
import string
import nltk
from nltk.corpus import stopwords
from nltk.tokenize import word_tokenize
nltk.download('stopwords')

def preprocess_text(text,label):
    """Strip punctuation, lowercase, and drop unlabelled stopwords from a sentence.

    Labels are assumed to be aligned one-to-one with the whitespace tokens of
    ``text``.  Cleaning is done token by token so that every surviving token
    keeps its own label: a token that consists only of punctuation is removed
    together with its label instead of shifting the labels of all later
    tokens.  Tokens without a matching label (``label`` shorter than the
    token list) are ignored.

    Args:
        text (str): whitespace-tokenised sentence.
        label (list): one tag per whitespace token of ``text``.

    Returns:
        tuple[str, list]: the cleaned, space-joined text and the labels of the
        kept tokens.  When nothing but punctuation/whitespace is present the
        input is returned unchanged as ``(text, list(label))``.
    """
    cleaned_tokens = [
        ''.join(ch for ch in token if ch not in string.punctuation).lower()
        for token in text.split()
    ]

    # Nothing left after removing punctuation: hand the sentence back as-is,
    # but always as a (text, labels) pair so callers can unpack the result.
    if not any(cleaned_tokens):
        return text, list(label)

    stop_words = set(stopwords.words('english'))

    kept_tokens = []
    labels = []
    for token, tag in zip(cleaned_tokens, label):
        if not token:
            # Punctuation-only token: drop it along with its label.
            continue
        if token in stop_words and tag == 'O':
            # Stopwords are removed only when they carry no entity tag.
            continue
        kept_tokens.append(token)
        labels.append(tag)

    return ' '.join(kept_tokens), labels

from tqdm import tqdm
from sklearn.metrics import f1_score

# Evaluate the GloVe BiLSTM-CRF checkpoint on the NER test set and report
# accuracy and macro-F1.
tag_to_ix = {'O': 0, 'B_COURT': 1, 'I_COURT': 2, 'B_PETITIONER': 3, 'I_PETITIONER': 4, 'B_RESPONDENT': 5, 'I_RESPONDENT': 6, 'B_JUDGE': 7, 'I_JUDGE': 8, 'B_LAWYER': 9, 'I_LAWYER': 10, 'B_DATE': 11, 'I_DATE': 12, 'B_ORG': 13, 'I_ORG': 14, 'B_GPE': 15, 'I_GPE': 16, 'B_STATUTE': 17, 'I_STATUTE': 18, 'B_PROVISION': 19, 'I_PROVISION': 20, 'B_PRECEDENT': 21, 'I_PRECEDENT': 22, 'B_CASE_NUMBER': 23, 'I_CASE_NUMBER': 24, 'B_WITNESS': 25, 'I_WITNESS': 26, 'B_OTHER_PERSON': 27, 'I_OTHER_PERSON': 28, '<START>': 29, '<STOP>': 30, '<PAD>': 31}
model = BiLSTMCRF(tag_to_ix, dropout_rate=0.5, embed_size=300, hidden_size=256)

# Pick the device once and use it everywhere; the script previously forced
# 'cuda' further down, which crashes on CPU-only machines.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)

# map_location lets a checkpoint that was saved on a GPU load on a CPU-only host.
model_state_dict = torch.load('t1_model4_glove.pt', map_location=device)
model.load_state_dict(model_state_dict)

json_path = 'NER_test.json'
embedding_type = 'glove'
sentiment_dataset_test = SentimentAnalysisDataset(json_path, embedding_type)
# Alias kept on purpose: code elsewhere in this notebook refers to the global
# name `sentiment_dataset`.
sentiment_dataset = sentiment_dataset_test
batch_size = 512
# Evaluation does not need a random order; a fixed order keeps the per-batch
# accuracy average reproducible between runs.
dataloader_test = DataLoader(sentiment_dataset_test, batch_size=batch_size, shuffle=False)

model.eval()
correct_predictions_val = 0
total_sentences_val = 0
predictions_r = []
traget_r = []
epoch = 1
with torch.no_grad():
    for sentence_in, targets, mask, sen_lengths in tqdm(dataloader_test, desc=f'Test Epoch {epoch + 1}/{300}', leave=False):
        sentence_in = sentence_in.to(device)
        targets = targets.to(device)
        mask = mask.to(device)
        sen_lengths = sen_lengths.to(device)

        # Decode the batch, then collect the batch accuracy and the flattened
        # per-token predictions/targets for the F1 computation.
        predictions_val = model.predict(sentence_in, mask, sen_lengths)
        correct_predictions_val += calculate_accuracy(predictions_val, targets, sen_lengths)
        temp_pred, temp_trag = aggregater(predictions_val, targets, sen_lengths)
        predictions_r.extend(temp_pred)
        traget_r.extend(temp_trag)

# calculate_accuracy returns one ratio per batch, so this is the unweighted
# mean of the per-batch accuracies (guarded against an empty loader).
accuracy_val = correct_predictions_val / max(len(dataloader_test), 1)
print()
print(f'Test Accuracy: {accuracy_val:.4f}')
print(f'Test F1:  {f1_score(traget_r, predictions_r, average="macro")}')



#t1_model4_word2vec

In [None]:
import torch
from torch.utils.data import Dataset,DataLoader
import gensim.downloader as api
from torchtext.vocab import GloVe,FastText
import fasttext
import numpy as np
import fasttext.util
import json
import torch
from torch.utils.data import Dataset,DataLoader
import gensim.downloader as api
from torchtext.vocab import GloVe,FastText
import fasttext
import numpy as np
import fasttext.util
import json

import torch
from torch.utils.data import Dataset,DataLoader
# import gensim.downloader as api
from torchtext.vocab import GloVe
# import fasttext
import numpy as np
#import fasttext.util
import json
class SentimentAnalysisDataset(Dataset):
    """Token-tagging dataset that turns each JSON record into fixed-length tensors.

    The JSON file is expected to map the string indices ``"0"``, ``"1"``, ...
    to records with a ``"text"`` string and a ``"labels"`` list.  Each sample
    is preprocessed with ``preprocess_text``, embedded with a pre-trained
    word-embedding model, wrapped in START/STOP sentinel rows and padded (or
    truncated) to ``MAX_LENGTH`` positions.
    """

    # Every sample has exactly this many positions (START + tokens + STOP + padding).
    MAX_LENGTH = 100
    # Constant fill values of the sentinel rows placed before / after the
    # tokens and of the padding rows.
    START_FILL = -1000.0
    STOP_FILL = 1000.0
    PAD_FILL = -1.0
    # Tag name -> integer id.
    LABEL_MAPPING = {'O': 0, 'B_COURT': 1, 'I_COURT': 2, 'B_PETITIONER': 3, 'I_PETITIONER': 4, 'B_RESPONDENT': 5, 'I_RESPONDENT': 6, 'B_JUDGE': 7, 'I_JUDGE': 8, 'B_LAWYER': 9, 'I_LAWYER': 10, 'B_DATE': 11, 'I_DATE': 12, 'B_ORG': 13, 'I_ORG': 14, 'B_GPE': 15, 'I_GPE': 16, 'B_STATUTE': 17, 'I_STATUTE': 18, 'B_PROVISION': 19, 'I_PROVISION': 20, 'B_PRECEDENT': 21, 'I_PRECEDENT': 22, 'B_CASE_NUMBER': 23, 'I_CASE_NUMBER': 24, 'B_WITNESS': 25, 'I_WITNESS': 26, 'B_OTHER_PERSON': 27, 'I_OTHER_PERSON': 28, '<START>': 29, '<STOP>': 30, '<PAD>': 31}

    def __init__(self, json_path, embedding_type='word2vec',load=True):
        """Read the JSON records and, unless ``load`` is False, the embedding model.

        Args:
            json_path (str): path of the JSON file with the records.
            embedding_type (str): ``'word2vec'``, ``'glove'`` or ``'fasttext'``.
            load (bool): when False, ``embedding_model`` is left as ``None`` so
                the caller can assign an already-loaded model.
        """
        with open(json_path, 'r') as file:
            self.data = json.load(file)

        self.embedding_type = embedding_type
        self.embedding_model = self.load_embedding_model() if load else None

    def load_embedding_model(self):
        """Load the pre-trained embedding model selected by ``embedding_type``.

        Raises:
            ValueError: if ``embedding_type`` is not one of the supported names.
        """
        if self.embedding_type == 'word2vec':
            # Pre-trained Word2Vec model fetched through gensim's downloader.
            return api.load('word2vec-google-news-300')
        elif self.embedding_type == 'glove':
            # Pre-trained GloVe vectors (6B tokens, 300d) via torchtext.
            return GloVe(name='6B', dim=300)
        elif self.embedding_type == 'fasttext':
            # Download the English FastText model if missing, then load it once.
            fasttext.util.download_model('en', if_exists='ignore')
            return fasttext.load_model('cc.en.300.bin')
        else:
            raise ValueError(f"Unsupported embedding type: {self.embedding_type}")

    def text_to_embeddings(self, text):
        """Embed a whitespace-tokenised sentence into a ``(MAX_LENGTH, dim)`` array.

        Row 0 is the START sentinel, followed by one row per token, the STOP
        sentinel, and padding rows up to ``MAX_LENGTH``.  Sentences with more
        than ``MAX_LENGTH - 2`` tokens are truncated so the result always has
        exactly ``MAX_LENGTH`` rows.

        Args:
            text (str): sentence whose tokens are separated by whitespace.

        Returns:
            numpy.ndarray: stacked embedding rows.

        Raises:
            ValueError: if ``embedding_type`` is not one of the supported names.
        """
        # The vector size is read from this instance's own model (not from a
        # module-level dataset object), so the dataset works standalone.
        if self.embedding_type == 'word2vec':
            dim = self.embedding_model.vector_size
        elif self.embedding_type in ('glove', 'fasttext'):
            dim = self.embedding_model['a'].shape[0]
        else:
            raise ValueError(f"Unsupported embedding type: {self.embedding_type}")

        words = text.split()[: self.MAX_LENGTH - 2]
        if self.embedding_type == 'glove':
            # The GloVe model is indexed directly, without a membership check.
            vectors = [self.embedding_model[word] for word in words]
        else:
            # Unknown words are represented by an all-zero vector.
            vectors = [
                self.embedding_model[word] if word in self.embedding_model else torch.zeros(dim)
                for word in words
            ]

        rows = [torch.full((dim,), self.START_FILL)] + vectors + [torch.full((dim,), self.STOP_FILL)]
        rows.extend(torch.full((dim,), self.PAD_FILL) for _ in range(self.MAX_LENGTH - len(rows)))
        return np.stack(rows)

    def __len__(self):
        """Number of records in the JSON file."""
        return len(self.data)

    def __getitem__(self, index):
        """Build the tensors of one sample.

        Returns:
            tuple: ``(text_embeddings, labels_tensor, mask, sent_lengths)`` where
            ``text_embeddings`` has ``MAX_LENGTH`` rows, ``labels_tensor`` holds
            ``MAX_LENGTH`` tag ids (``<START>``, the token tags, ``<STOP>``,
            then ``<PAD>``), ``mask`` is a boolean vector of ``MAX_LENGTH``
            entries and ``sent_lengths`` is the number of non-padding labels
            (START and STOP included) as a 0-dim tensor.
        """
        record = self.data[str(index)]
        text = record["text"]
        labels = record["labels"]

        processed = preprocess_text(text, labels)
        if isinstance(processed, str):
            # preprocess_text may hand back only the text when the sentence
            # holds nothing but punctuation; the labels then stay unchanged.
            text = processed
        else:
            text, labels = processed
        # Keep room for the START/STOP tags, mirroring text_to_embeddings.
        labels = list(labels)[: self.MAX_LENGTH - 2]

        text_embeddings = torch.tensor(self.text_to_embeddings(text))

        labels = ['<START>'] + labels + ['<STOP>']
        sent_lengths = torch.tensor(len(labels))
        labels = labels + ['<PAD>'] * (self.MAX_LENGTH - len(labels))
        labels_tensor = torch.tensor([self.LABEL_MAPPING[label] for label in labels])

        # All positions are flagged valid (padding rows included) because the
        # embeddings are already padded to MAX_LENGTH; the true length travels
        # in sent_lengths.
        # NOTE(review): BiLSTMCRF's prefix-sliced decoding presumably relies on
        # this rectangular mask -- confirm before narrowing it.
        mask = torch.full((self.MAX_LENGTH,), True)
        return text_embeddings, labels_tensor, mask, sent_lengths

import torch
import torch.nn as nn
from torch.nn.utils.rnn import pack_padded_sequence, pad_packed_sequence


class BiLSTMCRF(nn.Module):
    """Bidirectional LSTM encoder with a linear-chain CRF layer on top.

    The model consumes pre-computed word embeddings (there is no embedding
    layer) and scores tag sequences with per-position emission scores plus a
    learned tag-to-tag transition matrix.
    """

    def __init__(self, tag_vocab, dropout_rate=0.5, embed_size=300, hidden_size=256):
        """ Initialize the model
        Args:
            tag_vocab: tag vocabulary; only its length (number of tags K) is used
            dropout_rate (float): dropout probability applied to the emission scores
            embed_size (int): size of the input word embeddings
            hidden_size (int): hidden state size of each LSTM direction
        """
        super(BiLSTMCRF, self).__init__()

        self.dropout_rate = dropout_rate
        self.embed_size = embed_size
        self.hidden_size = hidden_size
        self.tag_vocab = tag_vocab
        self.dropout = nn.Dropout(dropout_rate)
        self.encoder = nn.LSTM(input_size=embed_size, hidden_size=hidden_size, bidirectional=True)
        self.hidden2emit_score = nn.Linear(hidden_size * 2, len(self.tag_vocab))
        self.transition = nn.Parameter(torch.randn(len(self.tag_vocab), len(self.tag_vocab)))  # shape: (K, K)

    def forward(self, sentences,mask, tags, sen_lengths):
        """ Negative log-likelihood of the gold tags
        Args:
            sentences (tensor): word embeddings, shape (b, len, e)
            mask (tensor): valid-position mask, shape (b, len)
            tags (tensor): gold tag indices, shape (b, len)
            sen_lengths: sentence lengths (passed through to ``encode``)
        Returns:
            loss (tensor): loss on the batch, shape (b,)
        """
        sentences = sentences.transpose(0, 1)  # shape: (len, b, e)
        emit_score = self.encode(sentences, sen_lengths)  # shape: (b, len, K)
        loss = self.cal_loss(tags, mask, emit_score)  # shape: (b,)
        return loss

    def encode(self, sentences, sent_lengths):
        """ BiLSTM Encoder
        Args:
            sentences (tensor): sentences with word embeddings, shape (len, b, e)
            sent_lengths: sentence lengths; currently unused because the
                sequences are fed to the LSTM unpacked, padding included
        Returns:
            emit_score (tensor): emit score, shape (b, len, K)
        """
        hidden_states, _ = self.encoder(sentences)  # shape: (len, b, 2h)
        hidden_states = hidden_states.permute(1,0,2)  # shape: (b, len, 2h)
        emit_score = self.hidden2emit_score(hidden_states)  # shape: (b, len, K)
        emit_score = self.dropout(emit_score)  # shape: (b, len, K)
        return emit_score

    def cal_loss(self, tags, mask, emit_score):
        """ Calculate CRF loss
        Args:
            tags (tensor): a batch of tags, shape (b, len)
            mask (tensor): mask for the tags, shape (b, len), values in PAD position is 0
            emit_score (tensor): emit matrix, shape (b, len, K)
        Returns:
            loss (tensor): loss of the batch, shape (b,)
        """
        # Score of the gold path: emission of every gold tag plus the
        # transition from the previous gold tag.
        score = torch.gather(emit_score, dim=2, index=tags.unsqueeze(dim=2)).squeeze(dim=2)  # shape: (b, len)
        score[:, 1:] += self.transition[tags[:, :-1], tags[:, 1:]]
        total_score = (score * mask.type(torch.float)).sum(dim=1)  # shape: (b,)

        # Log partition function via the forward algorithm.  The number of
        # steps follows the actual sequence length instead of a fixed 100.
        d = torch.unsqueeze(emit_score[:, 0], dim=1)  # shape: (b, 1, K)
        for i in range(1, emit_score.shape[1]):
            # Only the first n_unfinished rows are advanced, so rows whose
            # mask ends earlier must sit at the end of the batch.
            n_unfinished = int(mask[:, i].sum())
            d_uf = d[: n_unfinished]  # shape: (uf, 1, K)
            emit_and_transition = emit_score[: n_unfinished, i].unsqueeze(dim=1) + self.transition  # shape: (uf, K, K)
            log_sum = d_uf.transpose(1, 2) + emit_and_transition  # shape: (uf, K, K)
            # Shift by the per-column maximum before the log-sum-exp.
            max_v = log_sum.max(dim=1)[0].unsqueeze(dim=1)  # shape: (uf, 1, K)
            log_sum = log_sum - max_v  # shape: (uf, K, K)
            d_uf = max_v + torch.logsumexp(log_sum, dim=1).unsqueeze(dim=1)  # shape: (uf, 1, K)
            d = torch.cat((d_uf, d[n_unfinished:]), dim=0)
        d = d.squeeze(dim=1)  # shape: (b, K)
        max_d = d.max(dim=-1)[0]  # shape: (b,)
        d = max_d + torch.logsumexp(d - max_d.unsqueeze(dim=1), dim=1)  # shape: (b,)
        llk = total_score - d  # shape: (b,)
        loss = -llk  # shape: (b,)
        return loss


    def predict(self, sentences, mask, sen_lengths):
        """ Viterbi decoding of the best tag sequence for every sentence
        Args:
            sentences (tensor): word embeddings, shape (b, len, e)
            mask (tensor): valid-position mask, shape (b, len); rows whose
                mask ends earlier must sit at the end of the batch
            sen_lengths: sentence lengths (passed through to ``encode``)
        Returns:
            tags (list[list[int]]): predicted tag indices for the batch, one
                entry per processed position of each sentence
        """
        batch_size = sentences.shape[0]
        sentences = sentences.transpose(0, 1)  # shape: (len, b, e)
        emit_score = self.encode(sentences, sen_lengths)  # shape: (b, len, K)

        # tags[b][k] is the best path of sentence b that ends in tag k.
        # Each sentence gets its own list (no aliasing across the batch).
        tags = [[[i] for i in range(len(self.tag_vocab))] for _ in range(batch_size)]  # list, shape: (b, K, 1)

        # Scores of the best paths after the first position.
        d = torch.unsqueeze(emit_score[:, 0], dim=1)  # shape: (b, 1, K)

        # Walk over the remaining positions; the number of steps follows the
        # actual sequence length instead of a fixed 100.
        for i in range(1, emit_score.shape[1]):
            # Number of sentences that still have a valid token here.
            n_unfinished = int(mask[:, i].sum())
            d_uf = d[: n_unfinished]  # shape: (uf, 1, K)

            # transition[prev, cur] + emission[cur]
            emit_and_transition = self.transition + emit_score[: n_unfinished, i].unsqueeze(dim=1)  # shape: (uf, K, K)
            new_d_uf = d_uf.transpose(1, 2) + emit_and_transition  # shape: (uf, K, K)

            # Best previous tag for every current tag.
            d_uf, max_idx = torch.max(new_d_uf, dim=1)
            max_idx = max_idx.tolist()  # list, shape: (uf, K)

            # Extend the best path ending in the chosen previous tag k with
            # the current tag j.
            tags[: n_unfinished] = [[tags[b][k] + [j] for j, k in enumerate(max_idx[b])] for b in range(n_unfinished)]

            d = torch.cat((torch.unsqueeze(d_uf, dim=1), d[n_unfinished:]), dim=0)  # shape: (b, 1, K)

        d = d.squeeze(dim=1)  # shape: (b, K)

        # Pick, per sentence, the path whose final score is highest.
        _, max_idx = torch.max(d, dim=1)  # shape: (b,)
        max_idx = max_idx.tolist()
        tags = [tags[b][k] for b, k in enumerate(max_idx)]

        return tags


# Function to calculate accuracy
import torch

def calculate_accuracy(predictions, targets, sen_lengths):
    """Token-level accuracy of one batch, scored on valid positions only.

    Args:
        predictions: rectangular list of predicted tag-index sequences, one
            per sentence (anything ``torch.tensor`` accepts).
        targets (Tensor): gold tag indices, one row per sentence.
        sen_lengths: per-sentence number of valid positions; row ``i`` is
            scored on its first ``sen_lengths[i]`` positions.

    Returns:
        Tensor: 0-dim float tensor holding ``matches / scored positions``
        (``0.0`` when there is nothing to score).
    """
    gold = targets.cpu()
    predicted = torch.tensor(predictions).cpu()

    n_correct = 0
    n_scored = 0
    for row in range(gold.shape[0]):
        length = int(sen_lengths[row])
        # Score exactly the valid prefix -- the same window ``aggregater``
        # slices -- so a trailing padding position is never counted as a hit.
        hits = predicted[row][:length] == gold[row][:length]
        n_correct += int(hits.sum())
        n_scored += hits.numel()

    # Divide by the number of positions actually compared; an empty batch
    # yields 0.0 instead of raising.
    return torch.tensor(n_correct / n_scored if n_scored else 0.0)

def aggregater(predictions, targets, sen_lengths):
    """Flatten a batch into two parallel lists of per-token predictions and gold tags.

    Only the first ``sen_lengths[i]`` positions of sentence ``i`` are kept.

    Args:
        predictions: rectangular list of predicted tag-index sequences.
        targets (Tensor): gold tag indices, one row per sentence.
        sen_lengths: per-sentence number of positions to keep.

    Returns:
        tuple[list, list]: ``(flat_predictions, flat_targets)``; the elements
        are 0-dim tensors, in sentence order.
    """
    gold = targets.cpu()
    predicted = torch.tensor(predictions).cpu()

    flat_pred, flat_gold = [], []
    for row in range(targets.shape[0]):
        keep = sen_lengths[row]
        flat_pred += list(predicted[row][:keep])
        flat_gold += list(gold[row][:keep])
    return flat_pred, flat_gold
import json
import string
import nltk
from nltk.corpus import stopwords
from nltk.tokenize import word_tokenize
nltk.download('stopwords')

def preprocess_text(text,label):
    """Strip punctuation, lowercase, and drop unlabelled stopwords from a sentence.

    Labels are assumed to be aligned one-to-one with the whitespace tokens of
    ``text``.  Cleaning is done token by token so that every surviving token
    keeps its own label: a token that consists only of punctuation is removed
    together with its label instead of shifting the labels of all later
    tokens.  Tokens without a matching label (``label`` shorter than the
    token list) are ignored.

    Args:
        text (str): whitespace-tokenised sentence.
        label (list): one tag per whitespace token of ``text``.

    Returns:
        tuple[str, list]: the cleaned, space-joined text and the labels of the
        kept tokens.  When nothing but punctuation/whitespace is present the
        input is returned unchanged as ``(text, list(label))``.
    """
    cleaned_tokens = [
        ''.join(ch for ch in token if ch not in string.punctuation).lower()
        for token in text.split()
    ]

    # Nothing left after removing punctuation: hand the sentence back as-is,
    # but always as a (text, labels) pair so callers can unpack the result.
    if not any(cleaned_tokens):
        return text, list(label)

    stop_words = set(stopwords.words('english'))

    kept_tokens = []
    labels = []
    for token, tag in zip(cleaned_tokens, label):
        if not token:
            # Punctuation-only token: drop it along with its label.
            continue
        if token in stop_words and tag == 'O':
            # Stopwords are removed only when they carry no entity tag.
            continue
        kept_tokens.append(token)
        labels.append(tag)

    return ' '.join(kept_tokens), labels

from tqdm import tqdm
from sklearn.metrics import f1_score

# Evaluate the word2vec BiLSTM-CRF checkpoint on the NER test set and report
# accuracy and macro-F1.
tag_to_ix = {'O': 0, 'B_COURT': 1, 'I_COURT': 2, 'B_PETITIONER': 3, 'I_PETITIONER': 4, 'B_RESPONDENT': 5, 'I_RESPONDENT': 6, 'B_JUDGE': 7, 'I_JUDGE': 8, 'B_LAWYER': 9, 'I_LAWYER': 10, 'B_DATE': 11, 'I_DATE': 12, 'B_ORG': 13, 'I_ORG': 14, 'B_GPE': 15, 'I_GPE': 16, 'B_STATUTE': 17, 'I_STATUTE': 18, 'B_PROVISION': 19, 'I_PROVISION': 20, 'B_PRECEDENT': 21, 'I_PRECEDENT': 22, 'B_CASE_NUMBER': 23, 'I_CASE_NUMBER': 24, 'B_WITNESS': 25, 'I_WITNESS': 26, 'B_OTHER_PERSON': 27, 'I_OTHER_PERSON': 28, '<START>': 29, '<STOP>': 30, '<PAD>': 31}
model = BiLSTMCRF(tag_to_ix, dropout_rate=0.5, embed_size=300, hidden_size=256)

# Pick the device once and use it everywhere; the script previously forced
# 'cuda' further down, which crashes on CPU-only machines.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)

# map_location lets a checkpoint that was saved on a GPU load on a CPU-only host.
model_state_dict = torch.load('t1_model4_word2vec.pt', map_location=device)
model.load_state_dict(model_state_dict)

json_path = 'NER_test.json'
embedding_type = 'word2vec'
sentiment_dataset_test = SentimentAnalysisDataset(json_path, embedding_type)
# Alias kept on purpose: code elsewhere in this notebook refers to the global
# name `sentiment_dataset`.
sentiment_dataset = sentiment_dataset_test
batch_size = 512
# Evaluation does not need a random order; a fixed order keeps the per-batch
# accuracy average reproducible between runs.
dataloader_test = DataLoader(sentiment_dataset_test, batch_size=batch_size, shuffle=False)

model.eval()
correct_predictions_val = 0
total_sentences_val = 0
predictions_r = []
traget_r = []
epoch = 1
with torch.no_grad():
    for sentence_in, targets, mask, sen_lengths in tqdm(dataloader_test, desc=f'Test Epoch {epoch + 1}/{300}', leave=False):
        sentence_in = sentence_in.to(device)
        targets = targets.to(device)
        mask = mask.to(device)
        sen_lengths = sen_lengths.to(device)

        # Decode the batch, then collect the batch accuracy and the flattened
        # per-token predictions/targets for the F1 computation.
        predictions_val = model.predict(sentence_in, mask, sen_lengths)
        correct_predictions_val += calculate_accuracy(predictions_val, targets, sen_lengths)
        temp_pred, temp_trag = aggregater(predictions_val, targets, sen_lengths)
        predictions_r.extend(temp_pred)
        traget_r.extend(temp_trag)

# calculate_accuracy returns one ratio per batch, so this is the unweighted
# mean of the per-batch accuracies (guarded against an empty loader).
accuracy_val = correct_predictions_val / max(len(dataloader_test), 1)
print()
print(f'Test Accuracy: {accuracy_val:.4f}')
print(f'Test F1:  {f1_score(traget_r, predictions_r, average="macro")}')
