In [1]:
import re
import time
import json
import hashlib
import numpy as np
import spacy
from datetime import datetime
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity
from spacy.matcher import DependencyMatcher
from typing import List, Dict, Any, Set
import pandas as pd


In [10]:
# -*- coding: utf-8 -*-

import re
import time
import json
import hashlib
import numpy as np
import spacy
from datetime import datetime
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity
from spacy.matcher import DependencyMatcher
from typing import List, Dict, Any, Set

# ==========================================
# 1. USER PROVIDED VALIDATORS (STRICT LOGIC)
# ==========================================
class Validators:
    IGNORED_FILE_EXTENSIONS = {
        'pdf', 'jpg', 'png', 'gif', 'doc', 'docx', 'xls', 'xlsx',
        'txt', 'zip', 'rar', 'exe', 'mp3', 'mp4', 'json', 'xml', 'js', 'py'
    }

    # Common non-PII words that match ID patterns (Rubbish filter)
    GERMAN_RUBBISH = {
        "kannst", "helfen", "beantworten", "verstopft", "freundlich", "formulieren", 
        "morgen", "zwischen", "kommen", "folgene", "reingekommen", "antwort", 
        "geehrte", "herren", "mitarbeiter", "leider", "melden", "nettes", 
        "brauche", "vorlage", "wohnhaft", "heisst", "nachricht", "dringend",
        "stunde", "vertrag", "tabelle", "eintrag", "arbeitge", "adresse"
    }

    SAFE_TLDS = {
        'com', 'net', 'org', 'info', 'biz', 'co', 'io', 'me', 'edu', 'gov', 'int', 'mil',
        'de', 'at', 'ch', 'eu', 'nl', 'fr', 'uk', 'be', 'dk', 'no', 'se', 'fi', 'pl', 'it', 'es',
        'app', 'dev', 'ai', 'cloud', 'tech', 'digital', 'studio', 'online', 'shop', 'store',
        'berlin', 'hamburg', 'koeln', 'bayern'
    }

    @staticmethod
    def normalize(text: str) -> str:
        """Removes spaces, dashes, dots, parens for validation."""
        return re.sub(r"[\s\-\./\(\)\\]", "", text)

    @staticmethod
    def fix_common_typos(text: str) -> str:
        """Fixes common OCR/Typo errors specifically for financial strings."""
        return text.upper().replace('O', '0').replace('I', '1').replace('S', '5')

    # --- PHONE VALIDATION ---
    @staticmethod
    def validate_phone(text: str) -> bool:
        if re.search(r"\d{2}\.\d{2}\.\-\d{2}\.\d{2}", text): return False
        if re.search(r"\d{1,2}[\.\/]\d{1,2}[\.\/]\d{2,4}", text): return False
        
        clean = text.lower().replace('o', '0').replace('l', '1')
        clean = re.sub(r"onal|abortel", "", clean)
        clean = re.sub(r"[\s\-\./\(\)\\]", "", clean)
        
        if len(clean) < 7 or len(clean) > 15: return False
        if len(clean) == 8 and ('.' in text or '/' in text):
            if re.search(r"[01]\d[\.\/][12]\d{3}", text) or re.search(r"[12]\d{3}[\.\/][01]\d", text):
                return False
        if len(set(clean)) <= 2 and len(clean) > 9: return False 
        if re.search(r"\b\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}\b", text): return False 
        return True

    # --- EMAIL VALIDATION ---
    @staticmethod
    def validate_email(text: str) -> bool:
        if "@" not in text: return False
        parts = text.split('.')
        return len(parts[-1]) >= 2 and len(parts[-1]) <= 8

    # --- URL VALIDATION ---
    @staticmethod
    def validate_url(text: str) -> bool:
        if "@" in text: return False
        if text.lower().startswith(('http:', 'https:', 'www.')): return True
        domain_part = text.split('/')[0]
        parts = domain_part.split('.')
        if len(parts) < 2: return False
        valid_tld = any(part.lower() in Validators.SAFE_TLDS for part in parts[-2:])
        if not valid_tld: return False
        last = parts[-1].lower()
        return not (len(last) < 2 or last.isdigit() or last in Validators.IGNORED_FILE_EXTENSIONS)

    # --- IBAN VALIDATION ---
    @staticmethod
    def validate_iban(text: str) -> bool:
        def check_sum(clean_text):
            if "ZZZ" in clean_text.upper(): return False
            if len(clean_text) < 15 or len(clean_text) > 34: return False
            try:
                rearranged = clean_text[4:] + clean_text[:4]
                numeric_iban = ""
                for char in rearranged:
                    if char.isalpha(): numeric_iban += str(ord(char) - 55)
                    elif char.isdigit(): numeric_iban += char
                    else: return False
                return int(numeric_iban) % 97 == 1
            except: return False
        clean = Validators.normalize(text).upper()
        if check_sum(clean): return True
        fixed = Validators.fix_common_typos(clean)
        return check_sum(fixed)

    # --- CARD VALIDATION ---
    @staticmethod
    def validate_card(text: str) -> bool:
        clean = Validators.normalize(text)
        if not clean.isdigit() or not (13 <= len(clean) <= 19) or clean.startswith('0'): 
            return False
        digits = [int(d) for d in clean]
        checksum = 0
        for i, digit in enumerate(reversed(digits)):
            if i % 2 == 1:
                doubled = digit * 2
                checksum += doubled if doubled < 10 else doubled - 9
            else: checksum += digit
        return checksum % 10 == 0

    @staticmethod
    def validate_id(text: str, id_type: str) -> bool:
        clean = Validators.normalize(text)
        
        # --- RUBBISH FILTER ---
        if clean.lower() in Validators.GERMAN_RUBBISH: return False
        if clean.isalpha() and len(clean) < 10: 
            if clean.lower() in Validators.GERMAN_RUBBISH: return False

        if id_type == "PII:ID:TAX": return len(clean) == 11 and clean.isdigit()
        if id_type == "PII:ID:SVN": return 9 <= len(clean) <= 15
        if id_type == "PII:ID:DRIVERLICENSE":
            return len(clean) == 11 and any(char.isdigit() for char in clean)
        
        if id_type == "PII:ID:PASSPORT":
            if clean.isalpha(): return False
            return 6 <= len(clean) <= 12
            
        if id_type == "PII:ID:NATIONAL": return 6 <= len(clean) <= 12
        return True

