<a href="https://colab.research.google.com/github/plony/week_4_building_an_amharic_e_commerce_data_extractor/blob/main/Taks_6_FinTech_Vendor_Scorecard_for_Micro_Lending.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# **Task 4: Model Comparison & Selection**

In [None]:
!pip install transformers datasets accelerate seqeval -q
!pip install optimum -q # Optional: For ONNX export or quantization later

In [None]:
!pip install seqeval -q

In [None]:
# Cell purpose: import the training stack, choose the checkpoint to fine-tune,
# load its tokenizer, and define the NER label vocabulary shared by every later
# cell (label_list, id2label, label2id).
print("\n2. Importing libraries and loading model components...")
from transformers import AutoTokenizer, AutoModelForTokenClassification, TrainingArguments, Trainer, DataCollatorForTokenClassification
from datasets import load_dataset, Dataset, Features, Value, ClassLabel, Sequence
from seqeval.metrics import classification_report, accuracy_score, f1_score, precision_score, recall_score
import numpy as np
import os

# Define the model checkpoint
# Choose one of the following models by uncommenting it (keep exactly one active):
# MODEL_CHECKPOINT = "xlm-roberta-base"        # Strong general-purpose multilingual model
# MODEL_CHECKPOINT = "attributio/bert-tiny-amharic" # Smaller, faster, Amharic-specific
MODEL_CHECKPOINT = "bert-base-multilingual-cased"  # Good multilingual model for African languages

# Load tokenizer (done once; the original cell repeated this whole section twice)
print(f"Loading tokenizer from: {MODEL_CHECKPOINT}")
tokenizer = AutoTokenizer.from_pretrained(MODEL_CHECKPOINT)

# Define your labels - these MUST match exactly with your CoNLL labels
# IMPORTANT: Ensure this list contains all unique B-I-O tags from your labeled_telegram_product_price_location.txt
# The position of a label in this list is its numeric class id.
label_list = [
    "O",
    "B-PRODUCT",
    "I-PRODUCT",
    "B-PRICE",
    "I-PRICE",
    "B-LOC",
    "I-LOC",
]
id2label = dict(enumerate(label_list))
label2id = {label: i for i, label in enumerate(label_list)}

print(f"Defined labels: {label_list}")
print(f"id2label mapping: {id2label}")
print(f"label2id mapping: {label2id}")

In [None]:
print("\n3. Loading the labeled dataset...")


# Running this cell opens the Colab file-upload widget; select the labeled
# CoNLL-format .txt file when prompted.
from google.colab import files
uploaded = files.upload()

# Only the first uploaded file is used; any additional uploads are ignored.
uploaded_file_name = list(uploaded.keys())[0]
print(f"Uploaded file: {uploaded_file_name}")
file_name = uploaded_file_name # Use the uploaded file name



# # from google.colab import drive
# # drive.mount('/content/drive')



# # print(f"Looking for file at: {file_path}")
# # file_name = file_path # Use the full path as the file_name


# Function to parse CoNLL formatted file
def parse_conll_file(file_path):
    """Parse a two-column CoNLL file into parallel word and tag lists.

    Every non-blank line must hold exactly two whitespace-separated fields,
    ``word tag``. A blank (or whitespace-only) line closes the current
    sentence. Lines with any other number of fields are reported on stdout
    and skipped; the sentence they belong to is kept without them.

    The file is decoded as ``utf-8-sig`` so that a leading byte-order mark
    (common in files saved by Windows editors) is dropped instead of being
    glued onto the first word. Files without a BOM decode exactly as before.

    Args:
        file_path: Path of the CoNLL file to read.

    Returns:
        A ``(texts, tags)`` tuple of two equally long lists. ``texts[i]`` is
        the list of words of sentence ``i`` and ``tags[i]`` is the list of tag
        strings aligned with those words.
    """
    texts = []
    tags = []
    current_words = []
    current_tags = []

    with open(file_path, "r", encoding="utf-8-sig") as f:
        for line in f:
            line = line.strip()

            if not line:
                # Blank line: sentence boundary. Consecutive blank lines are
                # harmless because an empty sentence is never emitted.
                if current_words:
                    texts.append(current_words)
                    tags.append(current_tags)
                    current_words = []
                    current_tags = []
                continue

            parts = line.split()
            if len(parts) != 2:
                # Non-empty line that is not "word tag": report and skip it.
                print(f"Warning: Skipping malformed line (expected 2 parts, got {len(parts)}): '{line}'")
                continue

            word, tag = parts
            current_words.append(word)
            current_tags.append(tag)

    # Flush the last sentence when the file does not end with a blank line.
    if current_words:
        texts.append(current_words)
        tags.append(current_tags)

    return texts, tags


# Parse the uploaded CoNLL file into parallel lists: one list of words and one
# list of string tags per sentence.
raw_texts, raw_tags = parse_conll_file(file_name)
print(f"Successfully parsed {len(raw_texts)} sentences from the CoNLL file.")

# Check for any tags in data that are not in label_list
all_unique_tags_in_data = set(tag for sublist in raw_tags for tag in sublist)
missing_labels_in_config = all_unique_tags_in_data - set(label_list)
if missing_labels_in_config:
    # Only a warning is printed here; execution continues and the unknown tags
    # are rewritten to 'O' by the conversion loop below.
    print(f"\nWARNING: Found tags in your data not present in 'label_list': {missing_labels_in_config}")
    print("Please update 'label_list' in Cell 2 to include these tags and rerun all cells from the beginning.")
    # Consider adding `raise ValueError("Missing labels in config")`
    # if this critical issue occurs.


# Convert raw_tags (string labels) to numerical IDs
numeric_tags = []
for i, sentence_tags in enumerate(raw_tags):
    current_numeric_tags = []
    for tag in sentence_tags:
        if tag in label2id:
            current_numeric_tags.append(label2id[tag])
        else:
            # This case should ideally be caught by the warning above.
            # Best-effort fallback: the unknown tag is replaced by 'O' so the
            # sentence keeps one numeric id per word.
            print(f"Error: Tag '{tag}' not found in label2id for sentence {i}. Assigning 'O'.")
            current_numeric_tags.append(label2id["O"])
    numeric_tags.append(current_numeric_tags)


# Create a Hugging Face Dataset
# Schema: a string id, the list of words, and the list of class ids whose
# names come from label_list.
features = Features({
    'id': Value('string'),
    'tokens': Sequence(Value('string')),
    'ner_tags': Sequence(ClassLabel(names=label_list))
})

