# BiLSTM with CRF

In [1]:
import time
from collections import Counter

import numpy as np
import pandas as pd
import spacy

import time
import random
import string
from itertools import chain

import torch
from torch import nn
from torch.optim import Adam
from torchtext.legacy.data import Field, NestedField, BucketIterator
from torchtext.legacy.datasets import SequenceTaggingDataset
from torchtext.vocab import Vocab
import torch.optim as optim
from torchtext.legacy import data
from torchtext import datasets
from torchcrf import CRF
from torchinfo import summary


from sklearn.metrics import confusion_matrix, precision_recall_fscore_support
import seaborn as sns
import matplotlib.pyplot as plt 
from sklearn.metrics import f1_score, classification_report
import numpy as np 
sns.set()


# Data Preparation

In [2]:
# Seed every random number generator in use so that runs are repeatable.
seed_value = 42

random.seed(seed_value)
np.random.seed(seed_value)
torch.manual_seed(seed_value)
torch.backends.cudnn.deterministic = True

# Word field: the original casing of the tokens is kept.
text_data = data.Field(lower=False)

# Tag field: every tag is known in advance, so no <unk> entry is needed.
tags = data.Field(unk_token=None)

# Read the train / validation / test CSV files from the data directory.
# The header row is skipped; the first column feeds the "text" field and
# the second one the "tag" field.
path_str = "data/"
train_data, valid_data, test_data = data.TabularDataset.splits(
    path=path_str,
    train="train.csv",
    validation="valid.csv",
    test="test.csv",
    format="csv",
    skip_header=True,
    fields=(
        ("text", text_data),
        ("tag", tags),
    ),
)


In [3]:
# MIN_FREQ is the minimum number of occurrences a word needs in the training
# data to enter the vocabulary. It is kept at 0 so that no word is filtered
# out: every word seen in the training set gets its own vocabulary entry.

MIN_FREQ = 0

# building vocab: words are mapped to pretrained 100-dimensional GloVe vectors;
# words without a pretrained vector get a vector drawn by torch.Tensor.normal_

text_data.build_vocab(train_data, 
                 min_freq = MIN_FREQ, 
                 vectors = "glove.6B.100d",
                 unk_init = torch.Tensor.normal_)


# the tag vocabulary is built from the training labels only
tags.build_vocab(train_data)

# defining the batch size
BATCH_SIZE = 16

# use the GPU when one is available, otherwise fall back to the CPU
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
#device = 'cpu'
train_iterator, validation_iterator, test_iterator = data.BucketIterator.splits(
    (train_data, valid_data, test_data), 
    batch_size = BATCH_SIZE,
    device = device, sort=False)

# padding index of the word vocabulary and of the tag vocabulary
# (two different vocabularies, so the two indices are not interchangeable)
TEXT_PAD_IDX = text_data.vocab.stoi[text_data.pad_token]  
TAG_PAD_IDX = tags.vocab.stoi[tags.pad_token]

# Defining the model architecture


The class below declares the model; each of its layers is labelled in the code comments:

1. To set up the CRF layer during initialization, we need to specify the number of possible tags.
2. The CRF logic lives in `forward()` because of a major change in the implementation of the `pytorch-crf` package. Earlier, the forward pass and the loss calculation were done separately, but now the loss calculation is integrated into the forward pass.
3. All impossible transitions are initialized with a very low score (-100) in the `init_crf_transitions` function.
4. This is where the BIO (Begin, Inside, Outside) constraints on the tag sequence are imposed on the model.

