In [0]:
%sql
-- Preview ten rows of the silver_flights table in the 1_flight_silver schema.
select * from `1_flight_silver`.silver_flights limit 10;

# Parameters

In [0]:
# Parameters that drive the dynamic load; every later cell reads these names.
import ast

# catalog name
catalog = 'workspace'

# source object
source_object = 'silver_flights'

# source schema
source_schema = '1_flight_silver'

# Key column list, held as the string form of a Python list.
key_col = "['flight_id']"
# literal_eval accepts only Python literals, so (unlike eval) the parameter
# string can never execute arbitrary code.
key_col_list = ast.literal_eval(key_col)

# CDC (change data capture) column
cdc_col = 'modified_date'

# backdated refresh: an empty string means "no backdated refresh";
# any other value is used as LAST_LOAD further down.
backdated_refresh = ''

# target schema
target_schema = '2_flight_gold'

# target object
target_object = 'flights'

# surrogate key
surrogate_key = 'DimFlightsKey'

### **Source Query: Incremental data ingestion**

In [0]:
# LAST_LOAD: lower bound applied to the CDC column when reading the source.

# Used when the target has nothing loaded yet, so every source row qualifies.
DEFAULT_LAST_LOAD = '1900-01-01 00:00:00'
target_table = f'{catalog}.{target_schema}.{target_object}'

# No backdated refresh
if len(backdated_refresh) == 0:

    # If table exists in the destination
    if spark.catalog.tableExists(target_table):
        # Query the same catalog-qualified name that was just checked for existence.
        LAST_LOAD = spark.sql(f"SELECT max({cdc_col}) as LAST_LOAD FROM {target_table}").collect()[0][0]
        # max() over an empty table is NULL; without this fallback the source
        # filter would compare against the literal string 'None'.
        if LAST_LOAD is None:
            LAST_LOAD = DEFAULT_LAST_LOAD
        print(LAST_LOAD)

    # If the table does not exist in the destination
    else:
        LAST_LOAD = DEFAULT_LAST_LOAD
# Yes, backdated refresh
else:
    LAST_LOAD = backdated_refresh


In [0]:

# Echo the resolved LAST_LOAD value as the cell output.
LAST_LOAD

# On the initial load the gold layer has no table and no data, so LAST_LOAD defaults to 1900-01-01 00:00:00 in order to fetch all records from the beginning.

# UNDERSTAND BRO!

In [0]:

# Read the source rows whose CDC column is at or after LAST_LOAD.
incremental_query = f"""  SELECT * FROM {catalog}.{source_schema}.{source_object} 
                WHERE {cdc_col} >= '{LAST_LOAD}'
          """
df_src = spark.sql(incremental_query)

df_src.display()

In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *

### OLD Vs NEW Records

#### list comprehension

In [0]:
# Comma-separated key columns, ready to be placed in a SELECT list.
key_col_string_incremental = ', '.join(key_name for key_name in key_col_list)
key_col_string_incremental

In [0]:
# One "'' as <key>" placeholder per key column, then the same list joined
# into a single SELECT-list fragment.
key_col_string_init = [f"'' as {key_name}" for key_name in key_col_list]
key_col_string_initial = ', '.join(key_col_string_init)
print(key_col_string_init)
print(key_col_string_initial)


In [0]:
# Scratch query: placeholder key / surrogate-key / audit columns selected
# against the source table. The resulting DataFrame is the cell's output only;
# it is not assigned to a name.
placeholder_query = f"""SELECT {key_col_string_initial}, '' as {surrogate_key} , 
                        CAST('1900-01-01 00:00:00' AS timestamp) as created_date, 
                        CAST('1900-01-01 00:00:00' AS timestamp) as updated_date
                from {catalog}.{source_schema}.{source_object}"""
spark.sql(placeholder_query)

In [0]:
# Build df_tgt, the target-side lookup frame (key columns, surrogate key,
# created_date, updated_date) used to tell old records from new ones.
# The catalog comes from the `catalog` parameter rather than a hard-coded
# 'workspace', so this cell agrees with the other existence checks in the notebook.
target_table = f'{catalog}.{target_schema}.{target_object}'

if spark.catalog.tableExists(target_table):

    # KEY COL STRING for incremental:
    key_col_string_incremental = ', '.join(key_col_list)

    # Read only the lookup columns from the existing target table.
    df_tgt = spark.sql(f"SELECT {key_col_string_incremental} , {surrogate_key}, created_date, updated_date FROM {target_table}")

