
<div style="text-align: center; line-height: 0; padding-top: 9px;">
  <img
    src="https://databricks.com/wp-content/uploads/2018/03/db-academy-rgb-1200px.png"
    alt="Databricks Learning"
  >
</div>

# 3.1 DEMO: Cross Cloud Replication with Cloudflare R2 [Recipient]

## Overview
This demo showcases how recipients can access replicated data from Cloudflare R2 and maintain local synchronized copies using MERGE operations for Type 1 Slowly Changing Dimensions (SCD). Recipients read changes from the R2-hosted external table and apply them to their local tables.

## Learning Objectives
By the end of this demo, you will understand:
1. How to access external tables hosted on Cloudflare R2
2. How to detect changes in replicated data
3. How to implement Type 1 SCD using MERGE operations
4. How to schedule automatic synchronization jobs

## Architecture
```
Provider R2 External Table → Change Detection → Local Table (MERGE) → Type 1 SCD
```

**Benefits:**
- Access globally distributed data with zero egress costs
- Maintain local copies for optimal query performance
- Automatic change detection and synchronization
- Type 1 SCD pattern that keeps only the current value of each record (no history)
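
To make the Type 1 behavior concrete before the Spark code, here is a minimal plain-Python sketch in which a dictionary stands in for the local table. The `upsert_type1` helper and the sample rows are hypothetical illustrations and are not part of the demo code.

```python
# Minimal sketch of Type 1 SCD semantics using plain dictionaries.
# Hypothetical helper and data, for illustration only.

def upsert_type1(target, source_rows, key="transaction_id"):
    """Overwrite matching rows and insert new ones; no history is kept."""
    for row in source_rows:
        target[row[key]] = dict(row)  # same key: the old values are replaced
    return target

local = {"t1": {"transaction_id": "t1", "amount": 10.0, "region": "Europe"}}
incoming = [
    {"transaction_id": "t1", "amount": 11.0, "region": "Europe"},        # update
    {"transaction_id": "t2", "amount": 5.0, "region": "Asia Pacific"},   # insert
]

upsert_type1(local, incoming)
print(sorted(local))          # ['t1', 't2']
print(local["t1"]["amount"])  # 11.0
```

After the upsert, the previous amount for `t1` is gone. That is the defining property of Type 1, and it is what the MERGE statement later in this demo does at table scale.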

## Setup

Run the common setup and demo configuration scripts.

In [None]:
%run ./_common

In [None]:
%run ./Demo-Setup-3_1

## Step 1: Configure Cloudflare R2 Access

Configure access to the Cloudflare R2 bucket where the provider has replicated data.

In [None]:
# R2 Configuration - Same as provider (read-only access)
r2_endpoint = "https://4132d7d5587ee99b9d482ecfc2c1853c.r2.cloudflarestorage.com"
r2_bucket = "databricks-demo"
r2_access_key = "<your-r2-access-key>"  # Replace with your key; in production, read it from Databricks Secrets
r2_secret_key = "<your-r2-secret-key>"  # Replace with your key; in production, read it from Databricks Secrets

# Path to the replicated data
r2_table_path = f"s3a://{r2_bucket}/{DA.r2_path}"

# Local table for synchronized copy
local_table_name = f"{DA.catalog}.{DA.schema}.local_transactions"

print(f"R2 Source Path: {r2_table_path}")
print(f"Local Table: {local_table_name}")

In [None]:
# Configure Spark for R2 access
spark.conf.set(f"fs.s3a.bucket.{r2_bucket}.endpoint", r2_endpoint)
spark.conf.set(f"fs.s3a.bucket.{r2_bucket}.access.key", r2_access_key)
spark.conf.set(f"fs.s3a.bucket.{r2_bucket}.secret.key", r2_secret_key)
spark.conf.set(f"fs.s3a.bucket.{r2_bucket}.path.style.access", "true")
spark.conf.set(f"fs.s3a.bucket.{r2_bucket}.connection.ssl.enabled", "true")

print("✅ Spark configured for R2 access")
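
The per-bucket settings above can also be assembled as a dictionary first, which makes them easy to review or log with the key material masked. The sketch below is one possible way to do that; `build_r2_conf` and `mask` are hypothetical helpers, and the endpoint and keys are placeholders.

```python
# Sketch: build the per-bucket S3A settings as a dictionary so they can be
# reviewed or logged (with secrets masked) before being applied.
# `build_r2_conf` and `mask` are hypothetical helpers, not part of any library.

def build_r2_conf(bucket, endpoint, access_key, secret_key):
    prefix = f"fs.s3a.bucket.{bucket}"
    return {
        f"{prefix}.endpoint": endpoint,
        f"{prefix}.access.key": access_key,
        f"{prefix}.secret.key": secret_key,
        f"{prefix}.path.style.access": "true",
        f"{prefix}.connection.ssl.enabled": "true",
    }

def mask(conf):
    """Return a copy that is safe to print: key material is hidden."""
    return {
        name: ("***" if name.endswith((".access.key", ".secret.key")) else value)
        for name, value in conf.items()
    }

conf = build_r2_conf(
    bucket="databricks-demo",
    endpoint="https://<account-id>.r2.cloudflarestorage.com",  # placeholder
    access_key="<your-r2-access-key>",                          # placeholder
    secret_key="<your-r2-secret-key>",                          # placeholder
)
for name, value in mask(conf).items():
    print(name, "=", value)

# In a notebook, the settings could then be applied with:
#   for name, value in conf.items():
#       spark.conf.set(name, value)
# and the keys read from a secret scope instead of literals, for example:
#   dbutils.secrets.get(scope="<scope>", key="<key-name>")
```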

## Step 2: Discover and Access R2 External Table

Access the external table hosted on Cloudflare R2 by the provider.

In [None]:
# Register an external table that points at the existing Delta table on R2.
# The schema is read from the Delta log, so no column list is needed.
# Expected columns: transaction_id, customer_id, product_category, amount,
# transaction_date, region, created_at.
# Note: CREATE OR REPLACE with a column list would replace the provider's
# table contents at this location, so CREATE TABLE IF NOT EXISTS is used instead.
spark.sql(f"""
CREATE TABLE IF NOT EXISTS {DA.catalog}.{DA.schema}.r2_source_data
USING DELTA
LOCATION '{r2_table_path}'
""")

print(f"✅ Created external table reference: {DA.catalog}.{DA.schema}.r2_source_data")
print(f"✅ Location: {r2_table_path}")

In [None]:
# Verify we can access the R2 data
r2_count = spark.table(f"{DA.catalog}.{DA.schema}.r2_source_data").count()
print(f"📊 Found {r2_count} records in R2 external table")

# Show sample data
print("\n📄 Sample data from R2:")
display(spark.table(f"{DA.catalog}.{DA.schema}.r2_source_data").limit(5))

## Step 3: Create Local Target Table

Create a local managed table that will store our synchronized copy of the data.

In [None]:
# Create local target table with additional SCD metadata
spark.sql(f"""
CREATE OR REPLACE TABLE {local_table_name} (
  transaction_id STRING,
  customer_id STRING,
  product_category STRING,
  amount DECIMAL(10,2),
  transaction_date DATE,
  region STRING,
  created_at TIMESTAMP,
  -- SCD Type 1 metadata
  last_updated_at TIMESTAMP,
  source_version BIGINT,
  sync_timestamp TIMESTAMP
)
USING DELTA
PARTITIONED BY (transaction_date)
TBLPROPERTIES (
  'delta.autoOptimize.optimizeWrite' = 'true',
  'delta.autoOptimize.autoCompact' = 'true',
  'delta.enableChangeDataFeed' = 'true'
)
""")

print(f"✅ Created local target table: {local_table_name}")
print("✅ Table includes SCD Type 1 metadata columns")

## Step 4: Initial Data Load

Perform the initial load of data from R2 to our local table.

In [None]:
from datetime import datetime

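# Get the current version of the R2 source.
# DESCRIBE DETAIL has no version column; the latest version comes from DESCRIBE HISTORY.
r2_version = spark.sql(f"DESCRIBE HISTORY {DA.catalog}.{DA.schema}.r2_source_data LIMIT 1").select("version").collect()[0][0]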
sync_time = datetime.now()

print(f"📊 R2 source version: {r2_version}")
print(f"📊 Sync timestamp: {sync_time}")

# Initial load with SCD metadata
initial_data = spark.sql(f"""
SELECT 
  transaction_id,
  customer_id,
  product_category,
  amount,
  transaction_date,
  region,
  created_at,
  -- SCD Type 1 metadata (cast to match the target column types)
  created_at as last_updated_at,
  CAST({r2_version} AS BIGINT) as source_version,
  CAST('{sync_time}' AS TIMESTAMP) as sync_timestamp
FROM {DA.catalog}.{DA.schema}.r2_source_data
""")

initial_data.write.mode("overwrite").saveAsTable(local_table_name)

loaded_count = spark.table(local_table_name).count()
print(f"✅ Initial load completed: {loaded_count} records synchronized")

## Step 5: Create Change Detection Function

Create a function to detect changes in the R2 source data.

In [None]:
def detect_r2_changes(r2_table, local_table):
    """
    Detect changes between R2 source and local table.
    
    Args:
        r2_table: Name of R2 external table
        local_table: Name of local synchronized table
    
    Returns:
        dict: Information about detected changes
    """
    try:
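        # Get current versions and record counts.
        # The latest Delta version comes from DESCRIBE HISTORY (DESCRIBE DETAIL has no version column).
        r2_version = spark.sql(f"DESCRIBE HISTORY {r2_table} LIMIT 1").select("version").collect()[0][0]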
        r2_count = spark.table(r2_table).count()
        
        local_version_result = spark.sql(f"SELECT MAX(source_version) as max_version FROM {local_table}").collect()
        local_version = local_version_result[0][0] if local_version_result[0][0] is not None else 0
        local_count = spark.table(local_table).count()
        
        # Detect if there are changes
        has_changes = (r2_version > local_version) or (r2_count != local_count)
        
        # If there are changes, identify new/updated records
        new_records = 0
        updated_records = 0
        
        if has_changes:
            # Find records that exist in R2 but not in local (new records)
            new_records_df = spark.sql(f"""
                SELECT r2.*
                FROM {r2_table} r2
                LEFT ANTI JOIN {local_table} local
                ON r2.transaction_id = local.transaction_id
            """)
            new_records = new_records_df.count()
            
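            # Find records that exist in both but have different values (updated records).
            # <=> is null-safe equality, so a change to or from NULL is also counted.
            updated_records_df = spark.sql(f"""
                SELECT r2.*
                FROM {r2_table} r2
                INNER JOIN {local_table} local
                ON r2.transaction_id = local.transaction_id
                WHERE NOT (r2.customer_id <=> local.customer_id)
                   OR NOT (r2.product_category <=> local.product_category)
                   OR NOT (r2.amount <=> local.amount)
                   OR NOT (r2.transaction_date <=> local.transaction_date)
                   OR NOT (r2.region <=> local.region)
                   OR NOT (r2.created_at <=> local.created_at)
            """)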
            updated_records = updated_records_df.count()
        
        return {
            'has_changes': has_changes,
            'r2_version': r2_version,
            'local_version': local_version,
            'r2_count': r2_count,
            'local_count': local_count,
            'new_records': new_records,
            'updated_records': updated_records,
            'total_changes': new_records + updated_records
        }
        
    except Exception as e:
        return {
            'has_changes': False,
            'error': str(e),
            'message': f'Error detecting changes: {str(e)}'
        }

print("✅ Change detection function created")
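
The classification performed by the change detection function can be illustrated on in-memory rows. The sketch below is a plain-Python stand-in, not the demo's implementation: `classify_changes` and the sample rows are hypothetical. One point worth noting is that in SQL `a != b` evaluates to NULL when either side is NULL, while Python's `!=` treats `None` as an ordinary value, which matches the behavior of Spark SQL's null-safe operator `<=>`.

```python
# Sketch of the change-detection logic on in-memory rows.
# `classify_changes` is a hypothetical helper; dictionaries keyed by
# transaction_id stand in for the R2 and local tables.

COMPARED_COLUMNS = ("amount", "product_category", "region", "created_at")

def classify_changes(source, target, columns=COMPARED_COLUMNS):
    """Return (new_ids, updated_ids) for source rows relative to target."""
    # Rows present in the source but not in the target (the anti join)
    new_ids = [key for key in source if key not in target]
    # Rows present in both whose compared columns differ (None-safe in Python)
    updated_ids = [
        key for key in source
        if key in target
        and any(source[key].get(col) != target[key].get(col) for col in columns)
    ]
    return new_ids, updated_ids

r2_rows = {
    "t1": {"amount": 11.0, "product_category": "Books", "region": None, "created_at": "2024-10-27"},
    "t2": {"amount": 5.0, "product_category": "Sports", "region": "Europe", "created_at": "2024-10-27"},
    "t3": {"amount": 7.5, "product_category": "Clothing", "region": "Europe", "created_at": "2024-10-27"},
}
local_rows = {
    "t1": {"amount": 11.0, "product_category": "Books", "region": "Europe", "created_at": "2024-10-27"},
    "t2": {"amount": 5.0, "product_category": "Sports", "region": "Europe", "created_at": "2024-10-27"},
}

new_ids, updated_ids = classify_changes(r2_rows, local_rows)
print(new_ids)      # ['t3']
print(updated_ids)  # ['t1']  (region changed from 'Europe' to None)
```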

## Step 6: Create MERGE-based Synchronization Function

Create a function that uses MERGE operations to synchronize changes from R2 to local table.

In [None]:
def sync_r2_to_local(r2_table, local_table):
    """
    Synchronize data from R2 to local table using MERGE (Type 1 SCD).
    
    Args:
        r2_table: Name of R2 external table
        local_table: Name of local target table
    
    Returns:
        dict: Synchronization results
    """
    try:
        sync_start = datetime.now()
        
        # Detect changes first
        change_info = detect_r2_changes(r2_table, local_table)
        
        if not change_info.get('has_changes', False):
            return {
                'status': 'up_to_date',
                'records_processed': 0,
                'message': 'No changes detected - local table is up to date'
            }
        
        print(f"🔄 Synchronizing changes...")
        print(f"   New records: {change_info['new_records']}")
        print(f"   Updated records: {change_info['updated_records']}")
        
        # Get current R2 version for metadata
        r2_version = change_info['r2_version']
        
        # Perform MERGE operation (Type 1 SCD)
        merge_sql = f"""
        MERGE INTO {local_table} AS target
        USING (
            SELECT 
                transaction_id,
                customer_id,
                product_category,
                amount,
                transaction_date,
                region,
                created_at,
                -- SCD metadata
                current_timestamp() as last_updated_at,
                {r2_version} as source_version,
                current_timestamp() as sync_timestamp
            FROM {r2_table}
        ) AS source
        ON target.transaction_id = source.transaction_id
        
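        -- Note: every matched row is rewritten on each sync, so last_updated_at
        -- records the last sync time rather than the last value change
        WHEN MATCHED THEN UPDATE SET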
            customer_id = source.customer_id,
            product_category = source.product_category,
            amount = source.amount,
            transaction_date = source.transaction_date,
            region = source.region,
            created_at = source.created_at,
            last_updated_at = source.last_updated_at,
            source_version = source.source_version,
            sync_timestamp = source.sync_timestamp
        
        WHEN NOT MATCHED THEN INSERT (
            transaction_id, customer_id, product_category, amount,
            transaction_date, region, created_at, last_updated_at,
            source_version, sync_timestamp
        ) VALUES (
            source.transaction_id, source.customer_id, source.product_category,
            source.amount, source.transaction_date, source.region,
            source.created_at, source.last_updated_at, source.source_version,
            source.sync_timestamp
        )
        """
        
        # Execute the MERGE
        merge_result = spark.sql(merge_sql)
        
        sync_end = datetime.now()
        duration = (sync_end - sync_start).total_seconds()
        
        # Get final counts
        final_local_count = spark.table(local_table).count()
        
        return {
            'status': 'success',
            'records_processed': change_info['total_changes'],
            'new_records': change_info['new_records'],
            'updated_records': change_info['updated_records'],
            'final_count': final_local_count,
            'duration_seconds': duration,
            'r2_version': r2_version,
            'sync_timestamp': sync_start.isoformat(),
            'message': f'Successfully synchronized {change_info["total_changes"]} changes'
        }
        
    except Exception as e:
        return {
            'status': 'error',
            'records_processed': 0,
            'error': str(e),
            'message': f'Synchronization failed: {str(e)}'
        }

print("✅ MERGE-based synchronization function created")
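
A possible refinement is to update only the rows whose values actually differ, so that unchanged rows are not rewritten on every sync. The sketch below generates such a MERGE statement as a string; `build_conditional_merge` is a hypothetical helper, the table names are placeholders, and the SCD metadata columns are left out for brevity. It relies on `<=>`, Spark SQL's null-safe equality operator.

```python
# Sketch: generate a MERGE statement that only rewrites rows whose tracked
# columns differ. `build_conditional_merge` is a hypothetical helper.

def build_conditional_merge(target, source, key, columns):
    # Row is considered changed if any tracked column is not null-safe equal
    changed = " OR ".join(f"NOT (t.{c} <=> s.{c})" for c in columns)
    assignments = ", ".join(f"{c} = s.{c}" for c in columns)
    insert_cols = ", ".join([key, *columns])
    insert_vals = ", ".join(f"s.{c}" for c in [key, *columns])
    return (
        f"MERGE INTO {target} AS t\n"
        f"USING {source} AS s\n"
        f"ON t.{key} = s.{key}\n"
        f"WHEN MATCHED AND ({changed}) THEN UPDATE SET {assignments}\n"
        f"WHEN NOT MATCHED THEN INSERT ({insert_cols}) VALUES ({insert_vals})"
    )

sql = build_conditional_merge(
    target="local_transactions",   # placeholder table name
    source="r2_source_data",       # placeholder table name
    key="transaction_id",
    columns=["amount", "region"],
)
print(sql)
```

One trade-off to keep in mind: with a conditional update, `source_version` on unchanged rows is no longer refreshed, so a version-based change check would need its own watermark (for example, a small sync-state table) rather than `MAX(source_version)`.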

## Step 7: Test Change Detection

Test our change detection capabilities.

In [None]:
# Test change detection
change_info = detect_r2_changes(
    f"{DA.catalog}.{DA.schema}.r2_source_data",
    local_table_name
)

print("📊 Change Detection Results:")
print(f"   Has changes: {change_info.get('has_changes', 'Unknown')}")
print(f"   R2 version: {change_info.get('r2_version', 'Unknown')}")
print(f"   Local version: {change_info.get('local_version', 'Unknown')}")
print(f"   R2 record count: {change_info.get('r2_count', 'Unknown')}")
print(f"   Local record count: {change_info.get('local_count', 'Unknown')}")
print(f"   New records: {change_info.get('new_records', 'Unknown')}")
print(f"   Updated records: {change_info.get('updated_records', 'Unknown')}")

if change_info.get('error'):
    print(f"   Error: {change_info['error']}")

## Step 8: Simulate Provider Adding New Data

For demo purposes, let's simulate the provider adding new data to R2. 
In practice, this would happen automatically through the provider's replication process.

In [None]:
# Simulate new data being added to R2 by the provider
# In real scenarios, this would come from the provider's replication job

import uuid
from datetime import date
from decimal import Decimal
from pyspark.sql.types import StructType, StructField, StringType, DecimalType, DateType, TimestampType

# Create some new simulated data
new_provider_data = []
regions = ['North America', 'Europe', 'Asia Pacific', 'Latin America']
categories = ['Electronics', 'Clothing', 'Books', 'Home & Garden', 'Sports']

for i in range(300, 325):
    new_provider_data.append((
        str(uuid.uuid4()),
        f"customer_{i % 80}",
        categories[i % len(categories)],
        # DecimalType columns require decimal.Decimal values, not floats
        Decimal(str(round(120 + (i * 3.1) % 350, 2))),
        date.today(),  # Today's date
        regions[i % len(regions)],
        datetime.now()
    ))

schema = StructType([
    StructField("transaction_id", StringType(), True),
    StructField("customer_id", StringType(), True),
    StructField("product_category", StringType(), True),
    StructField("amount", DecimalType(10,2), True),
    StructField("transaction_date", DateType(), True),
    StructField("region", StringType(), True),
    StructField("created_at", TimestampType(), True)
])

new_df = spark.createDataFrame(new_provider_data, schema)

# Add to the R2 table (simulating provider replication)
new_df.write.mode("append").saveAsTable(f"{DA.catalog}.{DA.schema}.r2_source_data")

print(f"🎭 Simulated provider adding {new_df.count()} new records to R2")

# Check the new state
updated_r2_count = spark.table(f"{DA.catalog}.{DA.schema}.r2_source_data").count()
print(f"📊 R2 table now has {updated_r2_count} total records")

## Step 9: Test Synchronization with MERGE

Now test our synchronization function to pull the new changes.

In [None]:
# Run synchronization
sync_result = sync_r2_to_local(
    f"{DA.catalog}.{DA.schema}.r2_source_data",
    local_table_name
)

print("🔄 Synchronization Results:")
print(f"   Status: {sync_result['status']}")
print(f"   Records processed: {sync_result.get('records_processed', 0)}")
print(f"   New records: {sync_result.get('new_records', 0)}")
print(f"   Updated records: {sync_result.get('updated_records', 0)}")
print(f"   Duration: {sync_result.get('duration_seconds', 0):.2f} seconds")
print(f"   Final count: {sync_result.get('final_count', 0)}")
print(f"   Message: {sync_result.get('message', 'No message')}")

if sync_result.get('error'):
    print(f"   Error: {sync_result['error']}")

## Step 10: Verify Type 1 SCD Implementation

Verify that our Type 1 SCD implementation is working correctly with proper metadata.

In [None]:
# Check the SCD metadata in our local table
scd_summary = spark.sql(f"""
SELECT 
    COUNT(*) as total_records,
    COUNT(DISTINCT source_version) as distinct_versions,
    MIN(sync_timestamp) as first_sync,
    MAX(sync_timestamp) as last_sync,
    MAX(source_version) as latest_version,
    COUNT(DISTINCT transaction_date) as distinct_dates
FROM {local_table_name}
""")

print("📊 Type 1 SCD Summary:")
display(scd_summary)

# Show sample records with SCD metadata
print("\n📄 Sample records with SCD metadata:")
sample_scd = spark.sql(f"""
SELECT 
    transaction_id,
    amount,
    region,
    transaction_date,
    last_updated_at,
    source_version,
    sync_timestamp
FROM {local_table_name}
ORDER BY sync_timestamp DESC
LIMIT 10
""")
display(sample_scd)

## Step 11: Test Update Scenarios

Test how our MERGE logic handles updates to existing records.

In [None]:
# Simulate the provider updating some existing records in R2
# Get a few transaction IDs to update
sample_ids = spark.sql(f"""
    SELECT transaction_id 
    FROM {local_table_name} 
    LIMIT 5
""").collect()

if len(sample_ids) > 0:
    # Update these records in the R2 table (simulate provider changes)
    for row in sample_ids[:3]:  # Update first 3 records
        transaction_id = row[0]
        # Increase amount by 10% and change region
        spark.sql(f"""
            UPDATE {DA.catalog}.{DA.schema}.r2_source_data
            SET amount = amount * 1.1,
                region = 'Updated Region',
                created_at = current_timestamp()
            WHERE transaction_id = '{transaction_id}'
        """)
    
    print(f"🔄 Simulated provider updating 3 existing records in R2")
    
    # Now synchronize the updates
    update_sync_result = sync_r2_to_local(
        f"{DA.catalog}.{DA.schema}.r2_source_data",
        local_table_name
    )
    
    print("\n📊 Update Synchronization Results:")
    print(f"   Status: {update_sync_result['status']}")
    print(f"   Records processed: {update_sync_result.get('records_processed', 0)}")
    print(f"   Updated records: {update_sync_result.get('updated_records', 0)}")
    print(f"   Message: {update_sync_result.get('message', 'No message')}")
    
    # Show the updated records
    print("\n📄 Updated records (Type 1 SCD - current values only):")
    updated_records = spark.sql(f"""
        SELECT 
            transaction_id,
            amount,
            region,
            last_updated_at,
            source_version
        FROM {local_table_name}
        WHERE region = 'Updated Region'
        ORDER BY last_updated_at DESC
    """)
    display(updated_records)
else:
    print("ℹ️  No records found to update")

## Step 12: Create Production Synchronization Job

Create a production-ready job function that can be scheduled to run automatically.

In [None]:
def production_sync_job():
    """
    Production-ready synchronization job for recipient.
    This function includes comprehensive error handling, logging, and monitoring.
    """
    from datetime import datetime
    
    job_start = datetime.now()
    
    # Define table names before the try block so the error handler can always reference them
    r2_source = f"{DA.catalog}.{DA.schema}.r2_source_data"
    local_target = local_table_name
    
    try:
        print(f"🔄 Starting recipient sync job at {job_start}")
        
        # Check if R2 source is accessible
        try:
            r2_test_count = spark.table(r2_source).count()
            print(f"✅ R2 source accessible: {r2_test_count} records")
        except Exception as e:
            # Chain the original exception so the root cause stays in the traceback
            raise RuntimeError(f"Cannot access R2 source table: {str(e)}") from e
        
        # Detect changes
        change_info = detect_r2_changes(r2_source, local_target)
        
        if change_info.get('error'):
            raise Exception(f"Change detection failed: {change_info['error']}")
        
        # Synchronize if changes detected
        if change_info.get('has_changes', False):
            print(f"📊 Changes detected: {change_info['total_changes']} records")
            
            sync_result = sync_r2_to_local(r2_source, local_target)
            
            if sync_result.get('error'):
                raise Exception(f"Synchronization failed: {sync_result['error']}")
            
            job_end = datetime.now()
            duration = (job_end - job_start).total_seconds()
            
            # Log successful sync
            log_entry = {
                'timestamp': job_start.isoformat(),
                'duration_seconds': duration,
                'status': 'success_with_changes',
                'records_processed': sync_result['records_processed'],
                'new_records': sync_result.get('new_records', 0),
                'updated_records': sync_result.get('updated_records', 0),
                'final_count': sync_result.get('final_count', 0),
                'r2_version': sync_result.get('r2_version', 0),
                'r2_source': r2_source,
                'local_target': local_target
            }
            
            print(f"✅ Sync completed successfully")
            print(f"   Duration: {duration:.2f} seconds")
            print(f"   Records processed: {sync_result['records_processed']}")
            print(f"   New: {sync_result.get('new_records', 0)}, Updated: {sync_result.get('updated_records', 0)}")
            
        else:
            job_end = datetime.now()
            duration = (job_end - job_start).total_seconds()
            
            log_entry = {
                'timestamp': job_start.isoformat(),
                'duration_seconds': duration,
                'status': 'success_no_changes',
                'records_processed': 0,
                'message': 'No changes detected - local table is up to date',
                'r2_source': r2_source,
                'local_target': local_target
            }
            
            print(f"ℹ️  No changes detected - table is up to date")
            print(f"   Duration: {duration:.2f} seconds")
        
        # In production, you might want to:
        # 1. Send metrics to monitoring system (DataDog, CloudWatch, etc.)
        # 2. Log to centralized system (Splunk, ELK stack, etc.)
        # 3. Update job metadata table
        # 4. Send success notifications if needed
        
        return log_entry
        
    except Exception as e:
        job_end = datetime.now()
        duration = (job_end - job_start).total_seconds()
        
        error_log = {
            'timestamp': job_start.isoformat(),
            'duration_seconds': duration,
            'status': 'error',
            'error_message': str(e),
            'r2_source': r2_source,
            'local_target': local_target
        }
        
        print(f"❌ Job failed after {duration:.2f} seconds")
        print(f"   Error: {str(e)}")
        
        # In production:
        # 1. Send alert to operations team
        # 2. Log error details for debugging
        # 3. Implement retry logic with exponential backoff
        
        raise

# Test the production job
job_result = production_sync_job()
print(f"\n📋 Job Result Summary: {job_result}")
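
The error handler above mentions retry logic with exponential backoff but does not implement it. A minimal sketch of one way to wrap a job in such a retry loop follows; `run_with_retries` and `flaky_job` are hypothetical names, and the stand-in job exists only to demonstrate the behavior without Spark.

```python
import time

def run_with_retries(job, max_attempts=3, base_delay=1.0, sleep=time.sleep):
    """Call job(); on failure wait base_delay * 2**(attempt - 1) seconds and retry."""
    for attempt in range(1, max_attempts + 1):
        try:
            return job()
        except Exception as exc:
            if attempt == max_attempts:
                raise  # out of attempts: surface the last error
            delay = base_delay * 2 ** (attempt - 1)
            print(f"Attempt {attempt} failed ({exc}); retrying in {delay:.0f}s")
            sleep(delay)

# Demonstration with a stand-in job that fails twice, then succeeds.
calls = {"count": 0}

def flaky_job():
    calls["count"] += 1
    if calls["count"] < 3:
        raise RuntimeError("transient failure")
    return {"status": "success"}

delays = []
# Record the delays instead of actually sleeping, to keep the demo fast
result = run_with_retries(flaky_job, sleep=delays.append)
print(result)  # {'status': 'success'}
print(delays)  # [1.0, 2.0]
```

In the notebook, the same wrapper could be applied as `run_with_retries(production_sync_job)`, since that function re-raises on failure.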

## Step 13: Data Quality and Validation

Implement data quality checks to ensure synchronization integrity.

In [None]:
def validate_sync_quality(r2_table, local_table):
    """
    Validate data quality after synchronization.
    
    Returns:
        dict: Validation results
    """
    validation_results = {
        'passed': True,
        'checks': [],
        'warnings': [],
        'errors': []
    }
    
    try:
        # Check 1: Record count consistency
        r2_count = spark.table(r2_table).count()
        local_count = spark.table(local_table).count()
        
        if r2_count == local_count:
            validation_results['checks'].append(f"✅ Record count match: {r2_count} records")
        else:
            validation_results['errors'].append(f"❌ Record count mismatch: R2={r2_count}, Local={local_count}")
            validation_results['passed'] = False
        
        # Check 2: Unique transaction IDs
        local_unique_check = spark.sql(f"""
            SELECT COUNT(*) as total, COUNT(DISTINCT transaction_id) as unique_ids
            FROM {local_table}
        """).collect()[0]
        
        if local_unique_check[0] == local_unique_check[1]:
            validation_results['checks'].append(f"✅ All transaction IDs are unique")
        else:
            validation_results['errors'].append(f"❌ Duplicate transaction IDs detected")
            validation_results['passed'] = False
        
        # Check 3: Data freshness (SCD metadata)
        freshness_check = spark.sql(f"""
            SELECT 
                DATEDIFF(HOUR, MAX(sync_timestamp), current_timestamp()) as hours_since_sync,
                MAX(source_version) as latest_version
            FROM {local_table}
        """).collect()[0]
        
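        hours_since_sync = freshness_check[0]
        if hours_since_sync is None:
            # MAX(sync_timestamp) is NULL when the table has no rows
            validation_results['warnings'].append("⚠️  No sync timestamp found: local table may be empty")
        elif hours_since_sync <= 24:  # Data should be less than 24 hours old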
            validation_results['checks'].append(f"✅ Data is fresh: {hours_since_sync} hours old")
        else:
            validation_results['warnings'].append(f"⚠️  Data may be stale: {hours_since_sync} hours old")
        
        # Check 4: Amount field validation (no negative values)
        negative_amounts = spark.sql(f"""
            SELECT COUNT(*) as negative_count
            FROM {local_table}
            WHERE amount < 0
        """).collect()[0][0]
        
        if negative_amounts == 0:
            validation_results['checks'].append(f"✅ No negative amounts found")
        else:
            validation_results['warnings'].append(f"⚠️  Found {negative_amounts} negative amounts")
        
        # Check 5: Date range validation
        date_range = spark.sql(f"""
            SELECT 
                MIN(transaction_date) as earliest_date,
                MAX(transaction_date) as latest_date,
                COUNT(DISTINCT transaction_date) as distinct_dates
            FROM {local_table}
        """).collect()[0]
        
        validation_results['checks'].append(f"✅ Date range: {date_range[0]} to {date_range[1]} ({date_range[2]} distinct dates)")
        
        return validation_results
        
    except Exception as e:
        validation_results['errors'].append(f"❌ Validation failed: {str(e)}")
        validation_results['passed'] = False
        return validation_results

# Run validation
validation = validate_sync_quality(
    f"{DA.catalog}.{DA.schema}.r2_source_data",
    local_table_name
)

print("🔍 Data Quality Validation Results:")
print(f"\n   Overall Status: {'✅ PASSED' if validation['passed'] else '❌ FAILED'}")

if validation['checks']:
    print("\n   ✅ Passed Checks:")
    for check in validation['checks']:
        print(f"      {check}")

if validation['warnings']:
    print("\n   ⚠️  Warnings:")
    for warning in validation['warnings']:
        print(f"      {warning}")

if validation['errors']:
    print("\n   ❌ Errors:")
    for error in validation['errors']:
        print(f"      {error}")
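
When validation runs inside a scheduled job, it is often useful to fail the run rather than only print the findings. The sketch below shows one way to do that; `enforce_validation` is a hypothetical helper that expects the dictionary shape returned by `validate_sync_quality`, and the sample result is made up for illustration.

```python
# Sketch: turn a validation result into a pass/fail signal for a scheduled run.
# `enforce_validation` is a hypothetical helper, not part of the demo code.

def enforce_validation(results, fail_on_warnings=False):
    problems = list(results.get("errors", []))
    if fail_on_warnings:
        problems += results.get("warnings", [])
    if not results.get("passed", False) or problems:
        raise ValueError("Validation failed: " + "; ".join(problems or ["unknown reason"]))
    return True

sample = {"passed": True, "checks": ["count match"], "warnings": ["stale data"], "errors": []}

print(enforce_validation(sample))  # True: warnings are tolerated by default

try:
    enforce_validation(sample, fail_on_warnings=True)
except ValueError as err:
    print(err)  # Validation failed: stale data
```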

## Step 14: Performance Analysis

Compare aggregate statistics between the local synchronized table and the R2 source, then inspect the local table's file count and size.

In [None]:
# Performance analysis
performance_stats = spark.sql(f"""
SELECT 
    'Local Synchronized Table' as source,
    COUNT(*) as total_records,
    ROUND(SUM(amount), 2) as total_amount,
    ROUND(AVG(amount), 2) as avg_amount,
    COUNT(DISTINCT customer_id) as unique_customers,
    COUNT(DISTINCT region) as unique_regions,
    MIN(transaction_date) as earliest_date,
    MAX(transaction_date) as latest_date,
    MAX(source_version) as latest_version,
    MAX(sync_timestamp) as last_sync
FROM {local_table_name}

UNION ALL

SELECT 
    'R2 External Table' as source,
    COUNT(*) as total_records,
    ROUND(SUM(amount), 2) as total_amount,
    ROUND(AVG(amount), 2) as avg_amount,
    COUNT(DISTINCT customer_id) as unique_customers,
    COUNT(DISTINCT region) as unique_regions,
    MIN(transaction_date) as earliest_date,
    MAX(transaction_date) as latest_date,
    NULL as latest_version,
    NULL as last_sync
FROM {DA.catalog}.{DA.schema}.r2_source_data
""")

print("📊 Performance Comparison:")
display(performance_stats)

# Check table sizes and optimization
print("\n🔧 Table Details:")
local_details = spark.sql(f"DESCRIBE DETAIL {local_table_name}")
display(local_details.select("name", "numFiles", "sizeInBytes", "properties"))

## Summary and Next Steps

🎉 **Congratulations!** You have successfully implemented a recipient-side synchronization system for cross-cloud data replication using Cloudflare R2.

### What We Accomplished:

✅ **R2 Access**: Connected to Cloudflare R2 external table with zero egress costs  
✅ **Local Table**: Created managed table with Type 1 SCD metadata  
✅ **Change Detection**: Implemented automatic change detection from R2 source  
✅ **MERGE Operations**: Used MERGE for efficient Type 1 SCD updates  
✅ **Production Job**: Created schedulable synchronization function  
✅ **Data Quality**: Implemented comprehensive validation checks  
✅ **Monitoring**: Added performance analysis and health monitoring  

### Key Benefits Achieved:

🌍 **Global Access**: Fast data access from Cloudflare's global network  
💰 **Cost Efficient**: Zero egress fees for reading from R2  
⚡ **Local Performance**: Optimized local queries on synchronized data  
🔄 **Automatic Updates**: MERGE-based Type 1 SCD for current data  
📊 **Data Quality**: Built-in validation and monitoring  
🛡️ **Reliability**: Error handling, with retry logic left as a production follow-up  

### Type 1 SCD Implementation:

Our implementation provides:
- **Current Data Only**: Type 1 SCD overwrites old values with new ones
- **Metadata Tracking**: `last_updated_at`, `source_version`, `sync_timestamp`
- **MERGE Logic**: Efficient upsert operations (UPDATE existing, INSERT new)
- **Change Detection**: Automatic identification of new and updated records

### Next Steps for Production:

1. **Schedule the Job**: 
   - Create Databricks Job with the `production_sync_job()` function
   - Set appropriate frequency (hourly, daily, etc.)
   - Configure cluster auto-scaling for cost efficiency

2. **Security & Secrets**:
   - Store R2 credentials in Databricks Secrets
   - Implement row-level security if needed
   - Set up audit logging

3. **Advanced Monitoring**:
   - Set up alerts for sync failures
   - Create dashboards for data freshness
   - Implement SLA monitoring

4. **Performance Optimization**:
   - Implement partition pruning strategies
   - Use table optimization (OPTIMIZE, VACUUM)
   - Consider Z-ordering for better query performance

5. **Advanced SCD Patterns**:
   - Implement Type 2 SCD if historical tracking needed
   - Add soft deletes handling
   - Implement CDC (Change Data Capture) patterns
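
For the first item in the list above, a scheduled job can be described as a settings payload. The sketch below builds such a payload as a Python dictionary using field names from the Databricks Jobs API 2.1 (`schedule`, `quartz_cron_expression`, `tasks`, `notebook_task`). The job name, task key, and notebook path are placeholders, and compute settings are intentionally omitted, so treat it as a starting point rather than a complete job definition.

```python
import json

# Sketch of a job definition that runs the sync notebook every hour.
# All names and paths below are placeholders.
job_settings = {
    "name": "r2-recipient-sync",
    "max_concurrent_runs": 1,  # avoid overlapping MERGE runs
    "schedule": {
        # Quartz fields: second minute hour day-of-month month day-of-week
        "quartz_cron_expression": "0 0 * * * ?",  # top of every hour
        "timezone_id": "UTC",
        "pause_status": "UNPAUSED",
    },
    "tasks": [
        {
            "task_key": "sync_r2_to_local",
            "notebook_task": {
                "notebook_path": "/Workspace/path/to/this/notebook",
            },
            # Compute settings (for example a job cluster) are omitted here
        }
    ],
}

print(json.dumps(job_settings, indent=2))
```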

### Cost Benefits:

By using Cloudflare R2 for cross-cloud replication:
- **Traditional**: About $90/TB in egress fees for cross-cloud data access
- **With R2**: $0/TB egress plus about $15/TB per month for storage
- **Savings**: Up to 95% reduction in data sharing costs, depending on how often the data is read
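
The savings figure depends on read volume, which a short calculation makes visible. The sketch below uses the two prices quoted above and makes simplifying assumptions: storage is billed per TB per month, R2 per-request charges are ignored, and so is the provider's one-time cost of replicating data into R2. The `monthly_savings_pct` helper is hypothetical.

```python
EGRESS_PER_TB = 90.0      # USD per TB read across clouds (figure quoted above)
R2_STORAGE_PER_TB = 15.0  # USD per TB stored (figure quoted above; assumed monthly)

def monthly_savings_pct(tb_stored, reads_per_month):
    """Percentage saved when each full read would otherwise incur egress fees."""
    traditional = EGRESS_PER_TB * tb_stored * reads_per_month
    with_r2 = R2_STORAGE_PER_TB * tb_stored
    return round(100 * (traditional - with_r2) / traditional, 1)

for reads in (1, 2, 4):
    print(reads, monthly_savings_pct(1, reads))
# 1 83.3
# 2 91.7
# 4 95.8
```

Under these assumptions, a single full read per month saves about 83%, and the 95% level is reached once the same data is read roughly four times a month.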

### Use Cases:

This pattern is ideal for:
- Multi-cloud analytics environments
- Global data distribution scenarios
- Cost-sensitive data sharing requirements
- SaaS platforms serving global customers
- Backup and disaster recovery strategies

The recipient now has a robust, cost-effective way to stay synchronized with provider data across cloud boundaries!

---
&copy; 2025 Databricks, Inc. All rights reserved. Apache, Apache Spark, Spark, the Spark Logo, Apache Iceberg, Iceberg, and the Apache Iceberg logo are trademarks of the <a href="https://www.apache.org/" target="_blank">Apache Software Foundation</a>.<br/><br/><a href="https://databricks.com/privacy-policy" target="_blank">Privacy Policy</a> | <a href="https://databricks.com/terms-of-use" target="_blank">Terms of Use</a> | <a href="https://help.databricks.com/" target="_blank">Support</a>