In [11]:
import spacy
import re
import json
import os 
import pandas as pd
import random
from faker import Faker
from datetime import datetime, timedelta
from pathlib import Path
from spacy.matcher import Matcher

In [None]:
try:
    nlp = spacy.load("en_core_web_lg") # Large English model for for better accuracy
except OSError:
    print("SpaCy 'en_core_web_lg' model not found. Please ensure it's downloaded by running:")
    print("!python -m spacy download en_core_web_lg")
    raise # Re-raise the error to stop execution if model is crucial
print("SpaCy model 'en_core_web_lg' loaded successfully.")

SpaCy model 'en_core_web_lg' loaded successfully.


In [None]:
# --- Contextual Rules ---
# Define patterns to provide contextual clues for more accurate PII detection.
# This helps disambiguate words like "Will" as a name versus a verb.
CONTEXTUAL_RULES = {
    "HIGH_CONF_PERSON": [
        [{"LOWER": "mr"}, {"POS": "PROPN"}],  # "Mr. John"
        [{"LOWER": "dr"}, {"POS": "PROPN"}],  # "Dr. Smith"
        [{"TEXT": "my"}, {"TEXT": "name"}, {"TEXT": "is"}, {"POS": "PROPN", "OP": "+"}], # "my name is John Smith"
        [{"LOWER": {"IN": ["contact", "sent", "by"]}}, {"POS": "PROPN", "OP": "+"}] # "sent by Jane"
    ],
    "LOW_CONF_PERSON": [
        [{"TEXT": "Will"}, {"POS": "VERB", "OP": "?"}], # "We will finalize..."
        [{"POS": {"IN": ["AUX", "VERB"]}}, {"LOWER": "will"}]
    ],
    "HIGH_CONF_ORG": [
        [{"TEXT": "at"}, {"POS": "PROPN", "OP": "+"}, {"LOWER": "inc"}] # "at Acme Inc"
    ]
}

In [None]:
# PII Entity Labels
# Define PII categories that spaCy's NER typically detects and we want to anonymize.
PII_ENTITY_LABELS = [
    "PERSON",  # People, including fictional
    "NORP",    # Nationalities or religious or political groups
    "FAC",     # Buildings, airports, highways, bridges, etc.
    "ORG",     # Companies, agencies, institutions, etc.
    "GPE",     # Countries, cities, states
    "LOC",     # Non-GPE locations, mountain ranges, bodies of water
    "PRODUCT", # Objects, vehicles, foods, etc. (often proprietary names)
    "EVENT",   # Named hurricanes, battles, wars, sports events, etc.
    "WORK_OF_ART", # Titles of books, songs, etc.
    "LAW",     # Named documents enacted into laws
    "LANGUAGE",# Any named language
    "DATE",    # Absolute or relative dates or periods
    "TIME",    # Times smaller than a day
    "PERCENT", # Percentage, including "%"
    "MONEY",   # Monetary values, including unit
    "QUANTITY",# Measurements, as of weight or distance
    "ORDINAL", # "first", "second", etc.
    "CARDINAL" # Numerals that do not fall under other types
]

# --- Regular Expressions for PII ---
# These patterns catch common PII that may be missed by NER.
# Define regex patterns for structured PII (email, phone, SSN, IP)
PII_REGEX_PATTERNS = {
    "EMAIL": r"\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b",
    "PHONE": r"(\+\d{1,2}\s?)?\(?\d{3}\)?[\s.-]?\d{3}[\s.-]?\d{4}\b",
    "SSN": r"\b\d{3}[-]?\d{2}[-]?\d{4}\b", # Simplified, US-specific SSN
    "IP_ADDRESS": r"\b(?:[0-9]{1,3}\.){3}[0-9]{1,3}\b"
}

In [15]:
# --- Custom/Override PII List (High Priority) ---
# Add specific words/phrases that should ALWAYS be classified as a certain PII type.
# This overrides spaCy's default behavior for these exact matches.
CUSTOM_PII_OVERRIDES = [
    {"text": "Shashank", "type": "PERSON"},
    {"text": "Google", "type": "ORG"},
    {"text": "Apple", "type": "ORG"},
    {"text": "Dr. Smith", "type": "PERSON"},
    # --- ADDED FOR INDIAN/ASIAN NAMES ---
    {"text": "Rajesh Sharma", "type": "PERSON"},
    {"text": "Priyanka Singh", "type": "PERSON"},
    {"text": "Ananya Das", "type": "PERSON"},
    {"text": "Sameer Khan", "type": "PERSON"},
    {"text": "Vikram Patel", "type": "PERSON"},
    {"text": "Anjali Singh", "type": "PERSON"},
    {"text": "Amit Kumar", "type": "PERSON"},
    {"text": "Neha Sharma", "type": "PERSON"},
    {"text": "Rohan Gupta", "type": "PERSON"},
    {"text": "Sushma Reddy", "type": "PERSON"},
    {"text": "Gaurav Singh", "type": "PERSON"},
    {"text": "Divya Rao", "type": "PERSON"},
    {"text": "Aisha Rahman", "type": "PERSON"},
    {"text": "Kenji Tanaka", "type": "PERSON"}, # Japanese name
    {"text": "Li Wei", "type": "PERSON"},      # Chinese name
    {"text": "Maria Santos", "type": "PERSON"}, # Filipino name, common in Asia
    {"text": "Kim Min-jun", "type": "PERSON"}, # Korean name
]

