# Healthcare Claims Database Ingestion

This notebook ingests the cleaned healthcare dataset into a SQLite database with a single `claims_table` for efficient storage and querying of healthcare claims data.

## Process Overview:
1. **Data Loading** - Load cleaned CSV from PHA scrubber
2. **Database Creation** - Create SQLite database with claims_table schema
3. **Claims Ingestion** - Bulk insert all healthcare claims with patient hashing
4. **Performance Optimization** - Create indexes for efficient querying
5. **Validation & Reporting** - Verify data integrity and generate summary report

---

In [1]:
# Import required libraries
import pandas as pd
import sqlite3
import hashlib
from datetime import datetime
from pathlib import Path
import warnings
import numpy as np

warnings.filterwarnings('ignore')

# Get project root directory (parent of notebooks-01)
project_root = Path(__file__).parent.parent if '__file__' in globals() else Path.cwd().parent

# Create database directory in project root
db_dir = project_root / "db"
db_dir.mkdir(parents=True, exist_ok=True)

print("Healthcare Claims Database Ingestion")
print("=" * 40)
print(f"Project root: {project_root}")
print(f"Database directory: {db_dir.absolute()}")
print(f"Target database: {db_dir / 'claims_db.sqlite'}")
print("Libraries imported successfully")

Healthcare Claims Database Ingestion
Project root: /Users/kxshrx/asylum/healix
Database directory: /Users/kxshrx/asylum/healix/db
Target database: /Users/kxshrx/asylum/healix/db/claims_db.sqlite
Libraries imported successfully


## 1. Data Loading and Validation

Load the cleaned healthcare dataset and perform initial validation to ensure data quality before database ingestion.

In [2]:
def load_cleaned_dataset(file_path):
    """
    Load cleaned healthcare dataset with validation.
    
    Args:
        file_path (str): Path to cleaned CSV file
        
    Returns:
        pandas.DataFrame: Loaded and validated dataset
    """
    try:
        if not Path(file_path).exists():
            raise FileNotFoundError(f"Cleaned dataset not found: {file_path}")
        
        # Load dataset
        df = pd.read_csv(file_path)
        
        print(f"Dataset loaded successfully:")
        print(f"  Records: {len(df):,}")
        print(f"  Columns: {len(df.columns)}")
        
        # Validate required columns for claims_table
        required_cols = [
            'Age', 'Gender', 'Blood Type', 'Medical Condition',
            'Admission Type', 'admission_year_month', 'admission_year',
            'length_of_stay_days', 'Medication', 'Test Results',
            'Insurance Provider', 'Billing Amount'
        ]
        
        missing_cols = [col for col in required_cols if col not in df.columns]
        if missing_cols:
            raise ValueError(f"Missing required columns: {missing_cols}")
        
        print(f"  Required columns validated: ✓")
        
        # Check for null values in critical fields
        critical_fields = ['Insurance Provider', 'Billing Amount']
        critical_nulls = df[critical_fields].isnull().sum()
        if critical_nulls.any():
            print(f"  Warning: Null values in critical fields:")
            for col, null_count in critical_nulls.items():
                if null_count > 0:
                    print(f"    {col}: {null_count} nulls")
        
        # Data type validation
        numeric_cols = ['Age', 'admission_year', 'length_of_stay_days', 'Billing Amount']
        for col in numeric_cols:
            if col in df.columns and not pd.api.types.is_numeric_dtype(df[col]):
                print(f"  Warning: {col} is not numeric type")
        
        return df
        
    except FileNotFoundError as e:
        print(f"Error: {e}")
        print("Please ensure the PHA scrubbing process has been completed first.")
        return None
    except Exception as e:
        print(f"Error loading dataset: {e}")
        return None

# Load cleaned dataset - use relative path from project root
cleaned_file = project_root / "outputs" / "cleaned" / "healthcare_dataset_cleaned.csv"
print(f"Loading cleaned dataset: {cleaned_file}")
print("=" * 50)

df_claims = load_cleaned_dataset(cleaned_file)

if df_claims is not None:
    print(f"\nDataset preview:")
    print(df_claims.head())
    
    print(f"\nData summary:")
    print(f"  Unique insurance providers: {df_claims['Insurance Provider'].nunique()}")
    print(f"  Date range: {df_claims['admission_year_month'].min()} to {df_claims['admission_year_month'].max()}")
    print(f"  Total billing amount: ${df_claims['Billing Amount'].sum():,.2f}")
else:
    print("Failed to load dataset. Stopping execution.")

