# Week 5 – Data Quality & Validation

## Learning Objectives
By the end of this session, you will:
- 🔍 Identify data quality dimensions and common issues in datasets
- 📋 Design validation rules using schema enforcement and rule-based approaches
- 🎯 Implement Great Expectations Core for declarative validation
- 🛡️ Apply Delta constraints and PySpark validation logic
- 📊 Set up data quality monitoring and profiling dashboards
- ✅ Build robust data quality pipelines with automated validation

## Setup and Imports

In [None]:
# Import required libraries
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from delta.tables import DeltaTable
import pandas as pd

# Build (or reuse) the Spark session. The two config entries register the
# Delta Lake SQL extension and catalog named in their values.
spark = (
    SparkSession.builder
    .appName("Week5_DataQuality")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    .getOrCreate()
)

print("‚úÖ Spark session initialized")

## Topic 1: Data Quality Dimensions & Issue Identification

### Create Sample Dataset with Quality Issues

In [None]:
# Orders fixture: eight rows, several of them deliberately defective so the
# quality checks further down have something to find.
sample_data = [
    ("ORD001", "CUST001", 150.50, "2024-01-15", "completed", "john@email.com"),
    ("ORD002", "CUST002", -25.00, "2024-01-16", "pending", "jane.doe@company.com"),  # Negative amount
    ("ORD003", None, 75.25, "2024-01-17", "cancelled", "invalid-email"),  # Missing customer_id, invalid email
    ("ORD004", "CUST003", 200.00, "2024-01-18", "completed", "bob@test.org"),
    ("ORD004", "CUST003", 200.00, "2024-01-18", "completed", "bob@test.org"),  # Duplicate
    (None, "CUST004", 99.99, "2024-01-19", "invalid_status", "alice@domain.co.uk"),  # Missing order_id, invalid status
    ("ORD006", "CUST005", None, "2024-01-20", "pending", "charlie@email.net"),  # Missing amount
    ("ORD007", "CUST006", 300.75, None, "completed", "diana@company.com"),  # Missing date
]

# Every field is declared nullable so the rows with missing values load.
schema = StructType([
    StructField(field_name, field_type, True)
    for field_name, field_type in [
        ("order_id", StringType()),
        ("customer_id", StringType()),
        ("amount", DoubleType()),
        ("order_date", StringType()),
        ("status", StringType()),
        ("email", StringType()),
    ]
])

df = spark.createDataFrame(sample_data, schema)
df.show()
print(f"Total records: {df.count()}")

### Data Quality Assessment

In [None]:
# 1. Schema inspection: print the DataFrame's schema as Spark sees it.
print("=== SCHEMA INSPECTION ===")
df.printSchema()

# 2. Basic statistics: show the summary table produced by DataFrame.describe().
print("\n=== BASIC STATISTICS ===")
df.describe().show()

In [None]:
# 3. Check for missing values (Completeness)
print("=== COMPLETENESS CHECK ===")

# Single aggregation pass: when() yields a value only for NULL cells and
# count() counts non-NULL values, so each output column is that column's
# NULL count.
null_counts = df.select([count(when(col(c).isNull(), c)).alias(c) for c in df.columns])
null_counts.show()

# Calculate completeness percentage, reusing the counts gathered above
# instead of launching one more Spark job per column.
total_rows = df.count()
if total_rows == 0:
    # Nothing to divide by; a percentage is meaningless for an empty frame.
    print("No rows to assess")
else:
    null_count_row = null_counts.first()
    for column in df.columns:
        null_count = null_count_row[column]
        completeness = ((total_rows - null_count) / total_rows) * 100
        print(f"{column}: {completeness:.1f}% complete")

In [None]:
# 4. Check for duplicates (Uniqueness)
print("=== UNIQUENESS CHECK ===")
duplicate_orders = df.groupBy("order_id").count().filter("count > 1")

# Materialise the duplicate summary once and reuse it, rather than
# re-running the aggregation for the count, the emptiness test and the ids.
duplicate_rows = duplicate_orders.collect()
print(f"Duplicate order_ids: {len(duplicate_rows)}")
duplicate_orders.show()

# Show duplicate records
if duplicate_rows:
    duplicate_ids = [row.order_id for row in duplicate_rows if row.order_id is not None]
    is_duplicate = col("order_id").isin(duplicate_ids)
    # groupBy puts NULL ids in a group of their own, but isin() never matches
    # NULL; add an explicit isNull() test when that group is duplicated.
    if len(duplicate_ids) < len(duplicate_rows):
        is_duplicate = is_duplicate | col("order_id").isNull()
    df.filter(is_duplicate).show()

In [None]:
# 5. Check for invalid ranges (Validity)
print("=== VALIDITY CHECK ===")

# Rule inputs
email_pattern = r"^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$"
valid_statuses = ["pending", "completed", "cancelled"]

# Each rule is expressed once as a column predicate and reused for both the
# per-rule counts and the combined listing at the end.
is_negative_amount = col("amount") < 0
is_malformed_email = ~col("email").rlike(email_pattern)
is_unknown_status = ~col("status").isin(valid_statuses)