data_dict_list = []
for i, (tokens, tags) in enumerate(zip(raw_texts, numeric_tags)):
    # Ensure tokens and tags have the same length
    # NOTE(review): the loop above emits exactly one id per parsed tag, so this
    # branch looks like a safety net rather than an expected path - confirm
    # against parse_conll_file before relying on it.
    if len(tokens) != len(tags):
        print(f"Warning: Token-tag length mismatch in sentence {i}. Skipping this sentence.")
        print(f"Tokens: {tokens}")
        print(f"Tags: {[id2label[t] for t in tags]}") # Convert numerical tags back to string for printing
        continue # Skip this malformed sentence
    data_dict_list.append({
        'id': str(i),
        'tokens': tokens,
        'ner_tags': tags
    })

dataset = Dataset.from_list(data_dict_list, features=features)
print(f"Dataset loaded with {len(dataset)} examples.")
print("Example from dataset (first entry):")
# NOTE(review): indexing [0] assumes at least one sentence was parsed.
print(dataset[0])

# Split into training and validation sets
# 80% of the examples go to training and 20% to evaluation. Adjust test_size as needed.
train_test_split = dataset.train_test_split(test_size=0.2, seed=42) # Fixed seed so the split is reproducible
train_dataset = train_test_split['train']
eval_dataset = train_test_split['test']

print(f"\nTrain dataset size: {len(train_dataset)}")
print(f"Eval dataset size: {len(eval_dataset)}")

In [None]:
print("\n4. Tokenizing data and aligning labels...")

def tokenize_and_align_labels(examples):
    """Tokenize a batch of pre-split sentences and align tags to subwords.

    Reads ``examples["tokens"]`` (one list of words per sentence) and
    ``examples["ner_tags"]`` (one list of numeric tag ids per sentence).
    Returns the tokenizer output with an added ``"labels"`` entry holding one
    label id per subword position:

    * positions with no source word (special tokens) get ``-100``;
    * the first subword of a word gets the word's own tag;
    * later subwords of the same word get the matching ``I-`` tag when the
      word was tagged ``B-``, and the word's own tag otherwise.
    """
    encoded = tokenizer(
        examples["tokens"],
        truncation=True,           # cut sentences longer than the model limit
        is_split_into_words=True,  # input is already a list of words
    )

    def _continuation_label(tag_id):
        # A word can begin an entity only once, so trailing pieces of a
        # "B-X" word are labelled "I-X"; I- and O tags pass through unchanged.
        tag_name = id2label[tag_id]
        if tag_name.startswith("B-"):
            return label2id[f"I-{tag_name[2:]}"]
        return tag_id

    aligned = []
    for batch_idx, word_tags in enumerate(examples["ner_tags"]):
        sentence_labels = []
        last_word = None
        for current_word in encoded.word_ids(batch_index=batch_idx):
            if current_word is None:
                # No source word at this position: excluded from the loss.
                sentence_labels.append(-100)
            elif current_word != last_word:
                # First subword of a new word keeps the word-level tag.
                sentence_labels.append(word_tags[current_word])
            else:
                sentence_labels.append(_continuation_label(word_tags[current_word]))
            last_word = current_word
        aligned.append(sentence_labels)

    encoded["labels"] = aligned
    return encoded

# Apply the tokenization and alignment to both training and evaluation datasets
tokenized_train_dataset = train_dataset.map(tokenize_and_align_labels, batched=True)
tokenized_eval_dataset = eval_dataset.map(tokenize_and_align_labels, batched=True)

print("\nExample of tokenized and aligned data (first entry from training set):")
first_example_tokenized = tokenized_train_dataset[0]
print("Original Tokens (first sentence in training set):", train_dataset[0]["tokens"])
print("Original Labels:", [id2label[l] for l in train_dataset[0]["ner_tags"]])
print("Subword Tokens (after tokenization):", tokenizer.convert_ids_to_tokens(first_example_tokenized["input_ids"]))
print("Aligned Numerical Labels:", first_example_tokenized["labels"])
print("Decoded Aligned Labels:", [id2label[l] if l != -100 else "IGNORE" for l in first_example_tokenized["labels"]])

In [None]:
print("\n4. Tokenizing data and aligning labels...")

def tokenize_and_align_labels(examples):
    """Tokenize a batch of pre-split sentences and align tags to subwords.

    Reads ``examples["tokens"]`` (one list of words per sentence) and
    ``examples["ner_tags"]`` (one list of numeric tag ids per sentence).
    Returns the tokenizer output with an added ``"labels"`` entry holding one
    label id per subword position:

    * positions with no source word (special tokens) get ``-100``;
    * the first subword of a word gets the word's own tag;
    * later subwords of the same word get the matching ``I-`` tag when the
      word was tagged ``B-``, and the word's own tag otherwise.
    """
    encoded = tokenizer(
        examples["tokens"],
        truncation=True,           # cut sentences longer than the model limit
        is_split_into_words=True,  # input is already a list of words
    )

    def _continuation_label(tag_id):
        # A word can begin an entity only once, so trailing pieces of a
        # "B-X" word are labelled "I-X"; I- and O tags pass through unchanged.
        tag_name = id2label[tag_id]
        if tag_name.startswith("B-"):
            return label2id[f"I-{tag_name[2:]}"]
        return tag_id

    aligned = []
    for batch_idx, word_tags in enumerate(examples["ner_tags"]):
        sentence_labels = []
        last_word = None
        for current_word in encoded.word_ids(batch_index=batch_idx):
            if current_word is None:
                # No source word at this position: excluded from the loss.
                sentence_labels.append(-100)
            elif current_word != last_word:
                # First subword of a new word keeps the word-level tag.
                sentence_labels.append(word_tags[current_word])
            else:
                sentence_labels.append(_continuation_label(word_tags[current_word]))
            last_word = current_word
        aligned.append(sentence_labels)

    encoded["labels"] = aligned
    return encoded

# Apply the tokenization and alignment to both training and evaluation datasets
tokenized_train_dataset = train_dataset.map(tokenize_and_align_labels, batched=True)
tokenized_eval_dataset = eval_dataset.map(tokenize_and_align_labels, batched=True)

print("\nExample of tokenized and aligned data (first entry from training set):")
first_example_tokenized = tokenized_train_dataset[0]
print("Original Tokens (first sentence in training set):", train_dataset[0]["tokens"])
print("Original Labels:", [id2label[l] for l in train_dataset[0]["ner_tags"]])
print("Subword Tokens (after tokenization):", tokenizer.convert_ids_to_tokens(first_example_tokenized["input_ids"]))
print("Aligned Numerical Labels:", first_example_tokenized["labels"])
print("Decoded Aligned Labels:", [id2label[l] if l != -100 else "IGNORE" for l in first_example_tokenized["labels"]])

In [None]:
# Set up Training Arguments and Model
import torch  # imported here only to detect whether a CUDA device is available

print("\n5. Setting up training arguments and model...")

