# In [0]:
from pyspark.sql.functions import col, lit, current_timestamp, sum as _sum
from delta.tables import DeltaTable
from pydeequ.checks import Check, CheckLevel, CheckStatus
from pydeequ.verification import VerificationSuite, VerificationResult
import os

# Echo the SPARK_VERSION environment variable to the job output.
# A missing variable raises KeyError here.
spark_version = os.environ['SPARK_VERSION']
print(spark_version)

# Job parameter: read from the Databricks widget named "arrival_date".
date_str = dbutils.widgets.get("arrival_date")

# Input CSV locations for this run; the file names embed the arrival date.
customer_data = f"/Volumes/incremental_data_load/default/orders_data/customer/customers_{date_str}.csv"
booking_data = f"/Volumes/incremental_data_load/default/orders_data/bookings/bookings_{date_str}.csv"

# Reader options shared by both input files: header row, inferred column
# types, '"' as the quote character, and multiline parsing enabled.
csv_options = {
    "header": "true",
    "inferSchema": "true",
    "quote": "\"",
    "multiline": "true",
}

# Load the booking file, then show its inferred schema and contents.
bookings_df = spark.read.format("csv").options(**csv_options).load(booking_data)
bookings_df.printSchema()
display(bookings_df)

# Load the customer file, then show its inferred schema and contents.
customer_df = spark.read.format("csv").options(**csv_options).load(customer_data)
customer_df.printSchema()
display(customer_df)

def _has_rows(row_count):
    """Size assertion used by both checks: the dataset must not be empty."""
    return row_count > 0


# Constraints for the booking file: non-empty, unique booking_id, complete
# customer_id and amount, and non-negative amount / quantity / discount.
check_incremental = (
    Check(spark, CheckLevel.Error, "Booking data")
    .hasSize(_has_rows)
    .isUnique("booking_id")
    .isComplete("customer_id")
    .isComplete("amount")
    .isNonNegative("amount")
    .isNonNegative("quantity")
    .isNonNegative("discount")
)

# Constraints for the customer file: non-empty, unique customer_id, and
# complete customer_name / customer_address / email.
check_scd = (
    Check(spark, CheckLevel.Error, "Customer data")
    .hasSize(_has_rows)
    .isUnique("customer_id")
    .isComplete("customer_name")
    .isComplete("customer_address")
    .isComplete("email")
)

# Evaluate each check against its DataFrame.
booking_suite = VerificationSuite(spark).onData(bookings_df)
booking_dq_check = booking_suite.addCheck(check_incremental).run()

customer_suite = VerificationSuite(spark).onData(customer_df)
customer_dq_check = customer_suite.addCheck(check_scd).run()

# Show the per-constraint results before deciding whether to abort.
booking_dq_check_df = VerificationResult.checkResultsAsDataFrame(spark, booking_dq_check)
display(booking_dq_check_df)

customer_dq_check_df = VerificationResult.checkResultsAsDataFrame(spark, customer_dq_check)
display(customer_dq_check_df)

# Abort the job unless both verification runs report "Success".
booking_dq_passed = booking_dq_check.status == "Success"
if not booking_dq_passed:
    raise ValueError("Data quality check failed for booking data")

customer_dq_passed = customer_dq_check.status == "Success"
if not customer_dq_passed:
    raise ValueError("Data quality check failed for customer data")

# Add ingestion timestamp to booking data.
# NOTE(review): this column is not carried through the aggregation below.
bookings_df_incremental = bookings_df.withColumn("ingestion_timestamp", current_timestamp())

# Join booking data with customer data. This is an inner join, so bookings
# whose customer_id is absent from this run's customer file are dropped.
df_joined = bookings_df_incremental.join(customer_df, "customer_id")

# Business logic: calculate total cost after discount and keep only rows with
# a positive quantity.
#
# "discount" is not covered by a completeness check, so it may be NULL. A NULL
# discount would make total_cost NULL; the sum below would then skip that
# booking's amount while still counting its quantity. A missing discount is
# therefore treated as 0 before the subtraction.
df_transformed = (
    df_joined
    .fillna(0, subset=["discount"])
    .withColumn("total_cost", col("amount") - col("discount"))
    .filter(col("quantity") > 0)
)

# Group by booking type and customer, summing cost and quantity.
df_transformed_agg = (
    df_transformed
    .groupBy("booking_type", "customer_id")
    .agg(
        _sum("total_cost").alias("total_amount_sum"),
        _sum("quantity").alias("total_quantity_sum"),
    )
)

# Check if the fact table exists. Uses the public catalog API instead of the
# private spark._jsparkSession handle, which is an internal py4j object and is
# not available in every execution mode.
fact_table_path = "incremental_data_load.default.booking_fact"
fact_table_exists = spark.catalog.tableExists(fact_table_path)

if fact_table_exists:
    # Read the totals accumulated by previous runs.
    df_exisiting_fact = spark.read.format("delta").table(fact_table_path)

    # Stack this run's aggregates under the existing totals. Columns missing
    # on either side are filled with NULL.
    df_combined = df_exisiting_fact.unionByName(df_transformed_agg, allowMissingColumns=True)

    # Re-aggregate so each (booking_type, customer_id) pair has a single row
    # holding the running totals.
    df_final_agg = (
        df_combined
        .groupBy("booking_type", "customer_id")
        .agg(
            _sum("total_amount_sum").alias("total_amount_sum"),
            _sum("total_quantity_sum").alias("total_quantity_sum"),
        )
    )
else:
    # If the fact table doesn't exist, use the aggregated transformed data directly.
    df_final_agg = df_transformed_agg

display(df_final_agg)

# Write the final aggregated data back to the Delta table, replacing its
# contents and schema.
# NOTE(review): re-running the job for the same arrival date adds that date's
# aggregates to the totals again.
(
    df_final_agg.write
    .format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .saveAsTable(fact_table_path)
)

# Maintain the customer dimension as a history table (SCD type 2 style).
# NOTE(review): assumes customer_df carries valid_from / valid_to columns and
# that open rows are marked with valid_to = '9999-12-31' — confirm against the
# customer file schema.
scd_table_path = "incremental_data_load.default.customer_dim"
# Public catalog API instead of the private spark._jsparkSession handle.
scd_table_exists = spark.catalog.tableExists(scd_table_path)

# Check if the customers table exists
if scd_table_exists:
    # Load the existing SCD table and show its current contents.
    scd_table = DeltaTable.forName(spark, scd_table_path)
    display(scd_table.toDF())

    # Close the open version of every customer present in this batch by
    # setting its valid_to to the incoming row's valid_from.
    (
        scd_table.alias("scd")
        .merge(
            customer_df.alias("updates"),
            "scd.customer_id = updates.customer_id and scd.valid_to = '9999-12-31'",
        )
        .whenMatchedUpdate(set={
            "valid_to": "updates.valid_from",
        })
        .execute()
    )

    # Append the incoming rows as the new versions.
    # NOTE(review): every matched customer gets a new version, whether or not
    # any attribute changed, and re-running the same batch appends the same
    # rows again.
    customer_df.write.format("delta").mode("append").saveAsTable(scd_table_path)
else:
    # If the SCD table doesn't exist, write the customer data as a new Delta table.
    customer_df.write.format("delta").mode("overwrite").saveAsTable(scd_table_path)







