<a href="https://colab.research.google.com/github/profliuhao/CSIT599/blob/main/CSIT599_Module6_BERT_NER_v1.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# BERT for Named Entity Recognition (NER) - Student Exercise

===========================================================

In this exercise, you will fine-tune a BERT model for Named Entity Recognition
using the CoNLL-2003 dataset downloaded directly from a URL.


Dataset: CoNLL-2003 NER dataset (downloaded from URL)

Task: Token classification for entity recognition (PER, ORG, LOC, MISC)


In [None]:
!pip install seqeval

In [None]:
import torch
import numpy as np
import requests
import zipfile
import os
from io import BytesIO
from torch.utils.data import Dataset, DataLoader
from transformers import (
    BertTokenizerFast,
    BertForTokenClassification,
    get_linear_schedule_with_warmup
)
from torch.optim import AdamW
from seqeval.metrics import classification_report, f1_score
from tqdm import tqdm

# Set random seeds for reproducibility.
# Both PyTorch's and NumPy's global random number generators are seeded with
# the same fixed value so repeated runs start from the same random state.
# NOTE(review): GPU kernels may still be non-deterministic; confirm whether
# bit-for-bit reproducibility is required.
torch.manual_seed(42)
np.random.seed(42)

## PART 1: Download and Parse CoNLL-2003 Dataset

In [None]:
# ==============================================================================
# PART 1: Download and Parse CoNLL-2003 Dataset
# ==============================================================================

def download_and_extract_data(url, extract_path="./conll2003_data", timeout=60):
    """
    Download a zip archive from a URL and extract it to a local directory.

    The archive is kept in memory (it is never written to disk as a .zip)
    and unpacked directly into ``extract_path``.

    Args:
        url: URL to download the dataset zip file
        extract_path: Directory to extract the dataset into (created if missing)
        timeout: Seconds to wait for the server to connect / send data before
            giving up, passed straight to ``requests.get``

    Returns:
        extract_path: Path where data was extracted

    Raises:
        requests.HTTPError: If the server responds with an error status
        requests.RequestException: On connection failures or timeouts
        zipfile.BadZipFile: If the downloaded payload is not a valid zip file
    """
    print(f"Downloading CoNLL-2003 dataset from {url}...")

    # Create directory if it doesn't exist
    os.makedirs(extract_path, exist_ok=True)

    # Download the zip file. Without a timeout a stalled server would hang
    # the notebook cell indefinitely.
    response = requests.get(url, timeout=timeout)

    # Fail loudly on 4xx/5xx responses; otherwise an HTML error page would be
    # handed to zipfile and surface as a confusing BadZipFile error.
    response.raise_for_status()

    # Extract the zip file straight from the in-memory bytes
    with zipfile.ZipFile(BytesIO(response.content)) as zip_file:
        zip_file.extractall(extract_path)

    print(f"Dataset extracted to {extract_path}")
    return extract_path


def parse_conll_file(file_path):
    """
    Read a CoNLL-2003 formatted file into parallel token / tag lists.

    Each non-blank line is expected to hold whitespace-separated columns:
    token pos_tag chunk_tag ner_tag

    A blank line or a line starting with ``-DOCSTART-`` closes the sentence
    being collected. Lines with fewer than four columns are skipped.

    Args:
        file_path: Path to the CoNLL file

    Returns:
        sentences: List of sentences, where each sentence is a list of tokens
        ner_tags: List of NER tag sequences, parallel to sentences
    """
    sentences = []
    ner_tags = []

    # Accumulators for the sentence currently being read
    words = []
    labels = []

    with open(file_path, 'r', encoding='utf-8') as handle:
        for raw_line in handle:
            stripped = raw_line.strip()

            # Sentence boundary: flush whatever has been collected so far
            if stripped == "" or stripped.startswith('-DOCSTART-'):
                if words:
                    sentences.append(words)
                    ner_tags.append(labels)
                    words, labels = [], []
                continue

            columns = stripped.split()
            if len(columns) < 4:
                # Not a full "token pos chunk ner" row; ignore it
                continue

            # First column is the token, fourth column is the NER tag
            words.append(columns[0])
            labels.append(columns[3])

    # The file may end without a trailing blank line
    if words:
        sentences.append(words)
        ner_tags.append(labels)

    return sentences, ner_tags


