In [0]:
from pyspark.sql.functions import col, lit, current_timestamp, sum as _sum
from delta.tables import DeltaTable
from pydeequ.checks import Check, CheckLevel
from pydeequ.verification import VerificationSuite, VerificationResult
import os

print(os.environ['SPARK_VERSION'])
# Get job parameters from Databricks
date_str = dbutils.widgets.get("arrival_date")
# date_str = "2024-07-25"

# Define file paths based on date parameter
booking_data = f"/Volumes/incremental_load/default/orders_data/booking_data/bookings_{date_str}.csv"
customer_data = f"/Volumes/incremental_load/default/orders_data/customer_data/customers_{date_str}.csv"
print(booking_data)
print(customer_data)

# Read booking data
booking_df = spark.read \
    .format("csv") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .option("quote", "\"") \
    .option("multiLine", "true") \
    .load(booking_data)

booking_df.printSchema()
display(booking_data)

# Read customer data for scd2 merge
customer_df = spark.read \
    .format("csv") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .option("quote", "\"") \
    .option("multiLine", "true") \
    .load(customer_data)

customer_df.printSchema()
display(customer_df)

# Data Quality Checks on booking data
check_incremental = Check(spark, CheckLevel.Error, "Booking Data Check") \
    .hasSize(lambda x: x > 0) \
    .isUnique("booking_id", hint="Booking ID is not unique throught") \
    .isComplete("customer_id") \
    .isComplete("amount") \
    .isNonNegative("amount") \
    .isNonNegative("quantity") \
    .isNonNegative("discount")

# Data Quality Checks on customer data
# check_scd = Check(spark, CheckLevel.Error, "Customer Data Check") \
#     .hasSize(lambda x: x > 0) \
#     .isUnique("customer_id") \
#     .isComplete("customer_name") \
#     .isComplete("customer_address") \
#     .isComplete("phone_number") \
#     .isComplete("email")

check_scd = Check(spark, CheckLevel.Error, "Customer Data Check") \
    .hasSize(lambda x: x > 0) \
    .isUnique("customer_id") \
    .isComplete("customer_name") \
    .isComplete("customer_address") \
    .isComplete("email")

# Run the verification suite
booking_dq_check = VerificationSuite(spark) \
    .onData(booking_df) \
    .addCheck(check_incremental) \
    .run()

customer_dq_check = VerificationSuite(spark) \
    .onData(customer_df) \
    .addCheck(check_scd) \
    .run()

booking_dq_check_df = VerificationResult.checkResultsAsDataFrame(spark, booking_dq_check)
display(booking_dq_check_df)

customer_dq_check_df = VerificationResult.checkResultsAsDataFrame(spark, customer_dq_check)
display(customer_dq_check_df)

# Check if verification passed
if booking_dq_check.status != "Success":
    raise ValueError("Data Quality Checks Failed for Booking Data")

if customer_dq_check.status != "Success":
    raise ValueError("Data Quality Checks Failed for Customer Data")

# Add ingestion timestamp to booking data
booking_df_incremental = booking_df.withColumn("ingestion_time", current_timestamp())

# Join booking data with customer data
df_joined = booking_df_incremental.join(customer_df, "customer_id")

# Business transformation: calculate total cost after discount and filter
df_transformed = df_joined \
    .withColumn("total_cost", col("amount") - col("discount")) \
    .filter(col("quantity") > 0)

# Group by and aggregate df_transformed
df_transformed_agg = df_transformed \
    .groupBy("booking_type", "customer_id") \
    .agg(
        _sum("total_cost").alias("total_amount_sum"),
        _sum("quantity").alias("total_quantity_sum")
    )

# Check if the Delta table exists
fact_table_path = "incremental_load.default.booking_fact"
fact_table_exists = spark._jsparkSession.catalog().tableExists(fact_table_path)


if fact_table_exists:
    # Read the existing fact table
    df_existing_fact = spark.read.format("delta").table(fact_table_path)
    
    # Combine the aggregated data
    df_combined = df_existing_fact.unionByName(df_transformed_agg, allowMissingColumns=True)
    
    # Perform another group by and aggregation on the combined data
    df_final_agg = df_combined \
        .groupBy("booking_type", "customer_id") \
        .agg(
            _sum("total_amount_sum").alias("total_amount_sum"),
            _sum("total_quantity_sum").alias("total_quantity_sum")
        )
else:
    # If the fact table doesn't exist, use the aggregated transformed data directly
    df_final_agg = df_transformed_agg

display(df_final_agg)

# Write the final aggregated data back to the Delta table
df_final_agg.write \
    .format("delta") \
    .mode("overwrite") \
    .option("overwriteSchema", "true") \
    .saveAsTable(fact_table_path)

scd_table_path = "incremental_load.default.customer_dim"
scd_table_exists = spark._jsparkSession.catalog().tableExists(scd_table_path)

# Check if the customers table exists
if scd_table_exists:
    # Load the existing SCD table
    scd_table = DeltaTable.forName(spark, scd_table_path)
    display(scd_table.toDF())
    
    # Perform SCD2 merge logic
    scd_table.alias("scd") \
        .merge(
            customer_df.alias("updates"),
            "scd.customer_id = updates.customer_id and scd.valid_to = '9999-12-31'"
        ) \
        .whenMatchedUpdate(set={
            "valid_to": "updates.valid_from",
        }) \
        .execute()

    customer_df.write.format("delta").mode("append").saveAsTable(scd_table_path)
