# Chicago SMB Market Radar — Business Analysis

This notebook will be where we clean our data in preparation for analysis

## Objectives
- Load data
- Perform EDA on the data
- Clean up the data for missing values, mixed datatypes by column, etc. 

In [2]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from datetime import datetime, timedelta
import warnings
warnings.filterwarnings('ignore')

# Add paths for correct imports
import sys
sys.path.append('../../shared')  # For shared utilities
sys.path.append('../../step2_data_ingestion')  # For config and data access

# Import our utility functions
sys.path.append('../../shared')
from notebook_utils import *

# Import our custom modules
from sheets_client import open_sheet
from config_manager import load_settings
from schema import SchemaManager

print("✅ Imports successful")


✅ Imports successful


## Load Data

Load the datasets from Google Sheets or from saved pickle files if available.


In [3]:
# Schema-driven data loading and validation
def load_dataset_with_schema_validation(dataset_name: str, sh, worksheet_name: str):
    """Load dataset and validate against schema."""
    print(f"\n📊 Loading {dataset_name}...")

    # Get schema information
    schema = SchemaManager.get_schema(dataset_name)
    expected_fields = SchemaManager.get_field_names(dataset_name)
    date_fields = SchemaManager.get_date_fields(dataset_name)
    required_fields = SchemaManager.get_required_fields(dataset_name)

    print(f"   Schema: {len(expected_fields)} expected fields")
    print(f"   Date fields: {date_fields}")
    print(f"   Required fields: {len(required_fields)} required")

    # Load data from sheets
    df = load_sheet_data(sh, worksheet_name, parse_dates=date_fields)

    if df.empty:
        print(f"   ⚠️  WARNING: {dataset_name} dataset is empty!")
        return df

    # Validate schema compliance
    actual_fields = list(df.columns)
    missing_expected = set(expected_fields) - set(actual_fields)
    extra_fields = set(actual_fields) - set(expected_fields)
    missing_required = set(required_fields) - set(actual_fields)

    print(f"   📈 Loaded: {len(df)} rows, {len(actual_fields)} columns")

    # Report schema validation results
    if not missing_expected and not extra_fields:
        print(f"   ✅ Schema validation: PERFECT MATCH")
    else:
        if missing_expected:
            print(f"   ⚠️  Missing expected fields ({len(missing_expected)}): {list(missing_expected)[:5]}{'...' if len(missing_expected) > 5 else ''}")
        if extra_fields:
            print(f"   ℹ️  Extra fields not in schema ({len(extra_fields)}): {list(extra_fields)[:5]}{'...' if len(extra_fields) > 5 else ''}")

    if missing_required:
        print(f"   ❌ CRITICAL: Missing required fields: {missing_required}")
    else:
        print(f"   ✅ All required fields present")

    # Validate data quality for required fields
    for field in required_fields:
        if field in df.columns:
            null_count = df[field].isnull().sum()
            if null_count > 0:
                print(f"   ⚠️  Field '{field}' has {null_count} null values ({null_count/len(df)*100:.1f}%)")

    return df

# Define dataset configurations (schema-driven)
datasets_config = {
    'business_licenses': {
        'worksheet': 'Business_Licenses_Full',
        'pickle_name': 'licenses_df'
    },
    'building_permits': {
        'worksheet': 'Building_Permits_Full',
        'pickle_name': 'permits_df'
    },
    'cta_boardings': {
        'worksheet': 'CTA_Full',
        'pickle_name': 'cta_df'
    }
}

# Try to load from saved pickle files first (faster)
datasets = {}
load_from_sheets = False

print("🔍 Checking for cached data...")
for dataset_name, config in datasets_config.items():
    try:
        df = load_analysis_results(config['pickle_name'])
        if df.empty:
            raise FileNotFoundError(f"{config['pickle_name']} is empty")
        datasets[dataset_name] = df
        print(f"   ✅ {dataset_name}: {len(df)} rows from cache")
    except FileNotFoundError:
        print(f"   ❌ {dataset_name}: Cache miss")
        load_from_sheets = True

if load_from_sheets:
    print("\n📊 Loading fresh data from Google Sheets...")

    # Load configuration and connect to Google Sheets
    settings = load_settings()
    sh = open_sheet(settings.sheet_id, settings.google_creds_path)

    # Load and validate each dataset
    for dataset_name, config in datasets_config.items():
        df = load_dataset_with_schema_validation(
            dataset_name,
            sh,
            config['worksheet']
        )
        datasets[dataset_name] = df

        # Save for future use
        save_analysis_results(df, config['pickle_name'])

    print("\n✅ All data loaded and cached for future use")
else:
    print("\n✅ All data loaded from cache")

# Extract datasets for easier access
licenses_df = datasets['business_licenses']
permits_df = datasets['building_permits']
cta_df = datasets['cta_boardings']

# Display comprehensive data summaries
print("\n" + "="*50)
print("📊 DATASET SUMMARY REPORT")
print("="*50)

for dataset_name, df in datasets.items():
    print(f"\n📈 {dataset_name.upper().replace('_', ' ')}")
    print("-" * 30)

    if df.empty:
        print("   ⚠️  Dataset is empty")
        continue

    # Basic stats
    print(f"   Rows: {len(df):,}")
    print(f"   Columns: {len(df.columns)}")

    # Schema compliance
    schema = SchemaManager.get_schema(dataset_name)
    expected_fields = SchemaManager.get_field_names(dataset_name)
    coverage = len(set(df.columns) & set(expected_fields)) / len(expected_fields) * 100
    print(f"   Schema coverage: {coverage:.1f}%")

    # Data freshness (for datasets with date fields)
    date_fields = SchemaManager.get_date_fields(dataset_name)
    if date_fields and date_fields[0] in df.columns:
        main_date_field = date_fields[0]
        if not df[main_date_field].empty:
            latest_date = pd.to_datetime(df[main_date_field]).max()
            oldest_date = pd.to_datetime(df[main_date_field]).min()
            days_span = (latest_date - oldest_date).days
            print(f"   Date range: {oldest_date.strftime('%Y-%m-%d')} to {latest_date.strftime('%Y-%m-%d')} ({days_span} days)")

    # Memory usage
    memory_mb = df.memory_usage(deep=True).sum() / 1024 / 1024
    print(f"   Memory usage: {memory_mb:.1f} MB")

print(f"\n🎯 TOTAL RECORDS: {sum(len(df) for df in datasets.values()):,}")
print(f"🎯 TOTAL MEMORY: {sum(df.memory_usage(deep=True).sum() for df in datasets.values()) / 1024 / 1024:.1f} MB")


🔍 Checking for cached data...
✅ Loaded analysis results from ../data/processed/licenses_df.pkl
   ✅ business_licenses: 2040 rows from cache
✅ Loaded analysis results from ../data/processed/permits_df.pkl
   ✅ building_permits: 8647 rows from cache
✅ Loaded analysis results from ../data/processed/cta_df.pkl
   ✅ cta_boardings: 668 rows from cache

✅ All data loaded from cache

📊 DATASET SUMMARY REPORT

📈 BUSINESS LICENSES
------------------------------
   Rows: 2,040
   Columns: 39
   Schema coverage: 100.0%
   Date range: 2020-12-03 to 2025-08-29 (1730 days)
   Memory usage: 3.1 MB

📈 BUILDING PERMITS
------------------------------
   Rows: 8,647
   Columns: 31
   Schema coverage: 100.0%
   Date range: 2015-03-12 to 2025-08-31 (3825 days)
   Memory usage: 9.5 MB

📈 CTA BOARDINGS
------------------------------
   Rows: 668
   Columns: 2
   Schema coverage: 100.0%
   Date range: 2023-09-02 to 2025-06-30 (667 days)
   Memory usage: 0.0 MB

🎯 TOTAL RECORDS: 11,355
🎯 TOTAL MEMORY: 12.6 MB


