In [None]:
!pip install pandas requests openpyxl

In [None]:
"""
Entity classification mapping dictionary.

This dictionary maps entity types to their corresponding identifiers
used in the entity classification system.
"""

# Lookup table from upper-case entity type names (keys) to the hyphenated
# label strings (values) used for the same entity kind.
# NOTE(review): no other visible cell reads `mapping_dict` — confirm it is still needed.
mapping_dict = {
# The two entries below are commented out, so they are absent from the mapping.
#"LLM_PERSON": "person-name",
#"STREET_ADDRESS": "street-address",
"DATE_OF_BIRTH": "date-of-birth",
"US_SSN": "us-ssn",
"US_DRIVER_LICENSE": "us-drivers-license",
"PHONE_NUMBER": "phone-number",
"EMAIL_ADDRESS": "email-address",
"US_BANK_NUMBER": "us-bank-account-number",
"ROUTING_NUMBER": "bank-routing-number",
}



In [None]:

import ast
import time
from collections import defaultdict
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Any, Dict, List, Set, Tuple

import pandas as pd
import requests


In [None]:
def call_classification_api(text, mode="all", anonymize=False, timeout=120):
    """
    Call the Pebblo classification API.

    Args:
        text (str): The text to classify.
        mode (str): Classification mode - "all", "entity", or "topic".
            NOTE(review): accepted for backward compatibility only; it is not
            part of the request payload, so it currently has no effect.
            Confirm whether the endpoint expects it.
        anonymize (bool): Whether to anonymize the results.
        timeout (float | tuple | None): Seconds to wait for the server, passed
            straight to ``requests.post``. Without a timeout a stalled server
            blocks the calling thread forever; pass ``None`` to restore that
            behaviour explicitly.

    Returns:
        dict | None: The decoded JSON response, or None when the request
        fails, times out, returns an error status, or the body is not JSON.
    """
    url = "http://localhost:8000/api/v1/classify"

    # Prepare the request payload without llm_config
    payload = {
        "text": text,
        "anonymize": anonymize,
        "country_list": ["US"]
    }

    try:
        response = requests.post(url, json=payload, timeout=timeout)
        response.raise_for_status()
        return response.json()
    except (requests.exceptions.RequestException, ValueError) as e:
        # ValueError covers JSON decoding errors on requests versions where
        # the decode error is not a RequestException subclass.
        print(f"Error making API request: {e}")
        return None
    

In [None]:

def read_entity_test_data(file_path: str) -> pd.DataFrame:
    """
    Load the entity test dataset from an Excel workbook.

    Args:
        file_path (str): Location of the Excel file to load.

    Returns:
        pd.DataFrame: The workbook contents as returned by ``pd.read_excel``.

    Raises:
        FileNotFoundError: When ``file_path`` does not exist (re-raised as is).
        ValueError: When any other error occurs while reading; the original
            exception is chained as the cause.
    """
    try:
        return pd.read_excel(file_path)
    except FileNotFoundError:
        print(f"File not found: {file_path}")
        raise
    except Exception as read_error:
        print(f"Error reading Excel file: {read_error}")
        raise ValueError(f"Could not read Excel file: {file_path}") from read_error

# Example usage:
# test_data_df = read_entity_test_data("entity_test_data.xlsx")


In [None]:
input_excel="../test_data/final_entity_dataset.xlsx"

In [None]:

test_data_df = read_entity_test_data(input_excel)

In [None]:
experiment_name = "Experiment_13"
description = ""
tags = []
test_column = "mapped_output"

In [None]:
def convert_api_response_to_entity_list(api_response: Dict[str, Any]) -> List[Dict[str, Any]]:
    """
    Flatten an API response into a list of entity dicts of the form
    {'start': int, 'end': int, 'label': str, 'extracted_text': str}.

    The response is expected to carry 'entityDetails' (label -> list of
    entities, each with a 'location' string "<start>_<end>") and 'data'
    (the text the offsets refer to). Malformed pieces are skipped instead of
    aborting the whole conversion.

    Args:
        api_response (Dict[str, Any]): The API response. ``None`` or any other
            non-dict value (e.g. the result of a failed request) yields an
            empty list.

    Returns:
        List[Dict[str, Any]]: Entities in the required format; empty when the
        response has no usable 'entityDetails'.
    """
    entities: List[Dict[str, Any]] = []

    # A failed request is reported as None by the API helper; treat anything
    # that is not a dict as "no entities" rather than raising AttributeError.
    if not isinstance(api_response, dict):
        return entities

    entity_details = api_response.get("entityDetails")
    if not entity_details or not isinstance(entity_details, dict):
        return entities

    text_data = api_response.get("data")
    if not isinstance(text_data, str):
        # Offsets can still be reported; the extracted text is simply empty.
        text_data = ""

    for label, entity_list in entity_details.items():
        if not isinstance(entity_list, list):
            continue
        for entity in entity_list:
            if not isinstance(entity, dict):
                continue
            location = entity.get("location")
            if not isinstance(location, str) or "_" not in location:
                continue
            try:
                # Unpacking also rejects locations with more than one "_".
                start_str, end_str = location.split("_")
                start = int(start_str)
                end = int(end_str)
            except (ValueError, TypeError) as exc:
                print(f"Skipping entity with invalid location '{location}': {exc}")
                continue
            entities.append({
                "start": start,
                "end": end,
                "label": label,
                "extracted_text": text_data[start:end]
            })
    return entities