# Initialize the Data Collator for Token Classification
# This handles padding of sequences to the longest sequence in each batch,
# and also stacks inputs into tensors.
data_collator = DataCollatorForTokenClassification(tokenizer=tokenizer)

# Load the model for token classification
# This will add a classification head on top of the pre-trained model.
model = AutoModelForTokenClassification.from_pretrained(
    MODEL_CHECKPOINT,
    num_labels=len(label_list), # Number of unique NER labels
    id2label=id2label,         # Map numerical IDs back to string labels for output
    label2id=label2id          # Map string labels to numerical IDs for internal use
)

# Verify if model's label configurations are correctly set
print(f"Model num_labels: {model.config.num_labels}")
print(f"Model id2label: {model.config.id2label}")
print(f"Model label2id: {model.config.label2id}")

# Define training arguments
# These parameters significantly impact training time and model performance.
# Adjust `num_train_epochs` and `per_device_train_batch_size` based on your dataset size
# and available GPU memory.
training_args = TrainingArguments(
    output_dir="./results",                   # Directory to save model checkpoints and training logs
    eval_strategy="epoch",                    # Evaluate at the end of each epoch
    learning_rate=2e-5,                       # Learning rate for the optimizer (typical for fine-tuning)
    per_device_train_batch_size=16,           # Batch size per GPU/CPU during training
    per_device_eval_batch_size=16,            # Batch size per GPU/CPU during evaluation
    num_train_epochs=5,                       # Number of full passes over the training data
    weight_decay=0.01,                        # L2 regularization to prevent overfitting
    logging_dir="./logs",                     # Directory for TensorBoard logs
    logging_steps=100,                        # How often to log training information
    save_strategy="epoch",                    # Save a model checkpoint at the end of each epoch (must match eval_strategy for load_best_model_at_end)
    save_total_limit=2,                       # Cap the number of checkpoints kept on disk
    report_to="none",                         # Disable integrations like Weights & Biases for simplicity
    fp16=torch.cuda.is_available(),           # Mixed precision only when a CUDA GPU exists; fp16=True fails on CPU-only runtimes
    push_to_hub=False,                        # Do not push the model to the Hugging Face Hub automatically
    load_best_model_at_end=True,              # Load the model with the best evaluation metric at the end of training
    metric_for_best_model="overall_f1",       # Key returned by compute_metrics that selects the best checkpoint
    greater_is_better=True,                   # For F1-score, a higher value is better
)

In [None]:
#Initialize and Run the Hugging Face Trainer
print("\n6. Initializing and running the Hugging Face Trainer...")

# Define the compute_metrics function for NER evaluation using seqeval
def compute_metrics(p):
    """Compute entity-level NER metrics with seqeval for the Trainer.

    Args:
        p: A ``(predictions, labels)`` pair. ``predictions`` holds the raw
            logits (argmax is taken over axis 2) and ``labels`` the gold
            label ids, where ``-100`` marks positions to ignore.

    Returns:
        A dict with ``overall_precision``, ``overall_recall``, ``overall_f1``
        and ``overall_accuracy``, plus one ``"<ENTITY>_f1"`` entry for every
        entity type of ``label_list`` (for example ``PRODUCT_f1``) that is
        present in the seqeval report. All four overall values are ``0.0``
        when no sample survives filtering.
    """
    predictions, labels = p
    # Convert prediction logits to predicted label IDs
    predictions = np.argmax(predictions, axis=2)

    # Convert numerical labels and predictions back to string labels for seqeval,
    # dropping every position whose gold label is the ignore index (-100).
    true_labels = [[id2label[l] for l in label if l != -100] for label in labels]
    true_predictions = [
        [id2label[pred] for (pred, l) in zip(prediction, label) if l != -100]
        for prediction, label in zip(predictions, labels)
    ]

    # seqeval requires prediction and gold sequences of identical length per
    # sample; any sample that violates this is reported and left out.
    cleaned_true_predictions = []
    cleaned_true_labels = []
    for pred_list, label_list_i in zip(true_predictions, true_labels):
        if len(pred_list) == len(label_list_i):
            cleaned_true_predictions.append(pred_list)
            cleaned_true_labels.append(label_list_i)
        else:
            print(f"Warning: Skipping a sample in metrics calculation due to length mismatch: pred={len(pred_list)}, label={len(label_list_i)}")

    if not cleaned_true_labels: # Handle case where all samples are skipped or no valid labels
        return {"overall_precision": 0.0, "overall_recall": 0.0, "overall_f1": 0.0, "overall_accuracy": 0.0}

    # Generate the classification report from seqeval
    report = classification_report(cleaned_true_labels, cleaned_true_predictions, output_dict=True)

    # Extract overall metrics, using 'micro avg' when the report provides it.
    micro = report.get('micro avg')
    if micro is not None:
        overall_f1 = micro['f1-score']
        overall_precision = micro['precision']
        overall_recall = micro['recall']
    else:
        overall_f1 = f1_score(cleaned_true_labels, cleaned_true_predictions, average='micro')
        overall_precision = precision_score(cleaned_true_labels, cleaned_true_predictions, average='micro')
        overall_recall = recall_score(cleaned_true_labels, cleaned_true_predictions, average='micro')
    overall_accuracy = accuracy_score(cleaned_true_labels, cleaned_true_predictions)

    metrics = {
        "overall_precision": overall_precision,
        "overall_recall": overall_recall,
        "overall_f1": overall_f1,
        "overall_accuracy": overall_accuracy,
    }

    # seqeval keys its report by entity type ("PRODUCT"), not by BIO tag
    # ("B-PRODUCT"), so strip the B-/I- prefix before looking entries up.
    # Looking up the raw tags never matched, which silently dropped every
    # per-entity score.
    entity_types = sorted({name.split("-", 1)[1] for name in label_list if "-" in name})
    for entity_type in entity_types:
        # Only entity types that appear in the report get a score.
        if entity_type in report:
            metrics[f"{entity_type}_f1"] = report[entity_type]['f1-score']

    return metrics


# Initialize the Trainer
# Wires together the model, the training arguments, both tokenized splits,
# the padding collator and the seqeval-based metric function.
trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=tokenized_train_dataset,
    eval_dataset=tokenized_eval_dataset, # The model will be evaluated on this dataset
    # NOTE(review): newer transformers releases appear to prefer
    # `processing_class=` over `tokenizer=` - confirm against the installed version.
    tokenizer=tokenizer,
    data_collator=data_collator,
    compute_metrics=compute_metrics, # Function to compute evaluation metrics
)

print("\nStarting model training...")
# This will start the training loop. Progress bars and metrics will be displayed.
trainer.train()
print("\nTraining complete!")

