Bidirectional RNN for Named Entity Recognition (NER)

This notebook demonstrates how to implement a Bidirectional Recurrent Neural Network (RNN)
using PyTorch for Named Entity Recognition (NER) on a simplified dataset.

It covers:
- Creating a simple synthetic NER dataset.
- Using word embeddings (demonstrated with a basic embedding layer).
- Implementing a Bidirectional LSTM in PyTorch.
- Handling variable-length sequences with padding and masking.
- Evaluating on precision, recall, and F1-score (for a single entity type).

## Libraries

In [3]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.nn.utils.rnn import pad_sequence
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report

## 1. Data Preparation (Simplified Synthetic Data)

In [4]:
# Create a small synthetic dataset for demonstration
sentences = [
    "John lives in New York .",
    "Mary went to Paris with Peter .",
    "The Eiffel Tower is in France .",
    "Google was founded in California .",
]
labels = [
    ["PER", "O", "O", "LOC", "O"],
    ["PER", "O", "O", "LOC", "O", "PER", "O"],
    ["O", "LOC", "O", "O", "O", "LOC", "O"],
    ["ORG", "O", "O", "O", "LOC", "O"],
]

# Create vocabulary and tag mappings
words = sorted(list(set([word.lower() for sent in sentences for word in sent.split()])))
tags = sorted(list(set([tag for label_list in labels for tag in label_list])))

word_to_index = {word: i + 1 for i, word in enumerate(words)} # 0 for padding
word_to_index["<pad>"] = 0
index_to_word = {i: word for word, i in word_to_index.items()}

tag_to_index = {tag: i for i, tag in enumerate(tags)}
index_to_tag = {i: tag for tag, i in tag_to_index.items()}

vocab_size = len(word_to_index)
tagset_size = len(tag_to_index)

print(f"Vocabulary: {word_to_index}")
print(f"Tags: {tag_to_index}")

# Convert sentences and labels to numerical indices
indexed_sentences = [[word_to_index[word.lower()] for word in sent.split()] for sent in sentences]
indexed_labels = [[tag_to_index[tag] for tag in label_list] for label_list in labels]

# Split data into training and testing sets
train_sentences, test_sentences, train_labels, test_labels = train_test_split(
    indexed_sentences, indexed_labels, test_size=0.2, random_state=42
)

Vocabulary: {'.': 1, 'california': 2, 'eiffel': 3, 'founded': 4, 'france': 5, 'google': 6, 'in': 7, 'is': 8, 'john': 9, 'lives': 10, 'mary': 11, 'new': 12, 'paris': 13, 'peter': 14, 'the': 15, 'to': 16, 'tower': 17, 'was': 18, 'went': 19, 'with': 20, 'york': 21, '<pad>': 0}
Tags: {'LOC': 0, 'O': 1, 'ORG': 2, 'PER': 3}


## 2. Handling Variable-Length Sequences with Padding

In [5]:
def pad_sequences(sequences, padding_value=0):
    return pad_sequence([torch.tensor(seq) for seq in sequences], batch_first=True, padding_value=padding_value)

train_padded_sentences = pad_sequences(train_sentences)
train_padded_labels = pad_sequences(train_labels, padding_value=-1) # Use -1 for padding in labels to ignore during loss

test_padded_sentences = pad_sequences(test_sentences)
test_padded_labels = pad_sequences(test_labels, padding_value=-1)

print("Padded Training Sentences:")
print(train_padded_sentences)
print("Padded Training Labels:")
print(train_padded_labels)

Padded Training Sentences:
tensor([[ 6, 18,  4,  7,  2,  1,  0],
        [ 9, 10,  7, 12, 21,  1,  0],
        [15,  3, 17,  8,  7,  5,  1]])
Padded Training Labels:
tensor([[ 2,  1,  1,  1,  0,  1, -1],
        [ 3,  1,  1,  0,  1, -1, -1],
        [ 1,  0,  1,  1,  1,  0,  1]])


## 3. Bidirectional LSTM Model

In [6]:
class BiLSTM_NER(nn.Module):
    def __init__(self, vocab_size, embedding_dim, hidden_dim, num_layers, tagset_size):
        super(BiLSTM_NER, self).__init__()
        self.embedding = nn.Embedding(vocab_size, embedding_dim, padding_idx=0)
        self.lstm = nn.LSTM(embedding_dim, hidden_dim, num_layers, bidirectional=True, batch_first=True)
        self.fc = nn.Linear(hidden_dim * 2, tagset_size) # *2 for bidirectional

    def forward(self, x, lengths):
        embedded = self.embedding(x)
        packed_embedded = nn.utils.rnn.pack_padded_sequence(embedded, lengths, batch_first=True, enforce_sorted=False)
        packed_output, _ = self.lstm(packed_embedded)
        output, _ = nn.utils.rnn.pad_packed_sequence(packed_output, batch_first=True)
        logits = self.fc(output)
        return logits