In [4]:
# Detailed field analysis and data validation
def analyze_dataset_fields(dataset_name: str, df: pd.DataFrame):
    """Perform detailed field analysis for a dataset."""
    print(f"\n🔍 DETAILED FIELD ANALYSIS: {dataset_name.upper().replace('_', ' ')}")
    print("=" * 60)

    if df.empty:
        print("   ⚠️  Dataset is empty - skipping analysis")
        return

    # Get schema information
    schema = SchemaManager.get_schema(dataset_name)
    expected_fields = SchemaManager.get_field_names(dataset_name)
    date_fields = SchemaManager.get_date_fields(dataset_name)
    required_fields = SchemaManager.get_required_fields(dataset_name)
    business_fields = SchemaManager.get_business_fields(dataset_name)
    geographic_fields = SchemaManager.get_geographic_fields(dataset_name)

    print(f"📊 Schema-defined field categories:")
    print(f"   • Total expected: {len(expected_fields)}")
    print(f"   • Date fields: {len(date_fields)}")
    print(f"   • Required fields: {len(required_fields)}")
    print(f"   • Business fields: {len(business_fields)}")
    print(f"   • Geographic fields: {len(geographic_fields)}")

    # Analyze field compliance
    actual_fields = set(df.columns)
    expected_set = set(expected_fields)

    present_fields = actual_fields & expected_set
    missing_fields = expected_set - actual_fields
    extra_fields = actual_fields - expected_set

    print(f"\n📈 Field Coverage Analysis:")
    print(f"   • Present: {len(present_fields)}/{len(expected_fields)} ({len(present_fields)/len(expected_fields)*100:.1f}%)")
    print(f"   • Missing: {len(missing_fields)}")
    print(f"   • Extra: {len(extra_fields)}")

    if missing_fields:
        print(f"\n❌ Missing fields: {sorted(list(missing_fields))}")

    if extra_fields:
        print(f"\n➕ Extra fields (not in schema): {sorted(list(extra_fields))[:10]}{'...' if len(extra_fields) > 10 else ''}")

    # Analyze data quality by field category
    print(f"\n🎯 Data Quality by Field Category:")

    # Required fields analysis
    print(f"\n   📋 Required Fields ({len(required_fields)}):")
    for field in sorted(required_fields):
        if field in df.columns:
            null_count = df[field].isnull().sum()
            null_pct = null_count / len(df) * 100
            unique_count = df[field].nunique()
            print(f"      ✅ {field}: {null_count} nulls ({null_pct:.1f}%), {unique_count} unique values")
        else:
            print(f"      ❌ {field}: MISSING FROM DATASET")

    # Date fields analysis
    if date_fields:
        print(f"\n   📅 Date Fields ({len(date_fields)}):")
        for field in sorted(date_fields):
            if field in df.columns:
                try:
                    date_series = pd.to_datetime(df[field], errors='coerce')
                    null_count = date_series.isnull().sum()
                    if not date_series.dropna().empty:
                        min_date = date_series.min()
                        max_date = date_series.max()
                        print(f"      ✅ {field}: {min_date.strftime('%Y-%m-%d')} to {max_date.strftime('%Y-%m-%d')} ({null_count} nulls)")
                    else:
                        print(f"      ⚠️  {field}: All values are null/invalid")
                except Exception as e:
                    print(f"      ❌ {field}: Error parsing dates - {str(e)[:50]}")
            else:
                print(f"      ❌ {field}: MISSING FROM DATASET")

    # Geographic fields analysis
    if geographic_fields:
        print(f"\n   🌍 Geographic Fields ({len(geographic_fields)}):")
        for field in sorted(geographic_fields):
            if field in df.columns:
                null_count = df[field].isnull().sum()
                null_pct = null_count / len(df) * 100
                if field in ['latitude', 'longitude']:
                    try:
                        numeric_vals = pd.to_numeric(df[field], errors='coerce')
                        valid_count = numeric_vals.notna().sum()
                        if valid_count > 0:
                            min_val = numeric_vals.min()
                            max_val = numeric_vals.max()
                            print(f"      ✅ {field}: Range {min_val:.4f} to {max_val:.4f} ({valid_count} valid, {null_count} nulls)")
                        else:
                            print(f"      ⚠️  {field}: No valid numeric values")
                    except:
                        print(f"      ❌ {field}: Error parsing numeric values")
                else:
                    unique_count = df[field].nunique()
                    print(f"      ✅ {field}: {unique_count} unique values ({null_pct:.1f}% nulls)")
            else:
                print(f"      ❌ {field}: MISSING FROM DATASET")

# Run detailed analysis for each dataset
for dataset_name, df in datasets.items():
    analyze_dataset_fields(dataset_name, df)



🔍 DETAILED FIELD ANALYSIS: BUSINESS LICENSES
📊 Schema-defined field categories:
   • Total expected: 39
   • Date fields: 9
   • Required fields: 12
   • Business fields: 11
   • Geographic fields: 8

📈 Field Coverage Analysis:
   • Present: 39/39 (100.0%)
   • Missing: 0
   • Extra: 0

🎯 Data Quality by Field Category:

   📋 Required Fields (12):
      ✅ address: 0 nulls (0.0%), 1678 unique values
      ✅ application_type: 0 nulls (0.0%), 1 unique values
      ✅ city: 0 nulls (0.0%), 167 unique values
      ✅ community_area: 0 nulls (0.0%), 76 unique values
      ✅ community_area_name: 0 nulls (0.0%), 76 unique values
      ✅ id: 0 nulls (0.0%), 2040 unique values
      ✅ legal_name: 0 nulls (0.0%), 1722 unique values
      ✅ license_description: 0 nulls (0.0%), 37 unique values
      ✅ license_id: 0 nulls (0.0%), 2040 unique values
      ✅ license_start_date: 0 nulls (0.0%), 68 unique values
      ✅ license_status: 0 nulls (0.0%), 2 unique values
      ✅ state: 0 nulls (0.0%), 24 un

In [5]:
# Create validation summary table to verify data pipeline integrity
def create_validation_report():
    """Create a comprehensive validation report."""
    print("\n" + "="*80)
    print("🔍 DATA PIPELINE VALIDATION REPORT")
    print("="*80)

    validation_data = []

    for dataset_name, df in datasets.items():
        # Get schema info
        schema = SchemaManager.get_schema(dataset_name)
        expected_fields = SchemaManager.get_field_names(dataset_name)
        date_fields = SchemaManager.get_date_fields(dataset_name)
        required_fields = SchemaManager.get_required_fields(dataset_name)

        if df.empty:
            validation_data.append({
                'Dataset': dataset_name,
                'Status': '❌ EMPTY',
                'Rows': 0,
                'Columns': 0,
                'Schema Match': '0%',
                'Required Fields': '0%',
                'Date Range': 'N/A',
                'Issues': 'Dataset is empty'
            })
            continue

        # Calculate metrics
        actual_fields = set(df.columns)
        expected_set = set(expected_fields)
        present_fields = actual_fields & expected_set
        schema_match_pct = len(present_fields) / len(expected_fields) * 100

        required_present = set(required_fields) & actual_fields
        required_match_pct = len(required_present) / len(required_fields) * 100 if required_fields else 100

        # Check date range
        date_range = "N/A"
        if date_fields and date_fields[0] in df.columns:
            try:
                dates = pd.to_datetime(df[date_fields[0]], errors='coerce').dropna()
                if not dates.empty:
                    date_range = f"{dates.min().strftime('%Y-%m-%d')} to {dates.max().strftime('%Y-%m-%d')}"
            except:
                date_range = "Invalid dates"

        # Identify issues
        issues = []
        if schema_match_pct < 100:
            issues.append(f"{len(expected_set - actual_fields)} missing fields")
        if required_match_pct < 100:
            issues.append(f"{len(set(required_fields) - actual_fields)} missing required")

        # Check for null values in required fields
        null_issues = []
        for field in required_fields:
            if field in df.columns:
                null_count = df[field].isnull().sum()
                if null_count > 0:
                    null_issues.append(f"{field}({null_count})")
        if null_issues:
            issues.append(f"Nulls in required: {', '.join(null_issues[:3])}")

        # Overall status
        if schema_match_pct == 100 and required_match_pct == 100 and not null_issues:
            status = "✅ EXCELLENT"
        elif schema_match_pct >= 90 and required_match_pct == 100:
            status = "✅ GOOD"
        elif schema_match_pct >= 70 and required_match_pct >= 90:
            status = "⚠️ FAIR"
        else:
            status = "❌ POOR"

        validation_data.append({
            'Dataset': dataset_name.replace('_', ' ').title(),
            'Status': status,
            'Rows': f"{len(df):,}",
            'Columns': len(df.columns),
            'Schema Match': f"{schema_match_pct:.1f}%",
            'Required Fields': f"{required_match_pct:.1f}%",
            'Date Range': date_range,
            'Issues': '; '.join(issues) if issues else 'None'
        })

    # Create summary DataFrame
    validation_df = pd.DataFrame(validation_data)

    # Display formatted table
    print("\n📊 VALIDATION SUMMARY TABLE:")
    print("-" * 120)

    # Print headers
    headers = ['Dataset', 'Status', 'Rows', 'Cols', 'Schema', 'Required', 'Date Range', 'Issues']
    col_widths = [15, 12, 10, 5, 8, 8, 22, 40]

    header_row = ""
    for header, width in zip(headers, col_widths):
        header_row += f"{header:<{width}}"
    print(header_row)
    print("-" * 120)

    # Print data rows
    for _, row in validation_df.iterrows():
        data_row = ""
        for (col, width) in zip(['Dataset', 'Status', 'Rows', 'Columns', 'Schema Match', 'Required Fields', 'Date Range', 'Issues'], col_widths):
            value = str(row[col])
            if len(value) > width-1:
                value = value[:width-4] + "..."
            data_row += f"{value:<{width}}"
        print(data_row)

    print("-" * 120)

    # Summary statistics
    total_rows = sum(int(row['Rows'].replace(',', '')) for _, row in validation_df.iterrows() if row['Rows'] != '0')
    excellent_count = sum(1 for _, row in validation_df.iterrows() if row['Status'].startswith('✅ EXCELLENT'))
    good_count = sum(1 for _, row in validation_df.iterrows() if row['Status'].startswith('✅ GOOD'))

    print(f"\n📈 PIPELINE SUMMARY:")
    print(f"   • Total records processed: {total_rows:,}")
    print(f"   • Datasets with excellent quality: {excellent_count}/{len(validation_df)}")
    print(f"   • Datasets with good+ quality: {excellent_count + good_count}/{len(validation_df)}")

    # Schema compliance report
    print(f"\n🎯 SCHEMA COMPLIANCE:")
    for dataset_name in datasets.keys():
        schema = SchemaManager.get_schema(dataset_name)
        expected_count = len(SchemaManager.get_field_names(dataset_name))
        actual_count = len(datasets[dataset_name].columns) if not datasets[dataset_name].empty else 0
        print(f"   • {dataset_name}: {actual_count}/{expected_count} fields")

    return validation_df