# ==========================================
# 2. CONTEXT VALIDATORS (AI FALLBACK)
# ==========================================
class IBANContextValidator:
    def __init__(self, threshold=0.18):
        self.threshold = threshold
        self.pos_anchors = ["IBAN", "Konto", "Bankverbindung", "Überweisung", "SEPA", "BIC", "Bank", "Kontodaten", "Zahlung an"]
        self.neg_anchors = ["Telefon", "Handy", "Fax", "ID", "Pass", "Ausweis", "Steuer", "SV-Nr"]
        self.vectorizer = TfidfVectorizer(analyzer='char', ngram_range=(2, 4))
        self.vectorizer.fit(self.pos_anchors + self.neg_anchors)
        self.pos_vectors = self.vectorizer.transform(self.pos_anchors)

    def is_valid_context(self, text: str, start: int, end: int) -> bool:
        window = text[max(0, start-40):min(len(text), end+40)].lower()
        input_vec = self.vectorizer.transform([window])
        pos_score = float(np.max(cosine_similarity(input_vec, self.pos_vectors)))
        return pos_score > self.threshold

class CardContextValidator:
    def __init__(self, threshold=0.25):
        self.threshold = threshold
        self.pos_anchors = ["Kreditkarte", "Mastercard", "Visa", "Amex", "American Express", "Karteninhaber", "Gültigkeit", "endend auf", "ending in", "Ablaufdatum", "Zahlung", "Karte", "Credit Card", "Girocard", "EC-Karte"]
        self.neg_anchors = ["Geburtstag", "Telefon", "Hausnummer", "PLZ", "Postleitzahl", "Jahr", "Uhrzeit", "Euro", "EUR", "PIN", "CVV", "CVC", "Prüfziffer", "Code", "TAN", "IBAN", "Bic", "Tel", "Fax"]
        self.vectorizer = TfidfVectorizer(analyzer='char', ngram_range=(2, 4))
        self.vectorizer.fit(self.pos_anchors + self.neg_anchors)
        self.pos_vectors = self.vectorizer.transform(self.pos_anchors)
        self.neg_vectors = self.vectorizer.transform(self.neg_anchors)

    def is_valid_context(self, text: str, start: int, end: int) -> bool:
        window = text[max(0, start-60):min(len(text), end+60)].lower()
        input_vec = self.vectorizer.transform([window])
        pos_score = float(np.max(cosine_similarity(input_vec, self.pos_vectors)))
        neg_score = float(np.max(cosine_similarity(input_vec, self.neg_vectors)))
        return pos_score > self.threshold and pos_score > neg_score

