In [0]:
%sql
CREATE DATABASE IF NOT EXISTS retail;


In [0]:
from pyspark.sql import functions as F
from pyspark.sql.types import *
from delta.tables import DeltaTable


In [0]:
dbutils.widgets.text("fileName", "")
dbutils.widgets.text("loadType", "incremental")


file_name = dbutils.widgets.get("fileName")
load_type = dbutils.widgets.get("loadType").lower()


print(f"file_name={file_name}, load_type={load_type}")

file_name=sales_sample.csv, load_type=incremental


In [0]:
# Mount ADLS Gen2 to Databricks File System (DBFS)
configs = {
  'fs.azure.account.auth.type': 'OAuth',
  'fs.azure.account.oauth.provider.type': 'org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider',
  'fs.azure.account.oauth2.client.id': '4468424b-2bd2-4feb-b598-baee8b36f8f7',
  'fs.azure.account.oauth2.client.secret': 'p.28Q~zJz3Pse2sKzF~Y_pT5R2kDpM.QVF-ZEaO~',
  'fs.azure.account.oauth2.client.endpoint': 'https://login.microsoftonline.com/6e5e9329-40e8-4a76-8726-ba7dcc3c2564/oauth2/token'
}
# Mount container from ADLS
dbutils.fs.mount(
  source = 'abfss://retail@copyname.dfs.core.windows.net/',
  mount_point = '/mnt/retail',
  extra_configs = configs
)

True

In [0]:
storage_account = "copyname"
container = "retail"



configs = {
  f"fs.azure.account.auth.type.{storage_account}.dfs.core.windows.net": "OAuth",
  f"fs.azure.account.oauth.provider.type.{storage_account}.dfs.core.windows.net": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
  f"fs.azure.account.oauth2.client.id.{storage_account}.dfs.core.windows.net": '4468424b-2bd2-4feb-b598-baee8b36f8f7',
  f"fs.azure.account.oauth2.client.secret.{storage_account}.dfs.core.windows.net": 'p.28Q~zJz3Pse2sKzF~Y_pT5R2kDpM.QVF-ZEaO~',
  f"fs.azure.account.oauth2.client.endpoint.{storage_account}.dfs.core.windows.net": 
      f'https://login.microsoftonline.com/6e5e9329-40e8-4a76-8726-ba7dcc3c2564/oauth2/token'
}

# Only mount if it does not exist
if "/mnt/retail" not in [m.mountPoint for m in dbutils.fs.mounts()]:
    dbutils.fs.mount(
        source = f"abfss://{retail}@{copyname}.dfs.core.windows.net/",
        mount_point = "/mnt/retail",
        extra_configs = configs
    )


In [0]:
%fs ls '/mnt/retail'

path,name,size,modificationTime
dbfs:/mnt/retail/archive/,archive/,0,0
dbfs:/mnt/retail/curated/,curated/,0,0
dbfs:/mnt/retail/landing/,landing/,0,0
dbfs:/mnt/retail/raw/,raw/,0,0


In [0]:
# ============================================================
# Retail Sales ETL – from /mnt/retail (ADLS Gen2 mount)
# - Validates raw sales
# - Joins product & store master
# - Writes curated Delta (full + incremental)
# ============================================================

from pyspark.sql import functions as F
from pyspark.sql.types import *
from delta.tables import DeltaTable

# -------------------------------
# 1. Widgets / Parameters
# -------------------------------
# ADF will populate these. For manual runs, you can set defaults.
dbutils.widgets.text("fileName", "sales_sample.csv")
dbutils.widgets.text("loadType", "incremental")
dbutils.widgets.text("basePath", "/mnt/retail")

file_name = dbutils.widgets.get("fileName")
load_type = dbutils.widgets.get("loadType").lower()
base_path = dbutils.widgets.get("basePath")

print(f"[INFO] file_name={file_name}, load_type={load_type}, basePath={base_path}")

if not file_name:
    raise ValueError("fileName widget is empty. Make sure ADF passes it or set a default for testing.")

# Optional: tune shuffles for smallish datasets (set higher for big volumes)
spark.conf.set("spark.sql.shuffle.partitions", "64")

# -------------------------------
# 2. Path configuration
# -------------------------------
raw_sales_path       = f"{base_path}/raw/sales/{file_name}"
product_master_path  = f"{base_path}/raw/master/product_master.csv"
store_master_path    = f"{base_path}/raw/master/store_master.csv"

target_path          = f"{base_path}/curated/sales_enriched"
reject_path          = f"{base_path}/curated/rejects/sales"
target_table_name    = "retail.sales_enriched"

