In [16]:
from pyspark.sql import SparkSession, Row, DataFrame
from pyspark.sql.types import DateType
from pyspark.sql.window import Window

from pyspark.sql.functions import (
    col, sha2, concat_ws, count, countDistinct, input_file_name, regexp_extract, broadcast,
    regexp_replace, lag, lead, lit, datediff, current_date, first, floor,
    when, expr, to_date, coalesce, length, trim, min, max, substring,
    row_number, desc, year, month, dayofweek, date_format, collect_list,
    sum as spark_sum, min as spark_min, max as spark_max
)

import glob
import os,shutil
from datetime import datetime
from functools import reduce

os.environ["HADOOP_HOME"] = "C:\\hadoop"
os.environ["PATH"] += ";C:\\hadoop\\bin"

from delta import configure_spark_with_delta_pip

from typing import Union, List
from delta.tables import DeltaTable

In [17]:
# This defines the required fields for the file
required_fields = [
            "GUEST_NAME", "EMAIL_ADDRESS", "BOOKING_NUMBER", "PHONE_NUM",
            "STREET_ADDRESS", "STATE", "BOOKING_DATE", "DEPARTURE_DATE"
        ]
incremental_or_full = "full"
input_dir = "C:/Users/Ronald/data"
archive_path = "C:/Users/Ronald/data/process/archive"

process_dir = os.path.join(input_dir, "process")
failed_dir = os.path.join(input_dir, "failed")

bronze_path = "C:/Users/Ronald/delta/bronze_output/"
stage_path = "C:/Users/Ronald/delta/stg_incremental/"
output_path = "C:/Users/Ronald/delta/customer_aggregate/"
business_insights_path = "C:/Users/Ronald/delta/business_insight/"


In [18]:
builder = (
    SparkSession.builder.appName("VikingCruisesAssessmentApp")
    .master("local[*]")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
)

spark = configure_spark_with_delta_pip(builder).getOrCreate()

In [19]:
## FILE VALIDATION FUNCTIONS

def read_file(file_path):
    return spark.read.csv(file_path, header=True, inferSchema=True)

def has_required_columns(df, required_fields):
    return set(required_fields).issubset(set(df.columns))

def trim_required_fields(df, required_fields):
    return df.select([
        trim(col(c)).alias(c) if c in required_fields else col(c) for c in df.columns
    ])

def has_null_or_blank_fields(df_trimmed, required_fields):
    null_or_blank_exprs = [col(c).isNull() | (col(c) == "") for c in required_fields]
    return df_trimmed.filter(reduce(lambda x, y: x | y, null_or_blank_exprs)).count() > 0

def has_invalid_booking_numbers(df_trimmed):
    return df_trimmed.filter(~col("BOOKING_NUMBER").startswith("BOOK")).count() > 0

def has_invalid_phone_numbers(df_trimmed):
    df_phone_check = df_trimmed.withColumn("PHONE_NUM_CLEAN", regexp_replace(col("PHONE_NUM"), "[^0-9]", ""))
    return df_phone_check.filter(col("PHONE_NUM") != col("PHONE_NUM_CLEAN")).count() > 0

def validate_file(file_path, required_fields):
    file_name = os.path.basename(file_path)
    try:
        df = read_file(file_path)

        if not has_required_columns(df, required_fields):
            raise ValueError("Missing required columns")

        df_trimmed = trim_required_fields(df, required_fields)

        if has_null_or_blank_fields(df_trimmed, required_fields):
            raise ValueError("Null or blank values in required columns")

        if has_invalid_booking_numbers(df_trimmed):
            raise ValueError("Invalid BOOKING_NUMBER format")

        if has_invalid_phone_numbers(df_trimmed):
            raise ValueError("PHONE_NUM contains non-digit characters")

        return file_name, os.path.abspath(file_path), int(os.path.getmtime(file_path))  # Valid

    except Exception as e:
        print(f"[ERROR] Skipping file '{file_name}': {e}")
        return None

def validate_and_move_files(file_list, required_fields, process_dir, failed_dir):
    """
    Validates files and moves them to process or failed directories.

    Returns:
        List of tuples: (file_name, new_path, timestamp) for valid files.
    """
    file_info = []

    for file in file_list:
        file_name = os.path.basename(file)
        ts = os.path.getmtime(file)

        result = validate_file(file, required_fields)
        if result:
            print(f"File '{result[0]}' passes validation")
            new_path = os.path.normpath(os.path.join(process_dir, file_name))
            shutil.move(file, new_path)
            file_info.append((file_name, new_path, ts))
        else:
            print(f"File '{file}' failed validation")
            shutil.move(file, os.path.join(failed_dir, file_name))
    
    return file_info


In [20]:
##FUNCTION TO LOAD FILE AND ADD ADMIN COLUMNS AND EMAIL_HASH COLUMN

def read_and_enrich_csv(file_name: str, file_path: str, file_ts: int) -> DataFrame:
    """Reads a CSV file and adds file metadata as columns."""
    df = spark.read.csv(file_path, header=True, inferSchema=True)
    df = df.withColumn("source_file_name", lit(file_name)) \
           .withColumn("source_file_path", lit(file_path)) \
           .withColumn("file_modified_unix", lit(file_ts)) \
           .withColumn("EMAIL_HASH", sha2(col("EMAIL_ADDRESS"), 256))

    # Get current columns
    cols = df.columns

    # Move EMAIL_HASH to the front
    new_col_order = ["EMAIL_HASH"] + [col for col in cols if col != "EMAIL_HASH"]

    # Reorder DataFrame
    df = df.select(*new_col_order)
    
    return df

