<a href="https://colab.research.google.com/github/lahiiru/idp-bootcamp/blob/main/week_5_assignment.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
%pip install transformers datasets

In [4]:
from datasets import load_dataset

# Load the CoNLL-2003 dataset into the notebook namespace.
# NOTE: prepare_data() further down calls load_dataset("conll2003") again for
# its own local copy, so no later cell shown here reads this `dataset` variable.
dataset = load_dataset("conll2003")

The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


## Dataset preparation for training

In [12]:
from transformers import AutoTokenizer
import torch
from torch.utils.data import DataLoader, Dataset

class NERDataset(Dataset):
    """Map-style torch dataset over already-tokenized NER examples.

    Wraps any indexable collection whose items expose ``input_ids``,
    ``attention_mask`` and ``labels`` sequences, and serves each item as a
    dict of ``torch.long`` tensors.
    """

    # Fields copied out of every stored example, in output order.
    _FIELDS = ('input_ids', 'attention_mask', 'labels')

    def __init__(self, tokenized_dataset):
        self.tokenized_dataset = tokenized_dataset

    def __len__(self):
        return len(self.tokenized_dataset)

    def __getitem__(self, idx):
        example = self.tokenized_dataset[idx]
        return {
            field: torch.tensor(example[field], dtype=torch.long)
            for field in self._FIELDS
        }

def prepare_data(model_name='bert-base-cased', max_length=128):
    """Load CoNLL-2003 and tokenize every split for token classification.

    Each example's pre-split words are tokenized, padded/truncated to
    ``max_length`` and given a ``labels`` column aligned to the sub-word
    tokens: the first sub-token of a word carries that word's NER tag, while
    special tokens, padding and continuation sub-tokens are set to ``-100``
    (the value ``evaluate_model`` filters out).

    Args:
        model_name (str): Checkpoint whose tokenizer is used. Defaults to
            ``'bert-base-cased'``.
        max_length (int): Fixed sequence length every example is padded or
            truncated to. Defaults to ``128``.

    Returns:
        tuple: ``(tokenized_datasets, num_labels)`` where ``tokenized_datasets``
        is the mapped dataset (original columns removed) and ``num_labels`` is
        the number of NER tag names declared by the training split.
    """
    dataset = load_dataset("conll2003")
    tokenizer = AutoTokenizer.from_pretrained(model_name)

    # The tag vocabulary is declared on the dataset's ``ner_tags`` feature.
    num_labels = len(dataset["train"].features["ner_tags"].feature.names)

    def tokenize_and_align_labels(examples):
        """Tokenize one batch of examples and build sub-token-aligned labels."""
        # No ``return_tensors`` here: this runs inside ``Dataset.map``, and
        # NERDataset converts the stored values to tensors per item anyway.
        tokenized_inputs = tokenizer(
            examples["tokens"],
            truncation=True,
            is_split_into_words=True,
            padding='max_length',
            max_length=max_length,
        )

        labels = []
        for i, word_tags in enumerate(examples["ner_tags"]):
            label_ids = []
            previous_word_idx = None
            for word_idx in tokenized_inputs.word_ids(batch_index=i):
                if word_idx is None or word_idx == previous_word_idx:
                    # Special/padding token, or a continuation sub-token.
                    label_ids.append(-100)
                else:
                    # First sub-token of a new word keeps the word's tag.
                    label_ids.append(word_tags[word_idx])
                previous_word_idx = word_idx
            labels.append(label_ids)

        tokenized_inputs["labels"] = labels
        return tokenized_inputs

    tokenized_datasets = dataset.map(
        tokenize_and_align_labels,
        batched=True,
        remove_columns=dataset["train"].column_names
    )

    return tokenized_datasets, num_labels


## Implement the model

In [13]:
from transformers import BertForTokenClassification
import torch.nn as nn

class NERModel(nn.Module):
    """Thin ``nn.Module`` wrapper around a BERT token-classification model.

    The wrapped ``BertForTokenClassification`` is loaded from the
    ``bert-base-cased`` checkpoint and sized to ``num_labels`` output classes.
    """

    def __init__(self, num_labels):
        super().__init__()
        checkpoint = 'bert-base-cased'
        self.bert = BertForTokenClassification.from_pretrained(checkpoint, num_labels=num_labels)

    def forward(self, input_ids, attention_mask, labels=None):
        """Delegate to the wrapped model and return its output object unchanged.

        Other functions in this notebook read ``.loss`` (when ``labels`` were
        passed) and ``.logits`` from the returned object.
        """
        return self.bert(input_ids, attention_mask=attention_mask, labels=labels)

## Implement the training loop

In [14]:
!pip install tqdm