def load_conll_dataset(data_path="./conll2003_data"):
    """
    Parse the train, validation and test files of the CoNLL-2003 dataset.

    Args:
        data_path: Directory containing train.txt, valid.txt and test.txt

    Returns:
        Dictionary keyed by 'train', 'validation' and 'test'; each value is a
        dictionary with 'tokens' (list of token lists) and 'ner_tags'
        (list of tag lists)
    """
    print("Loading CoNLL-2003 dataset files...")

    # (split name, file name) pairs, in the order they are parsed
    split_files = (
        ("train", "train.txt"),
        ("validation", "valid.txt"),
        ("test", "test.txt"),
    )

    dataset = {}
    for split_name, file_name in split_files:
        split_sentences, split_tags = parse_conll_file(
            os.path.join(data_path, file_name)
        )
        dataset[split_name] = {'tokens': split_sentences, 'ner_tags': split_tags}

    # Report sizes only after every split has been parsed
    print(f"Loaded {len(dataset['train']['tokens'])} training sentences")
    print(f"Loaded {len(dataset['validation']['tokens'])} validation sentences")
    print(f"Loaded {len(dataset['test']['tokens'])} test sentences")

    return dataset


# Define label mappings
# The position of each tag in this list is its integer class ID.
label_list = [
    "O", "B-PER", "I-PER", "B-ORG", "I-ORG",
    "B-LOC", "I-LOC", "B-MISC", "I-MISC"
]

# Map label names to IDs, e.g. {"O": 0, "B-PER": 1, ...}
label2id = {label: idx for idx, label in enumerate(label_list)}

# Map label IDs back to names (the inverse of label2id)
id2label = {idx: label for label, idx in label2id.items()}

# Number of output classes for the token-classification head
num_labels = len(label_list)

## PART 2: Custom PyTorch Dataset Class

In [None]:
# ==============================================================================
# PART 2: Custom PyTorch Dataset Class
# ==============================================================================

class CoNLLDataset(Dataset):
    """
    Custom PyTorch Dataset for CoNLL-2003 data.

    Each item is one sentence, tokenized into subword pieces and padded /
    truncated to ``max_length``, together with per-piece label IDs aligned
    to the original word-level NER tags.
    """

    def __init__(self, sentences, ner_tags, tokenizer, max_length=128, label2id=None):
        """
        Initialize the dataset.

        Args:
            sentences: List of token lists
            ner_tags: List of NER tag lists (as strings)
            tokenizer: BERT tokenizer; must be a "fast" tokenizer because
                label alignment relies on ``word_ids()``
            max_length: Maximum sequence length
            label2id: Dictionary mapping label strings to IDs
        """
        self.sentences = sentences
        self.ner_tags = ner_tags
        self.tokenizer = tokenizer
        self.max_length = max_length
        self.label2id = label2id

    def __len__(self):
        """Return the number of sentences in the dataset."""
        return len(self.sentences)

    def __getitem__(self, idx):
        """
        Get a single example.

        Args:
            idx: Index of the sentence to fetch

        Returns:
            Dictionary with input_ids, attention_mask, and labels
        """
        tokens = self.sentences[idx]
        tags = self.ner_tags[idx]

        # Convert string tags to IDs
        tag_ids = [self.label2id[tag] for tag in tags]

        # The sentence is already split into words, so tell the tokenizer
        # not to re-split on whitespace; it only breaks words into subwords.
        encoded = self.tokenizer(
            tokens,
            is_split_into_words=True,
            truncation=True,
            padding='max_length',
            max_length=self.max_length,
            return_tensors='pt'
        )

        # Align labels with tokenized input
        labels = self.align_labels(encoded, tag_ids)

        # return_tensors='pt' adds a leading batch dimension of size 1;
        # squeeze it so the DataLoader can stack examples itself.
        return {
            'input_ids': encoded['input_ids'].squeeze(0),
            'attention_mask': encoded['attention_mask'].squeeze(0),
            'labels': torch.tensor(labels, dtype=torch.long)
        }

    def align_labels(self, encoded, tag_ids, label_all_tokens=True):
        """
        Align word-level labels with the subword-level tokenized input.

        Args:
            encoded: Output from tokenizer (must provide ``word_ids``)
            tag_ids: List of label IDs for original tokens
            label_all_tokens: Whether to label all subword tokens. If False,
                only the first subword of each word carries the label and the
                rest are set to -100.

        Returns:
            List of aligned label IDs, one per entry of ``word_ids``
        """
        word_ids = encoded.word_ids(batch_index=0)

        previous_word_idx = None
        label_ids = []

        for word_idx in word_ids:
            if word_idx is None:
                # Positions that belong to no word (special tokens) get -100
                # so they are ignored in the loss.
                label_ids.append(-100)
            elif word_idx != previous_word_idx:
                # First subword of a word: use the word's actual label
                label_ids.append(tag_ids[word_idx])
            else:
                # Later subwords of the same word.
                # NOTE(review): with label_all_tokens=True a "B-" tag is
                # repeated on every subword of the word; confirm this is the
                # intended behavior for entity-level scoring.
                label_ids.append(tag_ids[word_idx] if label_all_tokens else -100)

            previous_word_idx = word_idx

        return label_ids