# Negative amounts
negative_amounts = df.filter(is_negative_amount).count()
print(f"Records with negative amounts: {negative_amounts}")

# Invalid email formats
invalid_emails = df.filter(is_malformed_email).count()
print(f"Records with invalid email formats: {invalid_emails}")

# Invalid status values
invalid_status = df.filter(is_unknown_status).count()
print(f"Records with invalid status: {invalid_status}")

# Rows breaking at least one of the three rules
print("\nInvalid records:")
df.filter(is_negative_amount | is_malformed_email | is_unknown_status).show()

### Quality Issues Summary

In [None]:
# Create quality issues summary: one (issue, dimension, count) row per check.

# Completeness rows are generated per column; each counts that column's NULLs.
completeness_rows = [
    (f"Missing {field_name}", "Completeness", df.filter(col(field_name).isNull()).count())
    for field_name in ("order_id", "customer_id", "amount", "order_date")
]

# The remaining rows reuse results computed by the uniqueness/validity cells.
uniqueness_and_validity_rows = [
    ("Duplicate order_ids", "Uniqueness", duplicate_orders.count()),
    ("Negative amounts", "Validity", negative_amounts),
    ("Invalid emails", "Validity", invalid_emails),
    ("Invalid status", "Validity", invalid_status),
]

quality_issues = completeness_rows + uniqueness_and_validity_rows

quality_df = spark.createDataFrame(quality_issues, ["Issue", "Dimension", "Count"])
quality_df.show(truncate=False)

print("\nüìä Quality Assessment Complete!")

## Topic 2: Validation Approaches & Rule Design

### Schema Enforcement

In [None]:
# Define strict schema with proper types.
# Each entry is (column name, Spark type, nullable); only email may be NULL.
strict_schema = StructType([
    StructField(field_name, field_type, nullable)
    for field_name, field_type, nullable in [
        ("order_id", StringType(), False),
        ("customer_id", StringType(), False),
        ("amount", DoubleType(), False),
        ("order_date", DateType(), False),
        ("status", StringType(), False),
        ("email", StringType(), True),
    ]
])

# Describe the schema that was just defined.
for summary_line in (
    "‚úÖ Strict schema defined",
    "Schema will enforce:",
    "- order_id: NOT NULL",
    "- customer_id: NOT NULL",
    "- amount: NOT NULL, DoubleType",
    "- order_date: NOT NULL, DateType",
    "- status: NOT NULL",
    "- email: NULLABLE",
):
    print(summary_line)

### Rule-Based Validation Functions

In [None]:
def validate_orders(df):
    """Tag every row with the first validation rule it fails.

    Rules are evaluated top to bottom and only the first match is recorded,
    so a row with several problems reports just one of them.

    Args:
        df: DataFrame with order_id, customer_id, amount, order_date, status
            and email columns.

    Returns:
        ``df`` with two extra columns: ``validation_errors`` (the message of
        the first failing rule, or "Valid") and ``is_valid`` (boolean, True
        only when ``validation_errors`` is "Valid").
    """
    email_pattern = r"^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$"
    valid_statuses = ["pending", "completed", "cancelled"]

    first_error = (
        when(col("order_id").isNull(), "Missing order_id")
        .when(col("customer_id").isNull(), "Missing customer_id")
        .when(col("amount").isNull(), "Missing amount")
        .when(col("amount") <= 0, "Invalid amount (must be positive)")
        .when(col("order_date").isNull(), "Missing order_date")
        # A NULL status makes the isin() test below NULL rather than True, so
        # without this branch such rows would fall through to "Valid".
        .when(col("status").isNull(), "Missing status")
        .when(~col("status").isin(valid_statuses), "Invalid status")
        # Email is optional: only a present-but-malformed value is an error.
        .when(col("email").isNotNull() & ~col("email").rlike(email_pattern), "Invalid email format")
        .otherwise("Valid")
    )

    return (
        df.withColumn("validation_errors", first_error)
        # validation_errors is never NULL, so the comparison is a plain boolean.
        .withColumn("is_valid", col("validation_errors") == "Valid")
    )

# Run the rule-based validator over the sample orders and display every row
# together with its verdict.
validated_df = validate_orders(df)
validated_df.show(truncate=False)

# How many rows fell under each verdict, most frequent first.
validation_summary = (
    validated_df
    .groupBy("validation_errors")
    .count()
    .orderBy(desc("count"))
)
validation_summary.show(truncate=False)

# Headline numbers
valid_count = validated_df.filter(col("is_valid")).count()
invalid_count = validated_df.filter(~col("is_valid")).count()
quality_rate_pct = (valid_count / df.count()) * 100

print(f"\nüìä Validation Summary:")
print(f"Valid records: {valid_count}")
print(f"Invalid records: {invalid_count}")
print(f"Data quality rate: {quality_rate_pct:.1f}%")

### Advanced Validation Rules

