# Module 2: Creating CML Jobs Programmatically

## Overview

In Module 1, you built a complete ML pipeline using Python scripts executed manually. In Module 2, we'll automate this pipeline using **CML Jobs** - scheduled or triggered tasks that run your scripts at scale.

This notebook teaches you how to **create jobs programmatically** using the CML API. Instead of clicking through the UI, you'll write code to define, configure, and manage jobs. This approach enables:

- **Automation**: Create multiple jobs with consistent configurations
- **Reproducibility**: Version control your job definitions
- **Integration**: Orchestrate complex workflows programmatically
- **Scalability**: Deploy jobs across projects and environments
- **GitOps**: Review and roll out job changes through the same Git workflow as your code

---

## Learning Goals

By the end of this notebook, you'll understand:

1. ✅ How to authenticate with the CML API
2. ✅ How to query available ML runtimes
3. ✅ How to retrieve project metadata
4. ✅ How to define a job using `cmlapi.CreateJobRequest`
5. ✅ How to create jobs programmatically
6. ✅ How the jobs fit together in a monitoring pipeline

**Note**: We will **create** jobs but not **run** them in this notebook. Running jobs will be covered separately.

## Module 2 Job Pipeline Structure

This notebook creates **2 jobs** that work together in a monitoring pipeline:

```
Job 1: Prepare Artificial Data
    ↓
    └─→ Creates training dataset

Job 2: Monitor Pipeline
    ├─ Period 0: Get predictions → Load ground truth → Check model
    ├─ Period 1: Get predictions → Load ground truth → Check model
    ├─ Period 2: Get predictions → Load ground truth → Check model
    └─ ... (repeats for all periods)
```

Key difference from previous design:
- **Old**: 4 jobs with manual state passing and job chaining
- **New**: 2 jobs with self-contained period management in Job 2

---

## Section 1: Setup and Authentication

### Step 1.1: Import Required Libraries

We need three libraries:
- **`cmlapi`**: Official CML Python API client for programmatic access
- **`os`**: To access environment variables set by CML
- **`json`**: To serialize the runtime search filter into a JSON string

In [None]:
import os
import cmlapi
import json

print("✓ Libraries imported successfully")

### Step 1.2: Create CML API Client

The CML API client authenticates your requests using credentials automatically provided by CML:

- **`CDSW_API_URL`**: The CML API endpoint for your workspace
- **`CDSW_APIV2_KEY`**: Your authentication token

We remove `/api/v1` from the URL since the client handles API versioning internally.

In [None]:
# Create the CML API client
# The client uses environment variables automatically set by CML for authentication
client = cmlapi.default_client(
    url=os.getenv("CDSW_API_URL").replace("/api/v1", ""),
    cml_api_key=os.getenv("CDSW_APIV2_KEY")
)

print("✓ CML API client created successfully")
print(f"  API URL: {os.getenv('CDSW_API_URL')}")
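The cell above uses `str.replace`, which removes `/api/v1` wherever it appears in the URL. A slightly stricter variant, sketched below, strips the suffix only from the end of the URL and fails with a clear message when `CDSW_API_URL` is unset (for example, when the notebook is opened outside a CML session). The helper name `api_base_url` is hypothetical; its return value is what you would pass as `url=` to `cmlapi.default_client`.

```python
import os


def api_base_url(raw_url=None):
    """Return the CML API base URL without a trailing /api/v1 (hypothetical helper)."""
    # Fall back to the environment variable CML sets inside a session
    raw_url = raw_url if raw_url is not None else os.getenv("CDSW_API_URL")
    if not raw_url:
        raise RuntimeError("CDSW_API_URL is not set; run this inside a CML session")
    # Drop a trailing slash first so ".../api/v1/" is handled as well
    return raw_url.rstrip("/").removesuffix("/api/v1")


# Usage with an illustrative URL
print(api_base_url("https://ml-workspace.example.com/api/v1"))
```

`str.removesuffix` needs Python 3.9 or newer, which the Python 3.10 runtime used here satisfies.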

### Step 1.3: Retrieve Available ML Runtimes

Jobs execute in **ML Runtimes** - pre-configured Docker containers with specific Python versions, libraries, and GPU support. We need to query available runtimes that match our requirements.

In this example, we filter for:
- **kernel**: "Python 3.10" - Python version
- **edition**: "Standard" - Standard edition
- **editor**: "PBJ Workbench" - the PBJ (Powered by Jupyter) Workbench editor

**Important Notes:**
- Runtimes from **2024.05 onwards** include ML packages (pandas, numpy, scikit-learn, etc.)
- We select the **NEWEST runtime** to ensure all dependencies are included
- Older runtimes (2023.x) lack pandas and will cause `ModuleNotFoundError` in jobs
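The 2024.05 cutoff is easiest to enforce by comparing the release year and month numerically rather than searching the version string for a substring such as `"2024"`, which would wrongly accept a 2024.02 runtime and wrongly reject a 2025 one. A minimal sketch, assuming `full_version` strings begin with `YYYY.MM` (for example `2024.05.1-b8`; the exact layout is an assumption worth checking against the runtime list printed below). The version strings in the loop are illustrative, not taken from a real workspace.

```python
import re


def runtime_release(full_version):
    """Parse the leading YYYY.MM of a runtime version into (year, month)."""
    match = re.match(r"(\d{4})\.(\d{1,2})", full_version)
    if match is None:
        raise ValueError(f"Unrecognized runtime version: {full_version!r}")
    return int(match.group(1)), int(match.group(2))


def meets_minimum_release(full_version, minimum=(2024, 5)):
    """True if the runtime release is at or after `minimum` (default 2024.05)."""
    # Tuples compare element by element: year first, then month
    return runtime_release(full_version) >= minimum


for version in ["2023.08.2-b8", "2024.02.1-b4", "2024.05.1-b8", "2025.01.1-b3"]:
    print(version, meets_minimum_release(version))
```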

In [None]:
# Query available runtimes matching our criteria
# The search_filter is a JSON string with our runtime requirements
available_runtimes = client.list_runtimes(
    search_filter=json.dumps({
        "kernel": "Python 3.10",
        "edition": "Standard",
        "editor": "PBJ Workbench"
    })
)
print(f"✓ Found {len(available_runtimes.runtimes)} available runtime(s)\n")

# Display all available runtimes
for i, runtime in enumerate(available_runtimes.runtimes, 1):
    print(f"Runtime {i}:")
    print(f"  Version: {runtime.full_version}")
    print(f"  Kernel: {runtime.kernel}")
    print(f"  Edition: {runtime.edition}")
    print(f"  Image: {runtime.image_identifier}")
    print()

### Step 1.4: Select and Store the Latest Runtime

We sort the filtered runtimes by version, newest first, and use the top entry. Its image identifier will be passed to the job creation calls.

In [None]:
# Select the latest runtime (sort by version, newest first)
if not available_runtimes.runtimes:
    raise RuntimeError("No runtimes matched the search filter; adjust kernel, edition, or editor")
sorted_runtimes = sorted(available_runtimes.runtimes, key=lambda r: r.full_version, reverse=True)
latest_runtime = sorted_runtimes[0]
JOB_IMAGE_ML_RUNTIME = latest_runtime.image_identifier

print("✓ Selected Runtime (NEWEST):")
print(f"  Version: {latest_runtime.full_version}")
print(f"  Image: {JOB_IMAGE_ML_RUNTIME}")
print("  Note: Only 2024.05+ runtimes include ML packages (pandas, numpy, scikit-learn)")

# Verify the runtime is 2024.05 or newer. Versions start with a zero-padded YYYY.MM,
# so the 7-character prefix compares correctly as a string ("2025.01" > "2024.05")
if latest_runtime.full_version[:7] < "2024.05":
    print(f"\n⚠️  WARNING: Selected runtime {latest_runtime.full_version} may not have pandas!")
    print("  Recommended: Use 2024.05 or newer")
else:
    print("  ✓ Runtime includes pandas and ML libraries")

# Store in environment variable for use in job definitions
os.environ['JOB_IMAGE_ML_RUNTIME'] = JOB_IMAGE_ML_RUNTIME
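Sorting `full_version` as a string works while every part of the version has the same width, but it compares patch numbers character by character, so `2024.10.10` would sort before `2024.10.9`. A sketch of a numeric sort key, assuming the `YYYY.MM.patch-bBUILD` layout (an assumption, not a documented format). `SimpleNamespace` objects stand in for the runtime records returned by `client.list_runtimes`, and the image identifiers are placeholders.

```python
import re
from types import SimpleNamespace


def version_key(full_version):
    """Turn '2024.10.1-b12' into (2024, 10, 1, 12) so versions compare numerically."""
    # Every run of digits, in order: year, month, patch, build (layout is assumed)
    return tuple(int(part) for part in re.findall(r"\d+", full_version))


def pick_newest_runtime(runtimes):
    """Return the runtime with the highest version, or raise if the list is empty."""
    if not runtimes:
        raise RuntimeError("No runtimes matched the search filter")
    return max(runtimes, key=lambda r: version_key(r.full_version))


# Stand-in records; real ones come from client.list_runtimes(...).runtimes
candidates = [
    SimpleNamespace(full_version="2024.10.9-b1", image_identifier="runtime-a"),
    SimpleNamespace(full_version="2024.10.10-b1", image_identifier="runtime-b"),
    SimpleNamespace(full_version="2023.08.2-b8", image_identifier="runtime-c"),
]
print(pick_newest_runtime(candidates).image_identifier)
```

Using `max` with a key avoids sorting the whole list when only the newest entry is needed.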

### Step 1.5: Retrieve Project Metadata

Jobs are created within a specific CML project. We retrieve the current project's metadata to get its ID, which every job creation call requires.

In [None]:
# Get metadata for the current CML project
# CDSW_PROJECT_ID is automatically set when you run code in a CML project
project = client.get_project(
    project_id=os.getenv("CDSW_PROJECT_ID")
)

print(f"✓ Project Retrieved:")
print(f"  Project ID: {project.id}")
print(f"  Project Name: {project.name}")
print(f"  Description: {project.description}")
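The cells so far assume three CML-provided environment variables exist. Outside a CML session they are `None`, and the first failure shows up indirectly (for example, calling `.replace` on `None` while building the client). A hypothetical `require_env` helper, sketched below, checks them all up front and names every missing variable at once.

```python
import os


def require_env(*names, environ=None):
    """Return {name: value} for each variable, raising if any are missing or empty."""
    environ = os.environ if environ is None else environ
    missing = [name for name in names if not environ.get(name)]
    if missing:
        raise RuntimeError(f"Missing CML environment variables: {', '.join(missing)}")
    return {name: environ[name] for name in names}


# Inside a CML session this returns the three values; elsewhere it raises RuntimeError
try:
    cml_env = require_env("CDSW_API_URL", "CDSW_APIV2_KEY", "CDSW_PROJECT_ID")
except RuntimeError as err:
    print(err)
```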

---

## Section 2: Job Creation - The Monitoring Pipeline

### Understanding Job Definitions

A CML job is defined using `cmlapi.CreateJobRequest` with these key parameters:

| Parameter | Purpose | Example |
|-----------|---------|----------|
| `project_id` | Which project to create the job in | `project.id` |
| `name` | Human-readable job name | `"Monitor Pipeline"` |
| `script` | Path to the Python script, relative to the project root | `"module2/03_monitoring_pipeline.py"` |
| `cpu` | CPU cores to allocate | `2` |
| `memory` | RAM in GB to allocate | `4` |
| `runtime_identifier` | ML Runtime image to use | `JOB_IMAGE_ML_RUNTIME` |
| `environment` | Environment variables for the job | `{"BATCH_SIZE": "250"}` |
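Because every parameter in the table is plain data, job definitions can live in a version-controlled structure and be converted into `CreateJobRequest` keyword arguments in one place. A minimal sketch with a hypothetical `JobSpec` dataclass: the keys it produces mirror the table above, and the actual `cmlapi.CreateJobRequest(**kwargs)` call is left as a comment so the sketch runs without `cmlapi`. The project and runtime IDs are placeholders.

```python
from dataclasses import dataclass, field


@dataclass
class JobSpec:
    """Hypothetical, version-controllable description of one CML job."""
    name: str
    script: str
    cpu: int = 1
    memory: int = 2  # GB
    environment: dict = field(default_factory=dict)

    def to_request_kwargs(self, project_id, runtime_identifier):
        """Build keyword arguments for cmlapi.CreateJobRequest."""
        if self.cpu <= 0 or self.memory <= 0:
            raise ValueError(f"{self.name}: cpu and memory must be positive")
        return {
            "project_id": project_id,
            "name": self.name,
            "script": self.script,
            "cpu": self.cpu,
            "memory": self.memory,
            "runtime_identifier": runtime_identifier,
            "environment": dict(self.environment),
        }


PIPELINE_JOBS = [
    JobSpec("Mod 2 Job 1: Prepare Artificial Data", "module2/02_prepare_artificial_data.py"),
    JobSpec("Mod 2 Job 2: Monitor Pipeline", "module2/03_monitoring_pipeline.py", cpu=2, memory=4),
]

for spec in PIPELINE_JOBS:
    kwargs = spec.to_request_kwargs(project_id="example-project-id", runtime_identifier="example-runtime")
    # In the notebook: client.create_job(body=cmlapi.CreateJobRequest(**kwargs), project_id=str(project.id))
    print(kwargs["name"], kwargs["cpu"], kwargs["memory"])
```

Keeping the specs in a list like `PIPELINE_JOBS` means the pipeline's shape is reviewed in one place, which is the reproducibility benefit listed in the overview.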

### Step 2.1: Create Job 1 - Prepare Artificial Data

This is the entry point to the monitoring pipeline. It runs `module2/02_prepare_artificial_data.py` to generate the synthetic data that the monitoring job consumes. Job 1 does not trigger Job 2; you start Job 2 once Job 1 has completed.

In [None]:
# Define Job 1: Prepare Artificial Data
# This is the entry point that creates the synthetic dataset

job_body_prepare_data = cmlapi.CreateJobRequest(
    project_id=project.id,
    name="Mod 2 Job 1: Prepare Artificial Data",
    script="module2/02_prepare_artificial_data.py",
    cpu=1,
    memory=2,
    runtime_identifier=os.getenv('JOB_IMAGE_ML_RUNTIME'),
)

print("Job 1 Definition - Prepare Artificial Data:")
print(f"  Script: {job_body_prepare_data.script}")
print(f"  CPU: {job_body_prepare_data.cpu} cores")
print(f"  Memory: {job_body_prepare_data.memory} GB")

In [None]:
# Create Job 1 in CML
# This registers the job with CML but does NOT run it
job_1_prepare_data = client.create_job(
    body=job_body_prepare_data,
    project_id=str(project.id)
)

print("\n✓ Job 1 Created Successfully!")
print(f"  Job ID: {job_1_prepare_data.id}")
print(f"  Job Name: {job_1_prepare_data.name}")
print(f"  Script: {job_1_prepare_data.script}")
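Re-running this notebook calls `create_job` again, so unless your workspace rejects duplicate names you may end up with several jobs called "Mod 2 Job 1: Prepare Artificial Data". A get-or-create sketch that looks up the name first. It assumes `client.list_jobs(project_id)` returns an object with a `.jobs` list whose items have a `.name` attribute (an assumption about the response shape), and it checks only the first page of results. The `FakeClient` class is a stand-in so the sketch runs without CML.

```python
from types import SimpleNamespace


def get_or_create_job(client, request, project_id):
    """Return (job, created): an existing job named request.name, or a new one.

    Hypothetical helper; assumes client.list_jobs(project_id) returns an object
    with a `.jobs` list and only inspects that first page.
    """
    existing = [job for job in client.list_jobs(project_id).jobs if job.name == request.name]
    if existing:
        return existing[0], False
    return client.create_job(body=request, project_id=project_id), True


class FakeClient:
    """Minimal stand-in for the cmlapi client, for trying the helper offline."""

    def __init__(self):
        self.jobs = []

    def list_jobs(self, project_id):
        return SimpleNamespace(jobs=list(self.jobs))

    def create_job(self, body, project_id):
        job = SimpleNamespace(id=f"job-{len(self.jobs) + 1}", name=body.name)
        self.jobs.append(job)
        return job


fake = FakeClient()
request = SimpleNamespace(name="Mod 2 Job 1: Prepare Artificial Data")
first, created_first = get_or_create_job(fake, request, "example-project-id")
second, created_second = get_or_create_job(fake, request, "example-project-id")
print(first.id, created_first, second.id, created_second)
```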

### Step 2.2: Create Job 2 - Monitor Pipeline

This job processes all periods sequentially in a single execution. It handles period state management and degradation detection internally.

In [None]:
# Define Job 2: Monitor Pipeline
# This is the main monitoring job that processes all periods sequentially

job_body_monitor_pipeline = cmlapi.CreateJobRequest(
    project_id=project.id,
    name="Mod 2 Job 2: Monitor Pipeline",
    script="module2/03_monitoring_pipeline.py",
    cpu=2,
    memory=4,
    runtime_identifier=os.getenv('JOB_IMAGE_ML_RUNTIME'),
)

print("Job 2 Definition - Monitor Pipeline:")
print(f"  Script: {job_body_monitor_pipeline.script}")
print(f"  CPU: {job_body_monitor_pipeline.cpu} cores")
print(f"  Memory: {job_body_monitor_pipeline.memory} GB")

In [None]:
# Create Job 2 in CML
job_2_monitor_pipeline = client.create_job(
    body=job_body_monitor_pipeline,
    project_id=str(project.id)
)

print("\n✓ Job 2 Created Successfully!")
print(f"  Job ID: {job_2_monitor_pipeline.id}")
print(f"  Job Name: {job_2_monitor_pipeline.name}")
print(f"  Script: {job_2_monitor_pipeline.script}")
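The run instructions below mention that the monitoring job can process specific period ranges via command-line arguments. This notebook does not show the actual interface of `03_monitoring_pipeline.py`, so the sketch uses hypothetical `--start-period` and `--end-period` flags to illustrate one way a script could read an inclusive period range with `argparse`.

```python
import argparse


def parse_period_range(argv=None):
    """Parse hypothetical --start-period/--end-period flags (inclusive range)."""
    parser = argparse.ArgumentParser(description="Monitor model performance by period")
    parser.add_argument("--start-period", type=int, default=0)
    parser.add_argument("--end-period", type=int, default=None,
                        help="Last period to process; defaults to all available periods")
    args = parser.parse_args(argv)
    if args.end_period is not None and args.end_period < args.start_period:
        # parser.error prints a usage message and exits with status 2
        parser.error("--end-period must be >= --start-period")
    return args.start_period, args.end_period


print(parse_period_range(["--start-period", "2", "--end-period", "5"]))
```

Passing an explicit list keeps the function easy to test in a notebook; inside a job you would call it with no arguments so `argparse` reads `sys.argv`.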

### Step 2.3: How the Original Pipeline Jobs Were Consolidated

The original design created separate jobs for getting predictions, loading ground truth, and checking the model. Job 2 now performs all three phases for every period, so no further jobs are needed.

In [None]:
# Note: Jobs 3 and 4 from the original design have been consolidated into Job 2
# The Monitor Pipeline job (03_monitoring_pipeline.py) handles:
# - Getting predictions (original Job 3.1)
# - Loading ground truth (original Job 3.2)  
# - Checking model (original Job 3.3)
# All in a single, self-contained execution per run

print("✓ Job 1 and Job 2 have been created successfully!")
print("  The previous 4-job design has been consolidated into 2 jobs")
print("  Jobs 3, 4, and 5 are no longer needed")


---

## Section 3: Job Orchestration and Summary

### Understanding the Job Pipeline

The two jobs we created work together in a monitoring pipeline:

**Pipeline Flow:**

```
Job 1: Prepare Artificial Data
   (Entry point - prepares synthetic data)
        ↓
   Start Job 2 after Job 1 completes (Jobs tab or client.create_job_run())
        ↓
Job 2: Monitor Pipeline
   for each period:
      Get Predictions → Load Ground Truth → Check Model
        ↓
   Decides: continue to the next period or stop
```

**How it works:**

1. Job 1 runs `02_prepare_artificial_data.py` and prepares the synthetic data
2. Once Job 1 has completed, you start Job 2 from the Jobs tab or with `client.create_job_run()`
3. For each period, Job 2 gets predictions, loads the ground truth labels, and checks model accuracy
4. After each check, Job 2 decides whether to continue with the next period or stop

This approach allows:
- ✅ Sequential execution (each period's phases run in order inside one job run)
- ✅ In-memory state (period state stays inside Job 2 instead of being passed between jobs)
- ✅ Dynamic decision logic (Job 2 can stop as soon as it detects degradation)
- ✅ Resilience (a failed run does not leave a chain of downstream triggers half-finished)

### Step 3.1: Display Complete Job Summary

In [None]:
print("\n" + "="*80)
print("BOTH JOBS CREATED SUCCESSFULLY")
print("="*80)

jobs = [
    ("Job 1", job_1_prepare_data, "Entry point - prepares synthetic data"),
    ("Job 2", job_2_monitor_pipeline, "Integrated monitoring - processes all periods"),
]

for label, job, description in jobs:
    print(f"\n{label}: {job.name}")
    print(f"  ID: {job.id}")
    print(f"  Script: {job.script}")
    print(f"  CPU: {job.cpu} cores")
    print(f"  Memory: {job.memory} GB")
    print(f"  Purpose: {description}")

print("\n" + "="*80)
print("NEXT STEPS")
print("="*80)
print("""
✓ Both jobs have been created programmatically
✓ Jobs are now registered in CML but have NOT been run

To run the jobs:

1. Option A - Run via CML UI:
   - Go to your project's Jobs tab
   - Click on "Mod 2 Job 1: Prepare Artificial Data"
   - Select "Run Now"
   - Wait for completion
   
   - Then click on "Mod 2 Job 2: Monitor Pipeline"
   - Select "Run Now"
   - The job will process all periods sequentially
   - Check results in data/monitoring_results.json

2. Option B - Run via API (next notebook):
   - Use client.create_job_run() to trigger Job 1
   - Wait for completion
   - Use client.create_job_run() to trigger Job 2
   - Poll for completion status

Key Advantages of This Design:
   • Simpler: Only 2 jobs instead of 4
   • Reliable: No job chaining failures or state passing issues
   • Faster: No overhead from starting multiple jobs
   • Cleaner logs: All output in one execution
   • Self-contained: No external state file dependencies
   • Flexible: Can run specific period ranges via command-line args

Key Learning Points:
   • Jobs are created once, run many times
   • Each run can have different environment variables or arguments
   • Job names help identify jobs programmatically
   • cmlapi enables GitOps workflows for job management
   • This 2-job pipeline automates the complete monitoring workflow
   • Period state is managed internally, not via external files
""")

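Option B boils down to: start a run, wait until it reaches a terminal state, then start the next job. The waiting part can be written independently of `cmlapi` by passing in a function that returns the current status. A sketch under stated assumptions: the status strings in `TERMINAL_STATES` (and `ENGINE_SCHEDULING`/`ENGINE_RUNNING` in the usage) are assumptions about what `client.get_job_run(...).status` reports, and the commented `cmlapi` calls should be confirmed in the next notebook before you rely on them.

```python
import time

# Assumed terminal statuses for a CML job run; verify against your workspace
TERMINAL_STATES = {"ENGINE_SUCCEEDED", "ENGINE_FAILED", "ENGINE_TIMEDOUT", "ENGINE_STOPPED"}


def wait_for_run(fetch_status, poll_seconds=10.0, timeout_seconds=3600.0, sleep=time.sleep):
    """Poll fetch_status() until it returns a terminal state, or raise TimeoutError."""
    waited = 0.0
    while True:
        status = fetch_status()
        if status in TERMINAL_STATES:
            return status
        if waited >= timeout_seconds:
            raise TimeoutError(f"Run still {status!r} after {waited:.0f}s")
        sleep(poll_seconds)
        waited += poll_seconds


# Offline usage: a scripted sequence of statuses and a no-op sleep
statuses = iter(["ENGINE_SCHEDULING", "ENGINE_RUNNING", "ENGINE_SUCCEEDED"])
print(wait_for_run(lambda: next(statuses), sleep=lambda _: None))

# With cmlapi (assumed calls), the status function might look like:
# run = client.create_job_run(cmlapi.CreateJobRunRequest(), project_id, job_id)
# wait_for_run(lambda: client.get_job_run(project_id, job_id, run.id).status)
```

Injecting `sleep` keeps the loop testable without real delays; only start Job 2 if the returned status for Job 1 indicates success.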

---

## Key Concepts Review

### Authentication
```python
client = cmlapi.default_client(
    url=os.getenv("CDSW_API_URL").replace("/api/v1", ""),
    cml_api_key=os.getenv("CDSW_APIV2_KEY")
)
```
CML automatically provides API credentials in environment variables.

### Selecting Runtimes
```python
available_runtimes = client.list_runtimes(
    search_filter=json.dumps({"kernel": "Python 3.10"})
)
```
Query available ML Runtimes with specific configurations (Python version, GPU, editor).

### Creating Jobs
```python
job_body = cmlapi.CreateJobRequest(
    project_id=project.id,
    name="Job Name",
    script="script.py",
    cpu=2,
    memory=4,
    runtime_identifier=JOB_IMAGE_ML_RUNTIME,
    environment={"VAR": "value"}
)

job = client.create_job(body=job_body, project_id=project.id)
```
Define job configuration and create it in CML (does NOT run automatically).

### Internal State Management (New Design)
Instead of passing state between jobs via environment variables and job chaining:
```python
# OLD (4 jobs): Job1 → triggers Job2 → triggers Job3 → triggers Job4
# NEW (2 jobs): Job1 → Job2 processes all periods internally

# Job 2 (Monitor Pipeline) manages period state:
for period in range(START_PERIOD, END_PERIOD + 1):
    # Phase 1: Get Predictions
    # Phase 2: Load Ground Truth
    # Phase 3: Check Model
    # Decision: Continue or exit
```
Period state is managed in-memory within a single job execution.
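A runnable version of the loop above, with stub functions standing in for the three phases. Everything here is hypothetical: the stub data, the accuracy numbers, and the `ACCURACY_THRESHOLD` stop rule only illustrate how one job can carry period state in memory and stop early; `03_monitoring_pipeline.py` may use different logic.

```python
ACCURACY_THRESHOLD = 0.80  # hypothetical: stop once accuracy drops below this


def get_predictions(period, n=10):
    """Stub: the model gets one more prediction wrong each period."""
    wrong = min(period, n)
    return [0] * wrong + [1] * (n - wrong)


def load_ground_truth(period, n=10):
    """Stub: every true label is 1 (period is unused here)."""
    return [1] * n


def check_model(predictions, ground_truth):
    """Fraction of predictions that match the ground truth labels."""
    matches = sum(p == t for p, t in zip(predictions, ground_truth))
    return matches / len(ground_truth)


def run_monitoring(start_period, end_period):
    """Process periods in order, keeping state in memory; stop on degradation."""
    results = []
    for period in range(start_period, end_period + 1):
        predictions = get_predictions(period)              # Phase 1: Get Predictions
        ground_truth = load_ground_truth(period)           # Phase 2: Load Ground Truth
        accuracy = check_model(predictions, ground_truth)  # Phase 3: Check Model
        results.append({"period": period, "accuracy": accuracy})
        if accuracy < ACCURACY_THRESHOLD:                  # Decision: continue or exit
            print(f"Degradation detected in period {period}: accuracy {accuracy:.2f}")
            break
    return results


results = run_monitoring(0, 5)
print([r["period"] for r in results])
```

With these stubs, accuracy falls by 0.1 per period, so the loop stops after period 3 instead of running through period 5.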

---

## Summary: The 2-Job Pipeline

You've just created a simplified monitoring pipeline using CML Jobs:

| Job | Purpose | Runs |
|-----|---------|------|
| 1 | Prepare synthetic data | Once (one-time setup) |
| 2 | Monitor all periods | Once (processes all periods sequentially) |

This is simpler and more reliable than the original 4-job design while maintaining all the same functionality.

### Design Improvements

**Original Design (4 Jobs)**:
```
Job 1: Prepare Data
   ↓ triggers
Job 2: Get Predictions (Period 0)
   ↓ triggers
Job 3: Load Ground Truth (Period 0)
   ↓ triggers
Job 4: Check Model (Period 0)
   ↓ triggers
Job 2: Get Predictions (Period 1)
   ↓ ... (repeats for each period)
```
Problems: State passing, job chaining failures, multiple job executions, complexity

**New Design (2 Jobs)**:
```
Job 1: Prepare Data (one-time)
   ↓ manual trigger
Job 2: Monitor Pipeline
   ├─ for period in [0, 1, 2, ...]:
   │   ├─ Get Predictions
   │   ├─ Load Ground Truth
   │   └─ Check Model
   └─ (single execution, all periods handled)
```
Advantages: Self-contained, reliable, simple, maintainable, fast

---

## Next Steps

In the next notebook, you'll:
1. Learn how to **run jobs** using `client.create_job_run()`
2. Monitor job execution and track progress
3. Handle errors and job failures
4. Implement the complete automated monitoring pipeline