# Bronze → Silver: Orders Transformation

## Purpose
Transform raw orders data from the Bronze layer into a clean, validated Silver layer

## Transformations Applied
1. **Data Quality**: Keep only `delivered`, `shipped`, `processing`, and `invoiced` orders (drops `canceled`, `unavailable`, `created`, and `approved`)
2. **Date Features**: Extract year, month, quarter, day of week
3. **Delivery Metrics**: Calculate delivery days, the actual-vs-estimated delivery difference, and a late-delivery flag
4. **Deduplication**: Remove duplicate order_id records
5. **Schema Validation**: Ensure correct data types
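The row-level rules in this list can be modeled in plain Python to make their semantics explicit. This is an illustrative sketch, not the notebook's Spark code: `KEEP_STATUSES` copies the status list from the Step 3 filter, and the hypothetical helper `spark_dayofweek` reproduces Spark's `dayofweek()` numbering (1 = Sunday through 7 = Saturday) from Python's `isoweekday()`.

```python
from datetime import datetime

# Statuses kept by the Step 3 filter (copied from its isin() list)
KEEP_STATUSES = {"delivered", "shipped", "processing", "invoiced"}


def keep_order(status: str) -> bool:
    """True if an order with this status survives the Silver filter."""
    return status in KEEP_STATUSES


def spark_dayofweek(ts: datetime) -> int:
    """Spark dayofweek() numbering: 1 = Sunday, 2 = Monday, ..., 7 = Saturday.

    Python's isoweekday() uses 1 = Monday ... 7 = Sunday, so shift by one day.
    """
    return ts.isoweekday() % 7 + 1


def date_features(ts: datetime) -> dict:
    """Calendar columns derived from order_purchase_timestamp."""
    return {
        "order_year": ts.year,
        "order_month": ts.month,
        "order_quarter": (ts.month - 1) // 3 + 1,  # Q1 = Jan-Mar, ..., Q4 = Oct-Dec
        "order_day_of_week": spark_dayofweek(ts),
    }


print(date_features(datetime(2016, 9, 5, 0, 15, 34)))
```

For the sample purchase timestamp `2016-09-05 00:15:34` shown in Step 1 (a Monday), `date_features` returns year 2016, month 9, quarter 3, and day of week 2.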

## Input
- **Source**: `bronze/olist/orders/oracle_OLIST.OLIST_ORDERS_BASE.parquet`
- **Format**: Parquet
- **Records**: 99,441 rows

## Output
- **Destination**: `silver/orders_clean/`
- **Format**: Delta Lake (ACID transactions, time travel)
- **Partitioning**: By order_year and order_month
- **Expected Records**: ~98,200 rows (after filtering out 1,241 orders)

## Author
Kevin

## Last Updated
Feb 9, 2026


## Configuration

Set up storage account details and import required libraries


In [0]:
# =============================================================================
# CONFIGURATION & IMPORTS
# =============================================================================

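# Import PySpark SQL functions used throughout the notebook
# Note: min, max, sum, and count shadow Python's built-ins of the same name for
# the rest of the notebook; `import pyspark.sql.functions as F` would avoid that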
from pyspark.sql.functions import (
    col,              # Reference columns
    year,             # Extract year from date
    month,            # Extract month from date
    quarter,          # Extract quarter from date
    dayofweek,        # Extract day of week (1=Sunday, 7=Saturday)
    datediff,         # Calculate days between two dates
    when,             # Conditional logic (IF-THEN-ELSE)
    current_timestamp,# Get current timestamp for auditing
    count,            # Count rows
    min,              # Get minimum value
    max,              # Get maximum value
    avg,              # Calculate average
    sum               # Calculate sum
)

# Import Delta Lake functionality
from delta.tables import DeltaTable

# Storage account name
storage_account_name = "stgolistmigration"


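# Account key intentionally blank here: load it from a secret store (for example
# a Databricks secret scope) at runtime instead of hardcoding it in the notebook
account_key = ""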

# Configure Spark authentication
spark.conf.set(
    f"fs.azure.account.key.{storage_account_name}.dfs.core.windows.net",
    account_key
)

print("✅ Libraries imported")
print("✅ Authentication configured")


✅ Libraries imported
✅ Authentication configured
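An empty `account_key` cannot authenticate, so it is worth failing fast when the key is missing. The sketch below reads the key from an environment variable; the variable name `OLIST_STORAGE_ACCOUNT_KEY` and both helper names are assumptions, not part of the original notebook. On Databricks, `dbutils.secrets.get(scope=..., key=...)` is the usual way to pull the same value from a secret scope instead.

```python
import os


def get_account_key(env_var: str = "OLIST_STORAGE_ACCOUNT_KEY") -> str:
    """Fetch the storage account key from an environment variable (hypothetical name).

    Raises immediately if the variable is unset or empty, so the notebook never
    tries to authenticate with a blank key.
    """
    key = os.environ.get(env_var, "")
    if not key:
        raise RuntimeError(f"Storage account key not found in ${env_var}")
    return key


def account_key_conf(storage_account_name: str) -> str:
    """Spark config name used for ABFS account-key authentication."""
    return f"fs.azure.account.key.{storage_account_name}.dfs.core.windows.net"
```

In the notebook this would become `spark.conf.set(account_key_conf(storage_account_name), get_account_key())`.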


In [0]:
# =============================================================================
# HELPER FUNCTIONS - PATH BUILDERS
# =============================================================================

def get_bronze_path(table_folder, filename):
    """
    Build full path to bronze layer data
    
    Args:
        table_folder (str): Folder name (e.g., 'orders', 'customers')
        filename (str): Parquet file name
    
    Returns:
        str: Full ABFSS path to bronze data
    
    Example:
        get_bronze_path("orders", "oracle_OLIST.OLIST_ORDERS_BASE.parquet")
        → "abfss://bronze@stgolistmigration.dfs.core.windows.net/olist/orders/oracle_OLIST.OLIST_ORDERS_BASE.parquet"
    """
    return f"abfss://bronze@{storage_account_name}.dfs.core.windows.net/olist/{table_folder}/{filename}"


def get_silver_path(table_name):
    """
    Build full path to silver layer data
    
    Args:
        table_name (str): Table name (e.g., 'orders_clean')
    
    Returns:
        str: Full ABFSS path to silver data
    
    Example:
        get_silver_path("orders_clean")
        → "abfss://silver@stgolistmigration.dfs.core.windows.net/orders_clean/"
    """
    return f"abfss://silver@{storage_account_name}.dfs.core.windows.net/{table_name}/"


def get_gold_path(table_name):
    """
    Build full path to gold layer data
    
    Args:
        table_name (str): Table name (e.g., 'fact_orders')
    
    Returns:
        str: Full ABFSS path to gold data
    """
    return f"abfss://gold@{storage_account_name}.dfs.core.windows.net/{table_name}/"


print("✅ Helper functions defined")


✅ Helper functions defined
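The three helpers differ only in the container name and in how the path suffix is built. A single hypothetical `get_layer_path` could cover all of them and reject a misspelled layer name up front; this is a sketch of that refactor, not a function from the original notebook.

```python
VALID_LAYERS = ("bronze", "silver", "gold")


def get_layer_path(layer: str, *parts: str, directory: bool = False,
                   storage_account_name: str = "stgolistmigration") -> str:
    """Build an ABFSS path for any medallion layer (hypothetical generalization).

    `parts` are joined with "/"; directory=True appends a trailing slash,
    matching get_silver_path() and get_gold_path().
    """
    if layer not in VALID_LAYERS:
        raise ValueError(f"Unknown layer {layer!r}; expected one of {VALID_LAYERS}")
    suffix = "/".join(p.strip("/") for p in parts)
    path = f"abfss://{layer}@{storage_account_name}.dfs.core.windows.net/{suffix}"
    return path + "/" if directory else path
```

For example, `get_layer_path("bronze", "olist", "orders", "oracle_OLIST.OLIST_ORDERS_BASE.parquet")` produces the same path as `get_bronze_path("orders", "oracle_OLIST.OLIST_ORDERS_BASE.parquet")`.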


## Step 1: Read Bronze Data

Load raw orders data from Bronze layer


In [0]:
# =============================================================================
# STEP 1: READ BRONZE DATA
# =============================================================================

# Construct path to bronze orders data
# This is the raw data extracted from Oracle database
bronze_orders_path = get_bronze_path("orders", "oracle_OLIST.OLIST_ORDERS_BASE.parquet")

print(f"📖 Reading bronze data from:")
print(f"   {bronze_orders_path}")
print("=" * 80)

# Read the parquet file into a DataFrame
# The read is lazy: Spark only scans the rows when an action (such as .count()) runs
# A DataFrame is an immutable, distributed collection of rows
df_orders_bronze = spark.read.parquet(bronze_orders_path)

# Get record count
# .count() is an ACTION that triggers computation
bronze_count = df_orders_bronze.count()

print(f"✅ Bronze data loaded successfully")
print(f"   Total rows: {bronze_count:,}")
print(f"   Total columns: {len(df_orders_bronze.columns)}")
print("=" * 80)

# Display sample records
# .show() displays data in tabular format
# truncate=False shows full column values
# vertical=True prints each record as a block, one field per line (easier to read)
print("\n📊 Sample bronze data (first 3 rows):")
df_orders_bronze.limit(3).show(truncate=False, vertical=True)


📖 Reading bronze data from:
   abfss://bronze@stgolistmigration.dfs.core.windows.net/olist/orders/oracle_OLIST.OLIST_ORDERS_BASE.parquet
✅ Bronze data loaded successfully
   Total rows: 99,441
   Total columns: 8

📊 Sample bronze data (first 3 rows):
-RECORD 0---------------------------------------------------------
 ORDER_ID                      | e5fa5a7210941f7d56d0208e4e071d35 
 CUSTOMER_ID                   | 683c54fc24d40ee9f8a6fc179fd9856c 
 ORDER_STATUS                  | canceled                         
 ORDER_PURCHASE_TIMESTAMP      | 2016-09-05 00:15:34              
 ORDER_APPROVED_AT             | 2016-10-07 13:17:15              
 ORDER_DELIVERED_CARRIER_DATE  | NULL                             
 ORDER_DELIVERED_CUSTOMER_DATE | NULL                             
 ORDER_ESTIMATED_DELIVERY_DATE | 2016-10-28 00:00:00              
-RECORD 1---------------------------------------------------------
 ORDER_ID                      | 2e7a8482f6fb09756ca50c10d7bfc047 
 CUSTOMER_ID
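The bronze sample shows eight uppercase columns from the Oracle extract, while the notebook refers to them in lowercase (`order_status`, `order_id`). That works because Spark resolves column names case-insensitively by default (`spark.sql.caseSensitive=false`). A small check like the one below, with a hypothetical `EXPECTED_BRONZE_COLUMNS` list copied from the sample output, can catch a changed extract before any transformation runs. It checks names only; types would need `df_orders_bronze.dtypes`.

```python
# Column names copied from the bronze sample output above
EXPECTED_BRONZE_COLUMNS = [
    "ORDER_ID", "CUSTOMER_ID", "ORDER_STATUS", "ORDER_PURCHASE_TIMESTAMP",
    "ORDER_APPROVED_AT", "ORDER_DELIVERED_CARRIER_DATE",
    "ORDER_DELIVERED_CUSTOMER_DATE", "ORDER_ESTIMATED_DELIVERY_DATE",
]


def missing_columns(actual, expected=EXPECTED_BRONZE_COLUMNS):
    """Return expected columns absent from `actual`, compared case-insensitively.

    Case-insensitive matching mirrors Spark's default column resolution.
    """
    present = {c.lower() for c in actual}
    return [c for c in expected if c.lower() not in present]
```

In the notebook, `missing = missing_columns(df_orders_bronze.columns)` followed by `assert not missing, missing` would stop the run on a schema change.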

## Step 2: Data Quality Analysis

Inspect data quality issues before transformation



In [0]:
# =============================================================================
# STEP 2: DATA QUALITY ANALYSIS
# =============================================================================

print("🔍 Data Quality Analysis")
print("=" * 80)

# -------------------------
# Check 1: Null Values
# -------------------------
print("\n1️⃣ NULL VALUE ANALYSIS:")
print("-" * 80)

# For each column, count how many null values exist
# when(condition, value) returns value if the condition is true and null otherwise;
# count() ignores nulls, so only the rows where the column is null are counted
null_counts = df_orders_bronze.select([
    count(when(col(c).isNull(), c)).alias(c) 
    for c in df_orders_bronze.columns
])

# Display as transposed table (easier to read)
null_counts.show(vertical=True, truncate=False)

# -------------------------
# Check 2: Order Status Distribution
# -------------------------
print("\n2️⃣ ORDER STATUS DISTRIBUTION:")
print("-" * 80)

# Group by order_status and count occurrences
# This shows which statuses exist and how common they are
df_orders_bronze.groupBy("order_status") \
    .count() \
    .orderBy(col("count").desc()) \
    .show(truncate=False)

# -------------------------
# Check 3: Date Range Analysis
# -------------------------
print("\n3️⃣ DATE RANGE ANALYSIS:")
print("-" * 80)

# Find earliest and latest order dates
# This helps understand the time span of the data
date_summary = df_orders_bronze.select(
    min("order_purchase_timestamp").alias("earliest_order"),
    max("order_purchase_timestamp").alias("latest_order")
)

date_summary.show(truncate=False)

# -------------------------
# Check 4: Duplicate Check
# -------------------------
print("\n4️⃣ DUPLICATE ANALYSIS:")
print("-" * 80)

# Compare total rows with distinct order_ids; any difference means duplicates
total_rows = bronze_count  # already computed in Step 1, no need to rescan
unique_orders = df_orders_bronze.select("order_id").distinct().count()
duplicates = total_rows - unique_orders

print(f"Total rows: {total_rows:,}")
print(f"Unique order_ids: {unique_orders:,}")
print(f"Duplicate records: {duplicates:,}")

if duplicates > 0:
    print("⚠️  Duplicates found - will remove in transformation")
else:
    print("✅ No duplicates found")

print("=" * 80)


🔍 Data Quality Analysis

1️⃣ NULL VALUE ANALYSIS:
--------------------------------------------------------------------------------
-RECORD 0-----------------------------
 ORDER_ID                      | 0    
 CUSTOMER_ID                   | 0    
 ORDER_STATUS                  | 0    
 ORDER_PURCHASE_TIMESTAMP      | 0    
 ORDER_APPROVED_AT             | 160  
 ORDER_DELIVERED_CARRIER_DATE  | 1783 
 ORDER_DELIVERED_CUSTOMER_DATE | 2965 
 ORDER_ESTIMATED_DELIVERY_DATE | 0    


2️⃣ ORDER STATUS DISTRIBUTION:
--------------------------------------------------------------------------------
+------------+-----+
|order_status|count|
+------------+-----+
|delivered   |96478|
|shipped     |1107 |
|canceled    |625  |
|unavailable |609  |
|invoiced    |314  |
|processing  |301  |
|created     |5    |
|approved    |2    |
+------------+-----+


3️⃣ DATE RANGE ANALYSIS:
--------------------------------------------------------------------------------
+-------------------+-------------------+
|e
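The null and duplicate checks reduce to simple counting rules. The pure-Python model below (illustrative function names, toy rows standing in for the DataFrame) mirrors them: a null count per column, and total rows minus distinct IDs for duplicates. It also lists which IDs repeat; in Spark the equivalent is `df_orders_bronze.groupBy("order_id").count().filter("count > 1")`.

```python
from collections import Counter


def null_counts(rows, columns):
    """Per-column null counts, like count(when(col(c).isNull(), c)).

    A missing key is treated the same as an explicit None.
    """
    return {c: sum(1 for row in rows if row.get(c) is None) for c in columns}


def duplicate_report(order_ids):
    """Return (total rows, unique ids, extra duplicate rows, ids seen more than once)."""
    counts = Counter(order_ids)
    total = len(order_ids)
    unique = len(counts)
    repeated = sorted(oid for oid, n in counts.items() if n > 1)
    return total, unique, total - unique, repeated


# Toy rows standing in for the bronze DataFrame (not real data)
rows = [
    {"order_id": "a", "order_approved_at": None},
    {"order_id": "b", "order_approved_at": "2017-01-01"},
    {"order_id": "a", "order_approved_at": None},
]
print(null_counts(rows, ["order_id", "order_approved_at"]))
print(duplicate_report([r["order_id"] for r in rows]))
```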

## Step 3: Apply Transformations

Clean and enrich the orders data


In [0]:
# =============================================================================
# STEP 3: APPLY TRANSFORMATIONS
# =============================================================================

print("🔄 Applying transformations...")
print("=" * 80)

# Start with bronze DataFrame
# Each transformation returns a new DataFrame (DataFrames are immutable)
df_orders_silver = df_orders_bronze \
    .filter(
        # TRANSFORMATION 1: Filter out invalid order statuses
        # Keep delivered, shipped, processing, and invoiced orders; this drops
        # canceled, unavailable, created, and approved orders
        # .isin() checks if value is in the list
        col("order_status").isin(["delivered", "shipped", "processing", "invoiced"])
    ) \
    .withColumn(
        # TRANSFORMATION 2: Extract year from purchase timestamp
        # year() function extracts the year component
        # This is useful for time-based analysis and partitioning
        "order_year",
        year(col("order_purchase_timestamp"))
    ) \
    .withColumn(
        # TRANSFORMATION 3: Extract month (1-12)
        "order_month",
        month(col("order_purchase_timestamp"))
    ) \
    .withColumn(
        # TRANSFORMATION 4: Extract quarter (1-4)
        # Q1 = Jan-Mar, Q2 = Apr-Jun, Q3 = Jul-Sep, Q4 = Oct-Dec
        "order_quarter",
        quarter(col("order_purchase_timestamp"))
    ) \
    .withColumn(
        # TRANSFORMATION 5: Extract day of week
        # 1 = Sunday, 2 = Monday, ..., 7 = Saturday
        # Useful for analyzing weekly patterns
        "order_day_of_week",
        dayofweek(col("order_purchase_timestamp"))
    ) \
    .withColumn(
        # TRANSFORMATION 6: Calculate delivery time in days
        # datediff(end, start) returns whole days from start to end; timestamps
        # are truncated to dates first. Null if the order has not been delivered yet
        "delivery_days",
        datediff(
            col("order_delivered_customer_date"),
            col("order_purchase_timestamp")
        )
    ) \
    .withColumn(
        # TRANSFORMATION 7: Difference between actual and estimated delivery dates
        # Positive = late, negative = early, zero = on the estimated date
        # Null if the order has not been delivered yet
        "estimated_vs_actual_delivery_diff",
        datediff(
            col("order_delivered_customer_date"),
            col("order_estimated_delivery_date")
        )
    ) \
    .withColumn(
        # TRANSFORMATION 8: Create boolean flag for late deliveries
        # when() is like IF-THEN-ELSE
        # True if delivered after the estimated date; False otherwise, including
        # undelivered orders (a null diff fails the > 0 test and falls to otherwise)
        "is_late_delivery",
        when(col("estimated_vs_actual_delivery_diff") > 0, True).otherwise(False)
    ) \
    .withColumn(
        # TRANSFORMATION 9: Add ingestion timestamp for auditing
        # Records when this row was processed into Silver layer
        # Useful for tracking data lineage and debugging
        "ingestion_timestamp",
        current_timestamp()
    ) \
    .dropDuplicates(
        # TRANSFORMATION 10: Remove duplicate order_ids
        # Keeps one row per order_id; which duplicate survives is not guaranteed
        ["order_id"]
    )

# Get transformed record count
silver_count = df_orders_silver.count()
removed_count = bronze_count - silver_count

print(f"✅ Transformations complete")
print(f"   Silver rows: {silver_count:,}")
print(f"   Removed: {removed_count:,} rows ({removed_count/bronze_count*100:.1f}%)")
print("=" * 80)


🔄 Applying transformations...
✅ Transformations complete
   Silver rows: 98,200
   Removed: 1,241 rows (1.2%)
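The removed-row count can be reconciled against the status distribution from Step 2. The snippet below copies those counts (no new data) and sums the statuses the filter keeps.

```python
# Order-status counts copied from the Step 2 output
status_counts = {
    "delivered": 96478, "shipped": 1107, "canceled": 625, "unavailable": 609,
    "invoiced": 314, "processing": 301, "created": 5, "approved": 2,
}
# Statuses kept by the Step 3 filter
kept_statuses = {"delivered", "shipped", "processing", "invoiced"}

total = sum(status_counts.values())
kept = sum(n for status, n in status_counts.items() if status in kept_statuses)
filtered_out = total - kept

silver_rows = 98_200  # Silver count from the Step 3 output
removed_by_dedup = kept - silver_rows

print(f"total={total:,} kept={kept:,} filtered_out={filtered_out:,} dedup={removed_by_dedup}")
print(f"removed share: {filtered_out / total:.1%}")
```

Because the kept statuses add up exactly to the 98,200 Silver rows, the status filter accounts for all 1,241 removed rows (1.2%), and `dropDuplicates` removed none.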


## Step 4: Validate Transformed Data

Verify data quality after transformations


In [0]:
# =============================================================================
# STEP 4: VALIDATE TRANSFORMED DATA
# =============================================================================

print("✅ Silver Layer Quality Metrics")
print("=" * 80)

# Metric 1: Record count
print(f"\n📊 RECORD COUNT:")
print(f"   Total Orders: {df_orders_silver.count():,}")

# Metric 2: Date range
print(f"\n📅 DATE RANGE:")
date_range = df_orders_silver.agg(
    min('order_purchase_timestamp').alias("min_date"),
    max('order_purchase_timestamp').alias("max_date")
).collect()[0]
print(f"   From: {date_range['min_date']}")
print(f"   To: {date_range['max_date']}")

# Metric 3: Delivery metrics
print(f"\n🚚 DELIVERY METRICS:")
delivery_metrics = df_orders_silver.agg(
    count(when(col('is_late_delivery') == True, 1)).alias("late_deliveries"),
    avg('delivery_days').alias("avg_delivery_days"),
    min('delivery_days').alias("min_delivery_days"),
    max('delivery_days').alias("max_delivery_days")
).collect()[0]

print(f"   Late Deliveries: {delivery_metrics['late_deliveries']:,}")
print(f"   Avg Delivery Days: {delivery_metrics['avg_delivery_days']:.1f}")
print(f"   Min Delivery Days: {delivery_metrics['min_delivery_days']}")
print(f"   Max Delivery Days: {delivery_metrics['max_delivery_days']}")

# Metric 4: Year/Month distribution
print(f"\n📈 ORDERS BY YEAR-MONTH:")
df_orders_silver.groupBy("order_year", "order_month") \
    .count() \
    .orderBy("order_year", "order_month") \
    .show(20, truncate=False)

print("=" * 80)


✅ Silver Layer Quality Metrics

📊 RECORD COUNT:
   Total Orders: 98,200

📅 DATE RANGE:
   From: 2016-09-04 21:15:19
   To: 2018-09-03 09:06:57

🚚 DELIVERY METRICS:
   Late Deliveries: 6,534
   Avg Delivery Days: 12.5
   Min Delivery Days: 0
   Max Delivery Days: 210

📈 ORDERS BY YEAR-MONTH:
+----------+-----------+-----+
|order_year|order_month|count|
+----------+-----------+-----+
|2016      |9          |2    |
|2016      |10         |293  |
|2016      |12         |1    |
|2017      |1          |787  |
|2017      |2          |1717 |
|2017      |3          |2617 |
|2017      |4          |2376 |
|2017      |5          |3640 |
|2017      |6          |3205 |
|2017      |7          |3946 |
|2017      |8          |4272 |
|2017      |9          |4227 |
|2017      |10         |4547 |
|2017      |11         |7421 |
|2017      |12         |5618 |
|2018      |1          |7187 |
|2018      |2          |6624 |
|2018      |3          |7168 |
|2018      |4          |6919 |
|2018      |5          |68
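Two details explain these delivery metrics. Spark's `datediff(end, start)` compares calendar dates (timestamps are truncated to dates in the session time zone), so a `delivery_days` of 0 means same-day delivery. And `is_late_delivery` is `False`, not null, for orders with no delivery date, because a null difference fails the `> 0` test and falls through to `.otherwise(False)`. The plain-Python functions below model both behaviors; they are illustrations, not Spark code, and ignore time zones.

```python
from datetime import datetime
from typing import Optional


def spark_datediff(end: Optional[datetime], start: Optional[datetime]) -> Optional[int]:
    """Model of Spark's datediff(end, start) on timestamps.

    Both values are truncated to calendar dates before subtracting, and a
    missing (None) input gives a missing result, like SQL null.
    """
    if end is None or start is None:
        return None
    return (end.date() - start.date()).days


def is_late(delivered: Optional[datetime], estimated: datetime) -> bool:
    """Model of when(diff > 0, True).otherwise(False): a None diff becomes False."""
    diff = spark_datediff(delivered, estimated)
    return diff is not None and diff > 0


# Purchased 23:59, delivered two minutes later on the next calendar day
print(spark_datediff(datetime(2017, 1, 2, 0, 1), datetime(2017, 1, 1, 23, 59)))  # 1
# Not delivered yet: no delivery_days, and the late flag is False rather than unknown
print(spark_datediff(None, datetime(2017, 1, 1)), is_late(None, datetime(2017, 1, 1)))
```

If undelivered orders should stay unknown rather than count as on time, `col("estimated_vs_actual_delivery_diff") > 0` on its own already yields true, false, or null.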

## Step 5: Write to Silver Layer

Save transformed data as Delta Lake table


In [0]:
# =============================================================================
# STEP 5: WRITE TO SILVER LAYER (DELTA FORMAT)
# =============================================================================

# Define output path
output_path = get_silver_path("orders_clean")

print(f"💾 Writing to Silver layer...")
print(f"   Path: {output_path}")
print("=" * 80)

# Write as Delta Lake table
# Delta Lake provides:
# - ACID transactions
# - Time travel (query historical versions)
# - Schema enforcement
# - File-level statistics that let queries skip irrelevant files
#   (the underlying data files are still Parquet)
df_orders_silver.write \
    .format("delta") \
    .mode("overwrite") \
    .option("overwriteSchema", "true") \
    .partitionBy("order_year", "order_month") \
    .save(output_path)

print(f"✅ Silver layer written successfully")
print(f"   Format: Delta Lake")
print(f"   Partitioning: order_year, order_month")
print("=" * 80)


💾 Writing to Silver layer...
   Path: abfss://silver@stgolistmigration.dfs.core.windows.net/orders_clean/
✅ Silver layer written successfully
   Format: Delta Lake
   Partitioning: order_year, order_month
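`partitionBy("order_year", "order_month")` lays the table out in Hive-style directories under the output path, with Delta's transaction log in a `_delta_log/` directory at the table root. The hypothetical helper below shows where one month's files land; readers that filter on `order_year` and `order_month` can skip the other directories.

```python
def partition_dir(table_path: str, order_year: int, order_month: int) -> str:
    """Hive-style directory written by partitionBy("order_year", "order_month").

    Integer partition values appear unpadded, e.g. order_month=9, not 09.
    """
    return f"{table_path.rstrip('/')}/order_year={order_year}/order_month={order_month}/"


silver_path = "abfss://silver@stgolistmigration.dfs.core.windows.net/orders_clean/"
print(partition_dir(silver_path, 2017, 11))
```

With roughly two years of data this yields about two dozen month partitions for around 98,000 rows, so each partition is small; whether month-level partitioning pays off at this size is a judgment call.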


## Step 6: Verify Silver Data

Query the Delta table to verify it was created correctly
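The verification can also assert what it expects instead of only printing. The Silver table should have the 8 bronze columns plus the 8 derived in Step 3 (16 in total) and the same row count as `df_orders_silver`. `check_silver` and its column lists are a hypothetical sketch of such a check.

```python
BRONZE_COLUMNS = [
    "ORDER_ID", "CUSTOMER_ID", "ORDER_STATUS", "ORDER_PURCHASE_TIMESTAMP",
    "ORDER_APPROVED_AT", "ORDER_DELIVERED_CARRIER_DATE",
    "ORDER_DELIVERED_CUSTOMER_DATE", "ORDER_ESTIMATED_DELIVERY_DATE",
]
# Columns added by the Step 3 transformations
DERIVED_COLUMNS = [
    "order_year", "order_month", "order_quarter", "order_day_of_week",
    "delivery_days", "estimated_vs_actual_delivery_diff",
    "is_late_delivery", "ingestion_timestamp",
]
EXPECTED_SILVER_COLUMNS = BRONZE_COLUMNS + DERIVED_COLUMNS


def check_silver(columns, row_count, expected_rows):
    """Return a list of problems; an empty list means the checks passed.

    Column names are compared case-insensitively, matching Spark's default.
    """
    problems = []
    present = {c.lower() for c in columns}
    for name in EXPECTED_SILVER_COLUMNS:
        if name.lower() not in present:
            problems.append(f"missing column: {name}")
    if row_count != expected_rows:
        problems.append(f"row count {row_count:,} != expected {expected_rows:,}")
    return problems
```

In the cell below this could run as `problems = check_silver(df_verify.columns, df_verify.count(), silver_count)` followed by `assert not problems, problems`.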


In [0]:
# Verify data was written successfully
print("🔍 Verifying Silver layer...")
print("=" * 80)

# Read back from the Delta path
output_path = get_silver_path("orders_clean")
df_verify = spark.read.format("delta").load(output_path)

print(f"✅ Silver data readable")
print(f"   Total rows: {df_verify.count():,}")
print(f"   Columns: {len(df_verify.columns)}")

# Show summary by year/month
print("\n📊 Orders by Year-Month:")
df_verify.groupBy("order_year", "order_month") \
    .agg(
        count("*").alias("orders"),
        avg("delivery_days").alias("avg_delivery"),
        sum(when(col("is_late_delivery"), 1).otherwise(0)).alias("late_deliveries")
    ) \
    .orderBy("order_year", "order_month") \
    .show(20, truncate=False)

# Show sample records
print("\n📋 Sample records:")
df_verify.limit(3).show(truncate=False, vertical=True)

print("=" * 80)
print("🎉 Orders Bronze → Silver complete!")


🔍 Verifying Silver layer...
✅ Silver data readable
   Total rows: 98,200
   Columns: 16

📊 Orders by Year-Month:
+----------+-----------+------+------------------+---------------+
|order_year|order_month|orders|avg_delivery      |late_deliveries|
+----------+-----------+------+------------------+---------------+
|2016      |9          |2     |55.0              |1              |
|2016      |10         |293   |19.633962264150945|2              |
|2016      |12         |1     |5.0               |0              |
|2017      |1          |787   |12.753333333333334|22             |
|2017      |2          |1717  |13.300060496067756|49             |
|2017      |3          |2617  |13.044383346425766|116            |
|2017      |4          |2376  |15.02301346070343 |151            |
|2017      |5          |3640  |11.427080394922426|106            |
|2017      |6          |3205  |12.012759170653908|95             |
|2017      |7          |3946  |11.488894628099173|108            |
|2017      |8   