In [15]:
## your code here
from tqdm import tqdm

def train_model(model, train_dataloader, eval_dataloader, optimizer, scheduler, device, num_epochs=3):
    """Fine-tune ``model`` on ``train_dataloader`` and print the mean loss per epoch.

    Args:
        model: Module called as ``model(input_ids, attention_mask, labels)``
            whose output exposes a ``.loss`` tensor.
        train_dataloader: Sized iterable of batches; each batch is a dict with
            ``input_ids``, ``attention_mask`` and ``labels`` tensors.
        eval_dataloader: Accepted for interface compatibility; this function
            does not currently read it.
        optimizer: Optimizer stepped once per batch.
        scheduler: Learning-rate scheduler stepped once per batch, after the
            optimizer.
        device: Device the model and every batch are moved to.
        num_epochs (int): Number of passes over ``train_dataloader``.

    Returns:
        None. Progress is reported through the tqdm bar and ``print``.
    """
    model.to(device)
    num_batches = len(train_dataloader)

    for epoch in range(num_epochs):
        # Re-assert training mode every epoch so the loop stays correct even if
        # something switched the model to eval mode in between.
        model.train()
        total_loss = 0.0
        progress_bar = tqdm(train_dataloader, desc=f"Epoch {epoch + 1}/{num_epochs}", leave=True)
        for batch in progress_bar:
            optimizer.zero_grad()
            outputs = model(
                batch['input_ids'].to(device),
                batch['attention_mask'].to(device),
                batch['labels'].to(device)
            )
            loss = outputs.loss
            loss.backward()
            optimizer.step()
            scheduler.step()

            # Read the scalar once; each .item() call is a host/device sync.
            batch_loss = loss.item()
            total_loss += batch_loss
            progress_bar.set_postfix(loss=batch_loss)

        # An empty dataloader has no meaningful average; report NaN instead of
        # raising ZeroDivisionError.
        average_loss = total_loss / num_batches if num_batches else float('nan')
        print(f"Epoch {epoch + 1}/{num_epochs}, Average Loss: {average_loss}")



## Implement the evaluation function

In [16]:
%pip install seqeval



In [17]:
from seqeval.metrics import classification_report

def evaluate_model(model, test_dataloader, id2label, device):
    """Print a seqeval classification report for ``model`` on ``test_dataloader``.

    For each batch the arg-max class per token is taken from the model's
    logits. Positions whose gold label is ``-100`` are dropped from both the
    gold and the predicted sequence before ids are mapped to tag names.

    Args:
        model: Module called as ``model(input_ids, attention_mask)`` whose
            output exposes ``.logits``.
        test_dataloader: Iterable of batches (dicts with ``input_ids``,
            ``attention_mask`` and ``labels`` tensors).
        id2label (dict): Mapping from label id to tag name.
        device: Device the batch tensors are moved to.
    """
    model.eval()
    gold_sequences, predicted_sequences = [], []

    with torch.no_grad():
        for batch in test_dataloader:
            logits = model(
                batch['input_ids'].to(device),
                batch['attention_mask'].to(device),
            ).logits

            # Highest-scoring class per token, brought back to host memory.
            batch_predictions = logits.argmax(dim=2).cpu().numpy()
            batch_gold = batch['labels'].to(device).cpu().numpy()

            for predicted_ids, gold_ids in zip(batch_predictions, batch_gold):
                # Score only the positions that carry a real gold label.
                keep = gold_ids != -100
                gold_sequences.append([id2label[i] for i in gold_ids[keep]])
                predicted_sequences.append([id2label[i] for i in predicted_ids[keep]])

    print(classification_report(gold_sequences, predicted_sequences))

## Implement the main() function

In [18]:
from torch.optim import AdamW
from transformers import get_linear_schedule_with_warmup