In [16]:
# --- Core Anonymization Logic (REDESIGNED with Context Awareness) ---
def anonymize_text(text: str) -> (str, list):
    """
    Detects PII with context-aware scoring, resolves overlaps,
    and then anonymizes the text efficiently.
    This version fixes the coordinate system bug that caused False Negatives.
    """
    original_text_str = text
    anonymized_text_chars = list(text)
    
    # 1. Collect All Raw Detections with a Confidence Score
    doc = nlp(original_text_str)
    raw_detections = []

    # a. Custom Overrides (Highest Priority & Confidence)
    for custom_pii_entry in CUSTOM_PII_OVERRIDES:
        target_text = custom_pii_entry["text"]
        target_type = custom_pii_entry["type"]
        for match in re.finditer(re.escape(target_text), original_text_str):
            raw_detections.append({
                "original_text_segment": original_text_str[match.start():match.end()],
                "type": target_type,
                "method": "Custom",
                "start": match.start(),
                "end": match.end(),
                "confidence": 1.0
            })

    # b. SpaCy NER Detections (Base Confidence)
    for ent in doc.ents:
        if ent.label_ in PII_ENTITY_LABELS:
            confidence = 0.8
            raw_detections.append({
                "original_text_segment": original_text_str[ent.start_char:ent.end_char],
                "type": ent.label_,
                "method": "NER",
                "start": ent.start_char,
                "end": ent.end_char,
                "confidence": confidence
            })

    # c. Apply Contextual Rules to adjust confidence of NER detections
    matcher = Matcher(nlp.vocab)
    for pii_type, patterns in CONTEXTUAL_RULES.items():
        matcher.add(pii_type, patterns)
    
    matches = matcher(doc)
    for match_id, start, end in matches:
        span = doc[start:end]
        match_type = nlp.vocab.strings[match_id]
        
        for det in raw_detections:
            if span.start_char >= det['start'] and span.end_char <= det['end']:
                if match_type.startswith("HIGH_CONF"):
                    det['confidence'] = 0.95
                    det['method'] += "_Context"
                elif match_type.startswith("LOW_CONF"):
                    det['confidence'] = 0.05
                    det['method'] += "_Context"

    # d. Regex Detections (Base Confidence)
    for pii_type, pattern in PII_REGEX_PATTERNS.items():
        for match in re.finditer(pattern, original_text_str):
            confidence = 0.9
            raw_detections.append({
                "original_text_segment": original_text_str[match.start():match.end()],
                "type": pii_type,
                "method": "Regex",
                "start": match.start(),
                "end": match.end(),
                "confidence": confidence
            })

    # e. Filter detections based on a confidence threshold
    CONFIDENCE_THRESHOLD = 0.7
    filtered_detections = [det for det in raw_detections if det['confidence'] >= CONFIDENCE_THRESHOLD]

    # 2. Resolve Overlaps and Prioritize Final Detections
    def sort_key_for_resolution(d):
        return (-d['confidence'], d['start'], -(d['end'] - d['start']))

    filtered_detections.sort(key=sort_key_for_resolution)
    
    anonymized_flags = [False] * len(original_text_str)
    final_anonymization_plan = []

    for det in filtered_detections:
        is_already_covered = False
        for i in range(det['start'], det['end']):
            if i >= len(anonymized_flags) or anonymized_flags[i]:
                is_already_covered = True
                break
        
        if not is_already_covered:
            final_anonymization_plan.append(det)
            for i in range(det['start'], det['end']):
                if i < len(anonymized_flags):
                    anonymized_flags[i] = True
    
    # 3. Perform Anonymization on the Mutable Character List
    final_anonymization_plan.sort(key=lambda x: x['start'], reverse=True)

    pii_value_to_placeholder = {}
    pii_type_counters = {label: 0 for label in PII_ENTITY_LABELS}
    for regex_type in PII_REGEX_PATTERNS.keys():
        if regex_type not in pii_type_counters:
            pii_type_counters[regex_type] = 0

    logged_detections = []
    for det in final_anonymization_plan:
        start_char, end_char = det['start'], det['end']
        original_span_val = det['original_text_segment']
        pii_type_val = det['type']
        method_val = det['method']
        
        placeholder = ""
        if method_val in ["Custom", "NER", "NER_Context"]:
            pii_key = f"{pii_type_val}_{original_span_val.strip()}"
            if pii_key not in pii_value_to_placeholder:
                pii_type_counters[pii_type_val] += 1
                placeholder = f"{pii_type_val}_{pii_type_counters[pii_type_val]}"
                pii_value_to_placeholder[pii_key] = placeholder
            else:
                placeholder = pii_value_to_placeholder[pii_key]
        else:
            placeholder = f"{pii_type_val}_REDACTED"
            
        anonymized_text_chars[start_char:end_char] = list(placeholder)
        logged_detections.append({
            "original_text": original_span_val,
            "anonymized_type": pii_type_val,
            "method": method_val,
            "start_char": start_char,
            "end_char": end_char,
            "anonymized_placeholder": placeholder,
            "confidence": det.get("confidence")
        })
        
    return "".join(anonymized_text_chars), logged_detections


