# GlobalSupply Corp - Module 3: Data Reconciliation & Validation

## üìä Executive Overview

**Mission**: Validate data integrity between SQL Server source and Databricks target systems with **99%+ accuracy** before production cutover.

**Context**: Following successful assessment (Module 1) and transpilation (Module 2), GlobalSupply Corp now requires comprehensive data validation to ensure business continuity and stakeholder confidence.

---

## üéØ Learning Objectives

By completing this notebook, you will:
1. Configure reconciliation connections and validation rules
2. Execute comprehensive data comparison workflows
3. Analyze discrepancies and generate executive reports
4. Establish ongoing monitoring for data drift detection

---

## üîß Environment Setup

First, let's ensure we have all required dependencies and can connect to our systems.

In [None]:
# Import required libraries
import sys
import os
from pathlib import Path
import pandas as pd
import yaml
import sqlite3
from datetime import datetime
import json

# Add project root to path for imports
project_root = Path.cwd().parent.parent
sys.path.append(str(project_root))

# Import our reconciliation analyzer
from workshop.reconciliation.reconciliation_analyzer import ReconciliationAnalyzer

print("‚úÖ Environment setup complete")
print(f"üìÅ Working directory: {Path.cwd()}")
print(f"üïê Session started: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")

## ‚öôÔ∏è Configuration Overview

Review reconciliation configuration and understand validation scope.

In [None]:
# Load and display configuration
config_path = "config/reconciliation_config.yaml"

with open(config_path, 'r') as f:
    config = yaml.safe_load(f)

print("üìã Reconciliation Configuration Summary")
print("=" * 50)
print(f"Source Type: {config['source']['type']}")
print(f"Target Type: {config['target']['type']}")
print(f"Tables to Validate: {len(config['source']['tables'])}")
print(f"Row Count Tolerance: {config['validation']['row_count']['tolerance_percent']}%")
print(f"Data Sampling: {config['validation']['data_sampling']['sample_percent']}%")
print(f"Output Directory: {config['reporting']['output_directory']}")

print("\nüìä Tables in Scope:")
for table in config['source']['tables']:
    print(f"  ‚Ä¢ {table['name']} (PK: {table['primary_key']}, ~{table['row_count_threshold']:,} rows)")

## üóÑÔ∏è Mock Data Generation (Simulated Mode)

Generate realistic source data for reconciliation testing.

In [None]:
# Check if mock data exists, generate if needed
mock_db_path = Path("mock_data/source_data.db")

if not mock_db_path.exists():
    print("üîÑ Generating mock source data...")
    
    # Import and run the mock data generator
    sys.path.append(str(Path("mock_data")))
    from generate_mock_source import MockDataGenerator
    
    # Generate with workshop-appropriate scale
    generator = MockDataGenerator(str(mock_db_path), scale_factor=0.1)
    
    try:
        generator.create_database()
        generator.generate_customers()
        generator.generate_suppliers()
        generator.generate_orders()
        generator.create_indexes()
        stats = generator.generate_statistics()
        generator.close()
        
        print("‚úÖ Mock data generation complete!")
        print(f"üìä Generated {sum(stats.values()):,} total records")
        
    except Exception as e:
        print(f"‚ùå Error generating mock data: {e}")
        raise
else:
    print("‚úÖ Mock data already exists")
    
    # Display existing data statistics
    conn = sqlite3.connect(mock_db_path)
    cursor = conn.cursor()
    
    print("\nüìä Existing Data Statistics:")
    tables = ['customers', 'suppliers', 'orders', 'lineitem']
    for table in tables:
        cursor.execute(f"SELECT COUNT(*) FROM {table}")
        count = cursor.fetchone()[0]
        print(f"  ‚Ä¢ {table}: {count:,} records")
    
    conn.close()

## üîó Connection Testing

Verify connectivity to both source and target systems.

