In [11]:
from pyspark.sql.functions import col, when, trim, upper, coalesce, lit, current_timestamp, md5, concat_ws, round as spark_round, abs as spark_abs, sum, count, countDistinct, min, max, avg
from pyspark.sql.types import *
from delta.tables import DeltaTable
from delta.tables import DeltaTable

StatementMeta(, 49e094b2-1743-4f6e-bb08-de1bb690ee48, 13, Finished, Available, Finished)

In [5]:
# =============================================================================
# CONFIGURATION
# =============================================================================

# Bronze-layer Parquet sources: line items and their sale headers.
saleitems_path, sales_path = (
    "Files/bronze/sales/saleitems.parquet",
    "Files/bronze/sales/sales.parquet",
)

# Lakehouse location of the resulting Delta fact table.
delta_output_path = "Tables/fact_sales"

StatementMeta(, 49e094b2-1743-4f6e-bb08-de1bb690ee48, 7, Finished, Available, Finished)

In [6]:
# =============================================================================
# LOAD SOURCE DATA
# =============================================================================

print("Loading source data...")

# Transaction detail lines.
df_saleitems = spark.read.parquet(saleitems_path)
print(f"Sale items loaded: {df_saleitems.count()} records")

# Transaction headers.
df_sales = spark.read.parquet(sales_path)
print(f"Sales headers loaded: {df_sales.count()} records")

# Peek at a few rows of each input, then at the schemas Spark read from Parquet,
# so type surprises (e.g. DECIMAL vs DOUBLE money columns) are visible early.
for heading, frame in (
    ("\n--- Sale Items Sample ---", df_saleitems),
    ("\n--- Sales Headers Sample ---", df_sales),
):
    print(heading)
    frame.show(3, truncate=False)

for heading, frame in (
    ("\n--- Sale Items Schema ---", df_saleitems),
    ("\n--- Sales Headers Schema ---", df_sales),
):
    print(heading)
    frame.printSchema()

StatementMeta(, 49e094b2-1743-4f6e-bb08-de1bb690ee48, 8, Finished, Available, Finished)

Loading source data...
Sale items loaded: 68 records
Sales headers loaded: 61 records

--- Sale Items Sample ---
+----------+------+---------+--------+---------------+------------+
|SaleItemID|SaleID|ProductID|Quantity|ActualUnitPrice|ItemSubtotal|
+----------+------+---------+--------+---------------+------------+
|1         |1     |4        |2       |3.00           |6.00        |
|2         |1     |11       |3       |1.00           |3.00        |
|3         |1     |1        |2       |1.50           |3.00        |
+----------+------+---------+--------+---------------+------------+
only showing top 3 rows


--- Sales Headers Sample ---
+------+-------------------+-------+------+------------+-------------+---------------------+---------------+
|SaleID|SaleDateTime       |TruckID|CityID|SaleLatitude|SaleLongitude|Remarks              |TotalSaleAmount|
+------+-------------------+-------+------+------------+-------------+---------------------+---------------+
|1     |2025-02-27 14:30:00|1

In [7]:
# =============================================================================
# DATA QUALITY CHECKS - BEFORE CLEANING
# =============================================================================

print("\n--- Data Quality Analysis ---")

# Key columns of the line-item table must be populated for the joins below.
print("Sale Items Key Field Analysis:")
for key_column in ("SaleItemID", "SaleID", "ProductID"):
    null_key_count = df_saleitems.filter(col(key_column).isNull()).count()
    print(f"Null {key_column}s: {null_key_count}")

print("\nSales Headers Key Field Analysis:")
print(f"Null SaleIDs: {df_sales.filter(col('SaleID').isNull()).count()}")

# Referential integrity in both directions: a left_anti join keeps the IDs
# on the left side that have no partner on the right side.
print("\nJoin Analysis:")
saleitems_saleids = df_saleitems.select("SaleID").distinct()
sales_saleids = df_sales.select("SaleID").distinct()

orphaned_items = saleitems_saleids.join(sales_saleids, on="SaleID", how="left_anti")
print(f"Sale items without headers: {orphaned_items.count()}")

