# Data Pipeline Validation

This notebook verifies that all fixes have been successfully applied after the Airflow DAG rerun.

**Fixes to Verify:**
1. Num_Credit_Inquiries: 100% NULL → ~5% NULL
2. Age: string dtype → integer dtype
3. Outlier bounds: 7 new columns cleaned
4. Clickstream: Conditional fill (NULL for Jan 2023, 0 for non-clickers)

**Date**: 2025-10-26

In [1]:
import os
import pandas as pd
import numpy as np
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

# Initialize Spark
spark = SparkSession.builder \
    .appName("validation") \
    .master("local[*]") \
    .config("spark.driver.memory", "4g") \
    .getOrCreate()

spark.sparkContext.setLogLevel("ERROR")

print("Spark Session initialized")
print(f"Spark version: {spark.version}")

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/10/26 09:01:31 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Spark Session initialized
Spark version: 3.5.0


## 1. Load Gold Feature Store

In [7]:
# Load the monthly gold feature store files listed below (Jan 2023 to Jan 2025)
# NOTE: Old parquet files have an INT/FLOAT schema, new ones have LONG/DOUBLE.
# After fixing the schema in gold_layer.py, the DAG has to be rerun so that
# every monthly file shares the same schema.
from functools import reduce
from pyspark.sql import DataFrame

feature_path = '/app/datamart/gold/feature_store/'

# One parquet file per monthly snapshot
print("Loading months with clickstream data...")
months_to_load = [
    'gold_feature_store_2023_01_01.parquet',
    'gold_feature_store_2023_02_01.parquet',
    'gold_feature_store_2023_03_01.parquet',
    'gold_feature_store_2023_04_01.parquet',
    'gold_feature_store_2023_05_01.parquet',
    'gold_feature_store_2023_06_01.parquet',
    'gold_feature_store_2023_07_01.parquet',
    'gold_feature_store_2023_08_01.parquet',
    'gold_feature_store_2023_09_01.parquet',
    'gold_feature_store_2023_10_01.parquet',
    'gold_feature_store_2023_11_01.parquet',
    'gold_feature_store_2023_12_01.parquet',
    'gold_feature_store_2024_01_01.parquet',
    'gold_feature_store_2024_02_01.parquet',
    'gold_feature_store_2024_03_01.parquet',
    'gold_feature_store_2024_04_01.parquet',
    'gold_feature_store_2024_05_01.parquet',
    'gold_feature_store_2024_06_01.parquet',
    'gold_feature_store_2024_07_01.parquet',
    'gold_feature_store_2024_08_01.parquet',
    'gold_feature_store_2024_09_01.parquet',
    'gold_feature_store_2024_10_01.parquet',
    'gold_feature_store_2024_11_01.parquet',
    'gold_feature_store_2024_12_01.parquet',
    'gold_feature_store_2025_01_01.parquet'
]

# Read each monthly file into its own Spark DataFrame
df_list = []
for month_file in months_to_load:
    file_path = f'{feature_path}{month_file}'
    df_month = spark.read.parquet(file_path)
    df_list.append(df_month)

# Union all monthly DataFrames, matching columns by name rather than by position
df_spark = reduce(DataFrame.unionByName, df_list)

df = df_spark.toPandas()

print("=" * 80)
print("GOLD FEATURE STORE - LOADED (Jan 2023 to Jan 2025)")
print("=" * 80)
print(f"Total rows: {len(df):,}")
print(f"Total columns: {len(df.columns)}")
print(f"Date range: {df['snapshot_date'].min()} to {df['snapshot_date'].max()}")

Loading months with clickstream data...
GOLD FEATURE STORE - LOADED (Jan 2023 to Jan 2025)
Total rows: 12,500
Total columns: 30
Date range: 2023-01-01 to 2025-01-01
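
The hard-coded list of monthly files could also be generated. A minimal sketch, assuming the snapshots are strictly monthly and follow the `gold_feature_store_YYYY_MM_01.parquet` naming seen in the list; `monthly_snapshot_files` is a hypothetical helper, not part of the pipeline:

```python
from datetime import date


def monthly_snapshot_files(start: date, end: date) -> list[str]:
    """Return one parquet file name per month from start to end, inclusive.

    Assumes the naming pattern used in this notebook; only the year and
    month of `start` and `end` are used.
    """
    files = []
    year, month = start.year, start.month
    while (year, month) <= (end.year, end.month):
        files.append(f"gold_feature_store_{year:04d}_{month:02d}_01.parquet")
        # Advance one month, rolling over into the next year after December.
        month += 1
        if month > 12:
            year, month = year + 1, 1
    return files


months_to_load = monthly_snapshot_files(date(2023, 1, 1), date(2025, 1, 1))
print(len(months_to_load), months_to_load[0], months_to_load[-1])
```

For the range used in this notebook the helper yields 25 file names, the same ones as the explicit list.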


## 2. Verify Fix #1: Num_Credit_Inquiries

**Expected**: The NULL rate should drop from 100% to roughly 5%. The check below passes at anything under 10%.

In [8]:
print("=" * 80)
print("FIX #1: Num_Credit_Inquiries")
print("=" * 80)

if 'Num_Credit_Inquiries' in df.columns:
    nci_null = df['Num_Credit_Inquiries'].isnull().sum()
    nci_total = len(df)
    nci_null_pct = (nci_null / nci_total * 100)
    
    print(f"\nNull Count: {nci_null:,} / {nci_total:,} ({nci_null_pct:.1f}%)")
    
    if nci_null_pct < 10:
        print("✅ PASS: Num_Credit_Inquiries is now populated!")
        print(f"\nSample values:")
        print(df['Num_Credit_Inquiries'].dropna().head(20).tolist())
        print(f"\nStats:")
        print(f"  Min: {df['Num_Credit_Inquiries'].min()}")
        print(f"  Max: {df['Num_Credit_Inquiries'].max()}")
        print(f"  Mean: {df['Num_Credit_Inquiries'].mean():.2f}")
        print(f"  Median: {df['Num_Credit_Inquiries'].median():.2f}")
    elif nci_null_pct == 100:
        print("❌ FAIL: Still 100% NULL - Fix not applied!")
    else:
        print(f"⚠️  PARTIAL: {nci_null_pct:.1f}% NULL (expected < 10%)")
else:
    print("❌ FAIL: Num_Credit_Inquiries column not found!")

FIX #1: Num_Credit_Inquiries

Null Count: 192 / 12,500 (1.5%)
✅ PASS: Num_Credit_Inquiries is now populated!

Sample values:
[3.0, 5.0, 0.0, 4.0, 11.0, 3.0, 8.0, 2.0, 9.0, 5.0, 2.0, 6.0, 2.0, 4.0, 6.0, 9.0, 10.0, 2.0, 5.0, 6.0]

Stats:
  Min: 0.0
  Max: 46.0
  Mean: 6.64
  Median: 6.00


## 3. Verify Fix #2: Age Dtype

**Expected**: Should be integer dtype (not string)

In [13]:
from pyspark.sql.types import IntegerType, LongType, StringType, DoubleType, FloatType
# Import Spark functions under an alias so that Spark's min/max do not
# shadow Python's built-in min/max for the rest of the notebook
from pyspark.sql import functions as F

print("=" * 80)
print("FIX #2: Age Dtype (Verified on Spark DataFrame)")
print("=" * 80)

if 'Age' in df_spark.columns:
    # --- 1. Check Dtype ---
    # Get the data type from the Spark schema
    age_dtype = df_spark.schema['Age'].dataType
    print(f"\nAge dtype: {age_dtype}")

    # Check if the type is one of Spark's integer types
    if isinstance(age_dtype, (IntegerType, LongType)):
        print("✅ PASS: Age is now integer/long dtype!")
        is_numeric = True
    elif isinstance(age_dtype, StringType):
        print("❌ FAIL: Age is still string dtype - Fix not applied!")
        is_numeric = False
    else:
        print(f"⚠️  UNEXPECTED: Age dtype is {age_dtype}")
        is_numeric = isinstance(age_dtype, (DoubleType, FloatType)) # Still numeric, but not integer

    # --- 2. Calculate Nulls (Requires Spark Actions) ---
    total_count = df_spark.count()
    age_null = df_spark.filter(F.col('Age').isNull()).count()
    age_null_pct = (age_null / total_count * 100) if total_count > 0 else 0
    print(f"\nNull Count: {age_null:,} / {total_count:,} ({age_null_pct:.1f}%)")

    # --- 3. Calculate Stats (Requires Spark Actions) ---
    if age_null < total_count and is_numeric:
        print(f"\nStats:")
        try:
            # Calculate stats in a single pass for efficiency
            stats = df_spark.select(
                F.min('Age').alias('Min'),
                F.max('Age').alias('Max'),
                F.avg('Age').alias('Mean'),
                F.percentile_approx('Age', 0.5).alias('Median') # Median is an approximation in Spark
            ).first()
            
            print(f"  Min: {stats['Min']}")
            print(f"  Max: {stats['Max']}")
            print(f"  Mean: {stats['Mean']:.1f}")
            print(f"  Median: {stats['Median']:.1f}")
        except Exception as e:
            print(f"  Unable to calculate stats (column may be non-numeric): {e}")
    elif not is_numeric:
        print("\nStats: Skipped (column is not numeric)")
else:
    print("❌ FAIL: Age column not found!")

FIX #2: Age Dtype (Verified on Spark DataFrame)

Age dtype: IntegerType()
✅ PASS: Age is now integer/long dtype!

Null Count: 601 / 12,500 (4.8%)

Stats:
  Min: 16
  Max: 56
  Mean: 34.1
  Median: 34.0


## 4. Verify Fix #3: Outlier Bounds

**Expected**: 7 new columns should have outliers removed

In [10]:
print("=" * 80)
print("FIX #3: Outlier Bounds")
print("=" * 80)

# Define expected bounds
outlier_bounds = {
    "Num_of_Loan": (0, 20),
    "Num_of_Delayed_Payment": (0, 100),
    "Annual_Income": (0, 1000000),
    "Amount_invested_monthly": (0, 10000),
    "Monthly_Balance": (-10000, 100000),
    "Outstanding_Debt": (0, 50000),
    "Delay_from_due_date": (0, 365)
}

results = []

for col_name, (min_val, max_val) in outlier_bounds.items():
    if col_name in df.columns:
        non_null = df[col_name].dropna()
        if len(non_null) > 0:
            outside_bounds = ((non_null < min_val) | (non_null > max_val)).sum()
            actual_min = non_null.min()
            actual_max = non_null.max()
            
            status = "✅ PASS" if outside_bounds == 0 else "❌ FAIL"
            
            results.append({
                'Column': col_name,
                'Expected_Range': f"({min_val}, {max_val})",
                'Actual_Min': f"{actual_min:.2f}",
                'Actual_Max': f"{actual_max:.2f}",
                'Outside_Bounds': outside_bounds,
                'Status': status
            })
        else:
            results.append({
                'Column': col_name,
                'Expected_Range': f"({min_val}, {max_val})",
                'Actual_Min': 'N/A',
                'Actual_Max': 'N/A',
                'Outside_Bounds': 0,
                'Status': '⚠️  ALL NULL'
            })
    else:
        results.append({
            'Column': col_name,
            'Expected_Range': f"({min_val}, {max_val})",
            'Actual_Min': 'N/A',
            'Actual_Max': 'N/A',
            'Outside_Bounds': 0,
            'Status': '❌ NOT FOUND'
        })

df_results = pd.DataFrame(results)
print("\nOutlier Bounds Validation:")
print(df_results.to_string(index=False))

# Summary
passed = len([r for r in results if r['Status'] == '✅ PASS'])
total = len(results)
print(f"\n{'='*80}")
print(f"Summary: {passed}/{total} columns passed outlier bounds check")
if passed == total:
    print("✅ ALL OUTLIER BOUNDS APPLIED SUCCESSFULLY!")
else:
    print("⚠️  Some columns still have outliers or are missing")

FIX #3: Outlier Bounds

Outlier Bounds Validation:
                 Column   Expected_Range Actual_Min Actual_Max  Outside_Bounds Status
            Num_of_Loan          (0, 20)       0.00      18.00               0 ✅ PASS
 Num_of_Delayed_Payment         (0, 100)       0.00      88.00               0 ✅ PASS
          Annual_Income     (0, 1000000)    7005.93  580744.00               0 ✅ PASS
Amount_invested_monthly       (0, 10000)       0.00   10000.00               0 ✅ PASS
        Monthly_Balance (-10000, 100000)       0.38    1463.79               0 ✅ PASS
       Outstanding_Debt       (0, 50000)       0.23    4998.07               0 ✅ PASS
    Delay_from_due_date         (0, 365)       0.00      67.00               0 ✅ PASS

Summary: 7/7 columns passed outlier bounds check
✅ ALL OUTLIER BOUNDS APPLIED SUCCESSFULLY!
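
The bounds check is inclusive, so a value sitting exactly on a limit still passes. In the table, `Amount_invested_monthly` has a maximum of exactly 10000.00, which may mean values were capped at the bound rather than dropped; that is a guess, not something the output confirms. A minimal sketch for counting such values, where `count_on_bounds` is a hypothetical helper that accepts any iterable of numbers:

```python
def count_on_bounds(values, lower, upper):
    """Count non-null values that equal the lower or the upper bound."""
    at_lower = at_upper = 0
    for v in values:
        if v is None or v != v:  # skip None and NaN (NaN is never equal to itself)
            continue
        if v == lower:
            at_lower += 1
        elif v == upper:
            at_upper += 1
    return {"at_lower": at_lower, "at_upper": at_upper}


# Illustrative values only, not taken from the feature store.
sample = [0.0, 125.5, 10000.0, 10000.0, None, float("nan"), 9999.99]
print(count_on_bounds(sample, 0, 10000))
```

In the notebook, something like `count_on_bounds(df[col_name].dropna(), min_val, max_val)` inside the loop would report how many rows sit on each limit.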


## 5. Verify Fix #4: Clickstream Conditional Fill

**Expected**: 
- Jan 2023: Should have NULL values (no Dec 2022 data)
- Other months: Should have 0 for non-clickers (not NULL)

In [11]:
print("=" * 80)
print("FIX #4: Clickstream Conditional Fill")
print("=" * 80)

clickstream_cols = ['visit_frequency', 'recent_total_activity', 'avg_activity_intensity', 'total_active_features']

# Check if columns exist
existing_click_cols = [c for c in clickstream_cols if c in df.columns]

if existing_click_cols:
    # Convert snapshot_date to datetime
    df['snapshot_date'] = pd.to_datetime(df['snapshot_date'])
    
    # Group by month and check NULL percentage
    monthly_nulls = df.groupby(df['snapshot_date'].dt.to_period('M')).agg({
        existing_click_cols[0]: lambda x: x.isnull().sum() / len(x) * 100
    }).round(1)
    
    print(f"\nClickstream NULL % by Month ({existing_click_cols[0]}):")
    print(monthly_nulls.head(12))
    
    # Check Jan 2023
    jan_2023 = df[df['snapshot_date'].dt.to_period('M') == '2023-01']
    if len(jan_2023) > 0:
        jan_null_pct = jan_2023[existing_click_cols[0]].isnull().sum() / len(jan_2023) * 100
        print(f"\nJan 2023:")
        print(f"  NULL %: {jan_null_pct:.1f}%")
        if jan_null_pct == 100:
            print("  ✅ PASS: Jan 2023 has 100% NULL (expected - no Dec 2022 data)")
        else:
            print(f"  ⚠️  Expected 100% NULL, got {jan_null_pct:.1f}%")
    
    # Check Feb 2023 onwards
    feb_onwards = df[df['snapshot_date'] >= '2023-02-01']
    if len(feb_onwards) > 0:
        feb_null_pct = feb_onwards[existing_click_cols[0]].isnull().sum() / len(feb_onwards) * 100
        print(f"\nFeb 2023 onwards:")
        print(f"  NULL %: {feb_null_pct:.1f}%")
        if feb_null_pct < 5:
            print("  ✅ PASS: Very few NULLs (filled with 0 for non-clickers)")
        elif feb_null_pct > 50:
            print("  ❌ FAIL: Too many NULLs - conditional fill not applied!")
        else:
            print(f"  ⚠️  Some NULLs present ({feb_null_pct:.1f}%) - might be expected")
else:
    print("❌ FAIL: No clickstream columns found!")

FIX #4: Clickstream Conditional Fill

Clickstream NULL % by Month (visit_frequency):
               visit_frequency
snapshot_date                 
2023-01                  100.0
2023-02                    0.0
2023-03                    0.0
2023-04                    0.0
2023-05                    0.0
2023-06                    0.0
2023-07                    0.0
2023-08                    0.0
2023-09                    0.0
2023-10                    0.0
2023-11                    0.0
2023-12                    0.0

Jan 2023:
  NULL %: 100.0%
  ✅ PASS: Jan 2023 has 100% NULL (expected - no Dec 2022 data)

Feb 2023 onwards:
  NULL %: 0.0%
  ✅ PASS: Very few NULLs (filled with 0 for non-clickers)
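
The rule being verified here can be written down in a few lines. This is a plain-Python illustration of the expected behavior, not the code in the gold layer; `fill_clickstream`, `FIRST_MONTH` and the record layout are assumptions made for the example:

```python
FIRST_MONTH = "2023-01"  # assumption: the only month with no prior clickstream data


def fill_clickstream(record: dict, click_cols: list[str]) -> dict:
    """Return a copy of `record` with the conditional fill applied.

    First month: missing clickstream features stay None, because there is
    no earlier month to derive them from.
    Later months: missing features become 0 (the customer did not click).
    """
    filled = dict(record)
    is_first_month = record["snapshot_month"] == FIRST_MONTH
    for c in click_cols:
        if not is_first_month and filled.get(c) is None:
            filled[c] = 0
    return filled


cols = ["visit_frequency", "recent_total_activity"]
rows = [
    {"snapshot_month": "2023-01", "visit_frequency": None, "recent_total_activity": None},
    {"snapshot_month": "2023-02", "visit_frequency": None, "recent_total_activity": 12},
]
for row in rows:
    print(fill_clickstream(row, cols))
```

Note that the validation cell only inspects `existing_click_cols[0]`; looping over all four clickstream columns would cover the remaining three.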


## 6. Overall Data Quality Summary

In [12]:
print("=" * 80)
print("OVERALL DATA QUALITY SUMMARY")
print("=" * 80)

# Get numeric columns
numeric_cols = df.select_dtypes(include=[np.number]).columns.tolist()
exclude_cols = ['snapshot_date', 'last_activity_date']
numeric_cols = [c for c in numeric_cols if c not in exclude_cols]

print(f"\nTotal rows: {len(df):,}")
print(f"Total columns: {len(df.columns)}")
print(f"Numeric columns: {len(numeric_cols)}")

# Missing value summary
missing_summary = pd.DataFrame({
    'Column': numeric_cols,
    'Null_Count': [df[c].isnull().sum() for c in numeric_cols],
    'Null_%': [(df[c].isnull().sum() / len(df) * 100) for c in numeric_cols]
}).sort_values('Null_%', ascending=False)

print("\nTop 10 Columns with Missing Values:")
print(missing_summary.head(10).to_string(index=False))

# Check for extreme outliers still present
print("\nChecking for Absurd Values:")
absurd_found = False

absurd_checks = {
    'Monthly_Balance': (-1e10, 1e10),
    'Annual_Income': (0, 10000000),
    'Num_of_Loan': (0, 1000),
    'Num_of_Delayed_Payment': (0, 1000)
}

for col_name, (min_sane, max_sane) in absurd_checks.items():
    if col_name in df.columns:
        absurd = df[(df[col_name] < min_sane) | (df[col_name] > max_sane)]
        if len(absurd) > 0:
            absurd_found = True
            print(f"  ⚠️  {col_name}: {len(absurd)} absurd values")
            print(f"     Range: {df[col_name].min():.2f} to {df[col_name].max():.2f}")

if not absurd_found:
    print("  ✅ No absurd values found!")

print("\n" + "=" * 80)
print("VALIDATION COMPLETE")
print("=" * 80)

OVERALL DATA QUALITY SUMMARY

Total rows: 12,500
Total columns: 30
Numeric columns: 22

Top 10 Columns with Missing Values:
                Column  Null_Count  Null_%
                   Age         601   4.808
           Num_of_Loan         566   4.528
 total_active_features         530   4.240
 recent_total_activity         530   4.240
avg_activity_intensity         530   4.240
       visit_frequency         530   4.240
   Total_EMI_per_month         376   3.008
       Num_Credit_Card         296   2.368
         Interest_Rate         270   2.160
  Changed_Credit_Limit         254   2.032

Checking for Absurd Values:
  ✅ No absurd values found!

VALIDATION COMPLETE


## 7. Final Verdict

Review the results above to confirm:
- ✅ Fix #1: Num_Credit_Inquiries populated
- ✅ Fix #2: Age is integer dtype
- ✅ Fix #3: All 7 outlier bounds applied
- ✅ Fix #4: Clickstream conditional fill working

If all checks pass, your pipeline rerun was successful!
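
When this notebook runs unattended, for instance as a step after the DAG, a printed verdict is easy to miss. A minimal sketch that collects the four outcomes and raises if any of them failed; the `checks` dictionary is filled in by hand here from the results above, and `assert_all_passed` is a hypothetical helper:

```python
def assert_all_passed(checks: dict[str, bool]) -> str:
    """Return a summary line, or raise AssertionError naming the failed checks."""
    failed = [name for name, ok in checks.items() if not ok]
    if failed:
        raise AssertionError("Validation failed: " + ", ".join(failed))
    return f"{len(checks)}/{len(checks)} checks passed"


# Outcomes copied by hand from the four verification sections.
checks = {
    "Fix #1: Num_Credit_Inquiries populated": True,
    "Fix #2: Age is integer dtype": True,
    "Fix #3: outlier bounds applied": True,
    "Fix #4: clickstream conditional fill": True,
}
print(assert_all_passed(checks))
```

Raising instead of printing lets the notebook run itself fail, so a broken rerun is not mistaken for a successful one.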