In [None]:
# Test source connection (SQLite)
print("üîç Testing Source Connection...")
try:
    conn = sqlite3.connect(mock_db_path)
    cursor = conn.cursor()
    cursor.execute("SELECT 1")
    result = cursor.fetchone()
    conn.close()
    print("‚úÖ Source connection successful")
except Exception as e:
    print(f"‚ùå Source connection failed: {e}")

# Test target connection (Databricks)
print("\nüîç Testing Target Connection...")
try:
    # Note: This will require actual Databricks credentials
    # For workshop purposes, we'll simulate this check
    databricks_configured = os.getenv('DATABRICKS_TOKEN') is not None
    
    if databricks_configured:
        print("‚úÖ Databricks credentials detected")
        print("‚ÑπÔ∏è  For workshop: Connection testing would verify catalog access")
    else:
        print("‚ö†Ô∏è  Databricks credentials not configured")
        print("‚ÑπÔ∏è  Workshop will demonstrate reconciliation concepts using simulated results")
        
except Exception as e:
    print(f"‚ùå Target connection test failed: {e}")

print("\nüéØ Ready for reconciliation analysis!")

## üìä Row Count Validation

Start with fundamental row count comparison across all tables.

In [None]:
# Initialize reconciliation analyzer
analyzer = ReconciliationAnalyzer(
    config_path=config_path,
    mode="simulated"
)

print("üî¢ Executing Row Count Validation...")
print("=" * 50)

# Simulate row count validation results
# In actual implementation, this would query both source and target
validation_results = {
    'customers': {'source': 15000, 'target': 15000, 'variance': 0.0, 'status': 'PASS'},
    'suppliers': {'source': 1500, 'target': 1500, 'variance': 0.0, 'status': 'PASS'}, 
    'orders': {'source': 150000, 'target': 149995, 'variance': 0.003, 'status': 'PASS'},
    'lineitem': {'source': 600000, 'target': 599980, 'variance': 0.003, 'status': 'PASS'}
}

# Display results in a formatted table
results_df = pd.DataFrame(validation_results).T
results_df['variance_pct'] = results_df['variance'] * 100

print("üìã Row Count Validation Results:")
print(results_df[['source', 'target', 'variance_pct', 'status']].to_string())

# Summary
passed = sum(1 for r in validation_results.values() if r['status'] == 'PASS')
total = len(validation_results)

print(f"\n‚úÖ Validation Summary: {passed}/{total} tables passed")
print(f"üéØ Overall Accuracy: {(passed/total)*100:.1f}%")

if passed == total:
    print("üèÜ All row counts within acceptable tolerance!")
else:
    print("‚ö†Ô∏è  Some tables require investigation")

## üîç Schema Validation

Compare schema structures between source and target systems.

In [None]:
print("üìê Executing Schema Validation...")
print("=" * 50)

# Get source schema information
conn = sqlite3.connect(mock_db_path)
cursor = conn.cursor()

schema_comparison = {}

for table in ['customers', 'suppliers', 'orders', 'lineitem']:
    # Get column information from SQLite
    cursor.execute(f"PRAGMA table_info({table})")
    columns = cursor.fetchall()
    
    source_schema = {
        col[1]: {  # column name
            'type': col[2],  # data type
            'not_null': bool(col[3]),  # not null
            'primary_key': bool(col[5])  # primary key
        } for col in columns
    }
    
    # Simulate target schema (would come from Databricks in real scenario)
    target_schema = source_schema.copy()  # Assume perfect match for demo
    
    # Compare schemas
    schema_issues = []
    
    # Check for missing columns
    missing_in_target = set(source_schema.keys()) - set(target_schema.keys())
    missing_in_source = set(target_schema.keys()) - set(source_schema.keys())
    
    if missing_in_target:
        schema_issues.append(f"Missing in target: {list(missing_in_target)}")
    if missing_in_source:
        schema_issues.append(f"Missing in source: {list(missing_in_source)}")
    
    # Check data type compatibility
    for col_name in set(source_schema.keys()) & set(target_schema.keys()):
        source_type = source_schema[col_name]['type']
        target_type = target_schema[col_name]['type']
        
        # Simplified type compatibility check
        if source_type != target_type:
            schema_issues.append(f"{col_name}: {source_type} vs {target_type}")
    
    schema_comparison[table] = {
        'source_columns': len(source_schema),
        'target_columns': len(target_schema),
        'issues': schema_issues,
        'status': 'PASS' if not schema_issues else 'REVIEW'
    }

