# Data Quality Validation - Order Items Table

**Table:** `stg_order_items`  
**Project:** Maven Fuzzy Factory E-Commerce Analytics  
**Created:** November 20, 2025  
**Purpose:** Validate data quality for order items staging table

---

## Validation Scope

**Primary Key:** order_item_id  
**Foreign Keys:** order_id, product_id  
**Critical Fields:** created_at, is_primary_item, price_usd, cogs_usd  
**Expected Row Count Range:** 1,000 - 2,000,000

**Validation Checks:**
- Row count within expected range
- Primary key uniqueness
- Null checks on critical columns
- Data type review (schema is printed for manual inspection)
- Positive values for IDs and financial fields
- Binary flag validation (is_primary_item)
- No future dates
- Business logic: price >= cogs (no negative margins)

---

## 1. Configuration & Setup

In [None]:
from pyspark.sql.functions import *
from pyspark.sql.types import IntegerType, StructType, StructField, StringType, TimestampType
from datetime import datetime
import uuid

# Drop the star-imported PySpark `sum` so the bare name resolves to Python's
# built-in again. After this line Spark's aggregate is no longer reachable
# through the plain name `sum`.
del sum

# Validation run metadata: a random UUID and the naive local time at execution
RUN_ID = str(uuid.uuid4())
RUN_TIMESTAMP = datetime.now()

# Table being validated, its key column, and the tables results are written to
SOURCE_TABLE = "stg_order_items"
PK_COLUMN = "order_item_id"
QUALITY_LOG_TABLE = "data_quality_log"
QUALITY_SUMMARY_TABLE = "data_quality_summary"

# Quality thresholds
MIN_ROW_COUNT = 1_000
MAX_ROW_COUNT = 2_000_000
MAX_DUPLICATE_PCT = 1.0  # highest tolerated duplicate-key share, in percent

# Echo the run identity so notebook output can be matched to persisted rows
print(
    f"Validation Run ID: {RUN_ID}",
    f"Run Timestamp: {RUN_TIMESTAMP}",
    f"Source Table: {SOURCE_TABLE}",
    sep="\n",
)

## 2. Load Source Data

In [None]:
# Read the staging table named by SOURCE_TABLE into a DataFrame
df = spark.read.table(SOURCE_TABLE)

# Size summary and the schema heading, emitted with a single print call
print(
    f"Total Rows: {df.count():,}",
    f"Total Columns: {len(df.columns)}",
    "\nSchema:",
    sep="\n",
)
df.printSchema()

# First five rows, with column values left untruncated
print("\nSample Data:")
df.show(n=5, truncate=False)

## 3. Basic Profiling

In [None]:
# Basic statistics
#
# Spark aggregates are referenced through an explicit alias. The setup cell
# runs `del sum`, which leaves the bare name `sum` bound to Python's built-in;
# calling that on a Column fails instead of building a Spark aggregate.
from pyspark.sql import functions as F


def _fmt_usd(value, spec=",.2f"):
    """Format a dollar amount, or return 'n/a' when the aggregate is NULL.

    Spark returns NULL (None) for sum/avg over zero rows, which cannot be
    rendered with a numeric format spec.
    """
    return "n/a" if value is None else f"${value:{spec}}"


total_rows = df.count()
distinct_items = df.select(PK_COLUMN).distinct().count()
distinct_orders = df.select("order_id").distinct().count()
distinct_products = df.select("product_id").distinct().count()

# Date range
date_stats = df.select(
    F.min(F.col("created_at")).alias("min_date"),
    F.max(F.col("created_at")).alias("max_date"),
).collect()[0]

# Primary item analysis
primary_item_stats = df.groupBy("is_primary_item").count().orderBy("is_primary_item")

# Revenue statistics
revenue_stats = df.select(
    F.sum(F.col("price_usd")).alias("total_revenue"),
    F.avg(F.col("price_usd")).alias("avg_item_price"),
    F.sum(F.col("cogs_usd")).alias("total_cogs"),
    F.sum(F.col("price_usd") - F.col("cogs_usd")).alias("total_margin"),
).collect()[0]

