# Building Enrichment Pipeline - Job Creator & Runner

This notebook orchestrates the entire building enrichment pipeline:
1. Auto-installs required packages
2. Creates ISO3-based folder structure
3. Copies input files to the correct locations
4. Generates job configuration dynamically
5. Creates and submits Databricks job
6. Monitors job progress in real-time
7. Exports final outputs

**Prerequisites:**
- Edit `job_config.yaml` with your settings
- Ensure the input files exist at the paths specified in `job_config.yaml`
- Run this notebook on a cluster with Databricks Runtime 13.3 LTS or later

**No CLI required** — everything runs via the Databricks SDK and workspace APIs.

## Step 1: Install Notebook Dependencies

In [None]:
# Install packages needed for THIS notebook (not the job itself).
# Only databricks-sdk and pyyaml are needed here; the pipeline tasks receive
# their libraries from requirements.txt when the job is created further below.
print("üì¶ Checking notebook dependencies...")

try:
    # Probe imports: if either module is missing, fall through to the install branch.
    import databricks.sdk
    import yaml
    print("‚úÖ All notebook dependencies available")
except ImportError:
    print("‚öôÔ∏è  Installing missing packages...")
    # IPython/Databricks line magic (not plain Python syntax).
    # NOTE(review): Databricks may require %pip at the start of a cell and may
    # restart Python on its own after %pip on some runtimes — confirm on the
    # target runtime.
    %pip install databricks-sdk pyyaml --quiet
    print("‚úÖ Packages installed successfully")
    print("üîÑ Restarting Python kernel...")
    # Restart the Python process so the newly installed packages are importable.
    # Notebook variables are cleared, so the following cells must be run after this.
    dbutils.library.restartPython()

## Step 2: Import Libraries & Initialize

In [None]:
import os
import json
import time
import shutil
from pathlib import Path
from datetime import datetime
import yaml

from databricks.sdk import WorkspaceClient
from databricks.sdk.service.jobs import (
    Task, NotebookTask, Source, JobCluster, ClusterSpec,
    JobSettings, TaskDependency
)
# AutoScale and RuntimeEngine are defined in the compute service module, not in
# jobs; importing them from databricks.sdk.service.jobs raises ImportError and
# aborts this cell. The names stay bound for any code that expects them.
from databricks.sdk.service.compute import AutoScale, RuntimeEngine
from pyspark.sql import SparkSession

# Initialize Spark (returns the notebook's active session if one exists)
spark = SparkSession.builder.getOrCreate()

# Initialize Databricks Workspace Client. With no arguments the SDK resolves
# credentials from its environment (inside a notebook: the notebook context).
w = WorkspaceClient()

print("‚úÖ Libraries imported successfully")

## Step 3: Load Configuration

In [None]:
# Locate the folder containing this notebook; job_config.yaml is expected to
# sit next to it.
notebook_path = dbutils.notebook.entry_point.getDbutils().notebook().getContext().notebookPath().get()
notebook_dir = str(Path(notebook_path).parent)

# The notebook context path is usually workspace-relative (e.g. /Users/...),
# while the driver sees workspace files under /Workspace. Add the prefix only
# when it is missing so an already-prefixed path is not doubled.
if notebook_dir == '/Workspace' or notebook_dir.startswith('/Workspace/'):
    config_dir = notebook_dir
else:
    config_dir = f"/Workspace{notebook_dir}"

# Load job configuration
config_path = f"{config_dir}/job_config.yaml"
print(f"üìÑ Loading config from: {config_path}")

with open(config_path, 'r', encoding='utf-8') as f:
    # safe_load: treat the YAML as plain data (no arbitrary object construction).
    config = yaml.safe_load(f)

# Fail fast with one clear message instead of a bare KeyError in a later cell.
if not isinstance(config, dict):
    raise ValueError(f"{config_path} must contain a YAML mapping at the top level")

_databricks_cfg = config.get('databricks') or {}
_inputs_cfg = config.get('inputs') or {}
_missing_keys = [] if config.get('iso3') else ['iso3']
_missing_keys += [
    f"databricks.{key}"
    for key in ('catalog', 'schema', 'workspace_base', 'volume_base')
    if not _databricks_cfg.get(key)
]
_missing_keys += [
    f"inputs.{key}"
    for key in ('tsi_csv', 'proportions_csv')
    if not _inputs_cfg.get(key)
]
if _missing_keys:
    raise KeyError(f"Missing required keys in {config_path}: {', '.join(_missing_keys)}")

