In [6]:
from datasets import load_dataset, DatasetDict,Dataset

# Assuming your CoNLL files are named train.txt, validation.txt, test.txt
# and are in a directory called 'data'.

# If your CoNLL files are directly in a specified path
try:
    # Use 'conll2003' builder if your format is strictly CoNLL-2003-like
    # The 'features' argument might need adjustment based on your exact columns
    # For a custom CoNLL file with just 'tokens' and 'ner_tags' as the last column:
    # You might need to write a small custom loading script if it's not strictly CoNLL-2003
    # which has 4 columns (token, pos, chunk, ner_tag).

    # Common scenario: your custom data has two columns: token and NER tag.
    # You can adapt the loading or write a simple parser.

    # Option A: If your data closely matches CoNLL2003 (token, POS, chunk, NER_tag)
    # This is for public datasets following strict CoNLL2003 format
    # raw_datasets = load_dataset("conll2003")

    # Option B: If your custom CoNLL has only two columns (word, NER_tag)
    # You'll likely need a custom loading function.
    # Let's write a simple one.

    def read_conll_file_all(file_path):
        """Parse a CoNLL-style file into one record per sentence.

        Each non-blank line is split on whitespace; the first column is taken
        as the token and the last column as its NER tag, so both two-column
        and four-column CoNLL layouts are accepted. Blank lines separate
        sentences and ``-DOCSTART-`` marker lines are ignored. A line with
        fewer than two columns is reported on stdout and skipped.

        Args:
            file_path: Path of the UTF-8 encoded CoNLL file.

        Returns:
            A list of ``{"tokens": [...], "ner_tags": [...]}`` dictionaries,
            one per sentence, in file order.
        """
        with open(file_path, "r", encoding="utf-8") as handle:
            raw_lines = list(handle)

        sentences = []
        words, tags = [], []

        for raw_line in raw_lines:
            stripped = raw_line.strip()

            # Document separators carry no tokens.
            if stripped.startswith("-DOCSTART-"):
                continue

            # A blank line closes the sentence collected so far (if any).
            if not stripped:
                if words:
                    sentences.append({"tokens": words, "ner_tags": tags})
                words, tags = [], []
                continue

            columns = stripped.split()
            if len(columns) < 2:
                print(f"Warning: Malformed line encountered: {stripped}")
                continue

            words.append(columns[0])
            tags.append(columns[-1])

        # The file may end without a trailing blank line.
        if words:
            sentences.append({"tokens": words, "ner_tags": tags})
        return sentences

    # Load your custom CoNLL files
    #train_data = read_conll_file("../data/train.txt")
    #test_data = read_conll_file("../data/test.txt")
    #Load all
    # all_data = read_conll_file_all('../labeled_data.txt')
    # print(all_data)
    # full_dataset = Dataset.from_list(all_data)
    # print(full_dataset)
    # train_test_split = full_dataset.train_test_split(test_size=0.2, seed=42, train_size=0.8)

    # #test_val_split = train_test_split['test'].train_test_split(test_size=0.5, seed=42)

    # raw_datasets = DatasetDict({
    #     "train": train_test_split['train'],
    #     "validation": train_test_split['train'],
    #     "test": train_test_split['test'],
    # })
    # print(f"Total sentences: {len(full_dataset)}")
    # print(f"Train sentences: {len(raw_datasets['train'])}")
    # print(f"Validation sentences: {len(raw_datasets['validation'])}")
    # print(f"Test sentences: {len(raw_datasets['test'])}")
except Exception as e:
    print(f"Could not load CoNLL data directly using load_dataset or custom parser: {e}")
    print("Please ensure your data files are correctly placed and formatted.")
    # Fallback or error handling
    raw_datasets = None # Or raise an error
    # Let's assume your combined CoNLL data is in 'combined_ner_data.txt'
# Or, you can manually combine your manually created CoNLL files into one first.
# Location of the single CoNLL file that holds every labelled sentence.
data_path = "../data/improved_labeled.txt"
combined_data = read_conll_file_all(data_path)

# Show one parsed sentence as a sanity check; guarded so that an empty file
# reaches the explicit ValueError below instead of failing with IndexError.
if combined_data:
    print(combined_data[0])

