# Simple AutoLoader + Change Data Feed Demo

This notebook demonstrates:
1. **Unity Catalog** setup
2. **AutoLoader** for file ingestion (both batch and streaming)
3. **Change Data Feed (CDF)** for incremental processing (both batch and streaming)
4. **Feature Engineering** in the Silver layer

## 📋 Requirements
- **Databricks Runtime**: 11.3 LTS or higher (for CDF support)
- **Unity Catalog**: Enabled workspace (for managed tables)
- **Unity Catalog privileges**: `CREATE CATALOG` on the metastore (or `USE CATALOG` on an existing catalog), plus `CREATE SCHEMA` and `CREATE TABLE`
- **Storage**: Access to DBFS or cloud storage (ADLS Gen2, S3, or GCS)

## 🎯 Batch vs Streaming
**Important**: Both AutoLoader and CDF support batch AND streaming workloads!
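- **Batch**: Run the stream with `.trigger(availableNow=True)` (the replacement for the deprecated `.trigger(once=True)`) so it processes everything available and then stops. For CDF you can also use a regular `spark.read` with `readChangeFeed`.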
- **Streaming**: Use `.trigger(processingTime='X seconds')` for continuous processing

## 1️⃣ Setup and Configuration

**💡 Note**: We're using DBFS paths for data storage. In production, you can also use cloud storage paths.

```python
from pyspark.sql.functions import *
from delta.tables import DeltaTable
import pandas as pd
from sklearn.datasets import load_iris

# Simple configuration using DBFS
catalog = "marcin_demo"
schema = "iris_demo_cdf"

# DBFS paths for data and checkpoints
data_path = "dbfs:/FileStore/autoloader_demo/iris_landing/"
checkpoint_base = "dbfs:/FileStore/autoloader_demo/checkpoints/"
schema_location_base = "dbfs:/FileStore/autoloader_demo/schemas/"

# For cloud storage, use paths like:
# data_path = "abfss://container@storage.dfs.core.windows.net/autoloader_demo/iris_landing/"
# data_path = "s3a://bucket/autoloader_demo/iris_landing/"

print(f"📍 Data path: {data_path}")
print(f"📍 Checkpoint base: {checkpoint_base}")
print(f"📍 Schema location base: {schema_location_base}")
```

```
📍 Data path: dbfs:/FileStore/autoloader_demo/iris_landing/
📍 Checkpoint base: dbfs:/FileStore/autoloader_demo/checkpoints/
📍 Schema location base: dbfs:/FileStore/autoloader_demo/schemas/
```
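Later cells build paths by plain string concatenation, for example `f"{checkpoint_base}bronze_batch/"`. That only works because every base path ends with `/`. If you switch to cloud storage paths, a small helper based on `posixpath.join` tolerates a missing trailing slash. `storage_path` is a hypothetical helper that the notebook does not define. `posixpath.join` discards the base when a later part starts with `/`, so the sketch rejects absolute parts.

```python
import posixpath


def storage_path(base: str, *parts: str) -> str:
    """Join a DBFS/cloud base path with relative parts, adding '/' only where needed.

    Hypothetical helper: parts must be relative (no leading '/'), otherwise
    posixpath.join would silently drop the base path.
    """
    if any(p.startswith("/") for p in parts):
        raise ValueError("parts must be relative paths")
    return posixpath.join(base, *parts)


# Works with or without a trailing slash on the base path
print(storage_path("dbfs:/FileStore/autoloader_demo/checkpoints/", "bronze_batch"))
print(storage_path("dbfs:/FileStore/autoloader_demo/checkpoints", "bronze_batch"))
print(storage_path("abfss://container@storage.dfs.core.windows.net/autoloader_demo", "iris_landing"))
```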


## 2️⃣ Create Unity Catalog Objects

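**🔑 Key Concept**: Unity Catalog provides governance (access control, lineage, auditing) for Delta tables. CDF itself is a Delta Lake table property (`delta.enableChangeDataFeed`) and works on any Delta table. This demo simply uses Unity Catalog managed tables.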
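A CDF read returns each row-level change with a `_change_type` of `insert`, `update_preimage`, `update_postimage`, or `delete`. It also adds `_commit_version` and `_commit_timestamp`. The pure-Python sketch below is a simplified model of the filter the Silver step applies later: keep inserts and post-update images, and drop pre-images and deletes. `ChangeRow` and `rows_for_silver` are hypothetical and not part of Delta Lake.

```python
from dataclasses import dataclass

# The four values CDF writes to _change_type
CDF_CHANGE_TYPES = {"insert", "update_preimage", "update_postimage", "delete"}


@dataclass(frozen=True)
class ChangeRow:
    """Hypothetical, simplified stand-in for one row of a CDF read."""
    record_id: str
    change_type: str     # mirrors _change_type
    commit_version: int  # mirrors _commit_version

    def __post_init__(self):
        if self.change_type not in CDF_CHANGE_TYPES:
            raise ValueError(f"unknown change type: {self.change_type}")


def rows_for_silver(changes):
    """Keep inserts and post-update images, mirroring the Silver filter in this notebook."""
    return [c for c in changes if c.change_type in ("insert", "update_postimage")]


changes = [
    ChangeRow("REC_000_0000", "insert", 1),
    ChangeRow("REC_000_0000", "update_preimage", 2),
    ChangeRow("REC_000_0000", "update_postimage", 2),
    ChangeRow("REC_000_0001", "delete", 3),
]
for row in rows_for_silver(changes):
    print(row.record_id, row.change_type, row.commit_version)
```

Dropping `delete` rows this way means deletions in Bronze never reach Silver. That is acceptable for this append-only demo, but worth revisiting if Bronze ever deletes records.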

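```python
# Reset state from earlier runs (names fully qualified: USE CATALOG/SCHEMA only run in the next cell)
spark.sql(f"CREATE CATALOG IF NOT EXISTS {catalog}")
spark.sql(f"CREATE SCHEMA IF NOT EXISTS {catalog}.{schema}")
for table in ["silver_iris", "bronze_iris", "cdf_version_control"]:
    spark.sql(f"DROP TABLE IF EXISTS {catalog}.{schema}.{table}")
# Dropping tables does not reset AutoLoader: also clear the landing zone, checkpoints and stored schemas
for path in [data_path, checkpoint_base, schema_location_base]:
    dbutils.fs.rm(path, recurse=True)
```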

```python
# Create catalog and schema
spark.sql(f"CREATE CATALOG IF NOT EXISTS {catalog}")
spark.sql(f"USE CATALOG {catalog}")
spark.sql(f"CREATE SCHEMA IF NOT EXISTS {schema}")
spark.sql(f"USE SCHEMA {schema}")

print(f"✅ Using: {catalog}.{schema}")
```

```
✅ Using: marcin_demo.iris_demo_cdf
```


## 3️⃣ Generate IRIS Data in Chunks (Simulating File Arrivals)

**🎯 Real-world scenario**: Files typically arrive in batches (hourly, daily) or continuously in a landing zone.

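**Note**: We use Spark DataFrames to write the CSV files to DBFS. Spark writes each target path (e.g. `iris_batch_000.csv`) as a directory that holds the data in `part-*` files. That is why the listing below shows trailing slashes and a size of 0.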

```python
# Create data directory in DBFS using dbutils
dbutils.fs.mkdirs(data_path)
print(f"✅ Created DBFS directory: {data_path}")

# Load IRIS dataset
iris = load_iris()
df_pandas = pd.DataFrame(data=iris.data, columns=['sepal_length', 'sepal_width', 'petal_length', 'petal_width'])
df_pandas['species'] = iris.target_names[iris.target]

# Generate 3 initial chunks of 50 rows each (sampled with replacement)
for i in range(3):
    # Create chunk with pandas
    chunk = df_pandas.sample(n=50, replace=True).copy()
    chunk['record_id'] = [f"REC_{i:03d}_{j:04d}" for j in range(len(chunk))]
    chunk['batch_id'] = f"batch_{i:03d}"
    chunk['timestamp'] = pd.Timestamp.now() + pd.Timedelta(hours=i)  # simulate hourly arrivals

    # Convert to Spark DataFrame and write to DBFS
    spark_df = spark.createDataFrame(chunk)
    file_path = f"{data_path}iris_batch_{i:03d}.csv"

    # coalesce(1) yields a single part file, but Spark still writes file_path as a directory
    spark_df.coalesce(1).write.mode("overwrite").option("header", "true").csv(file_path)
    print(f"✅ Created: {file_path}")

# List files in DBFS
print("\n📁 Files in landing zone:")
display(dbutils.fs.ls(data_path))
```

```
✅ Created DBFS directory: dbfs:/FileStore/autoloader_demo/iris_landing/
✅ Created: dbfs:/FileStore/autoloader_demo/iris_landing/iris_batch_000.csv
✅ Created: dbfs:/FileStore/autoloader_demo/iris_landing/iris_batch_001.csv
✅ Created: dbfs:/FileStore/autoloader_demo/iris_landing/iris_batch_002.csv

📁 Files in landing zone:
```

| path | name | size | modificationTime |
|---|---|---|---|
| dbfs:/FileStore/autoloader_demo/iris_landing/iris_batch_000.csv/ | iris_batch_000.csv/ | 0 | 1753292945000 |
| dbfs:/FileStore/autoloader_demo/iris_landing/iris_batch_001.csv/ | iris_batch_001.csv/ | 0 | 1753292946000 |
| dbfs:/FileStore/autoloader_demo/iris_landing/iris_batch_002.csv/ | iris_batch_002.csv/ | 0 | 1753292947000 |
| dbfs:/FileStore/autoloader_demo/iris_landing/iris_batch_003.csv/ | iris_batch_003.csv/ | 0 | 1753292017000 |
| dbfs:/FileStore/autoloader_demo/iris_landing/iris_batch_004.csv/ | iris_batch_004.csv/ | 0 | 1753292018000 |
| dbfs:/FileStore/autoloader_demo/iris_landing/iris_batch_005.csv/ | iris_batch_005.csv/ | 0 | 1753292019000 |
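Writing through Spark produces one directory per batch, as the listing shows. An upstream system usually drops one plain file per batch instead. To mimic that, a minimal sketch with Python's `csv` module is below. It assumes the landing directory is reachable as a local path, for example through the `/dbfs/` FUSE mount on clusters that provide it. `make_batch_rows` and `write_batch_csv` are hypothetical helpers, and the two sample rows only imitate the demo's column layout.

```python
import csv
import os
import tempfile

FIELDS = ["sepal_length", "sepal_width", "petal_length", "petal_width",
          "species", "record_id", "batch_id", "timestamp"]


def make_batch_rows(batch_index, measurements, timestamp):
    """Build rows with the same record_id / batch_id scheme as the demo (hypothetical helper)."""
    return [
        {"sepal_length": sl, "sepal_width": sw, "petal_length": pl, "petal_width": pw,
         "species": species, "record_id": f"REC_{batch_index:03d}_{j:04d}",
         "batch_id": f"batch_{batch_index:03d}", "timestamp": timestamp}
        for j, (sl, sw, pl, pw, species) in enumerate(measurements)
    ]


def write_batch_csv(rows, landing_dir, batch_index):
    """Write one batch as a single CSV file with a header row and return its path."""
    os.makedirs(landing_dir, exist_ok=True)
    path = os.path.join(landing_dir, f"iris_batch_{batch_index:03d}.csv")
    with open(path, "w", newline="") as f:
        writer = csv.DictWriter(f, fieldnames=FIELDS)
        writer.writeheader()
        writer.writerows(rows)
    return path


# Example: write to a temporary directory instead of the real landing zone
with tempfile.TemporaryDirectory() as tmp:
    rows = make_batch_rows(0, [(5.1, 3.5, 1.4, 0.2, "setosa"), (6.3, 3.3, 6.0, 2.5, "virginica")],
                           "2025-07-23T17:49:04.866Z")
    path = write_batch_csv(rows, tmp, 0)
    with open(path, newline="") as f:
        written = list(csv.DictReader(f))
    print(os.path.basename(path), len(written), written[0]["record_id"])
```

Each batch then lands as a single file rather than a directory of part files. The header row matches what the AutoLoader reader expects with `header` set to `true`.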


## 4️⃣ Create Bronze Table with AutoLoader

### 🚀 AutoLoader Requirements:
- **File format**: set with `cloudFiles.format` (for example CSV, JSON, Parquet, Avro, ORC, text, or binaryFile)
- **Schema location**: Required for schema inference and evolution (a DBFS or cloud storage path)
- **Checkpoint**: Required for exactly-once processing (a DBFS or cloud storage path)

**💡 Best Practice**: Enable CDF on Bronze for downstream incremental processing

```python
# Create Bronze table with CDF enabled
spark.sql(f"""
    CREATE TABLE IF NOT EXISTS bronze_iris (
        sepal_length DOUBLE,
        sepal_width DOUBLE,
        petal_length DOUBLE,
        petal_width DOUBLE,
        species STRING,
        record_id STRING,
        batch_id STRING,
        timestamp TIMESTAMP
    )
    USING DELTA
    TBLPROPERTIES (
        'delta.enableChangeDataFeed' = 'true',  -- Required for CDF
        'delta.autoOptimize.optimizeWrite' = 'true'  -- Recommended for both batch and streaming
    )
""")

print("✅ Bronze table created with CDF enabled")
```

```
✅ Bronze table created with CDF enabled
```


## 5️⃣ AutoLoader - BATCH Mode (One-time processing)

**📊 Batch AutoLoader**: Perfect for scheduled jobs (hourly, daily) that process all new files at once
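With `availableNow=True`, the query takes the files that are available when it starts and processes them in one or more micro-batches, then stops. The batch size is bounded by rate limits such as `cloudFiles.maxFilesPerTrigger`, which defaults to 1000. The pure-Python sketch below models only that planning step. `plan_available_now` is hypothetical, and the real trigger also handles file ordering, checkpointing, and commits itself.

```python
def plan_available_now(files_at_start, max_files_per_trigger=1000):
    """Hypothetical model: split the files present at query start into micro-batches.

    The query processes every micro-batch in the plan and then stops; files that
    arrive after the start are left for the next run.
    """
    if max_files_per_trigger < 1:
        raise ValueError("max_files_per_trigger must be at least 1")
    snapshot = sorted(files_at_start)  # sorted only to make the illustration deterministic
    return [snapshot[i:i + max_files_per_trigger]
            for i in range(0, len(snapshot), max_files_per_trigger)]


landing = [f"iris_batch_{i:03d}.csv" for i in range(3)]
for n, batch in enumerate(plan_available_now(landing, max_files_per_trigger=2)):
    print(f"micro-batch {n}: {batch}")
```

With the default limit, the three demo files fit into a single micro-batch. Lowering the limit only changes how the same work is split, not which files are processed.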

```python
print("🔄 Running AutoLoader in BATCH mode...")

# DBFS locations for the inferred schema and the stream checkpoint
bronze_schema_location = f"{schema_location_base}bronze_batch/"
bronze_checkpoint_location = f"{checkpoint_base}bronze_batch/"

batch_bronze_stream = (
    spark.readStream
        .format("cloudFiles")
        .option("cloudFiles.format", "csv")
        .option("cloudFiles.schemaLocation", bronze_schema_location)
        .option("cloudFiles.inferColumnTypes", "true")  # CSV columns are inferred as STRING unless this is set
        .option("header", "true")
        .load(data_path)
)

batch_query = (
    batch_bronze_stream.writeStream
        .format("delta")
        .outputMode("append")
        .option("checkpointLocation", bronze_checkpoint_location)
        .option("mergeSchema", "true")  # lets the _rescued_data column added by AutoLoader into the table schema
        .trigger(availableNow=True)     # process all available files, then stop
        .toTable("bronze_iris")
)

batch_query.awaitTermination()

print("✅ Batch AutoLoader completed!")
print(f"📊 Bronze records after batch load: {spark.table('bronze_iris').count()}")
display(spark.sql("SELECT * FROM bronze_iris ORDER BY timestamp LIMIT 5"))
```

```
🔄 Running AutoLoader in BATCH mode...
✅ Batch AutoLoader completed!
📊 Bronze records after batch load: 150
```

| sepal_length | sepal_width | petal_length | petal_width | species | record_id | batch_id | timestamp | _rescued_data |
|---|---|---|---|---|---|---|---|---|
| 5.1 | 3.5 | 1.4 | 0.3 | setosa | REC_000_0003 | batch_000 | 2025-07-23T17:49:04.866Z | |
| 5.8 | 4.0 | 1.2 | 0.2 | setosa | REC_000_0001 | batch_000 | 2025-07-23T17:49:04.866Z | |
| 4.7 | 3.2 | 1.3 | 0.2 | setosa | REC_000_0002 | batch_000 | 2025-07-23T17:49:04.866Z | |
| 5.4 | 3.9 | 1.7 | 0.4 | setosa | REC_000_0000 | batch_000 | 2025-07-23T17:49:04.866Z | |
| 7.2 | 3.0 | 5.8 | 1.6 | virginica | REC_000_0004 | batch_000 | 2025-07-23T17:49:04.866Z | |


## 6️⃣ Create Silver Table with Feature Engineering

**🎯 Silver Layer Purpose**: Cleansed, validated, and enriched data ready for analytics

```python
# Create Silver table with CDF enabled
spark.sql(f"""
    CREATE TABLE IF NOT EXISTS silver_iris (
        record_id STRING,
        species STRING,
        sepal_length DOUBLE,
        sepal_width DOUBLE,
        petal_length DOUBLE,
        petal_width DOUBLE,
        -- Feature engineering columns
        sepal_area DOUBLE,
        petal_area DOUBLE,
        sepal_ratio DOUBLE,
        petal_ratio DOUBLE,
        size_category STRING,
        -- Metadata for tracking
        batch_id STRING,
        original_timestamp TIMESTAMP,
        silver_timestamp TIMESTAMP,
        cdf_operation STRING,
        processing_mode STRING  -- Track if batch or stream
    )
    USING DELTA
    TBLPROPERTIES ('delta.enableChangeDataFeed' = 'true')
""")

print("✅ Silver table created with CDF enabled")
```

```
✅ Silver table created with CDF enabled
```
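The feature columns declared in this table are filled in by `process_bronze_to_silver_cdf` further down. To sanity-check that arithmetic outside Spark, the same formulas and `size_category` thresholds can be written as a plain-Python function. Petal area below 2 is small, below 10 is medium, and anything else is large. `engineer_features` is a hypothetical helper that the notebook does not use. Iris petal widths are all positive, so the ratios are always defined.

```python
def engineer_features(sepal_length, sepal_width, petal_length, petal_width):
    """Plain-Python mirror of the Silver feature engineering (illustration only)."""
    petal_area = petal_length * petal_width
    if petal_area < 2:
        size_category = "small"
    elif petal_area < 10:
        size_category = "medium"
    else:
        size_category = "large"
    return {
        "sepal_area": sepal_length * sepal_width,
        "petal_area": petal_area,
        "sepal_ratio": sepal_length / sepal_width,
        "petal_ratio": petal_length / petal_width,
        "size_category": size_category,
    }


# Measurements of REC_002_0001 from the sample Silver output: 5.1 / 3.8 / 1.9 / 0.4
print(engineer_features(5.1, 3.8, 1.9, 0.4))
```

You can compare the result against a row of the sample Silver output (REC_002_0001 is `small`, with a sepal area of 19.38). That is a cheap way to catch drift in a formula or threshold.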


## 7️⃣ Create Version Control Table

### 📊 Version Tracking for Incremental Processing
**Purpose**: Track the last processed version from Bronze to avoid reprocessing data

We store this processing metadata in a control table. **Streaming does not need one**: a streaming CDF read (`readStream` with `readChangeFeed`) tracks its progress in the query checkpoint.
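The control-table logic boils down to one comparison. Read the inclusive version range from `last_processed_version + 1` to the current Bronze version, then move the watermark forward. The pure-Python sketch below replays the Bronze versions this notebook reports (2, 3, and 3). `next_cdf_range` and `replay` are hypothetical helpers, and a dict stands in for `cdf_version_control`.

```python
from typing import Optional, Tuple


def next_cdf_range(last_processed: int, current: int) -> Optional[Tuple[int, int]]:
    """Inclusive (startingVersion, endingVersion) still to read, or None when up to date."""
    if last_processed >= current:
        return None
    return (last_processed + 1, current)


def replay(current_versions, seed=0):
    """Simulate successive runs against a hypothetical in-memory control table."""
    control = {"bronze_iris": seed}  # stand-in for cdf_version_control
    log = []
    for current in current_versions:
        rng = next_cdf_range(control["bronze_iris"], current)
        if rng is None:
            log.append("up to date")
        else:
            log.append(f"read versions {rng[0]}..{rng[1]}")
            control["bronze_iris"] = rng[1]  # advance only after Silver has been written
    return log


print(replay([2, 3, 3]))
```

The three log entries line up with the three CDF runs below: versions 1 to 2, then 3 to 3, then nothing to do.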

```python
# Set up version tracking for BATCH CDF (Bronze -> Silver)
print("🔄 Processing Bronze to Silver using BATCH CDF with version tracking...")

# Create version control table
spark.sql("""
    CREATE TABLE IF NOT EXISTS cdf_version_control (
        table_name STRING,
        last_processed_version LONG,
        processing_timestamp TIMESTAMP,
        records_processed LONG,
        processing_status STRING,
        processing_mode STRING,
        error_message STRING
    )
    USING DELTA
""")
print("✅ Version control table created")

# Initialize version control for bronze_iris if not already present.
# Seeding with 0 skips version 0 (the empty CREATE TABLE commit), so the first CDF read starts at version 1.
spark.sql("""
    MERGE INTO cdf_version_control AS target
    USING (SELECT 'bronze_iris' AS table_name, 0 AS last_processed_version, 
                  current_timestamp() AS processing_timestamp, 
                  0 AS records_processed, 
                  'initialized' AS processing_status, 
                  'batch' AS processing_mode, 
                  NULL AS error_message) AS source
    ON target.table_name = source.table_name
    WHEN NOT MATCHED THEN
    INSERT (table_name, last_processed_version, processing_timestamp, records_processed, processing_status, processing_mode, error_message)
    VALUES (source.table_name, source.last_processed_version, source.processing_timestamp, source.records_processed, source.processing_status, source.processing_mode, source.error_message)
""")
print("✅ Version control initialized for bronze_iris")
```

```
🔄 Processing Bronze to Silver using BATCH CDF with version tracking...
✅ Version control table created
✅ Version control initialized for bronze_iris
```


```python
# Get last processed version from control table
last_processed_version = spark.sql("""
    SELECT last_processed_version 
    FROM cdf_version_control 
    WHERE table_name = 'bronze_iris'
""").collect()[0]['last_processed_version']

print(f"📍 Last processed version: {last_processed_version}")
```

```
📍 Last processed version: 0
```


```python
# Get current Bronze table version.
# DESCRIBE HISTORY lists commits newest first, so the first row holds the latest version.
current_bronze_version = spark.sql("DESCRIBE HISTORY bronze_iris").select("version").first()[0]
print(f"📍 Current Bronze table version: {current_bronze_version}")
```

```
📍 Current Bronze table version: 2
```


```python
def process_bronze_to_silver_cdf(last_processed_version, current_bronze_version):
    """Append Bronze CDF changes from last_processed_version + 1 to current_bronze_version to Silver."""
    # Check if there are new changes to process
    if last_processed_version >= current_bronze_version:
        print("✅ No new changes to process. Bronze table is up to date!")
        return

    print(f"🆕 Processing changes from version {last_processed_version + 1} to {current_bronze_version}")
    control_table = DeltaTable.forName(spark, "cdf_version_control")

    try:
        # Read only new changes since last processed version (both bounds are inclusive)
        bronze_changes_batch = (
            spark.read
            .format("delta")
            .option("readChangeFeed", "true")
            .option("startingVersion", last_processed_version + 1)  # Start from next version
            .option("endingVersion", current_bronze_version)  # Up to current version
            .table("bronze_iris")
        )

        # _change_type, _commit_version and _commit_timestamp come from CDF; _rescued_data comes from AutoLoader
        print(f"📊 CDF columns available: {[c for c in bronze_changes_batch.columns if c.startswith('_')]}")

        # Keep inserts and the new values of updated rows; pre-images and deletes are skipped
        filtered_changes = bronze_changes_batch.filter(
            col("_change_type").isin(["insert", "update_postimage"])
        )

        changes_count = filtered_changes.count()
        print(f"📊 Changes to process: {changes_count}")

        if changes_count > 0:
            # Transform with feature engineering
            silver_batch = (
                filtered_changes
                # Feature engineering
                .withColumn("sepal_area", col("sepal_length") * col("sepal_width"))
                .withColumn("petal_area", col("petal_length") * col("petal_width"))
                .withColumn("sepal_ratio", col("sepal_length") / col("sepal_width"))
                .withColumn("petal_ratio", col("petal_length") / col("petal_width"))
                .withColumn("size_category", 
                    when(col("petal_area") < 2, "small")
                    .when(col("petal_area") < 10, "medium")
                    .otherwise("large")
                )
                # Metadata
                .withColumn("original_timestamp", col("timestamp"))
                .withColumn("silver_timestamp", current_timestamp())
                .withColumn("cdf_operation", col("_change_type"))
                .withColumn("processing_mode", lit("batch"))
                # Select final columns
                .select("record_id", "species", "sepal_length", "sepal_width", "petal_length", "petal_width",
                        "sepal_area", "petal_area", "sepal_ratio", "petal_ratio", "size_category",
                        "batch_id", "original_timestamp", "silver_timestamp", "cdf_operation", "processing_mode")
            )

            # Append to Silver. An updated Bronze row is appended as a new row;
            # use MERGE instead if Silver must keep one row per record_id.
            silver_batch.write.format("delta").mode("append").saveAsTable("silver_iris")
            print(f"✅ Batch CDF processing completed! Processed {changes_count} records")
        else:
            print("ℹ️ No insert or update changes found in the version range")

        # Advance the watermark even when nothing was written, so the same range is not re-read.
        # DeltaTable.update with lit() avoids building SQL strings from Python values.
        control_table.update(
            condition="table_name = 'bronze_iris'",
            set={
                "last_processed_version": lit(current_bronze_version),
                "processing_timestamp": current_timestamp(),
                "records_processed": lit(changes_count),
                "processing_status": lit("success"),
                "processing_mode": lit("batch"),
                "error_message": lit(None).cast("string"),
            },
        )
        print(f"💾 Updated control table - last processed version: {current_bronze_version}")

    except Exception as e:
        # Log the failure; passing the message through lit() needs no manual quote escaping
        control_table.update(
            condition="table_name = 'bronze_iris'",
            set={
                "processing_timestamp": current_timestamp(),
                "processing_status": lit("failed"),
                "error_message": lit(str(e)),
            },
        )
        print(f"❌ Error during processing: {e}")
        raise
```

```python
process_bronze_to_silver_cdf(last_processed_version, current_bronze_version)

# Display sample of processed records
print("\n📊 Sample Silver records from this batch:")
display(spark.sql("""
    SELECT * FROM silver_iris 
    WHERE processing_mode = 'batch' 
    ORDER BY silver_timestamp DESC 
    LIMIT 5
    """))
```

```
🆕 Processing changes from version 1 to 2
📊 CDF columns available: ['_rescued_data', '_change_type', '_commit_version', '_commit_timestamp']
📊 Changes to process: 150
✅ Batch CDF processing completed! Processed 150 records
💾 Updated control table - last processed version: 2

📊 Sample Silver records from this batch:
```

| record_id | species | sepal_length | sepal_width | petal_length | petal_width | sepal_area | petal_area | sepal_ratio | petal_ratio | size_category | batch_id | original_timestamp | silver_timestamp | cdf_operation | processing_mode |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| REC_002_0000 | virginica | 7.7 | 3.0 | 6.1 | 2.3 | 23.1 | 14.029999999999998 | 2.566666666666667 | 2.6521739130434785 | large | batch_002 | 2025-07-23T19:49:06.871Z | 2025-07-23T17:54:30.276Z | insert | batch |
| REC_002_0001 | setosa | 5.1 | 3.8 | 1.9 | 0.4 | 19.38 | 0.76 | 1.3421052631578947 | 4.749999999999999 | small | batch_002 | 2025-07-23T19:49:06.871Z | 2025-07-23T17:54:30.276Z | insert | batch |
| REC_002_0003 | virginica | 7.4 | 2.8 | 6.1 | 1.9 | 20.72 | 11.589999999999998 | 2.6428571428571432 | 3.210526315789473 | large | batch_002 | 2025-07-23T19:49:06.871Z | 2025-07-23T17:54:30.276Z | insert | batch |
| REC_002_0002 | setosa | 5.5 | 4.2 | 1.4 | 0.2 | 23.1 | 0.2799999999999999 | 1.3095238095238095 | 6.999999999999999 | small | batch_002 | 2025-07-23T19:49:06.871Z | 2025-07-23T17:54:30.276Z | insert | batch |
| REC_002_0004 | virginica | 6.5 | 3.0 | 5.8 | 2.2 | 19.5 | 12.76 | 2.1666666666666665 | 2.636363636363636 | large | batch_002 | 2025-07-23T19:49:06.871Z | 2025-07-23T17:54:30.276Z | insert | batch |


## 8️⃣ Generate More Files (Simulating next batch arrival)

```python
# Generate more files to show batch processing
print("📂 Generating new batch of files...")
for i in range(3, 6):
    # Create a chunk of 30 rows with pandas
    chunk = df_pandas.sample(n=30, replace=True).copy()
    chunk['record_id'] = [f"REC_{i:03d}_{j:04d}" for j in range(len(chunk))]
    chunk['batch_id'] = f"batch_{i:03d}"
    chunk['timestamp'] = pd.Timestamp.now()

    # Convert to Spark DataFrame and write to DBFS
    spark_df = spark.createDataFrame(chunk)
    file_path = f"{data_path}iris_batch_{i:03d}.csv"

    # One part file, written inside a directory named file_path
    spark_df.coalesce(1).write.mode("overwrite").option("header", "true").csv(file_path)
    print(f"✅ Created: {file_path}")

print("\n📁 New files ready for next batch run!")
display(dbutils.fs.ls(data_path))
```

```
📂 Generating new batch of files...
✅ Created: dbfs:/FileStore/autoloader_demo/iris_landing/iris_batch_003.csv
✅ Created: dbfs:/FileStore/autoloader_demo/iris_landing/iris_batch_004.csv
✅ Created: dbfs:/FileStore/autoloader_demo/iris_landing/iris_batch_005.csv

📁 New files ready for next batch run!
```

| path | name | size | modificationTime |
|---|---|---|---|
| dbfs:/FileStore/autoloader_demo/iris_landing/iris_batch_000.csv/ | iris_batch_000.csv/ | 0 | 1753292945000 |
| dbfs:/FileStore/autoloader_demo/iris_landing/iris_batch_001.csv/ | iris_batch_001.csv/ | 0 | 1753292946000 |
| dbfs:/FileStore/autoloader_demo/iris_landing/iris_batch_002.csv/ | iris_batch_002.csv/ | 0 | 1753292947000 |
| dbfs:/FileStore/autoloader_demo/iris_landing/iris_batch_003.csv/ | iris_batch_003.csv/ | 0 | 1753293307000 |
| dbfs:/FileStore/autoloader_demo/iris_landing/iris_batch_004.csv/ | iris_batch_004.csv/ | 0 | 1753293308000 |
| dbfs:/FileStore/autoloader_demo/iris_landing/iris_batch_005.csv/ | iris_batch_005.csv/ | 0 | 1753293309000 |


## 9️⃣ AutoLoader - BATCH Mode Again (Process only new files)

**💡 Key Point**: AutoLoader tracks processed files in its checkpoint, so it only processes NEW files in each batch run

```python
# Get current Bronze count before processing
bronze_count_before = spark.table('bronze_iris').count()
print(f"📊 Bronze records before: {bronze_count_before}")

# Run AutoLoader batch again - it will only process NEW files
print("\n🔄 Running AutoLoader BATCH again (only new files)...")

print("🔄 Running AutoLoader in BATCH mode...")

# Reuse the same schema location and checkpoint: the checkpoint records which files were already ingested
batch_bronze_stream_2 = (
    spark.readStream
        .format("cloudFiles")
        .option("cloudFiles.format", "csv")
        .option("cloudFiles.schemaLocation", bronze_schema_location)
        .option("cloudFiles.inferColumnTypes", "true")
        .option("header", "true")
        .load(data_path)
)

batch_query_2 = (
    batch_bronze_stream_2.writeStream
        .format("delta")
        .outputMode("append")
        .option("checkpointLocation", bronze_checkpoint_location)
        .option("mergeSchema", "true")
        .trigger(availableNow=True)  # process all available files, then stop
        .toTable("bronze_iris")
)

batch_query_2.awaitTermination()
bronze_count_after = spark.table('bronze_iris').count()
print("\n✅ Batch completed!")
print(f"📊 Bronze records after: {bronze_count_after}")
print(f"🆕 New records added: {bronze_count_after - bronze_count_before}")
print("\n💡 AutoLoader only processed the NEW files!")
```

```
📊 Bronze records before: 150

🔄 Running AutoLoader BATCH again (only new files)...
🔄 Running AutoLoader in BATCH mode...

✅ Batch completed!
📊 Bronze records after: 240
🆕 New records added: 90

💡 AutoLoader only processed the NEW files!
```
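AutoLoader's incremental behaviour comes from its checkpoint. Databricks documents that metadata about discovered files is kept in a RocksDB key-value store under the checkpoint location. Because both runs share `bronze_checkpoint_location`, the second run added only the 90 new rows. The pure-Python sketch below is a deliberately simplified model of that idea, with a JSON file in place of RocksDB. `list_new_files`, `commit_files`, and `touch_batches` are hypothetical helpers, not AutoLoader APIs.

```python
import json
import os
import tempfile


def _load_seen(checkpoint_file):
    """Return the set of file names already recorded in the checkpoint."""
    if not os.path.exists(checkpoint_file):
        return set()
    with open(checkpoint_file) as f:
        return set(json.load(f))


def list_new_files(landing_dir, checkpoint_file):
    """Files in landing_dir that the checkpoint has not recorded yet."""
    seen = _load_seen(checkpoint_file)
    return [name for name in sorted(os.listdir(landing_dir)) if name not in seen]


def commit_files(checkpoint_file, processed):
    """Record files as ingested; call only after the batch has been written."""
    seen = _load_seen(checkpoint_file) | set(processed)
    with open(checkpoint_file, "w") as f:
        json.dump(sorted(seen), f)


def touch_batches(landing_dir, indices):
    """Create empty placeholder files named like the demo batches."""
    for i in indices:
        open(os.path.join(landing_dir, f"iris_batch_{i:03d}.csv"), "w").close()


with tempfile.TemporaryDirectory() as tmp:
    landing = os.path.join(tmp, "landing")
    os.makedirs(landing)
    checkpoint = os.path.join(tmp, "checkpoint.json")

    touch_batches(landing, range(3))              # first arrival
    first = list_new_files(landing, checkpoint)
    commit_files(checkpoint, first)

    touch_batches(landing, range(3, 6))           # next arrival
    second = list_new_files(landing, checkpoint)
    commit_files(checkpoint, second)

    third = list_new_files(landing, checkpoint)   # nothing new

print(first)
print(second)
print(third)
```

In this model a file is committed only after its batch is written, so a failed run would rediscover the same files instead of skipping them. Deleting the checkpoint makes every file look new again, which is why a full reset must clear the checkpoint directory as well as the tables.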


## 🔟 BATCH Mode CDF Processing Again

```python
last_processed_version = spark.sql("""
    SELECT last_processed_version 
    FROM cdf_version_control 
    WHERE table_name = 'bronze_iris'
""").collect()[0]['last_processed_version']

print(f"📍 Last processed version: {last_processed_version}")

# Get current Bronze table version
current_bronze_version = spark.sql("DESCRIBE HISTORY bronze_iris").select("version").first()[0]
print(f"📍 Current Bronze table version: {current_bronze_version}")
```

```
📍 Last processed version: 2
📍 Current Bronze table version: 3
```


```python
process_bronze_to_silver_cdf(last_processed_version, current_bronze_version)

# Display sample of processed records
print("\n📊 Sample Silver records from this batch:")
display(spark.sql("""
    SELECT * FROM silver_iris 
    WHERE processing_mode = 'batch' 
    ORDER BY silver_timestamp DESC 
    LIMIT 5
    """))
```

```
🆕 Processing changes from version 3 to 3
📊 CDF columns available: ['_rescued_data', '_change_type', '_commit_version', '_commit_timestamp']
📊 Changes to process: 90
✅ Batch CDF processing completed! Processed 90 records
💾 Updated control table - last processed version: 3

📊 Sample Silver records from this batch:
```

| record_id | species | sepal_length | sepal_width | petal_length | petal_width | sepal_area | petal_area | sepal_ratio | petal_ratio | size_category | batch_id | original_timestamp | silver_timestamp | cdf_operation | processing_mode |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| REC_005_0000 | versicolor | 5.5 | 2.6 | 4.4 | 1.2 | 14.3 | 5.28 | 2.1153846153846154 | 3.666666666666667 | medium | batch_005 | 2025-07-23T17:55:08.186Z | 2025-07-23T17:55:39.981Z | insert | batch |
| REC_005_0001 | virginica | 6.9 | 3.1 | 5.1 | 2.3 | 21.39 | 11.73 | 2.2258064516129035 | 2.217391304347826 | large | batch_005 | 2025-07-23T17:55:08.186Z | 2025-07-23T17:55:39.981Z | insert | batch |
| REC_005_0003 | versicolor | 6.0 | 3.4 | 4.5 | 1.6 | 20.4 | 7.2 | 1.7647058823529411 | 2.8125 | medium | batch_005 | 2025-07-23T17:55:08.186Z | 2025-07-23T17:55:39.981Z | insert | batch |
| REC_005_0002 | setosa | 4.8 | 3.4 | 1.6 | 0.2 | 16.32 | 0.32 | 1.411764705882353 | 8.0 | small | batch_005 | 2025-07-23T17:55:08.186Z | 2025-07-23T17:55:39.981Z | insert | batch |
| REC_005_0004 | virginica | 6.9 | 3.1 | 5.4 | 2.1 | 21.39 | 11.340000000000002 | 2.2258064516129035 | 2.571428571428572 | large | batch_005 | 2025-07-23T17:55:08.186Z | 2025-07-23T17:55:39.981Z | insert | batch |


## Close it out - no more new processing required! 

We check the versions once more and confirm that nothing is left to process.

```python
last_processed_version = spark.sql("""
    SELECT last_processed_version 
    FROM cdf_version_control 
    WHERE table_name = 'bronze_iris'
""").collect()[0]['last_processed_version']

print(f"📍 Last processed version: {last_processed_version}")

# Get current Bronze table version
current_bronze_version = spark.sql("DESCRIBE HISTORY bronze_iris").select("version").first()[0]
print(f"📍 Current Bronze table version: {current_bronze_version}")

process_bronze_to_silver_cdf(last_processed_version, current_bronze_version)
```

```
📍 Last processed version: 3
📍 Current Bronze table version: 3
✅ No new changes to process. Bronze table is up to date!
```