In [4]:
class BiLSTM(nn.Module):
    """
    BiLSTM-CRF sequence tagger.

    The network stacks a word-embedding layer, a bidirectional LSTM and a
    linear projection that produces per-tag emission scores; a CRF layer
    then scores and decodes whole tag sequences.
    """
    def __init__(self,
                 input_dim,
                 embedding_dim,
                 hidden_dim,
                 output_dim,
                 lstm_layers,
                 emb_dropout,
                 lstm_dropout,
                 fc_dropout,
                 word_pad_idx,
                 tag_pad_idx):
        """
        Build the layers and draw every parameter from N(0, 0.1^2).
        :param: input_dim: number of rows of the embedding table (vocabulary size)
        :param: embedding_dim: size of each word vector
        :param: hidden_dim: hidden size of the LSTM, per direction
        :param: output_dim: number of tags
        :param: lstm_layers: number of stacked LSTM layers
        :param: emb_dropout: dropout probability applied to the embeddings
        :param: lstm_dropout: dropout probability between LSTM layers
                (only used when lstm_layers > 1)
        :param: fc_dropout: dropout probability applied before the linear layer
        :param: word_pad_idx: index of the padding token in the word vocabulary
        :param: tag_pad_idx: index of the padding tag in the tag vocabulary
        """
        super().__init__()

        # LAYER 1: Word Embeddings Layer
        self.embedding_dim = embedding_dim
        self.embedding = nn.Embedding(
            num_embeddings=input_dim,
            embedding_dim=embedding_dim,
            padding_idx=word_pad_idx
        )
        self.emb_dropout = nn.Dropout(emb_dropout)

        # LAYER 2: BiLSTM Layer
        self.lstm = nn.LSTM(
            input_size=embedding_dim,
            hidden_size=hidden_dim,
            num_layers=lstm_layers,
            bidirectional=True,
            dropout=lstm_dropout if lstm_layers > 1 else 0
        )

        # LAYER 3: Fully-connected Layer
        # (input is hidden_dim * 2 because both LSTM directions are concatenated)
        self.fc_dropout = nn.Dropout(fc_dropout)
        self.fc = nn.Linear(hidden_dim * 2, output_dim)

        # LAYER 4: CRF Layer
        self.tag_pad_idx = tag_pad_idx
        self.crf = CRF(num_tags=output_dim)

        # initializing weights with a normal distribution
        for param in self.parameters():
            nn.init.normal_(param.data, mean=0, std=0.1)

        # The loop above also overwrites the padding row of the embedding
        # table, which nn.Embedding had set to zeros; restore it so padded
        # positions contribute a zero vector.
        if word_pad_idx is not None:
            self.embedding.weight.data[word_pad_idx].zero_()

    def forward(self, words, tags=None):
        """
        Forward pass: decode the best tag sequences and, when gold tags are
        given, also compute the CRF loss.
        :param: words: tensor of word indices; the layers are used with their
                default (seq_len, batch) layout
        :param: tags: optional tensor of gold tag indices with the same layout;
                positions equal to the tag padding index are masked out
        :return: (crf_out, crf_loss) where crf_out is the decoded tag indices,
                 one list per sentence, and crf_loss is the negated CRF
                 log-likelihood, or None when no tags were given
        """
        embedding_out = self.emb_dropout(self.embedding(words))
        lstm_out, _ = self.lstm(embedding_out)
        fc_out = self.fc(self.fc_dropout(lstm_out))

        if tags is not None:
            # ignore padded positions both when decoding and in the loss
            mask = tags != self.tag_pad_idx
            crf_out = self.crf.decode(fc_out, mask=mask)
            crf_loss = -self.crf(fc_out, tags=tags, mask=mask)
        else:
            # without gold tags there is no mask: every position is decoded
            crf_out = self.crf.decode(fc_out)
            crf_loss = None

        return crf_out, crf_loss

    def init_crf_transitions(self, tag_names, imp_value=-100):
        """
        Give the transitions that the BIO scheme forbids a very low score.

        Three kinds of transitions are penalised:
        starting a sentence with an I- tag or with <pad>, going from an
        O tag to any I- tag, and going from a B-/I- tag of one entity type to
        an I- tag of a different entity type.
        :param: tag_names: tag strings, ordered by their index in the CRF
        :param: imp_value: score assigned to the impossible transitions
                (default -100)
        """
        # positions of the tags, grouped by their leading letter
        tag_is = {
            tag_position: [i for i, tag in enumerate(tag_names)
                           if tag.startswith(tag_position)]
            for tag_position in ("B", "I", "O")
        }

        def entity_type(tag_index):
            # "B-PER" -> "PER"; a tag without a dash has an empty entity type
            return tag_names[tag_index].partition("-")[2]

        # The CRF parameters require grad, so they are edited in place with
        # autograd disabled.
        with torch.no_grad():
            # a sentence cannot start with an I- tag or with padding
            for i, tag_name in enumerate(tag_names):
                if tag_name.startswith("I") or tag_name == "<pad>":
                    self.crf.start_transitions[i] = imp_value

            # impossible transitions O - I
            for from_tag_i in tag_is["O"]:
                for to_tag_i in tag_is["I"]:
                    self.crf.transitions[from_tag_i, to_tag_i] = imp_value

            # impossible transitions between different entity types
            for from_tag_i in tag_is["B"] + tag_is["I"]:
                for to_tag_i in tag_is["I"]:
                    if entity_type(from_tag_i) != entity_type(to_tag_i):
                        self.crf.transitions[from_tag_i, to_tag_i] = imp_value

    def count_total_parameters(self):
        """
        Count the trainable parameters of the model.
        :return: total number of elements over all parameters that require grad
        """
        return sum(p.numel() for p in self.parameters() if p.requires_grad)

