# **Notebook Job Orchestrator Pipeline**

This pipeline automates **batch execution** of Jupyter notebooks with **parameterized experiments** via **Papermill**, supporting **workspace cleanup**, **dynamic job queues**, and **parallel processing**.

---

## **Pipeline Overview**

### **1. Configuration & Cleanup**
- **`Config`** holds all parameters: `start_fresh`, `execution_mode`, `max_concurrent_jobs`, `template_notebook`, and sub-configs for **simple**, **sensitivity**, and **manifest** modes.
- **`prompt_and_clean_workspace()`** wipes the output directory and regenerates the manifest (only when `start_fresh` is enabled).
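A minimal sketch of the cleanup step, assuming the generator script takes no arguments and writes the manifest itself. The name `prompt_and_clean_workspace_sketch` and the injectable `confirm` callback (which lets the prompt be tested without a terminal) are hypothetical, not the project's actual helper:

```python
import shutil
import subprocess
import sys
from pathlib import Path
from typing import Callable


def prompt_and_clean_workspace_sketch(
    clean_dir: Path,
    manifest_path: Path,
    generator_script: Path,
    confirm: Callable[[str], str] = input,
) -> bool:
    """Hypothetical sketch: confirm, wipe the output directory, regenerate the manifest."""
    answer = confirm(
        f"This will DELETE everything in '{clean_dir}' and regenerate "
        f"'{manifest_path}'. Type 'yes' to continue: "
    )
    if answer.strip().lower() != "yes":
        return False  # user declined; nothing has been touched

    # Remove the whole output directory, then recreate it empty
    if clean_dir.exists():
        shutil.rmtree(clean_dir)
    clean_dir.mkdir(parents=True, exist_ok=True)

    # Drop the stale manifest and let the (assumed argument-free) generator write a new one
    manifest_path.unlink(missing_ok=True)
    result = subprocess.run([sys.executable, str(generator_script)], check=False)
    return result.returncode == 0
```

Returning a boolean matches how `main()` uses the real helper: a falsy result cancels the run before any jobs are queued.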

### **2. Job Queue Construction**
- **Simple Mode**: `create_simple_job_queue()` builds jobs per dataset/well.
- **Sensitivity Mode**: `create_sensitivity_job_queue()` generates the Cartesian product of window sizes and forecast horizons for the selected datasets.
- **Manifest Mode**: `create_manifest_job_queue()` loads pending jobs from CSV.
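The simple and sensitivity queue builders can be sketched with `itertools.product`. This assumes a hypothetical `WELLS` mapping in place of however the real helpers discover wells, and uses the same parameter keys the orchestrator forwards to Papermill (`dataset`, `well`, `window_size`, `forecast_horizon`):

```python
from itertools import product
from typing import Any, Dict, List

# Hypothetical well inventory; the real create_*_job_queue helpers
# discover datasets and wells in their own way.
WELLS: Dict[str, List[str]] = {
    "UNISIM": ["Prod-1", "Prod-2"],
    "OPSD": ["solar", "wind"],
}


def simple_queue(datasets: List[str]) -> List[Dict[str, Any]]:
    """One job per (dataset, well) pair."""
    return [{"dataset": d, "well": w} for d in datasets for w in WELLS.get(d, [])]


def sensitivity_queue(
    datasets: List[str], window_sizes: List[int], horizons: List[int]
) -> List[Dict[str, Any]]:
    """One job per (dataset, well, window_size, forecast_horizon) combination."""
    return [
        {**job, "window_size": ws, "forecast_horizon": fh}
        for job, ws, fh in product(simple_queue(datasets), window_sizes, horizons)
    ]
```

With the sensitivity settings used in `main()` (4 window sizes and 6 horizons), each well expands into 4 × 6 = 24 jobs, which is why that mode benefits most from parallel execution.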

### **3. Orchestration & Monitoring**
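- **`generate_job_filenames()`** returns a job ID plus unique output-notebook and log paths for each job.
- **`main()`** launches up to `max_concurrent_jobs` Papermill processes, pausing 3 seconds after each launch, and in manifest mode marks each job `running` in the manifest.
- It then polls the child processes every 2 seconds, logs each job's exit status, and in manifest mode records `completed` or `failed` in the manifest.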
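The launch-and-poll pattern reduces to the loop below. It is a simplified sketch with hypothetical names (`run_bounded`, `poll_s`) that omits logging, manifest updates and the launch throttle, and runs arbitrary commands instead of Papermill:

```python
import subprocess
import sys
import time
from typing import Dict, List


def run_bounded(commands: List[List[str]], max_concurrent: int, poll_s: float = 0.1) -> Dict[int, int]:
    """Run commands with at most `max_concurrent` alive at once; return {index: returncode}."""
    pending = list(enumerate(commands))
    active: Dict[subprocess.Popen, int] = {}
    results: Dict[int, int] = {}
    while pending or active:
        # Top up the pool to the concurrency limit
        while pending and len(active) < max_concurrent:
            idx, cmd = pending.pop(0)
            active[subprocess.Popen(cmd)] = idx
        # Reap finished processes; poll() returns None while a process is still running
        for proc, idx in list(active.items()):
            if proc.poll() is not None:
                results[idx] = proc.returncode
                del active[proc]
        time.sleep(poll_s)
    return results


# Usage: five short Python processes, at most two at a time; odd indices exit with 1
cmds = [[sys.executable, "-c", f"import sys; sys.exit({i % 2})"] for i in range(5)]
print(run_bounded(cmds, max_concurrent=2))
```

Because `Popen.poll()` never blocks, a single loop can both refill the pool and collect finished jobs; iterating over a `list(...)` copy is what makes deleting from `active` inside the loop safe.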

### **4. Final Cleanup**
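- On exit, including after an error or a keyboard interrupt, terminates all remaining child processes (recursively, so the kernels Papermill starts are included) and force-kills any still alive after 3 seconds.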
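For comparison, a standard-library-only version of the terminate-then-kill sequence is sketched below (the name `shutdown` is hypothetical). Unlike `psutil.Process.children(recursive=True)` used in the orchestrator, it only reaches processes you hold `Popen` handles for, so grandchildren such as Jupyter kernels would be missed. A single shared deadline mimics the one `timeout` passed to `psutil.wait_procs`:

```python
import subprocess
import time
from typing import List


def shutdown(procs: List[subprocess.Popen], timeout: float = 3.0) -> None:
    """Terminate running processes, then kill any still alive after `timeout` seconds."""
    running = [p for p in procs if p.poll() is None]
    for p in running:
        p.terminate()  # SIGTERM on POSIX, TerminateProcess() on Windows
    deadline = time.monotonic() + timeout  # one shared deadline, like psutil.wait_procs
    for p in running:
        try:
            p.wait(timeout=max(0.0, deadline - time.monotonic()))
        except subprocess.TimeoutExpired:
            p.kill()  # SIGKILL on POSIX
            p.wait()  # reap the process so no zombie is left behind
```

Sending the polite signal to every process first, and only then waiting, keeps the worst-case shutdown time near `timeout` instead of `timeout` multiplied by the number of jobs.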

---

## **Key Functions**

### 🔹 `prompt_and_clean_workspace(clean_dir, manifest_path, generator_script)`
Asks the user to confirm a destructive cleanup of the output directory, then regenerates the manifest CSV. Returns a success flag; `main()` aborts when it is falsy.

### 🔹 `generate_job_filenames(job, mode, output_dir)`
Returns a job ID together with the output-notebook and log file paths for a job, derived from its parameters and the execution mode.
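A hypothetical naming scheme, assuming simple-mode IDs take the form `<dataset>_<well>` (consistent with IDs such as `UNISIM_Prod-1` in the run log further down) and inventing a `_w<window>_h<horizon>` suffix for sensitivity mode. The function name is made up, and it takes a plain string where the real helper takes an `ExecutionMode`:

```python
from pathlib import Path
from typing import Any, Dict, Tuple


def job_filenames_sketch(job: Dict[str, Any], mode: str, output_dir: Path) -> Tuple[str, Path, Path]:
    """Hypothetical scheme: derive one job ID, then reuse it for both file names."""
    if mode == "manifest":
        job_id = str(job["job_id"])  # manifest rows already carry an ID
    elif mode == "sensitivity":
        job_id = f"{job['dataset']}_{job['well']}_w{job['window_size']}_h{job['forecast_horizon']}"
    else:  # simple mode
        job_id = f"{job['dataset']}_{job['well']}"
    return job_id, output_dir / f"{job_id}.ipynb", output_dir / f"{job_id}.log"
```

Some well names contain spaces (for example `15_9-F-15 D`), so the resulting paths should always be passed to `subprocess.Popen` as list elements rather than through a shell string, as the orchestrator does.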

### 🔹 `create_*_job_queue(...)`
Builds the list of parameter dictionaries for each execution mode.
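A sketch of the manifest reader using the standard `csv` module, assuming the manifest has a `status` column holding values such as `pending`, `running`, `completed` and `failed` (the orchestrator writes the last three). The function name and the `statuses` parameter, which would let failed jobs be re-queued, are hypothetical:

```python
import csv
from pathlib import Path
from typing import Dict, Iterable, List


def manifest_queue_sketch(
    manifest_path: Path, statuses: Iterable[str] = ("pending",)
) -> List[Dict[str, str]]:
    """Return manifest rows whose `status` is in `statuses`, in file order."""
    wanted = set(statuses)
    with open(manifest_path, newline="") as fh:
        return [row for row in csv.DictReader(fh) if row.get("status") in wanted]
```

`csv.DictReader` yields every value as a string. That is harmless here because the orchestrator converts values with `str()` before passing them to `papermill -p`, and Papermill infers numeric types from command-line parameters. Rows left as `running` by an interrupted run are not picked up by this filter.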

### 🔹 `main()`
Coordinates configuration, cleanup, queue creation, Papermill launches, monitoring, and final cleanup.
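One way to make `main()` easier to test is to factor the Papermill command construction into a pure function. This sketch (hypothetical names `build_papermill_cmd` and `NOTEBOOK_PARAMS`) reproduces the command and the parameter allow-list used in the orchestrator:

```python
from pathlib import Path
from typing import Any, Dict, List

# Keys forwarded to the notebook, mirroring the allow-list in the orchestrator
NOTEBOOK_PARAMS = {"dataset", "well", "architecture_id", "hyperparam_id", "window_size", "forecast_horizon"}


def build_papermill_cmd(template: str, output_nb: Path, job: Dict[str, Any]) -> List[str]:
    """Build the argv list for one papermill run."""
    cmd = ["papermill", template, str(output_nb), "--log-output", "--progress-bar"]
    for key, val in job.items():
        if key in NOTEBOOK_PARAMS:  # bookkeeping fields such as job_id or status are skipped
            cmd.extend(["-p", key, str(val)])
    return cmd


print(build_papermill_cmd("base_pipeline.ipynb", Path("UNISIM_Prod-1.ipynb"),
                          {"dataset": "UNISIM", "well": "Prod-1", "job_id": "j1"}))
```

Keeping the function free of side effects means the allow-list can be checked without launching any processes.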

---

## **Config Parameters**

| Parameter               | Description                                            | Default                           |
|-------------------------|--------------------------------------------------------|-----------------------------------|
| `start_fresh`           | Wipe outputs & regenerate manifest                     | `False`                           |
| `execution_mode`        | Which queue to run: `simple` \| `sensitivity` \| `manifest` | `simple`                          |
| `max_concurrent_jobs`   | Maximum parallel Papermill jobs                        | `12` (the `main()` example sets `17`) |
| `template_notebook`     | Path to the notebook template                          | `"base_pipeline.ipynb"`           |
| `simple.datasets_filter`| List of datasets for simple mode                       | `["UNISIM","VOLVE","OPSD"]`       |
| `sensitivity.*`         | Window sizes, forecast horizons, dataset filter & architecture | see `SensitivityConfig`   |
| `manifest.path`         | CSV manifest location                                  | `"../../output_manifest/manifest.csv"` |
| `manifest.output_notebooks_dir` | Output directory for generated notebooks       | `"../output_notebooks"`           |
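The table maps naturally onto nested dataclasses. The sketch below is a trimmed, hypothetical stand-in for `forecast_pipeline.config`: the class names are invented, the sensitivity sub-config is omitted, and the `__post_init__` check only approximates what `validate_config()` might verify. Defaults follow the table:

```python
from dataclasses import dataclass, field
from enum import Enum
from pathlib import Path
from typing import List


class Mode(Enum):
    SIMPLE = "simple"
    SENSITIVITY = "sensitivity"
    MANIFEST = "manifest"


@dataclass
class SimpleCfg:
    # default_factory avoids sharing one mutable list between instances
    datasets_filter: List[str] = field(default_factory=lambda: ["UNISIM", "VOLVE", "OPSD"])


@dataclass
class ManifestCfg:
    path: Path = Path("../../output_manifest/manifest.csv")
    output_notebooks_dir: Path = Path("../output_notebooks")


@dataclass
class OrchestratorCfg:
    start_fresh: bool = False
    execution_mode: Mode = Mode.SIMPLE
    max_concurrent_jobs: int = 12
    template_notebook: str = "base_pipeline.ipynb"
    simple: SimpleCfg = field(default_factory=SimpleCfg)
    manifest: ManifestCfg = field(default_factory=ManifestCfg)

    def __post_init__(self) -> None:
        # Hypothetical sanity check standing in for validate_config()
        if self.max_concurrent_jobs < 1:
            raise ValueError("max_concurrent_jobs must be >= 1")
```

Using an `Enum` for the mode means a typo such as `Mode("simpel")` fails immediately with a `ValueError` instead of silently falling through to the default branch.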

---

## **Why Use This Pipeline?**
✅ **Automated Notebook Execution**  
✅ **Dynamic Job Queues for Any Mode**  
✅ **Parallel Processing with Throttling**  
✅ **Built-in Cleanup & Manifest Tracking**  
✅ **Modular, Configurable & Extensible**  

```python
"""
Main orchestrator for executing notebook jobs.
"""

# --- Environment configuration ---
import os
import warnings
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3'
os.environ['ABSL_LOG_LEVEL'] = '3'
warnings.filterwarnings('ignore', category=UserWarning, module='tensorflow_addons')

import tensorflow as tf
tf.get_logger().setLevel('ERROR')

# --- Standard libraries ---
import time
import subprocess
import shutil
import sys

# --- System and file management ---
from pathlib import Path
from typing import List, Dict, Any, Optional

# --- Process management ---
import psutil

# --- Data handling ---
import pandas as pd
import numpy as np
import yaml

# --- Visualization ---
import plotly.graph_objects as go

# --- Project-specific: pipeline configuration ---
from forecast_pipeline.config import (
    Config,
    SimpleConfig,
    SensitivityConfig,
    ManifestConfig,
    ExecutionMode,
    AnalysisConfig,
    AggregationMode,
    validate_config
)

# --- Project-specific: utility functions ---
from utils.utilities import (
    prompt_and_clean_workspace,
    generate_job_filenames,
    create_sensitivity_job_queue,
    create_simple_job_queue,
    create_manifest_job_queue,
    run_simple_aggregation,
    run_sensitivity_analysis,
    run_manifest_aggregation,
    run_sensitivity_drilldown_analysis,
    create_performance_dashboard_from_pivot,
    load_and_prepare_data,
    create_annotated_heatmap
)
```

```python
def main() -> None:
    """Entry point for the orchestrator."""
    # Instantiate the configuration with every setting spelled out explicitly
    config = Config(
        start_fresh=False,
        execution_mode=ExecutionMode.SIMPLE,
        max_concurrent_jobs=17,
        template_notebook="base_pipeline.ipynb",
        simple=SimpleConfig(
            datasets_filter=["UNISIM", "VOLVE", "OPSD"]
        ),
        sensitivity=SensitivityConfig(
            window_sizes=[3, 7, 14, 21],
            forecast_horizons=[14, 28, 56, 70, 94, 112],
            datasets_filter=["UNISIM", "VOLVE", "OPSD"],
            architecture="Generic"
        ),
        manifest=ManifestConfig(
            path=Path("../../output_manifest/manifest.csv"),
            output_notebooks_dir=Path("../output_notebooks")
        )
    )

    print(f"START_FRESH: {config.start_fresh}")
    print(f"EXECUTION_MODE: {config.execution_mode.value}")
    print(f"MAX_CONCURRENT_JOBS: {config.max_concurrent_jobs}")
    validate_config(config)

    if config.start_fresh:
        success = prompt_and_clean_workspace(
            config.manifest.output_notebooks_dir,
            config.manifest.path,
            config.generate_manifest_script
        )
        if not success:
            print("\nMain execution cancelled.")
            return
        print("\nCleanup and preparation complete. Proceeding with orchestration...")
    else:
        print("INFO: 'Start Fresh' mode disabled. Using existing results and manifest.")

    output_dir = config.manifest.output_notebooks_dir
    output_dir.mkdir(parents=True, exist_ok=True)

    # Build the job queue; only manifest mode tracks job status in a CSV
    manifest_df = None
    if config.execution_mode == ExecutionMode.MANIFEST:
        job_queue = create_manifest_job_queue(config.manifest.path)
        manifest_df = pd.read_csv(config.manifest.path)
    elif config.execution_mode == ExecutionMode.SENSITIVITY:
        job_queue = create_sensitivity_job_queue(
            config.sensitivity.datasets_filter,
            config.sensitivity.window_sizes,
            config.sensitivity.forecast_horizons
        )
    else:
        job_queue = create_simple_job_queue(config.simple.datasets_filter)

    if not job_queue:
        print("No jobs to execute. Exiting.")
        return

    print(f"🚀 Starting orchestration of {len(job_queue)} jobs in mode '{config.execution_mode.value}'...")
    print(f"⚙️  Maximum of {config.max_concurrent_jobs} concurrent processes.")
    print("-" * 60)

    active_procs: Dict[subprocess.Popen, Dict[str, Any]] = {}
    # Child output is redirected to a log file, so proc.stdout is None; keep the
    # file handles here so each one can be closed when its job finishes.
    log_files: Dict[subprocess.Popen, Any] = {}
    main_proc = psutil.Process(os.getpid())

    try:
        while job_queue or active_procs:
            # Launch new jobs up to the concurrency limit
            while len(active_procs) < config.max_concurrent_jobs and job_queue:
                job = job_queue.pop(0)
                job_id, nb_path, log_path = generate_job_filenames(
                    job, config.execution_mode, output_dir
                )
                cmd = [
                    "papermill",
                    config.template_notebook,
                    str(nb_path),
                    "--log-output",
                    "--progress-bar"
                ]
                # Forward only notebook parameters; bookkeeping columns (job_id, status, ...) are skipped
                for key, val in job.items():
                    if key in {'dataset', 'well', 'architecture_id', 'hyperparam_id', 'window_size', 'forecast_horizon'}:
                        cmd.extend(["-p", key, str(val)])

                print(f"✨ [LAUNCH] Launching job: {job_id}")
                if manifest_df is not None:
                    manifest_df.loc[manifest_df['job_id'] == job['job_id'], ['status', 'started_at']] = [
                        'running', pd.Timestamp.now()
                    ]
                    manifest_df.to_csv(config.manifest.path, index=False)

                log_file = open(log_path, "w")
                proc = subprocess.Popen(
                    cmd,
                    stdout=log_file,
                    stderr=subprocess.STDOUT,
                    # Run each job in its own session on POSIX (same effect as
                    # preexec_fn=os.setsid, but safe when threads are running)
                    start_new_session=(os.name != 'nt')
                )
                active_procs[proc] = job
                log_files[proc] = log_file
                time.sleep(3)  # throttle launches

            # Check for finished processes
            for proc, job in list(active_procs.items()):
                if proc.poll() is not None:
                    code = proc.returncode
                    status = "completed" if code == 0 else "failed"
                    jid, _, _ = generate_job_filenames(job, config.execution_mode, output_dir)
                    print(f"🏁 [FINISH] Job {jid} finished with status: {status.upper()} (code {code})")

                    if manifest_df is not None:
                        manifest_df.loc[manifest_df['job_id'] == job['job_id'], ['status', 'finished_at']] = [
                            status, pd.Timestamp.now()
                        ]
                        manifest_df.to_csv(config.manifest.path, index=False)

                    log_files.pop(proc).close()
                    del active_procs[proc]

            time.sleep(2)  # polling interval
    finally:
        print("\n🧹 Cleaning up child processes...")
        # recursive=True also reaches grandchildren, such as the kernels papermill starts
        children = main_proc.children(recursive=True)
        if children:
            for child in children:
                try:
                    child.terminate()
                except psutil.NoSuchProcess:
                    continue
            # Give processes 3 s to exit gracefully, then force-kill the rest
            _, alive = psutil.wait_procs(children, timeout=3)
            for p in alive:
                try:
                    p.kill()
                except psutil.NoSuchProcess:
                    continue
        for log_file in log_files.values():
            log_file.close()
        print("🎉 Orchestration complete.")


if __name__ == "__main__":
    main()
```


```text
START_FRESH: False
EXECUTION_MODE: simple
MAX_CONCURRENT_JOBS: 17
INFO: 'Start Fresh' mode disabled. Using existing results and manifest.
INFO: Creating job queue in 'simple' mode...
🚀 Starting orchestration of 17 jobs in mode 'simple'...
⚙️  Maximum of 17 concurrent processes.
------------------------------------------------------------
✨ [LAUNCH] Launching job: UNISIM_Prod-1
✨ [LAUNCH] Launching job: UNISIM_Prod-2
✨ [LAUNCH] Launching job: UNISIM_Prod-3
✨ [LAUNCH] Launching job: UNISIM_Prod-4
✨ [LAUNCH] Launching job: UNISIM_Prod-5
✨ [LAUNCH] Launching job: UNISIM_Prod-6
✨ [LAUNCH] Launching job: UNISIM_Prod-7
✨ [LAUNCH] Launching job: UNISIM_Prod-8
✨ [LAUNCH] Launching job: UNISIM_Prod-9
✨ [LAUNCH] Launching job: UNISIM_Prod-10
✨ [LAUNCH] Launching job: VOLVE_15_9-F-14
✨ [LAUNCH] Launching job: VOLVE_15_9-F-12
✨ [LAUNCH] Launching job: VOLVE_15_9-F-11
✨ [LAUNCH] Launching job: VOLVE_15_9-F-15 D
✨ [LAUNCH] Launching job: OPSD_solar
✨ [LAUNCH] Launching job: OPSD_wind
✨ [LAUNCH] Launc
```

```python
ARCHITECTURE_ALIASES: Dict[str, str] = {
    "Model1_1xConv": "M1: 1xConv",
    "Model2_2xConv": "M2: 2xConv",
    "Model3_2xConv_BiLSTM": "M3: Conv+LSTM",
    "Model4_Transformer_BiLSTM": "M4: Trans+LSTM",
    "Model5_Transformer_2xConv": "M5: Trans+Conv",
    "Model6_Transformer_Conv_BiLSTM": "M6: Trans+Conv+LSTM",
}

HYPERPARAM_ALIASES: Dict[str, str] = {
    "hp_small_fast": "Small & Fast",
    "hp_medium_balanced": "Medium (Balanced)",
    "hp_large_robust": "Large & Robust",
    "hp_stable_regularized": "Stable & Regularized",
    "hp_wide_and_shallow": "Wide & Shallow",
}

HYPERPARAM_DESCRIPTIONS: Dict[str, str] = {
    "Small & Fast": "Conv: 64, BiLSTM: 32, FF_DIM: 64, Dropout: 0.2",
    "Medium (Balanced)": "Conv: 128,64, BiLSTM: 32, FF_DIM: 128, Dropout: 0.3",
    "Large & Robust": "Conv: 256, BiLSTM: 128, FF_DIM: 256, Dropout: 0.4",
    "Stable & Regularized": "Conv: 128, BiLSTM: 64, FF_DIM: 128, Dropout: 0.5",
    "Wide & Shallow": "Conv: 512, BiLSTM: 256, FF_DIM: 128, Dropout: 0.25",
}

# ─── ANALYSIS CONFIGURATION ───────────────────────────────────────────────────
config = AnalysisConfig(
    aggregation_mode=AggregationMode.SIMPLE,           # simple | sensitivity | manifest
    exclude_datasets=["UNISIM_IV"],                    # datasets to exclude
    top_n_configs=3,                                   # how many top configs to detail
    project_root=Path.cwd().parent.parent,             # project root
    custom_dataset_order=["VOLVE", "UNISIM", "OPSD"],  # display order
    table_theme="dark"                                 # "dark" | "minimal"
)

LEADERBOARD_CSV_PATH = config.project_root / "output_manifest" / "experiment_leaderboard.csv"


def main() -> None:
    base_dir = config.output_notebooks_dir
    theme = config.table_theme
    drilldown_analysis = False  # set to True to also run the sensitivity drill-down analysis

    if config.aggregation_mode == AggregationMode.SENSITIVITY:
        print("🏆 SENSITIVITY ANALYSIS (Window vs. Horizon) 🏆")
        heatmap = run_sensitivity_analysis(base_dir)
        if heatmap is not None:
            create_performance_dashboard_from_pivot(
                heatmap,
                "Average SMAPE (%): Window vs Horizon"
            )

        if drilldown_analysis:
            run_sensitivity_drilldown_analysis(
                base_dir,  # Pass the root; the function locates the correct subfolder
                exclude_datasets=config.exclude_datasets
            )

    elif config.aggregation_mode == AggregationMode.SIMPLE:
        print("🏆 SIMPLE MODE AGGREGATION 🏆")
        run_simple_aggregation(
            base_dir,
            config.custom_dataset_order,
            config.exclude_datasets,
            theme=theme
        )

    else:
        print("🏆 MANIFEST MODE AGGREGATION 🏆")
        run_manifest_aggregation(
            base_dir,
            config.manifest_path,
            config.config_dir,
            config.top_n_configs,
            config.exclude_datasets,
            theme=theme
        )
        df = load_and_prepare_data(LEADERBOARD_CSV_PATH, ARCHITECTURE_ALIASES, HYPERPARAM_ALIASES)
        if df is not None:
            create_annotated_heatmap(df, ARCHITECTURE_ALIASES, HYPERPARAM_DESCRIPTIONS)


if __name__ == "__main__":
    main()
```
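The `exclude_datasets` and `custom_dataset_order` settings amount to a filter followed by a sort with a rank lookup. A minimal sketch, assuming result rows are dictionaries keyed by the column names shown in the output tables (`order_results` is a hypothetical name, not the logic inside `run_simple_aggregation`):

```python
from typing import Dict, List


def order_results(rows: List[Dict[str, str]], dataset_order: List[str], exclude: List[str]) -> List[Dict[str, str]]:
    """Drop excluded datasets, then sort by custom dataset order and well name."""
    rank = {name: i for i, name in enumerate(dataset_order)}
    kept = [r for r in rows if r["Dataset"] not in exclude]
    # Datasets missing from dataset_order sort after the listed ones; wells sort as plain strings
    return sorted(kept, key=lambda r: (rank.get(r["Dataset"], len(rank)), r["Well"]))
```

A plain string sort places `Prod-10` before `Prod-2`, which matches the ordering in the tables below; a natural-sort key would be needed to order wells numerically.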


```text
🏆 SIMPLE MODE AGGREGATION 🏆
Excluding datasets: ['UNISIM_IV']
```


| Dataset | Well        | MAE      | SMAPE |
|---------|-------------|---------:|------:|
| VOLVE   | 15_9-F-11   | 17726.91 | 2.87% |
| VOLVE   | 15_9-F-12   | 16386.5  | 0.73% |
| VOLVE   | 15_9-F-14   | 12812.56 | 0.70% |
| VOLVE   | 15_9-F-15 D | 2119.35  | 2.17% |
| UNISIM  | Prod-1      | 5023.27  | 0.28% |
| UNISIM  | Prod-10     | 4222.18  | 0.12% |
| UNISIM  | Prod-2      | 4302.17  | 0.21% |
| UNISIM  | Prod-3      | 6788.1   | 0.38% |
| UNISIM  | Prod-4      | 4534.56  | 0.17% |
| UNISIM  | Prod-5      | 3909.52  | 0.12% |


Saved detailed results for 'Kalman' to 'analysis_results/results_kalman_simple.csv'


| Dataset | Well        | MAE      | SMAPE |
|---------|-------------|---------:|------:|
| VOLVE   | 15_9-F-11   | 19031.25 | 3.08% |
| VOLVE   | 15_9-F-12   | 20632.07 | 0.84% |
| VOLVE   | 15_9-F-14   | 16014.36 | 0.82% |
| VOLVE   | 15_9-F-15 D | 2389.26  | 2.43% |
| UNISIM  | Prod-1      | 9903.42  | 0.47% |
| UNISIM  | Prod-10     | 8811.24  | 0.24% |
| UNISIM  | Prod-2      | 9749.27  | 0.40% |
| UNISIM  | Prod-3      | 12734.11 | 0.61% |
| UNISIM  | Prod-4      | 10818.69 | 0.35% |
| UNISIM  | Prod-5      | 8710.67  | 0.27% |


Saved detailed results for 'No Filter' to 'analysis_results/results_no_filter_simple.csv'
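For reference, the two reported metrics can be computed as below. MAE is expressed in the units of the target series. For SMAPE the sketch assumes the common definition `100/n * sum(|F - A| / ((|A| + |F|) / 2))`, which ranges from 0% to 200%; the project's helpers may use a different variant, so treat this as an assumption rather than the pipeline's implementation:

```python
from typing import Sequence


def mae(actual: Sequence[float], forecast: Sequence[float]) -> float:
    """Mean absolute error, in the units of the target series."""
    return sum(abs(f - a) for a, f in zip(actual, forecast)) / len(actual)


def smape(actual: Sequence[float], forecast: Sequence[float]) -> float:
    """Symmetric MAPE in percent (assumed 0-200% variant); terms with a 0/0 ratio count as 0."""
    total = 0.0
    for a, f in zip(actual, forecast):
        denom = (abs(a) + abs(f)) / 2
        total += 0.0 if denom == 0 else abs(f - a) / denom
    return 100.0 * total / len(actual)


print(mae([100, 200], [110, 190]))  # 10.0
```

Because SMAPE normalises each error by the magnitude of the values involved, it allows wells with very different production levels to be compared, whereas MAE values are only comparable within a single series scale.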