In [None]:
def aggregate_classification_results(
    df: pd.DataFrame, text_column: str = "text", max_workers: int = 6
) -> Tuple[List[Any], List[float]]:
    """
    Send every value of ``text_column`` to the classification API in parallel
    and collect the converted results in row order.

    Args:
        df (pd.DataFrame): DataFrame containing the test data.
        text_column (str): Name of the column containing the text to classify.
        max_workers (int): Maximum number of threads to use for parallel processing.

    Returns:
        Tuple[List[Any], List[float]]: Two lists aligned with the row order of
        ``df``. The first holds, per row, either the list of entity dicts or,
        when the row failed, a dict ``{"row_index": <position>, "error": <message>}``.
        The second holds the elapsed seconds per row.

    Raises:
        KeyError: If the specified text_column does not exist in the DataFrame.
    """
    if text_column not in df.columns:
        raise KeyError(f"Column '{text_column}' not found in DataFrame.")

    total_rows = len(df)
    results: List[Any] = [None] * total_rows
    response_time: List[float] = [0.0] * total_rows

    def process_row(position: int, text: str):
        if position % 100 == 0:
            print(f"Processing row {position} of {total_rows}")
        start_time = time.time()
        try:
            # NOTE(review): `mode` is accepted by call_classification_api but
            # is not part of the payload it sends.
            result = call_classification_api(text, mode="entity")
            if result is None:
                # The API helper returns None on any request failure; record
                # that explicitly instead of via an incidental AttributeError.
                raise RuntimeError("classification API returned no response")
            converted = convert_api_response_to_entity_list(result)
        except Exception as exc:
            print(f"Error processing row {position}: {exc}")
            converted = {"row_index": position, "error": str(exc)}
        return position, converted, time.time() - start_time

    # Rows are addressed by position, not by index label: the output lists are
    # plain Python lists, so labels from a filtered or re-indexed frame would
    # either raise IndexError or land in the wrong slot.
    texts = df[text_column].tolist()

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = [
            executor.submit(process_row, position, text)
            for position, text in enumerate(texts)
        ]
        for future in as_completed(futures):
            position, converted, elapsed = future.result()
            results[position] = converted
            response_time[position] = elapsed

    return results, response_time


# Classify every row of test_data_df (the frame is passed whole, not sliced).
# start_time is captured here and read again in the next cell to compute
# experiment_time.
start_time = time.time()
classification_results, response_time = aggregate_classification_results(
    test_data_df, text_column="text"
)


In [None]:
# Attach per-row timings and predictions under experiment-specific column names.
test_data_df[f"response_time_{experiment_name}"] = response_time
test_data_df[f"classification_results_{experiment_name}"] = classification_results
# NOTE(review): input_excel is the same path the dataset was read from, so this
# overwrites the source workbook in place — confirm that is intended.
test_data_df.to_excel(input_excel, index=False)
# Measured from the start_time set before classification, so it also includes
# the Excel write above.
experiment_time = time.time() - start_time
print("Done")

In [None]:
def extract_entity_pairs_from_list(entity_list: List[dict]) -> Set[Tuple[str, str]]:
    """
    Extracts (entity_type, value) pairs from a list of entity dicts.

    Args:
        entity_list (List[dict]): Entity dicts, each expected to carry 'label'
            and 'extracted_text'. Anything that is not a list yields an empty
            set; items that are not dicts, or that lack either key, are skipped.

    Returns:
        Set[Tuple[str, str]]: Set of (entity_type, value) pairs, where value is
        the extracted text converted to ``str`` with surrounding whitespace removed.
    """
    entity_pairs: Set[Tuple[str, str]] = set()
    if not isinstance(entity_list, list):
        return entity_pairs
    for ent in entity_list:
        if not isinstance(ent, dict):
            continue
        ent_type = ent.get("label")
        value = ent.get("extracted_text")
        # Check for None BEFORE touching the value: stripping first raised
        # AttributeError whenever 'extracted_text' was missing.
        if ent_type is None or value is None:
            continue
        entity_pairs.add((ent_type, str(value).strip()))
    return entity_pairs

