In [2]:
%load_ext autoreload
%autoreload 2

import pandas as pd
import random
import string
from datetime import datetime

def modify_ssn(ssn, dl):
    """
    Apply random changes to SSN value.
    Below code modification logic is generated by GPT4

    The damage depends on ``dl``:

    * ``dl < 0``: either insert 1-3 random ASCII letters (each only while the
      value is shorter than 13 characters) or delete 1-3 random characters
      (each only while the value is longer than 5 characters). The
      separator step below is then applied as well.
    * ``dl < 1``: insert a single ``'-'`` or ``'.'`` at a random interior
      position, unless the value already contains two of that separator.
    * ``dl == 1``: with probability 1/2, re-group the value as
      ``XXX-XX-XXXX`` (first three, next two, remaining characters).

    Any other level returns the value unchanged.

    Args:
        ssn: The SSN string to damage.
        dl: Damage level (1 light, 0 medium, negative heavy).

    Returns:
        The damaged SSN string.
    """
    def format_ssn(value):
        # Coin flip: keep the raw value or split it into 3-2-rest groups.
        if random.choice([True, False]):
            value = f"{value[:3]}-{value[3:5]}-{value[5:]}"
        return value

    def insert_separator(value, sep):
        # Only interior slots are eligible (never before the first or after
        # the last character), and only where neither neighbour is already
        # the same separator. Picking from an explicit candidate list avoids
        # both the empty randint range on 1-character values and an endless
        # retry loop when no valid slot exists.
        slots = [
            i for i in range(1, len(value))
            if value[i - 1] != sep and value[i] != sep
        ]
        if not slots:
            return value
        cut = random.choice(slots)
        return value[:cut] + sep + value[cut:]

    if dl < 0:
        action = random.choice(['add', 'remove'])
        if action == 'add':
            for _ in range(random.randint(1, 3)):
                if len(ssn) < 13:  # An arbitrary limit for the SSN length
                    index_to_add = random.randint(0, len(ssn))
                    new_char = random.choice(string.ascii_letters)
                    ssn = ssn[:index_to_add] + new_char + ssn[index_to_add:]
        else:
            for _ in range(random.randint(1, 3)):
                if len(ssn) > 5:  # Ensuring the SSN doesn't get too short
                    index_to_remove = random.randint(0, len(ssn) - 1)
                    ssn = ssn[:index_to_remove] + ssn[index_to_remove + 1:]

    if dl < 1:
        separator = random.choice(['-', '.'])
        if ssn.count(separator) < 2:  # Limit to 2 of the same separator
            ssn = insert_separator(ssn, separator)

    if dl == 1:
        ssn = format_ssn(ssn)

    return ssn


def modify_zip(zip, dl):
    """
    Apply random changes to ZIP value.
    Below code modification logic is generated by GPT4

    The damage depends on ``dl``:

    * ``dl < 0``: either insert 1-3 random digits (each only while the value
      is shorter than 10 characters) or delete 1-3 random characters (each
      only while the value is longer than 3 characters). The separator step
      below is then applied as well.
    * ``dl < 1``: insert a single ``'-'`` or ``'.'`` at a random interior
      position, unless the value already contains two of that separator.
    * ``dl == 1``: with probability 1/2, append ``-`` plus four random
      digits.

    Any other level returns the value unchanged.

    Args:
        zip: The ZIP code string to damage. (The name shadows the builtin
            but is kept so existing callers are unaffected.)
        dl: Damage level (1 light, 0 medium, negative heavy).

    Returns:
        The damaged ZIP string.
    """
    def add_plus_four(zip_code):
        # Coin flip: keep the value or append a random four-digit suffix.
        if random.choice([True, False]):
            plus_four = ''.join(str(random.randint(0, 9)) for _ in range(4))
            zip_code = f"{zip_code}-{plus_four}"
        return zip_code

    def insert_separator(value, sep):
        # Only interior slots are eligible (never before the first or after
        # the last character), and only where neither neighbour is already
        # the same separator. Picking from an explicit candidate list avoids
        # both the empty randint range on 1-character values and an endless
        # retry loop when no valid slot exists.
        slots = [
            i for i in range(1, len(value))
            if value[i - 1] != sep and value[i] != sep
        ]
        if not slots:
            return value
        cut = random.choice(slots)
        return value[:cut] + sep + value[cut:]

    if dl < 0:
        action = random.choice(['add', 'remove'])
        if action == 'add':
            for _ in range(random.randint(1, 3)):
                if len(zip) < 10:  # An arbitrary limit for the zip length
                    index_to_add = random.randint(0, len(zip))
                    new_digit = random.randint(0, 9)
                    zip = zip[:index_to_add] + str(new_digit) + zip[index_to_add:]
        else:
            for _ in range(random.randint(1, 3)):
                if len(zip) > 3:  # Ensuring the zip doesn't get too short
                    index_to_remove = random.randint(0, len(zip) - 1)
                    zip = zip[:index_to_remove] + zip[index_to_remove + 1:]

    if dl < 1:
        separator = random.choice(['-', '.'])
        if zip.count(separator) < 2:  # Limit to 2 of the same separator
            zip = insert_separator(zip, separator)

    if dl == 1:
        zip = add_plus_four(zip)

    return zip