In [17]:
# --- File Handling and Execution Logic for Notebook ---

def run_anonymizer_notebook(input_filepath: str, output_filepath: str, log_filepath: str = None):
    """
    Function to run the anonymization process within a Jupyter Notebook.

    Args:
        input_filepath (str): Path to the input text or CSV file.
        output_filepath (str): Path for the anonymized output file.
        log_filepath (str, optional): Path for the JSON log of detections.
    """
    input_file = Path(input_filepath)
    output_file = Path(output_filepath)
    log_file = Path(log_filepath) if log_filepath else None

    if not input_file.exists():
        print(f"Error: Input file '{input_file}' not found.")
        return

    file_extension = input_file.suffix.lower()
    content_to_anonymize = ""

    print(f"Processing '{input_file}'...")

    try:
        if file_extension == ".txt":
            content_to_anonymize = input_file.read_text(encoding="utf-8")
        elif file_extension == ".csv":
            content_to_anonymize = input_file.read_text(encoding="utf-8")
        else:
            print(f"Error: Unsupported file type '{file_extension}'. Only .txt and .csv are supported in this version.")
            return
    except Exception as e:
        print(f"Error reading input file: {e}")
        return

    # Perform anonymization
    anonymized_content, detections = anonymize_text(content_to_anonymize)

    # Write output
    try:
        output_file.write_text(anonymized_content, encoding="utf-8")
        print(f"Anonymized content saved to '{output_file}'")
    except Exception as e:
        print(f"Error writing output file: {e}")
        return

    # Write log if requested
    if log_file:
        try:
            with open(log_file, 'w', encoding="utf-8") as f:
                json.dump(detections, f, indent=4)
            print(f"Anonymization log saved to '{log_file}'")
        except Exception as e:
            print(f"Error writing log file: {e}")

    print(f"\n--- Anonymization Summary ---")
    if detections:
        print(f"Total PII entities detected and anonymized: {len(detections)}")
        print("Detected entities (first 5 shown, see log for full list):")
        for i, det in enumerate(detections[:5]):
            print(f"  - Type: {det['anonymized_type']} (Method: {det['method']})")
            print(f"    Original: '{det['original_text']}'")
            print(f"    Replaced with: '{det['anonymized_placeholder']}'")
            if i < len(detections) - 1:
                print("---")
    else:
        print("No PII entities detected.")

In [None]:
# # --- Example 1: Anonymize a text file ---
# input_text_file = "sample_input.txt"
# output_text_file = "anonymized_text_output.txt"
# log_text_file = "anonymization_text_log.json"

# print("--- Running Anonymizer for Text File ---")
# run_anonymizer_notebook(input_text_file, output_text_file, log_text_file)

# --- Anonymize a CSV file ---
input_file = "final_dataset.csv"
output_file = "anonymized_output.csv"
log_file = "anonymization_log.json"

print("\n--- Running Anonymizer for CSV File ---")
run_anonymizer_notebook(input_file, output_file, log_file)


--- Running Anonymizer for CSV File ---
Processing 'final_dataset.csv'...
Anonymized content saved to 'anonymized_output.csv'
Anonymization log saved to 'anonymization_log.json'

--- Anonymization Summary ---
Total PII entities detected and anonymized: 40489
Detected entities (first 5 shown, see log for full list):
  - Type: GPE (Method: NER)
    Original: 'St, South'
    Replaced with: 'GPE_1'
---
  - Type: EMAIL (Method: Regex)
    Original: 'mohammed.al-hamdani@mail.net'
    Replaced with: 'EMAIL_REDACTED'
---
  - Type: PERSON (Method: NER)
    Original: 'Mohammed Al-Hamdani'
    Replaced with: 'PERSON_1'
---
  - Type: EMAIL (Method: Regex)
    Original: 'fatima.al-amri@sample.co'
    Replaced with: 'EMAIL_REDACTED'
---
  - Type: PERSON (Method: NER)
    Original: 'Fatima Al-Amri'
    Replaced with: 'PERSON_2'
---


