In [None]:

# Mount Google Drive at /content/drive so the data file and saved models are reachable.
from google.colab import drive
drive.mount('/content/drive')

# Install necessary libraries
!pip install transformers datasets accelerate seqeval -q
!pip install torch # Ensure torch is installed if not already
# NOTE(review): seqeval is already listed in the first pip command above, so this line repeats it.
!pip install seqeval -q
import os
import pandas as pd
from datasets import load_dataset, Dataset, Features, Value, ClassLabel
from transformers import AutoTokenizer, AutoModelForTokenClassification, TrainingArguments, Trainer
from seqeval.metrics import classification_report
import numpy as np
import torch

In [None]:
import os
import pandas as pd
from datasets import load_dataset, Dataset, Features, Value, Sequence, ClassLabel
from transformers import AutoTokenizer, AutoModelForTokenClassification, TrainingArguments, Trainer, pipeline
from seqeval.metrics import classification_report
import numpy as np
import torch
from google.colab import drive
import time

# Mount Google Drive for this cell, requesting a forced remount.
drive.mount('/content/drive', force_remount=True)

# Location of the labeled CoNLL file inside the mounted Drive.
conll_file_path = '/content/drive/MyDrive/labeled_amharic_ner_data.conll'

# Check if the file exists
# NOTE(review): a missing file is only reported; nothing stops execution here,
# so the later read_conll_file(conll_file_path) call will fail when it opens the path.
if not os.path.exists(conll_file_path):
    print(f"Error: CoNLL file not found at {conll_file_path}")
    print("Please ensure 'labeled_amharic_ner_data.conll' is in your Google Drive's root and the path is correct.")
else:
    print(f"CoNLL file found at {conll_file_path}")

# Define labels
label_list = ['O', 'B-Product', 'I-Product', 'B-LOC', 'I-LOC', 'B-PRICE', 'I-PRICE']
# Lookup tables between integer class ids (positions in label_list) and label names.
id2label = {i: label for i, label in enumerate(label_list)}
label2id = {label: i for i, label in enumerate(label_list)}
# Function to read CoNLL file
def read_conll_file(file_path):
    """Parse a tab-separated CoNLL file into per-sentence token and tag lists.

    Every non-blank line is expected to look like ``token<TAB>tag``. Lines that
    do not split into exactly two tab-separated fields are ignored. A blank
    line closes the sentence being collected; sentences without any token are
    never emitted.

    Args:
        file_path: Path of the UTF-8 encoded CoNLL file.

    Returns:
        A dict with keys ``'tokens'`` and ``'ner_tags'``, each a list holding
        one list per sentence.
    """
    sentences = []
    sentence_tags = []
    words, tags = [], []

    with open(file_path, 'r', encoding='utf-8') as handle:
        for raw_line in handle:
            stripped = raw_line.strip()

            if not stripped:
                # Sentence boundary: keep what was collected, then start over.
                if words:
                    sentences.append(words)
                    sentence_tags.append(tags)
                words, tags = [], []
                continue

            fields = stripped.split('\t')
            if len(fields) != 2:
                continue
            word, tag = fields
            words.append(word)
            tags.append(tag.strip())

    # The file may end without a trailing blank line.
    if words:
        sentences.append(words)
        sentence_tags.append(tags)

    return {'tokens': sentences, 'ner_tags': sentence_tags}

# Load custom dataset
raw_data_for_hf = read_conll_file(conll_file_path)

# Define features for the dataset
# 'ner_tags' is declared as a sequence of ClassLabel so the string tags read
# from the file are encoded against label_list.
features_for_dataset = Features({
    'tokens': Sequence(Value('string')),
    'ner_tags': Sequence(ClassLabel(names=label_list))
})

# Create Hugging Face Dataset
full_dataset = Dataset.from_dict(raw_data_for_hf, features=features_for_dataset)

print(f"Dataset loaded. Number of examples: {len(full_dataset)}")
print("First example:", full_dataset[0])

# Train-test split
# A fixed seed keeps the 80/20 split identical across notebook re-runs, so the
# metrics reported for the compared models stay comparable between runs.
train_test_split = full_dataset.train_test_split(test_size=0.2, seed=42)
train_dataset = train_test_split['train']
eval_dataset = train_test_split['test']

print(f"Train dataset size: {len(train_dataset)}")
print(f"Eval dataset size: {len(eval_dataset)}")

from transformers import DataCollatorForTokenClassification

def compute_metrics(p):
    """Compute entity-level precision/recall/F1 and token-level accuracy.

    Args:
        p: A ``(predictions, labels)`` pair. ``predictions`` is reduced with
            ``argmax`` over axis 2 to get one label id per position;
            ``labels`` holds the gold label ids, where -100 marks positions
            that are excluded from scoring.

    Returns:
        A dict with ``precision``, ``recall`` and ``f1`` taken from the
        'micro avg' row of the seqeval classification report, plus
        ``accuracy``: the fraction of scored positions whose predicted tag
        equals the gold tag.
    """
    predictions, labels = p
    predictions = np.argmax(predictions, axis=2)

    # Drop ignored positions (-100) and map ids to tag names.
    true_labels = [
        [id2label[l] for l in label if l != -100]
        for label in labels
    ]
    true_predictions = [
        [id2label[pred] for (pred, l) in zip(prediction, label) if l != -100]
        for prediction, label in zip(predictions, labels)
    ]

    report = classification_report(true_labels, true_predictions, output_dict=True, zero_division=0)
    overall_metrics = report.get('micro avg', {})
    f1_score = overall_metrics.get('f1', overall_metrics.get('f1-score', 0.0))

    # Accuracy was previously filled with the precision value; compute it
    # from the aligned tag sequences instead.
    scored_tokens = sum(len(sequence) for sequence in true_labels)
    correct_tokens = sum(
        predicted == gold
        for predicted_seq, gold_seq in zip(true_predictions, true_labels)
        for predicted, gold in zip(predicted_seq, gold_seq)
    )
    accuracy = correct_tokens / scored_tokens if scored_tokens else 0.0

    return {
        "precision": overall_metrics.get('precision', 0.0),
        "recall": overall_metrics.get('recall', 0.0),
        "f1": f1_score,
        "accuracy": accuracy
    }