In [None]:
def comprehensive_validation(df):
    """Score every row against six independent checks and grade the result.

    Unlike a first-failure validator, every check is evaluated, so the score
    reflects how many rules a row satisfies.

    Args:
        df: DataFrame with order_id, customer_id, amount, order_date, status
            and email columns.

    Returns:
        ``df`` with six boolean ``has_*`` flag columns, an integer
        ``validation_score`` (number of checks passed, 0-6) and a
        ``quality_grade`` label ("A - Excellent" for 6, "B - Good" for 5,
        "C - Fair" for 4, otherwise "D - Poor").
    """
    email_pattern = r"^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$"
    valid_statuses = ["pending", "completed", "cancelled"]

    # Flag name -> boolean column. Insertion order is the output column order.
    checks = {
        "has_order_id": col("order_id").isNotNull(),
        "has_customer_id": col("customer_id").isNotNull(),
        "has_valid_amount": col("amount").isNotNull() & (col("amount") > 0),
        "has_order_date": col("order_date").isNotNull(),
        # isin() alone is NULL for a NULL status, which would turn the whole
        # score NULL; the isNotNull() guard makes the flag a definite False.
        "has_valid_status": col("status").isNotNull() & col("status").isin(valid_statuses),
        # Email is optional: a missing address counts as passing.
        "has_valid_email": when(col("email").isNull(), True).otherwise(
            col("email").rlike(email_pattern)
        ),
    }

    scored = df
    for flag_name, flag_column in checks.items():
        scored = scored.withColumn(flag_name, flag_column)

    # Add the flags up by hand: the notebook's wildcard import of
    # pyspark.sql.functions makes a bare sum() unsafe for this.
    score = None
    for flag_name in checks:
        passed_as_int = col(flag_name).cast("int")
        score = passed_as_int if score is None else score + passed_as_int

    grade = (
        when(col("validation_score") == 6, "A - Excellent")
        .when(col("validation_score") == 5, "B - Good")
        .when(col("validation_score") == 4, "C - Fair")
        .otherwise("D - Poor")
    )

    return scored.withColumn("validation_score", score).withColumn("quality_grade", grade)

# Score and grade the sample orders, then show just the id, score and grade.
comprehensive_df = comprehensive_validation(df)
score_view_columns = ["order_id", "validation_score", "quality_grade"]
comprehensive_df.select(*score_view_columns).show()

# Number of rows per grade, ordered by grade label.
grade_distribution = (
    comprehensive_df
    .groupBy("quality_grade")
    .count()
    .orderBy("quality_grade")
)
grade_distribution.show()

## Topic 3: Great Expectations Core Implementation

### Install and Setup Great Expectations

In [None]:
# Note: In a real environment, you would install Great Expectations
# !pip install great-expectations

# For this demo, we'll simulate Great Expectations functionality
# with PySpark-based expectations

class SimpleExpectations:
    """Minimal PySpark-backed stand-in for a Great Expectations suite.

    Every ``expect_*`` method evaluates its check immediately against the
    wrapped DataFrame, records the outcome, and returns ``self`` so calls can
    be chained. ``validate()`` then summarises everything recorded so far.
    """

    def __init__(self, df):
        """Wrap ``df``; no check is run until an ``expect_*`` method is called."""
        self.df = df
        # One dict per evaluated expectation, with the keys
        # 'expectation' (description), 'success' (bool) and 'details' (str).
        self.expectations = []
        # Not written to by this class; kept so existing attribute access works.
        self.results = []

    def _record(self, description, success, details):
        """Store one expectation outcome and return self for chaining."""
        self.expectations.append({
            'expectation': description,
            'success': success,
            'details': details
        })
        return self

    def expect_column_values_to_not_be_null(self, column):
        """Pass when ``column`` contains no NULL values."""
        null_count = self.df.filter(col(column).isNull()).count()
        return self._record(
            f'expect_column_values_to_not_be_null({column})',
            null_count == 0,
            f'Found {null_count} null values',
        )

    def expect_column_values_to_be_unique(self, column):
        """Pass when the row count equals the distinct-value count of ``column``."""
        total_count = self.df.count()
        unique_count = self.df.select(column).distinct().count()
        return self._record(
            f'expect_column_values_to_be_unique({column})',
            total_count == unique_count,
            f'Total: {total_count}, Unique: {unique_count}',
        )

    def expect_column_values_to_be_between(self, column, min_value, max_value):
        """Pass when no value of ``column`` is below ``min_value`` or above ``max_value``.

        Both bounds are inclusive.
        """
        out_of_range = self.df.filter(
            (col(column) < min_value) | (col(column) > max_value)
        ).count()
        return self._record(
            f'expect_column_values_to_be_between({column}, {min_value}, {max_value})',
            out_of_range == 0,
            f'Found {out_of_range} values out of range',
        )

    def expect_column_values_to_be_in_set(self, column, value_set):
        """Pass when no value of ``column`` lies outside ``value_set``."""
        invalid_count = self.df.filter(~col(column).isin(value_set)).count()
        return self._record(
            f'expect_column_values_to_be_in_set({column}, {value_set})',
            invalid_count == 0,
            f'Found {invalid_count} invalid values',
        )

    def validate(self):
        """Summarise every expectation recorded so far.

        Returns:
            dict with 'success' (True when every recorded expectation passed,
            including the case where none were recorded), 'expectations' (the
            recorded outcome dicts) and 'summary' ("<passed>/<total>
            expectations passed").
        """
        # Counted with len() on purpose: the notebook's
        # `from pyspark.sql.functions import *` rebinds `sum` to the Spark
        # column aggregate, which cannot add up plain Python values.
        total = len(self.expectations)
        passed = len([outcome for outcome in self.expectations if outcome['success']])
        return {
            'success': passed == total,
            'expectations': self.expectations,
            'summary': f"{passed}/{total} expectations passed"
        }