# Generate the validation report
validation_summary = create_validation_report()



🔍 DATA PIPELINE VALIDATION REPORT

📊 VALIDATION SUMMARY TABLE:
------------------------------------------------------------------------------------------------------------------------
Dataset        Status      Rows      Cols Schema  RequiredDate Range            Issues                                  
------------------------------------------------------------------------------------------------------------------------
Business Li... ✅ EXCELLENT 2,040     39   100.0%  100.0%  2020-12-03 to 2025... None                                    
Building Pe... ✅ EXCELLENT 8,647     31   100.0%  100.0%  2015-03-12 to 2025... None                                    
Cta Boardings  ✅ EXCELLENT 668       2    100.0%  100.0%  2023-09-02 to 2025... None                                    
------------------------------------------------------------------------------------------------------------------------

📈 PIPELINE SUMMARY:
   • Total records processed: 11,355
   • Datasets with excellent qu

## Data Cleaning 

### 1. Data Quality Heatmap & Overview

In [6]:
# Enhanced Data Quality Matrix with Mixed Type Detection
def create_enhanced_data_quality_matrix():
    """Create comprehensive data quality assessment matrix with mixed type detection."""
    print("ENHANCED DATA QUALITY MATRIX")
    print("=" * 60)

    quality_data = []
    contamination_issues = []

    for dataset_name, df in datasets.items():
        print(f"\n{dataset_name.upper().replace('_', ' ')}")
        print("-" * 40)

        if df.empty:
            print("   WARNING: Dataset is empty")
            continue

        schema_fields = SchemaManager.get_field_names(dataset_name)
        required_fields = SchemaManager.get_required_fields(dataset_name)
        date_fields = SchemaManager.get_date_fields(dataset_name)

        for field in schema_fields:
            if field in df.columns:
                # Get non-null values for analysis
                non_null_values = df[field].dropna()

                if len(non_null_values) == 0:
                    print(f"   NULL FIELD {field:<25} 0.000 (ALL NULL)")
                    continue

                # Convert to string for pattern analysis
                str_values = non_null_values.astype(str)

                # Basic quality metrics
                null_pct = df[field].isnull().sum() / len(df)

                # MIXED TYPE DETECTION
                contamination_score = 1.0
                contamination_details = []

                # 1. Detect numeric contamination in supposedly numeric fields
                if field in ['community_area', 'ward', 'precinct', 'zip_code', 'latitude', 'longitude']:
                    # Check for non-numeric characters in numeric fields
                    numeric_pattern = r'^-?\d+\.?\d*$'
                    pure_numeric = str_values.str.match(numeric_pattern, na=False)
                    contaminated_count = (~pure_numeric).sum()

                    if contaminated_count > 0:
                        contamination_pct = (contaminated_count / len(str_values)) * 100
                        contamination_score *= (1 - contamination_pct / 100)
                        contamination_details.append(f"Non-numeric contamination: {contaminated_count} records ({contamination_pct:.1f}%)")

                        # Show examples of contaminated values
                        contaminated_examples = str_values[~pure_numeric].head(3).tolist()
                        contamination_details.append(f"Examples: {contaminated_examples}")

                # 2. Detect leading/trailing special characters
                special_char_issues = 0
                if df[field].dtype == 'object':
                    # Check for leading quotes, spaces, or other unwanted characters
                    leading_issues = str_values.str.match(r'^[\'\"\s`]+', na=False).sum()
                    trailing_issues = str_values.str.match(r'.*[\'\"\s`]+$', na=False).sum()

                    if leading_issues > 0:
                        special_char_issues += leading_issues
                        contamination_details.append(f"Leading special chars: {leading_issues} records")

                    if trailing_issues > 0:
                        special_char_issues += trailing_issues
                        contamination_details.append(f"Trailing special chars: {trailing_issues} records")

                    if special_char_issues > 0:
                        special_char_pct = (special_char_issues / len(str_values)) * 100
                        contamination_score *= (1 - special_char_pct / 200)  # Less penalty than full contamination

                # 3. Detect mixed case inconsistency in categorical fields
                case_inconsistency = 0
                if field in ['city', 'state', 'license_status', 'permit_status', 'license_description']:
                    unique_values = str_values.unique()
                    unique_upper = str_values.str.upper().unique()

                    if len(unique_upper) < len(unique_values):
                        case_inconsistency = len(unique_values) - len(unique_upper)
                        case_pct = (case_inconsistency / len(unique_values)) * 100
                        contamination_score *= (1 - case_pct / 300)  # Smaller penalty
                        contamination_details.append(f"Case inconsistency: {case_inconsistency} variations")

                # 4. Detect encoding issues (non-ASCII characters where not expected)
                encoding_issues = 0
                if df[field].dtype == 'object':
                    try:
                        non_ascii = str_values.str.encode('ascii', errors='ignore').str.decode('ascii')
                        encoding_issues = (str_values != non_ascii).sum()

                        if encoding_issues > 0:
                            encoding_pct = (encoding_issues / len(str_values)) * 100
                            contamination_score *= (1 - encoding_pct / 400)  # Small penalty
                            contamination_details.append(f"Encoding issues: {encoding_issues} records")
                    except:
                        pass

                # 5. Detect length inconsistencies in ID fields
                length_inconsistency = 0
                if 'id' in field.lower() or field in ['license_number', 'permit_']:
                    lengths = str_values.str.len()
                    length_std = lengths.std()

                    if length_std > 2:  # High variation in ID lengths
                        length_inconsistency = 1
                        contamination_score *= 0.9
                        contamination_details.append(f"Length variation: std={length_std:.1f}")

                # Type consistency check
                type_consistency = 1.0
                if field in date_fields:
                    try:
                        parsed_dates = pd.to_datetime(df[field], errors='coerce')
                        invalid_dates = parsed_dates.isna().sum() - df[field].isna().sum()
                        if invalid_dates > 0:
                            type_consistency = 1 - (invalid_dates / len(df))
                    except:
                        type_consistency = 0.5

                # Duplicate assessment
                duplicate_factor = 0
                if df[field].dtype == 'object' and len(df) > 1:
                    duplicate_pct = df[field].duplicated().sum() / len(df)
                    duplicate_factor = min(duplicate_pct * 0.1, 0.1)

                # Business logic compliance (enhanced)
                business_compliance = 1.0
                if field in ['latitude', 'longitude'] and dataset_name == 'business_licenses':
                    try:
                        numeric_vals = pd.to_numeric(df[field], errors='coerce')
                        valid_vals = numeric_vals.dropna()

                        if len(valid_vals) > 0:
                            if field == 'latitude':
                                out_of_bounds = ((valid_vals < 41.6) | (valid_vals > 42.1)).sum()
                            else:
                                out_of_bounds = ((valid_vals < -87.9) | (valid_vals > -87.5)).sum()
                            business_compliance = 1 - (out_of_bounds / len(df))
                    except:
                        business_compliance = 0.5

                # Enhanced composite quality score
                quality_score = (
                    (1 - null_pct) * 0.25 +          # Completeness (25%)
                    type_consistency * 0.25 +         # Type consistency (25%)
                    contamination_score * 0.30 +      # Contamination/purity (30%)
                    (1 - duplicate_factor) * 0.10 +   # Uniqueness (10%)
                    business_compliance * 0.10        # Business logic (10%)
                )

                # Quality assessment with contamination consideration
                if quality_score >= 0.95:
                    status = "EXCELLENT"
                elif quality_score >= 0.85:
                    status = "GOOD"
                elif quality_score >= 0.70:
                    status = "FAIR"
                elif quality_score >= 0.50:
                    status = "POOR"
                else:
                    status = "CRITICAL"

                importance = "CRITICAL" if field in required_fields else "OPTIONAL"

                # Display results
                contamination_indicator = ""
                if contamination_score < 0.95:
                    contamination_indicator = " [CONTAMINATED]"
                elif contamination_score < 0.99:
                    contamination_indicator = " [MINOR_ISSUES]"

                print(f"   {status:<10} {field:<25} {quality_score:.3f} {importance}{contamination_indicator}")

                # Show contamination details
                if contamination_details:
                    for detail in contamination_details[:2]:  # Show first 2 issues
                        print(f"      -> {detail}")

                # Store contamination issues for summary
                if contamination_score < 0.95:
                    contamination_issues.append({
                        'dataset': dataset_name,
                        'field': field,
                        'contamination_score': contamination_score,
                        'issues': contamination_details
                    })

                # Add to summary data
                quality_data.append({
                    'dataset': dataset_name,
                    'field': field,
                    'quality_score': quality_score,
                    'contamination_score': contamination_score,
                    'null_pct': null_pct * 100,
                    'is_required': field in required_fields,
                    'status': status,
                    'contamination_issues': len(contamination_details)
                })
            else:
                print(f"   MISSING    {field:<25} 0.000 CRITICAL")
                quality_data.append({
                    'dataset': dataset_name,
                    'field': field,
                    'quality_score': 0.0,
                    'contamination_score': 0.0,
                    'null_pct': 100.0,
                    'is_required': field in required_fields,
                    'status': 'MISSING',
                    'contamination_issues': 1
                })

    # Create summary DataFrame
    quality_df = pd.DataFrame(quality_data)

    print(f"\nENHANCED QUALITY SUMMARY")
    print("-" * 35)

    if not quality_df.empty:
        overall_score = quality_df['quality_score'].mean()
        contamination_avg = quality_df['contamination_score'].mean()
        critical_issues = len(quality_df[(quality_df['is_required']) & (quality_df['quality_score'] < 0.8)])
        contaminated_fields = len(quality_df[quality_df['contamination_score'] < 0.95])

        print(f"   Overall Quality Score: {overall_score:.3f}")
        print(f"   Data Purity Score: {contamination_avg:.3f}")
        print(f"   Critical Issues: {critical_issues}")
        print(f"   Contaminated Fields: {contaminated_fields}")
        print(f"   Fields Needing Attention: {len(quality_df[quality_df['quality_score'] < 0.85])}")

    # Contamination summary
    if contamination_issues:
        print(f"\nCONTAMINATION DETAILS")
        print("-" * 25)
        for issue in contamination_issues[:5]:  # Show top 5 contamination issues
            print(f"   {issue['dataset']}.{issue['field']}: {issue['contamination_score']:.3f}")
            for detail in issue['issues'][:1]:  # Show first issue
                print(f"      {detail}")

    return quality_df, contamination_issues

