# Natural Language Processing
![](https://i.imgur.com/qkg2E2D.png)

## Assignment 002 - NER Tagger

> Notebook by:
> - NLP Course Staff
## Revision History

| Version | Date       | User        | Content / Changes                                                   |
|---------|------------|-------------|---------------------------------------------------------------------|
| 0.1.000 | 29/05/2025 | course staff| First version                                                       |


## Overview
In this assignment, you will build a complete training and testing pipeline for a neural sequential tagger for named entities using an LSTM.

## Dataset
You will work with the ReCoNLL 2003 dataset, a corrected version of the [CoNLL 2003 dataset](https://www.clips.uantwerpen.be/conll2003/ner/).

**Open each of the links below to make sure you have access to the data:**
- [Train data](https://drive.google.com/file/d/1CqEGoLPVKau3gvVrdG6ORyfOEr1FSZGf/view?usp=sharing)

- [Dev data](https://drive.google.com/file/d/1rdUida-j3OXcwftITBlgOh8nURhAYUDw/view?usp=sharing)

- [Test data](https://drive.google.com/file/d/137Ht40OfflcsE6BIYshHbT5b2iIJVaDx/view?usp=sharing)

As you will see, the annotated texts are labeled according to the `IOB` annotation scheme (more on this below), for 3 entity types: Person, Organization, Location.

## Your Implementation

Please create a local copy of this template Colab notebook:

[![Open In Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/drive/1KGkObwUn5QQm_v0nB0nAUlB4YrwThuzl#scrollTo=Z-fCqGh9ybgm)

The assignment's instructions are there; follow the notebook.

## Submission
- **Notebook Link**: Add the URL to your assignment's notebook in the `notebook_link.txt` file, following the format provided in the example.
- **Access**: Ensure the link has edit permissions enabled to allow modifications if needed.
- **Deadline**: <font color='green'>12/06/2025</font>.
- **Platform**: Continue using GitHub for submissions. Push your project to the team repository and monitor the test results under the actions section.

Good Luck 🤗




## NER Schemes

### IO
- **Description**: The simplest scheme for named entity recognition (NER).
- **Tags**:
  - `I`: Inside a named entity.
  - `O`: Outside any named entity.
- **Limitation**: Cannot correctly encode consecutive entities of the same type.

### IOB (BIO)
- **Description**: Adopted by the Conference on Computational Natural Language Learning (CoNLL).
- **Tags**:
  - `B`: Beginning of a named entity.
  - `I`: Inside a named entity.
  - `O`: Outside any named entity.
- **Advantage**: Can encode the boundaries of consecutive entities.
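
To make the difference between IO and IOB concrete, the sketch below decodes entity spans from both encodings of the same sentence. The helper name `extract_spans` and the toy tag sequences are illustrative assumptions and are not part of the assignment template.

```python
def extract_spans(tags: list[str]) -> list[tuple[str, int, int]]:
    """Decode (entity_type, start, end_exclusive) spans from IO or IOB tags."""
    spans = []
    start, current_type = None, None
    for i, tag in enumerate(tags + ["O"]):  # the sentinel closes a trailing span
        prefix, _, entity_type = tag.partition("-")
        # A span continues only on an I- tag of the same type as the open span
        continues = prefix == "I" and start is not None and entity_type == current_type
        if not continues:
            if start is not None:
                spans.append((current_type, start, i))
                start, current_type = None, None
            if prefix in ("B", "I"):
                start, current_type = i, entity_type
    return spans


# Hypothetical sentence with two consecutive PER entities: "John" and "Mary Smith"
iob_tags = ["B-PER", "B-PER", "I-PER", "O"]
io_tags = ["I-PER", "I-PER", "I-PER", "O"]

iob_spans = extract_spans(iob_tags)  # the B tag separates the two entities
io_spans = extract_spans(io_tags)    # IO has no way to mark the boundary
print(iob_spans)
print(io_spans)
```

With IOB the two entities are recovered separately, while the IO encoding collapses them into a single three-token span.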

### IOE
- **Description**: Similar to IOB, but indicates the end of an entity.
- **Tags**:
  - `I`: Inside a named entity.
  - `O`: Outside any named entity.
  - `E`: End of a named entity.
- **Advantage**: Focuses on the end boundary of entities.

### IOBES
- **Description**: An extension of IOB with additional boundary information.
- **Tags**:
  - `B`: Beginning of a named entity.
  - `I`: Inside a named entity.
  - `O`: Outside any named entity.
  - `E`: End of a named entity.
  - `S`: Single-token named entity.
- **Advantage**: Provides more detailed boundary information for named entities.

### BI
- **Description**: Tags entities similarly to IOB and labels the beginning of non-entity words.
- **Tags**:
  - `B`: Beginning of a named entity.
  - `I`: Inside a named entity.
  - `B-O`: Beginning of a non-entity word.
  - `I-O`: Inside a non-entity word.
- **Advantage**: Distinguishes the beginning of non-entity sequences.

### IE
- **Description**: Works like IOE, but also marks non-entity sequences: the last non-entity word is tagged `E-O` and the rest `I-O`.
- **Tags**:
  - `I`: Inside a named entity.
  - `O`: Outside any named entity.
  - `E`: End of a named entity.
  - `E-O`: End of a non-entity word.
  - `I-O`: Inside a non-entity word.
- **Advantage**: Highlights the end of non-entity sequences.

### BIES
- **Description**: Encodes both entities and non-entity words using the IOBES method.
- **Tags**:
  - `B`: Beginning of a named entity.
  - `I`: Inside a named entity.
  - `O`: Outside any named entity.
  - `E`: End of a named entity.
  - `S`: Single-token named entity.
  - `B-O`: Beginning of a non-entity word.
  - `I-O`: Inside a non-entity word.
  - `S-O`: Single non-entity token.
- **Advantage**: Comprehensive encoding for both entities and non-entities.




In [1]:
!mkdir -p data
# Fetch data
# train_link = 'https://drive.google.com/file/d/1CqEGoLPVKau3gvVrdG6ORyfOEr1FSZGf/view?usp=sharing'
# dev_link   = 'https://drive.google.com/file/d/1rdUida-j3OXcwftITBlgOh8nURhAYUDw/view?usp=sharing'
# test_link  = 'https://drive.google.com/file/d/137Ht40OfflcsE6BIYshHbT5b2iIJVaDx/view?usp=sharing'

!wget -q --no-check-certificate 'https://docs.google.com/uc?export=download&id=1CqEGoLPVKau3gvVrdG6ORyfOEr1FSZGf' -O data/train.txt
!wget -q --no-check-certificate 'https://docs.google.com/uc?export=download&id=1rdUida-j3OXcwftITBlgOh8nURhAYUDw' -O data/dev.txt
!wget -q --no-check-certificate 'https://docs.google.com/uc?export=download&id=137Ht40OfflcsE6BIYshHbT5b2iIJVaDx' -O data/test.txt


In [2]:
# Any additional needed libraries
# !pip install --q

In [3]:
# Standard Library Imports
import os
import copy
import random
import warnings
from collections import defaultdict
from typing import Optional

# ML
import numpy as np
import scipy as sp
import pandas as pd

# Visual
import matplotlib
import seaborn as sns
from tqdm import tqdm
from tabulate import tabulate
import matplotlib.pyplot as plt
from IPython.display import display

# DL
import torch as th
import torch.nn as nn
from torch.optim import Adam
from torch.utils.data import DataLoader, Dataset

# Metrics
from sklearn import metrics
from sklearn.metrics import accuracy_score , roc_auc_score, classification_report, confusion_matrix, precision_recall_fscore_support


In [4]:
SEED = 42
# Set the random seed for Python
random.seed(SEED)

# Set the random seed for numpy
np.random.seed(SEED)

# Set the random seed for pytorch
th.manual_seed(SEED)

# If using CUDA (for GPU operations)
th.cuda.manual_seed(SEED)

# Set up the device
# TO DO ----------------------------------------------------------------------
DEVICE = None
# TO DO ----------------------------------------------------------------------
# assert DEVICE == "cuda"

DataType = list[tuple[list[str], list[str]]]

# Part 1 - Dataset Preparation

## Step 1: Read Data
Write a function that reads the data from a single file (one of the files provided above).   
- The function receives a filepath.
- The function encodes every sentence individually as a pair of lists: one list contains the words and the other contains the tags.
- Each list pair is added to a general list (`data`), which the function returns.

Example output:
```
[
  (['At','Trent','Bridge',':'],['O','B-LOC','I-LOC','O']),
  ([...],[...]),
  ...
]
```

In [5]:
def read_data(filepath:str) -> DataType:
  """
  Read data from a single file.
  The function receives a filepath.
  The function encodes every sentence as a pair of lists: one list contains the words and the other contains the tags.
  :param filepath: path to the file
  :return: data as a list of tuples
  """
  data = []
  # TO DO ----------------------------------------------------------------------
  sentence_words = []
  sentence_tags = []

  with open(filepath, 'r', encoding='utf-8') as f:
    for line in f: # read line by line from the file
      line = line.strip()

      if line:  # check the line isn't empty
        parts = line.split() # split the line to word and a tag
        if len(parts) == 2: # verify format
          word, tag = parts
          sentence_words.append(word)
          sentence_tags.append(tag)
        else:
          print(f"Problem with format at line: {line} - line was skipped")

      else: # an empty line marks the end of a sentence
        if sentence_words: # ignore repeated blank lines so no empty sentence is added
          if len(sentence_words) == len(sentence_tags): # verify that we have the same number of tags and words
            data.append((sentence_words, sentence_tags)) # add the tagged sentence in the desired format
          else:
            print(f"Error: Mismatched word and tag counts in sentence. Skipping sentence: {sentence_words}")
        sentence_words = []
        sentence_tags = []

  # handle the last sentence if the file doesn't end with an empty line
  if sentence_words:
    if len(sentence_words) == len(sentence_tags):
      data.append((sentence_words, sentence_tags))
    else:
      print(f"Error: Mismatched word and tag counts in last sentence. Skipping sentence: {sentence_words}")

  # TO DO ----------------------------------------------------------------------
  return data

In [6]:
train = read_data("data/train.txt")
dev = read_data("data/dev.txt")
test = read_data("data/test.txt")

In [7]:
############################ Our Tests ###########################

# Verify the structure of the output from read_data
train_data = read_data('data/train.txt')

# Check if the output is a list of tuples
is_output_list_of_tuples = isinstance(train_data, list) and all(isinstance(item, tuple) for item in train_data)

# Check if each tuple contains two lists
each_tuple_has_two_lists = all(len(item) == 2 and isinstance(item[0], list) and isinstance(item[1], list) for item in train_data)

# Check if all items in the first list of each tuple are strings
all_words_are_strings = all(all(isinstance(word, str) for word in sentence[0]) for sentence in train_data)

# Check if all items in the second list of each tuple are strings
all_tags_are_strings = all(all(isinstance(tag, str) for tag in sentence[1]) for sentence in train_data)

# Check if the lengths of the word list and tag list are the same for each sentence
word_tag_list_lengths_match = all(len(sentence[0]) == len(sentence[1]) for sentence in train_data)

# check
set([tag for pair in train for tag in pair[1]])

# Print verification results with informative names and test manners
print(f"Test: Output is a list of tuples: {is_output_list_of_tuples}")
print(f"Test: Each tuple contains two lists: {each_tuple_has_two_lists}")
print(f"Test: All words in sentence lists are strings: {all_words_are_strings}")
print(f"Test: All tags in tag lists are strings: {all_tags_are_strings}")
print(f"Test: Word and tag lists have matching lengths for each sentence: {word_tag_list_lengths_match}")

Test: Output is a list of tuples: True
Test: Each tuple contains two lists: True
Test: All words in sentence lists are strings: True
Test: All tags in tag lists are strings: True
Test: Word and tag lists have matching lengths for each sentence: True


{'B-LOC', 'B-ORG', 'B-PER', 'I-LOC', 'I-ORG', 'I-PER', 'O'}


## Step 2: Convert IOB Tags to IOBES Scheme

### Your Task
1. Implement the `iob_to_iobes` function to convert IOB tags to IOBES.
2. Apply this conversion to the datasets.

The IOBES scheme adds:
- `E-X`: End of entity type X (the last token of a multi-token entity)
- `S-X`: Single token entity of type X (both beginning and end)

Example conversion:
- `B-PER I-PER O` → `B-PER E-PER O`
- `B-LOC O` → `S-LOC O`



In [9]:
def iob_to_iobes(tags: list[str]) -> list[str]:
  """
  Convert IOB tags to IOBES format.

  The IOB format tags tokens as:
  - B-X: Beginning of entity type X
  - I-X: Inside entity type X
  - O: Outside any entity

  The IOBES format adds:
  - E-X: End of entity type X
  - S-X: Single token entity of type X

  Args:
      tags (list): List of IOB tags

  Returns:
      list: List of tags in IOBES format
  """
  # TO DO ----------------------------------------------------------------------
  iobes_list = []

  # append a sentinel 'O' so the last tag always has a successor
  tags_with_sentinel = tags + ['O']

  for current_tag, next_tag in zip(tags_with_sentinel[:-1], tags_with_sentinel[1:]):
    # for O do nothing
    if current_tag == 'O':
      iobes_list.append('O')

    # for the Beginning of an entity, make it S if it is a single-token entity
    elif current_tag.startswith('B-'):
      entity_type = current_tag[2:]
      # check if the next tag is an 'I-'
      if next_tag.startswith('I-'):
          next_entity_type = next_tag[2:]
          # check if it's an 'I-' of the *same* entity type
          if next_entity_type == entity_type:
            iobes_list.append(current_tag) # it's the beginning of a multi-token entity
          else:
              # warning for B-X followed by I-Y (X != Y)
              print(f"Warning: Inconsistent tag sequence found: {current_tag} "
                    f"followed by {next_tag}. Converting {current_tag} to S-{entity_type}.")
              iobes_list.append(f'S-{entity_type}') # treat as single-token entity
      else:
        iobes_list.append(f'S-{entity_type}') # it's a single-token entity (followed by O, B- different type, or end of sequence)

    # for an Inside tag, make it E unless the next token continues the same entity
    elif current_tag.startswith('I-'):
      entity_type = current_tag[2:]
      # check if the next tag continues the same entity
      if next_tag.startswith('I-') and next_tag[2:] == entity_type:
          iobes_list.append(current_tag) # It's inside a multi-token entity
      else:
          iobes_list.append(f'E-{entity_type}') # It's the end of a multi-token entity (followed by O, B- different type, or end of sequence)

    # handle unexpected tag formats (anything other than 'O', 'B-X', or 'I-X')
    else:
        # an isolated I- tag is already covered by the branch above; anything reaching here is malformed, so treat it as O and warn
        print(f"Warning: Encountered an unexpected tag format: {current_tag}. Treating as 'O'.")
        iobes_list.append('O')

  # TO DO ----------------------------------------------------------------------
  return iobes_list



def convert_dataset_to_iobes(dataset: DataType) -> DataType:
  """
  Convert a dataset from IOB to IOBES format.

  Args:
      dataset (DataType): Dataset in IOB format

  Returns:
      DataType: Dataset in IOBES format
  """
  # TO DO ----------------------------------------------------------------------
  iobes_dataset = []
  for pair in dataset:
    words, tags = pair
    iobes_tags = iob_to_iobes(tags)
    iobes_dataset.append((words, iobes_tags))
  # TO DO ----------------------------------------------------------------------
  return iobes_dataset


In [11]:
# Convert datasets to IOBES
train_iobes = convert_dataset_to_iobes(train)
dev_iobes = convert_dataset_to_iobes(dev)
test_iobes = convert_dataset_to_iobes(test)

In [12]:
# Display example of conversion
print("Example of IOB to IOBES conversion:")
example_idx = 0  # Pick the first sentence as an example
print("Original (IOB):", train[example_idx][1][:10])  # Show first 10 tags
print("Converted (IOBES):", train_iobes[example_idx][1][:10])

Example of IOB to IOBES conversion:
Original (IOB): ['B-ORG', 'O', 'B-ORG', 'O']
Converted (IOBES): ['S-ORG', 'O', 'S-ORG', 'O']



## Step 3: Create Vocab

The `Vocab` class will serve as a dictionary that maps words and tags into IDs. Use the `UNK_TOKEN` for words that are not part of the training data.

**Special Tokens:**
- `PAD_TOKEN = 0`
- `UNK_TOKEN = 1`

### Your Task
1. **Define Special Tokens**: Define special tokens such as `PAD_TOKEN` and `UNK_TOKEN` and assign them unique IDs.
2. **Initialize Dictionaries**: Populate the word and tag dictionaries based on the training set.

Note: you may change the Vocab class.
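
One possible shape for such a class is sketched below. The attribute names `word2id`, `tag2id`, `n_words`, and `n_tags` follow the way the notebook uses the vocabulary; the `<PAD>`/`<UNK>` strings, the reverse mappings, the `word_id` helper, and the decision to reserve tag ID 0 for padding are assumptions made for illustration only.

```python
PAD_TOKEN = 0
UNK_TOKEN = 1


class Vocab:
    """Maps words and tags to integer IDs, built from the training split only."""

    def __init__(self, train_data: list[tuple[list[str], list[str]]]):
        # Reserve the first two word IDs for the special tokens
        self.word2id = {"<PAD>": PAD_TOKEN, "<UNK>": UNK_TOKEN}
        # Assumption: tag ID 0 is reserved for padded positions
        self.tag2id = {"<PAD>": PAD_TOKEN}
        for words, tags in train_data:
            for word in words:
                self.word2id.setdefault(word, len(self.word2id))
            for tag in tags:
                self.tag2id.setdefault(tag, len(self.tag2id))
        # Reverse mappings are handy when decoding predictions
        self.id2word = {i: w for w, i in self.word2id.items()}
        self.id2tag = {i: t for t, i in self.tag2id.items()}
        self.n_words = len(self.word2id)
        self.n_tags = len(self.tag2id)

    def word_id(self, word: str) -> int:
        """Return the ID of a word, falling back to UNK_TOKEN for unseen words."""
        return self.word2id.get(word, UNK_TOKEN)


# Toy training split with a single sentence
toy_train = [(["At", "Trent", "Bridge", ":"], ["O", "B-LOC", "I-LOC", "O"])]
toy_vocab = Vocab(toy_train)
print(toy_vocab.tag2id)
print(toy_vocab.word_id("Trent"), toy_vocab.word_id("London"))
```

Note that reserving a padding slot among the tags adds one entry to `n_tags`; whether padded positions get their own tag ID or are masked in some other way is a design choice left to you.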

In [None]:
vocab = Vocab(train)
print(vocab.tag2id)
print(f"Vocab size: {vocab.n_words}")

## Step 4: Prepare Data
Write a function `prepare_data` that takes one of the splits (train, dev, or test) and the `Vocab` instance, and converts each pair of (words, tags) to a pair of index lists. Additionally, the function should pad the sequences to the length of the longest sequence **in the given split**.

Note: Vocabulary is based only on the train set.

### Your Task
1. Convert each pair of (words, tags) to a pair of indexes using the Vocab instance.
2. Pad the sequences to the maximum length of the sequences in the given split.
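
The two steps above can be sketched with plain dictionaries, as below. The function name `encode_and_pad`, the toy mappings, and the use of ID 0 to pad both words and tags are assumptions; the real function would receive a `Vocab` instance instead.

```python
def encode_and_pad(data, word2id, tag2id, pad_id=0, unk_id=1):
    """Convert (words, tags) pairs to ID lists padded to the split's longest sentence."""
    max_len = max((len(words) for words, _ in data), default=0)
    sequences = []
    for words, tags in data:
        # Words unseen during training map to the unknown-token ID
        word_ids = [word2id.get(word, unk_id) for word in words]
        tag_ids = [tag2id[tag] for tag in tags]
        n_pad = max_len - len(words)
        sequences.append((word_ids + [pad_id] * n_pad, tag_ids + [pad_id] * n_pad))
    return sequences


# Hypothetical mappings and a two-sentence split
toy_word2id = {"<PAD>": 0, "<UNK>": 1, "At": 2, "Trent": 3, "Bridge": 4}
toy_tag2id = {"<PAD>": 0, "O": 1, "B-LOC": 2, "I-LOC": 3}
toy_split = [
    (["At", "Trent", "Bridge"], ["O", "B-LOC", "I-LOC"]),
    (["London"], ["B-LOC"]),
]
toy_sequences = encode_and_pad(toy_split, toy_word2id, toy_tag2id)
print(toy_sequences)
```

Because the maximum length is computed per call, each split is padded to its own longest sentence, as the task requires.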

In [None]:
def prepare_data(data: DataType, vocab: Vocab):
  data_sequences = []
  # TO DO ----------------------------------------------------------------------

  # TO DO ----------------------------------------------------------------------
  return data_sequences

In [None]:
train_sequences = prepare_data(train, vocab)
dev_sequences = prepare_data(dev, vocab)
test_sequences = prepare_data(test, vocab)

### Your Task
Print the number of OOV in dev and test sets:

In [None]:
def count_oov(sequences) -> int:
  """
  Count the number of OOV words.
  :param sequences: list of sequences
  :return: number of OOV words
  """
  oov = -1
  # TO DO ----------------------------------------------------------------------

  # TO DO ----------------------------------------------------------------------
  return oov
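
As a rough illustration of the counting step, the sketch below assumes that each sequence is a `(word_ids, tag_ids)` pair and that unknown words were mapped to ID 1 (`UNK_TOKEN`). The name `count_unknown_ids` and the toy data are hypothetical.

```python
def count_unknown_ids(sequences, unk_id=1):
    """Count word positions whose ID equals the unknown-token ID."""
    # Only the word IDs are inspected; tag IDs that happen to equal unk_id are ignored
    return sum(word_id == unk_id for word_ids, _ in sequences for word_id in word_ids)


toy_sequences = [([2, 1, 4], [1, 2, 3]), ([1, 0, 0], [2, 0, 0])]
n_oov = count_unknown_ids(toy_sequences)
print(n_oov)
```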

## Step 5: Extract Casing Features

The casing of words (uppercase, lowercase, etc.) can provide useful information for NER. Let's add this feature to improve our model.

### Your Task
1. Implement a function to extract casing features for each word.
2. Modify the data preparation to include these features.

Casing features to extract:
- Is all lowercase
- Is all uppercase
- Is titlecase (first letter capital)
- Has non-initial capital letters
- Contains digits
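
A minimal sketch of these five flags is shown below, assuming that "titlecase" means the first character is uppercase and no later character is. The function name `casing_features_sketch` is hypothetical, and other reasonable definitions exist (for example, using `str.istitle`).

```python
def casing_features_sketch(word: str) -> list[int]:
    """Return [all_lower, all_upper, titlecase, inner_capital, has_digit] as 0/1 flags."""
    inner_capital = any(ch.isupper() for ch in word[1:])
    return [
        int(word.islower()),
        int(word.isupper()),
        # First character uppercase and no capital letters after it
        int(bool(word) and word[0].isupper() and not inner_capital),
        int(inner_capital),
        int(any(ch.isdigit() for ch in word)),
    ]


examples = ["london", "NATO", "London", "iPhone", "1996"]
features = {word: casing_features_sketch(word) for word in examples}
for word, flags in features.items():
    print(word, flags)
```

The flags are not mutually exclusive: under this definition an all-uppercase word such as `NATO` also sets the non-initial-capital flag.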

In [None]:
def get_casing_features(word: str) -> list[int]:
  """
  Extract casing features for a word.

  Features:
  - Is all lowercase (1/0)
  - Is all uppercase (1/0)
  - Is titlecase (first letter capital only) (1/0)
  - Has non-initial capital letters (1/0)
  - Contains digits (1/0)

  Args:
      word (str): Input word

  Returns:
      list[int]: Binary features [0/1, 0/1, 0/1, 0/1, 0/1]
  """
  # TO DO ----------------------------------------------------------------------

  # TO DO ----------------------------------------------------------------------
  return [0, 0, 0, 0, 0]  # Placeholder

def prepare_data_with_casing(data: DataType, vocab: Vocab):
  """
  Prepare data with additional casing features.

  Args:
      data (DataType): Input data
      vocab (Vocab): Vocabulary instance

  Returns:
      list: List of sequences with word IDs, casing features, and tag IDs
  """
  data_sequences = []
  # TO DO ----------------------------------------------------------------------

  # TO DO ----------------------------------------------------------------------
  return data_sequences

In [None]:
# Prepare data with casing features
train_sequences_casing = prepare_data_with_casing(train_iobes, vocab)
dev_sequences_casing = prepare_data_with_casing(dev_iobes, vocab)
test_sequences_casing = prepare_data_with_casing(test_iobes, vocab)

## Step 6: Dataloaders
Create dataloaders for each split in the dataset. They should return the samples as Tensors.

**Hint** - you can create a Dataset to support this part.

Shuffle the training set; do not shuffle the dev and test sets.


In [None]:
def prepare_data_loader(sequences, batch_size: int, train: bool = True):
  """
  Create a dataloader from a list of sequences.
  :param sequences: list of sequences
  :param batch_size: batch size
  :param train: whether to shuffle the dataloader or not
  :return: dataloader
  """
  dataloader = None
  # TO DO ----------------------------------------------------------------------
  dataset = NERDataset(sequences)
  dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=train)
  # TO DO ----------------------------------------------------------------------
  return dataloader

In [None]:
BATCH_SIZE = 16
dl_train = prepare_data_loader(train_sequences, batch_size=BATCH_SIZE)
dl_dev = prepare_data_loader(dev_sequences, batch_size=BATCH_SIZE, train=False)
dl_test = prepare_data_loader(test_sequences, batch_size=BATCH_SIZE, train=False)
len(dl_train), len(dl_dev), len(dl_test)

<br><br><br><br><br><br>

# Part 2 - NER Model Training

## Step 1: Implement Model

Write NERNet, a PyTorch Module for labeling words with NER tags.

> `input_size`: the size of the vocabulary  
`embedding_size`: the size of the embeddings  
`hidden_size`: the LSTM hidden size  
`output_size`: the number of tags we are predicting  
`n_layers`: the number of layers we want to use in LSTM  
`directions`: can be 1 or 2, indicating unidirectional or bidirectional LSTM, respectively  

<br>  

The input for your forward function should be a single sentence tensor.

*Note: the embeddings in this section are learned embeddings. This means you don't need pretrained embeddings like the ones used in the last exercise. You will use them in Part 4.*

*Note: You may change the NERNet class.*

In [None]:
class NERNet(nn.Module):
  def __init__(self, input_size: int, embedding_size: int, hidden_size: int, output_size: int, n_layers: int, directions: int):
    """
    Initialize a NERNet instance.
    :param input_size: the size of the vocabulary
    :param embedding_size: the size of the embeddings
    :param hidden_size: the LSTM hidden size
    :param output_size: the number of tags we are predicting
    :param n_layers: the number of layers we want to use in LSTM
    :param directions: could be 1 or 2, indicating unidirectional or bidirectional LSTM, respectively
    """
    super(NERNet, self).__init__()
    # TO DO ----------------------------------------------------------------------

    # TO DO ----------------------------------------------------------------------

  def forward(self, input_sentence):
    # TO DO ----------------------------------------------------------------------
    # input_sentence: (batch_size, seq_len)

    # TO DO ----------------------------------------------------------------------
    return output

In [None]:
model = NERNet(vocab.n_words, embedding_size=300, hidden_size=500, output_size=vocab.n_tags, n_layers=3, directions=2)
model.to(DEVICE)

## Step 2: Training Loop

Write a training loop that takes a model (an instance of NERNet), the number of epochs to train for, and the train and dev dataloaders.  

The function returns the `loss` and `accuracy` recorded during training.  
(If you use different or additional metrics, return them too.)

The loss is always CrossEntropyLoss and the optimizer is always Adam.
Make sure to use `tqdm` while iterating on `n_epochs`.


In [None]:
def train_loop(model: NERNet, n_epochs: int, dataloader_train, dataloader_dev):
  """
  Train a model.
  :param model: model instance
  :param n_epochs: number of epochs to train on
  :param dataloader_train: train dataloader
  :param dataloader_dev: dev dataloader
  :return: loss and accuracy during training
  """
  # Optimizer (ADAM is a fancy version of SGD)
  optimizer = Adam(model.parameters(), lr=0.0001)

  # Record
  metrics = {'loss': {'train': [], 'dev': []}, 'accuracy': {'train': [], 'dev': []}}

  # Move model to device
  model.to(DEVICE)

  # TO DO ----------------------------------------------------------------------

  # TO DO ----------------------------------------------------------------------
  return metrics

In [None]:
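# Named `train_metrics` so the result does not shadow the imported `sklearn.metrics` module
train_metrics = train_loop(model, n_epochs=5, dataloader_train=dl_train, dataloader_dev=dl_dev)
train_metrics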

<br><br><br><br><br><br>

# Part 3 - Evaluation


## Step 1: Evaluation Function

Write an evaluation loop for a trained model using the dev and test datasets. This function will print the `Recall`, `Precision`, and `F1` scores and plot a `Confusion Matrix`.

Perform this evaluation twice:
1. For all labels (7 labels in total).
2. For all labels except "O" (6 labels in total).

## Metrics and Display

### Metrics
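- **Recall**: The True Positive Rate (TPR), TP / (TP + FN): the fraction of tokens that truly carry a label and are predicted with it.
- **Precision**: The Positive Predictive Value (PPV), TP / (TP + FP): the fraction of tokens predicted with a label that truly carry it.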
- **F1 Score**: The harmonic mean of Precision and Recall.

*Note*: For all these metrics, use **weighted** averaging:
Calculate metrics for each label, and find their average weighted by support. Refer to the [sklearn documentation](https://scikit-learn.org/stable/modules/generated/sklearn.metrics.precision_recall_fscore_support.html#sklearn.metrics.precision_recall_fscore_support) for more details.
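
To show what support-weighted averaging does, here is a dependency-free sketch that is intended to mirror `average='weighted'`; in the notebook you would normally call sklearn's `precision_recall_fscore_support` instead. The function name `weighted_prf`, its `exclude` parameter, and the toy labels are assumptions for illustration.

```python
from collections import Counter


def weighted_prf(y_true, y_pred, exclude=()):
    """Support-weighted precision, recall, and F1 over the labels found in y_true."""
    labels = [label for label in sorted(set(y_true)) if label not in exclude]
    support = Counter(y_true)
    total = sum(support[label] for label in labels)
    if total == 0:
        return 0.0, 0.0, 0.0
    precision = recall = f1 = 0.0
    for label in labels:
        tp = sum(t == label and p == label for t, p in zip(y_true, y_pred))
        predicted = sum(p == label for p in y_pred)
        p_l = tp / predicted if predicted else 0.0
        r_l = tp / support[label]
        f_l = 2 * p_l * r_l / (p_l + r_l) if p_l + r_l else 0.0
        weight = support[label] / total  # each label counts in proportion to its support
        precision += weight * p_l
        recall += weight * r_l
        f1 += weight * f_l
    return precision, recall, f1


# Toy token-level labels (hypothetical)
y_true = ["O", "O", "B-PER", "I-PER", "B-LOC", "O"]
y_pred = ["O", "B-PER", "B-PER", "O", "B-LOC", "O"]

scores_all = weighted_prf(y_true, y_pred)
scores_wo_o = weighted_prf(y_true, y_pred, exclude=("O",))
print(scores_all)
print(scores_wo_o)
```

Excluding `"O"` removes the dominant label from both the per-label scores and the weights, which usually gives a more honest picture of entity-level performance.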

### Display
1. Print the `Recall`, `Precision`, and `F1` scores in a tabulated format.
2. Display a `Confusion Matrix` plot:
   - Rows represent the predicted labels.
   - Columns represent the true labels.
   - Include a title for the plot, axis names, and the names of the tags on the X-axis.
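
The requested orientation (rows are predicted labels, columns are true labels) can be sketched in plain Python as follows. The function name `confusion_rows_predicted` and the toy labels are hypothetical. sklearn's `confusion_matrix` uses the opposite layout (rows are true labels), so transposing its result is one way to obtain the same matrix.

```python
def confusion_rows_predicted(y_true, y_pred, labels):
    """Count matrix with rows indexed by predicted label and columns by true label."""
    index = {label: i for i, label in enumerate(labels)}
    matrix = [[0] * len(labels) for _ in labels]
    for true_label, predicted_label in zip(y_true, y_pred):
        matrix[index[predicted_label]][index[true_label]] += 1
    return matrix


labels = ["O", "B-PER", "B-LOC"]
y_true = ["O", "O", "B-PER", "B-LOC"]
y_pred = ["O", "B-PER", "B-PER", "O"]
matrix = confusion_rows_predicted(y_true, y_pred, labels)
for label, row in zip(labels, matrix):
    print(f"predicted {label:>5}: {row}")
```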

In [None]:
def evaluate(model: NERNet, title: str, dataloader: DataLoader, vocab: Vocab):
  """
  Evaluate a trained model on the given dataset.
  :param model: model instance
  :param title: title for the plot
  :param dataloader: dataloader
  :param vocab: Vocab instance
  :return: Dictionary of evaluation results
  """
  results = {}
  # TO DO ----------------------------------------------------------------------

  # TO DO ----------------------------------------------------------------------
  return results

## Step 2: Train & Evaluate on Dev Set

Train and evaluate (on the dev set) a few models, all with `embedding_size=300` and `N_EPOCHS=5` (for fairness and computational reasons), using the following hyperparameters (you may also use them as captions for the models):

- Model 1: (hidden_size: 500, n_layers: 1, directions: 1)
- Model 2: (hidden_size: 500, n_layers: 2, directions: 1)
- Model 3: (hidden_size: 500, n_layers: 3, directions: 1)
- Model 4: (hidden_size: 500, n_layers: 1, directions: 2)
- Model 5: (hidden_size: 500, n_layers: 2, directions: 2)
- Model 6: (hidden_size: 500, n_layers: 3, directions: 2)
- Model 7: (hidden_size: 800, n_layers: 1, directions: 2)
- Model 8: (hidden_size: 800, n_layers: 2, directions: 2)
- Model 9: (hidden_size: 800, n_layers: 3, directions: 2)




In [None]:
N_EPOCHS = 5
EMB_DIM = 300

Here is an example (with random numbers) of how to display the results:

In [None]:
# Example:
results_acc = np.random.rand(9, 10)
columns = ['N_MODEL','HIDDEN_SIZE','N_LAYERS','DIRECTIONS','RECALL','PRECISION','F1','RECALL_WO_O','PRECISION_WO_O','F1_WO_O']
df = pd.DataFrame(results_acc, columns=columns)
df.N_MODEL = [f'model_{n}' for n in range(1,10)]
print(tabulate(df, headers='keys', tablefmt='psql',floatfmt=".4f"))

In [None]:
# Define models with their hyperparameters
models = {
  'Model1': {'embedding_size': EMB_DIM, 'hidden_size': 500, 'n_layers': 1, 'directions': 1},
  'Model2': {'embedding_size': EMB_DIM, 'hidden_size': 500, 'n_layers': 2, 'directions': 1},
  'Model3': {'embedding_size': EMB_DIM, 'hidden_size': 500, 'n_layers': 3, 'directions': 1},
  'Model4': {'embedding_size': EMB_DIM, 'hidden_size': 500, 'n_layers': 1, 'directions': 2},
  'Model5': {'embedding_size': EMB_DIM, 'hidden_size': 500, 'n_layers': 2, 'directions': 2},
  'Model6': {'embedding_size': EMB_DIM, 'hidden_size': 500, 'n_layers': 3, 'directions': 2},
  'Model7': {'embedding_size': EMB_DIM, 'hidden_size': 800, 'n_layers': 1, 'directions': 2},
  'Model8': {'embedding_size': EMB_DIM, 'hidden_size': 800, 'n_layers': 2, 'directions': 2},
  'Model9': {'embedding_size': EMB_DIM, 'hidden_size': 800, 'n_layers': 3, 'directions': 2},
}

In [None]:
# TO DO ----------------------------------------------------------------------

# TO DO ----------------------------------------------------------------------

# Print results in tabulated format
print(tabulate(results_dev, headers='keys', tablefmt='psql', floatfmt=".4f"))

## Step 3: Evaluate on Test Set
Evaluate your models on the test set and save the results as a CSV. Add this file to your repo for submission.

In [None]:
results = pd.DataFrame(columns=columns)
file_name = "NER_results.csv"
# TO DO ----------------------------------------------------------------------

# TO DO ----------------------------------------------------------------------
print(tabulate(results, headers='keys', tablefmt='psql',floatfmt=".4f"))


## Step 4 - Best Model
Decide which model performs the best, write its configuration, train it for 5 more epochs and evaluate it on the test set.

In [None]:
best_model_cfg = {'embedding_size':EMB_DIM, 'hidden_size': -1, 'n_layers': -1, 'directions': -1}
# TO DO ----------------------------------------------------------------------

# TO DO ----------------------------------------------------------------------


# Part 4 - Pretrained Embeddings



To prepare for this task, please read [this discussion](https://discuss.pytorch.org/t/can-we-use-pre-trained-word-embeddings-for-weight-initialization-in-nn-embedding/1222).

**TIP**: Ensure that the vectors are aligned with the IDs in your vocabulary. In other words, make sure that the word with ID 0 corresponds to the first vector in the GloVe matrix used to initialize `nn.Embedding`.
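The alignment described in the tip can be illustrated with a minimal sketch. It assumes the GloVe text format (one word per line followed by space-separated floats) and uses a toy in-memory file and a toy `word2id` mapping; `build_aligned_matrix`, `toy_file`, and `toy_word2id` are hypothetical names for illustration and are not part of the notebook template.

```python
import io

def build_aligned_matrix(lines, word2id, dim):
    """Return (matrix, found) where matrix[i] is the vector of the word with ID i."""
    # Words that never appear in the file keep an all-zero row.
    matrix = [[0.0] * dim for _ in range(len(word2id))]
    found = 0
    for line in lines:
        parts = line.rstrip().split(" ")
        word, values = parts[0], parts[1:]
        # Only copy vectors for words in the vocabulary, into the row given by their ID.
        if word in word2id and len(values) == dim:
            matrix[word2id[word]] = [float(v) for v in values]
            found += 1
    return matrix, found

# Toy stand-in for a GloVe file (assumption: real files follow the same layout).
toy_file = io.StringIO("the 0.1 0.2 0.3\ncat 0.4 0.5 0.6\ndog 0.7 0.8 0.9\n")
toy_word2id = {"<PAD>": 0, "<UNK>": 1, "cat": 2, "the": 3}

matrix, found = build_aligned_matrix(toy_file, toy_word2id, dim=3)
print(found)      # how many vocabulary words were covered by the file
print(matrix[2])  # row 2 belongs to "cat" because its ID is 2
```

Note that the row order follows the vocabulary IDs, not the order of the lines in the file. Since the `glove.6B` vectors are uncased, it may help to look up lowercased forms when a word is not found as-is; whether that suits your vocabulary is a design choice left to you.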



## Step 1: Get Data



Download the GloVe embeddings from [this link](https://nlp.stanford.edu/projects/glove/). Use the 300-dimensional vectors from `glove.6B.zip`.



In [None]:
# TO DO ----------------------------------------------------------------------

# TO DO ----------------------------------------------------------------------

## Step 2: Inject Embeddings

Then initialize the `nn.Embedding` module in your `NERNet` with these embeddings, so that training starts from the pretrained vectors.

In [None]:
def get_emb_matrix(filepath: str, vocab: Vocab) -> np.ndarray:
  emb_matrix = np.zeros((len(vocab.word2id), 300))
  # TO DO ----------------------------------------------------------------------

  # TO DO ----------------------------------------------------------------------
  return emb_matrix

In [None]:
def initialize_from_pretrained_emb(model: NERNet, emb_matrix: np.ndarray):
  """
  Inject the pretrained embeddings into the model.
  :param model: model instance
  :param emb_matrix: pretrained embeddings
  """
  # TO DO ----------------------------------------------------------------------

  # TO DO ----------------------------------------------------------------------

In [None]:
# Read embeddings and inject them into a model
emb_file = 'glove.6B.300d.txt'
emb_matrix = get_emb_matrix(emb_file, vocab)
ner_glove = NERNet(input_size=VOCAB_SIZE, embedding_size=EMB_DIM, hidden_size=500, output_size=NUM_TAGS, n_layers=1, directions=1)
initialize_from_pretrained_emb(ner_glove, emb_matrix)

## Step 3: Evaluate on Test Set

As in the previous evaluation, please:

1. Print the `RECALL-PRECISION-F1` scores in a tabulated format.
2. Display a `confusion matrix` plot in which the predicted labels are the rows and the true labels are the columns.

Make sure the plot has a title, axis names, and the tag names on the X-axis.

Download this CSV and add it to your repo as well.
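As a reminder of how the requested scores relate to the confusion matrix, here is a minimal token-level sketch in plain Python. It is an illustration under assumptions, not the notebook's `evaluate` function: `per_tag_scores` is a hypothetical helper, and the counts are keyed as `(predicted, true)` to match the rows-are-predictions layout described above.

```python
from collections import Counter

def per_tag_scores(true_tags, pred_tags):
    """Token-level precision, recall and F1 for every tag seen in either list."""
    confusion = Counter(zip(pred_tags, true_tags))  # key: (predicted, true)
    tags = sorted(set(true_tags) | set(pred_tags))
    scores = {}
    for tag in tags:
        tp = confusion[(tag, tag)]
        predicted = sum(confusion[(tag, t)] for t in tags)  # row sum
        actual = sum(confusion[(p, tag)] for p in tags)     # column sum
        precision = tp / predicted if predicted else 0.0
        recall = tp / actual if actual else 0.0
        denom = precision + recall
        f1 = 2 * precision * recall / denom if denom else 0.0
        scores[tag] = {"precision": precision, "recall": recall, "f1": f1}
    return scores, confusion

# One flattened sentence: true tags versus predicted tags.
true_flat = ["O", "B-PER", "I-PER", "O", "B-LOC", "I-LOC", "O"]
pred_flat = ["O", "B-PER", "O", "O", "B-ORG", "I-ORG", "O"]

scores, confusion = per_tag_scores(true_flat, pred_flat)
for tag, values in scores.items():
    print(tag, values)
```

Precision divides the diagonal cell by its row sum (everything predicted as that tag), while recall divides it by its column sum (everything truly carrying that tag).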

In [None]:
results = pd.DataFrame(columns=columns)
file_name = "NER_results_glove.csv"
# TO DO ----------------------------------------------------------------------

# TO DO ----------------------------------------------------------------------
print(tabulate(results, headers='keys', tablefmt='psql',floatfmt=".4f"))

## Step 4: Best Model
Decide which model performs best, write its configuration below, train it for 5 more epochs, and evaluate it on the test set.

In [None]:
best_model_glove_cfg = {'embedding_size':EMB_DIM, 'hidden_size': -1, 'n_layers': -1, 'directions': -1}
# TO DO ----------------------------------------------------------------------

# TO DO ----------------------------------------------------------------------

# Part 5 - Error Analysis

In this part, you'll analyze the errors made by your best model to understand its strengths and weaknesses.

## Step 1: Extract Predictions

First, let's extract predictions from your best model on the test set:

In [None]:
def get_predictions(model, dataloader, vocab, PAD_TOKEN, DEVICE):
    """
    Get predictions from the model on a dataloader.

    Returns:
        - true_tags_list: List of lists of true tag strings
        - pred_tags_list: List of lists of predicted tag strings
        - words_list: List of lists of words
    """
    import torch

    model.eval()
    true_tags_list = []
    pred_tags_list = []
    words_list = []

    with torch.no_grad():
        # Handle different dataloader output formats
        for batch in dataloader:
            # Unpack based on actual dataloader output
            if len(batch) == 3:  # (input_ids, casing_features, labels)
                input_ids, casing_features, labels = batch
                # Move tensors to device
                input_ids = input_ids.to(DEVICE)
                casing_features = casing_features.to(DEVICE)
                labels = labels.to(DEVICE)

                # Get model predictions
                outputs = model(input_ids, casing_features)
            else:  # (input_ids, labels)
                input_ids, labels = batch
                # Move tensors to device
                input_ids = input_ids.to(DEVICE)
                labels = labels.to(DEVICE)

                # Get model predictions
                outputs = model(input_ids)

            _, predicted = torch.max(outputs, 2)

            # Process each sequence in the batch
            for i in range(input_ids.size(0)):
                # Get sequence length (ignoring padding)
                seq_len = (input_ids[i] != PAD_TOKEN).sum().item()

                # Convert ids to tag strings and words
                true_tags = [vocab.id2tag[tag.item()] for tag in labels[i][:seq_len]]
                pred_tags = [vocab.id2tag[tag.item()] for tag in predicted[i][:seq_len]]
                words = [vocab.id2word[word.item()] for word in input_ids[i][:seq_len]]

                true_tags_list.append(true_tags)
                pred_tags_list.append(pred_tags)
                words_list.append(words)

    return true_tags_list, pred_tags_list, words_list

## Step 2: Implement Simple Error Analysis

Now, implement a function to analyze the errors in predictions:

In [None]:
def simple_analyze_errors(true_tags, pred_tags, words):
    """
    Analyze errors in NER predictions.

    Args:
        true_tags: List of true tag sequences
        pred_tags: List of predicted tag sequences
        words: List of word sequences

    Returns:
        dict: Error statistics and examples
    """
    # TODO: Implement error analysis
    # 1. Initialize error categories
    # 2. Process each sequence to identify errors
    # 3. Categorize errors and collect examples
    # 4. Return statistics and examples

    # Placeholder
    return {
        'total_entities': 0,
        'correct_entities': 0,
        'accuracy': 0.0,
        'error_counts': {},
        'error_examples': {}
    }

## Step 3: Helper Functions

Implement these helper functions to extract entities and check for overlapping spans:

In [None]:
def get_entities_simple(tags):
    """
    Extract entities from a sequence of tags.
    Returns list of (start_idx, end_idx, entity_type) tuples.
    """
    # TODO: Implement entity extraction
    return []

def has_overlap(start1, end1, start2, end2):
    """Check if two spans overlap"""
    # TODO: Implement overlap checking
    return False
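One possible shape for these helpers is sketched below, as an illustration rather than the expected solution. It assumes spans use inclusive indices and that an `I-` tag with no matching open entity starts a new span; `extract_spans` and `spans_overlap` are hypothetical names chosen to keep them distinct from the template functions.

```python
def extract_spans(tags):
    """Return (start, end, type) tuples with inclusive indices from IOB tags."""
    spans = []
    start, current = None, None
    for i, tag in enumerate(tags):
        prefix, _, label = tag.partition("-")
        # An I- tag only continues the open entity when the types match.
        continues = prefix == "I" and label == current
        if not continues:
            if current is not None:
                spans.append((start, i - 1, current))
            if prefix in ("B", "I"):
                start, current = i, label  # a stray I- tag opens a new span here
            else:
                start, current = None, None
    # Close an entity that runs to the end of the sentence.
    if current is not None:
        spans.append((start, len(tags) - 1, current))
    return spans

def spans_overlap(start1, end1, start2, end2):
    """True when two inclusive index ranges share at least one position."""
    return start1 <= end2 and start2 <= end1

print(extract_spans(['O', 'B-PER', 'I-PER', 'O', 'B-LOC', 'I-LOC', 'O']))
print(spans_overlap(1, 2, 2, 4))
```

Working with spans rather than individual tags lets the analysis distinguish an entity that was missed entirely from one that was found with the wrong boundaries or the wrong type.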

## Step 4: Visualization and Analysis

Create a function to display the error analysis results:

In [None]:
def print_error_analysis(analysis):
    """Print a summary of the error analysis results"""
    # TODO: Implement printing function to show:
    # 1. Basic statistics (total entities, correct entities, accuracy)
    # 2. Error counts by category
    # 3. Examples of each error type
    # 4. Suggestions for improvement based on findings
    pass
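To make the idea of error categories concrete, the sketch below counts them for a single sentence given gold and predicted spans. The category names (`correct`, `wrong_type`, `boundary`, `missed`, `spurious`) and the function `categorize` are assumptions made for illustration; you are free to define a different taxonomy.

```python
from collections import Counter

def categorize(gold_spans, pred_spans):
    """Count error categories for one sentence given (start, end, type) spans."""
    counts = Counter()
    matched_pred = set()
    for g_start, g_end, g_type in gold_spans:
        # Predicted spans sharing at least one token with this gold span.
        overlapping = [p for p in pred_spans if p[0] <= g_end and g_start <= p[1]]
        matched_pred.update(overlapping)
        if (g_start, g_end, g_type) in overlapping:
            counts["correct"] += 1
        elif any((p[0], p[1]) == (g_start, g_end) for p in overlapping):
            counts["wrong_type"] += 1   # same boundaries, different entity type
        elif overlapping:
            counts["boundary"] += 1     # partially found, boundaries differ
        else:
            counts["missed"] += 1       # no predicted span touches it
    # Predicted spans that touch no gold span at all.
    counts["spurious"] += sum(1 for p in pred_spans if p not in matched_pred)
    return counts

# Spans with inclusive indices for a sentence whose gold entities are PER and LOC.
gold = [(1, 2, "PER"), (4, 5, "LOC")]
pred = [(1, 1, "PER"), (4, 5, "ORG")]
result = categorize(gold, pred)
print(dict(result))
```

Summing these counters over all test sentences gives the kind of per-category statistics that `print_error_analysis` is meant to report.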

## Step 5: Improvement Suggestions

Based on your error analysis, suggest at least three specific improvements to your model. Consider:

1. What types of errors are most common?
2. Are there patterns in the errors (e.g., specific entity types, contexts)?
3. What techniques might address these specific error types?

Write your suggestions in 3-5 sentences for each improvement.

In [None]:
# Example usage
if __name__ == "__main__":
    # Sample data for testing
    true_tags = [
        ['O', 'B-PER', 'I-PER', 'O', 'B-LOC', 'I-LOC', 'O'],
        ['B-ORG', 'I-ORG', 'O', 'B-PER', 'O']
    ]

    pred_tags = [
        ['O', 'B-PER', 'O', 'O', 'B-ORG', 'I-ORG', 'O'],
        ['B-ORG', 'I-ORG', 'I-ORG', 'B-PER', 'O']
    ]

    words = [
        ['The', 'John', 'Smith', 'visited', 'New', 'York', 'yesterday'],
        ['Google', 'Inc', 'hired', 'Alice', 'recently']
    ]

    # Run the error analysis
    analysis = simple_analyze_errors(true_tags, pred_tags, words)
    print_error_analysis(analysis)

    # TODO: Write your improvement suggestions here

Remember to focus on substantive analysis rather than code complexity. Your goal is to identify meaningful patterns and provide practical suggestions for improvement.

# Testing
Copy the content of the **tests.py** file from the repo and paste it below. Running the cell creates the `results.json` file and downloads it to your machine.

In [None]:
####################
# PLACE TESTS HERE #
train_ds = read_data("data/train.txt")
dev_ds = read_data("data/dev.txt")
test_ds = read_data("data/test.txt")
def test_read_data():
    result = {
        'lengths': (len(train_ds), len(dev_ds), len(test_ds)),
    }
    return result

vocab = Vocab(train_ds)
def test_vocab():
    sent = vocab.index_words(["I", "am", "Spongebob"])
    return {
        'length': vocab.n_words,
        'tag2id_length': len(vocab.tag2id),
        "Spongebob": sent[2]
    }

train_sequences = prepare_data(train_ds, vocab)
dev_sequences = prepare_data(dev_ds, vocab)
test_sequences = prepare_data(test_ds, vocab)

def test_count_oov():
    return {
        'dev_oov': count_oov(dev_sequences),
        'test_oov': count_oov(test_sequences)
    }

BATCH_SIZE = 16
dl_train = prepare_data_loader(train_sequences, batch_size=BATCH_SIZE)
dl_dev = prepare_data_loader(dev_sequences, batch_size=BATCH_SIZE, train=False)
dl_test = prepare_data_loader(test_sequences, batch_size=BATCH_SIZE, train=False)

def test_prepare_data_loader():
    return {
        'lengths': (len(dl_train), len(dl_dev), len(dl_test))
    }


def test_NERNet():
    # Extract best model configuration
    hidden_size = best_model_cfg['hidden_size']
    n_layers = best_model_cfg['n_layers']
    directions = best_model_cfg['directions']


    # Create model
    best_model = NERNet(vocab.n_words, embedding_size=300, hidden_size=hidden_size, output_size=vocab.n_tags, n_layers=n_layers, directions=directions)
    best_model.to(DEVICE)

    # Train model and evaluate
    _ = train_loop(best_model, n_epochs=10, dataloader_train=dl_train, dataloader_dev=dl_dev)
    results = evaluate(best_model, title="", dataloader=dl_test, vocab=vocab)

    return {
        'f1': results['F1'],
        'f1_wo_o': results['F1_WO_O'],
    }

def test_glove():
    # Get embeddings
    emb_file = 'glove.6B.300d.txt'
    emb_matrix = get_emb_matrix(emb_file, vocab)

    # Extract best model configuration
    hidden_size = best_model_glove_cfg['hidden_size']
    n_layers = best_model_glove_cfg['n_layers']
    directions = best_model_glove_cfg['directions']

    # Create model
    best_model = NERNet(vocab.n_words, embedding_size=300, hidden_size=hidden_size, output_size=vocab.n_tags, n_layers=n_layers, directions=directions)
    best_model.to(DEVICE)
    initialize_from_pretrained_emb(best_model, emb_matrix)

    # Train model and evaluate
    _ = train_loop(best_model, n_epochs=10, dataloader_train=dl_train, dataloader_dev=dl_dev)
    results = evaluate(best_model, title="", dataloader=dl_test, vocab=vocab)

    return {
        'f1': results['F1'],
        'f1_wo_o': results['F1_WO_O'],
    }

TESTS = [
    test_read_data,
    test_vocab,
    test_count_oov,
    test_prepare_data_loader,
    test_NERNet,
    test_glove
]

# Run tests and save results
res = {}
for test in TESTS:
    try:
        cur_res = test()
        res.update({test.__name__: cur_res})
    except Exception as e:
        res.update({test.__name__: repr(e)})

with open('results.json', 'w') as f:
    json.dump(res, f, indent=2)

# Download the results.json file
files.download('results.json')

####################
