# Snowflake to Postgres Manual Sync

This notebook provides manual control over syncing data from Snowflake to Snowflake Managed Postgres.

## Architecture
- **Snowflake**: Source of truth for all grid data
- **Postgres**: PostGIS cache for fast spatial queries (under 20 ms, versus seconds when querying Snowflake directly)
- **Sync Strategy**: Full refresh (TRUNCATE + INSERT) - idempotent; a failed or partial run is recovered by re-running the sync

## Layer Mapping

| Layer | Snowflake Source | Postgres Target | Rows |
|-------|------------------|-----------------|------|
| Meters | METER_INFRASTRUCTURE | meters_spatial | 596,906 |
| Substations | SUBSTATIONS | substations_spatial | 275 |
| Transformers | TRANSFORMER_METADATA | transformers_spatial | 91,554 |
| Power Lines | GRID_POWER_LINES | power_lines_lod | 13,104 |
| Poles | OSM_POLES_TRULY_LAND_ONLY | poles_spatial | 62,735 |
| Buildings | HOUSTON_BUILDINGS_CLEAN | osm_buildings | 2,670,794 |
| Water Bodies | HOUSTON_WATER_BODIES | osm_water | 10,000 |
| Vegetation | VEGETATION_POWER_LINE_RISK | vegetation_risk_cache | 3,611 |

## 1. Configuration

In [None]:
# Configuration - UPDATE THESE VALUES
# Settings shared by the connection helpers and sync functions in this notebook.
import os

CONFIG = {
    # Snowflake
    'snowflake_connection': 'cpe_demo_CLI',  # Your Snowflake connection name
    'database': 'FLUX_DB',
    'schema': 'PRODUCTION',
    'warehouse': 'FLUX_WH',

    # Postgres (Snowflake Managed)
    'pg_host': 'YOUR_POSTGRES_HOST.snowflakecomputing.app',  # From SHOW POSTGRES INSTANCES
    'pg_port': 5432,
    'pg_database': 'postgres',
    'pg_user': 'application',
    # Prefer the PGPASSWORD environment variable so the secret is not saved in
    # the notebook; the placeholder is used only when the variable is unset.
    'pg_password': os.environ.get('PGPASSWORD', 'YOUR_PASSWORD'),  # From CREATE USER command

    # Sync settings
    'batch_size': 10000,  # Rows per executemany() call for large tables
    'verbose': True,  # Print progress while syncing
}

## 2. Initialize Connections

In [None]:
import snowflake.connector
import psycopg2
from datetime import datetime
import pandas as pd

# Snowflake connection
def get_snowflake_conn():
    """Open and return a new Snowflake connection.

    The named connection, database, schema and warehouse are all taken from
    CONFIG. The caller is responsible for closing the returned connection.
    """
    connect_kwargs = {
        'connection_name': CONFIG['snowflake_connection'],
        'database': CONFIG['database'],
        'schema': CONFIG['schema'],
        'warehouse': CONFIG['warehouse'],
    }
    return snowflake.connector.connect(**connect_kwargs)

# Postgres connection
def get_postgres_conn():
    """Open and return a new psycopg2 connection in autocommit mode.

    Host, port, database, user and password are taken from CONFIG. Because
    autocommit is enabled, every statement issued on the connection takes
    effect immediately. The caller is responsible for closing the connection.
    """
    pg_settings = {
        'host': CONFIG['pg_host'],
        'port': CONFIG['pg_port'],
        'database': CONFIG['pg_database'],
        'user': CONFIG['pg_user'],
        'password': CONFIG['pg_password'],
    }
    connection = psycopg2.connect(**pg_settings)
    connection.autocommit = True
    return connection

# Test connections
# Each connection is opened, exercised once, and closed again; the sync
# functions open their own connections later.
print("Testing Snowflake connection...")
sf_conn = get_snowflake_conn()
print(f"  Connected to Snowflake: {CONFIG['database']}.{CONFIG['schema']}")
sf_conn.close()

print("\nTesting Postgres connection...")
pg_conn = get_postgres_conn()
try:
    pg_cursor = pg_conn.cursor()
    # Fails if the PostGIS extension is not installed in the target database.
    pg_cursor.execute("SELECT PostGIS_Version();")
    print(f"  Connected to Postgres with PostGIS {pg_cursor.fetchone()[0]}")
