In [None]:
!pip install evaluate datasets transformers seqeval

import evaluate
from datasets import load_dataset
from transformers import AutoTokenizer
from transformers import AutoModelForTokenClassification, DataCollatorForTokenClassification, Trainer, TrainingArguments

# 1. Checkpoint name shared by the tokenizer and the model
checkpoint = "microsoft/deberta-base"

# 2. CoNLL-2003 plus the tag names stored on its "ner_tags" feature
raw_datasets = load_dataset("conll2003")
label_list = raw_datasets["train"].features["ner_tags"].feature.names
num_labels = len(label_list)

# 3. Fast tokenizer; sentences are later passed pre-split into words
#    (is_split_into_words=True), hence add_prefix_space=True.
tokenizer = AutoTokenizer.from_pretrained(checkpoint, use_fast=True, add_prefix_space=True)

def tokenize_and_align_labels(examples):
    """Tokenize a batch of pre-split sentences and expand word tags to token level.

    Every sub-token inherits the tag of the word it was produced from. Positions
    that map to no word (``word_id`` is ``None``) receive -100, the label value
    that is ignored in the loss.

    Args:
        examples: Batch with a "tokens" column (lists of words) and a
            "ner_tags" column (one tag id per word).

    Returns:
        The tokenizer output for the batch with an added "labels" entry holding
        one list of label ids per sentence.
    """
    encoding = tokenizer(
        examples["tokens"],
        is_split_into_words=True,
        truncation=True,
        max_length=128,
        padding="max_length",
    )
    encoding["labels"] = [
        [
            -100 if wid is None else word_tags[wid]
            for wid in encoding.word_ids(batch_index=row)
        ]
        for row, word_tags in enumerate(examples["ner_tags"])
    ]
    return encoding

# 3. Tokenize every split; the raw columns are dropped so only the tokenizer
#    outputs and the aligned "labels" remain.
columns_to_drop = raw_datasets["train"].column_names
tokenized_datasets = raw_datasets.map(
    tokenize_and_align_labels, batched=True, remove_columns=columns_to_drop
)

# 4. Load model (token-classification head sized to the number of tags)
model = AutoModelForTokenClassification.from_pretrained(checkpoint, num_labels=num_labels)
data_collator = DataCollatorForTokenClassification(tokenizer=tokenizer)

# 5. Define metrics (seqeval, used inside compute_metrics)
metric = evaluate.load("seqeval")

def compute_metrics(p):
    """Turn raw model outputs into seqeval precision / recall / F1 / accuracy.

    Args:
        p: ``(logits, labels)`` pair. The arg-max over the last axis of
            ``logits`` is taken as the predicted tag id; positions whose label
            is -100 are left out of the comparison.

    Returns:
        dict with the keys "precision", "recall", "f1" and "accuracy".
    """
    logits, labels = p
    predicted_ids = logits.argmax(axis=-1)

    true_predictions = []
    true_labels = []
    for pred_row, gold_row in zip(predicted_ids, labels):
        # Keep only the positions that carry a real gold label.
        kept = [
            (pred_id, gold_id)
            for pred_id, gold_id in zip(pred_row, gold_row)
            if gold_id != -100
        ]
        true_predictions.append([label_list[pred_id] for pred_id, _ in kept])
        true_labels.append([label_list[gold_id] for _, gold_id in kept])

    scores = metric.compute(predictions=true_predictions, references=true_labels)
    return {
        "precision": scores["overall_precision"],
        "recall": scores["overall_recall"],
        "f1": scores["overall_f1"],
        "accuracy": scores["overall_accuracy"],
    }

# 6. Training setup
training_args = TrainingArguments(
    output_dir="./my_deberta_ner",
    do_train=True,
    do_eval=True,
    # eval_steps is only honoured when the evaluation strategy is "steps"; the
    # default strategy is "no", which skips evaluation during training entirely.
    # (This argument is named `evaluation_strategy` in transformers < 4.41.)
    eval_strategy="steps",
    eval_steps=500,
    save_steps=500,
    num_train_epochs=3,
    per_device_train_batch_size=8,
    per_device_eval_batch_size=8,
    learning_rate=2e-5,
    logging_steps=50,
)


