In [0]:
from pyspark.sql.functions import *

## Parameters

In [0]:

# --- Source side -----------------------------------------------------------
# Catalog, schema and object name of the table the fact rows are read from.
catalog = "workspace"
source_schema = "silver"
source_object = "silver_bookings"

# Fully-qualified name of the source fact table (catalog.schema.object).
fact_table = ".".join([catalog, source_schema, source_object])

# --- Incremental-load controls ----------------------------------------------
# Column whose value decides which rows are picked up on each run.
cdc_column = "modifiedDate"

# Leave empty for a normal incremental run; a non-empty value is used as the
# last-load marker instead of the one derived from the target table.
back_dated_refresh = ""

# --- Target side ------------------------------------------------------------
target_schema = "gold"
target_object = "FactBookings"

# Columns that together form the match condition of the upsert.
fact_key_cols = ["DimPassengersKey", "DimFlightsKey", "DimAirportsKey", "booking_date"]


##  Dimension Tables

In [0]:
# One entry per dimension joined to the fact table.
#   table     -> fully-qualified dimension table
#   alias     -> alias used for that table in the generated SQL
#   join_keys -> list of (fact_col, dim_col) pairs used in the ON clause
dimensions = [
    {
        "table": f"{catalog}.{target_schema}.{dim_name}",
        "alias": dim_name,
        "join_keys": key_pairs,
    }
    for dim_name, key_pairs in (
        ("DimPassengers", [("passenger_id", "passenger_id")]),
        ("DimFlights", [("flight_id", "flight_id")]),
        ("DimAirports", [("airport_id", "airport_id")]),
    )
]

# Fact-table columns carried into the result in addition to the surrogate keys.
fact_columns = ["amount", "booking_date", "modifiedDate"]

## Last Load Date

In [0]:
# Determine the lower bound (`last_load`) for this incremental run.
#
# Precedence:
#   1. `back_dated_refresh`, when set, wins (manual back-dated reload).
#   2. Otherwise the highest CDC value already present in the target table.
#   3. Otherwise (target missing or empty) a very old default date, which
#      makes the run a full load.

# Use the same fully-qualified name for the existence check and the query so
# both always refer to the same table, regardless of the session's current catalog.
target_table = f"{catalog}.{target_schema}.{target_object}"

# Fallback marker that predates any real data.
default_last_load = "1900-01-01 00:00:00"

# Truthiness check treats both "" and None as "no back-dated refresh".
if not back_dated_refresh:

    if spark.catalog.tableExists(target_table):
        last_load = spark.sql(f"SELECT max({cdc_column}) from {target_table}").collect()[0][0]

        # max() over an empty table is NULL; without this fallback the filter
        # would be built from the string 'None'.
        if last_load is None:
            last_load = default_last_load

    else:
        last_load = default_last_load

else:
    last_load = back_dated_refresh

# Show the resolved marker for a quick sanity check.
last_load


datetime.datetime(2025, 7, 7, 2, 25, 8, 303000)

## Dynamic Fact Query (Bring Keys)

In [0]:
def generate_fact_query_incremental(fact_table, dimensions, fact_columns, cdc_column, last_load):
    """Build the SQL text that selects incremental fact rows with dimension keys.

    Args:
        fact_table: Name of the source fact table; aliased as ``f`` in the query.
        dimensions: Iterable of dicts, each with ``"table"`` (dimension table
            name), ``"alias"`` (SQL alias) and ``"join_keys"`` (list of
            ``(fact_col, dim_col)`` pairs for the ON clause).
        fact_columns: Fact columns to select as-is.
        cdc_column: Fact column used for the incremental filter.
        last_load: Value interpolated into ``DATE('...')`` as the lower bound.

    Returns:
        str: A SELECT statement that LEFT JOINs every dimension, projects the
        fact columns plus one ``<alias>.<TableName>Key`` column per dimension,
        and keeps rows where ``f.<cdc_column> >= DATE('<last_load>')``.
    """
    src = "f"

    # Start the projection with the plain fact columns.
    projected = [f"{src}.{name}" for name in fact_columns]
    joins = []

    for spec in dimensions:
        dim_table = spec["table"]
        dim_alias = spec["alias"]

        # The surrogate key column is named after the last dotted part of the
        # dimension table name, suffixed with "Key".
        dim_short_name = dim_table.split('.')[-1]
        projected.append(f"{dim_alias}.{dim_short_name}Key")

        # All key pairs of one dimension are combined with AND.
        predicates = " AND ".join(
            f"{src}.{fact_col} = {dim_alias}.{dim_col}"
            for fact_col, dim_col in spec["join_keys"]
        )
        joins.append(f"LEFT JOIN {dim_table} {dim_alias} ON " + predicates)

    # Assemble the statement line by line; the indentation of each piece is
    # part of the returned text.
    lines = [
        "SELECT",
        "    " + ",\n    ".join(projected),
        f"  FROM {fact_table} {src}",
        "    " + "\n    ".join(joins),
        "  WHERE",
        f"    {src}.{cdc_column} >= DATE('{last_load}')",
    ]
    return "\n".join(lines)