def _parse_entity_cell(raw: Any, source_name: str, normalize_quotes: bool = False) -> List[Any]:
    """Coerce one DataFrame cell into a list of entities; unusable cells become []."""
    if isinstance(raw, str):
        if normalize_quotes:
            # Replace curly single quotes (‘ or ’) with straight ones so the
            # stringified list can be parsed.
            raw = raw.replace("‘", "'").replace("’", "'")
        try:
            # The cells come from a workbook and only ever need to be literal
            # lists/dicts, so parse them with ast.literal_eval rather than
            # eval, which would execute arbitrary expressions found in a cell.
            raw = ast.literal_eval(raw)
        except Exception as e:
            print(f"Exception in {source_name}", e)
            raw = []
    return raw if isinstance(raw, list) else []


def compute_entity_metrics_for_lists(
    df: pd.DataFrame,
    actual_col: str = "mapped_output",
    pred_col: str = None,
) -> Tuple[pd.DataFrame, Dict[str, float]]:
    """
    Computes per-entity and macro-averaged metrics (accuracy, precision, recall, F1)
    for entity classification by comparing sets of (label, extracted_text) pairs.

    Cells may hold a list of entity dicts or its string representation; cells
    that cannot be parsed, or are neither, count as "no entities".

    Args:
        df (pd.DataFrame): DataFrame holding both ground truth and predictions.
        actual_col (str): Name of the column with actual entity lists.
        pred_col (str): Name of the column with predicted entity lists.
            Required in practice: the ``None`` default is kept only for
            signature compatibility and is rejected with a KeyError.

    Returns:
        Tuple[pd.DataFrame, Dict[str, float]]: (Per-entity metrics DataFrame,
        macro metrics dict with keys macro_accuracy / macro_precision /
        macro_recall / macro_f1). "accuracy" here is tp / (tp + fp + fn).

    Raises:
        KeyError: If ``actual_col`` or ``pred_col`` is not a column of ``df``.
    """
    missing_cols = [col for col in (actual_col, pred_col) if col not in df.columns]
    if missing_cols:
        raise KeyError(f"Column(s) {missing_cols} not found in DataFrame.")

    entity_stats = defaultdict(lambda: {
        "support": 0,       # rows whose ground truth contains this type
        "actual_count": 0,  # distinct ground-truth values, summed over rows
        "correct": 0,       # true positives
        "extra": 0,         # false positives
        "missed": 0,        # false negatives
        "wrong": 0,         # not well-defined here; kept for compatibility
    })

    for actual_raw, pred_raw in zip(df[actual_col], df[pred_col]):
        actual_set = extract_entity_pairs_from_list(
            _parse_entity_cell(actual_raw, "actual_entities", normalize_quotes=True)
        )
        pred_set = extract_entity_pairs_from_list(
            _parse_entity_cell(pred_raw, "pred_entities")
        )

        # Group the values by entity type once per row.
        actual_by_type = defaultdict(set)
        for ent_type, value in actual_set:
            actual_by_type[ent_type].add(value)
        pred_by_type = defaultdict(set)
        for ent_type, value in pred_set:
            pred_by_type[ent_type].add(value)

        for ent_type, values in actual_by_type.items():
            entity_stats[ent_type]["support"] += 1
            entity_stats[ent_type]["actual_count"] += len(values)

        for ent_type in set(actual_by_type) | set(pred_by_type):
            actual_vals = actual_by_type.get(ent_type, set())
            pred_vals = pred_by_type.get(ent_type, set())
            stats = entity_stats[ent_type]
            stats["correct"] += len(actual_vals & pred_vals)
            stats["extra"] += len(pred_vals - actual_vals)
            stats["missed"] += len(actual_vals - pred_vals)

    # Compute metrics per entity
    metrics = []
    for ent_type in sorted(entity_stats):
        stats = entity_stats[ent_type]
        tp, fp, fn = stats["correct"], stats["extra"], stats["missed"]
        denom = tp + fp + fn
        accuracy = tp / denom if denom > 0 else 0.0
        precision = tp / (tp + fp) if (tp + fp) > 0 else 0.0
        recall = tp / (tp + fn) if (tp + fn) > 0 else 0.0
        f1 = 2 * precision * recall / (precision + recall) if (precision + recall) > 0 else 0.0

        metrics.append({
            "entity_type": ent_type,
            "actual_count": stats["actual_count"],
            "support": stats["support"],
            "correct": tp,
            "extra": fp,
            "missed": fn,
            "wrong": stats["wrong"],
            "accuracy": accuracy,
            "precision": precision,
            "recall": recall,
            "f1": f1
        })

    # Fix the column set so an empty result still has the expected columns
    # (callers sort and select by name).
    metrics_df = pd.DataFrame(metrics, columns=[
        "entity_type", "actual_count", "support", "correct", "extra",
        "missed", "wrong", "accuracy", "precision", "recall", "f1",
    ])

    # Compute macro averages (unweighted mean across all entity types)
    macro = {
        f"macro_{metric}": float(metrics_df[metric].mean()) if not metrics_df.empty else 0.0
        for metric in ("accuracy", "precision", "recall", "f1")
    }

    return metrics_df, macro




