In [0]:
from pyspark.sql.functions import col, lit, current_timestamp, sum as _sum
from delta.tables import DeltaTable
from pydeequ.checks import Check, CheckLevel
from pydeequ.verification import VerificationSuite, VerificationResult
import os

# Echo the SPARK_VERSION environment variable for this session.
# Indexing os.environ directly raises KeyError when the variable is not set.
print(os.environ['SPARK_VERSION'])

In [0]:
# Resolve the input file locations for the requested arrival date.
# The date suffix is read from the "arrival_date" notebook widget and is
# interpolated verbatim into both file names.
date_str = dbutils.widgets.get("arrival_date")

customer_data = f"/Volumes/test_gds/default/customer_data/customers_{date_str}.csv"
booking_data = f"/Volumes/test_gds/default/booking_data/bookings_{date_str}.csv"

# Echo the resolved paths, bookings first and customers second, one per line.
print(booking_data, customer_data, sep="\n")

In [0]:
# Load the booking and customer CSV files into Spark DataFrames.

def _load_csv(path):
    """Read the CSV file at `path` with the reader settings shared by both inputs.

    The first line is treated as a header, column types are inferred, the
    double quote is the quoting character, and multiline parsing is enabled.
    """
    reader = (
        spark.read
        .format("csv")
        .option("header", "true")
        .option("inferSchema", "true")
        .option("quote", "\"")
        .option("multiline", "true")
    )
    return reader.load(path)


booking_df = _load_csv(booking_data)
customer_df = _load_csv(customer_data)

# Print the DataFrame reprs as a quick confirmation of what was loaded.
print(booking_df)
print(customer_df)

In [0]:
# Visualize the loaded DataFrames: both inferred schemas first,
# then a five-row preview of each (bookings before customers).
for _frame in (booking_df, customer_df):
    _frame.printSchema()

for _frame in (booking_df, customer_df):
    _frame.show(5)

In [0]:
# Data-quality gate for both inputs, implemented with PyDeequ.
# All constraints are registered at CheckLevel.Error.

# Bookings: at least one row, unique booking_id, customer_id and amount always
# populated, and no negative amount / quantity / discount.
booking_check = (
    Check(spark, CheckLevel.Error, "Booking Data Check")
    .hasSize(lambda row_count: row_count > 0)
    .isUnique("booking_id")
    .isComplete("customer_id")
    .isComplete("amount")
    .isNonNegative("amount")
    .isNonNegative("quantity")
    .isNonNegative("discount")
)

# Customers: at least one row, unique customer_id, and name / address / email
# always populated.
customer_check = (
    Check(spark, CheckLevel.Error, "Customer Data Check")
    .hasSize(lambda row_count: row_count > 0)
    .isUnique("customer_id")
    .isComplete("customer_name")
    .isComplete("customer_address")
    .isComplete("email")
)

# Evaluate each check against its DataFrame.
booking_dq_check = (
    VerificationSuite(spark)
    .onData(booking_df)
    .addCheck(booking_check)
    .run()
)
customer_dq_check = (
    VerificationSuite(spark)
    .onData(customer_df)
    .addCheck(customer_check)
    .run()
)

# Render the per-constraint outcomes before deciding whether to stop, so the
# details are visible even when the gate fails.
booking_dq_check_df = VerificationResult.checkResultsAsDataFrame(spark, booking_dq_check)
customer_dq_check_df = VerificationResult.checkResultsAsDataFrame(spark, customer_dq_check)

display(booking_dq_check_df)
display(customer_dq_check_df)

# Abort on the first input whose overall status is anything but "Success";
# bookings are evaluated before customers.
_dq_gates = (
    (booking_dq_check, "Booking Data Quality Check Failed"),
    (customer_dq_check, "Customer Data Quality Check Failed"),
)
for _dq_result, _dq_failure_message in _dq_gates:
    if _dq_result.status != "Success":
        raise ValueError(_dq_failure_message)