# Extract key values
ISO3 = config['iso3']
CATALOG = config['databricks']['catalog']
SCHEMA = config['databricks']['schema']
WORKSPACE_BASE = config['databricks']['workspace_base']
VOLUME_BASE = config['databricks']['volume_base']

print(f"‚úÖ Configuration loaded for country: {ISO3}")
print(f"   Catalog: {CATALOG}")
print(f"   Schema: {SCHEMA}")

## Step 4: Auto-Detect or Get Cluster ID

In [None]:
# Choose the cluster the job tasks will run on: an explicit `cluster_id` from
# job_config.yaml wins; otherwise use the cluster this notebook is attached to,
# read from the Spark cluster usage tags.
configured_cluster = config['databricks'].get('cluster_id')
if not configured_cluster:
    CLUSTER_ID = spark.conf.get("spark.databricks.clusterUsageTags.clusterId")
    print(f"üîç Auto-detected current cluster ID: {CLUSTER_ID}")
else:
    CLUSTER_ID = configured_cluster
    print(f"üìå Using specified cluster ID: {CLUSTER_ID}")

# Look the cluster up for display only; failing to fetch details is not fatal
# and the chosen ID is used regardless.
try:
    cluster_info = w.clusters.get(cluster_id=CLUSTER_ID)
except Exception as e:
    print(f"‚ö†Ô∏è  Warning: Could not fetch cluster details: {e}")
    print(f"   Will still use cluster ID: {CLUSTER_ID}")
else:
    print(f"‚úÖ Cluster found: {cluster_info.cluster_name}")
    print(f"   Runtime: {cluster_info.spark_version}")
    print(f"   State: {cluster_info.state}")

## Step 5: Create Folder Structure with ISO3 First

In [None]:
print(f"üìÅ Creating folder structure for {ISO3}...")

# Define folder structure: {ISO3}/input/, {ISO3}/output/, {ISO3}/logs/
BASE_DATA_DIR = f"{VOLUME_BASE}/{ISO3}"
INPUT_DIR = f"{BASE_DATA_DIR}/input"
TILES_DIR = f"{INPUT_DIR}/tiles"
OUTPUT_DIR = f"{BASE_DATA_DIR}/output"
LOGS_DIR = f"{BASE_DATA_DIR}/logs"

# Create directories
dbutils.fs.mkdirs(BASE_DATA_DIR)
dbutils.fs.mkdirs(INPUT_DIR)
dbutils.fs.mkdirs(TILES_DIR)
dbutils.fs.mkdirs(OUTPUT_DIR)
dbutils.fs.mkdirs(LOGS_DIR)

print(f"‚úÖ Folder structure created:")
print(f"   Base: {BASE_DATA_DIR}")
print(f"   Input: {INPUT_DIR}")
print(f"   Tiles: {TILES_DIR}")
print(f"   Output: {OUTPUT_DIR}")
print(f"   Logs: {LOGS_DIR}")

## Step 6: Copy Input Files

In [None]:
print("üìÇ Copying input files...")

# Copy tile shapefile from git data/ folder to {ISO3}/input/tiles/
TILES_SOURCE = f"{WORKSPACE_BASE}/ghsl2_0_mwd_l1_tile_schema_land.gpkg"
TILES_DEST = f"{TILES_DIR}/ghsl2_0_mwd_l1_tile_schema_land.gpkg"

print(f"   Copying tiles: {TILES_SOURCE} -> {TILES_DEST}")
dbutils.fs.cp(TILES_SOURCE.replace('/Workspace', 'file:/Workspace'), TILES_DEST, recurse=True)
print(f"   ‚úÖ Tiles copied")

# Copy TSI CSV
tsi_source = config['inputs']['tsi_csv']
tsi_dest = f"{INPUT_DIR}/tsi.csv"
print(f"   Copying TSI: {tsi_source} -> {tsi_dest}")
dbutils.fs.cp(tsi_source, tsi_dest, recurse=True)
print(f"   ‚úÖ TSI copied")