In [None]:
# Example usage:
# Assume test_data_df["mapped_output"] and classification_results are both lists of dicts per row
entity_metrics_df, macro_metrics = compute_entity_metrics_for_lists(
    test_data_df, actual_col=test_column, pred_col=f"classification_results_{experiment_name}"
)
# Sort the entity_metrics_df by f1 score in descending order for better interpretability
entity_metrics_df = entity_metrics_df.sort_values(by="f1", ascending=False).reset_index(drop=True)
entity_metrics_df = entity_metrics_df[entity_metrics_df["f1"] > 0].reset_index(drop=True)
pd.set_option('display.max_colwidth', 200)
display(entity_metrics_df[["entity_type", "actual_count", "correct", "extra", "missed", "wrong", "accuracy", "precision", "recall", "f1"]])
print("Macro-averaged metrics across all entities:")
for k, v in macro_metrics.items():
    print(f"{k}: {v:.3f}")

In [None]:
experiment_name

In [None]:
# Count the number of NaN values in the test_data_df DataFrame
nan_count = test_data_df.isna().sum().sum()
print(f"Total number of NaN values in test_data_df: {nan_count}")
# Identify columns in test_data_df that contain NaN values and print them

def print_columns_with_nan(df: pd.DataFrame) -> None:
    """
    Report which columns of a DataFrame hold at least one NaN value.

    Args:
        df (pd.DataFrame): Frame to inspect.

    Returns:
        None: The outcome is printed, not returned.
    """
    # Column-wise flag: True where the column has any missing value.
    has_missing = df.isna().any()
    affected = df.columns[has_missing].tolist()

    if not affected:
        print("No columns contain NaN values.")
        return
    print(f"Columns with NaN values: {affected}")

# NOTE(review): earlier cells store predictions under
# f"classification_results_{experiment_name}"; this lookup uses the bare
# "classification_results" name and raises KeyError unless the loaded workbook
# already has such a column — confirm which column is intended.
num_nan_classification_results = test_data_df["classification_results"].isna().sum()
print(f"Number of NaN values in 'classification_results': {num_nan_classification_results}")



In [None]:
test_data_df.iloc[1395]

In [None]:


def get_rows_with_extra_entity(
    test_data_df: pd.DataFrame,
    entity_label: str,
    actual_col: str = "mapped_output",
    pred_col: str = "classification_results",
) -> pd.DataFrame:
    """
    Returns the rows where at least one predicted entity with ``entity_label``
    has no counterpart in the ground truth (an "extra" detection).

    A predicted entity is considered matched when a ground-truth entity of the
    same label has the same 'extracted_text', or the same 'start' and 'end'
    (only when both entities actually carry a span).

    Cells may hold a list of entity dicts or its string representation.
    Cells that are missing, unparseable, or not a list count as "no entities".

    Args:
        test_data_df (pd.DataFrame): Frame holding ground truth and predictions.
        entity_label (str): The entity label to check, as it appears in the
            'label' field of the entity dicts.
        actual_col (str): Column with the ground-truth entity lists.
        pred_col (str): Column with the predicted entity lists.

    Returns:
        pd.DataFrame: Subset of test_data_df (selected by position) where
        entity_label was detected as extra.
    """

    def entities_with_label(cell, normalize_quotes=False):
        # Normalise one cell to the list of dict entities carrying entity_label.
        if isinstance(cell, str):
            if normalize_quotes:
                # Curly single quotes in hand-edited ground truth break parsing.
                cell = cell.replace("‘", "'").replace("’", "'")
            try:
                cell = ast.literal_eval(cell)
            except (ValueError, SyntaxError, TypeError):
                return []
        if not isinstance(cell, list):
            return []
        return [
            ent for ent in cell
            if isinstance(ent, dict) and ent.get("label") == entity_label
        ]

    def is_same_entity(first, second):
        if first.get("extracted_text") == second.get("extracted_text"):
            return True
        # Compare spans only when present; two missing spans are not a match.
        span = (first.get("start"), first.get("end"))
        return None not in span and span == (second.get("start"), second.get("end"))

    extra_positions = []
    for position, (actual, predicted) in enumerate(
        zip(test_data_df[actual_col], test_data_df[pred_col])
    ):
        actual_entities = entities_with_label(actual, normalize_quotes=True)
        predicted_entities = entities_with_label(predicted)

        # With no ground-truth entities every prediction is unmatched, so the
        # single check also covers the "predicted but none actual" case.
        has_extra = any(
            not any(is_same_entity(pred_ent, act_ent) for act_ent in actual_entities)
            for pred_ent in predicted_entities
        )
        if has_extra:
            extra_positions.append(position)

    return test_data_df.iloc[extra_positions]