print(f"[INFO] raw_sales_path     = {raw_sales_path}")
print(f"[INFO] product_master_path= {product_master_path}")
print(f"[INFO] store_master_path  = {store_master_path}")
print(f"[INFO] target_path        = {target_path}")

# -------------------------------
# 2a. Check if source files exist before reading
# -------------------------------
def path_exists(path):
    try:
        dbutils.fs.ls(path)
        return True
    except Exception:
        return False

missing_paths = []
for p, label in [
    (raw_sales_path, "raw_sales_path"),
    (product_master_path, "product_master_path"),
    (store_master_path, "store_master_path")
]:
    if not path_exists(p):
        missing_paths.append(f"{label}: {p}")

if missing_paths:
    # If raw_sales_path is missing, list available files in the directory for user help
    if not path_exists(raw_sales_path):
        try:
            available_files = dbutils.fs.ls(f"{base_path}/raw/sales/")
            available_files_list = [f.path for f in available_files]
            print("[ERROR] The specified raw_sales_path does not exist.")
            print(f"[INFO] Available files in {base_path}/raw/sales/:\n" + "\n".join(available_files_list))
        except Exception as e:
            print(f"[ERROR] Could not list files in {base_path}/raw/sales/: {e}")
    raise FileNotFoundError(
        f"The following required file(s) or directory(ies) do not exist:\n" + "\n".join(missing_paths) +
        (f"\n\n[INFO] Please check the fileName widget value. See the notebook output above for available files in {base_path}/raw/sales/." if not path_exists(raw_sales_path) else "")
    )

# -------------------------------
# 3. Define schemas
# -------------------------------
sales_schema = StructType([
    StructField("sale_id",    IntegerType(), False),
    StructField("sale_date",  StringType(),  False),
    StructField("store_id",   IntegerType(), True),
    StructField("product_id", IntegerType(), True),
    StructField("quantity",   IntegerType(), True),
    StructField("amount",     DoubleType(),  True),
])

product_schema = StructType([
    StructField("product_id",   IntegerType(), False),
    StructField("product_name", StringType(),  True),
    StructField("category",     StringType(),  True),
    StructField("unit_price",   DoubleType(),  True),
])

store_schema = StructType([
    StructField("store_id",   IntegerType(), False),
    StructField("store_name", StringType(),  True),
    StructField("store_city", StringType(),  True),
])

# -------------------------------
# 4. Read source data
# -------------------------------
sales_df = (
    spark.read
         .option("header", "true")
         .schema(sales_schema)
         .csv(raw_sales_path)
)

product_df = (
    spark.read
         .option("header", "true")
         .schema(product_schema)
         .csv(product_master_path)
)

store_df = (
    spark.read
         .option("header", "true")
         .schema(store_schema)
         .csv(store_master_path)
)

# Cache small dimension tables for faster reuse
product_df = product_df.dropDuplicates(["product_id"]).cache()
store_df   = store_df.dropDuplicates(["store_id"]).cache()

print(f"[INFO] sales_df count   = {sales_df.count()}")
print(f"[INFO] product_df count = {product_df.count()}")
print(f"[INFO] store_df count   = {store_df.count()}")

# -------------------------------
# 5. Data validation
# -------------------------------
# Rules:
# - sale_date valid (yyyy-MM-dd)
# - product_id, store_id not null
# - quantity >= 0
# - amount not null

sales_valid = (
    sales_df
    .withColumn("sale_date_parsed", F.to_date("sale_date", "yyyy-MM-dd"))
    .filter(F.col("sale_date_parsed").isNotNull())
    .filter(F.col("product_id").isNotNull())
    .filter(F.col("store_id").isNotNull())
    .filter(F.col("quantity").isNotNull() & (F.col("quantity") >= 0))
    .filter(F.col("amount").isNotNull())
)

# -------------------------------
# 6. Enrichment & derived columns
# -------------------------------
#  - Date parts: year, month, day
#  - Joins with product & store master
#  - Derived: total_value, discount_flag

sales_enriched = (
    sales_valid
        .withColumn("sale_date", F.col("sale_date_parsed"))
        .drop("sale_date_parsed")
        .withColumn("year",  F.year("sale_date"))
        .withColumn("month", F.month("sale_date"))
        .withColumn("day",   F.dayofmonth("sale_date"))
)

# Broadcast small dimension tables for faster joins
sales_enriched = (
    sales_enriched
        .join(F.broadcast(product_df), on="product_id", how="left")
        .join(F.broadcast(store_df),   on="store_id",   how="left")
)

