# Data Flywheel Blueprint Dev notebook

This notebook uploads some sample data, runs a flywheel job, and monitors its progress, displaying the results in tables that update as evaluations and customizations complete.

## Rough architecture goal

The diagram below shows our current, rough target architecture.

![Arch](./arch.png)

## Current deployment for notebook

The diagram below shows how this notebook's services are currently deployed.

![Deployment](./deployment.png)

## Prerequisites

Clone this repository, then complete the following steps:

Install deps:

```sh
uv sync --dev
```

Run docker compose:

```sh
docker compose up --build
```

Once the services are all up, continue with the notebook.

### Edit this next cell!

In the next cell, set `PROJECT_ABSOLUTE_PATH` to the full absolute path of your local clone of this git repository.

In [1]:
import os

# Absolute path to your local clone of the data-flywheel-blueprint repository.
# Set the PROJECT_ABSOLUTE_PATH environment variable to avoid editing this cell;
# otherwise the literal default below is used.
PROJECT_ABSOLUTE_PATH = os.environ.get(
    "PROJECT_ABSOLUTE_PATH", "/home/rangilly/code/data-flywheel-blueprint"
)

## Define code snippets

This section defines the methods we need in the notebook.

In [2]:
import requests
import time
from datetime import datetime
import pandas as pd
from IPython.display import display, clear_output
import json

import sys
import os

pd.set_option('display.max_columns', None)   # show every column
pd.set_option('display.width', None)         # let pandas auto-detect the display width
pd.set_option('display.max_colwidth', None)  # never truncate cell contents

# Make the repository root importable so `src.*` modules resolve.
# parent_dir is the parent of <repo>/notebooks, i.e. the repository root itself.
notebook_dir = os.path.join(PROJECT_ABSOLUTE_PATH, "notebooks")
parent_dir = os.path.abspath(os.path.join(notebook_dir, os.pardir))
# Drop any existing copies first so re-running this cell does not keep growing
# sys.path with duplicates, while still placing the repo root at highest priority.
sys.path[:] = [entry for entry in sys.path if entry != parent_dir]
sys.path.insert(0, parent_dir)

from src.scripts.load_test_data import load_data_to_elasticsearch

# Configuration
# Base URL of the flywheel API. "0.0.0.0" is a server bind-all address and is not a
# valid connection target on every OS (e.g. Windows), so connect via localhost.
# Set the API_BASE_URL environment variable if the service runs elsewhere.
API_BASE_URL = os.environ.get("API_BASE_URL", "http://localhost:8000")
# NOTE(review): the job-creation cells in this notebook pass workload/client IDs
# literally instead of using these two constants.
WORKLOAD_ID = "bug-triager"
CLIENT_ID = "dev-notebook"
POLL_INTERVAL = 5  # seconds between status polls in monitor_job

def get_job_status(job_id, timeout=30):
    """Fetch the current state of a flywheel job from the API.

    Args:
        job_id: ID of the job, as returned by ``POST /api/jobs``.
        timeout: Seconds to wait for the server before giving up. Without a
            timeout, ``requests`` can block forever if the API stops
            responding, which would freeze ``monitor_job``.

    Returns:
        The decoded JSON body of ``GET {API_BASE_URL}/api/jobs/{job_id}``.

    Raises:
        requests.HTTPError: if the API answers with a 4xx/5xx status.
        requests.Timeout: if no response arrives within ``timeout`` seconds.
    """
    response = requests.get(f"{API_BASE_URL}/api/jobs/{job_id}", timeout=timeout)
    response.raise_for_status()
    return response.json()

def format_runtime(seconds):
    """Render a duration given in seconds as e.g. ``"1m 5s"`` or ``"42s"``.

    ``None`` (no runtime reported yet) is shown as ``"-"``. Fractional seconds
    are truncated, and durations of an hour or more are still expressed in
    minutes (e.g. ``"62m 5s"``).
    """
    if seconds is None:
        return "-"
    whole_minutes, leftover = divmod(seconds, 60)
    if whole_minutes <= 0:
        return f"{int(leftover)}s"
    return f"{int(whole_minutes)}m {int(leftover)}s"