In [21]:
##function to merge dataframe into delta table
def merge_into_delta_table(
    df_new: DataFrame,                        ### New DataFrame to merge  
    delta_path: str,                          ### Path to the Delta Lake table.
    spark: SparkSession,                      ### Active SparkSession
    merge_keys: Union[str, List[str]],        ### Column name(s) to identify unique records (string or list of strings)
    mode: str = "overwrite",                  ### Write mode for Delta (default: "overwrite").
    allow_schema_evolution: bool = True,      ### This accounts for schema evolution
    verbose: bool = True                      ### Whether to print status messages.
) -> None:
    try:
        if isinstance(merge_keys, str):
            merge_keys = [merge_keys]

        if not os.path.exists(delta_path):
            df_new.write \
                .mode(mode) \
                .format("delta") \
                .option("mergeSchema", str(allow_schema_evolution).lower()) \
                .save(delta_path)
            if verbose:
                print(f"✅ Initialized Delta table at {delta_path} from df_new")
        else:
            delta_table = DeltaTable.forPath(spark, delta_path)

            merge_condition = " AND ".join([f"target.{key} = source.{key}" for key in merge_keys])

           # Delete existing records in target that match the new data
            delta_table.alias("target") \
                .merge(
                    df_new.alias("source"),
                    merge_condition
                ) \
                .whenMatchedDelete() \
                .execute()

            # Append new data
            df_new.write \
                .format("delta") \
                .mode("append") \
                .option("mergeSchema", str(allow_schema_evolution).lower()) \
                .save(delta_path)

            if verbose:
                print(f"✅ Deleted + inserted records into Delta table at {delta_path}")

    except Exception as e:
        print(f"❌ Error in merge_into_delta_table: {e}")
        raise

In [22]:
file_list = glob.glob(os.path.join(input_dir, "data_engineer_viking_assessment*.csv"))
print(file_list)

['C:/Users/Ronald/data\\data_engineer_viking_assessment2.csv']


In [23]:
## LOOP THROUGH FILES AND VALIDATE THEM.  MOVE BAD FILES -> Failed,  MOVE GOOD FILES -> Process
file_info = validate_and_move_files(file_list, required_fields, process_dir, failed_dir)

# LOOP THROUGH FILES FOR PROCESSING AND CALL FUNCTION TO READ FILES INTO df_staging_final
df_staging_final: DataFrame = None

for file_name, file_path, file_ts in file_info:
    try:
        df = read_and_enrich_csv(file_name, file_path, file_ts)
        if df_staging_final is None:
            df_staging_final = df
        else:
            df_staging_final = df_staging_final.unionByName(df, allowMissingColumns=True)
    except Exception as e:
        print(f"[ERROR] Failed to read {file_name}: {e}")

File 'data_engineer_viking_assessment2.csv' passes validation


In [24]:
if df_staging_final is not None and df_staging_final.head(1):
    df_staging_final.show()
    df_staging_final.printSchema()
    no_data = False
else:
    no_data = True

+--------------------+---------------+--------------------+--------------+----------+------------------+-----------+-------------------+-------------------+--------------------+--------------------+--------------------+
|          EMAIL_HASH|     GUEST_NAME|       EMAIL_ADDRESS|BOOKING_NUMBER| PHONE_NUM|    STREET_ADDRESS|      STATE|       BOOKING_DATE|     DEPARTURE_DATE|    source_file_name|    source_file_path|  file_modified_unix|
+--------------------+---------------+--------------------+--------------+----------+------------------+-----------+-------------------+-------------------+--------------------+--------------------+--------------------+
|a025ebf243ab0db33...|    Max Collier| xtorres@example.net|BOOK0000119999|8996702946|08898 Smith Tunnel|Connecticut|2025-02-04 00:00:00|2025-04-24 00:00:00|data_engineer_vik...|C:\Users\Ronald\d...|1.7536517581059752E9|
|27e420a016e4ada41...|Douglas Herrera|alexander45@examp...|BOOK0000120000| 209419806|  2827 Ryan Spring|    Vermont|2025

In [25]:
if no_data == False:

# Define a window partitioned by BOOKING_NUMBER and ordered by file_modified_unix descending
    window_spec = Window.partitionBy("BOOKING_NUMBER").orderBy(desc("file_modified_unix"))

# Add a row number so we can pick the latest record per booking
    df_ranked = df_staging_final.withColumn("row_num", row_number().over(window_spec))

# Filter to only the first (most recent) record for each BOOKING_NUMBER
    df_delta = df_ranked.filter("row_num = 1").drop("row_num")

##YOU CAN WRITE DATA VALIDATION OR CLEASINGS RULES HERE##

#merge inremental records into bronze table
    merge_into_delta_table(
        df_new=df_delta,
        delta_path="C:/Users/Ronald/delta/bronze_output/",
        spark=spark,
        merge_keys="BOOKING_NUMBER"
    )

#Archive processed files 
    for file_name, file_path, file_ts in file_info:
        try:
            dest_path = os.path.join(archive_path, file_name)
            shutil.move(file_path, dest_path)
            print(f"Moved: {file_name} → {dest_path}")
        except Exception as e:
            print(f"Error moving {file_name}: {e}")