In [5]:
def init_weights(m):
    """
    Re-draw every parameter of a module, in place, from a normal
    distribution with mean 0 and standard deviation 0.1.
    :param: m: module whose parameters are re-initialized
    """
    for _, weight in m.named_parameters():
        nn.init.normal_(weight.data, std=0.1, mean=0)

In [6]:
# Size of the word vectors; it matches the 100-dimensional GloVe vectors
# loaded into the vocabulary.
embedding_dim = 100
tag_pad_idx = TAG_PAD_IDX

# Single-layer BiLSTM-CRF with 256 hidden units per direction and 10% dropout
# on the embeddings and before the linear layer.
model = BiLSTM(
    input_dim=len(text_data.vocab),
    embedding_dim=embedding_dim,
    hidden_dim=256,
    output_dim=len(tags.vocab),
    lstm_layers=1,
    emb_dropout=0.1,
    lstm_dropout=0.1,
    fc_dropout=0.1,
    word_pad_idx=TEXT_PAD_IDX,
    tag_pad_idx=TAG_PAD_IDX,
)

# Re-initialize the parameters of every sub-module.
model.apply(init_weights)

BiLSTM(
  (embedding): Embedding(23626, 100, padding_idx=1)
  (emb_dropout): Dropout(p=0.1, inplace=False)
  (lstm): LSTM(100, 256, bidirectional=True)
  (fc_dropout): Dropout(p=0.1, inplace=False)
  (fc): Linear(in_features=512, out_features=10, bias=True)
  (crf): CRF(num_tags=10)
)

In [7]:
# Replace the randomly initialized embedding table with the pretrained
# vectors attached to the word vocabulary.
pretrained_embeddings = text_data.vocab.vectors
model.embedding.weight.data.copy_(pretrained_embeddings)

# Zero the row of the *word* padding token. The row must be selected with the
# padding index of the word vocabulary (TEXT_PAD_IDX); the tag padding index
# belongs to a different vocabulary and would zero an unrelated word vector.
model.embedding.weight.data[TEXT_PAD_IDX].zero_()


# CRF transitions initialisation
model.init_crf_transitions(
    tag_names=tags.vocab.itos
)
print(f"The Model has {model.count_total_parameters():,} parameters for training")

The Model has 3,101,034 parameters for training


In [8]:
# Show the torchinfo summary of the model (per-layer parameter counts).
summary(model)

Layer (type:depth-idx)                   Param #
BiLSTM                                   --
├─Embedding: 1-1                         2,362,600
├─Dropout: 1-2                           --
├─LSTM: 1-3                              733,184
├─Dropout: 1-4                           --
├─Linear: 1-5                            5,130
├─CRF: 1-6                               120
Total params: 3,101,034
Trainable params: 3,101,034
Non-trainable params: 0

# Training

The model returns two values: the predicted tag sequences and the loss.

