In [0]:
import pandas as pd
import pyodbc
import datetime
import os
from pyspark.sql import SparkSession
from pyspark.sql.functions import concat_ws, ltrim, rtrim, coalesce, lit, md5, upper
from pyspark.sql.functions import to_date

In [0]:
def hash_value(df, keys, null_token=None):
  """Add `src_hash_key` and `src_hash` MD5 fingerprint columns to a DataFrame.

  `src_hash_key` is the upper-cased MD5 of the key columns joined with "_";
  `src_hash` is the same fingerprint over every remaining column. Columns are
  taken in `df.columns` order, and key names are matched case-insensitively.

  Args:
    df: Spark DataFrame to fingerprint.
    keys: Key column name(s). A single string is treated as one key.
    null_token: Optional placeholder substituted for NULL values before
      hashing. `concat_ws` skips NULL inputs, so with the default (None) two
      rows that differ only in *which* column is NULL produce the same hash.
      Passing a token makes the hash NULL-aware, but it changes the hash of
      any row containing NULLs, so hashes already stored with the default
      will no longer match for those rows.

  Returns:
    A new DataFrame with `src_hash_key` and `src_hash` appended.

  Raises:
    ValueError: If any requested key has no matching column in `df`.
  """
  if isinstance(keys, str):
    keys = [keys]
  # Lower-case both sides: the original only lower-cased the column name, so a
  # key such as "Product" could never match and was silently ignored.
  wanted_keys = {k.lower() for k in keys}

  key_columns = [c for c in df.columns if c.lower() in wanted_keys]
  non_primary_key_columns = [c for c in df.columns if c.lower() not in wanted_keys]

  # Fail fast: a dropped key makes the merge key coarser than intended, and
  # with no key columns at all every row would get the same src_hash_key.
  missing_keys = sorted(wanted_keys - {c.lower() for c in key_columns})
  if missing_keys:
    raise ValueError(
      f"Key column(s) {missing_keys} not found in DataFrame columns {list(df.columns)}"
    )

  def _hash_inputs(names):
    # Default: pass plain column names, exactly as before.
    if null_token is None:
      return list(names)
    # NULL-aware mode: cast to string and replace NULL with the placeholder.
    return [coalesce(df[name].cast("string"), lit(null_token)) for name in names]

  df = df.withColumn("src_hash_key", upper(md5(concat_ws("_", *_hash_inputs(key_columns)))))
  df = df.withColumn("src_hash", upper(md5(concat_ws("_", *_hash_inputs(non_primary_key_columns)))))
  return df

In [0]:
# Pin the Spark SQL session time zone so the timestamp handling in the SQL
# cells below does not depend on the cluster default.
session_tz_conf_key = "spark.sql.session.timeZone"
session_tz_name = "Asia/Bangkok"
spark.conf.set(session_tz_conf_key, session_tz_name)

----
product_EXandIM — load the export and import product CSVs into the staging table

In [0]:
# Settings for the product import/export load.

# Key columns; hash_value matches them against DataFrame columns case-insensitively.
list_key = ["product", "data_month", "data_year", "type"]

# Columns appended after the source columns in the INSERT column list.
additional_cols = ["start_date", "end_date", "active_status"]

# Temp view name under which the hashed source rows are exposed to SQL.
tmp_table = 'meas_tmp'

# Fully qualified table that the MERGE and INSERT statements write to.
target_table = "scgp_edl_dev_uat.dev_scgp_edl_staging.ext_tradereport_thailand_imp_exp_product"

In [0]:
# Landing-zone CSV extracts; the first row of each file is used as the header.
export_csv_path = 'abfss://scgpkgdldevhot@scgpkgdldevhot.dfs.core.windows.net/EDW_DATA_LANDING/collect_trader/csv/principal_export.csv'
import_csv_path = 'abfss://scgpkgdldevhot@scgpkgdldevhot.dfs.core.windows.net/EDW_DATA_LANDING/collect_trader/csv/principal_import.csv'

df1 = spark.read.option("header", True).csv(export_csv_path)
df2 = spark.read.option("header", True).csv(import_csv_path)


In [0]:
# Stack the export and import frames, aligning columns by name rather than position.
df_product = df1.unionByName(df2)

# Append the src_hash_key / src_hash fingerprint columns used by the SQL below.
df_product_hashed = hash_value(df=df_product, keys=list_key)

In [0]:
# Expose the hashed source rows to Spark SQL under the temp view name.
df_product_hashed.createOrReplaceTempView(name=tmp_table)

In [0]:
# Build the comma-separated column lists interpolated into the INSERT statement.
src_cols = df_product_hashed.columns
src_cols_str = ", ".join(src_cols)

# Source columns first, then the extra columns, matching the order of the SELECT list.
all_cols = [*src_cols, *additional_cols]
all_cols_str = ", ".join(all_cols)

In [0]:
# Close the current version of every key whose non-key content changed.
# A target row is "current" when end_date is the 9999-12-31 sentinel and
# active_status is TRUE. Closing a row must clear BOTH markers: the original
# only set end_date, leaving superseded rows flagged active_status = TRUE.
# NOTE(review): rows closed by earlier runs may still carry active_status = TRUE
# with a real end_date; consider a one-off backfill.
update_query =f"""  MERGE INTO {target_table} tg
                    USING {tmp_table} tmp
                    ON tg.src_hash_key = tmp.src_hash_key
                    WHEN MATCHED AND tg.src_hash <> tmp.src_hash
                    AND tg.end_date = CAST('9999-12-31' AS TIMESTAMP)
                    AND tg.active_status = TRUE THEN
                      UPDATE SET
                        tg.end_date = GETDATE(),
                        tg.active_status = FALSE """
spark.sql(update_query).display()

In [0]:
# Insert every source row that has no identical current version in the target
# (same src_hash_key and src_hash, end_date = 9999-12-31, active_status = TRUE).
# New rows start now and are open-ended. The three literal columns in the
# SELECT must stay in the same order as `additional_cols`, which supplies their
# names in the INSERT column list.
#
# NOT EXISTS replaces the original `NOT IN (SELECT key || '_' || hash ...)`:
# NOT IN yields no rows at all if the subquery returns a single NULL, so one
# target row with a NULL hash would silently block every insert. Comparing the
# two hash columns separately also avoids relying on string concatenation.
insert_query =f""" INSERT INTO {target_table}
                    (
                      {all_cols_str}
                    ) 
                    SELECT
                      {src_cols_str}
                      ,GETDATE() AS start_date
                      ,CAST('9999-12-31' AS TIMESTAMP)  AS end_date
                      ,TRUE AS active_status
                    FROM {tmp_table} tmp
                    WHERE NOT EXISTS
                    (
                      SELECT 1
                      FROM  {target_table} tg
                      WHERE tg.src_hash_key = tmp.src_hash_key
                      AND tg.src_hash = tmp.src_hash
                      AND tg.end_date = CAST('9999-12-31' AS TIMESTAMP)
                      AND tg.active_status = TRUE
                    )
                """
spark.sql(insert_query).display()

In [0]:
# Issue Spark SQL's CLEAR CACHE statement once the load has finished.
clear_cache_statement = "CLEAR CACHE"
spark.sql(clear_cache_statement)

------------

In [0]:
# List the landing folder that holds the CSV extracts read above.
landing_csv_dir = 'abfss://scgpkgdldevhot@scgpkgdldevhot.dfs.core.windows.net/EDW_DATA_LANDING/collect_trader/csv'
display(dbutils.fs.ls(landing_csv_dir))