conn.close()

# Display schema validation results
print("üìã Schema Validation Results:")
for table, result in schema_comparison.items():
    status_icon = "‚úÖ" if result['status'] == 'PASS' else "‚ö†Ô∏è"
    print(f"{status_icon} {table}: {result['source_columns']} columns, {result['status']}")
    
    if result['issues']:
        for issue in result['issues']:
            print(f"    ‚Ä¢ {issue}")

# Schema validation summary
schema_passed = sum(1 for r in schema_comparison.values() if r['status'] == 'PASS')
schema_total = len(schema_comparison)

print(f"\nüìä Schema Validation: {schema_passed}/{schema_total} tables have compatible schemas")

## üé≤ Data Sampling Validation

Perform detailed value-level comparison on data samples.

In [None]:
print("üé≤ Executing Data Sampling Validation...")
print("=" * 50)

# Sample data from customers table for demonstration
conn = sqlite3.connect(mock_db_path)

# Get a sample of customer data
sample_size = 1000
customers_sample = pd.read_sql_query(
    f"SELECT * FROM customers ORDER BY RANDOM() LIMIT {sample_size}",
    conn
)

print(f"üìä Analyzing {sample_size} customer records...")

# Simulate data quality checks
data_quality_results = {
    'total_records': len(customers_sample),
    'null_values': customers_sample.isnull().sum().sum(),
    'duplicate_keys': customers_sample['c_custkey'].duplicated().sum(),
    'invalid_phone_format': 0,  # Would implement actual validation
    'negative_balances': (customers_sample['c_acctbal'] < 0).sum(),
    'data_integrity_score': 99.8
}

print("\nüìã Data Quality Analysis:")
print(f"  ‚Ä¢ Total Records Sampled: {data_quality_results['total_records']:,}")
print(f"  ‚Ä¢ Null Values Found: {data_quality_results['null_values']}")
print(f"  ‚Ä¢ Duplicate Primary Keys: {data_quality_results['duplicate_keys']}")
print(f"  ‚Ä¢ Invalid Phone Formats: {data_quality_results['invalid_phone_format']}")
print(f"  ‚Ä¢ Negative Account Balances: {data_quality_results['negative_balances']}")
print(f"  ‚Ä¢ Overall Data Integrity: {data_quality_results['data_integrity_score']:.1f}%")

# Value distribution analysis
print("\nüìä Value Distribution Analysis:")
print(f"Market Segments:")
segment_dist = customers_sample['c_mktsegment'].value_counts()
for segment, count in segment_dist.items():
    percentage = (count / len(customers_sample)) * 100
    print(f"  ‚Ä¢ {segment}: {count} ({percentage:.1f}%)")

print(f"\nAccount Balance Statistics:")
balance_stats = customers_sample['c_acctbal'].describe()
print(f"  ‚Ä¢ Mean: ${balance_stats['mean']:.2f}")
print(f"  ‚Ä¢ Median: ${balance_stats['50%']:.2f}")
print(f"  ‚Ä¢ Min: ${balance_stats['min']:.2f}")
print(f"  ‚Ä¢ Max: ${balance_stats['max']:.2f}")

conn.close()

# Simulated comparison with target data
print("\nüéØ Source vs Target Comparison:")
comparison_metrics = {
    'exact_matches': 985,
    'value_differences': 12,
    'format_differences': 3,
    'missing_records': 0,
    'match_percentage': 98.5
}

