In [11]:
import sys, random, string, re, time
import pandas as pd
import numpy as np


# PII entity classes handled by this notebook.
UNIQUE_LABELS = {
    "URL_PERSONAL",
    "EMAIL",
    "USERNAME",
    "STREET_ADDRESS",
    "NAME_STUDENT",
    "PHONE_NUM",
    "ID_NUM",
}

# Forward mapping: entity label -> placeholder token ("<LABEL>_TOKEN").
LABEL2ENT_SPECIAL_TOKEN = {label: label + "_TOKEN" for label in UNIQUE_LABELS}
# Reverse mapping, obtained by inverting the forward one.
ENT_SPECIAL_TOKEN2LABEL = {
    special_token: label for label, special_token in LABEL2ENT_SPECIAL_TOKEN.items()
}

# All placeholder tokens, for fast membership tests.
ENTITY_SPECIAL_TOKENS = set(ENT_SPECIAL_TOKEN2LABEL)

## Load generated essays and associated entities

In [12]:
# Absolute, machine-specific location of the CSV with the generated essays.
# NOTE(review): consider making this configurable before sharing the notebook.
generated_essays_path = "/home/savkin/2024/PII_Data_Detection/pii/generated_datasets/generated_df_with_essays_final.csv"

# Later cells read a "generated_text" column plus one column per entity label
# (STREET_ADDRESS, NAME_STUDENT, EMAIL, PHONE_NUM, ID_NUM, USERNAME, URL_PERSONAL).
df = pd.read_csv(generated_essays_path)

## Replace generated ents with class labels

Accumulate each row's entities into a dict keyed by entity label.

In [13]:
def entities_to_label_dict(row):
    """Collect the entity columns of ``row`` into a ``label -> [value]`` dict.

    Args:
        row: Mapping-like object (e.g. a DataFrame row) indexable by the
            seven entity column names listed below.

    Returns:
        dict: One key per entity label, each holding a single-element list
        with the value read from ``row``.
    """
    # Key order is kept stable because the dict is serialized later on.
    entity_columns = (
        "STREET_ADDRESS",
        "NAME_STUDENT",
        "EMAIL",
        "PHONE_NUM",
        "ID_NUM",
        "USERNAME",
        "URL_PERSONAL",
    )
    return {column: [row[column]] for column in entity_columns}

# Run entities_to_label_dict on every row (axis=1) and keep the resulting
# dicts in a new "label_dict" column.
df["label_dict"] = df.agg(entities_to_label_dict, axis=1)

In [14]:
# Order in which entity values are swapped for their placeholder tokens.
# NOTE(review): presumably URL / e-mail go first so that a username embedded
# in them is not replaced on its own -- confirm before reordering.
replace_order = [
    "URL_PERSONAL",
    "EMAIL",
    "USERNAME",
    "STREET_ADDRESS",
    "NAME_STUDENT",
    "PHONE_NUM",
    "ID_NUM",
]

def replace_ents_with_labels(row):
    """Replace every entity value found in the essay with its placeholder token.

    Reads ``row["generated_text"]`` and ``row["label_dict"]`` and writes two
    new fields on ``row``:

    * ``generated_text_with_ent_labels`` -- the text after all replacements;
    * ``missing_ents_dict`` -- ``label -> bool``, True when the entity could
      not be substituted.

    An entity is reported as missing when its value does not occur in the
    text, and also when the value is not a non-empty string (e.g. a NaN from
    an empty CSV cell). The empty string needs the explicit guard: it is
    contained in every text, and ``str.replace("", token)`` would insert the
    token between every character of the essay.

    Matching is plain substring matching, so an entity value that occurs
    inside a longer word is replaced as well.

    Args:
        row: Mapping-like row with ``generated_text`` and ``label_dict``.

    Returns:
        The same ``row`` object, with the two fields above added.

    Raises:
        AssertionError: if a ``label_dict`` entry does not hold exactly one value.
    """
    text = row["generated_text"]
    missing_ents_dict = {}
    for ent_label in replace_order:
        ent = row["label_dict"][ent_label]
        assert len(ent) == 1
        ent_text = ent[0]

        # Only non-empty strings can be searched for and replaced safely.
        is_usable = isinstance(ent_text, str) and ent_text != ""
        is_missing = (not is_usable) or (ent_text not in text)
        missing_ents_dict[ent_label] = is_missing

        if not is_missing:
            text = text.replace(ent_text, LABEL2ENT_SPECIAL_TOKEN[ent_label])

    row["generated_text_with_ent_labels"] = text
    row["missing_ents_dict"] = missing_ents_dict
    return row

# Row-wise pass (axis=1): adds the "generated_text_with_ent_labels" and
# "missing_ents_dict" fields written by replace_ents_with_labels.
df = df.agg(replace_ents_with_labels, axis=1)

## Tokenize generated texts

In [15]:
from spacy.lang.en import English

# Tokenizer of a blank English spaCy pipeline, shared by the helpers below.
en_tokenizer = English().tokenizer


def tokenize_with_spacy(text, tokenizer=en_tokenizer):
    """Tokenize ``text`` and report which tokens are followed by whitespace.

    Args:
        text: String to tokenize.
        tokenizer: Callable returning an iterable of spaCy tokens; defaults to
            the module-level ``en_tokenizer``.

    Returns:
        tuple[list[str], list[bool]]: the token texts and, aligned with them,
        a flag that is True when the token's ``whitespace_`` is non-empty.
    """
    tokens = []
    trailing_whitespace = []
    for token in tokenizer(text):
        tokens.append(token.text)
        trailing_whitespace.append(bool(token.whitespace_))
    return tokens, trailing_whitespace

