In [0]:
# Install necessary Libraries

In [0]:
%pip install Faker==37.6.0 dbldatagen==0.4.0.post1

In [0]:
%restart_python

In [0]:
import re
import string
import random
import chispa
from pyspark.sql import functions as F
from pyspark.sql import Window
from pyspark.sql.types import StructType, StructField, StringType, LongType, TimestampType, DateType, IntegerType
from pyspark.sql.functions import udf
import dbldatagen as dg
from dbldatagen import DataGenerator, fakerText, FakerTextFactory
from faker import Faker
from faker.providers import internet


In [0]:

# Schema of the generated users dataset: (column name, Spark type) pairs.
# Every column is declared non-nullable.
# NOTE(review): `ingestion_timestamp` is declared as DateType despite its name — confirm this is intended.
phoenix101_schema = StructType([
    StructField(column_name, column_type, False)
    for column_name, column_type in [
        ("user_id", IntegerType()),
        ("firstname", StringType()),
        ("lastname", StringType()),
        ("email_address", StringType()),
        ("phone_number", StringType()),
        ("created_at", TimestampType()),
        ("ingestion_timestamp", DateType()),
        ("source_system", StringType()),
    ]
])


In [0]:
# Set seed for reproducibility.
# This seeds the global `random` generator of the current Python process only.
# NOTE(review): presumably the UDFs defined below run in separate Spark worker
# processes — confirm whether their random draws are covered by this seed.
SEED_NUMBER = 42
random.seed(SEED_NUMBER)

def random_space() -> str:
    """
    Picks a whitespace "noise" string at random.

    Returns:
        str: One of the empty string, a single space, or three spaces,
            each with equal probability.
    """
    padding_options = ["", " ", "   "]
    return random.choice(padding_options)

def filter_remove_unwanted_characters(local_part: str) -> str:
    """
    Sanitises a string so it can be used inside an email local part.

    Keeps only ASCII letters, digits and the characters ``. _ + - '``,
    collapses every run of dots into a single dot and removes leading and
    trailing dots.

    Args:
        local_part (str): The text to sanitise.

    Returns:
        str: The sanitised text (possibly empty).
    """
    allowed_only = re.sub(r"[^A-Za-z0-9._+\-']", "", local_part)
    single_dots = re.sub(r"\.+", ".", allowed_only)
    return single_dots.strip(".")

def random_local_part(firstname: str, lastname: str, min_length: int = 6) -> str:
    """
    Generates a random local part for an email address with better randomness
    and avoids very short predictable outputs.

    Both names are sanitised and lower-cased, a random-length prefix of each
    is taken, the two prefixes are put in random order and six random
    lowercase-alphanumeric characters are appended.

    Args:
        firstname (str): The firstname of the user. ``None`` or an empty
            string contributes nothing to the result.
        lastname (str): The lastname of the user. ``None`` or an empty
            string contributes nothing to the result.
        min_length (int): Minimum length of the local part.

    Returns:
        str: The generated local part for an email address.
    """

    firstname = filter_remove_unwanted_characters(firstname or "").lower()
    lastname = filter_remove_unwanted_characters(lastname or "").lower()

    # Pick random prefix lengths (at least 2 chars if possible). The lower
    # bound is clamped to the name length: for a one-character name
    # random.randint(2, 1) would raise ValueError, so such a name is kept whole.
    fn_slice = (
        firstname[: random.randint(min(2, len(firstname)), len(firstname))]
        if firstname
        else ""
    )
    ln_slice = (
        lastname[: random.randint(min(2, len(lastname)), len(lastname))]
        if lastname
        else ""
    )

    # Random alphanumeric tail
    tail = ''.join(random.choices(string.ascii_lowercase + string.digits, k=6))

    # Put the two name prefixes in random order, then append the tail
    parts = [fn_slice, ln_slice]
    random.shuffle(parts)
    parts.append(tail)
    local_part = "".join(parts)

    # Enforce minimum length by padding with random chars
    while len(local_part) < min_length:
        local_part += random.choice(string.ascii_lowercase + string.digits)

    return local_part


@udf("string")
def create_email_address(firstname, lastname, domain):
    """
    Spark UDF that builds a synthetic email address for one row.

    The address is a random local part derived from the names, optionally
    followed by stray whitespace (deliberate noise), then ``@`` and the domain.

    Args:
        firstname: The row's firstname value.
        lastname: The row's lastname value.
        domain: The domain to place after the ``@``.

    Returns:
        The generated email address string.
    """
    # Draw the noise first, then the local part, to keep the order of random draws
    stray_whitespace = random_space()
    local = random_local_part(firstname, lastname)
    return local + stray_whitespace + '@' + domain


In [0]:
# Faker identifies British English as `en_GB`. The bare code "UK" used previously
# is not a British locale (locale normalisation maps it to Ukrainian, `uk_UA`).
fake = Faker(locale="en_GB")
Faker.seed(SEED_NUMBER)
# Factory for dbldatagen text generators backed by Faker.
# NOTE(review): `providers` receives the string "internet" although the `internet`
# provider module is imported at the top of the notebook — confirm which form
# FakerTextFactory expects.
FakerTextUK = FakerTextFactory(locale=["en_UK"], providers=["internet"])

# Known source-system identifiers
source_systems = ["PHOENIX101", "PHOENIX-1"]

@udf("string")
def generate_antonburg_phone(n: int) -> str:
    """
    Spark UDF that formats the number ``n`` as a phone number in one of two layouts.

    A uniform random draw selects the layout: values up to 0.5 give the dashed
    form ``0543-000-NNNNNNN``; otherwise the undashed form ``0543000NNNNNNN`` is
    returned, optionally followed by stray whitespace (deliberate noise).
    ``n`` is zero-padded to at least seven digits.

    Args:
        n (int): The number to embed in the phone number.

    Returns:
        str: The formatted phone number.
    """
    use_dashed_layout = random.random() <= 0.5
    if use_dashed_layout:
        return f"0543-000-{n:07d}"
    return f"0543000{n:07d}" + random_space()