In [0]:
# Build the incremental SQL for this run from the notebook parameters and the resolved last-load marker.
query = generate_fact_query_incremental(fact_table, dimensions, fact_columns, cdc_column, last_load)

In [0]:
# Print the generated SQL so it can be inspected before it is executed.
print(query)

SELECT
    f.amount,
    f.booking_date,
    f.modifiedDate,
    DimPassengers.DimPassengersKey,
    DimFlights.DimFlightsKey,
    DimAirports.DimAirportsKey
  FROM workspace.silver.silver_bookings f
    LEFT JOIN workspace.gold.DimPassengers DimPassengers ON f.passenger_id = DimPassengers.passenger_id
    LEFT JOIN workspace.gold.DimFlights DimFlights ON f.flight_id = DimFlights.flight_id
    LEFT JOIN workspace.gold.DimAirports DimAirports ON f.airport_id = DimAirports.airport_id
  WHERE
    f.modifiedDate >= DATE('2025-07-07 02:25:08.303000')


## DF_FACT

In [0]:
# Run the generated query; df_fact is the source DataFrame for the upsert below.
df_fact = spark.sql(query)


In [0]:
# Render the fact rows as a table for a visual check of the joined keys.
df_fact.display()

amount,booking_date,modifiedDate,DimPassengersKey,DimFlightsKey,DimAirportsKey
850.72,2025-05-29,2025-07-07T02:25:08.303Z,119,3,25
376.63,2025-06-09,2025-07-07T02:25:08.303Z,42,110,14
534.02,2025-06-03,2025-07-07T02:25:08.303Z,97,38,1
1333.7,2025-06-16,2025-07-07T02:25:08.303Z,132,65,54
1334.96,2025-06-17,2025-07-07T02:25:08.303Z,41,19,37
296.13,2025-05-18,2025-07-07T02:25:08.303Z,142,109,23
460.14,2025-04-05,2025-07-07T02:25:08.303Z,172,47,30
1402.02,2025-06-04,2025-07-07T02:25:08.303Z,90,78,31
1444.51,2025-05-16,2025-07-07T02:25:08.303Z,84,14,30
292.39,2025-05-16,2025-07-07T02:25:08.303Z,69,63,21


## UPSERT

In [0]:
# Fact Key Cols Merge Condition
# MERGE match condition: equality on every fact key column between the
# incoming rows (src) and the existing target rows (trg).
# NOTE(review): the surrogate keys come from LEFT JOINs, so they can be NULL
# when a dimension lookup misses; a plain `=` never matches NULLs, so such rows
# would be inserted again on every run — confirm whether `<=>` is wanted here.
fact_key_cols_str = " AND ".join(f"src.{key_col} = trg.{key_col}" for key_col in fact_key_cols)
fact_key_cols_str



'src.DimPassengersKey = trg.DimPassengersKey AND src.DimFlightsKey = trg.DimFlightsKey AND src.DimAirportsKey = trg.DimAirportsKey AND src.booking_date = trg.booking_date'

In [0]:
from delta.tables import DeltaTable

# Upsert df_fact into the target fact table, creating the table on first run.
target_table_name = f"{catalog}.{target_schema}.{target_object}"

if not spark.catalog.tableExists(target_table_name):
    # First load: the target does not exist yet, so write the rows as a new Delta table.
    first_load_writer = df_fact.write.format('delta').mode('append')
    first_load_writer.saveAsTable(target_table_name)
else:
    dlt_obj = DeltaTable.forName(spark, target_table_name)

    # A matched row is only overwritten when the incoming CDC value is at
    # least as recent as the stored one.
    update_condition = f"src.{cdc_column}>=trg.{cdc_column}"

    (
        dlt_obj.alias("trg")
        .merge(df_fact.alias("src"), fact_key_cols_str)
        .whenMatchedUpdateAll(condition=update_condition)
        .whenNotMatchedInsertAll()
        .execute()
    )

In [0]:
%sql
-- Spot-check the contents of the gold fact table.
select * from workspace.gold.factbookings

amount,booking_date,modifiedDate,DimPassengersKey,DimFlightsKey,DimAirportsKey
1368.6,2025-06-07,2025-07-07T02:25:08.303Z,40,93,12
427.56,2025-03-28,2025-07-07T02:25:08.303Z,40,70,14
320.52,2025-05-05,2025-07-07T02:25:08.303Z,40,100,5
220.26,2025-06-16,2025-07-07T02:25:08.303Z,40,42,54
716.33,2025-06-01,2025-07-07T02:25:08.303Z,122,57,26
940.65,2025-04-19,2025-07-07T02:25:08.303Z,122,84,56
265.87,2025-05-17,2025-07-07T02:25:08.303Z,122,23,36
835.11,2025-06-10,2025-07-07T02:25:08.303Z,122,37,17
910.26,2025-05-21,2025-07-07T02:25:08.303Z,122,90,24
915.79,2025-04-01,2025-07-07T02:25:08.303Z,122,80,11