# 7. Trainer
# Wires together the model, the training arguments, the tokenized train and
# validation splits, the data collator and the seqeval-based compute_metrics.
trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=tokenized_datasets["train"],
    eval_dataset=tokenized_datasets["validation"],
    tokenizer=tokenizer,
    data_collator=data_collator,
    compute_metrics=compute_metrics,
)

# 8. Train
trainer.train()

# 9. Evaluate
# Score the test split rather than the validation split given to the Trainer.
metrics = trainer.evaluate(tokenized_datasets["test"])
print("Test Metrics:", metrics)
# compute_metrics reports "f1"; it is looked up here under the key "eval_f1".
print("F1 Score:", metrics["eval_f1"])

In [None]:
!pip install evaluate datasets transformers seqeval pytorch-crf

import evaluate
from datasets import load_dataset
from transformers import AutoTokenizer
from transformers import AutoModelForTokenClassification, DataCollatorForTokenClassification, Trainer, TrainingArguments
import torch
import torch.nn as nn
from torchcrf import CRF
from transformers import DebertaPreTrainedModel, DebertaModel

class DebertaCRFForTokenClassification(DebertaPreTrainedModel):
    """
    DeBERTa encoder followed by dropout, a linear tag projection and a CRF layer.

    With ``labels`` the forward pass returns ``(loss, logits)``, where the loss
    is the negated value returned by the CRF layer; without ``labels`` it
    returns ``(logits,)``.
    """

    def __init__(self, config):
        super().__init__(config)
        self.num_labels = config.num_labels

        # Sub-modules are created in the same order as before:
        # encoder, dropout, tag projection, CRF.
        self.deberta = DebertaModel(config)
        self.dropout = nn.Dropout(config.hidden_dropout_prob)
        self.classifier = nn.Linear(config.hidden_size, self.num_labels)
        self.crf = CRF(self.num_labels, batch_first=True)

        self.post_init()

    def forward(
        self,
        input_ids=None,
        attention_mask=None,
        token_type_ids=None,
        labels=None,
        **kwargs,
    ):
        """Run the encoder and, when ``labels`` is given, compute the CRF loss.

        Args:
            input_ids, attention_mask, token_type_ids: Forwarded to the DeBERTa
                encoder; arguments that are ``None`` are not passed on.
            labels: Optional tag ids with -100 at positions to exclude.
            **kwargs: Accepted and ignored.

        Returns:
            ``(loss, logits)`` if ``labels`` is given, otherwise ``(logits,)``.
        """
        # 1. Encoder: forward only the inputs that were actually supplied
        encoder_inputs = {}
        for arg_name, arg_value in (
            ("input_ids", input_ids),
            ("attention_mask", attention_mask),
            ("token_type_ids", token_type_ids),
        ):
            if arg_value is not None:
                encoder_inputs[arg_name] = arg_value
        hidden_states = self.deberta(**encoder_inputs)[0]  # (batch_size, seq_len, hidden_dim)

        # 2. Dropout + per-token tag scores
        logits = self.classifier(self.dropout(hidden_states))  # (batch_size, seq_len, num_labels)

        if labels is None:
            return (logits,)

        # 3. CRF loss over the positions that carry a real label
        valid_positions = labels != -100
        # -100 is not a valid tag index for the CRF, so masked positions get tag 0
        safe_tags = labels.masked_fill(~valid_positions, 0)
        loss = -self.crf(logits, tags=safe_tags, mask=valid_positions)
        return (loss, logits)

# -------------------------
# 1. Custom model code: DebertaCRFForTokenClassification is defined above
# -------------------------

# 2. Load dataset
# CoNLL-2003; the tag names are read from the "ner_tags" feature of the train split.
raw_datasets = load_dataset("conll2003")
label_list = raw_datasets["train"].features["ner_tags"].feature.names
num_labels = len(label_list)

