In [62]:
# Cell 0: Schema Verification
import pandas as pd
import numpy as np
import json

def verify_schema_compatibility(data_dir='data/processed'):
    """Load the processed CSV files and check them against the expected schema.

    For every table in the schema, ``{data_dir}/{table_name}.csv`` is read
    with the table's varchar columns forced to ``str``, its integer columns
    are converted to pandas' nullable ``Int64`` dtype, and the presence of
    the required columns is checked. Problems are collected rather than
    raised, so a single run reports every issue.

    Args:
        data_dir: Directory containing one ``<table_name>.csv`` per table.
            Defaults to ``'data/processed'``.

    Returns:
        A ``(is_valid, tables)`` tuple. ``is_valid`` is ``True`` only when no
        issue was recorded. ``tables`` maps table name to the loaded
        DataFrame for every file that could be read (including tables that
        had conversion or missing-column issues).
    """
    schema = {
        'census_records': {
            'required_columns': ['record_id', 'census_year', 'source_pk'],
            'integer_columns': ['record_id', 'census_year', 'source_pk'],
            'varchar_columns': ['ed', 'page_number']
        },
        'locations': {
            'required_columns': ['location_id', 'record_id'],
            'integer_columns': ['location_id', 'record_id'],
            'varchar_columns': ['street_name', 'house_num', 'build_num', 'dwelling_number', 'family_number']
        },
        'persons': {
            'required_columns': ['person_id', 'first_name', 'last_name'],
            'varchar_columns': ['person_id', 'first_name', 'last_name']
        },
        'personal_attributes': {
            'required_columns': ['attribute_id', 'person_id', 'record_id'],
            # 'age' is deliberately read as text, not as an integer.
            'integer_columns': ['attribute_id', 'record_id'],
            'varchar_columns': ['person_id', 'sex', 'race', 'place_birth', 'age']
        },
        'occupations': {
            'required_columns': ['occupation_id', 'person_id', 'record_id'],
            'integer_columns': ['occupation_id', 'record_id'],
            'varchar_columns': ['person_id', 'work', 'business']
        },
        'families': {
            'required_columns': ['family_id', 'record_id', 'location_id'],
            'integer_columns': ['record_id', 'location_id'],
            'varchar_columns': ['family_id', 'head_first_name', 'head_last_name']
        },
        'relationships': {
            'required_columns': ['relationship_id', 'person_id', 'family_id', 'record_id'],
            'integer_columns': ['relationship_id', 'record_id'],
            'varchar_columns': ['person_id', 'family_id', 'relation_to_head']
        },
        'property_status': {
            'required_columns': ['property_id', 'person_id', 'record_id'],
            'integer_columns': ['property_id', 'record_id'],
            'varchar_columns': ['person_id', 'owned_rented']
        },
        'marital_status': {
            'required_columns': ['marital_id', 'person_id', 'record_id'],
            'integer_columns': ['marital_id', 'record_id'],
            'varchar_columns': ['person_id', 'marital_status']
        }
    }

    issues = []
    tables = {}

    for table_name, requirements in schema.items():
        file_path = f'{data_dir}/{table_name}.csv'

        # Force identifier-like text columns to str so pandas does not infer
        # a numeric dtype for them.
        dtypes = {col: str for col in requirements.get('varchar_columns', [])}

        # Reading is the only step expected to fail for environmental reasons
        # (missing file, malformed CSV), so it is the only step guarded by a
        # broad handler; the failure is recorded and the next table is tried.
        try:
            df = pd.read_csv(file_path, dtype=dtypes)
        except Exception as e:
            issues.append(f"Error loading {table_name}: {str(e)}")
            continue

        for col in requirements.get('integer_columns', []):
            if col not in df.columns:
                continue
            try:
                # Nullable Int64 keeps missing values without falling back
                # to float.
                df[col] = pd.to_numeric(df[col], errors='raise').astype('Int64')
            except (ValueError, TypeError):
                # ValueError: unparseable text; TypeError: non-integral
                # floats that cannot be cast safely to Int64.
                issues.append(f"{table_name}: Unable to convert {col} to integer")

        tables[table_name] = df

        missing_cols = set(requirements['required_columns']) - set(df.columns)
        if missing_cols:
            issues.append(f"{table_name}: Missing required columns: {missing_cols}")

        print(f"Verified {table_name}: {len(df)} rows")

    if issues:
        print("\nSchema verification issues found:")
        for issue in issues:
            print(f"- {issue}")
        return False, tables

    print("\nAll tables verified successfully!")
    return True, tables