Loading cleaned dataset: /Users/kxshrx/asylum/healix/outputs/cleaned/healthcare_dataset_cleaned.csv
Dataset loaded successfully:
  Records: 55,500
  Columns: 12
  Required columns validated: ✓

Dataset preview:
   Age  Gender Blood Type Medical Condition Admission Type  \
0   30    Male         B-            Cancer         Urgent   
1   62    Male         A+           Obesity      Emergency   
2   76  Female         A-           Obesity      Emergency   
3   28  Female         O+          Diabetes       Elective   
4   43  Female        AB+            Cancer         Urgent   

  admission_year_month  admission_year  length_of_stay_days   Medication  \
0              2024-01            2024                    2  Paracetamol   
1              2019-08            2019                    6    Ibuprofen   
2              2022-09            2022                   15      Aspirin   
3              2020-11            2020                   30    Ibuprofen   
4              2022-09            20

## 2. Database Schema Creation

Create SQLite database with the claims_table schema optimized for healthcare claims storage and querying.

In [3]:
def create_claims_database(db_path):
    """
    Create SQLite database with claims_table schema.
    
    Args:
        db_path (str): Path to SQLite database file
        
    Returns:
        sqlite3.Connection: Database connection object
    """
    try:
        # Connect to database (creates file if doesn't exist)
        conn = sqlite3.connect(db_path)
        cursor = conn.cursor()
        
        print(f"Creating database schema in: {db_path}")
        
        # Claims table schema
        claims_schema = """
        CREATE TABLE IF NOT EXISTS claims_table (
            claim_id INTEGER PRIMARY KEY AUTOINCREMENT,
            patient_hash TEXT,
            age INTEGER,
            gender TEXT,
            blood_type TEXT,
            medical_condition TEXT,
            admission_year_month TEXT,
            admission_type TEXT,
            length_of_stay_days INTEGER,
            discharge_date TEXT,
            medication TEXT,
            test_results TEXT,
            insurance_provider TEXT,
            billing_amount REAL,
            created_at TEXT DEFAULT CURRENT_TIMESTAMP
        );
        """
        
        # Execute schema creation
        cursor.execute(claims_schema)
        conn.commit()
        
        print("Database schema created successfully:")
        print("  ✓ claims_table - Healthcare claims data with patient anonymization")
        
        return conn
        
    except Exception as e:
        print(f"Error creating database schema: {e}")
        return None

# Create database and schema
db_path = db_dir / "claims_db.sqlite"
print("DATABASE SCHEMA CREATION")
print("=" * 30)

db_conn = create_claims_database(db_path)

if db_conn:
    # Verify table was created
    cursor = db_conn.cursor()
    cursor.execute("SELECT name FROM sqlite_master WHERE type='table';")
    tables = cursor.fetchall()
    
    print(f"\nCreated tables:")
    for table in tables:
        print(f"  - {table[0]}")
    
    # Show table schema
    cursor.execute("PRAGMA table_info(claims_table);")
    columns = cursor.fetchall()
    print(f"\nClaims table schema:")
    for col in columns:
        print(f"  {col[1]} ({col[2]})")
    
    print(f"\nInitial database file size: {db_path.stat().st_size / 1024:.2f} KB")

DATABASE SCHEMA CREATION
Creating database schema in: /Users/kxshrx/asylum/healix/db/claims_db.sqlite
Database schema created successfully:
  ✓ claims_table - Healthcare claims data with patient anonymization

Created tables:
  - claims_table
  - sqlite_sequence
  - sqlite_stat1
  - sqlite_stat4
  - policy_table
  - claims_with_policy_rules

Claims table schema:
  claim_id (INTEGER)
  patient_hash (TEXT)
  age (INTEGER)
  gender (TEXT)
  blood_type (TEXT)
  medical_condition (TEXT)
  admission_year_month (TEXT)
  admission_type (TEXT)
  length_of_stay_days (INTEGER)
  discharge_date (TEXT)
  medication (TEXT)
  test_results (TEXT)
  insurance_provider (TEXT)
  billing_amount (REAL)
  created_at (TEXT)

Initial database file size: 88132.00 KB


## 3. Claims Data Ingestion

Generate anonymous patient hashes and bulk insert all healthcare claims into the database using parameterized queries for security and performance.

In [4]:
def generate_patient_hash(row):
    """
    Generate anonymous patient hash from demographic data.
    
    Args:
        row: DataFrame row with patient data
        
    Returns:
        str: SHA-256 hash for patient anonymization
    """
    # Combine age, gender, blood type, and row index for unique but anonymous identifier
    patient_data = f"{row['Age']}_{row['Gender']}_{row['Blood Type']}_{row.name}"
    return hashlib.sha256(patient_data.encode()).hexdigest()[:16]

