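# Pipeline Runner

Runs (or reuses) the Azure ML data-prep job. It then configures and submits one hyperparameter sweep per model type listed in `configs/hpo.yaml`.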



In [1]:
# Ensure we run from the project root so component paths resolve correctly
import os
from pathlib import Path


def _find_project_root(start_dir: Path) -> Path:
    any_markers = {".git", "pyproject.toml", "setup.cfg"}
    required_entries = {"configs", "src", "notebooks"}

    def _looks_like_root(path: Path) -> bool:
        return any((path / marker).exists() for marker in any_markers) and all(
            (path / entry).exists() for entry in required_entries
        )

    for candidate in [start_dir, *start_dir.parents]:
        if _looks_like_root(candidate):
            return candidate

    try:
        for child in start_dir.iterdir():
            if child.is_dir() and _looks_like_root(child):
                return child
    except PermissionError:
        pass

    env_override = os.getenv("AML_PROJECT_ROOT")
    if env_override:
        return Path(env_override).resolve()

    raise RuntimeError(
        "Unable to determine project root. Set AML_PROJECT_ROOT or start inside the repo."
    )


NOTEBOOK_DIR = Path.cwd().resolve()
PROJECT_ROOT = _find_project_root(NOTEBOOK_DIR)
# Compare resolved paths so a symlinked cwd is not mistaken for a different directory.
if NOTEBOOK_DIR != PROJECT_ROOT:
    os.chdir(PROJECT_ROOT)
    print(f"Changed working directory to: {PROJECT_ROOT}")
else:
    print(f"Already in project root: {PROJECT_ROOT}")


Changed working directory to: /workspaces/customer-churn-prediction-azureml


In [2]:
from __future__ import annotations

import itertools
import json
import os
import time
from pathlib import Path
from typing import Dict, Iterable, List, Optional

import yaml
from dotenv import load_dotenv
from azure.identity import DefaultAzureCredential
from azure.ai.ml import Input, MLClient, load_component

# Allow importing project utilities
import sys
if str(PROJECT_ROOT.resolve()) not in sys.path:
    sys.path.append(str(PROJECT_ROOT.resolve()))

import importlib
import hpo_utils  # noqa: E402
hpo_utils = importlib.reload(hpo_utils)
from azure.ai.ml.sweep import Choice

from src.utils import get_data_asset_config, load_azure_config  # noqa: E402

In [3]:
# Populate os.environ from config.env before reading the Azure / data-asset settings.
load_dotenv(PROJECT_ROOT / "config.env")
azure_cfg = load_azure_config()
data_asset_cfg = get_data_asset_config()

# DefaultAzureCredential tries several credential sources in turn (env vars, managed identity, Azure CLI, ...).
credential = DefaultAzureCredential()
ml_client = MLClient(
    credential,
    subscription_id=azure_cfg["subscription_id"],
    resource_group_name=azure_cfg["resource_group"],
    workspace_name=azure_cfg["workspace_name"],
)

# Deep link to this workspace in Azure ML Studio.
workspace_url = (
    "https://ml.azure.com/?wsid="
    f"/subscriptions/{azure_cfg['subscription_id']}"
    f"/resourcegroups/{azure_cfg['resource_group']}"
    f"/workspaces/{azure_cfg['workspace_name']}"
)
print(f"Connected to workspace '{azure_cfg['workspace_name']}'.")



Class DeploymentTemplateOperations: This is an experimental class, and may change at any time. Please see https://aka.ms/azuremlexperimental for more information.


Connected to workspace 'churn-ml-workspace'.


In [4]:
# Code snapshot uploaded with every job; the AML_* environment variables override the defaults.
SRC_DIR = str((PROJECT_ROOT / "src").resolve())
DEFAULT_ENV = os.getenv("AML_DEFAULT_ENV", "azureml:bank-churn-env:1")
DEFAULT_COMPUTE = os.getenv("AML_COMPUTE_CLUSTER", "cpu-cluster")
PROCESSED_DATA_DATASTORE = os.getenv("AML_PROCESSED_DATA_DATASTORE", "workspaceblobstore")
PROCESSED_DATA_PREFIX = os.getenv("AML_PROCESSED_DATA_PREFIX", "manual-hpo-data")

# HPO settings plus the per-model search space built from them by hpo_utils
# (presumably sourced from configs/hpo.yaml — see the messages in the sweep cell).
hpo_cfg = hpo_utils.load_hpo_config()
search_space_cfg = hpo_utils.build_parameter_space(hpo_cfg.get("search_space", {}))

# Fail fast with a helpful hint when the notebook is not running inside the repo.
train_config_path = PROJECT_ROOT / "configs" / "train.yaml"
if not train_config_path.exists():
    raise FileNotFoundError(
        f"Expected training config at {train_config_path}. Ensure you're running inside the repo."
    )
with train_config_path.open() as f:
    # An empty YAML file loads as None; normalise it to an empty dict.
    train_cfg = yaml.safe_load(f) or {}

# Registered raw data asset, mounted read-only into the data-prep job.
raw_input = Input(
    type="uri_folder",
    path="azureml:{data_asset_name}:{data_asset_version}".format_map(data_asset_cfg),
    mode="mount",
)

In [5]:
from azure.ai.ml import command, Output
from azure.ai.ml.sweep import BanditPolicy
from typing import Any


_URI_KEYS = (
    "uri",
    "path",
    "value",
    "uri_folder",
    "uri_file",
    "asset_uri",
    "assetUri",
    "location",
)


def _extract_asset_reference(container: Dict[str, Any]) -> Optional[str]:
    asset_name = container.get("asset_name") or container.get("assetName")
    asset_version = container.get("asset_version") or container.get("assetVersion")
    if asset_name and asset_version:
        return f"azureml:{asset_name}:{asset_version}"
    asset_id = container.get("asset_id") or container.get("assetId")
    if asset_id:
        return asset_id
    return None


def _extract_uri_from_mapping(data: Any) -> Optional[str]:
    """Pull a URI or ``azureml:`` asset reference out of a serialized job output.

    ``data`` may be a plain string, which is returned unchanged, or a dict such as the
    ones produced by ``as_dict()`` / ``_to_dict()``. The dict itself is searched first,
    then its ``metadata`` sub-dict. Within each, the first non-empty *string* stored
    under one of ``_URI_KEYS`` wins; failing that, an asset reference built by
    ``_extract_asset_reference`` is used.

    Returns:
        The URI / reference, or ``None`` for empty or unsupported input, or when
        nothing matches.
    """
    if not data:
        return None
    if isinstance(data, str):
        return data
    if not hasattr(data, "get"):
        # Anything without a dict-style .get() cannot be searched by key.
        return None
    containers = [data]
    metadata = data.get("metadata")
    if metadata and hasattr(metadata, "get"):
        containers.append(metadata)
    for container in containers:
        for key in _URI_KEYS:
            value = container.get(key)
            # Only strings count: a nested dict or enum under e.g. "path"/"value" is not a URI.
            if isinstance(value, str) and value:
                return value
        asset_ref = _extract_asset_reference(container)
        if asset_ref:
            return asset_ref
    return None


def stream_job(job_name: str) -> None:
    """Announce ``job_name``, then stream its logs through ``ml_client.jobs.stream``."""
    banner = f"Streaming logs for job: {job_name}"
    print(banner)
    ml_client.jobs.stream(job_name)


def _resolve_output_uri(job_output) -> str:
    """Best-effort extraction of the backing URI from a job output."""
    if isinstance(job_output, str):
        return job_output
    for attr in _URI_KEYS:
        value = getattr(job_output, attr, None)
        if value:
            return value
    attr_dict = getattr(job_output, "__dict__", {}) or {}
    asset_ref = _extract_asset_reference(attr_dict)
    if asset_ref:
        return asset_ref
    if hasattr(job_output, "as_dict"):
        data = job_output.as_dict() or {}
        extracted = _extract_uri_from_mapping(data)
        if extracted:
            return extracted
    if hasattr(job_output, "_to_dict"):
        data = job_output._to_dict() or {}
        extracted = _extract_uri_from_mapping(data)
        if extracted:
            return extracted
    if isinstance(job_output, dict):
        extracted = _extract_uri_from_mapping(job_output)
        if extracted:
            return extracted
    raise AttributeError("Unable to resolve output URI from job output metadata.")


def _get_output_uri(job, output_name: str) -> str:
    """Return the URI backing ``job.outputs[output_name]``.

    Resolution order:

    1. ``_resolve_output_uri`` on the output object itself;
    2. the ``outputs`` and then ``job_outputs`` sections of ``job._to_dict()``;
    3. a conventional artifact-store path built from the job name. This step prints
       a notice because the path is a guess, not metadata reported by the service.

    Raises:
        KeyError: if the job has no output called ``output_name``.
    """
    outputs = getattr(job, "outputs", None) or {}
    if output_name not in outputs:
        raise KeyError(f"Job {job.name} has no output named '{output_name}'.")
    output = outputs[output_name]
    try:
        return _resolve_output_uri(output)
    except AttributeError:
        pass

    # Serialize the job only now, when the direct lookup failed. A job whose
    # _to_dict() is slow or raises still resolves through step 1.
    job_dict = (job._to_dict() if hasattr(job, "_to_dict") else None) or {}
    for section in ("outputs", "job_outputs"):
        entry = (job_dict.get(section) or {}).get(output_name) or {}
        fallback = _extract_uri_from_mapping(entry)
        if fallback:
            return fallback

    artifact_store = os.getenv("AML_ARTIFACT_DATASTORE", "workspaceartifactstore")
    run_output_path = (
        f"azureml://datastores/{artifact_store}/paths/ExperimentRun/dcid.{job.name}/"
        f"outputs/{output_name}/"
    )
    print(
        f"Unable to locate explicit URI for {job.name}:{output_name}; "
        f"falling back to {run_output_path}. Consider registering the output as a data asset."
    )
    return run_output_path


def _wait_for_job_completion(job_name: str, poll_interval: int = 15):
    """Poll a job until it finishes and return the refreshed job."""
    while True:
        fresh_job = ml_client.jobs.get(job_name)
        status = getattr(fresh_job, "status", None)
        if status in {"Completed", "Finished"}:
            return fresh_job
        if status in {"Failed", "Canceled"}:
            raise RuntimeError(f"Job {job_name} finished with status {status}")
        time.sleep(poll_interval)


def run_data_prep_job(
    *, wait_for_completion: bool = True, stream_logs: bool = True, poll_interval: int = 15
) -> Dict[str, str]:
    """Submit the data-prep command job and describe the submission.

    The job writes to a fresh, timestamp-named folder under ``PROCESSED_DATA_PREFIX``
    on the ``PROCESSED_DATA_DATASTORE`` datastore.

    Args:
        wait_for_completion: Block until the job finishes and resolve its output URI.
        stream_logs: While waiting, stream logs instead of silently polling.
        poll_interval: Seconds between status polls when not streaming.

    Returns:
        A dict with ``job_name`` and ``studio_url``. ``processed_data_uri`` is added
        only when the function waited for the job.
    """
    destination = (
        f"azureml://datastores/{PROCESSED_DATA_DATASTORE}/paths/"
        f"{PROCESSED_DATA_PREFIX}/{int(time.time())}"
    )
    job = ml_client.jobs.create_or_update(
        command(
            code=SRC_DIR,
            command="python data_prep.py --input ${{inputs.raw_data}} --output ${{outputs.processed_data}}",
            inputs={"raw_data": raw_input},
            outputs={"processed_data": Output(type="uri_folder", path=destination)},
            environment=DEFAULT_ENV,
            compute=DEFAULT_COMPUTE,
            experiment_name="manual-hpo-data-prep",
            display_name="manual-hpo-data-prep",
        )
    )
    summary = {"job_name": job.name, "studio_url": job.studio_url}
    print(f"Data prep job submitted: {job.name} | Studio: {job.studio_url}")

    if wait_for_completion:
        if stream_logs:
            stream_job(job.name)
            finished = ml_client.jobs.get(job.name)
        else:
            finished = _wait_for_job_completion(job.name, poll_interval=poll_interval)
        processed_uri = _get_output_uri(finished, "processed_data")
        print(f"Processed data available at: {processed_uri}")
        summary["processed_data_uri"] = processed_uri
    else:
        print(
            "Data prep job is running asynchronously; record the job name above and call "
            "`fetch_processed_data_uri(job_name)` once it finishes to get the output URI."
        )
    return summary


def fetch_processed_data_uri(
    job_name: str, output_name: str = "processed_data", poll_interval: int = 15
) -> str:
    """Block until ``job_name`` succeeds, then return the URI of its ``output_name`` output."""
    uri = _get_output_uri(
        _wait_for_job_completion(job_name, poll_interval=poll_interval), output_name
    )
    print(f"Processed data for {job_name} available at: {uri}")
    return uri




In [6]:
# Name of an already-completed data prep job to reuse. Leave it blank to fall back
# to the AML_PREVIOUS_DATA_PREP_JOB environment variable.
DATA_PREP_JOB = "quirky_giraffe_bh47lt2h0r"

previous_data_prep_job_name = DATA_PREP_JOB.strip()
if not previous_data_prep_job_name:
    previous_data_prep_job_name = os.getenv("AML_PREVIOUS_DATA_PREP_JOB", "").strip()
if not previous_data_prep_job_name:
    print("No previous data prep job specified; set DATA_PREP_JOB or AML_PREVIOUS_DATA_PREP_JOB.")

In [7]:
# Fetch the processed data URI for a previously completed data prep job, if one was
# named in the previous cell (DATA_PREP_JOB or the AML_PREVIOUS_DATA_PREP_JOB env var).
# Reset first so a value from an earlier run cannot leak into the training cell.
previous_processed_data_uri = None
if previous_data_prep_job_name:
    previous_processed_data_uri = fetch_processed_data_uri(previous_data_prep_job_name)
    print(
        f"Processed data URI for {previous_data_prep_job_name}: {previous_processed_data_uri}"
    )
else:
    print(
        "Set DATA_PREP_JOB (previous cell) or the AML_PREVIOUS_DATA_PREP_JOB env var "
        "with the job name to retrieve its processed data URI."
    )


Class AutoDeleteSettingSchema: This is an experimental class, and may change at any time. Please see https://aka.ms/azuremlexperimental for more information.
Class AutoDeleteConditionSchema: This is an experimental class, and may change at any time. Please see https://aka.ms/azuremlexperimental for more information.
Class BaseAutoDeleteSettingSchema: This is an experimental class, and may change at any time. Please see https://aka.ms/azuremlexperimental for more information.
Class IntellectualPropertySchema: This is an experimental class, and may change at any time. Please see https://aka.ms/azuremlexperimental for more information.
Class ProtectionLevelSchema: This is an experimental class, and may change at any time. Please see https://aka.ms/azuremlexperimental for more information.
Class BaseIntellectualPropertySchema: This is an experimental class, and may change at any time. Please see https://aka.ms/azuremlexperimental for more information.


Processed data for quirky_giraffe_bh47lt2h0r available at: azureml://datastores/workspaceblobstore/paths/manual-hpo-data/1763391399
Processed data URI for quirky_giraffe_bh47lt2h0r: azureml://datastores/workspaceblobstore/paths/manual-hpo-data/1763391399


In [115]:
# A truthy AML_DATA_PREP_ASYNC submits without streaming/waiting; the URI is then polled below.
DATA_PREP_ASYNC = os.getenv("AML_DATA_PREP_ASYNC", "false").lower() in {"true", "1", "yes"}

data_prep_result = run_data_prep_job(
    stream_logs=not DATA_PREP_ASYNC,
    wait_for_completion=not DATA_PREP_ASYNC,
)
data_prep_job_name = data_prep_result["job_name"]

processed_data_uri = data_prep_result.get("processed_data_uri")
if not processed_data_uri:
    # Async submission: block here, polling until the job finishes and exposes its output.
    print(f"Waiting for processed data from job {data_prep_job_name}...")
    processed_data_uri = fetch_processed_data_uri(data_prep_job_name)

print(f"Processed data URI: {processed_data_uri}")

pathOnCompute is not a known attribute of class <class 'azure.ai.ml._restclient.v2023_04_01_preview.models._models_py3.UriFolderJobOutput'> and will be ignored


Data prep job submitted: quirky_giraffe_bh47lt2h0r | Studio: https://ml.azure.com/runs/quirky_giraffe_bh47lt2h0r?wsid=/subscriptions/a23fa87c-802c-4fdf-9e59-e3d7969bcf31/resourcegroups/rg-churn-ml-project-2025-11-15/workspaces/churn-ml-workspace&tid=e7572e92-7aee-4713-a3c4-ba64888ad45f
Streaming logs for job: quirky_giraffe_bh47lt2h0r
RunId: quirky_giraffe_bh47lt2h0r
Web View: https://ml.azure.com/runs/quirky_giraffe_bh47lt2h0r?wsid=/subscriptions/a23fa87c-802c-4fdf-9e59-e3d7969bcf31/resourcegroups/rg-churn-ml-project-2025-11-15/workspaces/churn-ml-workspace

Execution Summary
RunId: quirky_giraffe_bh47lt2h0r
Web View: https://ml.azure.com/runs/quirky_giraffe_bh47lt2h0r?wsid=/subscriptions/a23fa87c-802c-4fdf-9e59-e3d7969bcf31/resourcegroups/rg-churn-ml-project-2025-11-15/workspaces/churn-ml-workspace

Processed data available at: azureml://datastores/workspaceblobstore/paths/manual-hpo-data/1763391399
Processed data URI: azureml://datastores/workspaceblobstore/paths/manual-hpo-data/176

In [8]:
# Choose the processed data URI that training consumes. Output from this session's
# data prep run wins; otherwise fall back to the previously completed job. At notebook
# top level globals() is the shared namespace, so names no earlier cell defined become None.
_latest_processed_data_uri = globals().get("processed_data_uri")
training_data_uri = _latest_processed_data_uri or globals().get("previous_processed_data_uri")
if not training_data_uri:
    raise RuntimeError(
        "No processed data URI available. Run the data prep job above or set DATA_PREP_JOB/AML_PREVIOUS_DATA_PREP_JOB."
    )
print(f"Training jobs will consume processed data from: {training_data_uri}")


Training jobs will consume processed data from: azureml://datastores/workspaceblobstore/paths/manual-hpo-data/1763391399


In [27]:
# Configure one sweep job per model type (no cross-model mixing of hyperparameters).
budget_cfg = hpo_cfg.get("budget", {})
timeouts_cfg = hpo_cfg.get("timeouts", {})
early_cfg = hpo_cfg.get("early_stopping", {})
sweep_experiment_name = hpo_cfg.get("experiment_name", "manual-hpo-sweep")


def _build_early_termination_policy(cfg: Dict[str, Any]) -> Optional[BanditPolicy]:
    """Build the sweep's early-termination policy from the ``early_stopping`` config.

    Returns ``None`` when ``enabled`` is falsy. Only the ``bandit`` policy is supported.

    Raises:
        ValueError: for any other policy name, or when neither ``slack_factor`` nor
            ``slack_amount`` is configured (a bandit policy needs one of them).
    """
    if not cfg.get("enabled"):
        return None
    policy_name = (cfg.get("policy", "bandit") or "bandit").lower()
    if policy_name != "bandit":
        raise ValueError(f"Unsupported early stopping policy: {policy_name}")
    eval_interval = max(1, int(cfg.get("evaluation_interval", 2)))
    # The delay defaults to one evaluation interval; both values are clamped to at least 1.
    delay_eval = max(1, int(cfg.get("delay_evaluation", eval_interval)))
    slack_factor = cfg.get("slack_factor")
    slack_amount = cfg.get("slack_amount")
    if slack_factor is None and slack_amount is None:
        raise ValueError(
            "Bandit early stopping needs 'slack_factor' or 'slack_amount' "
            "in configs/hpo.yaml::early_stopping."
        )
    return BanditPolicy(
        evaluation_interval=eval_interval,
        delay_evaluation=delay_eval,
        slack_factor=slack_factor,
        slack_amount=slack_amount,
    )


def _build_sweep_kwargs(search_space: Dict[str, Any], early_policy) -> Dict[str, Any]:
    """Translate the HPO metric/budget/timeout settings into ``command(...).sweep()`` kwargs."""
    kwargs: Dict[str, Any] = {
        "primary_metric": hpo_cfg.get("metric", "f1"),
        # A missing *or* null ``mode`` falls back to maximisation.
        "goal": "Maximize" if (hpo_cfg.get("mode") or "max").lower() == "max" else "Minimize",
        "sampling_algorithm": hpo_cfg.get("sampling_algorithm", "random"),
        "search_space": search_space,
        "early_termination_policy": early_policy,
    }
    max_trials = budget_cfg.get("max_trials")
    if max_trials:
        kwargs["max_total_trials"] = max_trials
    max_concurrent = budget_cfg.get("max_concurrent")
    if max_concurrent:
        # Never run more trials concurrently than the total trial budget allows.
        kwargs["max_concurrent_trials"] = min(max_concurrent, max_trials) if max_trials else max_concurrent
    # The config stores minutes; sweep() expects seconds.
    if timeouts_cfg.get("total_minutes"):
        kwargs["timeout"] = int(timeouts_cfg["total_minutes"]) * 60
    if timeouts_cfg.get("trial_minutes"):
        kwargs["trial_timeout"] = int(timeouts_cfg["trial_minutes"]) * 60
    return kwargs


sweep_jobs = {}
for model_name in search_space_cfg.get("model_types", []):
    model_space = search_space_cfg.get(model_name)
    if not model_space:
        print(f"Skipping sweep for {model_name}: no hyperparameters defined in configs/hpo.yaml.")
        continue

    command_segments = [
        "python run_sweep_trial.py",
        "--data ${{inputs.processed_data}}",
        f"--model-type {model_name}",
        "--model-artifact-dir ${{outputs.model_output}}",
    ]

    # Prefix each hyperparameter with the model name so flags from different models never
    # collide. Each becomes a discrete Choice bound via ${{search_space.<name>}}.
    sweep_search_space = {}
    hyperparam_names = []
    for hp_name, hp_values in model_space.items():
        prefixed_name = f"{model_name}_{hp_name}"
        hyperparam_names.append(prefixed_name)
        command_segments.append(f"--{prefixed_name} ${{{{search_space.{prefixed_name}}}}}")
        sweep_search_space[prefixed_name] = Choice(values=hp_values)

    sweep_command = " ".join(command_segments)

    base_training_command = command(
        code=SRC_DIR,
        command=sweep_command,
        inputs={"processed_data": Input(type="uri_folder", path=training_data_uri)},
        outputs={"model_output": Output(type="uri_folder")},
        environment=DEFAULT_ENV,
        compute=DEFAULT_COMPUTE,
        display_name=f"manual-hpo-sweep-trial-{model_name}",
        experiment_name=sweep_experiment_name,
    )

    # Build a fresh policy per sweep so no policy object is shared between jobs.
    early_policy = _build_early_termination_policy(early_cfg)
    sweep_kwargs = _build_sweep_kwargs(sweep_search_space, early_policy)

    sweep_job = base_training_command.sweep(**sweep_kwargs)
    sweep_job.display_name = f"{hpo_cfg.get('sweep_display_name', 'manual-hpo-sweep')}-{model_name}"
    sweep_job.experiment_name = sweep_experiment_name
    sweep_jobs[model_name] = sweep_job

    print(f"Configured sweep for {model_name}:")
    print(f"  metric: {sweep_kwargs['primary_metric']} ({sweep_kwargs['goal']})")
    print(f"  sampling: {sweep_kwargs['sampling_algorithm']}")
    print(
        f"  limits: max_total_trials={sweep_kwargs.get('max_total_trials', 'auto')} | "
        f"max_concurrent={sweep_kwargs.get('max_concurrent_trials', 'auto')}"
    )
    print(
        f"  timeouts: total={timeouts_cfg.get('total_minutes', 'inf')} min | "
        f"trial={timeouts_cfg.get('trial_minutes', 'inf')} min"
    )
    print(f"  hyperparameters: {', '.join(hyperparam_names)}")
    print("-")

if not sweep_jobs:
    raise RuntimeError("No sweep jobs were created. Check configs/hpo.yaml::search_space.")

Configured sweep for rf:
  metric: f1 (Maximize)
  sampling: random
  limits: max_total_trials=2 | max_concurrent=2
  timeouts: total=180 min | trial=45 min
  hyperparameters: rf_n_estimators, rf_max_depth, rf_min_samples_split, rf_min_samples_leaf
-
Configured sweep for xgboost:
  metric: f1 (Maximize)
  sampling: random
  limits: max_total_trials=2 | max_concurrent=2
  timeouts: total=180 min | trial=45 min
  hyperparameters: xgboost_n_estimators, xgboost_max_depth, xgboost_learning_rate, xgboost_subsample, xgboost_colsample_bytree
-


In [28]:
# Submit every configured sweep and keep the returned job handles, keyed by model type.
sweep_submissions = {}
for model_name, sweep_job in sweep_jobs.items():
    sweep_submissions[model_name] = submission = ml_client.jobs.create_or_update(sweep_job)
    print(
        f"Sweep job submitted for {model_name}!",
        f"  Name      : {submission.name}",
        f"  Status    : {submission.status}",
        f"  Studio URL: {submission.studio_url}",
        "-",
        sep="\n",
    )


[32mUploading src (0.24 MBs): 100%|██████████| 242621/242621 [00:03<00:00, 62082.26it/s] 
[39m

pathOnCompute is not a known attribute of class <class 'azure.ai.ml._restclient.v2023_04_01_preview.models._models_py3.UriFolderJobOutput'> and will be ignored


Sweep job submitted for rf!
  Name      : shy_lock_qdvrzwnvr2
  Status    : Running
  Studio URL: https://ml.azure.com/runs/shy_lock_qdvrzwnvr2?wsid=/subscriptions/a23fa87c-802c-4fdf-9e59-e3d7969bcf31/resourcegroups/rg-churn-ml-project-2025-11-15/workspaces/churn-ml-workspace&tid=e7572e92-7aee-4713-a3c4-ba64888ad45f
-


pathOnCompute is not a known attribute of class <class 'azure.ai.ml._restclient.v2023_04_01_preview.models._models_py3.UriFolderJobOutput'> and will be ignored


Sweep job submitted for xgboost!
  Name      : gentle_ear_wx2w5x8k5t
  Status    : Running
  Studio URL: https://ml.azure.com/runs/gentle_ear_wx2w5x8k5t?wsid=/subscriptions/a23fa87c-802c-4fdf-9e59-e3d7969bcf31/resourcegroups/rg-churn-ml-project-2025-11-15/workspaces/churn-ml-workspace&tid=e7572e92-7aee-4713-a3c4-ba64888ad45f
-


## Next steps

- For each submission in `sweep_submissions`, call `ml_client.jobs.stream(submission.name)` (or open its Studio link) to monitor progress.
- After completion, inspect each sweep job in Azure ML Studio to compare child trials and metrics per model.
- Run `extract_best_params.py --parent-run-id <sweep_job_name>` for whichever sweep produced the best metrics.

