# Medallion Architecture Migration

Test and execute the schema migration to a medallion architecture (bronze/silver/gold layers).

**Purpose:** Migrate existing DuckDB tables to medallion schema structure.

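**Warning:** This notebook modifies the database it connects to. The setup cells below create and use a timestamped copy of `unified.duckdb`; do not point it at the production file while testing!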

## 1. Setup

In [None]:
import duckdb
import shutil
from pathlib import Path
from datetime import datetime

# Production database that the migration will eventually target.
SOURCE_DB = Path('../data/unified.duckdb')

# Scratch copy named with the current timestamp so every run gets a fresh file
# and the source database is only ever read (copied), never written.
_stamp = datetime.now().strftime("%Y%m%d_%H%M%S")
TEST_DB = Path(f'../data/unified_migration_test_{_stamp}.duckdb')

for _label, _path in (("Source", SOURCE_DB), ("Test copy", TEST_DB)):
    print(f"{_label}: {_path}")
print("\n⚠️  This notebook will work on a TEST COPY of the database.")

In [None]:
# Create the test copy of the source database, then open it read-write.
if not TEST_DB.exists():
    shutil.copy(SOURCE_DB, TEST_DB)
    # DuckDB keeps changes that have not yet been checkpointed in a sidecar
    # "<db>.wal" file that is replayed when the database is opened. Copying
    # only the main file would silently drop those changes from the test
    # copy, so bring the WAL along when one exists.
    # NOTE(review): the copy is only consistent if no other process is
    # writing to SOURCE_DB while this cell runs.
    source_wal = SOURCE_DB.with_name(SOURCE_DB.name + '.wal')
    if source_wal.exists():
        shutil.copy(source_wal, TEST_DB.with_name(TEST_DB.name + '.wal'))
        print(f"Copied WAL: {source_wal}")
    print(f"Created test copy: {TEST_DB}")
else:
    print(f"Using existing test copy: {TEST_DB}")

# Connect to test database (read-write)
con = duckdb.connect(str(TEST_DB))
print(f"\nConnected to: {TEST_DB}")

In [None]:
# Check current state
def show_schemas():
    schemas = con.execute("""
        SELECT schema_name 
        FROM information_schema.schemata 
        ORDER BY schema_name
    """).fetchall()
    print("Current Schemas:")
    for (s,) in schemas:
        print(f"  - {s}")

def show_tables():
    """Print a row count for every table outside information_schema, grouped by schema."""
    listing = con.execute("""
        SELECT table_schema, table_name 
        FROM information_schema.tables 
        WHERE table_schema != 'information_schema'
        ORDER BY table_schema, table_name
    """).fetchall()
    print("\nTables by Schema:")
    previous = None
    for schema_name, table_name in listing:
        # Rows arrive sorted by schema, so emit a header each time it changes.
        if schema_name != previous:
            previous = schema_name
            print(f"\n{schema_name}:")
        (row_count,) = con.execute(
            f'SELECT COUNT(*) FROM "{schema_name}"."{table_name}"'
        ).fetchone()
        print(f"  {table_name:30} {row_count:>10,} rows")

show_schemas()
show_tables()

## 2. Create Medallion Schemas

In [None]:
# Create the three medallion layers: bronze (raw copies), silver
# (quality-gated), gold (CMDB-ready). IF NOT EXISTS makes this safe to re-run.
print("Creating medallion schemas...")

for layer in ("bronze", "silver", "gold"):
    con.execute(f"CREATE SCHEMA IF NOT EXISTS {layer}")
    print(f"  ✓ Created schema: {layer}")

show_schemas()

## 3. Migrate Tables to Bronze

In [None]:
# Tables to migrate to bronze (raw layer)
BRONZE_TABLES = [
    'signals',
    'scan_sessions',
    'survey_segments',
    'rf_captures',
    'network_scans',
    'survey_signals',
]

print("Migrating tables to bronze schema...")
failed_tables = []
for table in BRONZE_TABLES:
    try:
        # Bind the table name as a query parameter instead of splicing it
        # into the SQL text.
        exists = con.execute(
            """
            SELECT COUNT(*) FROM information_schema.tables
            WHERE table_schema = 'main' AND table_name = ?
            """,
            [table],
        ).fetchone()[0]

        if exists:
            # Copy to bronze schema (DuckDB doesn't support ALTER TABLE SET SCHEMA).
            # CREATE OR REPLACE keeps the cell re-runnable: executing it again
            # (e.g. against an existing test copy) refreshes the bronze copy
            # from main instead of failing because the table already exists.
            # NOTE: CREATE TABLE ... AS SELECT copies data, not constraints
            # or indexes defined on the main table.
            con.execute(
                f'CREATE OR REPLACE TABLE bronze."{table}" AS SELECT * FROM main."{table}"'
            )
            print(f"  ✓ Migrated: {table} → bronze.{table}")
        else:
            print(f"  - Skipped: {table} (not found in main)")
    except duckdb.Error as e:
        # Only database errors are reported-and-continued; anything else is a
        # bug in this cell and should surface.
        failed_tables.append(table)
        print(f"  ✗ Error migrating {table}: {e}")

if failed_tables:
    # Later cells read bronze.signals, so make partial failure hard to miss.
    print(f"\n⚠️  {len(failed_tables)} table(s) failed: {', '.join(failed_tables)}")

In [None]:
# Verify bronze migration: list each table now in the bronze schema with its
# row count.
print("\nBronze Layer Tables:")
bronze_tables = con.execute("""
    SELECT table_name FROM information_schema.tables 
    WHERE table_schema = 'bronze'
    ORDER BY table_name
""").fetchall()

for name in (row[0] for row in bronze_tables):
    rows_in_table = con.execute(f'SELECT COUNT(*) FROM bronze."{name}"').fetchone()[0]
    print(f"  bronze.{name:25} {rows_in_table:>10,} rows")

## 4. Create Silver Layer Views/Tables

In [None]:
# Protocol classification function (as SQL CASE)
PROTOCOL_CASE = """
CASE freq_band
    WHEN 'fm_broadcast' THEN 'FM_BROADCAST'
    WHEN 'aircraft' THEN 'AM_VOICE'
    WHEN 'adsb' THEN 'ADS_B'
    WHEN 'ism_433' THEN 'OOK'
    WHEN 'ism_315' THEN 'OOK'
    WHEN 'ism_868' THEN 'FSK'
    WHEN 'ism_900' THEN 'FSK'
    WHEN 'frs_gmrs' THEN 'FM_VOICE'
    WHEN 'marine_vhf' THEN 'FM_VOICE'
    WHEN 'noaa_weather' THEN 'FM_VOICE'
    WHEN 'uhf_amateur' THEN 'MIXED'
    WHEN 'vhf_amateur' THEN 'MIXED'
    ELSE 'UNKNOWN'
END
"""

print("Creating silver layer tables...")

In [None]:
# Silver: verified_signals — bronze.signals filtered through quality gates and
# tagged with an rf_protocol via PROTOCOL_CASE.
# Gates: power_db >= 0 (at/above the noise floor) and a known freq_band
# (not NULL, 'unknown' or 'gap'). power_db is used as the gate because
# detection_count is currently always 1, so it cannot discriminate.
verified_signals_sql = f"""
    CREATE TABLE silver.verified_signals AS
    SELECT 
        signal_id,
        frequency_hz,
        power_db,
        bandwidth_hz,
        freq_band,
        detection_count,
        state,
        first_seen,
        last_seen,
        survey_id,
        segment_id,
        {PROTOCOL_CASE} AS rf_protocol,
        location_name,
        year,
        month
    FROM bronze.signals
    WHERE power_db >= 0  -- Quality gate: above noise floor
      AND freq_band IS NOT NULL
      AND freq_band NOT IN ('unknown', 'gap')
"""
con.execute(verified_signals_sql)

(count,) = con.execute("SELECT COUNT(*) FROM silver.verified_signals").fetchone()
print(f"  ✓ Created: silver.verified_signals ({count:,} rows)")

In [None]:
# Silver: band_inventory — one summary row per non-NULL freq_band, aggregated
# over ALL of bronze.signals (not just the quality-gated verified_signals):
# signal count, frequency/power ranges, first/last observation, and total
# detections.
band_inventory_sql = """
    CREATE TABLE silver.band_inventory AS
    SELECT 
        freq_band,
        COUNT(*) as signal_count,
        MIN(frequency_hz) as min_freq_hz,
        MAX(frequency_hz) as max_freq_hz,
        AVG(power_db) as avg_power_db,
        MAX(power_db) as max_power_db,
        MIN(first_seen) as earliest_detection,
        MAX(last_seen) as latest_detection,
        SUM(detection_count) as total_detections
    FROM bronze.signals
    WHERE freq_band IS NOT NULL
    GROUP BY freq_band
    ORDER BY signal_count DESC
"""
con.execute(band_inventory_sql)

(count,) = con.execute("SELECT COUNT(*) FROM silver.band_inventory").fetchone()
print(f"  ✓ Created: silver.band_inventory ({count:,} rows)")

In [None]:
# Silver: spectrum_surveys (survey metadata, copied as-is from main).
# The table is optional: a catalog error (source table missing, or the silver
# copy already existing on a re-run) is reported as a skip. Any other database
# failure propagates instead of being mislabelled as a harmless skip.
try:
    con.execute("""
        CREATE TABLE silver.spectrum_surveys AS
        SELECT * FROM main.spectrum_surveys
    """)
except duckdb.CatalogException as e:
    print(f"  - Skipped: silver.spectrum_surveys ({e})")
else:
    count = con.execute("SELECT COUNT(*) FROM silver.spectrum_surveys").fetchone()[0]
    print(f"  ✓ Created: silver.spectrum_surveys ({count:,} rows)")

In [None]:
# Verify silver layer: row count for each table now in the silver schema.
print("\nSilver Layer Tables:")
silver_tables = con.execute("""
    SELECT table_name FROM information_schema.tables 
    WHERE table_schema = 'silver'
    ORDER BY table_name
""").fetchall()

for (tbl,) in silver_tables:
    (tbl_rows,) = con.execute(f'SELECT COUNT(*) FROM silver."{tbl}"').fetchone()
    print(f"  silver.{tbl:25} {tbl_rows:>10,} rows")

## 5. Create Gold Layer Tables

In [None]:
# Gold: assets (CMDB inventory from high-quality signals)
#
# One asset per silver.verified_signals row with power_db >= 10, enriched with
# CMDB fields (CI class, Purdue level, security posture, risk level) and a
# lineage pointer back to the source signal.
#
# The ISM bands 'ism_433', 'ism_315' and 'ism_868' are grouped together in the
# Purdue/security/risk CASEs below, so 'ism_868' must also be listed in the
# CI-class CASE; otherwise it falls through to the generic 'RF_EMITTER'.
# Plain (non-f) string: nothing is interpolated into this SQL.
con.execute("""
    CREATE TABLE gold.assets AS
    SELECT 
        gen_random_uuid()::VARCHAR AS id,
        CONCAT(freq_band, '_', CAST(ROUND(frequency_hz/1e6, 1) AS VARCHAR), 'MHz') AS name,
        'RF_EMITTER' AS asset_type,
        first_seen,
        last_seen,
        1.0 AS correlation_confidence,

        -- RF fields
        frequency_hz AS rf_frequency_hz,
        power_db AS rf_signal_strength_db,
        bandwidth_hz AS rf_bandwidth_hz,
        rf_protocol,

        -- CMDB CI Class
        CASE freq_band
            WHEN 'fm_broadcast' THEN 'RF_BROADCAST_TRANSMITTER'
            WHEN 'adsb' THEN 'RF_ADSB_TRANSPONDER'
            WHEN 'aircraft' THEN 'RF_AVIATION_TRANSPONDER'
            WHEN 'ism_433' THEN 'RF_IOT_DEVICE'
            WHEN 'ism_315' THEN 'RF_IOT_DEVICE'
            WHEN 'ism_868' THEN 'RF_IOT_DEVICE'
            WHEN 'ism_900' THEN 'RF_IOT_DEVICE'
            WHEN 'frs_gmrs' THEN 'RF_TWO_WAY_RADIO'
            WHEN 'marine_vhf' THEN 'RF_MARINE_RADIO'
            WHEN 'noaa_weather' THEN 'RF_WEATHER_STATION'
            ELSE 'RF_EMITTER'
        END AS cmdb_ci_class,

        -- Purdue Level
        CASE freq_band
            WHEN 'ism_433' THEN 0
            WHEN 'ism_315' THEN 0
            WHEN 'ism_868' THEN 0
            WHEN 'ism_900' THEN 1
            WHEN 'frs_gmrs' THEN 4
            ELSE 5
        END AS purdue_level,

        -- Security assessment
        CASE 
            WHEN freq_band IN ('ism_433', 'ism_315', 'ism_868') THEN 'REQUIRES_REVIEW'
            ELSE 'COMPLIANT'
        END AS security_posture,

        CASE 
            WHEN freq_band IN ('ism_433', 'ism_315', 'ism_868') THEN 'MEDIUM'
            ELSE 'LOW'
        END AS risk_level,

        -- Lineage
        signal_id AS source_signal_id,
        location_name

    FROM silver.verified_signals
    WHERE power_db >= 10  -- High-quality only
""")

count = con.execute("SELECT COUNT(*) FROM gold.assets").fetchone()[0]
print(f"  ✓ Created: gold.assets ({count:,} rows)")

In [None]:
# Gold: rf_emitters — a view (not a copy) over gold.assets limited to rows
# that carry an RF frequency, so it reflects whatever gold.assets holds when
# queried.
rf_emitters_ddl = """
    CREATE VIEW gold.rf_emitters AS
    SELECT *
    FROM gold.assets
    WHERE rf_frequency_hz IS NOT NULL
"""
con.execute(rf_emitters_ddl)

(count,) = con.execute("SELECT COUNT(*) FROM gold.rf_emitters").fetchone()
print(f"  ✓ Created: gold.rf_emitters (view, {count:,} rows)")

In [None]:
# Verify gold layer: every table and view in the gold schema, with its row
# count and information_schema table_type.
print("\nGold Layer Tables:")
gold_tables = con.execute("""
    SELECT table_name, table_type FROM information_schema.tables 
    WHERE table_schema = 'gold'
    ORDER BY table_name
""").fetchall()

for obj_name, obj_type in gold_tables:
    (n_rows,) = con.execute(f'SELECT COUNT(*) FROM gold."{obj_name}"').fetchone()
    print(f"  gold.{obj_name:25} {n_rows:>10,} rows ({obj_type})")

## 6. Verify Migration

In [None]:
# Cross-schema query test: row counts for the signal pipeline at each layer,
# reported as a percentage of the bronze (raw) row count.
print("Cross-Schema Query Test:")
print("=" * 60)

result = con.execute("""
    SELECT 
        'bronze' as layer, COUNT(*) as count FROM bronze.signals
    UNION ALL
    SELECT 
        'silver' as layer, COUNT(*) as count FROM silver.verified_signals
    UNION ALL
    SELECT 
        'gold' as layer, COUNT(*) as count FROM gold.assets
""").df()

print(result.to_string(index=False))
print("\nData Reduction: bronze → silver → gold")

# Index the counts by layer name once instead of filtering the frame per layer.
layer_counts = dict(zip(result['layer'], result['count']))
bronze_count = layer_counts['bronze']
silver_count = layer_counts['silver']
gold_count = layer_counts['gold']

if bronze_count:
    silver_pct = f"{silver_count / bronze_count * 100:.1f}%"
    gold_pct = f"{gold_count / bronze_count * 100:.1f}%"
else:
    # Empty bronze layer: a percentage is undefined, so don't divide by zero.
    silver_pct = gold_pct = "n/a"

print(f"  Bronze: {bronze_count:,} (100%)")
print(f"  Silver: {silver_count:,} ({silver_pct})")
print(f"  Gold:   {gold_count:,} ({gold_pct})")

In [None]:
# Cross-layer lineage: take the ten strongest gold assets and follow each one
# back through silver.verified_signals to its bronze.signals row.
print("\nCross-Layer Lineage Sample:")
print("=" * 80)

lineage_sql = """
    SELECT 
        g.name AS asset_name,
        g.cmdb_ci_class,
        g.risk_level,
        s.rf_protocol,
        s.power_db,
        b.detection_count,
        b.survey_id
    FROM gold.assets g
    JOIN silver.verified_signals s ON g.source_signal_id = s.signal_id
    JOIN bronze.signals b ON s.signal_id = b.signal_id
    ORDER BY g.rf_signal_strength_db DESC
    LIMIT 10
"""
lineage = con.execute(lineage_sql).df()

print(lineage.to_string(index=False))

## 7. Rollback Plan

If the migration needs to be undone, use the SQL below:

In [None]:
# Rollback SQL (DO NOT RUN unless needed)
# Statements are ordered so that dependents are dropped first: the gold view
# before the gold.assets table it selects from, and every listed table in a
# schema before that schema's DROP SCHEMA. DROP SCHEMA is issued without
# CASCADE, so it presumably fails if any unlisted object remains in the
# schema rather than silently dropping it (DuckDB behavior — confirm).
# Only bronze/silver/gold objects are named here; nothing in 'main' is dropped.
rollback_sql = """
-- ROLLBACK: Remove medallion architecture
-- WARNING: This will delete all migrated data!

-- Drop gold layer
DROP VIEW IF EXISTS gold.rf_emitters;
DROP TABLE IF EXISTS gold.assets;
DROP SCHEMA IF EXISTS gold;

-- Drop silver layer
DROP TABLE IF EXISTS silver.spectrum_surveys;
DROP TABLE IF EXISTS silver.band_inventory;
DROP TABLE IF EXISTS silver.verified_signals;
DROP SCHEMA IF EXISTS silver;

-- Drop bronze layer
DROP TABLE IF EXISTS bronze.survey_signals;
DROP TABLE IF EXISTS bronze.network_scans;
DROP TABLE IF EXISTS bronze.rf_captures;
DROP TABLE IF EXISTS bronze.survey_segments;
DROP TABLE IF EXISTS bronze.scan_sessions;
DROP TABLE IF EXISTS bronze.signals;
DROP SCHEMA IF EXISTS bronze;

-- Original tables in 'main' schema remain intact
"""

# Display only: this cell does not execute any of the statements.
print("Rollback SQL (not executed):")
print(rollback_sql)

In [None]:
# Rollback executor. Set EXECUTE_ROLLBACK = True and re-run this cell only when
# the migration must really be undone; it drops every object in rollback_sql.
EXECUTE_ROLLBACK = False


def split_sql_statements(sql):
    """Split a ';'-separated SQL script into executable statements.

    Whole-line '--' comments are removed from each chunk before deciding
    whether anything executable is left. Testing ``chunk.startswith('--')``
    instead would discard any statement preceded by a comment header (such as
    the '-- Drop gold layer' line before 'DROP VIEW ...'), silently skipping
    that DROP and making the following DROP SCHEMA fail.

    This is not a general SQL parser: a ';' inside a string literal or a
    trailing '--' comment on a statement line is not handled.

    Args:
        sql: The script text.

    Returns:
        List of stripped, non-empty statements in script order.
    """
    statements = []
    for chunk in sql.split(';'):
        body = '\n'.join(
            line for line in chunk.splitlines()
            if not line.strip().startswith('--')
        ).strip()
        if body:
            statements.append(body)
    return statements


if EXECUTE_ROLLBACK:
    for stmt in split_sql_statements(rollback_sql):
        con.execute(stmt)
        print(f"Executed: {stmt[:50]}...")

## Summary

### Migration Complete

| Layer | Schema | Tables | Rows |
|-------|--------|--------|------|
| Bronze | `bronze` | signals, scan_sessions, survey_segments, ... | 14,202+ |
| Silver | `silver` | verified_signals, band_inventory, spectrum_surveys | ~1,100 |
| Gold | `gold` | assets, rf_emitters (view) | ~97 |

Row counts are from a reference run of this notebook; check the cell outputs above for the current values.

### Validation
- ✓ Cross-schema queries work
- ✓ Lineage from gold → silver → bronze verified
- ✓ Protocol classification applied in silver
- ✓ CMDB enrichment applied in gold

### Next Steps
1. If satisfied, apply migration to production database
2. Update application code to use schema-qualified table names
3. Delete test database file

In [None]:
# Final schema overview: re-print every table outside information_schema,
# grouped by schema with row counts, so the new bronze/silver/gold objects
# can be checked next to the original 'main' tables.
show_tables()

In [None]:
# Cleanup: close the DuckDB connection and point at the test copy (which is
# left on disk for inspection).
con.close()
for line in (
    "\nConnection closed.",
    f"Test database: {TEST_DB}",
    "\nTo apply to production, copy the migration SQL to unified_db.py",
):
    print(line)