In [1]:
import pandas as pd
import torch
from torch.utils.data import Dataset, DataLoader
from transformers import AutoTokenizer, AutoModelForTokenClassification,AutoConfig
import pytorch_lightning as pl
import ast
from torchmetrics import Precision, Recall, F1Score, Accuracy
from pytorch_lightning.callbacks import EarlyStopping
from pytorch_lightning.loggers import TensorBoardLogger
from typing import Optional, Dict, Any, List
import numpy as np
import os
import json
from sklearn import preprocessing
from transformers import DataCollatorForTokenClassification
import torch.nn as nn

In [2]:
import torch
import pandas as pd
from transformers import AutoTokenizer, AutoModel, DebertaV2TokenizerFast, BertTokenizerFast, AutoModelForTokenClassification

# Load model and tokenizer
model_name = "iiiorg/piiranha-v1-detect-personal-information"
piiranha_tokenizer = DebertaV2TokenizerFast.from_pretrained("iiiorg/piiranha-v1-detect-personal-information")
piiranha_model = AutoModelForTokenClassification.from_pretrained(model_name)


device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
piiranha_model.to(device)

# Function to apply PII masking
def mask_pii(text, predictions=None, model=piiranha_model, tokenizer=piiranha_tokenizer, aggregate_redaction=True):
    model.to(device)
    inputs = tokenizer(text, return_tensors="pt", truncation=True, padding=True)
    inputs.to(device)

    if predictions == None:
        with torch.no_grad():
            outputs = model(**inputs)
        predictions = torch.argmax(outputs.logits, dim=-1)
    encoded_inputs = tokenizer.encode_plus(text, return_offsets_mapping=True, add_special_tokens=True)
    offset_mapping = encoded_inputs['offset_mapping']

    masked_text = list(text)
    is_redacting = False
    redaction_start = 0
    current_pii_type = ''

    for i, (start, end) in enumerate(offset_mapping):
        if start == end:
            continue

        label = predictions[0][i].item()
        if label != model.config.label2id['O']:
            pii_type = model.config.id2label[label]
            if not is_redacting:
                is_redacting = True
                redaction_start = start
                current_pii_type = pii_type
            elif not aggregate_redaction and pii_type != current_pii_type:
                apply_redaction(masked_text, redaction_start, start, current_pii_type, aggregate_redaction)
                redaction_start = start
                current_pii_type = pii_type
        else:
            if is_redacting:
                apply_redaction(masked_text, redaction_start, end, current_pii_type, aggregate_redaction)
                is_redacting = False

    if is_redacting:
        apply_redaction(masked_text, redaction_start, len(masked_text), current_pii_type, aggregate_redaction)

    return ''.join(masked_text)

def apply_redaction(masked_text, start, end, pii_type, aggregate_redaction):
    for j in range(start, end):
        masked_text[j] = ''
    if aggregate_redaction:
        masked_text[start] = '[redacted]'
    else:
        masked_text[start] = f'[{pii_type}]'


In [3]:
example_text="<p>My child faozzsd379223 (DOB: May/58) will undergo treatment with Dr. faozzsd379223, office at Hill Road. Our ZIP code is 28170-6392. Consult policy M.UE.227995. Contact number: 0070.606.322.6244. Handle transactions with 6225427220412963. Queries? Email: faozzsd379223@outlook.com.</p>"
print("OG TEXT")
print(example_text)
print("Aggregated redaction:")
masked_example_aggregated = mask_pii(example_text, aggregate_redaction=False)
print(masked_example_aggregated)

Asking to truncate to max_length but no maximum length is provided and the model has no predefined maximum length. Default to no truncation.


OG TEXT
<p>My child faozzsd379223 (DOB: May/58) will undergo treatment with Dr. faozzsd379223, office at Hill Road. Our ZIP code is 28170-6392. Consult policy M.UE.227995. Contact number: 0070.606.322.6244. Handle transactions with 6225427220412963. Queries? Email: faozzsd379223@outlook.com.</p>
Aggregated redaction:
<p>My child[I-USERNAME]DOB:[I-DATEOFBIRTH] undergo treatment with Dr.[I-USERNAME] office at[I-STREET] Our ZIP code is[I-ZIPCODE] Consult policy M.UE.227995. Contact number:[I-TELEPHONENUM] Handle transactions with 6225427220412963. Queries? Email:[I-EMAIL]p>


In [4]:
train_df = pd.read_csv("train.csv")
valid_df = pd.read_csv("validation.csv")

In [5]:
MODEL_NAME = "iiiorg/piiranha-v1-detect-personal-information"

input_col_name, output_col_name = 'source_text', 'mbert_token_classes'
x_train, y_train = train_df[input_col_name].to_list(), train_df[output_col_name].apply(ast.literal_eval).to_list()
x_valid, y_valid = valid_df[input_col_name].to_list(), valid_df[output_col_name].apply(ast.literal_eval).to_list()
train_df['privacy_mask'] = train_df['privacy_mask'].apply(ast.literal_eval)
valid_df['privacy_mask'] = valid_df['privacy_mask'].apply(ast.literal_eval)

