In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count, when, isnan, isnull
from datetime import datetime
import json

# Notebook title banner: the heading framed by two 80-character rules.
banner_rule = "=" * 80
print(banner_rule)
print("üîç DATA QUALITY TESTING - GREAT EXPECTATIONS")
print(banner_rule)


StatementMeta(, d7e2ca66-ad06-4d42-b96e-77507d964000, 3, Finished, Available, Finished)

üîç DATA QUALITY TESTING - GREAT EXPECTATIONS


In [2]:
# Step 1: load the Silver table and report how many rows and columns it has.
print("\nüìÇ Step 1: Reading Silver table...")

# NOTE(review): `spark` is not defined in the visible cells -- presumably the
# session object injected by the notebook runtime; confirm.
df_silver = spark.read.table("silver_hotel_bookings")
total_records = df_silver.count()
column_total = len(df_silver.columns)

print(f"‚úÖ Silver records loaded: {total_records:,}")
print(f"‚úÖ Columns: {column_total}")

StatementMeta(, d7e2ca66-ad06-4d42-b96e-77507d964000, 4, Finished, Available, Finished)


üìÇ Step 1: Reading Silver table...
‚úÖ Silver records loaded: 839,597
‚úÖ Columns: 97


In [3]:
section_rule = "=" * 80
print("\n" + section_rule)
print("üß™ RUNNING DATA QUALITY TESTS")
print(section_rule)

# Shared accumulators: every test cell reports through record_test(), which
# appends to `test_results` and bumps exactly one of the two counters.
test_results = []
failed_tests = 0
passed_tests = 0


def record_test(test_name, expectation, status, details=""):
    """Store one test outcome, update the pass/fail tallies and echo it.

    Args:
        test_name: Identifier of the check (e.g. "NOT_NULL_booking_id").
        expectation: Human-readable statement of what the check expects.
        status: "PASSED" is tallied as a pass; any other value is tallied
            (and printed) as a failure.
        details: Optional free-text detail. It is always stored, but only
            printed for failures.

    Returns:
        None. The outcome is appended to the module-level `test_results`.
    """
    global failed_tests, passed_tests

    test_results.append(
        dict(
            test_name=test_name,
            expectation=expectation,
            status=status,
            details=details,
            timestamp=datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
        )
    )

    # Guard clause: everything that is not an explicit pass is a failure.
    if status != "PASSED":
        failed_tests += 1
        print(f"   ‚ùå FAILED: {test_name} - {details}")
        return

    passed_tests += 1
    print(f"   ‚úÖ PASSED: {test_name}")

StatementMeta(, d7e2ca66-ad06-4d42-b96e-77507d964000, 5, Finished, Available, Finished)


üß™ RUNNING DATA QUALITY TESTS


In [4]:
print("\n" + "=" * 80)
print("TEST 1: NOT NULL VALIDATION")
print("=" * 80)

# Columns that must never be NULL in the Silver table.
critical_columns = [
    'booking_id', 'hotel_id', 'customer_id', 'booking_date',
    'total_price', 'payment_status', 'booking_status'
]

print(f"\nTesting {len(critical_columns)} critical columns for NULL values...")

# Count the NULLs of every critical column in a single aggregation instead of
# launching one full-table count() job per column. when(...) without an
# otherwise() yields NULL for non-matching rows, and count() skips NULLs, so
# each aggregate is exactly the number of NULL values in that column.
null_counts = (
    df_silver.agg(
        *[count(when(col(name).isNull(), 1)).alias(name) for name in critical_columns]
    ).first().asDict()
    if critical_columns
    else {}  # agg() needs at least one expression
)

for col_name in critical_columns:
    null_count = null_counts[col_name]
    is_clean = null_count == 0
    record_test(
        f"NOT_NULL_{col_name}",
        f"Column '{col_name}' should have no NULL values",
        "PASSED" if is_clean else "FAILED",
        "0 NULL values found" if is_clean else f"{null_count:,} NULL values found",
    )

StatementMeta(, d7e2ca66-ad06-4d42-b96e-77507d964000, 6, Finished, Available, Finished)