In [9]:
# --- Evaluation Metrics ---
# Helper function to merge overlapping intervals (spans)
def merge_intervals(intervals):
    """
    Merges a list of [start, end] intervals (spans) into a minimal set of non-overlapping intervals.
    Used to consolidate all PII spans detected by different methods into a single 'ground truth' reference.
    """
    if not intervals:
        return []
    intervals.sort(key=lambda x: x[0])
    merged = []
    for interval in intervals:
        if not merged or interval[0] > merged[-1][1]:
            merged.append(list(interval))
        else:
            merged[-1][1] = max(merged[-1][1], interval[1])
    return [tuple(m) for m in merged]

def evaluate_anonymization_with_f1(original_text_path: str, anonymized_text_path: str, log_filepath: str):
    """
    Evaluates the anonymization process by comparing the original text with the anonymized text
    and analyzing the detection log, providing Precision, Recall, and F1-score.

    Args:
        original_text_path (str): Path to the original input text file.
        anonymized_text_path (str): Path to the anonymized output file.
        log_filepath (str): Path to the JSON log file of detections.
    """
    try:
        original_text = Path(original_text_path).read_text(encoding="utf-8")
        anonymized_text = Path(anonymized_text_path).read_text(encoding="utf-8")
        with open(log_filepath, 'r', encoding="utf-8") as f:
            detections_log = json.load(f)
    except FileNotFoundError as e:
        print(f"Error: Required file not found for evaluation: {e}")
        return
    except json.JSONDecodeError as e:
        print(f"Error reading log file (JSON format issue): {e}")
        return
    except Exception as e:
        print(f"An unexpected error occurred during file reading for evaluation: {e}")
        return

    print("\n--- Anonymization Evaluation (with P/R/F1) ---")

    # 1. Determine "Ground Truth" (based on our system's capabilities on original text)
    # This serves as the reference for what our system *should* detect.
    doc_original = nlp(original_text)
    potential_pii_spans_raw = []

    # Add NER entities from original text that match our PII_ENTITY_LABELS
    for ent in doc_original.ents:
        if ent.label_ in PII_ENTITY_LABELS:
            potential_pii_spans_raw.append((ent.start_char, ent.end_char))

    # Add Regex matches from original text
    for pii_type, pattern in PII_REGEX_PATTERNS.items():
        for match in re.finditer(pattern, original_text):
            potential_pii_spans_raw.append((match.start(), match.end()))

    # Merge overlapping spans to get a canonical set of PII regions
    merged_potential_pii_spans = merge_intervals(potential_pii_spans_raw)
    total_potential_pii_count = len(merged_potential_pii_spans)

    # 2. Extract Detections Made by the Anonymizer from the Log
    logged_detection_spans = [(d['start_char'], d['end_char']) for d in detections_log]
    
    # 3. Calculate True Positives (TP), False Positives (FP), False Negatives (FN)
    #    Based on the "Ground Truth" as defined by `merged_potential_pii_spans`

    # Initialize sets to track matched spans to avoid double counting
    matched_potential_pii_spans = set() # Spans from merged_potential_pii_spans that were detected by a logged item
    matched_logged_detection_spans = set() # Spans from logged_detection_spans that correctly overlapped a potential PII

    # Determine overlaps between logged detections and potential PII
    for i, (logged_s, logged_e) in enumerate(logged_detection_spans):
        found_overlap_for_logged_detection = False 
        for j, (potential_s, potential_e) in enumerate(merged_potential_pii_spans):
            # Check for any overlap. `max(start1, start2) < min(end1, end2)` indicates an overlap.
            if max(logged_s, potential_s) < min(logged_e, potential_e):
                matched_potential_pii_spans.add((potential_s, potential_e))
                matched_logged_detection_spans.add((logged_s, logged_e))
                found_overlap_for_logged_detection = True
                break 
    
    # TP (True Positives for Detection): Number of potential PII spans that were successfully detected and logged.
    # This represents how many items from our system's "ground truth" were actually captured.
    tp_detections = len(matched_potential_pii_spans)

    # FP (False Positives for Detection): Number of logged detections that did NOT overlap with any potential PII span.
    # This indicates cases where our anonymizer detected something that wasn't considered PII by our own rules.
    fp_detections = len(logged_detection_spans) - len(matched_logged_detection_spans) 

    # FN (False Negatives for Detection): Number of potential PII spans that were NOT detected by any logged detection.
    # This indicates PII that our rules *could* find, but our anonymizer missed.
    fn_detections = total_potential_pii_count - tp_detections

    print(f"Total potential PII in original text (based on our rules): {total_potential_pii_count}")
    print(f"Total PII entities logged by anonymizer: {len(logged_detection_spans)}")
    print(f"TP (Correctly Detected PII): {tp_detections}")
    print(f"FP (Incorrectly Detected as PII): {fp_detections}")
    print(f"FN (Missed PII): {fn_detections}")

    # Calculate Precision, Recall, F1 Score for the DETECTION phase
    precision = tp_detections / (tp_detections + fp_detections) if (tp_detections + fp_detections) > 0 else 0.0
    recall = tp_detections / (tp_detections + fn_detections) if (tp_detections + fn_detections) > 0 else 0.0
    f1_score = 2 * (precision * recall) / (precision + recall) if (precision + recall) > 0 else 0.0

    print(f"\nPrecision (of Detection): {precision:.4f}")
    print(f"Recall (of Detection):   {recall:.4f}")
    print(f"F1-Score (of Detection): {f1_score:.4f}")

    # --- REVISED ANONYMIZATION EFFECTIVENESS RATE CALCULATION ---
    # The previous check `original_span_in_log not in anonymized_text` was misleading
    # because it checked the *entire document* for the original string,
    # which might still be present if it appeared multiple times or in non-PII contexts.
    #
    # Given the design of `anonymize_text` (mutable list + reverse iteration),
    # if an item is counted in `tp_detections` (i.e., it was a valid PII detection
    # that our system logged), it implies the replacement *was executed* for that specific span.
    #
    # Thus, the 'success rate' of *replacing what was detected* should be high if not 100%.
    # We will report on the proportion of logged detections that were True Positives.

    successfully_removed_logged_items = tp_detections # Count of logged items that were valid PII (True Positives)

    print(f"\nAnonymization Effectiveness (of logged detections):")
    print(f"  {successfully_removed_logged_items} out of {len(logged_detection_spans)} detected items correspond to valid PII (True Positives).")
    
    # This metric now tells us: Out of everything we *said* was PII and logged, how many were actually correct detections?
    # Which is very close to your Precision of Detection, but reframed as 'effectiveness of handling logged items'.
    anonymization_effectiveness_rate = successfully_removed_logged_items / len(logged_detection_spans) if len(logged_detection_spans) > 0 else 0.0
    print(f"  Effectiveness Rate (TPs among all Logged): {anonymization_effectiveness_rate:.4f}")

    # --- Manual Review Guidance ---
    # The original "To check for False Positives..." guidance (commented out) can be re-added
    # if desired, but for now we focus on the core metrics.
    print("\nTo check for False Negatives (actual PII missed by the anonymizer):")
    print("  - Compare the 'original_input.txt' (or .csv) with the 'anonymized_output.txt' (or .csv) line by line.")
    print("  - Look for any remaining PII in the anonymized file that should have been redacted/pseudonymized.")
    print("  - The calculated FN (Missed PII) above quantifies this based on what our rules *could* find.")
    
    # Simple check for any remaining known PII patterns in the anonymized text (potential false negatives missed by replacement)
    potential_false_negatives_regex_in_anonymized = []
    for pii_type, pattern in PII_REGEX_PATTERNS.items():
        for match in re.finditer(pattern, anonymized_text):
            potential_false_negatives_regex_in_anonymized.append(f"'{match.group(0)}' (Type: {pii_type})")
            
    if potential_false_negatives_regex_in_anonymized:
        print(f"\n  - Warning: Some original regex PII patterns found remaining in anonymized text:")
        for item in potential_false_negatives_regex_in_anonymized[:5]: # Show first 5
            print(f"    - {item}")
        if len(potential_false_negatives_regex_in_anonymized) > 5:
            print(f"    ...and {len(potential_false_negatives_regex_in_anonymized) - 5} more.")
    else:
        print("  - No obvious remaining regex PII patterns found in anonymized text.")