class PhoneContextValidator:
    def __init__(self, threshold=0.08):
        self.threshold = threshold
        self.pos_anchors = ["tel", "telefon", "phone", "mobil", "handy", "fon", "nummer", "nr", "rückruf", "contact", "kontakt", "anrufen", "angerufen", "durchwahl", "mobile", "erreichbar", "unter", "ansprechpartner"]
        self.neg_anchors = ["laufzeit", "zeitraum", "datum", "iban", "bic", "steuer", "id", "konto", "bank", "betrag", "euro", "eur", "plz", "gesamtsumme"]
        self.vectorizer = TfidfVectorizer(analyzer='char', ngram_range=(2, 4))
        self.vectorizer.fit(self.pos_anchors + self.neg_anchors)
        self.pos_vectors = self.vectorizer.transform(self.pos_anchors)
        self.neg_vectors = self.vectorizer.transform(self.neg_anchors)

    def is_valid_context(self, text: str, start: int, end: int) -> bool:
        window = text[max(0, start-60):min(len(text), end+60)].lower()
        if any(kw in window for kw in ["tel", "mobil", "handy", "fon", "nummer", "nr."]): return True
        clean_num = re.sub(r"[^0-9]", "", text[start:end])
        if clean_num.startswith("01") and 10 <= len(clean_num) <= 13: return True
            
        input_vec = self.vectorizer.transform([window])
        pos_score = float(np.max(cosine_similarity(input_vec, self.pos_vectors)))
        neg_score = float(np.max(cosine_similarity(input_vec, self.neg_vectors)))
        return pos_score > self.threshold and (pos_score >= neg_score or pos_score > 0.20)

class PassportContextValidator:
    def __init__(self, threshold=0.18):
        self.threshold = threshold
        self.pos_anchors = ["Pass", "Reisepass", "Passport", "Passnummer", "Pass-Nr", "Pass No", "Visa", "Nationalität", "Dokument"]
        self.neg_anchors = [
            "IBAN", "BIC", "Konto", "Bank", "Euro", "Zahlung", "Lastschrift",
            "Telefon", "Handy", "Tel", "Mobil", "Nummer", "Rückruf",
            "Jahre", "alt", "geboren", "geb.", "Geburtsdatum",
            "Führerschein", "License", "FS-Nr", "Klasse", "Fahrer",
            "Versicherung", "SV-Nr", "Krankenkasse", "Steuer-ID"
        ]
        self.vectorizer = TfidfVectorizer(analyzer='char', ngram_range=(2, 4))
        self.vectorizer.fit(self.pos_anchors + self.neg_anchors)
        self.pos_vectors = self.vectorizer.transform(self.pos_anchors)
        self.neg_vectors = self.vectorizer.transform(self.neg_anchors)

    def is_valid_context(self, text: str, start: int, end: int, candidate: str) -> bool:
        pre_window = text[max(0, start-25):start].lower()
        if "pass" in pre_window or "reise" in pre_window: return True
        window = text[max(0, start-60):min(len(text), end+60)].lower()
        input_vec = self.vectorizer.transform([window])
        pos_score = float(np.max(cosine_similarity(input_vec, self.pos_vectors)))
        neg_score = float(np.max(cosine_similarity(input_vec, self.neg_vectors)))
        if candidate.isdigit(): return pos_score > (self.threshold * 1.5) and pos_score > neg_score
        return pos_score > self.threshold and pos_score > neg_score

class SVNContextValidator:
    def __init__(self, threshold=0.15):
        self.threshold = threshold
        self.pos_anchors = ["SV-Nummer", "SV-Nr", "Sozialversicherung", "Rentenversicherung", "Versicherungsnummer", "RV-NR", "SVNR"]
        self.neg_anchors = ["IBAN", "Konto", "Pass", "Telefon", "Handy", "Steuer", "Tax"]
        self.vectorizer = TfidfVectorizer(analyzer='char', ngram_range=(2, 4))
        self.vectorizer.fit(self.pos_anchors + self.neg_anchors)
        self.pos_vectors = self.vectorizer.transform(self.pos_anchors)

    def is_valid_context(self, text: str, start: int, end: int) -> bool:
        window = text[max(0, start-50):min(len(text), end+50)].lower()
        if any(kw in window for kw in ["sv-nr", "svnr", "rv-nr", "sozialversicherung"]): return True
        input_vec = self.vectorizer.transform([window])
        pos_score = float(np.max(cosine_similarity(input_vec, self.pos_vectors)))
        return pos_score > self.threshold

class NationalContextValidator:
    def __init__(self, threshold=0.18):
        self.threshold = threshold
        self.pos_anchors = ["Ausweis", "Personalausweis", "National ID", "ID-Nr", "Identitätskarte", "Ausweisnummer", "Perso", "Dokumentnummer"]
        # Explicit Negative Anchors to separate from Driver's License
        self.neg_anchors = ["Führerschein", "License", "Driver", "Klasse", "Fahrerlaubnis", "Fahrzeug", "IBAN", "Konto", "Telefon", "SV-Nr"]
        
        self.vectorizer = TfidfVectorizer(analyzer='char', ngram_range=(2, 4))
        self.vectorizer.fit(self.pos_anchors + self.neg_anchors)
        self.pos_vectors = self.vectorizer.transform(self.pos_anchors)
        self.neg_vectors = self.vectorizer.transform(self.neg_anchors)

    def is_valid_context(self, text: str, start: int, end: int, candidate: str) -> bool:
        pre_window = text[max(0, start-25):start].lower()
        if any(kw in pre_window for kw in ["ausweis", "perso", "id-nr"]): return True
        
        window = text[max(0, start-60):min(len(text), end+60)].lower()
        input_vec = self.vectorizer.transform([window])
        pos_score = float(np.max(cosine_similarity(input_vec, self.pos_vectors)))
        neg_score = float(np.max(cosine_similarity(input_vec, self.neg_vectors)))
        
        # Numeric strings require a higher "National ID" signature to avoid false positives
        if candidate.isdigit():
            return pos_score > (self.threshold * 1.5) and pos_score > neg_score
        return pos_score > self.threshold and pos_score > neg_score