In [9]:
# used the learning rate, epsilon, decay rates from other implementation

def optimizer(model, lr=1e-5, eps=1e-6, weight_decay_rate=0.001, second_weight_decay_rate=0.0):
    """
    Build an Adam optimizer with two parameter groups that use different
    weight-decay values.
    :param model: model object whose named parameters are optimized
    :param lr: learning rate value
    :param eps: epsilon value
    :param weight_decay_rate: weight decay for all parameters whose name does
        not contain 'bias', 'gamma' or 'beta'
    :param second_weight_decay_rate: weight decay for the parameters whose
        name contains 'bias', 'gamma' or 'beta'
    :return: Adam optimizer object
    """
    no_decay = ('bias', 'gamma', 'beta')
    decayed, not_decayed = [], []
    for name, param in model.named_parameters():
        if any(nd in name for nd in no_decay):
            not_decayed.append(param)
        else:
            decayed.append(param)

    # The per-group option read by torch.optim.Adam is called 'weight_decay';
    # any other key is stored in the group but has no effect on the update.
    optimizer_parameters = [
        {'params': decayed, 'weight_decay': weight_decay_rate},
        {'params': not_decayed, 'weight_decay': second_weight_decay_rate},
    ]
    return optim.Adam(
        optimizer_parameters,
        lr=lr,
        eps=eps
    )
# Adam optimizer over the model parameters.
optimizer_obj = optimizer(model, lr=1e-5, eps=1e-6, weight_decay_rate=0.001)
# Halve the learning rate every 10 scheduler steps.
scheduler_obj = optim.lr_scheduler.StepLR(optimizer_obj, step_size=10, gamma=0.5)
# NOTE(review): criterion_func is not referenced by the training/evaluation
# code visible in this file, which uses the CRF loss returned by the model.
criterion_func = nn.CrossEntropyLoss(ignore_index = TAG_PAD_IDX)
# Move the model to the selected device.
model = model.to(device)

## Defining metrics method for comparing the results

In [10]:
def compute_f1_loss(preds, y, tag_pad_idx, full_report=False):
    """
    Compute the micro-averaged F1 score over the entity tags, i.e. every tag
    except the padding tag and "O".
    :param preds: predicted tag indices, one list per sentence.
    :param y: actual tag indices, one list per sentence.
    :param tag_pad_idx: tag padding index
    :param full_report: when True, also print a per-tag classification report
    :return: f1_score, flattened predictions, flattened actual values
    """
    tag_names = tags.vocab.itos
    index_o = tags.vocab.stoi["O"]

    # labels that count as "positive": everything but padding and "O"
    positive_labels = [i for i in range(len(tag_names))
                       if i not in (tag_pad_idx, index_o)]

    flatten_preds = [pred for sent_pred in preds for pred in sent_pred]
    flatten_y = [tag for sent_tag in y for tag in sent_tag]

    if full_report:
        positive_names = [tag_names[i] for i in positive_labels]
        print(classification_report(
            y_true=flatten_y,
            y_pred=flatten_preds,
            labels=positive_labels,
            target_names=positive_names
        ))

    return f1_score(
        y_true=flatten_y,
        y_pred=flatten_preds,
        labels=positive_labels,
        average="micro"
    ), flatten_preds, flatten_y

## Creating methods for training, and evaluation 

In [11]:
def train(model, iterator, optimizer, tag_pad_idx):
    """
    Train the model for one pass over the iterator.
    :param model: model object with defined architecture.
    :param iterator: iterator with batch of text data and corresponding tags.
    :param optimizer: optimizer object to be used for training.
    :param tag_pad_idx: tag padding index; padded positions are dropped from
        the true tags before scoring
    :return: loss and F1 score, both averaged over the batches
    """

    epoch_loss = 0
    epoch_f1 = 0
    model.train()

    for batch in iterator:

        text = batch.text
        # named batch_tags so the module-level tag field is not shadowed
        batch_tags = batch.tag

        optimizer.zero_grad()

        pred_tags_list, batch_loss = model(text, batch_tags)

        # to compute the F1 score, turn the true tags into one list per
        # sentence (the tag tensor is transposed first) and drop the padding
        true_tags_list = [
            [tag for tag in sent_tag if tag != tag_pad_idx]
            for sent_tag in batch_tags.permute(1, 0).tolist()
        ]
        f1, _, _ = compute_f1_loss(pred_tags_list, true_tags_list, tag_pad_idx)

        batch_loss.backward()

        optimizer.step()
        epoch_loss += batch_loss.item()
        epoch_f1 += f1

    return epoch_loss / len(iterator), epoch_f1 / len(iterator)