def ingest_claims_data(df, conn):
    """
    Bulk insert claims data into database.
    
    Args:
        df (DataFrame): Claims data to insert
        conn: SQLite database connection
        
    Returns:
        dict: Ingestion statistics
    """
    try:
        print("CLAIMS DATA INGESTION")
        print("=" * 25)
        
        # Generate patient hashes for anonymization
        print("Generating anonymous patient hashes...")
        df['patient_hash'] = df.apply(generate_patient_hash, axis=1)
        
        # Prepare data for insertion
        print("Preparing data for database insertion...")
        
        # Handle missing values and data cleaning
        df_clean = df.copy()
        df_clean = df_clean.fillna({
            'admission_year_month': 'Unknown',
            'length_of_stay_days': 0
        })
        
        # Convert to appropriate types
        df_clean['age'] = df_clean['Age'].astype(int)
        df_clean['length_of_stay_days'] = df_clean['length_of_stay_days'].fillna(0).astype(int)
        df_clean['billing_amount'] = df_clean['Billing Amount'].astype(float)
        
        # Create estimated discharge date from admission period and length of stay
        df_clean['discharge_date'] = df_clean['admission_year_month'] + '-EST'
        
        # Prepare parameterized insertion query
        insert_query = """
        INSERT INTO claims_table (
            patient_hash, age, gender, blood_type, medical_condition,
            admission_year_month, admission_type, length_of_stay_days,
            discharge_date, medication, test_results, insurance_provider,
            billing_amount
        ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
        """
        
        # Prepare data tuples for bulk insert
        claims_data = []
        for _, row in df_clean.iterrows():
            claims_data.append((
                row['patient_hash'],
                row['age'],
                row['Gender'],
                row['Blood Type'],
                row['Medical Condition'],
                row['admission_year_month'],
                row['Admission Type'],
                row['length_of_stay_days'],
                row['discharge_date'],
                row['Medication'],
                row['Test Results'],
                row['Insurance Provider'],
                row['billing_amount']
            ))
        
        # Execute bulk insert with transaction
        print(f"Inserting {len(claims_data):,} claims records...")
        cursor = conn.cursor()
        cursor.executemany(insert_query, claims_data)
        conn.commit()
        
        # Verify insertion
        cursor.execute("SELECT COUNT(*) FROM claims_table")
        inserted_count = cursor.fetchone()[0]
        
        # Calculate ingestion statistics
        stats = {
            'total_records': len(df),
            'inserted_records': inserted_count,
            'unique_patients': df['patient_hash'].nunique(),
            'unique_providers': df['Insurance Provider'].nunique(),
            'unique_conditions': df['Medical Condition'].nunique(),
            'total_billing': df['Billing Amount'].sum(),
            'avg_billing': df['Billing Amount'].mean(),
            'date_range': f"{df['admission_year_month'].min()} to {df['admission_year_month'].max()}",
            'providers_list': sorted(df['Insurance Provider'].unique().tolist())
        }
        
        print(f"Claims ingestion completed successfully:")
        print(f"  Records inserted: {stats['inserted_records']:,}")
        print(f"  Unique patients: {stats['unique_patients']:,}")
        print(f"  Unique providers: {stats['unique_providers']}")
        print(f"  Unique conditions: {stats['unique_conditions']}")
        print(f"  Total billing: ${stats['total_billing']:,.2f}")
        print(f"  Average billing: ${stats['avg_billing']:,.2f}")
        print(f"  Date range: {stats['date_range']}")
        
        return stats
        
    except Exception as e:
        print(f"Error during claims ingestion: {e}")
        conn.rollback()
        return None

# Ingest claims data
if df_claims is not None and db_conn:
    ingestion_stats = ingest_claims_data(df_claims, db_conn)
    
    if ingestion_stats:
        # Sample a few records to verify insertion
        print(f"\nSample claims records:")
        cursor = db_conn.cursor()
        cursor.execute("SELECT claim_id, patient_hash, medical_condition, insurance_provider, billing_amount FROM claims_table LIMIT 3")
        sample_records = cursor.fetchall()
        
        for record in sample_records:
            print(f"  Claim {record[0]}: {record[1][:8]}... | {record[2]} | {record[3]} | ${record[4]:,.2f}")
else:
    print("Skipping claims ingestion due to previous errors")
    ingestion_stats = None