# The same checkpoint name is used for the tokenizer here and for the CRF model below.
checkpoint = "microsoft/deberta-base"
# add_prefix_space=True: sentences are passed pre-split into words
# (is_split_into_words=True) when they are tokenized.
tokenizer = AutoTokenizer.from_pretrained(
    checkpoint,
    use_fast=True,
    add_prefix_space=True
)

def tokenize_and_align_labels(examples):
    """Tokenize pre-split sentences and build CRF-compatible token labels.

    Every sub-token inherits the tag of its source word; positions without a
    source word (``word_id`` is ``None``) get -100. Because the CRF layer
    derives its mask from ``labels != -100`` and torchcrf rejects a batch whose
    first timestep is masked, position 0 is always given a real tag: the first
    real tag of the sequence, or tag id 0 if the sequence has none.

    Args:
        examples: Batch with a "tokens" column (lists of words) and a
            "ner_tags" column (one tag id per word).

    Returns:
        The tokenizer output for the batch with an added "labels" entry.
    """
    tokenized_inputs = tokenizer(
        examples["tokens"],
        truncation=True,
        is_split_into_words=True,
        max_length=128,
        padding="max_length"
    )
    aligned_labels = []
    for i, labels in enumerate(examples["ner_tags"]):
        word_ids = tokenized_inputs.word_ids(batch_index=i)
        label_ids = [
            -100 if word_id is None else labels[word_id]
            for word_id in word_ids
        ]

        # Ensure first token is not masked for CRF. Tag id 0 is the fallback for
        # sequences without any real label (0 is also the value the model
        # substitutes for masked positions).
        if label_ids and label_ids[0] == -100:
            label_ids[0] = next((tag for tag in label_ids if tag != -100), 0)

        aligned_labels.append(label_ids)
    tokenized_inputs["labels"] = aligned_labels
    return tokenized_inputs

# Tokenize every split; the raw columns are dropped so only the tokenizer
# outputs and the aligned "labels" remain.
columns_to_drop = raw_datasets["train"].column_names
tokenized_datasets = raw_datasets.map(
    tokenize_and_align_labels, batched=True, remove_columns=columns_to_drop
)

# 3. Create the CRF model (encoder weights come from the pretrained checkpoint)
model = DebertaCRFForTokenClassification.from_pretrained(checkpoint, num_labels=num_labels)

data_collator = DataCollatorForTokenClassification(tokenizer=tokenizer)

# 4. Load the seqeval metric
metric = evaluate.load("seqeval")

def compute_metrics(eval_preds):
    """Decode CRF predictions and score them with seqeval.

    Args:
        eval_preds: ``(predictions, labels)`` pair. ``predictions`` is expected
            to hold the per-token tag scores produced by the model, shaped
            (batch_size, seq_len, num_labels); a 2-D array of already decoded
            tag ids is accepted as well. ``labels`` holds the gold tag ids with
            -100 at positions that must be ignored.

    Returns:
        dict with the keys "precision", "recall", "f1" and "accuracy".
    """
    logits, labels = eval_preds

    # ---------------------
    # CRF decode step
    # ---------------------
    device = model.device
    logits_tensor = torch.as_tensor(logits)
    labels_tensor = torch.as_tensor(labels, dtype=torch.long)
    mask = labels_tensor != -100
    keep_rows = mask.tolist()

    if logits_tensor.ndim == 3:  # (batch_size, seq_len, num_labels)
        # The CRF parameters live on the model's device, so its inputs must too.
        with torch.no_grad():
            predictions_list = model.crf.decode(
                logits_tensor.to(device=device, dtype=torch.float32),
                mask=mask.to(device),
            )
    else:  # already decoded tag ids, one per position
        # Keep only the unmasked positions and make sure the ids are ints so
        # they can index label_list.
        predictions_list = [
            [int(tag) for tag, keep in zip(tag_row, keep_row) if keep]
            for tag_row, keep_row in zip(logits_tensor.tolist(), keep_rows)
        ]

    # Each decoded sequence holds one tag per unmasked position, so it lines up
    # with the gold labels filtered by the same mask.
    true_predictions = [
        [label_list[tag] for tag in tags] for tags in predictions_list
    ]
    true_labels = [
        [label_list[gold] for gold, keep in zip(gold_row, keep_row) if keep]
        for gold_row, keep_row in zip(labels_tensor.tolist(), keep_rows)
    ]

    results = metric.compute(predictions=true_predictions, references=true_labels)
    return {
        "precision": results["overall_precision"],
        "recall": results["overall_recall"],
        "f1": results["overall_f1"],
        "accuracy": results["overall_accuracy"],
    }

