In [0]:
%run "../bronze_to_silver/config"

In [0]:
# Read every parquet file under the silver "loans" folder into a DataFrame.
# silver_folder_path is expected to come from the config notebook run above.
df_loans_silver = (
    spark.read
    .format("parquet")
    .load(f"{silver_folder_path}/loans/*.parquet")
)

In [0]:
from pyspark.sql.functions import coalesce, col, concat, crc32, current_timestamp, lit

In [0]:
# Add a change-detection hash (src_hashkey) over the tracked loan columns.
#
# concat() yields NULL as soon as one of its inputs is NULL, which made the
# crc32 NULL as well. A NULL hash can never compare equal to tgt.hashkey in the
# anti-join, so any loan with a NULL in one of these columns was re-selected
# and re-updated by the merge on every run. Replacing NULLs with a sentinel
# gives those rows a stable hash; rows without NULLs hash exactly as before,
# so hashkeys already stored in the target stay valid.
#
# NOTE(review): values are still concatenated without a delimiter, so adjacent
# columns can in principle produce the same string. Adding a separator would
# change every stored hashkey, so it is left as a deliberate follow-up.
hash_source_columns = ["loan_id", "customer_id", "loan_amount", "interest_rate", "loan_term"]
null_marker = lit("<null>")

df_src_hash = df_loans_silver.withColumn(
    "src_hashkey",
    crc32(
        concat(
            *[
                coalesce(col(column_name).cast("string"), null_marker)
                for column_name in hash_source_columns
            ]
        )
    ),
)

In [0]:
from delta.tables import DeltaTable

In [0]:
# Open the Delta table stored under the gold "loans" folder and expose it as a
# DataFrame so it can be joined against the source below.
dbtable = DeltaTable.forPath(spark, f"{gold_folder_path}/loans/")
df_tgt = dbtable.toDF()

In [0]:
# Anti-join source against target: keep only source rows for which no target
# row has both the same loan_id and the same hash, i.e. loans that are new or
# whose hashed columns differ from what the target holds.
df_final = (
    df_src_hash.alias("src")
    .join(
        df_tgt.alias("tgt"),
        on=(
            (col("src.loan_id") == col("tgt.loan_id"))
            & (col("src.src_hashkey") == col("tgt.hashkey"))
        ),
        how="anti",
    )
)

In [0]:
# Upsert the new/changed loans into the target Delta table, matching on loan_id.

# Column assignments shared by the update and the insert branch
# (target column -> source expression).
loan_column_mapping = {
    "tgt.loan_id": "src.loan_id",
    "tgt.customer_id": "src.customer_id",
    "tgt.loan_amount": "src.loan_amount",
    "tgt.interest_rate": "src.interest_rate",
    "tgt.loan_term": "src.loan_term",
    "tgt.hashkey": "src.src_hashkey",
}

(
    dbtable.alias("tgt")
    .merge(df_final.alias("src"), "src.loan_id = tgt.loan_id")
    # Existing loan: overwrite the data columns and stamp the update audit fields.
    .whenMatchedUpdate(
        set={
            **loan_column_mapping,
            "tgt.updateddate": current_timestamp(),
            "tgt.updatedby": lit("databricks-updated"),
        }
    )
    # New loan: insert the data columns and set both created and updated audit fields.
    .whenNotMatchedInsert(
        values={
            **loan_column_mapping,
            "tgt.createddate": current_timestamp(),
            "tgt.createdby": lit("databricks"),
            "tgt.updateddate": current_timestamp(),
            "tgt.updatedby": lit("databricks"),
        }
    )
    .execute()
)

In [0]:
%sql
-- List all rows of the loans table ordered by loan_id.
-- NOTE(review): presumably this table is backed by the gold loans Delta path merged above - confirm.
SELECT *
FROM loans
ORDER BY loan_id;