finally:
    # Close even when the PostGIS probe raises, so a failed check does not
    # leave a connection open in the notebook kernel.
    pg_conn.close()

print("\n✓ All connections successful!")

## 3. Sync Helper Functions

In [None]:
class SyncResult:
    """Outcome record for one layer sync.

    A result starts in the ``'running'`` state and is moved to ``'success'``
    by :meth:`complete` or to ``'failed'`` by :meth:`fail`. Both methods
    return ``self`` so they can be used directly in a ``return`` statement.
    """

    def __init__(self, layer):
        self.layer = layer              # Layer label used in the printed summary
        self.started = datetime.now()   # Start time used to compute duration
        self.rows = 0                   # Rows synced; set by complete()
        self.status = 'running'         # 'running' | 'success' | 'failed'
        self.error = None               # Error text; set by fail()
        # Initialised here so reading it on a still-running result yields
        # None instead of raising AttributeError.
        self.duration = None

    def _stop_clock(self):
        """Record the seconds elapsed since the result was created."""
        self.duration = (datetime.now() - self.started).total_seconds()

    def complete(self, rows):
        """Mark the sync successful with ``rows`` rows synced; return self."""
        self.rows = rows
        self.status = 'success'
        self._stop_clock()
        return self

    def fail(self, error):
        """Mark the sync failed, storing ``str(error)``; return self."""
        self.error = str(error)
        self.status = 'failed'
        self._stop_clock()
        return self

    def __repr__(self):
        if self.status == 'success':
            return f"✓ {self.layer}: {self.rows:,} rows in {self.duration:.1f}s"
        if self.status == 'failed':
            return f"✗ {self.layer}: FAILED - {self.error}"
        # A result that is still running must not be reported as a failure.
        return f"… {self.layer}: {self.status}"


def sync_layer(layer_name, sf_query, pg_table, pg_insert_sql, row_mapper):
    """Run a full-refresh sync of one layer from Snowflake into Postgres.

    The target table is truncated, then every row returned by ``sf_query`` is
    passed through ``row_mapper`` and inserted with ``pg_insert_sql`` in
    batches of ``CONFIG['batch_size']`` rows.

    Args:
        layer_name: Label used in progress output and in the returned result.
        sf_query: SELECT statement executed on Snowflake.
        pg_table: Postgres table to truncate. It is interpolated directly
            into the TRUNCATE statement, so it must be a trusted identifier.
        pg_insert_sql: Parameterized INSERT statement passed to
            ``executemany``.
        row_mapper: Callable that turns one Snowflake row into the parameter
            tuple for ``pg_insert_sql``. Rows mapped to a falsy value are
            skipped.

    Returns:
        SyncResult: completed with the number of inserted rows, or failed
        with the error text. Any ``Exception`` raised during the sync is
        recorded on the result rather than propagated.
    """
    result = SyncResult(layer_name)
    sf_conn = None
    pg_conn = None

    try:
        sf_conn = get_snowflake_conn()
        pg_conn = get_postgres_conn()
        pg_cursor = pg_conn.cursor()

        verbose = CONFIG['verbose']
        batch_size = CONFIG['batch_size']

        if verbose:
            print(f"Syncing {layer_name}...")

        # Truncate target
        pg_cursor.execute(f'TRUNCATE TABLE {pg_table};')

        # Fetch from Snowflake
        sf_cursor = sf_conn.cursor()
        sf_cursor.execute(sf_query)

        # Insert in batches
        batch = []
        count = 0

        for row in sf_cursor:
            mapped = row_mapper(row)
            if mapped:
                batch.append(mapped)

            if len(batch) >= batch_size:
                pg_cursor.executemany(pg_insert_sql, batch)
                count += len(batch)
                if verbose:
                    print(f"  ... {count:,} rows")
                batch = []

        # Flush the final partial batch.
        if batch:
            pg_cursor.executemany(pg_insert_sql, batch)
            count += len(batch)

        return result.complete(count)

    except Exception as e:
        return result.fail(e)

    finally:
        # Close whatever was opened, on success and on failure alike, so a
        # failed sync does not leak connections.
        for conn in (sf_conn, pg_conn):
            if conn is None:
                continue
            try:
                conn.close()
            except Exception:
                # Best-effort cleanup: a close error must not replace the
                # sync outcome already recorded on the result.
                pass

