In [1]:
import re
from typing import Optional, List, Dict, Tuple

import pycountry
from presidio_analyzer import AnalyzerEngine, RecognizerResult 
from presidio_anonymizer import AnonymizerEngine
from presidio_anonymizer.entities import OperatorConfig

import pandas as pd

In [2]:
def load_demonym_map(path="demonyms.csv"):
    """Load a demonym -> country mapping from a CSV file.

    The CSV must provide a ``demonym`` and a ``country`` column.

    Args:
        path: Location of the CSV file.

    Returns:
        dict mapping each lower-cased, whitespace-trimmed demonym to its
        whitespace-trimmed country string. When a demonym appears more than
        once, the last row wins.
    """
    # Drop incomplete rows *before* casting to str: astype(str) would turn a
    # missing cell into the literal text "nan" and register it as a demonym/country.
    rows = pd.read_csv(path).dropna(subset=["demonym", "country"])

    demonym_keys = rows["demonym"].astype(str).str.strip().str.lower()
    country_names = rows["country"].astype(str).str.strip()

    # Skip entries that are blank after trimming; an empty key can never be a
    # meaningful lookup target.
    return {
        demonym: country
        for demonym, country in zip(demonym_keys, country_names)
        if demonym and country
    }

In [3]:
# Module-level lookup table: lower-cased key (country name, ISO alpha-2/alpha-3
# code, demonym) -> country name. It starts empty and is filled by build_country_map().
COUNTRY_MAP = {}

def build_country_map():
    """Populate the module-level ``COUNTRY_MAP`` in place.

    Keys are lower-cased; values are the pycountry ``name`` of the country (or
    the country string from the demonym CSV). Sources, in order:

      1. every pycountry country: canonical name, alpha-2 and alpha-3 codes,
         plus ``common_name`` / ``official_name`` when the record has them;
      2. demonyms returned by ``load_demonym_map()`` (these overwrite clashes);
      3. the manual alias ``"uk"``.

    Returns:
        None. The function only mutates ``COUNTRY_MAP``.
    """
    for country in pycountry.countries:
        # canonical
        COUNTRY_MAP[country.name.lower()] = country.name

        # alpha-2, alpha-3 -- skip absent codes instead of registering an
        # empty-string key (an "" key would match any blank lookup).
        for code_attr in ("alpha_2", "alpha_3"):
            code = getattr(country, code_attr, None)
            if code:
                COUNTRY_MAP[code.lower()] = country.name

        # Optional alternative names (e.g. a short everyday name next to the
        # formal ISO name). setdefault keeps any key that is already registered.
        for alias_attr in ("common_name", "official_name"):
            alias = getattr(country, alias_attr, None)
            if alias:
                COUNTRY_MAP.setdefault(alias.lower(), country.name)

    # Add common demonyms manually or load from CSV
    for demonym, country_name in load_demonym_map().items():
        key = demonym.lower().strip()
        if key:  # ignore blank / whitespace-only demonyms
            COUNTRY_MAP[key] = country_name

    COUNTRY_MAP["uk"] = "United Kingdom"

# Fill COUNTRY_MAP when this cell runs. This reads "demonyms.csv" through
# load_demonym_map(), so that file must be present in the working directory.
build_country_map()

In [4]:
# Give every COUNTRY_MAP key a collapsed alias with all non-word characters
# (punctuation AND whitespace) removed: a key "u.k." gains the alias "uk",
# "united states" gains "unitedstates". Only the keys are rewritten here; the
# text being looked up is not normalised by this cell. When two keys collapse
# to the same alias, the one that comes later in COUNTRY_MAP wins.
collapsed_pairs = (
    (re.sub(r"[^\w]", "", original_key.lower()), country_name)
    for original_key, country_name in COUNTRY_MAP.items()
)
normalized = {alias: country_name for alias, country_name in collapsed_pairs if alias}

COUNTRY_MAP.update(normalized)

In [5]:
def resolve_country_token(token: str) -> Optional[str]:
    """Return the country name stored in ``COUNTRY_MAP`` for ``token``, or None.

    Matching is case-insensitive. Lookups are attempted in this order:

      1. the token with surrounding punctuation stripped ("India," -> "india");
      2. the token with punctuation turned into spaces and runs of whitespace
         collapsed ("United   States" -> "united states");
      3. the token with every non-word character removed ("U.K." -> "uk"),
         which matches keys that exist in collapsed form;
      4. word by word, returning the first word that is a key.

    Args:
        token: Text that may name a country, demonym or ISO code.

    Returns:
        The mapped country name, or None when nothing matches or the token
        contains no letters/digits.
    """
    if not token:
        return None

    lowered = token.strip().lower()

    # Strip surrounding punctuation. [\W_] is Unicode-aware, so a leading
    # non-ASCII letter (e.g. "å") is kept rather than being treated as punctuation.
    trimmed = re.sub(r"^[\W_]+|[\W_]+$", "", lowered)
    if not trimmed:
        # Nothing but punctuation/whitespace: never let it hit a blank key.
        return None

    # direct lookup
    if trimmed in COUNTRY_MAP:
        return COUNTRY_MAP[trimmed]

    # punctuation -> space, then collapse whitespace runs to a single space
    spaced = " ".join(re.sub(r"[^\w\s]", " ", lowered).split())
    if spaced and spaced in COUNTRY_MAP:
        return COUNTRY_MAP[spaced]

    # dotted/hyphenated abbreviations: "u.k." / "u-k" / "u k" -> "uk"
    collapsed = re.sub(r"[^\w]", "", lowered)
    if collapsed and collapsed in COUNTRY_MAP:
        return COUNTRY_MAP[collapsed]

    # fallback: token-by-token check (useful when the entity is multiple tokens).
    # The first matching word wins, so short keys that are also ordinary words
    # can produce a match here.
    for word in spaced.split():
        if word in COUNTRY_MAP:
            return COUNTRY_MAP[word]

    return None