# 5. Training Arguments (with AdamW, LR=2e-5, weight_decay=0.01, etc.)
training_args = TrainingArguments(
    output_dir="./my_deberta_ner",
    do_train=True,
    do_eval=True,
    # eval_steps is only honoured when the evaluation strategy is "steps"; the
    # default strategy is "no", which skips evaluation during training entirely.
    # (This argument is named `evaluation_strategy` in transformers < 4.41.)
    eval_strategy="steps",
    eval_steps=500,
    save_steps=500,
    num_train_epochs=3,   # or 5 if you're doing data augmentation
    per_device_train_batch_size=8,
    per_device_eval_batch_size=8,
    learning_rate=2e-5,
    weight_decay=0.01,    # CRF config typically pairs well with some weight decay
    logging_steps=50
)

# 6. Initialize the Trainer
# Uses the CRF model together with the tokenized train / validation splits;
# compute_metrics is the CRF-decoding variant defined above.
trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=tokenized_datasets["train"],
    eval_dataset=tokenized_datasets["validation"],
    tokenizer=tokenizer,
    data_collator=data_collator,
    compute_metrics=compute_metrics,
)

# 7. Train
trainer.train()

In [None]:
# 8. Evaluate on Test Set
# Scores the test split rather than the validation split given to the Trainer.
metrics = trainer.evaluate(tokenized_datasets["test"])
print("Test Metrics:", metrics)
# compute_metrics reports "f1"; it is looked up here under the key "eval_f1".
print("F1 Score:", metrics["eval_f1"])

In [None]:
!pip install nltk evaluate datasets transformers seqeval pytorch-crf

import random
import nltk
from nltk.corpus import wordnet
nltk.download("wordnet")
nltk.download("omw-1.4")

import evaluate
from datasets import load_dataset, concatenate_datasets
from transformers import AutoTokenizer
from transformers import AutoModelForTokenClassification, DataCollatorForTokenClassification, Trainer, TrainingArguments

# Checkpoint name shared by the tokenizer and the model
checkpoint = "microsoft/deberta-base"

# CoNLL-2003 plus the tag names stored on its "ner_tags" feature
raw_datasets = load_dataset("conll2003")
label_list = raw_datasets["train"].features["ner_tags"].feature.names
num_labels = len(label_list)

# Fast tokenizer; sentences are later passed pre-split into words
# (is_split_into_words=True), hence add_prefix_space=True.
tokenizer = AutoTokenizer.from_pretrained(checkpoint, use_fast=True, add_prefix_space=True)

# --------------------- Augmentation Functions ---------------------
def get_synonyms(word):
    """Return the WordNet lemma names of ``word`` that differ from it.

    Underscores in lemma names are replaced by spaces, and names equal to
    ``word`` (ignoring case) are dropped. The result is sorted so that a seeded
    ``random.choice`` over it gives the same pick on every run; the iteration
    order of a set of strings is not stable across interpreter runs.

    Args:
        word: The word to look up.

    Returns:
        Sorted list of unique synonym strings (possibly empty).
    """
    target = word.lower()
    synonyms = set()
    for syn in wordnet.synsets(word):
        for lemma in syn.lemmas():
            name = lemma.name().replace('_', ' ')
            if name.lower() != target:
                synonyms.add(name)
    return sorted(synonyms)

