In [None]:
import pandas as pd
import random
from presidio_analyzer import AnalyzerEngine, RecognizerRegistry, RecognizerResult
from presidio_analyzer.nlp_engine import NlpEngineProvider
from presidio_anonymizer import AnonymizerEngine
from presidio_anonymizer.entities.engine import OperatorConfig
from faker import Faker
import spacy
from copy import deepcopy
from collections import Counter
import re
import unicodedata

In [None]:
# Load the prepared dataset from a pickle file and preview the first rows.
# NOTE: unpickling can execute arbitrary code — only load files from a trusted source.
# NOTE(review): this frame presumably still holds raw, non-anonymized text, so the
# preview output may expose personal data — clear the output before sharing.
file_path = "../../data/adhd-beliefs-pt/adhd-beliefs-pt-prepared.pkl"
df = pd.read_pickle(file_path)
df.head()

In [None]:
# Presidio NLP engine backed by spaCy, using the large Portuguese model.
configuration = dict(
    nlp_engine_name="spacy",
    models=[dict(lang_code="pt", model_name="pt_core_news_lg")],
)
# The provider turns the configuration into a ready-to-use engine.
provider = NlpEngineProvider(nlp_configuration=configuration)
nlp_engine = provider.create_engine()

In [None]:
# Load the predefined Portuguese recognizers, then discard every recognizer that
# both supports PERSON and has "dictionary" in its class name.
registry = RecognizerRegistry()
registry.load_predefined_recognizers(nlp_engine=nlp_engine, languages=["pt"])
registry.recognizers = [
    rec
    for rec in registry.recognizers
    if "PERSON" not in rec.supported_entities
    or "dictionary" not in type(rec).__name__.lower()
]

In [None]:
# Build the analyzer on top of the Portuguese spaCy engine and the filtered
# recognizer registry (which was loaded for "pt" only).
# NOTE(review): no supported_languages argument is passed here — confirm that the
# installed Presidio version accepts language="pt" in analyze() with this setup.
analyzer = AnalyzerEngine(nlp_engine=nlp_engine,
                          registry=registry)
# NOTE(review): the anonymizer engine is created but not called by the live code
# visible in this notebook — replacements are applied by the custom helpers.
anonymizer = AnonymizerEngine()

In [None]:
# Faker instance that generates replacement values using the pt_PT locale.
# NOTE(review): no seed is set, so the generated replacements differ between runs;
# seed Faker before this point if reproducible output is required.
fake = Faker(locale=['pt_PT'])

In [None]:
# Grab the Portuguese pipeline held by the Presidio NLP engine so that custom
# components added to it are shared with the analyzer.
nlp_pt = nlp_engine.nlp["pt"]  # spaCy Language instance
# Snapshot copy of the pipeline's default stop-word list.
PT_STOPWORDS = set(nlp_pt.Defaults.stop_words)

In [None]:
def _norm(s: str) -> str:
    s = s.strip().lower()
    s = "".join(c for c in unicodedata.normalize("NFKD", s) if not unicodedata.combining(c))
    s = re.sub(r"\s+", " ", s)
    return s

In [None]:
def _match_casing(src: str, repl: str) -> str:
    if src in ["EUA", "UK", "EAU"]:
        return repl.title()
    if src.isupper():
        return repl.upper()
    if src.istitle():
        repl = repl.title()
        # Lowercase specific Portuguese particles
        repl = re.sub(r"\b(De|Da|Do|Das|Dos|E)\b", lambda m: m.group(1).lower(), repl)
        return repl
    if src.islower():
        return repl.lower()
    # Mixed / sentence case: title-case only if looks like a proper noun
    return repl

In [None]:
# Normalized (see _norm) country names used to tell countries from cities.
# Hand-written names and aliases that are always included:
COUNTRY_SYNONYMS = {_norm(n) for n in [
    "EUA", "Estados Unidos", "Reino Unido", "Inglaterra", "Escócia",
    "País de Gales", "Irlanda do Norte", "República Checa", "Coreia do Sul",
    "México", "Alemanha", "França", "Itália", "Espanha",
    "Noruega", "Suécia", "Dinamarca", "Finlândia", "Islândia",
    "Dubai", "Emirados Árabes Unidos", "Canadá", "Austrália",
]}
COUNTRY_SET = set()
try:
    import pycountry
    for country in pycountry.countries:
        COUNTRY_SET.add(_norm(getattr(country, "name", "")))
        # official and common names are optional attributes
        for attr in ("official_name", "common_name"):
            if hasattr(country, attr):
                COUNTRY_SET.add(_norm(getattr(country, attr)))