In [10]:
# print("\n" + "="*50)
# print("Initiating Evaluation for Text File Anonymization")
# print("="*50)
# evaluate_anonymization_with_f1(input_text_file, output_text_file, log_text_file)

print("\n" + "="*50)
print("Initiating Evaluation for CSV File Anonymization")
print("="*50)
evaluate_anonymization_with_f1(input_file, output_file, log_file)


Initiating Evaluation for CSV File Anonymization

--- Anonymization Evaluation (with P/R/F1) ---
Total potential PII in original text (based on our rules): 40317
Total PII entities logged by anonymizer: 40420
TP (Correctly Detected PII): 40317
FP (Incorrectly Detected as PII): 1
FN (Missed PII): 0

Precision (of Detection): 1.0000
Recall (of Detection):   1.0000
F1-Score (of Detection): 1.0000

Anonymization Effectiveness (of logged detections):
  40317 out of 40420 detected items correspond to valid PII (True Positives).
  Effectiveness Rate (TPs among all Logged): 0.9975

To check for False Negatives (actual PII missed by the anonymizer):
  - Compare the 'original_input.txt' (or .csv) with the 'anonymized_output.txt' (or .csv) line by line.
  - Look for any remaining PII in the anonymized file that should have been redacted/pseudonymized.
  - The calculated FN (Missed PII) above quantifies this based on what our rules *could* find.
  - No obvious remaining regex PII patterns found i

In [24]:
# --- Function to Check for Undetected PII in Anonymized Output (False Negatives) ---