# Copy Proportions CSV
proportions_source = config['inputs']['proportions_csv']
proportions_dest = f"{INPUT_DIR}/proportions.csv"
print(f"   Copying Proportions: {proportions_source} -> {proportions_dest}")
dbutils.fs.cp(proportions_source, proportions_dest, recurse=True)
print(f"   ‚úÖ Proportions copied")

# Copy World boundaries if provided
if config['inputs'].get('world_boundaries'):
    world_source = config['inputs']['world_boundaries']
    world_dest = f"{INPUT_DIR}/world_boundaries.gpkg"
    print(f"   Copying World boundaries: {world_source} -> {world_dest}")
    dbutils.fs.cp(world_source, world_dest, recurse=True)
    print(f"   ‚úÖ World boundaries copied")

print("‚úÖ All input files copied successfully")

## Step 7: Generate Config with ISO3 Suffix in Table Names

In [None]:
print("‚öôÔ∏è  Generating pipeline configuration...")


def table_name(base_name):
    """Return the fully qualified table name ``{CATALOG}.{SCHEMA}.{base_name}_{ISO3}``.

    Every Delta table of the pipeline carries the ISO3 suffix, so runs for
    different countries write to separate tables in the same schema.
    """
    return f"{CATALOG}.{SCHEMA}.{base_name}_{ISO3}"


# Optional tuning knobs from job_config.yaml. A missing or empty (`params:`
# with no value) section falls back to the defaults used below.
params = config.get('params') or {}

# Mirror the copy step: when world boundaries are configured they are copied to
# INPUT_DIR/world_boundaries.gpkg, and the tasks read that copy (the same
# convention as the TSI / proportions CSVs). Without them, pass an empty string.
if config['inputs'].get('world_boundaries'):
    admin_path = f"{INPUT_DIR}/world_boundaries.gpkg"
else:
    admin_path = ''

# Create full pipeline config with ISO3 suffix in ALL table names
pipeline_config = {
    # Core settings
    "catalog": CATALOG,
    "schema": SCHEMA,
    "iso3": ISO3,

    # Input paths (the copies made in the input folder)
    "proportions_csv_path": proportions_dest,
    "tsi_csv_path": tsi_dest,
    "admin_path": admin_path,
    "tile_footprint_path": TILES_DEST,

    # Delta tables with ISO3 suffix (several keys are aliases for the same table)
    "proportions_path": table_name("building_enrichment_proportions_input"),
    "proportions_table": table_name("building_enrichment_proportions_input"),
    "tsi_table": table_name("building_enrichment_tsi_input"),
    "delta_table_base": table_name("grid_centroids"),
    "grid_source": table_name("grid_centroids"),
    "grid_count_table": table_name("grid_counts"),
    "counts_delta_table": table_name("grid_counts"),
    "output_table": table_name("building_enrichment_output"),
    "download_status_table": table_name("download_status"),

    # Output directories
    "output_dir": OUTPUT_DIR,
    "grid_output_csv": f"{OUTPUT_DIR}/grid_centroids.csv",
    "tiles_dest_root": f"{INPUT_DIR}/tiles",
    "built_root": f"{INPUT_DIR}/tiles/built_c",
    "smod_root": f"{INPUT_DIR}/tiles/smod",

    # Admin boundaries
    "admin_field": "ISO3",
    "admin_value": ISO3,

    # Tile footprints
    "tile_id_field": "tile_id",

    # Processing parameters (overridable via the `params:` section)
    "cell_size": params.get('cell_size', 2000),
    "export_crs": "EPSG:4326",
    "target_crs": "ESRI:54009",
    "datasets": "built_c,smod",
    "download_concurrency": params.get('download_concurrency', 3),
    "download_retries": 2,
    "use_smod": True,
    "use_boundary_mask": True,
    "include_nodata": True,
    "add_percentages": False,
    "chunk_size": 10000,
    "max_workers": params.get('max_workers', 8),
    "tile_parallelism": str(params.get('tile_parallelism', 4)),
    "SAMPLE_SIZE": 10000,
    "stage_to_local": True,
    "local_dir": "/local_disk0/raster_cache",
    "spark_tmp_dir": "/tmp/job3_grid_tmp",

    # Flags
    "dry_run": False,
    "preview": True,
    "preview_rows": 5,
    "overwrite_schema": True,
    "write_mode": "overwrite",
    "csv_infer_schema": True,
    "save_temp_csv": False,
    "save_per_tile": False
}