except Exception:
    # Best effort: without a working pycountry only the synonyms are used.
    pass
COUNTRY_SET |= COUNTRY_SYNONYMS

In [None]:
def classify_location(span_text: str) -> str:
    """Label a detected location span as ``"country"`` or ``"city"``.

    Common trailing punctuation is removed and the rest is normalized with
    ``_norm`` before the lookup in ``COUNTRY_SET``; anything that is not a
    known country is reported as a city.
    """
    trimmed = span_text.rstrip(".,;:!?)»”]")
    return "country" if _norm(trimmed) in COUNTRY_SET else "city"

In [None]:
def postfilter_person(results, text, nlp_lang, min_score_person=0.72):
    """
    Drop weak or false-positive PERSON hits; all other entity types pass through.

    A PERSON result is kept only when every check below succeeds:
      - its score is None or at least ``min_score_person``
      - a single-token span is at least 3 characters long
      - not every token of the span is a stopword
      - POS check: a single-token span must be tagged PROPN, a multi-token
        span needs at least half of its tokens tagged PROPN
        (this blocks common nouns such as 'Política').

    Args:
        results: iterable of results exposing ``entity_type``, ``score``,
            ``start`` and ``end``.
        text: the text the results refer to.
        nlp_lang: callable pipeline that returns the tokens of ``text``; it must
            also expose ``Defaults.stop_words``.
        min_score_person: minimum score for a PERSON result to be considered.

    Returns:
        A new list with the surviving results in their original order.
    """
    results = list(results)
    # Skip the (expensive) parse when there is nothing to verify.
    if not any(r.entity_type == "PERSON" for r in results):
        return results

    doc = nlp_lang(text)  # tokens with POS tags for the checks below
    stopwords = nlp_lang.Defaults.stop_words

    def span_tokens(r):
        inside = [t for t in doc if t.idx >= r.start and t.idx + len(t) <= r.end]
        if inside:
            return inside
        # The span does not contain a whole token (boundaries not aligned with
        # tokenization): fall back to the tokens it overlaps, so the checks
        # below still have something to inspect instead of dividing by zero.
        return [t for t in doc if t.idx < r.end and t.idx + len(t) > r.start]

    filtered = []
    for r in results:
        if r.entity_type != "PERSON":
            filtered.append(r)
            continue

        if r.score is not None and r.score < min_score_person:
            continue

        toks = span_tokens(r)
        if not toks:
            # Empty or whitespace-only span: nothing that could be a name.
            continue

        # 1) very short single tokens are not names
        if len(toks) == 1 and len(toks[0].text) < 3:
            continue

        # 2) spans made only of stopwords are not names
        if all(t.text.lower() in stopwords for t in toks):
            continue

        # 3) POS rule: PROPN for single tokens, PROPN majority otherwise
        propn_count = sum(1 for t in toks if t.pos_ == "PROPN")
        if len(toks) == 1:
            if propn_count == 0:
                continue
        elif propn_count / len(toks) < 0.5:
            continue

        filtered.append(r)

    return filtered

In [None]:
# Exact-match patterns labelled PUBLIC_FIGURE, registered on the pipeline's
# entity ruler (created before the "ner" component if it does not exist yet).
public_figures = [
    {"label": "PUBLIC_FIGURE", "pattern": name}
    for name in (
        "Cristiano Ronaldo",
        "António Guterres",
        "Anitta",
        "Taylor Swift",
        "J. D. Salinger",
        "Melville",
        "Osamu Dazai",
        "Jung",
        "Percy Jackson",
    )
]
ruler = (
    nlp_pt.get_pipe("entity_ruler")
    if "entity_ruler" in nlp_pt.pipe_names
    else nlp_pt.add_pipe("entity_ruler", before="ner")
)
ruler.add_patterns(public_figures)

In [None]:
# Exact-match patterns labelled PET_NAME, registered on the pipeline's entity
# ruler (created before the "ner" component if it does not exist yet).
pet_names = [
    {"label": "PET_NAME", "pattern": name}
    for name in ("Vince", "Kiko", "Thor", "Kylie")
]
ruler = (
    nlp_pt.get_pipe("entity_ruler")
    if "entity_ruler" in nlp_pt.pipe_names
    else nlp_pt.add_pipe("entity_ruler", before="ner")
)
ruler.add_patterns(pet_names)