def synonym_replacement(example, max_replacements=2):
    """Replace up to ``max_replacements`` non-entity tokens with a random synonym.

    Only tokens whose tag id is 0 are candidates. Candidates are visited in
    random order; a candidate without synonyms is skipped and does not count
    towards the limit. The input example is not modified.

    Args:
        example: Mapping with "tokens" (list of words) and "ner_tags" (one tag
            id per word).
        max_replacements: Upper bound on replaced tokens; values <= 0 mean no
            replacement at all.

    Returns:
        dict with the (possibly modified) "tokens" and the unchanged "ner_tags".
    """
    tokens = list(example["tokens"])
    labels = list(example["ner_tags"])

    # The limit used to be checked only after a replacement had been made, so
    # a limit of 0 could still change one token.
    if max_replacements <= 0:
        return {"tokens": tokens, "ner_tags": labels}

    non_entity_indices = [i for i, label in enumerate(labels) if label == 0]
    random.shuffle(non_entity_indices)

    replaced = 0
    for idx in non_entity_indices:
        synonyms = get_synonyms(tokens[idx])
        if not synonyms:
            continue
        tokens[idx] = random.choice(synonyms)
        replaced += 1
        if replaced >= max_replacements:
            break
    return {"tokens": tokens, "ner_tags": labels}

def random_masking(example, max_masks=2, mask_token="[MASK]"):
    """Replace up to ``max_masks`` randomly chosen non-entity tokens with a mask token.

    Only tokens whose tag id is 0 are candidates. The input example is not
    modified.

    Args:
        example: Mapping with "tokens" (list of words) and "ner_tags" (one tag
            id per word).
        max_masks: Upper bound on masked tokens; values <= 0 mean no masking.
        mask_token: String written in place of each masked token.

    Returns:
        dict with the (possibly modified) "tokens" and the unchanged "ner_tags".
    """
    tokens = list(example["tokens"])
    labels = list(example["ner_tags"])
    non_entity_indices = [i for i, label in enumerate(labels) if label == 0]
    random.shuffle(non_entity_indices)

    # Clamp at zero: a negative bound would otherwise be read as a
    # "drop from the end" slice and still mask tokens.
    for idx in non_entity_indices[:max(max_masks, 0)]:
        tokens[idx] = mask_token
    return {"tokens": tokens, "ner_tags": labels}

# --------------------- Apply Augmentation ---------------------
# Shuffle once, then cut the training set into three consecutive slices:
# the first 15% get synonym replacement, the next 10% get random masking,
# and the remainder is kept untouched.
train_data = raw_datasets["train"].shuffle(seed=42)
total_examples = len(train_data)
syn_replace_count = int(0.15 * total_examples)
mask_count = int(0.10 * total_examples)
mask_end = syn_replace_count + mask_count

syn_replace_set = train_data.select(range(0, syn_replace_count))
mask_set = train_data.select(range(syn_replace_count, mask_end))
rest_set = train_data.select(range(mask_end, total_examples))

# The augmented slices take the place of their originals, so the combined
# dataset has the same number of examples as the original training split.
aug_syn = syn_replace_set.map(synonym_replacement)
aug_mask = mask_set.map(random_masking)

augmented_train = concatenate_datasets([aug_syn, aug_mask, rest_set]).shuffle(seed=42)

# --------------------- Tokenizer & Alignment ---------------------
def tokenize_and_align_labels(examples):
    """Tokenize a batch of pre-split sentences and expand word tags to token level.

    Every sub-token inherits the tag of its source word; positions without a
    source word (``word_id`` is ``None``) get -100. If position 0 ends up with
    -100, it is overwritten with the first real label of the sequence, when
    the sequence has one.

    Args:
        examples: Batch with a "tokens" column (lists of words) and a
            "ner_tags" column (one tag id per word).

    Returns:
        The tokenizer output for the batch with an added "labels" entry.
    """
    encoded = tokenizer(
        examples["tokens"],
        is_split_into_words=True,
        truncation=True,
        max_length=128,
        padding="max_length",
    )
    all_label_ids = []
    for row, word_tags in enumerate(examples["ner_tags"]):
        row_labels = [
            -100 if wid is None else word_tags[wid]
            for wid in encoded.word_ids(batch_index=row)
        ]
        if row_labels[0] == -100:
            # Copy the first real label (if any) onto position 0.
            first_real = next((tag for tag in row_labels if tag != -100), None)
            if first_real is not None:
                row_labels[0] = first_real
        all_label_ids.append(row_labels)
    encoded["labels"] = all_label_ids
    return encoded