TEST 1: NOT NULL VALIDATION

Testing 7 critical columns for NULL values...
   ‚úÖ PASSED: NOT_NULL_booking_id
   ‚úÖ PASSED: NOT_NULL_hotel_id
   ‚úÖ PASSED: NOT_NULL_customer_id
   ‚úÖ PASSED: NOT_NULL_booking_date
   ‚ùå FAILED: NOT_NULL_total_price - 35,978 NULL values found
   ‚ùå FAILED: NOT_NULL_payment_status - 24,345 NULL values found
   ‚ùå FAILED: NOT_NULL_booking_status - 24,441 NULL values found


In [5]:
print("\n" + "=" * 80)
print("TEST 2: UNIQUENESS VALIDATION")
print("=" * 80)

# Columns whose values must not repeat.
unique_columns = ['booking_id']

print(f"\nTesting uniqueness of {len(unique_columns)} columns...")

for col_name in unique_columns:
    # A duplicate is any row beyond the first occurrence of a value, i.e. the
    # gap between the row count and the distinct-value count.
    key_values = df_silver.select(col_name)
    total_count = key_values.count()
    distinct_count = key_values.distinct().count()
    duplicate_count = total_count - distinct_count

    has_duplicates = duplicate_count != 0
    record_test(
        f"UNIQUE_{col_name}",
        f"Column '{col_name}' should have unique values",
        "FAILED" if has_duplicates else "PASSED",
        f"{duplicate_count:,} duplicates found"
        if has_duplicates
        else f"All {total_count:,} values are unique",
    )

StatementMeta(, d7e2ca66-ad06-4d42-b96e-77507d964000, 7, Finished, Available, Finished)


TEST 2: UNIQUENESS VALIDATION

Testing uniqueness of 1 columns...
   ‚úÖ PASSED: UNIQUE_booking_id


In [6]:
# ============================================================================
# TEST 3: SCHEMA VALIDATION
# ============================================================================
print("\n" + "=" * 80)
print("TEST 3: SCHEMA VALIDATION")
print("=" * 80)

# Expected Spark SQL type of each column, written as its simple type name.
expected_schema = {
    'booking_id': 'string',
    'hotel_id': 'string',
    'customer_id': 'string',
    'star_rating': 'int',
    'total_rooms': 'int',
    'nights': 'int',
    'adults': 'int',
    'children': 'int',
    'infants': 'int',
    'rooms_booked': 'int',
    'total_price': 'double',
    'room_price': 'double',
    'tax_amount': 'double',
    'service_fee': 'double',
    'paid_amount': 'double',
    'discount_amount': 'double',
    'latitude': 'double',
    'longitude': 'double',
    'booking_date': 'date',
    'checkin_date': 'date',
    'checkout_date': 'date'
}

print(f"\nValidating schema for {len(expected_schema)} columns...")

# DataFrame.dtypes yields (column, simple type name) pairs such as
# ("nights", "int"). Using it avoids deriving the name from str(dataType),
# whose text depends on the Spark version, and lets us compare for equality.
# The previous substring test would accept e.g. an array<string> column where
# a plain 'string' was expected.
schema_dict = dict(df_silver.dtypes)

# Older spellings that the substring comparison used to accept, mapped to the
# simple names reported by DataFrame.dtypes.
_TYPE_ALIASES = {"integer": "int", "long": "bigint", "short": "smallint", "byte": "tinyint"}

schema_passed = True
for col_name, expected_type in expected_schema.items():
    actual_type = schema_dict.get(col_name)

    if actual_type is None:
        schema_passed = False
        record_test(
            f"SCHEMA_{col_name}",
            f"Column '{col_name}' should exist",
            "FAILED",
            f"Column not found in schema"
        )
    elif actual_type == _TYPE_ALIASES.get(expected_type, expected_type):
        record_test(
            f"SCHEMA_{col_name}",
            f"Column '{col_name}' should be type '{expected_type}'",
            "PASSED",
            f"Type is '{actual_type}'"
        )
    else:
        schema_passed = False
        record_test(
            f"SCHEMA_{col_name}",
            f"Column '{col_name}' should be type '{expected_type}'",
            "FAILED",
            f"Expected '{expected_type}', got '{actual_type}'"
        )

