In [None]:
%%capture
!pip install --upgrade git+https://github.com/huggingface/transformers.git
!pip install flash-attn
!pip install -q scikit-learn
!pip install -q datasets
!pip install 'accelerate>=0.26.0'
!sudo apt-get -y update
!sudo apt-get -y install build-essential
!pip install evaluate seqeval
!pip install wandb

In [None]:
import os

import wandb

# Authenticate with Weights & Biases without embedding a secret in the notebook.
# The API key is read from the WANDB_API_KEY environment variable; when it is
# unset, `key=None` lets wandb fall back to its own credential lookup / prompt.
# NOTE(review): any key that was previously committed in this cell must be
# treated as leaked and rotated in the W&B account settings.
wandb.login(key=os.environ.get("WANDB_API_KEY"))
wandb.init(project="Encoders")

In [None]:
#!/usr/bin/env python
"""
Train a Named Entity Recognition (NER) model using the "answerdotai/ModernBERT-base" model.
This script:
 - Loads and preprocesses IOB-formatted training data from `train.iob`
 - Loads a separate evaluation set from `test.iob` (no train/eval split is performed)
 - Tokenizes and aligns labels to tokens
 - Defines a token-classification model using Transformers
 - Trains the model with specified hyperparameters while computing evaluation metrics
 - Saves the trained model and tokenizer
"""

import os
import sys
import logging
from typing import List, Tuple, Dict

import torch
from torch.utils.data import Dataset
from transformers import (
    AutoTokenizer,
    AutoModelForTokenClassification,
    Trainer,
    TrainingArguments,
    DataCollatorForTokenClassification,
)
import evaluate
from sklearn.model_selection import train_test_split

# Configure logging at INFO level and create the module-level logger that the
# functions and script sections below use for status and error messages.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

def read_iob(file_path: str) -> Tuple[List[List[str]], List[List[str]]]:
    """
    Parse an IOB file into parallel lists of sentences and tag sequences.

    Every non-blank line is split on whitespace; the first field is taken as
    the token and the last field as its tag. Lines with fewer than two fields
    are ignored. A blank line terminates the current sentence.

    Args:
        file_path: Path of the UTF-8 encoded IOB file.

    Returns:
        A pair ``(sentences, labels)`` where ``sentences[i]`` is the token list
        of the i-th sentence and ``labels[i]`` the matching tag list.

    Any exception raised while reading is logged and the process exits with
    status 1 via ``sys.exit``.
    """
    all_sentences = []
    all_tags = []
    sentence_tokens = []
    sentence_tags = []
    try:
        with open(file_path, "r", encoding="utf-8") as handle:
            for raw_line in handle:
                fields = raw_line.split()
                if not fields:
                    # Blank (or whitespace-only) line: sentence boundary.
                    if sentence_tokens:
                        all_sentences.append(sentence_tokens)
                        all_tags.append(sentence_tags)
                        sentence_tokens, sentence_tags = [], []
                    continue
                if len(fields) < 2:
                    # Malformed line without a tag column: skip it.
                    continue
                sentence_tokens.append(fields[0])
                sentence_tags.append(fields[-1])
            # Flush the trailing sentence when the file lacks a final blank line.
            if sentence_tokens:
                all_sentences.append(sentence_tokens)
                all_tags.append(sentence_tags)
    except Exception as e:
        logger.error(f"Error reading file {file_path}: {e}")
        sys.exit(1)
    return all_sentences, all_tags

def get_label_mapping(labels: List[List[str]]) -> Dict[str, int]:
    """
    Build a label -> integer-id lookup from nested tag sequences.

    The distinct labels are sorted, and each one is assigned its position in
    that sorted order as its id.
    """
    distinct = {tag for sentence in labels for tag in sentence}
    ordered = sorted(distinct)
    return dict(zip(ordered, range(len(ordered))))

class NERDataset(Dataset):
    """
    Map-style dataset yielding tokenized NER examples.

    Each item is the tokenizer's encoding of one pre-split sentence, padded or
    truncated to ``max_length``, with a ``labels`` entry aligned to the
    sub-tokens. Every value of the returned encoding is a ``torch`` tensor.
    """

    def __init__(self, sentences: List[List[str]], labels: List[List[str]], tokenizer, label_to_id, max_length: int = 128):
        self.max_length = max_length
        self.label_to_id = label_to_id
        self.tokenizer = tokenizer
        self.labels = labels
        self.sentences = sentences

    def __len__(self):
        return len(self.sentences)

    def __getitem__(self, idx):
        words = self.sentences[idx]
        tags = self.labels[idx]

        # The sentence is already split into words; word_ids() below relies on that.
        encoding = self.tokenizer(
            words,
            is_split_into_words=True,
            truncation=True,
            padding="max_length",
            max_length=self.max_length,
            return_offsets_mapping=True,
        )

        # Only the first sub-token of each word carries the word's label id;
        # positions without a word and continuation sub-tokens get -100.
        label_ids = []
        last_word = None
        for current_word in encoding.word_ids():
            starts_word = current_word is not None and current_word != last_word
            label_ids.append(self.label_to_id[tags[current_word]] if starts_word else -100)
            last_word = current_word

        # Offsets were requested from the tokenizer but are not part of the example.
        encoding.pop("offset_mapping")
        encoding["labels"] = label_ids

        # Replace each remaining list with its tensor form.
        for field in list(encoding.keys()):
            encoding[field] = torch.tensor(encoding[field])
        return encoding

