# Unity Catalog Volume - Excel File Cleanup

This notebook deletes Excel files (`.xlsx`, `.xls`) older than a configurable retention period from Unity Catalog Volume paths.

**Scheduled**: Daily via Databricks Workflow  
**Configuration**: All parameters are exposed as Databricks widgets for job-level overrides.

### Enterprise Features
- **Structured logging** with Python `logging` (timestamped, leveled)
- **Input validation** with early-fail on misconfiguration
- **Retry logic** with configurable attempts for transient deletion failures
- **Idempotency guards** - verifies file existence before and after operations
- **Per-volume statistics** for granular observability
- **Execution timing** for performance monitoring
- **Hard failure on errors** - raises exception to fail the Databricks job/workflow

## 1. Configuration - Widget Parameters

In [0]:
# ---------------------------------------------------------------
# Widget definitions - override these at job/task level as needed
# ---------------------------------------------------------------

dbutils.widgets.text(
    "volume_paths",
    "/Volumes/accenture/prasun12377/claims_checkpoint_dir,/Volumes/accenture/prasun12377/bcbsm_volume",
    "Comma-separated volume paths to scan",
)

dbutils.widgets.text(
    "retention_days",
    "1",
    "Delete files older than this many days",
)

dbutils.widgets.text(
    "file_extensions",
    ".xlsx,.xls",
    "Comma-separated file extensions to target",
)

dbutils.widgets.dropdown(
    "dry_run",
    "true",
    ["true", "false"],
    "Dry run mode (true = list only, false = delete)",
)

dbutils.widgets.text(
    "max_retries",
    "3",
    "Max retry attempts for transient file-deletion failures",
)

## 2. Logging Setup

In [0]:
import logging
import os
import sys
import time
from datetime import datetime, timezone

# ---------------------------------------------------------------------------
# Configure structured logger (avoids duplicate handlers on re-runs)
# ---------------------------------------------------------------------------
LOG_FORMAT = "%(asctime)s | %(levelname)-8s | %(message)s"
DATE_FORMAT = "%Y-%m-%d %H:%M:%S"

logger = logging.getLogger("excel_cleanup")
logger.setLevel(logging.INFO)
logger.handlers.clear()

_handler = logging.StreamHandler(sys.stdout)
_handler.setFormatter(logging.Formatter(LOG_FORMAT, datefmt=DATE_FORMAT))
logger.addHandler(_handler)

logger.info("Logger initialised")

2026-02-24 05:43:55 | INFO     | Logger initialised
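
The duplicate-handler guard can be checked in isolation. The sketch below wraps the same setup in a hypothetical `build_logger` helper (not part of the notebook) and calls it twice, as happens when the cell is re-run. The logger name `excel_cleanup_demo` is also an illustrative assumption.

```python
import logging
import sys

LOG_FORMAT = "%(asctime)s | %(levelname)-8s | %(message)s"
DATE_FORMAT = "%Y-%m-%d %H:%M:%S"


def build_logger(name: str) -> logging.Logger:
    """Hypothetical helper: configure a named logger with exactly one stdout handler."""
    demo_logger = logging.getLogger(name)
    demo_logger.setLevel(logging.INFO)
    # Without this line, every re-run would attach another handler and
    # each message would be printed once per attached handler.
    demo_logger.handlers.clear()
    handler = logging.StreamHandler(sys.stdout)
    handler.setFormatter(logging.Formatter(LOG_FORMAT, datefmt=DATE_FORMAT))
    demo_logger.addHandler(handler)
    return demo_logger


# Simulate running the setup cell twice in the same Python session
first = build_logger("excel_cleanup_demo")
second = build_logger("excel_cleanup_demo")
handler_count = len(second.handlers)
second.info("Logged once, even after a re-run")
```

`logging.getLogger` returns the same object for the same name, so the handler list survives across cell executions. That is why the list is cleared before a handler is added.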


## 3. Parse & Validate Widget Values

In [0]:
# ---------------------------------------------------------------------------
# Parse widgets into typed config
# ---------------------------------------------------------------------------
VOLUME_PATHS = [
    p.strip() for p in dbutils.widgets.get("volume_paths").split(",") if p.strip()
]
RETENTION_DAYS = int(dbutils.widgets.get("retention_days"))
FILE_EXTENSIONS = tuple(
    ext.strip().lower()
    for ext in dbutils.widgets.get("file_extensions").split(",")
    if ext.strip()
)
DRY_RUN = dbutils.widgets.get("dry_run").lower() == "true"
MAX_RETRIES = int(dbutils.widgets.get("max_retries"))

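# Pre-compute the cutoff timestamp (seconds since epoch).
# NOTE: the retention-based cutoff is commented out and a fixed 5-minute window
# is used instead, so RETENTION_DAYS is validated and logged but not applied.
# Re-enable the commented line below for production runs.
# CUTOFF_EPOCH = time.time() - (RETENTION_DAYS * 86400)
CUTOFF_EPOCH = time.time() - (5 * 60)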
CUTOFF_DATE = datetime.fromtimestamp(CUTOFF_EPOCH, tz=timezone.utc)

# ---------------------------------------------------------------------------
# Input validation - fail fast on misconfiguration
# ---------------------------------------------------------------------------
_errors = []
if not VOLUME_PATHS:
    _errors.append("volume_paths is empty - at least one path is required")
for _p in VOLUME_PATHS:
    if not _p.startswith("/Volumes/"):
        _errors.append(f"Invalid volume path (must start with /Volumes/): {_p}")
if RETENTION_DAYS < 1:
    _errors.append(f"retention_days must be >= 1, got {RETENTION_DAYS}")
if not FILE_EXTENSIONS:
    _errors.append("file_extensions is empty - at least one extension is required")
for _ext in FILE_EXTENSIONS:
    if not _ext.startswith("."):
        _errors.append(f"Extension must start with a dot: '{_ext}'")
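if MAX_RETRIES < 1:
    # max_retries is used as the total number of delete attempts, so 0 would
    # skip the delete loop entirely and report a spurious error
    _errors.append(f"max_retries must be >= 1, got {MAX_RETRIES}")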

if _errors:
    for _e in _errors:
        logger.error(f"VALIDATION FAILURE: {_e}")
    raise ValueError(
        f"Configuration validation failed with {len(_errors)} error(s). "
        "Check logs above for details."
    )

# ---------------------------------------------------------------------------
# Log validated configuration
# ---------------------------------------------------------------------------
logger.info("=" * 60)
logger.info("CONFIGURATION (validated)")
logger.info("=" * 60)
logger.info(f"Volume paths     : {VOLUME_PATHS}")
logger.info(f"Retention days   : {RETENTION_DAYS}")
logger.info(f"File extensions  : {FILE_EXTENSIONS}")
logger.info(f"Dry run          : {DRY_RUN}")
logger.info(f"Max retries      : {MAX_RETRIES}")
logger.info(f"Cutoff date (UTC): {CUTOFF_DATE:%Y-%m-%d %H:%M:%S}")
logger.info("=" * 60)

2026-02-24 05:43:56 | INFO     | CONFIGURATION (validated)
2026-02-24 05:43:56 | INFO     | Volume paths     : ['/Volumes/accenture/prasun12377/claims_checkpoint_dir', '/Volumes/accenture/prasun12377/bcbsm_volume']
2026-02-24 05:43:56 | INFO     | Retention days   : 1
2026-02-24 05:43:56 | INFO     | File extensions  : ('.xlsx', '.xls')
2026-02-24 05:43:56 | INFO     | Dry run          : False
2026-02-24 05:43:56 | INFO     | Max retries      : 3
2026-02-24 05:43:56 | INFO     | Cutoff date (UTC): 2026-02-24 05:38:56
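
The parsing and retention arithmetic can be tried outside Databricks. This is a minimal sketch, assuming plain strings in place of `dbutils.widgets.get(...)`. The helper names `parse_csv` and `cutoff_epoch` and the sample input values are hypothetical and do not appear in the notebook.

```python
from datetime import datetime, timezone

SECONDS_PER_DAY = 86400


def parse_csv(raw: str, lower: bool = False) -> list:
    """Split a comma-separated string, dropping blanks and surrounding whitespace."""
    items = [part.strip() for part in raw.split(",") if part.strip()]
    return [item.lower() for item in items] if lower else items


def cutoff_epoch(now_epoch: float, retention_days: int) -> float:
    """Files modified before the returned timestamp are older than the retention period."""
    return now_epoch - retention_days * SECONDS_PER_DAY


# Illustrative inputs standing in for widget values
extensions = tuple(parse_csv(" .XLSX, .xls ,, ", lower=True))
paths = parse_csv("/Volumes/a/b/c, /Volumes/a/b/d")

# A fixed "now" keeps the example deterministic
now = datetime(2026, 2, 24, 5, 43, 56, tzinfo=timezone.utc).timestamp()
cutoff = cutoff_epoch(now, retention_days=1)
cutoff_date = datetime.fromtimestamp(cutoff, tz=timezone.utc)
print(extensions, paths, f"{cutoff_date:%Y-%m-%d %H:%M:%S}")
```

With a one-day retention the cutoff falls exactly 24 hours before the reference time, so only files last modified before that instant count as eligible.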


## 4. Helper Functions

In [0]:
def format_bytes(size_bytes: int) -> str:
    """Return a human-readable file-size string."""
    for unit in ("B", "KB", "MB", "GB", "TB"):
        if abs(size_bytes) < 1024:
            return f"{size_bytes:.1f} {unit}"
        size_bytes /= 1024
    return f"{size_bytes:.1f} PB"


def format_duration(seconds: float) -> str:
    """Return a human-readable duration string."""
    if seconds < 60:
        return f"{seconds:.1f}s"
    minutes, secs = divmod(seconds, 60)
    return f"{int(minutes)}m {secs:.1f}s"


def scan_files(base_path: str, extensions: tuple[str, ...]) -> list[str]:
    """Recursively find all files matching the target extensions under *base_path*."""
    matched = []

    def _log_walk_error(exc: OSError) -> None:
        # os.walk silently skips unreadable directories unless onerror is supplied
        logger.warning(f"Cannot scan {exc.filename}: {exc}")

    try:
        for root, _dirs, files in os.walk(base_path, onerror=_log_walk_error):
            for fname in files:
                if fname.lower().endswith(extensions):
                    matched.append(os.path.join(root, fname))
    except PermissionError as exc:
        logger.warning(f"Permission denied while scanning {base_path}: {exc}")
    except OSError as exc:
        logger.error(f"OS error scanning {base_path}: {exc}")
    return matched


def is_older_than_cutoff(file_path: str, cutoff_epoch: float) -> bool:
    """Return True if the file's modification time is before the cutoff."""
    return os.path.getmtime(file_path) < cutoff_epoch


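def delete_file_with_retry(
    file_path: str,
    dry_run: bool,
    max_retries: int,
) -> dict:
    """Delete a single file with retry logic. Returns a result dict.

    Makes up to *max_retries* attempts, retrying on OSError (which includes
    PermissionError) with exponential back-off (1s, 2s, 4s ...). If every
    attempt fails, the last error is recorded in the result status.
    """
    try:
        mtime = os.path.getmtime(file_path)
        size_bytes = os.path.getsize(file_path)
    except FileNotFoundError:
        # File vanished between scan and delete - report it as an idempotent skip
        logger.info(f"File already removed (idempotent skip): {file_path}")
        return {"file_path": file_path, "status": "already_removed", "attempts": 0}
    age_days = (time.time() - mtime) / 86400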

    result = {
        "file_path": file_path,
        "size_bytes": size_bytes,
        "age_days": round(age_days, 1),
        "modified_utc": datetime.fromtimestamp(mtime, tz=timezone.utc).strftime(
            "%Y-%m-%d %H:%M:%S"
        ),
        "status": "pending",
        "attempts": 0,
    }

    if dry_run:
        result["status"] = "dry_run_skipped"
        return result

    # Idempotency guard - file may have been removed between scan and delete
    if not os.path.exists(file_path):
        result["status"] = "already_removed"
        logger.info(f"File already removed (idempotent skip): {file_path}")
        return result

    last_exc = None
    for attempt in range(1, max_retries + 1):
        result["attempts"] = attempt
        try:
            os.remove(file_path)
            # Verify deletion
            if not os.path.exists(file_path):
                result["status"] = "deleted"
                return result
            else:
                logger.warning(
                    f"os.remove returned but file still exists: {file_path}"
                )
                result["status"] = "error: file still exists after os.remove"
                return result
        except FileNotFoundError:
            # Another process deleted it between our check and os.remove
            result["status"] = "already_removed"
            return result
        except OSError as exc:  # PermissionError is a subclass of OSError
            last_exc = exc
            if attempt < max_retries:
                backoff = 2 ** (attempt - 1)
                logger.warning(
                    f"Attempt {attempt}/{max_retries} failed for {file_path}: "
                    f"{exc} - retrying in {backoff}s"
                )
                time.sleep(backoff)

    result["status"] = f"error: {last_exc}"
    return result
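
The retry loop in `delete_file_with_retry` can be exercised without touching the filesystem. This sketch pulls the back-off pattern into a hypothetical `call_with_retry` helper and injects the sleep function so the example runs instantly. `FlakyAction` is an invented stand-in for a delete that fails transiently. Neither name exists in the notebook.

```python
import time
from typing import Callable


def call_with_retry(
    action: Callable[[], None],
    max_attempts: int,
    sleep: Callable[[float], None] = time.sleep,
) -> int:
    """Run *action*, retrying on OSError. Returns the number of attempts used."""
    if max_attempts < 1:
        raise ValueError("max_attempts must be >= 1")
    for attempt in range(1, max_attempts + 1):
        try:
            action()
            return attempt
        except OSError:
            if attempt == max_attempts:
                raise  # out of attempts: surface the last error
            sleep(2 ** (attempt - 1))  # 1s, 2s, 4s, ...
    raise AssertionError("unreachable")


class FlakyAction:
    """Hypothetical action that fails a fixed number of times, then succeeds."""

    def __init__(self, failures: int) -> None:
        self.remaining = failures

    def __call__(self) -> None:
        if self.remaining > 0:
            self.remaining -= 1
            raise OSError("simulated transient failure")


recorded_sleeps = []  # capture delays instead of actually sleeping
attempts_used = call_with_retry(
    FlakyAction(failures=2), max_attempts=3, sleep=recorded_sleeps.append
)
print(attempts_used, recorded_sleeps)
```

Passing `sleep` as a parameter is one way to keep a unit test fast. In production the default `time.sleep` applies.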

## 5. Scan and Delete Old Excel Files

In [0]:
RUN_START = time.time()

all_results: list[dict] = []
per_volume_stats: dict[str, dict] = {}  # per-path breakdown

global_summary = {
    "paths_configured": len(VOLUME_PATHS),
    "paths_scanned": 0,
    "paths_skipped": 0,
    "files_scanned": 0,
    "files_eligible": 0,
    "files_retained": 0,
    "files_deleted": 0,
    "files_already_removed": 0,
    "errors": 0,
    "total_bytes_freed": 0,
}

for vol_path in VOLUME_PATHS:
    logger.info(f"Scanning: {vol_path}")

    vol_stats = {
        "files_scanned": 0,
        "files_eligible": 0,
        "files_retained": 0,
        "files_deleted": 0,
        "errors": 0,
        "bytes_freed": 0,
    }

    if not os.path.isdir(vol_path):
        logger.warning(f"Path does not exist or is not a directory - skipping: {vol_path}")
        global_summary["paths_skipped"] += 1
        per_volume_stats[vol_path] = vol_stats
        continue

    global_summary["paths_scanned"] += 1
    excel_files = scan_files(vol_path, FILE_EXTENSIONS)
    vol_stats["files_scanned"] = len(excel_files)
    global_summary["files_scanned"] += len(excel_files)
    logger.info(f"  Found {len(excel_files)} matching file(s)")

    for fpath in excel_files:
        try:
            if not is_older_than_cutoff(fpath, CUTOFF_EPOCH):
                vol_stats["files_retained"] += 1
                global_summary["files_retained"] += 1
                continue
        except OSError as exc:
            logger.error(f"  Cannot stat file {fpath}: {exc}")
            vol_stats["errors"] += 1
            global_summary["errors"] += 1
            all_results.append({"file_path": fpath, "status": f"error: {exc}"})
            continue

        global_summary["files_eligible"] += 1
        vol_stats["files_eligible"] += 1

        result = delete_file_with_retry(fpath, DRY_RUN, MAX_RETRIES)
        all_results.append(result)

        if result["status"] == "deleted":
            vol_stats["files_deleted"] += 1
            vol_stats["bytes_freed"] += result["size_bytes"]
            global_summary["files_deleted"] += 1
            global_summary["total_bytes_freed"] += result["size_bytes"]
        elif result["status"] == "already_removed":
            global_summary["files_already_removed"] += 1
        elif result["status"].startswith("error"):
            vol_stats["errors"] += 1
            global_summary["errors"] += 1

        level = logging.WARNING if result["status"].startswith("error") else logging.INFO
        logger.log(
            level,
            f"  [{result['status'].upper()}] {result['file_path']} "
            f"| age={result.get('age_days', '?')}d "
            f"| size={format_bytes(result.get('size_bytes', 0))} "
            f"| modified={result.get('modified_utc', 'N/A')}"
            + (f" | attempts={result.get('attempts')}" if result.get("attempts", 0) > 1 else ""),
        )

    per_volume_stats[vol_path] = vol_stats
    logger.info(
        f"  Volume summary: scanned={vol_stats['files_scanned']} "
        f"eligible={vol_stats['files_eligible']} "
        f"deleted={vol_stats['files_deleted']} "
        f"retained={vol_stats['files_retained']} "
        f"errors={vol_stats['errors']} "
        f"freed={format_bytes(vol_stats['bytes_freed'])}"
    )

RUN_DURATION = time.time() - RUN_START

2026-02-24 05:43:57 | INFO     | Scanning: /Volumes/accenture/prasun12377/claims_checkpoint_dir
2026-02-24 05:43:57 | INFO     |   Found 1 matching file(s)
2026-02-24 05:43:57 | INFO     |   [DELETED] /Volumes/accenture/prasun12377/claims_checkpoint_dir/test2/Workforce-Transformation-AD (BOM) v1.xlsx | age=0.0d | size=33.7 KB | modified=2026-02-24 05:33:51
2026-02-24 05:43:57 | INFO     |   Volume summary: scanned=1 eligible=1 deleted=1 retained=0 errors=0 freed=33.7 KB
2026-02-24 05:43:57 | INFO     | Scanning: /Volumes/accenture/prasun12377/bcbsm_volume
2026-02-24 05:43:57 | INFO     |   Found 4 matching file(s)
2026-02-24 05:43:57 | INFO     |   [DELETED] /Volumes/accenture/prasun12377/bcbsm_volume/test1/DB_Comprehensive_Metrics_TPCC_new.xlsx | age=0.0d | size=148.7 KB | modified=2026-02-24 05:32:25
2026-02-24 05:43:57 | INFO     |   [DELETED] /Volumes/accenture/prasun12377/bcbsm_volume/test1/ReturnFileActiveMembers_18022026.xls | age=0.0d | size=4.5 KB | modified=2026-02-24 05:32:2
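
The scan-and-filter step can be rehearsed against a throwaway directory before the job is pointed at a real volume. This is a minimal sketch, assuming a local temporary directory in place of a `/Volumes/...` path. The helper `find_old_files` and the file names are hypothetical. File ages are set with `os.utime` rather than by waiting.

```python
import os
import tempfile
import time


def find_old_files(base_path: str, extensions: tuple, cutoff_epoch: float) -> list:
    """Return files under base_path that match extensions and predate the cutoff."""
    old = []
    for root, _dirs, files in os.walk(base_path):
        for fname in files:
            path = os.path.join(root, fname)
            if fname.lower().endswith(extensions) and os.path.getmtime(path) < cutoff_epoch:
                old.append(path)
    return old


with tempfile.TemporaryDirectory() as tmp:
    os.makedirs(os.path.join(tmp, "nested"))
    aged = ["old.xlsx", os.path.join("nested", "OLD.XLS"), "old.csv"]
    fresh = ["new.xlsx"]
    for name in aged + fresh:
        with open(os.path.join(tmp, name), "w") as fh:
            fh.write("x")

    # Back-date the "aged" files by two days (access time, modification time)
    two_days_ago = time.time() - 2 * 86400
    for name in aged:
        os.utime(os.path.join(tmp, name), (two_days_ago, two_days_ago))

    cutoff = time.time() - 86400  # one-day retention
    found = {
        os.path.relpath(p, tmp)
        for p in find_old_files(tmp, (".xlsx", ".xls"), cutoff)
    }

print(sorted(found))
```

Only the two back-dated Excel files are selected: the recent `.xlsx` is retained by age and the old `.csv` is excluded by extension.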

## 6. Summary Report

In [0]:
logger.info("")
logger.info("=" * 60)
logger.info("EXECUTION SUMMARY")
logger.info("=" * 60)
logger.info(f"Mode                 : {'DRY RUN' if DRY_RUN else 'LIVE DELETE'}")
logger.info(f"Run duration         : {format_duration(RUN_DURATION)}")
logger.info(f"Volume paths config  : {global_summary['paths_configured']}")
logger.info(f"Volume paths scanned : {global_summary['paths_scanned']}")
logger.info(f"Volume paths skipped : {global_summary['paths_skipped']}")
logger.info(f"Total files found    : {global_summary['files_scanned']}")
logger.info(f"Files retained (<{RETENTION_DAYS}d): {global_summary['files_retained']}")
logger.info(f"Eligible for delete  : {global_summary['files_eligible']} (older than {RETENTION_DAYS} days)")

if DRY_RUN:
    logger.info("Files deleted        : 0 (dry run - no files removed)")
else:
    logger.info(f"Files deleted        : {global_summary['files_deleted']}")
    logger.info(f"Already removed      : {global_summary['files_already_removed']}")
    logger.info(f"Space freed          : {format_bytes(global_summary['total_bytes_freed'])}")

logger.info(f"Errors               : {global_summary['errors']}")
logger.info("=" * 60)

# Per-volume breakdown
logger.info("")
logger.info("PER-VOLUME BREAKDOWN")
logger.info("-" * 60)
for vp, vs in per_volume_stats.items():
    logger.info(
        f"  {vp}: scanned={vs['files_scanned']} eligible={vs['files_eligible']} "
        f"deleted={vs['files_deleted']} retained={vs['files_retained']} "
        f"errors={vs['errors']} freed={format_bytes(vs['bytes_freed'])}"
    )
logger.info("-" * 60)

# Error details
if global_summary["errors"] > 0:
    logger.error("")
    logger.error("ERROR DETAILS:")
    for r in all_results:
        if r["status"].startswith("error"):
            logger.error(f"  {r['file_path']} -> {r['status']}")

if DRY_RUN and global_summary["files_eligible"] > 0:
    logger.info(
        f"\nNOTE: Set dry_run=false to actually delete the "
        f"{global_summary['files_eligible']} eligible file(s)."
    )

2026-02-24 05:43:58 | INFO     | 
2026-02-24 05:43:58 | INFO     | EXECUTION SUMMARY
2026-02-24 05:43:58 | INFO     | Mode                 : LIVE DELETE
2026-02-24 05:43:58 | INFO     | Run duration         : 1.0s
2026-02-24 05:43:58 | INFO     | Volume paths config  : 2
2026-02-24 05:43:58 | INFO     | Volume paths scanned : 2
2026-02-24 05:43:58 | INFO     | Volume paths skipped : 0
2026-02-24 05:43:58 | INFO     | Total files found    : 5
2026-02-24 05:43:58 | INFO     | Files retained (<1d): 0
2026-02-24 05:43:58 | INFO     | Eligible for delete  : 5 (older than 1 days)
2026-02-24 05:43:58 | INFO     | Files deleted        : 5
2026-02-24 05:43:58 | INFO     | Already removed      : 0
2026-02-24 05:43:58 | INFO     | Space freed          : 490.1 KB
2026-02-24 05:43:58 | INFO     | Errors               : 0
2026-02-24 05:43:58 | INFO     | 
2026-02-24 05:43:58 | INFO     | PER-VOLUME BREAKDOWN
2026-02-24 05:43:58 | INFO     | -----------------------------------------------------------

## 7. Exit - Raise on Errors to Fail the Databricks Job

In [0]:
# ---------------------------------------------------------------------------
# Exit strategy:
#   - On ANY errors  -> raise RuntimeError so the Databricks workflow marks
#     the run as FAILED, triggering built-in alerting.
#   - On success     -> exit cleanly with a status message.
# ---------------------------------------------------------------------------
mode_label = "DRY_RUN" if DRY_RUN else "LIVE"

exit_msg = (
    f"{mode_label}: "
    f"scanned={global_summary['files_scanned']} "
    f"eligible={global_summary['files_eligible']} "
    f"deleted={global_summary['files_deleted']} "
    f"errors={global_summary['errors']} "
    f"freed={format_bytes(global_summary['total_bytes_freed'])} "
    f"duration={format_duration(RUN_DURATION)}"
)

if global_summary["errors"] > 0:
    logger.error(f"Job will FAIL due to {global_summary['errors']} error(s)")
    # Do not call dbutils.notebook.exit() here: it ends the notebook run
    # normally, so the raise below would never execute and the job would
    # not be marked as FAILED.
    raise RuntimeError(
        f"Excel cleanup completed with {global_summary['errors']} error(s). "
        f"Details: {exit_msg}"
    )
else:
    logger.info(f"Job completed successfully: {exit_msg}")
    dbutils.notebook.exit(f"SUCCESS: {exit_msg}")
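
The exit decision can be tested on its own. This sketch puts it in a hypothetical `finish` helper that returns the status string on success and raises on any error. `dbutils` is left out so the code runs outside Databricks, and the summary values are invented for illustration.

```python
def finish(summary: dict, mode_label: str) -> str:
    """Return a success message, or raise RuntimeError if any errors were recorded."""
    msg = (
        f"{mode_label}: "
        f"scanned={summary['files_scanned']} "
        f"deleted={summary['files_deleted']} "
        f"errors={summary['errors']}"
    )
    if summary["errors"] > 0:
        # Raising is what makes the surrounding job run fail
        raise RuntimeError(
            f"Excel cleanup completed with {summary['errors']} error(s). Details: {msg}"
        )
    return f"SUCCESS: {msg}"


# Illustrative summaries, not taken from a real run
ok_status = finish({"files_scanned": 5, "files_deleted": 5, "errors": 0}, "LIVE")

try:
    finish({"files_scanned": 5, "files_deleted": 4, "errors": 1}, "LIVE")
    failed = False
    failure_text = ""
except RuntimeError as exc:
    failed = True
    failure_text = str(exc)

print(ok_status)
print(failed, failure_text)
```

In the notebook, the returned string would be handed to `dbutils.notebook.exit` only on the success path.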