In [0]:
from pyspark.sql import functions as F
from pyspark.sql.types import *
from libs import schema_handler as sh
from libs import datafunctions as spdf
from libs import utils as ut
from libs import azureauth as az
from delta.tables import *
from datetime import date, datetime

In [0]:
# Job parameters supplied through Databricks widgets (read in this fixed order).
# `watermark` is later used as an exclusive lower bound on silver `dbx_extract_timestamp`.
silver_table, gold_table, source, watermark = (
    dbutils.widgets.get(widget_name)
    for widget_name in ('silver_table', 'gold_table', 'source', 'watermark')
)

In [0]:
# Resolve the runtime environment and load the gold contract for this entity:
# field schema, primary keys, table type (SCD1 / SCD2 / FACT) and upstream dependencies.
env = az.get_env(spark)
field_schema, primary_keys, table_type, dependencies = sh.read_schema(
    env=env,
    layer="gold",
    source=source,
    entity=gold_table,
)

# Surrogate-key columns declared in the contract, and the merge condition built from them.
surrogate_keys = sh.extract_surrogate_keys(field_schema)
merge_condition = spdf.generate_merge_condition(surrogate_keys=surrogate_keys)

# Echo the resolved contract into the run log.
for resolved_value in (table_type, merge_condition, field_schema, surrogate_keys, dependencies):
    print(resolved_value)


In [0]:
# Surrogate keys that belong to dependency tables are brought in by spdf.resolve_dependencies
# (next cell), not minted for this entity, so remove them from this entity's own key list.
# `dependencies` may be empty or None (the next cell guards with `if dependencies:`); iterating
# None directly would raise TypeError here before that guard is ever reached.
dep_sk = {dep["surrogateKey"] for dep in (dependencies or [])}

surrogate_keys = [sk for sk in surrogate_keys if sk not in dep_sk]
print(surrogate_keys)

In [0]:
# Incremental read: keep only silver rows whose dbx_extract_timestamp is later than the watermark.
silver_fqn = f"datahub_{env}_silver.{source}.{silver_table}"
dfSilver = (
    spark.read.table(silver_fqn)
    .filter(F.col("dbx_extract_timestamp") > watermark)
)
display(dfSilver)

In [0]:
# Entities with dependencies must first be linked to those dependencies' surrogate keys.
# Fail fast if any dependency table is missing; otherwise let spdf.resolve_dependencies enrich the frame.
if dependencies:
    dependency_tables_present = spdf.check_dependencies_exist(spark, dependencies)
    if not dependency_tables_present:
        raise Exception("Dependency tables not found")
    dfSilver = spdf.resolve_dependencies(spark, dfSilver, dependencies)

display(dfSilver)
    



In [0]:
# Shape the incoming silver rows for the gold write according to the contract's table type.
#   SCD1 -> rows already in gold keep their surrogate key and create_date; unseen rows get new keys.
#   SCD2 -> only rows whose attributes changed (plus unseen rows) are kept. Changed rows are emitted
#           twice: once with their current key (so the merge can mark that version inactive) and
#           once with a fresh key (the new active version).
#   FACT -> audit dates only.
# NOTE(review): assign_surrogate_key used to receive `silver_table` together with the gold catalog;
# every other reference in this notebook targets `datahub_{env}_gold.{source}.{gold_table}`, so the
# gold table name is passed now. Confirm against spdf.assign_surrogate_key's contract.
gold_catalog = f"datahub_{env}_gold"
gold_fqn = f"{gold_catalog}.{source}.{gold_table}"