✅ Deleted + inserted records into Delta table at C:/Users/Ronald/delta/bronze_output/
Moved: data_engineer_viking_assessment2.csv → C:/Users/Ronald/data/process/archive\data_engineer_viking_assessment2.csv


In [64]:
##CUSTOMER AGGREGATE PROCESSING##    

In [27]:
def load_incremental_data(df_delta, bronze_path, stage_path):
    df_delta.write.mode("overwrite").option("overwriteSchema", "true").format("delta").save(stage_path)
    df_email_list = df_delta.select("EMAIL_HASH").distinct()
    df_bronze = spark.read.format("delta").load(bronze_path)
    return df_bronze.join(broadcast(df_email_list), on="EMAIL_HASH", how="inner")

In [28]:
def load_full_data(bronze_path):
    return spark.read.format("delta").load(bronze_path)

In [29]:
def add_phone_metadata(df):
    df = df.withColumn(
        "invalid_phone_ind",
        when(length(col("PHONE_NUM").cast("string")) != 10, True).otherwise(False)
    )
    df = df.withColumn(
        "area_code",
        when(length(col("PHONE_NUM").cast("string")) == 10,
             substring(col("PHONE_NUM").cast("string"), 1, 3)
        ).otherwise(None)
    )
    return df

In [30]:
def aggregate_customer_data(df):
    df_agg = df.groupBy("EMAIL_ADDRESS").agg(
        spark_min("GUEST_NAME").alias("GUEST_NAME"),
        spark_min("PHONE_NUM").alias("PHONE_NUM"),
        spark_min("STREET_ADDRESS").alias("STREET_ADDRESS"),
        spark_min("STATE").alias("STATE"),
        spark_min("BOOKING_DATE").alias("FIRST_BOOKING_DATETIME"),
        spark_max("BOOKING_DATE").alias("LAST_BOOKING_DATETIME"),
        spark_min("DEPARTURE_DATE").alias("FIRST_DEPARTURE_DATETIME"),
        spark_max("DEPARTURE_DATE").alias("LAST_DEPARTURE_DATETIME"),
        countDistinct("BOOKING_NUMBER").alias("BOOKING_COUNT"),
        first("area_code", ignorenulls=True).alias("AREA_CODE"),
        first("invalid_phone_ind", ignorenulls=True).alias("INVALID_PHONE_IND"),
        concat_ws(",", collect_list("BOOKING_NUMBER")).alias("BOOKING_NUMBER_LIST")
    )

    df_agg = df_agg \
        .withColumn("FIRST_BOOKING_DATE", to_date("FIRST_BOOKING_DATETIME")) \
        .withColumn("LAST_BOOKING_DATE", to_date("LAST_BOOKING_DATETIME")) \
        .withColumn("FIRST_DEPARTURE_DATE", to_date("FIRST_DEPARTURE_DATETIME")) \
        .withColumn("LAST_DEPARTURE_DATE", to_date("LAST_DEPARTURE_DATETIME")) \
        .withColumn("FIRST_BOOKING_YEAR", year("FIRST_BOOKING_DATE")) \
        .withColumn("FIRST_BOOKING_MONTH", month("FIRST_BOOKING_DATE")) \
        .withColumn("FIRST_BOOKING_DAY_OF_WEEK", date_format("FIRST_BOOKING_DATE", "EEEE")) \
        .withColumn("LAST_BOOKING_YEAR", year("LAST_BOOKING_DATE")) \
        .withColumn("LAST_BOOKING_MONTH", month("LAST_BOOKING_DATE")) \
        .withColumn("LAST_BOOKING_DAY_OF_WEEK", date_format("LAST_BOOKING_DATE", "EEEE")) \
        .withColumn("EMAIL_HASH", sha2(col("EMAIL_ADDRESS"), 256))

    # Reorder columns to put EMAIL_HASH first
    cols = df_agg.columns
    new_col_order = ["EMAIL_HASH"] + [col for col in cols if col != "EMAIL_HASH"]
    return df_agg.select(*new_col_order)

In [31]:
##AGGREGATE CUSTOMER DATA FOR INCREMENTAL OR FULL LOAD##

if no_data == False or incremental_or_full == "full":

    if incremental_or_full == "incremental":
        df_cust_raw = load_incremental_data(df_delta, bronze_path, stage_path)
    elif incremental_or_full == "full":
        df_cust_raw = load_full_data(bronze_path)

    df_cust_raw = add_phone_metadata(df_cust_raw)
    df_customer_agg = aggregate_customer_data(df_cust_raw)

##YOU CAN WRITE DATA VALIDATION OR CLEASINGS RULES HERE##
    
    merge_into_delta_table(
        df_new=df_customer_agg,
        delta_path=output_path,
        spark=spark,
        merge_keys="EMAIL_HASH"
    )

✅ Deleted + inserted records into Delta table at C:/Users/Ronald/delta/customer_aggregate/


In [70]:
df_customer_agg.count()

96000

In [71]:
df_bronze = spark.read.format("delta").load("C:/Users/Ronald/delta/bronze_output/")
df_bronze.count()

120001

In [73]:
df_cust_agg = spark.read.format("delta").load("C:/Users/Ronald/delta/customer_aggregate/")
df_cust_agg.count()

