# Stream CockroachDB CDC to Databricks (Azure)

This notebook demonstrates how to stream CockroachDB changefeeds to Databricks using Azure Blob Storage.

## Prerequisites

- CockroachDB cluster (Cloud or self-hosted)
- Azure Storage Account with hierarchical namespace enabled
- Databricks workspace with Unity Catalog
- Unity Catalog External Location configured for your storage account

**Note:** This notebook uses the **YCSB (Yahoo! Cloud Serving Benchmark)** schema as the default table structure, with `ycsb_key` as the primary key and `field0-9` columns. The default schema name is `public`.

## CDC Mode Selection

This notebook supports **4 CDC ingestion modes** by combining two independent settings:

### 1. CDC Processing Mode (`cdc_mode`)
How CDC events are processed in the target table:

- **`append_only`**: Store all CDC events as rows (audit log)
  - **Behavior**: All events (INSERT/UPDATE/DELETE) are appended as new rows
  - **Use case**: History tracking, time-series analysis, audit logs
  - **Storage**: Higher (keeps all historical events)

- **`update_delete`**: Apply MERGE logic (current state replication)
  - **Behavior**: DELETE removes rows, UPDATE modifies rows in-place
  - **Use case**: Current state synchronization, production replication
  - **Storage**: Lower (only latest state per key)
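The difference between the two processing modes can be sketched in plain Python, without Spark. The event tuples and helper names below are hypothetical. They reuse the notebook's `_cdc_operation` values (`UPSERT`/`DELETE`) but are not the notebook's actual ingestion code.

```python
from typing import Dict, List, Tuple

# Hypothetical event shape: (primary_key, timestamp, operation, payload).
# operation is "UPSERT" or "DELETE", like the notebook's _cdc_operation column.
Event = Tuple[str, int, str, Dict[str, str]]


def apply_append_only(events: List[Event]) -> List[Event]:
    """append_only: every event becomes a row; nothing is removed or overwritten."""
    return list(events)


def apply_update_delete(events: List[Event]) -> Dict[str, Dict[str, str]]:
    """update_delete: keep only the current state per key.

    Events are applied in timestamp order. A DELETE removes the key and an
    UPSERT inserts or overwrites it.
    """
    state: Dict[str, Dict[str, str]] = {}
    for key, _ts, op, payload in sorted(events, key=lambda e: e[1]):
        if op == "DELETE":
            state.pop(key, None)
        else:
            state[key] = payload
    return state


events = [
    ("user1", 1, "UPSERT", {"field0": "a"}),
    ("user2", 2, "UPSERT", {"field0": "b"}),
    ("user1", 3, "UPSERT", {"field0": "a2"}),
    ("user2", 4, "DELETE", {}),
]

print(len(apply_append_only(events)))  # 4 rows: the full history
print(apply_update_delete(events))     # {'user1': {'field0': 'a2'}}
```

Ordering by timestamp is what makes `update_delete` correct when events arrive out of order, which is why the notebook's MERGE path depends on `_cdc_timestamp`.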

### 2. Column Family Mode (`column_family_mode`)
Table structure and changefeed configuration:

- **`single_cf`**: Standard table (1 column family, default)
  - **Changefeed**: `split_column_families=false`
  - **Files**: Each row change arrives as one complete event, so no fragment merging is needed
  - **Use case**: Most tables; simpler configuration and a cheaper ingestion path

- **`multi_cf`**: Multiple column families (for wide tables)
  - **Changefeed**: `split_column_families=true`
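  - **Files**: Each row change is split into one event per column family, emitted under a separate per-family topic (and therefore in separate Parquet files); the fragments must be merged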
  - **Use case**: Wide tables (50+ columns), selective column access patterns
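The fragment merging that `multi_cf` requires can be sketched in plain Python: group fragments by (primary key, `_cdc_timestamp`, `_cdc_operation`) and keep the first non-null value per column, which is the idea behind `first(col, ignorenulls=True)` in Spark. This is a simplified illustration; `merge_fragments` and the sample fragment dicts are hypothetical.

```python
from typing import Any, Dict, List, Optional, Tuple

Row = Dict[str, Optional[Any]]


def merge_fragments(fragments: List[Row], pk: str = "ycsb_key") -> List[Row]:
    """Combine column-family fragments that describe the same row change."""
    merged: Dict[Tuple[Any, Any, Any], Row] = {}
    for frag in fragments:
        group = (frag[pk], frag["_cdc_timestamp"], frag["_cdc_operation"])
        row = merged.setdefault(group, {})
        for col, value in frag.items():
            # Keep the first non-null value seen for each column.
            if row.get(col) is None:
                row[col] = value
    return list(merged.values())


fragments = [
    {"ycsb_key": "user1", "_cdc_timestamp": 100, "_cdc_operation": "UPSERT",
     "field0": "x", "field5": None},
    {"ycsb_key": "user1", "_cdc_timestamp": 100, "_cdc_operation": "UPSERT",
     "field0": None, "field5": "y"},
]
print(merge_fragments(fragments))
# [{'ycsb_key': 'user1', '_cdc_timestamp': 100, '_cdc_operation': 'UPSERT', 'field0': 'x', 'field5': 'y'}]
```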

### Function Selection Matrix

The notebook automatically selects the appropriate ingestion function based on your configuration:

| CDC Mode | Column Family Mode | Function Called |
|----------|-------------------|-----------------|
| `append_only` | `single_cf` | `ingest_cdc_append_only_single_family()` |
| `append_only` | `multi_cf` | `ingest_cdc_append_only_multi_family()` |
| `update_delete` | `single_cf` | `ingest_cdc_with_merge_single_family()` |
| `update_delete` | `multi_cf` | `ingest_cdc_with_merge_multi_family()` |
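The matrix amounts to a lookup keyed on both settings. A minimal sketch, assuming a hypothetical `select_ingest_function` helper (the notebook's own dispatch code is not part of this section). It returns function names so it runs standalone; inside the notebook you could resolve a name with `globals()[name]`.

```python
from typing import Dict, Tuple

# One entry per row of the Function Selection Matrix.
INGEST_FUNCTIONS: Dict[Tuple[str, str], str] = {
    ("append_only", "single_cf"): "ingest_cdc_append_only_single_family",
    ("append_only", "multi_cf"): "ingest_cdc_append_only_multi_family",
    ("update_delete", "single_cf"): "ingest_cdc_with_merge_single_family",
    ("update_delete", "multi_cf"): "ingest_cdc_with_merge_multi_family",
}


def select_ingest_function(cdc_mode: str, column_family_mode: str) -> str:
    """Return the ingestion function name for a (cdc_mode, column_family_mode) pair."""
    try:
        return INGEST_FUNCTIONS[(cdc_mode, column_family_mode)]
    except KeyError:
        raise ValueError(
            f"Unsupported combination: cdc_mode={cdc_mode!r}, "
            f"column_family_mode={column_family_mode!r}"
        ) from None


print(select_ingest_function("update_delete", "multi_cf"))
# ingest_cdc_with_merge_multi_family
```

Failing fast on an unknown pair surfaces a typo in the config file before any streaming query starts.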

---

In [None]:
# ============================================================================
# CELL 2: CONFIGURATION
# ============================================================================
import json
import os
from urllib.parse import quote

# Configuration file path (adjust as needed)
config_file = "/Users/robert.lee/github/lakeflow-community-connectors/sources/cockroachdb/.env/cockroachdb_cdc_tutorial_config.json"

# Try to load from file, fallback to embedded config
try:
    with open(config_file, 'r') as f:
        config = json.load(f)
    print(f"✅ Configuration loaded from: {config_file}")
except Exception as e:
    print(f"ℹ️  Using embedded configuration (config file error: {e})")
    config = None

# Embedded configuration (fallback)
if config is None:
    config = {
      "cockroachdb": {
        "host": "replace_me",
        "port": 26257,
        "user": "replace_me",
        "password": "replace_me",
        "database": "defaultdb"
      },
      "cockroachdb_source": {
        "catalog": "defaultdb",
        "schema": "public",
        "table_name": "usertable",
        "_schema_note": "Default schema is 'public'. Table uses YCSB structure (ycsb_key, field0-9)",
      },
      "azure_storage": {
        "account_name": "replace_me",
        "account_key": "replace_me",
        "container_name": "changefeed-events"
      },
      "databricks_target": {
        "catalog": "main",
        "schema": "replace_me",
        "table_name": "usertable",
      },
      "cdc_config": {
        "mode": "append_only",
        "column_family_mode": "multi_cf",
        "primary_key_columns": ["ycsb_key"],
        "auto_suffix_mode_family": True,
      },
      "workload_config": {
        "snapshot_count": 10,
        "insert_count": 10,
        "update_count": 9,
        "delete_count": 8,
      }
    }
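The embedded fallback ships with `replace_me` placeholders, so a quick check before connecting can catch values that were never edited. A minimal sketch; `find_placeholders` is a hypothetical helper and not part of the original notebook.

```python
from typing import Any, List


def find_placeholders(value: Any, path: str = "", marker: str = "replace_me") -> List[str]:
    """Return the dotted paths of every config entry still set to the placeholder."""
    found: List[str] = []
    if isinstance(value, dict):
        for key, child in value.items():
            child_path = f"{path}.{key}" if path else key
            found.extend(find_placeholders(child, child_path, marker))
    elif value == marker:
        found.append(path)
    return found


sample = {
    "cockroachdb": {"host": "replace_me", "port": 26257},
    "databricks_target": {"catalog": "main", "schema": "replace_me"},
}
print(find_placeholders(sample))  # ['cockroachdb.host', 'databricks_target.schema']

# In the notebook (hypothetical usage):
#   missing = find_placeholders(config)
#   if missing:
#       raise ValueError(f"Edit these config values first: {missing}")
```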


In [None]:
from urllib.parse import quote

# Extract configuration values
cockroachdb_host = config["cockroachdb"]["host"]
cockroachdb_port = config["cockroachdb"]["port"]
cockroachdb_user = config["cockroachdb"]["user"]
cockroachdb_password = config["cockroachdb"]["password"]
cockroachdb_database = config["cockroachdb"]["database"]

source_catalog = config["cockroachdb_source"]["catalog"]
source_schema = config["cockroachdb_source"]["schema"]
source_table = config["cockroachdb_source"]["table_name"]

storage_account_name = config["azure_storage"]["account_name"]
storage_account_key = config["azure_storage"]["account_key"]
storage_account_key_encoded = quote(storage_account_key, safe='')
container_name = config["azure_storage"]["container_name"]

target_catalog = config["databricks_target"]["catalog"]
target_schema = config["databricks_target"]["schema"]
target_table = config["databricks_target"]["table_name"]

cdc_mode = config["cdc_config"]["mode"]
column_family_mode = config["cdc_config"]["column_family_mode"]
primary_key_columns = config["cdc_config"]["primary_key_columns"]

snapshot_count = config["workload_config"]["snapshot_count"]
insert_count = config["workload_config"]["insert_count"]
update_count = config["workload_config"]["update_count"]
delete_count = config["workload_config"]["delete_count"]

# Auto-suffix table names with mode and column family if enabled
auto_suffix = config["cdc_config"].get("auto_suffix_mode_family", False)
if auto_suffix:
    suffix = f"_{cdc_mode}_{column_family_mode}"
    
    # Add suffix to source_table if not already present
    if not source_table.endswith(suffix):
        source_table = f"{source_table}{suffix}"
    
    # Add suffix to target_table if not already present
    if not target_table.endswith(suffix):
        target_table = f"{target_table}{suffix}"

print("✅ Configuration loaded")
print(f"   CDC Processing Mode: {cdc_mode}")
print(f"   Column Family Mode: {column_family_mode}")
print(f"   Primary Keys: {primary_key_columns}")
print(f"   Target Table: {target_table}")
print(f"   CDC Workload: {snapshot_count} snapshot → +{insert_count} INSERTs, ~{update_count} UPDATEs, -{delete_count} DELETEs")

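The cell above URL-encodes the storage account key with `quote(..., safe='')`. Azure account keys are base64 strings that can contain `+`, `/` and `=`, which would otherwise be misread inside a URI query string. The changefeed-creation step is not part of this section, so the sketch below assumes CockroachDB's `azure://` sink scheme with `AZURE_ACCOUNT_NAME`/`AZURE_ACCOUNT_KEY` query parameters and a path matching the prefix that `check_azure_files()` lists. `build_azure_sink_uri` is a hypothetical helper.

```python
from urllib.parse import quote


def build_azure_sink_uri(container: str, prefix: str, account_name: str, account_key: str) -> str:
    """Build an Azure sink URI for a changefeed (assumed URI layout)."""
    # safe="" makes quote() encode '/' as well as '+' and '='.
    key_encoded = quote(account_key, safe="")
    return (
        f"azure://{container}/{prefix}"
        f"?AZURE_ACCOUNT_NAME={account_name}&AZURE_ACCOUNT_KEY={key_encoded}"
    )


uri = build_azure_sink_uri(
    "changefeed-events",
    "parquet/defaultdb/public/usertable/usertable",
    "myaccount",
    "abc+def/ghi==",
)
print(uri)
# azure://changefeed-events/parquet/defaultdb/public/usertable/usertable?AZURE_ACCOUNT_NAME=myaccount&AZURE_ACCOUNT_KEY=abc%2Bdef%2Fghi%3D%3D
```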

In [None]:
# ============================================================================
# CELL 3: INSTALL DEPENDENCIES
# ============================================================================
%pip install pg8000 azure-storage-blob --quiet
print("✅ Dependencies installed")

In [None]:
# ============================================================================
# CELL 4: CONNECT TO COCKROACHDB
# ============================================================================
import pg8000
import ssl

def get_cockroachdb_connection():
    """Create connection to CockroachDB using pg8000"""
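    # CockroachDB Cloud requires TLS. Certificate and hostname verification are
    # disabled here for convenience, which is insecure outside a tutorial setting.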
    ssl_context = ssl.create_default_context()
    ssl_context.check_hostname = False
    ssl_context.verify_mode = ssl.CERT_NONE
    
    # Parse host (in case port is accidentally included in host string)
    host = cockroachdb_host.split(':')[0] if ':' in cockroachdb_host else cockroachdb_host
    
    conn = pg8000.connect(
        user=cockroachdb_user,
        password=cockroachdb_password,
        host=host,
        port=cockroachdb_port,
        database=cockroachdb_database,
        ssl_context=ssl_context
    )
    return conn

# Test connection
try:
    conn = get_cockroachdb_connection()
    with conn.cursor() as cur:
        cur.execute("SELECT version()")
        version = cur.fetchone()[0]
    conn.close()
    
    print("✅ Connected to CockroachDB")
    print(f"   Version: {version[:50]}...")
except Exception as e:
    print(f"❌ Connection failed: {e}")
    raise
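The connection cell disables certificate verification (`CERT_NONE`). A verified alternative is sketched below, assuming you have downloaded the cluster's CA certificate (CockroachDB Cloud offers one for download). `make_ssl_context` and the file path you pass to it are hypothetical.

```python
import ssl
from typing import Optional


def make_ssl_context(ca_cert_path: Optional[str] = None) -> ssl.SSLContext:
    """Build a TLS context that verifies the server certificate and hostname.

    If ca_cert_path is given, the CA in that file is trusted instead of the
    system trust store; otherwise the system's default CAs are used.
    """
    # create_default_context() sets verify_mode=CERT_REQUIRED and check_hostname=True.
    return ssl.create_default_context(cafile=ca_cert_path)


ctx = make_ssl_context()
print(ctx.verify_mode == ssl.CERT_REQUIRED, ctx.check_hostname)  # True True
```

Passing the result as `ssl_context=` to `pg8000.connect()` in place of the unverified context keeps the rest of `get_cockroachdb_connection()` unchanged.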

In [None]:
# ============================================================================
# CELL 5: HELPER FUNCTIONS (CockroachDB & Azure)
# ============================================================================
from azure.storage.blob import BlobServiceClient
from datetime import datetime
import time

def get_table_stats(conn, table_name):
    """
    Get min key, max key, and count for a table.
    
    Args:
        conn: Database connection
        table_name: Name of the table
    
    Returns:
        dict with 'min_key', 'max_key', 'count', 'is_empty'
    """
    with conn.cursor() as cur:
        cur.execute(f"SELECT MIN(ycsb_key), MAX(ycsb_key), COUNT(*) FROM {table_name}")
        result = cur.fetchone()
        min_key, max_key, count = result
        
        return {
            'min_key': min_key,
            'max_key': max_key,
            'count': count,
            'is_empty': min_key is None and max_key is None
        }


def check_azure_files(storage_account_name, storage_account_key, container_name, 
                      source_catalog, source_schema, source_table, target_table, 
                      verbose=True):
    """
    Check for changefeed files in Azure Blob Storage.
    
    Args:
        storage_account_name: Azure storage account name
        storage_account_key: Azure storage account key
        container_name: Azure container name
        source_catalog: CockroachDB catalog (database)
        source_schema: CockroachDB schema
        source_table: Source table name
        target_table: Target table name
        verbose: Print detailed output
    
    Returns:
        dict with 'data_files' and 'resolved_files' lists
    """
    # Connect to Azure
    connection_string = f"DefaultEndpointsProtocol=https;AccountName={storage_account_name};AccountKey={storage_account_key};EndpointSuffix=core.windows.net"
    blob_service = BlobServiceClient.from_connection_string(connection_string)
    container_client = blob_service.get_container_client(container_name)
    
    # Build path - list ALL files recursively under this changefeed path
    prefix = f"parquet/{source_catalog}/{source_schema}/{source_table}/{target_table}/"
    
    # List all blobs recursively (no date filtering)
    blobs = list(container_client.list_blobs(name_starts_with=prefix))
    
    # Categorize files (using same filtering logic as cockroachdb.py)
    # Data files: .parquet files, excluding:
    #   - .RESOLVED files (CDC watermarks)
    #   - _metadata/ directory (schema files)
    #   - Files starting with _ (_SUCCESS, _committed_*, etc.)
    data_files = [
        b for b in blobs 
        if b.name.endswith('.parquet') 
        and '.RESOLVED' not in b.name
        and '/_metadata/' not in b.name
        and not b.name.split('/')[-1].startswith('_')
    ]
    resolved_files = [b for b in blobs if '.RESOLVED' in b.name]
    
    if verbose:
        print(f"üìÅ Files in Azure changefeed path:")
        print(f"   Path: {prefix}")
        print(f"   üìÑ Data files: {len(data_files)}")
        print(f"   üïê Resolved files: {len(resolved_files)}")
        print(f"   üìä Total: {len(blobs)}")
        
        if data_files:
            print(f"\n   Example data file:")
            print(f"   {data_files[0].name}")
    
    return {
        'data_files': data_files,
        'resolved_files': resolved_files,
        'total': len(blobs)
    }


def wait_for_changefeed_files(storage_account_name, storage_account_key, container_name,
                               source_catalog, source_schema, source_table, target_table,
                               max_wait=120, check_interval=5, stabilization_wait=5):
    """
    Wait for changefeed files to appear in Azure with timeout and stabilization period.
    
    This function:
    1. Polls Azure until first file(s) appear
    2. Once files are detected, waits for additional files (important for column families)
    3. Exits when no new files appear for 'stabilization_wait' seconds
    
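    Args:
        storage_account_name, storage_account_key, container_name,
        source_catalog, source_schema, source_table, target_table:
            Same as check_azure_files()
        max_wait: Maximum total seconds to poll, including the stabilization
                  period (default: 120)
        check_interval: Seconds between checks (default: 5)
        stabilization_wait: Seconds the data-file count must stay unchanged before
                           returning (default: 5). Important for column family mode,
                           where each change is written as several files
    
    Returns:
        True if any data files were found (even if the count had not stabilized
        by max_wait), False if none appeared
    """
    print(f"⏳ Waiting for initial snapshot files to appear in Azure...")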
    
    elapsed = 0
    files_found = False
    last_file_count = 0
    stable_elapsed = 0
    
    while elapsed < max_wait:
        result = check_azure_files(
            storage_account_name, storage_account_key, container_name,
            source_catalog, source_schema, source_table, target_table,
            verbose=False
        )
        
        current_file_count = len(result['data_files'])
        
        if not files_found and current_file_count > 0:
            # First files detected - switch to stabilization mode
            files_found = True
            last_file_count = current_file_count
            stable_elapsed = 0
            print(f"\n✅ First files appeared after {elapsed} seconds!")
            print(f"   Found {current_file_count} file(s) so far...")
            print(f"   Waiting {stabilization_wait}s for more files (column family fragments)...")
        
        elif files_found:
            # In stabilization mode - check if file count is stable
            if current_file_count > last_file_count:
                # More files arrived - reset stabilization timer
                print(f"   üìÑ File count increased: {last_file_count} ‚Üí {current_file_count}")
                last_file_count = current_file_count
                stable_elapsed = 0
            else:
                # File count unchanged - increment stabilization timer
                stable_elapsed += check_interval
                
                if stable_elapsed >= stabilization_wait:
                    # Stabilization period complete - all files have landed
                    print(f"\n✅ File count stable at {current_file_count} for {stabilization_wait}s")
                    # 'elapsed' already includes the stabilization period
                    print(f"   Total wait time: {elapsed}s")
                    print(f"   Example: {result['data_files'][0].name}")
                    return True
        
        if not files_found:
            print(f"   Checking... ({elapsed}s elapsed)", end='\r')
        
        time.sleep(check_interval)
        elapsed += check_interval
    
    if files_found:
        # Files were found but stabilization didn't complete within max_wait
        print(f"\n⚠️  Timeout after {max_wait}s (found {last_file_count} files but more may still be generating)")
    else:
        print(f"\n⚠️  Timeout after {max_wait}s - no files appeared")
    
    print(f"   Run Cell 11 to check manually")
    return files_found  # Return True if we found at least some files



def get_column_sum(conn, table_name, column_name):
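    """
    Compute a digit-only sum of a column in a CockroachDB table.

    Each value is cast to TEXT and every non-digit character (including '-'
    and '.') is stripped before casting to BIGINT; values that become empty
    count as 0. For non-negative integer columns this equals the ordinary sum;
    otherwise treat it as a checksum for comparing source and target tables.
    
    Args:
        conn: Database connection
        table_name: Name of the table
        column_name: Name of the column to sum
    
    Returns:
        Sum of the digit-stripped values
    """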
    with conn.cursor() as cur:
        # Strip non-numeric chars, handle empty strings, cast to BIGINT
        cur.execute(f"""
            SELECT SUM(
                CASE 
                    WHEN regexp_replace({column_name}::TEXT, '[^0-9]', '', 'g') = '' THEN 0
                    ELSE regexp_replace({column_name}::TEXT, '[^0-9]', '', 'g')::BIGINT
                END
            ) 
            FROM {table_name}
        """)
        result = cur.fetchone()
        return result[0]



def get_column_sum_spark(df, column_name):
    """
    Compute a digit-only sum of a column in a Spark DataFrame.

    Mirrors get_column_sum(): values are cast to string, every non-digit
    character (including '-' and '.') is stripped, and the remainder is cast
    to bigint, so results are comparable with the CockroachDB side.
    
    Args:
        df: Spark DataFrame
        column_name: Name of the column to sum
    
    Returns:
        Sum of the digit-stripped values
    """
    from pyspark.sql import functions as F
    
    result = df.select(
        F.sum(
            F.when(
                F.regexp_replace(F.col(column_name).cast('string'), '[^0-9]', '') == '',
                0
            ).otherwise(
                F.regexp_replace(F.col(column_name).cast('string'), '[^0-9]', '').cast('bigint')
            )
        ).alias('sum')
    ).collect()[0]['sum']
    
    return result


print("✅ Helper functions loaded (CockroachDB & Azure)")

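The blob filtering in `check_azure_files()` depends only on blob names, so it can be exercised without an Azure account. A minimal sketch that applies the same rules to plain strings; `classify_changefeed_files` is hypothetical, and the sample names are invented for illustration rather than copied from CockroachDB's exact file-naming scheme.

```python
from typing import Dict, List


def classify_changefeed_files(names: List[str]) -> Dict[str, List[str]]:
    """Split blob names into data and resolved files, as check_azure_files() does."""
    data = [
        n for n in names
        if n.endswith(".parquet")
        and ".RESOLVED" not in n                    # CDC watermark files
        and "/_metadata/" not in n                  # schema files
        and not n.split("/")[-1].startswith("_")    # _SUCCESS, _committed_*, ...
    ]
    resolved = [n for n in names if ".RESOLVED" in n]
    return {"data_files": data, "resolved_files": resolved}


names = [
    "parquet/defaultdb/public/usertable/usertable/2024-01-01/a-usertable-1.parquet",
    "parquet/defaultdb/public/usertable/usertable/2024-01-01/202401010000.RESOLVED",
    "parquet/defaultdb/public/usertable/usertable/_metadata/schema.parquet",
    "parquet/defaultdb/public/usertable/usertable/2024-01-01/_SUCCESS",
]
result = classify_changefeed_files(names)
print(len(result["data_files"]), len(result["resolved_files"]))  # 1 1
```

Note that the `_metadata/` file ends in `.parquet` and would be counted as data without the directory check.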
In [None]:
# ============================================================================
# CELL 6: DATABRICKS STREAMING MODES (CDC Ingestion Functions)
# ============================================================================
# This cell contains different CDC ingestion functions for various scenarios.
# Select the appropriate function based on your use case:
#
# 1. ingest_cdc_append_only_single_family() - Simple append_only mode
# 2. ingest_cdc_with_merge_single_family() - Apply updates/deletes (single CF)
# 3. ingest_cdc_append_only_multi_family() - Append_only with column families
# 4. ingest_cdc_with_merge_multi_family() - Full CDC with column families
# ============================================================================

from pyspark.sql import functions as F


def ingest_cdc_append_only_single_family(
    storage_account_name, container_name, 
    source_catalog, source_schema, source_table, 
    target_catalog, target_schema, target_table,
    spark
):
    """
    Ingest CDC events in APPEND-ONLY mode for single column family tables.
    
    This function:
    - Reads Parquet CDC files from Azure using Auto Loader
    - Filters out .RESOLVED files and metadata
    - Transforms CockroachDB CDC columns (__crdb__*) to standard format
    - Writes all events (INSERT/UPDATE/DELETE) as rows to Delta table
    - Does NOT apply deletes or deduplicate updates (append_only)
    
    Use this for:
    - Audit logs and full history tracking
    - Tables WITHOUT column families (split_column_families=false)
    - Simple CDC pipelines without MERGE logic
    
    Args:
        storage_account_name: Azure storage account name
        container_name: Azure container name
        source_catalog: CockroachDB catalog (database)
        source_schema: CockroachDB schema
        source_table: Source table name
        target_catalog: Databricks catalog
        target_schema: Databricks schema
        target_table: Target table name
        spark: SparkSession
    
    Returns:
        StreamingQuery object
    """
    # Build paths
    source_path = f"abfss://{container_name}@{storage_account_name}.dfs.core.windows.net/parquet/{source_catalog}/{source_schema}/{source_table}/{target_table}"
    checkpoint_path = f"/checkpoints/{target_schema}_{target_table}"
    target_table_fqn = f"{target_catalog}.{target_schema}.{target_table}"
    
    print("üìñ Ingesting CDC events (Append-Only Mode)")
    print("=" * 80)
    print(f"Mode: APPEND-ONLY (Single Column Family)")
    print(f"Source: {source_catalog}.{source_schema}.{source_table} (CockroachDB)")
    print(f"Target: {target_table_fqn} (Databricks Delta)")
    print(f"Source path: {source_path}/ (all dates, recursively)")
    print(f"File filter: *{source_table}*.parquet")
    print(f"   ✅ Includes: Data files")
    print(f"   ❌ Excludes: .RESOLVED, _metadata/, _SUCCESS, etc.")
    print()
    
    # Read with Auto Loader (production-grade filtering)
    raw_df = (spark.readStream
        .format("cloudFiles")
        .option("cloudFiles.format", "parquet")
        .option("cloudFiles.schemaLocation", f"{checkpoint_path}/schema")
        .option("pathGlobFilter", f"*{source_table}*.parquet")
        .option("recursiveFileLookup", "true")
        .load(source_path)
    )
    
    print("✅ Schema inferred from data files")
    print("   (Filtering matches cockroachdb.py production code)")
    print()
    
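    # Transform: CockroachDB CDC → Standard CDC format
    df = raw_df.select(
        "*",
        # Convert __crdb__updated (CockroachDB HLC timestamp: nanoseconds since the
        # epoch, with the logical counter after the decimal point) to a timestamp
        # with microsecond precision. from_unixtime() would truncate to whole
        # seconds, making events within the same second indistinguishable.
        F.expr(
            "timestamp_micros(CAST(CAST(__crdb__updated AS DECIMAL(38, 10)) / 1000 AS BIGINT))"
        ).alias("_cdc_timestamp"),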
        # Map event type
        F.when(F.col("__crdb__event_type") == "d", "DELETE")
         .otherwise("UPSERT")
         .alias("_cdc_operation")
    ).drop("__crdb__updated", "__crdb__event_type")
    
    # Write to Delta table (append_only)
    query = (df.writeStream
        .format("delta")
        .option("checkpointLocation", f"{checkpoint_path}/data")
        .option("mergeSchema", "true")
        .trigger(availableNow=True)
        .toTable(target_table_fqn)
    )
    
    print("⏳ Processing CDC events...")
    return query


def ingest_cdc_with_merge_single_family(
    storage_account_name, container_name,
    source_catalog, source_schema, source_table,
    target_catalog, target_schema, target_table,
    primary_key_columns,  # Required for the MERGE join condition
    spark
):
    """
    Ingest CDC events with MERGE logic for single column family tables.
    
    This function:
    - Reads Parquet CDC files from Azure using Auto Loader
    - Filters out .RESOLVED files and metadata
    - Transforms CockroachDB CDC columns (__crdb__*) to standard format
    - Streams events to a staging Delta table, then deduplicates them by primary key in a batch step (keeping the latest event per key)
    - Applies MERGE logic to target Delta table:
      * UPDATE: When key exists and timestamp is newer
      * DELETE: When key exists and operation is DELETE
      * INSERT: When key doesn't exist and operation is UPSERT
    - Preserves _cdc_operation column for monitoring and observability
    
    Use this for:
    - Applications needing current state (not history)
    - Tables WITHOUT column families (split_column_families=false)
    - Production CDC pipelines with UPDATE/DELETE support
    - Lower storage requirements (only latest state)
    
    Target table will contain:
    - All data columns from source
    - _cdc_operation: "UPSERT" (shows last operation on each row)
    - _cdc_timestamp: Timestamp of last CDC event
    
    Args:
        storage_account_name: Azure storage account name
        container_name: Azure container name
        source_catalog: CockroachDB catalog (database)
        source_schema: CockroachDB schema
        source_table: Source table name
        target_catalog: Databricks catalog
        target_schema: Databricks schema
        target_table: Target table name
        primary_key_columns: List of primary key column names (e.g., ['ycsb_key'])
        spark: SparkSession
    
    Returns:
        Dict with query, staging_table, target_table, raw_count, deduped_count, merged
    """
    from pyspark.sql import functions as F, Window
    from delta.tables import DeltaTable
    
    # Build paths
    source_path = f"abfss://{container_name}@{storage_account_name}.dfs.core.windows.net/parquet/{source_catalog}/{source_schema}/{source_table}/{target_table}"
    checkpoint_path = f"/checkpoints/{target_schema}_{target_table}_merge"
    target_table_fqn = f"{target_catalog}.{target_schema}.{target_table}"
    
    print("üìñ Ingesting CDC events (MERGE Mode)")
    print("=" * 80)
    print(f"Mode: MERGE (Apply UPDATE/DELETE)")
    print(f"Source: {source_catalog}.{source_schema}.{source_table} (CockroachDB)")
    print(f"Target: {target_table_fqn} (Databricks Delta)")
    print(f"Primary keys: {primary_key_columns}")
    print(f"Source path: {source_path}/ (all dates, recursively)")
    print(f"File filter: *{source_table}*.parquet")
    print(f"   ✅ Includes: Data files")
    print(f"   ❌ Excludes: .RESOLVED, _metadata/, _SUCCESS, etc.")
    print()
    
    # Read with Auto Loader (production-grade filtering)
    raw_df = (spark.readStream
        .format("cloudFiles")
        .option("cloudFiles.format", "parquet")
        .option("cloudFiles.schemaLocation", f"{checkpoint_path}/schema")
        .option("pathGlobFilter", f"*{source_table}*.parquet")
        .option("recursiveFileLookup", "true")
        .load(source_path)
    )
    
    print("✅ Schema inferred from data files")
    print("   (Filtering matches cockroachdb.py production code)")
    print()
    
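    # Transform: CockroachDB CDC → Standard CDC format
    transformed_df = raw_df.select(
        "*",
        # Convert __crdb__updated (CockroachDB HLC timestamp: nanoseconds since the
        # epoch, with the logical counter after the decimal point) to a timestamp
        # with microsecond precision. from_unixtime() would truncate to whole
        # seconds, so the deduplication and MERGE timestamp comparisons below could
        # not order events that occur within the same second.
        F.expr(
            "timestamp_micros(CAST(CAST(__crdb__updated AS DECIMAL(38, 10)) / 1000 AS BIGINT))"
        ).alias("_cdc_timestamp"),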
        # Map event type
        F.when(F.col("__crdb__event_type") == "d", "DELETE")
         .otherwise("UPSERT")
         .alias("_cdc_operation")
    ).drop("__crdb__event_type", "__crdb__updated")
    
    print("✅ CDC transformations applied (streaming compatible)")
    print("   ℹ️  Deduplication will happen in Stage 2 (batch mode)")
    print()
    
    # ========================================================================
    # STAGE 1: Stream to Staging Table (Serverless Compatible - No Python UDFs)
    # ========================================================================
    staging_table_fqn = f"{target_table_fqn}_staging"
    
    print("üî∑ STAGE 1: Streaming to staging table (no Python UDFs)")
    print(f"   Staging: {staging_table_fqn}")
    print()
    
    # Write to staging table (pure Spark, no foreachBatch, no window functions)
    query = (transformed_df.writeStream
        .format("delta")
        .option("checkpointLocation", f"{checkpoint_path}/data")
        .option("mergeSchema", "true")
        .trigger(availableNow=True)
        .toTable(staging_table_fqn)  # ← No Python UDFs, Serverless compatible!
    )
    
    print("⏳ Streaming CDC events to staging table...")
    query.awaitTermination()
    print("✅ Stream completed\n")
    
    # ========================================================================
    # STAGE 2: Batch MERGE from Staging to Target (plain batch job, no foreachBatch)
    # ========================================================================
    print("🔷 STAGE 2: Applying MERGE logic (batch operation)")
    print(f"   Source: {staging_table_fqn}")
    print(f"   Target: {target_table_fqn}")
    print()
    
    # Read staging table (batch mode - window functions allowed!)
    staging_df_raw = spark.read.table(staging_table_fqn)
    staging_count_raw = staging_df_raw.count()
    print(f"   üìä Raw staging events: {staging_count_raw}")
    
    if staging_count_raw == 0:
        print("   ‚ÑπÔ∏è  No new events to process")
        return {"query": query, "staging_table": staging_table_fqn, "merged": 0}
    
    # Deduplicate by primary key (batch mode, matches cockroachdb.py logic)
    # Keep only LATEST event per primary key based on timestamp
    from pyspark.sql import Window
    
    print(f"   üîÑ Deduplicating by primary keys: {primary_key_columns}...")
    window_spec = Window.partitionBy(*primary_key_columns).orderBy(F.col("_cdc_timestamp").desc())
    staging_df = (staging_df_raw
        .withColumn("_row_num", F.row_number().over(window_spec))
        .filter(F.col("_row_num") == 1)
        .drop("_row_num")
    )
    
    staging_count = staging_df.count()
    duplicates_removed = staging_count_raw - staging_count
    print(f"   ✅ Deduplicated: {staging_count} unique events ({duplicates_removed} duplicates removed)")
    
    if staging_count == 0:
        print("   ℹ️  All events were duplicates")
        return {"query": query, "staging_table": staging_table_fqn, "merged": 0}
    
    # Create target table if it doesn't exist
    if not spark.catalog.tableExists(target_table_fqn):
        print(f"   üìù Creating new target table: {target_table_fqn}")
        
        # CRITICAL: Proper DELETE handling for initial table creation
        # This matches cockroachdb.py reference implementation
        
        # 1. Get keys that have DELETE events
        delete_keys = staging_df.filter(F.col("_cdc_operation") == "DELETE") \
            .select(*primary_key_columns) \
            .distinct()
        delete_count = delete_keys.count()
        
        # 2. Get all non-DELETE rows
        active_rows = staging_df.filter(F.col("_cdc_operation") != "DELETE")
        active_count = active_rows.count()
        
        # 3. Exclude rows whose key has a DELETE event (left anti join).
        # staging_df is already deduplicated to one event per key, so no key can be
        # both active and deleted here; the join is kept as a safeguard.
        rows_after_delete = active_rows.join(
            delete_keys,
            on=primary_key_columns,
            how="left_anti"
        )
        after_delete_count = rows_after_delete.count()
        
        # Note: staging_df is already deduplicated, so final_rows = rows_after_delete
        final_rows = rows_after_delete
        final_count = after_delete_count
        
        if delete_count > 0:
            print(f"   ℹ️  Found {delete_count} DELETE events")
            print(f"   ℹ️  Active rows before DELETE: {active_count}")
            print(f"   ℹ️  Active rows after DELETE: {after_delete_count}")
            print(f"   ℹ️  Rows removed by DELETE: {active_count - after_delete_count}")
        
        # Keep ALL columns including _cdc_operation for monitoring
        final_rows.write.format("delta").saveAsTable(target_table_fqn)
        merged_count = final_count
        print(f"   ✅ Created table with {merged_count} initial rows")
        print(f"      Schema includes _cdc_operation for observability\n")
    else:
        # Get Delta table and apply MERGE
        delta_table = DeltaTable.forName(spark, target_table_fqn)
        
        # Check if _cdc_operation exists in target (might be missing from old tables)
        target_columns = set(spark.read.table(target_table_fqn).columns)
        if "_cdc_operation" not in target_columns:
            print(f"   ⚠️  Target table missing _cdc_operation column (old schema)")
            print(f"   🔧 Adding _cdc_operation column for observability...")
            spark.sql(f"""
                ALTER TABLE {target_table_fqn}
                ADD COLUMN _cdc_operation STRING
            """)
            print(f"   ✅ Column added\n")
        
        # Build join condition dynamically
        join_condition = " AND ".join([f"target.{col} = source.{col}" for col in primary_key_columns])
        
        # Get all data columns (KEEP _cdc_operation for observability!)
        data_columns = [col for col in staging_df.columns]
        
        # Build UPDATE/INSERT clauses dynamically
        update_set = {col: f"source.{col}" for col in data_columns}
        insert_values = {col: f"source.{col}" for col in data_columns}
        
        print(f"   üîÑ Executing MERGE...")
        print(f"      Join: {join_condition}")
        print(f"      ‚ÑπÔ∏è  _cdc_operation will be preserved for monitoring")
        
        # Apply MERGE (runs on driver, not workers)
        (delta_table.alias("target").merge(
            staging_df.alias("source"),
            join_condition
        )
        .whenMatchedUpdate(
            condition="source._cdc_operation = 'UPSERT' AND source._cdc_timestamp > target._cdc_timestamp",
            set=update_set
        )
        .whenMatchedDelete(
            condition="source._cdc_operation = 'DELETE'"
        )
        .whenNotMatchedInsert(
            condition="source._cdc_operation = 'UPSERT'",
            values=insert_values
        )
        .execute())
        
        merged_count = staging_count
        print(f"   ✅ MERGE complete: processed {merged_count} events\n")
    
    print("=" * 80)
    print("✅ CDC INGESTION COMPLETE (TWO-STAGE MERGE)")
    print("=" * 80)
    print(f"📊 Raw events: {staging_count_raw}")
    print(f"📊 After deduplication: {staging_count} unique events")
    print(f"📊 Staging table: {staging_table_fqn}")
    print(f"📊 Target table:  {target_table_fqn}")
    print()
    print("📋 Target table includes:")
    print("   - All data columns from source")
    print("   - _cdc_operation: UPSERT (for monitoring)")
    print("   - _cdc_timestamp: Last CDC event timestamp")
    print()
    print("💡 TIP: Staging table can be dropped after successful MERGE:")
    print(f"   spark.sql('DROP TABLE IF EXISTS {staging_table_fqn}')")
    
    return {
        "query": query,
        "staging_table": staging_table_fqn,
        "target_table": target_table_fqn,
        "raw_count": staging_count_raw,
        "deduped_count": staging_count,
        "merged": merged_count
    }


def merge_column_family_fragments(df, primary_key_columns, spark):
    """
    Merge column family fragments into complete rows (streaming-compatible).
    
    When split_column_families=true, CockroachDB creates multiple CDC events per row update,
    one for each column family. This function merges these fragments by:
    1. Grouping by (primary_key + _cdc_timestamp + _cdc_operation)
    2. Using first(col, ignorenulls=True) to coalesce NULL values from different fragments
    
    Each fragment has:
    - Primary key columns (always present)
    - Data for ONE column family (other columns are NULL)
    - Same _cdc_timestamp and _cdc_operation
    
    Args:
        df: Spark DataFrame with potential column family fragments
        primary_key_columns: List of primary key column names (e.g., ['ycsb_key'])
        spark: SparkSession (currently unused)
    
    Returns:
        Merged DataFrame with complete rows
    """
    from pyspark.sql import functions as F
    
    # Get all columns
    all_columns = df.columns
    
    # Metadata columns to preserve (not aggregate as data)
    metadata_columns = {'_cdc_operation', '_cdc_timestamp', '_rescued_data'}
    
    # Data columns = all columns except PK and metadata
    data_columns = [
        col for col in all_columns
        if col not in primary_key_columns 
        and col not in metadata_columns
    ]
    
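    # Group by PK + timestamp + operation: all fragments of one CDC event share these values.
    # Because _cdc_timestamp has whole-second precision, two events with the same operation on
    # the same key within one second also collapse into a single row.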
    group_by_cols = primary_key_columns + ['_cdc_timestamp', '_cdc_operation']
    
    # Build aggregation expressions
    # Use first(col, ignorenulls=True) to merge NULL values from fragments
    agg_exprs = [
        F.first(col, ignorenulls=True).alias(col) 
        for col in data_columns
    ]
    
    # Add metadata columns that aren't in the grouping key
    for col in metadata_columns:
        if col in all_columns and col not in group_by_cols:
            agg_exprs.append(F.first(col, ignorenulls=True).alias(col))
    
    # Apply merge
    df_merged = df.groupBy(*group_by_cols).agg(*agg_exprs)
    
    return df_merged


def ingest_cdc_append_only_multi_family(
    storage_account_name, container_name,
    source_catalog, source_schema, source_table,
    target_catalog, target_schema, target_table,
    primary_key_columns,
    spark
):
    """
    Ingest CDC events in APPEND-ONLY mode with COLUMN FAMILY support.
    
    **Two-Stage Approach (Serverless Compatible)**:
    - Stage 1: Stream raw CDC events to staging table (no aggregations)
    - Stage 2: Batch merge column family fragments to target table
    
    This function:
    - Reads Parquet CDC files from Azure using Auto Loader
    - Filters out .RESOLVED files and metadata
    - Transforms CockroachDB CDC columns (__crdb__*) to standard format
    - MERGES column family fragments (split_column_families=true) in batch mode
    - Writes all events (INSERT/UPDATE/DELETE) as rows to Delta table
    - Does NOT apply deletes or deduplicate updates (append_only)
    
    Use this for:
    - Audit logs with column family tables
    - Tables WITH column families (split_column_families=true)
    - Full history tracking with wide tables
    
    Args:
        storage_account_name: Azure storage account name
        container_name: Azure container name
        source_catalog: CockroachDB catalog (database)
        source_schema: CockroachDB schema
        source_table: Source table name
        target_catalog: Databricks catalog
        target_schema: Databricks schema
        target_table: Target table name
        primary_key_columns: List of primary key column names (required for fragment merging)
        spark: SparkSession
    
    Returns:
        The Stage 1 StreamingQuery (already terminated when returned)
    """
    from pyspark.sql import functions as F

    # Build paths
    source_path = f"abfss://{container_name}@{storage_account_name}.dfs.core.windows.net/parquet/{source_catalog}/{source_schema}/{source_table}/{target_table}"
    checkpoint_path = f"/checkpoints/{target_schema}_{target_table}"
    target_table_fqn = f"{target_catalog}.{target_schema}.{target_table}"
    
    print("📖 Ingesting CDC events (Append-Only + Column Families)")
    print("=" * 80)
    print(f"Mode: APPEND-ONLY (Multi Column Family)")
    print(f"Source: {source_catalog}.{source_schema}.{source_table} (CockroachDB)")
    print(f"Target: {target_table_fqn} (Databricks Delta)")
    print(f"Primary keys: {primary_key_columns}")
    print(f"Source path: {source_path}/ (all dates, recursively)")
    print(f"File filter: *{source_table}*.parquet")
    print()
    
    # Read with Auto Loader
    raw_df = (spark.readStream
        .format("cloudFiles")
        .option("cloudFiles.format", "parquet")
        .option("cloudFiles.schemaLocation", f"{checkpoint_path}/schema")
        .option("pathGlobFilter", f"*{source_table}*.parquet")
        .option("recursiveFileLookup", "true")
        .load(source_path)
    )
    
    print("✅ Schema inferred from data files")
    print()
    
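    # Transform: CockroachDB CDC → standard CDC format.
    # __crdb__updated is treated as nanoseconds since the epoch; from_unixtime() formats the result
    # as 'yyyy-MM-dd HH:mm:ss', so _cdc_timestamp is truncated to whole seconds.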
    transformed_df = raw_df.select(
        "*",
        F.from_unixtime(
            F.col("__crdb__updated").cast("double").cast("bigint") / 1000000000
        ).cast("timestamp").alias("_cdc_timestamp"),
        F.when(F.col("__crdb__event_type") == "d", "DELETE")
         .otherwise("UPSERT")
         .alias("_cdc_operation")
    ).drop("__crdb__event_type", "__crdb__updated")
    
    print("✅ CDC transformations applied (streaming compatible)")
    print("   ℹ️  Column family merge will happen in Stage 2 (batch mode)")
    print()
    
    # ========================================================================
    # STAGE 1: Stream to Staging Table (Serverless Compatible - No Aggregations)
    # ========================================================================
    staging_table_fqn = f"{target_table_fqn}_staging_cf"
    
    print("🔷 STAGE 1: Streaming to staging table (no aggregations)")
    print(f"   Staging: {staging_table_fqn}")
    print()
    
    # Write to staging table (pure Spark, no aggregations)
    query = (transformed_df.writeStream
        .format("delta")
        .option("checkpointLocation", f"{checkpoint_path}/data")
        .option("mergeSchema", "true")
        .trigger(availableNow=True)
        .toTable(staging_table_fqn)  # ← No aggregations, Serverless compatible!
    )
    
    print("⏳ Streaming CDC events to staging table...")
    query.awaitTermination()
    print("✅ Stream completed\n")
    
    # ========================================================================
    # STAGE 2: Merge Column Families in Batch Mode
    # ========================================================================
    print("🔷 STAGE 2: Merging column family fragments (batch mode)")
    print(f"   Reading from staging: {staging_table_fqn}")
    print(f"   Writing to target: {target_table_fqn}")
    print()
    
    # Read staging table in batch mode
    staging_df = spark.table(staging_table_fqn)
    
    # Merge column family fragments (batch mode - no streaming limitations!)
    print("🔧 Merging column family fragments...")
    print(f"   Grouping by: {primary_key_columns} + _cdc_timestamp + _cdc_operation")
    print(f"   Using first(col, ignorenulls=True) to coalesce fragments")
    merged_df = merge_column_family_fragments(staging_df, primary_key_columns, spark)
    print("✅ Column family fragments merged")
    print()
    
    # Write to final target table (batch mode, append_only)
    print(f"💾 Writing merged events to {target_table_fqn}...")
    merged_df.write.format("delta").mode("append").saveAsTable(target_table_fqn)
    print("✅ Append-only write complete")
    print()
    
    # Clean up staging table
    spark.sql(f"DROP TABLE IF EXISTS {staging_table_fqn}")
    print(f"🧹 Staging table dropped: {staging_table_fqn}")
    print()
    
    return query


def ingest_cdc_with_merge_multi_family(
    storage_account_name, container_name,
    source_catalog, source_schema, source_table,
    target_catalog, target_schema, target_table,
    primary_key_columns,
    spark
):
    """
    Ingest CDC events with MERGE logic and COLUMN FAMILY support.
    
    **Two-Stage Approach (Serverless Compatible)**:
    - Stage 1: Stream raw CDC events to staging table (no aggregations)
    - Stage 2: Batch merge column families + deduplicate + MERGE to target
    
    This function:
    - Reads Parquet CDC files from Azure using Auto Loader
    - Filters out .RESOLVED files and metadata
    - Transforms CockroachDB CDC columns (__crdb__*) to standard format
    - MERGES column family fragments (split_column_families=true) in batch mode
    - Streams to staging table (Serverless-compatible)
    - Deduplicates by primary key in batch mode
    - Applies MERGE logic to target Delta table
    
    Use this for:
    - Current state replication with column families
    - Tables WITH column families (split_column_families=true)
    - Production CDC with UPDATE/DELETE support
    
    Target table will contain:
    - All data columns from source
    - _cdc_operation: "UPSERT" (shows last operation)
    - _cdc_timestamp: Timestamp of last CDC event
    
    Args:
        storage_account_name: Azure storage account name
        container_name: Azure container name
        source_catalog: CockroachDB catalog (database)
        source_schema: CockroachDB schema
        source_table: Source table name
        target_catalog: Databricks catalog
        target_schema: Databricks schema
        target_table: Target table name
        primary_key_columns: List of primary key column names (required for fragments + MERGE)
        spark: SparkSession
    
    Returns:
        Dict with query, staging_table, target_table, raw_count, deduped_count, merged
    """
    from pyspark.sql import functions as F, Window
    from delta.tables import DeltaTable
    
    # Build paths
    source_path = f"abfss://{container_name}@{storage_account_name}.dfs.core.windows.net/parquet/{source_catalog}/{source_schema}/{source_table}/{target_table}"
    checkpoint_path = f"/checkpoints/{target_schema}_{target_table}_merge_cf"
    target_table_fqn = f"{target_catalog}.{target_schema}.{target_table}"
    
    print("📖 Ingesting CDC events (MERGE + Column Families)")
    print("=" * 80)
    print(f"Mode: MERGE with Column Families")
    print(f"Source: {source_catalog}.{source_schema}.{source_table} (CockroachDB)")
    print(f"Target: {target_table_fqn} (Databricks Delta)")
    print(f"Primary keys: {primary_key_columns}")
    print(f"Source path: {source_path}/ (all dates, recursively)")
    print()
    
    # Read with Auto Loader
    raw_df = (spark.readStream
        .format("cloudFiles")
        .option("cloudFiles.format", "parquet")
        .option("cloudFiles.schemaLocation", f"{checkpoint_path}/schema")
        .option("pathGlobFilter", f"*{source_table}*.parquet")
        .option("recursiveFileLookup", "true")
        .load(source_path)
    )
    
    print("✅ Schema inferred from data files")
    print()
    
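    # Transform: CockroachDB CDC → standard CDC format.
    # __crdb__updated is treated as nanoseconds since the epoch; from_unixtime() formats the result
    # as 'yyyy-MM-dd HH:mm:ss', so _cdc_timestamp is truncated to whole seconds.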
    transformed_df = raw_df.select(
        "*",
        F.from_unixtime(
            F.col("__crdb__updated").cast("double").cast("bigint") / 1000000000
        ).cast("timestamp").alias("_cdc_timestamp"),
        F.when(F.col("__crdb__event_type") == "d", "DELETE")
         .otherwise("UPSERT")
         .alias("_cdc_operation")
    ).drop("__crdb__event_type", "__crdb__updated")
    
    print("✅ CDC transformations applied (streaming compatible)")
    print("   ℹ️  Column family merge will happen in Stage 2 (batch mode)")
    print()
    
    # ========================================================================
    # STAGE 1: Stream to Staging Table (Serverless Compatible - No Aggregations)
    # ========================================================================
    staging_table_fqn = f"{target_table_fqn}_staging_cf"
    
    print("🔷 STAGE 1: Streaming to staging table (no aggregations)")
    print(f"   Staging: {staging_table_fqn}")
    print()
    
    query = (transformed_df.writeStream  # ← Stream raw events (no column family merge yet)
        .format("delta")
        .option("checkpointLocation", f"{checkpoint_path}/data")
        .option("mergeSchema", "true")
        .trigger(availableNow=True)
        .toTable(staging_table_fqn)  # ← No aggregations, Serverless compatible!
    )
    
    print("⏳ Streaming CDC events to staging table...")
    query.awaitTermination()
    print("✅ Stream completed\n")
    
    # ========================================================================
    # STAGE 2: Batch MERGE from Staging to Target
    # ========================================================================
    print("🔷 STAGE 2: Applying MERGE logic (batch operation)")
    print(f"   Source: {staging_table_fqn}")
    print(f"   Target: {target_table_fqn}")
    print()
    
    # Read staging table (batch mode)
    staging_df_raw = spark.read.table(staging_table_fqn)
    staging_count_raw = staging_df_raw.count()
    print(f"   📊 Raw staging events: {staging_count_raw}")
    
    if staging_count_raw == 0:
        print("   ℹ️  No new events to process")
        return {"query": query, "staging_table": staging_table_fqn, "target_table": target_table_fqn,
                "raw_count": 0, "deduped_count": 0, "merged": 0}
    
    # Merge column family fragments (batch mode - no streaming limitations!)
    print(f"   🔧 Merging column family fragments...")
    print(f"      Grouping by: {primary_key_columns} + _cdc_timestamp + _cdc_operation")
    staging_df_merged = merge_column_family_fragments(staging_df_raw, primary_key_columns, spark)
    print(f"   ✅ Column family fragments merged")
    
    # Deduplicate by primary key (keep latest event)
    print(f"   🔄 Deduplicating by primary keys: {primary_key_columns}...")
    window_spec = Window.partitionBy(*primary_key_columns).orderBy(F.col("_cdc_timestamp").desc())
    staging_df = (staging_df_merged  # ← Use merged DataFrame
        .withColumn("_row_num", F.row_number().over(window_spec))
        .filter(F.col("_row_num") == 1)
        .drop("_row_num")
    )
    
    staging_count = staging_df.count()
    merged_event_count = staging_df_merged.count()  # count once instead of recomputing twice
    fragments_removed = staging_count_raw - merged_event_count
    duplicates_removed = merged_event_count - staging_count
    print(f"   ✅ Column family fragments coalesced: {fragments_removed} fragment rows merged")
    print(f"   ✅ Deduplicated: {staging_count} latest events, one per key ({duplicates_removed} older events dropped)")
    
    if staging_count == 0:
        print("   ℹ️  All events were duplicates")
        return {"query": query, "staging_table": staging_table_fqn, "target_table": target_table_fqn,
                "raw_count": staging_count_raw, "deduped_count": 0, "merged": 0}
    
    # Create target table if it doesn't exist
    if not spark.catalog.tableExists(target_table_fqn):
        print(f"   📝 Creating new target table: {target_table_fqn}")
        
        # CRITICAL: Proper DELETE handling for initial table creation
        # This matches cockroachdb.py reference implementation
        
        # 1. Get keys that have DELETE events
        delete_keys = staging_df.filter(F.col("_cdc_operation") == "DELETE") \
            .select(*primary_key_columns) \
            .distinct()
        delete_count = delete_keys.count()
        
        # 2. Get all non-DELETE rows
        active_rows = staging_df.filter(F.col("_cdc_operation") != "DELETE")
        active_count = active_rows.count()
        
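        # 3. Exclude rows whose key has a DELETE event (left anti join).
        # staging_df already keeps only the latest event per key, so a key with UPSERT at T1 and
        # DELETE at T2 survives only as the DELETE; this join is a safeguard and removes nothing extra.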
        rows_after_delete = active_rows.join(
            delete_keys,
            on=primary_key_columns,
            how="left_anti"
        )
        after_delete_count = rows_after_delete.count()
        
        # Note: staging_df is already deduplicated, so final_rows = rows_after_delete
        final_rows = rows_after_delete
        final_count = after_delete_count
        
        if delete_count > 0:
            print(f"   ℹ️  Found {delete_count} DELETE events")
            print(f"   ℹ️  Active rows before DELETE: {active_count}")
            print(f"   ℹ️  Active rows after DELETE: {after_delete_count}")
            print(f"   ℹ️  Rows removed by DELETE: {active_count - after_delete_count}")
        
        final_rows.write.format("delta").saveAsTable(target_table_fqn)
        merged_count = final_count
        print(f"   ✅ Created table with {merged_count} initial rows")
        print(f"      Schema includes _cdc_operation for observability\n")
    else:
        # Get Delta table and apply MERGE
        delta_table = DeltaTable.forName(spark, target_table_fqn)
        
        # Check if _cdc_operation exists in target
        target_columns = set(spark.read.table(target_table_fqn).columns)
        if "_cdc_operation" not in target_columns:
            print(f"   ⚠️  Target table missing _cdc_operation column")
            print(f"   🔧 Adding _cdc_operation column...")
            spark.sql(f"ALTER TABLE {target_table_fqn} ADD COLUMN _cdc_operation STRING")
            print(f"   ✅ Column added\n")
        
        # Build join condition
        join_condition = " AND ".join([f"target.{col} = source.{col}" for col in primary_key_columns])
        
        # Get all columns
        data_columns = [col for col in staging_df.columns]
        update_set = {col: f"source.{col}" for col in data_columns}
        insert_values = {col: f"source.{col}" for col in data_columns}
        
        print(f"   🔄 Executing MERGE...")
        print(f"      Join: {join_condition}")
        print(f"      ℹ️  _cdc_operation preserved for monitoring")
        
        # Apply MERGE
        (delta_table.alias("target").merge(
            staging_df.alias("source"),
            join_condition
        )
        .whenMatchedUpdate(
            condition="source._cdc_operation = 'UPSERT' AND source._cdc_timestamp > target._cdc_timestamp",
            set=update_set
        )
        .whenMatchedDelete(
            condition="source._cdc_operation = 'DELETE'"
        )
        .whenNotMatchedInsert(
            condition="source._cdc_operation = 'UPSERT'",
            values=insert_values
        )
        .execute())
        
        merged_count = staging_count
        print(f"   ✅ MERGE complete: processed {merged_count} events\n")
    
    print("=" * 80)
    print("✅ CDC INGESTION COMPLETE (MERGE + COLUMN FAMILIES)")
    print("=" * 80)
    print(f"📊 Raw events: {staging_count_raw}")
    print(f"📊 After deduplication: {staging_count} unique events")
    print(f"📊 Staging table: {staging_table_fqn}")
    print(f"📊 Target table:  {target_table_fqn}")
    print()
    print("💡 TIP: Staging table can be dropped after successful MERGE:")
    print(f"   spark.sql('DROP TABLE IF EXISTS {staging_table_fqn}')")
    
    return {
        "query": query,
        "staging_table": staging_table_fqn,
        "target_table": target_table_fqn,
        "raw_count": staging_count_raw,
        "deduped_count": staging_count,
        "merged": merged_count
    }


print("✅ Databricks streaming modes loaded (4 functions available)")
print("   1. ingest_cdc_append_only_single_family (append_only, no column families)")
print("   2. ingest_cdc_with_merge_single_family (update_delete, no column families)")
print("   3. ingest_cdc_append_only_multi_family (append_only, WITH column families)")
print("   4. ingest_cdc_with_merge_multi_family (update_delete, WITH column families)")

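Stage 2 of the column-family ingestion combines three rules: coalesce fragments that share a key, `_cdc_timestamp`, and `_cdc_operation`; keep only the latest event per key; then apply the three `MERGE` clauses. The following is a minimal pure-Python sketch of those rules (not the notebook's Spark code), useful for reasoning about edge cases without a cluster. It assumes each event is a plain dict and uses integers in place of timestamps; `coalesce_fragments`, `latest_per_key`, and `apply_merge` are hypothetical helper names.

```python
from typing import Dict, Iterable, List, Tuple

Row = Dict[str, object]


def coalesce_fragments(events: Iterable[Row], pk: List[str]) -> List[Row]:
    """Group by (PK, _cdc_timestamp, _cdc_operation); keep the first non-None value per
    column, mirroring F.first(col, ignorenulls=True) in merge_column_family_fragments."""
    merged: Dict[Tuple, Row] = {}
    for ev in events:
        key = tuple(ev[c] for c in pk) + (ev["_cdc_timestamp"], ev["_cdc_operation"])
        acc = merged.setdefault(key, {})
        for col, val in ev.items():
            if acc.get(col) is None:
                acc[col] = val  # fill the column, or leave it None until a fragment supplies it
    return list(merged.values())


def latest_per_key(events: Iterable[Row], pk: List[str]) -> List[Row]:
    """Keep the event with the greatest _cdc_timestamp per key, like
    row_number() over (partition by PK order by _cdc_timestamp desc) == 1.
    Ties keep the first event seen (Spark gives no such guarantee)."""
    latest: Dict[Tuple, Row] = {}
    for ev in events:
        k = tuple(ev[c] for c in pk)
        if k not in latest or ev["_cdc_timestamp"] > latest[k]["_cdc_timestamp"]:
            latest[k] = ev
    return list(latest.values())


def apply_merge(target: Dict[Tuple, Row], events: Iterable[Row], pk: List[str]) -> None:
    """Apply the notebook's three MERGE clauses to an in-memory target keyed by PK tuple."""
    for ev in events:
        k = tuple(ev[c] for c in pk)
        op = ev["_cdc_operation"]
        if k in target:
            if op == "UPSERT" and ev["_cdc_timestamp"] > target[k]["_cdc_timestamp"]:
                target[k] = dict(ev)  # whenMatchedUpdate: only newer UPSERTs overwrite
            elif op == "DELETE":
                del target[k]  # whenMatchedDelete: no timestamp check, as in the notebook
        elif op == "UPSERT":
            target[k] = dict(ev)  # whenNotMatchedInsert: DELETEs for unknown keys are ignored


# Two fragments of one UPDATE to key 1, plus a DELETE of key 2 (timestamps are plain ints here).
pk = ["ycsb_key"]
events = [
    {"ycsb_key": 1, "_cdc_timestamp": 10, "_cdc_operation": "UPSERT", "field0": "a", "field3": None},
    {"ycsb_key": 1, "_cdc_timestamp": 10, "_cdc_operation": "UPSERT", "field0": None, "field3": "b"},
    {"ycsb_key": 2, "_cdc_timestamp": 11, "_cdc_operation": "DELETE", "field0": None, "field3": None},
]
target = {(2,): {"ycsb_key": 2, "_cdc_timestamp": 5, "_cdc_operation": "UPSERT", "field0": "x", "field3": "y"}}
apply_merge(target, latest_per_key(coalesce_fragments(events, pk), pk), pk)
# target now holds only key 1, with field0 and field3 filled from the two fragments
```

The tie-breaking caveat matters in practice: `_cdc_timestamp` is derived with whole-second precision, so two events on one key within the same second compare as equal.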
In [None]:
# ============================================================================
# CELL 7: CREATE TEST TABLE (Mode-Aware: Single or Multiple Column Families)
# ============================================================================
# Create table structure based on column_family_mode:
# - single_cf: 1 column family (default, better performance)
# - multi_cf: 3 column families (for testing split_column_families=true)

if column_family_mode == "multi_cf":
    # Create table with MULTIPLE column families for testing split_column_families=true
    create_table_sql = f"""
    CREATE TABLE IF NOT EXISTS {source_table} (
        ycsb_key INT PRIMARY KEY,
        -- Family 1: Frequently accessed fields
        field0 TEXT,
        field1 TEXT,
        field2 TEXT,
        FAMILY frequently_read (ycsb_key, field0, field1, field2),
        
        -- Family 2: Medium-frequency fields
        field3 TEXT,
        field4 TEXT,
        field5 TEXT,
        FAMILY medium_read (field3, field4, field5),
        
        -- Family 3: Rarely accessed fields
        field6 TEXT,
        field7 TEXT,
        field8 TEXT,
        field9 TEXT,
        FAMILY rarely_read (field6, field7, field8, field9)
    )
    """
    family_info = "3 column families (frequently_read, medium_read, rarely_read)"
else:
    # Create table with SINGLE column family (default)
    create_table_sql = f"""
    CREATE TABLE IF NOT EXISTS {source_table} (
        ycsb_key INT PRIMARY KEY,
        field0 TEXT,
        field1 TEXT,
        field2 TEXT,
        field3 TEXT,
        field4 TEXT,
        field5 TEXT,
        field6 TEXT,
        field7 TEXT,
        field8 TEXT,
        field9 TEXT
    )
    """
    family_info = "1 column family (default primary)"

conn = get_cockroachdb_connection()
try:
    with conn.cursor() as cur:
        cur.execute(create_table_sql)
        conn.commit()
    print(f"✅ Table '{source_table}' created (or already exists)")
    print(f"   Column Family Mode: {column_family_mode}")
    print(f"   Column families: {family_info}")
finally:
    conn.close()

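For wider tables, the `multi_cf` DDL above can be generated from a mapping of family names to columns instead of being written by hand. A sketch, assuming every non-key column is `TEXT` (as in the YCSB schema) and the key is an `INT` placed in the first family, as in the cell above; `build_create_table_sql` is a hypothetical helper, and identifiers are interpolated without quoting, so use it only with trusted names.

```python
from typing import Dict, List


def build_create_table_sql(table: str, pk: str, families: Dict[str, List[str]]) -> str:
    """Render CREATE TABLE with one FAMILY clause per entry in `families`.

    Assumptions: INT key placed in the first family, every other column TEXT.
    Identifiers are neither quoted nor validated.
    """
    all_cols = [c for cols in families.values() for c in cols]
    if pk in all_cols or len(all_cols) != len(set(all_cols)):
        raise ValueError("each non-key column must appear in exactly one family")

    elements = [f"{pk} INT PRIMARY KEY"]
    for i, (family, cols) in enumerate(families.items()):
        elements += [f"{c} TEXT" for c in cols]
        members = ([pk] if i == 0 else []) + cols
        elements.append(f"FAMILY {family} ({', '.join(members)})")
    body = ",\n    ".join(elements)
    return f"CREATE TABLE IF NOT EXISTS {table} (\n    {body}\n)"


# Reproduces the three-family layout used above; "usertable" is a placeholder name.
sql = build_create_table_sql("usertable", "ycsb_key", {
    "frequently_read": ["field0", "field1", "field2"],
    "medium_read": ["field3", "field4", "field5"],
    "rarely_read": ["field6", "field7", "field8", "field9"],
})
print(sql)
```

The uniqueness check catches the most common mistake when reorganizing families by hand: listing a column in two families or forgetting that the key is already placed.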
In [None]:
# ============================================================================
# CELL 8: INSERT SAMPLE DATA (Snapshot Phase)
# ============================================================================
conn = get_cockroachdb_connection()
try:
    # Check if table is empty using helper function
    stats = get_table_stats(conn, source_table)
    
    if stats['is_empty']:
        # Table is empty - insert snapshot data
        print(f"📊 Table is empty. Inserting {snapshot_count} initial rows (snapshot phase)...")
        
        with conn.cursor() as cur:
            # Use generate_series for efficient bulk insert
            insert_sql = f"""
            INSERT INTO {source_table} 
            (ycsb_key, field0, field1, field2, field3, field4, field5, field6, field7, field8, field9)
            SELECT 
                i AS ycsb_key,
                'snapshot_value_' || i || '_0' AS field0,
                'snapshot_value_' || i || '_1' AS field1,
                'snapshot_value_' || i || '_2' AS field2,
                'snapshot_value_' || i || '_3' AS field3,
                'snapshot_value_' || i || '_4' AS field4,
                'snapshot_value_' || i || '_5' AS field5,
                'snapshot_value_' || i || '_6' AS field6,
                'snapshot_value_' || i || '_7' AS field7,
                'snapshot_value_' || i || '_8' AS field8,
                'snapshot_value_' || i || '_9' AS field9
            FROM generate_series(0, %s - 1) AS i
            """
            
            cur.execute(insert_sql, (snapshot_count,))
            conn.commit()
        
        print(f"✅ Sample data inserted using generate_series")
        print(f"   Rows inserted: {snapshot_count} (keys 0 to {snapshot_count - 1})")
    else:
        # Table already has data - skip insert
        print(f"ℹ️  Table already contains data - skipping snapshot insert")
        print(f"   Current key range: {stats['min_key']} to {stats['max_key']}")
        print(f"   Tip: If you want to re-run the snapshot, drop the table first (see Cleanup cells)")
finally:
    conn.close()

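Because Cell 8 derives every value from the key, the expected contents of any snapshot row can be recomputed on the Databricks side, which gives a cheap spot check of the replicated table. A sketch; `expected_row` and `mismatched_columns` are hypothetical helpers, and the `actual` dict is assumed to come from a target-table row with the `_cdc_*` columns ignored (for example `row.asDict()` on a PySpark `Row`).

```python
def expected_row(key: int, prefix: str = "snapshot_value") -> dict:
    """Values written by Cell 8 for one key: field<n> = '<prefix>_<key>_<n>'.
    Cell 10's INSERTs follow the same pattern with prefix 'inserted_value'."""
    row = {"ycsb_key": key}
    row.update({f"field{n}": f"{prefix}_{key}_{n}" for n in range(10)})
    return row


def mismatched_columns(actual: dict, key: int, prefix: str = "snapshot_value") -> list:
    """Names of expected columns whose value in `actual` differs (missing counts as different)."""
    expected = expected_row(key, prefix)
    return [col for col, val in expected.items() if actual.get(col) != val]


print(expected_row(5)["field0"])  # snapshot_value_5_0
```

Rows touched by Cell 10's UPDATE will report `field0` as a mismatch, since that statement overwrites it with `updated_at_<timestamp>`.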
In [None]:
# ============================================================================
# CELL 9: CREATE CHANGEFEED
# ============================================================================
# Build Azure Blob Storage URI with table-specific path
# Note: For Azure, path goes in URI (not as path_prefix query parameter like S3)
path = f"parquet/{source_catalog}/{source_schema}/{source_table}/{target_table}"
changefeed_path = f"azure://{container_name}/{path}?AZURE_ACCOUNT_NAME={storage_account_name}&AZURE_ACCOUNT_KEY={storage_account_key_encoded}"

# Build changefeed options based on column_family_mode
if column_family_mode == "multi_cf":
    # Include split_column_families for multi-family mode
    changefeed_options = """
    format='parquet',
    updated,
    resolved='10s',
    split_column_families
"""
else:
    # Standard options for single-family mode
    changefeed_options = """
    format='parquet',
    updated,
    resolved='10s'
"""

# Create changefeed SQL
create_changefeed_sql = f"""
CREATE CHANGEFEED FOR TABLE {source_table}
INTO '{changefeed_path}'
WITH {changefeed_options}
"""

conn = get_cockroachdb_connection()
try:
    with conn.cursor() as cur:
        # Check for existing changefeed with THIS specific destination path
        # (checks for source table AND full path to ensure uniqueness)
        path_pattern = f"%{source_table}%{source_catalog}/{source_schema}/{source_table}/{target_table}%"
        
        cur.execute("""
            SELECT job_id, status, description
            FROM [SHOW CHANGEFEED JOBS] 
            WHERE description LIKE %s
            AND status IN ('running', 'paused')
            LIMIT 1
        """, (path_pattern,))
        
        existing = cur.fetchone()
        
        if existing:
            job_id, status, description = existing
            print(f"✅ Changefeed already exists for this source → target mapping")
            print(f"   Job ID: {job_id}")
            print(f"   Status: {status}")
            print(f"   Source: {source_catalog}.{source_schema}.{source_table}")
            print(f"   Target path: .../{source_table}/{target_table}/")
            if column_family_mode == "multi_cf":
                print(f"   Expected: Column family fragments")
            print(f"")
            print(f"💡 Tip: Run Cell 10 to generate INSERT/UPDATE/DELETE events")
            print(f"   Then check Cell 11 to verify new files appear")
        else:
            # Create new changefeed
            cur.execute(create_changefeed_sql)
            result = cur.fetchone()
            job_id = result[0]
            
            print(f"✅ Changefeed created")
            print(f"   Job ID: {job_id}")
            print(f"   Source: {source_catalog}.{source_schema}.{source_table}")
            print(f"   Target path: .../{source_table}/{target_table}/")
            print(f"   Format: Parquet")
            if column_family_mode == "multi_cf":
                print(f"   Split column families: TRUE (fragments will be generated)")
            else:
                print(f"   Split column families: FALSE (single file per event)")
            print(f"   Destination: Azure Blob Storage")
            print(f"")
            
            # Wait for files to appear using helper function
            wait_for_changefeed_files(
                storage_account_name, storage_account_key, container_name,
                source_catalog, source_schema, source_table, target_table,
                max_wait=300, check_interval=5
            )
finally:
    conn.close()

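Cell 9 interpolates `storage_account_key_encoded`, which is defined in an earlier cell not shown here. Azure storage account keys are base64 strings that can contain `+`, `/`, and `=`, none of which are safe to place raw in a URI query string, so the key has to be percent-encoded before it goes into the sink URI. A sketch of that step, assuming the encoding is done with Python's `urllib.parse.quote`; `build_azure_sink_uri` is a hypothetical helper that mirrors the f-string in Cell 9.

```python
from urllib.parse import quote


def build_azure_sink_uri(container: str, path: str, account_name: str, account_key: str) -> str:
    """Build the azure:// changefeed sink URI used in Cell 9, percent-encoding the key."""
    # safe="" makes quote() encode '/' as well (by default it leaves '/' unescaped)
    encoded_key = quote(account_key, safe="")
    return (
        f"azure://{container}/{path}"
        f"?AZURE_ACCOUNT_NAME={account_name}&AZURE_ACCOUNT_KEY={encoded_key}"
    )


# Dummy key for illustration only; avoid printing a real account key.
uri = build_azure_sink_uri("cdc", "parquet/defaultdb/public/usertable/usertable", "myaccount", "ab+c/d==")
print(uri.split("AZURE_ACCOUNT_KEY=")[1])  # ab%2Bc%2Fd%3D%3D
```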
In [None]:
# ============================================================================
# CELL 10: RUN CDC WORKLOAD (UPDATE & DELETE)
# ============================================================================
import time
from datetime import datetime

# Capture baseline file count BEFORE generating CDC events
print("📊 Capturing baseline file count...")
result_before = check_azure_files(
    storage_account_name, storage_account_key, container_name,
    source_catalog, source_schema, source_table, target_table,
    verbose=False
)
files_before = len(result_before['data_files'])
print(f"   Current files: {files_before}")
print()

conn = get_cockroachdb_connection()
try:
    # Get current table state using helper function
    stats_before = get_table_stats(conn, source_table)
    min_key = stats_before['min_key']
    max_key = stats_before['max_key']
    count_before = stats_before['count']
    
    print(f"📊 Current table state:")
    print(f"   Min key: {min_key}, Max key: {max_key}, Total rows: {count_before}")
    print()
    
    with conn.cursor() as cur:
        
        # 1. INSERT: Add new rows starting from max_key + 1 (using generate_series)
        print(f"➕ Running {insert_count} INSERTs (keys {max_key + 1} to {max_key + insert_count})...")
        insert_sql = f"""
        INSERT INTO {source_table} 
        (ycsb_key, field0, field1, field2, field3, field4, field5, field6, field7, field8, field9)
        SELECT 
            i AS ycsb_key,
            'inserted_value_' || i || '_0' AS field0,
            'inserted_value_' || i || '_1' AS field1,
            'inserted_value_' || i || '_2' AS field2,
            'inserted_value_' || i || '_3' AS field3,
            'inserted_value_' || i || '_4' AS field4,
            'inserted_value_' || i || '_5' AS field5,
            'inserted_value_' || i || '_6' AS field6,
            'inserted_value_' || i || '_7' AS field7,
            'inserted_value_' || i || '_8' AS field8,
            'inserted_value_' || i || '_9' AS field9
        FROM generate_series(%s, %s) AS i
        """
        cur.execute(insert_sql, (max_key + 1, max_key + insert_count))
        
        # 2. UPDATE: Update existing rows starting from min_key (single UPDATE statement)
        print(f"📝 Running {update_count} UPDATEs (keys {min_key} to {min_key + update_count - 1})...")
        timestamp = int(time.time())
        cur.execute(f"""
            UPDATE {source_table}
            SET field0 = %s
            WHERE ycsb_key >= %s AND ycsb_key < %s
        """, (f"updated_at_{timestamp}", min_key, min_key + update_count))
        
        # 3. DELETE: Delete the lowest keys starting from min_key (single DELETE).
        #    This range overlaps the UPDATE range, so updated keys in [min_key, delete_max] are deleted too.
        delete_max = min_key + delete_count - 1
        print(f"🗑️  Running {delete_count} DELETEs (keys {min_key} to {delete_max})...")
        cur.execute(f"""
            DELETE FROM {source_table}
            WHERE ycsb_key >= %s AND ycsb_key <= %s
        """, (min_key, delete_max))
        
        conn.commit()
    
    # Get final table state using helper function
    stats_after = get_table_stats(conn, source_table)
    min_key_after = stats_after['min_key']
    max_key_after = stats_after['max_key']
    count_after = stats_after['count']
    
    print(f"\n✅ Workload complete")
    print(f"   Inserts: {insert_count}")
    print(f"   Updates: {update_count}")
    print(f"   Deletes: {delete_count}")
    print(f"   Before: {count_before} rows (keys {min_key}-{max_key})")
    print(f"   After:  {count_after} rows (keys {min_key_after}-{max_key_after})")
    print(f"   Net change: {count_after - count_before:+d} rows")
    print(f"")
    
    # Wait for new CDC files to appear in Azure (positive confirmation)
    print(f"⏳ Waiting for new CDC files to appear in Azure...")
    print(f"   Baseline: {files_before} files")
    print()
    
    # Poll for new files (max 90 seconds)
    max_wait = 90
    check_interval = 10
    elapsed = 0
    
    while elapsed < max_wait:
        result = check_azure_files(
            storage_account_name, storage_account_key, container_name,
            source_catalog, source_schema, source_table, target_table,
            verbose=False
        )
        files_now = len(result['data_files'])
        
        if files_now > files_before:
            print(f"✅ New CDC files appeared after {elapsed} seconds!")
            print(f"   Baseline (before workload): {files_before} files")
            print(f"   Current (after workload): {files_now} files")
            print(f"   New files generated: {files_now - files_before}")
            break
        
        print(f"   Checking... ({elapsed}s elapsed, baseline: {files_before} files)", end='\r')
        time.sleep(check_interval)
        elapsed += check_interval
    else:
        print(f"\n⚠️  Timeout after {max_wait}s - files may still be flushing")
        print(f"   Run Cell 11 to check manually")
finally:
    conn.close()
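The wait loop above is a poll-until-timeout pattern: check a condition, sleep, and give up after `max_wait` seconds. A minimal reusable sketch follows. `poll_until` is a hypothetical helper (not defined elsewhere in this notebook), and the injectable `sleep` argument is an assumption added so the demo runs instantly.

```python
import time
from typing import Callable, Optional


def poll_until(predicate: Callable[[], bool],
               max_wait: float = 90.0,
               check_interval: float = 10.0,
               sleep: Callable[[float], None] = time.sleep) -> Optional[float]:
    """Call `predicate` until it returns True or `max_wait` seconds elapse.

    Returns the elapsed seconds at which the predicate first succeeded,
    or None on timeout. Elapsed time is counted in whole check intervals,
    mirroring the loop in the workload cell.
    """
    elapsed = 0.0
    while elapsed < max_wait:
        if predicate():
            return elapsed
        sleep(check_interval)
        elapsed += check_interval
    return None


# Demo: a fake predicate that succeeds on the third check, with a no-op
# sleep so the example returns immediately.
calls = {"n": 0}


def fake_files_arrived() -> bool:
    calls["n"] += 1
    return calls["n"] >= 3


result = poll_until(fake_files_arrived, max_wait=90, check_interval=10,
                    sleep=lambda seconds: None)
print(result)  # 20.0
```

In the notebook, the predicate could wrap `check_azure_files(..., verbose=False)` and compare `len(result['data_files'])` with `files_before`; a `None` return corresponds to the timeout branch above.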

In [None]:
# ============================================================================
# CELL 11: CHECK AZURE FILES (Optional - Manual Check)
# ============================================================================
# Use the helper function from Cell 4 to check for files
result = check_azure_files(
    storage_account_name, storage_account_key, container_name,
    source_catalog, source_schema, source_table, target_table,
    verbose=True
)

# Provide guidance
if len(result['data_files']) == 0:
    print(f"\n⚠️  No data files found yet.")
    print(f"   💡 Possible reasons:")
    print(f"   - Changefeed not created yet (run Cell 9)")
    print(f"   - Path configuration mismatch (check Cell 1 variables)")
    print(f"   - Azure credentials issue (check the storage account key in Cell 1)")
else:
    print(f"\n✅ Files are ready! Proceed to Cell 12 to read with Databricks.")

In [None]:
# ============================================================================
# CELL 12: READ CDC EVENTS IN DATABRICKS
# ============================================================================
# Select and run CDC ingestion function based on both modes (from Cell 1)
# Functions are defined in Cell 5
# ============================================================================

print(f"🔷 CDC Configuration:")
print(f"   Processing Mode: {cdc_mode}")
print(f"   Column Family Mode: {column_family_mode}")
print()

# Select function based on BOTH cdc_mode and column_family_mode
if cdc_mode == "append_only" and column_family_mode == "single_cf":
    print(f"📘 Running: ingest_cdc_append_only_single_family()")
    print(f"   - All CDC events will be stored as rows")
    print(f"   - No column family merging needed\n")
    
    query = ingest_cdc_append_only_single_family(
        storage_account_name=storage_account_name,
        container_name=container_name,
        source_catalog=source_catalog,
        source_schema=source_schema,
        source_table=source_table,
        target_catalog=target_catalog,
        target_schema=target_schema,
        target_table=target_table,
        spark=spark
    )

elif cdc_mode == "append_only" and column_family_mode == "multi_cf":
    print(f"📙 Running: ingest_cdc_append_only_multi_family()")
    print(f"   - All CDC events will be stored as rows")
    print(f"   - Column family fragments will be merged\n")
    
    if not primary_key_columns:
        raise ValueError("primary_key_columns required for multi_cf mode")
    
    query = ingest_cdc_append_only_multi_family(
        storage_account_name=storage_account_name,
        container_name=container_name,
        source_catalog=source_catalog,
        source_schema=source_schema,
        source_table=source_table,
        target_catalog=target_catalog,
        target_schema=target_schema,
        target_table=target_table,
        primary_key_columns=primary_key_columns,
        spark=spark
    )

elif cdc_mode == "update_delete" and column_family_mode == "single_cf":
    print(f"📗 Running: ingest_cdc_with_merge_single_family()")
    print(f"   - MERGE logic applied (UPDATE/DELETE processed)")
    print(f"   - No column family merging needed\n")
    
    if not primary_key_columns:
        raise ValueError("primary_key_columns required for update_delete mode")
    
    result = ingest_cdc_with_merge_single_family(
        storage_account_name=storage_account_name,
        container_name=container_name,
        source_catalog=source_catalog,
        source_schema=source_schema,
        source_table=source_table,
        target_catalog=target_catalog,
        target_schema=target_schema,
        target_table=target_table,
        primary_key_columns=primary_key_columns,
        spark=spark
    )
    
    query = result["query"]

elif cdc_mode == "update_delete" and column_family_mode == "multi_cf":
    print(f"📕 Running: ingest_cdc_with_merge_multi_family()")
    print(f"   - MERGE logic applied (UPDATE/DELETE processed)")
    print(f"   - Column family fragments will be merged\n")
    
    if not primary_key_columns:
        raise ValueError("primary_key_columns required for update_delete + multi_cf mode")
    
    result = ingest_cdc_with_merge_multi_family(
        storage_account_name=storage_account_name,
        container_name=container_name,
        source_catalog=source_catalog,
        source_schema=source_schema,
        source_table=source_table,
        target_catalog=target_catalog,
        target_schema=target_schema,
        target_table=target_table,
        primary_key_columns=primary_key_columns,
        spark=spark
    )
    
    query = result["query"]

else:
    raise ValueError(
        f"Invalid mode combination:\n"
        f"  cdc_mode='{cdc_mode}' (valid: 'append_only', 'update_delete')\n"
        f"  column_family_mode='{column_family_mode}' (valid: 'single_cf', 'multi_cf')\n"
        f"Change modes in Cell 1."
    )

# Wait for completion (if not already complete)
if cdc_mode == "append_only":
    query.awaitTermination()
    print("\n" + "=" * 80)
    print(f"✅ CDC INGESTION COMPLETE")
    print("=" * 80)
    print(f"   Mode: {cdc_mode} + {column_family_mode}")
    print(f"   Target: {target_catalog}.{target_schema}.{target_table}")
    print()
    print(f"📊 Query your data: SELECT * FROM {target_catalog}.{target_schema}.{target_table}")
else:
    # update_delete mode already completed inside the function
    print(f"📊 Query your data: SELECT * FROM {target_catalog}.{target_schema}.{target_table}")
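The `if`/`elif` chain above maps each `(cdc_mode, column_family_mode)` pair to one ingestion function and checks `primary_key_columns` where needed. A dictionary lookup is one way to keep that matrix in a single place. This sketch maps to function *names* so it runs without Spark; `INGESTION_MATRIX` and `select_ingestion_function` are hypothetical names, not part of the notebook.

```python
# (cdc_mode, column_family_mode) -> (function name, primary keys required?)
# The requirement flags mirror the checks in the ingestion cell.
INGESTION_MATRIX = {
    ("append_only", "single_cf"): ("ingest_cdc_append_only_single_family", False),
    ("append_only", "multi_cf"): ("ingest_cdc_append_only_multi_family", True),
    ("update_delete", "single_cf"): ("ingest_cdc_with_merge_single_family", True),
    ("update_delete", "multi_cf"): ("ingest_cdc_with_merge_multi_family", True),
}


def select_ingestion_function(cdc_mode, column_family_mode, primary_key_columns):
    """Return the ingestion function name for a mode pair, validating inputs."""
    try:
        func_name, needs_pk = INGESTION_MATRIX[(cdc_mode, column_family_mode)]
    except KeyError:
        raise ValueError(
            f"Invalid mode combination: cdc_mode={cdc_mode!r}, "
            f"column_family_mode={column_family_mode!r}"
        ) from None
    if needs_pk and not primary_key_columns:
        raise ValueError(
            f"primary_key_columns required for {cdc_mode} + {column_family_mode}"
        )
    return func_name


print(select_ingestion_function("update_delete", "single_cf", ["ycsb_key"]))
# ingest_cdc_with_merge_single_family
```

Mapping to the function objects instead of names would let a single call site pass the shared keyword arguments. The caller would still branch on mode afterwards, because the merge variants return a dict with a `"query"` key while the append-only variants return the query directly.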

In [None]:
# ============================================================================
# CELL 13: QUERY CDC RESULTS
# ============================================================================
target_table_fqn = f"{target_catalog}.{target_schema}.{target_table}"

# Get total count
df = spark.read.table(target_table_fqn)
total_count = df.count()

print("📊 CDC Event Summary")
print("=" * 80)
print(f"Total rows: {total_count}")
print(f"CDC Processing Mode: {cdc_mode}")
print(f"Column Family Mode: {column_family_mode}")
print()

# Show operation breakdown (works in both modes)
print("Rows by last CDC operation:")
ops_df = df.groupBy("_cdc_operation").count().orderBy("_cdc_operation")
ops_df.show()

print("\n📋 Sample rows (earliest 5 by _cdc_timestamp):")
df.select(
    "ycsb_key", 
    "field0", 
    "_cdc_operation", 
    "_cdc_timestamp"
).orderBy("_cdc_timestamp").show(5, truncate=False)

if cdc_mode == "append_only":
    print("\n✅ CDC data successfully loaded (append_only mode)")
    print("   📊 All CDC events stored as rows")
    print("   📊 _cdc_operation shows DELETE or UPSERT for each event")
    print("   📊 Row count = all events (including DELETEs and multiple UPDATEs)")
elif cdc_mode == "update_delete":
    print("\n✅ CDC data successfully loaded (update_delete mode)")
    print("   📊 MERGE operations applied: DELETEs removed, UPDATEs applied, INSERTs added")
    print("   📊 _cdc_operation shows UPSERT (last operation on each row)")
    print("   📊 Row count = current state (deduplicated)")

print("\n💡 Key Takeaway:")
print("   - Using pathGlobFilter to exclude .RESOLVED files avoids DECIMAL errors")
print("   - _cdc_operation is preserved in both modes for monitoring")
print("\n📍 Next: Run Cell 14 to verify source and target tables are in sync")
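The `_cdc_timestamp` column shown above is derived from CockroachDB's `__crdb__updated` column, which holds the commit timestamp as a decimal string: wall-clock nanoseconds since the Unix epoch, then a `.` and a logical counter. The pure-Python sketch below (hypothetical helper name) shows the conversion at microsecond resolution. The Spark code in the ingestion cell goes through `from_unixtime`, which works in whole seconds, so `_cdc_timestamp` there likely drops the sub-second part. Keeping full precision would be a change to evaluate; it is not what the notebook currently does.

```python
from datetime import datetime, timezone
from decimal import Decimal


def crdb_updated_to_datetime(updated: str) -> datetime:
    """Convert a '<wall_nanos>.<logical>' timestamp string to a UTC datetime.

    Only the wall-clock part (nanoseconds since the Unix epoch) is used;
    the logical counter after the '.' is discarded. Python datetimes have
    microsecond resolution, so the last three nanosecond digits are dropped.
    """
    wall_nanos = int(Decimal(updated))  # int() truncates the fractional part
    seconds, nanos = divmod(wall_nanos, 1_000_000_000)
    return datetime.fromtimestamp(seconds, tz=timezone.utc).replace(
        microsecond=nanos // 1000
    )


ts = crdb_updated_to_datetime("1700000000123456789.0000000000")
print(ts.isoformat())  # 2023-11-14T22:13:20.123456+00:00
```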

In [None]:
# ============================================================================
# CELL 14: VERIFY SOURCE AND TARGET ARE IN SYNC
# ============================================================================
print("🔍 Verifying source and target tables are in sync...")
print("=" * 80)

# Get source table stats (CockroachDB)
conn = get_cockroachdb_connection()
try:
    source_stats = get_table_stats(conn, source_table)
    source_sum = get_column_sum(conn, source_table, 'ycsb_key')
    
    # Get target table stats (Databricks Delta)
    target_table_fqn = f"{target_catalog}.{target_schema}.{target_table}"
    target_df = spark.read.table(target_table_fqn)
    
    # Calculate target stats using Spark
    from pyspark.sql import functions as F
    target_stats_df = target_df.agg(
        F.min("ycsb_key").alias("min_key"),
        F.max("ycsb_key").alias("max_key"),
        F.count("*").alias("count")
    ).collect()[0]
    
    target_stats = {
        'min_key': target_stats_df['min_key'],
        'max_key': target_stats_df['max_key'],
        'count': target_stats_df['count']
    }
    
    # Calculate sum using Spark helper function
    target_sum = get_column_sum_spark(target_df, 'ycsb_key')
    
    # Display comparison
    print(f"\n📊 Source Table (CockroachDB): {source_catalog}.{source_schema}.{source_table}")
    print(f"   Min key: {source_stats['min_key']}")
    print(f"   Max key: {source_stats['max_key']}")
    print(f"   Count:   {source_stats['count']}")
    print(f"   Sum (ycsb_key): {source_sum}")
    
    print(f"\n📊 Target Table (Databricks Delta): {target_table_fqn}")
    print(f"   Min key: {target_stats['min_key']}")
    print(f"   Max key: {target_stats['max_key']}")
    print(f"   Count:   {target_stats['count']}")
    print(f"   Sum (ycsb_key): {target_sum}")
    
    # Compare per-column sums for ycsb_key and field0-9 (YCSB schema)
    print("\n📊 Column Sums Comparison (All Fields):")
    print("-" * 80)
    
    columns_to_verify = ['ycsb_key', 'field0', 'field1', 'field2', 'field3',
                         'field4', 'field5', 'field6', 'field7', 'field8', 'field9']
    
    all_columns_match = True
    for col in columns_to_verify:
        try:
            source_col_sum = get_column_sum(conn, source_table, col)
            target_col_sum = get_column_sum_spark(target_df, col)
            col_matches = source_col_sum == target_col_sum
            match_icon = "✅" if col_matches else "❌"
            
            # Format with thousands separators; only None means NULL (0 is a valid sum)
            source_str = f"{source_col_sum:,}" if source_col_sum is not None else "NULL"
            target_str = f"{target_col_sum:,}" if target_col_sum is not None else "NULL"
            
            print(f"{match_icon} {col:12s}: Source={source_str:>20s} | Target={target_str:>20s}")
            
            if not col_matches:
                all_columns_match = False
                diff = (target_col_sum or 0) - (source_col_sum or 0)
                print(f"   ⚠️  Difference: {diff:+,}")
        except Exception as e:
            print(f"⚠️  {col:12s}: Error calculating sum - {e}")
            all_columns_match = False
    
    if all_columns_match:
        print("\n✅ All column sums match!")
    else:
        print("\n⚠️  Some column sums do not match - check data integrity")
    
finally:
    # Always close the connection, even if there's an error
    conn.close()

# Check CDC sync status (mode-aware verification)
print(f"\n{'=' * 80}")
print(f"Mode: {cdc_mode.upper()}")
print("=" * 80)

# Compare keys and counts
min_key_matches = source_stats['min_key'] == target_stats['min_key']
max_key_matches = source_stats['max_key'] == target_stats['max_key']
count_matches = source_stats['count'] == target_stats['count']
sum_matches = source_sum == target_sum

if cdc_mode == "append_only":
    # In append_only mode, max_key is the main indicator of sync.
    # Min key, count, and sum are expected to differ because DELETE and UPDATE
    # events are stored as additional rows instead of being applied.
    
    if max_key_matches:
        print("✅ CDC PIPELINE IS WORKING!")
        print(f"   Max key matches: {source_stats['max_key']}")
        print(f"   Sum (ycsb_key): Source={source_sum}, Target={target_sum} (may differ in append_only mode)")
        print()
        
        if not min_key_matches or not count_matches:
            print("📋 Append-Only Mode (Expected Differences):")
            if not min_key_matches:
                print(f"   ℹ️  Min key differs (Source={source_stats['min_key']}, Target={target_stats['min_key']})")
                print(f"      → This is EXPECTED: DELETE events are captured but not applied")
            if not count_matches:
                print(f"   ℹ️  Row count differs (Source={source_stats['count']}, Target={target_stats['count']})")
                print(f"      → This is EXPECTED: All CDC events (INSERT/UPDATE/DELETE) are stored as rows")
            print()
            print("💡 To apply deletes and updates:")
            print("   - Change cdc_mode='update_delete' in Cell 1")
            print("   - Or manually deduplicate using SQL window functions")
    else:
        print("⚠️  MAX KEY MISMATCH - CDC may be lagging")
        print("\n   Key Statistics:")
        print(f"   - Source max: {source_stats['max_key']}")
        print(f"   - Target max: {target_stats['max_key']}")
        print(f"   - Difference: {source_stats['max_key'] - target_stats['max_key']:+d}")
        
        print("\n   💡 Possible reasons:")
        print("   - Auto Loader hasn't picked up all files yet (re-run Cell 12)")
        print("   - Changefeed files are still flushing to Azure (check with Cell 11)")
        print("   - Run diagnostic cell (Cell 15) to inspect target table")

elif cdc_mode == "update_delete":
    # In update_delete mode, ALL stats should match exactly
    # DELETE operations are applied, UPDATE operations modify existing rows
    
    if min_key_matches and max_key_matches and count_matches and sum_matches:
        print("✅ CDC PIPELINE IS WORKING PERFECTLY!")
        print("   All statistics match:")
        print(f"   ✅ Min key: {source_stats['min_key']}")
        print(f"   ✅ Max key: {source_stats['max_key']}")
        print(f"   ✅ Count:   {source_stats['count']}")
        print(f"   ✅ Sum:     {source_sum}")
        print()
        print("📋 Update-Delete Mode:")
        print("   ✅ DELETE events are applied (rows removed)")
        print("   ✅ UPDATE events are applied (rows modified)")
        print("   ✅ INSERT events are applied (rows added)")
    else:
        print("⚠️  SYNC MISMATCH - Tables are out of sync")
        print("\n   Key Statistics:")
        if not min_key_matches:
            print(f"   ❌ Min key: Source={source_stats['min_key']}, Target={target_stats['min_key']}")
        else:
            print(f"   ✅ Min key: {source_stats['min_key']}")
        
        if not max_key_matches:
            print(f"   ❌ Max key: Source={source_stats['max_key']}, Target={target_stats['max_key']}")
        else:
            print(f"   ✅ Max key: {source_stats['max_key']}")
        
        if not count_matches:
            print(f"   ❌ Count: Source={source_stats['count']}, Target={target_stats['count']}")
            print(f"      Difference: {target_stats['count'] - source_stats['count']:+d} rows")
        else:
            print(f"   ✅ Count: {source_stats['count']}")
        
        if not sum_matches:
            print(f"   ❌ Sum: Source={source_sum}, Target={target_sum}")
            print(f"      Difference: {target_sum - source_sum:+d}")
        else:
            print(f"   ✅ Sum: {source_sum}")
        
        print("\n   💡 Possible reasons:")
        print("   - Auto Loader hasn't picked up all files yet (re-run Cell 12)")
        print("   - MERGE logic issue (check Cell 12 output for errors)")
        print("   - DELETE rows stored as data (run Cell 16 to fix)")
        print("   - Run diagnostic cell (Cell 15) to inspect target table")

else:
    print(f"⚠️  Unknown mode: {cdc_mode}")
    print("   Cannot verify sync status")
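The append-only branch above suggests deduplicating with window functions to recover current state. The sketch below replays a small, made-up list of CDC events in memory to show what each mode keeps. `append_only` keeps every event. `current_state` keeps the latest event per key and drops keys whose latest event is a DELETE, which is the result `update_delete` aims for. The event tuples and helper names are illustrative only. In Spark SQL the same idea is commonly written with `row_number()` over a window partitioned by the primary key and ordered by `_cdc_timestamp` descending.

```python
from typing import Dict, List, Optional, Tuple

# (ycsb_key, _cdc_operation, _cdc_timestamp, field0): hypothetical events
Event = Tuple[int, str, int, Optional[str]]

events: List[Event] = [
    (1, "UPSERT", 100, "snapshot_1"),
    (2, "UPSERT", 100, "snapshot_2"),
    (3, "UPSERT", 100, "snapshot_3"),
    (2, "UPSERT", 200, "updated"),
    (1, "DELETE", 300, None),
    (4, "UPSERT", 300, "inserted_4"),
]


def append_only(evts: List[Event]) -> List[Event]:
    """append_only mode: every event becomes a row."""
    return list(evts)


def current_state(evts: List[Event]) -> Dict[int, Event]:
    """Keep the latest event per key; drop keys whose latest event is a DELETE."""
    latest: Dict[int, Event] = {}
    for event in sorted(evts, key=lambda e: e[2]):  # oldest to newest
        latest[event[0]] = event
    return {key: e for key, e in latest.items() if e[1] != "DELETE"}


print(len(append_only(events)))          # 6 rows: one per event
print(sorted(current_state(events)))     # [2, 3, 4]: live keys only
print(min(e[0] for e in append_only(events)), min(current_state(events)))  # 1 2
```

The last line reproduces the "min key differs" case from the append-only check: the deleted key 1 still appears as a DELETE row in the append-only table but is absent from the current state.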


In [None]:
# ============================================================================
# CELL 2: CONFIGURATION
# ============================================================================
import json
import os
from urllib.parse import quote

# Configuration file path (adjust as needed)
config_file = "/Users/robert.lee/github/lakeflow-community-connectors/sources/cockroachdb/.env/cockroachdb_cdc_tutorial_config.json"

# Try to load from file, fallback to embedded config
try:
    with open(config_file, 'r') as f:
        config = json.load(f)
    print(f"✅ Configuration loaded from: {config_file}")
except Exception as e:
    print(f"ℹ️  Using embedded configuration (config file error: {e})")
    config = None

# Embedded configuration (fallback)
if config is None:
    config = {
      "cockroachdb": {
        "host": "replace_me",
        "port": 26257,
        "user": "replace_me",
        "password": "replace_me",
        "database": "defaultdb"
      },
      "cockroachdb_source": {
        "catalog": "defaultdb",
        "schema": "public",
        "table_name": "usertable",
      },
      "azure_storage": {
        "account_name": "replace_me",
        "account_key": "replace_me",
        "container_name": "changefeed-events"
      },
      "databricks_target": {
        "catalog": "main",
        "schema": "replace_me",
        "table_name": "usertable",
      },
      "cdc_config": {
        "mode": "append_only",
        "column_family_mode": "multi_cf",
        "primary_key_columns": ["ycsb_key"],
        "auto_suffix_mode_family": True,
      },
      "workload_config": {
        "snapshot_count": 10,
        "insert_count": 10,
        "update_count": 9,
        "delete_count": 8,
      }
    }
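Invalid mode values in this configuration only surface later, when the ingestion cell raises `ValueError`. One option is a small early check. `validate_cdc_config` below is a hypothetical helper; it checks the same allowed values and primary-key requirements that the ingestion cell enforces.

```python
VALID_CDC_MODES = {"append_only", "update_delete"}
VALID_CF_MODES = {"single_cf", "multi_cf"}


def validate_cdc_config(cdc_config: dict) -> list:
    """Return a list of problems found in config["cdc_config"] (empty if valid)."""
    problems = []
    mode = cdc_config.get("mode")
    cf_mode = cdc_config.get("column_family_mode")
    if mode not in VALID_CDC_MODES:
        problems.append(f"mode={mode!r} (valid: {sorted(VALID_CDC_MODES)})")
    if cf_mode not in VALID_CF_MODES:
        problems.append(f"column_family_mode={cf_mode!r} (valid: {sorted(VALID_CF_MODES)})")
    # Every combination except append_only + single_cf needs primary keys
    needs_pk = not (mode == "append_only" and cf_mode == "single_cf")
    if needs_pk and not cdc_config.get("primary_key_columns"):
        problems.append("primary_key_columns is required for this mode combination")
    return problems


print(validate_cdc_config({"mode": "append_only", "column_family_mode": "multi_cf",
                           "primary_key_columns": ["ycsb_key"]}))  # []
```

Called right after `config` is built, raising if the returned list is non-empty would stop a misconfigured run before any changefeed or stream is created.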


In [None]:
from urllib.parse import quote

# Extract configuration values
cockroachdb_host = config["cockroachdb"]["host"]
cockroachdb_port = config["cockroachdb"]["port"]
cockroachdb_user = config["cockroachdb"]["user"]
cockroachdb_password = config["cockroachdb"]["password"]
cockroachdb_database = config["cockroachdb"]["database"]

source_catalog = config["cockroachdb_source"]["catalog"]
source_schema = config["cockroachdb_source"]["schema"]
source_table = config["cockroachdb_source"]["table_name"]

storage_account_name = config["azure_storage"]["account_name"]
storage_account_key = config["azure_storage"]["account_key"]
storage_account_key_encoded = quote(storage_account_key, safe='')
container_name = config["azure_storage"]["container_name"]

target_catalog = config["databricks_target"]["catalog"]
target_schema = config["databricks_target"]["schema"]
target_table = config["databricks_target"]["table_name"]

cdc_mode = config["cdc_config"]["mode"]
column_family_mode = config["cdc_config"]["column_family_mode"]
primary_key_columns = config["cdc_config"]["primary_key_columns"]

snapshot_count = config["workload_config"]["snapshot_count"]
insert_count = config["workload_config"]["insert_count"]
update_count = config["workload_config"]["update_count"]
delete_count = config["workload_config"]["delete_count"]

# Auto-suffix table names with mode and column family if enabled
auto_suffix = config["cdc_config"].get("auto_suffix_mode_family", False)
if auto_suffix:
    suffix = f"_{cdc_mode}_{column_family_mode}"
    
    # Add suffix to source_table if not already present
    if not source_table.endswith(suffix):
        source_table = f"{source_table}{suffix}"
    
    # Add suffix to target_table if not already present
    if not target_table.endswith(suffix):
        target_table = f"{target_table}{suffix}"

print("✅ Configuration loaded")
print(f"   CDC Processing Mode: {cdc_mode}")
print(f"   Column Family Mode: {column_family_mode}")
print(f"   Primary Keys: {primary_key_columns}")
print(f"   Target Table: {target_table}")
print(f"   CDC Workload: {snapshot_count} snapshot → +{insert_count} INSERTs, ~{update_count} UPDATEs, -{delete_count} DELETEs")
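The auto-suffix step appends `_{cdc_mode}_{column_family_mode}` to both table names and skips names that already end with that suffix, so re-running the cell does not stack suffixes. The same rule as a standalone function (the name `with_mode_suffix` is hypothetical):

```python
def with_mode_suffix(table_name: str, cdc_mode: str, column_family_mode: str) -> str:
    """Append _{cdc_mode}_{column_family_mode} unless it is already present."""
    suffix = f"_{cdc_mode}_{column_family_mode}"
    return table_name if table_name.endswith(suffix) else f"{table_name}{suffix}"


name = with_mode_suffix("usertable", "append_only", "multi_cf")
print(name)                                                # usertable_append_only_multi_cf
print(with_mode_suffix(name, "append_only", "multi_cf"))  # unchanged on re-run
```

Because the suffixed names feed both the changefeed path and the checkpoint path (`/checkpoints/{target_schema}_{target_table}`), each mode combination gets its own files and checkpoints.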


In [None]:
# ============================================================================
# CELL 3: INSTALL DEPENDENCIES
# ============================================================================
%pip install pg8000 azure-storage-blob --quiet
print("✅ Dependencies installed")

In [None]:
# ============================================================================
# CELL 4: CONNECT TO COCKROACHDB
# ============================================================================
import pg8000
import ssl

def get_cockroachdb_connection():
    """Create connection to CockroachDB using pg8000"""
    # Create SSL context (TLS is required for CockroachDB Cloud).
    # Certificate and hostname verification are disabled for tutorial convenience only.
    ssl_context = ssl.create_default_context()
    ssl_context.check_hostname = False
    ssl_context.verify_mode = ssl.CERT_NONE
    
    # Parse host (in case port is accidentally included in host string)
    host = cockroachdb_host.split(':')[0] if ':' in cockroachdb_host else cockroachdb_host
    
    conn = pg8000.connect(
        user=cockroachdb_user,
        password=cockroachdb_password,
        host=host,
        port=cockroachdb_port,
        database=cockroachdb_database,
        ssl_context=ssl_context
    )
    return conn

# Test connection
try:
    conn = get_cockroachdb_connection()
    with conn.cursor() as cur:
        cur.execute("SELECT version()")
        version = cur.fetchone()[0]
    conn.close()
    
    print("✅ Connected to CockroachDB")
    print(f"   Version: {version[:50]}...")
except Exception as e:
    print(f"❌ Connection failed: {e}")
    raise
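The connection helper turns off certificate and hostname verification. If the cluster's CA certificate is available as a PEM file, a verifying context is a small change using only the standard `ssl` module. `make_ssl_context` and its `ca_cert_path` argument are assumptions for illustration, not part of the notebook.

```python
import ssl
from typing import Optional


def make_ssl_context(ca_cert_path: Optional[str] = None) -> ssl.SSLContext:
    """Build a TLS context that verifies the server certificate and hostname.

    With ca_cert_path=None the system trust store is used; otherwise only
    the given PEM CA certificate is loaded as the trust anchor.
    """
    # create_default_context sets verify_mode=CERT_REQUIRED and check_hostname=True
    return ssl.create_default_context(cafile=ca_cert_path)


ctx = make_ssl_context()
print(ctx.verify_mode == ssl.CERT_REQUIRED, ctx.check_hostname)  # True True
```

The result could be passed as `ssl_context=make_ssl_context("<path to CA cert>")` to `pg8000.connect`, which already receives an `ssl_context` argument in `get_cockroachdb_connection`.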

In [None]:
# ============================================================================
# CELL 5: HELPER FUNCTIONS (CockroachDB & Azure)
# ============================================================================
from azure.storage.blob import BlobServiceClient
from datetime import datetime
import time

def get_table_stats(conn, table_name):
    """
    Get min key, max key, and count for a table.
    
    Args:
        conn: Database connection
        table_name: Name of the table
    
    Returns:
        dict with 'min_key', 'max_key', 'count', 'is_empty'
    """
    with conn.cursor() as cur:
        cur.execute(f"SELECT MIN(ycsb_key), MAX(ycsb_key), COUNT(*) FROM {table_name}")
        result = cur.fetchone()
        min_key, max_key, count = result
        
        return {
            'min_key': min_key,
            'max_key': max_key,
            'count': count,
            'is_empty': min_key is None and max_key is None
        }


def check_azure_files(storage_account_name, storage_account_key, container_name, 
                      source_catalog, source_schema, source_table, target_table, 
                      verbose=True):
    """
    Check for changefeed files in Azure Blob Storage.
    
    Args:
        storage_account_name: Azure storage account name
        storage_account_key: Azure storage account key
        container_name: Azure container name
        source_catalog: CockroachDB catalog (database)
        source_schema: CockroachDB schema
        source_table: Source table name
        target_table: Target table name
        verbose: Print detailed output
    
    Returns:
        dict with 'data_files' and 'resolved_files' lists
    """
    # Connect to Azure
    connection_string = f"DefaultEndpointsProtocol=https;AccountName={storage_account_name};AccountKey={storage_account_key};EndpointSuffix=core.windows.net"
    blob_service = BlobServiceClient.from_connection_string(connection_string)
    container_client = blob_service.get_container_client(container_name)
    
    # Build path - list ALL files recursively under this changefeed path
    prefix = f"parquet/{source_catalog}/{source_schema}/{source_table}/{target_table}/"
    
    # List all blobs recursively (no date filtering)
    blobs = list(container_client.list_blobs(name_starts_with=prefix))
    
    # Categorize files (using same filtering logic as cockroachdb.py)
    # Data files: .parquet files, excluding:
    #   - .RESOLVED files (CDC watermarks)
    #   - _metadata/ directory (schema files)
    #   - Files starting with _ (_SUCCESS, _committed_*, etc.)
    data_files = [
        b for b in blobs 
        if b.name.endswith('.parquet') 
        and '.RESOLVED' not in b.name
        and '/_metadata/' not in b.name
        and not b.name.split('/')[-1].startswith('_')
    ]
    resolved_files = [b for b in blobs if '.RESOLVED' in b.name]
    
    if verbose:
        print(f"📁 Files in Azure changefeed path:")
        print(f"   Path: {prefix}")
        print(f"   📄 Data files: {len(data_files)}")
        print(f"   🕐 Resolved files: {len(resolved_files)}")
        print(f"   📊 Total: {len(blobs)}")
        
        if data_files:
            print(f"\n   Example data file:")
            print(f"   {data_files[0].name}")
    
    return {
        'data_files': data_files,
        'resolved_files': resolved_files,
        'total': len(blobs)
    }


def wait_for_changefeed_files(storage_account_name, storage_account_key, container_name,
                               source_catalog, source_schema, source_table, target_table,
                               max_wait=120, check_interval=5):
    """
    Wait for changefeed files to appear in Azure with timeout.
    
    Args:
        max_wait: Maximum seconds to wait (default: 120)
        check_interval: Seconds between checks (default: 5)
    
    Returns:
        True if files found, False if timeout
    """
    print(f"⏳ Waiting for initial snapshot files to appear in Azure...")
    
    elapsed = 0
    while elapsed < max_wait:
        result = check_azure_files(
            storage_account_name, storage_account_key, container_name,
            source_catalog, source_schema, source_table, target_table,
            verbose=False
        )
        
        if result['data_files']:
            print(f"✅ Files appeared after {elapsed} seconds!")
            print(f"   Found {len(result['data_files'])} data files")
            print(f"   Example: {result['data_files'][0].name}")
            return True
        
        print(f"   Checking... ({elapsed}s elapsed)", end='\r')
        time.sleep(check_interval)
        elapsed += check_interval
    
    print(f"\n⚠️  Timeout after {max_wait}s - files may still be generating")
    print(f"   Run Cell 9 to check manually")
    return False


print("✅ Helper functions loaded (CockroachDB & Azure)")
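`check_azure_files` separates data files from `.RESOLVED` watermark files using plain string rules. Extracted as pure functions (the names `is_data_file` and `classify_blob_names` are hypothetical), those rules can be checked without an Azure account. The sample paths are made up; they only follow the notebook's `parquet/<catalog>/<schema>/<source_table>/<target_table>/` prefix layout and do not reproduce CockroachDB's exact file naming.

```python
from typing import Dict, List


def is_data_file(name: str) -> bool:
    """Same rules as check_azure_files: a .parquet data file, not metadata."""
    file_name = name.split("/")[-1]
    return (
        name.endswith(".parquet")
        and ".RESOLVED" not in name
        and "/_metadata/" not in name
        and not file_name.startswith("_")
    )


def classify_blob_names(names: List[str]) -> Dict[str, List[str]]:
    """Split blob names into data files and .RESOLVED watermark files."""
    return {
        "data_files": [n for n in names if is_data_file(n)],
        "resolved_files": [n for n in names if ".RESOLVED" in n],
    }


prefix = "parquet/defaultdb/public/usertable/usertable/2024-01-01/"
sample = [
    prefix + "example-usertable-1.parquet",   # data file
    prefix + "example.RESOLVED",              # watermark
    prefix + "_metadata/schema.parquet",      # metadata, excluded
    prefix + "_SUCCESS",                      # marker, excluded
]
result = classify_blob_names(sample)
print(len(result["data_files"]), len(result["resolved_files"]))  # 1 1
```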

In [None]:
# ============================================================================
# CELL 6: DATABRICKS STREAMING MODES (CDC Ingestion Functions)
# ============================================================================
# This cell contains different CDC ingestion functions for various scenarios.
# Select the appropriate function based on your use case:
#
# 1. ingest_cdc_append_only_single_family() - Simple append_only (this tutorial)
# 2. ingest_cdc_with_merge_single_family() - Apply updates/deletes (implemented below)
# 3. TODO: ingest_cdc_append_only_multi_family() - Column families support
# 4. TODO: ingest_cdc_with_merge_multi_family() - Full CDC with column families
# ============================================================================

from pyspark.sql import functions as F


def ingest_cdc_append_only_single_family(
    storage_account_name, container_name, 
    source_catalog, source_schema, source_table, 
    target_catalog, target_schema, target_table,
    spark
):
    """
    Ingest CDC events in APPEND-ONLY mode for single column family tables.
    
    This function:
    - Reads Parquet CDC files from Azure using Auto Loader
    - Filters out .RESOLVED files and metadata
    - Transforms CockroachDB CDC columns (__crdb__*) to standard format
    - Writes all events (INSERT/UPDATE/DELETE) as rows to Delta table
    - Does NOT apply deletes or deduplicate updates (append_only)
    
    Use this for:
    - Audit logs and full history tracking
    - Tables WITHOUT column families (split_column_families=false)
    - Simple CDC pipelines without MERGE logic
    
    Args:
        storage_account_name: Azure storage account name
        container_name: Azure container name
        source_catalog: CockroachDB catalog (database)
        source_schema: CockroachDB schema
        source_table: Source table name
        target_catalog: Databricks catalog
        target_schema: Databricks schema
        target_table: Target table name
        spark: SparkSession
    
    Returns:
        StreamingQuery object
    """
    # Build paths
    source_path = f"abfss://{container_name}@{storage_account_name}.dfs.core.windows.net/parquet/{source_catalog}/{source_schema}/{source_table}/{target_table}"
    checkpoint_path = f"/checkpoints/{target_schema}_{target_table}"
    target_table_fqn = f"{target_catalog}.{target_schema}.{target_table}"
    
    print("📖 Ingesting CDC events (Append-Only Mode)")
    print("=" * 80)
    print(f"Mode: APPEND-ONLY (Single Column Family)")
    print(f"Source: {source_catalog}.{source_schema}.{source_table} (CockroachDB)")
    print(f"Target: {target_table_fqn} (Databricks Delta)")
    print(f"Source path: {source_path}/ (all dates, recursively)")
    print(f"File filter: *{source_table}*.parquet")
    print(f"   ✅ Includes: Data files")
    print(f"   ❌ Excludes: .RESOLVED, _metadata/, _SUCCESS, etc.")
    print()
    
    # Read with Auto Loader (production-grade filtering)
    raw_df = (spark.readStream
        .format("cloudFiles")
        .option("cloudFiles.format", "parquet")
        .option("cloudFiles.schemaLocation", f"{checkpoint_path}/schema")
        .option("pathGlobFilter", f"*{source_table}*.parquet")
        .option("recursiveFileLookup", "true")
        .load(source_path)
    )
    
    print("✅ Schema inferred from data files")
    print("   (Filtering matches cockroachdb.py production code)")
    print()
    
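    # Transform: CockroachDB CDC → Standard CDC format
    df = raw_df.select(
        "*",
        # Convert __crdb__updated (HLC string "<wall-clock nanoseconds>.<logical>")
        # to a timestamp with sub-second precision. from_unixtime() would truncate
        # to whole seconds, so events for one key in the same second would tie.
        F.timestamp_seconds(
            F.col("__crdb__updated").cast("double") / 1e9
        ).alias("_cdc_timestamp"),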
        # Map event type
        F.when(F.col("__crdb__event_type") == "d", "DELETE")
         .otherwise("UPSERT")
         .alias("_cdc_operation")
    ).drop("__crdb__updated", "__crdb__event_type")
    
    # Write to Delta table (append_only)
    query = (df.writeStream
        .format("delta")
        .option("checkpointLocation", f"{checkpoint_path}/data")
        .option("mergeSchema", "true")
        .trigger(availableNow=True)
        .toTable(target_table_fqn)
    )
    
    print("⏳ Processing CDC events...")
    return query


def ingest_cdc_with_merge_single_family(
    storage_account_name, container_name,
    source_catalog, source_schema, source_table,
    target_catalog, target_schema, target_table,
    primary_key_columns,  # NEW: Required for MERGE join condition
    spark
):
    """
    Ingest CDC events with MERGE logic for single column family tables.
    
    This function:
    - Reads Parquet CDC files from Azure using Auto Loader
    - Filters out .RESOLVED files and metadata
    - Transforms CockroachDB CDC columns (__crdb__*) to standard format
    - Deduplicates events by primary key in a batch step after streaming (keeps the latest event per key)
    - Applies MERGE logic to target Delta table:
      * UPDATE: When key exists and timestamp is newer
      * DELETE: When key exists and operation is DELETE
      * INSERT: When key doesn't exist and operation is UPSERT
    - Preserves _cdc_operation column for monitoring and observability
    
    Use this for:
    - Applications needing current state (not history)
    - Tables with a single column family (split_column_families=false)
    - Production CDC pipelines with UPDATE/DELETE support
    - Lower storage requirements (only latest state)
    
    Target table will contain:
    - All data columns from source
    - _cdc_operation: "UPSERT" (shows last operation on each row)
    - _cdc_timestamp: Timestamp of last CDC event
    
    Args:
        storage_account_name: Azure storage account name
        container_name: Azure container name
        source_catalog: CockroachDB catalog (database)
        source_schema: CockroachDB schema
        source_table: Source table name
        target_catalog: Databricks catalog
        target_schema: Databricks schema
        target_table: Target table name
        primary_key_columns: List of primary key column names (e.g., ['ycsb_key'])
        spark: SparkSession
    
    Returns:
        Dict with query, staging_table, target_table, raw_count, deduped_count, merged
    """
    from pyspark.sql import functions as F, Window
    from delta.tables import DeltaTable
    
    # Build paths
    source_path = f"abfss://{container_name}@{storage_account_name}.dfs.core.windows.net/parquet/{source_catalog}/{source_schema}/{source_table}/{target_table}"
    checkpoint_path = f"/checkpoints/{target_schema}_{target_table}_merge"
    target_table_fqn = f"{target_catalog}.{target_schema}.{target_table}"
    
    print("📖 Ingesting CDC events (MERGE Mode)")
    print("=" * 80)
    print(f"Mode: MERGE (Apply UPDATE/DELETE)")
    print(f"Source: {source_catalog}.{source_schema}.{source_table} (CockroachDB)")
    print(f"Target: {target_table_fqn} (Databricks Delta)")
    print(f"Primary keys: {primary_key_columns}")
    print(f"Source path: {source_path}/ (all dates, recursively)")
    print(f"File filter: *{source_table}*.parquet")
    print(f"   ✅ Includes: Data files")
    print(f"   ❌ Excludes: .RESOLVED, _metadata/, _SUCCESS, etc.")
    print()
    
    # Read with Auto Loader (production-grade filtering)
    raw_df = (spark.readStream
        .format("cloudFiles")
        .option("cloudFiles.format", "parquet")
        .option("cloudFiles.schemaLocation", f"{checkpoint_path}/schema")
        .option("pathGlobFilter", f"*{source_table}*.parquet")
        .option("recursiveFileLookup", "true")
        .load(source_path)
    )
    
    print("✅ Schema inferred from data files")
    print("   (Filtering matches cockroachdb.py production code)")
    print()
    
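    # Transform: CockroachDB CDC → Standard CDC format
    transformed_df = raw_df.select(
        "*",
        # Convert __crdb__updated (HLC string "<wall-clock nanoseconds>.<logical>")
        # to a timestamp with sub-second precision. from_unixtime() would truncate
        # to whole seconds, so same-second events for one key would tie during
        # deduplication and fail the MERGE "newer timestamp" check.
        F.timestamp_seconds(
            F.col("__crdb__updated").cast("double") / 1e9
        ).alias("_cdc_timestamp"),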
        # Map event type
        F.when(F.col("__crdb__event_type") == "d", "DELETE")
         .otherwise("UPSERT")
         .alias("_cdc_operation")
    ).drop("__crdb__event_type", "__crdb__updated")
    
    print("✅ CDC transformations applied (streaming compatible)")
    print("   ℹ️  Deduplication will happen in Stage 2 (batch mode)")
    print()
    
    # ========================================================================
    # STAGE 1: Stream to Staging Table (Serverless Compatible - No Python UDFs)
    # ========================================================================
    staging_table_fqn = f"{target_table_fqn}_staging"
    
    print("🔷 STAGE 1: Streaming to staging table (no Python UDFs)")
    print(f"   Staging: {staging_table_fqn}")
    print()
    
    # Write to staging table (pure Spark, no foreachBatch, no window functions)
    query = (transformed_df.writeStream
        .format("delta")
        .option("checkpointLocation", f"{checkpoint_path}/data")
        .option("mergeSchema", "true")
        .trigger(availableNow=True)
        .toTable(staging_table_fqn)  # ← No Python UDFs, Serverless compatible!
    )
    
    print("⏳ Streaming CDC events to staging table...")
    query.awaitTermination()
    print("✅ Stream completed\n")
    
    # ========================================================================
    # STAGE 2: Batch MERGE from Staging to Target (Runs on Driver)
    # ========================================================================
    print("🔷 STAGE 2: Applying MERGE logic (batch operation)")
    print(f"   Source: {staging_table_fqn}")
    print(f"   Target: {target_table_fqn}")
    print()
    
    # Read staging table (batch mode - window functions allowed!)
    staging_df_raw = spark.read.table(staging_table_fqn)
    staging_count_raw = staging_df_raw.count()
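    print(f"   📊 Raw staging events: {staging_count_raw}")

    if staging_count_raw == 0:
        print("   ℹ️  No new events to process")
        # Return the same keys as the normal path so callers can index safely
        return {"query": query, "staging_table": staging_table_fqn,
                "target_table": target_table_fqn, "raw_count": 0,
                "deduped_count": 0, "merged": 0}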
    
    # Deduplicate by primary key (batch mode, matches cockroachdb.py logic)
    # Keep only LATEST event per primary key based on timestamp
    print(f"   🔄 Deduplicating by primary keys: {primary_key_columns}...")
    window_spec = Window.partitionBy(*primary_key_columns).orderBy(F.col("_cdc_timestamp").desc())
    staging_df = (staging_df_raw
        .withColumn("_row_num", F.row_number().over(window_spec))
        .filter(F.col("_row_num") == 1)
        .drop("_row_num")
    )
    
    staging_count = staging_df.count()
    duplicates_removed = staging_count_raw - staging_count
    print(f"   ✅ Deduplicated: {staging_count} unique events ({duplicates_removed} duplicates removed)")
    
    if staging_count == 0:
        print("   ℹ️  All events were duplicates")
        return {"query": query, "staging_table": staging_table_fqn, "merged": 0}
    
    # Create target table if it doesn't exist
    if not spark.catalog.tableExists(target_table_fqn):
        print(f"   📝 Creating new target table: {target_table_fqn}")
        
        # CRITICAL: Proper DELETE handling for initial table creation
        # This matches cockroachdb.py reference implementation
        
        # 1. Get keys that have DELETE events
        delete_keys = staging_df.filter(F.col("_cdc_operation") == "DELETE") \
            .select(*primary_key_columns) \
            .distinct()
        delete_count = delete_keys.count()
        
        # 2. Get all non-DELETE rows
        active_rows = staging_df.filter(F.col("_cdc_operation") != "DELETE")
        active_count = active_rows.count()
        
        # 3. Exclude rows whose keys have a DELETE event (left anti join).
        #    Because staging_df keeps only the latest event per key, step 2 has
        #    already dropped deleted keys; this join is a defensive safeguard.
        rows_after_delete = active_rows.join(
            delete_keys,
            on=primary_key_columns,
            how="left_anti"
        )
        after_delete_count = rows_after_delete.count()
        
        # Note: staging_df is already deduplicated, so final_rows = rows_after_delete
        final_rows = rows_after_delete
        final_count = after_delete_count
        
        if delete_count > 0:
            print(f"   ℹ️  Found {delete_count} DELETE events")
            print(f"   ℹ️  Active rows before DELETE: {active_count}")
            print(f"   ℹ️  Active rows after DELETE: {after_delete_count}")
            print(f"   ℹ️  Rows removed by DELETE: {active_count - after_delete_count}")
        
        # Keep ALL columns including _cdc_operation for monitoring
        final_rows.write.format("delta").saveAsTable(target_table_fqn)
        merged_count = final_count
        print(f"   ✅ Created table with {merged_count} initial rows")
        print(f"      Schema includes _cdc_operation for observability\n")
    else:
        # Get Delta table and apply MERGE
        delta_table = DeltaTable.forName(spark, target_table_fqn)
        
        # Check if _cdc_operation exists in target (might be missing from old tables)
        target_columns = set(spark.read.table(target_table_fqn).columns)
        if "_cdc_operation" not in target_columns:
            print(f"   ⚠️  Target table missing _cdc_operation column (old schema)")
            print(f"   🔧 Adding _cdc_operation column for observability...")
            spark.sql(f"""
                ALTER TABLE {target_table_fqn} 
                ADD COLUMN _cdc_operation STRING
            """)
            print(f"   ✅ Column added\n")
        
        # Build join condition dynamically
        join_condition = " AND ".join([f"target.{col} = source.{col}" for col in primary_key_columns])
        
        # Get all data columns (KEEP _cdc_operation for observability!)
        data_columns = [col for col in staging_df.columns]
        
        # Build UPDATE/INSERT clauses dynamically
        update_set = {col: f"source.{col}" for col in data_columns}
        insert_values = {col: f"source.{col}" for col in data_columns}
        
        print(f"   🔄 Executing MERGE...")
        print(f"      Join: {join_condition}")
        print(f"      ℹ️  _cdc_operation will be preserved for monitoring")
        
        # Apply MERGE (runs on driver, not workers)
        (delta_table.alias("target").merge(
            staging_df.alias("source"),
            join_condition
        )
        .whenMatchedUpdate(
            condition="source._cdc_operation = 'UPSERT' AND source._cdc_timestamp > target._cdc_timestamp",
            set=update_set
        )
        .whenMatchedDelete(
            condition="source._cdc_operation = 'DELETE'"
        )
        .whenNotMatchedInsert(
            condition="source._cdc_operation = 'UPSERT'",
            values=insert_values
        )
        .execute())
        
        merged_count = staging_count
        print(f"   ✅ MERGE complete: processed {merged_count} events\n")
    
    print("=" * 80)
    print("✅ CDC INGESTION COMPLETE (TWO-STAGE MERGE)")
    print("=" * 80)
    print(f"📊 Raw events: {staging_count_raw}")
    print(f"📊 After deduplication: {staging_count} unique events")
    print(f"📊 Staging table: {staging_table_fqn}")
    print(f"📊 Target table:  {target_table_fqn}")
    print()
    print("📋 Target table includes:")
    print("   - All data columns from source")
    print("   - _cdc_operation: UPSERT (for monitoring)")
    print("   - _cdc_timestamp: Last CDC event timestamp")
    print()
    print("💡 TIP: Staging table can be dropped after successful MERGE:")
    print(f"   spark.sql('DROP TABLE IF EXISTS {staging_table_fqn}')")
    
    return {
        "query": query,
        "staging_table": staging_table_fqn,
        "target_table": target_table_fqn,
        "raw_count": staging_count_raw,
        "deduped_count": staging_count,
        "merged": merged_count
    }


def merge_column_family_fragments(df, primary_key_columns, spark):
    """
    Merge column family fragments into complete rows (streaming-compatible).
    
    When split_column_families=true, CockroachDB creates multiple CDC events per row update,
    one for each column family. This function merges these fragments by:
    1. Grouping by (primary_key + _cdc_timestamp + _cdc_operation)
    2. Using first(col, ignorenulls=True) to coalesce NULL values from different fragments
    
    Each fragment has:
    - Primary key columns (always present)
    - Data for ONE column family (other columns are NULL)
    - Same _cdc_timestamp and _cdc_operation
    
    Args:
        df: Spark DataFrame with potential column family fragments
        primary_key_columns: List of primary key column names (e.g., ['ycsb_key'])
        spark: SparkSession
    
    Returns:
        Merged DataFrame with complete rows
    """
    from pyspark.sql import functions as F
    
    # Get all columns
    all_columns = df.columns
    
    # Metadata columns to preserve (not aggregate as data)
    metadata_columns = {'_cdc_operation', '_cdc_timestamp', '_rescued_data'}
    
    # Data columns = all columns except PK and metadata
    data_columns = [
        col for col in all_columns
        if col not in primary_key_columns 
        and col not in metadata_columns
    ]
    
    # Group by: PK + timestamp + operation (preserves all distinct CDC events)
    group_by_cols = primary_key_columns + ['_cdc_timestamp', '_cdc_operation']
    
    # Build aggregation expressions
    # Use first(col, ignorenulls=True) to merge NULL values from fragments
    agg_exprs = [
        F.first(col, ignorenulls=True).alias(col) 
        for col in data_columns
    ]
    
    # Add metadata columns that aren't in the grouping key
    for col in metadata_columns:
        if col in all_columns and col not in group_by_cols:
            agg_exprs.append(F.first(col, ignorenulls=True).alias(col))
    
    # Apply merge
    df_merged = df.groupBy(*group_by_cols).agg(*agg_exprs)
    
    return df_merged


def ingest_cdc_append_only_multi_family(
    storage_account_name, container_name,
    source_catalog, source_schema, source_table,
    target_catalog, target_schema, target_table,
    primary_key_columns,
    spark
):
    """
    Ingest CDC events in APPEND-ONLY mode with COLUMN FAMILY support.
    
    **Two-Stage Approach (Serverless Compatible)**:
    - Stage 1: Stream raw CDC events to staging table (no aggregations)
    - Stage 2: Batch merge column family fragments to target table
    
    This function:
    - Reads Parquet CDC files from Azure using Auto Loader
    - Filters out .RESOLVED files and metadata
    - Transforms CockroachDB CDC columns (__crdb__*) to standard format
    - MERGES column family fragments (split_column_families=true) in batch mode
    - Writes all events (INSERT/UPDATE/DELETE) as rows to Delta table
    - Does NOT apply deletes or deduplicate updates (append_only)
    
    Use this for:
    - Audit logs with column family tables
    - Tables WITH column families (split_column_families=true)
    - Full history tracking with wide tables
    
    Args:
        storage_account_name: Azure storage account name
        container_name: Azure container name
        source_catalog: CockroachDB catalog (database)
        source_schema: CockroachDB schema
        source_table: Source table name
        target_catalog: Databricks catalog
        target_schema: Databricks schema
        target_table: Target table name
        primary_key_columns: List of primary key column names (required for fragment merging)
        spark: SparkSession
    
    Returns:
        StreamingQuery object
    """
    from pyspark.sql import functions as F  # F is used in the transform below

    # Build paths
    source_path = f"abfss://{container_name}@{storage_account_name}.dfs.core.windows.net/parquet/{source_catalog}/{source_schema}/{source_table}/{target_table}"
    checkpoint_path = f"/checkpoints/{target_schema}_{target_table}"
    target_table_fqn = f"{target_catalog}.{target_schema}.{target_table}"
    
    print("📖 Ingesting CDC events (Append-Only + Column Families)")
    print("=" * 80)
    print(f"Mode: APPEND-ONLY (Multi Column Family)")
    print(f"Source: {source_catalog}.{source_schema}.{source_table} (CockroachDB)")
    print(f"Target: {target_table_fqn} (Databricks Delta)")
    print(f"Primary keys: {primary_key_columns}")
    print(f"Source path: {source_path}/ (all dates, recursively)")
    print(f"File filter: *{source_table}*.parquet")
    print()
    
    # Read with Auto Loader
    raw_df = (spark.readStream
        .format("cloudFiles")
        .option("cloudFiles.format", "parquet")
        .option("cloudFiles.schemaLocation", f"{checkpoint_path}/schema")
        .option("pathGlobFilter", f"*{source_table}*.parquet")
        .option("recursiveFileLookup", "true")
        .load(source_path)
    )
    
    print("✅ Schema inferred from data files")
    print()
    
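    # Transform: CockroachDB CDC → Standard CDC format
    transformed_df = raw_df.select(
        "*",
        # __crdb__updated is "<wall-clock nanoseconds>.<logical>"; keep sub-second
        # precision so separate same-second events are not grouped as fragments.
        F.timestamp_seconds(
            F.col("__crdb__updated").cast("double") / 1e9
        ).alias("_cdc_timestamp"),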
        F.when(F.col("__crdb__event_type") == "d", "DELETE")
         .otherwise("UPSERT")
         .alias("_cdc_operation")
    ).drop("__crdb__event_type", "__crdb__updated")
    
    print("✅ CDC transformations applied (streaming compatible)")
    print("   ℹ️  Column family merge will happen in Stage 2 (batch mode)")
    print()
    
    # ========================================================================
    # STAGE 1: Stream to Staging Table (Serverless Compatible - No Aggregations)
    # ========================================================================
    staging_table_fqn = f"{target_table_fqn}_staging_cf"
    
    print("🔷 STAGE 1: Streaming to staging table (no aggregations)")
    print(f"   Staging: {staging_table_fqn}")
    print()
    
    # Write to staging table (pure Spark, no aggregations)
    query = (transformed_df.writeStream
        .format("delta")
        .option("checkpointLocation", f"{checkpoint_path}/data")
        .option("mergeSchema", "true")
        .trigger(availableNow=True)
        .toTable(staging_table_fqn)  # ← No aggregations, Serverless compatible!
    )
    
    print("⏳ Streaming CDC events to staging table...")
    query.awaitTermination()
    print("✅ Stream completed\n")
    
    # ========================================================================
    # STAGE 2: Merge Column Families in Batch Mode
    # ========================================================================
    print("🔷 STAGE 2: Merging column family fragments (batch mode)")
    print(f"   Reading from staging: {staging_table_fqn}")
    print(f"   Writing to target: {target_table_fqn}")
    print()
    
    # Read staging table in batch mode
    staging_df = spark.table(staging_table_fqn)
    
    # Merge column family fragments (batch mode - no streaming limitations!)
    print("🔧 Merging column family fragments...")
    print(f"   Grouping by: {primary_key_columns} + _cdc_timestamp + _cdc_operation")
    print(f"   Using first(col, ignorenulls=True) to coalesce fragments")
    merged_df = merge_column_family_fragments(staging_df, primary_key_columns, spark)
    print("✅ Column family fragments merged")
    print()
    
    # Write to final target table (batch mode, append_only)
    print(f"💾 Writing merged events to {target_table_fqn}...")
    merged_df.write.format("delta").mode("append").saveAsTable(target_table_fqn)
    print("✅ Append-only write complete")
    print()
    
    # Clean up staging table
    spark.sql(f"DROP TABLE IF EXISTS {staging_table_fqn}")
    print(f"🧹 Staging table dropped: {staging_table_fqn}")
    print()
    
    return query


def ingest_cdc_with_merge_multi_family(
    storage_account_name, container_name,
    source_catalog, source_schema, source_table,
    target_catalog, target_schema, target_table,
    primary_key_columns,
    spark
):
    """
    Ingest CDC events with MERGE logic and COLUMN FAMILY support.
    
    **Two-Stage Approach (Serverless Compatible)**:
    - Stage 1: Stream raw CDC events to staging table (no aggregations)
    - Stage 2: Batch merge column families + deduplicate + MERGE to target
    
    This function:
    - Reads Parquet CDC files from Azure using Auto Loader
    - Filters out .RESOLVED files and metadata
    - Transforms CockroachDB CDC columns (__crdb__*) to standard format
    - MERGES column family fragments (split_column_families=true) in batch mode
    - Streams to staging table (Serverless-compatible)
    - Deduplicates by primary key in batch mode
    - Applies MERGE logic to target Delta table
    
    Use this for:
    - Current state replication with column families
    - Tables WITH column families (split_column_families=true)
    - Production CDC with UPDATE/DELETE support
    
    Target table will contain:
    - All data columns from source
    - _cdc_operation: "UPSERT" (shows last operation)
    - _cdc_timestamp: Timestamp of last CDC event
    
    Args:
        storage_account_name: Azure storage account name
        container_name: Azure container name
        source_catalog: CockroachDB catalog (database)
        source_schema: CockroachDB schema
        source_table: Source table name
        target_catalog: Databricks catalog
        target_schema: Databricks schema
        target_table: Target table name
        primary_key_columns: List of primary key column names (required for fragments + MERGE)
        spark: SparkSession
    
    Returns:
        Dict with query, staging_table, target_table, raw_count, deduped_count, merged
    """
    from pyspark.sql import functions as F, Window
    from delta.tables import DeltaTable
    
    # Build paths
    source_path = f"abfss://{container_name}@{storage_account_name}.dfs.core.windows.net/parquet/{source_catalog}/{source_schema}/{source_table}/{target_table}"
    checkpoint_path = f"/checkpoints/{target_schema}_{target_table}_merge_cf"
    target_table_fqn = f"{target_catalog}.{target_schema}.{target_table}"
    
    print("📖 Ingesting CDC events (MERGE + Column Families)")
    print("=" * 80)
    print(f"Mode: MERGE with Column Families")
    print(f"Source: {source_catalog}.{source_schema}.{source_table} (CockroachDB)")
    print(f"Target: {target_table_fqn} (Databricks Delta)")
    print(f"Primary keys: {primary_key_columns}")
    print(f"Source path: {source_path}/ (all dates, recursively)")
    print()
    
    # Read with Auto Loader
    raw_df = (spark.readStream
        .format("cloudFiles")
        .option("cloudFiles.format", "parquet")
        .option("cloudFiles.schemaLocation", f"{checkpoint_path}/schema")
        .option("pathGlobFilter", f"*{source_table}*.parquet")
        .option("recursiveFileLookup", "true")
        .load(source_path)
    )
    
    print("✅ Schema inferred from data files")
    print()
    
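    # Transform: CockroachDB CDC → Standard CDC format
    transformed_df = raw_df.select(
        "*",
        # __crdb__updated is "<wall-clock nanoseconds>.<logical>"; keep sub-second
        # precision (from_unixtime() would truncate to seconds and make
        # same-second events for one key tie or merge as fragments).
        F.timestamp_seconds(
            F.col("__crdb__updated").cast("double") / 1e9
        ).alias("_cdc_timestamp"),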
        F.when(F.col("__crdb__event_type") == "d", "DELETE")
         .otherwise("UPSERT")
         .alias("_cdc_operation")
    ).drop("__crdb__event_type", "__crdb__updated")
    
    print("✅ CDC transformations applied (streaming compatible)")
    print("   ℹ️  Column family merge will happen in Stage 2 (batch mode)")
    print()
    
    # ========================================================================
    # STAGE 1: Stream to Staging Table (Serverless Compatible - No Aggregations)
    # ========================================================================
    staging_table_fqn = f"{target_table_fqn}_staging_cf"
    
    print("🔷 STAGE 1: Streaming to staging table (no aggregations)")
    print(f"   Staging: {staging_table_fqn}")
    print()
    
    query = (transformed_df.writeStream  # ← Stream raw events (no column family merge yet)
        .format("delta")
        .option("checkpointLocation", f"{checkpoint_path}/data")
        .option("mergeSchema", "true")
        .trigger(availableNow=True)
        .toTable(staging_table_fqn)  # ← No aggregations, Serverless compatible!
    )
    
    print("⏳ Streaming CDC events to staging table...")
    query.awaitTermination()
    print("✅ Stream completed\n")
    
    # ========================================================================
    # STAGE 2: Batch MERGE from Staging to Target
    # ========================================================================
    print("🔷 STAGE 2: Applying MERGE logic (batch operation)")
    print(f"   Source: {staging_table_fqn}")
    print(f"   Target: {target_table_fqn}")
    print()
    
    # Read staging table (batch mode)
    staging_df_raw = spark.read.table(staging_table_fqn)
    staging_count_raw = staging_df_raw.count()
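    print(f"   📊 Raw staging events: {staging_count_raw}")

    if staging_count_raw == 0:
        print("   ℹ️  No new events to process")
        # Return the same keys as the normal path so callers can index safely
        return {"query": query, "staging_table": staging_table_fqn,
                "target_table": target_table_fqn, "raw_count": 0,
                "deduped_count": 0, "merged": 0}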
    
    # Merge column family fragments (batch mode - no streaming limitations!)
    print(f"   🔧 Merging column family fragments...")
    print(f"      Grouping by: {primary_key_columns} + _cdc_timestamp + _cdc_operation")
    staging_df_merged = merge_column_family_fragments(staging_df_raw, primary_key_columns, spark)
    print(f"   ✅ Column family fragments merged")

    # Deduplicate by primary key (keep latest event)
    print(f"   🔄 Deduplicating by primary keys: {primary_key_columns}...")
    window_spec = Window.partitionBy(*primary_key_columns).orderBy(F.col("_cdc_timestamp").desc())
    staging_df = (staging_df_merged  # ‚Üê Use merged DataFrame
        .withColumn("_row_num", F.row_number().over(window_spec))
        .filter(F.col("_row_num") == 1)
        .drop("_row_num")
    )
    
    staging_count = staging_df.count()
    merged_event_count = staging_df_merged.count()  # compute once, reuse below
    fragments_removed = staging_count_raw - merged_event_count
    duplicates_removed = merged_event_count - staging_count
    print(f"   ✅ Column family fragments coalesced: {fragments_removed} fragments merged")
    print(f"   ✅ Deduplicated: {staging_count} unique events ({duplicates_removed} duplicates removed)")
    
    if staging_count == 0:
        print("   ℹ️  All events were duplicates")
        return {"query": query, "staging_table": staging_table_fqn, "merged": 0}
    
    # Create target table if it doesn't exist
    if not spark.catalog.tableExists(target_table_fqn):
        print(f"   📝 Creating new target table: {target_table_fqn}")
        
        # CRITICAL: Proper DELETE handling for initial table creation
        # This matches cockroachdb.py reference implementation
        
        # 1. Get keys that have DELETE events
        delete_keys = staging_df.filter(F.col("_cdc_operation") == "DELETE") \
            .select(*primary_key_columns) \
            .distinct()
        delete_count = delete_keys.count()
        
        # 2. Get all non-DELETE rows
        active_rows = staging_df.filter(F.col("_cdc_operation") != "DELETE")
        active_count = active_rows.count()
        
        # 3. Exclude rows whose keys have a DELETE event (left anti join).
        #    Because staging_df keeps only the latest event per key, step 2 has
        #    already dropped deleted keys; this join is a defensive safeguard.
        rows_after_delete = active_rows.join(
            delete_keys,
            on=primary_key_columns,
            how="left_anti"
        )
        after_delete_count = rows_after_delete.count()
        
        # Note: staging_df is already deduplicated, so final_rows = rows_after_delete
        final_rows = rows_after_delete
        final_count = after_delete_count
        
        if delete_count > 0:
            print(f"   ℹ️  Found {delete_count} DELETE events")
            print(f"   ℹ️  Active rows before DELETE: {active_count}")
            print(f"   ℹ️  Active rows after DELETE: {after_delete_count}")
            print(f"   ℹ️  Rows removed by DELETE: {active_count - after_delete_count}")
        
        final_rows.write.format("delta").saveAsTable(target_table_fqn)
        merged_count = final_count
        print(f"   ✅ Created table with {merged_count} initial rows")
        print(f"      Schema includes _cdc_operation for observability\n")
    else:
        # Get Delta table and apply MERGE
        delta_table = DeltaTable.forName(spark, target_table_fqn)
        
        # Check if _cdc_operation exists in target
        target_columns = set(spark.read.table(target_table_fqn).columns)
        if "_cdc_operation" not in target_columns:
            print(f"   ⚠️  Target table missing _cdc_operation column")
            print(f"   🔧 Adding _cdc_operation column...")
            spark.sql(f"ALTER TABLE {target_table_fqn} ADD COLUMN _cdc_operation STRING")
            print(f"   ✅ Column added\n")
        
        # Build join condition
        join_condition = " AND ".join([f"target.{col} = source.{col}" for col in primary_key_columns])
        
        # Get all columns
        data_columns = [col for col in staging_df.columns]
        update_set = {col: f"source.{col}" for col in data_columns}
        insert_values = {col: f"source.{col}" for col in data_columns}
        
        print(f"   🔄 Executing MERGE...")
        print(f"      Join: {join_condition}")
        print(f"      ℹ️  _cdc_operation preserved for monitoring")
        
        # Apply MERGE
        (delta_table.alias("target").merge(
            staging_df.alias("source"),
            join_condition
        )
        .whenMatchedUpdate(
            condition="source._cdc_operation = 'UPSERT' AND source._cdc_timestamp > target._cdc_timestamp",
            set=update_set
        )
        .whenMatchedDelete(
            condition="source._cdc_operation = 'DELETE'"
        )
        .whenNotMatchedInsert(
            condition="source._cdc_operation = 'UPSERT'",
            values=insert_values
        )
        .execute())
        
        merged_count = staging_count
        print(f"   ✅ MERGE complete: processed {merged_count} events\n")
    
    print("=" * 80)
    print("✅ CDC INGESTION COMPLETE (MERGE + COLUMN FAMILIES)")
    print("=" * 80)
    print(f"📊 Raw events: {staging_count_raw}")
    print(f"📊 After deduplication: {staging_count} unique events")
    print(f"📊 Staging table: {staging_table_fqn}")
    print(f"📊 Target table:  {target_table_fqn}")
    print()
    print("💡 TIP: Staging table can be dropped after successful MERGE:")
    print(f"   spark.sql('DROP TABLE IF EXISTS {staging_table_fqn}')")
    
    return {
        "query": query,
        "staging_table": staging_table_fqn,
        "target_table": target_table_fqn,
        "raw_count": staging_count_raw,
        "deduped_count": staging_count,
        "merged": merged_count
    }


print("✅ Databricks streaming modes loaded (4 functions available)")
print("   1. ingest_cdc_append_only_single_family (append_only, no column families)")
print("   2. ingest_cdc_with_merge_single_family (update_delete, no column families)")
print("   3. ingest_cdc_append_only_multi_family (append_only, WITH column families)")
print("   4. ingest_cdc_with_merge_multi_family (update_delete, WITH column families)")
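
The three MERGE clauses at the end of `ingest_cdc_with_merge_multi_family` can be modelled in plain Python to reason about edge cases without a cluster. This is a minimal sketch, assuming the staging rows are already deduplicated to one event per key and carry `_cdc_operation` and `_cdc_timestamp`; `apply_merge` is a hypothetical helper, not part of the notebook, and integer timestamps stand in for the real values.

```python
from typing import Any, Dict, List, Tuple

Row = Dict[str, Any]


def apply_merge(target: Dict[Tuple, Row], staging: List[Row], pk: List[str]) -> Dict[Tuple, Row]:
    """Return a new in-memory 'table' after applying the notebook's MERGE rules."""
    result = dict(target)
    for src in staging:
        key = tuple(src[c] for c in pk)
        op = src["_cdc_operation"]
        current = result.get(key)
        if current is not None:
            if op == "UPSERT" and src["_cdc_timestamp"] > current["_cdc_timestamp"]:
                # whenMatchedUpdate: only a newer UPSERT overwrites the row
                result[key] = dict(src)
            elif op == "DELETE":
                # whenMatchedDelete: a DELETE removes the row (no timestamp check)
                del result[key]
            # An older UPSERT matches no clause and is ignored
        elif op == "UPSERT":
            # whenNotMatchedInsert: unseen keys are inserted only for UPSERT events
            result[key] = dict(src)
        # A DELETE for a key that is not in the target matches no clause
    return result


# Key 1 is updated, key 2 inserted, key 3 deleted, key 4 receives a stale update
target = {
    (1,): {"ycsb_key": 1, "field0": "a", "_cdc_operation": "UPSERT", "_cdc_timestamp": 10},
    (3,): {"ycsb_key": 3, "field0": "c", "_cdc_operation": "UPSERT", "_cdc_timestamp": 10},
    (4,): {"ycsb_key": 4, "field0": "d", "_cdc_operation": "UPSERT", "_cdc_timestamp": 30},
}
staging = [
    {"ycsb_key": 1, "field0": "a2", "_cdc_operation": "UPSERT", "_cdc_timestamp": 20},
    {"ycsb_key": 2, "field0": "b", "_cdc_operation": "UPSERT", "_cdc_timestamp": 20},
    {"ycsb_key": 3, "field0": None, "_cdc_operation": "DELETE", "_cdc_timestamp": 20},
    {"ycsb_key": 4, "field0": "old", "_cdc_operation": "UPSERT", "_cdc_timestamp": 20},
]
merged = apply_merge(target, staging, ["ycsb_key"])
print(sorted(k[0] for k in merged))  # [1, 2, 4]
```

Because the DELETE clause has no timestamp guard, correctness relies on the upstream deduplication keeping only the latest event per key. Delta Lake's MERGE also rejects batches in which several source rows match the same target row, which is another reason to deduplicate first.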

In [None]:
# ============================================================================
# CELL 7: CREATE TEST TABLE (Mode-Aware: Single or Multiple Column Families)
# ============================================================================
# Create table structure based on column_family_mode:
# - single_cf: 1 column family (default, better performance)
# - multi_cf: 3 column families (for testing split_column_families=true)

if column_family_mode == "multi_cf":
    # Create table with MULTIPLE column families for testing split_column_families=true
    create_table_sql = f"""
    CREATE TABLE IF NOT EXISTS {source_table} (
        ycsb_key INT PRIMARY KEY,
        -- Family 1: Frequently accessed fields
        field0 TEXT,
        field1 TEXT,
        field2 TEXT,
        FAMILY frequently_read (ycsb_key, field0, field1, field2),
        
        -- Family 2: Medium-frequency fields
        field3 TEXT,
        field4 TEXT,
        field5 TEXT,
        FAMILY medium_read (field3, field4, field5),
        
        -- Family 3: Rarely accessed fields
        field6 TEXT,
        field7 TEXT,
        field8 TEXT,
        field9 TEXT,
        FAMILY rarely_read (field6, field7, field8, field9)
    )
    """
    family_info = "3 column families (frequently_read, medium_read, rarely_read)"
else:
    # Create table with SINGLE column family (default)
    create_table_sql = f"""
    CREATE TABLE IF NOT EXISTS {source_table} (
        ycsb_key INT PRIMARY KEY,
        field0 TEXT,
        field1 TEXT,
        field2 TEXT,
        field3 TEXT,
        field4 TEXT,
        field5 TEXT,
        field6 TEXT,
        field7 TEXT,
        field8 TEXT,
        field9 TEXT
    )
    """
    family_info = "1 column family (default primary)"

conn = get_cockroachdb_connection()
try:
    with conn.cursor() as cur:
        cur.execute(create_table_sql)
        conn.commit()
    print(f"✅ Table '{source_table}' created (or already exists)")
    print(f"   Column Family Mode: {column_family_mode}")
    print(f"   Column families: {family_info}")
finally:
    conn.close()
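
With `split_column_families`, the changefeed emits a separate message per column family, so one row change in the `multi_cf` table above can arrive as up to three Parquet fragments, each carrying the primary key plus one family's columns. The sketch below shows one way such fragments could be recombined, assuming fragments from the same change share the primary key and `_cdc_timestamp`; `merge_family_fragments` is a hypothetical helper, not the notebook's implementation.

```python
from typing import Any, Dict, List, Tuple


def merge_family_fragments(fragments: List[Dict[str, Any]], pk: List[str]) -> List[Dict[str, Any]]:
    """Combine fragments that share (primary key, _cdc_timestamp) into one row per change."""
    merged: Dict[Tuple, Dict[str, Any]] = {}
    for frag in fragments:
        group = tuple(frag[c] for c in pk) + (frag["_cdc_timestamp"],)
        row = merged.setdefault(group, {})
        for col, val in frag.items():
            # Fill each column from the first fragment that has a non-null value for it
            if row.get(col) is None:
                row[col] = val
    return list(merged.values())


fragments = [
    {"ycsb_key": 1, "field0": "a", "field1": "b", "field2": "c", "_cdc_timestamp": 10},  # frequently_read
    {"ycsb_key": 1, "field3": "d", "field4": "e", "field5": "f", "_cdc_timestamp": 10},  # medium_read
    {"ycsb_key": 2, "field0": "x", "field1": "y", "field2": "z", "_cdc_timestamp": 10},  # frequently_read
]
rows = merge_family_fragments(fragments, ["ycsb_key"])
print(len(rows))  # 2
```

An UPDATE that touches only some families may produce fewer fragments than there are families, so a merged row can be partial; any downstream MERGE has to avoid overwriting the missing columns with NULL.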

In [None]:
# ============================================================================
# CELL 8: INSERT SAMPLE DATA (Snapshot Phase)
# ============================================================================
conn = get_cockroachdb_connection()
try:
    # Check if table is empty using helper function
    stats = get_table_stats(conn, source_table)
    
    if stats['is_empty']:
        # Table is empty - insert snapshot data
        print(f"📊 Table is empty. Inserting {snapshot_count} initial rows (snapshot phase)...")
        
        with conn.cursor() as cur:
            # Use generate_series for efficient bulk insert
            insert_sql = f"""
            INSERT INTO {source_table} 
            (ycsb_key, field0, field1, field2, field3, field4, field5, field6, field7, field8, field9)
            SELECT 
                i AS ycsb_key,
                'snapshot_value_' || i || '_0' AS field0,
                'snapshot_value_' || i || '_1' AS field1,
                'snapshot_value_' || i || '_2' AS field2,
                'snapshot_value_' || i || '_3' AS field3,
                'snapshot_value_' || i || '_4' AS field4,
                'snapshot_value_' || i || '_5' AS field5,
                'snapshot_value_' || i || '_6' AS field6,
                'snapshot_value_' || i || '_7' AS field7,
                'snapshot_value_' || i || '_8' AS field8,
                'snapshot_value_' || i || '_9' AS field9
            FROM generate_series(0, %s - 1) AS i
            """
            
            cur.execute(insert_sql, (snapshot_count,))
            conn.commit()
        
        print(f"✅ Sample data inserted using generate_series")
        print(f"   Rows inserted: {snapshot_count} (keys 0 to {snapshot_count - 1})")
    else:
        # Table already has data - skip insert
        print(f"ℹ️  Table already contains data - skipping snapshot insert")
        print(f"   Current key range: {stats['min_key']} to {stats['max_key']}")
        print(f"   Tip: If you want to re-run the snapshot, drop the table first (see Cleanup cells)")
finally:
    conn.close()

In [None]:
# ============================================================================
# CELL 9: CREATE CHANGEFEED
# ============================================================================
# Build Azure Blob Storage URI with table-specific path
# Note: For Azure, path goes in URI (not as path_prefix query parameter like S3)
path = f"parquet/{source_catalog}/{source_schema}/{source_table}/{target_table}"
changefeed_path = f"azure://{container_name}/{path}?AZURE_ACCOUNT_NAME={storage_account_name}&AZURE_ACCOUNT_KEY={storage_account_key_encoded}"

# Build changefeed options based on column_family_mode
if column_family_mode == "multi_cf":
    # Include split_column_families for multi-family mode
    changefeed_options = """
    format='parquet',
    updated,
    resolved='10s',
    split_column_families
"""
else:
    # Standard options for single-family mode
    changefeed_options = """
    format='parquet',
    updated,
    resolved='10s'
"""

# Create changefeed SQL
create_changefeed_sql = f"""
CREATE CHANGEFEED FOR TABLE {source_table}
INTO '{changefeed_path}'
WITH {changefeed_options}
"""

conn = get_cockroachdb_connection()
try:
    with conn.cursor() as cur:
        # Check for existing changefeed with THIS specific destination path
        # (checks for source table AND full path to ensure uniqueness)
        path_pattern = f"%{source_table}%{source_catalog}/{source_schema}/{source_table}/{target_table}%"
        
        cur.execute("""
            SELECT job_id, status, description
            FROM [SHOW CHANGEFEED JOBS] 
            WHERE description LIKE %s
            AND status IN ('running', 'paused')
            LIMIT 1
        """, (path_pattern,))
        
        existing = cur.fetchone()
        
        if existing:
            job_id, status, description = existing
            print(f"✅ Changefeed already exists for this source → target mapping")
            print(f"   Job ID: {job_id}")
            print(f"   Status: {status}")
            print(f"   Source: {source_catalog}.{source_schema}.{source_table}")
            print(f"   Target path: .../{source_table}/{target_table}/")
            if column_family_mode == "multi_cf":
                print(f"   Expected: Column family fragments")
            print(f"")
            print(f"💡 Tip: Run Cell 10 to generate INSERT/UPDATE/DELETE events")
            print(f"   Then run Cell 11 to verify new files appear")
        else:
            # Create new changefeed
            cur.execute(create_changefeed_sql)
            result = cur.fetchone()
            job_id = result[0]
            conn.commit()  # Persist the new job (no-op if the connection uses autocommit)
            
            print(f"✅ Changefeed created")
            print(f"   Job ID: {job_id}")
            print(f"   Source: {source_catalog}.{source_schema}.{source_table}")
            print(f"   Target path: .../{source_table}/{target_table}/")
            print(f"   Format: Parquet")
            if column_family_mode == "multi_cf":
                print(f"   Split column families: TRUE (fragments will be generated)")
            else:
                print(f"   Split column families: FALSE (single file per event)")
            print(f"   Destination: Azure Blob Storage")
            print(f"")
            
            # Wait for files to appear using helper function
            wait_for_changefeed_files(
                storage_account_name, storage_account_key, container_name,
                source_catalog, source_schema, source_table, target_table,
                max_wait=300, check_interval=5
            )
finally:
    conn.close()
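
The mode-dependent `WITH` options above can also be produced by one small function, which keeps the two branches from drifting apart and makes the statement easy to check before it is sent. A sketch only: `build_changefeed_sql` is a hypothetical helper whose option list mirrors this cell, and the example URI is made up. The real sink URI embeds the storage account key, so avoid printing the full statement in shared notebooks.

```python
from typing import List


def build_changefeed_sql(table: str, sink_uri: str, multi_cf: bool, resolved: str = "10s") -> str:
    """Build the CREATE CHANGEFEED statement used above for either column family mode."""
    options: List[str] = ["format='parquet'", "updated", f"resolved='{resolved}'"]
    if multi_cf:
        # One message (and Parquet file) per column family
        options.append("split_column_families")
    return f"CREATE CHANGEFEED FOR TABLE {table} INTO '{sink_uri}' WITH {', '.join(options)}"


# Hypothetical URI; the real one also carries AZURE_ACCOUNT_NAME and AZURE_ACCOUNT_KEY
sql = build_changefeed_sql("usertable", "azure://container/parquet/db/public/usertable/target", multi_cf=True)
print(sql)
```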

In [None]:
# ============================================================================
# CELL 10: RUN CDC WORKLOAD (INSERT, UPDATE & DELETE)
# ============================================================================
import time

# Capture baseline file count BEFORE generating CDC events
print("📊 Capturing baseline file count...")
result_before = check_azure_files(
    storage_account_name, storage_account_key, container_name,
    source_catalog, source_schema, source_table, target_table,
    verbose=False
)
files_before = len(result_before['data_files'])
print(f"   Current files: {files_before}")
print()

conn = get_cockroachdb_connection()
try:
    # Get current table state using helper function
    stats_before = get_table_stats(conn, source_table)
    min_key = stats_before['min_key']
    max_key = stats_before['max_key']
    count_before = stats_before['count']
    
    print(f"📊 Current table state:")
    print(f"   Min key: {min_key}, Max key: {max_key}, Total rows: {count_before}")
    print()
    
    with conn.cursor() as cur:
        
        # 1. INSERT: Add new rows starting from max_key + 1 (using generate_series)
        print(f"➕ Running {insert_count} INSERTs (keys {max_key + 1} to {max_key + insert_count})...")
        insert_sql = f"""
        INSERT INTO {source_table} 
        (ycsb_key, field0, field1, field2, field3, field4, field5, field6, field7, field8, field9)
        SELECT 
            i AS ycsb_key,
            'inserted_value_' || i || '_0' AS field0,
            'inserted_value_' || i || '_1' AS field1,
            'inserted_value_' || i || '_2' AS field2,
            'inserted_value_' || i || '_3' AS field3,
            'inserted_value_' || i || '_4' AS field4,
            'inserted_value_' || i || '_5' AS field5,
            'inserted_value_' || i || '_6' AS field6,
            'inserted_value_' || i || '_7' AS field7,
            'inserted_value_' || i || '_8' AS field8,
            'inserted_value_' || i || '_9' AS field9
        FROM generate_series(%s, %s) AS i
        """
        cur.execute(insert_sql, (max_key + 1, max_key + insert_count))
        
        # 2. UPDATE: Update existing rows starting from min_key (single UPDATE statement)
        print(f"📝 Running {update_count} UPDATEs (keys {min_key} to {min_key + update_count - 1})...")
        timestamp = int(time.time())
        cur.execute(f"""
            UPDATE {source_table}
            SET field0 = %s
            WHERE ycsb_key >= %s AND ycsb_key < %s
        """, (f"updated_at_{timestamp}", min_key, min_key + update_count))
        
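        # 3. DELETE: Delete oldest rows starting from min_key (single DELETE).
        #    The DELETE range overlaps the UPDATE range and all three statements commit
        #    in one transaction, so keys in both ranges reach the changefeed only as DELETEs.
        delete_max = min_key + delete_count - 1
        print(f"🗑️  Running {delete_count} DELETEs (keys {min_key} to {delete_max})...")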
        cur.execute(f"""
            DELETE FROM {source_table}
            WHERE ycsb_key >= %s AND ycsb_key <= %s
        """, (min_key, delete_max))
        
        conn.commit()
    
    # Get final table state using helper function
    stats_after = get_table_stats(conn, source_table)
    min_key_after = stats_after['min_key']
    max_key_after = stats_after['max_key']
    count_after = stats_after['count']
    
    print(f"\n✅ Workload complete")
    print(f"   Inserts: {insert_count}")
    print(f"   Updates: {update_count}")
    print(f"   Deletes: {delete_count}")
    print(f"   Before: {count_before} rows (keys {min_key}-{max_key})")
    print(f"   After:  {count_after} rows (keys {min_key_after}-{max_key_after})")
    print(f"   Net change: {count_after - count_before:+d} rows")
    print(f"")
    
    # Wait for new CDC files to appear in Azure (positive confirmation)
    print(f"⏳ Waiting for new CDC files to appear in Azure...")
    print(f"   Baseline: {files_before} files")
    print()
    
    # Poll for new files (max 90 seconds)
    max_wait = 90
    check_interval = 10
    elapsed = 0
    
    while elapsed < max_wait:
        result = check_azure_files(
            storage_account_name, storage_account_key, container_name,
            source_catalog, source_schema, source_table, target_table,
            verbose=False
        )
        files_now = len(result['data_files'])
        
        if files_now > files_before:
            print(f"✅ New CDC files appeared after {elapsed} seconds!")
            print(f"   Baseline (before workload): {files_before} files")
            print(f"   Current (after workload): {files_now} files")
            print(f"   New files generated: {files_now - files_before}")
            break
        
        print(f"   Checking... ({elapsed}s elapsed, baseline: {files_before} files)", end='\r')
        time.sleep(check_interval)
        elapsed += check_interval
    else:
        print(f"\n⚠️  Timeout after {max_wait}s - files may still be flushing")
        print(f"   Run Cell 11 to check manually")
finally:
    conn.close()
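
The workload's effect on the table can be predicted before looking at the changefeed output. This sketch assumes the keys are contiguous from `min_key` to `max_key` before the workload (true after the snapshot and after earlier runs of this cell) and that `delete_count` does not exceed the number of existing rows; `expected_state` is a hypothetical helper. It also counts how many UPDATEs can surface as UPSERT events, since updated keys that fall inside the DELETE range are removed in the same transaction.

```python
from typing import Dict, Optional


def expected_state(min_key: int, max_key: int, insert_count: int,
                   update_count: int, delete_count: int) -> Dict[str, Optional[int]]:
    """Predict table state after one run of the CDC workload cell."""
    keys = set(range(min_key, max_key + 1))                       # existing rows
    keys |= set(range(max_key + 1, max_key + insert_count + 1))   # INSERTs
    updated = set(range(min_key, min_key + update_count))         # UPDATE range
    deleted = set(range(min_key, min_key + delete_count))         # DELETE range
    keys -= deleted
    return {
        "count": len(keys),
        "min_key": min(keys) if keys else None,
        "max_key": max(keys) if keys else None,
        # Updated rows that are not also deleted in the same transaction
        "surviving_updates": len(updated - deleted),
    }


print(expected_state(0, 99, insert_count=10, update_count=20, delete_count=10))
# {'count': 100, 'min_key': 10, 'max_key': 109, 'surviving_updates': 10}
```

Under these assumptions the "Net change" printed above should equal `insert_count - delete_count`.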

In [None]:
# ============================================================================
# CELL 11: CHECK AZURE FILES (Optional - Manual Check)
# ============================================================================
# Use the helper function from Cell 4 to check for files
result = check_azure_files(
    storage_account_name, storage_account_key, container_name,
    source_catalog, source_schema, source_table, target_table,
    verbose=True
)

# Provide guidance
if len(result['data_files']) == 0:
    print(f"\n⚠️  No data files found yet.")
    print(f"   💡 Possible reasons:")
    print(f"   - Changefeed not created yet (run Cell 9)")
    print(f"   - Path configuration mismatch (check Cell 1 variables)")
    print(f"   - Azure credentials issue (check the storage account name and key)")
else:
    print(f"\n✅ Files are ready! Proceed to Cell 12 to read with Databricks.")

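The file check separates Parquet data files from the `.RESOLVED` timestamp markers the changefeed writes because of `resolved='10s'`; keeping those markers out of the data set is also why the Spark reader uses `pathGlobFilter`. A minimal sketch of that split on a list of blob names, assuming the same naming conventions the cleanup cell filters on (`.parquet` for data, `.RESOLVED` for markers); `classify_changefeed_files` is hypothetical and not the notebook's `check_azure_files` helper, and the example names are illustrative only.

```python
from typing import Dict, Iterable, List


def classify_changefeed_files(names: Iterable[str]) -> Dict[str, List[str]]:
    """Split blob names into Parquet data files, resolved markers, and everything else."""
    result: Dict[str, List[str]] = {"data_files": [], "resolved_files": [], "other": []}
    for name in names:
        if name.endswith(".parquet"):
            result["data_files"].append(name)
        elif name.endswith(".RESOLVED"):
            result["resolved_files"].append(name)
        else:
            result["other"].append(name)  # e.g. directory entries
    return result


# Illustrative names only; real file names are generated by CockroachDB
names = [
    "parquet/db/public/usertable/target/2024-01-01",
    "parquet/db/public/usertable/target/2024-01-01/part-0001.parquet",
    "parquet/db/public/usertable/target/2024-01-01/part-0002.parquet",
    "parquet/db/public/usertable/target/2024-01-01/ts-0001.RESOLVED",
]
summary = classify_changefeed_files(names)
print({k: len(v) for k, v in summary.items()})  # {'data_files': 2, 'resolved_files': 1, 'other': 1}
```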
In [None]:
# ============================================================================
# CELL 12: READ CDC EVENTS IN DATABRICKS
# ============================================================================
# Select and run CDC ingestion function based on both modes (from Cell 1)
# Functions are defined in Cell 5
# ============================================================================

print(f"🔷 CDC Configuration:")
print(f"   Processing Mode: {cdc_mode}")
print(f"   Column Family Mode: {column_family_mode}")
print()

# Select function based on BOTH cdc_mode and column_family_mode
if cdc_mode == "append_only" and column_family_mode == "single_cf":
    print(f"📘 Running: ingest_cdc_append_only_single_family()")
    print(f"   - All CDC events will be stored as rows")
    print(f"   - No column family merging needed\n")
    
    query = ingest_cdc_append_only_single_family(
        storage_account_name=storage_account_name,
        container_name=container_name,
        source_catalog=source_catalog,
        source_schema=source_schema,
        source_table=source_table,
        target_catalog=target_catalog,
        target_schema=target_schema,
        target_table=target_table,
        spark=spark
    )

elif cdc_mode == "append_only" and column_family_mode == "multi_cf":
    print(f"📙 Running: ingest_cdc_append_only_multi_family()")
    print(f"   - All CDC events will be stored as rows")
    print(f"   - Column family fragments will be merged\n")
    
    if not primary_key_columns:
        raise ValueError("primary_key_columns required for multi_cf mode")
    
    query = ingest_cdc_append_only_multi_family(
        storage_account_name=storage_account_name,
        container_name=container_name,
        source_catalog=source_catalog,
        source_schema=source_schema,
        source_table=source_table,
        target_catalog=target_catalog,
        target_schema=target_schema,
        target_table=target_table,
        primary_key_columns=primary_key_columns,
        spark=spark
    )

elif cdc_mode == "update_delete" and column_family_mode == "single_cf":
    print(f"📗 Running: ingest_cdc_with_merge_single_family()")
    print(f"   - MERGE logic applied (UPDATE/DELETE processed)")
    print(f"   - No column family merging needed\n")
    
    if not primary_key_columns:
        raise ValueError("primary_key_columns required for update_delete mode")
    
    result = ingest_cdc_with_merge_single_family(
        storage_account_name=storage_account_name,
        container_name=container_name,
        source_catalog=source_catalog,
        source_schema=source_schema,
        source_table=source_table,
        target_catalog=target_catalog,
        target_schema=target_schema,
        target_table=target_table,
        primary_key_columns=primary_key_columns,
        spark=spark
    )
    
    query = result["query"]

elif cdc_mode == "update_delete" and column_family_mode == "multi_cf":
    print(f"📕 Running: ingest_cdc_with_merge_multi_family()")
    print(f"   - MERGE logic applied (UPDATE/DELETE processed)")
    print(f"   - Column family fragments will be merged\n")
    
    if not primary_key_columns:
        raise ValueError("primary_key_columns required for update_delete + multi_cf mode")
    
    result = ingest_cdc_with_merge_multi_family(
        storage_account_name=storage_account_name,
        container_name=container_name,
        source_catalog=source_catalog,
        source_schema=source_schema,
        source_table=source_table,
        target_catalog=target_catalog,
        target_schema=target_schema,
        target_table=target_table,
        primary_key_columns=primary_key_columns,
        spark=spark
    )
    
    query = result["query"]

else:
    raise ValueError(
        f"Invalid mode combination:\n"
        f"  cdc_mode='{cdc_mode}' (valid: 'append_only', 'update_delete')\n"
        f"  column_family_mode='{column_family_mode}' (valid: 'single_cf', 'multi_cf')\n"
        f"Change modes in Cell 1."
    )

# Wait for completion (if not already complete)
if cdc_mode == "append_only":
    query.awaitTermination()
    print("\n" + "=" * 80)
    print(f"✅ CDC INGESTION COMPLETE")
    print("=" * 80)
    print(f"   Mode: {cdc_mode} + {column_family_mode}")
    print(f"   Target: {target_catalog}.{target_schema}.{target_table}")
    print()
    print(f"📊 Query your data: SELECT * FROM {target_catalog}.{target_schema}.{target_table}")
else:
    # update_delete mode already completed inside the function
    print(f"📊 Query your data: SELECT * FROM {target_catalog}.{target_schema}.{target_table}")

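The four-way `if`/`elif` above is a lookup on the pair `(cdc_mode, column_family_mode)`. A sketch of the same selection as a dispatch table, including the rule that every mode except `append_only` + `single_cf` needs `primary_key_columns`; `select_ingest_function` and the stub functions are hypothetical, and in the notebook the registry values would be the four `ingest_cdc_*` functions.

```python
from typing import Callable, Dict, List, Optional, Tuple

Registry = Dict[Tuple[str, str], Callable[..., object]]


def select_ingest_function(cdc_mode: str, column_family_mode: str, registry: Registry,
                           primary_key_columns: Optional[List[str]] = None) -> Callable[..., object]:
    """Look up the ingestion function for a mode pair and validate its requirements."""
    key = (cdc_mode, column_family_mode)
    if key not in registry:
        raise ValueError(f"Invalid mode combination: {key}")
    needs_pk = key != ("append_only", "single_cf")
    if needs_pk and not primary_key_columns:
        raise ValueError(f"primary_key_columns required for {cdc_mode} + {column_family_mode}")
    return registry[key]


# Stubs stand in for the real ingest_cdc_* functions
registry: Registry = {
    ("append_only", "single_cf"): lambda **kw: "append_single",
    ("append_only", "multi_cf"): lambda **kw: "append_multi",
    ("update_delete", "single_cf"): lambda **kw: "merge_single",
    ("update_delete", "multi_cf"): lambda **kw: "merge_multi",
}
fn = select_ingest_function("update_delete", "multi_cf", registry, ["ycsb_key"])
print(fn())  # merge_multi
```

The notebook's functions return different shapes (a streaming query for `append_only`, a result dict for `update_delete`), so the caller would still branch on `cdc_mode` afterwards.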
In [None]:
# ============================================================================
# CELL 13: QUERY CDC RESULTS
# ============================================================================
target_table_fqn = f"{target_catalog}.{target_schema}.{target_table}"

# Get total count
df = spark.read.table(target_table_fqn)
total_count = df.count()

print("📊 CDC Event Summary")
print("=" * 80)
print(f"Total rows: {total_count}")
print(f"CDC Processing Mode: {cdc_mode}")
print(f"Column Family Mode: {column_family_mode}")
print()

# Show operation breakdown (available in both modes)
print("Rows by CDC operation:")
ops_df = df.groupBy("_cdc_operation").count().orderBy("_cdc_operation")
ops_df.show()

print("\n📋 Sample rows (showing first 5):")
df.select(
    "ycsb_key", 
    "field0", 
    "_cdc_operation", 
    "_cdc_timestamp"
).orderBy("_cdc_timestamp").show(5, truncate=False)

if cdc_mode == "append_only":
    print("\n✅ CDC data successfully loaded (append_only mode)")
    print("   📊 All CDC events stored as rows")
    print("   📊 _cdc_operation is UPSERT or DELETE for each event")
    print("   📊 Row count = all events (including DELETEs and repeated UPDATEs of the same key)")
elif cdc_mode == "update_delete":
    print("\n✅ CDC data successfully loaded (update_delete mode)")
    print("   📊 MERGE applied: DELETEs removed rows, UPDATEs modified rows, INSERTs added rows")
    print("   📊 _cdc_operation shows UPSERT (the last operation on each surviving row)")
    print("   📊 Row count = current state (one row per key)")

print("\n💡 Key Takeaway:")
print("   - Using pathGlobFilter to exclude .RESOLVED files avoids DECIMAL errors")
print("   - _cdc_operation is preserved in both modes for monitoring")
print("\n📍 Next: Run Cell 13 to verify source and target tables are in sync")

## Optional: Cleanup

Run the cells below if you want to clean up the test resources.

In [None]:
# ============================================================================
# ⚠️  SAFETY STOP: Cleanup Section
# ============================================================================
# This cell prevents accidental cleanup when running "Run All"
# 
# To cleanup resources, manually run each cell below INDIVIDUALLY:
#   - Cell 16: Cancel changefeed
#   - Cell 17: Drop CockroachDB source table  
#   - Cell 18: Drop Databricks target table & checkpoint
#   - Cell 19: Clear Azure changefeed data (optional - use for complete reset)
# ============================================================================

raise RuntimeError(
    "\n"
    "⚠️  CLEANUP SAFETY STOP\n"
    "\n"
    "The cells below will DELETE your resources.\n"
    "Do NOT run all cells - run each cleanup cell individually.\n"
    "\n"
    "💡 TIP: If Cell 13 shows sync issues due to old data,\n"
    "   run Cell 19 to clear Azure changefeed data completely.\n"
)

In [None]:
# ============================================================================
# CLEANUP CELL 1: CANCEL CHANGEFEED
# ============================================================================
conn = get_cockroachdb_connection()
try:
    with conn.cursor() as cur:
        # Find changefeed job for THIS specific source → target mapping
        # (matches the same path pattern used in Cell 9)
        path_pattern = f"%{source_table}%{source_catalog}/{source_schema}/{source_table}/{target_table}%"
        
        cur.execute("""
            SELECT job_id 
            FROM [SHOW CHANGEFEED JOBS] 
            WHERE description LIKE %s
            AND status IN ('running', 'paused')
            LIMIT 1
        """, (path_pattern,))
        
        result = cur.fetchone()
        if result:
            job_id = result[0]
            cur.execute(f"CANCEL JOB {job_id}")
            conn.commit()  # Persist the cancellation (no-op if the connection uses autocommit)
            print(f"✅ Changefeed {job_id} cancelled")
            print(f"   Source: {source_catalog}.{source_schema}.{source_table}")
            print(f"   Target path: .../{source_table}/{target_table}/")
        else:
            print("ℹ️  No active changefeed found for this source → target mapping")
finally:
    conn.close()
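
Both changefeed cells locate the job with a SQL `LIKE` pattern over the job description. In `LIKE`, `%` matches any run of characters and `_` matches exactly one character, so underscores in catalog, schema, or table names act as single-character wildcards and the match is slightly looser than a literal path comparison. The sketch below emulates that matching in Python to test a pattern offline; `like_matches` is a hypothetical helper that ignores `ESCAPE` clauses and `ILIKE`, and the names and description are made-up examples.

```python
import re


def like_matches(pattern: str, text: str) -> bool:
    """Emulate SQL LIKE (without ESCAPE): % -> any run of characters, _ -> one character."""
    regex = "".join(
        ".*" if ch == "%" else "." if ch == "_" else re.escape(ch)
        for ch in pattern
    )
    return re.fullmatch(regex, text, flags=re.DOTALL) is not None


source_catalog, source_schema, source_table, target_table = "defaultdb", "public", "usertable", "usertable_cdc"
path_pattern = f"%{source_table}%{source_catalog}/{source_schema}/{source_table}/{target_table}%"
description = ("CREATE CHANGEFEED FOR TABLE usertable INTO "
               "'azure://container/parquet/defaultdb/public/usertable/usertable_cdc?AZURE_ACCOUNT_NAME=acct'")
print(like_matches(path_pattern, description))  # True
```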

In [None]:
# ============================================================================
# CLEANUP CELL 2: DROP SOURCE TABLE (CockroachDB)
# ============================================================================
conn = get_cockroachdb_connection()
try:
    with conn.cursor() as cur:
        cur.execute(f"DROP TABLE IF EXISTS {source_table} CASCADE")
        conn.commit()
    print(f"✅ Table '{source_table}' dropped from CockroachDB")
finally:
    conn.close()

In [None]:
# ============================================================================
# CLEANUP CELL 19: CLEAR AZURE CHANGEFEED DATA (Optional)
# ============================================================================
# ⚠️  WARNING: This will DELETE all changefeed data in Azure for this table!
#
# Use this when:
# - You want to start completely fresh
# - Old data from previous runs is causing sync issues
# - You changed the table schema (e.g., VARCHAR → INT)
#
# Uses Azure SDK (same as Cell 11 for checking files)
# ============================================================================

from azure.storage.blob import BlobServiceClient

# Build Azure path (must match the changefeed path from Cell 9)
changefeed_path = f"parquet/{source_catalog}/{source_schema}/{source_table}/{target_table}/"

print(f"🗑️  Deleting Azure changefeed data...")
print(f"=" * 80)
print(f"Container: {container_name}")
print(f"Path: {changefeed_path}")
print()

# Connect to Azure with the storage account key (same credentials as Cell 11)
connection_string = f"DefaultEndpointsProtocol=https;AccountName={storage_account_name};AccountKey={storage_account_key};EndpointSuffix=core.windows.net"
blob_service = BlobServiceClient.from_connection_string(connection_string)
container_client = blob_service.get_container_client(container_name)

# List all blobs with this prefix
print(f"🔍 Scanning for files...")
blobs = list(container_client.list_blobs(name_starts_with=changefeed_path))

if not blobs:
    print(f"ℹ️  No files found at: {changefeed_path}")
    print(f"   Files may have already been deleted, or path is incorrect")
    print()
    print(f"💡 To check what's in the container, run Cell 11")
else:
    print(f"✅ Found {len(blobs)} items to delete")
    
    # Summarize the items by type
    data_files = [b for b in blobs if b.size > 0 and '.parquet' in b.name]
    resolved_files = [b for b in blobs if '.RESOLVED' in b.name]
    directories = [b for b in blobs if b.size == 0]
    
    print(f"   📄 Data files: {len(data_files)}")
    print(f"   🕐 Resolved files: {len(resolved_files)}")
    print(f"   📁 Directories: {len(directories)}")
    print()
    
    # Delete all blobs with this prefix.
    # The Azure SDK has no recursive delete, so each blob is deleted individually.
    # Reverse-sorting by name deletes children before their parent directory entries
    # (a path always sorts after its own prefix), which avoids DirectoryIsNotEmpty errors
    # on storage accounts with hierarchical namespace enabled.
    print(f"🔄 Deleting {len(blobs)} items...")
    deleted = 0
    failed = 0
    
    for blob in sorted(blobs, key=lambda b: b.name, reverse=True):
        try:
            container_client.delete_blob(blob.name)
            deleted += 1
            if deleted % 50 == 0:
                print(f"   Deleted {deleted}/{len(blobs)} items...", end='\r')
        except Exception as e:
            # Some errors are expected (e.g., directories already removed)
            error_str = str(e)
            if "DirectoryIsNotEmpty" not in error_str and "BlobNotFound" not in error_str:
                failed += 1
                print(f"\n   ⚠️  Failed: {blob.name[:60]}... - {e}")
    
    print(f"✅ Deleted {deleted} items from Azure                    ")
    if failed > 0:
        print(f"   ⚠️  Failed to delete {failed} items")
    
    print()
    print(f"=" * 80)
    print(f"✅ Cleanup complete!")
    print()
    print(f"💡 Next steps:")
    print(f"   1. Drop the Databricks target table and checkpoint (CLEANUP CELL 3)")
    print(f"   2. Re-run from Cell 7 (Create Test Table) to start fresh")

In [None]:
# ============================================================================
# CLEANUP CELL 3: DROP TARGET TABLE & CHECKPOINT (Databricks)
# ============================================================================
target_table_fqn = f"{target_catalog}.{target_schema}.{target_table}"
checkpoint_path = f"/checkpoints/{target_schema}_{target_table}"  # Must match the checkpoint used by the ingestion function run in Cell 12

# Drop Delta table
spark.sql(f"DROP TABLE IF EXISTS {target_table_fqn}")
print(f"✅ Delta table '{target_table_fqn}' dropped")

# Remove checkpoint
try:
    dbutils.fs.rm(checkpoint_path, True)
    print(f"✅ Checkpoint '{checkpoint_path}' removed")
except Exception:
    print(f"ℹ️  Checkpoint not found (may have been already removed)")

print("\n✅ Cleanup complete!")

# Debug Cells

In [None]:
raise RuntimeError(
    "\n"
    "⚠️  DEBUG SAFETY STOP\n"
    "\n"
    "Run each debug cell below individually.\n"
)

In [None]:
# ============================================================================
# DEBUG CELL 1: Quick Missing Keys Check
# ============================================================================
# Lightweight check to see if specific keys exist in CockroachDB and staging
# Update missing_keys list based on Cell 14/15 output
# ============================================================================

from pyspark.sql import functions as F

target_table_fqn = f"{target_catalog}.{target_schema}.{target_table}"
staging_table_cf = f"{target_table_fqn}_staging_cf"
missing_keys = [17, 18, 19]  # ← Update this based on Cell 14/15 output

print("🔍 Quick Debug: Checking missing keys...")
print("=" * 80)

# Check CockroachDB (try/finally, as in the other cells, so the connection is always closed)
print(f"\n📊 CockroachDB ({source_table}):")
conn = get_cockroachdb_connection()
try:
    with conn.cursor() as cursor:
        for key in missing_keys:
            cursor.execute(f"SELECT * FROM {source_table} WHERE ycsb_key = %s", (key,))
            result = cursor.fetchone()
            print(f"   Key {key}: {'✅ EXISTS' if result else '❌ NOT FOUND (deleted)'}")
finally:
    conn.close()

# Check Staging Table
print(f"\n📊 Staging Table ({staging_table_cf}):")
if spark.catalog.tableExists(staging_table_cf):
    staging_df = spark.read.table(staging_table_cf)
    for key in missing_keys:
        count = staging_df.filter(F.col("ycsb_key") == key).count()
        print(f"   Key {key}: {count} row(s)")
    
    print("\n💡 Next steps:")
    print("   - If keys exist in CockroachDB but not in staging:")
    print("     → Re-run Cell 12 to pick up new CDC files")
    print("   - If keys exist in staging but not in target:")
    print("     → Check Cell 12 output for MERGE errors")
    print("     → Run DEBUG CELL 2 for detailed analysis")
else:
    print("   ⚠️  Staging table doesn't exist (it may have been dropped after a successful MERGE)")
    print("   💡 Re-run Cell 12 to recreate staging for debugging")


In [None]:
# ============================================================================
# DEBUG CELL 2: Inspect Target Table
# ============================================================================
# Comprehensive analysis of target table:
# - CDC operation distribution
# - Key distribution and gaps
# - Duplicate detection
# - Sample records
# 
# Use this to diagnose sync issues and data quality problems
# ============================================================================

from pyspark.sql import functions as F

target_table_fqn = f"{target_catalog}.{target_schema}.{target_table}"

print(f"📊 Target Table Analysis: {target_table_fqn}")
print("=" * 80)

# Read target table
df = spark.read.table(target_table_fqn)
total_rows = df.count()
print(f"\n📈 Total Rows: {total_rows:,}")

# Group by CDC operation type
if "_cdc_operation" in df.columns:
    print("\n🔍 CDC Operations:")
    df.groupBy("_cdc_operation").count().orderBy("_cdc_operation").show()
    
    # DELETE rows are expected in append_only mode but should never be stored in update_delete mode
    delete_count = df.filter(F.col("_cdc_operation") == "DELETE").count()
    if delete_count > 0 and cdc_mode == "update_delete":
        print(f"\n⚠️  WARNING: Found {delete_count} DELETE rows stored as data!")
        print("   This is a bug - DELETE rows should not be in the target table in update_delete mode.")
        print("   💡 Run Cell 16 to fix (drops and recreates table)")

# Show key distribution
if "ycsb_key" in df.columns:
    print("\n🔍 Key Distribution:")
    key_dist = df.groupBy("ycsb_key").count().orderBy("ycsb_key")
    key_dist.show(50)
    
    # Check for duplicates (expected in append_only mode, where every event is a row)
    duplicates = key_dist.filter("count > 1")
    dup_count = duplicates.count()
    if dup_count > 0:
        print(f"\n⚠️  Found {dup_count} keys with more than one row")
        duplicates.show()
        if cdc_mode == "update_delete":
            print("\n   💡 This indicates deduplication failure in MERGE logic")
        else:
            print("\n   ℹ️  Expected in append_only mode (one row per CDC event)")
    else:
        print("\n✅ No duplicate keys found")
    
    # Show key range and gaps (distinct keys, so repeated events in append_only mode don't hide gaps)
    key_stats = df.select(
        F.min("ycsb_key").alias("min_key"),
        F.max("ycsb_key").alias("max_key"),
        F.countDistinct("ycsb_key").alias("count")
    ).collect()[0]
    
    if key_stats["min_key"] is None:
        print("\n⚠️  Target table is empty - no key range to analyze")
    else:
        expected_count = key_stats["max_key"] - key_stats["min_key"] + 1
        actual_count = key_stats["count"]
        missing_count = expected_count - actual_count
        
        print(f"\n📊 Key Range Analysis:")
        print(f"   Min key: {key_stats['min_key']}")
        print(f"   Max key: {key_stats['max_key']}")
        print(f"   Expected keys (if contiguous): {expected_count}")
        print(f"   Distinct keys found: {actual_count}")
        
        if missing_count > 0:
            print(f"   ⚠️  Missing {missing_count} keys (gaps in range)")
            print(f"\n   💡 Run DEBUG CELL 1 or CELL 3 to investigate specific keys")
        else:
            print(f"   ✅ No gaps (keys are contiguous)")

# Show sample records
print("\nüîç Sample Records (ordered by key):")
df.orderBy("ycsb_key").show(30, truncate=False)

print("\n" + "=" * 80)
print("üí° If you see issues:")
print("   - DELETE rows stored as data ‚Üí Run Cell 16 (recreate table)")
print("   - Duplicate keys ‚Üí Check MERGE deduplication logic")
print("   - Missing keys ‚Üí Run DEBUG CELL 1 or CELL 3")
print("   - Gaps in key range ‚Üí Keys were deleted (normal for update_delete mode)")

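DEBUG CELL 2 reports *how many* keys are missing, but DEBUG CELL 1 and DEBUG CELL 3 need the actual `missing_keys` list. The following is a minimal pure-Python sketch of the same range check that also returns which keys are missing and which are duplicated. It assumes integer `ycsb_key` values that are few enough to collect to the driver. `key_range_report` is a hypothetical helper and is not part of the notebook.

```python
from collections import Counter
from typing import Iterable


def key_range_report(keys: Iterable[int]) -> dict:
    """Summarize integer primary keys: range, duplicates, and gaps.

    Hypothetical helper: pass the ycsb_key values collected from the target table.
    """
    counts = Counter(keys)
    if not counts:
        return {"min_key": None, "max_key": None, "distinct": 0,
                "duplicates": [], "missing": []}
    lo, hi = min(counts), max(counts)
    return {
        "min_key": lo,
        "max_key": hi,
        "distinct": len(counts),
        # Keys that appear more than once (a MERGE dedup problem in update_delete mode)
        "duplicates": sorted(k for k, n in counts.items() if n > 1),
        # Every integer in [min, max] that never appears: candidates for missing_keys
        "missing": [k for k in range(lo, hi + 1) if k not in counts],
    }


report = key_range_report([1, 2, 2, 5, 7])
print(report["missing"])     # [3, 4, 6]
print(report["duplicates"])  # [2]
```

In the notebook, a call such as `key_range_report([r.ycsb_key for r in df.select("ycsb_key").collect()])["missing"]` could seed `missing_keys`. For large tables, you could compute the same gap list in Spark instead, using a left anti join of `spark.range(min_key, max_key + 1)` (whose column is named `id`) against the table's keys.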

In [None]:
# ============================================================================
# DEBUG CELL 3: Detailed Missing Keys Investigation
# ============================================================================
# Detailed investigation of missing keys:
# - Checks CockroachDB source
# - Checks staging table (if it exists)
# - Shows CDC operation and timestamps
# - Provides detailed troubleshooting steps
#
# Update missing_keys list based on Cell 14/15 or DEBUG CELL 2 output
# ============================================================================

target_table_fqn = f"{target_catalog}.{target_schema}.{target_table}"
staging_table_cf = f"{target_table_fqn}_staging_cf"

# Update this list based on what's missing
missing_keys = [17, 18, 19]  # ← Update based on DEBUG CELL 2 output

print("🔍 Detailed Missing Keys Investigation")
print("=" * 80)
print(f"Investigating keys: {missing_keys}")
print()

# ============================================================================
# Step 1: Check CockroachDB Source
# ============================================================================
print("📊 STEP 1: Checking CockroachDB Source")
print("-" * 80)

# Close the connection explicitly, as the cleanup cells do: depending on the
# driver, `with conn:` may only scope a transaction and leave the connection open.
conn = get_cockroachdb_connection()
try:
    with conn.cursor() as cursor:
        for key in missing_keys:
            cursor.execute(f"SELECT * FROM {source_table} WHERE ycsb_key = %s", (key,))
            result = cursor.fetchone()
            
            if result:
                print(f"✅ Key {key}: EXISTS in CockroachDB")
                # Show first 3 fields for verification
                print(f"   Sample data: {result[:3]}")
            else:
                print(f"❌ Key {key}: NOT FOUND in CockroachDB")
                print("   → This key was deleted from the source (in update_delete mode, it should also be absent from the target)")
finally:
    conn.close()

# ============================================================================
# Step 2: Check Staging Table
# ============================================================================
print(f"\n📊 STEP 2: Checking Staging Table")
print("-" * 80)

if spark.catalog.tableExists(staging_table_cf):
    staging_df = spark.read.table(staging_table_cf)
    print(f"✅ Staging table exists: {staging_table_cf}")
    print()
    
    for key in missing_keys:
        key_rows = staging_df.filter(F.col("ycsb_key") == key)
        count = key_rows.count()
        
        if count > 0:
            print(f"✅ Key {key}: {count} row(s) in staging table")
            print("   Details (in CDC timestamp order):")
            key_rows.select(
                "ycsb_key", 
                "_cdc_timestamp", 
                "_cdc_operation", 
                "field0", 
                "field1"
            ).orderBy("_cdc_timestamp").show(truncate=False)
        else:
            print(f"❌ Key {key}: NOT in staging table")
    
    # Show staging table summary
    print("\n📈 Staging Table Summary:")
    staging_df.groupBy("_cdc_operation").count().show()
    
else:
    print(f"⚠️  Staging table doesn't exist: {staging_table_cf}")
    print("   This usually means Cell 12 completed and dropped the staging table")
    print("\n💡 To debug further:")
    print("   1. Re-run Cell 12 (it will process new files and recreate staging)")
    print("   2. Run this cell again to check staging table")

# ============================================================================
# Step 3: Troubleshooting Recommendations
# ============================================================================
print("\n" + "=" * 80)
print("💡 TROUBLESHOOTING GUIDE")
print("=" * 80)

print("\n📋 If keys EXIST in CockroachDB but NOT in staging:")
print("   → The changefeed hasn't written them yet, or Auto Loader hasn't picked up the files")
print("   ✅ Solution: Re-run Cell 12 to process new CDC files")

print("\n📋 If keys EXIST in staging but NOT in target:")
print("   → MERGE logic failed or conditions are wrong")
print("   ✅ Solution: Check Cell 12 output for MERGE errors")
print("   ✅ Alternative: Check MERGE conditions in Cell 6")

print("\n📋 If keys DON'T EXIST in CockroachDB:")
print("   → Keys were deleted from the source")
print("   ✅ Expected (update_delete mode): target should also not have these keys")
print("   ⚠️  If target HAS these keys: MERGE delete logic isn't working")

print("\n📋 If keys DON'T EXIST anywhere:")
print("   → Keys were never created, or CDC didn't capture them")
print("   ✅ Check: Re-run Cell 10 to verify the workload ran correctly")

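The troubleshooting guide above is essentially a decision table over where a key was found. Below is a minimal sketch of that table as a function. It assumes update_delete mode; in append_only mode, deleted keys legitimately remain in the target as DELETE rows. `diagnose_key` and its return codes are hypothetical. The `in_target` flag would come from a separate lookup on the target table, which DEBUG CELL 3 does not perform.

```python
# Hypothetical helper: encodes the troubleshooting guide (update_delete mode assumed).
ADVICE = {
    "ok": "Consistent with the source; no action needed",
    "not_ingested": "CDC files not processed yet: re-run Cell 12",
    "merge_failed": "In staging but not target: check Cell 12 output and MERGE conditions",
    "delete_not_applied": "Deleted in source but still in target: MERGE delete logic isn't working",
    "absent_everywhere": "Deleted earlier, never created, or not captured by CDC: re-check the workload (Cell 10)",
}


def diagnose_key(in_source: bool, in_staging: bool, in_target: bool) -> str:
    """Return a key of ADVICE describing the most likely cause."""
    if in_source:
        if in_target:
            return "ok"
        # Present in the source but missing from the target
        return "merge_failed" if in_staging else "not_ingested"
    # Absent from the source: in update_delete mode the target must not have it
    if in_target:
        return "delete_not_applied"
    # Staging may still hold the key's events (including the DELETE); target is correct
    return "ok" if in_staging else "absent_everywhere"


for key, flags in {17: (True, False, False), 18: (False, False, True)}.items():
    print(key, ADVICE[diagnose_key(*flags)])
```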

## Optional: Cleanup

Run the cells below if you want to clean up the test resources.

In [None]:
# ============================================================================
# ⚠️  SAFETY STOP: Cleanup Section
# ============================================================================
# This cell prevents accidental cleanup when running "Run All".
# 
# To clean up resources, manually run each cell below INDIVIDUALLY:
#   - Cell 16: Cancel changefeed
#   - Cell 17: Drop CockroachDB source table  
#   - Cell 18: Drop Databricks target table & checkpoint
#   - Cell 19: Clear Azure changefeed data (optional - use for complete reset)
# ============================================================================

raise RuntimeError(
    "\n"
    "⚠️  CLEANUP SAFETY STOP\n"
    "\n"
    "The cells below will DELETE your resources.\n"
    "Do NOT run all cells - run each cleanup cell individually.\n"
    "\n"
    "💡 TIP: If Cell 13 shows sync issues due to old data,\n"
    "   run Cell 19 to clear Azure changefeed data completely.\n"
)

In [None]:
# ============================================================================
# CLEANUP CELL 1: CANCEL CHANGEFEED
# ============================================================================
conn = get_cockroachdb_connection()
try:
    with conn.cursor() as cur:
        # Find changefeed job for THIS specific source → target mapping
        # (matches the same path pattern used in Cell 7)
        path_pattern = f"%{source_table}%{source_catalog}/{source_schema}/{source_table}/{target_table}%"
        
        cur.execute("""
            SELECT job_id 
            FROM [SHOW CHANGEFEED JOBS] 
            WHERE description LIKE %s
            AND status IN ('running', 'paused')
            LIMIT 1
        """, (path_pattern,))
        
        result = cur.fetchone()
        if result:
            job_id = result[0]
            cur.execute(f"CANCEL JOB {job_id}")
            # Commit explicitly in case the driver does not autocommit
            # (the DROP TABLE cell below does the same)
            conn.commit()
            print(f"✅ Changefeed {job_id} cancelled")
            print(f"   Source: {source_catalog}.{source_schema}.{source_table}")
            print(f"   Target path: .../{source_table}/{target_table}/")
        else:
            print("ℹ️  No active changefeed found for this source → target mapping")
finally:
    conn.close()

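The cancel cell passes `source_table` and the storage path directly into a `LIKE` pattern. In that pattern, `_` matches any single character and `%` matches any run of characters. A table name such as `my_table` therefore also matches `myXtable`, so in principle the query could select a different changefeed. The following minimal sketch escapes those wildcards first. It assumes that CockroachDB, like PostgreSQL, uses backslash as the default `LIKE` escape character. `escape_like` and `changefeed_like_pattern` are hypothetical helpers.

```python
def escape_like(value: str, escape: str = "\\") -> str:
    """Escape LIKE wildcards so `value` only matches itself.

    The escape character itself is escaped first, so the escapes added
    for % and _ are not doubled afterwards.
    """
    return (value.replace(escape, escape + escape)
                 .replace("%", escape + "%")
                 .replace("_", escape + "_"))


def changefeed_like_pattern(source_catalog: str, source_schema: str,
                            source_table: str, target_table: str) -> str:
    """Hypothetical helper: the cancel cell's pattern with wildcards escaped."""
    path = f"{source_catalog}/{source_schema}/{source_table}/{target_table}"
    return f"%{escape_like(source_table)}%{escape_like(path)}%"


print(changefeed_like_pattern("defaultdb", "public", "my_table", "my_table"))
# %my\_table%defaultdb/public/my\_table/my\_table%
```

The result would be passed as the query parameter in place of `path_pattern`. The leading, middle, and trailing `%` stay unescaped because they are intentional wildcards.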
In [None]:
# ============================================================================
# CLEANUP CELL 2: DROP SOURCE TABLE (CockroachDB)
# ============================================================================
conn = get_cockroachdb_connection()
try:
    with conn.cursor() as cur:
        cur.execute(f"DROP TABLE IF EXISTS {source_table} CASCADE")
        conn.commit()
    print(f"✅ Table '{source_table}' dropped from CockroachDB")
finally:
    conn.close()

In [None]:
# ============================================================================
# CLEANUP CELL 19: CLEAR AZURE CHANGEFEED DATA (Optional)
# ============================================================================
# ⚠️  WARNING: This will DELETE all changefeed data in Azure for this table!
#
# Use this when:
# - You want to start completely fresh
# - Old data from previous runs is causing sync issues
# - You changed the table schema (e.g., VARCHAR → INT)
#
# Uses Azure SDK (same as Cell 11 for checking files)
# ============================================================================

from azure.storage.blob import BlobServiceClient

# Build Azure path (must match Cell 7 changefeed path)
changefeed_path = f"parquet/{source_catalog}/{source_schema}/{source_table}/{target_table}/"

print(f"🗑️  Deleting Azure changefeed data...")
print(f"=" * 80)
print(f"Container: {container_name}")
print(f"Path: {changefeed_path}")
print()

# Connect to Azure (same as Cell 9)
connection_string = f"DefaultEndpointsProtocol=https;AccountName={storage_account_name};AccountKey={storage_account_key};EndpointSuffix=core.windows.net"
blob_service = BlobServiceClient.from_connection_string(connection_string)
container_client = blob_service.get_container_client(container_name)

# List all blobs with this prefix
print(f"🔍 Scanning for files...")
blobs = list(container_client.list_blobs(name_starts_with=changefeed_path))

if not blobs:
    print(f"ℹ️  No files found at: {changefeed_path}")
    print(f"   Files may have already been deleted, or path is incorrect")
    print()
    print(f"💡 To check what's in the container, run Cell 9")
else:
    print(f"✅ Found {len(blobs)} items to delete")
    
    # Summarize what will be deleted
    data_files = [b for b in blobs if b.size > 0 and '.parquet' in b.name]
    resolved_files = [b for b in blobs if '.RESOLVED' in b.name]
    directories = [b for b in blobs if b.size == 0]
    
    print(f"   📄 Data files: {len(data_files)}")
    print(f"   🕐 Resolved files: {len(resolved_files)}")
    print(f"   📁 Directories: {len(directories)}")
    print()
    
    # Delete all blobs with this prefix.
    # The Blob SDK has no recursive delete, so each blob is deleted individually.
    # Reverse-sorting by name deletes children before their parent directories
    # (a directory's name is a prefix of its children's names), which avoids
    # DirectoryIsNotEmpty errors on hierarchical-namespace accounts.
    print(f"🔄 Deleting {len(blobs)} items...")
    deleted = 0
    failed = 0
    
    for blob in sorted(blobs, key=lambda b: b.name, reverse=True):
        try:
            container_client.delete_blob(blob.name)
            deleted += 1
            if deleted % 50 == 0:
                print(f"   Deleted {deleted}/{len(blobs)} items...", end='\r')
        except Exception as e:
            # Tolerate blobs that are already gone or directories that are not yet empty
            error_str = str(e)
            if "DirectoryIsNotEmpty" not in error_str and "BlobNotFound" not in error_str:
                failed += 1
                print(f"\n   ⚠️  Failed: {blob.name[:60]}... - {e}")
    
    print(f"✅ Deleted {deleted} items from Azure                    ")
    if failed > 0:
        print(f"   ⚠️  Failed to delete {failed} items")
    
    print()
    print(f"=" * 80)
    print(f"✅ Cleanup complete!")
    print()
    print(f"💡 Next steps:")
    print(f"   1. Drop the Databricks target table (CLEANUP CELL 3)")
    print(f"   2. Re-run from Cell 6 (Snapshot) to start fresh")

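The Azure cleanup cell is destructive, so it can help to preview what it would remove before running it. The following is a minimal dry-run sketch that applies the same classification rules as the cell and returns a child-before-parent deletion order. On storage accounts with hierarchical namespace enabled, deleting a directory blob that still has children fails with `DirectoryIsNotEmpty`. A directory's name is a proper prefix of its children's names, so sorting names in reverse lexicographic order puts every child ahead of its parent. `BlobInfo` and `plan_deletion` are hypothetical. Any objects with `name` and `size` attributes should work, such as the blob properties yielded by `ContainerClient.list_blobs`.

```python
from typing import Iterable, NamedTuple


class BlobInfo(NamedTuple):
    """Hypothetical stand-in exposing the two blob attributes the cell uses."""
    name: str
    size: int


def plan_deletion(blobs: Iterable) -> dict:
    """Dry run: classify blobs like the cleanup cell and return a safe delete order."""
    items = list(blobs)
    return {
        "data_files": sum(1 for b in items if b.size > 0 and ".parquet" in b.name),
        "resolved_files": sum(1 for b in items if ".RESOLVED" in b.name),
        "directories": sum(1 for b in items if b.size == 0),
        # A directory's name sorts before its children's names (it is their
        # prefix), so reversing the sort deletes children first.
        "order": sorted((b.name for b in items), reverse=True),
    }


plan = plan_deletion([
    BlobInfo("parquet/db/public/t/t", 0),
    BlobInfo("parquet/db/public/t/t/2024-01-01", 0),
    BlobInfo("parquet/db/public/t/t/2024-01-01/f1.parquet", 100),
])
print(plan["order"])
```

In the notebook, `plan_deletion(container_client.list_blobs(name_starts_with=changefeed_path))` would report the same counts as the cleanup cell without deleting anything.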
In [None]:
# ============================================================================
# CLEANUP CELL 3: DROP TARGET TABLE & CHECKPOINT (Databricks)
# ============================================================================
target_table_fqn = f"{target_catalog}.{target_schema}.{target_table}"
checkpoint_path = f"/checkpoints/{target_schema}_{target_table}"  # Must match Cell 10

# Drop Delta table
spark.sql(f"DROP TABLE IF EXISTS {target_table_fqn}")
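print(f"✅ Delta table '{target_table_fqn}' dropped")

# Remove checkpoint. dbutils.fs.rm returns a boolean; a missing path may
# return False or raise depending on the runtime, so handle both.
try:
    if dbutils.fs.rm(checkpoint_path, True):
        print(f"✅ Checkpoint '{checkpoint_path}' removed")
    else:
        print(f"ℹ️  Checkpoint not found (may have been already removed)")
except Exception as e:
    print(f"ℹ️  Checkpoint not removed (may have been already removed): {e}")

print("\n✅ Cleanup complete!")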

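## Debug Cells

Run the cells below individually to troubleshoot sync issues between CockroachDB and the target table.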

In [None]:
raise RuntimeError(
    "\n"
    "⚠️  DEBUG SAFETY STOP\n"
    "\n"
    "Do NOT run all cells - run each debug cell individually.\n"
)

In [None]:
# ============================================================================
# DEBUG CELL 1: Quick Missing Keys Check
# ============================================================================
# Lightweight check to see if specific keys exist in CockroachDB and staging
# Update missing_keys list based on Cell 14/15 output
# ============================================================================

target_table_fqn = f"{target_catalog}.{target_schema}.{target_table}"
staging_table_cf = f"{target_table_fqn}_staging_cf"
missing_keys = [17, 18, 19]  # ← Update this based on Cell 14/15 output

print("🔍 Quick Debug: Checking missing keys...")
print("=" * 80)

# Check CockroachDB (close the connection explicitly, as the cleanup cells do)
print(f"\n📊 CockroachDB ({source_table}):")
conn = get_cockroachdb_connection()
try:
    with conn.cursor() as cursor:
        for key in missing_keys:
            cursor.execute(f"SELECT * FROM {source_table} WHERE ycsb_key = %s", (key,))
            result = cursor.fetchone()
            print(f"   Key {key}: {'✅ EXISTS' if result else '❌ NOT FOUND (deleted)'}")
finally:
    conn.close()

# Check Staging Table
print(f"\n📊 Staging Table ({staging_table_cf}):")
if spark.catalog.tableExists(staging_table_cf):
    staging_df = spark.read.table(staging_table_cf)
    for key in missing_keys:
        count = staging_df.filter(F.col("ycsb_key") == key).count()
        print(f"   Key {key}: {count} row(s)")
    
    print("\n💡 Next steps:")
    print("   - If keys exist in CockroachDB but not in staging:")
    print("     → Re-run Cell 12 to pick up new CDC files")
    print("   - If keys exist in staging but not in target:")
    print("     → Check Cell 12 output for MERGE errors")
    print("     → Run DEBUG CELL 2 for detailed analysis")
else:
    print("   ⚠️  Staging table doesn't exist (Cell 12 dropped it)")
    print("   💡 Re-run Cell 12 to recreate staging for debugging")