if table_type == "SCD1":
    sk_col = surrogate_keys[0]
    if not sh.uc_table_exists(spark, catalog=gold_catalog, schema=source, table=gold_table):
        # First load: every row is new. create_date/update_date are added in the write cell.
        dfSilver = spdf.assign_surrogate_key(spark, dfSilver, gold_catalog, source, gold_table, sk_col)
        display(dfSilver)
    else:
        # Split incoming rows into those already in gold and those that are new.
        dfGold = spark.read.table(gold_fqn)
        dfCombined = dfSilver.join(
            dfGold, spdf.generate_join_condition(dfSilver, dfGold, primary_keys), how="left"
        )

        # Existing rows: reuse the gold key and original create_date, refresh update_date.
        dfSilver_old = (
            dfCombined.filter(F.col(sk_col).isNotNull())
            .select(dfSilver["*"], dfGold[sk_col], dfGold["create_date"])
            .withColumn("update_date", F.current_date())
        )

        # New rows: mint surrogate keys and stamp both audit dates.
        dfSilver_new = dfCombined.filter(F.col(sk_col).isNull()).select(dfSilver["*"])
        dfSilver_new = (
            spdf.assign_surrogate_key(spark, dfSilver_new, gold_catalog, source, gold_table, sk_col)
            .withColumn("create_date", F.current_date())
            .withColumn("update_date", F.current_date())
        )

        # unionByName matches columns by name, so the result does not depend on where each
        # helper places the key column (plain `union` is positional).
        dfSilver = dfSilver_old.unionByName(dfSilver_new)
        display(dfSilver)

elif table_type == "SCD2":
    sk_col = surrogate_keys[0]
    if not sh.uc_table_exists(spark, catalog=gold_catalog, schema=source, table=gold_table):
        # First load: every row is new. Effective dates are added in the write cell.
        dfSilver = spdf.assign_surrogate_key(spark, dfSilver, gold_catalog, source, gold_table, sk_col)
        display(dfSilver)
    else:
        # Compare only against the active version of each key (open-ended effective_end_date).
        dfGold = spark.read.table(gold_fqn).filter(F.col("effective_end_date") == '9999-12-31')
        dfCombined = dfSilver.join(
            dfGold, spdf.generate_join_condition(dfSilver, dfGold, primary_keys), how="left"
        )

        # Rows that already have an active version: carry its key so the merge can close it.
        dfSilver_old = (
            dfCombined.filter(F.col(sk_col).isNotNull())
            .select(dfSilver["*"], dfGold[sk_col])
        )

        # Keep only those whose attributes changed; load/validity bookkeeping columns are ignored.
        dfSilver_old_modified = sh.get_changed_new_records(
            dfGold,
            dfSilver_old,
            sk_col,
            ["extract_timestamp", "dbx_extract_timestamp", "effective_start_date", "effective_end_date"],
        )
        # get_changed_new_records puts the key column first; move it last like the other frames.
        dfSilver_old_modified = sh.reorder_with_sk_last(dfSilver_old_modified, surrogate_keys)

        # Rows never seen before: mint new keys.
        dfSilver_new = dfCombined.filter(F.col(sk_col).isNull()).select(dfSilver["*"])
        dfSilver_new = spdf.assign_surrogate_key(spark, dfSilver_new, gold_catalog, source, gold_table, sk_col)

        # Fresh keys for the new versions of changed rows. The base must exceed every key already in
        # use: the whole gold history AND the keys just minted above. Taking the base from
        # dfSilver_new alone (and skipping this step when it was empty) left the new-version copies
        # with their old keys, duplicating the rows meant to be closed.
        max_sk_gold = spark.read.table(gold_fqn).agg(F.max(sk_col)).collect()[0][0]
        max_sk_new = dfSilver_new.agg(F.max(sk_col)).collect()[0][0]
        max_sk_value = max(v for v in (max_sk_gold, max_sk_new, 0) if v is not None)
        print(max_sk_value)
        # monotonically_increasing_id is unique and increasing but not consecutive, so keys may have gaps.
        dfSilver_new_versions = (
            dfSilver_old_modified
            .drop(*surrogate_keys)
            .withColumn(sk_col, F.lit(max_sk_value + 1) + F.monotonically_increasing_id())
        )

        # Final frame for the SCD2 merge: rows to close (old keys) + unseen rows + new versions.
        dfSilver = (
            dfSilver_old_modified
            .unionByName(dfSilver_new)
            .unionByName(dfSilver_new_versions)
        )
        display(dfSilver)