## 4. Sync Functions for Each Layer

In [None]:
def sync_meters():
    """Refresh ``meters_spatial`` from Snowflake ``METER_INFRASTRUCTURE``.

    Only meters with both coordinates present are selected. The work is
    delegated to ``sync_layer``, whose result is returned unchanged.
    """
    source_sql = """
            SELECT METER_ID, METER_LATITUDE, METER_LONGITUDE, TRANSFORMER_ID, CIRCUIT_ID, 
                   SUBSTATION_ID, POLE_ID, METER_TYPE, CUSTOMER_SEGMENT_ID, CITY, 
                   COUNTY_NAME, ZIP_CODE, COMMISSIONED_DATE, HEALTH_SCORE
            FROM METER_INFRASTRUCTURE
            WHERE METER_LATITUDE IS NOT NULL AND METER_LONGITUDE IS NOT NULL
        """
    target_sql = """
            INSERT INTO meters_spatial 
            (meter_id, latitude, longitude, transformer_id, circuit_id, substation_id, 
             pole_id, meter_type, customer_segment, city, county, zip_code, 
             commissioned_date, health_score, geom)
            VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s,
                    ST_SetSRID(ST_MakePoint(%s, %s), 4326))
        """

    def to_params(row):
        # The 14 selected columns pass through in order; the point geometry
        # takes longitude (index 2) first, then latitude (index 1).
        return (*row[:14], row[2], row[1])

    return sync_layer('meters', source_sql, 'meters_spatial', target_sql, to_params)

def sync_substations():
    """Refresh ``substations_spatial`` from Snowflake ``SUBSTATIONS``.

    Only substations with both coordinates present are selected. The work is
    delegated to ``sync_layer``, whose result is returned unchanged.
    """
    source_sql = """
            SELECT SUBSTATION_ID, SUBSTATION_NAME, LATITUDE, LONGITUDE, CAPACITY_MVA,
                   CURRENT_LOAD_MW, PEAK_LOAD_MW, VOLTAGE_LEVEL, SUBSTATION_TYPE,
                   OPERATIONAL_STATUS, REGION, COMMISSIONED_DATE, LAST_INSPECTION_DATE,
                   CRITICAL_INFRASTRUCTURE_FLAG
            FROM SUBSTATIONS
            WHERE LATITUDE IS NOT NULL AND LONGITUDE IS NOT NULL
        """
    target_sql = """
            INSERT INTO substations_spatial 
            (substation_id, name, latitude, longitude, capacity_mva, current_load_mw,
             peak_load_mw, voltage_level, substation_type, operational_status, region,
             commissioned_date, last_inspection_date, critical_infrastructure, geom)
            VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s,
                    ST_SetSRID(ST_MakePoint(%s, %s), 4326))
        """

    def to_params(row):
        # The 14 selected columns pass through in order; the point geometry
        # takes longitude (index 3) first, then latitude (index 2).
        return (*row[:14], row[3], row[2])

    return sync_layer('substations', source_sql, 'substations_spatial', target_sql, to_params)