# Example usage:
# To get rows with extra detections for a specific entity type, e.g., "bank-account-number" or "ssn":
# extra_bank_account_rows = get_rows_with_extra_entity(test_data_df, "bank-account-number", "mapped_output", f"classification_results_{experiment_name}")
extra_entity = get_rows_with_extra_entity(test_data_df, "uk-sort-code", "mapped_output", f"classification_results_{experiment_name}")
len(extra_entity)

In [None]:
f"classification_results_{experiment_name}"

In [None]:
from typing import List, Dict, Any
import ast

def extract_entities_from_results(
    rows_df,
    classification_results: List[List[Dict[str, Any]]],
    entity_label: str
) -> List[str]:
    """
    Extracts the texts of the specified entity from classification_results for the given rows.

    Each index label of ``rows_df`` is used to look up
    ``classification_results[label]``, so the labels must be valid positions in
    that sequence (e.g. rows selected from a frame with a default 0..n-1 index).

    Args:
        rows_df (pd.DataFrame): DataFrame containing the rows of interest (e.g., extra_entity).
        classification_results (List[List[Dict[str, Any]]]): Per-row results. An
            entry may also be the string form of such a list; entries that are
            unparseable or not a list (such as a per-row error dict) are skipped.
        entity_label (str): The entity type to extract.

    Returns:
        List[str]: Extracted texts for entity_label, in row order, duplicates kept.
    """
    extracted_entities = []
    for idx in rows_df.index:
        row_results = classification_results[idx]
        if isinstance(row_results, str):
            try:
                row_results = ast.literal_eval(row_results)
            except (ValueError, SyntaxError):
                continue
        # Failed rows may be stored as an error dict instead of a list;
        # iterating those would yield keys, not entities.
        if not isinstance(row_results, list):
            continue
        extracted_entities.extend(
            ent["extracted_text"]
            for ent in row_results
            if isinstance(ent, dict)
            and ent.get("label") == entity_label
            and "extracted_text" in ent
        )
    return extracted_entities

# Example usage:
# Extract all phone numbers from extra_entity rows
phone_numbers = extract_entities_from_results(extra_entity, classification_results, "phone-number")
print("Extracted phone numbers:", phone_numbers)


In [None]:
phone_numbers.index('284-38-7491')

In [None]:
# Get all rows where a given entity label was missed by the classifier
def get_rows_with_missed_entity(
    test_data_df: pd.DataFrame,
    classification_results: list[dict],
    entity_label: str,
    actual_col: str = "mapped_output",
) -> pd.DataFrame:
    """
    Returns the rows where at least one ground-truth entity with ``entity_label``
    has no counterpart among the predictions (a "missed" detection).

    A ground-truth entity is considered matched when a predicted entity of the
    same label has the same 'extracted_text', or the same 'start' and 'end'
    (only when both entities actually carry a span).

    Args:
        test_data_df (pd.DataFrame): The original test data DataFrame.
        classification_results (list[dict]): Per-row predictions, aligned by
            position with the rows of test_data_df. Each entry is a list of
            entity dicts or its string form; anything else (e.g. a per-row
            error dict) counts as "no predictions".
        entity_label (str): The entity type to check for missed detections.
        actual_col (str): Column holding the ground-truth entity lists (or
            their string form).

    Returns:
        pd.DataFrame: Subset of test_data_df (selected by position) where
        entity_label was missed.
    """

    def entities_with_label(cell, normalize_quotes=False):
        # Normalise one cell to the list of dict entities carrying entity_label.
        if isinstance(cell, str):
            if normalize_quotes:
                # Curly single quotes in hand-edited ground truth break parsing.
                cell = cell.replace("‘", "'").replace("’", "'")
            try:
                cell = ast.literal_eval(cell)
            except (ValueError, SyntaxError, TypeError):
                return []
        if not isinstance(cell, list):
            return []
        return [
            ent for ent in cell
            if isinstance(ent, dict) and ent.get("label") == entity_label
        ]

    def is_same_entity(first, second):
        if first.get("extracted_text") == second.get("extracted_text"):
            return True
        # Compare spans only when present; two missing spans are not a match.
        span = (first.get("start"), first.get("end"))
        return None not in span and span == (second.get("start"), second.get("end"))

    missed_positions = []
    for position, (actual, predicted) in enumerate(
        zip(test_data_df[actual_col], classification_results)
    ):
        actual_entities = entities_with_label(actual, normalize_quotes=True)
        predicted_entities = entities_with_label(predicted)

        # With no predictions every ground-truth entity is unmatched, so the
        # single check also covers the "actual but none predicted" case.
        has_missed = any(
            not any(is_same_entity(act_ent, pred_ent) for pred_ent in predicted_entities)
            for act_ent in actual_entities
        )
        if has_missed:
            missed_positions.append(position)

    return test_data_df.iloc[missed_positions]