In [None]:
def _replace_persons_with_gendered_fakes(text: str, results, fake):
    def _gender_from_article(txt: str, start_idx: int):
        win = txt[max(0, start_idx-2):start_idx].lower()
        if win.endswith("a "): return "f"
        if win.endswith("o "): return "m"
        return None

    mapping = {}
    non_person = [deepcopy(r) for r in results if r.entity_type != "PERSON"]
    persons = sorted([r for r in results if r.entity_type == "PERSON"],
                     key=lambda r: r.start, reverse=True)

    new_text = text
    replacements = []

    for r in persons:
        original_name = new_text[r.start:r.end]

        if original_name in mapping:
            replacement = mapping[original_name]
        else:
            gender = _gender_from_article(new_text, r.start)
            if gender == "f":
                replacement = f"{fake.first_name_female()}"
            elif gender == "m":
                replacement = f"{fake.first_name_male()}"
            else:
                replacement = fake.first_name()
            mapping[original_name] = replacement
        replacement = _match_casing(original_name, replacement)
        
        replacements.append({
            "entity_type": "PERSON",
            "before": original_name,
            "after": replacement
        })

        # splice & adjust indices
        original_len = r.end - r.start
        new_text = new_text[:r.start] + replacement + new_text[r.end:]
        delta = len(replacement) - original_len

        if delta != 0:
            for nr in non_person:
                if nr.start >= r.end:
                    nr.start += delta
                    nr.end += delta

        non_person = [nr for nr in non_person if not (nr.start < r.end and nr.end > r.start)]

    return new_text, non_person, replacements


In [None]:
def drop_ignored_names(results, text):
    """Remove PERSON/LOCATION results whose text is on the ignore list.

    The span text is stripped and lowercased before it is compared with the
    lowercased ignore list. Results of any other entity type are always kept.
    Returns a new list; the order of the remaining results is unchanged.
    """
    IGNORE_NAMES = {"phda", "well", "PDAH", "anyways", "acalma-me", "catisfaction", "vivi", "Compal de pêra", "P.s", "humana", "Estimula-me", "Yoggi", "Deus", "bue giro", "bue", "Perturbação Obsessiva e Compulsiva", "AMVs", "Mochi donuts", "Mowgli", "Mãe", "Heck", "Rock in Rio", "Aspartame", "oubir", "Weeee", "Praxe", "Maybe", "Usagi", "Lucy", "Espiritualidade", "killer"}
    ignored = {name.lower() for name in IGNORE_NAMES}
    filterable_types = ("PERSON", "LOCATION")

    def _is_ignored(r):
        span = text[r.start:r.end].strip().lower()
        return span in ignored and r.entity_type in filterable_types

    return [r for r in results if not _is_ignored(r)]

In [None]:
def drop_false_entities(results, text):
    """Remove LOCATION results whose text is a known false positive.

    The span text is lowercased and stripped before the lookup. As a special
    case, any result whose text is "gonçalo" is relabelled PERSON in place
    (the result object itself is modified) before the location check.
    Returns a new list; the order of the remaining results is unchanged.
    """
    bad_locations = {"campos de ferias", "viajar", "prontos", "phda", "sinto", "estive", "amanhã", "hiperfoca", "concentrar-me", "ir de erasmus", "🥹", "altos", "associam", "montes", "square", "entristece", "relembro-me", "fechei-me", "iria", "levo-os", "esteja<3", "volei", "gym", "tenho", "bombeiro", "deparome", "odevia", "go", "obrigada", "miuda", "backstory", "apeteceu-nos", "ocd", "terra", "usei-as", "regresso", "aquashow", "castração", "invençºao", "aikido", "aspartame", "coca cola", "procastinar", "fui", "uber", "desligar-me", "harvard", "calor", "overthinker", "canal panda", "lichia", "rua", "nonetheless", "mundo", "B", "beijinhos", "compaixão", "psicóloga", "fez-me", "preparámo-nos", "Constant brain chatter\n•", "rua ignoram-me", "portugal", "europa"}
    # compare case-insensitively
    blocked = {entry.lower() for entry in bad_locations}

    kept = []
    for r in results:
        span = text[r.start:r.end].lower().strip()
        if span == "gonçalo":
            r.entity_type = "PERSON"
        is_false_location = r.entity_type == "LOCATION" and span in blocked
        if not is_false_location:
            kept.append(r)
    return kept