# Verify the CSV files once and keep the outcome (and the loaded DataFrames)
# in module-level names so that later cells can reuse them.
schema_valid, tables = verify_schema_compatibility()

if schema_valid:
    print("\nSchema verification complete. Ready to proceed with upload.")
else:
    # Abort the notebook run here: uploading data that failed verification
    # is not attempted.
    raise ValueError("Please fix schema issues before proceeding with upload")

Verified census_records: 797 rows
Verified locations: 797 rows
Verified persons: 797 rows
Verified personal_attributes: 797 rows
Verified occupations: 797 rows
Verified families: 468 rows
Verified relationships: 797 rows
Verified property_status: 483 rows
Verified marital_status: 797 rows

All tables verified successfully!

Schema verification complete. Ready to proceed with upload.


In [63]:
# Cell 2: Define upload function
def upload_to_supabase(table_name, df, batch_size=100, client=None):
    """Insert a DataFrame into a Supabase table in batches.

    Each batch is sent with ``client.table(table_name).insert(batch).execute()``.
    When a batch fails, the error is printed and the batch (with the error
    text) is written as JSON under ``logs/failed_uploads/``; the upload then
    continues with the next batch unless the error text reports a foreign
    key violation.

    Relies on the module-level names ``time``, ``os`` and ``json`` being
    imported by the time the function is called.

    Args:
        table_name: Name of the destination table.
        df: DataFrame whose rows are uploaded. Missing values (NaN / pd.NA)
            are sent as ``None``.
        batch_size: Number of rows per insert call. Must be at least 1.
        client: Object exposing ``table(name).insert(rows).execute()``.
            Defaults to the module-level ``supabase`` client.

    Returns:
        The number of rows contained in batches that were inserted without
        raising.

    Raises:
        ValueError: If ``batch_size`` is smaller than 1, or if a batch fails
            with a message containing 'violates foreign key constraint'.
    """
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
    if client is None:
        client = supabase

    print(f"Uploading {table_name}...")
    total_rows = len(df)
    successful_uploads = 0

    # Cast to object first so that None survives in every column, then blank
    # out all missing markers (NaN and pd.NA from nullable dtypes alike).
    records = df.astype(object).where(df.notna(), None).to_dict('records')

    for i in range(0, total_rows, batch_size):
        batch = records[i:i + batch_size]
        try:
            # The response is not needed; it is deliberately not unpacked so
            # that its shape cannot turn a successful insert into a failure.
            client.table(table_name).insert(batch).execute()
        except Exception as e:
            error_msg = f"Error uploading batch starting at row {i}: {str(e)}"
            print(error_msg)

            # Persist the failed batch so it can be inspected or retried.
            timestamp = time.strftime('%Y%m%d_%H%M%S')
            error_log_path = f'logs/failed_uploads/failed_{table_name}_batch_{i}_{timestamp}.json'

            error_data = {
                'table': table_name,
                'batch_start': i,
                'error': str(e),
                'timestamp': time.strftime('%Y-%m-%d %H:%M:%S'),
                'data': batch
            }

            os.makedirs('logs/failed_uploads', exist_ok=True)
            with open(error_log_path, 'w') as f:
                # default=str keeps the log writable even if a value is not
                # natively JSON serialisable.
                json.dump(error_data, f, indent=2, default=str)

            # A foreign key violation stops the upload for this table.
            if 'violates foreign key constraint' in str(e):
                raise ValueError(f"Foreign key violation in {table_name}. Upload aborted.") from e
            continue

        successful_uploads += len(batch)
        progress = (i + len(batch)) / total_rows * 100
        print(f"Progress: {progress:.2f}% ({successful_uploads}/{total_rows} rows)")
        time.sleep(0.1)  # Small delay to avoid rate limits

    return successful_uploads