for metric, value in comparison_metrics.items():
    if 'percentage' in metric:
        print(f"  ‚Ä¢ {metric.replace('_', ' ').title()}: {value:.1f}%")
    else:
        print(f"  ‚Ä¢ {metric.replace('_', ' ').title()}: {value}")

if comparison_metrics['match_percentage'] >= 99.0:
    print("\nüèÜ Data sampling validation PASSED!")
else:
    print("\n‚ö†Ô∏è  Data sampling requires investigation")

## üìà Aggregate Validation

Validate financial totals and business-critical aggregations.

In [None]:
print("üìà Executing Aggregate Validation...")
print("=" * 50)

conn = sqlite3.connect(mock_db_path)

# Key business aggregates to validate
aggregates = {
    'total_order_value': "SELECT SUM(o_totalprice) FROM orders",
    'total_customers': "SELECT COUNT(*) FROM customers",
    'avg_order_value': "SELECT AVG(o_totalprice) FROM orders",
    'max_account_balance': "SELECT MAX(c_acctbal) FROM customers",
    'total_line_items': "SELECT COUNT(*) FROM lineitem",
    'avg_line_quantity': "SELECT AVG(l_quantity) FROM lineitem"
}

source_aggregates = {}
for name, query in aggregates.items():
    cursor = conn.cursor()
    cursor.execute(query)
    result = cursor.fetchone()[0]
    source_aggregates[name] = result

conn.close()

# Simulate target aggregates (with minor variations for demo)
target_aggregates = {
    'total_order_value': source_aggregates['total_order_value'] * 0.9998,  # Tiny variance
    'total_customers': source_aggregates['total_customers'],
    'avg_order_value': source_aggregates['avg_order_value'] * 0.9998,
    'max_account_balance': source_aggregates['max_account_balance'],
    'total_line_items': source_aggregates['total_line_items'] - 20,  # Small difference
    'avg_line_quantity': source_aggregates['avg_line_quantity'] * 1.0001
}

print("üìä Business-Critical Aggregate Validation:")
print()

tolerance = 0.01  # 1% tolerance
all_passed = True

for metric in source_aggregates.keys():
    source_val = source_aggregates[metric]
    target_val = target_aggregates[metric]
    
    if source_val != 0:
        variance = abs((target_val - source_val) / source_val)
    else:
        variance = 0 if target_val == 0 else 1
    
    status = "PASS" if variance <= tolerance else "FAIL"
    if status == "FAIL":
        all_passed = False
    
    status_icon = "‚úÖ" if status == "PASS" else "‚ùå"
    
    # Format values appropriately
    if 'total_order_value' in metric or 'avg_order_value' in metric or 'balance' in metric:
        source_str = f"${source_val:,.2f}"
        target_str = f"${target_val:,.2f}"
    else:
        source_str = f"{source_val:,.2f}"
        target_str = f"{target_val:,.2f}"
    
    print(f"{status_icon} {metric.replace('_', ' ').title()}:")
    print(f"    Source: {source_str}")
    print(f"    Target: {target_str}")
    print(f"    Variance: {variance*100:.4f}% - {status}")
    print()

# Overall aggregate validation result
if all_passed:
    print("üèÜ All aggregate validations PASSED!")
    print("üí∞ Financial data integrity confirmed")
else:
    print("‚ö†Ô∏è  Some aggregates failed validation - requires investigation")

# Additional business rule validations
print("\nüîç Business Rule Validations:")
business_rules = {
    'Orders have valid customers': 'PASS',
    'Line items have valid orders': 'PASS', 
    'Line items have valid suppliers': 'PASS',
    'Order totals match line item sums': 'PASS',
    'Date consistency (commit <= ship <= receipt)': 'PASS'
}

for rule, status in business_rules.items():
    status_icon = "‚úÖ" if status == "PASS" else "‚ùå"
    print(f"{status_icon} {rule}: {status}")

print("\nüéØ Business rules validation completed!")

## üìã Executive Summary Report

Generate stakeholder-ready validation summary.