# Tokenize the augmented training data; raw columns are dropped so only the
# tokenizer outputs and the aligned "labels" remain.
tokenized_train = augmented_train.map(
    tokenize_and_align_labels, batched=True, remove_columns=augmented_train.column_names
)

# The validation split is tokenized the same way but is never augmented.
validation_split = raw_datasets["validation"]
tokenized_val = validation_split.map(
    tokenize_and_align_labels, batched=True, remove_columns=validation_split.column_names
)

# 5. Define metrics (seqeval, used inside compute_metrics)
metric = evaluate.load("seqeval")

def compute_metrics(p):
    """Turn raw model outputs into seqeval precision / recall / F1 / accuracy.

    Args:
        p: ``(logits, labels)`` pair. The arg-max over the last axis of
            ``logits`` is taken as the predicted tag id; positions whose label
            is -100 are left out of the comparison.

    Returns:
        dict with the keys "precision", "recall", "f1" and "accuracy".
    """
    logits, labels = p
    predicted_ids = logits.argmax(axis=-1)

    true_predictions = []
    true_labels = []
    for pred_row, gold_row in zip(predicted_ids, labels):
        pred_names = []
        gold_names = []
        for pred_id, gold_id in zip(pred_row, gold_row):
            if gold_id == -100:
                continue  # position carries no real gold label
            pred_names.append(label_list[pred_id])
            gold_names.append(label_list[gold_id])
        true_predictions.append(pred_names)
        true_labels.append(gold_names)

    scores = metric.compute(predictions=true_predictions, references=true_labels)
    return {
        "precision": scores["overall_precision"],
        "recall": scores["overall_recall"],
        "f1": scores["overall_f1"],
        "accuracy": scores["overall_accuracy"],
    }

# Token-classification head sized to the number of tags, plus the data collator
model = AutoModelForTokenClassification.from_pretrained(checkpoint, num_labels=num_labels)
data_collator = DataCollatorForTokenClassification(tokenizer=tokenizer)

# 6. Training setup
training_args = TrainingArguments(
    output_dir="./my_deberta_ner",
    do_train=True,
    do_eval=True,
    # eval_steps is only honoured when the evaluation strategy is "steps"; the
    # default strategy is "no", which skips evaluation during training entirely.
    # (This argument is named `evaluation_strategy` in transformers < 4.41.)
    eval_strategy="steps",
    eval_steps=500,
    save_steps=500,
    num_train_epochs=3,
    per_device_train_batch_size=8,
    per_device_eval_batch_size=8,
    learning_rate=2e-5,
    logging_steps=50,
)


# 7. Trainer
# Trains on the augmented, tokenized training data and evaluates on the
# tokenized (non-augmented) validation split.
trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=tokenized_train,
    eval_dataset=tokenized_val,
    tokenizer=tokenizer,
    data_collator=data_collator,
    compute_metrics=compute_metrics,
)

# 8. Train
trainer.train()

In [None]:
# 9. Evaluate
# This experiment only builds tokenized_train and tokenized_val, so the test
# split is tokenized here instead of relying on a `tokenized_datasets` variable
# left over from an earlier experiment.
tokenized_test = raw_datasets["test"].map(
    tokenize_and_align_labels,
    batched=True,
    remove_columns=raw_datasets["test"].column_names,
)
metrics = trainer.evaluate(tokenized_test)
print("Test Metrics:", metrics)
# compute_metrics reports "f1"; it is looked up here under the key "eval_f1".
print("F1 Score:", metrics["eval_f1"])