# File and hyperparameter settings
# Path of the IOB-formatted training file read with `read_iob`.
train_file = "train.iob"
# Directory that receives Trainer checkpoints and the final saved model/tokenizer.
output_dir = "./ner_model"
# Sequence length handed to NERDataset; inputs are padded/truncated to this size.
max_length = 256
# Number of passes over the training set.
num_train_epochs = 20
# Per-device batch size, used for both training and evaluation.
batch_size = 32
# Learning-rate hyperparameter for fine-tuning.
learning_rate = 2e-5

# Load the IOB-formatted training data and the separate test data.
logger.info("Loading training data...")
sentences, labels = read_iob(train_file)
test_sentences, test_labels = read_iob("test.iob")


if not sentences:
    logger.error("No data found in the training file.")
    sys.exit(1)

# No train/eval split is performed: the whole training file is used for
# training, and the sentences from `test.iob` serve as the evaluation set.
train_sentences, train_labels = sentences, labels

# Create label mappings.
# The mapping covers the test tags as well, so a tag that occurs only in
# `test.iob` cannot raise a KeyError when NERDataset encodes that sentence.
# When every test tag also occurs in the training data the resulting mapping
# is identical to one built from the training labels alone.
unseen_test_labels = sorted(
    {tag for sent in test_labels for tag in sent} - {tag for sent in labels for tag in sent}
)
if unseen_test_labels:
    logger.warning(f"Labels present only in the test data: {unseen_test_labels}")
label_to_id = get_label_mapping(labels + test_labels)
id_to_label = {v: k for k, v in label_to_id.items()}

# Load tokenizer and model from Hugging Face
# The token-classification model is created with one output label per entry of
# `label_to_id`, and both id<->label mappings are passed to `from_pretrained`.
model_name = "answerdotai/ModernBERT-base"
logger.info(f"Loading tokenizer and model: {model_name}")
try:
    tokenizer = AutoTokenizer.from_pretrained(model_name)
    model = AutoModelForTokenClassification.from_pretrained(
        model_name,
        num_labels=len(label_to_id),
        id2label=id_to_label,
        label2id=label_to_id
    )
except Exception as e:
    # Any loading failure is logged and ends execution with exit status 1.
    logger.error(f"Error loading model/tokenizer: {e}")
    sys.exit(1)

# Create training and evaluation datasets
# Both datasets share the tokenizer, the label mapping and `max_length`; the
# evaluation dataset is built from the sentences read from `test.iob`.
train_dataset = NERDataset(train_sentences, train_labels, tokenizer, label_to_id, max_length=max_length)
test_dataset = NERDataset(test_sentences, test_labels, tokenizer, label_to_id, max_length=max_length)

# Define data collator for token classification batches.
# Note that NERDataset already pads every example to `max_length`
# (padding="max_length"), so examples arrive with a uniform length.
data_collator = DataCollatorForTokenClassification(tokenizer)

def compute_metrics(eval_preds):
    """
    Compute entity-level precision/recall/F1 and accuracy with seqeval.

    ``eval_preds`` unpacks into ``(logits, label_ids)``. The arg-max over the
    last logits axis gives the predicted ids. Positions whose reference label
    is -100 are excluded, and the remaining ids are translated to tag strings
    through ``id_to_label`` before scoring.
    """
    logits, gold_ids = eval_preds
    predicted_ids = logits.argmax(axis=-1)

    predictions, true_labels = [], []
    for predicted_row, gold_row in zip(predicted_ids, gold_ids):
        # Keep only positions that carry a real label (not sub-word/padding).
        kept = [(p, g) for p, g in zip(predicted_row, gold_row) if g != -100]
        predictions.append([id_to_label[p] for p, _ in kept])
        true_labels.append([id_to_label[g] for _, g in kept])

    seqeval = evaluate.load("seqeval")
    # Strict mode scores exact boundary matches only, under the IOB2 scheme.
    scores = seqeval.compute(
        predictions=predictions,
        references=true_labels,
        mode="strict",
        scheme="IOB2",
    )

    reported = ("precision", "recall", "f1", "accuracy")
    return {name: scores[f"overall_{name}"] for name in reported}