StatementMeta(, d7e2ca66-ad06-4d42-b96e-77507d964000, 8, Finished, Available, Finished)


TEST 3: SCHEMA VALIDATION

Validating schema for 21 columns...
   ‚úÖ PASSED: SCHEMA_booking_id
   ‚úÖ PASSED: SCHEMA_hotel_id
   ‚úÖ PASSED: SCHEMA_customer_id
   ‚úÖ PASSED: SCHEMA_star_rating
   ‚úÖ PASSED: SCHEMA_total_rooms
   ‚úÖ PASSED: SCHEMA_nights
   ‚úÖ PASSED: SCHEMA_adults
   ‚úÖ PASSED: SCHEMA_children
   ‚úÖ PASSED: SCHEMA_infants
   ‚úÖ PASSED: SCHEMA_rooms_booked
   ‚úÖ PASSED: SCHEMA_total_price
   ‚úÖ PASSED: SCHEMA_room_price
   ‚úÖ PASSED: SCHEMA_tax_amount
   ‚úÖ PASSED: SCHEMA_service_fee
   ‚úÖ PASSED: SCHEMA_paid_amount
   ‚úÖ PASSED: SCHEMA_discount_amount
   ‚úÖ PASSED: SCHEMA_latitude
   ‚úÖ PASSED: SCHEMA_longitude
   ‚úÖ PASSED: SCHEMA_booking_date
   ‚úÖ PASSED: SCHEMA_checkin_date
   ‚úÖ PASSED: SCHEMA_checkout_date


In [7]:
# ============================================================================
# TEST 4: DATE VALIDATION (ordering of booking / check-in / check-out dates)
# ============================================================================
print("\n" + "=" * 80)
print("TEST 4: DATE FORMAT VALIDATION")
print("=" * 80)

print("\nValidating date logic...")

# Rule 1: checkout_date must be strictly after checkin_date.
# Rows where either date is NULL are not evaluated.
stay_dates_present = col("checkin_date").isNotNull() & col("checkout_date").isNotNull()
invalid_dates = df_silver.where(
    stay_dates_present & (col("checkout_date") <= col("checkin_date"))
).count()

record_test(
    "DATE_LOGIC_checkout_after_checkin",
    "checkout_date should be after checkin_date",
    "PASSED" if invalid_dates == 0 else "FAILED",
    "All date sequences are valid"
    if invalid_dates == 0
    else f"{invalid_dates:,} invalid date sequences found",
)

# Rule 2: booking_date must not be later than checkin_date.
# Rows where either date is NULL are not evaluated.
booking_dates_present = col("booking_date").isNotNull() & col("checkin_date").isNotNull()
future_bookings = df_silver.where(
    booking_dates_present & (col("booking_date") > col("checkin_date"))
).count()

record_test(
    "DATE_LOGIC_booking_before_checkin",
    "booking_date should be before or equal to checkin_date",
    "PASSED" if future_bookings == 0 else "FAILED",
    "All booking dates are valid"
    if future_bookings == 0
    else f"{future_bookings:,} bookings made after check-in",
)

StatementMeta(, d7e2ca66-ad06-4d42-b96e-77507d964000, 9, Finished, Available, Finished)


TEST 4: DATE FORMAT VALIDATION

Validating date logic...
   ‚úÖ PASSED: DATE_LOGIC_checkout_after_checkin
   ‚ùå FAILED: DATE_LOGIC_booking_before_checkin - 100,108 bookings made after check-in


In [8]:
# ============================================================================
# TEST 5: VALUE RANGE VALIDATION
# ============================================================================
print("\n" + "=" * 80)
print("TEST 5: VALUE RANGE VALIDATION")
print("=" * 80)

print("\nValidating numeric ranges...")

# Every check below counts only non-NULL values that fall outside the allowed
# range; NULLs are not evaluated by these range checks.