elif table_type == "FACT":
    dfSilver = dfSilver.withColumn("create_date", F.current_date()).withColumn("update_date", F.current_date())
    display(dfSilver)
    
        

        


    

In [0]:
# Ad-hoc development check: prints the timedelta between 9999-12-30 and today. Nothing downstream uses it.
# NOTE(review): the open-ended SCD2 sentinel elsewhere is 9999-12-31, not 12-30; this cell can likely be removed.
far_future_day = date(9999, 12, 30)
print(far_future_day - date.today())



In [0]:
# Write the prepared frame to the gold layer.
#   First run  -> stamp audit/validity columns, write a Delta folder, register it in Unity Catalog.
#   Later runs -> evolve the target schema if handle_schema_evolution allows it, then merge
#                 (SCD1/SCD2) or append (FACT).
# NOTE(review): the storage account and path do not include `env`, while the catalog name does,
# so every environment resolves to the same gold_path. Confirm this is intended.
# NOTE(review): partitioning by `extract_timestamp` can create one folder per distinct timestamp.
gold_path = f"abfss://gold@storageawesum.dfs.core.windows.net/data/{source}/{gold_table}"
gold_catalog = f"datahub_{env}_gold"
gold_fqn = f"{gold_catalog}.{source}.{gold_table}"

if not sh.uc_table_exists(spark, catalog=gold_catalog, schema=source, table=gold_table):
    # First run: there is no table yet, so schema evolution does not apply.
    if table_type == "SCD1":
        create_date = date.today()
        update_date = date.today()
        dfSilver = dfSilver.withColumn("create_date", F.lit(create_date))
        dfSilver = dfSilver.withColumn("update_date", F.lit(update_date))
        display(dfSilver)

    elif table_type == "SCD2":
        # Every row starts as the active version with an open-ended validity window.
        effective_start_date = date.today()
        effective_end_date = '9999-12-31'
        dfSilver = dfSilver.withColumn("effective_start_date", F.lit(effective_start_date))
        dfSilver = dfSilver.withColumn("effective_end_date", F.to_date(F.lit(effective_end_date)))
        display(dfSilver)

    dfSilver.write.mode("overwrite").format("delta").partitionBy('extract_timestamp').save(gold_path)

    spark.sql(f"""
    CREATE SCHEMA IF NOT EXISTS {gold_catalog}.{source}
    """)

    spark.sql(f"""
        CREATE TABLE IF NOT EXISTS {gold_fqn}
        USING delta
        LOCATION '{gold_path}'
    """)

else:
    # Later runs: evolve the target schema first (an empty frame carries the incoming schema).
    schema_evolved = sh.handle_schema_evolution(
        spark=spark,
        source_df=dfSilver.limit(0),
        target_path=gold_path,
        contract_attributes=sh.extract_attributes(field_schema),
        mode="append",
    )

    if not schema_evolved:
        # Report this case instead of silently skipping the load.
        print(f"Schema evolution not approved for {gold_fqn}; gold table not updated")

    elif table_type in ("SCD1", "SCD2"):
        merge_fn = spdf.scd1_merge if table_type == "SCD1" else spdf.scd2_merge
        merge_happened = merge_fn(spark, dfSilver, gold_fqn, surrogate_keys)
        if merge_happened:
            print(f"{table_type} Performed")
        else:
            print(f"{table_type} Not Performed")

    elif table_type == "FACT":
        dfSilver.write.mode("append").partitionBy('extract_timestamp').format("delta").save(gold_path)
        print("FACT Table Updated")

    else:
        print(f"Unsupported table_type {table_type!r}; gold table not updated")


    

 