# Run enhanced quality assessment
quality_matrix, contamination_report = create_enhanced_data_quality_matrix()

ENHANCED DATA QUALITY MATRIX

BUSINESS LICENSES
----------------------------------------
   EXCELLENT  id                        1.000 CRITICAL
   EXCELLENT  license_id                1.000 CRITICAL
   EXCELLENT  account_number            1.000 OPTIONAL
   EXCELLENT  site_number               1.000 OPTIONAL
   EXCELLENT  legal_name                0.998 CRITICAL
   EXCELLENT  doing_business_as_name    0.999 OPTIONAL
   EXCELLENT  license_code              1.000 OPTIONAL
   EXCELLENT  license_number            1.000 OPTIONAL
   EXCELLENT  license_description       0.990 CRITICAL
   EXCELLENT  business_activity_id      0.962 OPTIONAL [CONTAMINATED]
      -> Length variation: std=5.4
   EXCELLENT  business_activity         0.992 OPTIONAL
   EXCELLENT  address                   0.998 CRITICAL
   EXCELLENT  city                      0.991 CRITICAL
   EXCELLENT  state                     0.990 CRITICAL
   EXCELLENT  zip_code                  0.991 OPTIONAL
      -> Non-numeric contamination: 

### 2. Automated Anamoly Detection

In [7]:
# Dedicated Data Contamination Analyzer
def analyze_data_contamination():
    """Deep analysis of data contamination and mixed types."""
    print("DATA CONTAMINATION ANALYSIS")
    print("=" * 45)

    contamination_report = {}

    for dataset_name, df in datasets.items():
        print(f"\n{dataset_name.upper().replace('_', ' ')} CONTAMINATION SCAN")
        print("-" * 50)

        if df.empty:
            continue

        dataset_contamination = {
            'mixed_types': [],
            'special_chars': [],
            'encoding_issues': [],
            'format_inconsistencies': [],
            'recommendations': []
        }

        for field in df.columns:
            non_null = df[field].dropna()
            if len(non_null) == 0:
                continue

            str_values = non_null.astype(str)

            # 1. MIXED TYPE DETECTION
            if field in ['community_area', 'ward', 'precinct', 'zip_code']:
                print(f"\n   Analyzing {field} (expected: numeric)")

                # Pattern analysis
                pure_numeric = str_values.str.match(r'^\d+$', na=False)
                has_quotes = str_values.str.contains(r"['\"]", na=False)
                has_spaces = str_values.str.contains(r'^\s|\s$', na=False)
                has_letters = str_values.str.contains(r'[a-zA-Z]', na=False)
                has_special = str_values.str.contains(r'[^0-9\.\-]', na=False)

                contamination_found = False

                if has_quotes.sum() > 0:
                    contamination_found = True
                    quote_examples = str_values[has_quotes].head(3).tolist()
                    print(f"      QUOTES: {has_quotes.sum()} records with quotes - {quote_examples}")
                    dataset_contamination['special_chars'].append({
                        'field': field, 'type': 'quotes', 'count': has_quotes.sum(), 'examples': quote_examples
                    })

                if has_spaces.sum() > 0:
                    contamination_found = True
                    space_examples = str_values[has_spaces].head(3).tolist()
                    print(f"      SPACES: {has_spaces.sum()} records with leading/trailing spaces - {space_examples}")
                    dataset_contamination['special_chars'].append({
                        'field': field, 'type': 'spaces', 'count': has_spaces.sum(), 'examples': space_examples
                    })

                if has_letters.sum() > 0:
                    contamination_found = True
                    letter_examples = str_values[has_letters].head(3).tolist()
                    print(f"      LETTERS: {has_letters.sum()} records with letters - {letter_examples}")
                    dataset_contamination['mixed_types'].append({
                        'field': field, 'type': 'letters_in_numeric', 'count': has_letters.sum(), 'examples': letter_examples
                    })

                if not contamination_found:
                    print(f"      CLEAN: All {len(str_values)} values are properly numeric")

            # 2. TEXT FIELD CONTAMINATION
            elif df[field].dtype == 'object':
                print(f"\n   Analyzing {field} (text field)")

                # Check for embedded numbers where inappropriate
                if field in ['legal_name', 'doing_business_as_name', 'address']:
                    mostly_numeric = str_values.str.match(r'^\d+$', na=False)
                    if mostly_numeric.sum() > len(str_values) * 0.1:  # More than 10% pure numbers
                        numeric_examples = str_values[mostly_numeric].head(3).tolist()
                        print(f"      NUMERIC CONTAMINATION: {mostly_numeric.sum()} purely numeric values in text field")
                        dataset_contamination['mixed_types'].append({
                            'field': field, 'type': 'numeric_in_text', 'count': mostly_numeric.sum(), 'examples': numeric_examples
                        })

                # Check for control characters or unusual encoding
                control_chars = str_values.str.contains(r'[\x00-\x1f\x7f-\x9f]', na=False)
                if control_chars.sum() > 0:
                    control_examples = str_values[control_chars].head(2).tolist()
                    print(f"      CONTROL CHARS: {control_chars.sum()} records with control characters")
                    dataset_contamination['encoding_issues'].append({
                        'field': field, 'type': 'control_chars', 'count': control_chars.sum(), 'examples': control_examples
                    })

                # Check for mixed encoding (non-ASCII where not expected)
                try:
                    ascii_only = str_values.str.encode('ascii', errors='ignore').str.decode('ascii')
                    non_ascii = str_values != ascii_only
                    if non_ascii.sum() > 0 and field not in ['legal_name', 'doing_business_as_name']:  # Names can have accents
                        non_ascii_examples = str_values[non_ascii].head(2).tolist()
                        print(f"      NON-ASCII: {non_ascii.sum()} records with non-ASCII characters")
                        dataset_contamination['encoding_issues'].append({
                            'field': field, 'type': 'non_ascii', 'count': non_ascii.sum(), 'examples': non_ascii_examples
                        })
                except:
                    pass

            # 3. DATE FIELD CONTAMINATION
            elif field in SchemaManager.get_date_fields(dataset_name):
                print(f"\n   Analyzing {field} (date field)")

                # Check for multiple date formats
                formats = {
                    'iso_date': str_values.str.match(r'^\d{4}-\d{2}-\d{2}$', na=False),
                    'us_date': str_values.str.match(r'^\d{1,2}/\d{1,2}/\d{4}$', na=False),
                    'datetime': str_values.str.contains(r'\d{4}-\d{2}-\d{2}T\d{2}:\d{2}', na=False),
                    'timestamp': str_values.str.match(r'^\d{4}-\d{2}-\d{2} \d{2}:\d{2}', na=False)
                }

                format_counts = {fmt: mask.sum() for fmt, mask in formats.items()}
                active_formats = {fmt: count for fmt, count in format_counts.items() if count > 0}

                if len(active_formats) > 1:
                    print(f"      FORMAT MIX: Multiple date formats detected - {active_formats}")
                    dataset_contamination['format_inconsistencies'].append({
                        'field': field, 'type': 'mixed_date_formats', 'formats': active_formats
                    })
                elif len(active_formats) == 0:
                    unrecognized_examples = str_values.head(3).tolist()
                    print(f"      UNRECOGNIZED: Date format not recognized - {unrecognized_examples}")
                    dataset_contamination['format_inconsistencies'].append({
                        'field': field, 'type': 'unrecognized_date_format', 'examples': unrecognized_examples
                    })
                else:
                    print(f"      CLEAN: Consistent {list(active_formats.keys())[0]} format")

        # Generate cleaning recommendations
        recommendations = []

        for item in dataset_contamination['special_chars']:
            if item['type'] == 'quotes':
                recommendations.append(f"Strip quotes from {item['field']}: df['{item['field']}'].str.replace(r'[\\'\\\"]', '', regex=True)")
            elif item['type'] == 'spaces':
                recommendations.append(f"Strip whitespace from {item['field']}: df['{item['field']}'].str.strip()")

        for item in dataset_contamination['mixed_types']:
            if item['type'] == 'letters_in_numeric':
                recommendations.append(f"Convert {item['field']} to numeric: pd.to_numeric(df['{item['field']}'], errors='coerce')")

        dataset_contamination['recommendations'] = recommendations
        contamination_report[dataset_name] = dataset_contamination

        # Summary for this dataset
        total_issues = (len(dataset_contamination['mixed_types']) +
                       len(dataset_contamination['special_chars']) +
                       len(dataset_contamination['encoding_issues']) +
                       len(dataset_contamination['format_inconsistencies']))

        print(f"\n   DATASET SUMMARY: {total_issues} contamination types detected")
        if recommendations:
            print(f"   CLEANING ACTIONS: {len(recommendations)} recommended")

    return contamination_report