96000

In [None]:
##BUSINESS INSIGHTS PROCESSING##

In [35]:
def load_delta_table(spark: SparkSession, path: str) -> DataFrame:
    return spark.read.format("delta").load(path)

def enrich_booking_dates(df: DataFrame) -> DataFrame:
    return df.select(
        col("STATE"),
        to_date("BOOKING_DATE").alias("BOOKING_DATE"),
        year("BOOKING_DATE").alias("BOOKING_YEAR"),
        month("BOOKING_DATE").alias("BOOKING_MONTH"),
        dayofweek("BOOKING_DATE").alias("BOOKING_DAY_OF_WEEK"),
        to_date("DEPARTURE_DATE").alias("DEPARTURE_DATE"),
        year("DEPARTURE_DATE").alias("DEPARTURE_YEAR"),
        month("DEPARTURE_DATE").alias("DEPARTURE_MONTH"),
        dayofweek("DEPARTURE_DATE").alias("DEPARTURE_DAY_OF_WEEK"),
        col("EMAIL_HASH")
    )

def calculate_booking_metrics(df: DataFrame) -> DataFrame:
    window_spec = Window.partitionBy("EMAIL_HASH").orderBy("BOOKING_DATE")
    return df.withColumn("CUSTOMER_PREVIOUS_BOOKING_DATE", lag("BOOKING_DATE").over(window_spec)) \
             .withColumn("DAYS_FROM_PREVIOUS_BOOKING", datediff(col("BOOKING_DATE"), col("CUSTOMER_PREVIOUS_BOOKING_DATE"))) \
             .withColumn("CUSTOMERS_BOOKING_ORDER", row_number().over(window_spec))

def join_customer_data(df_bookings: DataFrame, df_customers: DataFrame) -> DataFrame:
    return df_bookings.join(
        df_customers.select(
            col("EMAIL_HASH"),
            col("STATE").alias("CUSTOMERS_CURRENT_STATE"),
            col("AREA_CODE").alias("CUSTOMERS_CURRENT_AREA_CODE"),
            col("LAST_BOOKING_DATE")
        ),
        on="EMAIL_HASH",
        how="left"
    ).withColumn("CUSTOMERS_DAYS_FROM_LAST_BOOKING", datediff(current_date(), col("LAST_BOOKING_DATE")))

def generate_business_insight(df: DataFrame) -> DataFrame:
    return df.groupBy(
        "STATE",
        "BOOKING_DATE",
        "BOOKING_YEAR",
        "BOOKING_MONTH",
        "BOOKING_DAY_OF_WEEK",
        "DEPARTURE_DATE",
        "DEPARTURE_YEAR",
        "DEPARTURE_MONTH",
        "DEPARTURE_DAY_OF_WEEK",
        "DAYS_FROM_PREVIOUS_BOOKING",
        "CUSTOMERS_DAYS_FROM_LAST_BOOKING",
        "CUSTOMERS_BOOKING_ORDER",
        "CUSTOMERS_CURRENT_STATE",
        "CUSTOMERS_CURRENT_AREA_CODE"
    ).agg(
        count("*").cast("long").alias("BOOKING_COUNT")
    )

def save_to_delta_table(df: DataFrame, path: str):
    try:
        df.write.format("delta").mode("overwrite").option("overwriteSchema", "true").save(path)
    
        print(f"✅ Table successfully created at {path}")

    except Exception as e:
        print(f"❌ Error in save_to_delta_table: {e}")
        raise

In [36]:
## SYNC BUSINESS INSIGHTS (Full Run) ###

df_bronze = load_delta_table(spark, "C:/Users/Ronald/delta/bronze_output/")
df_customer = load_delta_table(spark, "C:/Users/Ronald/delta/customer_aggregate/")

df_enriched = enrich_booking_dates(df_bronze)
df_metrics = calculate_booking_metrics(df_enriched)
df_with_customer = join_customer_data(df_metrics, df_customer)
df_business_insight = generate_business_insight(df_with_customer)

##YOU CAN WRITE DATA VALIDATION OR CLEASINGS RULES HERE##

save_to_delta_table(df_business_insight, business_insights_path)

✅ Table successfully created at C:/Users/Ronald/delta/business_insight/


In [34]:
##BUSINESS INSIGHTS USE CASES##

In [37]:
##CHECK COUNTS AND SUM##
df_business_insight.agg(spark_sum("BOOKING_COUNT").alias("TOTAL_BOOKINGS")).show()


+--------------+
|TOTAL_BOOKINGS|
+--------------+
|        120001|
+--------------+



In [137]:
## DAYS BETWEEN BOOKINGS FOR REPEAT BOOKERS ##

df_business_insight \
    .groupBy("DAYS_FROM_PREVIOUS_BOOKING") \
    .agg(spark_sum("BOOKING_COUNT").alias("TOTAL_BOOKINGS")) \
    .orderBy(col("DAYS_FROM_PREVIOUS_BOOKING").asc()) \
    .show()

+--------------------------+--------------+
|DAYS_FROM_PREVIOUS_BOOKING|TOTAL_BOOKINGS|
+--------------------------+--------------+
|                      null|         96000|
|                         0|            50|
|                         1|            96|
|                         2|            94|
|                         3|            96|
|                         4|            90|
|                         5|           102|
|                         6|            88|
|                         7|            91|
|                         8|            93|
|                         9|            85|
|                        10|            75|
|                        11|            78|
|                        12|            95|
|                        13|           116|
|                        14|            79|
|                        15|            85|
|                        16|            90|
|                        17|            89|
|                        18|    