## PART 3: Prepare Data Loaders

In [None]:
# ==============================================================================
# PART 3: Prepare Data Loaders
# ==============================================================================

def prepare_dataloaders(dataset_dict, tokenizer, batch_size=16):
    """
    Prepare PyTorch DataLoaders for training and evaluation.

    Each split is wrapped in a CoNLLDataset (using the module-level
    ``label2id`` mapping and the dataset's default max_length) and then in a
    DataLoader.

    Args:
        dataset_dict: Dictionary with train/validation/test splits, each
            holding 'tokens' and 'ner_tags'
        tokenizer: BERT tokenizer
        batch_size: Batch size used for all three loaders

    Returns:
        train_dataloader, eval_dataloader, test_dataloader
    """
    print("Preparing data loaders...")

    # Create dataset objects
    train_dataset = CoNLLDataset(
        dataset_dict['train']['tokens'],
        dataset_dict['train']['ner_tags'],
        tokenizer,
        label2id=label2id
    )

    eval_dataset = CoNLLDataset(
        dataset_dict['validation']['tokens'],
        dataset_dict['validation']['ner_tags'],
        tokenizer,
        label2id=label2id
    )

    test_dataset = CoNLLDataset(
        dataset_dict['test']['tokens'],
        dataset_dict['test']['ner_tags'],
        tokenizer,
        label2id=label2id
    )

    # Training data is reshuffled every epoch so batches differ between epochs
    train_dataloader = DataLoader(
        train_dataset,
        shuffle=True,
        batch_size=batch_size
    )

    # Evaluation order does not affect the metrics, so keep it fixed
    eval_dataloader = DataLoader(
        eval_dataset,
        shuffle=False,
        batch_size=batch_size
    )

    # Create DataLoader for test set (fixed order, same as validation)
    test_dataloader = DataLoader(
        test_dataset,
        shuffle=False,
        batch_size=batch_size
    )

    return train_dataloader, eval_dataloader, test_dataloader



## PART 4: Training Function

In [None]:
# ==============================================================================
# PART 4: Training Function
# ==============================================================================

def train_epoch(model, dataloader, optimizer, scheduler, device):
    """
    Train the model for one epoch.

    Args:
        model: BERT model for token classification
        dataloader: Training data loader yielding dicts with "input_ids",
            "attention_mask" and "labels"
        optimizer: Optimizer
        scheduler: Learning rate scheduler (stepped once per batch)
        device: CPU or CUDA device

    Returns:
        Average training loss for the epoch (mean over batches)
    """
    model.train()
    total_loss = 0

    progress_bar = tqdm(dataloader, desc="Training")

    for batch in progress_bar:
        # Move batch tensors to the same device as the model
        input_ids = batch["input_ids"].to(device)
        attention_mask = batch["attention_mask"].to(device)
        labels = batch["labels"].to(device)

        # Clear gradients left over from the previous step; PyTorch
        # accumulates them by default.
        optimizer.zero_grad()

        # Forward pass; passing labels makes the model compute the loss
        outputs = model(
            input_ids=input_ids,
            attention_mask=attention_mask,
            labels=labels
        )

        loss = outputs.loss
        total_loss += loss.item()

        # Backward pass - compute gradients
        loss.backward()

        # Update model parameters
        optimizer.step()

        # Update learning rate (must come after optimizer.step())
        scheduler.step()

        progress_bar.set_postfix({"loss": loss.item()})

    return total_loss / len(dataloader)