# Check the number of samples BEFORE building and splitting the dataset.
num_samples = len(combined_data)
print(f"Total sentences read from file: {num_samples}")

if num_samples < 2:
    raise ValueError(
        f"{data_path} contains less than 2 sentences. "
        "You need at least 2 sentences to perform a train-test split. "
        "Please add more data to your CoNLL file."
    )
elif num_samples < 3:
    print("Warning: Very few samples. Splitting ratios might result in very small sets. "
          "Consider using integer `test_size` values if specific counts are desired.")

# Convert to a Hugging Face Dataset.
full_dataset = Dataset.from_list(combined_data)

# --- Step 2: Split the dataset into train, validation, and test ---
# First split: 80% train, 20% held out (validation + test).
train_test_split = full_dataset.train_test_split(test_size=0.2, seed=42)

# Second split: halve the held-out 20% into validation and test (10% each).
test_validation_split = train_test_split['test'].train_test_split(test_size=0.5, seed=42)

# Create the final DatasetDict.
raw_datasets = DatasetDict({
    'train': train_test_split['train'],
    'validation': test_validation_split['train'],  # 'train' half of the second split
    'test': test_validation_split['test']
})

print(f"Total sentences: {len(full_dataset)}")
print(f"Train sentences: {len(raw_datasets['train'])}")
print(f"Validation sentences: {len(raw_datasets['validation'])}")
print(f"Test sentences: {len(raw_datasets['test'])}")

{'tokens': ['imitation', 'volcano', 'humidifier', 'with', 'led', 'light'], 'ner_tags': ['B-PRODUCT', 'I-PRODUCT', 'I-PRODUCT', 'I-PRODUCT', 'I-PRODUCT', 'I-PRODUCT']}
Total sentences read from file: 267
Total sentences: 267
Train sentences: 213
Validation sentences: 27
Test sentences: 27


In [7]:
# Collect all unique NER tags from your dataset
# Gather every tag string that occurs in any split.
unique_tags = set()
for dataset_split in raw_datasets.values():
    for example in dataset_split:
        unique_tags |= set(example["ner_tags"])

# Alphabetical order keeps the tag -> id assignment reproducible between runs.
# With this ordering "O" is not guaranteed to receive id 0.
label_list = sorted(unique_tags)

# Forward and reverse lookup tables between tag strings and integer ids.
label_to_id = dict(zip(label_list, range(len(label_list))))
id_to_label = dict(enumerate(label_list))

print(f"Detected Labels: {label_list}")
print(f"Label to ID Mapping: {label_to_id}")

Detected Labels: ['B-LOC', 'B-PRICE', 'B-PRODUCT', 'I-LOC', 'I-PRICE', 'I-PRODUCT', 'O']
Label to ID Mapping: {'B-LOC': 0, 'B-PRICE': 1, 'B-PRODUCT': 2, 'I-LOC': 3, 'I-PRICE': 4, 'I-PRODUCT': 5, 'O': 6}


In [None]:
from transformers import AutoTokenizer

# Checkpoint whose tokenizer is used by tokenize_and_align_labels below.
model_name = "xlm-roberta-base" # or xlm-roberta-large
# Load the tokenizer that matches model_name.
tokenizer = AutoTokenizer.from_pretrained(model_name)