# Hyperparameters
embedding_dim = 100
hidden_dim = 128
num_layers = 1
learning_rate = 0.01
num_epochs = 10
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

model = BiLSTM_NER(vocab_size, embedding_dim, hidden_dim, num_layers, tagset_size).to(device)
optimizer = optim.Adam(model.parameters(), lr=learning_rate)
loss_function = nn.CrossEntropyLoss(ignore_index=-1) # Ignore padding in labels

## 4. Training the Model

In [7]:
def train(model, iterator, optimizer, loss_function, device):
    model.train()
    epoch_loss = 0

    for batch_idx, (padded_sentences, padded_labels) in enumerate(iterator):
        padded_sentences = padded_sentences.to(device)
        padded_labels = padded_labels.to(device)

        # Get lengths of original sequences (before padding)
        lengths = torch.tensor([len(seq) for seq in iterator.dataset.sentences[batch_idx * iterator.batch_size : (batch_idx + 1) * iterator.batch_size]])
        lengths = lengths.to(device)

        optimizer.zero_grad()
        predictions = model(padded_sentences, lengths)

        # Reshape predictions and labels for loss calculation
        predictions = predictions.view(-1, predictions.shape[-1])
        labels = padded_labels.view(-1)

        loss = loss_function(predictions, labels)
        loss.backward()
        optimizer.step()

        epoch_loss += loss.item()

    return epoch_loss / len(iterator)

# Create data loaders (simple iteration for this example)
train_data = list(zip(train_padded_sentences, train_padded_labels))
test_data = list(zip(test_padded_sentences, test_padded_labels))

class SimpleDataset:
    def __init__(self, sentences, labels):
        self.sentences = sentences
        self.labels = labels

    def __len__(self):
        return len(self.sentences)

    def __getitem__(self, idx):
        return self.sentences[idx], self.labels[idx]

train_dataset = SimpleDataset(train_padded_sentences, train_padded_labels)
test_dataset = SimpleDataset(test_padded_sentences, test_padded_labels)

batch_size = 2 # Adjust as needed
train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=batch_size)
test_loader = torch.utils.data.DataLoader(test_dataset, batch_size=batch_size)

for epoch in range(num_epochs):
    train_loss = train(model, train_loader, optimizer, loss_function, device)
    print(f"Epoch: {epoch+1}, Train Loss: {train_loss:.4f}")

Epoch: 1, Train Loss: 1.2778
Epoch: 2, Train Loss: 0.4356
Epoch: 3, Train Loss: 0.1139
Epoch: 4, Train Loss: 0.0220
Epoch: 5, Train Loss: 0.0042
Epoch: 6, Train Loss: 0.0011
Epoch: 7, Train Loss: 0.0004
Epoch: 8, Train Loss: 0.0002
Epoch: 9, Train Loss: 0.0001
Epoch: 10, Train Loss: 0.0001


## 5. Evaluation

In [8]:
def evaluate(model, iterator, device, index_to_tag):
    model.eval()
    all_predictions = []
    all_labels = []

    with torch.no_grad():
        for batch_idx, (padded_sentences, padded_labels) in enumerate(iterator):
            padded_sentences = padded_sentences.to(device)
            padded_labels = padded_labels.to(device)

            lengths = torch.tensor([len(seq) for seq in iterator.dataset.sentences[batch_idx * iterator.batch_size : (batch_idx + 1) * iterator.batch_size]])
            lengths = lengths.to(device)

            predictions = model(padded_sentences, lengths)
            _, predicted_indices = torch.max(predictions, dim=2)

            # Convert indices to tags, ignoring padding (-1)
            for i in range(padded_labels.shape[0]):
                true_tags = [index_to_tag[idx.item()] for idx in padded_labels[i] if idx.item() != -1]
                pred_tags = [index_to_tag[idx.item()] for idx in predicted_indices[i][:len(true_tags)]] # Truncate predictions to true length

                all_labels.extend(true_tags)
                all_predictions.extend(pred_tags)

    # Evaluate using classification report (for a single entity type, e.g., 'PER')
    print("\nEvaluation Report:")
    print(classification_report(all_labels, all_predictions))

evaluate(model, test_loader, device, index_to_tag)


Evaluation Report:
              precision    recall  f1-score   support

         LOC       0.50      1.00      0.67         1
           O       0.80      1.00      0.89         4
         PER       0.00      0.00      0.00         2

    accuracy                           0.71         7
   macro avg       0.43      0.67      0.52         7
weighted avg       0.53      0.71      0.60         7



  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))
  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))
  _warn_prf(average, modifier, f"{metric.capitalize()} is", len(result))