# Usage: get all rows where the 'uk-nino' entity was missed
missed_rows = get_rows_with_missed_entity(
    test_data_df, classification_results, entity_label="uk-nino",
)

len(missed_rows)


In [None]:
# Print all bank account numbers from the missed rows
def extract_bank_account_numbers(df, missed_label) -> list[str]:
    """Extracts the unique extracted texts of one entity label from the 'mapped_output' column.

    Despite its name, the function is not specific to bank account numbers:
    it collects values for whichever label is passed as ``missed_label``.

    Args:
        df (pd.DataFrame): DataFrame containing a 'mapped_output' column whose
            cells are lists of entity dicts or their string form. Cells that
            are missing, unparseable, or not a list are ignored.
        missed_label (str): The entity label whose values should be collected.

    Returns:
        list[str]: Unique non-empty 'extracted_text' values for missed_label,
        in order of first appearance.
    """
    # A dict keeps insertion order, which makes the result deterministic
    # while still de-duplicating.
    unique_values = {}
    for entities in df["mapped_output"]:
        if isinstance(entities, str):
            try:
                entities = ast.literal_eval(entities)
            except (ValueError, SyntaxError):
                continue
        # NaN cells arrive as floats; iterating them would raise TypeError.
        if not isinstance(entities, list):
            continue
        for ent in entities:
            if (
                isinstance(ent, dict)
                and ent.get("label") == missed_label
                and ent.get("extracted_text")
            ):
                unique_values.setdefault(ent["extracted_text"], None)
    return list(unique_values)

missed_bank_account_numbers = extract_bank_account_numbers(missed_rows, "phone-number")
print("Missed :")
for acc_num in missed_bank_account_numbers:
    print(acc_num)

In [None]:
index=2
text = extra_entity.iloc[index]["text"]
op = call_classification_api(text)
new_op =convert_api_response_to_entity_list(op)
new_op

In [None]:
extra_entity.iloc[index]["mapped_output"]

In [None]:
extra_entity.iloc[index][f"classification_results_{experiment_name}"]

In [None]:
text

In [None]:
extra_entity.to_excel("extra_entity.xlsx")

In [None]:
text = """   APPLICATION FOR HOUSING BENEFIT - Section C: Personal Details. Please complete all mandatory fields marked with an asterisk (*). Full name*: Marcus Johnson. Date of birth*: 15/03/1987. National Insurance number*: TK571394B. NHS number (if known): 785 341 9672. Current residential address*: 67 Elm Grove, Leeds, LS2 9JT. Are you the tenant? YES. Contact telephone number*: +44 113 542 8671. Email address: m.johnson@email.co.uk. Bank details for payments: Sort code: 40-52-71, Account number: 63847295, Account holder: Marcus Johnson. Declaration: I certify that the information provided is true and complete. Any false statements may result in prosecution. For office use only - Reference: HB/2025/3847. Processing officer: Sarah Mitchell, Leeds City Council.
"""

In [None]:
call_classification_api(text, mode="all")

In [None]:
text[1324:1331]

In [None]:
import pandas as pd

def update_license_plate_number_in_excel(
    excel_path: str,
    input_file_path: str,
    experiment_col: str = "classification_results_Experiment_5",
    mapped_col: str = "mapped_output",
    index_col: str = "index"
) -> None:
    """
    Rewrite ``mapped_col`` in an Excel workbook so that each processed row
    holds only the license_plate_number entities found in ``experiment_col``.

    Rows whose ``experiment_col`` cell cannot be parsed, or does not hold a
    list, are left as they were. The workbook is saved back to ``excel_path``.

    Args:
        excel_path (str): Workbook to read and then overwrite.
        input_file_path (str): Not read or written by this function.
        experiment_col (str): Column holding the classification results
            (a list of entity dicts or its string form).
        mapped_col (str): Column that receives ``str(<filtered list>)``.
        index_col (str): Column that must exist in the workbook; it is only
            checked for presence.

    Raises:
        ValueError: If any of the three named columns is absent.
    """
    import ast

    target_label = "license_plate_number"
    sheet = pd.read_excel(excel_path)

    # All three columns are mandatory; report the first one that is absent.
    for required in (experiment_col, mapped_col, index_col):
        if required not in sheet.columns:
            raise ValueError(f"Column '{required}' not found in Excel file.")

    for row_label, record in sheet.iterrows():
        parsed = record[experiment_col]
        if isinstance(parsed, str):
            # Cells read back from Excel arrive as stringified lists.
            try:
                parsed = ast.literal_eval(parsed)
            except Exception:
                continue  # leave rows that cannot be parsed untouched
        if not isinstance(parsed, list):
            continue

        plates = []
        for candidate in parsed:
            if isinstance(candidate, dict) and candidate.get("label", "").lower() == target_label:
                plates.append(candidate)

        sheet.at[row_label, mapped_col] = str(plates)

    sheet.to_excel(excel_path, index=False)