In [138]:
## DAYS BETWEEN BOOKINGS GROUPING FOR REPEAT BOOKERS ##

df_business_insight \
    .withColumn(
        "DAYS_BUCKET",
        when(col("DAYS_FROM_PREVIOUS_BOOKING").between(0, 100), "0-100")
        .when(col("DAYS_FROM_PREVIOUS_BOOKING").between(101, 200), "101-200")
        .when(col("DAYS_FROM_PREVIOUS_BOOKING").between(201, 300), "201-300")
        .when(col("DAYS_FROM_PREVIOUS_BOOKING").between(301, 400), "301-400")
        .when(col("DAYS_FROM_PREVIOUS_BOOKING").between(401, 500), "401-500")
        .when(col("DAYS_FROM_PREVIOUS_BOOKING").between(501, 600), "501-600")
        .when(col("DAYS_FROM_PREVIOUS_BOOKING").between(601, 700), "601-700")
        .otherwise("700+")
    ) \
    .groupBy("DAYS_BUCKET") \
    .agg(spark_sum("BOOKING_COUNT").alias("TOTAL_BOOKINGS")) \
    .orderBy("DAYS_BUCKET") \
    .show()

+-----------+--------------+
|DAYS_BUCKET|TOTAL_BOOKINGS|
+-----------+--------------+
|      0-100|          8189|
|    101-200|          5860|
|    201-300|          4148|
|    301-400|          2639|
|    401-500|          1665|
|    501-600|          1034|
|    601-700|           444|
|       700+|         96022|
+-----------+--------------+



In [98]:
##Bookings by States
df_business_insight \
    .groupBy("STATE") \
    .agg(spark_sum("BOOKING_COUNT").alias("TOTAL_BOOKINGS")) \
    .orderBy(col("TOTAL_BOOKINGS").desc()) \
    .show()

+-------------+--------------+
|        STATE|TOTAL_BOOKINGS|
+-------------+--------------+
|      Georgia|          2512|
|     Oklahoma|          2505|
|    Louisiana|          2500|
|        Idaho|          2490|
|New Hampshire|          2481|
|    Tennessee|          2480|
|       Kansas|          2478|
|         Utah|          2471|
|       Nevada|          2471|
|     Kentucky|          2460|
|         Iowa|          2459|
|      Wyoming|          2459|
|Massachusetts|          2454|
|     Virginia|          2442|
|       Oregon|          2442|
|         Ohio|          2440|
|       Hawaii|          2438|
| Pennsylvania|          2437|
|      Vermont|          2434|
| South Dakota|          2430|
+-------------+--------------+
only showing top 20 rows



In [99]:
## NUMBER OF REPEAT CUSTOMERS
df_business_insight \
    .groupBy("CUSTOMERS_BOOKING_ORDER") \
    .agg(spark_sum("BOOKING_COUNT").alias("TOTAL_BOOKINGS")) \
    .orderBy(col("CUSTOMERS_BOOKING_ORDER").asc()) \
    .show()

+-----------------------+--------------+
|CUSTOMERS_BOOKING_ORDER|TOTAL_BOOKINGS|
+-----------------------+--------------+
|                      1|         96000|
|                      2|         21200|
|                      3|          2566|
|                      4|           221|
|                      5|            14|
+-----------------------+--------------+



In [100]:
## BOOKINGS BY MONTH
df_business_insight \
    .groupBy("BOOKING_MONTH") \
    .agg(spark_sum("BOOKING_COUNT").alias("TOTAL_BOOKINGS")) \
    .orderBy(col("BOOKING_MONTH").asc()) \
    .show()

+-------------+--------------+
|BOOKING_MONTH|TOTAL_BOOKINGS|
+-------------+--------------+
|            1|         10170|
|            2|          9298|
|            3|         10153|
|            4|          9678|
|            5|         10356|
|            6|          9908|
|            7|         10076|
|            8|         10188|
|            9|          9737|
|           10|         10133|
|           11|         10184|
|           12|         10120|
+-------------+--------------+



In [None]:
#####DATA VALIDATION CHECKS ######

In [15]:
##CHECK FOR DUPLIDATE BOOKING NUMBERS

df_delta.groupBy("BOOKING_NUMBER").count().filter("count > 1").show()

+--------------+-----+
|BOOKING_NUMBER|count|
+--------------+-----+
+--------------+-----+



In [16]:
##CHECK FOR NULL VALUES
df_nulls = df_bronze.filter(
    col("EMAIL_ADDRESS").isNull() |
    col("PHONE_NUM").isNull() |
    col("STREET_ADDRESS").isNull() |
    col("STATE").isNull() |
    col("BOOKING_NUMBER").isNull() |
    col("BOOKING_DATE").isNull() |
    col("DEPARTURE_DATE").isNull()
)
df_nulls.show(truncate=False)

+----------+-------------+--------------+---------+--------------+-----+------------+--------------+----------------+----------------+------------------+
|GUEST_NAME|EMAIL_ADDRESS|BOOKING_NUMBER|PHONE_NUM|STREET_ADDRESS|STATE|BOOKING_DATE|DEPARTURE_DATE|source_file_name|source_file_path|file_modified_unix|
+----------+-------------+--------------+---------+--------------+-----+------------+--------------+----------------+----------------+------------------+
+----------+-------------+--------------+---------+--------------+-----+------------+--------------+----------------+----------------+------------------+



