# Batch ALTER Lakehouse Tables – **Rollback Safe**

**Goal:** Apply Delta table properties **safely**, with **reliable rollback** of every user-managed property (internal Delta properties are never backed up or restored).

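**Key Features**
- **Safe property order** (protocol reader/writer versions first)
- **Idempotent apply** (skip properties already at the target value)
- **Smart dry-run** (preview changes without applying them)
- **Reliable rollback** of **user-managed properties only**
- **Small backups**: only properties about to change, stored in one table in the default lakehouse
- **No internal Delta properties** backed up or restored (`delta.columnMapping.maxColumnId`, etc.)
- **Persistent logs** in `Files/logs/` of the default lakehouse
- **Graceful skip** of tables with recent writes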

---

### Why This Notebook?
This notebook safely modifies Delta table properties in Microsoft Fabric Lakehouses. It ensures changes are applied without risking data loss, supports previews (dry-runs), skips tables with ongoing writes to prevent conflicts, and provides full rollback capability.

- **Why dry-runs?** A dry-run simulates changes without actually applying them. This lets you preview what would happen (e.g., which properties would change) to avoid mistakes in production.
- **Why skip tables that are being written to?** If another process is writing to a table while its properties change, Delta's optimistic concurrency control can make one of the commits fail with a concurrency conflict. Skipping avoids this; you can retry later. The notebook treats any write commit within the last `ACTIVE_WRITE_LOOKBACK_MINUTES` as a sign of activity.
- **Why rollback?** Changes might have unintended effects (e.g., performance issues). Rollback reverts to the previous state using backups, providing a safety net.
- **Why wait and verify the change?** A property change may not be visible immediately (for example, because of caching). Polling until the new value is read back confirms the change took effect before moving on.
- **Why save log files, and where?** Logs record every action for auditing, debugging, and compliance. They are saved in the default lakehouse under `Files/logs/[workspace]/[lakehouse]/[date]/`, so they survive the notebook session and are organized per lakehouse.
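The recent-write check and the post-change poll can both be sketched without Spark. This sketch assumes `DESCRIBE HISTORY` rows have been reduced to `(timestamp, operation)` pairs in the same time zone as `now`; `has_recent_write`, `wait_for` and `WRITE_OPERATIONS` are hypothetical names. A history-based check only sees committed writes, so a write that is still in progress can slip through.

```python
import time
from datetime import datetime, timedelta
from typing import Callable, Iterable, Tuple

# Operations treated as writes (same list the notebook checks in DESCRIBE HISTORY).
WRITE_OPERATIONS = {"WRITE", "MERGE", "UPDATE", "DELETE", "OPTIMIZE"}


def has_recent_write(history: Iterable[Tuple[datetime, str]], now: datetime,
                     lookback_minutes: int) -> bool:
    """True if any (timestamp, operation) entry is a write newer than now - lookback."""
    cutoff = now - timedelta(minutes=lookback_minutes)
    return any(ts > cutoff and op in WRITE_OPERATIONS for ts, op in history)


def wait_for(predicate: Callable[[], bool], timeout: float, interval: float) -> bool:
    """Poll `predicate` every `interval` seconds until it returns True or `timeout` elapses."""
    deadline = time.monotonic() + timeout
    while True:
        try:
            if predicate():
                return True
        except Exception:
            pass  # a failed read counts as "not visible yet"
        if time.monotonic() >= deadline:
            return False
        time.sleep(interval)


if __name__ == "__main__":
    now = datetime(2025, 11, 18, 14, 46, 32)
    history = [(now - timedelta(seconds=30), "MERGE"), (now - timedelta(hours=2), "WRITE")]
    print(has_recent_write(history, now, lookback_minutes=1))  # True: MERGE 30 s ago
    attempts = []
    # The predicate becomes true on its third call.
    print(wait_for(lambda: attempts.append(1) or len(attempts) >= 3, timeout=5, interval=0.01))
```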


In [None]:
# Semantic Link Labs (sempy_labs) is needed to scan the workspace and find every lakehouse for the batch ALTER
%pip install semantic-link-labs

In [None]:
# Import required libraries: sempy_labs for Fabric interactions, datetime for timestamps, etc.
import sempy_labs as labs
import sempy.fabric as fabric
import urllib.parse
from datetime import datetime
from concurrent.futures import ThreadPoolExecutor, as_completed
import time
import logging
from functools import wraps
import io
from threading import Lock
from delta.tables import DeltaTable

# === SILENCE CHATTY LIBRARIES ===
# Set logging levels to WARNING to reduce unnecessary output from libraries like azure and msal.
logging.getLogger("azure").setLevel(logging.WARNING)
logging.getLogger("sempy").setLevel(logging.WARNING)
logging.getLogger("msal").setLevel(logging.WARNING)
logging.getLogger("urllib3").setLevel(logging.WARNING)

print("Libraries loaded. Noisy logs silenced.")

## 2. Configuration (Edit Here)

This section defines user-editable settings. Edit variables like `workspace`, `MODE`, and `properties` to customize behavior.
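A typo in these settings (for example `MODE = "aply"`) is easy to miss. A minimal sketch of a pre-flight check, assuming the settings have the meanings given in the comments of the next cell; `check_run_config` and `VALID_MODES` are hypothetical names:

```python
from datetime import datetime

VALID_MODES = {"dry-run", "apply", "rollback"}


def check_run_config(mode, dry_run, rollback_timestamp, lakehouse_filter) -> list:
    """Return human-readable problems with the run settings; an empty list means OK."""
    problems = []
    if mode not in VALID_MODES:
        problems.append(f"MODE must be one of {sorted(VALID_MODES)}, got {mode!r}")
    if not isinstance(dry_run, bool):
        problems.append(f"DRY_RUN must be True or False, got {dry_run!r}")
    if mode == "rollback":
        if not isinstance(rollback_timestamp, str) or not rollback_timestamp:
            problems.append("ROLLBACK_TIMESTAMP is required when MODE is 'rollback'")
        else:
            try:
                # Same format the notebook uses for run timestamps, e.g. '20251118_144632'
                datetime.strptime(rollback_timestamp, "%Y%m%d_%H%M%S")
            except ValueError:
                problems.append(
                    f"ROLLBACK_TIMESTAMP must look like YYYYMMDD_HHMMSS, got {rollback_timestamp!r}")
    is_name_list = isinstance(lakehouse_filter, list) and all(
        isinstance(name, str) for name in lakehouse_filter)
    if lakehouse_filter is not None and not (isinstance(lakehouse_filter, str) or is_name_list):
        problems.append("LAKEHOUSE_FILTER must be None, a lakehouse name, or a list of names")
    return problems


if __name__ == "__main__":
    print(check_run_config("rollback", True, "2025-11-18", ["LH1"]))
```

Calling it right after the settings cell, and raising `ValueError` if it returns anything, would stop a mistyped run before any table is touched.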

For developers new to Delta Lake: `properties` are Delta table properties (for example, target file size or auto-compaction). The backup table stores each property's value before it is changed, so a rollback can restore it.
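Order matters because setting `delta.columnMapping.mode` to `'name'` requires reader version 2 and writer version 5. A minimal sketch of putting protocol versions and column mapping first while keeping every other property in its configured order; `PRIORITY_KEYS` and `order_properties` are hypothetical names, and the priority list is an assumption:

```python
# Keys applied before everything else, in this order (assumed priority list).
PRIORITY_KEYS = [
    "delta.minReaderVersion",
    "delta.minWriterVersion",
    "delta.columnMapping.mode",
]


def order_properties(properties: list) -> list:
    """Sort (key, value) pairs so protocol versions and column mapping come first.

    Python's sort is stable, so all other properties keep their configured order.
    """
    def rank(pair):
        key = pair[0]
        return PRIORITY_KEYS.index(key) if key in PRIORITY_KEYS else len(PRIORITY_KEYS)

    return sorted(properties, key=rank)


if __name__ == "__main__":
    configured = [
        ("delta.targetFileSize", "128m"),
        ("delta.columnMapping.mode", "name"),
        ("delta.minWriterVersion", "5"),
        ("delta.autoOptimize.autoCompact", "true"),
        ("delta.minReaderVersion", "2"),
    ]
    for key, value in order_properties(configured):
        print(key, value)
```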

In [None]:
# Workspace name in Fabric where lakehouses are located.
workspace = "Local Ambulatory Services"
# Mode: 'dry-run' to preview, 'apply' to make changes, 'rollback' to revert.
MODE = "dry-run"
# Used only when MODE = 'rollback': True previews the rollback; set to False to actually apply it.
DRY_RUN = True
# Timestamp of a previous run to roll back to (find it in the logs or the backup table), e.g. "20251030_031558", or None.
ROLLBACK_TIMESTAMP = '20251118_144632'

# None processes every lakehouse in the workspace; a list of names processes only those lakehouses.
#LAKEHOUSE_FILTER = None
LAKEHOUSE_FILTER = ["LocalAmbulatoryServicesTransactionDetail"]   # ← Comment out and use the line above to process all lakehouses

# Table in the default lakehouse to store property backups from all lakehouses.
backup_table = "table_property_backups"  # Stores backups from ALL managed workspaces/lakehouses

# List of Delta properties to set, in a safe order (reader before writer versions).
properties = [
    #('delta.minReaderVersion', '2'),
    #('delta.minWriterVersion', '5'),
    #('delta.columnMapping.mode', 'name'),
    #('delta.targetFileSize', '128m'),
    ('delta.autoOptimize.autoCompact', 'True')
]

# === INTERNAL DELTA PROPERTIES (DO NOT BACK UP OR RESTORE) ===
# Complete list of Delta Lake-managed internal properties
INTERNAL_PROPERTIES = {
    # Column Mapping (auto-managed by Delta)
    'delta.columnMapping.maxColumnId',
    'delta.columnMapping.maxColumnNameLength',
    
    # Protocol Versions (auto-upgraded, never downgrade)
    'delta.minReaderVersion',
    'delta.minWriterVersion',
    
    # Checkpoint Internals (auto-managed)
    'delta.checkpoint.writeStatsAsJson',
    'delta.checkpoint.writeStatsAsStruct',
    
    # Add more as discovered
}

# Enabling these can upgrade the table protocol (add table features); setting them back to 'false' does not undo that, so treat them as one-way.
ONE_WAY_PROPERTIES = {
    'delta.enableChangeDataFeed',
    'delta.enableDeletionVectors',
}

# Valid protocol versions for safety checks
VALID_READER_VERSIONS = {'1', '2', '3'}
VALID_WRITER_VERSIONS = {'2', '3', '4', '5', '6', '7'}

# === PROPERTY VALIDATION RULES ===
PROPERTY_RULES = {
    'delta.minReaderVersion': {
        'valid_values': VALID_READER_VERSIONS,
        'allow_downgrade': False,
        'warning': 'Reader version upgrades are irreversible. Downgrading will break table compatibility.'
    },
    'delta.minWriterVersion': {
        'valid_values': VALID_WRITER_VERSIONS,
        'allow_downgrade': False,
        'warning': 'Writer version upgrades are irreversible. Downgrading will corrupt data integrity.'
    },
    'delta.columnMapping.mode': {
        'valid_values': {'none', 'name', 'id'},
        'allow_downgrade': False,
        'warning': 'Column mapping mode changes are irreversible. Cannot change back to "none" once set.'
    },
    'delta.targetFileSize': {
        'valid_values': None,  # Numeric validation
        'range': (8, 2048),  # Min 8MB, max 2GB
        'warning': 'Target file size should be between 8MB and 2GB for optimal performance.'
    }
}

# === TUNING ===
# Maximum tables to process in parallel for efficiency.
MAX_PARALLEL_TABLES = 3
# Maximum attempts per operation, including the first one (2 = one retry).
MAX_RETRIES = 2
# Wait in seconds before the first retry; doubles on each further retry (exponential backoff).
RETRY_WAIT_SECONDS = 5
# Minutes to look back for recent writes to detect activity.
ACTIVE_WRITE_LOOKBACK_MINUTES = 1

# === VALIDATION ===
# Timeout in seconds to wait for property validation.
VALIDATION_TIMEOUT_SECONDS = 90
# Interval in seconds to poll during validation.
VALIDATION_POLL_INTERVAL = 3

# === FIX DELTA DETECTION ===
spark.conf.set("spark.microsoft.delta.formatCheck.enabled", "false")
spark.conf.set("spark.sql.sources.commitProtocolClass",
               "com.microsoft.delta.commit.CommitProtocol")

## 3. Persistent Logging

Sets up logging to capture operations. The run-level log is written to `Files/logs/` in the default lakehouse and also printed in the cell output.

Per-lakehouse logs are buffered in memory and saved to the default lakehouse under `Files/logs/[workspace]/[lakehouse]/[date]/`, so each run can be traced afterwards.
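A minimal sketch of the per-lakehouse log layout and the in-memory buffer pattern, assuming the folder date is passed in explicitly; `lakehouse_log_path` and `buffered_logger` are hypothetical helpers, and `SalesLH` is a made-up lakehouse name. Passing the date explicitly (for example, derived from the run timestamp) keeps a run that crosses midnight in a single folder.

```python
import io
import logging
from datetime import datetime

FORMAT = "%(asctime)s [%(levelname)s] %(message)s"


def lakehouse_log_path(workspace: str, lakehouse: str, run_timestamp: str,
                       mode: str, when: datetime) -> str:
    """Files/logs/[workspace]/[lakehouse]/[date]/[run_ts]_[mode]_[lakehouse].log"""
    date_folder = when.strftime("%Y-%m-%d")
    return f"Files/logs/{workspace}/{lakehouse}/{date_folder}/{run_timestamp}_{mode}_{lakehouse}.log"


def buffered_logger(name: str, buffer: io.StringIO, propagate: bool = True) -> logging.Logger:
    """Logger writing to an in-memory buffer; with propagate=True, records also reach the root logger."""
    logger = logging.getLogger(name)
    logger.setLevel(logging.INFO)
    logger.handlers.clear()  # avoid duplicate handlers when called again for the same name
    handler = logging.StreamHandler(buffer)
    handler.setFormatter(logging.Formatter(FORMAT))
    logger.addHandler(handler)
    logger.propagate = propagate
    return logger


if __name__ == "__main__":
    buf = io.StringIO()
    log = buffered_logger("lakehouse_demo", buf, propagate=False)
    log.info("Applying delta.autoOptimize.autoCompact = true")
    print(lakehouse_log_path("Local Ambulatory Services", "SalesLH", "20251118_144632",
                             "apply", datetime(2025, 11, 18)))
    print(buf.getvalue().strip())
```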

In [None]:
# Generate a timestamp for unique log filenames.
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")

# Build mode string that includes dry-run status for rollback
if MODE == "rollback":
    log_mode = "rollback-dryrun" if DRY_RUN else "rollback"
else:
    log_mode = MODE

log_filename = f"logs/table_property_update_{log_mode}_{timestamp}.log"

# Check if running in Spark environment (mssparkutils is Fabric-specific).
if 'mssparkutils' in globals():
    log_dir = "Files/logs"
    if not mssparkutils.fs.exists(log_dir):
        mssparkutils.fs.mkdirs(log_dir)
    log_path = f"/lakehouse/default/Files/{log_filename}"
else:
    log_path = log_filename

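# Configure logging: set level, format, and handlers (file and console).
# force=True removes any handlers already attached to the root logger; without it,
# basicConfig() does nothing if the runtime has configured the root logger already.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s [%(levelname)s] %(message)s',
    handlers=[
        logging.FileHandler(log_path),
        logging.StreamHandler()
    ],
    force=True
)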
logger = logging.getLogger(__name__)

print(f"Log: Files/{log_filename}")

# === PER-LAKEHOUSE LOGGING ===
log_buffers = {}  # {lakehouse: StringIO buffer}

# Function to save in-memory logs to lakehouse files.
def flush_logs_to_lakehouse(run_timestamp: str):  # ← Accept timestamp parameter
    """Save logs to default lakehouse under Files/logs/[workspace]/[lakehouse]/[date]/"""
    
    current_log_mode = log_mode
    base_dir = "Files/logs"
    
    try:
        mssparkutils.fs.mkdirs(base_dir)
    except Exception as e:
        print(f"ERROR creating base directory: {e}")
        return

    saved_count = 0
    
    for lh, buffer in log_buffers.items():
        try:
            content = buffer.getvalue().strip()
            
            if not content:
                continue
            
            # Use the run's timestamp (passed in) in the filename so all logs from one run share it
            date_folder = datetime.now().strftime("%Y-%m-%d")
            filename = f"{run_timestamp}_{current_log_mode}_{lh}.log"  # ← Use run_timestamp
            log_path = f"{base_dir}/{workspace}/{lh}/{date_folder}/{filename}"
            
            dir_path = f"{base_dir}/{workspace}/{lh}/{date_folder}"
            mssparkutils.fs.mkdirs(dir_path)
            
            mssparkutils.fs.put(log_path, content, True)
            print(f"LOG SAVED: {log_path}")
            saved_count += 1
            
        except Exception as e:
            print(f"ERROR saving log for '{lh}': {e}")
    
    print(f"FLUSH COMPLETED: {saved_count}/{len(log_buffers)} logs saved")
    
    for buffer in log_buffers.values():
        buffer.close()
    log_buffers.clear()

def get_lakehouse_logger(lh: str):
    """Return a per-lakehouse logger that writes to an in-memory buffer (and, via propagation, to the root logger's handlers)."""
    if lh not in log_buffers:
        log_buffers[lh] = io.StringIO()
    
    logger = logging.getLogger(f"lakehouse_{lh}")
    logger.setLevel(logging.INFO)
    logger.handlers.clear()
    
    buffer_handler = logging.StreamHandler(log_buffers[lh])
    buffer_handler.setFormatter(logging.Formatter('%(asctime)s [%(levelname)s] %(message)s'))
    logger.addHandler(buffer_handler)
    
    return logger

## 4. Helpers

Utility functions for common tasks like quoting identifiers, retries, and fetching table properties.
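The helpers in this section build SQL by interpolating table paths and property values into f-strings. A minimal sketch of quoting helpers that keep a stray backtick or single quote from breaking a statement, assuming Spark's default string-literal escaping (`spark.sql.parser.escapedStringLiterals` left at `false`); `quote_identifier`, `sql_string_literal` and `set_property_sql` are hypothetical names:

```python
def quote_identifier(name: str) -> str:
    """Backtick-quote a Spark SQL identifier; embedded backticks are escaped by doubling."""
    return "`" + name.replace("`", "``") + "`"


def sql_string_literal(value: str) -> str:
    """Single-quote a Spark SQL string literal, escaping backslashes first, then single quotes."""
    return "'" + value.replace("\\", "\\\\").replace("'", "\\'") + "'"


def set_property_sql(table_path: str, key: str, value: str) -> str:
    """ALTER TABLE statement that sets one property on a path-based Delta table."""
    return (f"ALTER TABLE delta.{quote_identifier(table_path)} "
            f"SET TBLPROPERTIES ({sql_string_literal(key)} = {sql_string_literal(value)})")


if __name__ == "__main__":
    print(set_property_sql(
        "abfss://<workspace-guid>@onelake.dfs.fabric.microsoft.com/<lakehouse-guid>/Tables/sales",
        "delta.autoOptimize.autoCompact", "true"))
```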

Retries handle transient errors (e.g., network issues). OneLake paths allow accessing tables across workspaces without attaching lakehouses.
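A self-contained sketch of the retry pattern (exponential backoff) and of the GUID-based OneLake path format used in this section. The injectable `sleep` parameter is a hypothetical addition so the backoff can be tested without waiting, and `retry` / `onelake_tables_path` are illustrative names. A wrapped function must raise for a retry to happen; a function that catches its own errors and returns a default is never retried.

```python
import time
from functools import wraps


def retry(max_attempts: int = 2, wait_seconds: float = 5, sleep=time.sleep):
    """Decorator: call the function up to `max_attempts` times in total,
    waiting wait_seconds, 2*wait_seconds, 4*wait_seconds, ... between attempts."""
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")

    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(1, max_attempts + 1):
                try:
                    return func(*args, **kwargs)
                except Exception:
                    if attempt == max_attempts:
                        raise  # out of attempts: surface the last error unchanged
                    sleep(wait_seconds * 2 ** (attempt - 1))  # exponential backoff
        return wrapper
    return decorator


def onelake_tables_path(workspace_id: str, lakehouse_id: str) -> str:
    """ABFSS root of a lakehouse's Tables folder, addressed by workspace and lakehouse GUIDs."""
    return f"abfss://{workspace_id}@onelake.dfs.fabric.microsoft.com/{lakehouse_id}/Tables/"


if __name__ == "__main__":
    calls = {"n": 0}

    @retry(max_attempts=3, wait_seconds=0.01)
    def flaky():
        calls["n"] += 1
        if calls["n"] < 3:
            raise RuntimeError("transient error")
        return "ok"

    print(flaky(), "after", calls["n"], "attempts")
    print(onelake_tables_path("<workspace-guid>", "<lakehouse-guid>"))
```

With `max_attempts=2` (the notebook's `MAX_RETRIES`), a failing call is retried once after `wait_seconds`.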

In [None]:
def normalize_property_value(key: str, value: str) -> str:
    """
    Normalize property values for consistent comparison.
    Handles boolean case sensitivity and other common variations.
    """
    if value is None:
        return None
    
    # Convert to string if not already
    value_str = str(value).strip()
    
    # Boolean properties - normalize to lowercase
    boolean_properties = {
        'delta.autoOptimize.autoCompact',
        'delta.autoOptimize.optimizeWrite',
        'delta.enableChangeDataFeed',
        'delta.enableDeletionVectors',
        'delta.checkpoint.writeStatsAsJson',
        'delta.checkpoint.writeStatsAsStruct'
    }
    
    if key in boolean_properties:
        return value_str.lower()  # 'True' -> 'true', 'False' -> 'false'
    
    # File size properties - normalize to lowercase for 'm', 'mb', 'gb'
    if 'FileSize' in key or 'fileSize' in key:
        return value_str.lower()  # '128M' -> '128m', '1GB' -> '1gb'
    
    return value_str

In [None]:
def validate_property(key: str, new_value: str, current_value: str = None, strict_mode: bool = True) -> tuple[bool, str]:
    """
    Validate if a property change is safe.
    
    Args:
        key: Property name
        new_value: New value to set
        current_value: Current value (if any)
        strict_mode: If True, block unsafe changes; if False, warn only
    
    Returns:
        (is_valid, message)
    """
    
    # Check 1: Block internal properties. Protocol versions are internal (never backed up
    # or restored) but can still be raised deliberately, so they fall through to their rules.
    if key in INTERNAL_PROPERTIES and key not in PROPERTY_RULES:
        return False, f"❌ BLOCKED: '{key}' is an internal Delta property and cannot be manually modified."
    
    # Check 2: Validate against rules
    if key in PROPERTY_RULES:
        rule = PROPERTY_RULES[key]
        
        # Validate against allowed values (use .get() for safety)
        if rule.get('valid_values') and new_value not in rule['valid_values']:
            return False, f"❌ INVALID: '{key}' must be one of {rule['valid_values']}, got '{new_value}'"
        
        # Check for downgrades (use .get() for safety)
        if rule.get('allow_downgrade') is False and current_value:
            try:
                # For version numbers, compare numerically
                if 'Version' in key and int(new_value) < int(current_value):
                    warning = rule.get('warning', 'Downgrade detected')
                    msg = f"⚠️ DANGEROUS: Downgrading {key} from '{current_value}' to '{new_value}'\n   {warning}"
                    if strict_mode:
                        return False, f"❌ BLOCKED: {msg}"
                    return True, f"⚠️ WARNING: {msg}"
            except ValueError:
                pass  # Not numeric, skip comparison
        
        # Range validation for size properties (range is in MB).
        # Accepts '128m', '128mb', '1g' or '1gb'; a bare number is treated as MB by this check.
        if 'range' in rule:
            size_str = str(new_value).strip().lower()
            multiplier = 1
            for suffix, factor in (('gb', 1024), ('mb', 1), ('g', 1024), ('m', 1)):
                if size_str.endswith(suffix):
                    size_str, multiplier = size_str[:-len(suffix)], factor
                    break
            try:
                value_mb = int(size_str) * multiplier
            except ValueError:
                return False, f"❌ INVALID: '{key}' must be a size such as '128m' or '1g', got '{new_value}'"
            min_val, max_val = rule['range']
            if not (min_val <= value_mb <= max_val):
                return False, f"❌ OUT OF RANGE: '{key}' must be between {min_val} MB and {max_val} MB, got {value_mb} MB"
    
    # Check 3: One-way properties (compare case-insensitively, e.g. 'True' vs 'true')
    if key in ONE_WAY_PROPERTIES:
        if (normalize_property_value(key, current_value) == 'true'
                and normalize_property_value(key, new_value) == 'false'):
            msg = f"⚠️ IRREVERSIBLE: '{key}' cannot be disabled once enabled"
            if strict_mode:
                return False, f"❌ BLOCKED: {msg}"
            return True, f"⚠️ WARNING: {msg}"
    
    return True, f"✅ Safe to change '{key}' to '{new_value}'"


def validate_properties_list(properties_list: list, strict_mode: bool = True) -> tuple[bool, list]:
    """
    Validate entire properties configuration before execution.
    
    Args:
        properties_list: List of (key, value) tuples
        strict_mode: If True, block on any validation failure
    
    Returns:
        (all_valid, messages)
    """
    all_valid = True
    messages = []
    
    for key, value in properties_list:
        is_valid, msg = validate_property(key, value, strict_mode=strict_mode)
        messages.append(msg)
        
        if not is_valid:
            all_valid = False
    
    return all_valid, messages


# ====================================================================
# BACKUP TABLE CREATION
# ====================================================================

def ensure_backup_table_exists():
    """Create backup table in default lakehouse if it doesn't exist"""
    try:
        spark.sql(f"DESCRIBE TABLE {backup_table}")
        print(f"Backup table '{backup_table}' exists in default lakehouse")
    except Exception:  # DESCRIBE TABLE fails when the table does not exist yet
        print(f"Creating backup table '{backup_table}' in default lakehouse...")
        spark.sql(f"""
            CREATE TABLE {backup_table} (
                run_timestamp STRING,
                execution_time TIMESTAMP,
                execution_mode STRING,
                backup_timing STRING,
                target_workspace STRING,
                lakehouse_name STRING,
                table_name STRING,
                property_key STRING,
                property_value STRING
            )
            USING DELTA
            TBLPROPERTIES (
                'delta.autoOptimize.autoCompact' = 'true'
            )
        """)
        print("Backup table created successfully")

# Call this at the start of execution
ensure_backup_table_exists()


# ====================================================================
# STARTUP VALIDATION - RUNS AUTOMATICALLY
# ====================================================================

print("\n" + "="*80)
print("VALIDATING PROPERTIES CONFIGURATION")
print("="*80)

all_valid, validation_messages = validate_properties_list(properties, strict_mode=True)

for msg in validation_messages:
    print(msg)

if not all_valid:
    print("\n❌ CONFIGURATION CONTAINS UNSAFE PROPERTIES")
    print("   Please review and remove blocked properties before running.")
    print("="*80)
    raise ValueError("Unsafe properties detected in configuration. Execution halted.")
else:
    print("\n✅ All configured properties passed validation")
    print("="*80)

In [None]:
# === BUILD ONELAKE PATH FUNCTION ===
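def build_onelake_path(workspace_name: str, lakehouse_name: str, logger=None) -> str | None:
    """
    Build the OneLake ABFSS path of a lakehouse's Tables folder from an admin workspace scan.
    The scan needs permission to call the Fabric admin (scanner) APIs.
    Logs through `logger` when one is passed; otherwise prints to the console.
    """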
    try:
        from sempy_labs import admin
        
        if logger:
            logger.info(f"Building OneLake path for workspace='{workspace_name}' lakehouse='{lakehouse_name}'")
        
        # Scan workspace to get IDs
        scan_result = admin.scan_workspaces(workspace=workspace_name)
        
        # Get workspace ID
        if not scan_result.get("workspaces") or len(scan_result["workspaces"]) == 0:
            msg = f"Workspace '{workspace_name}' not found"
            if logger:
                logger.error(msg)
            else:
                print(f"❌ {msg}")
            return None
        
        workspace_id = scan_result["workspaces"][0]["id"]
        if logger:
            logger.info(f"Found workspace ID: {workspace_id}")
        
        # Find lakehouse
        lakehouses = scan_result["workspaces"][0].get("Lakehouse", [])
        lakehouse = next((lh for lh in lakehouses if lh["name"] == lakehouse_name), None)
        
        if not lakehouse:
            available = [lh["name"] for lh in lakehouses]
            msg = f"Lakehouse '{lakehouse_name}' not found. Available: {available}"
            if logger:
                logger.error(msg)
            else:
                print(f"❌ {msg}")
            return None
        
        lakehouse_id = lakehouse["id"]
        if logger:
            logger.info(f"Found lakehouse ID: {lakehouse_id}")
        
        # Build path
        path = f"abfss://{workspace_id}@onelake.dfs.fabric.microsoft.com/{lakehouse_id}/Tables/"
        
        if logger:
            logger.info(f"✅ OneLake path built successfully: {path}")
        else:
            print(f"✅ Built path: {path}")
        
        return path
        
    except Exception as e:
        msg = f"Error building OneLake path: {e}"
        if logger:
            logger.error(msg)
        else:
            print(f"❌ {msg}")
        return None

In [None]:
# Decorator for retry logic: Wraps functions to retry on failure.
def retry_on_failure(max_attempts=MAX_RETRIES, wait_seconds=RETRY_WAIT_SECONDS):
    def decorator(func):
        @wraps(func)  # Preserves original function's metadata.
        def wrapper(*args, **kwargs):
            last_e = None
            for a in range(1, max_attempts + 1):
                try:
                    return func(*args, **kwargs)
                except Exception as e:
                    last_e = e
                    if a < max_attempts:
                        wait = wait_seconds * (2 ** (a - 1))  # Exponential backoff.
                        logger.warning(f"Retry {a}/{max_attempts} after {wait}s: {e}")
                        time.sleep(wait)  # Pause before retry.
            raise last_e
        return wrapper
    return decorator

@retry_on_failure()
def get_table_properties_from_path(table_path: str, table_name: str, logger) -> dict:
    """Get table properties using exact physical path."""
    logger.debug(f"Fetching properties for table '{table_name}'")
    
    try:
        delta_table = DeltaTable.forPath(spark, table_path)
        detail = delta_table.detail().collect()[0]
        props = dict(detail.properties) if hasattr(detail, 'properties') and detail.properties else {}
        logger.debug(f"Retrieved {len(props)} properties for {table_name}")
        return props
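    except Exception as e:
        # Re-raise so @retry_on_failure can retry; returning {} here would silently
        # produce an empty backup and break rollback.
        logger.warning(f"Property fetch failed for {table_name}: {e}")
        raise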

@retry_on_failure()
def get_table_detail_from_path(table_path: str, table_name: str, logger) -> dict:
    """Get size / file count of the table's current version using its exact physical path."""
    logger.debug(f"Inspecting table '{table_name}'")
    
    # Quick Delta check: only Delta tables have a transaction log (_delta_log).
    if not DeltaTable.isDeltaTable(spark, table_path):
        return {'size_bytes': 0, 'num_files': 0, 'format': 'unknown'}

    try:
        # detail() counts only files in the current snapshot (including partition
        # folders); a raw directory listing would also count removed files.
        detail = DeltaTable.forPath(spark, table_path).detail().collect()[0]
        return {
            'size_bytes': detail.sizeInBytes or 0,
            'num_files':  detail.numFiles or 0,
            'format': 'delta'
        }
    except Exception as e:
        logger.warning(f"Detail fetch failed for {table_name}: {e}")
        return {'size_bytes': 0, 'num_files': 0, 'format': 'delta'}

def check_active_writes_from_path(table_path: str, workspace: str, lakehouse: str, table: str, lookback_minutes: int, logger) -> bool:
    """Check if table has recent write activity using exact physical path"""
    try:
        from datetime import timedelta

        cutoff = datetime.now() - timedelta(minutes=lookback_minutes)
        
        history = spark.sql(f"DESCRIBE HISTORY delta.`{table_path}` LIMIT 10").collect()
        
        for entry in history:
            if hasattr(entry, 'timestamp'):
                if entry.timestamp > cutoff and entry.operation in ['WRITE', 'STREAMING UPDATE', 'MERGE', 'UPDATE', 'DELETE', 'OPTIMIZE']:
                    logger.info(f"{workspace}.{lakehouse}.{table} has recent {entry.operation} at {entry.timestamp}")
                    return True
        
        return False
    except Exception as e:
        logger.warning(f"Could not check active writes for {workspace}.{lakehouse}.{table}: {e}")
        return False

@retry_on_failure()
def apply_table_property_from_path(table_path: str, table_name: str, k: str, v: str, logger):
    """Apply a single property using exact physical path."""
    logger.info(f"Applying {k} = {v} to table '{table_name}'")
    
    try:
        spark.sql(f"ALTER TABLE delta.`{table_path}` SET TBLPROPERTIES ('{k}' = '{v}')")
        logger.info(f"Property {k} applied successfully")
    except Exception as e:
        logger.error(f"Failed to apply {k} on {table_name}: {e}")
        raise

def wait_for_property_from_path(table_path: str, table_name: str, key: str, expected: str, logger,
                                timeout=VALIDATION_TIMEOUT_SECONDS,
                                interval=VALIDATION_POLL_INTERVAL) -> bool:
    """Poll until the property matches the expected value."""
    start = time.time()
    
    while time.time() - start < timeout:
        try:
            res = spark.sql(f"SHOW TBLPROPERTIES delta.`{table_path}`('{key}')").collect()
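            if res:
                # Rows are (key, value); fall back to the first column if only one is returned.
                actual = res[0][1] if len(res[0]) > 1 else res[0][0]
                # Normalize both sides so that e.g. 'True' and 'true' match.
                if normalize_property_value(key, actual) == normalize_property_value(key, expected):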
                    logger.info(f"{key} validated in {time.time()-start:.1f}s")
                    return True
        except Exception as e:
            logger.debug(f"Validation retry: {e}")
        time.sleep(interval)
    
    logger.warning(f"Validation TIMEOUT for {key} on {table_name}")
    return False

## 5. Backup & Rollback (Safe!)

Functions for backing up properties before changes and rolling back if needed.
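A backup only needs one row per property that is about to change. A minimal sketch of building those rows from a list of `(key, old_value, new_value)` changes, assuming a `None` old value means the property was not set before and that `"before"` is used as the backup timing label; `BackupRow` and `backup_rows` are hypothetical names whose fields mirror the columns created by `ensure_backup_table_exists()`:

```python
from datetime import datetime
from typing import List, NamedTuple, Optional, Tuple


class BackupRow(NamedTuple):
    """One backup-table row (hypothetical; fields mirror ensure_backup_table_exists())."""
    run_timestamp: str
    execution_time: datetime
    execution_mode: str
    backup_timing: str
    target_workspace: str
    lakehouse_name: str
    table_name: str
    property_key: str
    property_value: Optional[str]  # None = property was not set before the change


def backup_rows(run_ts: str, mode: str, timing: str, workspace: str, lakehouse: str,
                table: str, changes: List[Tuple[str, Optional[str], str]],
                now: Optional[datetime] = None) -> List[BackupRow]:
    """Return one row per changing property, holding its value *before* the change."""
    now = now or datetime.now()
    return [
        BackupRow(run_ts, now, mode, timing, workspace, lakehouse, table, key, old_value)
        for key, old_value, _new_value in changes
    ]


if __name__ == "__main__":
    changes = [("delta.targetFileSize", "64m", "128m"),
               ("delta.enableChangeDataFeed", None, "true")]
    for row in backup_rows("20251118_144632", "apply", "before", "WS", "LH", "t1", changes):
        print(row.property_key, row.property_value)
```

In a notebook, such rows could be appended to the backup table with `spark.createDataFrame` and an explicit schema, because schema inference cannot determine a type for a column whose values are all `None`.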

Backups are stored in the default lakehouse Delta table for central access. Rollback queries this table to restore old values. Internal properties are skipped to avoid breaking Delta.
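A minimal sketch of turning backup records into restore actions per table, assuming each record carries the backup-table columns listed below and that `None` marks a property that was unset before the change; `BackupRecord` and `restore_plan` are hypothetical names. If a property was backed up more than once for the same run, this sketch keeps the earliest record, on the assumption that it holds the pre-change value. A `'SET'` action would map to `ALTER TABLE ... SET TBLPROPERTIES` and `'UNSET'` to `ALTER TABLE ... UNSET TBLPROPERTIES IF EXISTS`.

```python
from collections import namedtuple
from datetime import datetime, timedelta

# Hypothetical slim record with the backup-table columns a restore needs.
BackupRecord = namedtuple(
    "BackupRecord",
    "target_workspace lakehouse_name table_name property_key property_value execution_time",
)


def restore_plan(records, internal_properties):
    """Group backup records per table into ('SET', key, value) or ('UNSET', key, None) actions."""
    earliest = {}
    for rec in sorted(records, key=lambda r: r.execution_time):
        if rec.property_key in internal_properties:
            continue  # internal Delta properties are never restored
        ident = (rec.target_workspace, rec.lakehouse_name, rec.table_name, rec.property_key)
        earliest.setdefault(ident, rec)  # first (earliest) record per property wins

    plan = {}
    for (ws, lh, tbl, prop), rec in earliest.items():
        if rec.property_value is None:
            action = ("UNSET", prop, None)
        else:
            action = ("SET", prop, rec.property_value)
        plan.setdefault((ws, lh, tbl), []).append(action)
    return plan


if __name__ == "__main__":
    t0 = datetime(2025, 11, 18, 14, 46, 32)
    records = [
        BackupRecord("WS", "LH", "t1", "delta.targetFileSize", "64m", t0),
        BackupRecord("WS", "LH", "t1", "delta.targetFileSize", "128m", t0 + timedelta(seconds=5)),
        BackupRecord("WS", "LH", "t1", "delta.enableChangeDataFeed", None, t0),
    ]
    for table, actions in restore_plan(records, {"delta.columnMapping.maxColumnId"}).items():
        print(table, actions)
```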

In [None]:
def save_backup(run_ts, lh, tbl, backup_props, backup_timing, target_workspace):
    """
    Back up to DEFAULT lakehouse (not the table's lakehouse).
    backup_props should be a dict of {property_key: current_value}
    Only backs up properties that are about to change.
    """
    
    # Log what we're backing up
    if backup_props:
        logger.info(f"Backing up {len(backup_props)} properties for {lh}.{tbl}: {list(backup_props.keys())}")
    
    # Always save to the DEFAULT lakehouse where this notebook runs
    backup_fq = f"`{backup_table}`"
    
    try:
        # Create rows for properties that are changing
        data = [
            (run_ts, datetime.now(), MODE, backup_timing, target_workspace, lh, tbl, k, v)
            for k, v in backup_props.items()
        ]
        
        if data:
            df = spark.createDataFrame(data, [
                "run_timestamp",
                "execution_time",
                "execution_mode",
                "backup_timing",
                "target_workspace",
                "lakehouse_name",
                "table_name",
                "property_key",
                "property_value"
            ])
            
            df.write.mode('append').saveAsTable(backup_fq)
            logger.info(f"Backed up {len(data)} props from {target_workspace}.{lh}.{tbl} to default lakehouse")
        else:
            logger.info(f"No properties to backup for {target_workspace}.{lh}.{tbl}")
            
    except Exception as e:
        logger.error(f"Backup failed for {target_workspace}.{lh}.{tbl}: {e}")
        raise  # Never apply a change without a backup to roll back to.

# Function to perform rollback.
def rollback_to_timestamp(ts: str, target_workspace: str = None, 
                          lakehouse_filter = None, dry_run: bool = False):
    """Rollback from backups stored in DEFAULT lakehouse"""
    print(f"{'[DRY RUN] ' if dry_run else ''}Searching default lakehouse for backup: {ts}")
    
    # Pretty print for list vs string
    if lakehouse_filter:
        if isinstance(lakehouse_filter, list):
            print(f"Filtering to lakehouse(s): {', '.join(lakehouse_filter)}")
        else:
            print(f"Filtering to lakehouse: {lakehouse_filter}")
    
    try:
        # Query the default lakehouse backup table.
        query = f"SELECT * FROM {backup_table} WHERE run_timestamp = '{ts}'"
        
        if target_workspace:
            query += f" AND target_workspace = '{target_workspace}'"
        
        # Handle both list and string properly
        if lakehouse_filter:
            if isinstance(lakehouse_filter, list):
                # Handle list: use IN clause
                lakehouse_list = "', '".join(lakehouse_filter)
                query += f" AND lakehouse_name IN ('{lakehouse_list}')"
            else:
                # Handle string: use equality
                query += f" AND lakehouse_name = '{lakehouse_filter}'"
        
        data = spark.sql(query).collect()  # Execute query and collect results.
        
        if data:
            unique_lakehouses = set(r.lakehouse_name for r in data)  # Get unique lakehouses.
            print(f"Found {len(data)} property backups across {len(unique_lakehouses)} lakehouse(s): {', '.join(unique_lakehouses)}")
            
            if dry_run:
                print("\n🔍 DRY RUN - Would restore:")
                restore_from_data(data, dry_run=True)
            else:
                # Ask for confirmation if many properties
                if len(data) > 10 and not lakehouse_filter:
                    print(f"⚠️  WARNING: This will restore {len(data)} properties across {len(unique_lakehouses)} lakehouses")
                    response = input("Continue? (yes/no): ")  # User input for confirmation.
                    if response.lower() != 'yes':
                        print("Rollback cancelled")
                        return
                
                restore_from_data(data, dry_run=False)
        else:
            if lakehouse_filter:
                filter_display = ', '.join(lakehouse_filter) if isinstance(lakehouse_filter, list) else lakehouse_filter
                print(f"No backup found for lakehouse '{filter_display}' at timestamp {ts}")
            else:
                print("No backup found.")
            list_recent_backups(target_workspace, lakehouse_filter)
    except Exception as e:
        print(f"Error reading backup: {e}")
        list_recent_backups(target_workspace, lakehouse_filter)
        
def restore_from_data(data, dry_run: bool = False):
    """Restore properties from backup data with comprehensive duplicate detection"""
    
    # ========================================
    # DIAGNOSTIC LEVEL 1: Check raw backup data
    # ========================================
    property_counts = {}
    for r in data:
        key = (r.target_workspace, r.lakehouse_name, r.table_name, r.property_key)
        property_counts[key] = property_counts.get(key, 0) + 1
    
    duplicates = {k: v for k, v in property_counts.items() if v > 1}
    if duplicates:
        print("⚠️ WARNING: Duplicate properties found in backup data:")
        for (ws, lh, tbl, prop), count in duplicates.items():
            print(f"  {ws}.{lh}.{tbl}.{prop}: {count} times")
    
    # ========================================
    # Group properties by table
    # ========================================
    tables = {}
    for r in data:
        if r.property_key in INTERNAL_PROPERTIES:
            continue
        key = (r.target_workspace, r.lakehouse_name, r.table_name)
        tables.setdefault(key, []).append((r.property_key, r.property_value))
    
    # ========================================
    # DIAGNOSTIC LEVEL 2: Check grouped tables
    # ========================================
    for (ws, lh, tbl), props in tables.items():
        prop_keys = [k for k, v in props]
        dup_props = [k for k in prop_keys if prop_keys.count(k) > 1]
        if dup_props:
            print(f"⚠️ WARNING: {ws}.{lh}.{tbl} has duplicate properties: {set(dup_props)}")
    
    # ========================================
    # Process each table
    # ========================================
    for (ws, lh, tbl), props in tables.items():
        logger = get_lakehouse_logger(lh)
        
        # ========================================
        # DIAGNOSTIC LEVEL 3: Final pre-restore check
        # ========================================
        prop_keys = [k for k, v in props]
        if len(prop_keys) != len(set(prop_keys)):
            dup_keys = [k for k in set(prop_keys) if prop_keys.count(k) > 1]
            warning_msg = f"⚠️ DUPLICATE PROPERTIES DETECTED for {tbl}: {dup_keys}"
            logger.warning(warning_msg)
            print(warning_msg)
            
            # Show all occurrences with their values
            for dup_key in dup_keys:
                occurrences = [(k, v) for k, v in props if k == dup_key]
                logger.warning(f"  {dup_key} appears {len(occurrences)} times:")
                print(f"  {dup_key} appears {len(occurrences)} times:")
                for i, (k, v) in enumerate(occurrences, 1):
                    logger.warning(f"    Occurrence {i}: value = '{v}'")
                    print(f"    Occurrence {i}: value = '{v}'")
        
        # ========================================
        # Start restore operation
        # ========================================
        logger.info(f"{'[DRY RUN] ' if dry_run else ''}Restoring {ws}.{lh}.{tbl}")
        print(f"{'[DRY RUN] ' if dry_run else ''}Restoring {ws}.{lh}.{tbl}")
        
        # ========================================
        # Build OneLake path
        # ========================================
        try:
            onelake_path = build_onelake_path(ws, lh, logger)
            
            if onelake_path:
                physical_path = get_table_physical_path(ws, lh, tbl, onelake_path, logger)
                
                if physical_path:
                    current_props = get_table_properties_from_path(physical_path, tbl, logger)
                    table_path = physical_path
                else:
                    logger.warning(f"Could not resolve physical path for {tbl}")
                    print(f"   ⚠️ Warning: Could not resolve physical path for {tbl}")
                    current_props = {}
                    table_path = f"{onelake_path}{tbl}"
            else:
                logger.warning(f"Could not build OneLake path for {ws}.{lh}")
                print(f"   ⚠️ Warning: Could not build OneLake path for {ws}.{lh}")
                current_props = {}
                
                # LAST RESORT: Try to get GUIDs via scan as fallback
                try:
                    logger.info("Attempting to scan workspace for GUIDs...")
                    print("   🔄 Attempting to scan workspace for GUIDs...")
                    from sempy_labs import admin
                    scan_result = admin.scan_workspaces(workspace=ws)
                    
                    if scan_result.get("workspaces") and len(scan_result["workspaces"]) > 0:
                        workspace_id = scan_result["workspaces"][0]["id"]
                        lakehouses = scan_result["workspaces"][0].get("Lakehouse", [])
                        lakehouse_obj = next((l for l in lakehouses if l["name"] == lh), None)
                        
                        if lakehouse_obj:
                            lakehouse_id = lakehouse_obj["id"]
                            table_path = f"abfss://{workspace_id}@onelake.dfs.fabric.microsoft.com/{lakehouse_id}/Tables/{tbl}"
                            logger.info("Built GUID-based path via scan")
                            print("   ✅ Built GUID-based path via scan")
                        else:
                            raise Exception(f"Lakehouse {lh} not found in workspace scan")
                    else:
                        raise Exception(f"Workspace {ws} not found in scan")
                        
                except Exception as scan_error:
                    logger.error(f"Could not get GUIDs: {scan_error}")
                    logger.error(f"CRITICAL: Cannot restore {tbl} - no valid path available")
                    print(f"   ❌ Could not get GUIDs: {scan_error}")
                    print(f"   ⚠️ CRITICAL: Cannot restore {tbl} - skipping")
                    continue
                
        except Exception as e:
            logger.error(f"Error during path resolution: {e}")
            logger.error(f"CRITICAL: Cannot restore {tbl} - skipping")
            print(f"   ❌ Error during path resolution: {e}")
            print(f"   ⚠️ CRITICAL: Cannot restore {tbl} - skipping")
            continue
        
        # ========================================
        # Execute restore (dry-run or actual)
        # ========================================
        if dry_run:
            for k, v in props:
                current_val = current_props.get(k, 'NOT SET')
                if v is not None:
                    msg = f"Would SET {k}: '{current_val}' → '{v}'"
                    logger.info(msg)
                    print(f"   {msg}")
                else:
                    msg = f"Would UNSET {k} (current: '{current_val}')"
                    logger.info(msg)
                    print(f"   {msg}")
            continue
        
        # Apply the restore
        try:
            for k, v in props:
                current_val = current_props.get(k, 'NOT SET')
                if v is not None:
                    spark.sql(f"ALTER TABLE delta.`{table_path}` SET TBLPROPERTIES ('{k}' = '{v}')")
                    msg = f"SET {k}: '{current_val}' → '{v}'"
                    logger.info(msg)
                    print(f"   ✅ {msg}")
                else:
                    spark.sql(f"ALTER TABLE delta.`{table_path}` UNSET TBLPROPERTIES IF EXISTS ('{k}')")
                    msg = f"UNSET {k} (was: '{current_val}')"
                    logger.info(msg)
                    print(f"   ✅ {msg}")
            
            success_msg = "SUCCESS - All properties restored"
            logger.info(success_msg)
            print(f"   ✅ {success_msg}")
            
        except Exception as e:
            error_msg = f"FAILED to restore properties: {e}"
            logger.error(error_msg)
            print(f"   ❌ {error_msg}")

def list_recent_backups(target_workspace: str = None, lakehouse_filter=None):
    """List the 10 most recent backup runs recorded in the backup table (default lakehouse)."""
    print("\nRecent backup runs:")
    try:
        where_clauses = ["1=1"]
        
        if target_workspace:
            where_clauses.append(f"target_workspace = '{target_workspace}'")
        
        if lakehouse_filter:
            if isinstance(lakehouse_filter, list):
                lakehouse_list = "', '".join(lakehouse_filter)
                where_clauses.append(f"lakehouse_name IN ('{lakehouse_list}')")
            else:
                where_clauses.append(f"lakehouse_name = '{lakehouse_filter}'")
        
        where_clause = " AND ".join(where_clauses)
        
        # Summary query
        summary_query = f"""
            SELECT 
                run_timestamp, 
                MAX(execution_time) as execution_time, 
                target_workspace,
                COUNT(DISTINCT lakehouse_name) as lakehouse_count,
                COUNT(DISTINCT CONCAT(lakehouse_name, '.', table_name)) as table_count,
                COUNT(*) as property_count,
                MAX(backup_timing) as mode
            FROM {backup_table}
            WHERE {where_clause}
            GROUP BY run_timestamp, target_workspace
            ORDER BY execution_time DESC
            LIMIT 10
        """
        recent = spark.sql(summary_query).collect()
        
        if recent:
            print(f"\n{'Timestamp':<20} {'Date':<12} {'Workspace':<25} {'Lakehouses':<12} {'Tables':<8} {'Properties':<12} {'Mode':<15}")
            print("-" * 120)
            
            for r in recent:
                exec_date = r.execution_time.strftime("%Y-%m-%d")
                
                print(f"{r.run_timestamp:<20} {exec_date:<12} {r.target_workspace:<25} {r.lakehouse_count:<12} {r.table_count:<8} {r.property_count:<12} {r.mode:<15}")
            
            print("\n💡 To rollback, copy a timestamp and set:")
            print("   MODE = 'rollback'")
            print("   ROLLBACK_TIMESTAMP = '<timestamp_from_above>'")
            print("   DRY_RUN = True  # for preview, False to apply")
            
        else:
            print("  No backups found")
            
            if lakehouse_filter:
                filter_display = ', '.join(lakehouse_filter) if isinstance(lakehouse_filter, list) else lakehouse_filter
                print(f"\n💡 Hint: No backups found for lakehouse '{filter_display}'")
            elif target_workspace:
                print(f"\n💡 Hint: No backups found for workspace '{target_workspace}'")
            else:
                print("\n💡 Hint: No backups found")
            
            print("   - Make sure you've run in MODE='apply' to create backups")
            print("   - Backups are only created when properties are actually changed")
            
    except Exception as e:
        print(f"  Error listing backups: {e}")
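The restore loop above follows one rule worth making explicit: a backed-up value of `None` means the property did not exist before the change, so rollback removes it with `UNSET TBLPROPERTIES IF EXISTS`; any other value is written back with `SET TBLPROPERTIES`. The sketch below expresses that mapping as a pure function, so the statements can be inspected or tested without Spark. `build_restore_statements` is a hypothetical helper, not part of this notebook, and it assumes property values contain no single quotes (the notebook does not escape them either).

```python
from typing import Dict, List, Optional


def build_restore_statements(table_path: str, backup: Dict[str, Optional[str]]) -> List[str]:
    """Hypothetical helper: map a per-table backup to Spark SQL ALTER statements.

    Assumes values contain no single quotes (no SQL escaping is done here).
    """
    statements = []
    for key, old_value in sorted(backup.items()):  # sorted for a deterministic order
        if old_value is None:
            # Property was absent before the change: remove it again
            statements.append(
                f"ALTER TABLE delta.`{table_path}` UNSET TBLPROPERTIES IF EXISTS ('{key}')"
            )
        else:
            # Property existed before the change: write the old value back
            statements.append(
                f"ALTER TABLE delta.`{table_path}` SET TBLPROPERTIES ('{key}' = '{old_value}')"
            )
    return statements


# Example: one property was previously unset, one had a value
backup = {"delta.enableChangeDataFeed": None, "delta.appendOnly": "false"}
for stmt in build_restore_statements("abfss://ws@onelake.dfs.fabric.microsoft.com/lh/Tables/sales", backup):
    print(stmt)  # in the notebook, each statement would be passed to spark.sql(stmt)
```

Keeping statement generation separate from execution makes a dry-run trivial: print the list instead of executing it.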

## 6. Process One Table

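Logic for handling a single table: resolve its physical path, check it, apply the changes, and validate them.

`get_table_physical_path` resolves a table's OneLake folder, tolerating case and space differences between the catalog name and the folder name. `process_table` then skips non-Delta tables and tables with writes in the last `ACTIVE_WRITE_LOOKBACK_MINUTES` minutes. In apply mode it validates each requested change, backs up the current values of the properties it will change, applies only the properties whose current value differs from the target (so re-running is idempotent), and then polls until each change is visible.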
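The apply flow described above (plan against current values, back up, apply, then poll) can be sketched without Spark by passing the storage operations in as callables. This is a minimal illustration under stated assumptions, not the notebook's implementation: `normalize` is a hypothetical stand-in for `normalize_property_value` that only lower-cases booleans, `save_backup`/`set_property`/`read_property` stand in for the notebook's `save_backup`, `apply_table_property_from_path`, and property lookup, and the `validate_property` safety check is omitted.

```python
import time
from typing import Callable, Dict, List, Optional, Tuple


def normalize(value: str) -> str:
    # Hypothetical normalizer: treat 'TRUE'/'true' as the same value, leave others as-is
    v = value.strip()
    return v.lower() if v.lower() in ("true", "false") else v


def apply_idempotently(
    current: Dict[str, str],
    targets: List[Tuple[str, str]],
    save_backup: Callable[[Dict[str, Optional[str]]], None],
    set_property: Callable[[str, str], None],
    read_property: Callable[[str], Optional[str]],
    timeout_s: float = 5.0,
    poll_s: float = 0.1,
) -> Dict[str, List[str]]:
    report: Dict[str, List[str]] = {"applied": [], "skipped": [], "verified": [], "unverified": []}
    backup: Dict[str, Optional[str]] = {}
    plan: List[Tuple[str, str]] = []

    # 1. Plan: equal normalized values mean "already set", so skip (idempotent)
    for key, value in targets:
        cur = current.get(key)
        if cur is not None and normalize(cur) == normalize(value):
            report["skipped"].append(key)
            continue
        backup[key] = cur  # None records "was not set", which rollback turns into UNSET
        plan.append((key, normalize(value)))

    # 2. Back up before any change, and only if something will change
    if backup:
        save_backup(backup)

    # 3. Apply the planned changes
    for key, value in plan:
        set_property(key, value)
        report["applied"].append(key)

    # 4. Verify: poll each property until it shows the new value or the timeout expires
    for key, value in plan:
        deadline = time.monotonic() + timeout_s
        while True:
            seen = read_property(key)
            if seen is not None and normalize(seen) == value:
                report["verified"].append(key)
                break
            if time.monotonic() >= deadline:
                report["unverified"].append(key)
                break
            time.sleep(poll_s)
    return report


# Usage with an in-memory dict standing in for a table's properties
table = {"delta.appendOnly": "false"}
backups: List[Dict[str, Optional[str]]] = []
targets = [("delta.appendOnly", "FALSE"), ("delta.enableChangeDataFeed", "true")]
report = apply_idempotently(dict(table), targets, backups.append, table.__setitem__, table.get)
print(report)   # appendOnly skipped; enableChangeDataFeed applied and verified
print(backups)  # [{'delta.enableChangeDataFeed': None}]
```

Running the same call a second time applies nothing and writes no new backup, which is what makes re-runs safe.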

In [None]:
def get_table_physical_path(workspace: str, lakehouse: str, table: str, onelake_path: str, logger) -> str | None:
    """
    Get the exact physical path of a table using OneLake path.
    This handles spaces, mixed case, and special characters correctly.
    """
    try:
        logger.debug(f"Looking for table: '{table}' in lakehouse: '{lakehouse}'")
        logger.debug(f"OneLake base path: {onelake_path}")
        
        # Try multiple path variations
        path_attempts = [
            table,                    # Original name as-is
            table.lower(),            # Lowercase
            table.replace(' ', '%20') # URL encoded spaces
        ]
        
        for attempt in path_attempts:
            table_path = f"{onelake_path}{attempt}"
            logger.debug(f"Trying path: {table_path}")
            
            # Check if _delta_log exists (confirms it's a Delta table)
            try:
                delta_log_path = f"{table_path}/_delta_log"
                mssparkutils.fs.ls(delta_log_path)
                logger.info(f"✅ Found physical path for {lakehouse}.{table}: {table_path}")
                return table_path
            except Exception as e:
                logger.debug(f"Path attempt failed: {attempt} - {e}")
                continue
        
        # If direct attempts fail, list all tables and find case-insensitive match
        logger.debug(f"Direct path attempts failed, listing tables in: {onelake_path}")
        try:
            all_items = mssparkutils.fs.ls(onelake_path)
            logger.debug(f"Found {len(all_items)} items in Tables directory")
            
            for item in all_items:
                item_name = item.name.rstrip('/')
                logger.debug(f"Checking item: '{item_name}' against '{table}'")
                
                # Case-insensitive comparison
                if item_name.lower() == table.lower():
                    table_path = f"{onelake_path}{item_name}"
                    
                    # Verify it's a Delta table
                    try:
                        mssparkutils.fs.ls(f"{table_path}/_delta_log")
                        logger.info(f"✅ Found table with matching name (case variation): '{table}' -> '{item_name}'")
                        logger.info(f"Physical path: {table_path}")
                        return table_path
                    except Exception as delta_check_error:
                        logger.debug(f"Item '{item_name}' is not a Delta table: {delta_check_error}")
                        continue
            
            logger.error(f"Could not find table '{table}' in {len(all_items)} items listed")
            
        except Exception as list_error:
            logger.error(f"Could not list tables in {lakehouse}: {list_error}")
        
        logger.error(f"Could not find physical path for {lakehouse}.{table}")
        return None
        
    except Exception as e:
        logger.error(f"Unexpected error getting physical path for {lakehouse}.{table}: {e}")
        return None

In [None]:
# Function to process a single table.
def process_table(info: dict, run_ts: str, mode: str) -> dict:
    lh, tbl, logger, ws = info['lakehouse'], info['table'], info['logger'], info['workspace']
    physical_path = info['physical_path']
    result = {'table': f"{lh}.{tbl}", 'status': 'unknown', 'details': []}

    try:
        logger.info(f"Processing {lh}.{tbl}")
        result['details'].append(f"Table: {lh}.{tbl}")
        result['details'].append(f"Path: {physical_path}")

        detail = get_table_detail_from_path(physical_path, tbl, logger)
        
        size_gb = detail['size_bytes'] / (1024**3)
        result['details'].append(f"Size: {size_gb:.2f}GB | Files: {detail['num_files']} | Format: {detail['format']}")

        if detail['format'].lower() != 'delta':
            result['status'] = 'skipped'
            result['details'].append("SKIPPED - Not Delta")
            return result

        if check_active_writes_from_path(physical_path, ws, lh, tbl, ACTIVE_WRITE_LOOKBACK_MINUTES, logger):
            result['status'] = 'skipped'
            result['details'].append(f"SKIPPED - Active writes in last {ACTIVE_WRITE_LOOKBACK_MINUTES} min")
            return result

        props = get_table_properties_from_path(physical_path, tbl, logger)

        # ----- DRY-RUN -----
        if mode == "dry-run":
            changes = []
            blocked = []
            
            for k, v in properties:
                # Skip internal properties, exactly as apply mode does
                if k in INTERNAL_PROPERTIES:
                    continue

                # Normalize for comparison
                current_val = props.get(k, 'NOT SET')
                current_normalized = normalize_property_value(k, current_val) if current_val != 'NOT SET' else 'NOT SET'
                v_normalized = normalize_property_value(k, v)
                
                if current_normalized != v_normalized:
                    # VALIDATE BEFORE SHOWING AS CHANGE
                    is_valid, msg = validate_property(k, v_normalized, current_normalized, strict_mode=True)
                    
                    if is_valid:
                        changes.append(f"   {k}: '{current_val}' to '{v_normalized}'")
                    else:
                        blocked.append(f"   {k}: '{current_val}' to '{v_normalized}' - {msg}")
            
            if blocked:
                result['details'].extend([f"DRY RUN - ⚠️ {len(blocked)} change(s) BLOCKED:"] + blocked)
            
            if changes:
                result['details'].extend([f"DRY RUN - Would apply {len(changes)} change(s):"] + changes)
            
            if not changes and not blocked:
                result['details'].append("DRY RUN - No changes needed")
            
            result['status'] = 'dry-run'

        # ----- APPLY -----
        elif mode == "apply":
            applied = 0
            skipped = 0
            blocked = 0
            backup_props = {}  # Pre-change values of properties that will change (None = was not set)
            planned = []       # (key, current value, normalized target) for validated changes

            # Pass 1: decide what to change without touching the table
            for k, v in properties:
                # Skip internal properties entirely
                if k in INTERNAL_PROPERTIES:
                    logger.info(f"Skipping internal property: {k}")
                    continue

                # Normalize both current and target values for comparison
                cur = props.get(k, 'NOT SET')
                cur_normalized = normalize_property_value(k, cur) if cur != 'NOT SET' else 'NOT SET'
                v_normalized = normalize_property_value(k, v)

                if cur_normalized == v_normalized:
                    result['details'].append(f"   {k}: '{cur}' (no change)")
                    skipped += 1
                    continue

                # Validate before applying
                is_valid, msg = validate_property(k, v_normalized, cur_normalized, strict_mode=True)
                if not is_valid:
                    result['details'].append(f"   {k}: BLOCKED - {msg}")
                    logger.warning(f"Blocked unsafe property change for {lh}.{tbl}: {msg}")
                    blocked += 1
                    continue

                # Store the current value (or None if not set) so rollback can restore or unset it
                backup_props[k] = None if cur == 'NOT SET' else cur
                planned.append((k, cur, v_normalized))

            # Save the backup BEFORE altering anything, so a failure partway through is still recoverable
            if backup_props:
                save_backup(run_ts, lh, tbl, backup_props, 'before_changes', ws)
            else:
                logger.info(f"No properties changed for {lh}.{tbl} - skipping backup")

            # Pass 2: apply the validated changes
            for k, cur, v_normalized in planned:
                result['details'].append(f"   {k}: '{cur}' to '{v_normalized}'")
                apply_table_property_from_path(physical_path, tbl, k, v_normalized, logger)
                applied += 1

            result['details'].append(f"Applied: {applied} | Skipped: {skipped} | Blocked: {blocked}")

            # Validate only the properties that were actually changed, against the value written
            all_ok = True
            for k, _, v_normalized in planned:
                if not wait_for_property_from_path(physical_path, tbl, k, v_normalized, logger):
                    result['details'].append(f"   FAILED validation: {k}")
                    all_ok = False
                else:
                    result['details'].append(f"   Verified: {k}")

            result['status'] = 'success' if all_ok else 'validation_failed'

    except Exception as e:
        result['status'] = 'failed'
        result['details'].append(f"Error: {e}")
        logger.error(f"{lh}.{tbl} - {e}")

    return result

## 7. Main Execution

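Main logic: either rolls back to a backup or processes the selected lakehouses and tables in parallel.

Tables are processed concurrently on a `ThreadPoolExecutor` with `MAX_PARALLEL_TABLES` workers to cut total run time. A lock guards the shared progress counter, result list, and console output while results are collected.

The cell lists lakehouses and tables, processes the tables, prints per-table results and a per-lakehouse summary, and flushes the logs to each lakehouse.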
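The parallel pattern reduces to a small sketch: submit every table to a `ThreadPoolExecutor`, consume results with `as_completed`, update shared counters under a `Lock`, estimate the remaining time from the average time per finished table, and sort the results by name at the end so output order does not depend on completion order. `process` and `run_parallel` are hypothetical stand-ins for `process_table` and the loop in the cell below. Note that `as_completed` yields futures in the calling thread, so in this pattern the lock is defensive; it becomes required only if worker threads also touch the shared state. Statuses are counted with `dict.get`, so an unexpected status cannot raise `KeyError`.

```python
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from threading import Lock
from typing import Dict, List, Tuple


def process(table: str) -> Dict[str, str]:
    # Hypothetical stand-in for process_table: simulate some I/O, then report a status
    time.sleep(0.01)
    return {"table": table, "status": "skipped" if table.startswith("tmp_") else "success"}


def run_parallel(tables: List[str], max_workers: int = 4) -> Tuple[Dict[str, int], List[Dict[str, str]]]:
    lock = Lock()
    counts: Dict[str, int] = {"success": 0, "skipped": 0, "failed": 0}
    results: List[Dict[str, str]] = []
    start = time.time()
    done = 0

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = {executor.submit(process, t): t for t in tables}
        for future in as_completed(futures):  # completion order, yielded in this thread
            res = future.result()
            with lock:
                done += 1
                # ETA = average seconds per finished table x tables still pending
                eta_s = (time.time() - start) / done * (len(tables) - done)
                print(f"[{done}/{len(tables)}] Completed: {futures[future]} | ETA: {eta_s:.1f}s")
                counts[res["status"]] = counts.get(res["status"], 0) + 1
                results.append(res)

    results.sort(key=lambda r: r["table"])  # deterministic output order
    return counts, results


counts, results = run_parallel(["sales", "tmp_staging", "customers"], max_workers=2)
print(counts)  # {'success': 2, 'skipped': 1, 'failed': 0}
```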

In [None]:
if MODE == "rollback":
    if not ROLLBACK_TIMESTAMP:
        raise ValueError("Set ROLLBACK_TIMESTAMP for rollback mode")
    
    # Use current timestamp for this rollback operation
    rollback_run_timestamp = timestamp  # Created in Section 3
    
    print(f"{'[DRY RUN] ' if DRY_RUN else ''}Starting rollback operation")
    print(f"Rollback run timestamp: {rollback_run_timestamp}")
    print(f"Restoring from backup: {ROLLBACK_TIMESTAMP}")
    print()

    rollback_to_timestamp(ROLLBACK_TIMESTAMP, workspace, LAKEHOUSE_FILTER, dry_run=DRY_RUN)
    
    print("\n" + "="*80)
    flush_logs_to_lakehouse(rollback_run_timestamp)
    print(f"📋 Rollback logs saved: Files/logs/{workspace}/[lakehouse]/[date]/")
    print(f"   Rollback run: {rollback_run_timestamp}")
    print(f"   Restored from backup: {ROLLBACK_TIMESTAMP}")
    print("="*80)
    
else:
    log_buffers.clear()
    
    # Ensure backup table exists
    ensure_backup_table_exists()
    
    lakehouses = labs.list_lakehouses(workspace=workspace)
    
    if LAKEHOUSE_FILTER:
        if isinstance(LAKEHOUSE_FILTER, list):
            lakehouses = lakehouses[lakehouses['Lakehouse Name'].isin(LAKEHOUSE_FILTER)]
            if lakehouses.empty:
                raise ValueError(f"None of the specified lakehouses found: {LAKEHOUSE_FILTER}")
            print(f"Targeting {len(lakehouses)} lakehouse(s): {', '.join(lakehouses['Lakehouse Name'])}")
        else:
            lakehouses = lakehouses[lakehouses['Lakehouse Name'] == LAKEHOUSE_FILTER]
            if lakehouses.empty:
                raise ValueError(f"Lakehouse '{LAKEHOUSE_FILTER}' not found")
            print(f"Targeting lakehouse: {LAKEHOUSE_FILTER}")
    else:
        print(f"Targeting all {len(lakehouses)} lakehouses")

    # === INITIALIZE COLLECTIONS ===
    all_tables = []
    lakehouse_summary = {} 

    # === BUILD all_tables LIST ===
    for _, row in lakehouses.iterrows():
        lh = row['Lakehouse Name']
        
        lakehouse_logger = get_lakehouse_logger(lh)
        onelake_path = build_onelake_path(workspace, lh, lakehouse_logger)
        
        if not onelake_path:
            print(f"⚠️  Skipping lakehouse '{lh}' - could not build OneLake path")
            continue
        
        # Initialize summary for this lakehouse
        lakehouse_summary[lh] = {
            'tables': 0, 
            'success': 0, 
            'skipped': 0, 
            'failed': 0, 
            'validation_failed': 0,
            'dry-run': 0
        }

        tables = labs.lakehouse.get_lakehouse_tables(lakehouse=lh, workspace=workspace, exclude_shortcuts=True)
        
        for _, t in tables.iterrows():
            table_name = t['Table Name']
            
            physical_path = get_table_physical_path(workspace, lh, table_name, onelake_path, lakehouse_logger)
            
            if not physical_path:
                lakehouse_logger.warning(f"Skipping {table_name} - could not resolve physical path")
                continue
            
            all_tables.append({
                'lakehouse': lh, 
                'table': table_name,
                'physical_path': physical_path,
                'logger': lakehouse_logger,
                'workspace': workspace,
                'onelake_path': onelake_path
            })
            lakehouse_summary[lh]['tables'] += 1

    print(f"Found {len(all_tables)} tables across {len(lakehouse_summary)} lakehouse(s)")
    print(f"Processing in parallel with {MAX_PARALLEL_TABLES} workers...")
    print()

    # === PROCESS TABLES WITH THREADING ===
    from threading import Lock
    
    print_lock = Lock()
    results = {'success':0, 'skipped':0, 'failed':0, 'validation_failed':0, 'dry-run':0}
    all_results = []
    start = time.time()
    counter = {'count': 0}

    with ThreadPoolExecutor(max_workers=MAX_PARALLEL_TABLES) as executor:
        futures = {executor.submit(process_table, t, timestamp, MODE): t for t in all_tables}
        
        for f in as_completed(futures):
            info = futures[f]
            res = f.result()
            lh = info['lakehouse']
            tbl = info['table']
            lakehouse_logger = info['logger']
            
            with print_lock:
                counter['count'] += 1
                i = counter['count']
                
                all_results.append({
                    'sort_key': f"{lh}|||{tbl}",
                    'lakehouse': lh,
                    'table': tbl,
                    'result': res,
                    'index': i,
                    'total': len(all_tables)
                })
                
                eta = ((time.time() - start) / i) * (len(all_tables) - i) / 60
                
                # Log to both console AND lakehouse log file
                progress_msg = f"[{i}/{len(all_tables)}] Completed: {lh}.{tbl} | ETA: {eta:.1f}m"
                print(progress_msg)
                lakehouse_logger.info(progress_msg)
                
                # Log the detailed results to the lakehouse log
                lakehouse_logger.info(f"--- Result for {tbl} ---")
                for detail in res['details']:
                    lakehouse_logger.info(f"  {detail}")
                lakehouse_logger.info("")
                
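                # .get() so an unexpected status (e.g. 'unknown') cannot raise KeyError mid-run
                results[res['status']] = results.get(res['status'], 0) + 1
                lakehouse_summary[lh][res['status']] = lakehouse_summary[lh].get(res['status'], 0) + 1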

    # === SORT AND DISPLAY RESULTS ===
    print()
    print("="*80)
    print("RESULTS (sorted by lakehouse and table name)")
    print("="*80)
    print()

    all_results.sort(key=lambda x: x['sort_key'])

    current_lakehouse = None

    for item in all_results:
        lh = item['lakehouse']
        tbl = item['table']
        res = item['result']
        
        if lh != current_lakehouse:
            if current_lakehouse is not None:
                print()
            
            header = f"📦 LAKEHOUSE: {lh}"
            separator = "-" * 80
            
            print(header)
            print(separator)
            
            current_lakehouse = lh
        
        # Print to console
        print(f"  [{item['index']}/{item['total']}] {tbl}")
        for detail in res['details']:
            print(f"    {detail}")
        print()

    # === PER-LAKEHOUSE SUMMARY ===
    print("="*80)
    print("SUMMARY BY LAKEHOUSE")
    print("="*80)

    for lh, stats in lakehouse_summary.items():
        summary_header = f"📦 {lh}"
        print(summary_header)
        
        lh_logger = get_lakehouse_logger(lh)
        lh_logger.info("="*80)
        lh_logger.info("SUMMARY")
        lh_logger.info("="*80)
        
        for k in ['tables', 'success', 'skipped', 'failed', 'validation_failed', 'dry-run']:
            if k in stats and stats[k] > 0:
                stat_line = f"   {k}: {stats[k]}"
                print(stat_line)
                lh_logger.info(stat_line)
        print()
        lh_logger.info("")

    print(f"⏱️ TOTAL TIME: {time.time()-start:.1f}s")

    flush_logs_to_lakehouse(timestamp)
    print(f"📁 Logs saved: Files/logs/{workspace}/[lakehouse]/[date]/")

    if MODE == "apply":
        print("\n📄 To rollback these changes:")
        print(f"   MODE = 'rollback'")
        print(f"   ROLLBACK_TIMESTAMP = '{timestamp}'")