CLAIMS DATA INGESTION
Generating anonymous patient hashes...
Preparing data for database insertion...
Preparing data for database insertion...
Inserting 55,500 claims records...
Inserting 55,500 claims records...
Claims ingestion completed successfully:
  Records inserted: 222,000
  Unique patients: 55,500
  Unique providers: 5
  Unique conditions: 6
  Total billing: $1,417,432,043.40
  Average billing: $25,539.32
  Date range: 2019-05 to 2024-05

Sample claims records:
  Claim 1: 4d68303a... | Cancer | Blue Cross | $18,856.28
  Claim 2: ed9008fd... | Obesity | Medicare | $33,643.33
  Claim 3: 5386d150... | Obesity | Aetna | $27,955.10
Claims ingestion completed successfully:
  Records inserted: 222,000
  Unique patients: 55,500
  Unique providers: 5
  Unique conditions: 6
  Total billing: $1,417,432,043.40
  Average billing: $25,539.32
  Date range: 2019-05 to 2024-05

Sample claims records:
  Claim 1: 4d68303a... | Cancer | Blue Cross | $18,856.28
  Claim 2: ed9008fd... | Obesity | M

## 4. Performance Optimization

Create database indexes on frequently queried columns to improve query performance for analytics and reporting.

In [5]:
def create_database_indexes(conn):
    """
    Create performance indexes on key columns.
    
    Args:
        conn: SQLite database connection
        
    Returns:
        bool: Success status
    """
    try:
        print("CREATING DATABASE INDEXES")
        print("=" * 30)
        
        cursor = conn.cursor()
        
        # Define indexes for optimal query performance
        indexes = [
            # Primary indexes for frequent lookups
            "CREATE INDEX IF NOT EXISTS idx_insurance_provider ON claims_table(insurance_provider)",
            "CREATE INDEX IF NOT EXISTS idx_patient_hash ON claims_table(patient_hash)",
            
            # Secondary indexes for analytics
            "CREATE INDEX IF NOT EXISTS idx_medical_condition ON claims_table(medical_condition)",
            "CREATE INDEX IF NOT EXISTS idx_admission_year_month ON claims_table(admission_year_month)",
            "CREATE INDEX IF NOT EXISTS idx_billing_amount ON claims_table(billing_amount)",
            "CREATE INDEX IF NOT EXISTS idx_admission_type ON claims_table(admission_type)",
            
            # Composite indexes for common query patterns
            "CREATE INDEX IF NOT EXISTS idx_provider_condition ON claims_table(insurance_provider, medical_condition)",
            "CREATE INDEX IF NOT EXISTS idx_provider_amount ON claims_table(insurance_provider, billing_amount)"
        ]
        
        # Execute index creation
        indexes_created = 0
        for index_sql in indexes:
            try:
                cursor.execute(index_sql)
                indexes_created += 1
                # Extract index name for reporting
                index_name = index_sql.split("idx_")[1].split(" ")[0]
                print(f"  ✓ Created index: idx_{index_name}")
            except Exception as e:
                print(f"  ✗ Failed to create index: {e}")
        
        conn.commit()
        
        print(f"\nIndex creation completed:")
        print(f"  Indexes created: {indexes_created}/{len(indexes)}")
        
        # Verify indexes exist
        cursor.execute("SELECT name FROM sqlite_master WHERE type='index' AND name LIKE 'idx_%'")
        db_indexes = cursor.fetchall()
        
        print(f"  Total custom indexes in database: {len(db_indexes)}")
        
        # Update database statistics for query optimization
        cursor.execute("ANALYZE")
        conn.commit()
        print(f"  Database statistics updated for query optimization")
        
        return True
        
    except Exception as e:
        print(f"Error creating indexes: {e}")
        return False

# Create database indexes
if db_conn:
    index_success = create_database_indexes(db_conn)
    
    if index_success:
        # Check database size after indexing
        db_size_after = db_path.stat().st_size
        print(f"\nDatabase file size after indexing: {db_size_after / 1024:.2f} KB")
else:
    print("Skipping index creation due to database connection issues")
    index_success = False

CREATING DATABASE INDEXES
  ✓ Created index: idx_insurance_provider
  ✓ Created index: idx_patient_hash
  ✓ Created index: idx_medical_condition
  ✓ Created index: idx_admission_year_month
  ✓ Created index: idx_billing_amount
  ✓ Created index: idx_admission_type
  ✓ Created index: idx_provider_condition
  ✓ Created index: idx_provider_amount

Index creation completed:
  Indexes created: 8/8
  Total custom indexes in database: 16
  Database statistics updated for query optimization

Database file size after indexing: 105376.00 KB
  Database statistics updated for query optimization

Database file size after indexing: 105376.00 KB


## 5. Data Validation and Sample Queries