def check_undetected_pii_in_output(original_filepath: str, anonymized_filepath: str, pii_regex_patterns: dict):
    """
    Checks the anonymized output file for any remaining PII patterns defined by regex.
    This helps identify False Negatives where PII was missed during anonymization.

    Args:
        original_filepath (str): Path to the original input text/CSV file.
        anonymized_filepath (str): Path to the anonymized output text/CSV file.
        pii_regex_patterns (dict): Dictionary of PII types and their regex patterns.
    """
    print(f"\n--- Checking for Undetected PII (False Negatives) in '{Path(anonymized_filepath).name}' ---")

    try:
        original_text = Path(original_filepath).read_text(encoding="utf-8")
        anonymized_text = Path(anonymized_filepath).read_text(encoding="utf-8")
    except FileNotFoundError as e:
        print(f"Error: File not found for checking undetected PII: {e}")
        return
    except Exception as e:
        print(f"An unexpected error occurred while reading files: {e}")
        return

    undetected_pii_found = False
    all_undetected_details = []

    # Check for remaining regex patterns in the anonymized text
    for pii_type, pattern in pii_regex_patterns.items():
        matches = list(re.finditer(pattern, anonymized_text))
        if matches:
            undetected_pii_found = True
            print(f"  - WARNING: '{pii_type}' patterns found remaining in anonymized text:")
            for match in matches:
                span_start, span_end = match.span()
                matched_text_in_anonymized = match.group(0)
                
                # --- IMPORTANT NOTE/CLARIFICATION FOR original_text_at_shifted_span ---
                # The span_start and span_end are relative to `anonymized_text` because
                # the regex search `re.finditer(pattern, anonymized_text)` runs on `anonymized_text`.
                # If `anonymized_text` has undergone length changes (due to earlier anonymizations),
                # then these spans will NOT correctly map back to the 'original_text' at the *same characters*.
                # We're showing original_text[span_start:span_end] to give *some* context from original,
                # but be aware its exact content might be misaligned if prior text was changed in length.
                original_text_at_shifted_span = original_text[span_start:span_end] 
                # --- END NOTE ---

                # Try to get surrounding context for better review
                context_start = max(0, span_start - 20)
                context_end = min(len(anonymized_text), span_end + 20)
                anonymized_context_raw = anonymized_text[context_start:context_end]
                
                display_context = anonymized_context_raw.replace('\n', ' ').strip()
                highlighted_display_context = display_context.replace(matched_text_in_anonymized, f'>>>{matched_text_in_anonymized}<<<')

                detail = {
                    "type": pii_type,
                    "matched_text_in_anonymized": matched_text_in_anonymized,
                    "original_text_at_shifted_span": original_text_at_shifted_span, # Renamed for clarity
                    "start_char_in_anonymized": span_start,
                    "end_char_in_anonymized": span_end,
                    "anonymized_context": display_context
                }
                all_undetected_details.append(detail)
                
                print(f"    -> Type: {pii_type}, Text: '{matched_text_in_anonymized}' (from original at shifted span: '{original_text_at_shifted_span}')")
                print(f"       Context (anonymized): '...{highlighted_display_context}...'")
                print("-" * 50)

    if not undetected_pii_found:
        print("  No obvious remaining regex PII patterns found. (Good!)")
    else:
        print(f"\nSummary: {len(all_undetected_details)} instances of regex-definable PII were found in the anonymized output.")
        print("Please review these carefully, as they indicate False Negatives.")
        # Optionally save to a file for detailed review
        # with open(f"{Path(anonymized_filepath).stem}_undetected_pii.json", 'w', encoding='utf-8') as f:
        #     json.dump(all_undetected_details, f, indent=4)
        # print(f"Detailed undetected PII log saved to {Path(anonymized_filepath).stem}_undetected_pii.json")

    print(f"--- Finished checking '{Path(anonymized_filepath).name}' ---")

In [25]:
# Call for Text File
#check_undetected_pii_in_output(input_text_file, output_text_file, PII_REGEX_PATTERNS)

# Call for CSV File
check_undetected_pii_in_output(input_file, output_file, PII_REGEX_PATTERNS)


--- Checking for Undetected PII (False Negatives) in 'anonymized_output1.csv' ---
  No obvious remaining regex PII patterns found. (Good!)
--- Finished checking 'anonymized_output1.csv' ---


In [26]:
# --- Function for Re-Anonymization of Missed PII (Second Pass for FNs) ---