# Save config as JSON for tasks to use. The tasks receive config_json_path as
# given; only the local open() below needs a filesystem path.
config_json_path = f"{BASE_DATA_DIR}/config.json"
# Python's open() cannot address a dbfs: URI, so rewrite a leading "dbfs:" to
# the /dbfs mount. Any other path (e.g. /Volumes/...) is opened unchanged.
if config_json_path.startswith('dbfs:'):
    config_json_local = '/dbfs' + config_json_path[len('dbfs:'):]
else:
    config_json_local = config_json_path

with open(config_json_local, 'w', encoding='utf-8') as f:
    json.dump(pipeline_config, f, indent=2)

print(f"‚úÖ Configuration saved to: {config_json_path}")
print(f"   All table names will include suffix: _{ISO3}")
print(f"   Output table: {pipeline_config['output_table']}")

## Step 8: Create Databricks Job

In [None]:
print("üî® Creating Databricks job...")

# Define job name with ISO3
job_name = config.get('job', {}).get('name', 'Building_Enrichment_{ISO3}').replace('{ISO3}', ISO3)

# Requirements.txt path
requirements_path = f"{WORKSPACE_BASE}/requirements.txt"

# Define tasks
tasks = [
    Task(
        task_key="task1_proportions_to_delta",
        existing_cluster_id=CLUSTER_ID,
        python_wheel_task=None,
        spark_python_task={
            "python_file": f"{WORKSPACE_BASE}/task1_proportions_to_delta.py",
            "parameters": ["--config_path", config_json_path]
        },
        libraries=[
            {"requirements": requirements_path}
        ]
    ),
    Task(
        task_key="task2_grid_generation",
        depends_on=[TaskDependency(task_key="task1_proportions_to_delta")],
        existing_cluster_id=CLUSTER_ID,
        spark_python_task={
            "python_file": f"{WORKSPACE_BASE}/task2_grid_generation.py",
            "parameters": ["--config_path", config_json_path]
        },
        libraries=[
            {"requirements": requirements_path}
        ]
    ),
    Task(
        task_key="task3_tile_downloader",
        depends_on=[TaskDependency(task_key="task2_grid_generation")],
        existing_cluster_id=CLUSTER_ID,
        spark_python_task={
            "python_file": f"{WORKSPACE_BASE}/task3_tile_downloader.py",
            "parameters": ["--config_path", config_json_path]
        },
        libraries=[
            {"requirements": requirements_path}
        ]
    ),
    Task(
        task_key="task4_raster_stats",
        depends_on=[TaskDependency(task_key="task3_tile_downloader")],
        existing_cluster_id=CLUSTER_ID,
        spark_python_task={
            "python_file": f"{WORKSPACE_BASE}/task4_raster_stats.py",
            "parameters": ["--config_path", config_json_path]
        },
        libraries=[
            {"requirements": requirements_path}
        ]
    ),
    Task(
        task_key="task5_post_processing",
        depends_on=[TaskDependency(task_key="task4_raster_stats")],
        existing_cluster_id=CLUSTER_ID,
        spark_python_task={
            "python_file": f"{WORKSPACE_BASE}/task5_post_processing.py",
            "parameters": ["--config_path", config_json_path]
        },
        libraries=[
            {"requirements": requirements_path}
        ]
    ),
    Task(
        task_key="task6_create_views",
        depends_on=[TaskDependency(task_key="task5_post_processing")],
        existing_cluster_id=CLUSTER_ID,
        spark_python_task={
            "python_file": f"{WORKSPACE_BASE}/task6_create_views.py",
            "parameters": ["--config_path", config_json_path]
        },
        libraries=[
            {"requirements": requirements_path}
        ]
    ),
    Task(
        task_key="task7_export",
        depends_on=[TaskDependency(task_key="task6_create_views")],
        existing_cluster_id=CLUSTER_ID,
        spark_python_task={
            "python_file": f"{WORKSPACE_BASE}/task7_export.py",
            "parameters": ["--config_path", config_json_path, "--iso3", ISO3]
        },
        libraries=[
            {"requirements": requirements_path}
        ]
    )
]