def sync_transformers():
    """Refresh ``transformers_spatial`` from Snowflake ``TRANSFORMER_METADATA``.

    Only transformers with both coordinates present are selected. The work is
    delegated to ``sync_layer``, whose result is returned unchanged.
    """
    source_sql = """
            SELECT TRANSFORMER_ID, SUBSTATION_ID, CIRCUIT_ID, LATITUDE, LONGITUDE,
                   RATED_KVA, CURRENT_LOAD_KVA, LOAD_UTILIZATION_PCT, AGE_YEARS, HEALTH_SCORE,
                   MANUFACTURER, MODEL_NUMBER, INSTALL_YEAR, LAST_MAINTENANCE_DATE,
                   TRANSFORMER_ROLE, PHASE_CODE, PRIMARY_VOLTAGE_KV
            FROM TRANSFORMER_METADATA
            WHERE LATITUDE IS NOT NULL AND LONGITUDE IS NOT NULL
        """
    target_sql = """
            INSERT INTO transformers_spatial 
            (transformer_id, substation_id, circuit_id, latitude, longitude, rated_kva,
             current_load_kva, load_utilization_pct, age_years, health_score, manufacturer,
             model_number, install_year, last_maintenance, transformer_role, phase_code,
             primary_voltage_kv, geom)
            VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s,
                    ST_SetSRID(ST_MakePoint(%s, %s), 4326))
        """

    def to_params(row):
        # The 17 selected columns pass through in order; the point geometry
        # takes longitude (index 4) first, then latitude (index 3).
        return (*row[:17], row[4], row[3])

    return sync_layer('transformers', source_sql, 'transformers_spatial', target_sql, to_params)

def sync_poles():
    """Refresh ``poles_spatial`` from Snowflake ``OSM_POLES_TRULY_LAND_ONLY``.

    Only poles with both coordinates present are selected, and every inserted
    row has ``osm_source`` set to TRUE. The work is delegated to
    ``sync_layer``, whose result is returned unchanged.
    """
    source_sql = """
            SELECT OSM_POLE_ID, OSM_LAT, OSM_LON, POWER_TYPE, VOLTAGE
            FROM OSM_POLES_TRULY_LAND_ONLY
            WHERE OSM_LAT IS NOT NULL AND OSM_LON IS NOT NULL
        """
    target_sql = """
            INSERT INTO poles_spatial 
            (pole_id, latitude, longitude, power_type, voltage, osm_source, geom)
            VALUES (%s, %s, %s, %s, %s, TRUE, ST_SetSRID(ST_MakePoint(%s, %s), 4326))
        """

    def to_params(row):
        # The pole id is stored as text; the remaining four columns pass
        # through, and the point geometry takes longitude (index 2) first,
        # then latitude (index 1).
        return (str(row[0]), *row[1:5], row[2], row[1])

    return sync_layer('poles', source_sql, 'poles_spatial', target_sql, to_params)

def sync_vegetation():
    """Refresh ``vegetation_risk_cache`` from ``VEGETATION_POWER_LINE_RISK``.

    Only trees with both coordinates present are selected. The work is
    delegated to ``sync_layer``, whose result is returned unchanged.
    """
    source_sql = """
            SELECT TREE_ID, TREE_LAT, TREE_LON, NEAREST_POWER_LINE_ID, 
                   DISTANCE_TO_LINE_METERS, VEGETATION_RISK_LEVEL, RISK_SCORE
            FROM VEGETATION_POWER_LINE_RISK
            WHERE TREE_LAT IS NOT NULL AND TREE_LON IS NOT NULL
        """
    target_sql = """
            INSERT INTO vegetation_risk_cache 
            (tree_id, latitude, longitude, nearest_line_id, nearest_line_distance_m,
             encroachment_category, risk_score, geom)
            VALUES (%s, %s, %s, %s, %s, %s, %s, ST_SetSRID(ST_MakePoint(%s, %s), 4326))
        """

    def to_params(row):
        # The 7 selected columns pass through in order; the point geometry
        # takes longitude (index 2) first, then latitude (index 1).
        return (*row[:7], row[2], row[1])

    return sync_layer('vegetation', source_sql, 'vegetation_risk_cache', target_sql, to_params)