orphaned_headers = sales_saleids.join(saleitems_saleids, on="SaleID", how="left_anti")
print(f"Sale headers without items: {orphaned_headers.count()}")

StatementMeta(, 49e094b2-1743-4f6e-bb08-de1bb690ee48, 9, Finished, Available, Finished)


--- Data Quality Analysis ---
Sale Items Key Field Analysis:
Null SaleItemIDs: 0
Null SaleIDs: 0
Null ProductIDs: 0

Sales Headers Key Field Analysis:
Null SaleIDs: 0

Join Analysis:
Sale items without headers: 0
Sale headers without items: 41


In [12]:
# =============================================================================
# DATA CLEANING - SALE ITEMS
# =============================================================================

print("\nStarting sale items data cleaning...")


def _clamp_non_negative(column_name):
    """Build a Column mapping NULL or negative values of `column_name` to zero.

    The replacement zero is cast to the column's own data type as read from
    ``df_saleitems``. A float literal such as ``0.0`` in a CASE WHEN would make
    Spark's type coercion widen a DECIMAL money column to DOUBLE, losing the
    exact fixed-point representation of prices and subtotals.

    Args:
        column_name: Name of a numeric column present in ``df_saleitems``.

    Returns:
        A pyspark Column with the same data type as the source column.
    """
    zero = lit(0).cast(df_saleitems.schema[column_name].dataType)
    value = col(column_name)
    return when(value.isNull() | (value < 0), zero).otherwise(value)


# NOTE(review): a fact table written by an earlier run of this notebook may already
# store the money measures as DOUBLE; confirm the target schema before merging
# DECIMAL values into it.
df_saleitems_clean = (
    df_saleitems
    # Rows missing an identifying key cannot be joined or attributed.
    .filter(col("SaleItemID").isNotNull())
    .filter(col("SaleID").isNotNull())
    .filter(col("ProductID").isNotNull())
    # Keep only positive quantities. NULL > 0 evaluates to NULL, which filter()
    # treats as false, so NULL, zero and negative quantities are all dropped.
    .filter(col("Quantity") > 0)
    # Money columns: NULL/negative -> 0 while preserving the source data type.
    .withColumn("ActualUnitPrice", _clamp_non_negative("ActualUnitPrice"))
    .withColumn("ItemSubtotal", _clamp_non_negative("ItemSubtotal"))
)

print(f"Sale items after cleaning: {df_saleitems_clean.count()} records")

# Validate subtotal calculation
print("\n--- Sale Items Validation ---")
df_saleitems_clean = df_saleitems_clean.withColumn(
    "calculated_subtotal",
    spark_round(col("Quantity") * col("ActualUnitPrice"), 2),
)

# Rows whose stored subtotal differs from quantity * unit price by more than one cent.
discrepancies = df_saleitems_clean.filter(
    spark_abs(col("ItemSubtotal") - col("calculated_subtotal")) > 0.01
).count()
print(f"Subtotal calculation discrepancies: {discrepancies}")

StatementMeta(, 49e094b2-1743-4f6e-bb08-de1bb690ee48, 14, Finished, Available, Finished)


Starting sale items data cleaning...
Sale items after cleaning: 68 records

--- Sale Items Validation ---
Subtotal calculation discrepancies: 0


In [9]:
# =============================================================================
# DATA CLEANING - SALES HEADERS
# =============================================================================

print("\nStarting sales headers data cleaning...")


def _zero_for(column_name):
    """Return a literal 0 cast to the data type of `column_name` in ``df_sales``.

    Keeps each cleaned column's type identical to the source. A float literal
    such as ``0.0`` in a CASE WHEN / COALESCE would widen a DECIMAL column
    (e.g. a money amount) to DOUBLE.

    Args:
        column_name: Name of a column present in ``df_sales``.

    Returns:
        A pyspark Column holding zero in the column's data type.
    """
    return lit(0).cast(df_sales.schema[column_name].dataType)