def evaluate(model, iterator, tag_pad_idx, full_report):
    """
    Evaluate the model on every batch of the iterator without updating it.
    :param model: model object to evaluate.
    :param iterator: iterator with batch of text and tags
    :param tag_pad_idx: tag padding index; padded positions are dropped from
        the true tags before scoring
    :param full_report: when True, a classification report is printed for
        every batch
    :return: loss and F1 score averaged over the batches, followed by the
        flattened predictions and the flattened actual labels, one list
        per batch
    """

    epoch_loss = 0
    epoch_f1 = 0

    model.eval()
    preds = []
    labels = []
    with torch.no_grad():

        for batch in iterator:

            text = batch.text
            # named batch_tags so the module-level tag field is not shadowed
            batch_tags = batch.tag

            pred_tags_list, batch_loss = model(text, batch_tags)

            # one list of true tags per sentence, padding removed
            true_tags_list = [
                [tag for tag in sent_tag if tag != tag_pad_idx]
                for sent_tag in batch_tags.permute(1, 0).tolist()
            ]

            f1, pred, lab = compute_f1_loss(pred_tags_list, true_tags_list, tag_pad_idx, full_report)
            preds.append(pred)
            labels.append(lab)
            epoch_loss += batch_loss.item()
            epoch_f1 += f1

    return epoch_loss / len(iterator), epoch_f1 / len(iterator), preds, labels

In [None]:
def calculate_epoch_time(start_time, end_time):
    """
    Split the duration of an epoch into whole minutes and whole seconds.
    :param start_time: start time of the epoch, in seconds
    :param end_time: end time of the epoch, in seconds
    :return: tuple (minutes, seconds), both truncated to integers
    """
    duration = end_time - start_time
    whole_minutes = int(duration / 60)
    leftover_seconds = int(duration - 60 * whole_minutes)
    return whole_minutes, leftover_seconds


n_epochs = 50

# Per-epoch history, one entry appended per epoch.
train_loss_list, train_f1_list = [], []
val_loss_list, val_f1_list = [], []

# Lowest validation loss seen so far; used to decide when to checkpoint.
best_valid_loss = float('inf')

for epoch in range(n_epochs):

    start_time = time.time()

    # one pass over the training data, then one over the validation data
    train_loss, train_f1 = train(model, train_iterator, optimizer_obj, TAG_PAD_IDX)
    train_loss_list.append(train_loss)
    train_f1_list.append(train_f1)

    valid_loss, valid_f1, _, _ = evaluate(model, validation_iterator, TAG_PAD_IDX, full_report=False)
    val_loss_list.append(valid_loss)
    val_f1_list.append(valid_f1)

    scheduler_obj.step()
    end_time = time.time()

    epoch_mins, epoch_secs = calculate_epoch_time(start_time, end_time)

    # keep the weights of the epoch with the lowest validation loss
    if valid_loss < best_valid_loss:
        best_valid_loss = valid_loss
        torch.save(model.state_dict(), 'bilstm_crf.pt')

    # progress report, printed after every epoch
    print(f'Epoch: {epoch+1:02} | Epoch Time: {epoch_mins}m {epoch_secs}s')
    print(f'\tTraining Loss: {train_loss:.3f} | Training F1 score: {train_f1*100:.2f}%')
    print(f'\t Validation Loss: {valid_loss:.3f} |  Validation F1 score: {valid_f1*100:.2f}%')

## Plotting training and Validation Loss