In [6]:
# Presidio engines, created once at module level and shared by the cells below.
# Both are constructed with no arguments, i.e. with the libraries' defaults.
analyzer = AnalyzerEngine()   # uses default NLP engine (spaCy if present)
anonymizer = AnonymizerEngine()  # NOTE(review): not referenced by any cell visible here -- confirm it is still needed

In [7]:
def find_country_spans(text: str, strict_code_case: bool = True) -> List[Tuple[int, int, str]]:
    """Locate country names, demonyms and ISO codes from ``COUNTRY_MAP`` in ``text``.

    A key matches only as a standalone token, i.e. not directly preceded or
    followed by an ASCII letter or digit. Names and demonyms match
    case-insensitively.

    Args:
        text: The text to scan.
        strict_code_case: When True (default), keys that look like ISO codes
            (two or three ASCII letters) only match when written in upper case
            in ``text``. This stops ordinary words and first names that spell
            a code ("in", "and", "Ben") from counting as countries. Pass
            False to match such keys case-insensitively as well.

    Returns:
        Non-overlapping ``(start, end, matched_text)`` tuples sorted by start.
        Offsets index into ``text``. Where candidates overlap, the one that
        starts first wins, and among those starting together the longest wins.
    """
    if not text:
        return []

    candidates: List[Tuple[int, int, str]] = []
    for key in COUNTRY_MAP:
        if not key:
            continue

        pattern = r"(?<![A-Za-z0-9])" + re.escape(key) + r"(?![A-Za-z0-9])"
        code_like = strict_code_case and re.fullmatch(r"[A-Za-z]{2,3}", key) is not None

        # Search the original text (not a lower-cased copy): lower-casing can
        # change the string length for some characters, which would shift every
        # later offset away from the real position in `text`.
        for match in re.finditer(pattern, text, flags=re.IGNORECASE):
            matched_text = match.group(0)
            if code_like and not matched_text.isupper():
                continue
            candidates.append((match.start(), match.end(), matched_text))

    # Earliest start first; for equal starts the longest candidate first, so
    # "united states" is chosen over a shorter key at the same position.
    candidates.sort(key=lambda span: (span[0], -span[1]))

    final: List[Tuple[int, int, str]] = []
    last_end = -1
    for start, end, matched_text in candidates:
        if start >= last_end:
            final.append((start, end, matched_text))
            last_end = end

    return final

In [8]:
def _uncovered_pieces(
    text: str, start: int, end: int, protected: List[Tuple[int, int]]
) -> List[Tuple[int, int]]:
    """Return the parts of ``text[start:end]`` lying outside every protected interval.

    ``protected`` must be sorted by start and non-overlapping. An edge that was
    created by cutting out a protected interval is trimmed of non-alphanumeric
    characters, so separators such as ", " next to a preserved country are kept
    in the output. Edges of the original span are never trimmed.
    """
    pieces: List[Tuple[int, int]] = []
    cursor = start
    for p_start, p_end in protected:
        if p_end <= cursor:
            continue
        if p_start >= end:
            break
        if p_start > cursor:
            pieces.append((cursor, p_start))
        cursor = max(cursor, p_end)
    if cursor < end:
        pieces.append((cursor, end))

    trimmed: List[Tuple[int, int]] = []
    for left, right in pieces:
        if left != start:  # left edge was produced by a cut
            while left < right and not text[left].isalnum():
                left += 1
        if right != end:  # right edge was produced by a cut
            while right > left and not text[right - 1].isalnum():
                right -= 1
        if left < right:
            trimmed.append((left, right))
    return trimmed