In [0]:
# Generation settings: Spark partition count and number of synthetic rows
partitions_requested = 8
data_rows = 5_000_000

spark.conf.set("spark.sql.shuffle.partitions", partitions_requested)

# Data spec: start from the schema, then override how individual columns are generated.
# The Faker-based `phone_number` spec (including its percentNulls) is replaced further
# down by generate_antonburg_phone, so it does not reach the final output.
phoenix101_faker_dataspec = (
    dg.DataGenerator(spark, rows=data_rows, startingId=1, partitions=partitions_requested)
    .withIdOutput()
    .withSchema(phoenix101_schema)
    .withColumnSpec("firstname", text=FakerTextUK("first_name"))
    .withColumnSpec("lastname", percentNulls=0.1, text=FakerTextUK("last_name"))
    .withColumnSpec("phone_number", percentNulls=0.23, text=FakerTextUK("phone_number"))
    .withColumnSpec(
        "created_at",
        percentNulls=0.15,
        data_range=dg.DateRange(
            "2017-06-01 01:33:00", "2018-01-01 01:33:00", "1 minute"
        ),
        random=True,
    )
    .withColumnSpec("ingestion_timestamp", values=["2018-06-12 02:15:00"])
    .withColumnSpec("source_system", values=["PHOENIX101"])
)

# Build the base rows, then derive the email address and phone number with the UDFs
df_phoenix101_faker = (
    phoenix101_faker_dataspec.build()
    .withColumn("domain_name", F.lit("example.com"))
    .withColumn(
        "email_address", create_email_address("firstname", "lastname", "domain_name")
    )
    .withColumn("phone_number", generate_antonburg_phone("id"))
)

# Keep only the columns declared in the schema, in schema order
# (this drops the helper columns `id` and `domain_name`)
schema_cols = [column.name for column in phoenix101_schema.fields]
df_phoenix101_faker_selected = df_phoenix101_faker.select(*schema_cols)
display(df_phoenix101_faker_selected)

### Test that the key columns contain unique values

In [0]:
# Tests to make sure no duplicates

def _count_duplicated_values(df, column_name: str) -> int:
    """Return how many distinct values of ``column_name`` occur in more than one row of ``df``."""
    return (
        df.groupBy(column_name)
          .count()
          .filter(F.col("count") > 1)
          .count()
    )


def test_unique_phone_numbers(df):
    """Assert that no ``phone_number`` value appears in more than one row of ``df``."""
    dups_phone_number_count = _count_duplicated_values(df, "phone_number")
    assert dups_phone_number_count == 0, (
        f"{dups_phone_number_count} phone_number value(s) appear more than once"
    )


def test_unique_user_id(df):
    """Assert that no ``user_id`` value appears in more than one row of ``df``."""
    dups_user_id_count = _count_duplicated_values(df, "user_id")
    assert dups_user_id_count == 0, (
        f"{dups_user_id_count} user_id value(s) appear more than once"
    )


def test_unique_email_address(df):
    """Assert that no ``email_address`` value appears in more than one row of ``df``."""
    dups_email_address_count = _count_duplicated_values(df, "email_address")
    assert dups_email_address_count == 0, (
        f"{dups_email_address_count} email_address value(s) appear more than once"
    )

In [0]:
# Raises AssertionError if any phone_number value is shared by several rows
test_unique_phone_numbers(df_phoenix101_faker_selected)

In [0]:
# Raises AssertionError if any user_id value is shared by several rows
test_unique_user_id(df_phoenix101_faker_selected)

In [0]:
# Raises AssertionError if any email_address value is shared by several rows
test_unique_email_address(df_phoenix101_faker_selected)

The uniqueness tests pass, so the data can now be split.

### Split the Data

In [0]:
# Randomly split the generated rows into a ~10% sample and a ~90% remainder,
# using the notebook seed for the split
split_weights = [0.1, 0.9]
df_10pct, df_rest = df_phoenix101_faker_selected.randomSplit(
    split_weights, seed=SEED_NUMBER
)

In [0]:
# Write each split to its table, replacing any existing contents
for split_df, table_name in (
    (df_10pct, "securehome.raw.phoenix_sample_10pct"),
    (df_rest, "securehome.raw.phoenix_main_90pct"),
):
    split_df.write.mode("overwrite").saveAsTable(table_name)

In [0]:
# Write split data into volumes.
# The Parquet copies are read back from the tables written in the previous cell
# rather than from df_10pct / df_rest: those DataFrames are lazy and not cached,
# so writing them again would re-run the random UDFs and produce email addresses
# and phone layouts that differ from what the tables contain.
spark.table("securehome.raw.phoenix_sample_10pct").write.mode("overwrite").format("parquet").save("/Volumes/securehome/raw/phoenix/phoenix_10pct_data")
spark.table("securehome.raw.phoenix_main_90pct").write.mode("overwrite").format("parquet").save("/Volumes/securehome/raw/phoenix/phoenix_90pct_data")

In [0]:
%sql
-- Preview of the sample dataset: 10 rows of the 10% table.
-- There is no ORDER BY, so which rows are shown is not fixed.
SELECT *
FROM securehome.raw.phoenix_sample_10pct
LIMIT 10;

In [0]:
%sql
-- Preview of the main dataset: 10 rows of the 90% table.
-- There is no ORDER BY, so which rows are shown is not fixed.
SELECT *
FROM securehome.raw.phoenix_main_90pct
LIMIT 10;