print("‚úÖ Simple Expectations class created")

### Create and Run Expectation Suite

In [None]:
# Build the expectation suite for the raw orders data. Each expect_* call
# runs its check immediately and records the outcome on the suite.
expectations = SimpleExpectations(df)
(
    expectations
    .expect_column_values_to_not_be_null("order_id")
    .expect_column_values_to_not_be_null("customer_id")
    .expect_column_values_to_be_unique("order_id")
    .expect_column_values_to_be_between("amount", 0, 10000)
    .expect_column_values_to_be_in_set("status", ["pending", "completed", "cancelled"])
)

# Collect the recorded outcomes into a single result dict.
results = expectations.validate()

print(f"\nüìä Expectation Suite Results:")
print(f"Overall Success: {results['success']}")
print(f"Summary: {results['summary']}")
print("\nDetailed Results:")

# One PASS/FAIL line plus a details line per expectation.
for outcome in results['expectations']:
    if outcome['success']:
        status = "‚úÖ PASS"
    else:
        status = "‚ùå FAIL"
    print(f"{status} - {outcome['expectation']}")
    print(f"    Details: {outcome['details']}")

### Clean Data and Re-validate

In [None]:
# Clean the data by removing/fixing issues
clean_df = (
    df
    .filter(col("order_id").isNotNull())
    .filter(col("customer_id").isNotNull())
    .filter(col("amount").isNotNull() & (col("amount") > 0))
    # silver.orders declares order_date NOT NULL, so a row without a date
    # has to be dropped here or the later append into that table is rejected.
    .filter(col("order_date").isNotNull())
    .filter(col("status").isin(["pending", "completed", "cancelled"]))
    # Keep one row per order_id; which duplicate survives is not specified.
    .dropDuplicates(["order_id"])
)

# Count each DataFrame once and reuse the numbers in all three messages.
original_count = df.count()
clean_count = clean_df.count()
print(f"Original records: {original_count}")
print(f"Clean records: {clean_count}")
print(f"Records removed: {original_count - clean_count}")

# Re-validate clean data with the same suite that was run on the raw data
clean_expectations = SimpleExpectations(clean_df)
clean_expectations \
    .expect_column_values_to_not_be_null("order_id") \
    .expect_column_values_to_not_be_null("customer_id") \
    .expect_column_values_to_be_unique("order_id") \
    .expect_column_values_to_be_between("amount", 0, 10000) \
    .expect_column_values_to_be_in_set("status", ["pending", "completed", "cancelled"])

clean_results = clean_expectations.validate()

print(f"\nüìä Clean Data Validation Results:")
print(f"Overall Success: {clean_results['success']}")
print(f"Summary: {clean_results['summary']}")

if clean_results['success']:
    print("üéâ All expectations passed on clean data!")
else:
    print("‚ùå Some expectations still failing")

## Topic 4: Delta Constraints & PySpark Validation

### Create Delta Table with Constraints

In [None]:
# Create Delta table with constraints

# The target schema has to exist before a table can be created inside it.
spark.sql("CREATE SCHEMA IF NOT EXISTS silver")

# NOT NULL is declared per column in the table definition.
spark.sql("""
CREATE OR REPLACE TABLE silver.orders (
    order_id STRING NOT NULL,
    customer_id STRING NOT NULL,
    amount DOUBLE,
    order_date STRING NOT NULL,
    status STRING,
    email STRING
) USING DELTA
""")

# Delta CHECK constraints are attached afterwards with
# ALTER TABLE ... ADD CONSTRAINT rather than inline in CREATE TABLE.
spark.sql("""
ALTER TABLE silver.orders
ADD CONSTRAINT positive_amount CHECK (amount > 0)
""")
spark.sql("""
ALTER TABLE silver.orders
ADD CONSTRAINT valid_status CHECK (status IN ('pending', 'completed', 'cancelled'))
""")

print("‚úÖ Delta table created with constraints:")
print("- order_id: NOT NULL")
print("- customer_id: NOT NULL")
print("- order_date: NOT NULL")
print("- positive_amount: CHECK (amount > 0)")
print("- valid_status: CHECK (status IN ('pending', 'completed', 'cancelled'))")

### Test Constraint Enforcement

In [None]:
# Append the cleaned orders to silver.orders; this write is expected to be
# accepted. Any failure is reported instead of aborting the notebook.
try:
    (
        clean_df.write
        .format("delta")
        .mode("append")
        .saveAsTable("silver.orders")
    )
    print("‚úÖ Clean data inserted successfully")

    # Read the table back to confirm how many rows it now holds.
    orders_table = spark.table("silver.orders")
    result_count = orders_table.count()
    print(f"Records in table: {result_count}")

except Exception as insert_error:
    print(f"‚ùå Error inserting clean data: {insert_error!s}")

