In [0]:
# ============================================
# FULLY DYNAMIC CONFIGURATION
# Auto-discovers databases AND tables
# ============================================
#
# Produces `all_database_configs`: one dict per database folder found under
#   abfss://<container>@<storage_account>.dfs.core.windows.net/<layer>/<source_system>/
# Each dict has the keys database_name, bronze_path, silver_path, checkpoint_path and
# tables (the sub-folder names of that database folder). Databases without table
# folders are skipped. If the top-level listing fails, a single manually configured
# database is used instead.
#
# It also ALWAYS defines the single-database names used by the single-table cells
# further down (database_name, bronze_base_path, silver_base_path,
# checkpoint_base_path, tables_to_process). They point at the first configured database.

# Base storage account and container
storage_account = "datamigrationsathya"
container = "datalake"

# Path structure configuration
layer = "bronze"  # bronze, silver, gold
source_system = "mysql"  # mysql, postgres, etc.

# Root URI of the storage container; every layer path below is built from it.
lake_root = f"abfss://{container}@{storage_account}.dfs.core.windows.net"

# Base path for the source system
source_base_path = f"{lake_root}/{layer}/{source_system}/"

# Used only when the bronze listing itself fails.
fallback_database_name = "retail_db"
fallback_tables = ["customer_details"]


def _paths_for(db_name):
    """Return (bronze, silver, checkpoint) folder URIs for one database, each ending in '/'."""
    return (
        f"{source_base_path}{db_name}/",
        f"{lake_root}/silver/{source_system}/{db_name}/",
        f"{lake_root}/checkpoints/bronze_to_silver/{source_system}/{db_name}/",
    )


def _list_subdirs(path):
    """Return the names (trailing '/' stripped) of the immediate sub-folders of `path`."""
    return [entry.name.rstrip('/') for entry in dbutils.fs.ls(path) if entry.isDir()]


def _make_config(db_name, tables):
    """Build one entry of `all_database_configs` for `db_name` and its table list."""
    bronze, silver, checkpoint = _paths_for(db_name)
    return {
        "database_name": db_name,
        "bronze_path": bronze,
        "silver_path": silver,
        "checkpoint_path": checkpoint,
        "tables": tables,
    }


print("=" * 70)
print("AUTO-DISCOVERING DATABASES AND TABLES")
print("=" * 70)
print(f"\nSource system: {source_system}")
print(f"Base path: {source_base_path}")

# ============================================
# AUTO-DISCOVER ALL DATABASES
# ============================================

all_database_configs = []

try:
    # List all database folders under <layer>/<source_system>/
    database_folders = dbutils.fs.ls(source_base_path)

    print(f"\nFound {len(database_folders)} database(s):\n")

    for db_folder in database_folders:
        if not db_folder.isDir():
            continue
        db_name = db_folder.name.rstrip('/')

        # Auto-discover tables in this database. One unreadable database must not
        # abort discovery of the others.
        try:
            tables = _list_subdirs(_paths_for(db_name)[0])
        except Exception as e:
            print(f"  ✗ {db_name}: Error reading tables - {e}\n")
            continue

        if not tables:
            print(f"  ⚠ {db_name}: No tables found (skipping)\n")
            continue

        all_database_configs.append(_make_config(db_name, tables))
        print(f"  ✓ {db_name}: {len(tables)} table(s)")
        for table in tables:
            print(f"      - {table}")
        print()

    if not all_database_configs:
        print("\n⚠️  No databases with tables found!")
        print("Please check your bronze layer structure.")
    else:
        print("=" * 70)
        print(f"SUMMARY: {len(all_database_configs)} database(s) ready to process")
        total_tables = sum(len(config['tables']) for config in all_database_configs)
        print(f"Total tables across all databases: {total_tables}")
        print("=" * 70)

except Exception as e:
    print(f"\n❌ Error discovering databases: {e}")
    print("\nFalling back to manual configuration...")

    # Fallback: manual database, tables still auto-discovered when possible
    try:
        tables = _list_subdirs(_paths_for(fallback_database_name)[0])
        print(f"Using manual database: {fallback_database_name}")
        print(f"Found {len(tables)} tables: {tables}")
    except Exception:
        tables = list(fallback_tables)
        print(f"Using fallback table list: {tables}")

    all_database_configs = [_make_config(fallback_database_name, tables)]

# Single-database names used by the single-table cells below. They are defined on
# every path, so those cells work on a fresh kernel whether or not discovery succeeded.
if all_database_configs:
    primary_config = all_database_configs[0]
    database_name = primary_config["database_name"]
    bronze_base_path = primary_config["bronze_path"]
    silver_base_path = primary_config["silver_path"]
    checkpoint_base_path = primary_config["checkpoint_path"]
    tables_to_process = list(primary_config["tables"])
else:
    # Discovery worked but found nothing: later cells see an empty table list
    # instead of raising NameError.
    database_name = fallback_database_name
    bronze_base_path, silver_base_path, checkpoint_base_path = _paths_for(database_name)
    tables_to_process = []

print(f"\nSingle-table cells will use: {database_name} ({len(tables_to_process)} table(s))")
print("\n✅ Configuration complete!")
print("\nNote: Auto Loader will recursively process all files in subdirectories (e.g., load_date partitions)")

In [0]:
# ============================================
# OPTIONAL: Filter tables by pattern
# ============================================
# The filters are applied to EVERY database in `all_database_configs` (the list the
# production pipeline iterates) and to the single-database `tables_to_process` list.
# New config dicts are built rather than mutating Cell 1's entries, and re-running the
# cell with the same settings gives the same result.

# Option 1: Process only specific tables
include_tables = []  # Leave empty to process all, or specify: ["customer_details", "orders"]

# Option 2: Exclude specific tables
exclude_tables = []  # Example: ["temp_table", "test_table"]

# Option 3: Filter by prefix/pattern
table_prefix = ""  # Example: "customer_" to process only customer_* tables


def _apply_table_filters(tables):
    """Return the tables that pass the include, exclude and prefix filters (order preserved)."""
    return [
        t for t in tables
        if (not include_tables or t in include_tables)
        and t not in exclude_tables
        and (not table_prefix or t.startswith(table_prefix))
    ]


# Apply filters to every discovered database
all_database_configs = [
    {**config, "tables": _apply_table_filters(config["tables"])}
    for config in all_database_configs
]

# tables_to_process may be missing if Cell 1 did not define it on its success path;
# derive it from the first database in that case.
if "tables_to_process" not in globals():
    tables_to_process = all_database_configs[0]["tables"] if all_database_configs else []
tables_to_process = _apply_table_filters(tables_to_process)

if include_tables:
    print(f"\nFiltered to include only: {include_tables}")

if exclude_tables:
    print(f"\nExcluded tables: {exclude_tables}")

if table_prefix:
    print(f"\nFiltered by prefix '{table_prefix}'")

print("\nTables per database after filtering:")
for config in all_database_configs:
    print(f"  {config['database_name']}: {len(config['tables'])} table(s)")

print(f"\nFinal tables to process ({len(tables_to_process)}):")
for table in tables_to_process:
    print(f"  ✓ {table}")

In [0]:
# ============================================
# MULTI-DATABASE CONFIGURATION
# Use this if you have multiple databases to process
# ============================================
# Lists databases explicitly (instead of auto-discovering them) and builds
# `all_configs`: one dict per database whose bronze folder could be listed, with that
# database's sub-folder names under "tables". A database whose listing fails is
# reported and left out.

# Define multiple database configurations
database_configs = [
    {
        "source_system": "mysql",
        "database_name": "retail_db",
    },
    # Uncomment and add more databases as needed:
    # {
    #     "source_system": "mysql",
    #     "database_name": "analytics_db",
    # },
    # {
    #     "source_system": "postgres",
    #     "database_name": "crm_db",
    # },
]

# Build configuration for all databases
all_configs = []

for entry in database_configs:
    src_system = entry["source_system"]
    db_name = entry["database_name"]
    account_uri = f"abfss://{container}@{storage_account}.dfs.core.windows.net"

    db_bronze = f"{account_uri}/{layer}/{src_system}/{db_name}/"
    db_silver = f"{account_uri}/silver/{src_system}/{db_name}/"
    db_checkpoint = f"{account_uri}/checkpoints/bronze_to_silver/{src_system}/{db_name}/"

    # Auto-discover tables for this database
    try:
        listing = dbutils.fs.ls(db_bronze)
        table_names = [item.name.rstrip('/') for item in listing if item.isDir()]
    except Exception as err:
        print(f"\n‚úó {src_system}/{db_name}: Error - {str(err)}")
    else:
        all_configs.append({
            "source_system": src_system,
            "database_name": db_name,
            "bronze_path": db_bronze,
            "silver_path": db_silver,
            "checkpoint_path": db_checkpoint,
            "tables": table_names,
        })
        print(f"\n‚úì {src_system}/{db_name}: Found {len(table_names)} tables")

rule = '=' * 60
print(f"\n{rule}")
print(f"Total databases configured: {len(all_configs)}")
print(rule)

# Display summary (at most five table names per database)
for cfg in all_configs:
    names = cfg['tables']
    ellipsis = '...' if len(names) > 5 else ''
    print(f"\n{cfg['source_system']}/{cfg['database_name']}:")
    print(f"  Tables: {', '.join(names[:5])}{ellipsis}")
    print(f"  Total: {len(names)} tables")

## Handling Date-Partitioned Folders

Your bronze layer has a structure like:
```
customer_details/
  ├── load_date=2026-01-31/
  │   ├── file1.parquet
  │   └── file2.parquet
  └── load_date=2026-02-01/
      ├── file1.parquet
      └── file2.parquet
```

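**Good news:** Auto Loader (the `cloudFiles` streaming cells) handles this layout:

* Auto Loader discovers files in nested subdirectories of the load path, so every `load_date=...` folder is processed. The cells also set **`recursiveFileLookup=true`**. In Spark this option **disables partition inference**, so check that `load_date` actually appears in the output schema.
* **Partition columns** (like `load_date`) are derived from Hive-style `key=value` folder names only when partition inference is active. Confirm this with the verification cell below.
* **New date folders** are automatically discovered and processed on the next run.

**For batch processing** (`spark.read`), Spark reads partition columns automatically when you point it at the table's parent folder. If you use a wildcard pattern instead, set the `basePath` option to the table folder. Otherwise partition discovery starts below the globbed folders and `load_date` is not added.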

In [0]:
# Run this to verify that partition columns (like load_date) are automatically captured.
# A plain batch read of the table folder performs partition discovery over Hive-style
# `key=value` sub-folders, so load_date should appear as a column.

table_name = "customer_details"
partition_column = "load_date"

# Read with partition discovery
df_with_partitions = spark.read.parquet(f"{bronze_base_path}{table_name}/")

print("Schema with partition columns:")
df_with_partitions.printSchema()

if partition_column in df_with_partitions.columns:
    # Show the partition column first. select("*", "load_date") would emit load_date
    # twice, because "*" already includes it.
    other_columns = [c for c in df_with_partitions.columns if c != partition_column]
    print(f"\nSample data showing {partition_column} column:")
    display(df_with_partitions.select(partition_column, *other_columns).limit(10))

    print("\nDistinct load dates in the data:")
    display(df_with_partitions.select(partition_column).distinct().orderBy(partition_column))
else:
    print(f"\n⚠️  Column '{partition_column}' not found: the folders under "
          f"{bronze_base_path}{table_name}/ are not '{partition_column}=<value>' partitions.")

In [0]:
# Process a single table using Auto Loader (cloudFiles)
# Auto Loader automatically discovers new files and processes them incrementally.
# This cell only previews the stream; nothing is written to silver.

from pyspark.sql.functions import col, current_timestamp

table_name = "customer_details"

# Read all files from all subdirectories using Auto Loader
df = (spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "parquet")  # Change to "json", "csv", "avro" as needed
    .option("cloudFiles.schemaLocation", f"{checkpoint_base_path}{table_name}/schema")
    .option("cloudFiles.inferColumnTypes", "true")
    # NOTE(review): Spark documents that recursiveFileLookup disables partition
    # inference, so load_date may not appear as a column - check the preview below.
    .option("recursiveFileLookup", "true")  # Process files in all subdirectories
    .load(f"{bronze_base_path}{table_name}/")
)

# Add metadata columns. _metadata.file_path is used instead of input_file_name(),
# which is not supported in Unity Catalog shared access mode.
df_enriched = (df
    .withColumn("processing_timestamp", current_timestamp())
    .withColumn("source_file", col("_metadata.file_path"))
)

# Display sample
display(df_enriched.limit(10))

In [0]:
# Alternative: batch-read every load_date folder of ONE table at once.
# This is simpler than Auto Loader but doesn't track which files have been processed.
#
# The glob names only this table's partition folders. A glob starting at the database
# folder (e.g. "*/**/") would also match other tables' folders and mix unrelated
# schemas into one DataFrame. basePath tells Spark where partition discovery starts.
# Without it, discovery starts below the globbed folders and load_date is not added.

table_name = "customer_details"
table_root = f"{bronze_base_path}{table_name}/"

df_all = (spark.read
    .option("basePath", table_root)      # keeps load_date as a column
    .parquet(f"{table_root}load_date=*")  # every load_date partition of this table
)

print(f"Total records: {df_all.count()}")
print("Schema:")
df_all.printSchema()

display(df_all.limit(10))

In [0]:
# Process multiple tables in a loop (batch, non-incremental).
# Each table in `tables_to_process` is read in full from bronze and its row count is
# printed. Writing to silver is disabled unless write_to_silver is set to True.

from pyspark.sql.functions import current_timestamp, lit

write_to_silver = False  # True: overwrite each table's silver folder with this batch

for table_name in tables_to_process:
    print(f"\nProcessing table: {table_name}")

    try:
        # Read data from bronze layer
        df = spark.read.parquet(f"{bronze_base_path}{table_name}/")

        # Add processing metadata
        df_processed = (df
            .withColumn("processing_timestamp", current_timestamp())
            .withColumn("source_table", lit(table_name))
        )

        print(f"Records in {table_name}: {df_processed.count()}")

        if write_to_silver:
            # Delta, not plain Parquet: the silver folders are Delta tables written by the
            # streaming cells. A Parquet overwrite would leave a stale _delta_log behind.
            # The Auto Loader checkpoint is not touched, so later stream runs append only
            # new files on top of this batch.
            (df_processed.write
                .format("delta")
                .mode("overwrite")
                .save(f"{silver_base_path}{table_name}/"))

    except Exception as e:
        print(f"Error processing {table_name}: {str(e)}")

In [0]:
# Write the streaming data to silver layer using Auto Loader
# This creates an incremental pipeline that processes new files automatically.
#
# The checkpoint below is the same one the daily pipeline cells use, and those cells
# write to the silver PATH. A checkpoint belongs to one sink. Writing to a different
# sink (e.g. the metastore table silver.retail_db.<table>) with this checkpoint would
# mark files as processed without them ever reaching the silver path. If you want a
# metastore-table sink, give it its own checkpointLocation.

from pyspark.sql.functions import col, current_timestamp

table_name = "customer_details"

# Read with Auto Loader
df_stream = (spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "parquet")
    .option("cloudFiles.schemaLocation", f"{checkpoint_base_path}{table_name}/schema")
    .option("cloudFiles.inferColumnTypes", "true")
    .option("recursiveFileLookup", "true")
    .load(f"{bronze_base_path}{table_name}/")
)

# Add metadata. _metadata.file_path replaces input_file_name(), which is not
# supported in Unity Catalog shared access mode.
df_enriched = (df_stream
    .withColumn("processing_timestamp", current_timestamp())
    .withColumn("source_file", col("_metadata.file_path"))
)

# Write to silver layer (same sink as the daily pipeline cells)
query = (df_enriched.writeStream
    .format("delta")  # Use Delta format for silver layer
    .option("checkpointLocation", f"{checkpoint_base_path}{table_name}/checkpoint")
    .option("mergeSchema", "true")
    .outputMode("append")
    .trigger(availableNow=True)  # Process all available files then stop
    .start(f"{silver_base_path}{table_name}/")
)

# Wait for completion
query.awaitTermination()
print(f"Processing complete for {table_name}")

In [0]:
# Explore your bronze layer structure
# Lists each folder under bronze_base_path and its first few children, so you can see
# which table and load_date folders exist.

max_children_shown = 5  # children listed per folder before summarising the rest

print("=== Bronze Layer Structure ===")
print(f"\nBase path: {bronze_base_path}\n")

try:
    # List all folders in bronze layer
    folders = dbutils.fs.ls(bronze_base_path)

    for folder in folders:
        if not folder.isDir():
            continue
        print(f"📁 {folder.name}")

        # List the folder's first-level children; report failures instead of hiding them
        try:
            children = dbutils.fs.ls(folder.path)
        except Exception as e:
            print(f"   └─ (could not list folder: {e})")
        else:
            for child in children[:max_children_shown]:
                # dbutils reports size 0 for directories, so label them explicitly
                size_label = "directory" if child.isDir() else f"{child.size} bytes"
                print(f"   └─ {child.name} ({size_label})")
            if len(children) > max_children_shown:
                print(f"   └─ ... and {len(children) - max_children_shown} more items")
        print()
except Exception as e:
    print(f"Error listing folders: {str(e)}")

---
## 📅 Daily Production Run Guide

### **Cells to Run Daily:**

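1. **Cell 1** - Configuration (always run first)
2. **DAILY PRODUCTION PIPELINE** cell - processes all discovered databases and tables incrementally (referred to as "Cell 10" in older notes; cell numbers shifted as cells were added, so find cells by their header)
3. **SILVER LAYER VERIFICATION** cell - optional, to check results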

---

### **How It Works:**

✅ **First Run (Day 1):**
- Processes ALL existing files in bronze layer
- Creates checkpoints for each table
- Writes data to silver layer

✅ **Subsequent Runs (Day 2, 3, 4...):**
- **Only processes NEW files** added since last run
- Skips already-processed files (based on checkpoint)
- Completes in seconds if no new data

---

### **Key Benefits:**

* 🚀 **Efficient**: Only reads new data
* 💰 **Cost-effective**: Minimal compute usage
* 🔄 **Idempotent**: Safe to run multiple times
* 📂 **Automatic**: Discovers new date partitions automatically
* ⚡ **Fast**: Completes quickly when no new data

---

### **Schedule Options:**

**Option A - Manual:** Run Cell 1 → the **DAILY PRODUCTION PIPELINE** cell daily

**Option B - Databricks Job:** 
- Create a scheduled job that runs this notebook daily
- Set schedule: Daily at specific time (e.g., 2 AM)
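- The job runs **every** cell in the notebook from top to bottom, not only Cell 1 and the pipeline cell. Before scheduling, move exploratory, test and maintenance cells (full refresh, partition reprocessing) into a separate notebook.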

**Option C - Workflow:**
- Use Databricks Workflows for orchestration
- Add dependencies if you have upstream processes

In [0]:
# This cell processes ALL tables in `tables_to_process` (the single database selected
# in Cell 1) incrementally using Auto Loader.
# Run this daily - each table's checkpoint makes the run read only NEW files added since
# the last run. A failing table does not stop the others; failures are listed at the end.

from pyspark.sql.functions import col, current_timestamp, lit

print("=" * 60)
print("Starting Daily Bronze to Silver Pipeline")
print("=" * 60)

failed_tables = []  # tables whose ingestion raised

for table_name in tables_to_process:
    print(f"\n{'='*60}")
    print(f"Processing table: {table_name}")
    print(f"{'='*60}")

    try:
        # Read with Auto Loader (only processes new files)
        df_stream = (spark.readStream
            .format("cloudFiles")
            .option("cloudFiles.format", "parquet")
            .option("cloudFiles.schemaLocation", f"{checkpoint_base_path}{table_name}/schema")
            .option("cloudFiles.inferColumnTypes", "true")
            .option("recursiveFileLookup", "true")
            .load(f"{bronze_base_path}{table_name}/")
        )

        # Add metadata columns. _metadata.file_path replaces input_file_name(), which is
        # not supported in Unity Catalog shared access mode.
        df_enriched = (df_stream
            .withColumn("processing_timestamp", current_timestamp())
            .withColumn("source_file", col("_metadata.file_path"))
            .withColumn("source_table", lit(table_name))
        )

        # Write to silver layer
        query = (df_enriched.writeStream
            .format("delta")
            .option("checkpointLocation", f"{checkpoint_base_path}{table_name}/checkpoint")
            .option("mergeSchema", "true")
            .outputMode("append")
            .trigger(availableNow=True)  # Process all available files then stop
            .start(f"{silver_base_path}{table_name}/")
        )

        # Wait for completion
        query.awaitTermination()

        print(f"✅ Successfully processed {table_name}")

    except Exception as e:
        # Continue with next table even if one fails, but record it for the summary
        print(f"❌ Error processing {table_name}: {str(e)}")
        failed_tables.append(table_name)

print("\n" + "="*60)
if failed_tables:
    print(f"Daily Pipeline Complete with {len(failed_tables)} failed table(s): {failed_tables}")
else:
    print("Daily Pipeline Complete!")
print("="*60)

In [0]:
# ============================================
# PRODUCTION PIPELINE - MULTI-DATABASE SUPPORT
# This cell works with both single and multi-database configurations
# ============================================
# Configuration source, in priority order:
#   1. `all_configs` from the MULTI-DATABASE CONFIGURATION cell, when it exists and is non-empty
#   2. `all_database_configs` from Cell 1 (always defined there)
# Cell 1's entries have no "source_system" key, so the global `source_system` fills it in.
# Each table is ingested with Auto Loader (availableNow trigger): only files not yet
# recorded in that table's checkpoint are read. A failing table does not stop the rest.

from pyspark.sql.functions import col, current_timestamp, lit

print("=" * 70)
print("Starting Daily Bronze to Silver Pipeline (Multi-Database)")
print("=" * 70)

if 'all_configs' in globals() and all_configs:
    configs_to_process = all_configs
    print(f"\nProcessing {len(configs_to_process)} database(s)")
else:
    configs_to_process = [{"source_system": source_system, **cfg} for cfg in all_database_configs]
    print(f"\nProcessing {len(configs_to_process)} auto-discovered database(s) for {source_system}")

failed_tables = []  # "<source>/<db>/<table>" for every table whose ingestion raised

# Process each database
for config in configs_to_process:
    source = config['source_system']
    db = config['database_name']
    bronze_path = config['bronze_path']
    silver_path = config['silver_path']
    checkpoint_path = config['checkpoint_path']
    tables = config['tables']

    print(f"\n{'='*70}")
    print(f"Database: {source}/{db}")
    print(f"Tables to process: {len(tables)}")
    print(f"{'='*70}")

    # Process each table in this database
    for table_name in tables:
        print(f"\n  Processing: {source}/{db}/{table_name}")

        try:
            # Read with Auto Loader (only processes new files)
            df_stream = (spark.readStream
                .format("cloudFiles")
                .option("cloudFiles.format", "parquet")
                .option("cloudFiles.schemaLocation", f"{checkpoint_path}{table_name}/schema")
                .option("cloudFiles.inferColumnTypes", "true")
                .option("recursiveFileLookup", "true")
                .load(f"{bronze_path}{table_name}/")
            )

            # Add metadata columns. _metadata.file_path replaces input_file_name(), which
            # is not supported in Unity Catalog shared access mode.
            df_enriched = (df_stream
                .withColumn("processing_timestamp", current_timestamp())
                .withColumn("source_file", col("_metadata.file_path"))
                .withColumn("source_table", lit(table_name))
                .withColumn("source_system", lit(source))
                .withColumn("source_database", lit(db))
            )

            # Write to silver layer
            query = (df_enriched.writeStream
                .format("delta")
                .option("checkpointLocation", f"{checkpoint_path}{table_name}/checkpoint")
                .option("mergeSchema", "true")
                .outputMode("append")
                .trigger(availableNow=True)
                .start(f"{silver_path}{table_name}/")
            )

            # Wait for completion
            query.awaitTermination()

            print(f"  ✅ Successfully processed {table_name}")

        except Exception as e:
            print(f"  ❌ Error processing {table_name}: {str(e)}")
            failed_tables.append(f"{source}/{db}/{table_name}")

    print(f"\n✅ Completed database: {source}/{db}")

print("\n" + "="*70)
if failed_tables:
    print(f"Daily Pipeline Complete with {len(failed_tables)} failed table(s):")
    for failed_name in failed_tables:
        print(f"   - {failed_name}")
else:
    print("Daily Pipeline Complete!")
print("="*70)

In [0]:
# ============================================
# DAILY PRODUCTION PIPELINE
# Processes all databases and tables discovered in Cell 1
# ============================================
# For every table in `all_database_configs`, Auto Loader (availableNow trigger) reads
# only files not yet recorded in that table's checkpoint, appends them to the table's
# silver Delta folder, and stops. A failing table does not stop the others; failures
# are collected and listed at the end.

from pyspark.sql.functions import current_timestamp, lit

# Set True when this notebook runs as a scheduled Job. Any failed table then fails the
# run, instead of the Job reporting success while tables were skipped.
FAIL_ON_TABLE_ERRORS = False

print("=" * 70)
print("Starting Daily Bronze to Silver Pipeline")
print("=" * 70)

# Use the all_database_configs from Cell 1
if 'all_database_configs' not in dir() or not all_database_configs:
    print("\n❌ ERROR: Please run Cell 1 first to discover databases and tables!")
    print("Cell 1 creates the 'all_database_configs' variable needed for processing.")
else:
    print(f"\n✅ Found configuration for {len(all_database_configs)} database(s)")
    total_tables = sum(len(config['tables']) for config in all_database_configs)
    print(f"Total tables to process: {total_tables}\n")

    failed_tables = []  # (database, table) pairs whose ingestion raised

    # Process each database
    for config in all_database_configs:
        db_name = config['database_name']
        bronze_path = config['bronze_path']
        silver_path = config['silver_path']
        checkpoint_path = config['checkpoint_path']
        tables = config['tables']

        print(f"\n{'='*70}")
        print(f"Database: {db_name}")
        print(f"Tables: {len(tables)}")
        print(f"{'='*70}")

        # Process each table in this database
        for table_name in tables:
            print(f"\n  📊 Processing: {db_name}/{table_name}")

            try:
                # Read with Auto Loader (only processes new files)
                df_stream = (spark.readStream
                    .format("cloudFiles")
                    .option("cloudFiles.format", "parquet")
                    .option("cloudFiles.schemaLocation", f"{checkpoint_path}{table_name}/schema")
                    .option("cloudFiles.inferColumnTypes", "true")
                    .option("recursiveFileLookup", "true")
                    .load(f"{bronze_path}{table_name}/")
                )

                # Add metadata columns
                df_enriched = (df_stream
                    .withColumn("processing_timestamp", current_timestamp())
                    .withColumn("source_table", lit(table_name))
                    .withColumn("source_database", lit(db_name))
                )

                # Write to silver layer
                query = (df_enriched.writeStream
                    .format("delta")
                    .option("checkpointLocation", f"{checkpoint_path}{table_name}/checkpoint")
                    .option("mergeSchema", "true")
                    .outputMode("append")
                    .trigger(availableNow=True)
                    .start(f"{silver_path}{table_name}/")
                )

                # Wait for completion
                query.awaitTermination()

                print(f"     ✅ Successfully processed {table_name}")

            except Exception as e:
                # Print the full message; truncating it can hide the root cause.
                print(f"     ❌ Error processing {table_name}: {e}")
                failed_tables.append((db_name, table_name))

        print(f"\n  ✅ Completed database: {db_name}")

    print("\n" + "="*70)
    if failed_tables:
        print(f"⚠️  Daily Pipeline finished with {len(failed_tables)} failed table(s):")
        for failed_db, failed_table in failed_tables:
            print(f"   - {failed_db}/{failed_table}")
    else:
        print("✅ Daily Pipeline Complete!")
    print("="*70)

    if failed_tables and FAIL_ON_TABLE_ERRORS:
        raise RuntimeError(
            f"{len(failed_tables)} table(s) failed: "
            + ", ".join(f"{d}/{t}" for d, t in failed_tables)
        )

In [0]:
# Verification: Check silver layer for all databases and tables
# Reads every configured table back from its silver Delta folder and reports the row
# count, plus the number of distinct load_date values when that column exists.

banner = "=" * 70
print(banner)
print("SILVER LAYER VERIFICATION")
print(banner)

config_ready = 'all_database_configs' in dir() and bool(all_database_configs)

if not config_ready:
    print("\n‚ùå Please run Cell 1 first!")
else:
    total_processed = 0
    total_records = 0

    for db_cfg in all_database_configs:
        print(f"\nüìä Database: {db_cfg['database_name']}")
        print(banner)

        for tbl in db_cfg['tables']:
            try:
                silver_df = spark.read.format("delta").load(f"{db_cfg['silver_path']}{tbl}/")
                n_rows = silver_df.count()

                if "load_date" not in silver_df.columns:
                    print(f"  ‚úì {tbl}: {n_rows:,} records")
                else:
                    n_dates = silver_df.select("load_date").distinct().count()
                    print(f"  ‚úì {tbl}: {n_rows:,} records, {n_dates} load date(s)")

                total_processed += 1
                total_records += n_rows

            except Exception as err:
                print(f"  ‚úó {tbl}: Not processed or error - {str(err)[:50]}")

        print()

    summary_lines = [
        banner,
        "SUMMARY",
        banner,
        f"Databases: {len(all_database_configs)}",
        f"Tables processed: {total_processed}",
        f"Total records: {total_records:,}",
        banner,
    ]
    for line in summary_lines:
        print(line)

In [0]:
# Test processing a single table to verify the setup works.
# NOTE: this writes to the SAME silver path and checkpoint the daily pipeline uses for
# this table, so files ingested here are not ingested again by the next daily run.

import traceback

from pyspark.sql.functions import col, current_timestamp, lit

# Use the first database that has at least one table; fail with a clear message
# rather than an IndexError when there is none.
test_config = next((cfg for cfg in all_database_configs if cfg['tables']), None)
if test_config is None:
    raise ValueError("No database with tables in all_database_configs - run Cell 1 first.")

test_db = test_config['database_name']
test_table = test_config['tables'][0]

print(f"📊 Testing with: {test_db}/{test_table}")
print("=" * 70)

bronze_path = f"{test_config['bronze_path']}{test_table}/"
silver_path = f"{test_config['silver_path']}{test_table}/"
checkpoint_path = f"{test_config['checkpoint_path']}{test_table}/"

print(f"\nBronze: {bronze_path}")
print(f"Silver: {silver_path}")
print(f"Checkpoint: {checkpoint_path}")

# Check bronze layer has data
print("\n🔍 Checking bronze layer...")
try:
    bronze_files = dbutils.fs.ls(bronze_path)
    print(f"   ✓ Found {len(bronze_files)} item(s) in bronze layer")
    for item in bronze_files[:3]:
        print(f"     - {item.name}")
except Exception as e:
    print(f"   ❌ Error: {str(e)}")
    raise

# Process with Auto Loader
print("\n⏳ Processing with Auto Loader...")

try:
    df_stream = (spark.readStream
        .format("cloudFiles")
        .option("cloudFiles.format", "parquet")
        .option("cloudFiles.schemaLocation", f"{checkpoint_path}schema")
        .option("cloudFiles.inferColumnTypes", "true")
        .option("recursiveFileLookup", "true")
        .load(bronze_path)
    )

    print("   ✓ Stream created")

    # Add metadata. _metadata.file_path replaces input_file_name(), which is not
    # supported in Unity Catalog shared access mode.
    df_enriched = (df_stream
        .withColumn("processing_timestamp", current_timestamp())
        .withColumn("source_file", col("_metadata.file_path"))
        .withColumn("source_table", lit(test_table))
        .withColumn("source_database", lit(test_db))
    )

    print("   ✓ Metadata columns added")

    # Write to silver
    query = (df_enriched.writeStream
        .format("delta")
        .option("checkpointLocation", f"{checkpoint_path}checkpoint")
        .option("mergeSchema", "true")
        .outputMode("append")
        .trigger(availableNow=True)
        .start(silver_path)
    )

    print("   ✓ Write stream started")
    print("   ⏳ Waiting for completion...")

    query.awaitTermination()

    print(f"\n✅ SUCCESS! Processed {test_db}/{test_table}")

    # Verify silver layer
    print("\n🔍 Verifying silver layer...")
    df_silver = spark.read.format("delta").load(silver_path)
    count = df_silver.count()
    print(f"   ✓ Silver layer has {count:,} records")

    print("\n🎉 Test successful! The pipeline is working correctly.")

except Exception as e:
    print(f"\n❌ ERROR: {str(e)}")
    traceback.print_exc()

In [0]:
# Run this after the daily pipeline to verify the results.
# Summarises each table in `tables_to_process` from the primary database's silver folder.

print("=== Silver Layer Summary ===")
print()

for table_name in tables_to_process:
    try:
        df = spark.read.format("delta").load(f"{silver_base_path}{table_name}/")
        record_count = df.count()

        print(f"📊 {table_name}:")
        print(f"   Total records: {record_count:,}")

        # load_date only exists when partition columns were captured at ingest time.
        # Without this check, a processed table lacking it would be reported as an error.
        if "load_date" in df.columns:
            load_dates = df.select("load_date").distinct().count()
            print(f"   Distinct load dates: {load_dates}")
        else:
            print("   Distinct load dates: n/a (no load_date column)")
        print()

    except Exception as e:
        print(f"⚠️  {table_name}: Not yet processed or error - {str(e)}")
        print()

## ⚠️ Important: How Auto Loader Handles Modified Files

### **Default Behavior:**
Auto Loader tracks files by **path/name only**, not by content or modification time.

**Scenario:**
```
load_date=2026-01-31/data_abc.parquet (processed on Day 1)
load_date=2026-01-31/data_abc.parquet (modified on Day 2)
```

**Result:** Auto Loader will **SKIP** the modified file on Day 2 because the filename already exists in the checkpoint.

---

### **Solutions:**

**Option 1: Append-Only Pattern (Recommended)**
- Never modify existing files
- Always write NEW files with unique names (timestamps/UUIDs)
- Example: `data_2026-01-31_v1.parquet`, `data_2026-01-31_v2.parquet`

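**Option 2: Allow overwrites (optionally with file notifications via Azure Event Grid)**
- Set `cloudFiles.allowOverwrites=true` so Auto Loader re-ingests a file whose content changed in place. Re-ingested rows are appended, so rows from the old version must be cleaned up separately.
- Optionally enable `cloudFiles.useNotifications=true` for event-based file discovery. This requires Azure Event Grid setup with proper IAM roles.
- Notifications change how files are discovered; on their own they do not cause modified files to be reprocessed.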

**Option 3: Full Refresh Strategy**
- Delete checkpoint and reprocess all files
- Use for one-time fixes or major data corrections

**Option 4: Partition-Level Reprocessing**
- Delete specific partition data and checkpoint entries
- Reprocess only affected partitions

In [0]:
# OPTION 1: Use Azure Event Grid file notifications, and let Auto Loader re-ingest
# files that are overwritten in place.
# This requires proper IAM roles on your storage account:
# - Storage Account Contributor
# - Storage Blob Data Contributor
# - EventGrid EventSubscription Contributor
# - Storage Queue Data Contributor
#
# Notes:
# * Notifications only change HOW files are discovered. Whether an overwritten file is
#   processed again is controlled by cloudFiles.allowOverwrites (default false), set below.
# * The stream reuses the daily pipeline's checkpoint. A brand-new checkpoint combined
#   with cloudFiles.includeExistingFiles=true would re-ingest EVERY existing bronze file
#   and append duplicates of all rows already in this silver table.
#   NOTE(review): confirm your runtime allows switching an existing directory-listing
#   checkpoint to notification mode.
# * Re-ingested rows are APPENDED. Rows loaded from the earlier version of an overwritten
#   file stay in silver and must be removed separately (e.g. by load_date).

from pyspark.sql.functions import col, current_timestamp

table_name = "customer_details"

# Read with Auto Loader + File Notifications
df_stream = (spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "parquet")
    .option("cloudFiles.schemaLocation", f"{checkpoint_base_path}{table_name}/schema")
    .option("cloudFiles.inferColumnTypes", "true")
    .option("recursiveFileLookup", "true")

    # Event-based file discovery
    .option("cloudFiles.useNotifications", "true")  # Requires Event Grid setup
    # Re-process files whose content changed in place. Without this, Auto Loader skips
    # paths already recorded in the checkpoint.
    .option("cloudFiles.allowOverwrites", "true")

    .load(f"{bronze_base_path}{table_name}/")
)

# Add metadata. _metadata.file_path replaces input_file_name(), which is not
# supported in Unity Catalog shared access mode.
df_enriched = (df_stream
    .withColumn("processing_timestamp", current_timestamp())
    .withColumn("source_file", col("_metadata.file_path"))
)

# Write to silver layer, continuing from the daily pipeline's checkpoint
query = (df_enriched.writeStream
    .format("delta")
    .option("checkpointLocation", f"{checkpoint_base_path}{table_name}/checkpoint")
    .option("mergeSchema", "true")
    .outputMode("append")
    .trigger(availableNow=True)
    .start(f"{silver_base_path}{table_name}/")
)

query.awaitTermination()
print(f"Processing complete for {table_name}")

In [0]:
# OPTION 2: Full refresh - Use when you need to reprocess ALL files
# WARNING: This will reprocess ALL files, which can be expensive
# By default this cell is a dry run. Set confirm_full_refresh = True to delete the
# table's checkpoint (including its schema location) and its silver data.

table_name = "customer_details"
confirm_full_refresh = False  # safety switch: True actually deletes

checkpoint_dir = f"{checkpoint_base_path}{table_name}/"
silver_dir = f"{silver_base_path}{table_name}/"

print(f"⚠️  WARNING: This will delete checkpoint and reprocess ALL files for {table_name}")
print(f"Checkpoint location: {checkpoint_dir}")

if confirm_full_refresh:
    # Delete both together. Removing only the checkpoint would re-append every bronze
    # file on top of the rows already in silver (duplicates).
    print("\nDeleting checkpoint...")
    dbutils.fs.rm(checkpoint_dir, recurse=True)
    print("✅ Checkpoint deleted")

    print("\nDeleting existing silver layer data...")
    dbutils.fs.rm(silver_dir, recurse=True)
    print("✅ Silver layer data deleted")

    print("\nNow run the DAILY PRODUCTION PIPELINE cell to reprocess all files from scratch")
else:
    print("\nDry run only - set confirm_full_refresh = True to delete the checkpoint and silver data.")

In [0]:
# OPTION 3: Reprocess specific partition (e.g., specific load_date)
# Use this when only one date partition has modified files.
#
# Approach: delete that partition's rows from the silver Delta table, then batch-reload
# only that partition's bronze files. The Auto Loader checkpoint is left untouched, so
# the daily pipeline keeps picking up only genuinely new files. Deleting the whole
# checkpoint instead would make the next pipeline run re-append EVERY bronze file,
# duplicating all partitions that were kept in silver.
# Do not run this while the streaming pipeline is writing to the same table.

from pyspark.sql.functions import col, current_timestamp, lit

table_name = "customer_details"
partition_column = "load_date"
partition_to_reprocess = "2026-01-31"  # Change this to the date you want to reprocess

bronze_table_path = f"{bronze_base_path}{table_name}/"
silver_table_path = f"{silver_base_path}{table_name}/"
bronze_partition_path = f"{bronze_table_path}{partition_column}={partition_to_reprocess}/"

print(f"Reprocessing partition: {partition_column}={partition_to_reprocess}")

# Step 1: Delete data for this partition from silver layer.
# A Delta DELETE removes only matching rows. Rows whose load_date is NULL are kept,
# unlike a `!=` filter followed by an overwrite, which would drop them.
print("\nStep 1: Deleting partition data from silver layer...")
silver_schema = spark.read.format("delta").load(silver_table_path).schema
silver_columns = silver_schema.fieldNames()
if partition_column not in silver_columns:
    raise ValueError(
        f"Silver table {silver_table_path} has no '{partition_column}' column; "
        "partition-level reprocessing is not possible - use the full refresh option instead."
    )
spark.sql(
    f"DELETE FROM delta.`{silver_table_path}` "
    f"WHERE CAST({partition_column} AS STRING) = '{partition_to_reprocess}'"
)
print("✅ Deleted partition data from silver layer")

# Step 2: Batch-read just this partition. basePath keeps load_date as a column.
print(f"\nStep 2: Reloading bronze partition {bronze_partition_path} ...")
df_partition = (spark.read
    .option("basePath", bronze_table_path)
    .parquet(bronze_partition_path)
    # Match silver's column type (e.g. string vs date) so the append is schema-compatible.
    .withColumn(partition_column, col(partition_column).cast(silver_schema[partition_column].dataType))
    .withColumn("processing_timestamp", current_timestamp())
    .withColumn("source_table", lit(table_name))
)
# Fill the lineage columns the streaming cells add, when silver has them.
if "source_file" in silver_columns:
    df_partition = df_partition.withColumn("source_file", col("_metadata.file_path"))
if "source_database" in silver_columns:
    df_partition = df_partition.withColumn("source_database", lit(database_name))

(df_partition.write
    .format("delta")
    .mode("append")
    .option("mergeSchema", "true")
    .save(silver_table_path))

print(f"\n✅ Partition {partition_column}={partition_to_reprocess} reprocessed. "
      "The Auto Loader checkpoint was not modified; the daily pipeline continues incrementally.")

## ✅ Recommended Best Practice: Append-Only Pattern

### **Instead of modifying files, use unique filenames:**

**❌ Bad Pattern (causes issues):**
```
load_date=2026-01-31/data.parquet  (written on Day 1)
load_date=2026-01-31/data.parquet  (overwritten on Day 2) ← Auto Loader misses this
```

**✅ Good Pattern (works perfectly):**
```
load_date=2026-01-31/data_20260131_120000.parquet  (Day 1)
load_date=2026-01-31/data_20260201_140000.parquet  (Day 2) ← Auto Loader picks this up
```

### **Implementation Tips:**

1. **Add timestamps to filenames:**
   ```python
   from datetime import datetime
   timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
   filename = f"data_{timestamp}.parquet"
   ```

2. **Use UUIDs for uniqueness:**
   ```python
   import uuid
   filename = f"data_{uuid.uuid4()}.parquet"
   ```

3. **Configure upstream systems** to write with unique names

4. **Never overwrite existing files** in bronze layer

### **Benefits:**
- Auto Loader works perfectly
- Full audit trail of all data loads
- Easy to track data lineage
- No need for complex checkpoint management