def mask_pii_preserve_country_org(
    text: str, mask_token: str = "<MASKED>", preserve_org: bool = True
) -> str:
    """Mask PII found by the Presidio analyzer while keeping countries readable.

    Preserved:
      - every country/demonym/ISO occurrence reported by ``find_country_spans``;
        when a detected entity only *partly* covers such an occurrence, the
        rest of the entity is still masked;
      - ORGANIZATION entities, when ``preserve_org`` is True.
    Every other detected span is replaced by ``mask_token``. Spans that overlap
    or are at most one character apart are merged into a single token.

    Args:
        text: Input text. Falsy input is returned unchanged.
        mask_token: Replacement text for each masked span.
        preserve_org: Leave ORGANIZATION entities unmasked (default). Pass
            False to mask them like any other entity.

    Returns:
        The masked text, or ``text`` itself when nothing needs masking.
    """
    if not text:
        return text

    # STEP 1: find country spans first (protect these)
    country_intervals = sorted((s, e) for s, e, _ in find_country_spans(text))

    # STEP 2: run Presidio analyzer with explicit entity types
    results = analyzer.analyze(
        text=text,
        language="en",
        entities=["PERSON", "EMAIL_ADDRESS", "PHONE_NUMBER", "LOCATION",
                  "DATE_TIME", "US_SSN", "US_BANK_NUMBER", "IBAN_CODE",
                  "CREDIT_CARD", "IP_ADDRESS", "ORGANIZATION", "US_DRIVER_LICENSE"]
    )

    # STEP 3: collect candidate spans to mask
    mask_spans: List[Tuple[int, int]] = []
    for result in results:
        if preserve_org and result.entity_type == "ORGANIZATION":
            continue
        # Mask only what lies outside the protected country spans. Skipping the
        # whole entity would leave e.g. a city unmasked just because the
        # detected span also contains a country name.
        mask_spans.extend(
            _uncovered_pieces(text, result.start, result.end, country_intervals)
        )

    # STEP 4: merge overlapping/adjacent mask spans
    if not mask_spans:
        return text

    mask_spans.sort()
    merged: List[Tuple[int, int]] = []
    cur_s, cur_e = mask_spans[0]
    for s, e in mask_spans[1:]:
        if s <= cur_e + 1:
            cur_e = max(cur_e, e)
        else:
            merged.append((cur_s, cur_e))
            cur_s, cur_e = s, e
    merged.append((cur_s, cur_e))

    # STEP 5: build final string
    out_parts: List[str] = []
    cursor = 0
    for s, e in merged:
        if cursor < s:
            out_parts.append(text[cursor:s])
        out_parts.append(mask_token)
        cursor = e
    if cursor < len(text):
        out_parts.append(text[cursor:])

    return "".join(out_parts)

In [10]:
def mask_emails_regex(text: str, mask_token: str = "<MASKED>") -> str:
    """Replace every e-mail address in ``text`` with ``mask_token``.

    Args:
        text: Input text. Falsy input is returned unchanged.
        mask_token: Literal replacement text. It is inserted verbatim, so
            backslashes or ``\\g<...>`` sequences in it are not interpreted.

    Returns:
        ``text`` with each matched address replaced.
    """
    if not text:
        return text

    # The local part may start with "_" as well as a letter/digit: "_" is a
    # word character, so \b cannot fall between it and the next letter and an
    # address such as "_admin@example.com" would otherwise never match.
    email_pattern = r'\b[A-Za-z0-9_][A-Za-z0-9._%+-]*@[A-Za-z0-9][A-Za-z0-9.-]*\.[A-Za-z]{2,63}\b'

    # A callable replacement keeps mask_token literal; passed as a string it
    # would be parsed as a regex replacement template.
    return re.sub(email_pattern, lambda _match: mask_token, text)

In [11]:
# Demo: run a handful of made-up sentences through both masking stages
# (regex e-mail masking first, then the Presidio-based masker) and print the
# before/after pairs.
# NOTE(review): the recorded output for the last sample shows the SSN left
# unmasked -- confirm whether the analyzer is expected to reject that value.
samples = [
    "John Doe (DOB 1987-10-05) from 221B Baker Street, London, UK has email john.doe@example.com and phone +1-202-555-0123. Works at OpenAI.",
    "Contact payroll: account number 123456789, routing 987654321. Country: India.",
    "Send parcel to PO Box 1234, Dubai, UAE. CFO: Jane Smith, email jane@corp.com",
    "American citizen, born in USA, ssn 123-45-6789"
]

for sample_text in samples:
    print("INPUT: ", sample_text)
    emails_masked = mask_emails_regex(sample_text)
    print("OUTPUT:", mask_pii_preserve_country_org(emails_masked))
    print("-" * 80)

INPUT:  John Doe (DOB 1987-10-05) from 221B Baker Street, London, UK has email john.doe@example.com and phone +1-202-555-0123. Works at OpenAI.
OUTPUT: <MASKED> (DOB <MASKED>) from <MASKED>, <MASKED>, UK has email <MASKED> and phone <MASKED>. Works at OpenAI.
--------------------------------------------------------------------------------
INPUT:  Contact payroll: account number 123456789, routing 987654321. Country: India.
OUTPUT: Contact payroll: account number <MASKED>, routing <MASKED>. Country: India.
--------------------------------------------------------------------------------
INPUT:  Send parcel to PO Box 1234, Dubai, UAE. CFO: Jane Smith, email jane@corp.com
OUTPUT: Send parcel to <MASKED>, <MASKED>, UAE. CFO: <MASKED>, email <MASKED>
--------------------------------------------------------------------------------
INPUT:  American citizen, born in USA, ssn 123-45-6789
OUTPUT: American citizen, born in USA, ssn 123-45-6789
------------------------------------------------------