else:
    # If the SCD table doesn't exist, write the customer data as a new Delta table
    customer_df.write.format("delta").mode("overwrite").saveAsTable(scd_table_path)

3.5.0
/Volumes/incremental_load/default/orders_data/booking_data/bookings_2024-07-25.csv
/Volumes/incremental_load/default/orders_data/customer_data/customers_2024-07-25.csv
root
 |-- booking_id: integer (nullable = true)
 |-- customer_id: integer (nullable = true)
 |-- booking_date: date (nullable = true)
 |-- amount: integer (nullable = true)
 |-- booking_type: string (nullable = true)
 |-- quantity: integer (nullable = true)
 |-- discount: integer (nullable = true)
 |-- booking_status: string (nullable = true)
 |-- hotel_name: string (nullable = true)
 |-- flight_number: string (nullable = true)



'/Volumes/incremental_load/default/orders_data/booking_data/bookings_2024-07-25.csv'

root
 |-- customer_id: integer (nullable = true)
 |-- customer_name: string (nullable = true)
 |-- customer_address: string (nullable = true)
 |-- phone_number: string (nullable = true)
 |-- email: string (nullable = true)
 |-- valid_from: date (nullable = true)
 |-- valid_to: date (nullable = true)



customer_id,customer_name,customer_address,phone_number,email,valid_from,valid_to
1007,Robert Johnson,"574 Patel Drive Apt. 043 Jonesville, TX 54199",001-927-927-7643x34711,danielbrown@west.com,2022-09-26,9999-12-31
1093,Eric Glenn,"2560 Christian Junctions North Shane, CA 88681",001-944-838-5681,tarabradley@jones.com,2023-06-21,9999-12-31
1081,Scott Jones,"591 Richard Lodge Suite 568 New John, HI 87997",239.669.1888x6136,nicolefisher@miller.com,2022-10-19,9999-12-31
1036,Kimberly Garza,USNS Reilly FPO AP 52950,8095911248,sdavis@gmail.com,2022-11-12,9999-12-31
1022,Thomas Mathis,"2386 Parker Mountain East Sharonmouth, IL 15940",+1-871-965-0054,jennifer65@gmail.com,2023-02-04,9999-12-31
1062,Steven Steele,"6203 Smith Forest Lake Crystalchester, CA 67075",(253)242-6269x1355,johnsonannette@wallace-griffith.com,2022-11-12,9999-12-31
1021,Nicholas Anderson,"578 Hill Village Suite 210 Evansbury, UT 95568",008-256-3946,mavery@baker-ayala.com,2023-01-09,9999-12-31
1037,Richard Owens,"2941 Nicole Extensions Jeffreyport, OH 42853",001-874-094-3134x8557,jfields@stone.com,2023-02-22,9999-12-31
1047,Nina Weeks,"4250 Burns Forges Suite 922 West Christopher, SC 88814",574-638-3211x5713,ohowell@berry.com,2023-03-31,9999-12-31
1099,Steven Mathews,"4536 Michele Village Suite 277 North Tracy, NV 18979",522-742-2881x05998,thomaserica@yahoo.com,2022-11-07,9999-12-31




check,check_level,check_status,constraint,constraint_status,constraint_message
Booking Data Check,Error,Success,SizeConstraint(Size(None)),Success,
Booking Data Check,Error,Success,"UniquenessConstraint(Uniqueness(List(booking_id),None,None))",Success,
Booking Data Check,Error,Success,"CompletenessConstraint(Completeness(customer_id,None,None))",Success,
Booking Data Check,Error,Success,"CompletenessConstraint(Completeness(amount,None,None))",Success,
Booking Data Check,Error,Success,"ComplianceConstraint(Compliance(amount is non-negative,COALESCE(CAST(amount AS DECIMAL(20,10)), 0.0) >= 0,None,List(amount),None))",Success,
Booking Data Check,Error,Success,"ComplianceConstraint(Compliance(quantity is non-negative,COALESCE(CAST(quantity AS DECIMAL(20,10)), 0.0) >= 0,None,List(quantity),None))",Success,
Booking Data Check,Error,Success,"ComplianceConstraint(Compliance(discount is non-negative,COALESCE(CAST(discount AS DECIMAL(20,10)), 0.0) >= 0,None,List(discount),None))",Success,


check,check_level,check_status,constraint,constraint_status,constraint_message
Customer Data Check,Error,Success,SizeConstraint(Size(None)),Success,
Customer Data Check,Error,Success,"UniquenessConstraint(Uniqueness(List(customer_id),None,None))",Success,
Customer Data Check,Error,Success,"CompletenessConstraint(Completeness(customer_name,None,None))",Success,
Customer Data Check,Error,Success,"CompletenessConstraint(Completeness(customer_address,None,None))",Success,
Customer Data Check,Error,Success,"CompletenessConstraint(Completeness(phone_number,None,None))",Success,
Customer Data Check,Error,Success,"CompletenessConstraint(Completeness(email,None,None))",Success,


booking_type,customer_id,total_amount_sum,total_quantity_sum
Flight,1007,1367,6
Hotel,1022,497,8
Flight,1099,1863,9
Flight,1008,480,3
Hotel,1099,246,1
Hotel,1003,-49,5
Hotel,1096,121,4
Flight,1078,180,4
Hotel,1047,235,2
Flight,1042,375,6