# ==========================================
# 3. USER PROVIDED DETECTOR
# ==========================================
class RegexPIIDetector:
    def __init__(self, iban_validator=None, card_validator=None, phone_validator=None, passport_validator=None, svn_validator=None, national_validator=None):
        self.iban_validator = iban_validator
        self.card_validator = card_validator
        self.phone_validator = phone_validator
        self.passport_validator = passport_validator
        self.svn_validator = svn_validator
        self.national_validator = national_validator
        
        self.patterns = {
            "FINANCIAL:IBAN": re.compile(r"\b[A-Z]{2}\d{2}(?:[\s\.\-]*[A-Z0-9]){11,35}\b", re.IGNORECASE),
            "FINANCIAL:CARD": re.compile(r"\b[1-9](?:[\s\-\–\/\.]*\d){12,18}\b"),
            "FINANCIAL:CARD_PARTIAL_INTERNAL": re.compile(r"(?i)(?:visa|mastercard|amex|girocard|kreditkarte|karte|endend|ending)\s*(?:auf|in|no|nr)?\s*(?::|#)?\s*\b(\d{4})\b"),
            "CONTACT:URL": re.compile(r"\b(?:(?:https?://|www\.)[\w\-\.\/%\+~=\?&]+|[\w\-]+(?:\.[a-zA-Z]{2,})+(?:/[\w\-\.\/%\+~=\?&]*)?)\b", re.IGNORECASE),
            "CONTACT:PHONE": re.compile(r"(?i)(?<![a-zA-Z0-9\.])(?:(?:(?:\+|00|[o0])[1-9]\d{0,4})[\s\.\-\/\\]*(?:\(\s*[o0]\s*\)\s*)?|(?:\(\s*[o0][1-9]\d{1,8}\s*\)|[o0][1-9]\d{1,8}))(?:[ \t\.\-\/\\]*(?:onal|abortel)?[ \t\.\-\/\\]*\d){3,15}[l1]?(?![a-zA-Z0-9])"),
            "CONTACT:EMAIL": re.compile(r"\b[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,6}(?=[^a-z]|$)"),
            "PII:ID:TAX": re.compile(r"\b[1-9]\d{10}\b"),
            "PII:ID:SVN": re.compile(r"\b\d{2}[\s]*\d{6}[\s]*[A-Z][\s]*\d{3}\b", re.IGNORECASE),
            "PII:ID:DRIVERLICENSE": re.compile(r"\b(?=.*\d)(?=.*[A-Z])[A-Z0-9]{11}\b", re.IGNORECASE),
            "PII:ID:PASSPORT": re.compile(r"\b[A-Z0-9]{7,11}\b", re.IGNORECASE),
            "PII:ID:NATIONAL": re.compile(r"\b[L-Z0-9][0-9A-Z]{8,9}\b"),
        }

    def _validate_match(self, pii_type: str, text: str) -> bool:
        if pii_type.startswith("PII:ID"): return Validators.validate_id(text, pii_type)
        if pii_type == "CONTACT:URL": return Validators.validate_url(text)
        elif pii_type == "CONTACT:PHONE": return Validators.validate_phone(text)
        elif pii_type == "CONTACT:EMAIL": return Validators.validate_email(text)
        elif pii_type == "FINANCIAL:IBAN": return Validators.validate_iban(text)
        elif pii_type == "FINANCIAL:CARD": return Validators.validate_card(text)
        return True

    def _resolve_conflicts(self, detections: List[Dict]) -> List[Dict]:
        if not detections: return []
        priority = {
            "FINANCIAL:IBAN": 100, "FINANCIAL:CARD": 95, "CONTACT:EMAIL": 92, 
            "PII:ID:SVN": 90, "PII:ID:DRIVERLICENSE": 85, "PII:ID:PASSPORT": 85, 
            "PII:ID:NATIONAL": 85, "PII:ID:TAX": 75, "CONTACT:URL": 50, "CONTACT:PHONE": 10
        }
        sorted_dets = sorted(detections, key=lambda x: (-priority.get(x['type'], 0), -(x['end'] - x['start'])))
        final, occupied = [], set()
        for det in sorted_dets:
            r = range(det['start'], det['end'])
            if not any(i in occupied for i in r):
                final.append(det); occupied.update(r)
        return sorted(final, key=lambda x: x['start'])

    def detect(self, text: str) -> List[Dict]:
        raw_detections = []
        for pii_type, pattern in self.patterns.items():
            for match in pattern.finditer(text):
                is_valid = False
                start_pos, end_pos = match.start(), match.end()
                detection_type = pii_type
                
                if pii_type == "FINANCIAL:CARD_PARTIAL_INTERNAL":
                    detection_type = "FINANCIAL:CARD"
                    original_digits = match.group(1)
                    clean_text = f"****{original_digits}" 
                    start_pos, end_pos = match.start(1), match.end(1)
                    if self.card_validator and self.card_validator.is_valid_context(text, start_pos, end_pos):
                        is_valid = True
                
                elif pii_type == "FINANCIAL:IBAN":
                    candidate = match.group()
                    parts = re.split(r'(\s+)', candidate)
                    stop_labels = {"BIC", "STOP", "ICH", "NAME", "GMBH", "IBAN", "AN", "AUF", "AM", "BIN", "WAR", "VON", "UND", "DIE", "IST", "DER", "DAS", "DAME", "HERR", "WIR", "IHR", "SEIN", "MIT", "BEI"}
                    valid_parts = []
                    word_idx = 0
                    for p in parts:
                        if not p.strip(): valid_parts.append(p); continue
                        t = p.strip(".,;:!?() ")
                        if not t: continue
                        if t.upper() in stop_labels or "ZZZ" in t.upper(): break
                        if t[0].isupper() and any(c.islower() for c in t): break
                        if word_idx > 0 and not any(c.isdigit() for c in t) and len(t) != 4: break
                        valid_parts.append(p); word_idx += 1
                    clean_text = "".join(valid_parts).strip(".,;:!? ")
                    end_pos = match.start() + len(clean_text)
                    if Validators.validate_iban(clean_text): is_valid = True
                    elif self.iban_validator and self.iban_validator.is_valid_context(text, match.start(), end_pos):
                        if 15 <= len(Validators.normalize(clean_text)) <= 34: is_valid = True
                
                elif pii_type == "FINANCIAL:CARD":
                    if match.start() > 0 and text[match.start()-1] == '+': continue
                    clean_text = match.group().strip(".,;:!? ")
                    end_pos = match.start() + len(clean_text)
                    if Validators.validate_card(clean_text): is_valid = True
                    elif self.card_validator and self.card_validator.is_valid_context(text, match.start(), end_pos): is_valid = True
                
                elif pii_type == "CONTACT:PHONE":
                    clean_text = match.group().strip(".,;:!? \n\r\t")
                    end_pos = match.start() + len(clean_text)
                    if Validators.validate_phone(clean_text):
                        if self.phone_validator:
                            if self.phone_validator.is_valid_context(text, match.start(), end_pos):
                                is_valid = True
                        else: is_valid = True

                elif pii_type == "PII:ID:PASSPORT":
                    clean_text = match.group().strip(".,;:!? \n\r\t")
                    end_pos = match.start() + len(clean_text)
                    if Validators.validate_id(clean_text, pii_type):
                        if self.passport_validator and self.passport_validator.is_valid_context(text, start_pos, end_pos, clean_text):
                            is_valid = True

                elif pii_type == "PII:ID:SVN":
                    clean_text = match.group().strip(".,;:!? \n\r\t")
                    end_pos = match.start() + len(clean_text)
                    if Validators.validate_id(clean_text, pii_type):
                        if self.svn_validator:
                            if self.svn_validator.is_valid_context(text, match.start(), end_pos):
                                is_valid = True
                        else: is_valid = True

                elif pii_type == "PII:ID:NATIONAL":
                    clean_text = match.group().strip(".,;:!? \n\r\t")
                    end_pos = match.start() + len(clean_text)
                    if Validators.validate_id(clean_text, pii_type):
                        if self.national_validator:
                            if self.national_validator.is_valid_context(text, match.start(), end_pos, clean_text):
                                is_valid = True
                        else: is_valid = True

                else:
                    clean_text = match.group().strip(".,;:!? ")
                    end_pos = match.start() + len(clean_text)
                    is_valid = self._validate_match(pii_type, clean_text)

                if is_valid:
                    raw_detections.append({"type": detection_type, "text": clean_text, "start": start_pos, "end": end_pos, "confidence": 1.0})
        return self._resolve_conflicts(raw_detections)