In [None]:
# One point per recorded epoch. The epoch count is taken from the history
# itself, so the plot also works when training was stopped early.
x = np.arange(1, len(train_loss_list) + 1)
plt.figure(figsize=(8,8))
plt.plot(x,train_loss_list)
plt.plot(x,val_loss_list)
plt.title("Loss")
plt.xlabel("Epoch")
plt.legend(["Train loss", "Valid loss"])

## Plotting Training and Validation F1-Score

In [None]:
# One point per recorded epoch. The epoch count is taken from the history
# itself, so the plot also works when training was stopped early.
x = np.arange(1, len(train_f1_list) + 1)
plt.figure(figsize=(8,8))
plt.plot(x,train_f1_list)
plt.plot(x,val_f1_list)
plt.title("F1 score")
plt.xlabel("Epoch")
plt.legend(["Train F1", "Valid F1"])

### Loading the saved model for prediction on test set

In [None]:
# Restore the checkpoint with the lowest validation loss. map_location makes
# the tensors load onto the current device, so a checkpoint saved on a GPU can
# also be restored on a CPU-only machine.
model.load_state_dict(torch.load('bilstm_crf.pt', map_location=device))

test_loss, test_f1, preds, labels = evaluate(model, test_iterator, TAG_PAD_IDX, full_report=False)
print(f'Test Loss: {test_loss:.3f} |  Test F1 score: {test_f1*100:.2f}%')

In [None]:
# Flatten the per-batch lists into one list of predictions and one of labels.
predict = list(chain.from_iterable(preds))
true = list(chain.from_iterable(labels))

# Request one row/column per tag index explicitly. Without `labels`, the
# matrix only covers the classes that occur in the data, so its size would
# not match the tag vocabulary when a tag (e.g. <pad>) never appears.
confusion_mat = confusion_matrix(
    true, predict, labels=list(range(len(tags.vocab.itos)))
)

### Creating Confusion Matrix

In [None]:
# Label the rows (actual) and columns (predicted) of the confusion matrix
# with the tag names.
tag_names = list(tags.vocab.itos)
confusion_values_dataframe = pd.DataFrame(
    confusion_mat, index=tag_names, columns=tag_names
)

# Entity types whose B- and I- tags are merged into a single class.
entity_types = ['LOC', 'PER', 'ORG', 'MISC']
# Every original tag except 'O' is dropped once the merged classes exist.
merged_away = [name for name in tag_names if name != 'O']

# Merge the columns: predicted B-X + predicted I-X -> predicted X.
for entity in entity_types:
    confusion_values_dataframe[entity] = (
        confusion_values_dataframe[f'B-{entity}']
        + confusion_values_dataframe[f'I-{entity}']
    )
confusion_values_dataframe = confusion_values_dataframe.drop(columns=merged_away)

# Merge the rows: actual B-X + actual I-X -> actual X.
for entity in entity_types:
    confusion_values_dataframe.loc[entity] = (
        confusion_values_dataframe.loc[f'B-{entity}']
        + confusion_values_dataframe.loc[f'I-{entity}']
    )
confusion_values_dataframe = confusion_values_dataframe.drop(index=merged_away)

confusion_values_dataframe

### Calculating Precision, Recall, F1Score from Confusion Matrix of the test set data

In [None]:
# Rows are the actual classes, columns the predicted ones.
c_m = confusion_values_dataframe.to_numpy()

TP = np.diag(c_m)
FP = np.sum(c_m, axis=0) - TP   # predicted as the class, but actually another
FN = np.sum(c_m, axis=1) - TP   # actually the class, but predicted as another

# Taken from the matrix itself so it always matches the merged classes.
num_classes = c_m.shape[0]
# Everything that is neither in the row nor in the column of the class.
TN = c_m.sum() - (TP + FP + FN)

precision = TP/(TP+FP)
recall = TP/(TP+FN)
f1 = (2*precision*recall)/(precision+recall)

# One row per class, labelled like the confusion matrix.
prf_df = pd.DataFrame(
    {'Precision': precision, 'Recall': recall, 'F1-score': f1},
    index=confusion_values_dataframe.index,
)