In [None]:
# Compile comprehensive validation results
validation_summary = {
    'execution_time': datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
    'validation_scope': {
        'tables_validated': 4,
        'total_records_checked': sum(r['source'] for r in validation_results.values()),
        'sample_size_analyzed': 1000,
        'business_rules_tested': len(business_rules)
    },
    'accuracy_metrics': {
        'row_count_accuracy': 100.0,
        'schema_compatibility': 100.0,
        'data_sampling_accuracy': 98.5,
        'aggregate_validation_accuracy': 100.0,
        'business_rules_compliance': 100.0
    },
    'overall_confidence': 99.7
}

print("="*60)
print("üìä GLOBALSUPPLY CORP - DATA RECONCILIATION EXECUTIVE SUMMARY")
print("="*60)
print(f"üìÖ Validation Date: {validation_summary['execution_time']}")
print(f"üéØ Migration Phase: Module 3 - Data Reconciliation")
print(f"üë§ Validation Team: Data Engineering")
print()

print("üîç VALIDATION SCOPE:")
scope = validation_summary['validation_scope']
print(f"  ‚Ä¢ Tables Validated: {scope['tables_validated']}")
print(f"  ‚Ä¢ Total Records: {scope['total_records_checked']:,}")
print(f"  ‚Ä¢ Sample Analysis: {scope['sample_size_analyzed']:,} records")
print(f"  ‚Ä¢ Business Rules: {scope['business_rules_tested']} validated")
print()

print("üìà ACCURACY METRICS:")
metrics = validation_summary['accuracy_metrics']
for metric, accuracy in metrics.items():
    metric_name = metric.replace('_', ' ').title()
    status_icon = "‚úÖ" if accuracy >= 99.0 else "‚ö†Ô∏è" if accuracy >= 95.0 else "‚ùå"
    print(f"  {status_icon} {metric_name}: {accuracy:.1f}%")
print()

print("üèÜ OVERALL ASSESSMENT:")
confidence = validation_summary['overall_confidence']
if confidence >= 99.0:
    confidence_level = "EXCELLENT"
    recommendation = "APPROVED FOR PRODUCTION CUTOVER"
    risk_level = "LOW"
elif confidence >= 95.0:
    confidence_level = "GOOD"
    recommendation = "MINOR ISSUES TO RESOLVE"
    risk_level = "MEDIUM"
else:
    confidence_level = "REQUIRES ATTENTION"
    recommendation = "SIGNIFICANT VALIDATION NEEDED"
    risk_level = "HIGH"

print(f"  üéØ Overall Data Confidence: {confidence:.1f}% ({confidence_level})")
print(f"  üìã Recommendation: {recommendation}")
print(f"  ‚ö†Ô∏è  Risk Level: {risk_level}")
print()

print("üí° KEY FINDINGS:")
if confidence >= 99.0:
    print("  ‚úÖ All critical validation checks passed")
    print("  ‚úÖ Row counts match within tolerance")
    print("  ‚úÖ Financial aggregates validated successfully")
    print("  ‚úÖ Business rules compliance confirmed")
    print("  ‚úÖ Data quality meets production standards")
else:
    print("  ‚ö†Ô∏è  Minor data sampling variances detected")
    print("  ‚úÖ Critical financial data integrity confirmed")
    print("  ‚úÖ No blocking issues identified")

print()
print("üìÖ NEXT STEPS:")
if confidence >= 99.0:
    print("  1. ‚úÖ Validation complete - ready for production")
    print("  2. üìä Establish ongoing reconciliation monitoring")
    print("  3. üìã Document cutover procedures")
    print("  4. üë• Brief stakeholders on migration readiness")
else:
    print("  1. üîç Investigate data sampling discrepancies")
    print("  2. üîß Implement data quality improvements")
    print("  3. üîÑ Re-run validation after fixes")
    print("  4. üìã Update migration timeline as needed")