# ==========================================
# 4. AGE EXTRACTION (CONTEXTUAL)
# ==========================================
class FastAgeExtractor:
    def __init__(self, threshold=0.30):
        self.threshold = threshold
        self.current_year = datetime.now().year
        self.pos_anchors = ["Ich bin <NUM> Jahre alt", "Er ist <NUM> geworden", "Sie ist <NUM>", "Mein Alter ist <NUM>", "Das Kind ist <NUM>", "Ein <NUM>-Jähriger", "Mit <NUM> Jahren", "Geboren am <NUM>", "Mein Geburtsdatum ist <NUM>", "Baujahr <NUM>", "Jahrgang <NUM>", "Geburtstag am <NUM>", "Nächstes Jahr werde ich <NUM>", "Der ist schon <NUM>"]
        self.neg_anchors = ["Das kostet <NUM> Euro", "Preis <NUM> EUR", "Ich habe <NUM> Äpfel", "In <NUM> Minuten", "Hausnummer <NUM>", "Seite <NUM>", "Um <NUM> Uhr", "<NUM> Prozent", "Gewicht <NUM> kg", "PLZ <NUM>", "Verspätung <NUM>", "Nummer <NUM>", "Platz <NUM>", "Größe <NUM>"]
        self.vectorizer = TfidfVectorizer(analyzer='word', ngram_range=(1, 2))
        self.vectorizer.fit(self.pos_anchors + self.neg_anchors)
        self.pos_vectors = self.vectorizer.transform(self.pos_anchors)
        self.neg_vectors = self.vectorizer.transform(self.neg_anchors)

    def calculate_age(self, value_str):
        if re.search(r'[./-]', value_str):
            for fmt in ("%d.%m.%Y", "%d/%m/%Y", "%d-%m-%Y"):
                try:
                    dob = datetime.strptime(value_str, fmt)
                    age = self.current_year - dob.year - ((datetime.now().month, datetime.now().day) < (dob.month, dob.day))
                    return age
                except ValueError: continue
            return None
        try:
            val = int(value_str)
            if 1900 < val <= self.current_year: return self.current_year - val
            return val
        except ValueError: return None

    def get_pii_type(self, age):
        if age is None: return "AGE:UNKNOWN"
        if age > 120: return None
        if age < 12: return "AGE:CHILD"
        elif 12 <= age <= 17: return "AGE:TEEN"
        elif 18 <= age <= 64: return "AGE:ADULT"
        else: return "AGE:SENIOR"

    def analyze_text(self, text):
        findings = []
        for match in re.finditer(r'\b(\d{1,2}[./-]\d{1,2}[./-]\d{4}|\d{1,4})\b', text):
            cand = match.group(0)
            snip = text[max(0, match.start()-50):min(len(text), match.end()+50)].replace(cand, "<NUM>", 1)
            vec = self.vectorizer.transform([snip])
            p_scr = float(np.max(cosine_similarity(vec, self.pos_vectors)))
            n_scr = float(np.max(cosine_similarity(vec, self.neg_vectors)))
            if p_scr > self.threshold and p_scr > n_scr:
                age = self.calculate_age(cand)
                label = self.get_pii_type(age)
                if label: findings.append({"type": label, "text": cand, "start": match.start(), "end": match.end(), "confidence": round(p_scr, 2)})
        return findings