## PART 5: Evaluation Function

In [None]:
# ==============================================================================
# PART 5: Evaluation Function
# ==============================================================================

def evaluate(model, dataloader, device):
    """
    Evaluate the model on the given dataloader.

    Uses seqeval library which properly handles entity-level evaluation
    (rather than token-level), following the CoNLL evaluation scheme.
    A per-entity classification report is printed as a side effect.

    Args:
        model: BERT model for token classification
        dataloader: Evaluation data loader
        device: CPU or CUDA device

    Returns:
        metrics: Dictionary with "loss" (mean loss over batches) and
            "f1" (seqeval entity-level F1)
    """
    model.eval()
    total_loss = 0

    all_predictions = []
    all_labels = []

    # No gradients are needed for evaluation
    with torch.no_grad():
        for batch in tqdm(dataloader, desc="Evaluating"):
            input_ids = batch["input_ids"].to(device)
            attention_mask = batch["attention_mask"].to(device)
            labels = batch["labels"].to(device)

            # Forward pass; labels are passed so the model also returns a loss
            outputs = model(
                input_ids=input_ids,
                attention_mask=attention_mask,
                labels=labels
            )

            loss = outputs.loss
            total_loss += loss.item()

            # Get predictions (logits -> class IDs): pick the highest-scoring
            # class along the last (label) dimension
            predictions = torch.argmax(outputs.logits, dim=-1)

            # Convert to arrays on the CPU for per-token filtering
            predictions = predictions.cpu().numpy()
            labels = labels.cpu().numpy()

            # Remove padding and special tokens for evaluation
            for pred_seq, label_seq in zip(predictions, labels):
                pred_labels = []
                true_labels = []

                for pred, label in zip(pred_seq, label_seq):
                    if label != -100:  # Ignore positions excluded from the loss
                        # int() turns NumPy integers into plain dict keys
                        pred_labels.append(id2label[int(pred)])
                        true_labels.append(id2label[int(label)])

                all_predictions.append(pred_labels)
                all_labels.append(true_labels)

    # Calculate metrics using seqeval
    avg_loss = total_loss / len(dataloader)
    f1 = f1_score(all_labels, all_predictions)

    print("\n" + classification_report(all_labels, all_predictions))

    return {
        "loss": avg_loss,
        "f1": f1
    }

## PART 6: Main Training Loop

In [None]:
# ==============================================================================
# PART 6: Main Training Loop
# ==============================================================================

def main():
    """
    Main function to orchestrate the training process.

    Downloads and parses the dataset, builds the tokenizer, data loaders,
    model, optimizer and scheduler, trains for a fixed number of epochs while
    checkpointing the best validation F1 to "best_bert_ner_model.pt", and
    finally evaluates that checkpoint on the test set.
    """
    # Set device
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    print(f"Using device: {device}")

    # Download and load dataset
    data_url = "https://data.deepai.org/conll2003.zip"
    data_path = download_and_extract_data(data_url)
    dataset = load_conll_dataset(data_path)

    # Use the cased checkpoint because entity names are case-sensitive
    model_name = "bert-base-cased"
    tokenizer = BertTokenizerFast.from_pretrained(model_name)

    # Prepare data loaders
    train_dataloader, eval_dataloader, test_dataloader = prepare_dataloaders(
        dataset, tokenizer, batch_size=16
    )

    # Initialize BERT model for token classification; the label maps are
    # stored in the model config alongside the number of classes
    model = BertForTokenClassification.from_pretrained(
        model_name,
        num_labels=num_labels,
        id2label=id2label,
        label2id=label2id
    )
    model.to(device)

    # Training hyperparameters
    num_epochs = 3
    learning_rate = 5e-5

    # Initialize optimizer
    optimizer = AdamW(model.parameters(), lr=learning_rate)

    # Calculate total training steps for scheduler (one step per batch)
    total_steps = len(train_dataloader) * num_epochs

    # Linear warmup over the first 10% of steps, then linear decay to zero
    scheduler = get_linear_schedule_with_warmup(
        optimizer,
        num_warmup_steps=int(0.1 * total_steps),
        num_training_steps=total_steps
    )

    # Training loop
    print("\n" + "="*50)
    print("Starting Training")
    print("="*50)

    # Start below any achievable F1 so the first epoch always writes a
    # checkpoint; otherwise a run whose F1 never exceeds 0 would leave no
    # file (or a stale one from an earlier run) for the final load below.
    best_f1 = float("-inf")

    for epoch in range(num_epochs):
        print(f"\nEpoch {epoch + 1}/{num_epochs}")
        print("-" * 50)

        # Train
        train_loss = train_epoch(model, train_dataloader, optimizer, scheduler, device)
        print(f"Training Loss: {train_loss:.4f}")

        # Evaluate
        print("\nValidation Results:")
        eval_metrics = evaluate(model, eval_dataloader, device)
        print(f"Validation Loss: {eval_metrics['loss']:.4f}")
        print(f"Validation F1: {eval_metrics['f1']:.4f}")

        # Save best model
        if eval_metrics['f1'] > best_f1:
            best_f1 = eval_metrics['f1']
            torch.save(model.state_dict(), "best_bert_ner_model.pt")
            print(f"Saved new best model with F1: {best_f1:.4f}")

    # Final evaluation on test set
    print("\n" + "="*50)
    print("Final Evaluation on Test Set")
    print("="*50)

    # Load best model; map_location places the tensors on the active device
    model.load_state_dict(
        torch.load("best_bert_ner_model.pt", map_location=device)
    )
    test_metrics = evaluate(model, test_dataloader, device)
    print(f"\nTest Loss: {test_metrics['loss']:.4f}")
    print(f"Test F1: {test_metrics['f1']:.4f}")