# Run contamination analysis
contamination_analysis = analyze_data_contamination()

DATA CONTAMINATION ANALYSIS

BUSINESS LICENSES CONTAMINATION SCAN
--------------------------------------------------

   Analyzing id (text field)

   Analyzing legal_name (text field)

   Analyzing doing_business_as_name (text field)

   Analyzing license_description (text field)

   Analyzing business_activity_id (text field)

   Analyzing business_activity (text field)

   Analyzing address (text field)

   Analyzing city (text field)

   Analyzing state (text field)

   Analyzing zip_code (expected: numeric)
      LETTERS: 1 records with letters - ['M6K']

   Analyzing ward (expected: numeric)
      CLEAN: All 2040 values are properly numeric

   Analyzing precinct (expected: numeric)
      CLEAN: All 2040 values are properly numeric

   Analyzing ward_precinct (text field)

   Analyzing police_district (text field)

   Analyzing community_area (expected: numeric)
      CLEAN: All 2040 values are properly numeric

   Analyzing community_area_name (text field)

   Analyzing neighbor

In [8]:
# Automated Anomaly Detection
def detect_data_anomalies():
    """Schema-aware anomaly detection across all datasets."""
    print("AUTOMATED ANOMALY DETECTION")
    print("=" * 50)

    all_anomalies = {}

    for dataset_name, df in datasets.items():
        print(f"\n{dataset_name.upper().replace('_', ' ')} ANOMALIES")
        print("-" * 40)

        if df.empty:
            print("   Dataset is empty - skipping analysis")
            continue

        dataset_anomalies = []

        # Numeric field outliers using IQR method
        numeric_fields = df.select_dtypes(include=[np.number]).columns
        schema_numeric = [f for f in SchemaManager.get_field_names(dataset_name) if f in numeric_fields]

        for field in schema_numeric:
            if field in df.columns and df[field].notna().sum() > 5:  # Need at least 5 values
                Q1 = df[field].quantile(0.25)
                Q3 = df[field].quantile(0.75)
                IQR = Q3 - Q1
                lower_bound = Q1 - 1.5 * IQR
                upper_bound = Q3 + 1.5 * IQR

                outliers = df[(df[field] < lower_bound) | (df[field] > upper_bound)]
                if len(outliers) > 0:
                    outlier_pct = len(outliers) / len(df) * 100
                    dataset_anomalies.append({
                        'field': field,
                        'type': 'outliers',
                        'count': len(outliers),
                        'percentage': outlier_pct,
                        'severity': 'HIGH' if outlier_pct > 5 else 'MEDIUM' if outlier_pct > 1 else 'LOW'
                    })
                    print(f"   OUTLIERS in {field}: {len(outliers)} records ({outlier_pct:.1f}%)")

        # Date field anomalies
        date_fields = SchemaManager.get_date_fields(dataset_name)
        for field in date_fields:
            if field in df.columns:
                date_series = pd.to_datetime(df[field], errors='coerce')

                # Future dates (beyond reasonable business timeframe)
                future_cutoff = datetime.now() + timedelta(days=365*2)  # 2 years in future
                future_dates = date_series[date_series > future_cutoff]
                if len(future_dates) > 0:
                    dataset_anomalies.append({
                        'field': field,
                        'type': 'future_dates',
                        'count': len(future_dates),
                        'examples': future_dates.head(3).tolist(),
                        'severity': 'HIGH'
                    })
                    print(f"   FUTURE DATES in {field}: {len(future_dates)} records")

                # Very old dates (potentially data entry errors)
                old_cutoff = datetime.now() - timedelta(days=365*20)  # 20 years ago
                old_dates = date_series[date_series < old_cutoff]
                if len(old_dates) > 0:
                    dataset_anomalies.append({
                        'field': field,
                        'type': 'very_old_dates',
                        'count': len(old_dates),
                        'examples': old_dates.head(3).tolist(),
                        'severity': 'MEDIUM'
                    })
                    print(f"   VERY OLD DATES in {field}: {len(old_dates)} records")

        # Text field anomalies
        text_fields = df.select_dtypes(include=['object']).columns
        for field in text_fields:
            if field in df.columns:
                # Extremely long text entries
                text_lengths = df[field].astype(str).str.len()
                long_threshold = text_lengths.quantile(0.99)  # 99th percentile
                if long_threshold > 100:  # Only check if reasonable threshold
                    long_entries = df[text_lengths > long_threshold]
                    if len(long_entries) > 0:
                        dataset_anomalies.append({
                            'field': field,
                            'type': 'unusually_long_text',
                            'count': len(long_entries),
                            'max_length': text_lengths.max(),
                            'severity': 'LOW'
                        })
                        print(f"   LONG TEXT in {field}: {len(long_entries)} records (max: {text_lengths.max()} chars)")

                # Suspicious patterns (all caps, all numbers, etc.)
                if field not in ['id', 'license_id', 'permit_']:  # Skip ID fields
                    all_caps = df[df[field].astype(str).str.isupper() & (df[field].astype(str).str.len() > 5)]
                    if len(all_caps) > len(df) * 0.1:  # More than 10% all caps
                        dataset_anomalies.append({
                            'field': field,
                            'type': 'excessive_all_caps',
                            'count': len(all_caps),
                            'percentage': len(all_caps) / len(df) * 100,
                            'severity': 'LOW'
                        })
                        print(f"   ALL CAPS TEXT in {field}: {len(all_caps)} records")

        all_anomalies[dataset_name] = dataset_anomalies

        if not dataset_anomalies:
            print("   No significant anomalies detected")

    # Summary
    total_anomalies = sum(len(anomalies) for anomalies in all_anomalies.values())
    high_severity = sum(1 for anomalies in all_anomalies.values() for a in anomalies if a.get('severity') == 'HIGH')

    print(f"\nANOMALY DETECTION SUMMARY")
    print("-" * 30)
    print(f"   Total anomaly types detected: {total_anomalies}")
    print(f"   High severity issues: {high_severity}")
    print(f"   Datasets with anomalies: {len([k for k, v in all_anomalies.items() if v])}")

    return all_anomalies