# Training configuration.
# - `eval_strategy` is the current name of the deprecated `evaluation_strategy`
#   argument; the notebook installs transformers from git main, so the current
#   name is used.
# - `learning_rate` forwards the hyperparameter defined in the settings section,
#   which was previously never passed (the library default was used instead).
training_args = TrainingArguments(
    output_dir=output_dir,
    eval_strategy="epoch",            # Evaluate at the end of each epoch
    save_strategy="epoch",            # Save a checkpoint at the end of each epoch
    load_best_model_at_end=True,      # Load the best model at the end of training
    metric_for_best_model="f1",       # Choose the metric to select the best model
    greater_is_better=True,           # True if a higher metric is better
    learning_rate=learning_rate,
    num_train_epochs=num_train_epochs,
    per_device_train_batch_size=batch_size,
    per_device_eval_batch_size=batch_size,
    logging_dir="./logs",
    logging_steps=50,
    # optionally keep only the 1 best checkpoint to save space
    save_total_limit=1,
    report_to="wandb",
)

# Build the Trainer.
# - `data_collator` passes the token-classification collator created above,
#   which was previously constructed but never handed to the Trainer.
# - `processing_class` is the current name of the deprecated `tokenizer`
#   argument.
# - `eval_dataset` is the dataset built from `test.iob`, so per-epoch
#   evaluation and best-checkpoint selection are done on that data.
trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=train_dataset,
    eval_dataset=test_dataset,
    data_collator=data_collator,
    processing_class=tokenizer,
    compute_metrics=compute_metrics,
)

# Run fine-tuning with the configured Trainer.
logger.info("Starting training...")
trainer.train()

# Evaluate once more after training on the Trainer's evaluation dataset and log the metrics.
logger.info("Evaluating model on evaluation dataset...")
eval_results = trainer.evaluate()
logger.info(f"Evaluation results: {eval_results}")

# Persist the model weights/config and the tokenizer files to `output_dir`.
logger.info(f"Saving model to {output_dir} ...")
model.save_pretrained(output_dir)
tokenizer.save_pretrained(output_dir)

logger.info("Training complete.")

In [None]:
import json

def merge_bio_entities(entities):
    """
    Merge a sequence of BIO-tagged tokens into whole entities.

    Args:
        entities: Iterable of dicts shaped like ``{"label": "B-DATE", "text": "..."}``.
            A label without a dash (for example ``"O"``) is treated as outside
            any entity.

    Returns:
        A list of ``{"label": <entity type>, "text": <tokens joined by spaces>}``
        dicts, in order of appearance.

    Rules:
        - ``B-X`` closes any open entity and opens a new one of type ``X``.
        - ``I-X`` extends the open entity only when its type is also ``X``.
        - Anything else (``O``, or an ``I-`` tag that does not match the open
          entity) closes the open entity; that token itself is discarded.
    """
    merged = []
    current_label = None
    current_tokens = []

    for ent in entities:
        prefix, dash, ent_type = ent["label"].partition("-")
        if not dash:
            # No dash: treat the label as "O" / a single-label tag.
            prefix, ent_type = "O", ent["label"]

        if prefix == "I" and ent_type == current_label:
            # Continuation of the entity that is currently open.
            current_tokens.append(ent["text"])
            continue

        # Every other tag ends the open entity, if there is one.
        if current_label is not None:
            merged.append({"label": current_label, "text": " ".join(current_tokens)})
            current_label, current_tokens = None, []

        # Only a B- tag starts a new entity.
        if prefix == "B":
            current_label, current_tokens = ent_type, [ent["text"]]

    # Close any entity still open at the end of the sequence.
    if current_label is not None:
        merged.append({"label": current_label, "text": " ".join(current_tokens)})

    return merged

# -- Main prediction code --

# Re-read the test file and rebuild its dataset so this cell does not depend on
# the evaluation dataset object created in the training cell.
test_sentences, test_labels = read_iob("test.iob")
test_dataset = NERDataset(test_sentences, test_labels, tokenizer, label_to_id, max_length=max_length)

prediction_output = trainer.predict(test_dataset)
pred_logits = prediction_output.predictions
true_label_ids = prediction_output.label_ids
pred_ids = pred_logits.argmax(axis=-1)

# One JSON string per test sentence, listing the merged predicted entities.
final_res = []

for i, text in enumerate(test_sentences):
    # Keep only word-level positions: label -100 marks sub-word/padding slots.
    words_index = (true_label_ids[i] != -100)
    p_ids = pred_ids[i][words_index]

    # Pass EVERY word-level prediction, including "O", to the merger. Dropping
    # "O" tokens first would make "B-X ... O ... I-X" look contiguous and fuse
    # tokens across the gap into a single entity; merge_bio_entities closes the
    # open entity when it sees "O".
    raw_entities = [
        {"label": id_to_label[p_id], "text": word}
        for word, p_id in zip(text, p_ids)
    ]

    # Merge consecutive B-XXX and I-XXX tokens
    merged_entities = merge_bio_entities(raw_entities)
    # Convert to a JSON string
    final_res.append(json.dumps(merged_entities, ensure_ascii=False))

In [None]:
import pandas as pd

# Write the per-sentence JSON prediction strings to pred.csv as a single
# "pred" column, without the DataFrame index.
pd.DataFrame(final_res, columns=["pred"]).to_csv("pred.csv", index=False)