In [0]:
# Banner printed at the top of the run.
banner = "Mamba Mentality"
print(banner)

In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.window import Window
from delta.tables import DeltaTable

In [0]:
### We will build a dynamic solution that handles both the initial load and incremental loads

In [0]:
%sql
-- Preview the flights source table in the silver layer.
SELECT *
FROM project.silver.silver_flights

In [0]:
## src >> dimension (flights >> dim_flights)
## Each dimension row is assigned a surrogate key (e.g. DimFlightsKey)
## During an upsert, rows whose keys already exist in the dimension must keep their existing surrogate key, while new rows get the next available values

## PARAMETERS

In [0]:
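# ## Preset: dim_flights (silver_flights -> gold.dim_flights). Uncomment this cell to build that dimension.
# ## Key Cols list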
# key_cols = "['flight_id']"
# key_cols_list = eval(key_cols)

# ## CDC Column
# cdc_col = "modifiedDate"

# ## Backdated Refresh
# backdated_refresh = ""

# # Catalog_name
# catalog = "project"

# ## Source Object
# source_object = "silver_flights"

# ## Source Schema
# source_schema = "silver"

# ## Target Object
# target_object = "dim_flights"

# ## Target Schema
# target_schema = "gold"

# ## Surrogate Key
# surrogate_key = 'DimFlightsKey'

# ## eval - evaluates string into a list

In [0]:
%sql
-- Inspect the current contents of the flights dimension in the gold layer.
SELECT *
FROM project.gold.dim_flights

In [0]:
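# ## Preset: dim_passengers (silver_passengers -> gold.dim_passengers). Uncomment this cell to build that dimension.
# ## Key Cols list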
# key_cols = "['passenger_id']"
# key_cols_list = eval(key_cols)

# ## CDC Column
# cdc_col = "modifiedDate"

# ## Backdated Refresh
# backdated_refresh = ""

# # Catalog_name
# catalog = "project"

# ## Source Object
# source_object = "silver_passengers"

# ## Source Schema
# source_schema = "silver"

# ## Target Object
# target_object = "dim_passengers"

# ## Target Schema
# target_schema = "gold"

# ## Surrogate Key
# surrogate_key = 'DimPassengersKey'

# ## eval - evaluates string into a list

# Dynamic Parameters

In [0]:
## Catalog name
catalog = "project"

## Source Object
source_object = "silver_bookings"

## Source Schema
source_schema = "silver"

## CDC Column
cdc_column = "modifiedDate"
# The load cells below refer to the CDC column as `cdc_col`; bind both names so
# a fresh "Run All" does not stop with a NameError.
cdc_col = cdc_column

## Backdated Refresh ("" = incremental load starting after the target's max CDC value)
backdated_refresh = ""

## Source Fact Table
fact_table = f"{catalog}.{source_schema}.{source_object}"

## Target Schema
target_schema = "gold"

## Target Object
target_object = "FactBookings"

# NOTE(review): the load cells below also read `key_cols_list` and `surrogate_key`,
# which are only defined in the commented-out dimension presets above.
# TODO define them for FactBookings before running this notebook end to end.

In [0]:
## Dimension tables the fact rows are joined to.
## The table names must be f-strings so the catalog/schema parameters are actually
## substituted. The dimensions are built in the gold (target) schema, not the silver
## source schema.
## join_keys: presumably (fact column, dimension column) pairs; confirm against the join code.
dimensions = [
    {
        "table" : f"{catalog}.{target_schema}.dim_passengers",
        "alias" : "dim_passengers",
        "join_keys" : [("passenger_id" , "passenger_id")]
    },
    {
        "table" : f"{catalog}.{target_schema}.dim_flights",
        "alias" : "dim_flights",
        "join_keys" : [("flight_id" , "flight_id")]
    }
]
## Columns to keep from the fact table (besides the surrogate keys)
fact_columns = ["amount" , "booking_date", "modifiedDate"]

In [0]:
# Sanity check before the load cells: they all need `key_cols_list`, which is only
# defined when one of the commented-out parameter presets is enabled.
if "key_cols_list" not in globals():
    raise NameError(
        "key_cols_list is not defined; uncomment a parameter preset "
        "(e.g. dim_flights or dim_passengers) or define it before running the load cells"
    )
key_cols_list

## ** INCREMENTAL DATA INGESTION **

### Last Load Date

In [0]:
%sql
-- NOTE(review): ad-hoc inspection of the airports source; not referenced by the load below.
SELECT *
FROM project.silver.silver_airports

In [0]:
# Resolve the lower bound (`last_load`) for the incremental read of the source:
#  - backdated_refresh is set -> reload from that timestamp
#  - target table exists      -> resume after the max CDC value already loaded
#  - otherwise (initial load) -> start from a sentinel date so every row is read
target_table = f"{catalog}.{target_schema}.{target_object}"
default_last_load = "1900-01-01 00:00:00"

if backdated_refresh:
    ## Yes, Backdated Refresh
    last_load = backdated_refresh
elif spark.catalog.tableExists(target_table):
    ## Table exists in the destination.
    # Query the same catalog that tableExists checked; this was previously hard-coded to `project`.
    last_load = spark.sql(f"SELECT max({cdc_col}) FROM {target_table}").collect()[0][0]
    # max() is NULL on an empty target table; without this fallback the source
    # filter would become `> 'None'`.
    if last_load is None:
        last_load = default_last_load
else:
    last_load = default_last_load

# Test the last load
last_load

In [0]:
# Read only source rows whose CDC value is newer than the resolved lower bound.
source_table = f"{catalog}.{source_schema}.{source_object}"
incremental_query = f"SELECT * FROM {source_table} WHERE {cdc_col} > '{last_load}'"
df_src = spark.sql(incremental_query)

In [0]:
# Show the incremental source batch.
df_src.display()

In [0]:
## Pull the key columns, the surrogate key and the create/update dates from the target.
## Source rows are left-joined against this frame: matches reuse their surrogate key,
## non-matches are treated as new rows.
target_table = f"{catalog}.{target_schema}.{target_object}"

if spark.catalog.tableExists(target_table):
    ## Key Columns String for Incremental
    # Comma-separated: a space-separated list is parsed by Spark SQL as `col alias`
    # (or fails to parse) as soon as there is more than one key column.
    key_cols_string_incr = ", ".join(key_cols_list)

    df_trg = spark.sql(
        f"SELECT {key_cols_string_incr}, {surrogate_key}, create_date, update_date "
        f"FROM {target_table}"
    )
else:
    ## Key Columns String for Initial load
    # No target yet: build an empty frame with the same column names so the join
    # below still resolves and every source row lands in the "new" branch.
    key_cols_string_init = ", ".join(f"'' AS {key_col}" for key_col in key_cols_list)

    df_trg = spark.sql(
        f"SELECT {key_cols_string_init}, "
        f"CAST('0' AS INT) AS {surrogate_key}, "
        "CAST('1900-01-01 00:00:00' AS TIMESTAMP) AS create_date, "
        "CAST('1900-01-01 00:00:00' AS TIMESTAMP) AS update_date "
        "WHERE 1 = 0"
    )


In [0]:
# Show the target-side lookup frame (empty on the initial load).
display(df_trg)

#### Join Condition

In [0]:
# One equality predicate per key column, combined with AND, e.g. "src.k = trg.k".
join_condition = " AND ".join(
    "src.{0} = trg.{0}".format(key_col) for key_col in key_cols_list
)

In [0]:
# Register both sides as temp views so the lookup can be written in Spark SQL.
df_trg.createOrReplaceTempView("trg")
df_src.createOrReplaceTempView("src")

# Left join: every source row is kept. The trg.* columns are NULL when the keys
# are not yet present in the target.
join_query = f"""
    SELECT src.*,
           trg.{surrogate_key},
           trg.create_date,
           trg.update_date
    FROM src
    LEFT JOIN trg
      ON {join_condition}
    """
df_join = spark.sql(join_query)
display(df_join)

In [0]:
# Rows that picked up a surrogate key already exist in the target; the rest are new.
has_existing_key = col(surrogate_key).isNotNull()
df_old = df_join.filter(has_existing_key)
df_new = df_join.filter(~has_existing_key)

display(df_old)
display(df_new)

## Preparing DF_OLD

In [0]:
# Existing rows keep their surrogate key and create_date; only update_date is refreshed.
# NOTE(review): this frame is not used further down; the union uses `df_old_enr`, which is recomputed later.
refreshed_ts = current_timestamp()
df_old_enrich = df_old.withColumn("update_date", refreshed_ts)

## Preparing DF_NEW

In [0]:
# Show the rows that still need a surrogate key.
display(df_new)

In [0]:
## Assign surrogate keys to new rows.
## Keys continue from the current maximum in the target (0 on the initial load) and are
## consecutive: max + 1, max + 2, ...
target_table = f"{catalog}.{target_schema}.{target_object}"

if spark.catalog.tableExists(target_table):
    max_surrogate_key = spark.sql(
        f"SELECT max({surrogate_key}) FROM {target_table}"
    ).collect()[0][0]
    # max() is NULL when the target table exists but is empty; without this, every new key would be NULL.
    if max_surrogate_key is None:
        max_surrogate_key = 0
else:
    max_surrogate_key = 0

# monotonically_increasing_id() is unique but NOT consecutive: it encodes the
# partition id in its upper bits, so keys would jump by billions across partitions.
# row_number() over a window ordered by the key columns yields 1..n instead.
# NOTE: an unpartitioned window moves df_new into a single partition, which is
# acceptable for dimension-sized increments.
new_key_window = Window.orderBy(*key_cols_list)

df_new_enr = (
    df_new
    # bigint matches the type the previous monotonically_increasing_id() version wrote.
    .withColumn(
        surrogate_key,
        lit(max_surrogate_key).cast("bigint") + row_number().over(new_key_window),
    )
    .withColumn("create_date", current_timestamp())
    .withColumn("update_date", current_timestamp())
)

In [0]:
# Show the new rows with their freshly assigned surrogate keys.
display(df_new_enr)

In [0]:
# Matched (existing) rows keep their surrogate key and create_date; only update_date is refreshed.
df_old_enr = df_old.withColumn(colName="update_date", col=current_timestamp())
df_old_enr.display()

### Union of OLD and NEW Records

In [0]:
# Stack refreshed existing rows and newly keyed rows; columns are matched by name, not position.
df_union = df_old_enr.unionByName(other=df_new_enr)
display(df_union)

# UPSERT

In [0]:
target_table = f"{catalog}.{target_schema}.{target_object}"

if not spark.catalog.tableExists(target_table):
    # Initial load: there is nothing to merge into yet, so the union becomes the table.
    (
        df_union.write
        .format("delta")
        .mode("append")
        .saveAsTable(target_table)
    )
else:
    # Incremental load: match on the surrogate key and overwrite a stored row only
    # when the incoming CDC value is at least as recent; unmatched rows are inserted.
    dlt_obj = DeltaTable.forName(spark, target_table)
    (
        dlt_obj.alias("trg")
        .merge(df_union.alias("src"), f"trg.{surrogate_key} = src.{surrogate_key}")
        .whenMatchedUpdateAll(condition=f"src.{cdc_col} >= trg.{cdc_col}")
        .whenNotMatchedInsertAll()
        .execute()
    )