# star_rating must lie in [1, 5].
rating_out_of_bounds = (col("star_rating") < 1) | (col("star_rating") > 5)
invalid_ratings = df_silver.where(
    col("star_rating").isNotNull() & rating_out_of_bounds
).count()
record_test(
    "RANGE_star_rating",
    "star_rating should be between 1 and 5",
    "PASSED" if invalid_ratings == 0 else "FAILED",
    "All ratings are in valid range"
    if invalid_ratings == 0
    else f"{invalid_ratings:,} ratings outside range",
)

# nights must be strictly positive.
invalid_nights = df_silver.where(
    col("nights").isNotNull() & (col("nights") <= 0)
).count()
record_test(
    "RANGE_nights_positive",
    "nights should be greater than 0",
    "PASSED" if invalid_nights == 0 else "FAILED",
    "All nights are positive"
    if invalid_nights == 0
    else f"{invalid_nights:,} records with non-positive nights",
)

# adults must be at least 1.
invalid_adults = df_silver.where(
    col("adults").isNotNull() & (col("adults") < 1)
).count()
record_test(
    "RANGE_adults_minimum",
    "adults should be at least 1",
    "PASSED" if invalid_adults == 0 else "FAILED",
    "All bookings have at least 1 adult"
    if invalid_adults == 0
    else f"{invalid_adults:,} bookings with no adults",
)

# total_price must be strictly positive.
invalid_prices = df_silver.where(
    col("total_price").isNotNull() & (col("total_price") <= 0)
).count()
record_test(
    "RANGE_total_price_positive",
    "total_price should be greater than 0",
    "PASSED" if invalid_prices == 0 else "FAILED",
    "All prices are positive"
    if invalid_prices == 0
    else f"{invalid_prices:,} records with non-positive prices",
)

StatementMeta(, d7e2ca66-ad06-4d42-b96e-77507d964000, 10, Finished, Available, Finished)


TEST 5: VALUE RANGE VALIDATION

Validating numeric ranges...
   ‚úÖ PASSED: RANGE_star_rating
   ‚ùå FAILED: RANGE_nights_positive - 6,068 records with non-positive nights
   ‚ùå FAILED: RANGE_adults_minimum - 6,086 bookings with no adults
   ‚ùå FAILED: RANGE_total_price_positive - 5,985 records with non-positive prices


In [9]:
from pyspark.sql import functions as F

# Derive the normalised columns that the categorical test reads.
# - payment_status_clean / booking_status_clean: keep the recognised values,
#   map everything else (including NULL) to "Unknown".
# - price_category: bucket total_price into price tiers.
price_col = F.col("total_price")

df_silver = (
    df_silver
    .withColumn(
        "payment_status_clean",
        F.when(F.col("payment_status").isin(["Paid", "Pending", "Refunded"]), F.col("payment_status"))
         .otherwise("Unknown")
    )
    .withColumn(
        "booking_status_clean",
        F.when(F.col("booking_status").isin(["Confirmed", "Cancelled"]), F.col("booking_status"))
         .otherwise("Unknown")
    )
    .withColumn(
        "price_category",
        # Invalid prices must not be bucketed: without this guard a zero or
        # negative price satisfies `< 100` and is labelled "Budget", and NaN
        # (which Spark orders above every number) satisfies `>= 600` and is
        # labelled "Luxury". NULL prices still reach otherwise().
        F.when(F.isnan(price_col) | (price_col <= 0), "Unknown")
         .when(price_col < 100, "Budget")
         .when(price_col < 300, "Mid-Range")
         .when(price_col < 600, "Premium")
         .when(price_col >= 600, "Luxury")
         .otherwise("Unknown")
    )
)


StatementMeta(, d7e2ca66-ad06-4d42-b96e-77507d964000, 11, Finished, Available, Finished)

In [10]:
# ============================================================================
# TEST 6: CATEGORICAL VALUE VALIDATION
# ============================================================================
print("\n" + "=" * 80)
print("TEST 6: CATEGORICAL VALUE VALIDATION")
print("=" * 80)

print("\nValidating categorical values...")