# Product popularity
product_stats = df.groupBy("product_id").count().orderBy(F.desc("count"))

# Guarded so an empty table reports 0.00% instead of raising ZeroDivisionError
profiled_duplicates = total_rows - distinct_items
profiled_duplicate_pct = (profiled_duplicates / total_rows * 100) if total_rows > 0 else 0

print(f"Total Rows: {total_rows:,}")
print(f"Distinct Order Items: {distinct_items:,}")
print(f"Distinct Orders: {distinct_orders:,}")
print(f"Distinct Products: {distinct_products:,}")
print(f"Duplicate Items: {profiled_duplicates:,} ({profiled_duplicate_pct:.2f}%)")
print(f"Date Range: {date_stats['min_date']} to {date_stats['max_date']}")
print(f"\nPrimary Item Distribution:")
primary_item_stats.show()
print(f"\nRevenue Metrics:")
print(f"  Total Revenue: {_fmt_usd(revenue_stats['total_revenue'])}")
print(f"  Average Item Price: {_fmt_usd(revenue_stats['avg_item_price'], '.2f')}")
print(f"  Total COGS: {_fmt_usd(revenue_stats['total_cogs'])}")
print(f"  Total Margin: {_fmt_usd(revenue_stats['total_margin'])}")
print(f"\nTop 5 Products by Volume:")
product_stats.show(5)

## 4. Validation Checks

In [None]:
# One dict per executed check is collected here; later cells read this list
validation_results = []


def add_validation_result(check_name, check_type, column_name, passed, invalid_count, threshold, message):
    """Record the outcome of a single check and echo it to stdout.

    Appends one dict to the module-level ``validation_results`` list, stamped
    with RUN_ID, RUN_TIMESTAMP and SOURCE_TABLE. The truthiness of ``passed``
    is stored as the string "True" or "False"; all other arguments are stored
    unchanged under the key of the same name.
    """
    record = dict(
        run_id=RUN_ID,
        run_timestamp=RUN_TIMESTAMP,
        table_name=SOURCE_TABLE,
        check_name=check_name,
        check_type=check_type,
        column_name=column_name,
        passed=str(bool(passed)),
        invalid_count=invalid_count,
        threshold=threshold,
        message=message,
    )
    validation_results.append(record)

    # Console feedback: one status line per check
    marker = "✗ FAILED"
    if passed:
        marker = "✓ PASSED"
    print(f"{marker} - {check_name}: {message}")

In [None]:
# Check 1: total row count must fall inside [MIN_ROW_COUNT, MAX_ROW_COUNT]
row_count_valid = not (total_rows < MIN_ROW_COUNT or total_rows > MAX_ROW_COUNT)

# On failure the whole row count is reported as the offending quantity
if row_count_valid:
    row_count_offenders = 0
else:
    row_count_offenders = total_rows

add_validation_result(
    check_name="Row Count Range",
    check_type="completeness",
    column_name="*",
    passed=row_count_valid,
    invalid_count=row_count_offenders,
    threshold=f"{MIN_ROW_COUNT}-{MAX_ROW_COUNT}",
    message=f"Row count {total_rows:,} is {'within' if row_count_valid else 'outside'} expected range",
)

In [None]:
# Check 2: primary-key duplicates, tolerated up to MAX_DUPLICATE_PCT percent
duplicate_count = total_rows - distinct_items

# An empty table has no duplicates and must not trigger a division by zero
if total_rows > 0:
    duplicate_pct = duplicate_count / total_rows * 100
else:
    duplicate_pct = 0

pk_valid = duplicate_pct <= MAX_DUPLICATE_PCT

add_validation_result(
    check_name="Primary Key Uniqueness",
    check_type="uniqueness",
    column_name=PK_COLUMN,
    passed=pk_valid,
    invalid_count=duplicate_count,
    threshold=f"<={MAX_DUPLICATE_PCT}%",
    message=f"Found {duplicate_count:,} duplicates ({duplicate_pct:.2f}%)",
)

In [None]:
# Check 3: Null value checks for critical columns
from pyspark.sql import functions as F