def tokenize_and_align_labels(examples):
    """Tokenize pre-split sentences and align word-level tags to sub-tokens.

    Uses the module-level ``tokenizer`` and ``label_to_id`` mapping.

    Label assignment per sub-token:
      * special tokens (no word id)            -> -100
      * first sub-token of a word              -> id of the word's tag
      * later sub-token of a ``B-X`` word      -> id of ``I-X``
        (-100 if the label set has no ``I-X``)
      * later sub-token of an ``I-X`` word     -> id of ``I-X``
      * later sub-token of any other word      -> -100

    Args:
        examples: Batch with ``"tokens"`` (list of word lists) and
            ``"ner_tags"`` (list of tag-string lists of matching length).

    Returns:
        The tokenizer output with an added ``"labels"`` entry holding one
        list of label ids per sentence.
    """
    tokenized_inputs = tokenizer(
        examples["tokens"],
        truncation=True,
        is_split_into_words=True,
    )

    labels = []
    for i, word_tags in enumerate(examples["ner_tags"]):
        word_ids = tokenized_inputs.word_ids(batch_index=i)
        previous_word_idx = None
        label_ids = []
        for word_idx in word_ids:
            if word_idx is None:  # Special tokens
                label_ids.append(-100)
            elif word_idx != previous_word_idx:  # First token of a new word
                label_ids.append(label_to_id[word_tags[word_idx]])
            else:  # Subsequent token of the same word
                current_tag = word_tags[word_idx]
                if current_tag.startswith("B-"):
                    # The entity continues inside the word, so use I-X. The
                    # label set is built from the data and may lack I-X for a
                    # type that only has single-word mentions; ignore the
                    # sub-token then instead of raising KeyError.
                    label_ids.append(label_to_id.get("I-" + current_tag[2:], -100))
                elif current_tag.startswith("I-"):
                    label_ids.append(label_to_id[current_tag])
                else:  # O tag or other types: excluded from the loss
                    label_ids.append(-100)
            previous_word_idx = word_idx
        labels.append(label_ids)

    tokenized_inputs["labels"] = labels
    return tokenized_inputs

# Apply the tokenization and alignment to your loaded datasets
# The raw 'tokens' / 'ner_tags' columns are dropped once the encoded features exist.
source_columns = raw_datasets["train"].column_names

# Encode every split in batches with the aligner defined above.
tokenized_datasets = raw_datasets.map(
    tokenize_and_align_labels, batched=True, remove_columns=source_columns
)

print(tokenized_datasets)
# Inspect one encoded training example to confirm the alignment looks right.
print(tokenized_datasets["train"][0])

In [8]:
import numpy as np
from seqeval.metrics import classification_report, f1_score, precision_score, recall_score

def compute_metrics(p):
    """Compute seqeval precision, recall and F1 for a batch of predictions.

    Args:
        p: Pair ``(predictions, labels)``. ``predictions`` is reduced with
            ``argmax`` over axis 2; ``labels`` holds label ids, with -100
            marking positions to ignore.

    Returns:
        Dict with the keys ``"precision"``, ``"recall"`` and ``"f1"``.
    """
    scores, gold_ids = p
    predicted_ids = np.argmax(scores, axis=2)

    true_predictions = []
    true_labels = []
    for predicted_row, gold_row in zip(predicted_ids, gold_ids):
        # Keep only positions that carry a real label (i.e. not -100).
        kept = [
            (predicted, gold)
            for predicted, gold in zip(predicted_row, gold_row)
            if gold != -100
        ]
        true_predictions.append([id_to_label[predicted] for predicted, _ in kept])
        true_labels.append([id_to_label[gold] for _, gold in kept])

    return {
        "precision": precision_score(true_labels, true_predictions),
        "recall": recall_score(true_labels, true_predictions),
        "f1": f1_score(true_labels, true_predictions),
    }

In [9]:
# Model identifiers that the comparison loop fine-tunes one after another
# and that the inference cell later iterates over again.
model_names_to_compare = [
    "xlm-roberta-base",
    "distilbert-base-multilingual-cased",
    "bert-base-multilingual-cased"
]

In [None]:
# Model comparison loop example
from transformers import AutoTokenizer, AutoModelForTokenClassification, TrainingArguments, Trainer, DataCollatorForTokenClassification

from datasets import DatasetDict
import numpy as np
from seqeval.metrics import classification_report, f1_score, precision_score, recall_score
import os

# Assumes raw_datasets, label_list, label_to_id, id_to_label and compute_metrics are defined by the previous cells.

# Per-model outcome: validation metrics on success, {"error": message} on failure.
results_summary = {}