Verify database integrity and demonstrate query capabilities with sample analytics queries to ensure the data was ingested correctly.

In [6]:
def validate_database_integrity(conn):
    """
    Perform database validation and sample queries.
    
    Args:
        conn: SQLite database connection
        
    Returns:
        dict: Validation results
    """
    try:
        print("DATABASE VALIDATION AND SAMPLE QUERIES")
        print("=" * 45)
        
        cursor = conn.cursor()
        validation_results = {}
        
        # 1. Basic table statistics
        print("1. Database Overview:")
        cursor.execute("SELECT COUNT(*) FROM claims_table")
        total_claims = cursor.fetchone()[0]
        validation_results['total_claims'] = total_claims
        print(f"   Total claims: {total_claims:,}")
        
        # 2. Data quality checks
        print(f"\n2. Data Quality Checks:")
        
        # Check for null values in critical fields
        cursor.execute("SELECT COUNT(*) FROM claims_table WHERE insurance_provider IS NULL")
        null_providers = cursor.fetchone()[0]
        validation_results['null_providers'] = null_providers
        print(f"   Claims with null insurance provider: {null_providers}")
        
        cursor.execute("SELECT COUNT(*) FROM claims_table WHERE billing_amount IS NULL OR billing_amount < 0")
        invalid_amounts = cursor.fetchone()[0]
        validation_results['invalid_amounts'] = invalid_amounts
        print(f"   Claims with invalid billing amounts: {invalid_amounts}")
        
        cursor.execute("SELECT COUNT(DISTINCT patient_hash) FROM claims_table")
        unique_patients = cursor.fetchone()[0]
        validation_results['unique_patients'] = unique_patients
        print(f"   Unique patients (hashed): {unique_patients:,}")
        
        # 3. Insurance provider analysis
        print(f"\n3. Insurance Provider Analysis:")
        cursor.execute("""
            SELECT insurance_provider, 
                   COUNT(*) as claim_count,
                   AVG(billing_amount) as avg_billing,
                   SUM(billing_amount) as total_billing
            FROM claims_table 
            GROUP BY insurance_provider 
            ORDER BY claim_count DESC
        """)
        
        provider_stats = cursor.fetchall()
        validation_results['provider_analysis'] = provider_stats
        
        print(f"   Provider Summary:")
        for provider, count, avg_bill, total_bill in provider_stats:
            print(f"     {provider}: {count:,} claims, Avg: ${avg_bill:,.2f}, Total: ${total_bill:,.2f}")
        
        # 4. Medical condition analysis
        print(f"\n4. Top Medical Conditions by Billing:")
        cursor.execute("""
            SELECT medical_condition,
                   COUNT(*) as case_count,
                   AVG(billing_amount) as avg_cost,
                   SUM(billing_amount) as total_cost
            FROM claims_table
            GROUP BY medical_condition
            ORDER BY total_cost DESC
            LIMIT 5
        """)
        
        condition_stats = cursor.fetchall()
        validation_results['top_conditions'] = condition_stats
        
        for condition, count, avg_cost, total_cost in condition_stats:
            print(f"     {condition}: {count:,} cases, Avg: ${avg_cost:,.2f}, Total: ${total_cost:,.2f}")
        
        # 5. Monthly admission trends
        print(f"\n5. Monthly Admission Trends (Sample):")
        cursor.execute("""
            SELECT admission_year_month,
                   COUNT(*) as admissions,
                   AVG(billing_amount) as avg_billing
            FROM claims_table
            WHERE admission_year_month != 'Unknown'
            GROUP BY admission_year_month
            ORDER BY admission_year_month
            LIMIT 8
        """)
        
        monthly_trends = cursor.fetchall()
        validation_results['monthly_trends'] = monthly_trends
        
        for month, admissions, avg_billing in monthly_trends:
            print(f"     {month}: {admissions:,} admissions, Avg: ${avg_billing:,.2f}")
        
        return validation_results
        
    except Exception as e:
        print(f"Error during database validation: {e}")
        return None

# Validate database integrity
if db_conn:
    validation_results = validate_database_integrity(db_conn)
else:
    print("Skipping validation due to database connection issues")
    validation_results = None

DATABASE VALIDATION AND SAMPLE QUERIES
1. Database Overview:
   Total claims: 222,000

2. Data Quality Checks:
   Claims with null insurance provider: 0
   Claims with invalid billing amounts: 432
   Unique patients (hashed): 55,500