# Example usage:
# update_license_plate_number_in_excel("extra_entity.xlsx", "input_file.txt")


In [None]:
import json
# Read inside a context manager so the file handle is closed once the cell
# finishes, and decode explicitly as UTF-8 (the JSON interchange encoding)
# instead of relying on the platform default.
# NOTE(review): absolute, user-specific path — the cell only runs on a machine
# that has this file.
with open("/Users/nishanjain/Downloads/german.txt", encoding="utf-8") as f:
    txt = f.read()
op = json.loads(txt)

In [None]:
# Read inside a context manager so the file handle is closed once the cell
# finishes, and decode explicitly as UTF-8 instead of the platform default.
# NOTE(review): re-running this cell appends the second file's records to `op`
# again; re-run the previous cell first to reset it.
with open("/Users/nishanjain/Downloads/german_2.txt", encoding="utf-8") as f:
    txt = f.read()
op = op + json.loads(txt)

In [None]:
test_data_df.columns

In [None]:
txt_list = []
op_list = []
for o in op:
    txt_list.append(o["Text"])
    op_list.append(o["Entities"])


df = pd.DataFrame({"text": txt_list, "mapped_output": op_list})
df.to_excel("german.xlsx", index=False)


In [None]:
# Concatenate df and test_data_df, keeping only 'text' and 'mapped_output' columns
combined_df = pd.concat([
    df[["text", "mapped_output"]],
    test_data_df[["text", "mapped_output"]]
], ignore_index=True)


In [None]:
test_data_df = combined_df.copy()

In [None]:
len(test_data_df)

In [None]:
len(df)

In [None]:
classification_results, response_time = aggregate_classification_results(df, text_column="text")

In [None]:
df[f"response_time_{experiment_name}"] = response_time
df[f"classification_results_{experiment_name}"] = classification_results
#test_data_df.to_excel(input_excel, index=False)
experiment_time = time.time() - start_time
print("Done")

In [None]:
# Example usage:
# Assume test_data_df["mapped_output"] and classification_results are both lists of dicts per row
entity_metrics_df, macro_metrics = compute_entity_metrics_for_lists(
    df, actual_col=test_column, pred_col=f"classification_results_{experiment_name}"
)
# Sort the entity_metrics_df by f1 score in descending order for better interpretability
entity_metrics_df = entity_metrics_df.sort_values(by="f1", ascending=False).reset_index(drop=True)
#entity_metrics_df = entity_metrics_df[entity_metrics_df["f1"] > 0].reset_index(drop=True)
pd.set_option('display.max_colwidth', 200)
display(entity_metrics_df[["entity_type", "actual_count", "correct", "extra", "missed", "wrong", "accuracy", "precision", "recall", "f1"]])
print("Macro-averaged metrics across all entities:")
for k, v in macro_metrics.items():
    print(f"{k}: {v:.3f}")

In [None]:
extra_entity = get_rows_with_extra_entity(df, "german-tax-identification-number", "mapped_output", f"classification_results_{experiment_name}")
len(extra_entity)

In [None]:
import requests
BASE_URL = "http://localhost:8000"
def call_classification_api(text, mode="all", anonymize=False, timeout=120):
    """
    Call the Pebblo classification API at ``BASE_URL``.

    This definition replaces the function of the same name defined earlier in
    the notebook; the only difference is that the URL is built from BASE_URL.

    Args:
        text (str): The text to classify.
        mode (str): Classification mode - "all", "entity", or "topic".
            NOTE(review): accepted for backward compatibility only; it is not
            part of the request payload, so it currently has no effect.
        anonymize (bool): Whether to anonymize the results.
        timeout (float | tuple | None): Seconds to wait for the server, passed
            straight to ``requests.post``. Without a timeout a stalled server
            blocks the calling thread forever; pass ``None`` to restore that
            behaviour explicitly.

    Returns:
        dict | None: The decoded JSON response, or None when the request
        fails, times out, returns an error status, or the body is not JSON.
    """
    url = f"{BASE_URL.rstrip('/')}/api/v1/classify"

    # Prepare the request payload without llm_config
    payload = {
        "text": text,
        "anonymize": anonymize,
        "country_list": ["US"]
    }

    try:
        response = requests.post(url, json=payload, timeout=timeout)
        response.raise_for_status()
        return response.json()
    except (requests.exceptions.RequestException, ValueError) as e:
        # ValueError covers JSON decoding errors on requests versions where
        # the decode error is not a RequestException subclass.
        print(f"Error making API request: {e}")
        return None

In [None]:
missed_rows = get_rows_with_missed_entity(df, 
                                        classification_results, 
                                        entity_label="german-identity-card-number")

In [None]:
op

