In [0]:
from pyspark.sql.functions import sum, col, lit, current_timestamp
from delta.tables import DeltaTable
from pydeequ.checks import Check, CheckLevel
from pydeequ.verification import VerificationSuite, VerificationResult
import os

# Spark version of the runtime; falls back to the session's own version when the
# SPARK_VERSION environment variable is not exported (indexing os.environ would
# raise KeyError there).
print(os.environ.get("SPARK_VERSION", spark.version))

# Arrival date of the batch to load, read from the "arrival_date" widget and
# interpolated into the daily file names below (e.g. "2024-07-25").
date_str = dbutils.widgets.get("arrival_date").strip()
if not date_str:
    # Fail fast: an empty widget would otherwise surface later as a confusing
    # "path does not exist" error for bookings_.csv.
    raise ValueError("Widget 'arrival_date' is empty; expected a date such as 2024-07-25")

booking_data = f"/Volumes/incremental_load/default/orders_data/booking_data/bookings_{date_str}.csv"
customer_data = f"/Volumes/incremental_load/default/orders_data/customer_data/customers_{date_str}.csv"

print(booking_data)
print(customer_data)


def _read_csv(path):
    """Load one daily CSV extract into a DataFrame.

    Both extracts share the same reader settings: a header row, inferred column
    types, double-quote quoting, and quoted fields that may span several lines.

    Args:
        path: Location of the CSV file to read.

    Returns:
        The DataFrame produced by ``spark.read ... .load(path)``.
    """
    return (
        spark.read
        .format("csv")
        .option("header", True)
        .option("inferSchema", True)
        .option("quote", "\"")
        .option("multiLine", "true")
        .load(path)
    )


bookings_df = _read_csv(booking_data)
bookings_df.printSchema()
display(bookings_df)

customers_df = _read_csv(customer_data)
customers_df.printSchema()
display(customers_df)


In [0]:
# Applying checks
# Bookings must be non-empty, have a unique and complete booking_id, a complete
# customer_id, and non-negative amount / quantity / discount.
# Parenthesised chains replace trailing backslashes: the original last line
# ended in "\" and only parsed because a blank line happened to follow it.
check_bookings = (
    Check(spark, CheckLevel.Error, "Check for bookings")
    .hasSize(lambda x: x > 0)
    .isUnique("booking_id", hint="Booking ID is not unique throughout")
    .isComplete("customer_id")
    .isComplete("booking_id")
    .isNonNegative("amount")
    .isNonNegative("quantity")
    .isNonNegative("discount")
)

# Customers must be non-empty, have a unique customer_id, and complete
# name / address / email columns.
check_customers = (
    Check(spark, CheckLevel.Error, "Check for customers")
    .hasSize(lambda x: x > 0)
    .isUnique("customer_id", hint="Customer ID is not unique throughout")
    .isComplete("customer_name")
    .isComplete("customer_address")
    .isComplete("email")
)

booking_dq_check = (
    VerificationSuite(spark)
    .onData(bookings_df)
    .addCheck(check_bookings)
    .run()
)
customer_dq_check = (
    VerificationSuite(spark)
    .onData(customers_df)
    .addCheck(check_customers)
    .run()
)

# Show per-constraint results for both inputs before deciding whether to stop.
booking_dq_check_df = VerificationResult.checkResultsAsDataFrame(spark, booking_dq_check)
display(booking_dq_check_df)

customer_dq_check_df = VerificationResult.checkResultsAsDataFrame(spark, customer_dq_check)
display(customer_dq_check_df)

# Collect every failing input before raising so a single run reports all of them
# (previously a bookings failure hid the customers result). With one failure the
# message is unchanged.
dq_failures = [
    f"Data quality check failed for {name}"
    for name, result in (("bookings", booking_dq_check), ("customers", customer_dq_check))
    if result.status != "Success"
]
if dq_failures:
    raise ValueError("; ".join(dq_failures))

In [0]:
# Transformation
# Stamp each booking row with the time this cell ran.
# NOTE(review): ingestion_time is not a grouping key below, so it does not reach
# df_agg or the fact table.
bookings_df_incremental = bookings_df.withColumn("ingestion_time", current_timestamp())

# Inner join on customer_id: bookings whose customer_id has no row in today's
# customer file are dropped. NOTE(review): confirm dropping such orphans is intended.
bookings_joined = bookings_df_incremental.join(customers_df, "customer_id")

# Net cost per booking, keeping only rows with a positive quantity.
df_transformed = (
    bookings_joined
    .withColumn("total_cost", col("amount") - col("discount"))
    .filter(col("quantity") > 0)
)

# This batch's totals per (booking_type, customer_id); `sum` here is the
# pyspark.sql.functions aggregate imported at the top, not the builtin.
df_agg = (
    df_transformed
    .groupBy("booking_type", "customer_id")
    .agg(
        sum("total_cost").alias("total_amount_sum"),
        sum("quantity").alias("total_quantity_sum"),
    )
)


In [0]:
# Writing into fact table: running totals per (booking_type, customer_id).
fact_table_path = "incremental_load.default.bookings_fact"
fact_table_exists = spark._jsparkSession.catalog().tableExists(fact_table_path)

if not fact_table_exists:
    # First run: this batch's aggregate is the whole fact table.
    df_final_agg = df_agg
else:
    df_existing_fact = spark.read.format("delta").table(fact_table_path)

    # Stack history and this batch, then re-aggregate so each key keeps a single
    # row whose sums include this batch.
    # NOTE(review): re-running the notebook for the same arrival_date adds that
    # day's totals a second time.
    df_combined = df_existing_fact.unionByName(df_agg, allowMissingColumns=True)
    df_final_agg = (
        df_combined
        .groupBy("booking_type", "customer_id")
        .agg(
            sum("total_amount_sum").alias("total_amount_sum"),
            sum("total_quantity_sum").alias("total_quantity_sum"),
        )
    )
display(df_final_agg)

# Replace the whole table (data and schema) with the recomputed totals.
(
    df_final_agg.write
    .format("delta")
    .mode("overwrite")
    .option("overwriteSchema", True)
    .saveAsTable(fact_table_path)
)

In [0]:
# Customer dimension kept as slowly-changing history: a customer's open (current)
# row is marked by the sentinel valid_to '9999-12-31'.
scd_table_path = "incremental_load.default.customer_dim"
scd_table_exists = spark._jsparkSession.catalog().tableExists(scd_table_path)

if not scd_table_exists:
    # First load: today's customer snapshot becomes the initial dimension.
    customers_df.write.format("delta").mode("overwrite").saveAsTable(scd_table_path)
else:
    scd_table = DeltaTable.forName(spark, scd_table_path)

    # Match each open row whose customer appears in today's file ...
    open_row_match = "scd.customer_id = updates.customer_id and scd.valid_to='9999-12-31'"
    # ... and close it at the incoming row's valid_from.
    # NOTE(review): assumes customers_df carries valid_from/valid_to columns; rows
    # are closed even when no customer attribute changed.
    (
        scd_table.alias("scd")
        .merge(customers_df.alias("updates"), open_row_match)
        .whenMatchedUpdate(set={"valid_to": "updates.valid_from"})
        .execute()
    )

    # Append today's rows as the new versions.
    # NOTE(review): the merge and this append are separate writes; a failure in
    # between leaves the matched customers with their rows closed and no new row.
    customers_df.write.format("delta").mode("append").saveAsTable(scd_table_path)