
#### Run the cell below to install the required packages for Copilot


In [None]:
from faker import Faker
# Shared Faker generator. The get_fake_* helpers in this notebook reseed it
# with the incoming column value on every call before generating a value.
fake = Faker()

In [None]:
# Function to load data based on file_format
def load_data(config_name, location, file_format):
    """Read the dataset at ``location`` with the reader matching ``file_format``.

    :param config_name: Name of the config entry; only used in the message
        printed for an unsupported format.
    :param location: Location string handed to the selected Spark reader.
    :param file_format: One of "delta", "csv", "parquet" or "table"
        (matched case-sensitively).
    :return: The DataFrame produced by the reader, or None when
        ``file_format`` is not one of the supported values.
    """
    # Each entry pairs the progress message with a deferred read, so only the
    # selected reader ever touches the Spark session.
    loaders = {
        "delta": (
            f"Loading Delta data from {location}",
            lambda: spark.read.format("delta").load(location),
        ),
        "csv": (
            f"Loading CSV data from {location}",
            lambda: spark.read.option("header", "true").csv(location),
        ),
        "parquet": (
            f"Loading Parquet data from {location}",
            lambda: spark.read.parquet(location),
        ),
        # NOTE(review): "table" goes through the generic spark.read.load(),
        # not spark.read.table() -- confirm this is the intended reader.
        "table": (
            f"Loading table data from {location}",
            lambda: spark.read.load(location),
        ),
    }

    selected = loaders.get(file_format)
    if selected is None:
        print(f"Unsupported file format {file_format} for {config_name}")
        return None

    message, read = selected
    print(message)
    return read()

In [None]:

from pyspark.sql.functions import udf
from pyspark.sql.types import StringType



# Define functions
def _seed_fake(seed_value):
    """Reseed the shared ``fake`` generator from a column value.

    The value is passed to ``fake.seed_instance`` unchanged whenever that
    call accepts it, so results for values that already worked are not
    altered. If the call raises ``TypeError`` the generator is reseeded with
    ``str(seed_value)`` instead.

    NOTE(review): a None seed (null column value) is passed through as-is;
    presumably that seeds from system entropy, making the fake value for
    nulls non-reproducible -- confirm whether nulls should be preserved.
    """
    try:
        fake.seed_instance(seed_value)
    except TypeError:
        # Python 3.11+ only accepts None/int/float/str/bytes/bytearray as a
        # random seed; values such as dates or decimals are rejected. Their
        # string form keeps the seed stable for equal inputs.
        fake.seed_instance(str(seed_value))


def get_fake_first_name(seedValue):
    """Return a fake first name generated after reseeding with ``seedValue``."""
    _seed_fake(seedValue)
    return fake.first_name()


def get_fake_last_name(seedValue):
    """Return a fake last name generated after reseeding with ``seedValue``."""
    _seed_fake(seedValue)
    return fake.last_name()


def get_fake_email(seedValue):
    """Return a fake e-mail address generated after reseeding with ``seedValue``."""
    _seed_fake(seedValue)
    return fake.email()


def get_fake_phone_number(seedValue):
    """Return a fake phone number generated after reseeding with ``seedValue``."""
    _seed_fake(seedValue)
    return fake.phone_number()


def get_fake_address(seedValue):
    """Return a fake address generated after reseeding with ``seedValue``."""
    _seed_fake(seedValue)
    return fake.address()


def get_fake_ssn(seedValue):
    """Return a fake SSN generated after reseeding with ``seedValue``."""
    _seed_fake(seedValue)
    return fake.ssn()


def get_fake_passport_number(seedValue):
    """Return a fake passport number generated after reseeding with ``seedValue``."""
    _seed_fake(seedValue)
    return fake.passport_number()


def get_fake_street_address(seedValue):
    """Return a fake street address generated after reseeding with ``seedValue``."""
    _seed_fake(seedValue)
    return fake.street_address()


def get_fake_name(seedValue):
    """Return a fake full name generated after reseeding with ``seedValue``."""
    _seed_fake(seedValue)
    return fake.name()


def get_fake_user_name(seedValue):
    """Return a fake user name generated after reseeding with ``seedValue``."""
    _seed_fake(seedValue)
    return fake.user_name()


def get_fake_postalcode(seedValue):
    """Return a fake postcode generated after reseeding with ``seedValue``."""
    _seed_fake(seedValue)
    return fake.postcode()


def get_fake_zipcode(seedValue):
    """Return a fake ZIP code generated after reseeding with ``seedValue``."""
    _seed_fake(seedValue)
    return fake.zipcode()


