In [None]:
!rm -rf /content/*

In [None]:
from transformers import (
    AutoTokenizer,
    AutoModelForTokenClassification,
    TrainingArguments,
    Trainer,
    DataCollatorForTokenClassification
)
from datasets import Dataset, load_metric
import numpy as np
import pandas as pd
import time

# Function to read CONLL file
def read_conll_file(file_path):
    with open(file_path, 'r', encoding='utf-8') as f:
        sentences = []
        current_sentence = {"tokens": [], "ner_tags": []}

        for line in f:
            line = line.strip()

            if not line:  # Sentence boundary
                if current_sentence["tokens"]:
                    sentences.append(current_sentence)
                    current_sentence = {"tokens": [], "ner_tags": []}
                continue

            parts = line.split('\t')
            if len(parts) == 2:
                token, tag = parts
                current_sentence["tokens"].append(token)
                current_sentence["ner_tags"].append(tag)

    return sentences



In [None]:
from google.colab import files
uploaded = files.upload()

Saving amharic_ner_conll_labeled_output.conll to amharic_ner_conll_labeled_output.conll


In [None]:
input_file = "amharic_ner_conll_labeled_output.conll"

In [None]:
from datasets import Dataset, DatasetDict
import pandas as pd
from sklearn.model_selection import train_test_split

def load_and_split_conll(file_path, val_size=0.2, test_size=0.1):
    # Read the single CONLL file
    data = read_conll_file(file_path)

    # Convert to pandas DataFrame for easy splitting
    df = pd.DataFrame(data)

    # Split into train, validation, and test
    train_df, temp_df = train_test_split(df, test_size=val_size + test_size, random_state=42)
    val_df, test_df = train_test_split(temp_df, test_size=test_size/(val_size + test_size), random_state=42)

    # Create DatasetDict
    return DatasetDict({
        "train": Dataset.from_pandas(train_df),
        "validation": Dataset.from_pandas(val_df),
        "test": Dataset.from_pandas(test_df)
    })



In [None]:
MODELS_TO_COMPARE = [
    "xlm-roberta-base",
    "distilbert-base-multilingual-cased",
    "bert-base-multilingual-cased"
]

def compare_models(dataset):
    results = []

    # Get label list
    global label_list
    all_tags = set()
    for split in dataset.values():
        for tags in split["ner_tags"]:
            all_tags.update(tags)
    label_list = sorted(all_tags)

    # Create label mappings
    label2id = {label: i for i, label in enumerate(label_list)}
    id2label = {i: label for i, label in enumerate(label_list)}

    # Define MAX_LENGTH, LEARNING_RATE, BATCH_SIZE, NUM_EPOCHS, and device
    # These variables were used in the original code but not defined.
    # You should set appropriate values for these.
    MAX_LENGTH = 128
    LEARNING_RATE = 2e-5
    BATCH_SIZE = 16
    NUM_EPOCHS = 3

    import torch
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")


    # Define the compute_metrics function
    # This function was used in the original code but not defined.
    # install the 'seqeval' library to use the metric.
    !pip install -q seqeval
    from datasets import load_metric
    metric = load_metric("seqeval")

    def compute_metrics(p, label_list):
        predictions, labels = p
        predictions = np.argmax(predictions, axis=2)

        # Remove ignored index (special tokens)
        true_predictions = [
            [label_list[p] for (p, l) in zip(prediction, label) if l != -100]
            for prediction, label in zip(predictions, labels)
        ]
        true_labels = [
            [label_list[l] for (p, l) in zip(prediction, label) if l != -100]
            for prediction, label in zip(predictions, labels)
        ]

        results = metric.compute(predictions=true_predictions, references=true_labels)
        return {
            "precision": results["overall_precision"],
            "recall": results["overall_recall"],
            "f1": results["overall_f1"],
            "accuracy": results["overall_accuracy"],
        }


    for model_name in MODELS_TO_COMPARE: # Iterate only through model names
        try:
            print(f"\n{'='*50}")
            print(f"Training {model_name}")
            print(f"{'='*50}")

            # Load tokenizer - FORCE fast version
            tokenizer = AutoTokenizer.from_pretrained(model_name, use_fast=True)

            # Tokenize function with fast tokenizer
            def tokenize_and_align_labels(examples):
                tokenized_inputs = tokenizer(
                    examples["tokens"],
                    truncation=True,
                    is_split_into_words=True,
                    max_length=MAX_LENGTH,
                    padding="max_length"
                )

                labels = []
                for i, label_seq in enumerate(examples["ner_tags"]):
                    word_ids = tokenized_inputs.word_ids(batch_index=i)
                    previous_word_idx = None
                    label_ids = []
                    for word_idx in word_ids:
                        if word_idx is None:
                            label_ids.append(-100)
                        elif word_idx != previous_word_idx:
                            # Use .get with a default of -100 to handle potential missing tags
                            label_ids.append(label2id.get(label_seq[word_idx], -100))
                        else:
                            label_ids.append(-100)
                        previous_word_idx = word_idx
                    labels.append(label_ids)

                tokenized_inputs["labels"] = labels
                return tokenized_inputs

            # Tokenize dataset
            tokenized_datasets = dataset.map(
                tokenize_and_align_labels,
                batched=True,
                remove_columns=dataset["train"].column_names
            )

            # Load model - Use AutoModelForTokenClassification
            model = AutoModelForTokenClassification.from_pretrained(
                model_name,
                num_labels=len(label_list),
                id2label=id2label,
                label2id=label2id
            ).to(device)

            # Training arguments
            training_args = TrainingArguments(
                output_dir=f"./results_{model_name.replace('/', '-')}",
                eval_strategy="epoch",
                learning_rate=LEARNING_RATE,
                per_device_train_batch_size=BATCH_SIZE,
                per_device_eval_batch_size=BATCH_SIZE,
                num_train_epochs=NUM_EPOCHS,
                weight_decay=0.01,
                save_total_limit=2,
                save_strategy="epoch",
                load_best_model_at_end=True,
                metric_for_best_model="f1",
                logging_dir=f"./logs_{model_name.replace('/', '-')}",
                logging_steps=10,
                report_to="none"
            )

            # Trainer
            trainer = Trainer(
                model=model,
                args=training_args,
                train_dataset=tokenized_datasets["train"],
                eval_dataset=tokenized_datasets["validation"],
                tokenizer=tokenizer,
                data_collator=DataCollatorForTokenClassification(tokenizer),
                compute_metrics=lambda p: compute_metrics(p, label_list)
            )

            # Train
            start_time = time.time()
            trainer.train()
            training_time = time.time() - start_time

            # Evaluate
            eval_results = trainer.evaluate()

            results.append({
                "model_name": model_name,
                "f1_score": eval_results["eval_f1"],
                "precision": eval_results["eval_precision"],
                "recall": eval_results["eval_recall"],
                "accuracy": eval_results["eval_accuracy"],
                "training_time": training_time,
                "model_size": sum(p.numel() for p in model.parameters())
            })

        except Exception as e:
            print(f"Error training {model_name}: {str(e)}")
            import traceback
            traceback.print_exc() # Print the full traceback for debugging
            continue

    if not results:
        raise ValueError("All models failed to train. Check error messages above.")

    # Find the best model based on F1 score
    best_model_index = np.argmax([r["f1_score"] for r in results])
    best_model = results[best_model_index]
    best_model_path = f"./best_model_{best_model['model_name'].replace('/', '-')}"

    # Save the best model
    trainer.model.save_pretrained(best_model_path)
    tokenizer.save_pretrained(best_model_path)

    return pd.DataFrame(results), best_model

In [None]:
dataset = load_and_split_conll("amharic_ner_conll_labeled_output.conll", val_size=0.2, test_size=0.1)
dataset

DatasetDict({
    train: Dataset({
        features: ['tokens', 'ner_tags', '__index_level_0__'],
        num_rows: 6
    })
    validation: Dataset({
        features: ['tokens', 'ner_tags', '__index_level_0__'],
        num_rows: 2
    })
    test: Dataset({
        features: ['tokens', 'ner_tags', '__index_level_0__'],
        num_rows: 2
    })
})

In [None]:

comparison_results, best_model = compare_models(dataset)
print(f"Best model: {best_model['model_name']}")
print(comparison_results)


  metric = load_metric("seqeval")



Training xlm-roberta-base


The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


Map:   0%|          | 0/6 [00:00<?, ? examples/s]

Map:   0%|          | 0/2 [00:00<?, ? examples/s]

Map:   0%|          | 0/2 [00:00<?, ? examples/s]

Some weights of XLMRobertaForTokenClassification were not initialized from the model checkpoint at xlm-roberta-base and are newly initialized: ['classifier.bias', 'classifier.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.
  trainer = Trainer(


Epoch,Training Loss,Validation Loss,Precision,Recall,F1,Accuracy
1,No log,1.853103,0.0625,0.029412,0.04,0.536
2,No log,1.790051,0.2,0.058824,0.090909,0.608
3,No log,1.755643,0.2,0.058824,0.090909,0.6


  _warn_prf(average, modifier, msg_start, len(result))
  _warn_prf(average, modifier, msg_start, len(result))
  _warn_prf(average, modifier, msg_start, len(result))


  _warn_prf(average, modifier, msg_start, len(result))



Training distilbert-base-multilingual-cased


Map:   0%|          | 0/6 [00:00<?, ? examples/s]

Map:   0%|          | 0/2 [00:00<?, ? examples/s]

Map:   0%|          | 0/2 [00:00<?, ? examples/s]

Some weights of DistilBertForTokenClassification were not initialized from the model checkpoint at distilbert-base-multilingual-cased and are newly initialized: ['classifier.bias', 'classifier.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.
  trainer = Trainer(


Epoch,Training Loss,Validation Loss,Precision,Recall,F1,Accuracy
1,No log,1.872117,0.066667,0.025641,0.037037,0.46988
2,No log,1.685045,0.090909,0.025641,0.04,0.692771
3,No log,1.599197,0.111111,0.025641,0.041667,0.704819


  _warn_prf(average, modifier, msg_start, len(result))
  _warn_prf(average, modifier, msg_start, len(result))


  _warn_prf(average, modifier, msg_start, len(result))



Training bert-base-multilingual-cased


Map:   0%|          | 0/6 [00:00<?, ? examples/s]

Map:   0%|          | 0/2 [00:00<?, ? examples/s]

Map:   0%|          | 0/2 [00:00<?, ? examples/s]

Some weights of BertForTokenClassification were not initialized from the model checkpoint at bert-base-multilingual-cased and are newly initialized: ['classifier.bias', 'classifier.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.
  trainer = Trainer(


Epoch,Training Loss,Validation Loss,Precision,Recall,F1,Accuracy
1,No log,1.344515,0.0,0.0,0.0,0.716867
2,No log,1.106431,0.0,0.0,0.0,0.716867
3,No log,1.054486,0.0,0.0,0.0,0.716867


  _warn_prf(average, modifier, msg_start, len(result))
  _warn_prf(average, modifier, msg_start, len(result))
  _warn_prf(average, modifier, msg_start, len(result))
  _warn_prf(average, modifier, msg_start, len(result))
  _warn_prf(average, modifier, msg_start, len(result))


  _warn_prf(average, modifier, msg_start, len(result))
  _warn_prf(average, modifier, msg_start, len(result))


Best model: xlm-roberta-base
                           model_name  f1_score  precision    recall  \
0                    xlm-roberta-base  0.090909   0.200000  0.058824   
1  distilbert-base-multilingual-cased  0.041667   0.111111  0.025641   
2        bert-base-multilingual-cased  0.000000   0.000000  0.000000   

   accuracy  training_time  model_size  
0  0.608000     527.227098   277459977  
1  0.704819     180.017216   134741001  
2  0.716867     263.752807   177269769  


1. Imports and Initial Setup

In [52]:
import torch
from transformers import AutoModelForTokenClassification, AutoTokenizer
import numpy as np
from lime.lime_text import LimeTextExplainer
import shap
import matplotlib.pyplot as plt

2. Model Loading and Setup

In [53]:

# Load your saved best model
model_path = "./best_model_xlm-roberta-base"
model = AutoModelForTokenClassification.from_pretrained(model_path)
tokenizer = AutoTokenizer.from_pretrained(model_path)

# Ensure model is in evaluation mode
model.eval()
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)

label_list = list(model.config.id2label.values())


SHAP Implementation Section

In [64]:

# def shap_explanation(text, model, tokenizer):
#     """Explain model predictions using SHAP with space tokenization"""
#     print("\nGenerating SHAP explanation...")


def shap_explanation(text, model, tokenizer):
    """Simplified SHAP explanation for space-tokenized text"""
    print("\nGenerating SHAP-style explanation...")

    words = space_tokenizer(text)
    num_words = len(words)

    print("Feature importance by position:")
    print("{:15} {:10} {}".format("Word", "Position", "Importance"))
    print("-" * 40)

    # Create a simple positional importance analysis
    for i, word in enumerate(words):
        # Create modified versions of the text
        original_pred = predict_ner(text)[1][i]

        # Create text with this word masked
        masked_words = words.copy()
        masked_words[i] = "[MASK]"
        masked_text = " ".join(masked_words)
        masked_pred = predict_ner(masked_text)[1][i]

        # Calculate importance as prediction change
        importance = abs(original_pred - masked_pred)

        print("{:15} {:10} {:.2f}".format(
            word,
            i,
            importance
        ))

    print("\nKey:")
    print("Importance = How much prediction changes when word is masked")


2. LIME Implementation Section

In [63]:

def lime_explanation(text, model, tokenizer, target_label_idx):
    """Simplified LIME explanation for space-tokenized text"""
    print(f"\nGenerating LIME-style explanation for {label_list[target_label_idx]}...")

    words = space_tokenizer(text)
    num_words = len(words)

    # Create neighborhood of examples by removing one word at a time
    neighborhood = []
    for i in range(num_words):
        modified_words = words.copy()
        modified_words[i] = "[MASK]"
        neighborhood.append(" ".join(modified_words))

    # Get predictions for all examples
    predictions = []
    for example in [text] + neighborhood:
        _, preds = predict_ner(example)
        predictions.append(preds)

    # Calculate importance for each position
    importance_scores = []
    original_preds = predictions[0]
    for i in range(num_words):
        # Focus on the target label's prediction at this position
        original_score = (original_preds[i] == target_label_idx)
        modified_score = (predictions[i+1][i] == target_label_idx)
        importance = abs(original_score - modified_score)
        importance_scores.append((words[i], i, importance))

    # Sort by importance
    importance_scores.sort(key=lambda x: x[2], reverse=True)

    print("\nTop influential words:")
    print("{:15} {:10} {}".format("Word", "Position", "Influence"))
    print("-" * 40)
    for word, pos, imp in importance_scores[:min(5, num_words)]:
        print("{:15} {:10} {:.2f}".format(word, pos, imp))



3. Custom Tokenizer Setup

In [56]:
# Custom space-based tokenizer
def space_tokenizer(text):
    return text.split()

# Wrapper to make compatible with transformers and SHAP
class SpaceTokenizerWrapper:
    def tokenize(self, text):
        return space_tokenizer(text)

    def convert_tokens_to_ids(self, tokens):
        return [i for i in range(len(tokens))]

    def convert_ids_to_tokens(self, ids):
        return [f"token_{i}" for i in ids]

    # Add a __call__ method for SHAP compatibility
    def __call__(self, text, **kwargs):
        # This method should return an object similar to a Hugging Face tokenizer output
        # For the purpose of SHAP's Text masker, simply returning the list of words might suffice
        # or a structure that mimics the tokenizer's output.
        # Let's try returning a list of words for now, as that's what text.split() gives.
        return space_tokenizer(text)


space_tokenizer_wrapper = SpaceTokenizerWrapper()

4. Label Definitions

In [57]:
# Define NER label categories
label_list = [
    "O",
    "B-Product", "I-Product",
    "B-PRICE", "I-PRICE",
    "B-LOC", "I-LOC",
    "B-CONTACT", "I-CONTACT"
]

5. Core Prediction Functions

In [58]:
def predict_ner(text):
    """Predict NER tags using space tokenization"""
    words = space_tokenizer(text)
    input_ids = torch.tensor([[i for i in range(len(words))]]).to(device)
    attention_mask = torch.tensor([[1]*len(words)]).to(device)

    with torch.no_grad():
        outputs = model(input_ids=input_ids, attention_mask=attention_mask)

    predictions = torch.argmax(outputs.logits, dim=-1)[0].tolist()
    return words, predictions

def visualize_predictions(text):
    """Color-coded visualization of predictions"""
    words, preds = predict_ner(text)
    colors = {
        "Product": "\033[91m",  # Red
        "PRICE": "\033[92m",    # Green
        "LOC": "\033[94m",      # Blue
        "CONTACT": "\033[93m",  # Yellow
    }
    reset_color = "\033[0m"

    print("\nPrediction Visualization:")
    for word, pred in zip(words, preds):
        label = label_list[pred]
        if label == "O":
            print(word, end=" ")
        else:
            ent_type = label.split("-")[-1]
            color = colors.get(ent_type, "")
            print(f"{color}{word}{reset_color}", end=" ")
    print("\n")

6. Evaluation Functions

In [59]:
def analyze_errors(text, true_labels):
    """Compare predictions with ground truth"""
    words, preds = predict_ner(text)
    true_labels = true_labels.split()

    print("\nError Analysis:")
    print("{:20} {:15} {:15}".format("Word", "True", "Predicted"))
    print("-" * 50)

    errors = []
    for word, true, pred in zip(words, true_labels, preds):
        pred_label = label_list[pred]
        if true != pred_label:
            errors.append((word, true, pred_label))
        print("{:20} {:15} {:15}".format(word, true, pred_label))

    print("\nSummary:")
    print(f"Total words: {len(words)}")
    print(f"Errors: {len(errors)}")
    print(f"Accuracy: {(len(words)-len(errors))/len(words):.2%}")

    return errors

7. Test Cases Definition

In [60]:
test_cases = [
    {
        "text": "BARDEFU 2 IN 1 Multipurpose juicer ኳሊቲ የጁስ መፍጫ ዋጋ 6800 ብር",
        "true_labels": "B-Product I-Product I-Product I-Product I-Product I-Product O B-Product I-Product O B-PRICE I-PRICE"
    },
    {
        "text": "8000Watt ምላጮቹ ጠንካራ የሆኑ ለቤት ዋጋ 6800 ብር",
        "true_labels": "B-Product I-Product I-Product O O O B-PRICE I-PRICE I-PRICE"
    },
    {
        "text": "አድራሻ ቁ1 መገናኛ ታሜ ጋስ ህንፃ ጎን ስሪ ኤም ሲቲ ሞል 0909522840",
        "true_labels": "B-LOC I-LOC I-LOC I-LOC I-LOC I-LOC O I-LOC I-LOC I-LOC B-CONTACT"
    }
]

8. Test Execution

In [61]:
for i, test_case in enumerate(test_cases, 1):
    print(f"\n{'='*50}")
    print(f"TEST CASE {i}: {test_case['text']}")
    print(f"{'='*50}")

    # 1. Basic prediction
    visualize_predictions(test_case['text'])

    # 2. Error analysis
    if 'true_labels' in test_case:
        errors = analyze_errors(test_case['text'], test_case['true_labels'])

        if errors:
            error_word, true_label, pred_label = errors[0]
            print(f"\nFirst error: '{error_word}' (True: {true_label}, Pred: {pred_label})")

            # Add SHAP and LIME explanations for errors
            try:
                # SHAP explanation
                shap_explanation(test_case['text'], model, tokenizer)

                # LIME explanation for the true label
                true_label_idx = label_list.index(true_label)
                lime_explanation(test_case['text'], model, tokenizer, true_label_idx)
            except Exception as e:
                print(f"Interpretability failed: {str(e)}")

    # 3. General explanations even without errors
    try:
        # SHAP explanation for Product entities
        shap_explanation(test_case['text'], model, tokenizer)

        # LIME explanation for first entity type
        lime_explanation(test_case['text'], model, tokenizer, 1)  # 1 = B-Product
    except Exception as e:
        print(f"General interpretability failed: {str(e)}")

    print(f"\n{'='*50}")
    print(f"COMPLETED TEST CASE {i}")
    print(f"{'='*50}\n")


TEST CASE 1: BARDEFU 2 IN 1 Multipurpose juicer ኳሊቲ የጁስ መፍጫ ዋጋ 6800 ብር

Prediction Visualization:
[93mBARDEFU[0m [93m2[0m [93mIN[0m [93m1[0m [93mMultipurpose[0m [93mjuicer[0m [93mኳሊቲ[0m [93mየጁስ[0m [93mመፍጫ[0m [93mዋጋ[0m [93m6800[0m [93mብር[0m 


Error Analysis:
Word                 True            Predicted      
--------------------------------------------------
BARDEFU              B-Product       I-CONTACT      
2                    I-Product       I-CONTACT      
IN                   I-Product       I-CONTACT      
1                    I-Product       I-CONTACT      
Multipurpose         I-Product       I-CONTACT      
juicer               I-Product       I-CONTACT      
ኳሊቲ                  O               I-CONTACT      
የጁስ                  B-Product       I-CONTACT      
መፍጫ                  I-Product       I-CONTACT      
ዋጋ                   O               I-CONTACT      
6800                 B-PRICE         I-CONTACT      
ብር                   I-PRICE 