def create_results_table(job_data):
    """Build a DataFrame with one row per evaluation of every NIM in ``job_data``.

    Args:
        job_data: Job payload as returned by ``get_job_status``; each entry of
            ``job_data["nims"]`` must carry ``model_name`` and ``evaluations``.

    Returns:
        A DataFrame with the columns Model, Eval Type, Score, Percent Done,
        Runtime, Status, Started and Finished, sorted by Model then Eval Type.
        When there are no evaluations, an empty frame with the same columns
        is returned, so callers always see a consistent schema.
    """
    columns = ["Model", "Eval Type", "Score", "Percent Done", "Runtime", "Status", "Started", "Finished"]
    rows = []
    for nim in job_data["nims"]:
        model_name = nim["model_name"]
        # `evaluation` rather than `eval`, to avoid shadowing the builtin.
        for evaluation in nim["evaluations"]:
            finished_at = evaluation["finished_at"]
            # A recorded error wins over completion; no finish time means still running.
            if evaluation.get("error"):
                status = "Error"
            elif finished_at:
                status = "Completed"
            else:
                status = "Running"
            rows.append({
                "Model": model_name,
                "Eval Type": evaluation["eval_type"].upper(),
                "Score": "; ".join(f"{k}: {v}" for k, v in evaluation["scores"].items()),
                "Percent Done": evaluation["progress"],
                "Runtime": format_runtime(evaluation["runtime_seconds"]),
                "Status": status,
                "Started": datetime.fromisoformat(evaluation["started_at"]).strftime("%H:%M:%S"),
                "Finished": datetime.fromisoformat(finished_at).strftime("%H:%M:%S") if finished_at else "-",
            })
    if not rows:
        return pd.DataFrame(columns=columns)
    return pd.DataFrame(rows, columns=columns).sort_values(["Model", "Eval Type"])

def create_customization_table(job_data):
    """Build a DataFrame with one row per customization run of every NIM in ``job_data``.

    Args:
        job_data: Job payload as returned by ``get_job_status``; each entry of
            ``job_data["nims"]`` must carry ``model_name`` and ``customizations``.

    Returns:
        A DataFrame with the columns Model, Started, Epochs Completed,
        Steps Completed, Finished, Status, Runtime and Percent Done, sorted by
        Model. When there are no customizations, an empty frame with the same
        columns (including Status) is returned, so callers always see a
        consistent schema.
    """
    columns = [
        "Model", "Started", "Epochs Completed", "Steps Completed",
        "Finished", "Status", "Runtime", "Percent Done",
    ]
    rows = []
    for nim in job_data["nims"]:
        model_name = nim["model_name"]
        for custom in nim["customizations"]:
            finished_at = custom["finished_at"]
            # A recorded error wins over completion; no finish time means still running.
            if custom.get("error"):
                status = "Error"
            elif finished_at:
                status = "Completed"
            else:
                status = "Running"
            rows.append({
                "Model": model_name,
                "Started": datetime.fromisoformat(custom["started_at"]).strftime("%H:%M:%S"),
                "Epochs Completed": custom["epochs_completed"],
                "Steps Completed": custom["steps_completed"],
                "Finished": datetime.fromisoformat(finished_at).strftime("%H:%M:%S") if finished_at else "-",
                "Status": status,
                "Runtime": format_runtime(custom["runtime_seconds"]),
                "Percent Done": custom["progress"],
            })

    if not rows:
        return pd.DataFrame(columns=columns)
    return pd.DataFrame(rows, columns=columns).sort_values(["Model"])

def monitor_job(job_id, poll_interval=None, terminal_statuses=(), show_raw=True):
    """Poll a job and redraw its evaluation and customization tables in place.

    Args:
        job_id: ID of the job to monitor.
        poll_interval: Seconds to sleep between polls; defaults to ``POLL_INTERVAL``.
        terminal_statuses: Job ``status`` values (a string or an iterable of
            strings) at which monitoring stops automatically. The default is
            empty, which means poll until interrupted.
        show_raw: If true (the default), also display the raw job JSON after
            the tables, which is useful for debugging.

    Monitoring ends when the user interrupts the kernel, on the first error
    while fetching or rendering the job (the error is printed), or once the
    job's status is one of ``terminal_statuses``.
    """
    interval = POLL_INTERVAL if poll_interval is None else poll_interval
    # Accept a single status string as well as a collection of them.
    if isinstance(terminal_statuses, str):
        terminal_statuses = (terminal_statuses,)
    stop_on = set(terminal_statuses)

    print(f"Monitoring job {job_id}...")
    print("Press Ctrl+C to stop monitoring")

    while True:
        try:
            job_data = get_job_status(job_id)
            evals_df = create_results_table(job_data)
            customizations_df = create_customization_table(job_data)
            # Build everything before clearing, so the previous frame stays
            # visible until the new one is ready (wait=True avoids flicker).
            clear_output(wait=True)
            print(f"Job Status: {job_data['status']}")
            print(f"Total Records: {job_data['num_records']}")
            print(f"Last Updated: {datetime.now().strftime('%H:%M:%S')}")
            print("\nResults:")
            display(evals_df)
            print("\nCustomizations:")
            display(customizations_df)

            if show_raw:
                display(job_data)

            if job_data["status"] in stop_on:
                print(f"\nJob completed with status: {job_data['status']}")
                break

            time.sleep(interval)

        except KeyboardInterrupt:
            print("\nMonitoring stopped by user")
            break
        except Exception as e:
            # Stop on the first failure (network, HTTP or malformed payload)
            # rather than spinning on a persistent error.
            print(f"\nError: {str(e)}")
            break