In [None]:
text = """
RMEDICAL CERTIFICATE - Dr. med. Petra Hoffmann, Specialist in Internal Medicine. Practice: North Health Center, Hamburger Straße 89, 20459 Hamburg, Tel: +49 40 3847 2956. PATIENT DETAILS: Mrs. Maria Schneider, born 23.04.1978, residing at: Rosenstraße 34, 22301 Hamburg, Tel: +49 40 8394 5627. Identity Card: L32RX8H29D, Health Insurance: AOK Hamburg, Insurance Number: M847392654. DIAGNOSIS: Acute bronchitis (ICD-10: J20.9), first occurred on 18.10.2025. TREATMENT: Antibiotic therapy with Amoxicillin 500mg, three times daily for 7 days. Medical leave from 19.10.2025 to 26.10.2025 (8 calendar days). FOLLOW-UP: Control examination scheduled for 28.10.2025 at 2:30 PM. Return immediately if symptoms worsen. Doctor's tax details: Tax ID: 41 738 29 465 6, VAT ID: DE394728565. Patient's driver's license: H72M3N8V39C (temporary driving restriction due to medication). Hamburg, October 19, 2025. Dr. med. Petra Hoffmann, Physician."
 **Medical Loan Application**

Full Name: Reiner P. Misicher
Date of Birth: DD/MM/YYYY
National Insurance Number: MLNSPH31D16F536O
Contact Telephone Number: [+44] 1234567890
Email Address: r.misicher@example.com

Residential Address: 89, rue Auguste Lelièvre, London, N1 9AB, United Kingdom

Employment Information:
Employer's Name: St. Bartholomew's Hospital
Job Title: Consultant Cardiologist
Annual Income: £90,000

Medical History:
1. Chronic Condition: Hypertension - Diagnosed in 2010
2. Current Medications: Ramipril 5mg, Amlodipine 10mg
3. Allergies: No known allergies

Loan Information:
Loan Amount Requested: £20,000
Loan Purpose: Heart Valve Replacement Surgery
Estimated Treatment Cost: £25,000
Hospital: King's College Hospital, London
Tentative Surgery Date: 01/10/2023
Referring Physician: Dr. Sarah K. Johnson, Consultant Cardiothoracic Surgeon

Personal Declaration:
I, Reiner P. Misicher, hereby declare that all the information provided in this loan application is true and accurate to the best of my knowledge. I understand that any misrepresentation of facts may lead to the cancellation of the loan or legal consequences.

Signature: Reiner P. Misicher
Date: DD/MM/YYYY
Email Address: r.misicher@example.com
"""

In [None]:
index = 0
#text = missed_rows.iloc[index]["text"]
# call_classification_api_2 is only defined in a later cell, so running the
# notebook top to bottom on a fresh kernel raised NameError here.
# call_classification_api (redefined above with BASE_URL) has the same body
# and sends the same request.
op = call_classification_api(text)
op

In [None]:
import requests

def call_classification_api_2(text, mode="all", anonymize=False):
    """
    Call the Pebblo classification API.

    Kept as an alias for existing callers: the body used to be a copy of
    ``call_classification_api``, so it now delegates to that function and the
    request logic lives in one place.

    Args:
        text (str): The text to classify
        mode (str): Classification mode - "all", "entity", or "topic"
        anonymize (bool): Whether to anonymize the results

    Returns:
        dict: The classification response, or None if the request failed
    """
    return call_classification_api(text, mode=mode, anonymize=anonymize)

In [1]:
from classifier.entity_classifier.entity_classifier import EntityClassifier

entity_classfier = EntityClassifier(countries=["US"])

  from .autonotebook import tqdm as notebook_tqdm
2025-11-14 10:32:43.955 - classifier.entity_classifier.core.loader - INFO - Entities config base dir override: None
2025-11-14 10:32:43.956 - classifier.entity_classifier.core.loader - INFO - Entities config default base dir: /Users/nishanjain/Desktop/iconect/classifier/myenv/lib/python3.12/site-packages/classifier/entity_classifier)
2025-11-14 10:32:44.989 - classifier.entity_classifier.core.loader - INFO - Entities config base dir override: None
2025-11-14 10:32:44.990 - classifier.entity_classifier.core.loader - INFO - Entities config default base dir: /Users/nishanjain/Desktop/iconect/classifier/myenv/lib/python3.12/site-packages/classifier/entity_classifier)


In [2]:
entity_classfier.entity_classifier_and_anonymizer("my ssn is 321-45-7891")

2025-11-14 10:34:21.106 - classifier.entity_classifier.analyzers.base_analyzer - INFO - use_llm is False - skipping LLM detection
2025-11-14 10:34:21.108 - classifier.entity_classifier.entity_classifier - INFO - analyzer_results [type: US_SSN, start: 10, end: 21, score: 0.85]


({'us-ssn': [{'location': '10_21',
    'confidence_score': 0.85,
    'entity_value': '321-45-7891',
    'start_index': 10,
    'end_index': 21}]},
 'my ssn is 321-45-7891')