In [None]:
#Evaluate the Fine-tuned Model (Final Evaluation)
print("\n7. Evaluating the fine-tuned model on the validation set (final check)...")
# This will run a final evaluation on the `eval_dataset` using the best model loaded at the end of training.
eval_results = trainer.evaluate()
print("Final Evaluation Results:", eval_results)

In [None]:
print("\n8. Saving the fine-tuned model and tokenizer...")

model_save_path = "./fine_tuned_amharic_ner_model_v1" # Local path in Colab (temporary, deleted when session ends)

os.makedirs(model_save_path, exist_ok=True)

# Save the model's weights, configuration, and vocabulary
trainer.save_model(model_save_path) # Saves the model's weights and configuration
tokenizer.save_pretrained(model_save_path) # Saves the tokenizer files (vocab, merges, etc.)

print(f"Model and tokenizer saved successfully to: {model_save_path}")


# This zips the model directory and initiates a browser download.
try:
    if not "MyDrive" in model_save_path: # Only attempt download if not saved to Drive
        print("\nAttempting to zip and download the model (if saved locally)...")
        !zip -r /content/fine_tuned_amharic_ner_model_v1.zip {model_save_path}
        from google.colab import files
        files.download('/content/fine_tuned_amharic_ner_model_v1.zip')
        print("Model zip file download initiated.")
    else:
        print("Model saved to Google Drive, no need to download from Colab local storage.")
except Exception as e:
    print(f"Could not zip or download model (an error occurred or it was saved to Drive): {e}")

print("\n--- Fine-tuning process complete ---")

# **Task 5: Model Interpretability**

In [None]:
# New Cell: Install Interpretability Libraries
print("Installing interpretability libraries (SHAP, LIME)...")
!pip install shap lime -q
print("Interpretability libraries installed.")

In [None]:
# Load Fine-tuned mBERT Model

from transformers import AutoTokenizer, AutoModelForTokenClassification
import os
import torch


# Directory written by the save cell of Task 4.
model_load_path = "/content/fine_tuned_amharic_ner_model_v1"

if not os.path.exists(model_load_path):
    print(f"Error: Model not found at {model_load_path}. Please ensure the path is correct and the model was saved.")
else:
    print(f"Loading model from: {model_load_path}")

    # Load tokenizer
    tokenizer = AutoTokenizer.from_pretrained(model_load_path)

    # Load model
    model = AutoModelForTokenClassification.from_pretrained(model_load_path)
    model.eval() # Set model to evaluation mode

    # Retrieve label mappings from the loaded model's config
    id2label = model.config.id2label
    label2id = model.config.label2id
    # Rebuild label_list in class-id order so that label_list[i] is the name
    # of logit column i. Taking list(label2id.keys()) is not safe: the saved
    # config stores its keys sorted, so after a reload the dict order is
    # alphabetical by label name rather than by id, which would mislabel any
    # consumer that uses label_list as column names (e.g. LIME class_names).
    label_list = [id2label[idx] for idx in sorted(id2label)]

    print("Model and tokenizer loaded successfully.")
    print(f"Loaded label mappings: {id2label}")
    print(f"Loaded label_list: {label_list}")

In [None]:
# LIME for Token Classification (MAIN INTERPRETATION)

from lime.lime_text import LimeTextExplainer
import numpy as np
import torch
from IPython.core.display import display, HTML # For rich display

if 'model' not in locals() or 'tokenizer' not in locals() or 'id2label' not in locals() or 'label_list' not in locals():
    print("Model, tokenizer, or labels not loaded. Please run the 'Load Fine-tuned mBERT Model' cell first.")
else:
    print("Starting LIME explanation...")

    # Let's use the same example sentence.
    # From previous output:
    # Original Text: 'ዋጋ፦ 1,300 ብር አድራሻ መገናኛ ስሪ ኤም ሲቲ ሞል ሁለተኛ ፎቅ'
    # Predicted labels: ['O', 'B-PRODUCT', 'I-PRODUCT', 'O', 'O', 'O', 'O', 'O', 'O', 'O', 'O', 'O', 'O', 'O', 'O', 'O', 'O']

    example_text_lime = "ዋጋ፦ 1,300 ብር አድራሻ መገናኛ ስሪ ኤም ሲቲ ሞል ሁለተኛ ፎቅ"


    # --- Define a prediction function for LIME ---
    # LIME expects a function that takes a list of strings and returns a
    # 2D numpy array of probabilities, where each row corresponds to a string
    # and each column corresponds to a class/label.
    # For NER, we adapt this by returning the maximum probability found for each label type
    # across all tokens in the sentence. This allows LIME to function.
    def predict_proba_lime(texts):
        """Score each text with one value per label, for use by LIME.

        For every string in ``texts`` the model is run once and the
        token-level softmax probabilities are reduced to a single vector by
        taking, for each label, the maximum probability over all token
        positions. Texts whose encoding or logits are not usable yield an
        all-zero vector instead.

        Returns a 2D numpy array with one row per text and one column per
        entry of ``label_list``.
        """
        num_classes = len(label_list)

        def _score_one(sentence):
            encoded = tokenizer(sentence, return_tensors="pt", truncation=True, padding=True)

            # Fewer than two ids means not even [CLS] and [SEP] are present.
            if encoded['input_ids'].numel() < 2:
                return np.zeros(num_classes)

            with torch.no_grad():
                token_logits = model(**encoded).logits.squeeze(0)  # (sequence_length, num_labels)

            # Guard against empty or wrongly shaped logits before reducing.
            well_formed = (
                token_logits.numel() != 0
                and token_logits.dim() >= 2
                and token_logits.shape[1] == num_classes
            )
            if not well_formed:
                return np.zeros(num_classes)

            token_probs = torch.softmax(token_logits, dim=-1)
            # Sentence-level score per label: best probability over all tokens.
            return token_probs.max(dim=0).values.cpu().numpy()

        return np.array([_score_one(sentence) for sentence in texts])


    # Initialize LIME explainer
    # Use your label_list for class names so LIME knows which column maps to which label
    explainer = LimeTextExplainer(class_names=label_list)

    # --- Explain a prediction ---
    # We will focus on one of the labels that was *predicted* by the model for this sentence: 'B-PRODUCT'.
    # Or, we can choose a label we *expect* to see, like 'B-PRICE', to see what contributes to *not* predicting it, or to small probability.

    # Let's try to explain why 'B-PRODUCT' was predicted, as it was one of the actual predictions.
    target_label_name_lime = 'B-PRODUCT'
    target_label_index_lime = label2id[target_label_name_lime]


    print(f"\nExplaining prediction for label: '{target_label_name_lime}' (aggregated probability across sentence)")
    print(f"Sentence: '{example_text_lime}'")

    # Generate explanation
    # `num_features`: how many words to highlight in the explanation
    # `num_samples`: how many perturbed samples LIME generates (higher = more stable, slower)
    explanation_lime = explainer.explain_instance(
        example_text_lime,
        predict_proba_lime,
        num_features=10,  # Top 10 most important words
        num_samples=2000, # Number of perturbed samples (can increase for more stability, but slower)
        labels=[target_label_index_lime] # Explain specifically this label's probability
    )

    # Display explanation in notebook
    print("\nLIME Explanation (HTML Output - green for positive contribution, red for negative):")
    display(HTML(explanation_lime.as_html(labels=[target_label_index_lime])))

    print("\nWords contributing to the prediction (Raw List):")
    # You can also get explanations for other labels by changing `label=...`
    for word, weight in explanation_lime.as_list(label=target_label_index_lime):
        print(f"  '{word}': {weight:.4f}")

    print("\nNote: LIME provides local explanations (for this specific sentence).")
    print("Green words in the HTML output indicate positive contribution to the prediction of the target label.")
    print("Red words indicate negative contribution.")