critical_columns = [PK_COLUMN, "created_at", "order_id", "product_id", "is_primary_item", "price_usd", "cogs_usd"]

# Count NULLs for every critical column in one aggregation pass over the
# table rather than running one count job per column. `when` without an
# `otherwise` yields NULL for non-matching rows, and `count` skips NULLs, so
# each aggregate is exactly the number of rows where the column is NULL.
null_counts = df.select(
    [F.count(F.when(F.col(name).isNull(), 1)).alias(name) for name in critical_columns]
).collect()[0]

for col_name in critical_columns:
    null_count = null_counts[col_name]
    null_valid = null_count == 0

    add_validation_result(
        check_name=f"Null Check - {col_name}",
        check_type="completeness",
        column_name=col_name,
        passed=null_valid,
        invalid_count=null_count,
        threshold="0",
        message=f"Found {null_count:,} null values"
    )

In [None]:
# Check 4: Positive values for IDs and financial fields
from pyspark.sql import functions as F

# Column -> threshold label stored with the result; the test applied below is
# "value <= 0 is invalid" for every listed column.
numeric_checks = {
    PK_COLUMN: ">0",
    "order_id": ">0",
    "product_id": ">0",
    "price_usd": ">0",
    "cogs_usd": ">0"
}

# Count non-positive values for all columns in one aggregation pass rather
# than one count job per column. A NULL value makes the comparison NULL, which
# `when` treats as not matched, so NULLs are not counted here.
non_positive_counts = df.select(
    [F.count(F.when(F.col(name) <= 0, 1)).alias(name) for name in numeric_checks]
).collect()[0]

for col_name, threshold in numeric_checks.items():
    invalid_count = non_positive_counts[col_name]
    valid = invalid_count == 0

    add_validation_result(
        check_name=f"Positive Value - {col_name}",
        check_type="validity",
        column_name=col_name,
        passed=valid,
        invalid_count=invalid_count,
        threshold=threshold,
        message=f"Found {invalid_count:,} non-positive values"
    )

In [None]:
# Check 5: is_primary_item may only hold 0 or 1
flag_is_binary = col("is_primary_item").isin(0, 1)

# Rows whose flag is outside {0, 1}
invalid_flags = df.where(~flag_is_binary).count()
flag_valid = not invalid_flags

add_validation_result(
    check_name="Binary Flag - is_primary_item",
    check_type="validity",
    column_name="is_primary_item",
    passed=flag_valid,
    invalid_count=invalid_flags,
    threshold="0 or 1",
    message=f"Found {invalid_flags:,} invalid binary values",
)

In [None]:
# Check 6: No future dates
# The reference time is deliberately not named `current_timestamp`: that name
# is a function brought in by the star import of pyspark.sql.functions, and
# rebinding it to a datetime would break any later call to it.
validation_now = datetime.now()
future_dates = df.filter(col("created_at") > lit(validation_now)).count()
date_valid = future_dates == 0

add_validation_result(
    check_name="No Future Dates",
    check_type="validity",
    column_name="created_at",
    passed=date_valid,
    invalid_count=future_dates,
    threshold="<= current_date",
    message=f"Found {future_dates:,} future dates"
)

In [None]:
# Check 7: business rule, an item must not cost more than it sells for
sold_below_cost = col("cogs_usd") > col("price_usd")

negative_margin = df.where(sold_below_cost).count()
margin_valid = not negative_margin

add_validation_result(
    check_name="Business Logic - Price >= COGS",
    check_type="validity",
    column_name="price_usd,cogs_usd",
    passed=margin_valid,
    invalid_count=negative_margin,
    threshold="price_usd >= cogs_usd",
    message=f"Found {negative_margin:,} items with negative margin",
)

## 5. Calculate Quality Score

In [None]:
# Overall quality score: percentage of recorded checks whose "passed" is "True"
total_checks = len(validation_results)
passed_checks = len([r for r in validation_results if r["passed"] == "True"])

# With no checks recorded the score is 0, which also yields a FAILED status
if total_checks > 0:
    quality_score = passed_checks / total_checks * 100
