In [0]:
# Access the ADLS Gen2 storage account directly via abfss:// URIs (no DBFS mount).
# 1. Configure Spark to authenticate with a service principal using OAuth client credentials.
#
# SECURITY: the client secret must never be stored in notebook source. The secret that was
# previously hardcoded in this cell is exposed in notebook/revision history and must be rotated.
# NOTE(review): the secret scope / key names below are placeholders — create (e.g. a Key Vault-backed
# Databricks secret scope) and confirm them before running.
storage_account = "samagnadev"
tenant_id = "7af2a929-b20f-4b9e-afe3-e02c2a070412"
client_id = "354e564a-6750-407d-bd2b-31aec79a0a4f"
client_secret = dbutils.secrets.get(scope="samagnadev-secrets", key="sp-client-secret")

# Every ABFS OAuth setting is scoped to the storage account's DFS endpoint by suffixing the host.
account_host = f"{storage_account}.dfs.core.windows.net"
oauth_settings = {
    "fs.azure.account.auth.type": "OAuth",
    "fs.azure.account.oauth.provider.type": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
    "fs.azure.account.oauth2.client.id": client_id,
    "fs.azure.account.oauth2.client.secret": client_secret,
    "fs.azure.account.oauth2.client.endpoint": f"https://login.microsoftonline.com/{tenant_id}/oauth2/token",
}
for setting_prefix, setting_value in oauth_settings.items():
    spark.conf.set(f"{setting_prefix}.{account_host}", setting_value)

In [0]:
# Text widget for the incremental-load watermark; defaults to an empty string.
dbutils.widgets.text(name="watermark_param", defaultValue="")

In [0]:
# Current value of the watermark widget (a string; empty means "no watermark").
watermark_param = dbutils.widgets.get("watermark_param")

# Both lake locations live in the "practice" container of the samagnadev account.
lake_root = "abfss://practice@samagnadev.dfs.core.windows.net"
raw_path = f"{lake_root}/Raw"
silver_path = f"{lake_root}/silver/sales_data"

In [0]:
from delta.tables import DeltaTable
from pyspark.sql import Window
from pyspark.sql.functions import col, lit, to_timestamp, date_format, max as spark_max, row_number

In [0]:
# 1) Read every CSV file currently under the raw folder (first line is a header; column types are inferred).
df_raw = (
    spark.read
    .options(header="true", inferSchema="true")
    .csv(raw_path)
)








In [0]:
# 2) Cast important columns to their intended types, overriding whatever inferSchema chose.
typed_columns = {
    "CreatedTimestamp": to_timestamp(col("CreatedTimestamp")),
    "OrderDate": to_timestamp(col("OrderDate")),
    "QuantitySold": col("QuantitySold").cast("int"),
    "UnitPrice": col("UnitPrice").cast("double"),
    "StandardCost": col("StandardCost").cast("double"),
}
# Applied in insertion order, one withColumn per entry.
for column_name, typed_expr in typed_columns.items():
    df_raw = df_raw.withColumn(column_name, typed_expr)


In [0]:
# 3) Incremental filter: keep only rows created strictly after the watermark, when one is supplied.
#    An empty / whitespace-only watermark means "take everything".
watermark_text = (watermark_param or "").strip()
if watermark_text:
    # Parse explicitly (to_timestamp without a format follows CAST(... AS TIMESTAMP) rules) rather than
    # comparing the timestamp column against a raw string: with ANSI mode off an unparseable value
    # becomes NULL, and `col > NULL` silently drops every row, so nothing would be merged downstream.
    watermark_ts = to_timestamp(lit(watermark_text))
    if spark.range(1).select(watermark_ts.alias("wm")).first()["wm"] is None:
        raise ValueError(f"watermark_param is not a valid timestamp: {watermark_param!r}")
    df_delta = df_raw.filter(col("CreatedTimestamp") > watermark_ts)
else:
    df_delta = df_raw


In [0]:
# 4) Cleaning & validation: drop rows missing SalesOrderID or CustomerID
#    (extend/replace with your business rules).
df_clean = df_delta.filter(
    col("SalesOrderID").isNotNull() & col("CustomerID").isNotNull()
)


In [0]:
# 5) Upsert into the Delta silver table keyed on (SalesOrderID, SalesOrderDetailID_Source).
#    First run creates the table; later runs MERGE (update matches, insert the rest).
merge_keys = ["SalesOrderID", "SalesOrderDetailID_Source"]

# Delta MERGE fails when several source rows match the same target row, and the first-run
# overwrite would persist duplicate keys. Raw re-reads every CSV, so collapse to one row per key.
# NOTE(review): "most recent CreatedTimestamp wins" is an assumed business rule — confirm.
# Window.partitionBy groups NULL keys together, consistent with the null-safe condition below.
latest_first = Window.partitionBy(*merge_keys).orderBy(col("CreatedTimestamp").desc())
df_upsert = (
    df_clean
    .withColumn("_merge_rn", row_number().over(latest_first))
    .filter(col("_merge_rn") == 1)
    .drop("_merge_rn")
)

silver_path = silver_path.rstrip("/")  # ensure no trailing slash
if not DeltaTable.isDeltaTable(spark, silver_path):
    df_upsert.write.format("delta").mode("overwrite").save(silver_path)
else:
    deltaTbl = DeltaTable.forPath(spark, silver_path)
    # Null-safe equality (<=>): with plain `=`, a NULL SalesOrderDetailID_Source never matches,
    # so such rows would be re-inserted on every re-run instead of being updated.
    merge_cond = " AND ".join(f"target.{key} <=> source.{key}" for key in merge_keys)
    (
        deltaTbl.alias("target")
        .merge(source=df_upsert.alias("source"), condition=merge_cond)
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute()
    )