In [6]:
def normalize_label(label: str):
    if label.startswith('B-') or label.startswith('I-'):
        return label[2:]
    else: return label

def label2id(label, model):
    if label.startswith('B-'):
        label = 'I-' + label[2:]
    return model.config.label2id[label]

def id2label(id, model):
    label = model.config.id2label[id]
    return normalize_label(label)

In [7]:
text = x_valid[27]
labels = y_valid[27]
s = valid_df.iloc[27]

In [14]:
# miniagent_model = AutoModelForTokenClassification.from_pretrained('model_epoch_03', )
from safetensors.torch import load_file
config = AutoConfig.from_pretrained('pii_model/checkpoints/best_model')
miniagent_model = AutoModelForTokenClassification.from_config(config)
state_dict = load_file('pii_model/checkpoints/best_model/model.safetensors')
miniagent_model.load_state_dict(state_dict)

miniagent_tokenizer = BertTokenizerFast.from_pretrained("google-bert/bert-base-multilingual-cased")
miniagent_model.config.id2label = {0: 'O', 1: 'I-ACCOUNTNUM', 2: 'I-IDCARDNUM'}
miniagent_model.config.label2id = {'O': 0, 'I-ACCOUNTNUM': 1, 'I-IDCARDNUM': 2}

In [16]:
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

In [17]:
def make_privacy_mask(model, tokenizer, text, predictions = None):
    model.to(device)
    inputs = tokenizer(text, return_tensors="pt", truncation=True, padding=True)
    inputs.to(device)
    # inputs = {k: v.to(device) for k, v in inputs.items()}

    if predictions == None:
        with torch.no_grad():
            outputs = model(**inputs)
        predictions = torch.argmax(outputs.logits, dim=-1)

    encoded_inputs = tokenizer.encode_plus(text, return_offsets_mapping=True, add_special_tokens=True)
    offset_mapping = encoded_inputs['offset_mapping']
    oid = label2id('O', model)
    privacy_mask = []
    current_pii = None
    for id, (start, end) in zip(list(predictions[0].tolist()), offset_mapping):
        if current_pii is not None and (id == oid or current_pii['label_id'] != id): 
            privacy_mask.append(current_pii)
            current_pii = None
        if id != oid and current_pii is None:
            current_pii = {
                'label_id': id,
                'label': id2label(id, model),
                'start': start+1,
                'end': end,
                'value': text[start:end]
            }
        elif id != oid and current_pii is not None:
            current_pii['end'] = end
            current_pii['value'] = text[current_pii['start']:end]
    return privacy_mask

# text = x_valid[27]
# labels = y_valid[27]
# s = valid_df.iloc[27]
text = '<p>Validatie ID: 663148812, Sociale beveiliging: 862460623.</p><p>Vertrek via: Duplex 69, Schiedam, 156.</p>'
make_privacy_mask(piiranha_model, piiranha_tokenizer, text)
mg_pm = make_privacy_mask(miniagent_model, miniagent_tokenizer, text)
print(mg_pm)
# print(s.privacy_mask)

[{'label_id': 2, 'label': 'IDCARDNUM', 'start': 16, 'end': 24, 'value': ' 6631488'}]


In [11]:
# valid_set = [(yv, xv) for yv, xv in zip(y_valid, x_valid) if 'I-ACCOUNTNUM' in yv or 'I-IDCARDNUM' in yv]
# yv, xv = zip(*valid_set)

yv, xv = y_valid, x_valid

In [18]:
wrong_labels = {'FP':{}, 'FN':{}, 'TP':{}}
true_positives = {}
def score(predicted_mask, mask):
    equals = lambda pii, pred: \
        pred['label'] == pii['label'] and \
        pred['start'] == pii['start'] and \
        pred['end'] == pii['end']
    for pii in mask:
        if any(equals(pii, pred) for pred in predicted_mask):
            label = pii['label']
            if label not in wrong_labels['TP']:
                wrong_labels['TP'][label] = 1
            else:
                wrong_labels['TP'][label] += 1
            if label not in true_positives:
                true_positives[label] = [pii['value']]
            elif len(true_positives[label]) < 50:
                true_positives[label].append(pii['value'])
        if all(not equals(pii, pred) for pred in predicted_mask):
            label = pii['label']
            if label not in wrong_labels['FN']:
                wrong_labels['FN'][label] = 1
            else:
                wrong_labels['FN'][label] += 1
    for pred in predicted_mask:
        if all(not equals(pii, pred) for pii in mask):
            label = pred['label']
            if label not in wrong_labels['FP']:
                wrong_labels['FP'][label] = 1
            else:
                wrong_labels['FP'][label] += 1
    