In [None]:
# Attempt to append a row that breaks the table's rules; the write is
# expected to be rejected.
invalid_data = [
    # Negative amount
    ("ORD999", "CUST999", -100.0, "2024-01-25", "pending", "test@email.com"),
]
invalid_df = spark.createDataFrame(invalid_data, schema)

try:
    invalid_writer = invalid_df.write.format("delta").mode("append")
    invalid_writer.saveAsTable("silver.orders")
    # Reaching this line means the bad row was accepted.
    print("‚ùå Invalid data was inserted (this shouldn't happen!)")
except Exception as rejection:
    print(f"‚úÖ Constraint enforcement working: {rejection!s}")

### Advanced PySpark Validation Pipeline

In [None]:
def data_quality_pipeline(input_df, table_name):
    """Validate, de-duplicate and persist orders, reporting what was dropped.

    Rows are tagged by ``validate_orders``; valid rows are de-duplicated on
    ``order_id`` and written to ``table_name`` in overwrite mode.

    Args:
        input_df: DataFrame of raw orders accepted by ``validate_orders``.
        table_name: Name of the Delta table to overwrite with the clean rows.

    Returns:
        A ``(quality_report, invalid_df)`` tuple. ``quality_report`` is a
        dict of counts and percentage rates, or ``None`` when the input is
        empty or the write fails. ``invalid_df`` holds the rejected rows
        together with their ``validation_errors`` / ``is_valid`` columns.
        A tuple is returned on every path so callers can always unpack it.
    """
    print(f"üîÑ Starting data quality pipeline for {table_name}")

    # Step 1: Initial assessment
    initial_count = input_df.count()
    print(f"üìä Initial record count: {initial_count}")

    # Step 2: Apply validation rules
    validated_df = validate_orders(input_df)

    # Step 3: Separate valid and invalid records
    valid_df = validated_df.filter(col("is_valid") == True).drop("validation_errors", "is_valid")
    invalid_df = validated_df.filter(col("is_valid") == False)

    # Every rate below divides by initial_count, and overwriting the target
    # with nothing is rarely intended, so stop early on empty input.
    if initial_count == 0:
        print("‚ö†Ô∏è No data to analyze")
        return None, invalid_df

    valid_count = valid_df.count()
    invalid_count = invalid_df.count()
    quality_rate = (valid_count / initial_count) * 100

    print(f"‚úÖ Valid records: {valid_count}")
    print(f"‚ùå Invalid records: {invalid_count}")
    print(f"üìà Data quality rate: {quality_rate:.1f}%")

    # Step 4: Remove duplicates
    clean_df = valid_df.dropDuplicates(["order_id"])
    final_count = clean_df.count()
    duplicates_removed = valid_count - final_count

    if duplicates_removed > 0:
        print(f"üîÑ Removed {duplicates_removed} duplicate records")

    # Step 5: Write to Delta table
    try:
        clean_df.write.format("delta").mode("overwrite").saveAsTable(table_name)
        print(f"‚úÖ Data successfully written to {table_name}")
    except Exception as e:
        print(f"‚ùå Error writing to table: {str(e)}")
        # Keep the two-element shape so `report, rejected = ...` still works.
        return None, invalid_df

    # Step 6: Generate quality report
    quality_report = {
        'initial_records': initial_count,
        'valid_records': valid_count,
        'invalid_records': invalid_count,
        'duplicates_removed': duplicates_removed,
        'final_records': final_count,
        'quality_rate': quality_rate,
        'retention_rate': (final_count / initial_count) * 100
    }

    return quality_report, invalid_df

# Execute the pipeline against the sample orders, keeping both the summary
# report and the rejected rows for the analysis that follows.
quality_report, rejected_records = data_quality_pipeline(df, "silver.orders")

if quality_report:
    print("\nüìã Quality Report:")
    for metric_name, metric_value in quality_report.items():
        # Rates are percentages and get one decimal; counts print as-is.
        if 'rate' in metric_name:
            rendered = f"{metric_value:.1f}%"
        else:
            rendered = f"{metric_value}"
        print(f"{metric_name}: {rendered}")

### Analyze Rejected Records

In [None]:
# Inspect the rows the pipeline rejected, if there are any.
if rejected_records.count() == 0:
    print("üéâ No rejected records!")
else:
    print("üìã Rejected Records Analysis:")
    rejected_view_columns = ["order_id", "customer_id", "amount", "status", "validation_errors"]
    rejected_records.select(*rejected_view_columns).show(truncate=False)

    # Rejections per error message, most frequent first.
    error_summary = (
        rejected_records
        .groupBy("validation_errors")
        .count()
        .orderBy(desc("count"))
    )
    print("\nüìä Error Type Distribution:")
    error_summary.show(truncate=False)

## Topic 5: Data Quality Monitoring & Profiling

### Create Quality Metrics Dashboard