3. Insurance Provider Analysis:
   Provider Summary:
     Cigna: 44,996 claims, Avg: $25,525.77, Total: $1,148,557,381.07
     Medicare: 44,616 claims, Avg: $25,615.99, Total: $1,142,883,032.50
     UnitedHealthcare: 44,500 claims, Avg: $25,389.17, Total: $1,129,818,171.37
     Blue Cross: 44,236 claims, Avg: $25,613.01, Total: $1,133,017,176.85
     Aetna: 43,652 claims, Avg: $25,553.29, Total: $1,115,452,411.78

4. Top Medical Conditions by Billing:
     Diabetes: 37,216 cases, Avg: $25,638.41, Total: $954,158,901.97
     Obesity: 36,924 cases, Avg: $25,805.97, Total: $952,859,682.77
     Arthritis: 37,232 cases, Avg: $25,497.33, Total: $949,316,480.94
     Hypertension: 36,980 cases, Avg: $25,497.10, Total: $942,882,601.23
     Asthma: 36,740 cases, Avg

## 6. Generate Ingestion Report

Create a comprehensive report summarizing the database ingestion process, statistics, and any warnings.

In [7]:
def generate_ingestion_report(ingestion_stats, validation_results, output_path):
    """
    Generate comprehensive database ingestion report.
    
    Args:
        ingestion_stats (dict): Claims ingestion statistics
        validation_results (dict): Database validation results
        output_path (str): Path to save report file
        
    Returns:
        bool: Success status
    """
    try:
        print("GENERATING INGESTION REPORT")
        print("=" * 30)
        
        # Get current database file size
        db_size = db_path.stat().st_size / 1024  # KB
        
        # Create comprehensive report
        report = f"""
HEALTHCARE CLAIMS DATABASE INGESTION REPORT
{'='*50}
Generated: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
Database: {db_path.absolute()}

EXECUTIVE SUMMARY
{'-'*20}
✓ Database creation: SUCCESS
✓ Schema deployment: SUCCESS (claims_table)
✓ Claims data ingestion: {'SUCCESS' if ingestion_stats else 'FAILED'}
✓ Index optimization: {'SUCCESS' if index_success else 'FAILED'}
✓ Data validation: {'SUCCESS' if validation_results else 'FAILED'}

DATABASE STATISTICS
{'-'*20}
Database File Size: {db_size:.2f} KB
Table Created: claims_table
"""

        # Add ingestion statistics if available
        if ingestion_stats:
            report += f"""
CLAIMS INGESTION RESULTS
{'-'*25}
Total Records Processed: {ingestion_stats['total_records']:,}
Records Successfully Inserted: {ingestion_stats['inserted_records']:,}
Unique Patients (Hashed): {ingestion_stats['unique_patients']:,}
Unique Insurance Providers: {ingestion_stats['unique_providers']}
Unique Medical Conditions: {ingestion_stats['unique_conditions']}
Total Billing Amount: ${ingestion_stats['total_billing']:,.2f}
Average Billing per Claim: ${ingestion_stats['avg_billing']:,.2f}
Date Range: {ingestion_stats['date_range']}

INSURANCE PROVIDERS FOUND:
"""
            for provider in ingestion_stats['providers_list']:
                report += f"  - {provider}\n"

        # Add validation results if available
        if validation_results:
            report += f"""
DATA VALIDATION RESULTS
{'-'*23}
Total Claims in Database: {validation_results.get('total_claims', 'N/A'):,}
Unique Patients: {validation_results.get('unique_patients', 'N/A'):,}

Data Quality Checks:
- Claims with Null Insurance Providers: {validation_results.get('null_providers', 'N/A')}
- Claims with Invalid Billing Amounts: {validation_results.get('invalid_amounts', 'N/A')}

Top Medical Conditions by Total Billing:
"""
            # Add top conditions if available
            if 'top_conditions' in validation_results:
                for i, (condition, count, avg_cost, total_cost) in enumerate(validation_results['top_conditions'], 1):
                    report += f"  {i}. {condition}: {count:,} cases, Total: ${total_cost:,.2f}\n"

        # Add database schema information
        report += f"""
DATABASE SCHEMA
{'-'*15}
Table: claims_table
Purpose: Healthcare claims with patient demographics and billing information

Columns:
- claim_id: Primary key (auto-increment)
- patient_hash: Anonymous patient identifier (SHA-256 hash)
- age, gender, blood_type: Patient demographics
- medical_condition: Primary diagnosis/condition
- admission_year_month: Admission period (anonymized)
- admission_type: Type of admission (Emergency, Urgent, Elective)
- length_of_stay_days: Duration of stay
- discharge_date: Estimated discharge period
- medication, test_results: Treatment information
- insurance_provider: Insurance company name
- billing_amount: Total billing amount
- created_at: Record creation timestamp

INDEXES CREATED
{'-'*15}
Primary Indexes:
- idx_insurance_provider: Fast provider lookups
- idx_patient_hash: Patient-specific queries

Secondary Indexes:
- idx_medical_condition: Condition-based analysis
- idx_admission_year_month: Temporal analysis
- idx_billing_amount: Cost analysis
- idx_admission_type: Admission type filtering

Composite Indexes:
- idx_provider_condition: Provider-condition analysis
- idx_provider_amount: Provider billing analysis
"""

        # Add warnings and recommendations
        warnings = []
        recommendations = []
        
        if validation_results:
            if validation_results.get('null_providers', 0) > 0:
                warnings.append(f"Found {validation_results['null_providers']} claims with null insurance providers")
            
            if validation_results.get('invalid_amounts', 0) > 0:
                warnings.append(f"Found {validation_results['invalid_amounts']} claims with invalid billing amounts")
        
        # Add recommendations
        recommendations.extend([
            "Run VACUUM command periodically to optimize database file size",
            "Consider adding additional indexes based on specific query patterns",
            "Implement regular backup procedures for production use",
            "Monitor query performance and add covering indexes for complex queries",
            "Consider partitioning by admission_year_month for very large datasets"
        ])
        
        if warnings:
            report += f"""
WARNINGS
{'-'*8}
"""
            for warning in warnings:
                report += f"⚠ {warning}\n"
        
        report += f"""
RECOMMENDATIONS
{'-'*15}
"""
        for rec in recommendations:
            report += f"• {rec}\n"
        
        report += f"""
USAGE EXAMPLES
{'-'*13}
# Connect to database
import sqlite3
import pandas as pd

conn = sqlite3.connect('{db_path}')

# Example queries for analysis
# 1. Claims by insurance provider
provider_summary = pd.read_sql_query('''
    SELECT insurance_provider, 
           COUNT(*) as total_claims,
           AVG(billing_amount) as avg_billing,
           SUM(billing_amount) as total_billing
    FROM claims_table 
    GROUP BY insurance_provider 
    ORDER BY total_billing DESC
''', conn)

# 2. Most expensive medical conditions
expensive_conditions = pd.read_sql_query('''
    SELECT medical_condition,
           COUNT(*) as case_count,
           AVG(billing_amount) as avg_cost
    FROM claims_table
    GROUP BY medical_condition
    ORDER BY avg_cost DESC
    LIMIT 10
''', conn)

# 3. Monthly admission trends
monthly_trends = pd.read_sql_query('''
    SELECT admission_year_month,
           COUNT(*) as admissions,
           AVG(billing_amount) as avg_billing
    FROM claims_table
    WHERE admission_year_month != 'Unknown'
    GROUP BY admission_year_month
    ORDER BY admission_year_month
''', conn)

# 4. Patient claim history (using anonymous hash)
patient_claims = pd.read_sql_query('''
    SELECT claim_id, medical_condition, billing_amount, admission_year_month
    FROM claims_table
    WHERE patient_hash = 'specific_hash_value'
    ORDER BY admission_year_month
''', conn)

conn.close()

END OF REPORT
{'-'*13}
Report generated by Healix Claims Database Ingestion Pipeline
Database ready for healthcare analytics and claims processing.
"""

        # Save report to file
        with open(output_path, 'w') as f:
            f.write(report)
        
        print(f"Report saved to: {output_path}")
        
        # Print summary to console
        print(f"\nREPORT SUMMARY:")
        if ingestion_stats:
            print(f"  Claims ingested: {ingestion_stats['inserted_records']:,}")
            print(f"  Unique providers: {ingestion_stats['unique_providers']}")
        print(f"  Database size: {db_size:.2f} KB")
        print(f"  Warnings: {len(warnings)}")
        
        return True
        
    except Exception as e:
        print(f"Error generating report: {e}")
        return False