In [None]:
# SHAP for Token Classification

import shap
import numpy as np
import torch

# Ensure the model and tokenizer are loaded from the previous cell
if 'model' not in locals() or 'tokenizer' not in locals():
    print("Model or tokenizer not loaded. Please run the 'Load Fine-tuned mBERT Model' cell first.")
else:
    print("Starting SHAP explanation (conceptual demonstration - REVISED & FIXED)...")

    # Example text for explanation
    text = "ዋጋ፦ 1,300 ብር አድራሻ መገናኛ ስሪ ኤም ሲቲ ሞል ሁለተኛ ፎቅ"

    # This function will take a list of text strings (perturbed by SHAP)
    # and return the probabilities for a *specific target label* for a *specific target token*.
    # This is a simplification to make SHAP runnable, as full NER explanation is complex.

    # First, get the model's actual prediction for the example sentence
    inputs = tokenizer(text, return_tensors="pt", truncation=True, padding=True)
    with torch.no_grad():
        outputs = model(**inputs)
    predictions = torch.argmax(outputs.logits, dim=-1).squeeze(0).cpu().numpy()

    # Convert predicted IDs back to labels for display
    predicted_labels = [id2label[p_id] for p_id in predictions]
    subword_tokens = tokenizer.convert_ids_to_tokens(inputs["input_ids"].squeeze(0))

    print(f"\nOriginal Text: '{text}'")
    print(f"Subword Tokens: {subword_tokens}")
    print(f"Predicted Labels: {predicted_labels}")

    # --- Choose a specific token and its predicted label to explain ---
    # We will now explain why '[UNK]' at index 1 is predicted as 'B-PRODUCT',
    # as this was an actual prediction from your model for this sentence.
    target_token_str = '[UNK]' # The subword token string
    target_label_name = 'B-PRODUCT' # The label it was predicted as

    # Find the index of the target token and its corresponding label ID
    target_token_index = None # Reset to None for fresh search
    target_label_id = label2id[target_label_name]

    # Iterate through the predictions to find the first instance of our target
    for idx, (token, label) in enumerate(zip(subword_tokens, predicted_labels)):
        if token == target_token_str and label == target_label_name:
            target_token_index = idx
            break # Found it, break the loop

    if target_token_index is None:
        print(f"\nCould not find '{target_token_str}' with '{target_label_name}' label in the example (even after retry).")
        print("This means the model didn't predict this combination in the given sample.")
        print("Please manually inspect the subword_tokens and predicted_labels to choose a target that *was* predicted.")
        print("Predicted Labels for reference: ", predicted_labels) # Print again for convenience
    else:
        print(f"\nAttempting to explain prediction for token '{subword_tokens[target_token_index]}' at index {target_token_index} as '{id2label[target_label_id]}'")

        # This `predictor` function is tailored for SHAP to explain a single output (probability of a specific label for a specific token)
        def specific_token_label_predictor(texts_list):
            """Return P(target label) at the target token position for each text.

            Each text is tokenized and run through the model on its own. The
            softmax probability of ``target_label_id`` at position
            ``target_token_index`` is collected; when the logits are empty or
            the sequence is too short to contain that position, 0.0 is used.

            Returns a numpy array of shape (len(texts_list), 1).
            """
            def _target_probability(sample_text):
                encoded = tokenizer(sample_text, return_tensors="pt", truncation=True, padding=True)
                with torch.no_grad():
                    token_logits = model(**encoded).logits.squeeze(0)  # drop the batch dimension

                # The sample may be shorter than the position being explained.
                token_missing = token_logits.numel() == 0 or target_token_index >= token_logits.shape[0]
                if token_missing:
                    return 0.0

                distribution = torch.softmax(token_logits[target_token_index], dim=-1)
                return distribution[target_label_id].item()

            scores = [_target_probability(sample_text) for sample_text in texts_list]
            # Single output column: (num_samples, 1).
            return np.array(scores).reshape(-1, 1)


        # Create a text masker for SHAP
        masker = shap.maskers.Text(tokenizer.mask_token, collapse_mask_token=True)

        # Initialize the SHAP Explainer
        # Using `shap.Explainer` with the custom `specific_token_label_predictor`
        explainer = shap.Explainer(specific_token_label_predictor, masker)

        # Generate SHAP values for the original text
        # This can take a moment depending on num_samples in explainer and text length
        print(f"Generating SHAP values for '{text}'...")
        shap_values = explainer([text]) # Pass the text in a list

        # Print some info about the SHAP values
        print(f"\nSHAP Values generated. Structure for visualization might vary.")
        print(f"Base Value (expected prediction without any features): {shap_values.base_values[0]:.4f}")
        print("Word contributions (values indicate impact on the prediction's log-odds for the target label):")

        # For text data, shap_values.data holds the tokenized words, shap_values.values hold the contributions.
        # Ensure we don't go out of bounds if SHAP's output shape is unexpected
        if shap_values.data.ndim > 1 and shap_values.values.ndim > 1 and shap_values.data.shape[1] == shap_values.values.shape[1]:
            for i, word in enumerate(shap_values.data[0]): # Iterate through the words
                if i < shap_values.values[0].shape[0]:
                    print(f"  '{word}': {shap_values.values[0][i][0]:.4f}") # Display the first output's contribution
        else:
            print("SHAP values structure not directly suitable for simple word-by-word printout. Verify `shap_values.data` and `shap_values.values` shapes.")
            print(f"shap_values.data.shape: {shap_values.data.shape}")
            print(f"shap_values.values.shape: {shap_values.values.shape}")


        print("\nNote: For advanced SHAP visualizations with text (like highlighting words in the sentence),")
        print("it often requires more complex integration or specialized `shap.plots.text` usage tailored for sequence models.")
        print("LIME often provides a more straightforward visual interpretation for token contributions.")