# Run anomaly detection
anomalies = detect_data_anomalies()

AUTOMATED ANOMALY DETECTION

BUSINESS LICENSES ANOMALIES
----------------------------------------
   OUTLIERS in license_id: 182 records (8.9%)
   OUTLIERS in account_number: 357 records (17.5%)
   OUTLIERS in site_number: 344 records (16.9%)
   OUTLIERS in license_number: 182 records (8.9%)
   FUTURE DATES in expiration_date: 221 records
   ALL CAPS TEXT in legal_name: 1833 records
   ALL CAPS TEXT in doing_business_as_name: 1677 records
   LONG TEXT in business_activity: 6 records (max: 480 chars)
   ALL CAPS TEXT in address: 2040 records
   ALL CAPS TEXT in city: 2029 records
   ALL CAPS TEXT in community_area_name: 1676 records
   ALL CAPS TEXT in neighborhood: 1698 records

BUILDING PERMITS ANOMALIES
----------------------------------------
   OUTLIERS in building_fee_paid: 1004 records (11.6%)
   OUTLIERS in zoning_fee_paid: 1051 records (12.2%)
   OUTLIERS in other_fee_paid: 353 records (4.1%)
   OUTLIERS in subtotal_paid: 1044 records (12.1%)
   OUTLIERS in building_fee_unpaid:

### 3. Business Logic Validation


In [9]:
# Business Rule Validation
def validate_business_rules():
    """Chicago-specific business logic validation."""
    print("BUSINESS LOGIC VALIDATION")
    print("=" * 40)

    all_business_issues = {}

    for dataset_name, df in datasets.items():
        print(f"\n{dataset_name.upper().replace('_', ' ')} BUSINESS RULES")
        print("-" * 30)

        if df.empty:
            print("   Dataset is empty - skipping validation")
            continue

        issues = []

        # Business Licenses specific validation
        if dataset_name == 'business_licenses':

            # License date logic validation
            if all(field in df.columns for field in ['license_start_date', 'expiration_date']):
                start_dates = pd.to_datetime(df['license_start_date'], errors='coerce')
                exp_dates = pd.to_datetime(df['expiration_date'], errors='coerce')
                invalid_dates = df[(start_dates >= exp_dates) & start_dates.notna() & exp_dates.notna()]
                if len(invalid_dates) > 0:
                    issues.append(f"Invalid date logic: {len(invalid_dates)} licenses start after expiration")
                    print(f"   DATE LOGIC ERROR: {len(invalid_dates)} records")

            # Application workflow validation
            if all(field in df.columns for field in ['application_created_date', 'license_start_date']):
                app_dates = pd.to_datetime(df['application_created_date'], errors='coerce')
                start_dates = pd.to_datetime(df['license_start_date'], errors='coerce')
                workflow_issues = df[(app_dates > start_dates) & app_dates.notna() & start_dates.notna()]
                if len(workflow_issues) > 0:
                    issues.append(f"Workflow logic error: {len(workflow_issues)} licenses start before application")
                    print(f"   WORKFLOW ERROR: {len(workflow_issues)} records")

            # Chicago geographic bounds validation
            if all(field in df.columns for field in ['latitude', 'longitude']):
                # Chicago bounds: roughly 41.6-42.1 latitude, -87.9 to -87.5 longitude
                lat_series = pd.to_numeric(df['latitude'], errors='coerce')
                lon_series = pd.to_numeric(df['longitude'], errors='coerce')

                out_of_bounds = df[
                    ((lat_series < 41.6) | (lat_series > 42.1) |
                     (lon_series < -87.9) | (lon_series > -87.5)) &
                    lat_series.notna() & lon_series.notna()
                ]
                if len(out_of_bounds) > 0:
                    issues.append(f"Geographic bounds: {len(out_of_bounds)} records outside Chicago area")
                    print(f"   GEOGRAPHIC ERROR: {len(out_of_bounds)} records outside Chicago bounds")

            # License status consistency
            if 'license_status' in df.columns:
                valid_statuses = ['ISSUED', 'ACTIVE', 'EXPIRED', 'REVOKED', 'CANCELLED']
                invalid_status = df[~df['license_status'].isin(valid_statuses)]
                if len(invalid_status) > 0:
                    unique_invalid = invalid_status['license_status'].unique()
                    issues.append(f"Invalid license status: {list(unique_invalid)}")
                    print(f"   STATUS ERROR: {len(invalid_status)} records with invalid status")

        # Building Permits specific validation
        elif dataset_name == 'building_permits':

            # Permit workflow validation
            if all(field in df.columns for field in ['application_start_date', 'issue_date']):
                app_dates = pd.to_datetime(df['application_start_date'], errors='coerce')
                issue_dates = pd.to_datetime(df['issue_date'], errors='coerce')
                workflow_issues = df[(app_dates > issue_dates) & app_dates.notna() & issue_dates.notna()]
                if len(workflow_issues) > 0:
                    issues.append(f"Permit workflow: {len(workflow_issues)} permits issued before application")
                    print(f"   WORKFLOW ERROR: {len(workflow_issues)} records")

            # Permit type validation
            if 'permit_type' in df.columns:
                expected_types = ['PERMIT', 'EASY_PERMIT', 'REVIEW', 'INSPECTION']
                invalid_types = df[~df['permit_type'].isin(expected_types)]
                if len(invalid_types) > 0:
                    unique_invalid = invalid_types['permit_type'].unique()
                    issues.append(f"Invalid permit types: {list(unique_invalid)}")
                    print(f"   TYPE ERROR: {len(invalid_types)} records with invalid permit type")

        # CTA Boardings specific validation
        elif dataset_name == 'cta_boardings':

            # Ridership reasonableness check
            if 'total_rides' in df.columns:
                rides = pd.to_numeric(df['total_rides'], errors='coerce')
                # Check for unreasonably high ridership (> 10M per day)
                high_ridership = df[rides > 10000000]
                if len(high_ridership) > 0:
                    issues.append(f"Unreasonably high ridership: {len(high_ridership)} days with >10M rides")
                    print(f"   RIDERSHIP ERROR: {len(high_ridership)} records with excessive ridership")

                # Check for negative ridership
                negative_rides = df[rides < 0]
                if len(negative_rides) > 0:
                    issues.append(f"Negative ridership: {len(negative_rides)} records")
                    print(f"   NEGATIVE VALUES: {len(negative_rides)} records")

        all_business_issues[dataset_name] = issues

        if not issues:
            print("   All business rules validated successfully")

    # Summary
    total_issues = sum(len(issues) for issues in all_business_issues.values())
    datasets_with_issues = len([k for k, v in all_business_issues.items() if v])

    print(f"\nBUSINESS VALIDATION SUMMARY")
    print("-" * 30)
    print(f"   Total business rule violations: {total_issues}")
    print(f"   Datasets with violations: {datasets_with_issues}")

    return all_business_issues

# Run business rules validation
business_issues = validate_business_rules()

BUSINESS LOGIC VALIDATION

BUSINESS LICENSES BUSINESS RULES
------------------------------
   DATE LOGIC ERROR: 70 records
   GEOGRAPHIC ERROR: 4 records outside Chicago bounds
   STATUS ERROR: 2040 records with invalid status

BUILDING PERMITS BUSINESS RULES
------------------------------
   TYPE ERROR: 8647 records with invalid permit type

CTA BOARDINGS BUSINESS RULES
------------------------------
   All business rules validated successfully

BUSINESS VALIDATION SUMMARY
------------------------------
   Total business rule violations: 4
   Datasets with violations: 2


### 4. Completeness & Consistency Analysis