In [None]:
def sync_power_lines():
    """Refresh ``power_lines_lod`` from Snowflake ``GRID_POWER_LINES``.

    The target table is truncated, then each source row with a non-empty
    GeoJSON geometry is inserted one at a time, with the geometry parsed by
    ``ST_GeomFromGeoJSON`` and the length converted from metres to kilometres.

    Returns:
        SyncResult: completed with the number of inserted rows, or failed
        with the error text. Any ``Exception`` raised during the sync is
        recorded on the result rather than propagated.
    """
    result = SyncResult('power_lines')
    sf_conn = None
    pg_conn = None

    try:
        sf_conn = get_snowflake_conn()
        pg_conn = get_postgres_conn()
        pg_cursor = pg_conn.cursor()

        print("Syncing power_lines...")
        pg_cursor.execute('TRUNCATE TABLE power_lines_lod;')

        sf_cursor = sf_conn.cursor()
        sf_cursor.execute("""
            SELECT LINE_ID, CIRCUIT_ID, SUBSTATION_ID, LINE_TYPE, VOLTAGE_CLASS,
                   LINE_LENGTH_M, ST_ASGEOJSON(GEOMETRY) as GEOMETRY_JSON
            FROM GRID_POWER_LINES
            WHERE GEOMETRY IS NOT NULL
        """)

        count = 0
        for row in sf_cursor:
            # LINE_TYPE is selected but has no target column in the INSERT.
            line_id, circuit_id, sub_id, line_type, voltage, length_m, geojson = row
            if not geojson:
                continue
            # Test against None explicitly so a genuine 0 m length is stored
            # as 0 km rather than NULL.
            length_km = length_m / 1000 if length_m is not None else None
            pg_cursor.execute("""
                    INSERT INTO power_lines_lod 
                    (line_id, circuit_id, substation_id, voltage_class, length_km, geom)
                    VALUES (%s, %s, %s, %s, %s, ST_GeomFromGeoJSON(%s))
                """, (line_id, circuit_id, sub_id, voltage, length_km, geojson))
            count += 1

        return result.complete(count)

    except Exception as e:
        return result.fail(e)

    finally:
        # Close whatever was opened, on success and on failure alike, so a
        # failed sync does not leak connections.
        for conn in (sf_conn, pg_conn):
            if conn is None:
                continue
            try:
                conn.close()
            except Exception:
                # Best-effort cleanup: a close error must not replace the
                # sync outcome already recorded on the result.
                pass


def sync_buildings():
    """Refresh ``osm_buildings`` from Snowflake ``HOUSTON_BUILDINGS_CLEAN``.

    The target table is truncated, then buildings with both coordinates
    present are inserted in batches of ``CONFIG['batch_size']`` rows. The
    centroid point is built from the selected LONGITUDE and LATITUDE.

    Returns:
        SyncResult: completed with the number of inserted rows, or failed
        with the error text. Any ``Exception`` raised during the sync is
        recorded on the result rather than propagated.
    """
    # One definition shared by the full-batch and remainder inserts so the
    # two statements cannot drift apart.
    insert_sql = """
        INSERT INTO osm_buildings
        (osm_id, name, building_type, height_m, levels, centroid)
        VALUES (%s, %s, %s, %s, %s, ST_SetSRID(ST_MakePoint(%s, %s), 4326))
    """
    result = SyncResult('buildings')
    sf_conn = None
    pg_conn = None

    try:
        sf_conn = get_snowflake_conn()
        pg_conn = get_postgres_conn()
        pg_cursor = pg_conn.cursor()

        batch_size = CONFIG['batch_size']

        print("Syncing buildings (this may take several minutes)...")
        pg_cursor.execute('TRUNCATE TABLE osm_buildings;')

        sf_cursor = sf_conn.cursor()
        sf_cursor.execute("""
            SELECT BUILDING_ID, BUILDING_NAME, BUILDING_TYPE, HEIGHT_METERS, NUM_FLOORS,
                   LONGITUDE, LATITUDE
            FROM HOUSTON_BUILDINGS_CLEAN
            WHERE LATITUDE IS NOT NULL AND LONGITUDE IS NOT NULL
        """)

        batch = []
        count = 0

        for row in sf_cursor:
            # The 7 selected columns line up with the 7 INSERT placeholders;
            # the last two (longitude, latitude) feed ST_MakePoint.
            batch.append(tuple(row[:7]))

            if len(batch) >= batch_size:
                pg_cursor.executemany(insert_sql, batch)
                count += len(batch)
                print(f"  ... {count:,} rows")
                batch = []

        # Flush the final partial batch.
        if batch:
            pg_cursor.executemany(insert_sql, batch)
            count += len(batch)

        return result.complete(count)

    except Exception as e:
        return result.fail(e)

    finally:
        # Close whatever was opened, on success and on failure alike, so a
        # failed sync does not leak connections.
        for conn in (sf_conn, pg_conn):
            if conn is None:
                continue
            try:
                conn.close()
            except Exception:
                # Best-effort cleanup: a close error must not replace the
                # sync outcome already recorded on the result.
                pass