def main():
    """Train the BERT NER model on CoNLL-2003 and report validation metrics.

    Steps: pick the device, build the tokenized datasets and their data
    loaders, create the model with an AdamW optimizer and a linear
    warmup schedule, train for three epochs, save the trained model object to
    ``bert_ner_model.pth`` and print a classification report for the
    validation split.

    Returns:
        NERModel: The trained model.
    """
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    print(f"Using device: {device}")

    print("Loading and preparing dataset...")
    tokenized_datasets, num_labels = prepare_data()

    # One loader per split; only the training loader shuffles.
    batch_size = 256
    train_dataloader = DataLoader(
        NERDataset(tokenized_datasets["train"]), batch_size=batch_size, shuffle=True
    )
    eval_dataloader = DataLoader(
        NERDataset(tokenized_datasets["validation"]), batch_size=batch_size
    )
    # Built but not consumed below: the final report uses the validation loader.
    test_dataloader = DataLoader(
        NERDataset(tokenized_datasets["test"]), batch_size=batch_size
    )

    model = NERModel(num_labels=num_labels)
    model.to(device)

    optimizer = AdamW(model.parameters(), lr=5e-5, weight_decay=0.01, eps=1e-8)

    # The scheduler is stepped once per batch, so its horizon is counted in
    # batches; the first tenth of those steps is warmup.
    num_epochs = 3
    total_steps = num_epochs * len(train_dataloader)
    scheduler = get_linear_schedule_with_warmup(
        optimizer,
        num_warmup_steps=total_steps // 10,
        num_training_steps=total_steps,
    )

    print("Starting training...")
    train_model(model, train_dataloader, eval_dataloader, optimizer, scheduler, device, num_epochs=num_epochs)
    # Persist the whole model object (not just its state dict).
    torch.save(model, "bert_ner_model.pth")

    print("\nEvaluating on validation set...")
    tag_names = (
        "O",
        "B-PER", "I-PER",
        "B-ORG", "I-ORG",
        "B-LOC", "I-LOC",
        "B-MISC", "I-MISC",
    )
    id2label = dict(enumerate(tag_names))
    evaluate_model(model, eval_dataloader, id2label, device)

    return model

In [19]:
# Run the whole pipeline (data preparation, training, validation report) and
# keep the trained model in the notebook namespace; the inference demo cell
# further down passes this `model` to inference().
model = main()

Using device: cuda
Loading and preparing dataset...