prf_df

## Understanding the trained model's behavior

In [None]:
def tag_sentence(model, device, sentence, text_field, tag_field):
    """
    Use the model to predict the tags for a given sentence.
    :param model: trained model object
    :param device: device type either cpu or gpu
    :param sentence: sentence to detect the tags in; either a string, which is
        tokenized with spaCy, or an iterable of tokens
    :param text_field: field holding the word vocabulary
    :param tag_field: field holding the tag vocabulary
    :return: tokens of the sentence, predicted tags and the tokens that map
        to the unknown-word index.
    """
    model.eval()

    if isinstance(sentence, str):
        # The 'en' shortcut only exists in older spaCy releases; try the
        # full package name first and fall back to the shortcut.
        try:
            nlp = spacy.load('en_core_web_sm')
        except OSError:
            nlp = spacy.load('en')
        sent_tokens = [token.text for token in nlp(sentence)]
    else:
        sent_tokens = list(sentence)

    if text_field.lower:
        sent_tokens = [t.lower() for t in sent_tokens]

    # Nothing to tag: return early instead of feeding an empty tensor
    # to the model.
    if not sent_tokens:
        return [], [], []

    numericalized_tokens = [text_field.vocab.stoi[t] for t in sent_tokens]
    unk_idx = text_field.vocab.stoi[text_field.unk_token]
    unknowns = [t for t, n in zip(sent_tokens, numericalized_tokens) if n == unk_idx]

    # shape (seq_len, 1): a batch that contains this single sentence
    token_tensor_obj = torch.as_tensor(numericalized_tokens, dtype=torch.long)
    token_tensor_obj = token_tensor_obj.unsqueeze(-1).to(device)

    # inference only, so no gradients are needed
    with torch.no_grad():
        predictions, _ = model(token_tensor_obj)
    predicted_tags = [tag_field.vocab.itos[t] for t in predictions[0]]

    return sent_tokens, predicted_tags, unknowns

In [None]:
# Pick one validation example and show its tokens with the annotated tags.
example_index = 10

example_fields = vars(valid_data.examples[example_index])
sentence = example_fields['text']
actual_tags = example_fields['tag']

print(sentence)
print(actual_tags)

In [None]:
# Tag the selected validation sentence with the trained model, then print the
# predicted tags followed by the annotated ones for comparison.
tokens, pred_tags, unks = tag_sentence(model, 
                                       device, 
                                       sentence, 
                                       text_data, 
                                       tags
                                      )
print(pred_tags)
print(actual_tags)

In [None]:
# used method from Kaggle for better representation in the output cell

print("Predicted Tag\t\t\t\tActual Tag\t\t\t\tCorrect?\t\t\t\tToken\n")
for token, pred_tag, actual_tag in zip(tokens, pred_tags, actual_tags):
    if pred_tag == actual_tag:
        correct = '✔'
    else:
        correct = '✘'
    # 'O' is shorter than the other tags, so it gets one extra tab to keep
    # the columns aligned
    space = 4 + (pred_tag == 'O')
    space1 = 4 + (actual_tag == 'O')
    print(pred_tag, "\t"*space, actual_tag, "\t"*space1, correct, "\t"*5, token)

In [None]:
sentence = 'The Armed Forces of Ukraine has not only minimised Russian gains, but also enabled substantive advances through counter attacks in eastern and southern areas of Ukraine.'

# The fields defined in this notebook are text_data and tags. The result is
# stored in pred_tags so that the module-level tag field `tags`, which the
# metric code reads, is not overwritten.
tokens, pred_tags, unks = tag_sentence(model,
                                       device,
                                       sentence,
                                       text_data,
                                       tags
                                      )

# tokens that are not in the word vocabulary
print(unks)
print("Pred. Tag\tToken\n")


for token, tag in zip(tokens, pred_tags):
    # 'O' is shorter than the other tags, so it gets one extra tab
    space = 2 if tag == 'O' else 1
    print(tag, "\t"*space, token)