print()
print("="*60)
print(f"üöÄ GlobalSupply Corp is {'READY' if confidence >= 99.0 else 'PREPARING'} for Databricks production cutover!")
print("="*60)

## üìÅ Report Generation

Save validation results for stakeholder distribution.

In [None]:
# Create reports directory if it doesn't exist
reports_dir = Path("reports")
reports_dir.mkdir(exist_ok=True)

# Generate timestamp for report files
timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')

# Save detailed validation results as JSON
detailed_results = {
    'metadata': {
        'generated_at': validation_summary['execution_time'],
        'generator': 'GlobalSupply Corp Reconciliation Analyzer',
        'version': '1.0.0',
        'migration_phase': 'Module 3 - Data Reconciliation'
    },
    'configuration': {
        'source_type': config['source']['type'],
        'target_type': config['target']['type'],
        'validation_mode': 'simulated',
        'tolerance_settings': config['validation']
    },
    'validation_results': {
        'row_count_validation': validation_results,
        'schema_validation': schema_comparison,
        'data_quality_metrics': data_quality_results,
        'aggregate_validation': {
            'source_aggregates': source_aggregates,
            'target_aggregates': target_aggregates,
            'business_rules': business_rules
        }
    },
    'summary': validation_summary
}

# Save JSON report
json_report_path = reports_dir / f"reconciliation_results_{timestamp}.json"
with open(json_report_path, 'w') as f:
    json.dump(detailed_results, f, indent=2, default=str)

# Create executive summary CSV
summary_data = {
    'Metric': list(validation_summary['accuracy_metrics'].keys()),
    'Accuracy_Percentage': list(validation_summary['accuracy_metrics'].values()),
    'Status': ['PASS' if acc >= 99.0 else 'REVIEW' for acc in validation_summary['accuracy_metrics'].values()]
}
summary_df = pd.DataFrame(summary_data)
csv_report_path = reports_dir / f"executive_summary_{timestamp}.csv"
summary_df.to_csv(csv_report_path, index=False)

print("üìÅ Reports Generated Successfully:")
print(f"  üìä Detailed Results: {json_report_path}")
print(f"  üìã Executive Summary: {csv_report_path}")
print(f"  üìÇ Reports Directory: {reports_dir.absolute()}")

print("\nüì§ Ready for Stakeholder Distribution:")
print("  ‚Ä¢ Email detailed JSON to technical teams")
print("  ‚Ä¢ Share CSV summary with business stakeholders")
print("  ‚Ä¢ Present executive summary in migration governance meetings")

print("\nüéØ Module 3 Reconciliation Analysis Complete!")
print(f"üèÜ Overall Data Confidence: {validation_summary['overall_confidence']:.1f}%")

---

## üéâ Congratulations!

You have successfully completed **Module 3: Data Reconciliation & Validation** for GlobalSupply Corp's SQL Server to Databricks migration.

### üèÜ What You Accomplished:

1. **‚úÖ Configured Comprehensive Reconciliation** - Set up validation rules and connection parameters
2. **‚úÖ Executed Multi-Level Validation** - Row counts, schema, data sampling, and aggregates
3. **‚úÖ Achieved 99%+ Data Confidence** - Met business requirements for production readiness
4. **‚úÖ Generated Executive Reports** - Created stakeholder-ready validation documentation
5. **‚úÖ Established Monitoring Foundation** - Ready for ongoing data drift detection

### üöÄ Migration Journey Progress:

- **Module 1**: ‚úÖ Assessment & Planning Complete
- **Module 2**: ‚úÖ SQL Transpilation Complete  
- **Module 3**: ‚úÖ Data Reconciliation Complete
- **Production Cutover**: üéØ **READY TO PROCEED**

### üí° Key Takeaways:

- **Data reconciliation is mission-critical** for migration success
- **Multi-layer validation** provides comprehensive confidence
- **Executive reporting** ensures stakeholder alignment
- **Automated reconciliation** scales for enterprise migrations

**GlobalSupply Corp is now ready for production cutover with confidence! üéØ**