In [None]:
def detect_with_exemptions(text, score_threshold=0.65):
    """Detect entities in ``text``, giving exempted names priority over PERSON.

    Steps:
      1. Collect PUBLIC_FIGURE and PET_NAME entities found by the spaCy
         pipeline as results with score 1.0.
      2. Run the Presidio analyzer for Portuguese with ``score_threshold``.
      3. Post-filter its PERSON hits with ``postfilter_person``.
      4. Drop PERSON hits overlapping any span from step 1.

    Returns the remaining analyzer results followed by the step-1 results.
    """
    exempt_labels = ("PUBLIC_FIGURE", "PET_NAME")
    exempt = [
        RecognizerResult(ent.label_, ent.start_char, ent.end_char, 1.0)
        for ent in nlp_pt(text).ents
        if ent.label_ in exempt_labels
    ]

    detected = analyzer.analyze(text=text, language="pt", score_threshold=score_threshold)
    detected = postfilter_person(detected, text, nlp_lang=nlp_pt, min_score_person=0.72)

    def _hits_exempt(r):
        return any(r.start < ex.end and r.end > ex.start for ex in exempt)

    combined = [
        r for r in detected
        if not (r.entity_type == "PERSON" and _hits_exempt(r))
    ]
    combined.extend(exempt)
    return combined

In [None]:
def _apply_non_person_with_logging(text, results, operators, *, fake, consistent_maps=None):
    """
    Replace non-PERSON entities right-to-left with precise logging.
    LOCATIONs are classified as city/country and mapped consistently per entry.
    """
    from copy import deepcopy

    def _run_operator(span_text, cfg):
        name = getattr(cfg, "operator_name", None) or cfg.get("operator_name")
        params = getattr(cfg, "params", None) or cfg.get("params", {}) or {}
        if name == "custom":
            fn = params.get("lambda")
            return fn(span_text) if fn else span_text
        if name == "replace":
            return str(params.get("new_value", ""))
        if name == "mask":
            mchar = str(params.get("masking_char", "*"))
            n = int(params.get("chars_to_mask", len(span_text)))
            from_end = bool(params.get("from_end", False))
            n = max(0, min(n, len(span_text)))
            if n == 0: return span_text
            if n >= len(span_text): return mchar * len(span_text)
            return (span_text[:-n] + mchar * n) if from_end else (mchar * n + span_text[n:])
        return "*" * max(1, len(span_text))

    # copy & sort right-to-left
    res = [deepcopy(r) for r in results if r.entity_type != "PERSON"]
    res.sort(key=lambda r: r.start, reverse=True)

    out = text
    logs = []

    # per-entry maps
    consistent_maps = consistent_maps or {}
    city_map    = consistent_maps.setdefault("LOCATION_CITY", {})     # norm(original) -> fake
    country_map = consistent_maps.setdefault("LOCATION_COUNTRY", {})  # norm(original) -> fake

    for r in res:
        etype = r.entity_type
        before = out[r.start:r.end]
        cfg = operators.get(etype, operators.get("DEFAULT"))
        if cfg is None:
            continue

        if etype == "LOCATION":
            kind = classify_location(before)  # 'city' or 'country'
            key  = _norm(before)
            if kind == "country":
                if key in country_map:
                    after_raw = country_map[key]
                else:
                    # generate a country name
                    after_raw = fake.country()
                    country_map[key] = after_raw
            else:
                if key in city_map:
                    after_raw = city_map[key]
                else:
                    # generate a city name
                    after_raw = fake.city()
                    city_map[key] = after_raw

            after = _match_casing(before, after_raw)
        else:
            # non-LOCATION: use operator as-is
            after = _run_operator(before, cfg)

        out = out[:r.start] + after + out[r.end:]

        logs.append({"entity_type": etype, "before": before, "after": after})

    return out, logs, consistent_maps

In [None]:
# Work on an explicit copy of the free-text columns: the cells are overwritten
# one by one later on, and writing into a bare column selection of `df` can
# trigger SettingWithCopyWarning / ambiguous view-vs-copy behaviour.
df_anon = df[["special_interest", "diary_entry", "selfdefining_memory", "empty_sheet"]].copy()
print("Columns to anonymize:", df_anon.columns.tolist())