In [0]:
# Stamp every booking row with the current timestamp.
booking_new_df = booking_df.withColumn("timestamp", current_timestamp())
booking_new_df.show(5)

# Join booking data with customer data. The inner join keeps only bookings
# whose customer_id has a matching customer row.
df_joined = booking_new_df.join(customer_df, on="customer_id", how="inner")
df_joined.show(5)

# Derive the cost net of discount and keep only rows with a positive quantity.
# NOTE(review): a null discount makes TotalCost null for that row, and the
# quality checks only require discount to be non-negative -- confirm discount
# is always populated.
df_joined_filter = (
    df_joined
    .withColumn("TotalCost", col("amount") - col("discount"))
    .filter(col("quantity") > 0)
)
df_joined_filter.show(5)

# Aggregate to one row per (customer_id, booking_type) with summed cost and quantity.
df_joined_agg = (
    df_joined_filter
    .groupBy("customer_id", "booking_type")
    .agg(
        _sum("TotalCost").alias("TotalCost"),
        _sum("quantity").alias("TotalQuantity"),
    )
)

df_joined_agg.show(5)

In [0]:
# Maintain running totals per (customer_id, booking_type) in the booking fact table.
fact_table = "test_gds.default.booking_fact"

# Probe for the table through the public catalog API. The previous code reached
# through the private py4j handle (spark._jsparkSession), which is not part of
# the public PySpark API and is not available on Spark Connect sessions.
fact_table_exists = spark.catalog.tableExists(fact_table)

if fact_table_exists:
    # Fold this run's aggregates into the stored totals: stack both frames by
    # column name, then re-sum per (customer_id, booking_type).
    df_existing_table = spark.read.format("delta").table(fact_table)
    df_combined = df_existing_table.unionByName(df_joined_agg, allowMissingColumns=True)
    df_final_agg = (
        df_combined
        .groupBy("customer_id", "booking_type")
        .agg(
            _sum("TotalCost").alias("TotalCost"),
            _sum("TotalQuantity").alias("TotalQuantity"),
        )
    )
else:
    # First load: this run's aggregates become the initial table contents.
    df_final_agg = df_joined_agg

df_final_agg.show(5)

# Replace the table contents (and schema) with the combined totals.
# NOTE(review): because the stored totals are summed with the current batch,
# re-running this cell for the same input adds that batch to the totals again.
df_final_agg.write.format("delta").mode("overwrite").option("overwriteSchema", "true").saveAsTable(fact_table)

In [0]:
# Maintain the customer dimension as an SCD2-style Delta table.
scd_dim_table = "test_gds.default.customer_dim"

# Probe for the table through the public catalog API. The previous code reached
# through the private py4j handle (spark._jsparkSession), which is not part of
# the public PySpark API and is not available on Spark Connect sessions.
scd_dim_table_exists = spark.catalog.tableExists(scd_dim_table)

if scd_dim_table_exists:
    scd_table = DeltaTable.forName(spark, scd_dim_table)
    display(scd_table.toDF())

    # Step 1 - close the currently open version (valid_to = '9999-12-31') of
    # every customer present in the incoming file by setting its valid_to to
    # the incoming row's valid_from.
    # Assumes customer_df carries a valid_from column and the dimension table
    # a valid_to column -- TODO confirm against the CSV layout.
    # NOTE(review): the update has no change-detection condition, so an open
    # row is closed whenever its customer_id appears in the file, whether or
    # not any attribute changed.
    scd_table.alias("scd") \
        .merge(
            customer_df.alias("updates"),
            "scd.customer_id = updates.customer_id and scd.valid_to = '9999-12-31'"
        ) \
        .whenMatchedUpdate(set={
            "valid_to": "updates.valid_from",
        }) \
        .execute()

    # Step 2 - append every incoming row as a new version.
    customer_df.write.format("delta").mode("append").saveAsTable(scd_dim_table)
else:
    # First load: the incoming customers become the initial dimension contents.
    customer_df.write.format("delta").mode("overwrite").option("overwriteSchema", "true").saveAsTable(scd_dim_table)