In [17]:
##CHECK FOR BLANK VALUES
df_blanks = df_bronze.filter(
    (col("EMAIL_ADDRESS") == "") |
    (col("STREET_ADDRESS") == "") |
    (col("STATE") == "") |
    (col("BOOKING_NUMBER") == "")
)
df_blanks.show(truncate=False)

+----------+-------------+--------------+---------+--------------+-----+------------+--------------+----------------+----------------+------------------+
|GUEST_NAME|EMAIL_ADDRESS|BOOKING_NUMBER|PHONE_NUM|STREET_ADDRESS|STATE|BOOKING_DATE|DEPARTURE_DATE|source_file_name|source_file_path|file_modified_unix|
+----------+-------------+--------------+---------+--------------+-----+------------+--------------+----------------+----------------+------------------+
+----------+-------------+--------------+---------+--------------+-----+------------+--------------+----------------+----------------+------------------+



In [18]:
##CHECK FOR LEADING OR TRAILING SPACES

df_spaces = df_bronze.filter(
    (col("EMAIL_ADDRESS") != trim(col("EMAIL_ADDRESS"))) |
    (col("STREET_ADDRESS") != trim(col("STREET_ADDRESS"))) |
    (col("STATE") != trim(col("STATE"))) |
    (col("BOOKING_NUMBER") != trim(col("BOOKING_NUMBER")))
)

df_spaces.show(truncate=False)

+----------+-------------+--------------+---------+--------------+-----+------------+--------------+----------------+----------------+------------------+
|GUEST_NAME|EMAIL_ADDRESS|BOOKING_NUMBER|PHONE_NUM|STREET_ADDRESS|STATE|BOOKING_DATE|DEPARTURE_DATE|source_file_name|source_file_path|file_modified_unix|
+----------+-------------+--------------+---------+--------------+-----+------------+--------------+----------------+----------------+------------------+
+----------+-------------+--------------+---------+--------------+-----+------------+--------------+----------------+----------------+------------------+



In [118]:
df_only_internal_spaces =df_bronze.filter(
    (col("EMAIL_ADDRESS").contains(" ")) &  # any space at all
    (col("EMAIL_ADDRESS") == trim(col("EMAIL_ADDRESS")))  # no leading/trailing
)

df_only_internal_spaces.show(truncate=False)

+----------+-------------+--------------+---------+--------------+-----+------------+--------------+----------+-----------------+---------+
|GUEST_NAME|EMAIL_ADDRESS|BOOKING_NUMBER|PHONE_NUM|STREET_ADDRESS|STATE|BOOKING_DATE|DEPARTURE_DATE|email_hash|invalid_phone_ind|area_code|
+----------+-------------+--------------+---------+--------------+-----+------------+--------------+----------+-----------------+---------+
+----------+-------------+--------------+---------+--------------+-----+------------+--------------+----------+-----------------+---------+



In [14]:
#####DATA PROFILING#######

In [None]:
##CHECK BOOKING DATE and DEPARTURE DATE VALUES

df_bronze.select(
    min("BOOKING_DATE").alias("min_booking_date"),
    max("BOOKING_DATE").alias("max_booking_date"),
    min("DEPARTURE_DATE").alias("min_departure_date"),
    max("DEPARTURE_DATE").alias("max_departure_date")
).show()

In [77]:
df.select("STATE").distinct().orderBy("STATE").show(60,truncate=False)

+--------------+
|STATE         |
+--------------+
|Alabama       |
|Alaska        |
|Arizona       |
|Arkansas      |
|California    |
|Colorado      |
|Connecticut   |
|Delaware      |
|Florida       |
|Georgia       |
|Hawaii        |
|Idaho         |
|Illinois      |
|Indiana       |
|Iowa          |
|Kansas        |
|Kentucky      |
|Louisiana     |
|Maine         |
|Maryland      |
|Massachusetts |
|Michigan      |
|Minnesota     |
|Mississippi   |
|Missouri      |
|Montana       |
|Nebraska      |
|Nevada        |
|New Hampshire |
|New Jersey    |
|New Mexico    |
|New York      |
|North Carolina|
|North Dakota  |
|Ohio          |
|Oklahoma      |
|Oregon        |
|Pennsylvania  |
|Rhode Island  |
|South Carolina|
|South Dakota  |
|Tennessee     |
|Texas         |
|Utah          |
|Vermont       |
|Virginia      |
|Washington    |
|West Virginia |
|Wisconsin     |
|Wyoming       |
+--------------+



In [6]:
df_email = df.groupBy("EMAIL_ADDRESS") \
  .count() \
  .filter("count > 1") \
  .orderBy("count", ascending=False)

df_email.show(truncate=False)

