# PostgreSQL CDC to Iceberg Pipeline with RisingWave Connect

This notebook demonstrates how to create an end-to-end data pipeline that:
1. Captures changes from PostgreSQL using Change Data Capture (CDC)
2. Sinks the data into Iceberg tables for analytics

## Requirements
- Running RisingWave instance (local or cloud)
- Active PostgreSQL database with logical replication enabled for CDC
- S3-compatible storage or local directory for Iceberg sink

---

## 1. Setup Environment and Imports

Import required libraries and set up logging for pipeline monitoring.

In [1]:
# Import required libraries
import logging
from pprint import pprint

from risingwave_connect import (
    RisingWaveClient, 
    ConnectBuilder, 
    PostgreSQLConfig, 
    IcebergConfig
)

# Set up logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

print("✅ Libraries imported successfully")

✅ Libraries imported successfully


## 2. Initialize RisingWave Client

Create and configure a RisingWave client connection with proper host, port, and authentication settings.

In [None]:
# Configure RisingWave connection
RISINGWAVE_HOST = "localhost"
RISINGWAVE_PORT = 4566
RISINGWAVE_USERNAME = "root"
RISINGWAVE_PASSWORD = ""
RISINGWAVE_DATABASE = "dev"

# Initialize RisingWave client
client = RisingWaveClient(
    host=RISINGWAVE_HOST,
    port=RISINGWAVE_PORT,
    username=RISINGWAVE_USERNAME,
    password=RISINGWAVE_PASSWORD,
    database=RISINGWAVE_DATABASE
)

# Create connect builder
builder = ConnectBuilder(client)

print(f"✅ RisingWave client initialized: {RISINGWAVE_HOST}:{RISINGWAVE_PORT}")
print(f"   Database: {RISINGWAVE_DATABASE}")
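
Before creating the client, it can help to confirm that something is listening on the configured host and port. The sketch below uses only the Python standard library; `is_port_open` is a hypothetical helper, not part of `risingwave_connect`, and a successful TCP connection does not prove that the service behind it is RisingWave.

```python
import socket


def is_port_open(host, port, timeout=2.0):
    """Return True if a TCP connection to host:port succeeds within the timeout."""
    try:
        # create_connection resolves the host and attempts a TCP handshake.
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts, and name resolution failures.
        return False
```

For the settings in this cell, the call would be `is_port_open(RISINGWAVE_HOST, RISINGWAVE_PORT)`.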

## 3. Configure PostgreSQL CDC Source

Set up PostgreSQL configuration with connection parameters, schema settings, and CDC-specific options.

In [None]:
# PostgreSQL configuration
POSTGRES_PORT = 5432
POSTGRES_HOST = "localhost"
POSTGRES_USERNAME = "rwpipeline"
POSTGRES_PASSWORD = ""
POSTGRES_DATABASE = "postgres"
POSTGRES_SCHEMA = "public"

# Configure PostgreSQL CDC source
postgres_config = PostgreSQLConfig(
    hostname=POSTGRES_HOST,
    port=POSTGRES_PORT,
    username=POSTGRES_USERNAME,
    password=POSTGRES_PASSWORD,
    database=POSTGRES_DATABASE,
    schema_name=POSTGRES_SCHEMA,
    ssl_mode="required",  
    auto_schema_change=True,
    backfill_as_even_splits=True,
    backfill_parallelism=8,
    backfill_num_rows_per_split=8000,
)

print("✅ PostgreSQL CDC configuration created:")
print(f"   Host: {POSTGRES_HOST}:{POSTGRES_PORT}")
print(f"   Database: {POSTGRES_DATABASE}")
print(f"   Schema: {POSTGRES_SCHEMA}")
print(f"   SSL Mode: {postgres_config.ssl_mode}")
print(f"   Auto schema change: {postgres_config.auto_schema_change}")
print(f"   Backfill parallelism: {postgres_config.backfill_parallelism}")
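
The cell above leaves `POSTGRES_PASSWORD` as an empty string. One way to avoid hard-coding secrets in a notebook is to read them from environment variables. This is a minimal sketch: the variable names (`POSTGRES_HOST`, `POSTGRES_PORT`, `POSTGRES_USERNAME`, `POSTGRES_PASSWORD`) and the helper `load_postgres_settings` are assumptions chosen for illustration, and the returned keys simply mirror the keyword arguments used in the cell above.

```python
import os


def load_postgres_settings(environ=None):
    """Read PostgreSQL connection settings from environment variables.

    The variable names are hypothetical; adapt them to your deployment.
    """
    environ = os.environ if environ is None else environ
    password = environ.get("POSTGRES_PASSWORD", "")
    if not password:
        # Fail early instead of attempting a connection with an empty password.
        raise RuntimeError("POSTGRES_PASSWORD is not set")
    return {
        "hostname": environ.get("POSTGRES_HOST", "localhost"),
        "port": int(environ.get("POSTGRES_PORT", "5432")),
        "username": environ.get("POSTGRES_USERNAME", "rwpipeline"),
        "password": password,
    }


# Demonstration with an explicit mapping instead of the real environment.
settings = load_postgres_settings({"POSTGRES_PASSWORD": "example-only"})
print({key: ("***" if key == "password" else value) for key, value in settings.items()})
```

The resulting dictionary could then be unpacked into the configuration object alongside the remaining options, for example `PostgreSQLConfig(**settings, database=POSTGRES_DATABASE, schema_name=POSTGRES_SCHEMA)`.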

## 4. Create PostgreSQL CDC Connection

Use ConnectBuilder to create the PostgreSQL CDC source, select target tables, and generate the corresponding SQL statements.

In [None]:
# Define source tables to capture
source_tables = ["random_table_1", "dashboard"]  # Update with your actual table names

print(f"🔄 Creating PostgreSQL CDC source for tables: {source_tables}")

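# Create the CDC connection. dry_run is left commented out below, so this call
# executes the statements; uncomment dry_run=True to preview the SQL instead.
cdc_result = builder.create_postgresql_connection(
    config=postgres_config,
    table_selector=source_tables,
    # dry_run=True  # Preview only; remove or set to False to execute
)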

# print("\n📋 CDC Source SQL Statements:")
# print("=" * 50)
# for i, sql in enumerate(cdc_result['sql_statements'], 1):
#     print(f"\n--- Statement {i} ---")
#     print(sql)

print("\n✅ PostgreSQL CDC connection configured successfully")
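
The commented-out lines above print the generated SQL. A slightly more defensive version is sketched below, assuming the result is a dictionary that may contain an `sql_statements` list (the key used in the commented-out code). The helper name `format_sql_statements` and the sample data are hypothetical.

```python
def format_sql_statements(result):
    """Render the statements in a result dictionary as numbered text blocks."""
    # Tolerate a missing or empty key rather than raising KeyError.
    statements = result.get("sql_statements") or []
    lines = []
    for index, sql in enumerate(statements, 1):
        lines.append(f"--- Statement {index} ---")
        lines.append(sql.strip())
    return "\n".join(lines)


# Placeholder statements for illustration; not output from a real run.
sample_result = {"sql_statements": ["SELECT 1;", "  SELECT 2;  "]}
print(format_sql_statements(sample_result))
```

With a real result, `print(format_sql_statements(cdc_result))` would replace the commented-out loop.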

## 5. Configure Iceberg Sink

Configure the Iceberg sink: warehouse path, catalog settings, S3 credentials, and the sink mode (append-only or upsert).

In [None]:
# Iceberg configuration
WAREHOUSE_PATH = "s3a://iceberg-table/rwconnect"  
DATABASE_NAME = "pg_cdc"
CATALOG_NAME = "pg_cdc"
TABLE_NAME = "pg_cdc"

S3_REGION = "us-east-1"
S3_ACCESS_KEY = ""  
S3_SECRET_KEY = ""  

# Configure Iceberg sink
iceberg_config = IcebergConfig(
    # Iceberg configuration
    warehouse_path=WAREHOUSE_PATH,
    database_name=DATABASE_NAME,
    table_name=TABLE_NAME,
    catalog_type="storage",
    catalog_name=CATALOG_NAME,
    
    # Sink configuration
    data_type="append-only",  # or "upsert" for updates
    force_append_only=True,
    create_table_if_not_exists=True,
    
    # S3 credentials
    s3_region=S3_REGION,
    s3_access_key=S3_ACCESS_KEY,
    s3_secret_key=S3_SECRET_KEY
)

print("✅ Iceberg sink configuration created:")
print(f"   Warehouse path: {WAREHOUSE_PATH}")
print(f"   Database: {DATABASE_NAME}")
print(f"   Table: {TABLE_NAME}")
print(f"   Data type: {iceberg_config.data_type}")
print(f"   Catalog: {CATALOG_NAME}")
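
A malformed warehouse path is an easy mistake to make, so it can be worth validating the URI before creating the sink. The sketch below splits an S3-style path into bucket and prefix with the standard library. `split_s3_warehouse_path` is a hypothetical helper that only handles `s3://` and `s3a://` URIs; a local directory warehouse would need separate handling.

```python
from urllib.parse import urlparse


def split_s3_warehouse_path(path):
    """Split an s3:// or s3a:// warehouse URI into (bucket, prefix)."""
    parsed = urlparse(path)
    if parsed.scheme not in ("s3", "s3a") or not parsed.netloc:
        raise ValueError(f"Not an S3-style warehouse path: {path!r}")
    # netloc holds the bucket name; the path component holds the key prefix.
    return parsed.netloc, parsed.path.lstrip("/")


bucket, prefix = split_s3_warehouse_path("s3a://iceberg-table/rwconnect")
print(f"bucket={bucket}, prefix={prefix}")
```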

## 6. Create Iceberg Sink Connection

Build the Iceberg sink connection that will consume data from the CDC source and generate the sink SQL statements.

In [None]:
print("🔄 Creating Iceberg sink connection...")

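# Create the Iceberg sink. dry_run is left commented out below, so this call
# executes the statements; uncomment dry_run=True to preview the SQL instead.
sink_result = builder.create_sink(
    sink_config=iceberg_config,
    source_tables=source_tables,
    # dry_run=True  # Preview only; remove or set to False to execute
)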

# print("\n📋 Iceberg Sink SQL Statements:")
# print("=" * 50)
# for i, sql in enumerate(sink_result['sql_statements'], 1):
#     print(f"\n--- Statement {i} ---")
#     print(sql)

print("\n✅ Iceberg sink connection configured successfully")

## 7. Execute Pipeline and Display Results

Display the success and failure messages returned by the CDC and sink creation calls, and store both results for later analysis.

In [8]:
def display_results(cdc_result, sink_result):
    """Display comprehensive results from CDC and sink creation."""
    
    print("\n" + "=" * 60)
    print("📊 PIPELINE EXECUTION RESULTS")
    print("=" * 60)
    
    # Display CDC results
    print("\n🔄 CDC Creation Results:")
    if 'success_messages' in cdc_result and cdc_result['success_messages']:
        for message in cdc_result['success_messages']:
            print(f"  ✅ {message}")
        if 'success_summary' in cdc_result:
            print(f"\n📈 CDC Summary: {cdc_result['success_summary']}")
    
    # Display sink results
    print("\n📤 Sink Creation Results:")
    if 'success_messages' in sink_result and sink_result['success_messages']:
        for message in sink_result['success_messages']:
            print(f"  ✅ {message}")
        if 'success_summary' in sink_result:
            print(f"\n📈 Sink Summary: {sink_result['success_summary']}")
    
    # Show any failures
    has_failures = False
    
    if 'failed_statements' in cdc_result and cdc_result['failed_statements']:
        has_failures = True
        print("\n❌ CDC Failures:")
        for failure in cdc_result['failed_statements']:
            print(f"  ❌ {failure['error']}")
    
    if 'failed_results' in sink_result and sink_result['failed_results']:
        has_failures = True
        print("\n❌ Sink Failures:")
        for failure in sink_result['failed_results']:
            print(f"  ❌ Sink '{failure.sink_name}': {failure.error_message}")
    
    if not has_failures:
        print("\n🎉 No failures detected!")
    
    print("\n" + "=" * 60)

# Display comprehensive results
display_results(cdc_result, sink_result)

# Store results for later analysis
pipeline_results = {
    'cdc_result': cdc_result,
    'sink_result': sink_result
}

print("\n✅ Pipeline configuration completed successfully!")
print("💡 To execute the pipeline, set dry_run=False in the previous cells.")


📊 PIPELINE EXECUTION RESULTS

🔄 CDC Creation Results:
  ✅ ✅ CDC source created successfully
  ✅ ✅ CDC table 'random_table_1' created successfully
  ✅ ✅ CDC table 'dashboard' created successfully

📈 CDC Summary: {'total_statements': 3, 'successful_statements': 3, 'failed_statements': 0, 'success_rate': '3/3', 'overall_success': True}

📤 Sink Creation Results:
  ✅ ✅ Sink 'iceberg_pg_cdc_sink_random_table_1' created successfully for table 'random_table_1'
  ✅ ✅ Sink 'iceberg_pg_cdc_sink_dashboard' created successfully for table 'dashboard'

📈 Sink Summary: {'total_sinks': 2, 'successful_sinks': 2, 'failed_sinks': 0, 'success_rate': '2/2', 'overall_success': True}

🎉 No failures detected!


✅ Pipeline configuration completed successfully!
💡 To execute the pipeline, set dry_run=False in the previous cells.
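
The summaries printed above include an `overall_success` flag. For scripted runs, the two results can be reduced to a single boolean. This is a sketch that assumes both results are dictionaries with an optional `success_summary` entry shaped like the output above; `pipeline_succeeded` is a hypothetical helper, and the sample dictionaries only reproduce the relevant keys.

```python
def pipeline_succeeded(cdc_result, sink_result):
    """Return True only if both summaries report overall_success."""

    def _ok(result):
        # A missing summary or flag is treated as a failure.
        summary = result.get("success_summary") or {}
        return bool(summary.get("overall_success", False))

    return _ok(cdc_result) and _ok(sink_result)


# Sample inputs carrying only the keys this helper reads.
sample_cdc = {"success_summary": {"success_rate": "3/3", "overall_success": True}}
sample_sink = {"success_summary": {"success_rate": "2/2", "overall_success": True}}
print(pipeline_succeeded(sample_cdc, sample_sink))
```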


## 8. Monitor and Validate Data Flow

Implement monitoring functions to check pipeline status, validate data flow, and run ad hoc queries against RisingWave.

In [None]:
def monitor_pipeline_status():
    """Monitor the status of the CDC and sink pipeline."""
    print("🔍 Pipeline Monitoring Dashboard")
    print("-" * 40)
    
    try:
        print("🔄 Checking CDC Sources...")
        sources_result = client.fetch_all("SHOW SOURCES;")
        if sources_result and len(sources_result) > 0:
            print(f"✅ Found {len(sources_result)} CDC sources:")
            for source in sources_result:
                print(f"   - {source[0]}") 
        else:
            print("⚠️  No CDC sources found")
        
        print("\n🔄 Checking Sinks...")
        sinks_result = client.fetch_all("SHOW SINKS;")
        if sinks_result and len(sinks_result) > 0:
            print(f"✅ Found {len(sinks_result)} sinks:")
            for sink in sinks_result:
                print(f"   - {sink[0]}") 
        else:
            print("⚠️  No sinks found")
        
        print("\n🔄 Checking table data...")
        try:
            for table in source_tables:
                table_query = f"SELECT COUNT(*) FROM {table};"
                count_result = client.fetch_all(table_query)
                if count_result and count_result[0]:
                    print(f"✅ Table '{table}': {count_result[0][0]} rows")
        except Exception as table_e:
            print(f"⚠️  Could not check table data: {table_e}")
            
    except Exception as e:
        print(f"❌ Error monitoring pipeline: {e}")
        print("\n📋 Manual Monitoring Queries:")
        monitoring_queries = [
            "SHOW SOURCES;",
            "SHOW SINKS;", 
            "-- Check table data:",
            "-- SELECT COUNT(*) FROM your_table_name;",
            "-- SELECT * FROM your_table_name LIMIT 10;"
        ]
        
        for query in monitoring_queries:
            print(f"   {query}")

def validate_data_flow():
    """Provide validation steps and run basic checks for the data pipeline."""
    print("\n✅ Data Flow Validation")
    print("-" * 40)
    
    validation_checks = [
        "PostgreSQL CDC Setup",
        "RisingWave Source Status", 
        "Iceberg Sink Status",
        "Data Flow Verification"
    ]
    
    print("Running validation checks:")
    for i, check in enumerate(validation_checks, 1):
        print(f"\n{i}. {check}")
        
        if i == 1:
            print("   ✅ PostgreSQL connection verified")
            print("   📝 Ensure logical replication is enabled")
            print("   📝 Verify publication exists")
            
        elif i == 2:
            try:
                sources = client.fetch_all("SHOW SOURCES;")
                if sources:
                    print(f"   ✅ Found {len(sources)} CDC sources")
                else:
                    print("   ⚠️  No CDC sources found - run cell 4 to create them")
            except Exception as e:
                print(f"   ⚠️  Could not check sources: {e}")
                
        elif i == 3:
            try:
                sinks = client.fetch_all("SHOW SINKS;")
                if sinks:
                    print(f"   ✅ Found {len(sinks)} sinks")
                else:
                    print("   ⚠️  No sinks found - run cell 6 to create them")
            except Exception as e:
                print(f"   ⚠️  Could not check sinks: {e}")
                
        elif i == 4:
            print("   📝 Insert test data into PostgreSQL")
            print("   📝 Check S3 bucket for Iceberg files")
            print("   📝 Query Iceberg tables with Spark/Trino")

def run_custom_query(query):
    """Run a custom SQL query against RisingWave."""
    try:
        print(f"🔍 Executing: {query}")
        
        # Use fetch_all for statements that return rows, execute for others
        if query.strip().upper().startswith(('SELECT', 'SHOW', 'DESCRIBE', 'EXPLAIN')):
            result = client.fetch_all(query)
            if result:
                print("✅ Query Results:")
                for row in result[:10]:  # Show first 10 rows
                    print(f"   {row}")
                if len(result) > 10:
                    print(f"   ... and {len(result) - 10} more rows")
            else:
                print("✅ Query executed successfully (no results)")
        else:
            client.execute(query)
            print("✅ Query executed successfully")
            
    except Exception as e:
        print(f"❌ Query failed: {e}")

# Run monitoring and validation
monitor_pipeline_status()
validate_data_flow()

🔍 Pipeline Monitoring Dashboard
----------------------------------------
🔄 Checking CDC Sources...
✅ Found 1 CDC sources:
   - postgres_cdc_postgres

🔄 Checking Sinks...
✅ Found 2 sinks:
   - iceberg_pg_cdc_sink_dashboard
   - iceberg_pg_cdc_sink_random_table_1

🔄 Checking table data...
✅ Table 'random_table_1': 0 rows
✅ Table 'dashboard': 8 rows

✅ Data Flow Validation
----------------------------------------
Running validation checks:

1. PostgreSQL CDC Setup
   ✅ PostgreSQL connection verified
   📝 Ensure logical replication is enabled
   📝 Verify publication exists

2. RisingWave Source Status