def re_anonymize_missed_pii(
    input_anonymized_filepath: str,
    output_re_anonymized_filepath: str,
    pii_regex_patterns: dict,
):
    """
    Performs a second anonymization pass specifically to mask PII that was
    missed in the first pass, focusing on regex-definable patterns.

    Args:
        input_anonymized_filepath (str): Path to the output file from the first anonymization pass.
        output_re_anonymized_filepath (str): Path where the re-anonymized output will be saved.
        pii_regex_patterns (dict): Dictionary of PII types and their regex patterns to re-check.
    
    Returns:
        tuple: A tuple containing:
            - str: The re-anonymized text.
            - list: A list of dictionaries describing the newly detected and re-anonymized PII.
    """
    print(f"\n--- Initiating Re-Anonymization Pass for '{Path(input_anonymized_filepath).name}' ---")
    print(f"  Input file for re-masking: '{input_anonymized_filepath}'")
    print(f"  Output file for re-masked content: '{output_re_anonymized_filepath}'")

    try:
        current_anonymized_text = Path(input_anonymized_filepath).read_text(encoding="utf-8")
        print(f"  Successfully read {len(current_anonymized_text)} characters from input for re-masking.")
    except FileNotFoundError as e:
        print(f"ERROR: Input anonymized file not found for re-masking: {e}")
        return "", []
    except Exception as e:
        print(f"ERROR: An unexpected error occurred while reading the anonymized file for re-masking: {e}")
        return "", []

    re_anonymized_text_chars = list(current_anonymized_text) # Mutable list for re-masking
    newly_masked_detections = [] # Log for this second pass

    # Collect new detections and prepare for re-anonymization
    potential_new_masks = []
    print("  Searching for missed regex PII in the current text for re-masking...")
    for pii_type, pattern in pii_regex_patterns.items():
        for match in re.finditer(pattern, current_anonymized_text):
            # These are the PII that were missed by the first pass's overlap logic
            # and are still present in the text.
            potential_new_masks.append({
                "original_text_segment": match.group(0), # What was actually found in the current (anonymized) text
                "type": pii_type,
                "method": "Regex_Remask", # Indicate it's from the second pass
                "start": match.start(),
                "end": match.end()
            })
    
    # Sort these new detections in reverse order of their start positions
    # to perform in-place modification correctly.
    potential_new_masks.sort(key=lambda x: x['start'], reverse=True)

    print(f"  Found {len(potential_new_masks)} new PII instances to re-mask in this pass.")

    if not potential_new_masks:
        print("  No further regex-definable PII found to re-mask. (Pass not needed)")
        # If no changes needed, just save the content to the new output path
        Path(output_re_anonymized_filepath).write_text(current_anonymized_text, encoding="utf-8")
        print(f"  Original anonymized content saved to '{output_re_anonymized_filepath}' (no new masks applied).")
        return current_anonymized_text, newly_masked_detections

    for det in potential_new_masks:
        start_char, end_char = det['start'], det['end']
        pii_type_val = det['type']
        
        # For re-masking, we'll always use redaction.
        placeholder = f"{pii_type_val}_REDACTED_REMASKED" # Clear indicator it was a second pass mask
            
        # Perform the in-place replacement
        re_anonymized_text_chars[start_char:end_char] = list(placeholder)

        # Log the details of this re-masking
        newly_masked_detections.append({
            "original_text": det['original_text_segment'],
            "anonymized_type": pii_type_val,
            "method": "Regex_Remask",
            "start_char": start_char,
            "end_char": end_char,
            "anonymized_placeholder": placeholder
        })
    
    re_anonymized_content = "".join(re_anonymized_text_chars)

    try:
        Path(output_re_anonymized_filepath).write_text(re_anonymized_content, encoding="utf-8")
        print(f"  Successfully wrote {len(re_anonymized_content)} characters to '{output_re_anonymized_filepath}'")
    except Exception as e:
        print(f"ERROR: Failed to write re-anonymized output file: {e}")

    print(f"--- Re-masking pass complete. {len(newly_masked_detections)} items re-masked. ---")
    return re_anonymized_content, newly_masked_detections

In [27]:
# --- Execute Re-Anonymization of Missed PII ---

# Define new output paths for the re-anonymized files
re_anonymized_text_file = "re_anonymized_text_output.txt"
re_anonymization_text_log_file = "re_anonymization_text_log.json"

re_anonymized_csv_file = "re_anonymized_csv_output.csv"
re_anonymization_csv_log_file = "re_anonymization_csv_log.json"

# --- Re-anonymize Text File ---
# re_masked_text_content, re_masked_text_detections = re_anonymize_missed_pii(
#     input_anonymized_filepath=output_text_file, # Output from first pass
#     output_re_anonymized_filepath=re_anonymized_text_file,
#     pii_regex_patterns=PII_REGEX_PATTERNS
# )
# Save the log for the re-masked items
# if re_masked_text_detections:
#     with open(re_anonymization_text_log_file, 'w', encoding="utf-8") as f:
#         json.dump(re_masked_text_detections, f, indent=4)
#     print(f"Re-anonymization log for text saved to '{re_anonymization_text_log_file}'")

# --- Re-anonymize CSV File ---
re_masked_csv_content, re_masked_csv_detections = re_anonymize_missed_pii(
    input_anonymized_filepath=output_file, # Output from first pass
    output_re_anonymized_filepath=re_anonymized_csv_file,
    pii_regex_patterns=PII_REGEX_PATTERNS
)
# Save the log for the re-masked items
if re_masked_csv_detections:
    with open(re_anonymization_csv_log_file, 'w', encoding="utf-8") as f:
        json.dump(re_masked_csv_detections, f, indent=4)
    print(f"Re-anonymization log for CSV saved to '{re_anonymization_csv_log_file}'")