df_sales_clean = (
    df_sales
    .filter(col("SaleID").isNotNull())
    # NOTE(review): a missing sale time is replaced with the *current* time, which is
    # different on every run. It flows into sale_date and record_hash in the fact
    # table. Consider keeping NULL or quarantining these rows instead.
    .withColumn("SaleDateTime", coalesce(col("SaleDateTime"), current_timestamp()))
    # Missing dimension references and coordinates fall back to a typed zero.
    .withColumn("TruckID", coalesce(col("TruckID"), _zero_for("TruckID")))
    .withColumn("CityID", coalesce(col("CityID"), _zero_for("CityID")))
    .withColumn("SaleLatitude", coalesce(col("SaleLatitude"), _zero_for("SaleLatitude")))
    .withColumn("SaleLongitude", coalesce(col("SaleLongitude"), _zero_for("SaleLongitude")))
    # trim(NULL) is NULL, so NULL remarks become the empty string.
    .withColumn("Remarks", coalesce(trim(col("Remarks")), lit("")))
    # NULL or negative totals -> 0 in the column's own type.
    .withColumn(
        "TotalSaleAmount",
        when(
            col("TotalSaleAmount").isNull() | (col("TotalSaleAmount") < 0),
            _zero_for("TotalSaleAmount"),
        ).otherwise(col("TotalSaleAmount")),
    )
)

print(f"Sales headers after cleaning: {df_sales_clean.count()} records")


StatementMeta(, 49e094b2-1743-4f6e-bb08-de1bb690ee48, 11, Finished, Available, Finished)


Starting sales headers data cleaning...
Sales headers after cleaning: 61 records


In [13]:
# =============================================================================
# CREATE AGGREGATED METRICS
# =============================================================================

print("\nCalculating aggregated metrics...")

# One row per sale, rolled up from its line items. `sale_id_agg` repeats the
# grouping key under a distinct name; the fact-table join refers to it as
# agg.sale_id_agg. sum/count/countDistinct are the pyspark.sql.functions versions.
sale_level_metrics = [
    col("SaleID").alias("sale_id_agg"),
    sum("Quantity").alias("total_quantity"),
    sum("ItemSubtotal").alias("calculated_total_amount"),
    count("SaleItemID").alias("total_line_items"),
    countDistinct("ProductID").alias("unique_products"),
]
df_sale_aggregations = df_saleitems_clean.groupBy("SaleID").agg(*sale_level_metrics)

print(f"Sale aggregations calculated for: {df_sale_aggregations.count()} sales")

print("\n--- Sale Aggregations Sample ---")
df_sale_aggregations.show(5, truncate=False)

StatementMeta(, 49e094b2-1743-4f6e-bb08-de1bb690ee48, 15, Finished, Available, Finished)


Calculating aggregated metrics...
Sale aggregations calculated for: 20 sales

--- Sale Aggregations Sample ---
+------+-----------+--------------+-----------------------+----------------+---------------+
|SaleID|sale_id_agg|total_quantity|calculated_total_amount|total_line_items|unique_products|
+------+-----------+--------------+-----------------------+----------------+---------------+
|19    |19         |8             |14.5                   |3               |3              |
|7     |7          |7             |10.25                  |3               |3              |
|6     |6          |9             |15.5                   |3               |3              |
|9     |9          |7             |13.0                   |3               |3              |
|17    |17         |11            |15.0                   |4               |4              |
+------+-----------+--------------+-----------------------+----------------+---------------+
only showing top 5 rows



In [14]:
# =============================================================================
# CREATE FACT TABLE
# =============================================================================

print("\nCreating sales fact table...")

# Business columns that define a fact row's content. The merge step updates a
# matched row only when target.record_hash != source.record_hash, so the hash must
# cover every column that merge writes. That includes the header fields (remarks,
# coordinates, totals) and the per-sale aggregates, which change for all existing
# lines of a sale whenever an item is added. Otherwise such updates are silently skipped.
# NOTE(review): rows already stored with the old, narrower hash will be rewritten
# (and get a new updated_at) on the first merge after this change.
record_hash_columns = [
    "sale_item_id",
    "sale_id",
    "product_id",
    "truck_id",
    "city_id",
    "sale_datetime",
    "sale_date",
    "quantity",
    "unit_price",
    "line_total",
    "sale_latitude",
    "sale_longitude",
    "sale_remarks",
    "sale_total_amount",
    "sale_total_quantity",
    "sale_calculated_total",
    "sale_line_items_count",
    "sale_unique_products_count",
]