sales_enriched = (
    sales_enriched
        .withColumn("total_value", F.col("quantity") * F.col("amount"))
        .withColumn(
            "discount_flag",
            F.when(
                F.col("unit_price").isNotNull() & (F.col("amount") < F.col("unit_price")),
                F.lit(True)
            ).otherwise(F.lit(False))
        )
)

# (Optional) select & order final columns explicitly for cleaner schema
sales_enriched = sales_enriched.select(
    "sale_id",
    "sale_date",
    "year", "month", "day",
    "store_id", "store_name", "store_city",
    "product_id", "product_name", "category",
    "quantity", "amount", "unit_price",
    "total_value", "discount_flag",
)

print(f"[INFO] Enriched record count = {sales_enriched.count()}")

# -------------------------------
# 7. Delta Lake: full + incremental load
# -------------------------------
spark.sql("CREATE DATABASE IF NOT EXISTS retail")

def is_delta_table(path: str) -> bool:
    try:
        DeltaTable.forPath(spark, path)
        return True
    except Exception:
        return False

def path_exists_fs(path: str) -> bool:
    try:
        dbutils.fs.ls(path)
        return True
    except Exception:
        return False

table_exists = is_delta_table(target_path)
print(f"[INFO] Delta table exists at target_path: {table_exists}")

# FULL LOAD – overwrite curated table (first time or explicit full)
if (load_type == "full") or (not table_exists):
    print("[INFO] Running FULL LOAD...")
    # If the path exists but is not a Delta table, remove it to avoid format conflicts
    if path_exists_fs(target_path) and not is_delta_table(target_path):
        print(f"[WARNING] Target path {target_path} exists but is not a Delta table. Removing it to avoid format conflict.")
        dbutils.fs.rm(target_path, True)
    
    (sales_enriched
        .repartition("year", "month")  # good for partitioning
        .write
        .format("delta")
        .mode("overwrite")
        .option("overwriteSchema", "true")
        .partitionBy("year", "month")
        .save(target_path)
    )
    
    spark.sql(f"""
        CREATE TABLE IF NOT EXISTS {target_table_name}
        USING DELTA
        LOCATION '{target_path}'
    """)
    
    print(f"[INFO] Full load completed. Table: {target_table_name}")

# INCREMENTAL LOAD – upsert using sale_id
elif load_type == "incremental" and table_exists:
    print("[INFO] Running INCREMENTAL LOAD (MERGE on sale_id)...")

    delta_tbl = DeltaTable.forPath(spark, target_path)

    # Avoid duplicates in incoming batch
    batch_df = sales_enriched.dropDuplicates(["sale_id"])

    (delta_tbl.alias("t")
        .merge(
            batch_df.alias("s"),
            "t.sale_id = s.sale_id"
        )
        .whenMatchedUpdateAll()
        .whenNotMatchedInsertAll()
        .execute()
    )

    print("[INFO] Incremental upsert completed.")

else:
    raise ValueError(f"Unsupported loadType: {load_type}. Use 'full' or 'incremental'.")

# -------------------------------
# 8. Optional: optimize Delta table
# -------------------------------
# Uncomment these for larger datasets / scheduled maintenance

# spark.sql(f"OPTIMIZE {target_table_name}")
# spark.sql(f"VACUUM {target_table_name} RETAIN 168 HOURS")  # keep 7 days of history

print("[INFO] ETL job finished successfully.")

[INFO] file_name=sales_sample.csv, load_type=incremental, basePath=/mnt/retail
[INFO] raw_sales_path     = /mnt/retail/raw/sales/sales_sample.csv
[INFO] product_master_path= /mnt/retail/raw/master/product_master.csv
[INFO] store_master_path  = /mnt/retail/raw/master/store_master.csv
[INFO] target_path        = /mnt/retail/curated/sales_enriched
[INFO] sales_df count   = 5
[INFO] product_df count = 3
[INFO] store_df count   = 3
[INFO] Enriched record count = 5
[INFO] Delta table exists at target_path: True
[INFO] Running INCREMENTAL LOAD (MERGE on sale_id)...
[INFO] Incremental upsert completed.
[INFO] ETL job finished successfully.


In [0]:
spark.sql("DESCRIBE HISTORY retail.sales_enriched")


DataFrame[version: bigint, timestamp: timestamp, userId: string, userName: string, operation: string, operationParameters: map<string,string>, job: struct<jobId:string,jobName:string,jobRunId:string,runId:string,jobOwnerId:string,triggerType:string>, notebook: struct<notebookId:string>, clusterId: string, readVersion: bigint, isolationLevel: string, isBlindAppend: boolean, operationMetrics: map<string,string>, userMetadata: string, engineInfo: string]