# --- OPTIONAL: Re-run the Undetected PII Check (from the original script)
#    on the *newly re-anonymized* files to confirm the fix.
#    You would need the check_undetected_pii_in_output function defined earlier.
#
# print("\n--- Verifying Re-Anonymized Text File for Residual PII ---")
# check_undetected_pii_in_output(input_text_file, re_anonymized_text_file, PII_REGEX_PATTERNS)
#
print("\n--- Verifying Re-Anonymized CSV File for Residual PII ---")
check_undetected_pii_in_output(output_file, re_anonymized_csv_file, PII_REGEX_PATTERNS)


--- Initiating Re-Anonymization Pass for 'anonymized_output1.csv' ---
  Input file for re-masking: 'anonymized_output1.csv'
  Output file for re-masked content: 're_anonymized_csv_output.csv'
  Successfully read 19342 characters from input for re-masking.
  Searching for missed regex PII in the current text for re-masking...
  Found 0 new PII instances to re-mask in this pass.
  No further regex-definable PII found to re-mask. (Pass not needed)
  Original anonymized content saved to 're_anonymized_csv_output.csv' (no new masks applied).

--- Verifying Re-Anonymized CSV File for Residual PII ---

--- Checking for Undetected PII (False Negatives) in 're_anonymized_csv_output.csv' ---
  No obvious remaining regex PII patterns found. (Good!)
--- Finished checking 're_anonymized_csv_output.csv' ---


In [28]:
# --- Execute Secondary Re-Anonymization Pass and Verification ---
re_anonymized_csv_file = "re_anonymized_csv_output.csv"
re_anonymization_csv_log_file = "re_anonymization_csv_log.json" # Log for items masked in second pass

print("\n" + "="*50)
print("STAGE 1: Executing Secondary Re-Anonymization Pass")
print("="*50)

# --- Re-anonymize Text File ---
# re_masked_text_content, re_masked_text_detections = re_anonymize_missed_pii(
#     input_anonymized_filepath=output_text_file, # Input is the output from the first pass
#     output_re_anonymized_filepath=re_anonymized_text_file,
#     pii_regex_patterns=PII_REGEX_PATTERNS
# )
# if re_masked_text_detections:
#     with open(re_anonymization_text_log_file, 'w', encoding="utf-8") as f:
#         json.dump(re_masked_text_detections, f, indent=4)
#     print(f"  Re-anonymization log for text saved to '{re_anonymization_text_log_file}'")

# --- Re-anonymize CSV File ---
re_masked_csv_content, re_masked_csv_detections = re_anonymize_missed_pii(
    input_anonymized_filepath=output_file, # Input is the output from the first pass
    output_re_anonymized_filepath=re_anonymized_csv_file,
    pii_regex_patterns=PII_REGEX_PATTERNS
)
if re_masked_csv_detections:
    with open(re_anonymization_csv_log_file, 'w', encoding="utf-8") as f:
        json.dump(re_masked_csv_detections, f, indent=4)
    print(f"  Re-anonymization log for CSV saved to '{re_anonymization_csv_log_file}'")


# --- Perform Checks for Undetected PII on the NEWLY RE-ANONYMIZED FILES ---
print("\n" + "="*50)
print("STAGE 2: Verifying Re-Anonymized Files for Residual PII (False Negatives)")
print("="*50)

# --- Verifying Re-Anonymized Text File ---
# Pass the *output of the re-masking pass* to the check function
# (This assumes check_undetected_pii_in_output is defined in a prior cell)
# print("\n--- Checking Re-Anonymized Text File ---")
# check_undetected_pii_in_output(input_text_file, re_anonymized_text_file, PII_REGEX_PATTERNS)

# --- Verifying Re-Anonymized CSV File ---
# Pass the *output of the re-masking pass* to the check function
print("\n--- Checking Re-Anonymized CSV File ---")
check_undetected_pii_in_output(input_file, re_anonymized_csv_file, PII_REGEX_PATTERNS)



STAGE 1: Executing Secondary Re-Anonymization Pass

--- Initiating Re-Anonymization Pass for 'anonymized_output1.csv' ---
  Input file for re-masking: 'anonymized_output1.csv'
  Output file for re-masked content: 're_anonymized_csv_output.csv'
  Successfully read 19342 characters from input for re-masking.
  Searching for missed regex PII in the current text for re-masking...
  Found 0 new PII instances to re-mask in this pass.
  No further regex-definable PII found to re-mask. (Pass not needed)
  Original anonymized content saved to 're_anonymized_csv_output.csv' (no new masks applied).

STAGE 2: Verifying Re-Anonymized Files for Residual PII (False Negatives)

--- Checking Re-Anonymized CSV File ---

--- Checking for Undetected PII (False Negatives) in 're_anonymized_csv_output.csv' ---
  No obvious remaining regex PII patterns found. (Good!)
--- Finished checking 're_anonymized_csv_output.csv' ---


Final Code ends here