Some weights of BertForTokenClassification were not initialized from the model checkpoint at bert-base-cased and are newly initialized: ['classifier.bias', 'classifier.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


Starting training...


Epoch 1/3: 100%|██████████| 55/55 [01:02<00:00,  1.14s/it, loss=0.122]


Epoch 1/3, Average Loss: 0.705366049842401


Epoch 2/3: 100%|██████████| 55/55 [01:01<00:00,  1.12s/it, loss=0.0494]


Epoch 2/3, Average Loss: 0.08070390102538195


Epoch 3/3: 100%|██████████| 55/55 [01:01<00:00,  1.12s/it, loss=0.0441]


Epoch 3/3, Average Loss: 0.047781018303199245

Evaluating on validation set...
              precision    recall  f1-score   support

         LOC       0.94      0.94      0.94      1837
        MISC       0.80      0.82      0.81       922
         ORG       0.88      0.91      0.89      1341
         PER       0.98      0.97      0.97      1836

   micro avg       0.91      0.92      0.92      5936
   macro avg       0.90      0.91      0.90      5936
weighted avg       0.91      0.92      0.92      5936



## Implement inference function


In [41]:
from transformers import AutoTokenizer
import torch
import numpy as np

from transformers import AutoTokenizer
import torch
import numpy as np

def prepare_inference(model_path=None):
    """Initialize the tokenizer and label map for inference, optionally loading a model.

    Args:
        model_path (str, optional): Path to a model object saved with
            ``torch.save(model, path)`` (as ``main`` does). When omitted, no
            model is loaded and the caller is expected to supply its own.

    Returns:
        tuple: ``(tokenizer, id2label)`` when ``model_path`` is not given, so
        existing two-value unpacking keeps working. When ``model_path`` is
        given, the loaded model is appended: ``(tokenizer, id2label, model)``.
        (Previously the loaded model was silently discarded.)
    """
    tokenizer = AutoTokenizer.from_pretrained('bert-base-cased')

    id2label = {
        0: "O",
        1: "B-PER",
        2: "I-PER",
        3: "B-ORG",
        4: "I-ORG",
        5: "B-LOC",
        6: "I-LOC",
        7: "B-MISC",
        8: "I-MISC"
    }

    if not model_path:
        return tokenizer, id2label

    # SECURITY: the checkpoint is a pickled module object, so loading it
    # executes arbitrary pickle code -- only load files you created or trust.
    # ``weights_only=False`` is required for a fully pickled module on PyTorch
    # releases where ``torch.load`` defaults to weights-only loading.
    model = torch.load(model_path, weights_only=False)
    return tokenizer, id2label, model

def inference(text, model, tokenizer, id2label, device='cuda'):
    """
    Perform NER inference on input text

    Args:
        text (str): Input text to analyze
        model: Trained NER model, called as ``model(input_ids, attention_mask)``
            and expected to return an object with ``.logits``
        tokenizer: BERT tokenizer
        id2label (dict): Mapping from label ids to label names
        device (str): Device to run inference on ('cuda' or 'cpu'). If a CUDA
            device is requested but CUDA is unavailable, 'cpu' is used instead.

    Returns:
        list: List of tuples containing (word, entity_label). Sub-word pieces
        (``##...``) are merged into their word, which keeps the label of its
        first piece. The tokenizer's CLS/SEP/PAD tokens are not included.
    """
    # A CUDA request cannot be honoured without CUDA; in that case the model
    # can only live on the CPU, so run the inputs there too.
    if str(device).startswith('cuda') and not torch.cuda.is_available():
        device = 'cpu'

    # Ensure model is in evaluation mode
    model.eval()

    # Tokenize the text
    tokenized_inputs = tokenizer(
        text,
        truncation=True,
        is_split_into_words=False,
        padding='max_length',
        max_length=128,
        return_tensors="pt"  # Return PyTorch tensors
    )

    # Forward pass only: no autograd graph is needed for prediction.
    with torch.no_grad():
        outputs = model(
            tokenized_inputs['input_ids'].to(device),
            tokenized_inputs['attention_mask'].to(device)
        )

    # Index the single batch row explicitly instead of relying on squeeze().
    predicted_ids = torch.argmax(outputs.logits, dim=2)[0].tolist()
    predicted_labels = [id2label[label_id] for label_id in predicted_ids]

    # Align and merge subword tokens with labels
    tokens = tokenizer.convert_ids_to_tokens(tokenized_inputs['input_ids'][0])
    pad_token = tokenizer.pad_token
    # Sequence delimiters are model plumbing, not words of the input text.
    boundary_tokens = {tokenizer.cls_token, tokenizer.sep_token}

    labeled_words = []
    current_word = ""
    current_label = None

    for token, label in zip(tokens, predicted_labels):
        if token == pad_token:
            # Padding is appended after the real tokens; nothing follows it.
            break
        if token in boundary_tokens:
            continue
        if token.startswith("##"):
            # Continuation piece: extend the current word, keep its label.
            current_word += token[2:]
            continue
        # A new word starts: flush the one accumulated so far.
        if current_word:
            labeled_words.append((current_word, current_label))
        current_word = token
        current_label = label

    # Append the last accumulated word
    if current_word:
        labeled_words.append((current_word, current_label))

    return labeled_words

def print_entities(labeled_words):
    """Print one ``TYPE: words`` line per entity span found in ``labeled_words``.

    ``labeled_words`` is an iterable of ``(word, label)`` pairs using
    ``O`` / ``B-TYPE`` / ``I-TYPE`` tags. A ``B-`` tag, or an ``I-`` tag whose
    type differs from the open span, starts a new span; an ``I-`` tag of the
    same type extends it; ``O`` closes it. Labels of any other form are ignored.
    """
    def describe(kind, words):
        return f"{kind}: {' '.join(words)}"

    open_type = None
    span_words = []

    for word, label in labeled_words:
        if label == "O":
            # Outside any entity: close the open span, if there is one.
            if open_type:
                print(describe(open_type, span_words))
                open_type, span_words = None, []
            continue

        is_begin = label.startswith("B-")
        if not is_begin and not label.startswith("I-"):
            continue

        tag_type = label[2:]  # Drop the "B-" / "I-" prefix
        if not is_begin and tag_type == open_type:
            # Same-type continuation extends the open span.
            span_words.append(word)
            continue

        # Either an explicit begin or a type switch: emit the previous span and
        # open a fresh one with this word.
        if open_type:
            print(describe(open_type, span_words))
        open_type, span_words = tag_type, [word]

    if open_type:  # Flush the span still open at the end of the input
        print(describe(open_type, span_words))

In [42]:
# Build the tokenizer and the id -> label map. No model_path is passed, so the
# `model` used below is the one returned by main() in an earlier cell; that
# cell must have been run first.
tokenizer, id2label = prepare_inference()

# Example texts to analyze
texts = [
    "John Smith works at Microsoft in Seattle and visited New York last summer.",
    "The European Union signed a trade deal with Japan in Brussels.",
    "Tesla CEO Elon Musk announced new features coming to their vehicles."
]

# Tag each text and print the entity spans that were found
for text in texts:
    print("\nText:", text)
    print("Entities found:")
    # `device` is left at inference()'s default ('cuda').
    results = inference(text, model, tokenizer, id2label)
    print_entities(results)


Text: John Smith works at Microsoft in Seattle and visited New York last summer.
Entities found:
PER: John Smith
ORG: Microsoft
LOC: Seattle
LOC: New York

Text: The European Union signed a trade deal with Japan in Brussels.
Entities found:
ORG: European Union
LOC: Japan
LOC: Brussels

Text: Tesla CEO Elon Musk announced new features coming to their vehicles.
Entities found:
ORG: Tesla
PER: Elon Musk