In [0]:
%sql
DROP TABLE IF EXISTS retail.sales_enriched;

In [0]:
# See what's inside curated
display(dbutils.fs.ls("/mnt/retail/curated"))

# See what's inside the target folder (if it exists)
display(dbutils.fs.ls("/mnt/retail/curated/sales_enriched"))

# ⚠️ One-time cleanup: delete the old contents
dbutils.fs.rm("/mnt/retail/curated/sales_enriched", recurse=True)


path,name,size,modificationTime
dbfs:/mnt/retail/curated/sales_enriched/,sales_enriched/,0,1763147756000


path,name,size,modificationTime
dbfs:/mnt/retail/curated/sales_enriched/_delta_log/,_delta_log/,0,1763147756000
dbfs:/mnt/retail/curated/sales_enriched/year=2025/,year=2025/,0,1763147760000


True

In [0]:
if load_type == "full" or not table_exists:
    (sales_enriched
        .repartition("year", "month")
        .write
        .format("delta")
        .mode("overwrite")
        .option("overwriteSchema", "true")
        .partitionBy("year", "month")
        .save(target_path)
    )

    spark.sql(f"""
        CREATE TABLE IF NOT EXISTS {target_table_name}
        USING DELTA
        LOCATION '{target_path}'
    """)


In [0]:
%sql
SELECT * FROM retail.sales_enriched LIMIT 20;


sale_id,sale_date,year,month,day,store_id,store_name,store_city,product_id,product_name,category,quantity,amount,unit_price,total_value,discount_flag
1,2025-01-01,2025,1,1,101,Store A,Hyderabad,201,Shampoo,Personal Care,2,200.0,120.0,400.0,False
2,2025-01-01,2025,1,1,102,Store B,Mumbai,202,Soap,Personal Care,3,150.0,50.0,450.0,False
3,2025-01-02,2025,1,2,101,Store A,Hyderabad,203,Toothpaste,Oral Care,1,300.0,150.0,300.0,False
4,2025-01-02,2025,1,2,103,Store C,Bangalore,201,Shampoo,Personal Care,4,200.0,120.0,800.0,False
5,2025-01-03,2025,1,3,102,Store B,Mumbai,202,Soap,Personal Care,2,150.0,50.0,300.0,False


In [0]:
%sql

select store_id, store_name, SUM(total_value) as revenue 
from retail.sales_enriched group by store_id, store_name
order by revenue desc
limit 5

store_id,store_name,revenue
103,Store C,800.0
102,Store B,750.0
101,Store A,700.0


In [0]:
%sql
select year, month, SUM(quantity) total_quantity, SUM(amount) total_sales 
from retail.sales_enriched group by year

year,month,total_quantity,total_sales
2025,1,12,1000.0


In [0]:
%sql
select category, SUM(quantity) total_quantity, SUM(total_value) total_value from retail.sales_enriched group by category order by total_value desc

category,total_quantity,total_value
Personal Care,11,1950.0
Oral Care,1,300.0


In [0]:
%sql
select * from retail.sales_enriched


sale_id,sale_date,year,month,day,store_id,store_name,store_city,product_id,product_name,category,quantity,amount,unit_price,total_value,discount_flag
1,2025-01-01,2025,1,1,101,Store A,Hyderabad,201,Shampoo,Personal Care,2,200.0,120.0,400.0,False
2,2025-01-01,2025,1,1,102,Store B,Mumbai,202,Soap,Personal Care,3,150.0,50.0,450.0,False
3,2025-01-02,2025,1,2,101,Store A,Hyderabad,203,Toothpaste,Oral Care,1,300.0,150.0,300.0,False
4,2025-01-02,2025,1,2,103,Store C,Bangalore,201,Shampoo,Personal Care,4,200.0,120.0,800.0,False
5,2025-01-03,2025,1,3,102,Store B,Mumbai,202,Soap,Personal Care,2,150.0,50.0,300.0,False


In [0]:
%sql
select product_id, product_name, category, SUM(total_value) revenue
from retail.sales_enriched
group by product_name, category, product_id
order by revenue desc

product_id,product_name,category,revenue
201,Shampoo,Personal Care,1200.0
202,Soap,Personal Care,750.0
203,Toothpaste,Oral Care,300.0


In [0]:
%sql
select store_id, store_name, count(*) as total_sales  from retail.sales_enriched group by store_id, store_name order by total_sales desc limit 1

store_id,store_name,total_sales
101,Store A,2