Elasticsearch is ready! Status: yellow
Elasticsearch is ready! Status: yellow
Index already exists
Index already exists


## Load test data

For the purposes of this notebook, we have created a `bug_triage` dataset. This is a very simple dataset, generated with llama-3.1-70b-instruct. It is neither "real" nor "good," and you should not expect it to yield good results; it is purely illustrative and lets the flywheel execute end to end. The `load_data_to_elasticsearch` calls in the next cell are commented out; uncomment the one matching your dataset to load it into Elasticsearch.

In [3]:
# workload_id = "test"
# load_data_to_elasticsearch(workload_id, CLIENT_ID, "test_test_data.jsonl")

# load_data_to_elasticsearch("", CLIENT_ID, "aiva_dataset.jsonl")

#load_data_to_elasticsearch("aiva-final.jsonl")

## Run the flywheel and monitor

Kick off the flywheel job. The service currently requires you to trigger a flywheel run manually for a given `workload_id`. Automating this would be straightforward, but we first want customers to be able to trigger runs whenever they choose.

In [3]:
# Create a new flywheel job.
# NOTE(review): these workload/client IDs are literals and differ from the
# WORKLOAD_ID / CLIENT_ID constants defined in the configuration cell.
response = requests.post(
    f"{API_BASE_URL}/api/jobs",
    json={"workload_id": "aiva_1", "client_id": "3434"},
    timeout=30,  # fail fast instead of hanging the cell if the API is unreachable
)

response.raise_for_status()
job_id = response.json()["id"]

print(f"Created job with ID: {job_id}")

Created job with ID: 6826d81c3c0784ef6b7d3bd5


In [5]:
# Start monitoring the job: the tables are redrawn every POLL_INTERVAL seconds
# until the kernel is interrupted or a request/rendering error occurs.
monitor_job(job_id)

Job Status: running
Total Records: 999
Last Updated: 12:51:16

Results:


Unnamed: 0,Model,Eval Type,Score,Percent Done,Runtime,Status,Started,Finished
0,meta/llama-3.2-1b-instruct,BASE-EVAL,,5.0,0s,Running,18:47:24,-
1,meta/llama-3.2-1b-instruct,ICL-EVAL,,15.0,0s,Running,18:47:24,-



Customizations:


Unnamed: 0,Model,Started,Epochs Completed,Steps Completed,Finished,Status,Runtime,Percent Done
0,meta/llama-3.2-1b-instruct,18:47:24,1,56,-,Running,0s,50.0


{'id': '681a59381933c7360120239d',
 'workload_id': 'aiva_1',
 'client_id': '3434',
 'status': 'running',
 'started_at': '2025-05-06T18:47:20.418000',
 'finished_at': None,
 'num_records': 999,
 'llm_judge': None,
 'nims': [{'model_name': 'meta/llama-3.2-1b-instruct',
   'deployment_status': 'ready',
   'evaluations': [{'eval_type': 'base-eval',
     'scores': {},
     'started_at': '2025-05-06T18:47:24.462000',
     'finished_at': None,
     'runtime_seconds': 0.0,
     'progress': 5.0,
     'nmp_uri': 'http://nemo.test/v1/evaluation/jobs/eval-2KphN3hQvTkwq6t7z8E98C'},
    {'eval_type': 'icl-eval',
     'scores': {},
     'started_at': '2025-05-06T18:47:24.465000',
     'finished_at': None,
     'runtime_seconds': 0.0,
     'progress': 15.0,
     'nmp_uri': 'http://nemo.test/v1/evaluation/jobs/eval-Un9jfNABbTApjZzePCHzVy'}],
   'customizations': [{'started_at': '2025-05-06T18:47:24.471000',
     'finished_at': None,
     'runtime_seconds': 0.0,
     'progress': 50.0,
     'epochs_compl


Monitoring stopped by user


In [None]:
# Create another flywheel job for a different workload.
response = requests.post(
    f"{API_BASE_URL}/api/jobs",
    json={"workload_id": "aiva_3", "client_id": "3434"},
    timeout=30,  # fail fast instead of hanging the cell if the API is unreachable
)

response.raise_for_status()
job3_id = response.json()["id"]

print(f"Created job with ID: {job3_id}")

In [None]:
# Monitor the job created in the previous cell; interrupt the kernel to stop polling.
monitor_job(job3_id)