In [0]:
# Mount the ADLS Gen2 "output" container at /mnt/source using an OAuth
# client-credentials (service principal) login. The client secret is read from a
# Databricks secret scope rather than written into the notebook.
MOUNT_SOURCE = "abfss://output@adlsdevvbsource001.dfs.core.windows.net/"
MOUNT_POINT = "/mnt/source"

configs = {
  "fs.azure.account.auth.type": "OAuth",
  "fs.azure.account.oauth.provider.type": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
  "fs.azure.account.oauth2.client.id": "3e04ee23-1ccd-49ad-8945-2ebce8253fc2",
  # NOTE(review): "scope"/"key" look like placeholders — confirm the real secret scope and key names.
  "fs.azure.account.oauth2.client.secret": dbutils.secrets.get("scope", "key"),
  "fs.azure.account.oauth2.client.endpoint": "https://login.microsoftonline.com/9b91be9b-3f50-4b53-ba3b-d896a492064e/oauth2/token"
}

# dbutils.fs.mount errors out if the mount point is already in use, so only mount
# when it is absent; this keeps the notebook safe to re-run end to end.
if any(m.mountPoint == MOUNT_POINT for m in dbutils.fs.mounts()):
    print(f"{MOUNT_POINT} is already mounted; skipping mount.")
else:
    dbutils.fs.mount(
      source = MOUNT_SOURCE,
      mount_point = MOUNT_POINT,
      extra_configs = configs)

In [0]:
# List the contents of the Snowflake export area on the mounted container
# (expected layout: YYYY/MM/DD/*.parquet) and render it as a table.
display(dbutils.fs.ls("/mnt/source/snowflake"))


In [0]:
from datetime import datetime, timedelta

# Which day's folder to load: 0 = today's file, 1 = yesterday's file (use 1 when
# the job runs the day after the source export lands).
PROCESS_DAYS_AGO = 0

# Source folders are laid out as /mnt/source/snowflake/YYYY/MM/DD/.
# NOTE(review): datetime.today() uses the driver's local clock — confirm it matches
# the timezone the source system uses when naming its date folders.
process_date = (datetime.today() - timedelta(days=PROCESS_DAYS_AGO)).strftime("%Y/%m/%d")

path = f"/mnt/source/snowflake/{process_date}/*.parquet"

# Parquet carries its own schema in the file footers, so CSV-only options such as
# "header" and "inferSchema" do not apply here.
raw_df = spark.read.parquet(path)

display(raw_df)

In [0]:
# Bootstrap the curated Delta table from the current batch and register it in the
# metastore. mode("ignore") turns the write into a no-op once data already exists
# at the path: overwriting here would replace the whole table on every run and
# discard rows from earlier loads, defeating the incremental SCD1 merge that follows.
raw_df.write.format("delta").mode("ignore").save("/mnt/curated/cust_order_delta")
spark.sql("CREATE TABLE IF NOT EXISTS cust_order_delta USING DELTA LOCATION '/mnt/curated/cust_order_delta'")

### SCD1 (Type 1) upsert of the latest source folder into the curated Delta table

In [0]:
from pyspark.sql.functions import current_date, lit, row_number
from pyspark.sql.window import Window
from delta.tables import DeltaTable

# ---- Config ----
source_root = "/mnt/source/snowflake"
delta_path = "/mnt/curated/cust_order_delta"
merge_keys = ["ORDER_ID", "ORDER_ITEM_ID"]


def _latest_partition(parent):
    """Return the highest all-digit sub-folder name directly under `parent`.

    Only directory entries (names ending in "/") with purely numeric names are
    considered, so stray files or non-date folders cannot be picked. Names are
    compared numerically, so ordering is correct even if a folder is not zero-padded.

    Args:
        parent: DBFS path of the folder to scan, ending with "/".

    Returns:
        The winning folder name without its trailing slash (e.g. "2024", "07").

    Raises:
        FileNotFoundError: if `parent` holds no numeric sub-folders.
    """
    candidates = [
        entry.name.rstrip("/")
        for entry in dbutils.fs.ls(parent)
        if entry.name.endswith("/") and entry.name.rstrip("/").isdigit()
    ]
    if not candidates:
        raise FileNotFoundError(f"No numeric date folders found under {parent}")
    return max(candidates, key=int)


# ---- Step 1: Get latest file path and folder date (YYYY/MM/DD layout) ----
latest_year = _latest_partition(f"{source_root}/")
latest_month = _latest_partition(f"{source_root}/{latest_year}/")
latest_day = _latest_partition(f"{source_root}/{latest_year}/{latest_month}/")

latest_path = f"{source_root}/{latest_year}/{latest_month}/{latest_day}/*.parquet"
print(f"Reading from: {latest_path}")

# Construct file_date (YYYY-MM-DD string) from the folder names
file_date = f"{latest_year}-{latest_month}-{latest_day}"

# ---- Step 2: Read raw data and add audit columns ----
raw_df = spark.read.parquet(latest_path)

# Let MERGE add columns the target table lacks (e.g. ingest_date / file_date when
# the table was first written without them). This is a session-wide setting.
spark.conf.set("spark.databricks.delta.schema.autoMerge.enabled", "true")

new_df = (raw_df
          .withColumn("ingest_date", current_date())   # actual load date
          .withColumn("file_date", lit(file_date)))    # folder date

# ---- Step 2.1: Deduplicate on the merge key (avoid merge errors) ----
# MERGE fails when several source rows match the same target row, so keep one row
# per key. Every row in this batch shares the same ingest_date/file_date, so there
# is no recency signal to order by and the surviving row is arbitrary.
# TODO: if the source exposes a last-modified/update timestamp, keep the newest row
# per key with a row_number() window ordered by that column instead.
new_df = new_df.dropDuplicates(merge_keys)

# ---- Step 3: Merge into Delta (SCD1) ----
if not DeltaTable.isDeltaTable(spark, delta_path):
    # Initial load
    (new_df.write
        .format("delta")
        .mode("overwrite")
        .option("mergeSchema", "true")
        .save(delta_path))
else:
    # Merge (SCD1: overwrite on match, insert on new)
    merge_condition = " AND ".join(f"t.{key} = s.{key}" for key in merge_keys)
    deltaTable = DeltaTable.forPath(spark, delta_path)
    (deltaTable.alias("t")
     .merge(new_df.alias("s"), merge_condition)
     .whenMatchedUpdateAll()
     .whenNotMatchedInsertAll()
     .execute())

# ---- Step 4: Reload and check ----
updated_df = spark.read.format("delta").load(delta_path)

# raw_df is the incoming batch, not the target table's prior state.
print("Incoming batch (merge source):")
raw_df.show(10, truncate=False)

print("After Merge:")
updated_df.show(10, truncate=False)