for model_name in model_names_to_compare:
    print(f"\n--- Fine-tuning {model_name} ---")

    # Model ids may contain '/', which must not end up inside directory names.
    safe_model_name = model_name.replace('/', '_')

    # Best-effort comparison: a failure for one model is recorded in the
    # summary and the loop continues with the next model.
    try:
        # Load tokenizer specific to the model
        tokenizer = AutoTokenizer.from_pretrained(model_name)

        # The aligner is (re)defined here so this cell does not depend on an
        # earlier definition; it reads the `tokenizer` bound just above.
        def tokenize_and_align_labels(examples):
            """Tokenize pre-split sentences and align word tags to sub-tokens.

            Special tokens get -100, the first sub-token of a word gets the
            word's tag id, later sub-tokens of a B-X / I-X word get the I-X
            id, and later sub-tokens of any other word get -100.
            """
            tokenized_inputs = tokenizer(
                examples["tokens"],
                truncation=True,
                is_split_into_words=True,
            )
            labels = []
            for i, label in enumerate(examples["ner_tags"]):
                word_ids = tokenized_inputs.word_ids(batch_index=i)
                previous_word_idx = None
                label_ids = []
                for word_idx in word_ids:
                    if word_idx is None:
                        label_ids.append(-100)
                    elif word_idx != previous_word_idx:
                        label_ids.append(label_to_id[label[word_idx]])
                    else:
                        current_tag = label[word_idx]
                        if current_tag.startswith("B-"):
                            # The label set is derived from the data and may
                            # lack I-X for a type with only single-word
                            # mentions; ignore the sub-token in that case
                            # instead of raising KeyError.
                            label_ids.append(label_to_id.get("I-" + current_tag[2:], -100))
                        elif current_tag.startswith("I-"):
                            label_ids.append(label_to_id[current_tag])
                        else:
                            label_ids.append(-100)
                    previous_word_idx = word_idx
                labels.append(label_ids)
            tokenized_inputs["labels"] = labels
            return tokenized_inputs

        tokenized_datasets = raw_datasets.map(
            tokenize_and_align_labels,
            batched=True,
            remove_columns=raw_datasets["train"].column_names,
            load_from_cache_file=False # Important to re-process for each tokenizer
        )

        # Load model
        model = AutoModelForTokenClassification.from_pretrained(
            model_name,
            num_labels=len(label_list),
            id2label=id_to_label,
            label2id=label_to_id,
        )

        # Data collator (tokenizer needed for padding)
        data_collator = DataCollatorForTokenClassification(tokenizer=tokenizer)

        # Training arguments (separate output/log directories for each model)
        output_dir = f"drive/MyDrive/model_data/model/results_{safe_model_name}_ner"
        training_args = TrainingArguments(
            output_dir=output_dir,
            learning_rate=2e-5,
            per_device_train_batch_size=16,
            per_device_eval_batch_size=16,
            num_train_epochs=3, # Keep consistent for comparison
            weight_decay=0.01,
            eval_strategy="epoch",
            save_strategy="epoch",
            logging_dir=f"drive/MyDrive/model_data/model/logs_{safe_model_name}",
            logging_steps=100,
            load_best_model_at_end=True,
            metric_for_best_model="f1",
            report_to="tensorboard",
            # disable_tqdm=True, # Optional: disable progress bars for cleaner logs if running many
        )

        trainer = Trainer(
            model=model,
            args=training_args,
            train_dataset=tokenized_datasets["train"],
            eval_dataset=tokenized_datasets["validation"],
            processing_class=tokenizer,
            data_collator=data_collator,
            compute_metrics=compute_metrics, # Re-use the seqeval compute_metrics
        )

        # Train
        trainer.train()

        # Evaluate on validation set (best model will be loaded)
        val_metrics = trainer.evaluate(tokenized_datasets["validation"])
        print(f"Validation metrics for {model_name}: {val_metrics}")

        # Store results
        results_summary[model_name] = {
            "validation_f1": val_metrics.get('eval_f1'),
            "validation_precision": val_metrics.get('eval_precision'),
            "validation_recall": val_metrics.get('eval_recall'),
            "validation_loss": val_metrics.get('eval_loss')
        }

        # Save the best model explicitly (Trainer might already do it, but good to be sure)
        trainer.save_model(f'drive/MyDrive/model_data/model/{model_name}')

    except Exception as e:
        print(f"Error fine-tuning {model_name}: {e}")
        results_summary[model_name] = {"error": str(e)}

print("\n--- Comparison Summary ---")
for model, metrics in results_summary.items():
    print(f"Model: {model}")
    for metric, value in metrics.items():
        print(f"  {metric}: {value}")