else:
    quality_score = 0

# Anything short of a perfect score fails the run
overall_status = "FAILED" if quality_score != 100 else "PASSED"

print(
    "\n" + "=" * 60,
    f"QUALITY SCORE: {quality_score:.1f}%",
    f"CHECKS PASSED: {passed_checks}/{total_checks}",
    f"OVERALL STATUS: {overall_status}",
    "=" * 60,
    sep="\n",
)

## 6. Persist Results to Quality Log

In [None]:
# Explicit schema for the per-check log rows. Field names match the keys of
# the dicts held in validation_results; the third argument is nullability.
log_schema = (
    StructType()
    .add("run_id", StringType(), False)
    .add("run_timestamp", TimestampType(), False)
    .add("table_name", StringType(), False)
    .add("check_name", StringType(), False)
    .add("check_type", StringType(), False)
    .add("column_name", StringType(), True)
    .add("passed", StringType(), False)
    .add("invalid_count", IntegerType(), False)
    .add("threshold", StringType(), True)
    .add("message", StringType(), True)
)

validation_log_df = spark.createDataFrame(validation_results, schema=log_schema)

# Append this run's rows to the quality log table
validation_log_df.write.saveAsTable(QUALITY_LOG_TABLE, mode="append")

print(
    f"✓ Validation results written to {QUALITY_LOG_TABLE}",
    f"  Records written: {len(validation_results)}",
    sep="\n",
)

## 7. Persist Summary to Quality Summary Table

In [None]:
# Total NULLs found across all "Null Check" completeness results
null_violations = sum(
    r["invalid_count"]
    for r in validation_results
    if r["check_type"] == "completeness" and "Null Check" in r["check_name"]
)

# Explicit schema for the one-row run summary; quality_score is kept as text
summary_schema = (
    StructType()
    .add("run_id", StringType(), False)
    .add("run_timestamp", TimestampType(), False)
    .add("table_name", StringType(), False)
    .add("row_count", IntegerType(), False)
    .add("pk_duplicate_count", IntegerType(), False)
    .add("null_violations", IntegerType(), False)
    .add("validation_checks_total", IntegerType(), False)
    .add("validation_checks_passed", IntegerType(), False)
    .add("quality_score", StringType(), False)
    .add("overall_status", StringType(), False)
)

# Single summary record for this run
summary_data = [
    dict(
        run_id=RUN_ID,
        run_timestamp=RUN_TIMESTAMP,
        table_name=SOURCE_TABLE,
        row_count=total_rows,
        pk_duplicate_count=duplicate_count,
        null_violations=null_violations,
        validation_checks_total=total_checks,
        validation_checks_passed=passed_checks,
        quality_score=format(quality_score, ".1f"),
        overall_status=overall_status,
    )
]

summary_df = spark.createDataFrame(summary_data, schema=summary_schema)

# Append the summary row to the summary table
summary_df.write.saveAsTable(QUALITY_SUMMARY_TABLE, mode="append")

print(
    f"✓ Summary written to {QUALITY_SUMMARY_TABLE}",
    "\nValidation Complete!",
    sep="\n",
)

## 8. Verification - Query Persisted Results

In [None]:
# Read back what this run persisted, filtered on RUN_ID, to confirm the writes.
# Both statements are built first and executed below.
log_query = f"""
    SELECT check_name, check_type, column_name, passed, invalid_count, message
    FROM {QUALITY_LOG_TABLE}
    WHERE run_id = '{RUN_ID}'
    ORDER BY check_name
"""

summary_query = f"""
    SELECT table_name, row_count, pk_duplicate_count, null_violations,
           validation_checks_passed, validation_checks_total, 
           quality_score, overall_status
    FROM {QUALITY_SUMMARY_TABLE}
    WHERE run_id = '{RUN_ID}'
"""

# Per-check log rows for this run
print("Validation Log Records:")
spark.sql(log_query).show(truncate=False)

# Summary row for this run
print("\nQuality Summary:")
spark.sql(summary_query).show(truncate=False)