
# PII Redactor (Transformers NER) — Ordered Compact Notebook

**Order requested:**  
1) Imports, setup, configs → 2) Pipeline loader & helpers → 3) Fine-tuning → 4) Run redaction  
The final run prefers the **fine‑tuned** checkpoint if it exists; otherwise, it uses the **base** model.


In [17]:
from pathlib import Path
import re
import pandas as pd

ALLOW_REGEX_STRUCTURED = False

MODEL_NAME = "Davlan/xlm-roberta-base-ner-hrl"

INPUT_PATH  = Path("input.txt")
OUTPUT_PATH = Path("output.txt")
CSV_PATH    = Path("redaction_comparison.csv")

FINETUNE_DIR = Path("finetuned-ner")
FINETUNE_DIR.mkdir(parents=True, exist_ok=True)


EMAIL_RE = PHONE_RE = URL_RE = IPV4_RE = None
if ALLOW_REGEX_STRUCTURED:
    EMAIL_RE = re.compile(r"[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}")
    PHONE_RE = re.compile(r"(?<!\d)(?:\+?\d[\s()./\-]*){7,15}\d")
    URL_RE   = re.compile(r"(?:https?://\S+|www\.\S+)")
    IPV4_RE  = re.compile(r"\b(?:(?:25[0-5]|2[0-4]\d|1?\d?\d)\.){3}(?:25[0-5]|2[0-4]\d|1?\d?\d)\b")


TAG_MAP = {
    "PER": "PERSON", "PERSON": "PERSON",
    "ORG": "ORG", "ORGANIZATION": "ORG",
    "LOC": "ADDRESS", "GPE": "ADDRESS", "LOCATION": "ADDRESS", "MISC": "ADDRESS",
    "DATE": "DATE",
}
REPLACERS = {
    "PERSON": "[REDACTED:PERSON]",
    "ORG": "[REDACTED:ORG]",
    "ADDRESS": "[REDACTED:ADDRESS]",
    "DATE": "[REDACTED:DATE]",
    "EMAIL": "[REDACTED:EMAIL]",
    "PHONE": "[REDACTED:PHONE]",
    "URL": "[REDACTED:URL]",
    "IP": "[REDACTED:IP]",
}

In [18]:


from transformers import pipeline

def load_ner_pipeline():
   
    use_path = FINETUNE_DIR if any(FINETUNE_DIR.glob("*")) else MODEL_NAME
    print("Loading NER from:", use_path)
    ner = pipeline("token-classification", model=str(use_path), aggregation_strategy="simple")
    return ner, use_path

def collect_spans(text, ner):
  
    ml_spans = []
    for ent in ner(text):
        label = TAG_MAP.get(ent.get("entity_group", ""), None)
        if not label:
            continue
        ml_spans.append((label, int(ent["start"]), int(ent["end"])))


    rx_spans = []
    if ALLOW_REGEX_STRUCTURED:
        if EMAIL_RE:
            for m in EMAIL_RE.finditer(text):
                rx_spans.append(("EMAIL", m.start(), m.end()))
        if PHONE_RE:
            for m in PHONE_RE.finditer(text):
                digits = re.sub(r"\D", "", m.group(0))
                if 7 <= len(digits) <= 15:
                    rx_spans.append(("PHONE", m.start(), m.end()))
        if URL_RE:
            for m in URL_RE.finditer(text):
                rx_spans.append(("URL", m.start(), m.end()))
        if IPV4_RE:
            for m in IPV4_RE.finditer(text):
                rx_spans.append(("IP", m.start(), m.end()))
    return ml_spans, rx_spans

def redact_and_save(text, ml_spans, rx_spans):
   
    spans = sorted(ml_spans + rx_spans, key=lambda x: (x[1], x[2]))
    merged = []
    for lab, s, e in spans:
        if merged and s <= merged[-1][2]:
            prev_lab, ps, pe = merged[-1]
            merged[-1] = (prev_lab, ps, max(pe, e))
        else:
            merged.append((lab, s, e))

    chars = list(text)
    for lab, s, e in reversed(merged):
        tag = REPLACERS.get(lab, "[REDACTED]")
        chars[s:e] = list(tag)
    redacted_text = "".join(chars)

    
    orig_lines = text.splitlines()
    red_lines  = redacted_text.splitlines()
    while len(red_lines) < len(orig_lines): red_lines.append("")
    while len(orig_lines) < len(red_lines): orig_lines.append("")
    df = pd.DataFrame({"line": range(1, len(orig_lines)+1), "input": orig_lines, "redacted": red_lines})
    OUTPUT_PATH.write_text(redacted_text, encoding="utf-8")
    df.to_csv(CSV_PATH, index=False)
    return redacted_text, df