In [10]:
# Data Completeness Analysis
def analyze_data_completeness():
    """Comprehensive completeness analysis with cleaning recommendations."""
    print("DATA COMPLETENESS & CONSISTENCY ANALYSIS")
    print("=" * 50)

    completeness_report = {}

    for dataset_name, df in datasets.items():
        print(f"\n{dataset_name.upper().replace('_', ' ')} COMPLETENESS")
        print("-" * 40)

        if df.empty:
            print("   Dataset is empty - skipping analysis")
            continue

        required_fields = SchemaManager.get_required_fields(dataset_name)
        all_fields = SchemaManager.get_field_names(dataset_name)
        optional_fields = set(all_fields) - set(required_fields)

        report = {
            'required_field_completeness': {},
            'optional_field_completeness': {},
            'cleaning_recommendations': [],
            'consistency_issues': []
        }

        print(f"   Total fields: {len(all_fields)} (Required: {len(required_fields)}, Optional: {len(optional_fields)})")

        # Required fields analysis - must be 100% complete
        print(f"\n   REQUIRED FIELDS ANALYSIS:")
        critical_issues = 0
        for field in required_fields:
            if field in df.columns:
                completeness = (df[field].notna().sum() / len(df)) * 100
                report['required_field_completeness'][field] = completeness

                if completeness < 100:
                    critical_issues += 1
                    missing_count = df[field].isna().sum()
                    print(f"     CRITICAL: {field} - {completeness:.1f}% complete ({missing_count} missing)")

                    # Generate imputation strategy
                    if df[field].dtype in ['int64', 'float64']:
                        strategy = 'median_imputation'
                        suggestion = f"Use median value: {df[field].median():.2f}"
                    elif df[field].dtype == 'object':
                        mode_val = df[field].mode()
                        if len(mode_val) > 0:
                            strategy = 'mode_imputation'
                            suggestion = f"Use most common value: '{mode_val.iloc[0]}'"
                        else:
                            strategy = 'default_value'
                            suggestion = "Assign default value or 'UNKNOWN'"
                    else:
                        strategy = 'investigation_required'
                        suggestion = "Manual investigation needed"

                    report['cleaning_recommendations'].append({
                        'field': field,
                        'action': 'CRITICAL_IMPUTATION',
                        'strategy': strategy,
                        'missing_count': missing_count,
                        'missing_pct': 100 - completeness,
                        'suggestion': suggestion
                    })
                else:
                    print(f"     OK: {field} - 100% complete")
            else:
                print(f"     MISSING: {field} - Field not present in dataset")
                report['cleaning_recommendations'].append({
                    'field': field,
                    'action': 'FIELD_MISSING',
                    'strategy': 'data_source_investigation',
                    'suggestion': 'Check data source - required field completely absent'
                })

        # Optional fields analysis
        print(f"\n   OPTIONAL FIELDS ANALYSIS:")
        low_value_fields = 0
        for field in sorted(optional_fields):
            if field in df.columns:
                completeness = (df[field].notna().sum() / len(df)) * 100
                report['optional_field_completeness'][field] = completeness

                if completeness < 25:  # Very sparse
                    low_value_fields += 1
                    print(f"     LOW VALUE: {field} - {completeness:.1f}% complete (consider dropping)")
                    report['cleaning_recommendations'].append({
                        'field': field,
                        'action': 'CONSIDER_DROPPING',
                        'missing_pct': 100 - completeness,
                        'suggestion': f'Only {completeness:.1f}% populated - assess business value'
                    })
                elif completeness < 75:  # Moderately sparse
                    print(f"     SPARSE: {field} - {completeness:.1f}% complete")
                else:
                    print(f"     GOOD: {field} - {completeness:.1f}% complete")

        # Consistency checks
        print(f"\n   CONSISTENCY ANALYSIS:")

        # Check for mixed data types in object columns
        for field in df.select_dtypes(include=['object']).columns:
            sample_values = df[field].dropna().head(100)
            if len(sample_values) > 0:
                # Check for numeric values in text fields
                numeric_like = sample_values.astype(str).str.isnumeric().sum()
                if numeric_like > len(sample_values) * 0.8:  # 80% numeric-like
                    report['consistency_issues'].append(f"{field}: Appears numeric but stored as text")
                    print(f"     TYPE ISSUE: {field} - appears numeric but stored as text")

                # Check for mixed case inconsistency
                if field in ['city', 'state', 'license_status', 'permit_status']:
                    case_variations = len(sample_values.str.upper().unique())
                    actual_variations = len(sample_values.unique())
                    if case_variations < actual_variations:
                        report['consistency_issues'].append(f"{field}: Mixed case inconsistency")
                        print(f"     CASE ISSUE: {field} - inconsistent capitalization")

        # Date format consistency
        date_fields = SchemaManager.get_date_fields(dataset_name)
        for field in date_fields:
            if field in df.columns:
                sample_dates = df[field].dropna().astype(str).head(50)
                if len(sample_dates) > 0:
                    # Check for multiple date formats
                    formats_found = []
                    for date_str in sample_dates:
                        if '-' in date_str and len(date_str) == 10:
                            formats_found.append('YYYY-MM-DD')
                        elif '/' in date_str:
                            formats_found.append('MM/DD/YYYY')
                        elif len(date_str) > 15:
                            formats_found.append('DATETIME')

                    unique_formats = set(formats_found)
                    if len(unique_formats) > 1:
                        report['consistency_issues'].append(f"{field}: Multiple date formats detected")
                        print(f"     DATE FORMAT: {field} - mixed formats: {unique_formats}")

        completeness_report[dataset_name] = report

        # Field summary
        print(f"\n   SUMMARY:")
        print(f"     Critical issues: {critical_issues}")
        print(f"     Low-value fields: {low_value_fields}")
        print(f"     Consistency issues: {len(report['consistency_issues'])}")
        print(f"     Cleaning recommendations: {len(report['cleaning_recommendations'])}")

    # Overall summary
    total_recommendations = sum(len(report['cleaning_recommendations']) for report in completeness_report.values())
    total_consistency_issues = sum(len(report['consistency_issues']) for report in completeness_report.values())

    print(f"\nOVERALL COMPLETENESS SUMMARY")
    print("-" * 35)
    print(f"   Total cleaning recommendations: {total_recommendations}")
    print(f"   Total consistency issues: {total_consistency_issues}")
    print(f"   Datasets requiring attention: {len([k for k, v in completeness_report.items() if v['cleaning_recommendations']])}")

    return completeness_report

# Run completeness analysis
completeness_analysis = analyze_data_completeness()

DATA COMPLETENESS & CONSISTENCY ANALYSIS