In [None]:
from transformers import pipeline
import torch 
# Pick the GPU when one is visible, otherwise fall back to the CPU.
has_cuda = torch.cuda.is_available()
device = torch.device("cuda" if has_cuda else "cpu")

# Sample product post (mixed English / Amharic) that every model is run on.
amharic_text = "Smart Usb Ultrasonic Car And Home Air Humidifier With Colorful Led Light Original High-quality በኤሌክትሪክ የሚሰራ ለቤትና ለመኪና መልካም መዓዛን የሚሰጥ  Elevate the comfort level within your living premises with this fantastic Green Lion Air Mist Humidifier ዋጋ፦ 1100 ብር ውስን ፍሬ ነው ያለን አድራሻ #መገናኛ_መሰረት_ደፋር_ሞል_ሁለተኛ_ፎቅ ቢሮ ቁ S05/S06 0902660722 0928460606 በTelegram ለማዘዝ  ይጠቀሙ ለተጨማሪ ማብራሪያ የቴሌግራም ገፃችን"

for model_name in model_names_to_compare:
    # Checkpoint locations are hard-coded; xlm-roberta-base lives in its own
    # results folder, the other models use the per-model results directory.
    if model_name == "xlm-roberta-base":
        final_best_model_path = "drive/MyDrive/model_data/model/xlm_roberta_ner_results/checkpoint-42"
    else:
        final_best_model_path = f"drive/MyDrive/model_data/model/results_{model_name.replace('/', '_')}_ner/checkpoint-14"
    print(final_best_model_path)

    loaded_tokenizer = AutoTokenizer.from_pretrained(final_best_model_path)
    loaded_model = AutoModelForTokenClassification.from_pretrained(
        final_best_model_path,
        id2label=id_to_label,
        label2id=label_to_id,
    )
    loaded_model.to(device)

    # 0 selects the first GPU, -1 selects the CPU.
    pipeline_device = 0 if has_cuda else -1
    ner_pipeline = pipeline(
        "token-classification",
        model=loaded_model,
        tokenizer=loaded_tokenizer,
        aggregation_strategy="simple",
        device=pipeline_device,
    )

    print(f"\nInference on text 1: '{amharic_text}'{model_name}")
    results_1 = ner_pipeline(amharic_text)
    print(results_1)

#### Using LIME to analyze token predictions

In [None]:
# --- LIME Prediction Function Wrapper for a Specific Token ---
# Similar to SHAP, but LIME expects predict_proba to return (num_samples, num_classes)
# where num_classes are the classes for the target token's prediction.
# Directory of the fine-tuned model that LIME should explain.
# NOTE(review): confirm this directory directly contains a loadable model
# (config + weights); the inference cell above loads from `checkpoint-N`
# sub-folders of the results directories, so this path may need one appended.
choosen_model = "drive/MyDrive/model_data/model/results_distilbert-base-multilingual-cased_ner"

# Load from `choosen_model` explicitly. Previously this read
# `final_best_model_path`, a loop variable left over from the inference cell,
# so whichever model that loop visited last was loaded instead.
tokenizer = AutoTokenizer.from_pretrained(choosen_model)
model = AutoModelForTokenClassification.from_pretrained(
    choosen_model,
    id2label=id_to_label,
    label2id=label_to_id
)
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)
model.eval() # Set to evaluation mode
print("Model and tokenizer loaded successfully.")
def get_lime_prediction_wrapper(model, tokenizer, target_word_index_in_original_text):
    """Build a LIME-compatible ``predict_proba`` function for one word.

    The returned function scores each text separately and reports the label
    distribution of the first sub-token whose word id equals
    ``target_word_index_in_original_text``.

    Args:
        model: Token-classification model; called as ``model(input_ids)`` and
            expected to expose ``.device`` and return an object with ``.logits``.
        tokenizer: Tokenizer whose output provides ``"input_ids"`` and
            ``word_ids(batch_index=0)``.
        target_word_index_in_original_text: Word id (as reported by the
            tokenizer) of the word to explain.

    Returns:
        A function mapping a list of texts to a NumPy array with one row of
        label probabilities per text.
    """
    def predict_proba_fn(texts):
        all_target_token_probs = []
        for text in texts:
            tokenized_input = tokenizer(
                text,
                return_tensors="pt",
                padding=True,
                truncation=True,
                is_split_into_words=False
            )
            input_ids = tokenized_input["input_ids"].to(model.device)

            word_ids = tokenized_input.word_ids(batch_index=0)

            # Position of the first sub-token that belongs to the target word.
            # NOTE(review): LIME perturbs the text by dropping words, so the
            # word found at this index in a perturbed sample may not be the
            # word from the original sentence - confirm this is acceptable.
            target_token_idx = next(
                (
                    position
                    for position, wid in enumerate(word_ids)
                    if wid == target_word_index_in_original_text
                ),
                None,
            )

            if target_token_idx is None:
                # If target word is missing in perturbed text, return uniform distribution
                all_target_token_probs.append(np.full(len(label_list), 1.0 / len(label_list)))
                continue

            with torch.no_grad():
                outputs = model(input_ids)

            target_token_logits = outputs.logits.squeeze(0)[target_token_idx, :]
            # torch.softmax is used instead of F.softmax: `F` is not imported
            # in the visible part of this file, while `torch` is.
            target_token_probs = torch.softmax(target_token_logits, dim=-1).cpu().numpy()

            all_target_token_probs.append(target_token_probs)

        return np.array(all_target_token_probs)

    return predict_proba_fn