def get_fake_country_code(seedValue):
    """Return a fake country code generated after reseeding with ``seedValue``."""
    _seed_fake(seedValue)
    return fake.country_code()


def get_fake_date_of_birth(seedValue):
    """Return a fake date of birth as a ``YYYY-MM-DD`` string.

    The generator is reseeded with ``seedValue`` first; the date is requested
    for an age between 18 and 90.
    """
    _seed_fake(seedValue)
    birth_date = fake.date_of_birth(minimum_age=18, maximum_age=90)
    return birth_date.strftime('%Y-%m-%d')

# Wrap every generator function as a PySpark UDF that yields a string column.
# One UDF per fake type; get_fake_df selects among them by name.
get_fake_first_name_udf = udf(get_fake_first_name, returnType=StringType())
get_fake_last_name_udf = udf(get_fake_last_name, returnType=StringType())
get_fake_email_udf = udf(get_fake_email, returnType=StringType())
get_fake_phone_number_udf = udf(get_fake_phone_number, returnType=StringType())
get_fake_address_udf = udf(get_fake_address, returnType=StringType())
get_fake_ssn_udf = udf(get_fake_ssn, returnType=StringType())
get_fake_passport_number_udf = udf(get_fake_passport_number, returnType=StringType())
get_fake_street_address_udf = udf(get_fake_street_address, returnType=StringType())
get_fake_name_udf = udf(get_fake_name, returnType=StringType())
get_fake_user_name_udf = udf(get_fake_user_name, returnType=StringType())
get_fake_postalcode_udf = udf(get_fake_postalcode, returnType=StringType())
get_fake_zipcode_udf = udf(get_fake_zipcode, returnType=StringType())
get_fake_country_code_udf = udf(get_fake_country_code, returnType=StringType())
get_fake_date_of_birth_udf = udf(get_fake_date_of_birth, returnType=StringType())


In [None]:
def get_fake_df(original_df, pii_column_mapping_config):
    """Replace the configured PII columns of ``original_df`` with fake values.

    Each configured column is overwritten with the output of the UDF
    registered for its fake type; the column's current value is passed to
    that UDF (the generator functions use it as their seed).

    :param original_df: DataFrame whose PII columns should be replaced.
    :param pii_column_mapping_config: Mapping of column name to fake type,
        e.g. ``{"email_address": "email"}``. A falsy value (None or empty)
        leaves the DataFrame untouched.
    :return: ``original_df`` itself when there is no mapping, otherwise a
        DataFrame with every recognised mapped column overwritten. Columns
        mapped to an unrecognised fake type are reported and left unchanged.
    """
    if not pii_column_mapping_config:
        return original_df

    print(pii_column_mapping_config)

    # Fake type -> UDF. Built after the guard so the no-op path above does
    # not depend on the UDFs being defined.
    fake_udfs = {
        'first_name': get_fake_first_name_udf,
        'last_name': get_fake_last_name_udf,
        'email': get_fake_email_udf,
        'phone_number': get_fake_phone_number_udf,
        'address': get_fake_address_udf,
        'ssn': get_fake_ssn_udf,
        'passport_number': get_fake_passport_number_udf,
        'street_address': get_fake_street_address_udf,
        'name': get_fake_name_udf,
        'user_name': get_fake_user_name_udf,
        'postalcode': get_fake_postalcode_udf,
        'zipcode': get_fake_zipcode_udf,
        'country_code': get_fake_country_code_udf,
        'date_of_birth': get_fake_date_of_birth_udf,
    }

    faked_df = original_df
    for column, fake_type in pii_column_mapping_config.items():
        fake_udf = fake_udfs.get(fake_type)
        if fake_udf is None:
            # A mistyped fake type would otherwise leave the real values in
            # place with no hint; report it the way load_data reports an
            # unsupported file format, and keep going.
            print(f"Unsupported fake type {fake_type} for column {column}; column left unchanged")
            continue
        faked_df = faked_df.withColumn(column, fake_udf(faked_df[column]))

    return faked_df


In [None]:
def save_as_delta(df, config_name):
    """
    Saves the given DataFrame as a Delta table named ``fake_<config_name>``.

    No save mode is set, so the writer's default behaviour applies when the
    table already exists.

    :param df: The DataFrame to save.
    :param config_name: Config name; the table name is "fake_" followed by
        this value.
    """
    # Request Delta explicitly instead of relying on the session's default
    # data source, so the table format matches what this function promises.
    df.write.format("delta").saveAsTable("fake_" + config_name)