In [19]:

from datasets import Dataset
from transformers import AutoTokenizer, AutoModelForTokenClassification, DataCollatorForTokenClassification, TrainingArguments, Trainer
import numpy as np

toy_sentences = [
    "Jane Doe met ACME in London on 12 June 2024 .",
    "Carlos visited New York City on 2023-05-17 for ACME Corp .",
]

label_list = ["O", "B-PER", "I-PER", "B-ORG", "I-ORG", "B-LOC", "I-LOC", "B-DATE", "I-DATE"]
label_to_id = {l:i for i,l in enumerate(label_list)}
id_to_label = {i:l for i,l in enumerate(label_list)}

def annotate_sentence(sent):
    words = sent.split()
    tags = ["O"] * len(words)
    for i,w in enumerate(words):
        if w in {"Jane","Carlos"}: tags[i] = "B-PER"
        if w in {"Doe"}:           tags[i] = "I-PER"
        if w in {"ACME"}:          tags[i] = "B-ORG"
        if w in {"Corp"}:          tags[i] = "I-ORG"
        if w in {"New"}:           tags[i] = "B-LOC"
        if w in {"York","City"}:   tags[i] = "I-LOC"
        if w in {"London"}:        tags[i] = "B-LOC"
        if w in {"12","June","2024","2023-05-17"}:
            tags[i] = "B-DATE" if w in {"12","2023-05-17"} else "I-DATE"
    return words, tags

tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME)

tokenized_examples = []
for s in toy_sentences:
    words, tags = annotate_sentence(s)
    enc = tokenizer(words, is_split_into_words=True, truncation=True, return_offsets_mapping=False)
    word_ids = enc.word_ids()
    label_ids = []
    for wi in word_ids:
        label_ids.append(-100 if wi is None else label_to_id[tags[wi]])
    enc["labels"] = label_ids
    tokenized_examples.append(enc)

dataset = Dataset.from_dict({k:[d[k] for d in tokenized_examples] for k in tokenized_examples[0].keys()})
model = AutoModelForTokenClassification.from_pretrained(MODEL_NAME, num_labels=len(label_list), id2label=id_to_label, label2id=label_to_id)
data_collator = DataCollatorForTokenClassification(tokenizer)

training_args = TrainingArguments(
    output_dir=str(FINETUNE_DIR),
    per_device_train_batch_size=8,
    num_train_epochs=1,
    learning_rate=5e-5,
    weight_decay=0.01,
    logging_steps=1,
    save_strategy="epoch",
    remove_unused_columns=False,
)

def compute_metrics(p):
    predictions, labels = p
    preds = np.argmax(predictions, axis=-1)
    mask = labels != -100
    acc = (preds[mask] == labels[mask]).mean() if mask.any() else 0.0
    return {"token_acc": float(acc)}

trainer = Trainer(
    model=model,
    args=training_args,
    train_dataset=dataset,
    eval_dataset=dataset,
    tokenizer=tokenizer,
    data_collator=data_collator,
    compute_metrics=compute_metrics,
)

trainer.train()
trainer.save_model(str(FINETUNE_DIR))
tokenizer.save_pretrained(str(FINETUNE_DIR))
print("Saved fine-tuned checkpoint to:", FINETUNE_DIR.resolve())


  trainer = Trainer(


Step,Training Loss
1,7.167


Saved fine-tuned checkpoint to: /Users/joshuaradzadlaon/vscode/schoolworks/LegalRewriter/finetuned-ner


In [20]:

ner, used_model = load_ner_pipeline()
text = INPUT_PATH.read_text(encoding="utf-8")
ml_spans, rx_spans = collect_spans(text, ner)
redacted_text, df = redact_and_save(text, ml_spans, rx_spans)
print("Model used:", used_model)
print("Saved redacted text to:", OUTPUT_PATH.resolve())
print("Saved comparison CSV to:", CSV_PATH.resolve())
df.head(50)


Loading NER from: finetuned-ner


Device set to use mps:0


Model used: finetuned-ner
Saved redacted text to: /Users/joshuaradzadlaon/vscode/schoolworks/LegalRewriter/output.txt
Saved comparison CSV to: /Users/joshuaradzadlaon/vscode/schoolworks/LegalRewriter/redaction_comparison.csv


Unnamed: 0,line,input,redacted
0,1,John A. Smith from Globex LLC will join on 202...,[REDACTED:ORG] from[REDACTED:ADDRESS] will joi...
1,2,"Meeting venue: 742 Evergreen Terrace, Shelbyvi...","Meeting venue: 742[REDACTED:DATE],[REDACTED:DA..."
2,3,Alternate phone: +44 20 7946 0958; support ema...,Alternate phone: +44 20 7946 0958; support ema...