In [None]:
def calculate_quality_metrics(df, table_name):
    """Calculate completeness, uniqueness and validity metrics for monitoring.

    Args:
        df: DataFrame with at least order_id, amount, status and email
            columns.
        table_name: Label stored in the result under 'table_name'.

    Returns:
        ``None`` (after printing a warning) when ``df`` has no rows.
        Otherwise a dict with 'table_name', 'timestamp', 'total_records',
        'unique_order_ids', the percentage metrics 'uniqueness_rate',
        'amount_validity', 'status_validity', 'email_validity' and
        'overall_quality_score', plus one '<column>_completeness' percentage
        per column of ``df``.
    """
    # The notebook's `from pyspark.sql.functions import *` rebinds `sum` to
    # the Spark column aggregate; the plain Python one is needed further down.
    import builtins

    total_rows = df.count()

    if total_rows == 0:
        print("‚ö†Ô∏è No data to analyze")
        return None

    # Completeness metrics: one aggregation pass yields every column's NULL
    # count (count() only counts the rows where when() produced a value).
    null_count_row = df.select(
        [count(when(col(c).isNull(), c)).alias(c) for c in df.columns]
    ).first()
    completeness_metrics = {
        f"{column}_completeness": ((total_rows - null_count_row[column]) / total_rows) * 100
        for column in df.columns
    }

    # Uniqueness metrics
    unique_order_ids = df.select("order_id").distinct().count()
    uniqueness_rate = (unique_order_ids / total_rows) * 100

    # Validity metrics
    valid_amounts = df.filter((col("amount").isNotNull()) & (col("amount") > 0)).count()
    amount_validity = (valid_amounts / total_rows) * 100

    valid_statuses = df.filter(col("status").isin(["pending", "completed", "cancelled"])).count()
    status_validity = (valid_statuses / total_rows) * 100

    # Email format validity is measured over non-null emails only.
    email_pattern = r"^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$"
    non_null_emails = df.filter(col("email").isNotNull()).count()
    if non_null_emails > 0:
        valid_emails = df.filter(col("email").isNotNull() & col("email").rlike(email_pattern)).count()
        email_validity = (valid_emails / non_null_emails) * 100
    else:
        email_validity = 100.0  # No emails to validate

    # Overall quality score: unweighted mean of six headline percentages.
    quality_scores = [
        completeness_metrics.get('order_id_completeness', 0),
        completeness_metrics.get('customer_id_completeness', 0),
        uniqueness_rate,
        amount_validity,
        status_validity,
        email_validity
    ]
    overall_quality = builtins.sum(quality_scores) / len(quality_scores)

    metrics = {
        'table_name': table_name,
        # Timestamp as reported by Spark SQL's current_timestamp().
        'timestamp': spark.sql("SELECT current_timestamp()").collect()[0][0],
        'total_records': total_rows,
        'unique_order_ids': unique_order_ids,
        'uniqueness_rate': uniqueness_rate,
        'amount_validity': amount_validity,
        'status_validity': status_validity,
        'email_validity': email_validity,
        'overall_quality_score': overall_quality,
        **completeness_metrics
    }

    return metrics

# Compute the metrics for the persisted orders table and print them as a
# plain-text dashboard.
current_data = spark.table("silver.orders")
metrics = calculate_quality_metrics(current_data, "silver.orders")

if metrics:
    print("üìä Data Quality Metrics Dashboard")
    print("=" * 50)
    print(f"Table: {metrics['table_name']}")
    print(f"Timestamp: {metrics['timestamp']}")
    print(f"Total Records: {metrics['total_records']}")

    # Headline percentages, in display order: (label, key in metrics).
    print(f"\nüéØ Quality Scores:")
    score_rows = [
        ("Overall Quality Score", 'overall_quality_score'),
        ("Uniqueness Rate", 'uniqueness_rate'),
        ("Amount Validity", 'amount_validity'),
        ("Status Validity", 'status_validity'),
        ("Email Validity", 'email_validity'),
    ]
    for score_label, score_key in score_rows:
        print(f"{score_label}: {metrics[score_key]:.1f}%")

    # Per-column completeness: every key that mentions 'completeness'.
    print(f"\nüìã Completeness Rates:")
    for metric_key, metric_value in metrics.items():
        if 'completeness' not in metric_key:
            continue
        column_name = metric_key.replace('_completeness', '')
        print(f"{column_name}: {metric_value:.1f}%")

### Quality Monitoring Over Time