# **Task 6: FinTech Vendor Scorecard for Micro-Lending**

In [None]:
import pandas as pd
from datetime import datetime
import re
import torch
import numpy as np
from transformers import AutoTokenizer, AutoModelForTokenClassification

# --- 0. Ensure Model and Tokenizer are Loaded (from Task 5 setup) ---
# This block expects 'model', 'tokenizer', 'id2label' and 'label2id' to already
# exist in the notebook session. If any of them is missing, re-run the model
# loading cell from Task 5; until then the dummy stand-ins below are installed so
# that the remaining cells can still execute (without meaningful NER output).

# At notebook/module level globals() is the namespace the Task 5 cell wrote into.
if 'model' not in globals() or 'tokenizer' not in globals() or \
   'id2label' not in globals() or 'label2id' not in globals():
    print("WARNING: mBERT model and tokenizer not found in current environment.")
    print("Please ensure you have run the 'Load Fine-tuned mBERT Model' cell from Task 5 before running this code.")

    # Placeholder label scheme. Defined before the dummy model so the model can
    # size its logits to exactly this many labels.
    id2label = {0: "O", 1: "B-PRODUCT", 2: "I-PRODUCT", 3: "B-PRICE", 4: "I-PRICE", 5: "B-CURRENCY", 6: "I-CURRENCY", 7: "B-LOCATION", 8: "I-LOCATION"}
    label2id = {v: k for k, v in id2label.items()}
    label_list = list(label2id.keys())

    class DummyTokenizer:
        """Stand-in tokenizer that encodes every text as just ``[CLS] [SEP]``.

        It exposes only the members the NER extraction cell uses.
        """

        all_special_tokens = ["[CLS]", "[SEP]", "[UNK]", "[PAD]"]
        _id_to_token = {101: "[CLS]", 102: "[SEP]"}

        def __call__(self, text, return_tensors=None, truncation=False, padding=False,
                     max_length=None, **kwargs):
            # All options (including max_length) are accepted and ignored.
            return {'input_ids': torch.tensor([[101, 102]]), 'attention_mask': torch.tensor([[1, 1]])}

        def convert_ids_to_tokens(self, ids):
            return [self._id_to_token.get(int(i), "[UNK]") for i in ids]

        def convert_tokens_to_ids(self, tokens):
            token_to_id = {token: idx for idx, token in self._id_to_token.items()}
            return [token_to_id.get(token, 100) for token in tokens]  # 100 is this dummy's [UNK] id

        def decode(self, token_ids, skip_special_tokens=False, **kwargs):
            tokens = self.convert_ids_to_tokens(token_ids)
            if skip_special_tokens:
                tokens = [t for t in tokens if t not in self.all_special_tokens]
            return " ".join(tokens)

    class DummyModel:
        """Stand-in model that returns random logits shaped like a token classifier's."""

        def __init__(self, num_labels):
            self.num_labels = num_labels

        def __call__(self, input_ids, attention_mask=None, **kwargs):
            # One logit vector per input token, one logit per known label, so an
            # argmax over the last axis always yields a key of id2label.
            logits = torch.randn(*input_ids.shape, self.num_labels)
            return type('obj', (object,), {'logits': logits})()

        def eval(self):
            return self

    tokenizer = DummyTokenizer()
    model = DummyModel(num_labels=len(id2label))
    print("Using dummy model/tokenizer for demonstration. NER extraction will not be functional.")


# --- 1. Data Loading and Preprocessing ---

# Hard-coded sample posts used as input for the rest of this task. Each record carries:
#   channel_name - the vendor channel the post belongs to (used later as the grouping key)
#   post_id      - integer identifier of the post
#   text         - the raw post text that is passed to the NER model
#   views        - view count of the post
#   timestamp    - posting time as an ISO-8601 string with a 'Z' suffix
data_raw = [
    {"channel_name": "EthioTechMart", "post_id": 1, "text": "ላፕቶፕ Dell XPS 13 ዋጋ: 45,000 ብር. አድራሻ: አዲስ አበባ, ቦሌ.", "views": 1200, "timestamp": "2025-06-20T10:00:00Z"},
    {"channel_name": "EthioTechMart", "post_id": 2, "text": "አዲስ ሳምሰንግ ስልክ ጋላክሲ S23 ዋጋ: 30,000 ብር.", "views": 1500, "timestamp": "2025-06-21T14:30:00Z"},
    {"channel_name": "EthioTechMart", "post_id": 3, "text": "ገራሚ ጌም ኮንሶል Playstation 5 ዋጋ: 70,000 ብር.", "views": 2000, "timestamp": "2025-06-22T09:15:00Z"},
    {"channel_name": "EthioTechMart", "post_id": 4, "text": "ዋጋ፦ 1,300 ብር አድራሻ መገናኛ ስሪ ኤም ሲቲ ሞል ሁለተኛ ፎቅ", "views": 500, "timestamp": "2025-06-18T11:00:00Z"}, # This is our difficult case from Task 5
    {"channel_name": "EthioTechMart", "post_id": 5, "text": "ሽያጭ! የዳቦ መጋገሪያ ማሽን ዋጋ: 5,500 ETB. ይገኛል ሜክሲኮ አደባባይ.", "views": 900, "timestamp": "2025-06-15T18:00:00Z"},
    {"channel_name": "AddisElectronics", "post_id": 6, "text": "ለሽያጭ: Sony Camera Alpha 7 III. ዋጋ: 120,000 ብር. ሱቃችን ፒያሳ ነው", "views": 3000, "timestamp": "2025-06-19T12:00:00Z"},
    {"channel_name": "AddisElectronics", "post_id": 7, "text": "አይፎን 14 ፕሮ ማክስ በ24,500 ብር ብቻ! አድራሻ: ካዛንቺስ.", "views": 2800, "timestamp": "2025-06-20T16:00:00Z"},
    {"channel_name": "AddisElectronics", "post_id": 8, "text": "ቲቪ Samsung 55 ኢንች ዋጋ: 40,000 ብር. ቦሌ በሚገኘው መጋዘን", "views": 2500, "timestamp": "2025-06-22T10:30:00Z"},
    {"channel_name": "AddisHomeGoods", "post_id": 9, "text": "የተለያዩ የወጥ ቤት እቃዎች. ዋጋ: 2,500 ብር ጀምሮ.", "views": 700, "timestamp": "2025-06-17T08:00:00Z"},
    {"channel_name": "AddisHomeGoods", "post_id": 10, "text": "አዲስ የመኝታ ክፍል እቃዎች ዋጋ: 18,000 ብር.", "views": 650, "timestamp": "2025-06-18T15:00:00Z"},
    {"channel_name": "EthioTechMart", "post_id": 11, "text": "የጨዋታ ጆሮ ማዳመጫዎች (Gaming Headphones) በ1,800 ብር ብቻ!", "views": 1050, "timestamp": "2025-06-19T13:00:00Z"},
    {"channel_name": "EthioTechMart", "post_id": 12, "text": "ብራንድ አዲስ የሰዓት ስልክ በ8,500 ብር!", "views": 950, "timestamp": "2025-06-23T11:00:00Z"},
    {"channel_name": "EthioTechMart", "post_id": 13, "text": "Dell ላፕቶፕ - 35,000 ብር.", "views": 1100, "timestamp": "2025-06-16T09:00:00Z"},
    {"channel_name": "AddisElectronics", "post_id": 14, "text": "ስማርት ዋች በ10,000 ብር.", "views": 2200, "timestamp": "2025-06-21T10:00:00Z"},
    {"channel_name": "AddisHomeGoods", "post_id": 15, "text": "የአትክልት መሳሪያዎች: 700 ብር.", "views": 500, "timestamp": "2025-06-20T17:00:00Z"},
]

