# Database Maintenance Notebook

This notebook performs regular maintenance tasks on Delta Lake tables in the Verisk Pipeline.

## Maintenance Tasks

1. **Retry Queue Cleanup** - Delete retry-queue and event-log records older than their retention period
2. **Delta Table Optimization** - Compact small files for better query performance
3. **VACUUM Operations** - Remove old file versions to reclaim storage
4. **Health Reporting** - Generate statistics on table health and data quality

## Scheduling

This notebook should be scheduled to run daily during off-peak hours (e.g., 2:00 AM).

---

## Configuration

In [None]:
# =============================================================================
# CONFIGURATION - Update these values for your environment
# =============================================================================

# Lakehouse paths - Update these to match your Fabric lakehouse
XACT_LAKEHOUSE_PATH = "abfss://your-workspace@onelake.dfs.fabric.microsoft.com/your-xact-lakehouse.Lakehouse/Tables"
CLAIMX_LAKEHOUSE_PATH = "abfss://your-workspace@onelake.dfs.fabric.microsoft.com/your-claimx-lakehouse.Lakehouse/Tables"

# Retention settings (in days)
RETRY_RETENTION_DAYS = 30          # Delete retry records older than this
EVENT_LOG_RETENTION_DAYS = 90      # Delete event log records older than this  
VACUUM_RETENTION_HOURS = 168       # 7 days - Delta's default retention; shorter values fail its safety check

# Optimization settings
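TARGET_FILE_SIZE_MB = 128          # Heuristic only: tables with >100 files averaging under half this are compacted (not passed to OPTIMIZE)
MIN_FILE_SIZE_MB = 10              # Tables whose average file size is below this count as having small files
SMALL_FILE_THRESHOLD = 10          # ...and are compacted only if they have more files than this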

# Safety settings
DRY_RUN = False                    # Set to True to preview changes without applying
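The configuration cell accepts any values without checking them. A minimal pre-flight sketch, using a hypothetical `validate_config` helper that is not part of the original notebook, which flags a VACUUM retention Delta would reject by default and settings that make the optimization heuristic inconsistent:

```python
# Hypothetical pre-flight check for the configuration values above.
# 168 hours is Delta's default delta.deletedFileRetentionDuration (7 days); VACUUM with a
# shorter retention fails while the retention-duration safety check is enabled.
DELTA_DEFAULT_RETENTION_HOURS = 168


def validate_config(
    vacuum_retention_hours: int,
    retry_retention_days: int,
    event_log_retention_days: int,
    min_file_size_mb: float,
    target_file_size_mb: float,
) -> list:
    """Return human-readable problems; an empty list means the config looks sane."""
    problems = []
    if vacuum_retention_hours < DELTA_DEFAULT_RETENTION_HOURS:
        problems.append(
            f"VACUUM_RETENTION_HOURS={vacuum_retention_hours} is below "
            f"{DELTA_DEFAULT_RETENTION_HOURS}; VACUUM will fail its retention check"
        )
    for name, days in (("RETRY_RETENTION_DAYS", retry_retention_days),
                       ("EVENT_LOG_RETENTION_DAYS", event_log_retention_days)):
        if days <= 0:
            problems.append(f"{name} must be positive, got {days}")
    if min_file_size_mb >= target_file_size_mb:
        problems.append("MIN_FILE_SIZE_MB should be smaller than TARGET_FILE_SIZE_MB")
    return problems


print(validate_config(168, 30, 90, 10, 128))  # -> []
print(validate_config(24, 30, 90, 10, 128))   # one problem: retention too short
```

In the notebook this would be called with the constants from the cell above, raising an exception if the returned list is non-empty.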

In [None]:
# =============================================================================
# TABLE DEFINITIONS
# =============================================================================

# XACT Pipeline Tables
XACT_TABLES = {
    "xact_events": {
        "path": f"{XACT_LAKEHOUSE_PATH}/xact_events",
        "z_order_columns": ["event_date", "type"],
        "partition_column": "event_date",
        "description": "XACT webhook events from Kusto"
    },
    "xact_attachments": {
        "path": f"{XACT_LAKEHOUSE_PATH}/xact_attachments",
        "z_order_columns": ["event_date", "file_type"],
        "partition_column": "event_date",
        "description": "Successfully downloaded XACT attachments"
    },
    "xact_retry": {
        "path": f"{XACT_LAKEHOUSE_PATH}/xact_retry",
        "z_order_columns": ["status", "retry_count", "created_date"],
        "partition_column": "created_date",
        "cleanup": True,
        "retention_days": RETRY_RETENTION_DAYS,
        "description": "XACT attachment download retry queue"
    }
}

# ClaimX Pipeline Tables
CLAIMX_TABLES = {
    "claimx_events": {
        "path": f"{CLAIMX_LAKEHOUSE_PATH}/claimx_events",
        "z_order_columns": ["event_date", "event_type"],
        "partition_column": "event_date",
        "description": "ClaimX events from Kusto"
    },
    "claimx_event_log": {
        "path": f"{CLAIMX_LAKEHOUSE_PATH}/claimx_event_log",
        "z_order_columns": ["processed_date", "status"],
        "partition_column": "processed_date",
        "cleanup": True,
        "retention_days": EVENT_LOG_RETENTION_DAYS,
        "cleanup_filter": "status IN ('success', 'failed_permanent')",
        "description": "ClaimX API enrichment processing log"
    },
    "claimx_projects": {
        "path": f"{CLAIMX_LAKEHOUSE_PATH}/claimx_projects",
        "z_order_columns": ["project_id"],
        "description": "ClaimX project entities"
    },
    "claimx_contacts": {
        "path": f"{CLAIMX_LAKEHOUSE_PATH}/claimx_contacts",
        "z_order_columns": ["project_id", "created_date"],
        "partition_column": "created_date",
        "description": "ClaimX contact entities"
    },
    "claimx_attachment_metadata": {
        "path": f"{CLAIMX_LAKEHOUSE_PATH}/claimx_attachment_metadata",
        "z_order_columns": ["project_id", "media_id"],
        "description": "ClaimX media metadata from API"
    },
    "claimx_attachments": {
        "path": f"{CLAIMX_LAKEHOUSE_PATH}/claimx_attachments",
        "z_order_columns": ["created_date", "file_type"],
        "partition_column": "created_date",
        "description": "Successfully downloaded ClaimX attachments"
    },
    "claimx_retry": {
        "path": f"{CLAIMX_LAKEHOUSE_PATH}/claimx_retry",
        "z_order_columns": ["status", "retry_count", "created_date"],
        "partition_column": "created_date",
        "cleanup": True,
        "retention_days": RETRY_RETENTION_DAYS,
        "description": "ClaimX media download retry queue"
    },
    "claimx_tasks": {
        "path": f"{CLAIMX_LAKEHOUSE_PATH}/claimx_tasks",
        "z_order_columns": ["project_id", "assignment_id"],
        "description": "ClaimX task assignments"
    },
    "claimx_task_templates": {
        "path": f"{CLAIMX_LAKEHOUSE_PATH}/claimx_task_templates",
        "z_order_columns": ["task_id"],
        "description": "ClaimX task templates"
    },
    "claimx_external_links": {
        "path": f"{CLAIMX_LAKEHOUSE_PATH}/claimx_external_links",
        "z_order_columns": ["project_id", "assignment_id"],
        "description": "ClaimX external link entities"
    },
    "claimx_video_collab": {
        "path": f"{CLAIMX_LAKEHOUSE_PATH}/claimx_video_collab",
        "z_order_columns": ["video_collaboration_id"],
        "description": "ClaimX video collaboration sessions"
    }
}

# Combined for iteration
ALL_TABLES = {**XACT_TABLES, **CLAIMX_TABLES}

print(f"Configured {len(XACT_TABLES)} XACT tables and {len(CLAIMX_TABLES)} ClaimX tables")
print(f"Dry run mode: {DRY_RUN}")
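The cleanup loop falls back to `RETRY_RETENTION_DAYS` whenever a table marked `"cleanup": True` omits `retention_days`, which would silently apply a 30-day window to a table meant to keep 90 days. A small sketch of a config check that catches this; `check_table_configs` and `REQUIRED_KEYS` are hypothetical names, and the example dict uses placeholder paths:

```python
# Hypothetical sanity check for table definitions shaped like XACT_TABLES / CLAIMX_TABLES.
REQUIRED_KEYS = {"path", "z_order_columns", "description"}


def check_table_configs(tables: dict) -> list:
    """Flag missing keys and cleanup-enabled tables without an explicit retention."""
    issues = []
    for name, cfg in tables.items():
        missing = REQUIRED_KEYS - set(cfg)
        if missing:
            issues.append(f"{name}: missing keys {sorted(missing)}")
        if cfg.get("cleanup") and "retention_days" not in cfg:
            issues.append(f"{name}: cleanup enabled but no retention_days")
    return issues


example = {
    "xact_retry": {"path": "/tables/xact_retry", "z_order_columns": ["status"],
                   "cleanup": True, "description": "retry queue"},
    "claimx_projects": {"path": "/tables/claimx_projects", "z_order_columns": ["project_id"],
                        "description": "projects"},
}
print(check_table_configs(example))
# -> ['xact_retry: cleanup enabled but no retention_days']
```

Running `check_table_configs(ALL_TABLES)` against the definitions above should return an empty list, since every cleanup table sets `retention_days`.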

## Setup and Imports

In [None]:
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Any
from dataclasses import dataclass, field
import json

from delta.tables import DeltaTable
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count, lit, when, sum as spark_sum, max as spark_max, min as spark_min

# Get or create Spark session (Fabric provides this automatically)
spark = SparkSession.builder.getOrCreate()

# Enable Delta Lake optimizations
spark.conf.set("spark.databricks.delta.optimizeWrite.enabled", "true")
spark.conf.set("spark.databricks.delta.autoCompact.enabled", "false")  # We'll run manually

print(f"Spark version: {spark.version}")
print(f"Maintenance run started at: {datetime.now().isoformat()}")

In [None]:
@dataclass
class MaintenanceResult:
    """Result of a maintenance operation."""
    table_name: str
    operation: str
    success: bool
    duration_seconds: float = 0.0
    rows_affected: int = 0
    files_before: int = 0
    files_after: int = 0
    size_before_mb: float = 0.0
    size_after_mb: float = 0.0
    error_message: Optional[str] = None
    details: Dict[str, Any] = field(default_factory=dict)

    def to_dict(self) -> Dict[str, Any]:
        return {
            "table_name": self.table_name,
            "operation": self.operation,
            "success": self.success,
            "duration_seconds": round(self.duration_seconds, 2),
            "rows_affected": self.rows_affected,
            "files_before": self.files_before,
            "files_after": self.files_after,
            "size_before_mb": round(self.size_before_mb, 2),
            "size_after_mb": round(self.size_after_mb, 2),
            "error_message": self.error_message,
            "details": self.details
        }

# Store all results for final report
maintenance_results: List[MaintenanceResult] = []

## Helper Functions

In [None]:
def table_exists(table_path: str) -> bool:
    """Check if a Delta table exists at the given path."""
    try:
        DeltaTable.forPath(spark, table_path)
        return True
    except Exception:
        return False


def get_table_stats(table_path: str) -> Dict[str, Any]:
    """Get statistics for a Delta table."""
    try:
        dt = DeltaTable.forPath(spark, table_path)
        detail = dt.detail().collect()[0]
        
        # Get row count
        row_count = spark.read.format("delta").load(table_path).count()
        
        return {
            "exists": True,
            "row_count": row_count,
            "num_files": detail.numFiles,
            "size_bytes": detail.sizeInBytes,
            "size_mb": round(detail.sizeInBytes / (1024 * 1024), 2),
            "created_at": str(detail.createdAt) if detail.createdAt else None,
            "last_modified": str(detail.lastModified) if detail.lastModified else None,
            "partitioning": detail.partitionColumns if detail.partitionColumns else []
        }
    except Exception as e:
        return {
            "exists": False,
            "error": str(e)
        }


def should_optimize(table_path: str) -> tuple[bool, str]:
    """Determine if a table needs optimization based on file metrics."""
    stats = get_table_stats(table_path)
    
    if not stats.get("exists"):
        return False, "Table does not exist"
    
    num_files = stats.get("num_files", 0)
    size_mb = stats.get("size_mb", 0)
    
    if num_files == 0:
        return False, "Table is empty"
    
    avg_file_size_mb = size_mb / num_files
    
    # DESCRIBE DETAIL exposes only the file count and total size, so the average
    # file size serves as a proxy for "many small files"
    if avg_file_size_mb < MIN_FILE_SIZE_MB and num_files > SMALL_FILE_THRESHOLD:
        return True, f"Average file size ({avg_file_size_mb:.1f}MB) below threshold ({MIN_FILE_SIZE_MB}MB) with {num_files} files"
    
    if num_files > 100 and avg_file_size_mb < TARGET_FILE_SIZE_MB / 2:
        return True, f"Many files ({num_files}) with small average size ({avg_file_size_mb:.1f}MB)"
    
    return False, f"Table is healthy ({num_files} files, {avg_file_size_mb:.1f}MB avg)"


print("Helper functions defined.")
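Because `should_optimize()` only sees the file count and total size, its decision depends entirely on the average file size. A pure-Python restatement of the same two rules, written so they can be checked with worked numbers without Spark; `needs_compaction` is a hypothetical name, and its defaults mirror `MIN_FILE_SIZE_MB`, `SMALL_FILE_THRESHOLD`, `TARGET_FILE_SIZE_MB` and the literal 100-file cut-off:

```python
def needs_compaction(num_files: int, size_mb: float,
                     min_file_size_mb: float = 10,
                     small_file_threshold: int = 10,
                     target_file_size_mb: float = 128) -> bool:
    """Same decision rules as should_optimize(), on plain numbers."""
    if num_files == 0:
        return False
    avg = size_mb / num_files
    # Rule 1: small average file size and more than a handful of files
    if avg < min_file_size_mb and num_files > small_file_threshold:
        return True
    # Rule 2: many files averaging under half the target size
    return num_files > 100 and avg < target_file_size_mb / 2


print(needs_compaction(500, 1000))   # True: 2 MB average across 500 files
print(needs_compaction(8, 8))        # False: 1 MB files, but only 8 of them
print(needs_compaction(200, 10000))  # True: 50 MB average is under 128 / 2
print(needs_compaction(50, 6000))    # False: 120 MB average, healthy
```

One consequence of averaging: a few very large files can hide many small ones, so a table can report "healthy" while still containing thousands of tiny files.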

---

## 1. Retry Queue Cleanup

Remove expired records from retry queues to prevent unbounded growth.

Records are deleted when:
- `created_at` is older than the table's retention period (`retention_days`)
- For `claimx_event_log`, the record also matches its `cleanup_filter`, i.e. its status is `success` or `failed_permanent`
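The delete predicate is plain SQL text built from the retention period and the optional filter. A sketch reproducing that construction with a fixed clock so the cutoff can be checked; `build_delete_condition` and its `date_column` parameter are hypothetical (the notebook hard-codes `created_at`). Note that `datetime.now()` returns the driver's local time, so if `created_at` is stored in UTC the effective cutoff shifts by the driver's UTC offset.

```python
from datetime import datetime, timedelta
from typing import Optional


def build_delete_condition(retention_days: int,
                           additional_filter: Optional[str] = None,
                           now: Optional[datetime] = None,
                           date_column: str = "created_at") -> str:
    """Mirror the predicate cleanup_expired_records() passes to DeltaTable.delete()."""
    now = now or datetime.now()
    cutoff_iso = (now - timedelta(days=retention_days)).isoformat()
    condition = f"{date_column} < '{cutoff_iso}'"
    if additional_filter:
        # Parenthesize both sides so an OR inside the filter cannot widen the delete
        condition = f"({condition}) AND ({additional_filter})"
    return condition


fixed_now = datetime(2024, 3, 31, 2, 0, 0)
print(build_delete_condition(30, now=fixed_now))
# created_at < '2024-03-01T02:00:00'
print(build_delete_condition(90, "status IN ('success', 'failed_permanent')", now=fixed_now))
# (created_at < '2024-01-01T02:00:00') AND (status IN ('success', 'failed_permanent'))
```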

In [None]:
def cleanup_expired_records(
    table_name: str,
    table_path: str,
    retention_days: int,
    additional_filter: Optional[str] = None,
    dry_run: bool = False
) -> MaintenanceResult:
    """
    Delete records older than retention period from a table.
    
    Args:
        table_name: Name of the table for logging
        table_path: Full path to the Delta table
        retention_days: Number of days to retain
        additional_filter: Optional additional SQL filter condition
        dry_run: If True, only count records without deleting
    
    Returns:
        MaintenanceResult with operation details
    """
    start_time = datetime.now()
    
    try:
        if not table_exists(table_path):
            return MaintenanceResult(
                table_name=table_name,
                operation="cleanup",
                success=True,
                details={"message": "Table does not exist, skipping"}
            )
        
        dt = DeltaTable.forPath(spark, table_path)
        cutoff_date = datetime.now() - timedelta(days=retention_days)
        cutoff_iso = cutoff_date.isoformat()
        
        # Build delete condition
        delete_condition = f"created_at < '{cutoff_iso}'"
        if additional_filter:
            delete_condition = f"({delete_condition}) AND ({additional_filter})"
        
        # Count records to delete
        df = spark.read.format("delta").load(table_path)
        count_before = df.count()
        records_to_delete = df.filter(delete_condition).count()
        
        if dry_run:
            duration = (datetime.now() - start_time).total_seconds()
            return MaintenanceResult(
                table_name=table_name,
                operation="cleanup_dry_run",
                success=True,
                duration_seconds=duration,
                rows_affected=records_to_delete,
                details={
                    "cutoff_date": cutoff_iso,
                    "retention_days": retention_days,
                    "total_rows": count_before,
                    "would_delete": records_to_delete,
                    "filter": delete_condition
                }
            )
        
        # Execute delete
        if records_to_delete > 0:
            dt.delete(delete_condition)
        
        count_after = spark.read.format("delta").load(table_path).count()
        duration = (datetime.now() - start_time).total_seconds()
        
        return MaintenanceResult(
            table_name=table_name,
            operation="cleanup",
            success=True,
            duration_seconds=duration,
            rows_affected=count_before - count_after,
            details={
                "cutoff_date": cutoff_iso,
                "retention_days": retention_days,
                "rows_before": count_before,
                "rows_after": count_after,
                "filter": delete_condition
            }
        )
        
    except Exception as e:
        duration = (datetime.now() - start_time).total_seconds()
        return MaintenanceResult(
            table_name=table_name,
            operation="cleanup",
            success=False,
            duration_seconds=duration,
            error_message=str(e)
        )


print("Cleanup function defined.")

In [None]:
# Execute cleanup on tables marked for cleanup
print("="*60)
print("RETRY QUEUE CLEANUP")
print("="*60)

cleanup_tables = [
    (name, config) for name, config in ALL_TABLES.items() 
    if config.get("cleanup", False)
]

print(f"\nTables scheduled for cleanup: {len(cleanup_tables)}")
for name, config in cleanup_tables:
    print(f"  - {name}: {config.get('retention_days', RETRY_RETENTION_DAYS)} days retention")

print("\n" + "-"*60)

for table_name, config in cleanup_tables:
    print(f"\nProcessing: {table_name}")
    
    result = cleanup_expired_records(
        table_name=table_name,
        table_path=config["path"],
        retention_days=config.get("retention_days", RETRY_RETENTION_DAYS),
        additional_filter=config.get("cleanup_filter"),
        dry_run=DRY_RUN
    )
    
    maintenance_results.append(result)
    
    if result.success:
        if result.rows_affected > 0:
            print(f"  {'Would delete' if DRY_RUN else 'Deleted'}: {result.rows_affected:,} records")
        else:
            print(f"  No records to clean up")
        print(f"  Duration: {result.duration_seconds:.2f}s")
    else:
        print(f"  ERROR: {result.error_message}")

print("\n" + "="*60)
print("Cleanup phase complete.")

---

## 2. Delta Table Optimization

Compact small files into larger, more efficient files using Delta Lake OPTIMIZE.

Benefits:
- Improved query performance (fewer files to scan)
- Reduced metadata overhead
- Z-ordering for data co-location on common query columns
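Delta refuses to Z-order on a partition column, and several table definitions above list their partition column (e.g. `event_date`, `created_date`) among `z_order_columns`. A sketch of the column selection expressed as the equivalent Delta SQL statement, which could be passed to `spark.sql(...)` instead of `DeltaTable.optimize().executeZOrderBy(...)`; `build_optimize_sql` is a hypothetical helper and the path is a placeholder:

```python
from typing import List, Optional, Sequence


def build_optimize_sql(table_path: str,
                       z_order_columns: Optional[Sequence[str]],
                       table_columns: Sequence[str],
                       partition_columns: Sequence[str] = ()) -> str:
    """Build an OPTIMIZE statement, Z-ordering only on existing, non-partition columns."""
    zorder: List[str] = [
        c for c in (z_order_columns or [])
        if c in table_columns and c not in partition_columns
    ]
    sql = f"OPTIMIZE delta.`{table_path}`"
    if zorder:
        sql += f" ZORDER BY ({', '.join(zorder)})"
    # With no usable Z-order columns this is plain bin-packing compaction
    return sql


print(build_optimize_sql(
    "/tables/xact_events",
    ["event_date", "type"],
    table_columns=["event_id", "event_date", "type"],
    partition_columns=["event_date"],
))
# OPTIMIZE delta.`/tables/xact_events` ZORDER BY (type)
```

Partitioning already co-locates rows by `event_date`, so dropping it from the Z-order list loses nothing.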

In [None]:
def optimize_table(
    table_name: str,
    table_path: str,
    z_order_columns: Optional[List[str]] = None,
    dry_run: bool = False
) -> MaintenanceResult:
    """
    Optimize a Delta table through file compaction and optional Z-ordering.
    
    Args:
        table_name: Name of the table for logging
        table_path: Full path to the Delta table
        z_order_columns: Columns to Z-order by for query optimization
        dry_run: If True, only analyze without optimizing
    
    Returns:
        MaintenanceResult with operation details
    """
    start_time = datetime.now()
    
    try:
        if not table_exists(table_path):
            return MaintenanceResult(
                table_name=table_name,
                operation="optimize",
                success=True,
                details={"message": "Table does not exist, skipping"}
            )
        
        # Check if optimization is needed
        needs_opt, reason = should_optimize(table_path)
        stats_before = get_table_stats(table_path)
        
        if not needs_opt:
            duration = (datetime.now() - start_time).total_seconds()
            return MaintenanceResult(
                table_name=table_name,
                operation="optimize_skipped",
                success=True,
                duration_seconds=duration,
                files_before=stats_before.get("num_files", 0),
                size_before_mb=stats_before.get("size_mb", 0),
                details={"reason": reason}
            )
        
        if dry_run:
            duration = (datetime.now() - start_time).total_seconds()
            return MaintenanceResult(
                table_name=table_name,
                operation="optimize_dry_run",
                success=True,
                duration_seconds=duration,
                files_before=stats_before.get("num_files", 0),
                size_before_mb=stats_before.get("size_mb", 0),
                details={
                    "would_optimize": True,
                    "reason": reason,
                    "z_order_columns": z_order_columns
                }
            )
        
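        # Execute OPTIMIZE
        dt = DeltaTable.forPath(spark, table_path)
        
        if z_order_columns:
            # Keep only Z-order columns that exist in the schema and are not
            # partition columns: Delta rejects Z-ordering on partition columns
            df_schema = spark.read.format("delta").load(table_path).columns
            partition_cols = set(dt.detail().collect()[0].partitionColumns or [])
            valid_columns = [
                c for c in z_order_columns
                if c in df_schema and c not in partition_cols
            ]
            
            if valid_columns:
                dt.optimize().executeZOrderBy(valid_columns)
            else:
                dt.optimize().executeCompaction()
        else:
            dt.optimize().executeCompaction()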
        
        # Get stats after optimization
        stats_after = get_table_stats(table_path)
        duration = (datetime.now() - start_time).total_seconds()
        
        return MaintenanceResult(
            table_name=table_name,
            operation="optimize",
            success=True,
            duration_seconds=duration,
            files_before=stats_before.get("num_files", 0),
            files_after=stats_after.get("num_files", 0),
            size_before_mb=stats_before.get("size_mb", 0),
            size_after_mb=stats_after.get("size_mb", 0),
            details={
                "z_order_columns": z_order_columns,
                "files_removed": stats_before.get("num_files", 0) - stats_after.get("num_files", 0)
            }
        )
        
    except Exception as e:
        duration = (datetime.now() - start_time).total_seconds()
        return MaintenanceResult(
            table_name=table_name,
            operation="optimize",
            success=False,
            duration_seconds=duration,
            error_message=str(e)
        )


print("Optimize function defined.")

In [None]:
# Execute optimization on all tables
print("="*60)
print("DELTA TABLE OPTIMIZATION")
print("="*60)

print(f"\nTables to process: {len(ALL_TABLES)}")
print("-"*60)

for table_name, config in ALL_TABLES.items():
    print(f"\nProcessing: {table_name}")
    print(f"  Description: {config.get('description', 'N/A')}")
    
    result = optimize_table(
        table_name=table_name,
        table_path=config["path"],
        z_order_columns=config.get("z_order_columns"),
        dry_run=DRY_RUN
    )
    
    maintenance_results.append(result)
    
    if result.success:
        if "skipped" in result.operation:
            print(f"  Skipped: {result.details.get('reason', 'N/A')}")
        elif "dry_run" in result.operation:
            print(f"  Would optimize: {result.files_before} files ({result.size_before_mb:.1f} MB)")
            print(f"  Reason: {result.details.get('reason', 'N/A')}")
        else:
            files_removed = result.files_before - result.files_after
            print(f"  Files: {result.files_before} -> {result.files_after} ({files_removed} compacted)")
            print(f"  Size: {result.size_before_mb:.1f} MB -> {result.size_after_mb:.1f} MB")
        print(f"  Duration: {result.duration_seconds:.2f}s")
    else:
        print(f"  ERROR: {result.error_message}")

print("\n" + "="*60)
print("Optimization phase complete.")

---

## 3. VACUUM Operations

Delete data files that are no longer referenced by the current version of a table and are older than the retention period, to reclaim storage.

**Important**: After VACUUM, time travel to versions older than the retention period fails, because the data files those versions need have been deleted. Set the retention period to cover the time-travel window your SLA requires.
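A simplified model of which files VACUUM may delete: a file is eligible once it is no longer referenced by the current table version and the commit that removed it is older than the retention window. The sketch below ignores the rest of Delta's bookkeeping (untracked files, checkpoints); `vacuum_candidates` is a hypothetical helper and the file names are illustrative.

```python
from datetime import datetime, timedelta
from typing import Dict, List


def vacuum_candidates(removed_at: Dict[str, datetime], now: datetime,
                      retention_hours: int = 168) -> List[str]:
    """Files whose removal from the table is older than the retention window."""
    cutoff = now - timedelta(hours=retention_hours)
    return sorted(path for path, ts in removed_at.items() if ts < cutoff)


now = datetime(2024, 3, 31, 3, 0)
removed = {
    "part-0001.parquet": now - timedelta(days=10),  # unreferenced since 10 days ago
    "part-0002.parquet": now - timedelta(days=2),   # unreferenced since 2 days ago
}
print(vacuum_candidates(removed, now))      # ['part-0001.parquet']
print(vacuum_candidates(removed, now, 24))  # ['part-0001.parquet', 'part-0002.parquet']
```

With the default 168 hours, the file removed two days ago survives, which is what keeps time travel inside the window working; shortening the retention to 24 hours would delete it too.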

In [None]:
def vacuum_table(
    table_name: str,
    table_path: str,
    retention_hours: int = 168,  # 7 days default
    dry_run: bool = False
) -> MaintenanceResult:
    """
    Run VACUUM on a Delta table to remove old file versions.
    
    Args:
        table_name: Name of the table for logging
        table_path: Full path to the Delta table
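        retention_hours: Hours of history to keep; values below the table's
            deletedFileRetentionDuration (7 days by default) are rejected
            while Delta's retention safety check is enabled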
        dry_run: If True, only report without vacuuming
    
    Returns:
        MaintenanceResult with operation details
    """
    start_time = datetime.now()
    
    try:
        if not table_exists(table_path):
            return MaintenanceResult(
                table_name=table_name,
                operation="vacuum",
                success=True,
                details={"message": "Table does not exist, skipping"}
            )
        
        dt = DeltaTable.forPath(spark, table_path)
        stats_before = get_table_stats(table_path)
        
        if dry_run:
            # Dry run - list the files VACUUM would delete without deleting them.
            # The Python DeltaTable.vacuum() method has no dry-run flag, so use SQL.
            # Note: some runtimes cap the number of paths DRY RUN returns.
            files_to_delete = spark.sql(
                f"VACUUM delta.`{table_path}` RETAIN {retention_hours} HOURS DRY RUN"
            )
            file_count = files_to_delete.count()
            
            duration = (datetime.now() - start_time).total_seconds()
            return MaintenanceResult(
                table_name=table_name,
                operation="vacuum_dry_run",
                success=True,
                duration_seconds=duration,
                size_before_mb=stats_before.get("size_mb", 0),
                details={
                    "retention_hours": retention_hours,
                    "files_to_delete": file_count
                }
            )
        
        # Execute VACUUM
        dt.vacuum(retention_hours)
        
        stats_after = get_table_stats(table_path)
        duration = (datetime.now() - start_time).total_seconds()
        
        return MaintenanceResult(
            table_name=table_name,
            operation="vacuum",
            success=True,
            duration_seconds=duration,
            size_before_mb=stats_before.get("size_mb", 0),
            size_after_mb=stats_after.get("size_mb", 0),
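            details={
                "retention_hours": retention_hours,
                # sizeInBytes from DESCRIBE DETAIL counts only files in the current
                # snapshot, which VACUUM never deletes, so this is normally ~0 and
                # does not measure the storage VACUUM actually freed
                "space_reclaimed_mb": round(stats_before.get("size_mb", 0) - stats_after.get("size_mb", 0), 2)
            }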
        )
        
    except Exception as e:
        duration = (datetime.now() - start_time).total_seconds()
        return MaintenanceResult(
            table_name=table_name,
            operation="vacuum",
            success=False,
            duration_seconds=duration,
            error_message=str(e)
        )


print("Vacuum function defined.")

In [None]:
# Execute VACUUM on all tables
print("="*60)
print("VACUUM OPERATIONS")
print("="*60)

print(f"\nRetention period: {VACUUM_RETENTION_HOURS} hours ({VACUUM_RETENTION_HOURS/24:.1f} days)")
print(f"Tables to process: {len(ALL_TABLES)}")
print("-"*60)

for table_name, config in ALL_TABLES.items():
    print(f"\nProcessing: {table_name}")
    
    result = vacuum_table(
        table_name=table_name,
        table_path=config["path"],
        retention_hours=VACUUM_RETENTION_HOURS,
        dry_run=DRY_RUN
    )
    
    maintenance_results.append(result)
    
    if result.success:
        if "dry_run" in result.operation:
            print(f"  Files to delete: {result.details.get('files_to_delete', 0)}")
        else:
            space_reclaimed = result.details.get("space_reclaimed_mb", 0)
            print(f"  Space reclaimed: {space_reclaimed:.1f} MB")
        print(f"  Duration: {result.duration_seconds:.2f}s")
    else:
        print(f"  ERROR: {result.error_message}")

print("\n" + "="*60)
print("Vacuum phase complete.")

---

## 4. Health Report

Generate a comprehensive health report for all tables and retry queues.
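The retry-queue health check buckets `retry_count` with `when(...).otherwise(0)`, so rows whose `retry_count` is NULL are counted in `total_rows` but in no bucket, and the buckets may not sum to the total. A pure-Python sketch of the same bucketing on plain dicts, useful for checking expectations without Spark; `retry_distribution` is a hypothetical helper:

```python
from collections import Counter
from typing import Dict, Iterable, Mapping, Optional


def retry_distribution(records: Iterable[Mapping[str, Optional[int]]]) -> Dict[str, int]:
    """Bucket retry_count as get_retry_queue_health() does: 0, 1, 2, 3+."""
    buckets: Counter = Counter()
    for record in records:
        n = record.get("retry_count")
        if n is None:
            # Spark's when(col == k) is NULL for NULL input, so otherwise(0) applies
            continue
        buckets["retry_3_plus" if n >= 3 else f"retry_{n}"] += 1
    return {k: buckets[k] for k in ("retry_0", "retry_1", "retry_2", "retry_3_plus")}


sample = [{"retry_count": 0}, {"retry_count": 0}, {"retry_count": 2}, {"retry_count": 5}]
print(retry_distribution(sample))
# {'retry_0': 2, 'retry_1': 0, 'retry_2': 1, 'retry_3_plus': 1}
```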

In [None]:
def get_retry_queue_health(table_name: str, table_path: str) -> Dict[str, Any]:
    """
    Get health metrics for a retry queue table.
    
    Returns:
        Dict with retry queue statistics
    """
    try:
        if not table_exists(table_path):
            return {"exists": False}
        
        df = spark.read.format("delta").load(table_path)
        
        # Aggregate statistics
        stats = df.agg(
            count("*").alias("total_rows"),
            spark_sum(when(col("status") == "failed", 1).otherwise(0)).alias("status_failed"),
            spark_sum(when(col("status") == "failed_permanent", 1).otherwise(0)).alias("status_permanent"),
            spark_sum(when(col("retry_count") == 0, 1).otherwise(0)).alias("retry_0"),
            spark_sum(when(col("retry_count") == 1, 1).otherwise(0)).alias("retry_1"),
            spark_sum(when(col("retry_count") == 2, 1).otherwise(0)).alias("retry_2"),
            spark_sum(when(col("retry_count") >= 3, 1).otherwise(0)).alias("retry_3_plus"),
            spark_min("created_at").alias("oldest_record"),
            spark_max("created_at").alias("newest_record")
        ).collect()[0]
        
        return {
            "exists": True,
            "total_rows": stats.total_rows,
            "status_failed": stats.status_failed or 0,
            "status_permanent": stats.status_permanent or 0,
            "retry_distribution": {
                "retry_0": stats.retry_0 or 0,
                "retry_1": stats.retry_1 or 0,
                "retry_2": stats.retry_2 or 0,
                "retry_3_plus": stats.retry_3_plus or 0
            },
            "oldest_record": str(stats.oldest_record) if stats.oldest_record else None,
            "newest_record": str(stats.newest_record) if stats.newest_record else None
        }
    except Exception as e:
        return {"exists": False, "error": str(e)}


print("Health report functions defined.")

In [None]:
# Generate comprehensive health report
print("="*60)
print("TABLE HEALTH REPORT")
print("="*60)
print(f"\nGenerated at: {datetime.now().isoformat()}")
print("-"*60)

health_report = {}

for table_name, config in ALL_TABLES.items():
    print(f"\n{table_name}")
    print(f"  Path: {config['path']}")
    
    stats = get_table_stats(config["path"])
    
    if stats.get("exists"):
        print(f"  Rows: {stats.get('row_count', 0):,}")
        print(f"  Files: {stats.get('num_files', 0):,}")
        print(f"  Size: {stats.get('size_mb', 0):.1f} MB")
        
        if stats.get('num_files', 0) > 0:
            avg_size = stats.get('size_mb', 0) / stats.get('num_files', 1)
            print(f"  Avg file size: {avg_size:.1f} MB")
        
        # Additional stats for retry tables
        if "retry" in table_name.lower():
            retry_stats = get_retry_queue_health(table_name, config["path"])
            if retry_stats.get("exists"):
                print(f"  Status - Failed: {retry_stats.get('status_failed', 0):,}, Permanent: {retry_stats.get('status_permanent', 0):,}")
                dist = retry_stats.get("retry_distribution", {})
                print(f"  Retry counts - 0: {dist.get('retry_0', 0)}, 1: {dist.get('retry_1', 0)}, 2: {dist.get('retry_2', 0)}, 3+: {dist.get('retry_3_plus', 0)}")
            stats["retry_health"] = retry_stats
    else:
        print(f"  Status: Does not exist or could not be read")
    
    health_report[table_name] = stats

print("\n" + "="*60)

In [None]:
# Summary of retry queues
print("\n" + "="*60)
print("RETRY QUEUE SUMMARY")
print("="*60)

retry_tables = [name for name in ALL_TABLES.keys() if "retry" in name.lower()]

for table_name in retry_tables:
    stats = health_report.get(table_name, {})
    retry_health = stats.get("retry_health", {})
    
    if retry_health.get("exists"):
        total = retry_health.get("total_rows", 0)
        failed = retry_health.get("status_failed", 0)
        permanent = retry_health.get("status_permanent", 0)
        
        print(f"\n{table_name}:")
        print(f"  Total records: {total:,}")
        print(f"  Active retries (failed): {failed:,}")
        print(f"  Permanent failures: {permanent:,}")
        
        # Share of records that have not been retried yet (retry_count = 0)
        dist = retry_health.get("retry_distribution", {})
        retry_0 = dist.get("retry_0", 0)
        if total > 0:
            not_yet_retried_pct = (retry_0 / total) * 100
            print(f"  Not yet retried (retry_count = 0): {not_yet_retried_pct:.1f}%")
        
        # Age analysis
        oldest = retry_health.get("oldest_record")
        if oldest:
            print(f"  Oldest record: {oldest}")
    else:
        print(f"\n{table_name}: No data or table does not exist")

---

## 5. Maintenance Summary

In [None]:
# Generate final summary
print("\n" + "="*60)
print("MAINTENANCE RUN SUMMARY")
print("="*60)
print(f"\nCompleted at: {datetime.now().isoformat()}")
print(f"Dry run mode: {DRY_RUN}")
print("-"*60)

# Group results by operation type
operations = {}
for result in maintenance_results:
    op = result.operation.replace("_dry_run", "").replace("_skipped", "")
    if op not in operations:
        operations[op] = {"success": 0, "failed": 0, "total_duration": 0, "rows_affected": 0}
    
    if result.success:
        operations[op]["success"] += 1
    else:
        operations[op]["failed"] += 1
    
    operations[op]["total_duration"] += result.duration_seconds
    operations[op]["rows_affected"] += result.rows_affected

print("\nOperation Summary:")
for op, stats in operations.items():
    print(f"  {op}:")
    print(f"    Success: {stats['success']}, Failed: {stats['failed']}")
    print(f"    Duration: {stats['total_duration']:.1f}s")
    if stats['rows_affected'] > 0:
        print(f"    Rows affected: {stats['rows_affected']:,}")

# List any failures
failures = [r for r in maintenance_results if not r.success]
if failures:
    print("\n" + "-"*60)
    print("FAILURES:")
    for f in failures:
        print(f"  {f.table_name} ({f.operation}): {f.error_message}")
else:
    print("\nAll operations completed successfully!")

print("\n" + "="*60)

In [None]:
# Export detailed results as JSON for logging/monitoring
results_json = {
    "run_timestamp": datetime.now().isoformat(),
    "dry_run": DRY_RUN,
    "configuration": {
        "retry_retention_days": RETRY_RETENTION_DAYS,
        "event_log_retention_days": EVENT_LOG_RETENTION_DAYS,
        "vacuum_retention_hours": VACUUM_RETENTION_HOURS,
        "target_file_size_mb": TARGET_FILE_SIZE_MB
    },
    "results": [r.to_dict() for r in maintenance_results],
    "health_report": health_report,
    "summary": operations
}

print("\nDetailed results (JSON):")
print(json.dumps(results_json, indent=2, default=str))

---

## Notes for Scheduling

### Recommended Schedule

| Operation | Frequency | Recommended Time |
|-----------|-----------|------------------|
| Cleanup | Daily | 2:00 AM |
| Optimize | Daily | After cleanup |
| Vacuum | Weekly | Sunday 3:00 AM |
| Health Report | Daily | After all operations |
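The notebook runs every phase on each run, so a single daily schedule vacuums daily rather than weekly as recommended above. One option, sketched here with a hypothetical `VACUUM_WEEKDAY` setting and `should_run_vacuum` helper, is to gate the VACUUM loop on the day of the week (Python's `datetime.weekday()` returns Monday=0 through Sunday=6):

```python
from datetime import datetime

VACUUM_WEEKDAY = 6  # hypothetical setting: Sunday


def should_run_vacuum(now: datetime, weekday: int = VACUUM_WEEKDAY) -> bool:
    """True only on the configured weekday; other phases still run every day."""
    return now.weekday() == weekday


print(should_run_vacuum(datetime(2024, 3, 31)))  # True (Sunday)
print(should_run_vacuum(datetime(2024, 4, 1)))   # False (Monday)
```

The VACUUM cell's loop would then run only when `should_run_vacuum(datetime.now())` is true. Alternatively, move VACUUM into a separate notebook with its own weekly schedule.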

### Fabric Scheduling

1. Go to your Fabric workspace
2. Select this notebook
3. Click **Schedule** in the toolbar
4. Configure the schedule (daily at 2:00 AM recommended)
5. Enable notifications for failures
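Every maintenance function catches its own exceptions and returns `success=False`, so the notebook itself finishes normally even when operations fail. Assuming the schedule's failure notification fires only when the run fails, a final cell can raise when any result failed. A sketch with a hypothetical `raise_if_failed` helper and a trimmed stand-in for `MaintenanceResult`:

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class Result:
    """Stand-in for MaintenanceResult with only the fields used here."""
    table_name: str
    operation: str
    success: bool
    error_message: Optional[str] = None


def raise_if_failed(results: List[Result]) -> None:
    """Raise so the scheduled run is marked failed when any operation failed."""
    failures = [r for r in results if not r.success]
    if failures:
        summary = "; ".join(
            f"{r.table_name} ({r.operation}): {r.error_message}" for r in failures
        )
        raise RuntimeError(f"{len(failures)} maintenance operation(s) failed: {summary}")


raise_if_failed([Result("xact_events", "optimize", True)])  # passes silently
```

In the notebook, `raise_if_failed(maintenance_results)` would go after the JSON export, so the detailed results are still printed before the run is marked failed.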

### Monitoring

- Check the health report section for growing retry queues
- Monitor for permanent failures in retry tables
- Alert if optimization operations fail consistently
- Track storage growth over time
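The checks above can be automated against the `health_report` dictionary the notebook builds. A minimal sketch; `health_alerts` and both thresholds are hypothetical and should be tuned to the pipeline's normal volumes:

```python
from typing import Any, Dict, List

MAX_PERMANENT_FAILURES = 100   # hypothetical per-table threshold
MAX_RETRY_QUEUE_ROWS = 50_000  # hypothetical per-table threshold


def health_alerts(health_report: Dict[str, Dict[str, Any]]) -> List[str]:
    """Turn health_report-shaped stats into alert messages."""
    alerts = []
    for table, stats in health_report.items():
        if not stats.get("exists"):
            alerts.append(f"{table}: table missing or unreadable")
            continue
        retry = stats.get("retry_health") or {}
        if retry.get("status_permanent", 0) > MAX_PERMANENT_FAILURES:
            alerts.append(f"{table}: {retry['status_permanent']:,} permanent failures")
        if retry.get("total_rows", 0) > MAX_RETRY_QUEUE_ROWS:
            alerts.append(f"{table}: retry queue holds {retry['total_rows']:,} rows")
    return alerts


report = {
    "xact_retry": {"exists": True, "retry_health": {"total_rows": 60_000, "status_permanent": 250}},
    "claimx_retry": {"exists": True, "retry_health": {"total_rows": 1_200, "status_permanent": 3}},
    "claimx_tasks": {"exists": False},
}
for alert in health_alerts(report):
    print(alert)
```

Tracking growth over time needs the previous run's numbers, for example by persisting the exported JSON and comparing `total_rows` and `size_mb` between runs.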

### Troubleshooting

If VACUUM fails with retention errors:
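- Ensure `VACUUM_RETENTION_HOURS` >= 168 (7 days); shorter periods are rejected while `spark.databricks.delta.retentionDurationCheck.enabled` is `true` (the default), and disabling that check risks deleting files that concurrent readers or writers still need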
- Check for concurrent operations on the table

If OPTIMIZE is slow:
- Consider partitioning high-volume tables
- Run during off-peak hours
- Increase Spark executor memory if available