### 🔹 Bronze Layer: Data Ingestion in Azure Databricks  
This section ingests raw data from CSV and JSON files into Delta format, creating **bronze tables** for customers, orders, and products.  
These Delta tables serve as the foundation for further transformations in the Silver and Gold layers.


In [0]:
# Bronze Layer - Data Ingestion (Unity Catalog–Friendly)
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DoubleType
from pyspark.sql.functions import to_date

delta_base = 'dbfs:/FileStore/retail360/delta'

# Schemas
customers_schema = StructType([
    StructField('customer_id', IntegerType(), True),
    StructField('name', StringType(), True),
    StructField('region', StringType(), True),
    StructField('email', StringType(), True)
])

orders_schema = StructType([
    StructField('order_id', IntegerType(), True),
    StructField('customer_id', IntegerType(), True),
    StructField('product', StringType(), True),
    StructField('quantity', IntegerType(), True),
    StructField('price', DoubleType(), True),
    StructField('status', StringType(), True),
    StructField('order_date', StringType(), True)  # read as text; converted with to_date() after loading
])

# --- Customers ---
bronze_customers_path = f'{delta_base}/bronze_customers'
customers_df = (spark.read.option('header', True)
                .schema(customers_schema)
                .csv('/FileStore/tables/customers-4.csv'))
customers_df.write.format('delta').mode('overwrite').save(bronze_customers_path)

# --- Orders ---
bronze_orders_path = f'{delta_base}/bronze_orders'
orders_df = (spark.read.option('header', True)
             .schema(orders_schema)
             .csv('/FileStore/tables/orders_day1-3.csv')
             .withColumn('order_date', to_date('order_date', 'yyyy-MM-dd')))
orders_df.write.format('delta').mode('overwrite').save(bronze_orders_path)

# --- Products ---
bronze_products_path = f'{delta_base}/bronze_products'
products_df = spark.read.json('/FileStore/tables/products-3.json')
products_df.write.format('delta').mode('overwrite').save(bronze_products_path)

print("Bronze layer ingestion completed.")


Bronze layer ingestion completed.
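
The ingestion cell above reads the orders file with an explicit schema and then converts `order_date` with `to_date`. As a rough illustration of what that typing step does to each row, here is a plain-Python sketch that does not use Spark. The sample rows are hypothetical, and the choice to turn unparseable values into `None` is an assumption: Spark's actual behavior depends on the reader mode and the cluster's ANSI settings.

```python
import csv
import io
from datetime import datetime

# Hypothetical sample data shaped like the orders CSV; not taken from the real files.
SAMPLE_CSV = """order_id,customer_id,product,quantity,price,status,order_date
1,101,Laptop,1,55000.0,Completed,2025-10-01
2,102,Mobile,x,25000.0,Pending,2025-13-40
"""


def cast_or_none(value, caster):
    """Apply caster to value; return None when the value is missing or invalid."""
    if value is None:
        return None
    try:
        return caster(value)
    except (TypeError, ValueError):
        return None


def parse_date(value):
    """Parse a yyyy-MM-dd string into a date."""
    return datetime.strptime(value, "%Y-%m-%d").date()


# Column name -> converter, mirroring orders_schema plus the to_date() step.
ORDER_CASTS = {
    "order_id": int,
    "customer_id": int,
    "product": str,
    "quantity": int,
    "price": float,
    "status": str,
    "order_date": parse_date,
}


def parse_orders(text):
    """Read CSV text with a header row and cast every column."""
    reader = csv.DictReader(io.StringIO(text))
    return [
        {name: cast_or_none(row.get(name), caster) for name, caster in ORDER_CASTS.items()}
        for row in reader
    ]


orders = parse_orders(SAMPLE_CSV)
for order in orders:
    print(order)
```

The second sample row has a non-numeric quantity and an impossible date, so both fields come back as `None` while the rest of the row is kept. Checking Bronze for such nulls before building Silver is one way to catch malformed input early.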


### 🔸 Silver Layer: Data Cleansing & Transformation in Azure Databricks  
This section refines the Bronze data by dropping customers without an email address, keeping only orders with status `Completed`, enriching each order with customer and product details, and calculating a total amount (`quantity * price`).  
The resulting **Silver table** (`silver_orders`) contains clean, structured, analytics-ready data.


In [0]:
# Databricks Notebook: Silver Layer - Cleansing & Transformation (Unity Catalog–Friendly)
# Reads Bronze Delta files and produces Silver-level cleaned data.

from pyspark.sql.functions import col

# Define Delta base path
delta_base = 'dbfs:/FileStore/retail360/delta'

# --- Load Bronze Data ---
bronze_customers = spark.read.format('delta').load(f'{delta_base}/bronze_customers')
bronze_orders = spark.read.format('delta').load(f'{delta_base}/bronze_orders')
bronze_products = spark.read.format('delta').load(f'{delta_base}/bronze_products')

# --- Data Cleansing ---
# Drop customers without an email; keep only orders whose status is 'Completed'.
clean_customers = bronze_customers.filter(col('email').isNotNull())
clean_orders = bronze_orders.filter(col('status') == 'Completed')

# --- Data Enrichment ---
# Include category in mapping
products_map = bronze_products.selectExpr('product_id', 'product_name as product', 'category')

# Join orders with customer and product data
silver_orders = (clean_orders
                 .join(clean_customers, on='customer_id', how='left')
                 .join(products_map, on='product', how='left')
                 .withColumn('total_amount', col('quantity') * col('price'))
                 .select(
                     'order_id', 'customer_id', 'name', 'region', 'email',
                     'product', 'product_id', 'category',
                     'quantity', 'price', 'total_amount',
                     'order_date', 'status'
                 ))

# --- Write Silver Layer ---
silver_orders_path = f'{delta_base}/silver_orders'
silver_orders.write.format('delta').mode('overwrite').save(silver_orders_path)

print(" Silver layer transformation completed successfully.")
print(f"Silver data written to: {silver_orders_path}")

# Optional preview:
# display(spark.read.format('delta').load(silver_orders_path))


 Silver layer transformation completed successfully.
Silver data written to: dbfs:/FileStore/retail360/delta/silver_orders
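
The Silver cell above combines a filter, two left joins, and a derived column. The plain-Python sketch below mirrors that logic on a few hypothetical rows to show one consequence of the left joins: an order whose customer was filtered out, or whose product name has no match, is kept with empty enrichment fields. It assumes `customer_id` and `product_name` are unique, which the original does not state.

```python
def build_silver(orders, customers, products):
    """Filter, left-join, and derive total_amount, mirroring the Silver cell."""
    # Assumes unique keys; a Spark join would instead emit one output row
    # per matching pair if a key appeared more than once.
    customers_by_id = {c["customer_id"]: c for c in customers if c["email"] is not None}
    products_by_name = {p["product_name"]: p for p in products}

    silver = []
    for order in orders:
        if order["status"] != "Completed":
            continue  # same rule as the status filter in the Silver cell
        customer = customers_by_id.get(order["customer_id"], {})
        product = products_by_name.get(order["product"], {})
        quantity, price = order["quantity"], order["price"]
        silver.append({
            "order_id": order["order_id"],
            "customer_id": order["customer_id"],
            "name": customer.get("name"),
            "region": customer.get("region"),
            "email": customer.get("email"),
            "product": order["product"],
            "product_id": product.get("product_id"),
            "category": product.get("category"),
            "quantity": quantity,
            "price": price,
            # A null operand yields a null total, as in Spark SQL arithmetic.
            "total_amount": None if quantity is None or price is None else quantity * price,
            "order_date": order["order_date"],
            "status": order["status"],
        })
    return silver


# Hypothetical rows for illustration only.
customers = [
    {"customer_id": 101, "name": "Asha", "region": "North", "email": "asha@example.com"},
    {"customer_id": 102, "name": "Ravi", "region": "South", "email": None},
]
products = [{"product_id": "P001", "product_name": "Laptop", "category": "Electronics"}]
orders = [
    {"order_id": 1, "customer_id": 101, "product": "Laptop", "quantity": 2,
     "price": 55000.0, "status": "Completed", "order_date": "2025-10-01"},
    {"order_id": 2, "customer_id": 101, "product": "Laptop", "quantity": 1,
     "price": 55000.0, "status": "Pending", "order_date": "2025-10-01"},
    {"order_id": 3, "customer_id": 102, "product": "Tablet", "quantity": 1,
     "price": 20000.0, "status": "Completed", "order_date": "2025-10-02"},
]

silver = build_silver(orders, customers, products)
```

Order 3 survives with `region` and `product_id` set to `None`, so a later aggregation by region would put its revenue in a null group. Whether that is acceptable, or whether an inner join fits better, depends on the reporting requirements.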


### 🟡 Gold Layer: Business Aggregations in Azure Databricks  
This section aggregates cleansed Silver data to generate **business insights**, including total revenue by region and top-selling products.  
The resulting **Gold tables** provide summarized, analytics-ready datasets for reporting and visualization.


In [0]:
# Databricks Notebook: Gold Layer - Business Aggregations (Unity Catalog–Friendly)
# Reads Silver Delta data and produces Gold-level aggregated outputs.

from pyspark.sql.functions import sum as _sum, rank, desc
from pyspark.sql.window import Window

# Define Delta paths
delta_base = 'dbfs:/FileStore/retail360/delta'
silver_orders_path = f'{delta_base}/silver_orders'
gold_revenue_path = f'{delta_base}/gold_revenue_by_region'
gold_products_path = f'{delta_base}/gold_top_products'
gold_summary_path = f'{delta_base}/gold_sales_summary'

# --- Load Silver Data ---
silver_orders = spark.read.format('delta').load(silver_orders_path)

# --- Aggregation 1: Total Revenue by Region ---
gold_revenue_by_region = (
    silver_orders.groupBy('region')
    .agg(_sum('total_amount').alias('total_revenue'))
    .orderBy(desc('total_revenue'))
)

# Save output
gold_revenue_by_region.write.format('delta').mode('overwrite').save(gold_revenue_path)

# --- Aggregation 2: Top-Selling Products ---
product_sales = (
    silver_orders.groupBy('product', 'product_id')
    .agg(
        _sum('quantity').alias('units_sold'),
        _sum('total_amount').alias('revenue')
    )
)

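# --- Apply Ranking ---
# No partitionBy: the window spans all rows, so Spark moves them to a single partition.
# That is fine for a handful of products; reconsider it for large inputs.
window_spec = Window.orderBy(desc('revenue'))
product_sales_ranked = product_sales.withColumn('rank', rank().over(window_spec))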

# Save output
product_sales_ranked.write.format('delta').mode('overwrite').save(gold_products_path)

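# --- Combined Gold Summary ---
# Note: this selects the same columns as product_sales_ranked,
# so gold_sales_summary currently holds the same data as gold_top_products.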
gold_summary = product_sales_ranked.select('product', 'product_id', 'units_sold', 'revenue', 'rank')
gold_summary.write.format('delta').mode('overwrite').save(gold_summary_path)

print(" Gold layer aggregations completed.")
print(f"Gold outputs written to:\n- {gold_revenue_path}\n- {gold_products_path}\n- {gold_summary_path}")

# Optional preview:
# display(spark.read.format('delta').load(gold_summary_path))


 Gold layer aggregations completed.
Gold outputs written to:
- dbfs:/FileStore/retail360/delta/gold_revenue_by_region
- dbfs:/FileStore/retail360/delta/gold_top_products
- dbfs:/FileStore/retail360/delta/gold_sales_summary
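
The ranking step in the cell above uses `rank()`, which gives tied rows the same rank and then skips ahead. The plain-Python sketch below reproduces that rule, so the effect of a tie is easy to see. The first three revenue figures match the `gold_sales_summary` output shown later in this notebook; the `Tablet` row is a hypothetical addition that forces a tie.

```python
def rank_by_revenue(rows):
    """Assign ranks by descending revenue: ties share a rank, then a gap follows."""
    ordered = sorted(rows, key=lambda r: r["revenue"], reverse=True)
    ranked = []
    for position, row in enumerate(ordered, start=1):
        if ranked and row["revenue"] == ranked[-1]["revenue"]:
            current_rank = ranked[-1]["rank"]  # tie: reuse the previous rank
        else:
            current_rank = position            # otherwise rank equals row position
        ranked.append({**row, "rank": current_rank})
    return ranked


product_sales = [
    {"product": "Laptop", "revenue": 110000.0},
    {"product": "Mobile", "revenue": 75000.0},
    {"product": "Tablet", "revenue": 75000.0},   # hypothetical row to force a tie
    {"product": "Headphones", "revenue": 15000.0},
]

ranked = rank_by_revenue(product_sales)
for row in ranked:
    print(row["rank"], row["product"], row["revenue"])
```

With the tie in place, no product receives rank 3. If consecutive ranks are preferred, `dense_rank()` from `pyspark.sql.functions` is the usual alternative in the Spark cell.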


### 🔹 Incremental Load Simulation: Updating Silver Layer in Azure Databricks  
This notebook simulates new incoming orders and **merges** them into the existing Silver Layer.  
Steps include loading new data, appending it to Bronze, staging the transformed records, and performing a **MERGE/UPSERT** to keep the Silver Layer up to date.


In [0]:
# Databricks Notebook: Incremental Load Simulation (Unity Catalog–Friendly)
# Purpose: Simulate new incoming orders and merge them into the Silver Layer.

from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DoubleType
from pyspark.sql.functions import to_date, col
from delta.tables import DeltaTable

# Define base paths
delta_base = 'dbfs:/FileStore/retail360/delta'
silver_orders_path = f'{delta_base}/silver_orders'
bronze_orders_path = f'{delta_base}/bronze_orders'

# --- Schema for new order data ---
orders_schema = StructType([
    StructField('order_id', IntegerType(), True),
    StructField('customer_id', IntegerType(), True),
    StructField('product', StringType(), True),
    StructField('quantity', IntegerType(), True),
    StructField('price', DoubleType(), True),
    StructField('status', StringType(), True),
    StructField('order_date', StringType(), True)  # read as text; converted with to_date() after loading
])

# --- Read New Incremental Orders File ---
# Upload the new file first, for example:
# /FileStore/tables/orders_day2.csv
new_orders = (spark.read
              .option('header', True)
              .schema(orders_schema)
              .csv('/FileStore/tables/orders_day2.csv')
              .withColumn('order_date', to_date('order_date', 'yyyy-MM-dd')))

print(" New incremental orders loaded.")

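# --- Append to Bronze Orders (simulated raw ingestion) ---
# Note: mode('append') is not idempotent; re-running this cell writes the same orders to Bronze again.
new_orders.write.format('delta').mode('append').save(bronze_orders_path)
print(" Appended new orders to Bronze layer.")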

# --- Rebuild Silver Staging from new orders ---
# Reload reference data
bronze_customers = spark.read.format('delta').load(f'{delta_base}/bronze_customers')
bronze_products = spark.read.format('delta').load(f'{delta_base}/bronze_products')

clean_customers = bronze_customers.filter(col('email').isNotNull())
products_map = bronze_products.selectExpr('product_id', 'product_name as product', 'category')

staging_new = (new_orders
               .filter(col('status') == 'Completed')
               .join(clean_customers, on='customer_id', how='left')
               .join(products_map, on='product', how='left')
               .withColumn('total_amount', col('quantity') * col('price'))
               .select(
                   'order_id', 'customer_id', 'name', 'region', 'email',
                   'product', 'product_id', 'category',
                   'quantity', 'price', 'total_amount',
                   'order_date', 'status'
               ))

staging_path = f'{delta_base}/staging_new_orders'
staging_new.write.format('delta').mode('overwrite').save(staging_path)
print(" Staging Delta file created for new orders.")

# --- MERGE / UPSERT into Silver Orders ---
silver_table = DeltaTable.forPath(spark, silver_orders_path)
staging_table = spark.read.format('delta').load(staging_path)

(silver_table.alias('t')
 .merge(
     staging_table.alias('s'),
     't.order_id = s.order_id'
 )
 .whenMatchedUpdateAll()
 .whenNotMatchedInsertAll()
 .execute())

print(" Incremental MERGE completed. Silver layer updated with new data.")

# Optional: Verify
# display(spark.read.format('delta').load(silver_orders_path))


 New incremental orders loaded.
 Appended new orders to Bronze layer.
 Staging Delta file created for new orders.
 Incremental MERGE completed. Silver layer updated with new data.
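
The MERGE in the cell above matches rows on `order_id`, overwrites every column of a matched row, and inserts unmatched rows. The plain-Python sketch below models those two clauses with a dictionary keyed on `order_id`, using hypothetical rows. One difference is worth flagging: if the source holds several rows for the same key, this sketch lets the last one win, whereas Delta Lake rejects a MERGE in which multiple source rows match the same target row.

```python
def merge_upsert(target, source, key="order_id"):
    """Return target with source applied: update all columns on match, insert otherwise."""
    merged = {row[key]: dict(row) for row in target}
    for row in source:
        merged[row[key]] = dict(row)  # replaces a matched row or adds a new one
    return list(merged.values())


# Hypothetical rows for illustration only.
silver = [
    {"order_id": 1, "product": "Laptop", "quantity": 2},
    {"order_id": 2, "product": "Mobile", "quantity": 3},
]
staging = [
    {"order_id": 2, "product": "Mobile", "quantity": 5},      # existing order, updated
    {"order_id": 3, "product": "Headphones", "quantity": 1},  # new order
]

once = merge_upsert(silver, staging)
twice = merge_upsert(once, staging)  # applying the same batch again changes nothing
print(once)
```

Because the result is keyed on `order_id`, replaying the same staging batch leaves Silver unchanged, which is the main practical benefit of a MERGE over a plain append.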


### 🕒 Time Travel & Vacuum in Azure Databricks  
This notebook demonstrates **Delta Lake time travel** by reading an earlier version of the `gold_sales_summary` Gold table, then runs **VACUUM** with a 7-day (168-hour) retention period to remove data files the table no longer references.


In [0]:
# Databricks Notebook: Time Travel & Vacuum (Unity Catalog–Friendly)
# Purpose: Explore historical Delta versions and clean up old data files.

from delta.tables import DeltaTable

# Define paths
delta_base = 'dbfs:/FileStore/retail360/delta'
gold_summary_path = f'{delta_base}/gold_sales_summary'

# --- View Delta Table History ---
print(" Delta table history for gold_sales_summary:")
history_df = spark.sql(f"DESCRIBE HISTORY delta.`{gold_summary_path}`")
display(history_df)

# Optional: To check programmatically
# history_df.show(truncate=False)

# --- Time Travel: Read a Previous Version ---
# Example: Load version 0 (or whichever version you want to inspect)
try:
    old_version_df = (spark.read
                      .format('delta')
                      .option('versionAsOf', 0)
                      .load(gold_summary_path))
    print(" Successfully loaded version 0 of gold_sales_summary.")
    display(old_version_df)
except Exception as e:
    print("⚠️ Could not load version 0. Maybe that version doesn’t exist yet.")
    print(e)

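# --- VACUUM: Remove Old Data Files ---
try:
    gold_dt = DeltaTable.forPath(spark, gold_summary_path)
    # Delete files the table no longer references that are older than 168 hours (7 days)
    gold_dt.vacuum(retentionHours=168)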
    print(" VACUUM completed successfully (7-day retention).")
except Exception as e:
    print(" VACUUM failed — check cluster permissions or retention settings.")
    print(e)

print("🏁 Time Travel & Vacuum demo complete.")


 Delta table history for gold_sales_summary:


version,timestamp,userId,userName,operation,operationParameters,job,notebook,clusterId,readVersion,isolationLevel,isBlindAppend,operationMetrics,userMetadata,engineInfo
3,2025-10-13T08:10:32.000Z,146222421029543,azuser4803_mml.local@techademy.com,WRITE,"Map(mode -> Overwrite, statsOnLoad -> false, partitionBy -> [])",,List(3458400548074996),1013-050721-4duilhgc-v2n,2.0,WriteSerializable,False,"Map(numFiles -> 1, numRemovedFiles -> 1, numRemovedBytes -> 1518, numDeletionVectorsRemoved -> 0, numOutputRows -> 3, numOutputBytes -> 1518)",,Databricks-Runtime/17.2.x-photon-scala2.13
2,2025-10-13T07:56:54.001Z,146222421029543,azuser4803_mml.local@techademy.com,VACUUM END,Map(status -> COMPLETED),,List(3458400548074996),1013-050721-4duilhgc-v2n,1.0,SnapshotIsolation,True,"Map(numDeletedFiles -> 0, numVacuumedDirectories -> 1)",,Databricks-Runtime/17.2.x-photon-scala2.13
1,2025-10-13T07:56:54.000Z,146222421029543,azuser4803_mml.local@techademy.com,VACUUM START,"Map(retentionCheckEnabled -> true, defaultRetentionMillis -> 604800000, specifiedRetentionMillis -> 604800000)",,List(3458400548074996),1013-050721-4duilhgc-v2n,0.0,SnapshotIsolation,True,"Map(numFilesToDelete -> 0, sizeOfDataToDelete -> 0)",,Databricks-Runtime/17.2.x-photon-scala2.13
0,2025-10-13T07:46:19.000Z,146222421029543,azuser4803_mml.local@techademy.com,WRITE,"Map(mode -> Overwrite, statsOnLoad -> false, partitionBy -> [])",,List(3458400548074996),1013-050721-4duilhgc-v2n,,WriteSerializable,False,"Map(numFiles -> 1, numRemovedFiles -> 0, numRemovedBytes -> 0, numDeletionVectorsRemoved -> 0, numOutputRows -> 3, numOutputBytes -> 1518)",,Databricks-Runtime/17.2.x-photon-scala2.13


 Successfully loaded version 0 of gold_sales_summary.


product,product_id,units_sold,revenue,rank
Laptop,P001,2,110000.0,1
Mobile,P002,3,75000.0,2
Headphones,P004,5,15000.0,3


 VACUUM completed successfully (7-day retention).
🏁 Time Travel & Vacuum demo complete.
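
The history output above records the retention as `specifiedRetentionMillis -> 604800000` and reports `numFilesToDelete -> 0`. The short sketch below works through that arithmetic in plain Python. It takes the `VACUUM START` timestamp from the history as the reference time; the helper names are illustrative and not part of any Delta API.

```python
from datetime import datetime, timedelta, timezone

RETENTION_HOURS = 168  # value passed to vacuum() in the cell above


def retention_millis(hours):
    """Convert a retention period in hours to milliseconds."""
    return int(timedelta(hours=hours).total_seconds() * 1000)


def vacuum_cutoff(reference_time, hours):
    """Timestamp before which unreferenced files become eligible for removal."""
    return reference_time - timedelta(hours=hours)


# VACUUM START timestamp taken from the history output (UTC).
vacuum_started = datetime(2025, 10, 13, 7, 56, 54, tzinfo=timezone.utc)
# First write to the table (version 0) from the same history output.
first_write = datetime(2025, 10, 13, 7, 46, 19, tzinfo=timezone.utc)

cutoff = vacuum_cutoff(vacuum_started, RETENTION_HOURS)
print(retention_millis(RETENTION_HOURS))  # 604800000
print(cutoff.isoformat())                 # 2025-10-06T07:56:54+00:00
print(first_write < cutoff)               # False
```

Every file in the table was written about ten minutes before the VACUUM ran, well inside the 7-day window, which is consistent with nothing being deleted. Once files older than the cutoff are removed, time travel to versions that depended on them is no longer expected to work.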