In [None]:
# Simulate quality monitoring over time
def simulate_quality_monitoring():
    """Print a five-day table of made-up quality scores, alerts and trends.

    The figures are hard-coded; nothing is read from a real table. The
    function prints its findings and returns nothing.
    """
    # Daily snapshots: (date, record_count, completeness, uniqueness,
    # validity, overall_quality).
    monitoring_data = [
        ("2024-01-01", 1000, 95.5, 98.2, 92.1, 89.3),
        ("2024-01-02", 1050, 94.8, 97.9, 91.5, 88.7),
        ("2024-01-03", 980, 96.2, 98.5, 93.2, 90.1),
        ("2024-01-04", 1100, 93.1, 96.8, 89.4, 87.2),  # Quality dip
        ("2024-01-05", 1075, 95.9, 98.1, 92.8, 89.9),
    ]

    # None of the monitoring columns is nullable.
    monitoring_columns = [
        ("date", StringType()),
        ("record_count", IntegerType()),
        ("completeness_score", DoubleType()),
        ("uniqueness_score", DoubleType()),
        ("validity_score", DoubleType()),
        ("overall_quality", DoubleType()),
    ]
    monitoring_schema = StructType([
        StructField(column_name, column_type, False)
        for column_name, column_type in monitoring_columns
    ])

    monitoring_df = spark.createDataFrame(monitoring_data, monitoring_schema)

    print("üìà Quality Monitoring Trends:")
    monitoring_df.show()

    # Days whose overall score is below the threshold raise an alert.
    quality_threshold = 90.0
    alerts = monitoring_df.filter(col("overall_quality") < quality_threshold)

    if alerts.count() == 0:
        print(f"‚úÖ All periods above quality threshold ({quality_threshold}%)")
    else:
        print(f"üö® Quality Alerts (below {quality_threshold}%):")
        alerts.show()

    def overall_quality_stat(aggregate):
        """Apply a Spark aggregate to overall_quality and return the scalar."""
        return monitoring_df.agg(aggregate("overall_quality")).collect()[0][0]

    # avg/min/max are used as Spark column aggregates here (the notebook
    # imports pyspark.sql.functions with a wildcard), not Python built-ins.
    avg_quality = overall_quality_stat(avg)
    min_quality = overall_quality_stat(min)
    max_quality = overall_quality_stat(max)

    print(f"\nüìä Quality Trends Summary:")
    print(f"Average Quality: {avg_quality:.1f}%")
    print(f"Best Quality: {max_quality:.1f}%")
    print(f"Worst Quality: {min_quality:.1f}%")
    print(f"Quality Range: {max_quality - min_quality:.1f}%")

simulate_quality_monitoring()

## Final Exercise: Complete Data Quality Pipeline

### Build End-to-End Quality Pipeline

In [None]:
def complete_quality_pipeline(source_df, target_table, quality_threshold=85.0):
    """Run the end-to-end quality pipeline and persist the clean rows.

    Steps: count the input, check required columns, run the
    ``SimpleExpectations`` suite (reported only), apply ``validate_orders``,
    enforce the quality gate, de-duplicate on ``order_id``, overwrite
    ``target_table``, compute metrics with ``calculate_quality_metrics`` and
    print a report.

    Args:
        source_df: DataFrame of raw orders.
        target_table: Name of the table overwritten with the clean rows.
        quality_threshold: Minimum percentage of valid rows required for the
            pipeline to continue past the quality gate.

    Returns:
        ``None`` when a required column is missing. A dict with
        ``'success': False`` and a ``'message'`` when the quality gate fails
        or the write raises. Otherwise a dict with ``'success': True``, the
        record counts, 'quality_rate', 'retention_rate',
        'overall_quality_score' and 'target_table'.
    """

    def _percent(part, whole):
        """Return part/whole as a percentage, or 0.0 when whole is zero."""
        return (part / whole) * 100 if whole else 0.0

    print("üöÄ Starting Complete Data Quality Pipeline")
    print("=" * 60)

    # Step 1: Initial Assessment
    print("\nüìä Step 1: Initial Data Assessment")
    initial_count = source_df.count()
    print(f"Initial record count: {initial_count}")

    # Step 2: Schema Validation
    print("\nüèóÔ∏è Step 2: Schema Validation")
    required_columns = ["order_id", "customer_id", "amount", "order_date", "status"]
    # The loop variable is deliberately not called `col`, which is the
    # PySpark function used throughout this function.
    missing_columns = [name for name in required_columns if name not in source_df.columns]

    if missing_columns:
        print(f"‚ùå Missing required columns: {missing_columns}")
        return None
    print("‚úÖ All required columns present")

    # Step 3: Great Expectations Validation (informational; does not gate)
    print("\nüéØ Step 3: Great Expectations Validation")
    expectations = SimpleExpectations(source_df)
    expectations \
        .expect_column_values_to_not_be_null("order_id") \
        .expect_column_values_to_not_be_null("customer_id") \
        .expect_column_values_to_be_unique("order_id") \
        .expect_column_values_to_be_between("amount", 0, 10000) \
        .expect_column_values_to_be_in_set("status", ["pending", "completed", "cancelled"])

    ge_results = expectations.validate()
    print(f"Expectations passed: {ge_results['summary']}")

    # Step 4: Rule-Based Validation
    print("\nüìã Step 4: Rule-Based Validation")
    validated_df = validate_orders(source_df)
    valid_df = validated_df.filter(col("is_valid") == True).drop("validation_errors", "is_valid")
    invalid_df = validated_df.filter(col("is_valid") == False)

    valid_count = valid_df.count()
    invalid_count = invalid_df.count()
    # An empty input scores 0% instead of raising ZeroDivisionError.
    quality_rate = _percent(valid_count, initial_count)

    print(f"Valid records: {valid_count}")
    print(f"Invalid records: {invalid_count}")
    print(f"Quality rate: {quality_rate:.1f}%")

    # Step 5: Quality Gate
    print(f"\nüö™ Step 5: Quality Gate (threshold: {quality_threshold}%)")
    if quality_rate < quality_threshold:
        print(f"‚ùå Quality gate failed: {quality_rate:.1f}% < {quality_threshold}%")
        print("Pipeline stopped. Data quality too low.")
        return {
            'success': False,
            'quality_rate': quality_rate,
            'message': 'Quality gate failed'
        }
    print(f"‚úÖ Quality gate passed: {quality_rate:.1f}% >= {quality_threshold}%")

    # Step 6: Data Cleansing
    print("\nüßπ Step 6: Data Cleansing")
    clean_df = valid_df.dropDuplicates(["order_id"])
    final_count = clean_df.count()
    duplicates_removed = valid_count - final_count

    if duplicates_removed > 0:
        print(f"Removed {duplicates_removed} duplicate records")
    print(f"Final clean record count: {final_count}")

    # Step 7: Delta Constraints Enforcement
    # NOTE(review): nothing in this function defines constraints on
    # target_table, so only constraints that already exist on it can apply.
    print("\nüõ°Ô∏è Step 7: Delta Constraints Enforcement")
    try:
        clean_df.write.format("delta").mode("overwrite").saveAsTable(target_table)
        print(f"‚úÖ Data written to {target_table} with constraints enforced")
    except Exception as e:
        print(f"‚ùå Constraint violation: {str(e)}")
        return {
            'success': False,
            'message': f'Constraint violation: {str(e)}'
        }

    # Step 8: Quality Metrics Calculation (on the data as persisted)
    print("\nüìä Step 8: Quality Metrics Calculation")
    final_data = spark.table(target_table)
    metrics = calculate_quality_metrics(final_data, target_table)

    # Step 9: Final Report
    print("\nüìã Step 9: Final Quality Report")
    print("=" * 40)

    pipeline_result = {
        'success': True,
        'initial_records': initial_count,
        'valid_records': valid_count,
        'invalid_records': invalid_count,
        'duplicates_removed': duplicates_removed,
        'final_records': final_count,
        'quality_rate': quality_rate,
        'retention_rate': _percent(final_count, initial_count),
        # calculate_quality_metrics returns None for an empty table.
        'overall_quality_score': metrics['overall_quality_score'] if metrics else 0,
        'target_table': target_table
    }

    for key, value in pipeline_result.items():
        if key in ['success', 'target_table']:
            continue
        label = key.replace('_', ' ').title()
        # Rates and scores are percentages; everything else is a raw count.
        if 'rate' in key or 'score' in key:
            print(f"{label}: {value:.1f}%")
        else:
            print(f"{label}: {value}")

    print("\nüéâ Data Quality Pipeline Completed Successfully!")
    return pipeline_result