def tokenize_df_with_spacy(row):
    """Tokenize the placeholder-annotated essay of ``row``.

    Adds three aligned fields to ``row``: ``tokens``, ``trailing_whitespace``
    and ``labels`` (initialised to "O" for every token).

    Args:
        row: Mapping-like row holding ``generated_text_with_ent_labels``.

    Returns:
        The same ``row`` object with the three fields added.
    """
    tokenized = tokenize_with_spacy(row["generated_text_with_ent_labels"])
    row["tokens"], row["trailing_whitespace"] = tokenized
    # Every token starts as "outside"; entity tags are filled in later.
    row["labels"] = ["O" for _ in tokenized[0]]
    return row

# Row-wise pass (axis=1): adds the "tokens", "trailing_whitespace" and
# "labels" fields written by tokenize_df_with_spacy.
df = df.agg(tokenize_df_with_spacy, axis=1)

## Find ents positions

In [16]:
def mark_ent_label_tokens(row):
    """Record the token indices at which each placeholder token occurs.

    Adds ``row["ent2pos"]``: a dict with one key per label in
    ``UNIQUE_LABELS`` whose value is the ascending list of positions in
    ``row["tokens"]`` holding that label's placeholder token (empty when the
    placeholder does not occur).

    Args:
        row: Mapping-like row holding a ``tokens`` list.

    Returns:
        The same ``row`` object with ``ent2pos`` added.
    """
    positions_by_label = {label: [] for label in UNIQUE_LABELS}
    placeholder_hits = (
        (index, ENT_SPECIAL_TOKEN2LABEL[token])
        for index, token in enumerate(row["tokens"])
        if token in ENTITY_SPECIAL_TOKENS
    )
    for index, entity_label in placeholder_hits:
        positions_by_label[entity_label].append(index)
    row["ent2pos"] = positions_by_label
    return row

# Row-wise pass (axis=1): adds the "ent2pos" field written by
# mark_ent_label_tokens.
df = df.agg(mark_ent_label_tokens, axis=1)

## Replace labels with ents

In [17]:
def replace_labels_with_ents(row):
    """Swap every placeholder token back for the tokens of its entity.

    For each position listed in ``row["ent2pos"]`` the single placeholder
    entry in ``row["tokens"]``, ``row["trailing_whitespace"]`` and
    ``row["labels"]`` is replaced by the tokenized entity text, its whitespace
    flags and BIO tags ("B-<label>" for the first token, "I-<label>" for the
    rest). The three lists are modified in place.

    The last entity token inherits the placeholder's trailing-whitespace
    flag. Tokenizing the entity on its own yields no whitespace after its
    final token, so without this the space that followed the placeholder
    would be dropped and text rebuilt from the tokens would glue the entity
    to the next word.

    Args:
        row: Mapping-like row with ``ent2pos``, ``label_dict``, ``tokens``,
            ``trailing_whitespace`` and ``labels``.

    Returns:
        The same ``row`` object with the three lists updated.

    Raises:
        AssertionError: if a ``label_dict`` entry does not hold exactly one
            value, or the entity text produces no tokens.
    """
    ent2pos = row["ent2pos"]
    ents_dict = row["label_dict"]

    entity_mentions = [
        (ent_label, ent_text, pos)
        for ent_label, ent_text in ents_dict.items()
        for pos in ent2pos[ent_label]
    ]
    # Process right-to-left so that positions further left stay valid while
    # the lists grow.
    sorted_entity_mentions = sorted(entity_mentions, key=lambda x: x[-1], reverse=True)

    for ent_label, ent_text, pos in sorted_entity_mentions:
        assert len(ent_text) == 1
        ent_text = ent_text[0]
        ent_tokens, ent_trailing_whitespace = tokenize_with_spacy(ent_text)
        ent_bio_tags = ["B-" + ent_label] + ["I-" + ent_label] * (len(ent_tokens) - 1)

        assert len(ent_tokens) == len(ent_trailing_whitespace) == len(ent_bio_tags)

        # Carry over whatever whitespace followed the placeholder in the essay.
        ent_trailing_whitespace = list(ent_trailing_whitespace)
        ent_trailing_whitespace[-1] = (
            ent_trailing_whitespace[-1] or row["trailing_whitespace"][pos]
        )

        for k, v in [("tokens", ent_tokens), ("trailing_whitespace", ent_trailing_whitespace), ("labels", ent_bio_tags)]:
            # Replace the one placeholder slot with the entity's entries.
            row[k][pos:pos + 1] = v

    return row

# Row-wise pass (axis=1): expands each placeholder back into its entity
# tokens. Relies on the "ent2pos" positions computed by the previous step.
df = df.agg(replace_labels_with_ents, axis=1)

## Sanity checks

In [18]:
# Every row must carry exactly one whitespace flag and one label per token.
# Series.map is applied column by column because DataFrame.applymap is
# deprecated in recent pandas releases.
len_df = df[["tokens", "trailing_whitespace", "labels"]].apply(lambda col: col.map(len))
mask = (len_df["tokens"] == len_df["trailing_whitespace"]) & (len_df["tokens"] == len_df["labels"])
assert mask.all(), f"{int((~mask).sum())} row(s) have misaligned tokens / trailing_whitespace / labels"

## Saving data

In [19]:
import json

# Absolute, machine-specific destination of the JSON dump.
save_path = "/home/savkin/2024/PII_Data_Detection/pii/generated_datasets/deleteme.json"
# One dict per DataFrame row, keyed by column name.
records = df.to_dict(orient="records")

# Written pretty-printed (indent=4); an existing file at save_path is overwritten.
with open(save_path, "w") as file:
    json.dump(records, file, indent=4)

## Generated data analysis