# Backfill Demo: Orchestrator Notebook

This notebook orchestrates backfill operations with business day validation and comprehensive logging.

## What It Does:
1. Accepts `position_date` and `job_name` parameters via widgets
2. Validates if the date is a business day using the calendar table
3. Skips non-business days (weekends/holidays) with logging
4. Triggers the processing job for valid business dates
5. Monitors job execution until completion
6. Logs all operations (STARTED/SUCCESS/FAILED/SKIPPED) to the backfill log table
7. Supports retry tracking with JSON metadata for failed job retries

## Key Features:
- **Business Day Validation**: Queries calendar table to avoid processing holidays/weekends
- **Job Triggering**: Uses Databricks SDK to trigger and monitor jobs programmatically
- **Comprehensive Logging**: Tracks all operations with timestamps, status, and error details
- **Retry Support**: Accepts retry metadata from the retry notebook to track retry attempts
- **Backward Compatible**: Works whether or not the optional `creator_user_name`, `run_page_url`, and `retry_metadata` columns exist in the log table

## Usage:
- **Single Date**: Set `position_date` widget and run notebook
- **Date Range**: Call this notebook once per date in a loop (for example, via the Databricks SDK)
- **Retry Failed Jobs**: Called by `05_retry_failed_jobs.ipynb` with retry metadata

**Note:** This notebook is designed to be called programmatically or run interactively for testing.
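
For the date-range case, the loop could look roughly like the sketch below. It only generates the dates and hands each one to a caller-supplied function; `iter_dates`, `backfill_range`, and `run_one` are hypothetical names, not part of this notebook. Weekends and holidays are passed through on purpose, since the orchestrator skips them itself.

```python
from datetime import date, timedelta
from typing import Callable, Iterator


def iter_dates(start: date, end: date) -> Iterator[date]:
    """Yield every calendar date from start to end, inclusive."""
    current = start
    while current <= end:
        yield current
        current += timedelta(days=1)


def backfill_range(start: date, end: date, run_one: Callable[[str], str]) -> dict:
    """Call run_one once per date and collect its result keyed by date string.

    run_one is a hypothetical callable; in a notebook it could wrap whatever
    mechanism is used to launch this orchestrator for a single position_date.
    """
    results = {}
    for day in iter_dates(start, end):
        day_str = day.isoformat()  # YYYY-MM-DD, the format the widget expects
        results[day_str] = run_one(day_str)
    return results
```

Running the dates sequentially keeps the log easy to read; parallel submission is possible but would need its own concurrency limits.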

In [0]:
# Setup and Parameters
import sys
from pyspark.sql import functions as F

# Get current notebook path dynamically
notebook_path = dbutils.notebook.entry_point.getDbutils().notebook().getContext().notebookPath().get()
workspace_path = f"/Workspace{notebook_path}"
base_path = workspace_path.rsplit('/', 1)[0]

# Add to sys.path for importing config
sys.path.append(base_path)

from config import BACKFILL_LOG_TABLE, CALENDAR_TABLE
log_table = BACKFILL_LOG_TABLE
calendar_table = CALENDAR_TABLE

## Step 1: Configuration Setup

Import the log and calendar table names from the centralized `config.py`, which sits in the same folder as this notebook.
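
The contents of `config.py` are not shown in this notebook. A minimal version might look like the sketch below; only the two imported names are confirmed, while `CATALOG`, `SCHEMA`, and the values are assumptions inferred from the table names that appear in the cell outputs.

```python
# config.py (hypothetical contents; only BACKFILL_LOG_TABLE and CALENDAR_TABLE
# are confirmed by the import in the setup cell)
CATALOG = "demos"          # assumed from table names printed in cell outputs
SCHEMA = "backfill_demo"   # assumed from table names printed in cell outputs

BACKFILL_LOG_TABLE = f"{CATALOG}.{SCHEMA}.backfill_log"
CALENDAR_TABLE = f"{CATALOG}.{SCHEMA}.calendar"
```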

In [0]:
workspace_id = dbutils.entry_point.getDbutils().notebook().getContext().workspaceId().get()
display(workspace_id)

'5584109198115548'

## Step 2: Get Workspace Context

Retrieve the workspace ID, which is used to filter job metadata in the `system.lakeflow.jobs` table.

In [0]:
dbutils.widgets.text("position_date", "", "Position Date (YYYY-MM-DD)")
position_date_str = dbutils.widgets.get("position_date")

dbutils.widgets.text("job_name", "", "Job Name")
job_name = dbutils.widgets.get("job_name")

# Retry metadata as JSON string (NULL for original runs)
dbutils.widgets.text("retry_metadata", "", "Retry Metadata (JSON)")
retry_metadata_raw = dbutils.widgets.get("retry_metadata")
# Only use retry_metadata if it's a non-empty string
retry_metadata_json = retry_metadata_raw if retry_metadata_raw and retry_metadata_raw.strip() else None

display({"position_date": position_date_str, "job_name": job_name, "retry_metadata": retry_metadata_json})

{'position_date': '2025-01-03',
 'job_name': '02_process_data',
 'retry_metadata': None}

## Step 3: Widget Parameters

Accept input parameters for the backfill operation:
- **position_date**: The date to process (YYYY-MM-DD format)
- **job_name**: The name of the job to trigger (e.g., "02_process_data")
- **retry_metadata**: JSON metadata for retry tracking (optional, used by retry notebook)
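
The widget values arrive as raw strings, so it can help to validate them before any job is triggered. The sketch below is one way to do that; `parse_position_date` and `parse_retry_metadata` are hypothetical helpers, not functions defined in this notebook.

```python
import json
from datetime import date, datetime
from typing import Optional


def parse_position_date(value: str) -> date:
    """Parse a YYYY-MM-DD string, raising ValueError with a clear message otherwise."""
    try:
        return datetime.strptime(value.strip(), "%Y-%m-%d").date()
    except ValueError as exc:
        raise ValueError(f"position_date must be YYYY-MM-DD, got {value!r}") from exc


def parse_retry_metadata(raw: Optional[str]) -> Optional[dict]:
    """Return the decoded retry metadata, or None for an original (non-retry) run."""
    if raw is None or not raw.strip():
        return None
    metadata = json.loads(raw)  # raises json.JSONDecodeError (a ValueError) on bad JSON
    if not isinstance(metadata, dict):
        raise ValueError("retry_metadata must be a JSON object")
    return metadata
```

Failing early on a malformed date avoids writing a log row that later cannot be matched to a calendar entry.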

In [0]:
df = (spark.table("system.lakeflow.jobs")
        .filter(f"workspace_id = '{workspace_id}' and name = '{job_name}'")
        .orderBy("change_time", ascending=False)
        .limit(1))
display(df)

job_id = df.select("job_id").first()["job_id"]
job_id

account_id,workspace_id,job_id,name,creator_id,tags,run_as,change_time,delete_time,description
31ee97a6-24bd-4274-9ac4-00fbbbb94321,5584109198115548,743256224103762,02_process_data,75053393473744,"Map(app_name -> backfill_demo, environment -> dev)",75053393473744,2025-11-21T05:39:37.396Z,,


'743256224103762'

## Step 4: Lookup Job ID

Query the `system.lakeflow.jobs` table to find the job ID for the specified job name.
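
The lookup keeps the most recently changed row for the given workspace and job name. The same selection logic is sketched below in plain Python over a list of dictionaries, with an explicit error when nothing matches (the notebook cell would instead fail on `None` in that case). `latest_job_id` is a hypothetical helper, and the rows are assumed to carry the same column names as the system table.

```python
from typing import Iterable, Mapping


def latest_job_id(rows: Iterable[Mapping], workspace_id: str, job_name: str) -> str:
    """Return the job_id of the most recently changed row matching workspace and name."""
    matches = [
        row for row in rows
        if str(row["workspace_id"]) == str(workspace_id) and row["name"] == job_name
    ]
    if not matches:
        raise LookupError(f"No job named {job_name!r} in workspace {workspace_id}")
    # ISO-8601 UTC timestamps in a single format sort correctly as strings
    newest = max(matches, key=lambda row: row["change_time"])
    return str(newest["job_id"])
```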

In [0]:
from datetime import datetime

# Get the run ID of this orchestrator run (used as run_id in the log table)
context = dbutils.notebook.entry_point.getDbutils().notebook().getContext()
if context.jobId().isDefined():
    # Running as part of a job/workflow
    job_run_id = context.jobRunId().get()
    print(f"Downstream Job Run ID: {job_run_id}")
    print(f"Running Mode: Job/Workflow")
else:
    # Running on interactive cluster
    notebook_id = context.notebookId().get() if context.notebookId().isDefined() else "unknown_notebook"
    job_run_id = f"{notebook_id}"
    print(f"Downstream Job ID: {job_run_id}")
    print(f"Running Mode: Interactive Cluster")

Downstream Job ID: 774114517055437
Running Mode: Interactive Cluster


## Step 5: Get Orchestrator Context

Determine the run ID for this orchestration job (for logging purposes):
- If running as a Databricks Job/Workflow: Use jobRunId
- If running on interactive cluster: Use notebookId
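
The branching above can be summarized as a small pure function. This is only an illustration of the fallback order; `resolve_run_id` is a hypothetical name, and `None` stands in for an undefined context value.

```python
from typing import Optional, Tuple


def resolve_run_id(job_run_id: Optional[str], notebook_id: Optional[str]) -> Tuple[str, str]:
    """Return (run_id, running_mode), preferring the job run ID when one exists."""
    if job_run_id is not None:
        return str(job_run_id), "Job/Workflow"
    if notebook_id is not None:
        return str(notebook_id), "Interactive Cluster"
    return "unknown_notebook", "Interactive Cluster"
```

One consequence worth keeping in mind: in interactive mode every run of the same notebook shares one `run_id`, so as far as the MERGE in this notebook shows, repeated interactive runs update the same log row instead of inserting a new one.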

In [0]:
from pyspark.sql.types import StructType, StructField, StringType, DateType, TimestampType
from datetime import datetime
from delta.tables import DeltaTable



# Define schema once
log_schema = StructType([
    StructField("run_id", StringType(), False),
    StructField("position_date", DateType(), True),
    StructField("job_name", StringType(), True),
    StructField("job_id", StringType(), True),
    StructField("status", StringType(), True),
    StructField("start_time", TimestampType(), True),
    StructField("end_time", TimestampType(), True),
    StructField("backfill_job_id", StringType(), True),
    StructField("error_message", StringType(), True),
    StructField("creator_user_name", StringType(), True),
    StructField("run_page_url", StringType(), True),
    StructField("retry_metadata", StringType(), True)
])

class BackfillLogger:
    """
    Simple, intuitive logger for backfill operations.
    Usage:
        logger = BackfillLogger(run_id, position_date, job_name, job_id, retry_metadata_json=None)
        logger.start()
        logger.success()  # or logger.fail("error message")
    """
    
    def __init__(self, run_id, position_date, job_name, job_id, retry_metadata_json=None):
        self.run_id = str(run_id)  # Ensure string
        self.position_date = position_date
        self.job_name = str(job_name)  # Ensure string
        self.job_id = str(job_id)  # Ensure string
        self.start_time = datetime.now()
        self.retry_metadata_json = retry_metadata_json  # JSON string or None
        
    def _upsert(self, status, backfill_job_id=None, error_message=None, creator_user_name=None, run_page_url=None):
        """Internal method to upsert log record."""
        # Parse position_date if string
        if isinstance(self.position_date, str):
            pos_date = datetime.strptime(self.position_date, "%Y-%m-%d").date()
        else:
            pos_date = self.position_date
        
        # Auto-set end_time for terminal states
        end_time = datetime.now() if status in ['SUCCESS', 'FAILED', 'SKIPPED'] else None
        
        # Check which columns exist in the target table
        delta_table = DeltaTable.forName(spark, log_table)
        existing_columns = set([field.name for field in delta_table.toDF().schema.fields])
        
        # Create record with explicit types - ensure no None for required fields
        record = [{
            "run_id": self.run_id,
            "position_date": pos_date,
            "job_name": self.job_name,
            "job_id": self.job_id,
            "status": status,
            "start_time": self.start_time,
            "end_time": end_time,
            "backfill_job_id": backfill_job_id if backfill_job_id else "",
            "error_message": error_message if error_message else None,
            "creator_user_name": creator_user_name if creator_user_name else None,
            "run_page_url": run_page_url if run_page_url else None,
            "retry_metadata": self.retry_metadata_json
        }]
        
        # Create DataFrame with explicit schema
        source_df = spark.createDataFrame(record, schema=log_schema)
        
        # Debug: Show what we're trying to insert
        print(f"DEBUG: Upserting - run_id={self.run_id}, pos_date={pos_date}, job_name={self.job_name}, job_id={self.job_id}, status={status}")
        
        # Build update/insert dictionaries dynamically based on existing columns
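        update_set = {
            "status": "s.status",
            "end_time": "s.end_time",
            # Keep the stored ID when the source is empty, so fail()/skip() calls made
            # without an ID do not erase the one written by update_backfill_id()
            "backfill_job_id": "COALESCE(NULLIF(s.backfill_job_id, ''), t.backfill_job_id)",
            "error_message": "s.error_message"
        }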
        
        insert_values = {
            "run_id": "s.run_id",
            "position_date": "s.position_date",
            "job_name": "s.job_name",
            "job_id": "s.job_id",
            "status": "s.status",
            "start_time": "s.start_time",
            "end_time": "s.end_time",
            "backfill_job_id": "s.backfill_job_id",
            "error_message": "s.error_message"
        }
        
        # Add optional columns only if they exist
        if "creator_user_name" in existing_columns:
            update_set["creator_user_name"] = "s.creator_user_name"
            insert_values["creator_user_name"] = "s.creator_user_name"
        
        if "run_page_url" in existing_columns:
            update_set["run_page_url"] = "s.run_page_url"
            insert_values["run_page_url"] = "s.run_page_url"
        
        if "retry_metadata" in existing_columns:
            update_set["retry_metadata"] = "s.retry_metadata"
            insert_values["retry_metadata"] = "s.retry_metadata"
        
        # MERGE: Explicitly set all fields
        (delta_table.alias("t")
            .merge(source_df.alias("s"), "t.run_id = s.run_id")
            .whenMatchedUpdate(set=update_set)
            .whenNotMatchedInsert(values=insert_values)
            .execute()
        )
        
        return status
    
    def start(self):
        """Log the start of processing."""
        self._upsert("STARTED")
        print(f"▶ Started: {self.run_id}")
        return self
    
    def success(self, backfill_job_id=None, creator_user_name=None, run_page_url=None):
        """Log successful completion."""
        self._upsert("SUCCESS", backfill_job_id=backfill_job_id, creator_user_name=creator_user_name, run_page_url=run_page_url)
        print(f"✓ Success: {self.run_id}")
        return self
    
    def fail(self, error_message, backfill_job_id=None, creator_user_name=None, run_page_url=None):
        """Log failure with error message."""
        self._upsert("FAILED", backfill_job_id=backfill_job_id, error_message=error_message, creator_user_name=creator_user_name, run_page_url=run_page_url)
        print(f"✗ Failed: {self.run_id} - {error_message}")
        return self
    
    def skip(self, reason, backfill_job_id=None, creator_user_name=None, run_page_url=None):
        """Log skip with reason."""
        self._upsert("SKIPPED", backfill_job_id=backfill_job_id, error_message=reason, creator_user_name=creator_user_name, run_page_url=run_page_url)
        print(f"⊘ Skipped: {self.run_id} - {reason}")
        return self
    
    def update_backfill_id(self, backfill_job_id):
        """Update only the backfill job ID."""
        spark.sql(f"""
            UPDATE {log_table}
            SET backfill_job_id = '{backfill_job_id}'
            WHERE run_id = '{self.run_id}'
        """)
        print(f"↻ Updated backfill_job_id: {backfill_job_id}")
        return self

print("✓ BackfillLogger class defined")

✓ BackfillLogger class defined


## Step 6: BackfillLogger Class

This class provides a simple, intuitive interface for logging backfill operations to the log table.

**Features:**
- Logs STARTED/SUCCESS/FAILED/SKIPPED status with timestamps
- Captures job metadata (run ID, creator, job URL, error messages)
- Supports retry metadata tracking (JSON column)
- Backward compatible (works with or without the `creator_user_name`, `run_page_url`, and `retry_metadata` columns)
- Uses Delta MERGE for upsert operations (handles reruns gracefully)

**Usage:**
```python
logger = BackfillLogger(run_id, position_date, job_name, job_id, retry_metadata_json=None)
logger.start()  # Log start
logger.success()  # Or logger.fail("error") or logger.skip("reason")
```
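
To make the upsert behavior concrete, here is a simplified in-memory analogue keyed on `run_id`. It is a sketch, not the Delta MERGE itself: `upsert_log` is a hypothetical function, the table is a plain dictionary, and only the fields passed in are overwritten on a match.

```python
from datetime import datetime

TERMINAL_STATUSES = {"SUCCESS", "FAILED", "SKIPPED"}


def upsert_log(table: dict, run_id: str, status: str, now: datetime, **fields) -> dict:
    """Insert a new row for run_id, or update the existing one in place."""
    # end_time is set only for terminal states, as in the logger
    end_time = now if status in TERMINAL_STATUSES else None
    if run_id in table:
        # Matched: update status and result fields; start_time is left untouched
        table[run_id].update(status=status, end_time=end_time, **fields)
    else:
        # Not matched: insert a full row
        table[run_id] = {"run_id": run_id, "status": status,
                         "start_time": now, "end_time": end_time, **fields}
    return table[run_id]
```

Calling it first with `STARTED` and then with `SUCCESS` for the same `run_id` leaves a single row whose `start_time` comes from the first call and whose `end_time` comes from the second.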

In [0]:
# Add new columns to existing table (run this once to update schema)
try:
    # Try to add all columns at once
    spark.sql(f"ALTER TABLE {log_table} ADD COLUMNS (creator_user_name STRING, run_page_url STRING, retry_metadata STRING COMMENT 'JSON: {{is_retry, original_run_id, retry_count, retry_triggered_by}}')")
    print(f"✓ Added new columns to {log_table}")
except Exception as e:
    if "already exists" in str(e).lower():
        print(f"✓ Columns already exist in {log_table}")
    else:
        # Try adding columns individually
        for col_name, col_def in [
            ("creator_user_name", "STRING"),
            ("run_page_url", "STRING"),
            ("retry_metadata", "STRING COMMENT 'JSON: {is_retry, original_run_id, retry_count, retry_triggered_by}'")
        ]:
            try:
                spark.sql(f"ALTER TABLE {log_table} ADD COLUMNS ({col_name} {col_def})")
                print(f"  ✓ Added column: {col_name}")
            except Exception as col_error:
                if "already exists" in str(col_error).lower():
                    print(f"  ✓ Column exists: {col_name}")
                else:
                    print(f"  Note: {col_name} - {col_error}")

✓ Columns already exist in demos.backfill_demo.backfill_log


## Step 7: Schema Migration (Run Once)

This cell adds new columns to the backfill_log table if they don't exist:
- `creator_user_name`: User who triggered the job
- `run_page_url`: URL to the job run in Databricks UI
- `retry_metadata`: JSON column for tracking retry information

**Note:** This cell is idempotent: it is safe to run multiple times, and columns that already exist are left unchanged.
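
An alternative to catching "already exists" errors is to compare the table's current columns with the desired ones and build the statement only for what is missing. The sketch below shows that idea; `build_add_columns_sql` is a hypothetical helper, and the list of existing columns is assumed to be supplied by the caller.

```python
from typing import Dict, Iterable, Optional


def build_add_columns_sql(table: str, existing: Iterable[str], wanted: Dict[str, str]) -> Optional[str]:
    """Return an ALTER TABLE statement for the columns not yet present, or None."""
    present = {name.lower() for name in existing}
    missing = [f"{name} {ddl}" for name, ddl in wanted.items() if name.lower() not in present]
    if not missing:
        return None  # nothing to migrate
    return f"ALTER TABLE {table} ADD COLUMNS ({', '.join(missing)})"
```

In a notebook, `existing` could come from `spark.table(log_table).columns`, and a non-`None` result would be passed to `spark.sql`.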

In [0]:
# Create logger and start logging
logger = BackfillLogger(
    run_id=job_run_id,
    position_date=position_date_str,
    job_name=job_name,
    job_id=job_id,
    retry_metadata_json=retry_metadata_json
).start()


DEBUG: Upserting - run_id=774114517055437, pos_date=2025-01-03, job_name=02_process_data, job_id=743256224103762, status=STARTED
▶ Started: 774114517055437


## Step 8: Initialize Logger

Create the BackfillLogger instance with the run context and start logging.

In [0]:
# Import config to get calendar table
import sys
notebook_path = dbutils.notebook.entry_point.getDbutils().notebook().getContext().notebookPath().get()
base_path = f"/Workspace{notebook_path}".rsplit('/', 1)[0]
sys.path.append(base_path)
from config import CALENDAR_TABLE

# Check if position date is a business day
query = f"""
    SELECT COUNT(1) AS is_business_day
    FROM {CALENDAR_TABLE}
    WHERE us_business_or_holiday_flag = 'B'
      AND calendar_date = '{position_date_str}'
"""
df = spark.sql(query)
is_business_day = df.first()["is_business_day"] > 0
display({"position_date": position_date_str, "is_business_day": is_business_day, "calendar_table": CALENDAR_TABLE})

if not is_business_day:
    # Log the skip, then exit so the processing job is never triggered
    logger.skip("Not a business day")
    dbutils.notebook.exit("Notebook exited: Not a business day")

{'position_date': '2025-01-03',
 'is_business_day': True,
 'calendar_table': 'demos.backfill_demo.calendar'}

## Step 9: Business Day Validation

Query the calendar table to check if the position_date is a business day:
- **Business Day (B)**: Proceed with processing
- **Holiday/Weekend (H)**: Skip with logging and exit notebook

This prevents unnecessary job runs for non-business dates.
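
The check reduces to "is there a row for this date flagged `B`?". A small in-memory analogue is sketched below; `is_business_day` and the sample calendar are illustrative assumptions, with the `H` flag taken from the description above.

```python
from typing import Mapping

BUSINESS_FLAG = "B"  # flag value used in the notebook's calendar query


def is_business_day(calendar: Mapping[str, str], position_date: str) -> bool:
    """True only when the date is present in the calendar and flagged as a business day."""
    return calendar.get(position_date) == BUSINESS_FLAG


# Illustrative calendar: a holiday, a Friday, and a Saturday
sample_calendar = {"2025-01-01": "H", "2025-01-03": "B", "2025-01-04": "H"}
```

Note that a date missing from the calendar is treated the same as a non-business day, which mirrors the `COUNT(1) > 0` test in the query: gaps in the calendar table lead to skipped runs, not errors.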

In [0]:
from databricks.sdk import WorkspaceClient
import time

w = WorkspaceClient()

try:
    # Trigger the job with the position_date from widget
    print(f"Triggering job {job_id} for position_date: {position_date_str}")
    response = w.jobs.run_now(job_id=job_id, notebook_params={"position_date": position_date_str})
    triggered_run_id = response.run_id
    
    print(f"Job triggered successfully. Run ID: {triggered_run_id}")
    
    # Update logger with the triggered run ID
    logger.update_backfill_id(str(triggered_run_id))
    
    # Poll for job completion
    print("Monitoring job execution...")
    while True:
        run_status = w.jobs.get_run(run_id=triggered_run_id)
        life_cycle_state = str(run_status.state.life_cycle_state)
        result_state = str(run_status.state.result_state) if run_status.state.result_state else "PENDING"
        
        print(f"  Status: {life_cycle_state} | Result: {result_state}")
        
        # Check if job is in terminal state
        if "TERMINATED" in life_cycle_state or "SKIPPED" in life_cycle_state or "INTERNAL_ERROR" in life_cycle_state:
            print(f"\nJob completed with state: {life_cycle_state}, result: {result_state}")
            
            # Extract creator and run page URL
            creator_user = run_status.creator_user_name if hasattr(run_status, 'creator_user_name') else None
            run_url = run_status.run_page_url if hasattr(run_status, 'run_page_url') else None
            
            # Update logger based on result
            if "SUCCESS" in result_state:
                logger.success(
                    backfill_job_id=str(triggered_run_id),
                    creator_user_name=creator_user,
                    run_page_url=run_url
                )
            else:
                error_msg = run_status.state.state_message if run_status.state.state_message else f"Job failed with state: {life_cycle_state}, result: {result_state}"
                logger.fail(
                    error_msg,
                    backfill_job_id=str(triggered_run_id),
                    creator_user_name=creator_user,
                    run_page_url=run_url
                )
            
            break
        
        time.sleep(10)
    
    # display(run_status.as_dict())

except Exception as e:
    error_message = f"Failed to trigger or monitor job: {str(e)}"
    print(f"✗ {error_message}")
    logger.fail(error_message)
    raise

Triggering job 743256224103762 for position_date: 2025-01-03
Job triggered successfully. Run ID: 923018509174690
↻ Updated backfill_job_id: 923018509174690
Monitoring job execution...
  Status: RunLifeCycleState.RUNNING | Result: PENDING
  Status: RunLifeCycleState.RUNNING | Result: PENDING
  Status: RunLifeCycleState.RUNNING | Result: PENDING
  Status: RunLifeCycleState.RUNNING | Result: PENDING
  Status: RunLifeCycleState.TERMINATED | Result: RunResultState.SUCCESS

Job completed with state: RunLifeCycleState.TERMINATED, result: RunResultState.SUCCESS
DEBUG: Upserting - run_id=774114517055437, pos_date=2025-01-03, job_name=02_process_data, job_id=743256224103762, status=SUCCESS
✓ Success: 774114517055437


## Step 10: Trigger and Monitor Job

This cell performs the core orchestration:

1. **Trigger Job**: Uses Databricks SDK to trigger the processing job with position_date parameter
2. **Monitor Execution**: Polls job status every 10 seconds until completion
3. **Capture Results**: Extracts creator, run URL, and error messages
4. **Update Logger**: Logs final status (SUCCESS or FAILED) with all metadata

**Job States:**
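- **PENDING/RUNNING**: Job is still executing; polling continues
- **TERMINATED + SUCCESS**: Job completed successfully; logged as SUCCESS
- **TERMINATED + FAILED**: Job encountered an error; logged as FAILED
- **SKIPPED**: Job run was skipped; logged as FAILED because there is no SUCCESS result
- **INTERNAL_ERROR**: Databricks internal error; logged as FAILED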
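
The state handling can be separated from the SDK calls, which also makes it testable without a workspace. The sketch below mirrors the substring checks used in the cell and adds a timeout, which the original loop does not have; `classify_run`, `wait_for_run`, `get_state`, and the default timeout are assumptions for illustration.

```python
import time
from typing import Callable, Optional, Tuple

TERMINAL_LIFE_CYCLE_STATES = ("TERMINATED", "SKIPPED", "INTERNAL_ERROR")


def classify_run(life_cycle_state: str, result_state: Optional[str]) -> str:
    """Map a (life cycle, result) pair to RUNNING, SUCCESS, or FAILED."""
    if not any(state in life_cycle_state for state in TERMINAL_LIFE_CYCLE_STATES):
        return "RUNNING"
    return "SUCCESS" if result_state and "SUCCESS" in result_state else "FAILED"


def wait_for_run(get_state: Callable[[], Tuple[str, Optional[str]]],
                 poll_seconds: float = 10.0,
                 timeout_seconds: float = 3600.0,
                 sleep: Callable[[float], None] = time.sleep) -> str:
    """Poll get_state until the run is terminal; raise TimeoutError past the deadline."""
    waited = 0.0
    while True:
        outcome = classify_run(*get_state())
        if outcome != "RUNNING":
            return outcome
        if waited >= timeout_seconds:
            raise TimeoutError(f"Run still active after {waited:.0f}s")
        sleep(poll_seconds)
        waited += poll_seconds
```

Here `get_state` would wrap the status call and return the two state strings; injecting `sleep` lets a test run the loop instantly.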