In [0]:
# Show the storage mount points currently registered, as a sanity check
# before the /mnt/... paths used below are read or written.
dbutils.fs.mounts()

In [0]:
%sql
-- Switch to the hive_metastore catalog and make sure the silver database exists.
USE CATALOG hive_metastore; 
CREATE DATABASE IF NOT EXISTS silver;

-- Make silver the current database so unqualified table names resolve to it.
USE silver;

In [0]:
# Root of the source (silver) data; the widget-supplied folder name is
# appended to this prefix when the Parquet files are loaded.
srcpath = "/mnt/data/silver/"

# Location of the target accounts Delta table that the merge writes to.
trgpath = "/mnt/data/gold/accounts"


In [0]:
%sql
-- Target Delta table for account records, stored at the gold accounts path.
-- The name is unqualified, so the table is created in whichever database is
-- current when this cell runs.
CREATE TABLE IF NOT EXISTS ACCOUNTS (
    ACCOUNT_ID STRING NOT NULL,  -- key used to match source rows in the merge
    CUSTOMER_ID STRING,
    ACCOUNT_TYPE STRING,
    BALANCE DOUBLE,
    HASHKEY STRING,              -- row hash compared with the source "Hash" column to detect changes
    CREATEDBY STRING,            -- audit columns, filled in by the merge
    CREATEDDATE DATE,
    UPDATEDBY STRING,
    UPDATEDDATE DATE
)
USING DELTA
LOCATION '/mnt/data/gold/accounts';


In [0]:
# Declare the "foldername" text widget (empty by default); its value selects
# which folder under the source path is loaded.
dbutils.widgets.text("foldername","")


In [0]:
# Read the source folder name supplied through the "foldername" widget.
# Surrounding whitespace is stripped so a stray space does not end up in the path.
f_path=dbutils.widgets.get("foldername").strip()

# Fail fast on an empty value: the widget defaults to "", and loading
# srcpath + "" would target the whole source root instead of one dataset folder.
if not f_path:
    raise ValueError(
        'Widget "foldername" is empty; set it to the folder under the source path to load.'
    )

print(f_path)

In [0]:
# Load the Parquet data for the requested folder under the source path.
# Parquet files embed their schema and have no header row, so the CSV-only
# "header" and "inferSchema" options had no effect here and are not passed.
df=spark.read.format("parquet").load(srcpath+f_path)
display(df)

In [0]:
# NOTE: the star import is kept because later cells use names it provides
# (e.g. current_timestamp, lit). Be aware that it also shadows Python builtins
# such as sum/min/max with their Spark column equivalents.
from pyspark.sql.functions import *

# Build a change-detection hash over every source column.
#  - Each column is cast to string and NULLs are replaced by a sentinel:
#    plain concat() yields NULL as soon as one column is NULL, which would give
#    the row a NULL hash that never compares equal to the stored HASHKEY.
#  - Values are joined with a separator so ("ab", "c") and ("a", "bc") do not
#    produce the same input string.
#  - sha2 returns a hex string, matching the STRING type of the HASHKEY column.
# Hashes stored by the earlier crc32/concat scheme will not match these, so the
# first run after this change updates every existing row once.
_HASH_NULL_SENTINEL = "<null>"
_HASH_SEPARATOR = "||"

hash_inputs = [
    coalesce(df[c].cast("string"), lit(_HASH_NULL_SENTINEL)) for c in df.columns
]
df_hash=df.withColumn("Hash", sha2(concat_ws(_HASH_SEPARATOR, *hash_inputs), 256))

In [0]:
# Preview the source rows together with their computed "Hash" column.
display(df_hash)

In [0]:
from delta.tables import DeltaTable

# Open the target Delta table by its storage path (forPath, not fromPath).
dtable = DeltaTable.forPath(spark, trgpath)

# Print the rows currently held by the target table.
current_target_df = dtable.toDF()
current_target_df.show()

In [0]:
from pyspark.sql.functions import col

# A source row is considered unchanged when the target already holds a row
# with the same account id AND the same hash.
same_account = col("src.account_id")==col("tgt.ACCOUNT_ID")
same_hash = col("src.Hash")==col("tgt.HASHKEY")

incoming_rows = df_hash.alias("src")
target_rows = dtable.toDF().alias("tgt")

# The anti join keeps only source rows WITHOUT such a match, i.e. rows that
# are new or whose hash differs from the stored one.
df_src1 = (
    incoming_rows
    .join(target_rows, (same_account & same_hash), "anti")
    .select("src.*")
)

display(df_src1)


In [0]:
# Upsert the new/changed source rows into the target table, matching on account id.

# Column assignments common to both the update and the insert branch.
# String values are SQL expressions evaluated against the "src" alias.
shared_assignments = {
    "tgt.ACCOUNT_ID":"src.account_id",
    "tgt.CUSTOMER_ID":"src.customer_id",
    "tgt.ACCOUNT_TYPE":"src.account_type",
    "tgt.BALANCE":"src.balance",
    "tgt.HASHKEY":"src.Hash",
}

# Matched rows: refresh the business columns and stamp the update audit fields.
update_assignments = {
    **shared_assignments,
    "tgt.UPDATEDDATE":current_timestamp(),
    "tgt.UPDATEDBY":lit("databricks-updated"),
}

# Unmatched rows: insert with both the created and the updated audit fields set.
insert_assignments = {
    **shared_assignments,
    "tgt.CREATEDDATE":current_timestamp(),
    "tgt.CREATEDBY":lit("databricks"),
    "tgt.UPDATEDDATE":current_timestamp(),
    "tgt.UPDATEDBY":lit("databricks"),
}

account_match = (col("src.account_id")==col("tgt.ACCOUNT_ID"))

(
    dtable.alias("tgt")
    .merge(df_src1.alias("src"), account_match)
    .whenMatchedUpdate(set=update_assignments)
    .whenNotMatchedInsert(values=insert_assignments)
    .execute()
)
        

In [0]:
%sql
-- Inspect the contents of the ACCOUNTS table after the merge.
select * from silver.ACCOUNTS

In [0]:
# Read back the ACCOUNTS table after the merge.
#
# The former `df_gold.write.format("delta").mode("overwrite").save("/mnt/data/gold/accounts")`
# step is intentionally gone: ACCOUNTS is declared with
# LOCATION '/mnt/data/gold/accounts' and the merge already commits to that same
# path, so the overwrite only rewrote the whole table onto itself (an extra
# full rewrite and table version with no change in content). If the catalog
# entry ever pointed somewhere else, the overwrite would instead have replaced
# the freshly merged data.
df_gold=spark.sql("select * from silver.ACCOUNTS")

In [0]:
# List the files under the gold accounts path to confirm the Delta table's
# files are present at the expected location.
dbutils.fs.ls("/mnt/data/gold/accounts")


In [0]:
# Scratch cell: assigns a throwaway value and echoes it. It is not used by the
# pipeline above and looks like a leftover connectivity/kernel check.
a = 5
print(a)