# concat_ws skips NULL inputs, so e.g. (NULL, "1") and ("1", NULL) would produce the
# same string. An explicit token keeps NULL positions distinguishable in the hash.
RECORD_HASH_NULL_TOKEN = "<NULL>"

# Join sale items with sales headers and the per-sale aggregates.
fact_sales = (
    df_saleitems_clean.alias("si")
    .join(
        df_sales_clean.alias("s"),
        col("si.SaleID") == col("s.SaleID"),
        "inner",
    )
    .join(
        df_sale_aggregations.alias("agg"),
        col("si.SaleID") == col("agg.sale_id_agg"),
        "left",
    )
    .select(
        # Primary Keys
        col("si.SaleItemID").alias("sale_item_id"),
        col("si.SaleID").alias("sale_id"),

        # Foreign Keys (Dimension References)
        col("si.ProductID").alias("product_id"),
        col("s.TruckID").alias("truck_id"),
        col("s.CityID").alias("city_id"),

        # Date/Time Dimensions
        col("s.SaleDateTime").alias("sale_datetime"),
        col("s.SaleDateTime").cast("date").alias("sale_date"),

        # Measures (Facts)
        col("si.Quantity").alias("quantity"),
        col("si.ActualUnitPrice").alias("unit_price"),
        col("si.ItemSubtotal").alias("line_total"),

        # Geographic Information
        col("s.SaleLatitude").alias("sale_latitude"),
        col("s.SaleLongitude").alias("sale_longitude"),

        # Additional Context
        col("s.Remarks").alias("sale_remarks"),
        col("s.TotalSaleAmount").alias("sale_total_amount"),

        # Aggregated Metrics (for analytical queries)
        col("agg.total_quantity").alias("sale_total_quantity"),
        col("agg.calculated_total_amount").alias("sale_calculated_total"),
        col("agg.total_line_items").alias("sale_line_items_count"),
        col("agg.unique_products").alias("sale_unique_products_count"),
    )
    .withColumn("created_at", current_timestamp())
    .withColumn("updated_at", current_timestamp())
    .withColumn(
        "record_hash",
        md5(
            concat_ws(
                "|",
                *[
                    coalesce(col(name).cast("string"), lit(RECORD_HASH_NULL_TOKEN))
                    for name in record_hash_columns
                ],
            )
        ),
    )
)

print(f"Fact table created: {fact_sales.count()} records")

# Display sample of fact table
print("\n--- Fact Sales Sample ---")
fact_sales.show(5, truncate=False)

StatementMeta(, 49e094b2-1743-4f6e-bb08-de1bb690ee48, 16, Finished, Available, Finished)


Creating sales fact table...
Fact table created: 68 records

--- Fact Sales Sample ---
+------------+-------+----------+--------+-------+-------------------+----------+--------+----------+----------+-------------+--------------+---------------------+-----------------+-------------------+---------------------+---------------------+--------------------------+--------------------------+--------------------------+--------------------------------+
|sale_item_id|sale_id|product_id|truck_id|city_id|sale_datetime      |sale_date |quantity|unit_price|line_total|sale_latitude|sale_longitude|sale_remarks         |sale_total_amount|sale_total_quantity|sale_calculated_total|sale_line_items_count|sale_unique_products_count|created_at                |updated_at                |record_hash                     |
+------------+-------+----------+--------+-------+-------------------+----------+--------+----------+----------+-------------+--------------+---------------------+-----------------+-----------

In [15]:
# =============================================================================
# DATA QUALITY VALIDATION
# =============================================================================

print("\n--- Fact Table Validation ---")

# Compute every integrity counter in a single aggregation job. fact_sales is not
# cached, so each separate .count() re-read both sources and re-ran all joins.
# count(when(cond, 1)) counts exactly the rows where cond is TRUE. These are the
# rows filter(cond).count() keeps; a NULL condition is excluded by both.
# count(lit(1)) returns 0 (not NULL) on an empty table.
validation_counts = fact_sales.agg(
    count(lit(1)).alias("total_records"),
    count(when(col("sale_id").isNull(), 1)).alias("null_sale_ids"),
    count(when(col("product_id").isNull(), 1)).alias("null_product_ids"),
    count(when(col("quantity") <= 0, 1)).alias("zero_quantities"),
    count(when(col("line_total") < 0, 1)).alias("negative_amounts"),
    count(
        when(
            spark_abs(col("line_total") - (col("quantity") * col("unit_price"))) > 0.01,
            1,
        )
    ).alias("amount_discrepancies"),
).collect()[0]