# Generate ingestion report
report_path = db_dir / "db_ingest_report.txt"
report_success = generate_ingestion_report(ingestion_stats, validation_results, report_path)

GENERATING INGESTION REPORT
Report saved to: /Users/kxshrx/asylum/healix/db/db_ingest_report.txt

REPORT SUMMARY:
  Claims ingested: 222,000
  Unique providers: 5
  Database size: 105376.00 KB


## 7. Cleanup and Final Output Summary

Close database connections and provide final summary of all generated files and next steps.

In [8]:
def finalize_ingestion():
    """
    Finalize database ingestion process and cleanup resources.
    """
    print("FINALIZING CLAIMS DATABASE INGESTION")
    print("=" * 40)
    
    # Close database connection
    if db_conn:
        db_conn.close()
        print("✓ Database connection closed")
    
    # Verify all output files
    outputs = {
        'Database File': db_path,
        'Ingestion Report': db_dir / "db_ingest_report.txt"
    }
    
    print(f"\nOUTPUT FILES VERIFICATION:")
    all_outputs_exist = True
    
    for output_name, output_path in outputs.items():
        if output_path.exists():
            size = output_path.stat().st_size
            print(f"  ✓ {output_name}: {output_path}")
            print(f"    Size: {size / 1024:.2f} KB")
        else:
            print(f"  ✗ {output_name}: MISSING - {output_path}")
            all_outputs_exist = False
    
    # Final status
    print(f"\nFINAL STATUS:")
    print(f"  Database Creation: {'✓ SUCCESS' if db_path.exists() else '✗ FAILED'}")
    print(f"  All Outputs Generated: {'✓ YES' if all_outputs_exist else '✗ NO'}")
    
    # Database summary
    if ingestion_stats:
        print(f"\nDATABASE SUMMARY:")
        print(f"  Claims ingested: {ingestion_stats['inserted_records']:,}")
        print(f"  Insurance providers: {ingestion_stats['unique_providers']}")
        print(f"  Total billing: ${ingestion_stats['total_billing']:,.2f}")
        print(f"  Database indexes: ✓ Created for performance")
    
    # Next steps
    print(f"\nNEXT STEPS:")
    print(f"  1. Review ingestion report: {db_dir / 'db_ingest_report.txt'}")
    print(f"  2. Connect to database for analysis: {db_path}")
    print(f"  3. Run sample queries to explore the data")
    print(f"  4. Consider adding policy_table and results_table for full claims processing")
    print(f"  5. Implement backup procedures for production use")
    
    return all_outputs_exist