# Execute the full pipeline on the sample orders with a 70% quality gate.
pipeline_result = complete_quality_pipeline(df, "silver.orders_final", quality_threshold=70.0)

# A missing result or a result flagged unsuccessful both count as failure.
pipeline_succeeded = bool(pipeline_result) and pipeline_result['success']
if not pipeline_succeeded:
    print("\n‚ùå Pipeline Failed!")
else:
    print(f"\n‚úÖ Pipeline Success! Final table: {pipeline_result['target_table']}")

## Summary and Reflection

### Key Learnings

In [None]:
# Closing recap. Each section is a heading followed by its lines, printed in
# the order listed.
summary_sections = [
    ("\nüîç Data Quality Dimensions Covered:", [
        "‚úÖ Completeness - Identifying missing values",
        "‚úÖ Accuracy - Validating data correctness",
        "‚úÖ Validity - Checking formats and ranges",
        "‚úÖ Uniqueness - Detecting duplicates",
        "‚úÖ Consistency - Ensuring uniform values",
        "‚úÖ Timeliness - Data currency validation",
    ]),
    ("\nüìã Validation Approaches Implemented:", [
        "‚úÖ Schema Enforcement - Type and structure validation",
        "‚úÖ Rule-Based Logic - Custom PySpark validation functions",
        "‚úÖ Great Expectations - Declarative expectation suites",
        "‚úÖ Delta Constraints - Database-level enforcement",
    ]),
    ("\nüõ°Ô∏è Quality Pipeline Components:", [
        "‚úÖ Data profiling and assessment",
        "‚úÖ Multi-layer validation",
        "‚úÖ Quality gates and thresholds",
        "‚úÖ Data cleansing and deduplication",
        "‚úÖ Constraint enforcement",
        "‚úÖ Quality monitoring and reporting",
    ]),
    ("\nüìä Monitoring and Alerting:", [
        "‚úÖ Quality metrics calculation",
        "‚úÖ Trend analysis",
        "‚úÖ Automated quality reporting",
        "‚úÖ Threshold-based alerting",
    ]),
    ("\nüéØ Next Steps:", [
        "‚Ä¢ Implement real-time quality monitoring",
        "‚Ä¢ Set up automated quality alerts",
        "‚Ä¢ Create quality dashboards",
        "‚Ä¢ Establish data quality SLAs",
        "‚Ä¢ Build data lineage tracking",
    ]),
]

print("üìö Week 5 - Data Quality & Validation Summary")
print("=" * 60)

for section_heading, section_lines in summary_sections:
    print(section_heading)
    for section_line in section_lines:
        print(section_line)

print("\n‚ú® Congratulations! You've completed Week 5 - Data Quality & Validation")