# ==========================================
# 5. UNIFIED PIPELINE (MERGED LOGIC)
# ==========================================
class UnifiedPIIPipeline:
    def __init__(self):
        print("Loading NLP Models...")
        try: self.nlp = spacy.load("de_core_news_lg")
        except:
            from spacy.cli import download
            download("de_core_news_lg")
            self.nlp = spacy.load("de_core_news_lg")

        self.medications = ["ibuprofen", "aspirin", "paracetamol", "antibiotika", "insulin"]
        self.conditions = ["kopfschmerzen", "migräne", "fieber", "husten", "diabetes"]
        self.stoplist = {"Morgen", "Heute", "Gestern", "Hallo", "Hi", "Hey", "Danke", "Bitte", "Grüße"}
        self.noun_blocklist = {"Perso", "Ausweis", "Pass", "Konto", "Bank", "Iban", "Nummer", "Tel", "Handy", "Telefon", "Email", "Mail", "Adresse", "Name", "Büro", "Handynummer", "Steuer-ID", "Glückszahl", "SV-Nr", "SVN"}

        ruler = self.nlp.add_pipe("entity_ruler", before="ner")
        patterns = []
        for item in self.medications: patterns.append({"label": "MEDICATION", "pattern": [{"LOWER": item}]})
        for item in self.conditions: patterns.append({"label": "CONDITION", "pattern": [{"LOWER": item}]})
        sfx = "straße|strasse|str.|weg|platz|allee|damm|ring|gasse|ufer|chaussee|hof|garten|markt|zeile|wall"
        patterns.append({"label": "ADDRESS_DETECTED", "pattern": [{"TEXT": {"REGEX": f"(?i).+({sfx})$"}}, {"TEXT": {"REGEX": r"^\d"}}]})
        ruler.add_patterns(patterns)

        self.matcher = DependencyMatcher(self.nlp.vocab)
        self.age_extractor = FastAgeExtractor(threshold=0.30)
        self.iban_context_validator = IBANContextValidator()
        self.card_context_validator = CardContextValidator()
        self.phone_context_validator = PhoneContextValidator()
        self.passport_context_validator = PassportContextValidator()
        self.svn_context_validator = SVNContextValidator()
        self.national_validator = NationalContextValidator()
        
        self.regex_detector = RegexPIIDetector(
            iban_validator=self.iban_context_validator, 
            card_validator=self.card_context_validator,
            phone_validator=self.phone_context_validator,
            passport_validator=self.passport_context_validator,
            svn_validator=self.svn_context_validator,
            national_validator=self.national_validator
        )

    def _generate_token(self, pii_type, text_segment):
        h = hashlib.md5(text_segment.lower().encode()).hexdigest()[:8]
        return f"[PII:{pii_type.replace(':', '_')}_ID_{h}]"

    def _analyze_person_entity(self, ent):
        role_keywords = {"Dr.", "Prof.", "Arzt", "Ärztin", "Herr", "Frau", "Anwalt"}
        detected_role = "N/A"
        clean_name_parts = []
        for token in ent:
            if token.text in role_keywords: detected_role = token.text
            else: clean_name_parts.append(token.text)
        if detected_role == "N/A" and ent.start > 0:
            prev_token = ent.doc[ent.start - 1]
            if prev_token.text in role_keywords: detected_role = prev_token.text
        clean_name = re.sub(r"[^\w\s-]", "", " ".join(clean_name_parts).strip())
        return clean_name or ent.text, detected_role

    def process_batch(self, text_list: List[str]) -> List[Dict[str, Any]]:
        results = []
        for original_text in text_list:
            start_time = time.time()
            all_findings = []
            occupied = set()

            regex_findings = self.regex_detector.detect(original_text)
            all_findings.extend(regex_findings)
            for f in regex_findings: occupied.update(range(f['start'], f['end']))

            for f in self.age_extractor.analyze_text(original_text):
                if not any(i in occupied for i in range(f['start'], f['end'])):
                    all_findings.append(f); occupied.update(range(f['start'], f['end']))

            doc = self.nlp(original_text)
            for ent in doc.ents:
                if any(i in occupied for i in range(ent.start_char, ent.end_char)): continue
                label, txt = ent.label_, ent.text
                if label in ["MEDICATION", "CONDITION"]:
                    all_findings.append({"type": f"MED:{label}", "text": txt, "start": ent.start_char, "end": ent.end_char, "confidence": 0.9})
                    occupied.update(range(ent.start_char, ent.end_char))
                elif label in ["LOC", "GPE", "ADDRESS_DETECTED"]:
                    if label == "ADDRESS_DETECTED" or re.search(r"\d", txt):
                        all_findings.append({"type": "LOCATION:ADDRESS", "text": txt, "start": ent.start_char, "end": ent.end_char, "confidence": 0.95})
                        occupied.update(range(ent.start_char, ent.end_char))
                elif label in ["PER", "PER_STRONG"]:
                    clean_name, role = self._analyze_person_entity(ent)
                    if clean_name.title() not in self.stoplist and not any(p.strip().title() in self.noun_blocklist for p in clean_name.split()):
                        all_findings.append({"type": "PERSON", "text": txt, "start": ent.start_char, "end": ent.end_char, "confidence": 0.9})
                        occupied.update(range(ent.start_char, ent.end_char))

            all_findings.sort(key=lambda x: x['start'], reverse=True)
            masked_text = original_text
            for f in all_findings:
                token = self._generate_token(f['type'], f['text'])
                masked_text = masked_text[:f['start']] + token + masked_text[f['end']:]
                f['token'] = token

            results.append({"has_pii": len(all_findings) > 0, "detections": all_findings[::-1], "anonymized_text": masked_text, "processing_time_ms": int((time.time() - start_time) * 1000)})
        return results

