# Module 2: Creating CML Jobs Programmatically

## Overview

In Module 1, you built a complete ML pipeline using Python scripts that you ran manually. In Module 2, we'll automate this pipeline using **CML Jobs**: scheduled or triggered tasks that run your scripts without manual intervention.

This notebook teaches you how to **create jobs programmatically** using the CML API. Instead of clicking through the UI, you'll write code to define, configure, and manage jobs. This approach enables:

- **Automation**: Create multiple jobs with consistent configurations
- **Reproducibility**: Recreate identical jobs from the same code
- **Integration**: Orchestrate complex workflows programmatically
- **Scalability**: Deploy jobs across projects and environments
- **GitOps**: Keep job definitions in version control alongside your scripts

---

## Learning Goals

By the end of this notebook, you'll understand:

1. ✅ How to authenticate with the CML API
2. ✅ How to query available ML runtimes
3. ✅ How to retrieve project metadata
4. ✅ How to define a job using `cmlapi.CreateJobRequest`
5. ✅ How to create jobs programmatically
6. ✅ How jobs are chained so that each one triggers the next

**Note**: We will **create** jobs but not **run** them in this notebook. Running jobs will be covered separately.

## Module 2 Job Pipeline Structure

This notebook creates **4 jobs** that work together in a monitoring pipeline:

```
Job 02: Prepare Artificial Data
        ↓
        └─→ Calls client.create_job_run() for Job 03.1

Job 03.1: Get Predictions
        ↓
        └─→ Calls client.create_job_run() for Job 03.2

Job 03.2: Load Ground Truth
        ↓
        └─→ Calls client.create_job_run() for Job 03.3

Job 03.3: Check Model
        ↓
        └─→ Decides if pipeline continues or stops
```

---

## Section 1: Setup and Authentication

### Step 1.1: Import Required Libraries

We need three libraries:
- **`cmlapi`**: Official CML Python API client for programmatic access
- **`os`**: To access environment variables set by CML
- **`json`**: To serialize the runtime search filter into a JSON string

In [1]:
import os
import cmlapi
import json

print("✓ Libraries imported successfully")

✓ Libraries imported successfully


### Step 1.2: Create CML API Client

The CML API client authenticates your requests using credentials automatically provided by CML:

- **`CDSW_API_URL`**: The CML API endpoint for your workspace
- **`CDSW_APIV2_KEY`**: Your authentication token

We remove `/api/v1` from the URL since the client handles API versioning internally.
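If either variable is missing (for example, when this code runs outside a CML session), `os.getenv` returns `None` and `.replace()` fails with an `AttributeError` that hides the real cause. The following is a minimal sketch of a defensive alternative, assuming the same two environment variables; `require_env` and `api_base_url` are hypothetical helper names, not part of `cmlapi`.

```python
import os


def require_env(name: str) -> str:
    """Return an environment variable, or fail with a message naming it."""
    value = os.getenv(name)
    if not value:
        raise RuntimeError(f"{name} is not set; run this inside a CML session or job")
    return value


def api_base_url(raw_url: str) -> str:
    """Strip a trailing '/api/v1' (and any trailing slash) from the CML API URL."""
    url = raw_url.rstrip("/")
    suffix = "/api/v1"
    return url[: -len(suffix)] if url.endswith(suffix) else url


# Usage inside CML (not executed here):
# client = cmlapi.default_client(
#     url=api_base_url(require_env("CDSW_API_URL")),
#     cml_api_key=require_env("CDSW_APIV2_KEY"),
# )
```

Unlike `.replace()`, `endswith` only removes the suffix at the end of the URL, so a hostname that happens to contain `/api/v1` elsewhere is left untouched.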

In [2]:
# Create the CML API client
# The client uses environment variables automatically set by CML for authentication
client = cmlapi.default_client(
    url=os.getenv("CDSW_API_URL").replace("/api/v1", ""),
    cml_api_key=os.getenv("CDSW_APIV2_KEY")
)

print("✓ CML API client created successfully")
print(f"  API URL: {os.getenv('CDSW_API_URL')}")

✓ CML API client created successfully
  API URL: https://ml-dbfc64d1-783.go01-dem.ylcu-atmi.cloudera.site/api/v1


### Step 1.3: Retrieve Available ML Runtimes

Jobs execute in **ML Runtimes**: pre-configured container images with a specific Python version, set of libraries, and optional GPU support. We query the runtimes that match our requirements.

In this example, we filter for:
- **kernel**: "Python 3.10" (the Python version)
- **edition**: "Standard" (the Standard edition)
- **editor**: "PBJ Workbench" (the PBJ, or "Powered by Jupyter", Workbench editor)

**Important Notes:**
- Runtimes from **2024.05 onwards** include ML packages (pandas, numpy, scikit-learn, etc.)
- We select the **NEWEST runtime** to ensure all dependencies are included
- Older runtimes (2023.x) lack pandas and will cause `ModuleNotFoundError` in jobs

In [3]:
# Query available runtimes matching our criteria
# The search_filter is a JSON string with our runtime requirements
available_runtimes = client.list_runtimes(
    search_filter=json.dumps({
        "kernel": "Python 3.10",
        "edition": "Standard",
        "editor": "PBJ Workbench"
    })
)
print(f"✓ Found {len(available_runtimes.runtimes)} available runtime(s)\n")

# Display all available runtimes
for i, runtime in enumerate(available_runtimes.runtimes, 1):
    print(f"Runtime {i}:")
    print(f"  Version: {runtime.full_version}")
    print(f"  Kernel: {runtime.kernel}")
    print(f"  Edition: {runtime.edition}")
    print(f"  Image: {runtime.image_identifier}")
    print()

✓ Found 10 available runtime(s)

Runtime 1:
  Version: 2023.05.1-b4
  Kernel: Python 3.10
  Edition: Standard
  Image: container.repository.cloudera.com/cloudera/cdsw/ml-runtime-pbj-workbench-python3.10-standard:2023.05.1-b4

Runtime 2:
  Version: 2023.05.1-b4
  Kernel: Python 3.10
  Edition: Standard
  Image: docker.repository.cloudera.com/cloudera/cdsw/ml-runtime-pbj-workbench-python3.10-standard:2023.05.1-b4

Runtime 3:
  Version: 2023.05.2-b7
  Kernel: Python 3.10
  Edition: Standard
  Image: docker.repository.cloudera.com/cloudera/cdsw/ml-runtime-pbj-workbench-python3.10-standard:2023.05.2-b7

Runtime 4:
  Version: 2023.08.1-b6
  Kernel: Python 3.10
  Edition: Standard
  Image: docker.repository.cloudera.com/cloudera/cdsw/ml-runtime-pbj-workbench-python3.10-standard:2023.08.1-b6

Runtime 5:
  Version: 2023.08.2-b8
  Kernel: Python 3.10
  Edition: Standard
  Image: docker.repository.cloudera.com/cloudera/cdsw/ml-runtime-pbj-workbench-python3.10-standard:2023.08.2-b8

Runtime 6:
  ...
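The output reports exactly 10 runtimes, which may be a single page of results rather than the full list. Assuming `list_runtimes` accepts a `page_token` argument and its response carries a `next_page_token` that is empty on the last page, the following minimal sketch collects every page before a runtime is selected. `collect_all_runtimes` is a hypothetical helper; it takes a page-fetching function so the loop can be tested without a CML connection.

```python
from typing import Any, Callable, List, Optional


def collect_all_runtimes(fetch_page: Callable[[Optional[str]], Any]) -> List[Any]:
    """Call fetch_page(page_token) until a response has no next_page_token."""
    runtimes: List[Any] = []
    token: Optional[str] = None  # None means "request the first page"
    while True:
        response = fetch_page(token)
        runtimes.extend(response.runtimes)
        # Treat a missing or empty token as "no more pages"
        token = getattr(response, "next_page_token", None) or None
        if token is None:
            return runtimes


# Usage inside CML (not executed here):
# filter_json = json.dumps({"kernel": "Python 3.10", "edition": "Standard",
#                           "editor": "PBJ Workbench"})
#
# def fetch(token):
#     kwargs = {"search_filter": filter_json}
#     if token:
#         kwargs["page_token"] = token
#     return client.list_runtimes(**kwargs)
#
# all_runtimes = collect_all_runtimes(fetch)
```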

### Step 1.4: Select and Store the Latest Runtime

We select the **newest runtime** from our filtered list by comparing version numbers. Its image identifier will be passed to job creation calls.

In [4]:
import re

# Versions look like "2024.10.1-b12"; compare them numerically, because a plain
# string sort would rank build "b9" above "b12".
def version_key(version):
    return tuple(int(n) for n in re.findall(r"\d+", version))

# Select the newest runtime
newest = max(available_runtimes.runtimes, key=lambda r: version_key(r.full_version))
JOB_IMAGE_ML_RUNTIME = newest.image_identifier

print("✓ Selected Runtime (NEWEST):")
print(f"  Version: {newest.full_version}")
print(f"  Image: {JOB_IMAGE_ML_RUNTIME}")
print("  Note: Only 2024.05+ runtimes include ML packages (pandas, numpy, scikit-learn)")

# Warn if the runtime predates 2024.05, comparing (year, month)
if version_key(newest.full_version)[:2] < (2024, 5):
    print(f"\n⚠️  WARNING: Selected runtime {newest.full_version} may not have pandas!")
    print("  Recommended: use runtime 2024.05 or newer")
else:
    print("  ✓ Runtime includes pandas and ML libraries")

# Store in an environment variable for use in job definitions
os.environ['JOB_IMAGE_ML_RUNTIME'] = JOB_IMAGE_ML_RUNTIME

✓ Selected Runtime (NEWEST):
  Version: 2024.10.1-b12
  Image: docker.repository.cloudera.com/cloudera/cdsw/ml-runtime-pbj-workbench-python3.10-standard:2024.10.1-b12
  Note: Only 2024.05+ runtimes include ML packages (pandas, numpy, scikit-learn)
  ✓ Runtime includes pandas and ML libraries


### Step 1.5: Retrieve Project Metadata

Jobs are created within a specific CML project. We retrieve the current project's metadata to get its ID and other information needed for job creation.

In [5]:
# Get metadata for the current CML project
# CDSW_PROJECT_ID is automatically set when you run code in a CML project
project = client.get_project(
    project_id=os.getenv("CDSW_PROJECT_ID")
)

print(f"✓ Project Retrieved:")
print(f"  Project ID: {project.id}")
print(f"  Project Name: {project.name}")
print(f"  Description: {project.description}")

✓ Project Retrieved:
  Project ID: crfy-i66p-le3j-cdss
  Project Name: CAI Baseline MLOPS
  Description: 


---

## Section 2: Job Creation - The Monitoring Pipeline

### Understanding Job Definitions

A CML job is defined using `cmlapi.CreateJobRequest` with these key parameters:

| Parameter | Purpose | Example |
|-----------|---------|----------|
| `project_id` | Which project to create the job in | `project.id` |
| `name` | Human-readable job name | `"Get Predictions"` |
| `script` | Path to Python script to run | `"03.1_get_predictions.py"` |
| `cpu` | CPU cores to allocate | `2` |
| `memory` | RAM in GB to allocate | `4` |
| `runtime_identifier` | ML Runtime image to use | `JOB_IMAGE_ML_RUNTIME` |
| `environment` | Environment variables for the job | `{"PERIOD": "0"}` |
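The four job cells in this section repeat the same pattern with different values. The following is a minimal sketch of a data-driven alternative that keeps all definitions in one list, which is easy to review and version-control. `JOB_SPECS` and `build_job_kwargs` are hypothetical names; the names, scripts, and resources mirror the cells below.

```python
from typing import Dict, List

# Hypothetical single source of truth for the four pipeline jobs
JOB_SPECS: List[Dict] = [
    {"name": "Job 02: Prepare Artificial Data",
     "script": "module2/02_prepare_artificial_data.py", "cpu": 1, "memory": 2},
    {"name": "Get Predictions",
     "script": "module2/03.1_get_predictions.py", "cpu": 2, "memory": 4},
    {"name": "Load Ground Truth",
     "script": "module2/03.2_load_ground_truth.py", "cpu": 1, "memory": 2},
    {"name": "Check Model",
     "script": "module2/03.3_check_model.py", "cpu": 1, "memory": 2},
]

REQUIRED_KEYS = {"name", "script", "cpu", "memory"}


def build_job_kwargs(spec: Dict, project_id: str, runtime_identifier: str) -> Dict:
    """Merge one job spec with the settings shared by every job."""
    kwargs = {"project_id": project_id, "runtime_identifier": runtime_identifier, **spec}
    missing = REQUIRED_KEYS - set(kwargs)
    if missing:
        raise ValueError(f"job spec is missing {sorted(missing)}")
    return kwargs


# Usage inside CML (not executed here):
# for spec in JOB_SPECS:
#     body = cmlapi.CreateJobRequest(
#         **build_job_kwargs(spec, project.id, JOB_IMAGE_ML_RUNTIME))
#     client.create_job(body=body, project_id=str(project.id))
```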

### Step 2.1: Create Job 02 - Prepare Artificial Data

This is the entry point to the monitoring pipeline. It prepares synthetic data for the workflow.
When it finishes, its script triggers Job 03.1 (Get Predictions) via `client.create_job_run()`.

In [6]:
# Define Job 02: Prepare Artificial Data
# This is the entry point that triggers the entire pipeline

job_body_prepare_data = cmlapi.CreateJobRequest(
    project_id=project.id,
    name="Job 02: Prepare Artificial Data",
    script="module2/02_prepare_artificial_data.py",
    cpu=1,
    memory=2,
    runtime_identifier=os.getenv('JOB_IMAGE_ML_RUNTIME')
)

print("Job 02 Definition - Prepare Artificial Data:")
print(f"  Script: {job_body_prepare_data.script}")
print(f"  CPU: {job_body_prepare_data.cpu} cores")
print(f"  Memory: {job_body_prepare_data.memory} GB")

Job 02 Definition - Prepare Artificial Data:
  Script: module2/02_prepare_artificial_data.py
  CPU: 1 cores
  Memory: 2 GB


In [7]:
# Create Job 02 in CML
# This registers the job with CML but does NOT run it
job_02_prepare_data = client.create_job(
    body=job_body_prepare_data,
    project_id=str(project.id)
)

print("\n✓ Job 02 Created Successfully!")
print(f"  Job ID: {job_02_prepare_data.id}")
print(f"  Job Name: {job_02_prepare_data.name}")
print(f"  Script: {job_02_prepare_data.script}")


✓ Job 02 Created Successfully!
  Job ID: psgr-5s9o-fwnu-kgpu
  Job Name: Job 02: Prepare Artificial Data
  Script: module2/02_prepare_artificial_data.py


In [8]:
type(job_02_prepare_data)

cmlapi.models.job.Job

### Step 2.2: Create Job 03.1 - Get Predictions

This job processes predictions for the current period in batches.
It is triggered by Job 02, whose script calls `client.create_job_run()`.

In [None]:
# Define Job 03.1: Get Predictions
# This job loads period data, processes it in batches, and tracks predictions

job_body_get_predictions = cmlapi.CreateJobRequest(
    project_id=project.id,
    name="Get Predictions",
    script="module2/03.1_get_predictions.py",
    cpu=2,
    memory=4,
    runtime_identifier=os.getenv('JOB_IMAGE_ML_RUNTIME')
)

print("Job 03.1 Definition - Get Predictions:")
print(f"  Script: {job_body_get_predictions.script}")
print(f"  CPU: {job_body_get_predictions.cpu} cores")
print(f"  Memory: {job_body_get_predictions.memory} GB")

In [10]:
# Create Job 03.1 in CML
job_031_get_predictions = client.create_job(
    body=job_body_get_predictions,
    project_id=str(project.id)
)

print("\n✓ Job 03.1 Created Successfully!")
print(f"  Job ID: {job_031_get_predictions.id}")
print(f"  Job Name: {job_031_get_predictions.name}")
print(f"  Script: {job_031_get_predictions.script}")


✓ Job 03.1 Created Successfully!
  Job ID: 3wtt-x1lt-iitx-oori
  Job Name: Job 03.1: Get Predictions
  Script: module2/03.1_get_predictions.py


### Step 2.3: Create Job 03.2 - Load Ground Truth

This job loads ground truth labels for the current period.
It is triggered by Job 03.1 after predictions are made.

In [None]:
# Define Job 03.2: Load Ground Truth
# This job loads labels for the current period and validates predictions

job_body_load_ground_truth = cmlapi.CreateJobRequest(
    project_id=project.id,
    name="Load Ground Truth",
    script="module2/03.2_load_ground_truth.py",
    cpu=1,
    memory=2,
    runtime_identifier=os.getenv('JOB_IMAGE_ML_RUNTIME')
)

print("Job 03.2 Definition - Load Ground Truth:")
print(f"  Script: {job_body_load_ground_truth.script}")
print(f"  CPU: {job_body_load_ground_truth.cpu} cores")
print(f"  Memory: {job_body_load_ground_truth.memory} GB")

In [12]:
# Create Job 03.2 in CML
job_032_load_ground_truth = client.create_job(
    body=job_body_load_ground_truth,
    project_id=str(project.id)
)

print("\n✓ Job 03.2 Created Successfully!")
print(f"  Job ID: {job_032_load_ground_truth.id}")
print(f"  Job Name: {job_032_load_ground_truth.name}")
print(f"  Script: {job_032_load_ground_truth.script}")


✓ Job 03.2 Created Successfully!
  Job ID: a64l-b07m-ouic-e1sh
  Job Name: Job 03.2: Load Ground Truth
  Script: module2/03.2_load_ground_truth.py


### Step 2.4: Create Job 03.3 - Check Model

This job validates model accuracy for the current period and decides whether the monitoring pipeline continues.
It is triggered by Job 03.2 after ground truth labels are loaded.

In [None]:
# Define Job 03.3: Check Model Performance
# This job calculates accuracy metrics and detects degradation

job_body_check_model = cmlapi.CreateJobRequest(
    project_id=project.id,
    name="Check Model",
    script="module2/03.3_check_model.py",
    cpu=1,
    memory=2,
    runtime_identifier=os.getenv('JOB_IMAGE_ML_RUNTIME')
)

print("Job 03.3 Definition - Check Model:")
print(f"  Script: {job_body_check_model.script}")
print(f"  CPU: {job_body_check_model.cpu} cores")
print(f"  Memory: {job_body_check_model.memory} GB")

In [14]:
# Create Job 03.3 in CML
job_033_check_model = client.create_job(
    body=job_body_check_model,
    project_id=str(project.id)
)

print("\n✓ Job 03.3 Created Successfully!")
print(f"  Job ID: {job_033_check_model.id}")
print(f"  Job Name: {job_033_check_model.name}")
print(f"  Script: {job_033_check_model.script}")


✓ Job 03.3 Created Successfully!
  Job ID: 88vy-os3t-njrx-9hw0
  Job Name: Job 03.3: Check Model
  Script: module2/03.3_check_model.py
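
`create_job` registers a new job every time it is called, so re-running this notebook creates duplicate jobs with the same name, which can confuse the name-based lookup the job scripts rely on. The following is a minimal get-or-create sketch, assuming `client.list_jobs(project_id, search_filter=...)` returns a response with a `jobs` list. `get_or_create_job` is a hypothetical helper. It compares names exactly in Python in case the server-side filter also matches substrings.

```python
import json
from typing import Any


def get_or_create_job(client: Any, project_id: str, request: Any) -> Any:
    """Return the existing job named request.name, or create it if none exists."""
    response = client.list_jobs(
        project_id, search_filter=json.dumps({"name": request.name})
    )
    for job in response.jobs:
        if job.name == request.name:  # exact match only
            return job
    return client.create_job(body=request, project_id=project_id)


# Usage inside CML (not executed here):
# job_02_prepare_data = get_or_create_job(client, str(project.id), job_body_prepare_data)
```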


---

## Section 3: Job Orchestration and Summary

### Understanding the Job Pipeline

The four jobs we created work together in an automated monitoring pipeline:

**Pipeline Flow:**

```
Job 02: Prepare Artificial Data
   (Entry point - prepares synthetic data)
        ↓
   02_prepare_artificial_data.py calls:
   client.create_job_run(..., job_id=<ID of Job 03.1>)
        ↓
Job 03.1: Get Predictions
   (Processes predictions for current period in batches)
        ↓
   03.1_get_predictions.py calls:
   client.create_job_run(..., job_id=<ID of Job 03.2>)
        ↓
Job 03.2: Load Ground Truth
   (Loads and processes ground truth labels)
        ↓
   03.2_load_ground_truth.py calls:
   client.create_job_run(..., job_id=<ID of Job 03.3>)
        ↓
Job 03.3: Check Model
   (Validates accuracy and detects degradation)
        ↓
   Decides: Continue pipeline or stop
```

**How it works:**

1. Job 02 runs and prepares the data
2. At the end, Job 02's script calls `client.create_job_run()` to trigger Job 03.1
3. Job 03.1 gets predictions for the period
4. At the end, Job 03.1's script calls `client.create_job_run()` to trigger Job 03.2
5. Job 03.2 loads ground truth labels
6. At the end, Job 03.2's script calls `client.create_job_run()` to trigger Job 03.3
7. Job 03.3 validates model accuracy and decides next steps
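
The ordering in steps 1 to 7 comes from placing the trigger call last in each script: if the stage's work raises an exception, the trigger line is never reached. The following is a minimal sketch of that control flow. `run_stage`, `work`, and `trigger_next` are hypothetical names; in a real job script, `trigger_next` would wrap `client.create_job_run()`.

```python
from typing import Callable


def run_stage(work: Callable[[], None], trigger_next: Callable[[], None]) -> None:
    """Run this stage's work, then trigger the downstream job only if it succeeded."""
    work()          # an exception propagates out here, so trigger_next never runs
    trigger_next()  # reached only when work() completed without raising


# Usage inside a CML job script (not executed here):
# run_stage(
#     work=main,  # hypothetical function holding this stage's logic
#     trigger_next=lambda: client.create_job_run(
#         cmlapi.CreateJobRunRequest(), project_id=project_id, job_id=next_job_id
#     ),
# )
```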

This approach allows:
- ✅ Sequential execution (each job starts only after the previous script reaches its final trigger call)
- ✅ Data passing between jobs (via files or environment variables)
- ✅ Dynamic decision logic (Job 03.3 can decide whether to continue the pipeline)
- ✅ Resilience (if a script fails before its trigger call, downstream jobs never start)
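
The decision logic in Job 03.3 amounts to comparing a metric against a threshold and continuing only when the metric passes. The notebook does not show the actual rule in `03.3_check_model.py`, so the following is a sketch that uses accuracy and a hypothetical threshold of 0.8.

```python
from typing import Sequence

ACCURACY_THRESHOLD = 0.8  # hypothetical value; tune for your model


def accuracy(y_true: Sequence[int], y_pred: Sequence[int]) -> float:
    """Fraction of predictions that match the ground-truth labels."""
    if len(y_true) != len(y_pred) or len(y_true) == 0:
        raise ValueError("need two non-empty sequences of equal length")
    return sum(t == p for t, p in zip(y_true, y_pred)) / len(y_true)


def should_continue(y_true: Sequence[int], y_pred: Sequence[int],
                    threshold: float = ACCURACY_THRESHOLD) -> bool:
    """True if accuracy is at or above the threshold (no degradation detected)."""
    return accuracy(y_true, y_pred) >= threshold
```

For example, `should_continue([1, 0, 1, 1], [1, 0, 0, 1])` returns `False`, because the accuracy of 0.75 falls below the threshold.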

### Step 3.1: Display Complete Job Summary

In [None]:
print("\n" + "="*80)
print("ALL 4 JOBS CREATED SUCCESSFULLY")
print("="*80)

jobs = [
    ("Job 02", job_02_prepare_data, "Entry point - prepares synthetic data"),
    ("Job 03.1", job_031_get_predictions, "Processes predictions in batches"),
    ("Job 03.2", job_032_load_ground_truth, "Loads ground truth labels"),
    ("Job 03.3", job_033_check_model, "Validates model accuracy")
]

for label, job, description in jobs:
    print(f"\n{label}: {job.name}")
    print(f"  ID: {job.id}")
    print(f"  Script: {job.script}")
    print(f"  CPU: {job.cpu} cores")
    print(f"  Memory: {job.memory} GB")
    print(f"  Purpose: {description}")

print("\n" + "="*80)
print("NEXT STEPS")
print("="*80)
print("""
✓ All 4 jobs have been created programmatically
✓ Jobs are now registered in CML but have NOT been run

To run the jobs:

1. Option A - Run via CML UI:
   - Go to your project's Jobs tab
   - Click on "Job 02: Prepare Artificial Data"
   - Select "Run Now"
   - The job will automatically trigger the downstream jobs (Get Predictions → Load Ground Truth → Check Model)

2. Option B - Run via API (next notebook):
   - Use client.create_job_run() to trigger "Job 02: Prepare Artificial Data"
   - Downstream jobs will trigger automatically
   - Poll for completion status

Job Orchestration Pattern (used in this lab):
  • Each job looks up its downstream job by NAME within the project
  • Names are easier to read than generated job IDs, which change if jobs are recreated
  • Pattern: response = client.list_jobs(project_id, search_filter=json.dumps({"name": "Job Name"}))
  • Take the matching job's id from response.jobs and call client.create_job_run()

Key Learning Points:
  • Jobs are created once, run many times
  • Each run can have different environment variables
  • Job names enable orchestration (search by name, extract ID, trigger)
  • cmlapi enables GitOps workflows for job management
  • This 4-job pipeline automates the complete monitoring workflow
""")

---

## Key Concepts Review

### Authentication
```python
import os
import cmlapi

client = cmlapi.default_client(
    url=os.getenv("CDSW_API_URL").replace("/api/v1", ""),
    cml_api_key=os.getenv("CDSW_APIV2_KEY")
)
```
CML automatically provides API credentials in environment variables.

### Selecting Runtimes
```python
import json

available_runtimes = client.list_runtimes(
    search_filter=json.dumps({"kernel": "Python 3.10"})
)
```
Query available ML Runtimes by kernel, edition, and editor.

### Creating Jobs
```python
job_body = cmlapi.CreateJobRequest(
    project_id=project.id,
    name="Job Name",
    script="script.py",
    cpu=2,
    memory=4,
    runtime_identifier=JOB_IMAGE_ML_RUNTIME,
    environment={"VAR": "value"}
)

job = client.create_job(body=job_body, project_id=project.id)
```
Define job configuration and create it in CML (does NOT run automatically).
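
Environment variables reach the job script as strings, so values such as a period number need converting on both sides. The following is a minimal sketch that assumes the `environment` field takes string values, as in the `{"PERIOD": "0"}` example in the parameter table. `to_env` and `read_period` are hypothetical helpers.

```python
import os
from typing import Any, Dict


def to_env(values: Dict[str, Any]) -> Dict[str, str]:
    """Notebook side: convert arbitrary values to environment-variable strings."""
    return {key: str(value) for key, value in values.items()}


def read_period(default: int = 0) -> int:
    """Job-script side: read PERIOD back as an integer."""
    return int(os.getenv("PERIOD", str(default)))


# Notebook side (not executed here):
# job_body = cmlapi.CreateJobRequest(
#     project_id=project.id, name="Get Predictions",
#     script="module2/03.1_get_predictions.py",
#     runtime_identifier=JOB_IMAGE_ML_RUNTIME,
#     environment=to_env({"PERIOD": 3}),
# )
```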

### Job Orchestration - Triggering Downstream Jobs
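```python
# Inside a job script (e.g., module2/02_prepare_artificial_data.py):
import json
import os
import cmlapi

client = cmlapi.default_client(
    url=os.getenv("CDSW_API_URL").replace("/api/v1", ""),
    cml_api_key=os.getenv("CDSW_APIV2_KEY")
)
project_id = os.getenv("CDSW_PROJECT_ID")

# Look up the next job by name; it must match the name used in create_job
NEXT_JOB_NAME = "Get Predictions"
response = client.list_jobs(project_id, search_filter=json.dumps({"name": NEXT_JOB_NAME}))
next_job_id = next(job.id for job in response.jobs if job.name == NEXT_JOB_NAME)

# Trigger the next job in the pipeline
client.create_job_run(
    cmlapi.CreateJobRunRequest(),
    project_id=project_id,
    job_id=next_job_id  # This triggers Job 03.1
)
```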
Jobs can trigger dependent jobs programmatically, enabling workflow automation without manual intervention.

---

## Summary: The 4-Job Pipeline

You've just created a four-job monitoring pipeline using CML Jobs:

| Job | Purpose | Triggered By |
|-----|---------|---------------|
| 02 | Prepare synthetic data | Manual (UI or API) |
| 03.1 | Get predictions from model | Job 02 |
| 03.2 | Load ground truth labels | Job 03.1 |
| 03.3 | Check model accuracy & degradation | Job 03.2 |

This mirrors how ML monitoring systems are typically structured: **automated, reproducible, and scalable**.

---

## Next Steps

In the next notebook, you'll:
1. Learn how to **run jobs** using `client.create_job_run()`
2. Monitor job execution and track progress
3. Handle errors and job failures
4. Implement the complete automated monitoring pipeline