# NOTE(review): the *_clean columns are produced earlier in this notebook by
# mapping every unrecognised value to "Unknown", and "Unknown" is part of each
# allowed set below, so these checks validate the derivation rather than the
# raw payment_status / booking_status columns -- confirm that is intended.


def _count_outside(column_name, allowed_values):
    """Count rows of df_silver whose `column_name` is NULL or not allowed.

    `~col.isin(...)` evaluates to NULL for NULL inputs and filter() drops such
    rows, so NULLs are matched explicitly; otherwise they would silently pass.
    """
    value = col(column_name)
    return df_silver.filter(value.isNull() | ~value.isin(allowed_values)).count()


# Payment status should be in expected set
valid_payment_statuses = ["Paid", "Pending", "Refunded", "Unknown"]
invalid_payment = _count_outside("payment_status_clean", valid_payment_statuses)

record_test(
    "CATEGORICAL_payment_status",
    f"payment_status_clean should be one of {valid_payment_statuses}",
    "PASSED" if invalid_payment == 0 else "FAILED",
    "All payment statuses are valid"
    if invalid_payment == 0
    else f"{invalid_payment:,} invalid payment statuses",
)

# Booking status should be in expected set
valid_booking_statuses = ["Confirmed", "Cancelled", "Unknown"]
invalid_booking = _count_outside("booking_status_clean", valid_booking_statuses)

record_test(
    "CATEGORICAL_booking_status",
    f"booking_status_clean should be one of {valid_booking_statuses}",
    "PASSED" if invalid_booking == 0 else "FAILED",
    "All booking statuses are valid"
    if invalid_booking == 0
    else f"{invalid_booking:,} invalid booking statuses",
)

# Price category should be in expected set
valid_price_categories = ["Budget", "Mid-Range", "Premium", "Luxury", "Unknown"]
invalid_price_cat = _count_outside("price_category", valid_price_categories)

record_test(
    "CATEGORICAL_price_category",
    f"price_category should be one of {valid_price_categories}",
    "PASSED" if invalid_price_cat == 0 else "FAILED",
    "All price categories are valid"
    if invalid_price_cat == 0
    else f"{invalid_price_cat:,} invalid price categories",
)

StatementMeta(, d7e2ca66-ad06-4d42-b96e-77507d964000, 12, Finished, Available, Finished)


TEST 6: CATEGORICAL VALUE VALIDATION

Validating categorical values...
   ‚úÖ PASSED: CATEGORICAL_payment_status
   ‚úÖ PASSED: CATEGORICAL_booking_status
   ‚úÖ PASSED: CATEGORICAL_price_category


In [11]:
# ============================================================================
# PART 3: GENERATE REPORT
# ============================================================================
print("\n" + "=" * 80)
print("üìä GENERATING DATA QUALITY REPORT")
print("=" * 80)

# Report key -> test-name prefix. Some prefixes contain an underscore
# (NOT_NULL, DATE_LOGIC), so a category cannot be recovered by splitting the
# test name on "_" and taking the first token.
CATEGORY_PREFIXES = {
    "not_null": "NOT_NULL",
    "uniqueness": "UNIQUE",
    "schema": "SCHEMA",
    "date_logic": "DATE_LOGIC",
    "value_range": "RANGE",
    "categorical": "CATEGORICAL",
}


def _category_of(test_name):
    """Return the category prefix of a test name (e.g. "NOT_NULL").

    Falls back to the first "_"-separated token for names that do not start
    with a known prefix.
    """
    for prefix in CATEGORY_PREFIXES.values():
        if test_name.startswith(prefix + "_"):
            return prefix
    return test_name.split("_")[0]


# Summary statistics
total_tests = len(test_results)
# 0.0 (not 0) keeps pass_rate a float even when no test has been recorded.
pass_rate = (passed_tests / total_tests * 100) if total_tests > 0 else 0.0

print(f"\n‚úÖ Tests Summary:")
print(f"   ‚Ä¢ Total tests: {total_tests}")
print(f"   ‚Ä¢ Passed: {passed_tests}")
print(f"   ‚Ä¢ Failed: {failed_tests}")
print(f"   ‚Ä¢ Pass rate: {pass_rate:.2f}%")