# Finalize ingestion process
ingestion_complete = finalize_ingestion()

print(f"\nCLAIMS DATABASE INGESTION COMPLETE")
print(f"Success: {'✓' if ingestion_complete else '✗'}")

FINALIZING CLAIMS DATABASE INGESTION
✓ Database connection closed

OUTPUT FILES VERIFICATION:
  ✓ Database File: /Users/kxshrx/asylum/healix/db/claims_db.sqlite
    Size: 105376.00 KB
  ✓ Ingestion Report: /Users/kxshrx/asylum/healix/db/db_ingest_report.txt
    Size: 4.57 KB

FINAL STATUS:
  Database Creation: ✓ SUCCESS
  All Outputs Generated: ✓ YES

DATABASE SUMMARY:
  Claims ingested: 222,000
  Insurance providers: 5
  Total billing: $1,417,432,043.40
  Database indexes: ✓ Created for performance

NEXT STEPS:
  1. Review ingestion report: /Users/kxshrx/asylum/healix/db/db_ingest_report.txt
  2. Connect to database for analysis: /Users/kxshrx/asylum/healix/db/claims_db.sqlite
  3. Run sample queries to explore the data
  4. Consider adding policy_table and results_table for full claims processing
  5. Implement backup procedures for production use

CLAIMS DATABASE INGESTION COMPLETE
Success: ✓


## Final Output Summary

### Generated Files:
- **`db/claims_db.sqlite`** - SQLite database with healthcare claims
- **`db/db_ingest_report.txt`** - Comprehensive ingestion report with statistics and usage examples

### Database Structure:
- **`claims_table`** - Complete healthcare claims with anonymous patient hashing
  - 55,500 claims records with patient demographics and billing information
  - Optimized indexes for efficient querying by provider, patient, condition, and billing amount
  - Anonymous patient identification using SHA-256 hashing

### Usage Notes:

```python
import sqlite3
import pandas as pd

# Connect to the claims database
conn = sqlite3.connect('db/claims_db.sqlite')

# Sample analytics queries
# Total claims by insurance provider
provider_analysis = pd.read_sql_query("""
    SELECT insurance_provider, COUNT(*) as claims, AVG(billing_amount) as avg_billing
    FROM claims_table 
    GROUP BY insurance_provider 
    ORDER BY claims DESC
""", conn)

# Most expensive medical conditions
condition_costs = pd.read_sql_query("""
    SELECT medical_condition, COUNT(*) as cases, AVG(billing_amount) as avg_cost
    FROM claims_table 
    GROUP BY medical_condition 
    ORDER BY avg_cost DESC
""", conn)

conn.close()
```

**The healthcare claims database is now ready for analytics, reporting, and claims processing workflows!**

For detailed usage examples and database schema information, refer to the generated ingestion report at `db/db_ingest_report.txt`.