+-------------------------+-----+
|EMAIL_ADDRESS            |count|
+-------------------------+-----+
|smithjennifer@example.net|5    |
|thomasyoder@example.net  |5    |
|jennifer07@example.net   |5    |
|toni63@example.com       |5    |
|chungstacey@example.org  |5    |
|fieldsnancy@example.org  |5    |
|emilymatthews@example.net|5    |
|phillipsemily@example.org|5    |
|peterwagner@example.org  |5    |
|vvincent@example.com     |5    |
|marco25@example.net      |5    |
|welchkelly@example.org   |5    |
|lorraine59@example.com   |5    |
|daviskimberly@example.net|5    |
|mooreangelica@example.com|4    |
|garzajennifer@example.com|4    |
|charlotte86@example.org  |4    |
|elizabeth62@example.com  |4    |
|jonesmary@example.org    |4    |
|gcooper@example.org      |4    |
+-------------------------+-----+
only showing top 20 rows


In [7]:
df_email.groupBy("count") \
    .count() \
    .orderBy("count") \
    .show()

+-----+-----+
|count|count|
+-----+-----+
|    2|18634|
|    3| 2346|
|    4|  206|
|    5|   14|
+-----+-----+



In [8]:
df.filter(df.EMAIL_ADDRESS == "smithjennifer@example.net").show()

+------------+--------------------+--------------+----------+--------------------+--------+------------+--------------+
|  GUEST_NAME|       EMAIL_ADDRESS|BOOKING_NUMBER| PHONE_NUM|      STREET_ADDRESS|   STATE|BOOKING_DATE|DEPARTURE_DATE|
+------------+--------------------+--------------+----------+--------------------+--------+------------+--------------+
|Dawn Walters|smithjennifer@exa...|  BOOK00001758|3139241653|64277 Ponce Flats...|Maryland|  2024-01-22|    2024-05-04|
|Dawn Walters|smithjennifer@exa...| BOOK000098795|3139241653|64277 Ponce Flats...|Maryland|  2025-02-20|    2025-05-02|
|Dawn Walters|smithjennifer@exa...| BOOK000099623|3139241653|64277 Ponce Flats...|Maryland|  2024-11-02|    2026-01-08|
|Dawn Walters|smithjennifer@exa...|BOOK0000109233|3139241653|64277 Ponce Flats...|Maryland|  2024-01-25|    2024-05-26|
|Dawn Walters|smithjennifer@exa...|BOOK0000113432|3139241653|64277 Ponce Flats...|Maryland|  2024-11-08|    2025-02-17|
+------------+--------------------+-----

In [9]:
df.groupBy("EMAIL_ADDRESS") \
  .agg(countDistinct("STREET_ADDRESS").alias("street_address")) \
  .filter(col("street_address") > 1) \
  .orderBy("street_address", ascending=False) \
  .show(truncate=False)

+-------------+--------------+
|EMAIL_ADDRESS|street_address|
+-------------+--------------+
+-------------+--------------+



In [10]:
df.groupBy("EMAIL_ADDRESS") \
  .agg(countDistinct("PHONE_NUM").alias("unique_phones")) \
  .filter(col("unique_phones") > 1) \
  .orderBy("unique_phones", ascending=False) \
  .show(truncate=False)

+-------------+-------------+
|EMAIL_ADDRESS|unique_phones|
+-------------+-------------+
+-------------+-------------+



In [11]:
df.groupBy("EMAIL_ADDRESS") \
  .agg(countDistinct("STATE").alias("unique_state")) \
  .filter(col("unique_state") > 1) \
  .orderBy("unique_state", ascending=False) \
  .show(truncate=False)

+-------------+------------+
|EMAIL_ADDRESS|unique_state|
+-------------+------------+
+-------------+------------+



In [78]:
df.groupBy("PHONE_NUM") \
  .agg(countDistinct("EMAIL_ADDRESS").alias("email_address")) \
  .filter(col("email_address") > 1) \
  .orderBy("email_address", ascending=False) \
  .show(truncate=False)

+---------+-------------+
|PHONE_NUM|email_address|
+---------+-------------+
+---------+-------------+



In [21]:
df.groupBy("STREET_ADDRESS") \
  .agg(countDistinct("EMAIL_ADDRESS").alias("email_address")) \
  .filter(col("email_address") > 1) \
  .orderBy("email_address", ascending=False) \
  .show(truncate=False)

+-----------------+-------------+
|STREET_ADDRESS   |email_address|
+-----------------+-------------+
|394 Daniel Course|2            |
+-----------------+-------------+



In [23]:
df.filter(df.STREET_ADDRESS == "394 Daniel Course").show()

+------------------+--------------------+--------------+----------+-----------------+--------+------------+--------------+--------------------+
|        GUEST_NAME|       EMAIL_ADDRESS|BOOKING_NUMBER| PHONE_NUM|   STREET_ADDRESS|   STATE|BOOKING_DATE|DEPARTURE_DATE|          email_hash|
+------------------+--------------------+--------------+----------+-----------------+--------+------------+--------------+--------------------+
|   Joseph Bradford|jenniferallen@exa...|  BOOK00009992|1382729454|394 Daniel Course| Vermont|  2024-08-23|    2025-09-08|7c1da51a43b49e7c8...|
|Jacqueline Mcclure| barry93@example.com| BOOK000061242|2785662120|394 Daniel Course|Illinois|  2024-07-04|    2024-09-06|6d01a34353bb93ac6...|
+------------------+--------------------+--------------+----------+-----------------+--------+------------+--------------+--------------------+



In [113]:
#####TYPE 2 CUSTOMER DIMENSION#####