# --- LIME Usage Example ---
# Sentence whose token prediction is explained (same sample post as the inference cell).
text_to_explain_lime = "Smart Usb Ultrasonic Car And Home Air Humidifier With Colorful Led Light Original High-quality በኤሌክትሪክ የሚሰራ ለቤትና ለመኪና መልካም መዓዛን የሚሰጥ  Elevate the comfort level within your living premises with this fantastic Green Lion Air Mist Humidifier ዋጋ፦ 1100 ብር ውስን ፍሬ ነው ያለን አድራሻ #መገናኛ_መሰረት_ደፋር_ሞል_ሁለተኛ_ፎቅ ቢሮ ቁ S05/S06 0902660722 0928460606 በTelegram ለማዘዝ  ይጠቀሙ ለተጨማሪ ማብራሪያ የቴሌግራም ገፃችን"

# Word list of the sentence; used below only to name the target word in the output.
# NOTE(review): custom_word_tokenizer is not defined in the visible part of
# this file - confirm it is provided by an earlier cell.
original_words_lime = custom_word_tokenizer(text_to_explain_lime)

# Zero-based index of the word whose predicted label is explained.
# NOTE(review): presumably this selects the second word of the sentence
# ("Usb" if the words are whitespace-separated) - confirm it is the word you
# intend to analyse.
target_word_index_lime = 1

# Get the LIME prediction wrapper function for this specific token
lime_predict_proba_fn = get_lime_prediction_wrapper(model, tokenizer, target_word_index_lime)

# Create a LIME explainer
# NOTE(review): LimeTextExplainer is not imported in the visible part of this
# file - confirm an earlier cell imports it.
explainer_lime = LimeTextExplainer(
    class_names=label_list,
    split_expression=r'\s+' # Use regex for spaces to handle multiple spaces
)

print(f"\nGenerating LIME Explanations for word '{original_words_lime[target_word_index_lime]}' in sentence: '{text_to_explain_lime}' (This may take a while)...")

# Explain the instance
# `num_features` is the number of words to show in the explanation
# `num_samples` is how many perturbed samples LIME creates
# `top_labels` is how many of the top predicted labels for the target token to explain
explanation_lime = explainer_lime.explain_instance(
    text_to_explain_lime,
    lime_predict_proba_fn,
    num_features=5, # Number of words to highlight
    num_samples=1000, # Number of perturbations (can be slow)
    top_labels=3 # Show explanations for top 3 predicted labels of the target token
)

# --- Visualize LIME Explanation ---
print(f"\nLIME Explanation for '{original_words_lime[target_word_index_lime]}' (index {target_word_index_lime}):")
print(explanation_lime.top_labels)
for label_id in explanation_lime.top_labels:
    label_name = id_to_label[label_id]
    # Name the label first so each contribution list can be told apart.
    print(f"  Explaining for label: {label_name}")
    print("  Word contributions:")
    for word, weight in explanation_lime.as_list(label=label_id):
        print(f"    - '{word}': {weight:.4f}")