def modify_date(dob_str, dl):
    """
    Apply random changes to date value.
    Below code modification logic is generated by GPT4

    The value is first parsed as ``YYYYMMDD`` and re-rendered in one randomly
    chosen format from ``formats``; values that do not parse are carried
    forward unchanged. For ``dl < 1`` the resulting string is then corrupted:

    * ``dl < 0``: insert 1-3 random digits at a random position, or delete
      up to 3 characters starting at a random position.
    * otherwise (``dl == 0``): overwrite up to 3 consecutive digits starting
      at a random position with random digits.

    Args:
        dob_str: Date string, expected in ``YYYYMMDD`` form.
        dl: Damage level (1 light, 0 medium, negative heavy).

    Returns:
        The reformatted and possibly corrupted date string.
    """
    def randomly_modify_formatted_dob(text):
        # Nothing to corrupt; also avoids randint(0, -1) on an empty string.
        if not text:
            return text

        # Any negative level counts as heavy damage, mirroring how the SSN
        # and ZIP modifiers threshold on dl; this also guarantees `action`
        # is always bound.
        if dl < 0:
            action = random.choice(["add", "remove"])
        else:
            action = "change"

        index = random.randint(0, len(text) - 1)
        change_length = random.randint(1, 3)

        if action == "add":
            extra = ''.join(random.choice('0123456789') for _ in range(change_length))
            text = text[:index] + extra + text[index:]
        elif action == "remove":
            # Clamp so the deletion never runs past the end of the string.
            change_length = min(change_length, len(text) - index)
            text = text[:index] + text[index + change_length:]
        else:
            for _ in range(change_length):
                # The cursor only advances after a successful replacement,
                # so landing on a non-digit (separator, month name) leaves
                # the string untouched for the remaining iterations.
                if index < len(text) and text[index].isdigit():
                    text = text[:index] + str(random.randint(0, 9)) + text[index + 1:]
                    index += 1

        return text

    formats = [
        "%Y-%m-%d",
        "%d/%m/%Y",
        "%m-%d-%Y",
        "%d %B %Y",
        "%B %d, %Y",
        "%d %b %Y",
        "%d.%m.%Y",
    ]
    try:
        dob_date = datetime.strptime(dob_str, "%Y%m%d")
    except ValueError:
        # Not a YYYYMMDD value: keep the raw text and only corrupt it below.
        new_dob_str = dob_str
    else:
        new_dob_str = dob_date.strftime(random.choice(formats))

    if dl < 1:
        new_dob_str = randomly_modify_formatted_dob(new_dob_str)

    return new_dob_str


def modify_name(name, dl):
    """
    Apply random changes to person name value.
    Below code modification logic is generated by GPT4

    Each character is processed independently according to ``dl``:

    * ``dl == 1``: with probability 1/2 the character is left alone,
      otherwise it is randomly upper- or lower-cased.
    * ``dl == 0``: the character is replaced by a random digit.
    * ``dl < 0``: the character is replaced by a random punctuation mark.

    Any other level returns the name unchanged instead of failing on an
    unassigned action.

    Args:
        name: The name string to damage.
        dl: Damage level (1 light, 0 medium, negative heavy).

    Returns:
        The damaged name, one output character per input character (case
        conversion of some non-ASCII characters may change the length).
    """
    def damaged_char(char):
        if dl == 1:
            if random.choice(["change_case", "keep_as_is"]) == "change_case":
                return char.upper() if random.choice([True, False]) else char.lower()
            return char
        if dl == 0:
            return random.choice('0123456789')
        if dl < 0:
            return random.choice(string.punctuation)
        # Unknown level: leave the character as it is.
        return char

    return ''.join(damaged_char(char) for char in name)