In [None]:
df_customer_raw = df.select(
    "EMAIL_ADDRESS", "BOOKING_DATE", "PHONE_NUM", "STREET_ADDRESS", "STATE"
).withColumn("EMAIL_HASH", sha2(col("EMAIL_ADDRESS"), 256)) \
 .withColumn("BOOKING_DATE", to_date("BOOKING_DATE"))

In [106]:
w = Window.partitionBy("EMAIL_ADDRESS").orderBy("BOOKING_DATE")

df_cust_changes = df_customer_raw.withColumn("prev_phone", lag("PHONE_NUM").over(w)) \
    .withColumn("prev_address", lag("STREET_ADDRESS").over(w)) \
    .withColumn("prev_state", lag("STATE").over(w)) \
    .withColumn("change_flag",
        when(
            (col("PHONE_NUM") != col("prev_phone")) |
            (col("STREET_ADDRESS") != col("prev_address")) |
            (col("STATE") != col("prev_state")) |
            col("prev_phone").isNull(), 1
        ).otherwise(0)
    )

In [107]:
df_cust_changes.show()

+--------------------+------------+----------+--------------------+--------------+--------------------+----------+--------------------+----------+-----------+
|       EMAIL_ADDRESS|BOOKING_DATE| PHONE_NUM|      STREET_ADDRESS|         STATE|          EMAIL_HASH|prev_phone|        prev_address|prev_state|change_flag|
+--------------------+------------+----------+--------------------+--------------+--------------------+----------+--------------------+----------+-----------+
|aanderson@example...|  2024-09-09|6943205203|106 Gardner Glen ...|North Carolina|40331fbae5b342bad...|      NULL|                NULL|      NULL|          1|
| aaron06@example.com|  2024-03-28|8297194934|  884 Miranda Course|      Illinois|5c7f10bf6e36ce15c...|      NULL|                NULL|      NULL|          1|
| aaron35@example.org|  2024-01-27|8075129060|      738 Heath Ways|       Arizona|fac1754a2d086513f...|      NULL|                NULL|      NULL|          1|
| aaron49@example.net|  2025-02-11|2679651914|

In [108]:
##Assign a unique version ID to each group of unchanged records##
df_version_group = df_cust_changes.withColumn("version_group", spark_sum("change_flag").over(w))

In [124]:
df_version_group.show()

+--------------------+------------+----------+--------------------+--------------+--------------------+----------+--------------------+----------+-----------+-------------+
|       EMAIL_ADDRESS|BOOKING_DATE| PHONE_NUM|      STREET_ADDRESS|         STATE|          EMAIL_HASH|prev_phone|        prev_address|prev_state|change_flag|version_group|
+--------------------+------------+----------+--------------------+--------------+--------------------+----------+--------------------+----------+-----------+-------------+
|aanderson@example...|  2024-09-09|6943205203|106 Gardner Glen ...|North Carolina|40331fbae5b342bad...|      NULL|                NULL|      NULL|          1|            1|
| aaron06@example.com|  2024-03-28|8297194934|  884 Miranda Course|      Illinois|5c7f10bf6e36ce15c...|      NULL|                NULL|      NULL|          1|            1|
| aaron35@example.org|  2024-01-27|8075129060|      738 Heath Ways|       Arizona|fac1754a2d086513f...|      NULL|                NULL|

In [109]:
##aggregates each version group
df_dim_customer_hist = df_version_group.groupBy("EMAIL_HASH", "EMAIL_ADDRESS", "version_group") \
    .agg(
        expr("first(PHONE_NUM)").alias("PHONE_NUM"),
        expr("first(STREET_ADDRESS)").alias("STREET_ADDRESS"),
        expr("first(STATE)").alias("STATE"),
        expr("min(BOOKING_DATE)").alias("start_date"),
        expr("lead(min(BOOKING_DATE), 1) over (partition by EMAIL_ADDRESS order by min(BOOKING_DATE))").alias("next_start_date")
    )

In [110]:
df_customer_type_2 = df_dim_customer_hist.withColumn(
    "end_date",
    when(
        col("next_start_date").isNotNull(),
        expr("date_sub(next_start_date, 1)")
    ).otherwise(lit("9999-12-31").cast(DateType()))
).drop("version_group", "next_start_date")

In [111]:
df_customer_type_2 = df_customer_type_2.orderBy("EMAIL_ADDRESS", "start_date")

In [112]:
df_customer_type_2.show(truncate=False)

+----------------------------------------------------------------+----------------------+----------+------------------------------+--------------+----------+----------+
|EMAIL_HASH                                                      |EMAIL_ADDRESS         |PHONE_NUM |STREET_ADDRESS                |STATE         |start_date|end_date  |
+----------------------------------------------------------------+----------------------+----------+------------------------------+--------------+----------+----------+
|a77ea1ac5486682e4a69c1b506d31b51e791529482cc6763fdc6e9f1bddef8ca|aadams@example.net    |6881747577|03668 Pham Place              |Florida       |2025-04-27|9999-12-31|
|4d6441ff58547adf993b3a18d1f029382feb528dd07671cfefe77be5f2f97166|aadams@example.org    |288850687 |04013 Derrick Lodge           |Vermont       |2025-04-14|9999-12-31|
|8e510dfbe106af60b0a3a2cf05588035c9889f9acce0d0b494ec6983ecc30360|aaguilar@example.net  |6015683839|8922 Mitchell Key             |Massachusetts |2024-02-2