# Create summary report
report = {
    "report_metadata": {
        "report_name": "Silver Layer Data Quality Report",
        "dataset": "silver_hotel_bookings",
        "total_records": total_records,
        "timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
        # NOTE(review): the visible cells run hand-written PySpark checks and
        # never import the great_expectations library -- confirm this label.
        "generated_by": "Great Expectations"
    },
    "summary": {
        "total_tests": total_tests,
        "passed_tests": passed_tests,
        "failed_tests": failed_tests,
        "pass_rate_percentage": round(pass_rate, 2)
    },
    # Count tests per category by name prefix (not by substring), so a column
    # name that happens to contain a category word cannot be miscounted.
    "test_categories": {
        key: sum(1 for t in test_results if t["test_name"].startswith(prefix + "_"))
        for key, prefix in CATEGORY_PREFIXES.items()
    },
    "test_results": test_results
}

# Convert to JSON
report_json = json.dumps(report, indent=2)

print("\nüìÑ Report generated successfully!")
print("\n" + "=" * 80)
print("üìã DETAILED TEST RESULTS")
print("=" * 80)

# Group by category, preserving the order in which tests were recorded.
categories = {}
for test in test_results:
    categories.setdefault(_category_of(test["test_name"]), []).append(test)

for category, tests in categories.items():
    print(f"\nüìå {category} Tests:")
    failures = [t for t in tests if t["status"] == "FAILED"]
    passed = len([t for t in tests if t["status"] == "PASSED"])
    failed = len(failures)
    print(f"   Passed: {passed}/{len(tests)}")

    if failed > 0:
        print(f"   Failed tests:")
        for test in failures:
            print(f"      ‚Ä¢ {test['test_name']}: {test['details']}")

StatementMeta(, d7e2ca66-ad06-4d42-b96e-77507d964000, 13, Finished, Available, Finished)


üìä GENERATING DATA QUALITY REPORT

‚úÖ Tests Summary:
   ‚Ä¢ Total tests: 38
   ‚Ä¢ Passed: 31
   ‚Ä¢ Failed: 7
   ‚Ä¢ Pass rate: 81.58%

üìÑ Report generated successfully!

üìã DETAILED TEST RESULTS

üìå NOT Tests:
   Passed: 4/7
   Failed tests:
      ‚Ä¢ NOT_NULL_total_price: 35,978 NULL values found
      ‚Ä¢ NOT_NULL_payment_status: 24,345 NULL values found
      ‚Ä¢ NOT_NULL_booking_status: 24,441 NULL values found

üìå UNIQUE Tests:
   Passed: 1/1

üìå SCHEMA Tests:
   Passed: 21/21

üìå DATE Tests:
   Passed: 1/2
   Failed tests:
      ‚Ä¢ DATE_LOGIC_booking_before_checkin: 100,108 bookings made after check-in

üìå RANGE Tests:
   Passed: 1/4
   Failed tests:
      ‚Ä¢ RANGE_nights_positive: 6,068 records with non-positive nights
      ‚Ä¢ RANGE_adults_minimum: 6,086 bookings with no adults
      ‚Ä¢ RANGE_total_price_positive: 5,985 records with non-positive prices

üìå CATEGORICAL Tests:
   Passed: 3/3


In [12]:
# ============================================================================
# PART 4: SAVE REPORT
# ============================================================================
print("\n" + "=" * 80)
print("üíæ SAVING REPORT")
print("=" * 80)

# Show a preview of the JSON report in the notebook output. Only the summary
# row built below is persisted; the JSON itself is printed, not saved.
print("\nüìä JSON Report:")
if len(report_json) > 1000:
    print(report_json[:1000] + "...")
else:
    print(report_json)

# One summary row per run; column types are inferred from the Python values.
summary_columns = [
    "report_timestamp",
    "total_records",
    "total_tests",
    "passed_tests",
    "failed_tests",
    "pass_rate",
]
run_timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
summary_data = [
    (run_timestamp, total_records, total_tests, passed_tests, failed_tests, round(pass_rate, 2))
]