In [19]:
def batch_privacy_mask(model, tokenizer, texts):
    model.to(device)
    try:
        inputs = tokenizer(texts, return_tensors="pt", truncation=True, padding=True)
        inputs.to(device)
        with torch.no_grad():
            outputs = model(**inputs)
        predictionss = torch.argmax(outputs.logits, dim=-1)
        
        privacy_masks = []
        for (predictions, text) in zip(predictionss, texts):
            encoded_inputs = tokenizer.encode_plus(text, return_offsets_mapping=True, add_special_tokens=True)
            offset_mapping = encoded_inputs['offset_mapping']
            oid = label2id('O', model)
            privacy_mask = []
            current_pii = None
            for id, (start, end) in zip(list(predictions.tolist()), offset_mapping):
                if current_pii is not None and (id == oid or current_pii['label_id'] != id): 
                    privacy_mask.append(current_pii)
                    current_pii = None
                if id != oid and current_pii is None:
                    current_pii = {
                        'label_id': id,
                        'label': id2label(id, model),
                        'start': start+1,
                        'end': end,
                        'value': text[start:end]
                    }
                elif id != oid and current_pii is not None:
                    current_pii['end'] = end
                    current_pii['value'] = text[current_pii['start']:end]
            privacy_masks.append(privacy_mask)
        return privacy_masks
    except: return []

In [20]:
from tqdm import tqdm
wrong_labels = {'FP':{}, 'FN':{}, 'TP':{}}
true_positives = {}

BATCH = 100
for i in tqdm(range(0, len(xv), BATCH)):
    s = valid_df.iloc[i]
    text = xv[i:min(i+BATCH,len(xv)-1)]
    pm = batch_privacy_mask(piiranha_model, piiranha_tokenizer, text)
    for pm, text in zip(pm, text):
        score(pm, s.privacy_mask)
print(wrong_labels)

100%|██████████| 814/814 [07:15<00:00,  1.87it/s]

{'FP': {'GIVENNAME': 20962, 'SURNAME': 9277, 'EMAIL': 7637, 'DATEOFBIRTH': 3326, 'STREET': 3603, 'BUILDINGNUM': 3462, 'TELEPHONENUM': 4609, 'USERNAME': 11474, 'SOCIALNUM': 3131, 'CITY': 8136, 'ZIPCODE': 3327, 'IDCARDNUM': 4206, 'DRIVERLICENSENUM': 2256, 'CREDITCARDNUMBER': 2299, 'PASSWORD': 2567, 'ACCOUNTNUM': 3749, 'TAXNUM': 2211}, 'FN': {'GIVENNAME': 13843, 'USERNAME': 8317, 'TELEPHONENUM': 6151, 'EMAIL': 6834, 'STREET': 4163, 'BUILDINGNUM': 4467, 'IDCARDNUM': 4666, 'SURNAME': 8124, 'ACCOUNTNUM': 3874, 'CITY': 8124, 'DRIVERLICENSENUM': 2478, 'TAXNUM': 3675, 'SOCIALNUM': 2678, 'DATEOFBIRTH': 2184, 'CREDITCARDNUMBER': 3177, 'ZIPCODE': 3570, 'PASSWORD': 2480}, 'TP': {'GIVENNAME': 157, 'USERNAME': 83, 'TELEPHONENUM': 49, 'EMAIL': 66, 'STREET': 37, 'BUILDINGNUM': 33, 'IDCARDNUM': 34, 'SURNAME': 76, 'ACCOUNTNUM': 26, 'CITY': 76, 'DRIVERLICENSENUM': 22, 'TAXNUM': 25, 'SOCIALNUM': 22, 'CREDITCARDNUMBER': 23, 'ZIPCODE': 30, 'PASSWORD': 20, 'DATEOFBIRTH': 16}}





## Testing model

In [31]:
print("Original text:       " + text)
print("Ground truth:        " + s.masked_text)
piiranha_model = AutoModelForTokenClassification.from_pretrained(model_name)

masked = mask_pii(text, model=piiranha_model, tokenizer=piiranha_tokenizer, aggregate_redaction=False)
print("Masked by piiranha:  " + masked)
masked = mask_pii(text, model=miniagent_model, tokenizer=miniagent_tokenizer, aggregate_redaction=False)
print("Masked by miniagent: " + masked)


Original text:       <p>Validatie ID: 663148812, Sociale beveiliging: 862460623.</p><p>Vertrek via: Duplex 69, Schiedam, 156.</p>
Ground truth:        <p>Klant [USERNAME_1] uit [CITY_1] bindt zich aan deze gezondheidsverzekering. Gebruik [PASSWORD_1] voor toegang tot uw polis. Premies worden van rekening [ACCOUNTNUM_1] bij bank National Australia Bank afgeschreven. Krachtsmidden zijn [CREDITCARDNUMBER_1] en [CREDITCARDEXPIRYCREDITCARDCVV_1]. PIN voor verificatie: [PIN_1].</p>


Asking to truncate to max_length but no maximum length is provided and the model has no predefined maximum length. Default to no truncation.


Masked by piiranha:  <p>Validatie ID:[I-IDCARDNUM] Sociale beveiliging:[I-SOCIALNUM]p><p>Vertrek via: Duplex 69,[I-CITY][I-BUILDINGNUM]p>
Masked by miniagent: <p>Validatie ID[I-IDCARDNUM], Sociale beveiliging: 862460623.</p><p>Vertrek via: Duplex 69, Schiedam, 156.</p>