# Check for data integrity
total_records = validation_counts["total_records"]
null_sale_ids = validation_counts["null_sale_ids"]
null_product_ids = validation_counts["null_product_ids"]
zero_quantities = validation_counts["zero_quantities"]
negative_amounts = validation_counts["negative_amounts"]
amount_discrepancies = validation_counts["amount_discrepancies"]

print(f"Total fact records: {total_records}")
print(f"Null sale IDs: {null_sale_ids}")
print(f"Null product IDs: {null_product_ids}")
print(f"Zero/negative quantities: {zero_quantities}")
print(f"Negative line totals: {negative_amounts}")

# Business logic validation: line_total should equal quantity * unit_price within one cent.
print("\n--- Business Logic Validation ---")
print(f"Line total calculation discrepancies: {amount_discrepancies}")

StatementMeta(, 49e094b2-1743-4f6e-bb08-de1bb690ee48, 17, Finished, Available, Finished)


--- Fact Table Validation ---
Total fact records: 68
Null sale IDs: 0
Null product IDs: 0
Zero/negative quantities: 0
Negative line totals: 0

--- Business Logic Validation ---
Line total calculation discrepancies: 0


In [16]:
# =============================================================================
# SAVE AS DELTA TABLE (INCREMENTAL)
# =============================================================================

print(f"\nSaving fact table to: {delta_output_path}")

# Columns refreshed on a changed match: everything except the merge key
# (sale_item_id) and the original insert timestamp (created_at).
merge_update_columns = (
    "sale_id",
    "product_id",
    "truck_id",
    "city_id",
    "sale_datetime",
    "sale_date",
    "quantity",
    "unit_price",
    "line_total",
    "sale_latitude",
    "sale_longitude",
    "sale_remarks",
    "sale_total_amount",
    "sale_total_quantity",
    "sale_calculated_total",
    "sale_line_items_count",
    "sale_unique_products_count",
    "updated_at",
    "record_hash",
)

if not DeltaTable.isDeltaTable(spark, delta_output_path):
    # First run: materialise the table, partitioned by calendar date.
    print("Creating new Delta table...")
    (
        fact_sales.write
        .format("delta")
        .mode("overwrite")
        .option("mergeSchema", "true")
        .partitionBy("sale_date")
        .save(delta_output_path)
    )
    print("Delta table created successfully!")
else:
    # Incremental run: upsert keyed on sale_item_id. Matched rows are only
    # rewritten when their content hash differs; unmatched rows are inserted.
    print("Existing Delta table found. Executing merge...")
    delta_table = DeltaTable.forPath(spark, delta_output_path)
    update_assignments = {name: f"source.{name}" for name in merge_update_columns}
    (
        delta_table.alias("target")
        .merge(fact_sales.alias("source"), "target.sale_item_id = source.sale_item_id")
        .whenMatchedUpdate(
            condition="target.record_hash != source.record_hash",
            set=update_assignments,
        )
        .whenNotMatchedInsertAll()
        .execute()
    )
    print("Merge executed successfully!")

StatementMeta(, 49e094b2-1743-4f6e-bb08-de1bb690ee48, 18, Finished, Available, Finished)


Saving fact table to: Tables/fact_sales
Creating new Delta table...
Delta table created successfully!


In [17]:
# =============================================================================
# OPTIMIZE DELTA TABLE
# =============================================================================

print("\nOptimizing Delta table...")

# Compaction is best-effort: a failure is reported but does not stop the notebook.
optimize_statement = f"OPTIMIZE delta.`{delta_output_path}`"
try:
    spark.sql(optimize_statement)
except Exception as e:
    print(f"Optimization error: {str(e)}")
else:
    print("Optimization completed!")

StatementMeta(, 49e094b2-1743-4f6e-bb08-de1bb690ee48, 19, Finished, Available, Finished)


