# Migrate Unity Catalog Tables to Lakebase Synced Tables

This notebook migrates the fashion ecommerce Unity Catalog tables to Lakebase PostgreSQL synced tables.

## What are Synced Tables?

Synced tables are read-only Postgres tables in Lakebase that automatically synchronize data from Unity Catalog tables. They enable:
- **Low-latency reads** for your FastAPI app
- **Automatic synchronization** using managed Lakeflow pipelines
- **Query-time joins** with other Postgres tables
- **Unity Catalog governance** maintained

## Sync Modes

- **SNAPSHOT**: Full refresh, runs on demand. Best if >10% of data changes.
- **TRIGGERED**: Incremental refresh on demand. Good balance of cost/performance.
- **CONTINUOUS**: Real-time incremental updates. Lowest lag but higher cost.
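
The trade-offs above can be folded into a small decision helper. The sketch below is illustrative only: `choose_sync_mode` and its parameters are hypothetical names, the 10% threshold is taken from the SNAPSHOT guidance above, and modes are returned as plain strings rather than SDK enum members.

```python
def choose_sync_mode(cdf_enabled: bool,
                     changed_fraction: float,
                     needs_realtime: bool = False) -> str:
    """Suggest a sync mode name (hypothetical helper, not part of the SDK)."""
    if not 0.0 <= changed_fraction <= 1.0:
        raise ValueError("changed_fraction must be between 0 and 1")
    # Incremental modes need Change Data Feed on the source table.
    if not cdf_enabled:
        return "SNAPSHOT"
    # A full refresh is the better fit once more than ~10% of rows change.
    if changed_fraction > 0.10:
        return "SNAPSHOT"
    # Otherwise pick between the two incremental modes by latency needs.
    return "CONTINUOUS" if needs_realtime else "TRIGGERED"


print(choose_sync_mode(cdf_enabled=True, changed_fraction=0.02))
print(choose_sync_mode(cdf_enabled=True, changed_fraction=0.02, needs_realtime=True))
print(choose_sync_mode(cdf_enabled=False, changed_fraction=0.02))
```

This simple version always prefers SNAPSHOT when the change rate is high, even if real-time updates were requested; adjust that ordering if lag matters more than cost for your workload.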

## Prerequisites

1. Unity Catalog tables exist in `main.fashion_demo`
2. You have `CAN USE` permissions on the Lakebase database instance
3. For TRIGGERED/CONTINUOUS modes, source tables need Change Data Feed enabled

## Configuration

In [None]:
from databricks.sdk import WorkspaceClient
from databricks.sdk.service.database import (
    SyncedDatabaseTable,
    SyncedTableSpec,
    NewPipelineSpec,
    SyncedTableSchedulingPolicy
)
import time
from typing import Dict, List, Optional

# Initialize Databricks SDK
w = WorkspaceClient()

print("✅ Databricks SDK initialized")

In [None]:
# Configuration
SOURCE_CATALOG = "main"
SOURCE_SCHEMA = "fashion_demo"

# Lakebase configuration
# Use standard catalog approach (not database catalog)
TARGET_CATALOG = "main"  # Standard catalog
TARGET_SCHEMA = "fashion_demo_lakebase"  # Schema for synced tables
DATABASE_INSTANCE_NAME = "instance-e2ff35b5-a3fc-44f3-9d65-7cba8332db7c"  # Your Lakebase instance
LOGICAL_DATABASE_NAME = "databricks_postgres"  # The Postgres database name

# Pipeline storage (for staging tables)
PIPELINE_STORAGE_CATALOG = "main"
PIPELINE_STORAGE_SCHEMA = "fashion_demo_staging"

# Sync mode: SNAPSHOT, TRIGGERED, or CONTINUOUS
DEFAULT_SYNC_MODE = SyncedTableSchedulingPolicy.TRIGGERED

print(f"Source: {SOURCE_CATALOG}.{SOURCE_SCHEMA}")
print(f"Target: {TARGET_CATALOG}.{TARGET_SCHEMA} (Lakebase)")
print(f"Database Instance: {DATABASE_INSTANCE_NAME}")
print(f"Postgres Database: {LOGICAL_DATABASE_NAME}")
print(f"Default Sync Mode: {DEFAULT_SYNC_MODE}")

## Table Definitions

Define all tables to migrate with their primary keys and optional timeseries keys for deduplication.
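
To make the role of the timeseries key concrete, here is a minimal sketch of the intended effect: when several source rows share a primary key, the row with the latest timeseries value wins. The helper `dedupe_latest` and the sample rows are hypothetical; this illustrates the concept in plain Python and is not how Lakebase implements it internally.

```python
from typing import Any, Dict, Iterable, List, Tuple


def dedupe_latest(rows: Iterable[Dict[str, Any]],
                  primary_key_columns: List[str],
                  timeseries_key: str) -> List[Dict[str, Any]]:
    """Keep one row per primary key: the one with the largest timeseries value."""
    latest: Dict[Tuple[Any, ...], Dict[str, Any]] = {}
    for row in rows:
        key = tuple(row[col] for col in primary_key_columns)
        current = latest.get(key)
        if current is None or row[timeseries_key] > current[timeseries_key]:
            latest[key] = row
    return list(latest.values())


# Made-up sample data: product 1 appears twice with different ingestion dates.
rows = [
    {"product_id": 1, "price": 10.0, "ingested_at": "2024-01-01"},
    {"product_id": 1, "price": 12.0, "ingested_at": "2024-02-01"},
    {"product_id": 2, "price": 30.0, "ingested_at": "2024-01-15"},
]
deduped = dedupe_latest(rows, ["product_id"], "ingested_at")
print(deduped)
```

The ISO-formatted date strings compare correctly as text here; real timestamp columns would compare as timestamps.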

In [None]:
# Table configurations
TABLES_TO_MIGRATE = [
    {
        "source_table": "productsdb",
        "synced_table_name": "products_synced",
        "primary_key_columns": ["product_id"],
        "timeseries_key": "ingested_at",  # For deduplication if duplicate product_ids exist
        "sync_mode": SyncedTableSchedulingPolicy.TRIGGERED,
        "description": "Product catalog with prices, categories, and metadata"
    },
    {
        "source_table": "usersdb",
        "synced_table_name": "users_synced",
        "primary_key_columns": ["user_id"],
        "timeseries_key": "created_date",
        "sync_mode": SyncedTableSchedulingPolicy.TRIGGERED,
        "description": "User profiles with preferences and segments"
    },
    {
        "source_table": "product_image_embeddingsdb",
        "synced_table_name": "product_embeddings_synced",
        "primary_key_columns": ["product_id"],
        "timeseries_key": "created_at",
        "sync_mode": SyncedTableSchedulingPolicy.TRIGGERED,
        "description": "CLIP embeddings for visual search"
    },
    {
        "source_table": "user_style_featuresdb",
        "synced_table_name": "user_features_synced",
        "primary_key_columns": ["user_id"],
        "timeseries_key": "created_at",
        "sync_mode": SyncedTableSchedulingPolicy.TRIGGERED,
        "description": "User style preferences and embeddings"
    },
    # Uncomment if you have transactions table
    # {
    #     "source_table": "transactionsdb",
    #     "synced_table_name": "transactions_synced",
    #     "primary_key_columns": ["transaction_id"],
    #     "timeseries_key": "transaction_timestamp",
    #     "sync_mode": SyncedTableSchedulingPolicy.CONTINUOUS,  # Real-time for transactions
    #     "description": "Transaction history"
    # },
]

print(f"\nConfigured {len(TABLES_TO_MIGRATE)} tables for migration:")
for table in TABLES_TO_MIGRATE:
    print(f"  - {table['source_table']} → {table['synced_table_name']} (PK: {table['primary_key_columns']})")

## Helper Functions

In [None]:
def check_table_exists(catalog: str, schema: str, table: str) -> bool:
    """Check if a Unity Catalog table exists"""
    try:
        w.tables.get(f"{catalog}.{schema}.{table}")
        return True
    except Exception as e:
        return False


def check_cdf_enabled(catalog: str, schema: str, table: str) -> bool:
    """Check if Change Data Feed is enabled on a table"""
    try:
        table_info = w.tables.get(f"{catalog}.{schema}.{table}")
        # Check if delta.enableChangeDataFeed is set to true
        properties = table_info.properties or {}
        return properties.get("delta.enableChangeDataFeed", "false").lower() == "true"
    except Exception as e:
        print(f"⚠️  Could not check CDF status for {table}: {e}")
        return False


def get_sync_status(synced_table_name: str) -> Dict:
    """Get the sync status of a synced table"""
    try:
        status = w.database.get_synced_database_table(name=synced_table_name)
        return {
            "status": status.data_synchronization_status.detailed_state if status.data_synchronization_status else "UNKNOWN",
            "message": status.data_synchronization_status.message if status.data_synchronization_status else "",
            "exists": True
        }
    except Exception as e:
        if "does not exist" in str(e).lower() or "not found" in str(e).lower():
            return {"status": "NOT_CREATED", "message": "", "exists": False}
        return {"status": "ERROR", "message": str(e), "exists": False}


def wait_for_sync(synced_table_name: str, timeout_seconds: int = 600, check_interval: int = 10):
    """Wait for a synced table to reach an ONLINE state"""
    print(f"⏳ Waiting for {synced_table_name} to sync...")
    start_time = time.time()

    while time.time() - start_time < timeout_seconds:
        status = get_sync_status(synced_table_name)
        # detailed_state may be an SDK enum rather than the bare string "ONLINE",
        # so compare against its string form using substring matches.
        current_status = str(status["status"])

        if "FAILED" in current_status or current_status == "ERROR":
            print(f"❌ {synced_table_name} sync failed: {status['message']}")
            return False
        elif "ONLINE" in current_status:
            print(f"✅ {synced_table_name} is ONLINE")
            return True
        else:
            print(f"   Status: {current_status} - {status.get('message', '')}")
            time.sleep(check_interval)

    print(f"⏱️  Timeout waiting for {synced_table_name} to sync")
    return False


print("✅ Helper functions defined")

## Pre-Migration Checks

In [None]:
print("🔍 Running pre-migration checks...\n")

all_checks_passed = True
warnings = []

for table_config in TABLES_TO_MIGRATE:
    source_table = table_config["source_table"]
    full_source_name = f"{SOURCE_CATALOG}.{SOURCE_SCHEMA}.{source_table}"
    sync_mode = table_config.get("sync_mode", DEFAULT_SYNC_MODE)
    
    print(f"Checking {source_table}...")
    
    # Check if source table exists
    if not check_table_exists(SOURCE_CATALOG, SOURCE_SCHEMA, source_table):
        print(f"  ❌ Source table does not exist: {full_source_name}")
        all_checks_passed = False
        continue
    else:
        print(f"  ✅ Source table exists")
    
    # Check CDF if using TRIGGERED or CONTINUOUS mode
    if sync_mode in [SyncedTableSchedulingPolicy.TRIGGERED, SyncedTableSchedulingPolicy.CONTINUOUS]:
        cdf_enabled = check_cdf_enabled(SOURCE_CATALOG, SOURCE_SCHEMA, source_table)
        if not cdf_enabled:
            warning_msg = f"  ⚠️  Change Data Feed not enabled on {source_table}. Use SNAPSHOT mode or enable CDF."
            print(warning_msg)
            warnings.append(warning_msg)
            # Automatically switch to SNAPSHOT mode
            table_config["sync_mode"] = SyncedTableSchedulingPolicy.SNAPSHOT
            print(f"  🔄 Auto-switched to SNAPSHOT mode for {source_table}")
        else:
            print(f"  ✅ Change Data Feed enabled")
    
    print()

if warnings:
    print("\n⚠️  WARNINGS:")
    for warning in warnings:
        print(warning)
    print()

if not all_checks_passed:
    print("❌ Pre-migration checks failed. Fix issues before proceeding.")
else:
    print("✅ All pre-migration checks passed!")

## Enable Change Data Feed (Optional)

Run this cell if you want to enable Change Data Feed on tables that need TRIGGERED or CONTINUOUS mode.
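
As a sketch of what the cell below executes per table, the statement can be assembled by a small helper that also backtick-quotes each identifier. The names `quote_ident` and `build_enable_cdf_sql` are hypothetical; the `ALTER TABLE ... SET TBLPROPERTIES` statement itself is the one used in this notebook.

```python
def quote_ident(name: str) -> str:
    """Backtick-quote a single identifier, escaping embedded backticks."""
    return "`" + name.replace("`", "``") + "`"


def build_enable_cdf_sql(catalog: str, schema: str, table: str) -> str:
    """Build the ALTER TABLE statement that turns on Change Data Feed."""
    full_name = ".".join(quote_ident(part) for part in (catalog, schema, table))
    return (
        f"ALTER TABLE {full_name} "
        "SET TBLPROPERTIES (delta.enableChangeDataFeed = true)"
    )


statement = build_enable_cdf_sql("main", "fashion_demo", "productsdb")
print(statement)
```

In a notebook, the resulting string could be passed to `spark.sql(statement)`.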

In [None]:
# Uncomment to enable CDF on all tables
# for table_config in TABLES_TO_MIGRATE:
#     source_table = table_config["source_table"]
#     full_source_name = f"{SOURCE_CATALOG}.{SOURCE_SCHEMA}.{source_table}"
#     
#     try:
#         spark.sql(f"""
#             ALTER TABLE {full_source_name}
#             SET TBLPROPERTIES (delta.enableChangeDataFeed = true)
#         """)
#         print(f"✅ Enabled CDF on {source_table}")
#     except Exception as e:
#         print(f"❌ Failed to enable CDF on {source_table}: {e}")

print("ℹ️  To enable CDF, uncomment the code in this cell and run it.")

## Create Synced Tables

This will create synced tables in Lakebase for all configured tables.

In [None]:
print("🚀 Creating synced tables...\n")

created_tables = []
failed_tables = []

for table_config in TABLES_TO_MIGRATE:
    source_table = table_config["source_table"]
    synced_table_name = table_config["synced_table_name"]
    primary_key_columns = table_config["primary_key_columns"]
    timeseries_key = table_config.get("timeseries_key")
    sync_mode = table_config.get("sync_mode", DEFAULT_SYNC_MODE)
    description = table_config.get("description", "")
    
    full_source_name = f"{SOURCE_CATALOG}.{SOURCE_SCHEMA}.{source_table}"
    full_synced_name = f"{TARGET_CATALOG}.{TARGET_SCHEMA}.{synced_table_name}"
    
    print(f"Creating synced table: {synced_table_name}")
    print(f"  Source: {full_source_name}")
    print(f"  Target: {full_synced_name}")
    print(f"  Primary Key: {primary_key_columns}")
    print(f"  Timeseries Key: {timeseries_key}")
    print(f"  Sync Mode: {sync_mode}")
    
    # Check if synced table already exists
    existing_status = get_sync_status(full_synced_name)
    if existing_status["exists"]:
        print(f"  ⚠️  Synced table already exists (status: {existing_status['status']})")
        print(f"  ℹ️  Skipping creation. Use the 'Update Synced Tables' section to modify.\n")
        continue
    
    try:
        # Build spec
        spec = SyncedTableSpec(
            source_table_full_name=full_source_name,
            primary_key_columns=primary_key_columns,
            scheduling_policy=sync_mode,
            create_database_objects_if_missing=True,  # Create schema if needed
            new_pipeline_spec=NewPipelineSpec(
                storage_catalog=PIPELINE_STORAGE_CATALOG,
                storage_schema=PIPELINE_STORAGE_SCHEMA
            )
        )
        
        # Add timeseries key if specified
        if timeseries_key:
            spec.timeseries_key = timeseries_key
        
        # Create synced table
        synced_table = w.database.create_synced_database_table(
            SyncedDatabaseTable(
                name=full_synced_name,
                database_instance_name=DATABASE_INSTANCE_NAME,
                logical_database_name=LOGICAL_DATABASE_NAME,
                spec=spec
            )
        )
        
        print(f"  ✅ Created synced table: {synced_table.name}")
        created_tables.append({
            "source": full_source_name,
            "synced": full_synced_name,
            "config": table_config
        })
        
    except Exception as e:
        print(f"  ❌ Failed to create synced table: {e}")
        failed_tables.append({
            "source": full_source_name,
            "synced": full_synced_name,
            "error": str(e)
        })
    
    print()

print("\n" + "="*80)
print(f"✅ Successfully created: {len(created_tables)} tables")
print(f"❌ Failed: {len(failed_tables)} tables")
print("="*80)

## Monitor Sync Status

Wait for all synced tables to reach ONLINE status.
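
The waiting logic is an ordinary poll-with-timeout loop. The sketch below shows that pattern in isolation, with the sleep function and clock passed in so it can be exercised without a workspace. `poll_until` and the simulated state names are hypothetical; real state values come from the SDK.

```python
import time
from typing import Callable


def poll_until(get_state: Callable[[], str],
               timeout_seconds: float = 600,
               check_interval: float = 10,
               sleep: Callable[[float], None] = time.sleep,
               clock: Callable[[], float] = time.monotonic) -> bool:
    """Poll get_state() until it reports ONLINE (True), a failure, or a timeout (False)."""
    deadline = clock() + timeout_seconds
    while clock() < deadline:
        state = str(get_state())
        # Check failures first so a failed state is never mistaken for success.
        if "FAILED" in state or state == "ERROR":
            return False
        if "ONLINE" in state:
            return True
        sleep(check_interval)
    return False


# Simulated state sequence (made-up names), so the example runs offline.
states = iter(["PROVISIONING", "PROVISIONING", "ONLINE"])
result = poll_until(lambda: next(states), timeout_seconds=5,
                    check_interval=0, sleep=lambda _: None)
print(result)
```

Using a monotonic clock avoids surprises if the system time changes while the loop is waiting.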

In [None]:
print("📊 Monitoring sync status...\n")

for created in created_tables:
    synced_name = created["synced"]
    print(f"Checking: {synced_name}")
    
    # Wait for sync to complete (10 minute timeout)
    success = wait_for_sync(synced_name, timeout_seconds=600, check_interval=15)
    
    if success:
        print(f"✅ {synced_name} is ready!\n")
    else:
        print(f"⚠️  {synced_name} did not reach ONLINE status. Check pipeline logs.\n")

## Check Final Status of All Tables

In [None]:
print("📋 Final Status Report\n")
print("="*100)
print(f"{'Table':<40} {'Status':<15} {'Message':<45}")
print("="*100)

for table_config in TABLES_TO_MIGRATE:
    synced_table_name = table_config["synced_table_name"]
    full_synced_name = f"{TARGET_CATALOG}.{TARGET_SCHEMA}.{synced_table_name}"
    
    status = get_sync_status(full_synced_name)
    # Normalize an SDK enum state to its plain value before comparing and formatting
    status_str = str(getattr(status["status"], "value", status["status"]))
    message = status["message"][:45] if status["message"] else ""

    # Add emoji based on status (substring matches also cover prefixed enum state names)
    if "FAILED" in status_str or status_str == "ERROR":
        emoji = "❌"
    elif "ONLINE" in status_str:
        emoji = "✅"
    elif status_str == "NOT_CREATED":
        emoji = "⚪"
    else:
        emoji = "🔄"

    print(f"{emoji} {synced_table_name:<38} {status_str:<15} {message:<45}")

print("="*100)

## Query Synced Tables from Postgres

Example queries to test your synced tables in Lakebase.
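
When querying from the Postgres side (for example from the FastAPI app), the table is addressed by schema and table name, as in the `DROP TABLE` statements printed by the cleanup cell, rather than by the three-part Unity Catalog name. A minimal sketch of that mapping follows; `postgres_table_ref` is a hypothetical helper.

```python
def postgres_table_ref(uc_full_name: str) -> str:
    """Turn 'catalog.schema.table' into a quoted Postgres 'schema.table' reference."""
    parts = uc_full_name.split(".")
    if len(parts) != 3:
        raise ValueError(f"expected catalog.schema.table, got {uc_full_name!r}")
    _, schema, table = parts

    def quote(name: str) -> str:
        # Postgres identifiers are double-quoted; embedded quotes are doubled.
        return '"' + name.replace('"', '""') + '"'

    return f"{quote(schema)}.{quote(table)}"


ref = postgres_table_ref("main.fashion_demo_lakebase.products_synced")
query = f"SELECT COUNT(*) AS total_products FROM {ref}"
print(query)
```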

In [None]:
# You can query synced tables using SQL in notebooks or from your FastAPI app

# Example: Count rows in products_synced
# spark.sql(f"""
#     SELECT COUNT(*) as total_products
#     FROM {TARGET_CATALOG}.{TARGET_SCHEMA}.products_synced
# """).show()

# Example: Join products with embeddings
# spark.sql(f"""
#     SELECT p.product_id, p.product_display_name, p.price
#     FROM {TARGET_CATALOG}.{TARGET_SCHEMA}.products_synced p
#     INNER JOIN {TARGET_CATALOG}.{TARGET_SCHEMA}.product_embeddings_synced e
#         ON p.product_id = e.product_id
#     LIMIT 10
# """).show()

print("ℹ️  Uncomment the SQL examples above to test your synced tables")

## Update Synced Tables (Trigger Refresh)

For TRIGGERED mode tables, you can manually trigger a refresh.

In [None]:
# To manually trigger a refresh for TRIGGERED mode tables:

def trigger_sync_refresh(synced_table_name: str):
    """Trigger a manual refresh for a synced table in TRIGGERED mode"""
    try:
        # Get the synced table to find its pipeline ID
        synced_table = w.database.get_synced_database_table(name=synced_table_name)

        # Assumption: the pipeline ID is exposed on data_synchronization_status.
        # getattr keeps this safe if your SDK version stores it elsewhere.
        sync_status = synced_table.data_synchronization_status
        pipeline_id = getattr(sync_status, "pipeline_id", None)
        if not pipeline_id:
            print(f"⚠️  No pipeline ID found for {synced_table_name}; trigger the pipeline from the UI")
            return

        # Start a pipeline update through the Pipelines API
        w.pipelines.start_update(pipeline_id=pipeline_id)
        print(f"✅ Triggered refresh for {synced_table_name}")
        print(f"   Monitor progress in the Databricks Workflows UI")
    except Exception as e:
        print(f"❌ Failed to trigger refresh: {e}")

# Example usage:
# trigger_sync_refresh(f"{TARGET_CATALOG}.{TARGET_SCHEMA}.products_synced")

print("ℹ️  Use trigger_sync_refresh() to manually refresh TRIGGERED mode tables")

## Cleanup (Delete Synced Tables)

**⚠️ WARNING**: This will delete synced tables from Unity Catalog and stop synchronization.
You'll need to manually drop tables in Postgres to free up space.

In [None]:
# UNCOMMENT TO ENABLE CLEANUP
# ENABLE_CLEANUP = True
ENABLE_CLEANUP = False

if ENABLE_CLEANUP:
    print("⚠️  CLEANUP MODE ENABLED\n")
    print("This will delete synced tables from Unity Catalog.\n")
    
    for table_config in TABLES_TO_MIGRATE:
        synced_table_name = table_config["synced_table_name"]
        full_synced_name = f"{TARGET_CATALOG}.{TARGET_SCHEMA}.{synced_table_name}"
        
        try:
            print(f"Deleting: {full_synced_name}")
            w.database.delete_synced_database_table(name=full_synced_name)
            print(f"  ✅ Deleted from Unity Catalog")
            print(f"  ⚠️  Remember to DROP TABLE in Postgres to free up space\n")
        except Exception as e:
            if "does not exist" in str(e).lower() or "not found" in str(e).lower():
                print(f"  ℹ️  Table does not exist (already deleted)\n")
            else:
                print(f"  ❌ Failed to delete: {e}\n")
    
    print("\n📋 Next Steps:")
    print("1. Connect to your Lakebase instance with psql or SQL editor")
    print("2. Run the following to drop tables in Postgres:")
    print()
    for table_config in TABLES_TO_MIGRATE:
        synced_table_name = table_config["synced_table_name"]
        print(f"   DROP TABLE IF EXISTS {TARGET_SCHEMA}.{synced_table_name};")
else:
    print("ℹ️  Cleanup is disabled. Set ENABLE_CLEANUP = True to delete synced tables.")

## Next Steps

After migration:

1. **Update your FastAPI app** to use the synced tables:
   ```python
   # In core/config.py, update:
   LAKEBASE_SCHEMA = "fashion_demo_lakebase"
   PRODUCTS_TABLE = "products_synced"
   USERS_TABLE = "users_synced"
   EMBEDDINGS_TABLE = "product_embeddings_synced"
   USER_FEATURES_TABLE = "user_features_synced"
   ```

2. **Test your queries** with the new synced tables

3. **Set up the correct PGPASSWORD** (see [LAKEBASE_PASSWORD_SETUP.md](../LAKEBASE_PASSWORD_SETUP.md))

4. **Monitor pipeline performance** in Databricks Workflows UI

5. **Create indexes** in Postgres for better query performance:
   ```sql
   CREATE INDEX idx_products_category ON fashion_demo_lakebase.products_synced(master_category);
   CREATE INDEX idx_products_price ON fashion_demo_lakebase.products_synced(price);
   ```

6. **Grant permissions** to other users if needed:
   ```sql
   -- Replace app_user with the target Postgres role (the bare word "user" is a reserved keyword)
   GRANT SELECT ON ALL TABLES IN SCHEMA fashion_demo_lakebase TO app_user;
   ```