In [64]:
# Cell 3: Data cleaning function
def clean_table_data(df, table_name):
    """Return a cleaned copy of ``df`` prepared for upload to ``table_name``.

    The input frame is never modified. NaN values are replaced with None,
    and a few table-specific column fixes are applied:

    * ``locations``: ``house_number`` is renamed to ``house_num``.
    * ``personal_attributes``: ``birth_place`` is renamed to ``place_birth``.
    * ``families``: ``census_year`` is dropped (not in schema).

    Each fix is applied only when the column in question is present.
    """
    # Per-table column renames; absent source columns are simply ignored
    # by DataFrame.rename.
    column_renames = {
        'locations': {'house_number': 'house_num'},
        'personal_attributes': {'birth_place': 'place_birth'},
    }

    cleaned = df.copy().replace({np.nan: None})

    if table_name in column_renames:
        return cleaned.rename(columns=column_renames[table_name])

    if table_name == 'families' and 'census_year' in cleaned.columns:
        return cleaned.drop('census_year', axis=1)

    return cleaned

In [65]:
# Cell 3.5: Initialize Supabase Connection
from supabase import create_client
import os
from dotenv import load_dotenv
import time

# Pull the Supabase settings out of the local dotenv file.
load_dotenv('.env.local')

SUPABASE_URL, SUPABASE_KEY = (
    os.getenv(setting) for setting in ('SUPABASE_URL', 'SUPABASE_KEY')
)

# Both values are mandatory; an unset or empty one aborts the cell.
if not (SUPABASE_URL and SUPABASE_KEY):
    raise ValueError("Supabase credentials not found in .env.local file")

# Module-level client used by the upload cells.
supabase = create_client(SUPABASE_URL, SUPABASE_KEY)

print("Supabase connection initialized successfully")

Supabase connection initialized successfully


In [66]:
# Cell 4: Upload tables in correct order
# Tables are uploaded parents-first so that referenced rows exist before the
# rows that point at them.
upload_order = [
    # Base tables - no dependencies
    'census_records',
    'persons',
    # Depends on census_records
    'locations',
    # Depends on census_records and locations
    'families',
    # Depend on persons and census_records
    'personal_attributes',
    'occupations',
    'property_status',
    'marital_status',
    # Depends on persons, families, and census_records (load last)
    'relationships',
]

# Per-table row counts, filled in as each table finishes uploading.
upload_stats = {}

for table_name in upload_order:
    # Tables that were never loaded are skipped.
    if table_name not in tables:
        continue

    df = tables[table_name]
    if 'created_at' in df.columns:
        df = df.drop('created_at', axis=1)

    print(f"\nProcessing {table_name}...")
    try:
        # Clean first, then push the cleaned frame.
        df = clean_table_data(df, table_name)
        successful_rows = upload_to_supabase(table_name, df)
        upload_stats[table_name] = {
            'total_rows': len(df),
            'uploaded_rows': successful_rows
        }
    except Exception as e:
        # Later tables may reference this one, so the whole run stops here.
        print(f"Failed to upload {table_name}: {str(e)}")
        print("Stopping upload process due to error.")
        break

# Summary: only tables whose upload call returned appear here.
print("\nUpload Summary:")
for table_name, stats in upload_stats.items():
    uploaded, expected = stats['uploaded_rows'], stats['total_rows']
    print(f"{table_name}: {uploaded}/{expected} rows uploaded")
    if uploaded != expected:
        print(f"WARNING: Missing rows in {table_name}")


Processing census_records...
Uploading census_records...
Progress: 12.55% (100/797 rows)
Progress: 25.09% (200/797 rows)
Progress: 37.64% (300/797 rows)
Progress: 50.19% (400/797 rows)
Progress: 62.74% (500/797 rows)
Progress: 75.28% (600/797 rows)
Progress: 87.83% (700/797 rows)
Progress: 100.00% (797/797 rows)

Processing persons...
Uploading persons...
Progress: 12.55% (100/797 rows)
Progress: 25.09% (200/797 rows)
Progress: 37.64% (300/797 rows)
Progress: 50.19% (400/797 rows)
Progress: 62.74% (500/797 rows)
Progress: 75.28% (600/797 rows)
Progress: 87.83% (700/797 rows)
Progress: 100.00% (797/797 rows)

Processing locations...
Uploading locations...
Progress: 12.55% (100/797 rows)
Progress: 25.09% (200/797 rows)
Progress: 37.64% (300/797 rows)
Progress: 50.19% (400/797 rows)
Progress: 62.74% (500/797 rows)
Progress: 75.28% (600/797 rows)
Progress: 87.83% (700/797 rows)
Progress: 100.00% (797/797 rows)

Processing families...
Uploading families...
Progress: 21.37% (100/468 rows)
P