In [None]:
# Operator per entity type. Phone numbers, e-mail addresses and credit cards get
# a freshly generated fake value; LOCATION, PUBLIC_FIGURE, PET_NAME and
# ORGANIZATION use an identity function; every other type falls back to masking
# the first 10 characters with '*'.
fake_operators = dict(
    PHONE_NUMBER=OperatorConfig("custom", {"lambda": lambda _span: fake.phone_number()}),
    EMAIL_ADDRESS=OperatorConfig("custom", {"lambda": lambda _span: fake.email()}),
    LOCATION=OperatorConfig("custom", {"lambda": lambda span: span}),
    PUBLIC_FIGURE=OperatorConfig("custom", {"lambda": lambda span: span}),
    PET_NAME=OperatorConfig("custom", {"lambda": lambda span: span}),
    CREDIT_CARD=OperatorConfig("custom", {"lambda": lambda _span: fake.credit_card_number()}),
    ORGANIZATION=OperatorConfig("custom", {"lambda": lambda span: span}),
    DEFAULT=OperatorConfig(
        operator_name="mask",
        params={"chars_to_mask": 10, "masking_char": "*", "from_end": False},
    ),
)

In [None]:
# Anonymize every non-empty cell of df_anon and keep a log of what changed.
modified_count = 0          # number of cells whose text was changed
entity_counter = Counter()  # replacements per entity type
change_log = []             # one record per changed cell

for idx, row in df_anon.iterrows():
    for column in df_anon.columns:
        cell_value = row[column]
        if not (pd.notna(cell_value) and str(cell_value).strip()):
            continue  # missing or blank cell: nothing to anonymize
        text = str(cell_value)

        # detect, then remove known false positives
        results = detect_with_exemptions(text)
        results = drop_false_entities(results, text)
        results = drop_ignored_names(results, text)

        # PERSON spans first; the remaining results come back with adjusted offsets
        text_after_person, results_without_person, person_repl = _replace_persons_with_gendered_fakes(
            text, results, fake
        )

        per_entry_maps = {}  # resets every entry; move outside loop for global consistency
        text_final, non_person_repl, per_entry_maps = _apply_non_person_with_logging(
            text_after_person,
            results_without_person,
            fake_operators,
            fake=fake,
            consistent_maps=per_entry_maps
        )

        anonymized_text = text_final
        all_replacements = person_repl + non_person_repl

        # Compare with the string that was analyzed: comparing with the raw
        # cell would flag every non-string cell as modified.
        if anonymized_text != text:
            modified_count += 1
            for rep in all_replacements:
                entity_counter[rep["entity_type"]] += 1
            change_log.append({
                "row": idx,
                "column": column,
                "original": cell_value,
                "anonymized": anonymized_text,
                "replacements": all_replacements
            })

        df_anon.at[idx, column] = anonymized_text

# ---- summary ----
# Build a plain-text report: totals first, then one section per changed cell.
# NOTE: the report includes the ORIGINAL (non-anonymized) text of every changed
# cell — treat the output file as sensitive.
lines = [
    "="*60,
    f"Total modified entries: {modified_count}",
    "Entities replaced (by type):",
]
lines.extend(f"  {ent}: {count}" for ent, count in entity_counter.most_common())

lines += ["="*60, "Example changes:"]
for log in change_log:
    lines.append(f"Row {log['row']} | Col {log['column']}")
    lines.extend(
        f"  {rep['entity_type']}: {rep['before']} -> {rep['after']}"
        for rep in log["replacements"]
    )
    lines += [
        "\n",
        f"Original  : {log['original']}\n",
        f"Anonymized: {log['anonymized']}\n",
        "-"*40,
    ]

# One string, written to a Markdown file
summary_text = "\n".join(lines)
with open("data/anonymization_summary.md", "w", encoding="utf-8") as f:
    f.write(summary_text)

print("Summary saved to data/anonymization_summary.md")

In [None]:
# Swap the original text columns for their anonymized versions: drop them from
# df and append the df_anon columns (they end up as the last columns).
# NOTE: this rebinds `df`, so the raw text columns are no longer reachable
# through that name after this cell has run.
df = pd.concat([df.drop(columns=df_anon.columns), df_anon], axis=1)
df.head()

In [None]:
# Persist the resulting frame as a pickle file.
df.to_pickle("../../data/adhd-beliefs-pt/adhd-beliefs-pt-anonymized.pkl")