# ==========================================
# 6. EXECUTION
# ==========================================
if __name__ == "__main__":
    pipeline = UnifiedPIIPipeline()
    samples = [
        "Personalausweis Nummer: L12345678",
        "Reisepass Ukraine: FE893746",
        "Meine SV-Nummer lautet 11240601S003",
        "Ansprechpartner: Herr Brinkmann 0228 9182736"
    ]
    output = pipeline.process_batch(samples)
    print(json.dumps(output, indent=4, ensure_ascii=False))

Loading NLP Models...
[
    {
        "has_pii": true,
        "detections": [
            {
                "type": "PERSON",
                "text": "Personalausweis",
                "start": 0,
                "end": 15,
                "confidence": 0.9,
                "token": "[PII:PERSON_ID_96fb3813]"
            },
            {
                "type": "PII:ID:NATIONAL",
                "text": "L12345678",
                "start": 24,
                "end": 33,
                "confidence": 1.0,
                "token": "[PII:PII_ID_NATIONAL_ID_1799e855]"
            }
        ],
        "anonymized_text": "[PII:PERSON_ID_96fb3813] Nummer: [PII:PII_ID_NATIONAL_ID_1799e855]",
        "processing_time_ms": 18
    },
    {
        "has_pii": true,
        "detections": [
            {
                "type": "PII:ID:PASSPORT",
                "text": "FE893746",
                "start": 19,
                "end": 27,
                "confidence": 1.0,
                "token": "

In [11]:
newdf = pd.read_excel('PII_training_v2.xlsx')

In [12]:
newdf.head()

Unnamed: 0,sentences,PII:AGE:CHILD,PII:AGE:TEEN,PII:AGE:ADULT,PII:AGE:SENIOR,PII:PERSON,PII:CONTACT:EMAIL,PII:CONTACT:PHONE,PII:FINANCIAL:IBAN,PII:FINANCIAL:BIC,...,PII:ID:PASSPORT,PII:ID:TAX,PII:ID:UST,PII:ID:SVN,PII:ID:INSURANCE,PII:ID:DRIVERLICENSE,PII:DRIVERPLATE,PII:LOCATION:POSTALCODE,PII:BIRTHDAY,PII:LOCATION:ADDRESS
0,Kannst du mir helfen diese Kundenanfrage zu be...,,,,,Thorsten Beyer,Noch,0211 7839456,,,...,,,,,,,,40215.0,,Marktstraße 23
1,"hey, schreib mir mal ne kurze zahlungserinneru...",,,,,Sabine Kröger,,,,,...,,,,,,,,45147.0,,Hufelandring 8
2,"Folgene Email ist reingekommen, kannst du mir ...",,,,,Monika Schuster-Brandt,m.schuster-brandt@gmx.de,0221-4478832,,,...,,,,,,,,50931.0,,Lessingallee 45
3,muss dem neuen mitarbeiter ne willkommensmail ...,,,,,Aleksandar Petrović,a.petrovic@web.de,,,,...,,,,,,,,,14.08.1992,
4,"Hab hier nen Zettel vom Außendienst, kann kaum...",,,,,Winkelmann; Winkelman,,0.000124,,,...,,,,,,,,28215.0,,Theodor-Heuss-Allee 78


In [13]:
newdf.columns

Index(['sentences', 'PII:AGE:CHILD', 'PII:AGE:TEEN', 'PII:AGE:ADULT',
       'PII:AGE:SENIOR', 'PII:PERSON', 'PII:CONTACT:EMAIL',
       'PII:CONTACT:PHONE', 'PII:FINANCIAL:IBAN', 'PII:FINANCIAL:BIC',
       'PII:FINANCIAL:CARD', 'PII:PIN', 'PII:ID:NATIONAL', 'PII:ID:PASSPORT',
       'PII:ID:TAX', 'PII:ID:UST', 'PII:ID:SVN', 'PII:ID:INSURANCE',
       'PII:ID:DRIVERLICENSE', 'PII:DRIVERPLATE', 'PII:LOCATION:POSTALCODE',
       'PII:BIRTHDAY', 'PII:LOCATION:ADDRESS'],
      dtype='object')

In [14]:
processed_df = newdf[['sentences', 'PII:ID:NATIONAL']].copy()

In [15]:
pipeline = UnifiedPIIPipeline()
samples = processed_df['sentences'].astype(str).tolist()
output = pipeline.process_batch(samples)

extracted_ibans = []

for result in output:
    ibans_in_sentence = [d['text'] for d in result['detections'] if d['type'] == 'PII:ID:NATIONAL']
    
    if ibans_in_sentence:
        extracted_ibans.append(", ".join(ibans_in_sentence))
    else:
        extracted_ibans.append(None)


processed_df['detected_card'] = extracted_ibans
detected_iban_column = processed_df['detected_card']


print(processed_df.head())

Loading NLP Models...
                                           sentences PII:ID:NATIONAL  \
0  Kannst du mir helfen diese Kundenanfrage zu be...             NaN   
1  hey, schreib mir mal ne kurze zahlungserinneru...             NaN   
2  Folgene Email ist reingekommen, kannst du mir ...             NaN   
3  muss dem neuen mitarbeiter ne willkommensmail ...             NaN   
4  Hab hier nen Zettel vom Außendienst, kann kaum...             NaN   

  detected_card  
0          None  
1          None  
2          None  
3          None  
4          None  


In [16]:
processed_df.to_csv('tempnational.csv')

In [17]:
processed_df.columns

Index(['sentences', 'PII:ID:NATIONAL', 'detected_card'], dtype='object')