def sync_water_bodies():
    """Refresh ``osm_water`` from Snowflake ``HOUSTON_WATER_BODIES``.

    The target table is truncated, then each source row with a non-empty
    GeoJSON geometry is inserted one at a time. The geometry is parsed by
    ``ST_GeomFromGeoJSON`` and wrapped in ``ST_Multi``.

    Returns:
        SyncResult: completed with the number of inserted rows, or failed
        with the error text. Any ``Exception`` raised during the sync is
        recorded on the result rather than propagated.
    """
    result = SyncResult('water_bodies')
    sf_conn = None
    pg_conn = None

    try:
        sf_conn = get_snowflake_conn()
        pg_conn = get_postgres_conn()
        pg_cursor = pg_conn.cursor()

        print("Syncing water_bodies...")
        pg_cursor.execute('TRUNCATE TABLE osm_water;')

        sf_cursor = sf_conn.cursor()
        sf_cursor.execute("""
            SELECT ID, NAMES:primary::VARCHAR as NAME, ST_ASGEOJSON(GEOMETRY) as GEOMETRY_JSON
            FROM HOUSTON_WATER_BODIES
            WHERE GEOMETRY IS NOT NULL
        """)

        count = 0
        for water_id, name, geojson in sf_cursor:
            if not geojson:
                continue
            pg_cursor.execute("""
                    INSERT INTO osm_water (osm_id, name, geom)
                    VALUES (%s, %s, ST_Multi(ST_GeomFromGeoJSON(%s)))
                """, (water_id, name, geojson))
            count += 1

        return result.complete(count)

    except Exception as e:
        return result.fail(e)

    finally:
        # Close whatever was opened, on success and on failure alike, so a
        # failed sync does not leak connections.
        for conn in (sf_conn, pg_conn):
            if conn is None:
                continue
            try:
                conn.close()
            except Exception:
                # Best-effort cleanup: a close error must not replace the
                # sync outcome already recorded on the result.
                pass

## 5. Sync All Layers