# Remove output and log directories left by earlier runs.
!rm -rf ./results_*
!rm -rf ./logs_*
# NOTE(review): this deletes the local Hugging Face cache; presumably every checkpoint is then downloaded again — confirm this is intended.
!rm -rf ~/.cache/huggingface/
# NOTE(review): this deletes everything under /tmp, not only files created by this notebook — confirm this is safe for the runtime.
!rm -rf /tmp/*

# Display name -> checkpoint identifier passed to from_pretrained for each model to compare.
model_checkpoints_to_compare = {
    "XLM-R_Amharic_NER": "mbeukman/xlm-roberta-base-finetuned-ner-amharic",
    "mBERT": "bert-base-multilingual-cased",
    "DistilBERT_Multi": "distilbert-base-multilingual-cased",
}

# Filled by the fine-tuning loop: one entry per display name.
results = {}

for model_name, checkpoint in model_checkpoints_to_compare.items():
    print(f"\n--- Fine-tuning {model_name} ({checkpoint}) ---")
    !rm -rf /tmp/*

    tokenizer = AutoTokenizer.from_pretrained(checkpoint)
    model = AutoModelForTokenClassification.from_pretrained(
        checkpoint,
        num_labels=len(label_list),
        id2label=id2label,
        label2id=label2id,
        ignore_mismatched_sizes=True
    )
    def tokenize_and_align_labels_for_current_model(examples):
        tokenized_inputs = tokenizer(
            examples["tokens"], truncation=True, is_split_into_words=True
        )
        labels = []
        for i, label_ids_for_example in enumerate(examples["ner_tags"]):
            word_ids = tokenized_inputs.word_ids(batch_index=i)
            previous_word_idx = None
            current_label_ids = []
            for word_idx in word_ids:
                if word_idx is None:
                    current_label_ids.append(-100)
                elif word_idx != previous_word_idx:
                    current_label_ids.append(label_ids_for_example[word_idx])
                else:
                    original_word_label_id = label_ids_for_example[word_idx]
                    original_word_label_name = id2label[original_word_label_id]
                    if original_word_label_name.startswith("B-"):
                        new_label_name = f"I-{original_word_label_name[2:]}"
                        current_label_ids.append(label2id.get(new_label_name, original_word_label_id))
                    else:
                        current_label_ids.append(original_word_label_id)
                previous_word_idx = word_idx
            labels.append(current_label_ids)
        tokenized_inputs["labels"] = labels
        return tokenized_inputs

    current_tokenized_train_dataset = train_dataset.map(tokenize_and_align_labels_for_current_model, batched=True)
    current_tokenized_eval_dataset = eval_dataset.map(tokenize_and_align_labels_for_current_model, batched=True)

    current_data_collator = DataCollatorForTokenClassification(tokenizer=tokenizer)

    current_output_dir = f"./results_{model_name}"
    current_logging_dir = f"./logs_{model_name}"
    if os.path.exists(current_output_dir):
        !rm -rf {current_output_dir}
    if os.path.exists(current_logging_dir):
        !rm -rf {current_logging_dir}
    os.makedirs(current_output_dir, exist_ok=True)
    os.makedirs(current_logging_dir, exist_ok=True)

    training_args_current = TrainingArguments(
        output_dir=current_output_dir,
        eval_strategy="epoch",
        learning_rate=2e-5,
        per_device_train_batch_size=8,
        per_device_eval_batch_size=8,
        num_train_epochs=7,
        weight_decay=0.01,
        logging_dir=current_logging_dir,
        logging_steps=10,
        save_strategy="epoch",
        load_best_model_at_end=True,
        metric_for_best_model="f1",
        report_to="none"
    )

    trainer_current = Trainer(
        model=model,
        args=training_args_current,
        train_dataset=current_tokenized_train_dataset,
        eval_dataset=current_tokenized_eval_dataset,
        tokenizer=tokenizer,
        data_collator=current_data_collator,
        compute_metrics=compute_metrics
    )

    train_start_time = time.time()
    trainer_current.train()
    train_end_time = time.time()
    training_time = train_end_time - train_start_time

    eval_metrics = trainer_current.evaluate()

    inference_start_time = time.time()
    sample_text = "አዲስ አበባ ላይ ቴሌቪዥን በ1000 ብር ይሸጣል"
    ner_pipeline = pipeline("ner", model=model, tokenizer=tokenizer, aggregation_strategy="simple")
    _ = ner_pipeline(sample_text)
    inference_end_time = time.time()
    inference_time = inference_end_time - inference_start_time

    results[model_name] = {
        "checkpoint": checkpoint,
        "eval_metrics": eval_metrics,
        "training_time_seconds": training_time,
        "inference_time_per_sample_seconds": inference_time / len(sample_text.split())
    }

    output_model_dir_specific = f"/content/drive/MyDrive/fine_tuned_amharic_ner_model_{model_name}"
    os.makedirs(output_model_dir_specific, exist_ok=True)

    # Save only the best model (which the trainer already loaded at the end)
    trainer_current.save_model(output_model_dir_specific)
    tokenizer.save_pretrained(output_model_dir_specific)
    print(f"Model {model_name} saved to: {output_model_dir_specific}")


# Print the collected per-model results: checkpoint, every evaluation metric
# (four decimals) and the two timing figures.
print("\n--- Model Comparison Results ---")
for model_name, data in results.items():
    print(
        f"\nModel: {model_name}",
        f"  Checkpoint: {data['checkpoint']}",
        "  Evaluation Metrics (on validation set):",
        sep="\n",
    )
    for metric, value in data['eval_metrics'].items():
        print(f"    {metric}: {value:.4f}")
    print(
        f"  Training Time: {data['training_time_seconds']:.2f} seconds",
        f"  Approx. Inference Time per Sample: {data['inference_time_per_sample_seconds']:.4f} seconds (for a rough sentence length)",
        sep="\n",
    )

In [None]:
!pip install lime shap
import torch
from transformers import AutoTokenizer, AutoModelForTokenClassification
import shap
import lime
from lime.lime_text import LimeTextExplainer
import numpy as np
import pandas as pd
import json
import re

# Define the model path from Task 4's output
MODEL_PATH = "/content/drive/MyDrive/fine_tuned_amharic_ner_model_XLM-R_Amharic_NER"
# Label names used in this cell to map predicted class ids to names.
# NOTE(review): the training cell's label_list names the last two classes
# 'B-PRICE'/'I-PRICE', while this list says 'B-Money'/'I-Money'. The positions
# (ids 5 and 6) are the same, so decoding by index still lines up, but the
# names differ from the id2label mapping passed to the model at training time —
# confirm which naming is intended.
label_list = ['O', 'B-Product', 'I-Product', 'B-LOC', 'I-LOC', 'B-Money', 'I-Money']
id_to_label = {i: label for i, label in enumerate(label_list)}
label_to_id = {label: i for i, label in enumerate(label_list)}

# Load the fine-tuned model and tokenizer
print(f"Loading model from {MODEL_PATH}...")
tokenizer = AutoTokenizer.from_pretrained(MODEL_PATH)
model = AutoModelForTokenClassification.from_pretrained(MODEL_PATH)
model.eval() # Set model to evaluation mode
print("Model loaded successfully.")

# Run on the GPU when one is available, otherwise on the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)
print(f"Using device: {device}")


In [None]:

# Directory on Drive holding the fine-tuned XLM-R model saved by the training loop.
MODEL_PATH = "/content/drive/MyDrive/fine_tuned_amharic_ner_model_XLM-R_Amharic_NER"
# Label names used in this cell to map predicted class ids to names.
# NOTE(review): the training cell's label_list names the last two classes
# 'B-PRICE'/'I-PRICE', while this list says 'B-Money'/'I-Money'. The positions
# (ids 5 and 6) are the same, so decoding by index still lines up — confirm
# which naming is intended.
label_list = ['O', 'B-Product', 'I-Product', 'B-LOC', 'I-LOC', 'B-Money', 'I-Money']
id_to_label = {i: label for i, label in enumerate(label_list)}
label_to_id = {label: i for i, label in enumerate(label_list)}

# Load the fine-tuned model and tokenizer
import torch
from transformers import AutoTokenizer, AutoModelForTokenClassification

print(f"Loading model from {MODEL_PATH}...")
tokenizer = AutoTokenizer.from_pretrained(MODEL_PATH)
model = AutoModelForTokenClassification.from_pretrained(MODEL_PATH)
model.eval() # Set model to evaluation mode
print("Model loaded successfully.")

# Run on the GPU when one is available, otherwise on the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)
print(f"Using device: {device}")

# Example data for interpretation (from your Task 4 output)
example_sentence = "Electric Charcoal Burner በቀላሉ ከሰል ለማያያዝ የሚሆን አነስ ያለ ቦታ የማይዝ የሚሰራ ሻይ፣ ቡና ለማፍላት የሚሆን ዋጋ፦ 1600 ብር ውስን ፍሬ ነው ያለው አድራሻ መገናኛ_መሰረት_ደፋር_ሞል_ሁለተኛ_ፎቅ ቢሮ ቁ S05S06 0902660722 0928460606 በTelegram ለማዘዝ ይጠቀሙ zemencallcenter zemenexpressadmin ለተጨማሪ ማብራሪያ የቴሌግራም ገፃችን httpstelegrammezemenexpress"

def get_token_probabilities_for_target_label(text_list, target_token_idx, target_label_id):
    """
    Predicts the probability of a specific label for a specific token across texts.

    Each text is tokenized and run through the model on its own; the softmax
    over the label dimension is read at position ``target_token_idx``.

    Args:
        text_list: Iterable of input texts.
        target_token_idx: Position in the model's tokenized sequence to read.
        target_label_id: Index of the label whose probability is returned.

    Returns:
        A 1-D numpy array with one probability per text. A text whose
        tokenized length does not reach ``target_token_idx`` contributes 0.0.
    """
    probs_for_target_label = []
    for text in text_list:
        inputs = tokenizer(text, return_tensors="pt", padding=True, truncation=True, max_length=512).to(device)
        with torch.no_grad():
            outputs = model(**inputs)
        # Drop the batch dimension of the single-text batch: (seq_len, num_labels).
        probabilities = torch.softmax(outputs.logits, dim=-1).squeeze(0).cpu().numpy()

        # The second tokenization pass that used to follow here produced
        # values that were never read, so it has been removed.
        if target_token_idx < probabilities.shape[0]: # Check if index is within sequence length
            probs_for_target_label.append(probabilities[target_token_idx, target_label_id])
        else:
            probs_for_target_label.append(0.0)
    return np.array(probs_for_target_label)


# Word and label whose prediction is explained below.
target_word = "Electric"
target_label = "B-Product"
target_label_id = label_to_id[target_label]

# Tokenize the example once to see the sub-tokens the model receives.
encoded_example = tokenizer.encode_plus(example_sentence, return_tensors="pt", add_special_tokens=True)
input_ids = encoded_example['input_ids'][0].tolist()
tokens = tokenizer.convert_ids_to_tokens(input_ids)

# Find the index of 'Electric' in the *tokenized* sequence (it's usually the second token, index 1)
# The match is a case-insensitive substring test against each single sub-token,
# so it only succeeds when one sub-token contains the whole word.
target_token_in_model_output_idx = -1
for i, token in enumerate(tokens):
    if target_word.lower() in token.lower() and i > 0 and token != "<s>":
        target_token_in_model_output_idx = i
        break
if target_token_in_model_output_idx == -1:
    print(f"Warning: Could not find '{target_word}' in the tokenized sequence.")
    target_token_in_model_output_idx = 1 # Fallback for demo if not found

print(f"Explaining prediction for token '{tokens[target_token_in_model_output_idx]}' (index {target_token_in_model_output_idx}) as '{target_label}'")

# NOTE(review): background_texts is assigned here but not referenced by the code shown in this cell.
background_texts = [example_sentence]

# Create the explainer
# The lambda fixes the token position and label id, so the explainer sees a
# function of the texts only; the tokenizer is passed as the second argument.
explainer = shap.Explainer(
    lambda texts: get_token_probabilities_for_target_label(texts, target_token_in_model_output_idx, target_label_id),
    tokenizer
)

shap_values = explainer([example_sentence])

print(f"\nSHAP Explanation for '{target_word}' as '{target_label}':")
# Pair every piece of the explained text with its SHAP value.
text_data = [(token, value) for token, value in zip(shap_values[0].data, shap_values[0].values)]
print(f"Text token contributions to '{target_label}' prediction for '{target_word}':")
for token, value in text_data:
    print(f"  Token: '{token}', SHAP Value: {value:.4f}")

In [None]:
!pip install lime shap
import torch
from transformers import AutoTokenizer, AutoModelForTokenClassification
import numpy as np
import shap
import lime
from lime.lime_text import LimeTextExplainer
from google.colab import drive
drive.mount('/content/drive')

# Directory on Drive holding the fine-tuned XLM-R model saved by the training loop.
MODEL_PATH = "/content/drive/MyDrive/fine_tuned_amharic_ner_model_XLM-R_Amharic_NER"

import os
# Report whether the model directory exists; nothing is raised when it is missing.
if not os.path.exists(MODEL_PATH):
    print(f"Error: Model directory not found at {MODEL_PATH}")
else:
    print(f"Model directory found at {MODEL_PATH}. Contents: {os.listdir(MODEL_PATH)}")
    # You should see files like 'config.json', 'pytorch_model.bin' etc.
# NOTE(review): the training cell's label_list names the last two classes
# 'B-PRICE'/'I-PRICE', while this list says 'B-Money'/'I-Money'. The positions
# (ids 5 and 6) are the same, so decoding by index still lines up — confirm
# which naming is intended.
label_list = ['O', 'B-Product', 'I-Product', 'B-LOC', 'I-LOC', 'B-Money', 'I-Money']
id_to_label = {i: label for i, label in enumerate(label_list)}
label_to_id = {label: i for i, label in enumerate(label_list)}

# Load the fine-tuned model and tokenizer
print(f"Loading model from {MODEL_PATH}...")
tokenizer = AutoTokenizer.from_pretrained(MODEL_PATH)
model = AutoModelForTokenClassification.from_pretrained(MODEL_PATH)
model.eval() # Set model to evaluation mode
print("Model loaded successfully.")

# Set device
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)
print(f"Using device: {device}")

# Example sentence for interpretation
example_sentence = "Electric Charcoal Burner በቀላሉ ከሰል ለማያያዝ የሚሆን አነስ ያለ ቦታ የማይዝ የሚሰራ ሻይ፣ ቡና ለማፍላት የሚሆን ዋጋ፦ 1600 ብር ውስን ፍሬ ነው ያለው አድራሻ መገናኛ_መሰረት_ደፋር_ሞል_ሁለተኛ_ፎቅ ቢሮ ቁ S05S06 0902660722 0928460606 በTelegram ለማዘዝ ይጠቀሙ zemencallcenter zemenexpressadmin ለተጨማሪ ማብራሪያ የቴሌግራም ገፃችን httpstelegrammezemenexpress"

# This part determines the specific index of "Electric" in the tokenized sequence.
# The match is a case-insensitive substring test against each single sub-token.
target_word = "Electric"
encoded_example = tokenizer.encode_plus(example_sentence, return_tensors="pt", add_special_tokens=True)
input_ids = encoded_example['input_ids'][0].tolist()
tokens = tokenizer.convert_ids_to_tokens(input_ids)
target_token_in_model_output_idx = -1
for i, token in enumerate(tokens):
    if target_word.lower() in token.lower() and i > 0 and token != "<s>":
        target_token_in_model_output_idx = i
        break
if target_token_in_model_output_idx == -1:
    target_token_in_model_output_idx = 1 # Fallback if not found, for demonstration
# --- LIME-related functions and execution code follow here ---
def predict_ner_token_probs(texts, original_sentence_tokens, target_original_word_idx):
    """Return, for each text, the label distribution at the target word.

    The target word is ``original_sentence_tokens[target_original_word_idx]``.
    Each text is tokenized and scored separately; the first sub-token that
    contains the target word (case-insensitive substring test) supplies the
    softmax row that is returned for that text.

    Args:
        texts: Iterable of (possibly perturbed) input texts.
        original_sentence_tokens: Whitespace-split words of the original sentence.
        target_original_word_idx: Index of the target word in that list.

    Returns:
        A numpy array with one row of ``len(label_list)`` values per text.
        Texts in which no sub-token contains the target word get a row of zeros.
    """
    target_word_text = original_sentence_tokens[target_original_word_idx]
    rows = []

    for text in texts:
        encoded = tokenizer(text, return_tensors="pt", padding=True, truncation=True, max_length=512).to(device)
        with torch.no_grad():
            logits = model(**encoded).logits
        probs = torch.softmax(logits, dim=-1).squeeze(0).cpu().numpy()

        pieces = tokenizer.convert_ids_to_tokens(encoded['input_ids'][0].tolist())
        needle = target_word_text.lower()
        # Position of the first sub-token containing the target word, or -1.
        hit = next(
            (
                pos
                for pos, piece in enumerate(pieces)
                if needle in piece.lower() and pos < probs.shape[0]
            ),
            -1,
        )

        if hit == -1:
            # If the word is not found in the perturbed text, return zeros
            rows.append(np.zeros(len(label_list)))
        else:
            rows.append(probs[hit])

    return np.array(rows)

# Explain the first whitespace-separated word of the example sentence with LIME.
original_words = example_sentence.split()
target_word_for_lime = original_words[0]
target_word_original_idx = 0

explainer = LimeTextExplainer(
    class_names=label_list,
    # Use a simple whitespace split for LIME's internal segmentation.
    # The actual BERT tokenization happens within predict_ner_token_probs.
    split_expression=lambda text: text.split(' '),
)

print(f"\nLIME Explanation for '{target_word_for_lime}':")

original_sentence_split = example_sentence.split()
# The lambda fixes the sentence words and the target index, so LIME sees a
# function of the perturbed texts only.
lime_exp = explainer.explain_instance(
    text_instance=example_sentence,
    classifier_fn=lambda texts: predict_ner_token_probs(texts, original_sentence_split, target_word_original_idx),
    labels=[label_to_id[label] for label in label_list], # All possible labels
    num_features=5,
    num_samples=100
)

print(f"\nLIME Explanation for '{original_sentence_split[target_word_original_idx]}' (original word index {target_word_original_idx}):")
# Score the unperturbed sentence to find the label the model actually predicts.
inputs_original = tokenizer(example_sentence, return_tensors="pt", padding=True, truncation=True, max_length=512).to(device)
with torch.no_grad():
    outputs_original = model(**inputs_original)
logits_original = outputs_original.logits
probabilities_original = torch.softmax(logits_original, dim=-1).squeeze(0).cpu().numpy()

# Ensure target_token_in_model_output_idx is within bounds
if target_token_in_model_output_idx >= probabilities_original.shape[0]:
    # The warning now states what is wrong; before, it printed the index and the shape with no text between them.
    print(f"Warning: target_token_in_model_output_idx ({target_token_in_model_output_idx}) is out of bounds for model output shape {probabilities_original.shape}. Setting to 1.")
    target_token_in_model_output_idx = 1 # Fallback

predicted_label_id = np.argmax(probabilities_original[target_token_in_model_output_idx])
predicted_label = id_to_label[predicted_label_id]
print(f"Predicted label for '{original_sentence_split[target_word_original_idx]}' is: {predicted_label}")

print("\nExplanation for predicted label:")
for feature, weight in lime_exp.as_list(label=predicted_label_id):
    print(f"  - {feature}: {weight:.4f}")

print("\nExplanation for all labels:")
for label_id in lime_exp.available_labels():
    print(f"\n--- For Label: {id_to_label[label_id]} ---")
    for feature, weight in lime_exp.as_list(label=label_id):
        print(f"  - {feature}: {weight:.4f}")

In [None]:
# Re-running SHAP/LIME for the difficult case

# Sentence containing two numbers ("500" and "100"); the word "500" is the one explained below.
difficult_case_sentence = "ይህ ምርት 500 ግራም ይመዝናል እና ዋጋው 100 ብር ነው።"

# Locate the target word among the whitespace-split words (exact match).
target_difficult_word = "500"
target_difficult_word_original_idx = -1
difficult_sentence_split = difficult_case_sentence.split()
for i, word in enumerate(difficult_sentence_split):
    if word == target_difficult_word:
        target_difficult_word_original_idx = i
        break
if target_difficult_word_original_idx == -1:
    print(f"Warning: Could not find '{target_difficult_word}' in the difficult sentence. Setting to 0 for demo.")
    target_difficult_word_original_idx = 0 # Fallback for demo


print(f"\n--- Analyzing Difficult Case: '{difficult_case_sentence}' ---")

# SHAP for difficult case (e.g., explaining why '500' is 'O')
target_label_o_id = label_to_id['O']
target_token_difficult_idx = -1

encoded_difficult = tokenizer.encode_plus(difficult_case_sentence, return_tensors="pt", add_special_tokens=True)
tokens_difficult = tokenizer.convert_ids_to_tokens(encoded_difficult['input_ids'][0].tolist())

# Find the target token's index in the model's tokenized sequence (original sentence)
# The match is a substring test against each single sub-token; position 0 and
# the tokenizer's special tokens are skipped.
found_target_in_original = False
for i, token in enumerate(tokens_difficult):
    if target_difficult_word.lower() in token.lower() and i > 0 and token not in tokenizer.all_special_tokens:
        target_token_difficult_idx = i
        found_target_in_original = True
        break

if not found_target_in_original:
    print(f"Warning: Could not find '{target_difficult_word}' subword in original difficult sentence tokenization. Using first content token as fallback.")
    # Fall back to the first token that is not a special token.
    for i, token in enumerate(tokens_difficult):
        if token not in tokenizer.all_special_tokens:
            target_token_difficult_idx = i
            break
    if target_token_difficult_idx == -1:
        target_token_difficult_idx = 1 # Last resort fallback


print(f"Explaining prediction for token '{tokens_difficult[target_token_difficult_idx]}' (index {target_token_difficult_idx}) as 'O'")

# --- SHAP Specific Changes ---

def shap_predictor(texts):
    """Score each text with the probability of label 'O' at the target word.

    For every item the first sub-token containing ``target_difficult_word``
    (case-insensitive substring test, special tokens excluded) is located and
    the softmax probability of ``target_label_o_id`` at that position is used.
    Items that are neither a string nor a numpy array, empty items, and texts
    in which the target is not found all score 0.0.

    Args:
        texts: Iterable of strings, or of numpy arrays whose first element is
            used as the text.

    Returns:
        A numpy array of shape ``(len(texts), 1)``.
    """
    scores = []

    for item in texts:
        # Reduce the item to a plain text value (None when the type is unsupported).
        if isinstance(item, np.ndarray):
            sentence = item[0] if item.size > 0 else ""
        elif isinstance(item, str):
            sentence = item
        else:
            sentence = None

        if not sentence:
            scores.append(0.0)
            continue

        encoded = tokenizer(sentence, return_tensors="pt", padding=True, truncation=True, max_length=512).to(device)
        with torch.no_grad():
            logits = model(**encoded).logits
        probs = torch.softmax(logits, dim=-1).squeeze(0).cpu().numpy()

        pieces = tokenizer.convert_ids_to_tokens(encoded['input_ids'][0].tolist())
        # Position of the first matching non-special sub-token, or -1.
        hit = next(
            (
                pos
                for pos, piece in enumerate(pieces)
                if target_difficult_word.lower() in piece.lower()
                and pos < probs.shape[0]
                and piece not in tokenizer.all_special_tokens
            ),
            -1,
        )

        if hit == -1:
            scores.append(0.0)
        else:
            scores.append(probs[hit, target_label_o_id])

    return np.array(scores).reshape(-1, 1)

# Explain shap_predictor's output for the difficult sentence; the Text masker
# is given the tokenizer and its mask token.
explainer_difficult = shap.Explainer(
    model=shap_predictor,
    masker=shap.maskers.Text(mask_token=tokenizer.mask_token, tokenizer=tokenizer)
)

shap_values_difficult = explainer_difficult([difficult_case_sentence])

print(f"\nSHAP Explanation for '{target_difficult_word}' as 'O' in difficult case:")
if len(shap_values_difficult) > 0:
    for i, instance in enumerate(shap_values_difficult):
        if hasattr(instance, 'data') and hasattr(instance, 'values'):
            text_data_difficult = [(token, value) for token, value in zip(instance.data, instance.values)]
            print(f"Explanation for instance {i}:")
            for token, value in text_data_difficult:
                # shap_predictor returns a column vector, so each value may be
                # a one-element array; unwrap it before float formatting.
                # (A stray character after .item() used to make this cell a SyntaxError.)
                if isinstance(value, np.ndarray) and value.size == 1:
                    actual_value = value.item()
                else:
                    actual_value = value # Assume it's already a scalar if not a 1-element array
                print(f"  Token: '{token}', SHAP Value: {actual_value:.4f}")
        else:
            print(f"Could not retrieve data and values for SHAP instance {i}. Type: {type(instance)}")
else:
    print("No SHAP values generated.")


# LIME for difficult case (e.g., explaining why '500' is 'O')

# LIME segments the text by splitting on single spaces; class names come from label_list.
explainer = LimeTextExplainer(
    class_names=label_list,
    split_expression=lambda text: text.split(' '),
)

def predict_ner_token_probs(texts, original_sentence_tokens, target_original_word_idx):
    """Return, for each text, the label distribution at the target word.

    The target word is ``original_sentence_tokens[target_original_word_idx]``.
    Each text is tokenized and scored separately; the first sub-token that
    contains the target word (case-insensitive substring test) supplies the
    softmax row that is returned for that text.

    Args:
        texts: Iterable of (possibly perturbed) input texts.
        original_sentence_tokens: Whitespace-split words of the original sentence.
        target_original_word_idx: Index of the target word in that list.

    Returns:
        A numpy array with one row of ``len(label_list)`` values per text.
        Texts in which no sub-token contains the target word get a row of zeros.
    """
    target_word_text = original_sentence_tokens[target_original_word_idx]
    rows = []

    for text in texts:
        encoded = tokenizer(text, return_tensors="pt", padding=True, truncation=True, max_length=512).to(device)
        with torch.no_grad():
            logits = model(**encoded).logits
        probs = torch.softmax(logits, dim=-1).squeeze(0).cpu().numpy()

        pieces = tokenizer.convert_ids_to_tokens(encoded['input_ids'][0].tolist())
        needle = target_word_text.lower()
        # Position of the first sub-token containing the target word, or -1.
        hit = next(
            (
                pos
                for pos, piece in enumerate(pieces)
                if needle in piece.lower() and pos < probs.shape[0]
            ),
            -1,
        )

        if hit == -1:
            rows.append(np.zeros(len(label_list)))
        else:
            rows.append(probs[hit])

    return np.array(rows)


# Run LIME on the difficult sentence; the lambda fixes the sentence words and
# the target word index so LIME sees a function of the perturbed texts only.
lime_exp_difficult = explainer.explain_instance(
    text_instance=difficult_case_sentence,
    classifier_fn=lambda texts: predict_ner_token_probs(texts, difficult_sentence_split, target_difficult_word_original_idx),
    labels=[label_to_id[label] for label in label_list],
    num_features=5,
    num_samples=100
)

# Score the unperturbed sentence to find the label the model predicts at the target token.
inputs_difficult = tokenizer(difficult_case_sentence, return_tensors="pt", padding=True, truncation=True, max_length=512).to(device)
with torch.no_grad():
    outputs_difficult = model(**inputs_difficult)
logits_difficult = outputs_difficult.logits
probabilities_difficult = torch.softmax(logits_difficult, dim=-1).squeeze(0).cpu().numpy()

# Keep the token index inside the model output before indexing with it.
if target_token_difficult_idx >= probabilities_difficult.shape[0]:
    print(f"Warning: target_token_difficult_idx ({target_token_difficult_idx}) is out of bounds for model output shape {probabilities_difficult.shape}. Adjusting to nearest valid index.")
    if probabilities_difficult.shape[0] > 1:
        target_token_difficult_idx = 1
    else:
        target_token_difficult_idx = 0


predicted_label_id_difficult = np.argmax(probabilities_difficult[target_token_difficult_idx])
predicted_label_name_difficult = id_to_label[predicted_label_id_difficult]

print(f"\nModel's Predicted Label for '{tokens_difficult[target_token_difficult_idx]}' in difficult case is: {predicted_label_name_difficult} (Prob: {probabilities_difficult[target_token_difficult_idx, predicted_label_id_difficult]:.4f})")
print(f"Contribution of words to predicting '{predicted_label_name_difficult}' for '{difficult_sentence_split[target_difficult_word_original_idx]}' (LIME):")

# List the LIME feature weights for the predicted label only.
for feature, weight in lime_exp_difficult.as_list(label=predicted_label_id_difficult):
    print(f"  Word: '{feature}', Contribution: {weight:.4f}")

In [None]:
import pandas as pd
import numpy as np
import re
import torch
from lime.lime_text import LimeTextExplainer
# Section banner for Task 6, framed by two 50-character rules.
print("\n" + "="*50)
print("TASK 6: FINTECH VENDOR SCORECARD FOR MICRO-LENDING")
print("="*50)

synthetic_posts_data = [
    {
        "vendor_id": "EthioTechMart",
        "post_text": "Electric Charcoal Burner በቀላሉ ከሰል ለማያያዝ የሚሆን አነስ ያለ ቦታ የማይዝ የሚሰራ ሻይ፣ ቡና ለማፍላት የሚሆን ዋጋ፦ 1600 ብር ውስን ፍሬ ነው ያለው አድራሻ መገናኛ_መሰረት_ደፋር_ሞል_ሁለተኛ_ፎቅ ቢሮ ቁ S05S06 0902660722 0928460606 በTelegram ለማዘዝ ይጠቀሙ zemencallcenter zemenexpressadmin ለተጨማሪ ማብራሪያ የቴሌግራም ገፃችን httpstelegrammezemenexpress",
        "timestamp": "2025-06-20 10:00:00",
        "views": 5200,
        "ner_labels": [('Electric Charcoal Burner', 'Product'), ('1600', 'Money'), ('መገናኛ_መሰረት_ደፋር_ሞል_ሁለተኛ_ፎቅ', 'LOC')]
    },
    {
        "vendor_id": "EthioTechMart",
        "post_text": "አዲስ አይፎን 15 ፕሮ ማክስ በ200,000 ብር ብቻ። ፈጥነው ይዘዙ!",
        "timestamp": "2025-06-22 14:30:00",
        "views": 8500,
        "ner_labels": [('አይፎን 15 ፕሮ ማክስ', 'Product'), ('200,000', 'Money')]
    },
    {
        "vendor_id": "EthioTechMart",
        "post_text": "ምርጥ የቡና መፍጫ ማሽን 3500 ብር። አድራሻ ቦሌ ሩዋንዳ.",
        "timestamp": "2025-06-25 09:15:00",
        "views": 4800,
        "ner_labels": [('የቡና መፍጫ ማሽን', 'Product'), ('3500', 'Money'), ('ቦሌ ሩዋንዳ', 'LOC')]
    },
    {
        "vendor_id": "EthioTechMart",
        "post_text": "የቤት እቃዎች ቅናሽ! ለበለጠ መረጃ ይደውሉልን።",
        "timestamp": "2025-06-28 11:00:00",
        "views": 3100,
        "ner_labels": [] # No specific product/money entity extracted
    },
    {
        "vendor_id": "FashionEthio",
        "post_text": "የሴቶች ፋሽን ልብሶች። ዋጋ 800 ብር። አድራሻ ሜክሲኮ.",
        "timestamp": "2025-06-18 16:00:00",
        "views": 2500,
        "ner_labels": [('የሴቶች ፋሽን ልብሶች', 'Product'), ('800', 'Money'), ('ሜክሲኮ', 'LOC')]
    },
    {
        "vendor_id": "FashionEthio",
        "post_text": "ቄንጠኛ ጫማዎች በ1200 ብር።",
        "timestamp": "2025-06-20 10:30:00",
        "views": 3200,
        "ner_labels": [('ጫማዎች', 'Product'), ('1200', 'Money')]
    },
    {
        "vendor_id": "FashionEthio",
        "post_text": "ልዩ ልዩ ሽመናዎች። በታላቅ ቅናሽ! ይምጡና ይጎብኙን።",
        "timestamp": "2025-06-21 18:00:00",
        "views": 1800,
        "ner_labels": [('ሽመናዎች', 'Product')]
    },
    {
        "vendor_id": "FashionEthio",
        "post_text": "የባህል ልብሶች 1500 ብር። አድራሻ ፒያሳ.",
        "timestamp": "2025-06-26 09:00:00",
        "views": 2900,
        "ner_labels": [('የባህል ልብሶች', 'Product'), ('1500', 'Money'), ('ፒያሳ', 'LOC')]
    },
    {
        "vendor_id": "EthioCars",
        "post_text": "የ2020 ቶዮታ ካምሪ ለሽያጭ ቀርቧል። ዋጋ፦ 3 ሚሊየን ብር።",
        "timestamp": "2025-06-19 12:00:00",
        "views": 15000,
        "ner_labels": [('ቶዮታ ካምሪ', 'Product'), ('3 ሚሊየን', 'Money')]
    },
    {
        "vendor_id": "EthioCars",
        "post_text": "አዳዲስ የመርሴዲስ መኪኖች አሉ።",
        "timestamp": "2025-06-27 10:00:00",
        "views": 9000,
        "ner_labels": [('መርሴዲስ መኪኖች', 'Product')]
    }
]

# One row per synthetic post; the 'timestamp' strings are parsed into pandas datetimes.
df_posts = pd.DataFrame(synthetic_posts_data)
df_posts['timestamp'] = pd.to_datetime(df_posts['timestamp'])


def get_ner_predictions(text, model, tokenizer, id_to_label):
    """Tag a single text with the NER model and group BIO labels into entities.

    Args:
        text: The text to analyse (one string; only the first sequence of the
            encoded batch is read).
        model: Token-classification model; called as ``model(**inputs)`` and
            expected to return an object with a ``logits`` attribute.
        tokenizer: Tokenizer matching the model. It is called with
            ``return_tensors="pt"`` and must provide ``decode``.
        id_to_label: Mapping from predicted class id (int) to a BIO label
            such as ``"B-Product"``, ``"I-Product"`` or ``"O"``.

    Returns:
        A list of ``(entity_text, entity_type)`` tuples in order of appearance.
        Entities whose decoded text is empty are dropped.
    """
    # Run on the device the model actually lives on instead of relying on a
    # global `device` that may be undefined or disagree with the model.
    try:
        target_device = next(model.parameters()).device
    except (AttributeError, StopIteration):
        target_device = torch.device("cpu")

    encoded = tokenizer(text, return_tensors="pt", padding=True, truncation=True, max_length=512)
    if hasattr(encoded, "to"):
        encoded = encoded.to(target_device)
    else:
        # Plain dict of tensors (e.g. a lightweight stand-in tokenizer).
        encoded = {key: value.to(target_device) for key, value in encoded.items()}

    with torch.no_grad():
        outputs = model(**encoded)

    token_ids = encoded["input_ids"][0].tolist()
    label_ids = torch.argmax(outputs.logits, dim=-1)[0].tolist()

    # Prefer the tokenizer's own list of special ids; fall back to the literal
    # markers only when the tokenizer does not expose one.
    special_ids = getattr(tokenizer, "all_special_ids", None)
    if special_ids is not None:
        special_ids = set(special_ids)
    fallback_specials = ('<s>', '</s>', '<pad>')

    spans = []        # one (token_id_list, entity_type) pair per entity
    open_span = None  # the span currently being extended, if any

    for token_id, label_id in zip(token_ids, label_ids):
        if special_ids is not None:
            is_special = token_id in special_ids
        else:
            is_special = tokenizer.decode([token_id]) in fallback_specials
        if is_special:
            # A special token always terminates the running entity.
            open_span = None
            continue

        label = id_to_label[label_id]
        continues_open_span = (
            label.startswith("I-")
            and open_span is not None
            and open_span[1] == label[2:]
        )
        if continues_open_span:
            open_span[0].append(token_id)
        elif label.startswith(("B-", "I-")):
            # "B-" starts a new entity; an "I-" with no matching open entity
            # is treated as the start of a new one as well.
            open_span = ([token_id], label[2:])
            spans.append(open_span)
        else:
            open_span = None

    # Decode every entity's ids together so sub-word pieces are merged by the
    # tokenizer itself (correct for both WordPiece and SentencePiece vocabularies).
    entities = []
    for span_token_ids, entity_type in spans:
        entity_text = tokenizer.decode(span_token_ids).strip()
        if entity_text:
            entities.append((entity_text, entity_type))
    return entities

# --- Develop a Vendor Analytics Engine ---
class VendorAnalyticsEngine:
    """Compute per-vendor activity, engagement and price metrics plus a lending score.

    All metrics are derived from ``posts_df``, which must provide the columns
    ``vendor_id``, ``timestamp`` (datetime values), ``views`` and ``ner_labels``
    (per post, an iterable of ``(entity_text, entity_type)`` pairs; the types
    read here are ``'Product'`` and ``'Money'``). The NER model and tokenizer
    are only used by ``_get_ner_entities_from_text``.
    """

    # Relative weights of the three normalised score components
    # (can be adjusted based on business logic).
    WEIGHT_VIEWS = 0.4
    WEIGHT_FREQUENCY = 0.3
    WEIGHT_PRICE = 0.3
    # Price ceiling used for normalisation when no price could be parsed at all.
    DEFAULT_MAX_PRICE = 100000

    def __init__(self, posts_df, ner_model, ner_tokenizer, id_to_label_map):
        """Store the inputs, move the NER model to GPU when available and set eval mode."""
        self.posts_df = posts_df
        self.ner_model = ner_model
        self.ner_tokenizer = ner_tokenizer
        self.id_to_label_map = id_to_label_map
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        self.ner_model.to(self.device)
        self.ner_model.eval()

    def _get_ner_entities_from_text(self, text):
        """Helper to get NER entities for a given text."""
        return get_ner_predictions(text, self.ner_model, self.ner_tokenizer, self.id_to_label_map)

    @staticmethod
    def _posting_frequency(vendor_posts):
        """Return posts per week over the span between the first and last post.

        A single post counts as one post per week; posts that all fall within
        the same day are treated as one day's worth of activity.
        """
        post_count = len(vendor_posts)
        if post_count == 0:
            return 0.0
        if post_count == 1:
            return 1.0
        span_days = (vendor_posts['timestamp'].max() - vendor_posts['timestamp'].min()).days
        if span_days == 0:
            return float(post_count * 7)
        return (post_count / span_days) * 7

    def _collect_prices(self, posts):
        """Return every parseable 'Money' entity in ``posts`` as a float."""
        prices = []
        for labels in posts['ner_labels']:
            for entity, entity_type in labels:
                if entity_type != 'Money':
                    continue
                price = self._parse_price(entity)
                if price is not None:
                    prices.append(price)
        return prices

    @staticmethod
    def _unit_ratio(value, ceiling):
        """Return ``value / ceiling`` clamped to [0, 1]; 0.0 if the ceiling is not positive."""
        if not ceiling > 0:
            return 0.0
        return float(min(max(value / ceiling, 0.0), 1.0))

    def calculate_vendor_metrics(self, vendor_id):
        """Compute the scorecard metrics for one vendor.

        Returns:
            A dict with the keys ``vendor_id``, ``posting_frequency``,
            ``average_views_per_post``, ``top_performing_post_views``,
            ``top_performing_post_product``, ``top_performing_post_price`` and
            ``average_price_point``, or ``None`` if the vendor has no posts.
        """
        vendor_posts = self.posts_df[self.posts_df['vendor_id'] == vendor_id].copy()
        if vendor_posts.empty:
            return None

        # Sort posts by timestamp for consistency calculations
        vendor_posts = vendor_posts.sort_values(by='timestamp')

        # Activity & consistency: posts per week
        posting_frequency = self._posting_frequency(vendor_posts)

        # Market reach: mean views and the single most-viewed post
        average_views_per_post = vendor_posts['views'].mean()
        top_post = vendor_posts.loc[vendor_posts['views'].idxmax()]
        top_products = [text for text, kind in top_post['ner_labels'] if kind == 'Product']
        top_prices = [text for text, kind in top_post['ner_labels'] if kind == 'Money']

        top_performing_product = top_products[0] if top_products else "N/A"
        top_performing_price = self._parse_price(top_prices[0]) if top_prices else None

        # Business profile: mean of all prices the vendor advertised
        vendor_prices = self._collect_prices(vendor_posts)
        average_price_point = np.mean(vendor_prices) if vendor_prices else 0.0

        return {
            "vendor_id": vendor_id,
            "posting_frequency": posting_frequency,
            "average_views_per_post": average_views_per_post,
            "top_performing_post_views": top_post['views'],
            "top_performing_post_product": top_performing_product,
            "top_performing_post_price": top_performing_price,
            "average_price_point": average_price_point
        }

    def _parse_price(self, price_string):
        """Convert a price entity such as ``"1,200"``, ``"5k"`` or ``"3 ሚሊየን"`` to a float.

        Returns ``None`` when the value is not a string or holds no parseable number.
        """
        # Local import so the method works even if the module never imported `re`.
        import re

        if not isinstance(price_string, str):
            return None

        cleaned = price_string.lower().replace(",", "")
        try:
            if "ሚሊየን" in cleaned or "million" in cleaned:
                match = re.search(r'(\d+\.?\d*)\s*(ሚሊየን|million)', cleaned)
                return float(match.group(1)) * 1_000_000 if match else None
            if "ሺህ" in cleaned or "k" in cleaned:
                match = re.search(r'(\d+\.?\d*)\s*(ሺህ|k)', cleaned)
                return float(match.group(1)) * 1_000 if match else None
            return float(re.sub(r'[^\d.]', '', cleaned))
        except ValueError:
            # Nothing numeric left (or a malformed number such as "1.2.3").
            return None

    def calculate_all_vendors_scores(self):
        """Return the metrics dict of every vendor that has at least one post."""
        all_vendor_metrics = []
        for vendor_id in self.posts_df['vendor_id'].unique():
            metrics = self.calculate_vendor_metrics(vendor_id)
            if metrics:
                all_vendor_metrics.append(metrics)
        return all_vendor_metrics

    def assign_lending_score(self, vendor_metrics):
        """Combine a vendor's metrics into a lending score between 0 and 100.

        Each component is normalised against the whole data set: average views
        against the most-viewed post, posting frequency against the most
        active vendor, and average price against twice the mean of all parsed
        prices (a heuristic ceiling). Every component is clamped to [0, 1], so
        a vendor above the heuristic price ceiling cannot push the score past 100.

        Args:
            vendor_metrics: A dict as returned by ``calculate_vendor_metrics``.

        Returns:
            The weighted score scaled to 0-100 and rounded to two decimals.
        """
        max_views = self.posts_df['views'].max() if not self.posts_df.empty else 1

        frequencies = [
            self._posting_frequency(group)
            for _, group in self.posts_df.groupby('vendor_id')
        ]
        max_frequency = max(frequencies) if frequencies else 1.0
        if max_frequency == 0:
            max_frequency = 1.0

        all_prices = self._collect_prices(self.posts_df)
        # A heuristic max: twice the mean of every price seen in the data set.
        max_avg_price = np.mean(all_prices) * 2 if all_prices else self.DEFAULT_MAX_PRICE

        norm_views = self._unit_ratio(vendor_metrics['average_views_per_post'], max_views)
        norm_freq = self._unit_ratio(vendor_metrics['posting_frequency'], max_frequency)
        norm_price = self._unit_ratio(vendor_metrics['average_price_point'], max_avg_price)

        lending_score = (
            norm_views * self.WEIGHT_VIEWS
            + norm_freq * self.WEIGHT_FREQUENCY
            + norm_price * self.WEIGHT_PRICE
        )

        # Scale to a more intuitive range, 0-100
        return round(float(lending_score * 100), 2)

# --- Main execution for Task 6 ---
if __name__ == "__main__":
    # The scorecard needs `model`, `tokenizer`, `id_to_label` and `device` from
    # earlier cells. Only the names that are genuinely missing get a placeholder,
    # so an already-loaded model or tokenizer is never replaced by a mock and no
    # unrelated global (such as `label_list`) is overwritten.
    _missing_names = [
        _name for _name in ("model", "tokenizer", "id_to_label", "device")
        if _name not in globals()
    ]
    if _missing_names:
        print(
            "Warning: " + ", ".join(f"`{_name}`" for _name in _missing_names)
            + " not defined globally. Please ensure they are loaded in previous cells;"
            " using minimal placeholders for demonstration purposes."
        )

    if "device" not in globals():
        device = torch.device("cpu")

    if "id_to_label" not in globals():
        if "id2label" in globals():
            # Reuse the label map built from the training label list earlier in the file.
            id_to_label = id2label
        else:
            _placeholder_labels = ["O", "B-Product", "I-Product", "B-Money", "I-Money", "B-LOC", "I-LOC"]
            id_to_label = dict(enumerate(_placeholder_labels))

    if "tokenizer" not in globals():
        class _MockEncoding(dict):
            """Dict of tensors that also accepts the `.to(device)` call made on tokenizer output."""
            def to(self, device):
                return self

        class MockTokenizer:
            """Minimal stand-in that always returns the same five token ids."""
            def __call__(self, text, return_tensors, padding, truncation, max_length):
                return _MockEncoding({
                    'input_ids': torch.tensor([[0, 1, 2, 3, 4]]),
                    'attention_mask': torch.tensor([[1, 1, 1, 1, 1]]),
                })

            def decode(self, ids):
                return f"tok_{ids[0]}"

            def convert_ids_to_tokens(self, ids):
                return [f"tok_{i}" for i in ids]

        tokenizer = MockTokenizer()

    if "model" not in globals():
        class MockModel(torch.nn.Module):
            """Minimal stand-in that returns fixed random logits for 5 tokens and 7 labels."""
            def __init__(self):
                super().__init__()
                self.logits = torch.randn(1, 5, 7)

            def forward(self, **kwargs):
                return type('obj', (object,), {'logits': self.logits})()

        model = MockModel()

    # Create the analytics engine
    analytics_engine = VendorAnalyticsEngine(df_posts, model, tokenizer, id_to_label)

    # Calculate metrics for all vendors
    all_vendor_metrics_data = analytics_engine.calculate_all_vendors_scores()

    # Create the final scorecard table
    scorecard_data = [
        {
            "Vendor ID": vendor_data['vendor_id'],
            "Avg. Views/Post": round(vendor_data['average_views_per_post']),
            "Posts/Week": round(vendor_data['posting_frequency'], 2),
            "Avg. Price (ETB)": round(vendor_data['average_price_point'], 2),
            "Top Post Product": vendor_data['top_performing_post_product'],
            "Top Post Price": vendor_data['top_performing_post_price'],
            "Lending Score": analytics_engine.assign_lending_score(vendor_data)
        }
        for vendor_data in all_vendor_metrics_data
    ]

    if not scorecard_data:
        # Without rows there is no "Lending Score" column to sort by.
        print("\nNo vendor metrics could be computed; the scorecard is empty.")
    else:
        # Sort by Lending Score for better presentation
        vendor_scorecard_df = pd.DataFrame(scorecard_data).sort_values(by="Lending Score", ascending=False)

        print("\n--- Vendor Scorecard ---")
        print(vendor_scorecard_df.to_string(index=False))

        print("\n--- Vendor Scorecard Analysis ---")
        print("The 'Lending Score' is a composite metric designed to identify promising vendors for micro-lending.")
        print("It combines aspects of activity, market reach, and business profile.")
        print("  - **Avg. Views/Post**: Indicates customer interest and reach. Higher is better.")
        print("  - **Posts/Week**: Shows consistency and activity level. Higher is better.")
        print("  - **Avg. Price (ETB)**: Gives insight into the typical price point of products. This could influence loan size and risk assessment.")
        print("  - **Top Post Product/Price**: Highlights successful products and their associated price.")
        print("  - **Lending Score**: Our custom score, normalized 0-100. Higher scores suggest more active and engaging vendors.")
        print("\nBased on this synthetic data:")
        print(f"- **{vendor_scorecard_df.iloc[0]['Vendor ID']}** appears to be the most promising due to its high views and consistent posting.")
        print("  - Businesses with higher average views per post and more frequent posting generally indicate stronger engagement and a more active business, making them potentially lower risk for micro-lending.")
        print("  - The average price point helps understand the scale of products they deal with, which can inform loan amounts.")