# Create job
job = w.jobs.create(
    name=job_name,
    tasks=tasks,
    max_concurrent_runs=config.get('job', {}).get('max_concurrent_runs', 1),
    timeout_seconds=config.get('job', {}).get('timeout_seconds', 0),
    email_notifications={
        "on_success": [config.get('job', {}).get('email_notifications', '')],
        "on_failure": [config.get('job', {}).get('email_notifications', '')]
    } if config.get('job', {}).get('email_notifications') else None
)

JOB_ID = job.job_id
print(f"‚úÖ Job created successfully!")
print(f"   Job ID: {JOB_ID}")
print(f"   Job Name: {job_name}")
print(f"   Tasks: {len(tasks)}")

## Step 9: Run Job & Monitor Progress

In [None]:
print(f"üöÄ Starting job {JOB_ID}...")

# Run the job
run = w.jobs.run_now(job_id=JOB_ID)
RUN_ID = run.run_id

print(f"‚úÖ Job started!")
print(f"   Run ID: {RUN_ID}")
print(f"")
print(f"‚è≥ Monitoring job progress...")
print(f"   (This will update every 30 seconds until completion)")
print(f"")

# Monitor job progress
start_time = time.time()
last_state = None
last_task_status = {}

while True:
    run_info = w.jobs.get_run(run_id=RUN_ID)
    state = run_info.state
    life_cycle_state = state.life_cycle_state.value
    
    # Print state changes
    if life_cycle_state != last_state:
        elapsed = int(time.time() - start_time)
        print(f"[{elapsed}s] Job status: {life_cycle_state}")
        last_state = life_cycle_state
    
    # Print task progress
    if run_info.tasks:
        for task in run_info.tasks:
            task_key = task.task_key
            task_state = task.state.life_cycle_state.value if task.state else "PENDING"
            
            if task_key not in last_task_status or last_task_status[task_key] != task_state:
                elapsed = int(time.time() - start_time)
                status_icon = "‚è≥" if task_state == "RUNNING" else "‚úÖ" if task_state == "TERMINATED" else "‚è∏Ô∏è"
                print(f"[{elapsed}s] {status_icon} {task_key}: {task_state}")
                last_task_status[task_key] = task_state
    
    # Check if job is done
    if life_cycle_state in ["TERMINATED", "INTERNAL_ERROR", "SKIPPED"]:
        result_state = state.result_state.value if state.result_state else "UNKNOWN"
        elapsed = int(time.time() - start_time)
        
        if result_state == "SUCCESS":
            print(f"")
            print(f"‚úÖ Job completed successfully!")
            print(f"   Duration: {elapsed // 60}m {elapsed % 60}s")
        else:
            print(f"")
            print(f"‚ùå Job failed with state: {result_state}")
            print(f"   Duration: {elapsed // 60}m {elapsed % 60}s")
            if state.state_message:
                print(f"   Error: {state.state_message}")
        break
    
    # Wait before next check
    time.sleep(30)

## Step 10: Verify Outputs

In [None]:
print("üîç Verifying outputs...")
print(f"")

# Check if main output table exists
output_table = f"{CATALOG}.{SCHEMA}.building_enrichment_output_{ISO3}"

try:
    df = spark.table(output_table)
    count = df.count()
    print(f"‚úÖ Main output table exists: {output_table}")
    print(f"   Row count: {count:,}")
    print(f"")
    print(f"   Sample data:")
    display(df.limit(5))
except Exception as e:
    print(f"‚ö†Ô∏è  Warning: Could not verify table {output_table}")
    print(f"   Error: {e}")

print(f"")
print(f"üìä Check export files in: {OUTPUT_DIR}/exports/FULL_{ISO3}/")

## Summary

In [None]:
# Final recap of where this run's artifacts live and how to find the job in the UI.
separator = "=" * 60
summary_lines = [
    separator,
    "PIPELINE EXECUTION SUMMARY",
    separator,
    f"Country: {ISO3}",
    f"Job ID: {JOB_ID}",
    f"Run ID: {RUN_ID}",
    "",
    f"üìÅ Data Location: {BASE_DATA_DIR}",
    f"üìä Output Table: {output_table}",
    f"üìÇ Export Location: {OUTPUT_DIR}/exports/FULL_{ISO3}/",
    "",
    "To view job details in Databricks UI:",
    f"Workflows ‚Üí Jobs ‚Üí {job_name}",
    separator,
]
for summary_line in summary_lines:
    print(summary_line)