# One row per post; the timestamp strings are converted to pandas datetimes so
# later cells can subtract them to measure the span between posts.
df_posts = pd.DataFrame(data_raw)
df_posts['timestamp'] = pd.to_datetime(df_posts['timestamp'])


In [None]:

# --- 2. NER Entity Extraction Function ---

def _flush_entity(entities, tokenizer, entity_type, token_ids):
    """Decode the collected token ids and store the text under ``entity_type``."""
    if entity_type is None or not token_ids:
        return
    entity_text = tokenizer.decode(token_ids, skip_special_tokens=True).strip()
    if entity_text:
        # setdefault keeps label types outside the four preset keys from raising KeyError.
        entities.setdefault(entity_type, []).append(entity_text)


def extract_entities_from_text(text, tokenizer, model, id2label):
    """
    Extracts entities (PRODUCT, PRICE, CURRENCY, LOCATION) from a given text
    using the fine-tuned NER model.

    Args:
        text: The raw post text to tag.
        tokenizer: Tokenizer providing ``__call__``, ``convert_ids_to_tokens``,
            ``decode`` and ``all_special_tokens``.
        model: Token-classification model whose output exposes ``.logits``.
        id2label: Mapping from predicted class id to a label string such as
            "O", "B-PRICE" or "I-PRICE".

    Returns:
        A dict mapping entity type to the list of decoded entity strings, in
        order of appearance. The four standard types are always present; any
        other type predicted by the model is added on first use.

    Raises:
        KeyError: If the model predicts a class id that is not in ``id2label``.
    """
    inputs = tokenizer(text, return_tensors="pt", truncation=True, padding=True, max_length=512)

    # Run on whatever device the model lives on (models without a `device`
    # attribute, e.g. simple stand-ins, are called with the inputs unchanged).
    device = getattr(model, "device", None)
    if device is not None:
        inputs = {name: tensor.to(device) for name, tensor in inputs.items()}

    with torch.no_grad():
        outputs = model(**inputs)

    predictions = torch.argmax(outputs.logits, dim=-1).squeeze(0).tolist()
    input_ids = inputs["input_ids"].squeeze(0).tolist()
    tokens = tokenizer.convert_ids_to_tokens(input_ids)

    entities = {
        'PRODUCT': [],
        'PRICE': [],
        'CURRENCY': [],
        'LOCATION': []
    }

    special_tokens = set(tokenizer.all_special_tokens) | {"[UNK]"}
    current_type = None
    current_ids = []

    for token, token_id, pred_id in zip(tokens, input_ids, predictions):
        label = id2label[pred_id]

        # Special/unknown tokens carry no text and never open or close an entity.
        if token in special_tokens:
            continue

        # A "##" continuation piece belongs to the word started by the previous
        # piece, so it joins the open entity regardless of its own label. The
        # original id is kept (the "##" prefix is NOT stripped) so that decode()
        # can glue the pieces back into one word.
        if token.startswith("##"):
            if current_type is not None:
                current_ids.append(token_id)
            continue

        if label.startswith(("B-", "I-")):
            entity_type = label[2:]
            if label.startswith("I-") and entity_type == current_type:
                current_ids.append(token_id)
                continue
            # Either a B- tag, or a malformed I- tag (no open entity or a type
            # mismatch) that is treated as the start of a new entity.
            _flush_entity(entities, tokenizer, current_type, current_ids)
            current_type, current_ids = entity_type, [token_id]
        else:  # label == 'O'
            _flush_entity(entities, tokenizer, current_type, current_ids)
            current_type, current_ids = None, []

    # Save an entity that runs up to the end of the sequence.
    _flush_entity(entities, tokenizer, current_type, current_ids)

    return entities

# Run the NER extractor over every post and keep one entity dictionary per row
df_posts['extracted_entities'] = [
    extract_entities_from_text(post_text, tokenizer, model, id2label)
    for post_text in df_posts['text']
]

# Show the first few posts next to what was extracted from them
entity_preview = df_posts[['text', 'extracted_entities']].head()
print("Entities extracted for sample posts:")
print(entity_preview)


In [None]:
# --- 3. Helper Functions for Metric Calculation ---

def calculate_posting_frequency(df_vendor_posts):
    """Return a vendor's average number of posts per week.

    The rate is the post count divided by the whole-day span between the
    earliest and latest 'timestamp', scaled to seven days. Special cases:
    no posts -> 0.0; exactly one post -> 1.0 (a convention, since a single
    post has no span); every post within the same whole-day span -> the
    post count times 7.
    """
    if df_vendor_posts.empty:
        return 0.0

    post_times = df_vendor_posts['timestamp']
    post_count = len(df_vendor_posts)

    # A lone post gives no time span to divide by.
    if post_count == 1:
        return 1.0

    # .days keeps only the whole-day part of the elapsed time.
    elapsed_days = (post_times.max() - post_times.min()).days
    if elapsed_days == 0:
        return post_count * 7.0

    return (post_count / elapsed_days) * 7

# Ethiopic digits one to nine mapped to their Western equivalents. There is no
# zero in this table; any other Ethiopic numeral symbol is not listed and is
# therefore stripped by parse_price like every other non-digit character.
_ETHIOPIC_DIGIT_TABLE = str.maketrans("፩፪፫፬፭፮፯፰፱", "123456789")