else:
    # KEY COL STRING for initial load: create pseudo (placeholder) columns.
    key_col_string_init = [f"'' as {i}" for i in key_col_list]
    key_col_string_initial = ', '.join(key_col_string_init)
    print(key_col_string_initial)

    # WHERE 1=0 returns no rows: the frame only supplies the column names
    # so that the later left join has something to join against.
    df_tgt = spark.sql(f"""SELECT {key_col_string_initial}, '' as {surrogate_key} , 
                            CAST('1900-01-01 00:00:00' AS timestamp) as created_date,
                            CAST('1900-01-01 00:00:00' AS timestamp) as updated_date 
                            WHERE 1=0
                            """)

In [0]:
# Render the target-side lookup frame.
df_tgt.display()

In [0]:
# Illustration only: what a composite key list would look like.
# key_col_list itself is deliberately NOT rebound here. df_tgt was already built
# from the parameter-driven key list, so overriding it at this point would make
# the join condition reference tgt columns that df_tgt does not contain.
key_col_list_composite_example = ['flight_id', 'flight_name']

**JOIN CONDITION**

In [0]:
# One "src.<key> = tgt.<key>" predicate per key column, AND-ed into a single
# ON clause for the src/tgt join.
key_col_list_join_cond = ' AND '.join(
    [f"src.{key_name} = tgt.{key_name}" for key_name in key_col_list]
)
key_col_list_join_cond

In [0]:
# Expose both frames to Spark SQL under the aliases used in the join condition.
df_tgt.createOrReplaceTempView('tgt')
df_src.createOrReplaceTempView('src')

# Left join keeps every source row; the tgt columns are NULL where the key
# has no match in the target.
join_query = f"""
          SELECT src.*,
                 tgt.{surrogate_key} as {surrogate_key},
                 tgt.created_date,
                 tgt.updated_date
          FROM src
          LEFT JOIN tgt
          ON {key_col_list_join_cond}
          """
df_join = spark.sql(join_query)

In [0]:
# Render the joined source/target frame.
df_join.display()

In [0]:
# After the left join, a NULL surrogate key means the source row has no match
# in the target (a new record); a non-NULL surrogate key means the row already
# exists in the target and needs to be updated.
surrogate_col = col(surrogate_key)

# OLD RECORDS
df_old = df_join.where(surrogate_col.isNotNull())

# NEW RECORDS
df_new = df_join.where(surrogate_col.isNull())


In [0]:
# Render the old-record and new-record splits, in that order.
df_old.display()
df_new.display()

# **Preparing `df_old` :  old record**

In [0]:
# Old records keep every column as it is; only updated_date is replaced
# with the current timestamp.
refreshed_at = current_timestamp()
df_old_enriched = df_old.withColumn('updated_date', refreshed_at)
df_old_enriched.display()

# **Preparing `df_new` :  for new record**

In [0]:
# New records get a generated surrogate key, and both audit columns are set
# to the current timestamp.
loaded_at = current_timestamp()
df_new_enriched = (
    df_new
    .withColumn(surrogate_key, monotonically_increasing_id())
    .withColumn('created_date', loaded_at)
    .withColumn('updated_date', loaded_at)
)
df_new_enriched.display()


In [0]:
# Peek at the highest surrogate key currently stored in the target table.
# Guarded by an existence check so the first (initial-load) run, where the
# target table has not been created yet, does not fail on this cell.
if spark.catalog.tableExists(f'{catalog}.{target_schema}.{target_object}'):
    print(spark.sql(f'SELECT max({surrogate_key}) as max_surrogate_key FROM {catalog}.{target_schema}.{target_object}').collect()[0][0])
else:
    print(f'{catalog}.{target_schema}.{target_object} does not exist yet')

In [0]:
# Assign surrogate keys to new records, continuing after the highest key
# already present in the target table (or starting from 0 on the initial load).
target_table = f"{catalog}.{target_schema}.{target_object}"

if spark.catalog.tableExists(target_table):
    max_surrogate_key = spark.sql(f"""
                                  SELECT max({surrogate_key}) as max_surrogate_key 
                                  FROM {target_table}
                                  """).collect()[0][0]
    # max() over an empty table is NULL; treat that the same as "no keys yet"
    # so the arithmetic below does not produce NULL surrogate keys.
    if max_surrogate_key is None:
        max_surrogate_key = 0
else:
    max_surrogate_key = 0

# Built outside the if/else so df_new_enr exists on both the initial and the
# incremental path.
# NOTE: monotonically_increasing_id() is unique and increasing but not
# guaranteed to be consecutive, so the generated keys may contain gaps.
df_new_enr = df_new.withColumn(surrogate_key, lit(max_surrogate_key) + lit(1) + monotonically_increasing_id()) \
               .withColumn('created_date', current_timestamp()) \
               .withColumn('updated_date', current_timestamp())
    


In [0]:
# Render the new records with their assigned surrogate keys and audit columns.
df_new_enr.display()