Optimizing Delta table...
Optimization completed!


In [18]:
# =============================================================================
# FINAL DATA QUALITY CHECKS
# =============================================================================

print("\nRunning final data quality checks...")

# Load the saved table for verification (what was persisted, not the in-memory plan).
df_final = spark.read.format("delta").load(delta_output_path)

# Final statistics (min/max/sum/avg here are the pyspark.sql.functions versions).
final_total_records = df_final.count()
distinct_sales = df_final.select("sale_id").distinct().count()
distinct_products = df_final.select("product_id").distinct().count()
date_range = df_final.agg(
    min("sale_date").alias("min_date"),
    max("sale_date").alias("max_date"),
).collect()[0]

print(f"Final total records: {final_total_records}")
print(f"Distinct sales: {distinct_sales}")
print(f"Distinct products: {distinct_products}")
print(f"Date range: {date_range['min_date']} to {date_range['max_date']}")

# Show final data sample
print("\n--- Final Fact Table Sample ---")
df_final.show(5, truncate=False)

# Revenue summary
print("\n--- Revenue Summary ---")
revenue_summary = df_final.agg(
    sum("line_total").alias("total_revenue"),
    avg("line_total").alias("avg_line_total"),
    sum("quantity").alias("total_quantity_sold"),
).collect()[0]

# SUM/AVG over zero rows return NULL (None in Python), and formatting None with
# ",.2f" or "," raises TypeError. Report those cases as 0 instead.
total_revenue = revenue_summary['total_revenue'] or 0
avg_line_total = revenue_summary['avg_line_total'] or 0
total_quantity_sold = revenue_summary['total_quantity_sold'] or 0

print(f"Total Revenue: ${total_revenue:,.2f}")
print(f"Average Line Total: ${avg_line_total:,.2f}")
print(f"Total Quantity Sold: {total_quantity_sold:,}")

StatementMeta(, 49e094b2-1743-4f6e-bb08-de1bb690ee48, 20, Finished, Available, Finished)


Running final data quality checks...
Final total records: 68
Distinct sales: 20
Distinct products: 12
Date range: 2025-02-27 to 2025-05-01

--- Final Fact Table Sample ---
+------------+-------+----------+--------+-------+-------------------+----------+--------+----------+----------+-------------+--------------+----------------+-----------------+-------------------+---------------------+---------------------+--------------------------+--------------------------+--------------------------+--------------------------------+
|sale_item_id|sale_id|product_id|truck_id|city_id|sale_datetime      |sale_date |quantity|unit_price|line_total|sale_latitude|sale_longitude|sale_remarks    |sale_total_amount|sale_total_quantity|sale_calculated_total|sale_line_items_count|sale_unique_products_count|created_at                |updated_at                |record_hash                     |
+------------+-------+----------+--------+-------+-------------------+----------+--------+----------+----------+-----

In [19]:
# =============================================================================
# SUMMARY
# =============================================================================

print("="*50)
print("SALES FACT TABLE ETL COMPLETED SUCCESSFULLY!")
print(f"Records processed: {final_total_records}")
print(f"Table saved at: {delta_output_path}")
print("Partitioned by: sale_date")
print("="*50)

# Summary dictionary for further analysis.
# SUM over an empty table yields None; record 0.0 instead of failing in float().
summary_stats = {
    "total_records": final_total_records,
    "distinct_sales": distinct_sales,
    "distinct_products": distinct_products,
    "total_revenue": float(revenue_summary['total_revenue'] or 0),
    "date_range": f"{date_range['min_date']} to {date_range['max_date']}",
    "table_path": delta_output_path,
}

print(f"Summary: {summary_stats}")

StatementMeta(, 49e094b2-1743-4f6e-bb08-de1bb690ee48, 21, Finished, Available, Finished)

SALES FACT TABLE ETL COMPLETED SUCCESSFULLY!
Records processed: 68
Table saved at: Tables/fact_sales
Partitioned by: sale_date
Summary: {'total_records': 68, 'distinct_sales': 20, 'distinct_products': 12, 'total_revenue': 331.5, 'date_range': '2025-02-27 to 2025-05-01', 'table_path': 'Tables/fact_sales'}