def parse_price(price_str):
    """
    Parses a price string (e.g., '1,300', '5,500 ETB', '45,000 ብር') into a float.

    Currency markers ("ብር", "ETB" in any case) are removed, the Ethiopic digits
    ፩-፱ are converted to 1-9, and every remaining character other than digits
    and '.' is dropped, so commas act as thousands separators and '.' is
    always read as a decimal point ('45.000' parses as 45.0).

    Args:
        price_str: The raw price text. Non-string values are rejected.

    Returns:
        The parsed price as a float, or NaN when the input is not a string or
        no valid number remains after cleaning (e.g. an empty string).
    """
    if not isinstance(price_str, str):
        return np.nan

    cleaned_price = price_str.lower().replace("ብር", "").replace("etb", "").strip()

    # Convert Ethiopic digits in a single pass before the non-digit filter
    # below would otherwise discard them.
    cleaned_price = cleaned_price.translate(_ETHIOPIC_DIGIT_TABLE)

    # Remove commas (used as thousands separators) and non-numeric characters except period
    cleaned_price = re.sub(r'[^\d.]', '', cleaned_price)

    try:
        return float(cleaned_price)
    except ValueError:
        return np.nan  # Nothing parseable was left (empty string, stray periods, ...)


In [None]:
# --- 4. Vendor Analytics Engine ---

def develop_vendor_analytics_engine(df_posts):
    """
    Builds one row of key performance metrics per vendor channel.

    Posts are grouped by 'channel_name'. For each group the function reports the
    post count, posting frequency, average views, the most-viewed post (with the
    first PRODUCT and PRICE entity found in it) and the mean of all parseable
    PRICE entities. The 'extracted_entities' column must already be populated.

    Returns a DataFrame indexed by channel name.
    """
    metrics_by_vendor = {}

    for channel_name, vendor_posts in df_posts.groupby('channel_name'):
        print(f"\nProcessing vendor: {channel_name}")

        # --- Market reach & engagement ---
        post_count = len(vendor_posts)
        view_total = vendor_posts['views'].sum()
        avg_views_per_post = view_total / post_count if post_count > 0 else 0

        if vendor_posts.empty:
            best_post = None
        else:
            best_post = vendor_posts.loc[vendor_posts['views'].idxmax()]

        best_product = None
        best_price = np.nan
        if best_post is not None:
            best_entities = best_post['extracted_entities']
            products = best_entities.get('PRODUCT')
            if products:
                best_product = products[0]  # first product wins when there are several
            prices = best_entities.get('PRICE')
            if prices:
                best_price = parse_price(prices[0])

        # --- Activity & consistency ---
        posting_frequency = calculate_posting_frequency(vendor_posts)

        # --- Business profile: average of every price that could be parsed ---
        valid_prices = []
        for post_entities in vendor_posts['extracted_entities']:
            parsed = (parse_price(raw) for raw in post_entities.get('PRICE') or [])
            valid_prices.extend(value for value in parsed if not np.isnan(value))

        if valid_prices:
            average_price_point = np.mean(valid_prices)
        else:
            average_price_point = np.nan

        # The weighted "Lending Score" is not computed here: it needs values
        # normalized across all vendors, which are only known after this loop.
        metrics_by_vendor[channel_name] = {
            "Total Posts": post_count,
            "Posting Frequency (Posts/Week)": posting_frequency,
            "Average Views per Post": avg_views_per_post,
            "Top Performing Post": "N/A" if best_post is None else best_post['text'],
            "Top Post Product": best_product if best_product else "N/A",
            "Top Post Price (ETB)": best_price,
            "Average Price Point (ETB)": average_price_point,
        }

    return pd.DataFrame.from_dict(metrics_by_vendor, orient='index')

# Build the per-vendor metrics table from a copy, leaving df_posts itself untouched
posts_snapshot = df_posts.copy()
df_vendor_analytics = develop_vendor_analytics_engine(posts_snapshot)

print("\n--- Vendor Analytics Summary (Raw Metrics) ---")
print(df_vendor_analytics)

In [None]:
# --- 5. Create a Final "Lending Score" and Vendor Scorecard ---

def create_lending_scorecard(df_analytics):
    """
    Turns the raw vendor metrics into a ranked lending scorecard.

    Views, posting frequency and average price are each divided by the largest
    value among the vendors, then combined with weights 0.5 / 0.3 / 0.2 into a
    'Lending Score'. The input frame is not modified. Returns a frame with the
    report columns only, sorted by score from highest to lowest.
    """
    views_col = 'Average Views per Post'
    frequency_col = 'Posting Frequency (Posts/Week)'
    price_col = 'Average Price Point (ETB)'

    scorecard = df_analytics.copy()

    # Scale each metric by its maximum; a non-positive maximum yields a flat 0.
    top_views = scorecard[views_col].max()
    if top_views > 0:
        scorecard['Normalized Views'] = scorecard[views_col] / top_views
    else:
        scorecard['Normalized Views'] = 0

    top_frequency = scorecard[frequency_col].max()
    if top_frequency > 0:
        scorecard['Normalized Frequency'] = scorecard[frequency_col] / top_frequency
    else:
        scorecard['Normalized Frequency'] = 0

    # A higher price point is treated as better. Vendors without any known
    # price end up with a normalized price of 0.
    known_prices = scorecard[price_col].dropna()
    top_price = 1.0 if known_prices.empty else known_prices.max()
    if top_price > 0:
        scorecard['Normalized Price'] = scorecard[price_col] / top_price
    else:
        scorecard['Normalized Price'] = 0
    scorecard['Normalized Price'] = scorecard['Normalized Price'].fillna(0)

    # Business-priority weights: views 50%, frequency 30%, price 20%.
    weights = {
        'Normalized Views': 0.5,
        'Normalized Frequency': 0.3,
        'Normalized Price': 0.2,
    }
    views_term, frequency_term, price_term = (
        scorecard[column] * weight for column, weight in weights.items()
    )
    scorecard['Lending Score'] = views_term + frequency_term + price_term

    # Columns shown in the final report, with their display names.
    report_columns = {
        views_col: 'Avg. Views/Post',
        frequency_col: 'Posts/Week',
        price_col: 'Avg. Price (ETB)',
        'Lending Score': 'Lending Score',
    }
    report = scorecard[list(report_columns)].rename(columns=report_columns)

    return report.sort_values('Lending Score', ascending=False)

# Score every vendor and render the ranked table as markdown
final_scorecard_df = create_lending_scorecard(df_vendor_analytics)
scorecard_markdown = final_scorecard_df.to_markdown(index=True)

print("\n--- Final Vendor Scorecard for Micro-Lending ---")
print(scorecard_markdown)

# Explanatory footer, one line per print call
for note_line in (
    "\nNote: The 'Lending Score' is a composite metric based on weighted normalized values.",
    "Weights used: Average Views (0.5), Posting Frequency (0.3), Average Price Point (0.2).",
    "These weights can be adjusted based on EthioMart's specific lending criteria.",
):
    print(note_line)