def modify_race(r, dl):
    """
    Apply random changes to race value.
    Below code modification logic is generated by GPT4

    Swaps a known race code for a randomly chosen label. Level ``dl == 1``
    draws from the first vocabulary below; every other level draws from the
    second one. Codes that are not keys of the selected vocabulary are
    returned untouched.

    Args:
        r: Race code (e.g. ``'AA'``, ``'WHITE'``).
        dl: Damage level.

    Returns:
        A replacement label, or ``r`` itself when the code is unknown.
    """
    # Vocabulary used for damage level 1.
    labels_level_one = {
        'AA': ['African American', 'Afro-Caribbean', 'Black'],
        'WHITE': ['White', 'Caucasian', 'European American'],
        'ASIAN': ['Asian', 'East Asian', 'South Asian', 'Southeast Asian'],
        'AIAN': ['American Indian', 'Alaska Native'],
        'NHOPI': ['Native Hawaiian', 'Pacific Islander'],
        'OTHER': ['Other', 'Mixed Race', 'Unspecified'],
    }

    # Vocabulary used for every other damage level.
    labels_other_levels = {
        'AA': ['B', 'Afro-Latinx', 'Afro-American'],
        'WHITE': ['European', 'Anglo', 'Euro-Canadian', 'W'],
        'ASIAN': ['Central Asian', 'West Asian', 'Chinese', 'Japanese'],
        'AIAN': ['First Nations', 'Indigenous American'],
        'NHOPI': ['Polynesian', 'Micronesian', 'Melanesian'],
        'OTHER': ['Multiethnic', 'Biracial', 'Multirace'],
    }

    vocabulary = labels_level_one if dl == 1 else labels_other_levels
    if r not in vocabulary:
        return r
    return random.choice(vocabulary[r])


def modify_gender(g, dl):
    """
    Apply random changes to gender value.
    Below code modification logic is generated by GPT4

    Swaps a known gender code for a randomly chosen label. Level ``dl == 1``
    draws from the short, formal vocabulary; every other level draws from
    the larger informal one. Codes that are not keys of the selected
    vocabulary are returned untouched.

    Args:
        g: Gender code (``'MALE'``, ``'FEMALE'`` or ``'OTHER'``).
        dl: Damage level.

    Returns:
        A replacement label, or ``g`` itself when the code is unknown.
    """
    # Vocabulary used for damage level 1.
    labels_level_one = {
        'MALE': ['Male', 'Man'],
        'FEMALE': ['Female', 'Woman'],
        'OTHER': ['Non-Binary', 'Genderqueer', 'Unspecified'],
    }

    # Vocabulary used for every other damage level.
    labels_other_levels = {
        'MALE': ['M', 'Guy', 'Dude', 'Gentleman', 'Mister'],
        'FEMALE': ['F', 'Lady', 'Gal', 'Miss', 'Madam'],
        'OTHER': ['Genderfluid', 'Agender', 'Two-Spirit', 'Gender Non-Conforming'],
    }

    vocabulary = labels_level_one if dl == 1 else labels_other_levels
    if g not in vocabulary:
        return g
    return random.choice(vocabulary[g])


def damage(value, pii_name, damage_level):
    """
    Route a PII value to the modifier that matches its field name.

    Empty strings and values whose field name has no modifier are returned
    unchanged.

    Args:
        value: The field value to damage.
        pii_name: Short field name (``dob``/``ed``, ``fn``/``ln``, ``zip``,
            ``ssn``, ``r``, ``g``).
        damage_level: Damage level forwarded to the modifier.

    Returns:
        The modifier's result, or ``value`` itself when nothing applies.
    """
    if value == "":
        return value

    # Field names grouped by the modifier that handles them.
    routes = (
        (("dob", "ed"), modify_date),
        (("fn", "ln"), modify_name),
        (("zip",), modify_zip),
        (("ssn",), modify_ssn),
        (("r",), modify_race),
        (("g",), modify_gender),
    )
    for field_names, modifier in routes:
        if pii_name in field_names:
            return modifier(value, damage_level)

    return value


def damage_level():
    """
    Draw a random damage level from one uniform sample in [0, 1).

    Returns:
        ``-1`` (heavy damage, value far from the ground truth) when the
        sample is at most 0.3, ``0`` (medium damage) when it is above 0.3
        and at most 0.6, and ``1`` (light damage, value close to the ground
        truth) when it is above 0.6.
    """
    roll = random.random()
    if roll <= 0.3:
        return -1
    if roll <= 0.6:
        return 0
    return 1

In [5]:
# Load the source records from the parquet dataset.
ground_truth_data = pd.read_parquet("data/pii_cln", engine='fastparquet')

# PII fields to damage; each gets a "<field>_dl" column holding the drawn
# damage level and a "<field>_org" column holding the damaged value.
piis = ["ssn", "zip", "dob", "r", "g", "ln", "fn", "ed"]
org_columns = [f"{pii}_org" for pii in piis]
dl_columns = [f"{pii}_dl" for pii in piis]

for pii, dl_column, org_column in zip(piis, dl_columns, org_columns):
    # Draw an independent damage level for every row of this field.
    ground_truth_data[dl_column] = ground_truth_data[pii].map(lambda x: damage_level())
    # Damage each row's value using the level drawn for that row.
    ground_truth_data[org_column] = ground_truth_data.apply(
        lambda x: damage(x[pii], pii, x[dl_column]), axis=1
    )

key_columns = ["src", "id"]

# Full file: original values, damaged values, damage levels and keys.
ground_truth_data[piis + org_columns + dl_columns + key_columns].to_csv("data/pii_org.csv", index=False)
# Test file: damaged values and keys only.
ground_truth_data[org_columns + key_columns].to_csv("data/pii_test.csv", index=False)