BUSINESS LICENSES COMPLETENESS
----------------------------------------
   Total fields: 39 (Required: 12, Optional: 27)

   REQUIRED FIELDS ANALYSIS:
     OK: id - 100% complete
     OK: license_id - 100% complete
     OK: legal_name - 100% complete
     OK: license_description - 100% complete
     OK: address - 100% complete
     OK: city - 100% complete
     OK: state - 100% complete
     OK: community_area - 100% complete
     OK: community_area_name - 100% complete
     OK: application_type - 100% complete
     OK: license_start_date - 100% complete
     OK: license_status - 100% complete

   OPTIONAL FIELDS ANALYSIS:
     GOOD: account_number - 100.0% complete
     GOOD: application_created_date - 99.9% complete
     GOOD: application_requirements_complete - 100.0% complete
     GOOD: business_activity - 100.0% complete
     GOOD: business_activity_id - 100.0% complete
     LOW VALUE: conditional_approval - 0.0% complete (consider droppin

# Data Quality Check Caching

In [None]:
# Fixed Save All Quality Analysis Results for Data Cleaning Workflow
import pickle
import json
from datetime import datetime
from pathlib import Path
import shutil

def save_quality_analysis_results():
    """Save all data quality analysis results with organized batch folders."""
    print("SAVING QUALITY ANALYSIS RESULTS")
    print("=" * 50)

    # Create main results directory
    results_dir = Path("../../data/quality_analysis")
    results_dir.mkdir(parents=True, exist_ok=True)

    # Create unique batch folder with descriptive naming
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    batch_name = f"data_quality_batch_{timestamp}"
    batch_dir = results_dir / batch_name
    batch_dir.mkdir(exist_ok=True)

    print(f"   📁 Created batch folder: {batch_name}")

    # Generate comprehensive metadata
    metadata = {
        'batch_id': batch_name,
        'analysis_timestamp': timestamp,
        'analysis_datetime': datetime.now().isoformat(),
        'datasets_analyzed': list(datasets.keys()),
        'total_records': sum(len(df) for df in datasets.values()),
        'analysis_components': [
            'quality_matrix',
            'contamination_analysis',
            'anomaly_detection',
            'business_validation',
            'completeness_analysis'
        ]
    }

    # 1. Save Quality Matrix Results
    quality_results = {
        'quality_matrix': quality_matrix.to_dict('records') if not quality_matrix.empty else [],
        'contamination_report': contamination_report,
        'metadata': metadata
    }

    quality_file = batch_dir / "quality_matrix.pkl"
    with open(quality_file, 'wb') as f:
        pickle.dump(quality_results, f)
    print(f"   ✅ Quality matrix: {quality_file.name}")

    # 2. Save Contamination Analysis
    contamination_file = batch_dir / "contamination_analysis.pkl"
    with open(contamination_file, 'wb') as f:
        pickle.dump(contamination_analysis, f)
    print(f"   ✅ Contamination analysis: {contamination_file.name}")

    # 3. Save Anomaly Detection Results
    anomaly_file = batch_dir / "anomaly_detection.pkl"
    with open(anomaly_file, 'wb') as f:
        pickle.dump(anomalies, f)
    print(f"   ✅ Anomaly detection: {anomaly_file.name}")

    # 4. Save Business Logic Validation
    business_file = batch_dir / "business_validation.pkl"
    with open(business_file, 'wb') as f:
        pickle.dump(business_issues, f)
    print(f"   ✅ Business validation: {business_file.name}")

    # 5. Save Completeness Analysis
    completeness_file = batch_dir / "completeness_analysis.pkl"
    with open(completeness_file, 'wb') as f:
        pickle.dump(completeness_analysis, f)
    print(f"   ✅ Completeness analysis: {completeness_file.name}")

    # 6. Calculate summary metrics first
    # Quality metrics
    overall_score = float(quality_matrix['quality_score'].mean()) if not quality_matrix.empty else 0
    data_purity_score = float(quality_matrix['contamination_score'].mean()) if not quality_matrix.empty else 0
    contaminated_fields = len(quality_matrix[quality_matrix['contamination_score'] < 0.95]) if not quality_matrix.empty else 0
    critical_issues = len(quality_matrix[(quality_matrix['is_required']) & (quality_matrix['quality_score'] < 0.8)]) if not quality_matrix.empty else 0
    excellent_fields = len(quality_matrix[quality_matrix['quality_score'] >= 0.95]) if not quality_matrix.empty else 0

    # Contamination metrics
    total_contamination_types = sum(
        len(report.get('mixed_types', [])) +
        len(report.get('special_chars', [])) +
        len(report.get('encoding_issues', [])) +
        len(report.get('format_inconsistencies', []))
        for report in contamination_analysis.values()
    )
    datasets_with_contamination = len([k for k, v in contamination_analysis.items() if any([
        v.get('mixed_types'), v.get('special_chars'),
        v.get('encoding_issues'), v.get('format_inconsistencies')
    ])])

    # Business rules metrics
    total_violations = sum(len(issues) for issues in business_issues.values())
    datasets_with_violations = len([k for k, v in business_issues.items() if v])

    # Completeness metrics
    total_recommendations = sum(len(report['cleaning_recommendations']) for report in completeness_analysis.values())
    total_consistency_issues = sum(len(report['consistency_issues']) for report in completeness_analysis.values())
    critical_imputation_needed = sum(1 for report in completeness_analysis.values()
                                    for rec in report['cleaning_recommendations']
                                    if rec.get('action') == 'CRITICAL_IMPUTATION')

    # 7. Create Comprehensive Summary (without circular references)
    summary = {
        'batch_info': metadata,
        'quality_summary': {
            'overall_score': overall_score,
            'data_purity_score': data_purity_score,
            'contaminated_fields': contaminated_fields,
            'critical_issues': critical_issues,
            'excellent_fields': excellent_fields
        },
        'contamination_summary': {
            'total_contamination_types': total_contamination_types,
            'datasets_with_contamination': datasets_with_contamination,
            'mixed_type_issues': sum(len(report.get('mixed_types', [])) for report in contamination_analysis.values()),
            'special_char_issues': sum(len(report.get('special_chars', [])) for report in contamination_analysis.values()),
            'encoding_issues': sum(len(report.get('encoding_issues', [])) for report in contamination_analysis.values())
        },
        'anomaly_summary': {
            'total_anomaly_types': sum(len(anomaly_list) for anomaly_list in anomalies.values()),
            'high_severity_issues': sum(1 for anomaly_list in anomalies.values() for a in anomaly_list if a.get('severity') == 'HIGH'),
            'medium_severity_issues': sum(1 for anomaly_list in anomalies.values() for a in anomaly_list if a.get('severity') == 'MEDIUM'),
            'low_severity_issues': sum(1 for anomaly_list in anomalies.values() for a in anomaly_list if a.get('severity') == 'LOW'),
            'datasets_with_anomalies': len([k for k, v in anomalies.items() if v])
        },
        'business_rules_summary': {
            'total_violations': total_violations,
            'datasets_with_violations': datasets_with_violations,
            'violation_types': [issue for issues in business_issues.values() for issue in issues]
        },
        'completeness_summary': {
            'total_recommendations': total_recommendations,
            'total_consistency_issues': total_consistency_issues,
            'datasets_requiring_attention': len([k for k, v in completeness_analysis.items() if v['cleaning_recommendations']]),
            'critical_imputation_needed': critical_imputation_needed,
            'low_value_fields': sum(1 for report in completeness_analysis.values()
                                  for rec in report['cleaning_recommendations']
                                  if rec.get('action') == 'CONSIDER_DROPPING')
        },
        'file_manifest': {
            'batch_directory': str(batch_dir),
            'files': {
                'quality_matrix': str(quality_file),
                'contamination_analysis': str(contamination_file),
                'anomaly_detection': str(anomaly_file),
                'business_validation': str(business_file),
                'completeness_analysis': str(completeness_file)
            }
        },
        'cleaning_priority_summary': {
            'critical_actions_needed': total_violations + critical_imputation_needed,
            'high_priority_actions': total_contamination_types,
            'total_fields_analyzed': len(quality_matrix) if not quality_matrix.empty else 0
        }
    }

    # Save comprehensive summary
    summary_file = batch_dir / "analysis_summary.json"
    with open(summary_file, 'w') as f:
        json.dump(summary, f, indent=2, default=str)
    print(f"   ✅ Analysis summary: {summary_file.name}")

    # 8. Create/Update Latest Directory (Always Points to Most Recent)
    latest_dir = results_dir / "latest"

    # Remove existing latest directory if it exists
    if latest_dir.exists():
        shutil.rmtree(latest_dir)

    # Create fresh latest directory and copy all files
    latest_dir.mkdir(exist_ok=True)

    # Copy all batch files to latest
    for file_path in batch_dir.glob("*.pkl"):
        shutil.copy2(file_path, latest_dir / file_path.name)
    for file_path in batch_dir.glob("*.json"):
        shutil.copy2(file_path, latest_dir / file_path.name)

    # Create a latest_batch_info.txt for easy reference
    batch_info_file = latest_dir / "latest_batch_info.txt"
    with open(batch_info_file, 'w') as f:
        f.write(f"Latest Quality Analysis Batch\n")
        f.write(f"=" * 30 + "\n")
        f.write(f"Batch ID: {batch_name}\n")
        f.write(f"Generated: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n")
        f.write(f"Source Directory: {batch_dir}\n")
        f.write(f"Datasets Analyzed: {', '.join(list(datasets.keys()))}\n")
        f.write(f"Total Records: {sum(len(df) for df in datasets.values()):,}\n")
        f.write(f"\nAnalysis Components:\n")
        for component in metadata['analysis_components']:
            f.write(f"  - {component}.pkl\n")

    print(f"   ✅ Latest directory updated: {latest_dir}")

    # 9. Create Batch Index for Easy Navigation
    index_file = results_dir / "batch_index.json"

    # Load existing index or create new
    if index_file.exists():
        with open(index_file, 'r') as f:
            batch_index = json.load(f)
    else:
        batch_index = {'batches': []}

    # Add current batch to index
    batch_entry = {
        'batch_id': batch_name,
        'timestamp': timestamp,
        'datetime': datetime.now().isoformat(),
        'directory': str(batch_dir),
        'datasets': list(datasets.keys()),
        'total_records': sum(len(df) for df in datasets.values()),
        'summary_scores': {
            'overall_quality': overall_score,
            'contaminated_fields': contaminated_fields,
            'total_recommendations': total_recommendations
        }
    }

    # Add to beginning of list (most recent first)
    batch_index['batches'].insert(0, batch_entry)

    # Keep only last 20 batches in index for performance
    batch_index['batches'] = batch_index['batches'][:20]
    batch_index['last_updated'] = datetime.now().isoformat()

    with open(index_file, 'w') as f:
        json.dump(batch_index, f, indent=2, default=str)

    print(f"   ✅ Batch index updated: {index_file.name}")

    print(f"\n🎯 QUALITY ANALYSIS BATCH COMPLETE!")
    print(f"   📂 Batch Directory: {batch_name}")
    print(f"   📊 Overall Quality Score: {overall_score:.3f}")
    print(f"   🔍 Contaminated Fields: {contaminated_fields}")
    print(f"   🔧 Cleaning Recommendations: {total_recommendations}")
    print(f"   📋 Business Rule Violations: {total_violations}")

    return summary, batch_name

# Run the enhanced caching function
analysis_summary, batch_id = save_quality_analysis_results()

SAVING QUALITY ANALYSIS RESULTS
   📁 Created batch folder: data_quality_batch_20250901_153504
   ✅ Quality matrix: quality_matrix.pkl
   ✅ Contamination analysis: contamination_analysis.pkl
   ✅ Anomaly detection: anomaly_detection.pkl
   ✅ Business validation: business_validation.pkl
   ✅ Completeness analysis: completeness_analysis.pkl


UnboundLocalError: cannot access local variable 'summary' where it is not associated with a value