In [None]:
def sync_all_layers():
    """Run every layer sync in sequence and print a summary report.

    Layers run from the smallest tables to the largest. Each sync function
    returns its own result object, so one failed layer does not stop the
    remaining layers from running.

    Returns:
        list: one result per layer, in execution order.
    """
    rule = "=" * 60

    print(rule)
    print("SNOWFLAKE TO POSTGRES FULL SYNC")
    print(rule)
    print(f"Started: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
    print()

    # Smaller tables first, large tables last.
    plan = (
        ('Substations', sync_substations),
        ('Vegetation', sync_vegetation),
        ('Water Bodies', sync_water_bodies),
        ('Power Lines', sync_power_lines),
        ('Poles', sync_poles),
        ('Transformers', sync_transformers),
        ('Meters', sync_meters),
        ('Buildings', sync_buildings),
    )

    results = []
    for name, run_sync in plan:
        print(f"\n--- {name} ---")
        outcome = run_sync()
        results.append(outcome)
        print(outcome)

    print("\n" + rule)
    print("SYNC SUMMARY")
    print(rule)

    for outcome in results:
        print(outcome)

    statuses = [outcome.status for outcome in results]
    success = statuses.count('success')
    failed = statuses.count('failed')
    # Only successful layers contribute to the row total.
    total_rows = sum(outcome.rows for outcome in results if outcome.status == 'success')

    print(f"\nTotal: {success} succeeded, {failed} failed")
    print(f"Total rows synced: {total_rows:,}")
    print(f"Completed: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")

    return results

# Uncomment to run full sync:
# results = sync_all_layers()

## 6. Sync Individual Layers

Run individual sync operations as needed:

In [None]:
# Sync individual layers - uncomment as needed:

# print(sync_substations())
# print(sync_transformers())
# print(sync_meters())
# print(sync_poles())
# print(sync_power_lines())
# print(sync_buildings())
# print(sync_water_bodies())
# print(sync_vegetation())

## 7. Verify Sync Results

In [None]:
def verify_sync():
    """Print each Postgres table's row count next to its expected count.

    A table is flagged "✗ EMPTY" only when it holds no rows; any non-empty
    table is marked "✓", and the percentage column shows how close the actual
    count is to the expected one. Nothing is returned.
    """
    # (table name, expected row count) pairs; the table names are fixed
    # literals, so interpolating them into the COUNT query below is safe.
    tables = [
        ('meters_spatial', 596906),
        ('substations_spatial', 275),
        ('transformers_spatial', 91554),
        ('power_lines_lod', 13104),
        ('poles_spatial', 62735),
        ('osm_buildings', 2670794),
        ('osm_water', 10000),
        ('vegetation_risk_cache', 3611),
    ]

    pg_conn = get_postgres_conn()
    try:
        pg_cursor = pg_conn.cursor()

        print("Postgres Table Row Counts")
        print("="*50)

        for table, expected in tables:
            pg_cursor.execute(f"SELECT COUNT(*) FROM {table}")
            actual = pg_cursor.fetchone()[0]
            status = "✓" if actual > 0 else "✗ EMPTY"
            pct = (actual / expected * 100) if expected > 0 else 0
            print(f"{status} {table:30} {actual:>10,} / {expected:>10,}  ({pct:.1f}%)")
    finally:
        # Close even when a COUNT fails (for example, a missing table) so the
        # connection is not leaked.
        pg_conn.close()

# Runs as soon as this cell executes and prints the Postgres row-count report.
verify_sync()

## 8. Test Spatial Queries

In [None]:
def test_spatial_queries():
    """Run three sample PostGIS queries and print their results.

    The queries are a proximity count (``ST_DWithin``), a bounding-box count
    (``&&`` with ``ST_MakeEnvelope``) and a 5-nearest-neighbour lookup
    (``<->`` ordering). Any query error propagates to the caller after the
    connection has been closed. Nothing is returned.
    """
    pg_conn = get_postgres_conn()
    try:
        pg_cursor = pg_conn.cursor()

        print("Testing Spatial Queries")
        print("="*50)

        # Test 1: Find transformers within 1km of a substation
        # NOTE: LIMIT 1 has no effect here; the aggregate yields one row.
        pg_cursor.execute("""
        SELECT COUNT(*) 
        FROM transformers_spatial t, substations_spatial s
        WHERE ST_DWithin(t.geom::geography, s.geom::geography, 1000)
        LIMIT 1
    """)
        print(f"✓ Proximity query (transformers within 1km of substations): {pg_cursor.fetchone()[0]:,} pairs")

        # Test 2: Find meters in a bounding box
        pg_cursor.execute("""
        SELECT COUNT(*) FROM meters_spatial
        WHERE geom && ST_MakeEnvelope(-95.5, 29.6, -95.3, 29.8, 4326)
    """)
        print(f"✓ Bounding box query (meters in Houston center): {pg_cursor.fetchone()[0]:,} meters")

        # Test 3: K-nearest neighbors
        pg_cursor.execute("""
        SELECT transformer_id, ST_Distance(geom::geography, ST_SetSRID(ST_MakePoint(-95.4, 29.7), 4326)::geography) as dist_m
        FROM transformers_spatial
        ORDER BY geom <-> ST_SetSRID(ST_MakePoint(-95.4, 29.7), 4326)
        LIMIT 5
    """)
        # Report every returned neighbour, not just the first row's id, and
        # cope with an empty table (fetchall() then returns an empty list).
        nearest = pg_cursor.fetchall()
        described = [
            f"{transformer_id} ({dist_m:,.0f} m)" if dist_m is not None else str(transformer_id)
            for transformer_id, dist_m in nearest
        ]
        summary = ", ".join(described) if described else "none found"
        print(f"✓ K-nearest query (5 nearest transformers to downtown): {summary}")

        print("\nAll spatial queries working!")
    finally:
        # Close even when a query fails so the connection is not leaked.
        pg_conn.close()

# Runs as soon as this cell executes and prints the sample spatial query results.
test_spatial_queries()