summary_df = spark.createDataFrame(summary_data, summary_columns)

# Append, so each run adds a new row to the history table.
print("\nüíæ Saving quality report summary to Delta table...")
summary_writer = summary_df.write.format("delta").mode("append")
summary_writer.saveAsTable("data_quality_reports")

print("‚úÖ Report saved to: data_quality_reports")

print("‚úÖ Report saved to: data_quality_reports")

StatementMeta(, d7e2ca66-ad06-4d42-b96e-77507d964000, 14, Finished, Available, Finished)


üíæ SAVING REPORT

üìä JSON Report:
{
  "report_metadata": {
    "report_name": "Silver Layer Data Quality Report",
    "dataset": "silver_hotel_bookings",
    "total_records": 839597,
    "timestamp": "2025-11-28 21:50:30",
    "generated_by": "Great Expectations"
  },
  "summary": {
    "total_tests": 38,
    "passed_tests": 31,
    "failed_tests": 7,
    "pass_rate_percentage": 81.58
  },
  "test_categories": {
    "not_null": 7,
    "uniqueness": 1,
    "schema": 21,
    "date_logic": 2,
    "value_range": 4,
    "categorical": 3
  },
  "test_results": [
    {
      "test_name": "NOT_NULL_booking_id",
      "expectation": "Column 'booking_id' should have no NULL values",
      "status": "PASSED",
      "details": "0 NULL values found",
      "timestamp": "2025-11-28 21:49:55"
    },
    {
      "test_name": "NOT_NULL_hotel_id",
      "expectation": "Column 'hotel_id' should have no NULL values",
      "status": "PASSED",
      "details": "0 NULL values found",
      "timestamp":

In [13]:
# ============================================================================
# FINAL SUMMARY
# ============================================================================
closing_rule = "=" * 80
print("\n" + closing_rule)
print("‚úÖ DATA QUALITY TESTING COMPLETED!")
print(closing_rule)

# Headline numbers for the run, printed one per line.
summary_lines = [
    f"\nüéØ Final Results:",
    f"   ‚Ä¢ Dataset: silver_hotel_bookings",
    f"   ‚Ä¢ Records tested: {total_records:,}",
    f"   ‚Ä¢ Total tests: {total_tests}",
    f"   ‚Ä¢ ‚úÖ Passed: {passed_tests}",
    f"   ‚Ä¢ ‚ùå Failed: {failed_tests}",
    f"   ‚Ä¢ üìä Pass rate: {pass_rate:.2f}%",
]
for summary_line in summary_lines:
    print(summary_line)

# Verdict tiers, highest threshold first: the first tier whose minimum pass
# rate is met wins; anything below the lowest tier is critical.
verdict_tiers = (
    (95, f"\nüéâ EXCELLENT! Data quality is very high (‚â•95%)"),
    (80, f"\n‚úÖ GOOD! Data quality is acceptable (‚â•80%)"),
    (60, f"\n‚ö†Ô∏è  WARNING! Data quality needs improvement (60-80%)"),
)
verdict = next(
    (message for minimum, message in verdict_tiers if pass_rate >= minimum),
    f"\n‚ùå CRITICAL! Data quality is poor (<60%)",
)
print(verdict)

print("\nüìÑ Report saved to: data_quality_reports table")
print(closing_rule)

StatementMeta(, d7e2ca66-ad06-4d42-b96e-77507d964000, 15, Finished, Available, Finished)


‚úÖ DATA QUALITY TESTING COMPLETED!

üéØ Final Results:
   ‚Ä¢ Dataset: silver_hotel_bookings
   ‚Ä¢ Records tested: 839,597
   ‚Ä¢ Total tests: 38
   ‚Ä¢ ‚úÖ Passed: 31
   ‚Ä¢ ‚ùå Failed: 7
   ‚Ä¢ üìä Pass rate: 81.58%

‚úÖ GOOD! Data quality is acceptable (‚â•80%)

üìÑ Report saved to: data_quality_reports table