## PART 7: Inference Example

In [None]:
# ==============================================================================
# PART 7: Inference Example
# ==============================================================================

def predict_entities(text, model, tokenizer, device):
    """
    Run the NER model on a raw text string and tag each subword token.

    Args:
        text: Input text string
        model: Trained BERT NER model
        tokenizer: BERT tokenizer
        device: CPU or CUDA device

    Returns:
        List of (token, entity_tag) tuples, one per tokenizer output token
        (subword pieces, not whole words), excluding the literal tokens
        "[CLS]", "[SEP]" and "[PAD]"
    """
    model.eval()

    # Encode the text (truncated to 128 tokens) and move it to the device
    encoding = tokenizer(text, return_tensors="pt", truncation=True, max_length=128)
    input_ids = encoding["input_ids"].to(device)
    attention_mask = encoding["attention_mask"].to(device)

    # Forward pass without gradient tracking, then pick the best class per token
    with torch.no_grad():
        logits = model(input_ids=input_ids, attention_mask=attention_mask).logits
        best_class = torch.argmax(logits, dim=-1)

    # Only one sequence was encoded, so take the first row of the batch
    label_ids = best_class[0].cpu().numpy()
    pieces = tokenizer.convert_ids_to_tokens(input_ids[0])

    # Pair each token with its predicted tag, dropping the special tokens
    skipped = ["[CLS]", "[SEP]", "[PAD]"]
    return [
        (piece, id2label[label_id])
        for piece, label_id in zip(pieces, label_ids)
        if piece not in skipped
    ]




## PART 8: Training

In [None]:
# Run the full download / train / evaluate pipeline when this code is
# executed as the top-level program.
if __name__ == "__main__":
    main()

## PART 9: Inference

In [None]:
# Example inference
# Rebuild the same architecture that was trained, then restore the fine-tuned
# weights saved during training.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
tokenizer = BertTokenizerFast.from_pretrained("bert-base-cased")
model = BertForTokenClassification.from_pretrained(
    "bert-base-cased",
    num_labels=num_labels,
    id2label=id2label,
    label2id=label2id
)
# map_location lets a checkpoint saved from a GPU session be loaded on a
# CPU-only machine (and vice versa).
model.load_state_dict(torch.load("best_bert_ner_model.pt", map_location=device))
model.to(device)


In [None]:
# Tag a sample sentence and print only the tokens predicted as part of an entity
test_text = "Apple Inc. is located in Cupertino, California. Tim Cook is the CEO."
entities = predict_entities(test_text, model, tokenizer, device)
print("\nDetected Entities:")
for token, label in entities:
    if label == "O":
        # "O" marks tokens outside any entity; nothing to report
        continue
    print(f"{token}: {label}")