# LLM Classification Pipeline

## Imports and Config

In [1]:
import pandas as pd
import numpy as np
import re
import os
import time
from pathlib import Path
from datetime import datetime
from dotenv import load_dotenv
from openai import OpenAI
from sklearn.metrics.pairwise import cosine_similarity

load_dotenv()
client = OpenAI()  # reads OPENAI_API_KEY from env

# ============================================================
# Config — set these before running
# ============================================================
# These filenames come from the output of data_prep.ipynb.
MCP_DATA_FILE = "mcp_data_2026-02-17.csv"       # cleaned MCPs to classify (in data/mcp/raw/)
MCP_EMB_FILE  = "voyage_mcp_emb_2026-02-17.npy" # Voyage embeddings for those MCPs (in data/embeddings/)

# Existing results to merge new classifications into.
# Set to None to auto-detect the most recent mcp_results_*.csv in data/mcp/results/, or specify explicitly.
# Set to "none" (string) for a completely fresh run with no merging.
EXISTING_RESULTS_FILE = None

# LLM / retrieval settings
MODEL          = "gpt-4.1"
TOP_K_DWAS     = 80    # retrieve top 80 DWAs by cosine similarity
MAX_DWA_SELECT = 15    # LLM selects up to 15
MAX_TASKS      = 125   # max tasks to send to rating prompt
TEMPERATURE    = 0.0   # low temp for consistency

# ---------- Paths ----------
DATA_DIR     = Path("../data")
RESULTS_DIR  = DATA_DIR / "mcp" / "results"
RAW_DIR      = DATA_DIR / "mcp" / "raw"
# parents=True so a fresh checkout without data/mcp/ does not fail here.
RESULTS_DIR.mkdir(parents=True, exist_ok=True)

# ---------- Resolve existing results file ----------
# After this block EXISTING_RESULTS_FILE is either a filename inside RESULTS_DIR
# or the string "none" (meaning: nothing to merge).
if EXISTING_RESULTS_FILE is None:
    # This run writes its own output to mcp_results_<today>.csv in the same
    # directory (see the "Select MCPs to Classify" cell). If the kernel is
    # restarted mid-run, that partial file matches the glob and sorts last, so
    # it must be excluded or the run would be merged with itself and the real
    # history would be lost.
    todays_output_name = f"mcp_results_{datetime.now().strftime('%Y-%m-%d')}.csv"
    existing_files = sorted(
        p for p in RESULTS_DIR.glob("mcp_results_*.csv")
        if p.name != todays_output_name
    )
    if existing_files:
        # Filenames embed an ISO date, so lexicographic order is chronological.
        EXISTING_RESULTS_FILE = existing_files[-1].name
        print(f"Auto-detected existing results: {EXISTING_RESULTS_FILE}")
    else:
        EXISTING_RESULTS_FILE = "none"
        print("No existing results found — fresh run.")
elif EXISTING_RESULTS_FILE == "none":
    print("Fresh run — no existing results will be merged.")
else:
    print(f"Using existing results: {EXISTING_RESULTS_FILE}")

# ---------- Derived paths ----------
MCP_DATA_PATH  = RAW_DIR / MCP_DATA_FILE
MCP_EMB_PATH   = DATA_DIR / "embeddings" / MCP_EMB_FILE
DWA_EMB_PATH   = DATA_DIR / "embeddings" / "voyage_dwa_emb.npy"
ONET_DATA_PATH = DATA_DIR / "onet" / "onet_data.csv"

print(f"\nModel: {MODEL}")
print(f"Config: top-{TOP_K_DWAS} DWAs -> select up to {MAX_DWA_SELECT} -> rate up to {MAX_TASKS} tasks")
print(f"MCP data:   {MCP_DATA_FILE}")
print(f"MCP emb:    {MCP_EMB_FILE}")


Auto-detected existing results: mcp_results_2026-02-14.csv

Model: gpt-4.1
Config: top-80 DWAs -> select up to 15 -> rate up to 125 tasks
MCP data:   mcp_data_2026-02-17.csv
MCP emb:    voyage_mcp_emb_2026-02-17.npy


# Load Data

In [2]:
# ---------- MCP descriptions to classify ----------
mcp_df = pd.read_csv(MCP_DATA_PATH)
print(f"MCPs loaded: {len(mcp_df):,}")

# ---------- O*NET reference table ----------
onet_df = pd.read_csv(ONET_DATA_PATH)
print(f"O*NET rows loaded: {len(onet_df):,}")

# Distinct, non-null DWA titles, kept in order of first appearance.
# get_top_dwas() indexes this list with DWA embedding row numbers, so its
# length is checked against the embedding matrix below.
dwa_titles_unique = (
    onet_df["dwa_title"]
    .dropna()
    .drop_duplicates()
    .reset_index(drop=True)
    .tolist()
)
print(f"Unique DWAs: {len(dwa_titles_unique):,}")

# ---------- Precomputed embedding matrices ----------
mcp_emb = np.load(MCP_EMB_PATH)
dwa_emb = np.load(DWA_EMB_PATH)
print(f"MCP embeddings: {mcp_emb.shape}")
print(f"DWA embeddings: {dwa_emb.shape}")

# One embedding row per MCP / per unique DWA title, otherwise stop here.
assert len(mcp_df) == mcp_emb.shape[0], "MCP embedding count mismatch"
assert len(dwa_titles_unique) == dwa_emb.shape[0], "DWA embedding count mismatch"

# ---------- L2 normalize for cosine similarity ----------
def l2_normalize(X):
    """Scale every row of the 2-D array X to unit Euclidean length.

    Rows whose norm is exactly zero are divided by 1 instead, so they come
    back unchanged (all zeros) rather than as NaN.
    """
    row_lengths = np.linalg.norm(X, axis=1, keepdims=True)
    # Swap zero lengths for 1 to avoid a division by zero.
    safe_lengths = np.where(row_lengths == 0, 1, row_lengths)
    return X / safe_lengths

# Unit-length copies of both embedding matrices; with these, a plain dot
# product between rows is their cosine similarity.
dwa_emb_norm = l2_normalize(dwa_emb)
mcp_emb_norm = l2_normalize(mcp_emb)

# ---------- Map each DWA title to its (task, occupation title) pairs ----------
dwa_to_tasks = {}
for _, onet_row in onet_df.iterrows():
    dwa_key = onet_row["dwa_title"]
    if not pd.isna(dwa_key):  # rows without a DWA cannot be looked up by DWA
        dwa_to_tasks.setdefault(dwa_key, []).append(
            (onet_row["task"], onet_row["title"])
        )

tasks_per_dwa = [len(pairs) for pairs in dwa_to_tasks.values()]
print(f"\nDWA->tasks lookup built: {len(dwa_to_tasks):,} DWAs with tasks")
print(f"Avg tasks per DWA: {np.mean(tasks_per_dwa):.1f}")


MCPs loaded: 1,183
O*NET rows loaded: 23,850
Unique DWAs: 2,083
MCP embeddings: (1183, 1024)
DWA embeddings: (2083, 1024)

DWA->tasks lookup built: 2,083 DWAs with tasks
Avg tasks per DWA: 11.4


# Core Functions

In [3]:
def get_top_dwas(mcp_idx, top_k=TOP_K_DWAS):
    """Rank all DWAs against one MCP and keep the best `top_k`.

    Similarity is the dot product of the L2-normalised MCP row with every
    L2-normalised DWA row. The result is a list of
    (dwa_title, similarity_score) tuples, highest similarity first.
    """
    query_row = mcp_emb_norm[mcp_idx:mcp_idx+1]   # keep 2-D: (1, dim)
    scores = (query_row @ dwa_emb_norm.T).flatten()  # one score per DWA
    best_first = np.argsort(-scores)[:top_k]
    ranked = []
    for dwa_pos in best_first:
        ranked.append((dwa_titles_unique[dwa_pos], float(scores[dwa_pos])))
    return ranked


def get_tasks_for_dwas(selected_dwas_with_sims, max_tasks=MAX_TASKS):
    """Collect the tasks under the selected DWAs without exceeding `max_tasks`.

    DWAs are visited from highest to lowest similarity. A DWA is taken
    all-or-nothing: if its not-yet-collected tasks would push the total over
    `max_tasks`, that DWA is skipped and the remaining (less similar) DWAs
    are still tried, so smaller DWAs can fill the leftover budget.

    Args:
        selected_dwas_with_sims: iterable of (dwa_title, similarity) tuples.
        max_tasks: upper bound on the number of tasks returned.

    Returns:
        (tasks_list, used_dwas) where tasks_list is
        [(task_text, occupation, dwa_title), ...] with each
        (task_text, occupation) pair appearing at most once, and used_dwas
        is the list of DWA titles that were kept, each listed once.
    """
    # Sort by similarity descending
    sorted_dwas = sorted(selected_dwas_with_sims, key=lambda x: x[1], reverse=True)

    tasks_list = []
    seen = set()        # (task_text, occupation) pairs already in tasks_list
    used_dwas = []
    used_titles = set()

    for dwa_title, _sim in sorted_dwas:
        # The same DWA can arrive twice (e.g. a repeated number in the LLM
        # answer); processing it again would only duplicate it in used_dwas.
        if dwa_title in used_titles:
            continue

        # Collect only unseen tasks for this DWA. `pending` also catches
        # repeats *within* this DWA, which `seen` alone would miss because it
        # is only updated once the DWA is accepted.
        new_keys = []
        pending = set()
        for task_text, occ_title in dwa_to_tasks.get(dwa_title, []):
            key = (task_text, occ_title)
            if key in seen or key in pending:
                continue
            pending.add(key)
            new_keys.append(key)

        # Skip this DWA if its new tasks would exceed the limit
        if len(tasks_list) + len(new_keys) > max_tasks:
            continue

        seen.update(pending)
        tasks_list.extend((task_text, occ_title, dwa_title) for task_text, occ_title in new_keys)
        used_dwas.append(dwa_title)
        used_titles.add(dwa_title)

    return tasks_list, used_dwas


# ---------- Prompt templates ----------

# Stage 1 prompt: asks the model which of the retrieved DWAs are worth
# expanding into tasks for a given MCP.
#
# Filled with str.format(); placeholders:
#   {mcp_desc} - the MCP's description text
#   {dwas}     - numbered DWA list produced by format_dwa_list()
# Because str.format() is used, any literal brace added to this text would
# have to be doubled.
#
# The model is told to reply inside <answer> tags with either
# semicolon-separated list numbers or one of the sentinel phrases
# "None" / "Not enough information" / "Not occupationally relevant";
# parse_dwa_response() consumes that reply.
#
# NOTE: the limit of 15 is written literally into the text below. The
# MAX_DWA_SELECT config value is not interpolated here, so changing it does
# not change what the model is asked to do.
DWA_SELECTION_PROMPT = """You are classifying AI agent automation potential for occupational tasks.

Below is a description and list of key features and use cases of an AI Model Context Protocol (MCP) server — a plugin-like system that allows AI assistants to access external tools, APIs, or data sources to perform real-world tasks.

<mcp_desc>
{mcp_desc}
</mcp_desc>

Below is a list of Detailed Work Activities (DWAs) from the O*NET occupational database. These were retrieved via semantic cosine similarity and may or may not be relevant to this MCP.

<dwas>
{dwas}
</dwas>

<question>
Which of these DWAs are most likely to contain specific occupational tasks underneath them in the O*NET hierarchy that this MCP could automate or substantially support in real-world deployments?
</question>

Follow these guidelines when answering:
- This is a funnel. Select DWAs whose underlying occupational tasks are most likely automatable or substantially supported by this MCP. Err slightly toward inclusion, but only where the connection is direct and concrete.
- Evaluate MCP capabilities strictly based on what is explicitly described or directly implied. Treat connected tools as part of the MCP only if their functionality is directly accessible through the MCP's exposed app or API. You may use general background knowledge to understand technologies and terminology, but do not infer undocumented capabilities, hypothetical future integrations, or functionality beyond what is described.
- Interpret DWAs based on their action verbs and the real-world human work they represent in formal occupational contexts, and consider who would own, operate, and be responsible for the workflows of the DWAs. Use background knowledge to understand DWA meaning, but classify only where there is a concrete, direct connection between the MCP's described functionality and the actual workplace tasks implied by the DWA.
- Select up to 15 DWAs. If none are relevant, answer "None."
- If the MCP description is too vague to determine real capabilities, answer "Not enough information."
- If the MCP does not enable a concrete paid occupational activity represented in O*NET, answer "Not occupationally relevant."
- Do not treat the fact that something is an MCP server or tool as evidence of relevance — evaluate only the described functionality.

Answer with only ONE of the following formats and nothing else:

<answer>
The numbers of up to 15 selected DWAs from the list above, separated by semicolons (e.g., 1; 5; 12).
</answer>

Or:

<answer>
None
</answer>

Or:

<answer>
Not enough information
</answer>

Or:

<answer>
Not occupationally relevant
</answer>

Follow these guidelines for your output:
- Wrap your final response in <answer> tags.
- Do not include DWA text, or any other words/explanation for any of your answers."""


# Stage 2 prompt: asks the model to rate, on a 1-5 scale, how much a given
# MCP could automate each task gathered from the selected DWAs.
#
# Filled with str.format(); placeholders:
#   {mcp_desc}               - the MCP's description text
#   {tasks_with_occupations} - numbered "task (occupation)" list produced by
#                              format_task_list()
# Because str.format() is used, any literal brace added to this text would
# have to be doubled.
#
# The model is told to reply inside <ratings> tags with semicolon-separated
# "TaskNumber:Rating" pairs; parse_task_ratings() consumes that reply.
TASK_RATING_PROMPT = """You are classifying AI agent automation potential for occupational tasks.

Below is a description and list of key features and use cases of an AI Model Context Protocol (MCP) server — a plugin-like system that allows AI assistants to access external tools, APIs, or data sources to perform real-world tasks.

<mcp_desc>
{mcp_desc}
</mcp_desc>

Below is a list of occupational tasks from the O*NET database. Each task is performed by workers in the occupation noted in parentheses. These tasks fall under work activities that were previously identified as potentially automatable by this MCP.

<tasks>
{tasks_with_occupations}
</tasks>

<question>
For each task, rate on a scale of 1-5 how much this MCP could automate the task in a real-world deployment.
</question>

Follow these scale description when answering:
1 — No meaningful automation (0–10%).
The MCP does not meaningfully address this task. Any connection is superficial or incidental.
2 — Minimal support (10–30%).
The MCP may assist with a small component of the task, but humans still perform nearly all core work.
3 — Partial automation (30–60%).
The MCP can automate or significantly accelerate a meaningful portion of the core workflow. Human judgment and intervention remain essential.
4 — Substantial automation (60–90%).
The MCP can perform most of the core task workflow with limited human oversight. A human may initiate, monitor, or handle exceptions.
5 — Near-full automation (90–100%).
The MCP can execute the task end-to-end in typical scenarios, with little to no human involvement beyond initial setup or oversight.

Follow these guidelines:
- Rate each task independently based on the MCP's described capabilities.
- Each rating should only be a 1, 2, 3, 4, or 5
- Provide exactly one rating per task, in the same order as listed.
- Evaluate MCP capabilities strictly based on what is explicitly described or directly implied. Treat connected tools as part of the MCP only if their functionality is directly accessible through the MCP's exposed app or API. You may use general background knowledge to understand technologies and terminology, but do not infer undocumented capabilities, hypothetical future integrations, or functionality beyond what is described.
- Consider the task as it would be performed in the occupation noted — context matters. The same task description may be more or less automatable depending on the occupational context.
- Interpret tasks based on their action verbs and the real-world human work they represent in formal occupational contexts, and consider who would own, operate, and be responsible for the workflows of the task. Use background knowledge to understand task meaning, but classify based on the concrete, direct connection between the MCP's described functionality and the actual workplace tasks.
- A rating of 1 is expected and appropriate for tasks that passed the DWA filter but are not actually automatable by this specific MCP. Do not hesitate to use it.


Answer ONLY in the following format and nothing else:

<ratings>
1:[rating]; 2:[rating]; 3:[rating]; ...; N:[rating]
</ratings>

Follow these guidelines for your output:
- Wrap your final response in <ratings> tags.
- Provide a semicolon-separated list for ALL tasks provided above.
- Use the format: TaskNumber:Rating, where TaskNumber and Rating are each a single integer corresponding to (respectively) the task number of the task provided above list and the automation rating selected for it.
- Do not include the task text, or any other words/explanation."""


def format_dwa_list(dwas_with_sims):
    """Render (dwa_title, similarity) pairs as a 1-based numbered list.

    Only the title is shown; the similarity score is not included.
    Returns one "N. title" line per DWA, joined with newlines.
    """
    return "\n".join(
        f"{position}. {name}"
        for position, (name, _score) in enumerate(dwas_with_sims, 1)
    )


def format_task_list(tasks):
    """Render (task_text, occupation, dwa_title) triples as a 1-based numbered list.

    Each line reads "N. task_text (occupation)"; the DWA title is not shown.
    Lines are joined with newlines.
    """
    return "\n".join(
        f"{position}. {description} ({job})"
        for position, (description, job, _dwa) in enumerate(tasks, 1)
    )


def call_llm(prompt, max_retries=3):
    """Send a single-turn user prompt to the configured chat model, with retries.

    Uses the module-level `client`, `MODEL` and `TEMPERATURE`. On any
    exception the call is retried after a linearly growing pause
    (5s, 10s, ...); the exception from the final attempt is re-raised.

    Args:
        prompt: full text of the user message.
        max_retries: total number of attempts (must be at least 1).

    Returns:
        The response text. An empty string is returned when the API gives a
        choice with no text content, so the regex-based parsers downstream
        see a string rather than None.

    Raises:
        ValueError: if max_retries is less than 1 (previously the function
            fell through and silently returned None).
        Exception: whatever the last failed API attempt raised.
    """
    if max_retries < 1:
        raise ValueError(f"max_retries must be >= 1, got {max_retries}")

    for attempt in range(max_retries):
        try:
            response = client.chat.completions.create(
                model=MODEL,
                messages=[{"role": "user", "content": prompt}],
                temperature=TEMPERATURE,
                max_tokens=4096,
            )
            content = response.choices[0].message.content
            return content if content is not None else ""
        except Exception as e:
            print(f"  API error (attempt {attempt+1}/{max_retries}): {e}")
            if attempt < max_retries - 1:
                time.sleep(5 * (attempt + 1))
            else:
                raise


def parse_dwa_response(response_text, dwas_with_sims):
    """Parse the model's reply to the DWA selection prompt.

    Args:
        response_text: raw model output; expected to contain an
            <answer>...</answer> section.
        dwas_with_sims: the (dwa_title, similarity) list that was numbered
            (1-based) in the prompt.

    Returns:
        (selected, status). `selected` is a list of (dwa_title, similarity)
        tuples taken from `dwas_with_sims`, in the order the model listed
        them, each at most once. `status` is one of 'selected', 'none',
        'not_enough_info', 'not_occ_relevant' or 'parse_error'. Numbers
        outside the list range are ignored.
    """
    # A missing/empty response cannot be searched; report it like any other
    # unparseable reply instead of raising TypeError.
    if not response_text:
        return [], "parse_error"

    # Extract content between <answer> tags
    match = re.search(r"<answer>\s*(.*?)\s*</answer>", response_text, re.DOTALL)
    if not match:
        return [], "parse_error"

    # The prompt itself writes the sentinels with a trailing period
    # (answer "None."), so the model may echo it; drop it before matching.
    content = match.group(1).strip().rstrip(".").strip()
    lowered = content.lower()

    if lowered == "none":
        return [], "none"
    if "not enough information" in lowered:
        return [], "not_enough_info"
    if "not occupationally relevant" in lowered:
        return [], "not_occ_relevant"

    # Parse semicolon-separated numbers
    try:
        numbers = [int(n.strip()) for n in content.split(";") if n.strip()]
    except ValueError:
        return [], "parse_error"

    # Map numbers to DWA titles (1-indexed), ignoring repeats so the same DWA
    # is not selected (and counted) twice.
    selected = []
    seen_numbers = set()
    for num in numbers:
        if num in seen_numbers:
            continue
        seen_numbers.add(num)
        if 1 <= num <= len(dwas_with_sims):
            selected.append(dwas_with_sims[num - 1])  # (title, sim)

    return selected, "selected"


def parse_task_ratings(response_text, tasks):
    """Parse the model's reply to the task rating prompt.

    Args:
        response_text: raw model output; expected to contain a
            <ratings>...</ratings> section of "TaskNumber:Rating" pairs
            separated by semicolons.
        tasks: the [(task_text, occupation, dwa_title), ...] list that was
            numbered (1-based) in the prompt.

    Returns:
        List of (task_text, occupation, dwa_title, rating) tuples in the
        order the model listed them. Pairs that are malformed, refer to a
        task number outside the list, or carry a rating outside 1-5 are
        skipped. Each task is rated at most once (the first valid rating
        wins). An empty list is returned if no <ratings> section is found.
    """
    # A missing/empty response cannot be searched; treat it as "no ratings".
    if not response_text:
        return []

    match = re.search(r"<ratings>\s*(.*?)\s*</ratings>", response_text, re.DOTALL)
    if not match:
        return []

    content = match.group(1).strip()
    ratings = []
    rated_nums = set()

    for pair in content.split(";"):
        pair = pair.strip()
        if not pair:
            continue
        try:
            num_str, rating_str = pair.split(":")
            num = int(num_str.strip())
            # The prompt's format line shows ratings as "[rating]", so accept
            # a reply that keeps the square brackets, e.g. "3:[4]".
            rating = int(rating_str.strip().strip("[]").strip())
            if num in rated_nums:
                # A repeated task number would yield two ratings for one task.
                continue
            if 1 <= num <= len(tasks) and 1 <= rating <= 5:
                task_text, occ, dwa = tasks[num - 1]
                ratings.append((task_text, occ, dwa, rating))
                rated_nums.add(num)
        except (ValueError, IndexError):
            continue

    return ratings


print("Core functions defined.")

Core functions defined.


# Select MCPs to Classify

In [4]:
# ============================================================
# Choose which MCPs this session will classify
# Every row of mcp_df is a candidate by default (mcp_df is
# expected to hold only MCPs missing from the existing results).
# ============================================================
run_indices = [*range(len(mcp_df))]
print(f"MCPs to classify: {len(run_indices)}")

# ---------- Where this run's results are written ----------
date_str = f"{datetime.now():%Y-%m-%d}"
OUTPUT_PATH = RESULTS_DIR / ("mcp_results_" + date_str + ".csv")

# Resume support: an output file dated today is treated as a partial run,
# and any MCP whose url already appears in it is dropped from run_indices.
if not OUTPUT_PATH.exists():
    partial_df = pd.DataFrame()
else:
    partial_df = pd.read_csv(OUTPUT_PATH)
    already_done_urls = set(partial_df["url"].dropna().tolist())
    before = len(run_indices)
    run_indices = [
        idx for idx in run_indices
        if mcp_df.loc[idx, "url"] not in already_done_urls
    ]
    print(f"Resuming: skipping {before - len(run_indices)} already-processed MCPs")

print(f"MCPs to process this session: {len(run_indices)}")
print(f"Output path: {OUTPUT_PATH}")


MCPs to classify: 1183
MCPs to process this session: 1183
Output path: ..\data\mcp\results\mcp_results_2026-02-18.csv


# Run Classification Pipeline

In [None]:
# Synchronous pipeline: for every MCP in run_indices, retrieve candidate DWAs,
# ask the LLM to select relevant ones, then ask it to rate the tasks under
# them. The output CSV is rewritten after every MCP so an interrupted run
# can be resumed.
results = []

for progress_i, mcp_idx in enumerate(run_indices):
    row = mcp_df.iloc[mcp_idx]
    title = row["title"]
    url = row["url"]
    mcp_desc = row["text_for_llm"]

    print(f"\n[{progress_i+1}/{len(run_indices)}] {title}")

    # --- Step 1: Get top DWAs by cosine similarity ---
    top_dwas = get_top_dwas(mcp_idx)
    print(f"  Top {len(top_dwas)} DWAs retrieved (sim range: {top_dwas[0][1]:.4f} - {top_dwas[-1][1]:.4f})")

    # --- Step 2: DWA selection prompt ---
    dwa_prompt = DWA_SELECTION_PROMPT.format(
        mcp_desc=mcp_desc,
        dwas=format_dwa_list(top_dwas)
    )

    dwa_response_raw = call_llm(dwa_prompt)
    selected_dwas, dwa_status = parse_dwa_response(dwa_response_raw, top_dwas)

    print(f"  DWA selection: {dwa_status} ({len(selected_dwas)} DWAs)")

    # Build result dict with all MCP data. The task-related fields keep these
    # defaults unless Step 3 runs.
    result = {
        "title": title,
        "url": url,
        "text_for_llm": mcp_desc,
        "uploaded_clean": row.get("uploaded_clean", ""),
        "dwa_status": dwa_status,
        "dwas_selected": "; ".join([d[0] for d in selected_dwas]) if selected_dwas else "",
        "n_dwas_selected": len(selected_dwas),
        "dwa_response_raw": dwa_response_raw,
        "task_ratings": "",
        "task_rating_response_raw": "",
        "n_tasks_rated": 0,
    }

    # --- Step 3: If DWAs were selected, get tasks and rate them ---
    if dwa_status == "selected" and len(selected_dwas) > 0:
        tasks, used_dwas = get_tasks_for_dwas(selected_dwas, max_tasks=MAX_TASKS)
        print(f"  Tasks retrieved: {len(tasks)} (from {len(used_dwas)}/{len(selected_dwas)} DWAs)")

        if len(tasks) > 0:
            task_prompt = TASK_RATING_PROMPT.format(
                mcp_desc=mcp_desc,
                tasks_with_occupations=format_task_list(tasks)
            )

            task_response_raw = call_llm(task_prompt)
            rated_tasks = parse_task_ratings(task_response_raw, tasks)

            print(f"  Tasks rated: {len(rated_tasks)}/{len(tasks)}")

            # Format task ratings as "task_text (occupation): rating" separated by semicolons
            task_ratings_str = "; ".join(
                [f"{t} ({o}): {r}" for t, o, d, r in rated_tasks]
            )

            result["task_ratings"] = task_ratings_str
            result["task_rating_response_raw"] = task_response_raw
            result["n_tasks_rated"] = len(rated_tasks)
            result["n_tasks_sent"] = len(tasks)
            result["dwas_used_for_tasks"] = "; ".join(used_dwas)

    results.append(result)

    # --- Incremental save ---
    # Rows from an earlier partial run of today's file live in `partial_df`
    # (set by the "Select MCPs to Classify" cell; empty on a fresh run). They
    # are written back in front of this session's rows so resuming never
    # discards them. The name `existing_df` used here before is not defined
    # anywhere in the notebook and raised NameError on the first save.
    new_results_df = pd.DataFrame(results)
    if not partial_df.empty:
        save_df = pd.concat([partial_df, new_results_df], ignore_index=True)
    else:
        save_df = new_results_df
    save_df.to_csv(OUTPUT_PATH, index=False)

print(f"\n{'='*60}")
print(f"Classification complete. {len(results)} MCPs processed.")
print(f"Results saved to: {OUTPUT_PATH}")


[1/5] Amadeus MCP Server
  Top 80 DWAs retrieved (sim range: 0.4317 - 0.3385)
  DWA selection: selected (7 DWAs)
  Tasks retrieved: 80 (from 7/7 DWAs)
  Tasks rated: 80/80

[2/5] SumoLogic MCP Server
  Top 80 DWAs retrieved (sim range: 0.5076 - 0.3518)
  DWA selection: selected (15 DWAs)
  Tasks retrieved: 125 (from 12/15 DWAs)
  Tasks rated: 125/125

[3/5] iRacing
  Top 80 DWAs retrieved (sim range: 0.4526 - 0.3386)
  DWA selection: selected (15 DWAs)
  Tasks retrieved: 122 (from 11/15 DWAs)
  Tasks rated: 122/122

[4/5] DiceDB MCP
  Top 80 DWAs retrieved (sim range: 0.4192 - 0.3119)
  DWA selection: selected (15 DWAs)
  Tasks retrieved: 123 (from 10/15 DWAs)
  Tasks rated: 123/123

[5/5] Amadeus MCP Server
  Top 80 DWAs retrieved (sim range: 0.4058 - 0.3208)
  DWA selection: selected (15 DWAs)
  Tasks retrieved: 124 (from 11/15 DWAs)
  Tasks rated: 124/124

Classification complete. 5 MCPs processed.
Results saved to: ..\data\mcp\gpt-4.1_dwa_task_classification_2026-02-13_3.csv


# Task-Level Aggregation

In [None]:
# ============================================================
# Merge new results with historical results + run task aggregation
# ============================================================

# Load this run's results
new_results_df = pd.read_csv(OUTPUT_PATH)
print(f"New MCPs classified this run: {len(new_results_df)}")

# Merge with historical results from previous runs
if EXISTING_RESULTS_FILE != "none":
    historical_df = pd.read_csv(RESULTS_DIR / EXISTING_RESULTS_FILE)
    print(f"Historical results: {len(historical_df)} MCPs")
    # The combined frame is written back to OUTPUT_PATH below, so on a second
    # execution of this cell OUTPUT_PATH already contains the historical rows.
    # Only append historical rows whose url is not already in this run's file;
    # otherwise every re-run would duplicate the history and inflate the
    # per-task rating counts. Rows without a url are always kept.
    already_present = historical_df["url"].notna() & historical_df["url"].isin(new_results_df["url"])
    if already_present.any():
        print(f"Skipping {int(already_present.sum())} historical rows already present in {OUTPUT_PATH.name}")
    results_df = pd.concat([historical_df[~already_present], new_results_df], ignore_index=True)
else:
    results_df = new_results_df

# Save combined results
results_df.to_csv(OUTPUT_PATH, index=False)
print(f"Combined results saved: {len(results_df)} MCPs -> {OUTPUT_PATH.name}")

# ============================================================
# Build task-level aggregation (over all MCPs in combined results)
# ============================================================
# The task_ratings column is a "; "-joined list of
# "task_text (occupation): rating" entries. Task texts can themselves contain
# "; ", so split only where the separator directly follows ": <digit>", i.e.
# at the end of an entry, instead of on every "; ".
_ENTRY_BOUNDARY_RE = re.compile(r"(?<=: \d); ")
_ENTRY_RE = re.compile(r"^(.+):\s*(\d)$")              # "<task (occ)>: <rating>"
_TASK_OCC_RE = re.compile(r"^(.+)\s*\(([^)]+)\)$")     # "<task> (<occupation>)"

all_task_ratings = []

for _, row in results_df.iterrows():
    ratings_str = row.get("task_ratings", "")
    # Empty cells come back from CSV as NaN (a float), not as a string.
    if not isinstance(ratings_str, str) or not ratings_str.strip():
        continue

    mcp_title = row["title"]
    mcp_url = row["url"]

    for entry in _ENTRY_BOUNDARY_RE.split(ratings_str):
        entry = entry.strip()
        if not entry:
            continue
        match = _ENTRY_RE.match(entry)
        if not match:
            continue
        task_occ_str = match.group(1).strip()
        rating = int(match.group(2))

        occ_match = _TASK_OCC_RE.match(task_occ_str)
        if occ_match:
            task_text = occ_match.group(1).strip()
            occupation = occ_match.group(2).strip()
        else:
            # No trailing "(occupation)"; keep the whole string as the task.
            task_text = task_occ_str
            occupation = ""

        all_task_ratings.append({
            "task": task_text,
            "occupation": occupation,
            "rating": rating,
            "mcp_title": mcp_title,
            "mcp_url": mcp_url,
        })

task_ratings_flat = pd.DataFrame(all_task_ratings)
print(f"\nTotal task-rating pairs: {len(task_ratings_flat):,}")
print(f"Unique (task, occupation) pairs: {task_ratings_flat.groupby(['task', 'occupation']).ngroups:,}")

# One row per (task, occupation) with summary statistics over all MCP ratings.
task_agg = task_ratings_flat.groupby(["task", "occupation"]).agg(
    n_ratings=("rating", "count"),
    mean_rating=("rating", "mean"),
    median_rating=("rating", "median"),
    max_rating=("rating", "max"),
    min_rating=("rating", "min"),
    p25_rating=("rating", lambda x: np.percentile(x, 25)),
    p75_rating=("rating", lambda x: np.percentile(x, 75)),
).reset_index()

for col in ["mean_rating", "median_rating", "p25_rating", "p75_rating"]:
    task_agg[col] = task_agg[col].round(2)

# Most frequently rated tasks first; ties broken by higher mean rating.
task_agg = task_agg.sort_values(["n_ratings", "mean_rating"], ascending=[False, False]).reset_index(drop=True)

TASK_AGG_PATH = RESULTS_DIR / f"task_results_{date_str}.csv"
task_agg.to_csv(TASK_AGG_PATH, index=False)

print(f"\nTask aggregation saved: {TASK_AGG_PATH.name}  ({len(task_agg):,} rows)")
task_agg.head(10)


# Batch API Classification Pipeline

Uses OpenAI's Batch API (50% cheaper, higher rate limits, up to 24h turnaround). Requires **two sequential batches** because task rating prompts depend on DWA selection results.

**How to use — run cells in this order:**

1. Run the prerequisite cells above: "Imports and Config", "Load Data", "Core Functions", "Select MCPs to Classify"
2. Run "Batch Config + Output Paths"
3. Run "Generate DWA Selection JSONL" → then "Upload + Create Batch"
4. Run the **DWA poll cell** — it auto-waits (polls every 30s) until done, then downloads results
5. Run "Parse DWA Results" — parses responses + saves intermediate state to disk
6. Run "Generate Task Rating JSONL" → then "Upload + Create Task Batch"
7. Run the **Task poll cell** — same auto-wait behavior
8. Run "Parse + Combine + Save" — produces the final CSV (same format as synchronous pipeline)
9. Run "Task-Level Aggregation" if you want the aggregated stats

**Kernel restart safe:** Intermediate state (`batch_mcp_meta`, `task_batch_meta`) is saved to `.pkl` files in `data/batch/`. If you restart the kernel, just re-run the prerequisite cells, set the `DWA_BATCH_ID` / `TASK_BATCH_ID` variable manually, and continue from the poll cell.

**Prerequisite cells:** Imports and Config, Load Data, Core Functions, Select MCPs to Classify

## Batch Config + Output Paths

In [5]:
import json

# ---------- Where batch artefacts and results go ----------
BATCH_DIR = DATA_DIR / "batch"
BATCH_DIR.mkdir(exist_ok=True)

BATCH_OUTPUT_PATH = RESULTS_DIR / ("mcp_results_" + date_str + ".csv")

# One JSONL request file per batch stage
DWA_JSONL_PATH  = BATCH_DIR / ("dwa_selection_batch_" + date_str + ".jsonl")
TASK_JSONL_PATH = BATCH_DIR / ("task_rating_batch_" + date_str + ".jsonl")

# Resume support: when today's output file already exists, MCPs whose url is
# in it are skipped. Rows marked api_error are discarded first, so those MCPs
# are submitted again.
if not BATCH_OUTPUT_PATH.exists():
    existing_batch_df = pd.DataFrame()
    batch_run_indices = list(run_indices)
else:
    existing_batch_df = pd.read_csv(BATCH_OUTPUT_PATH)
    total_existing = len(existing_batch_df)
    existing_batch_df = existing_batch_df.loc[existing_batch_df["dwa_status"].ne("api_error")]
    n_dropped = total_existing - len(existing_batch_df)
    if n_dropped > 0:
        print(f"Dropped {n_dropped} api_error rows (will re-process)")
    already_done_batch_urls = set(existing_batch_df["url"].dropna().tolist())
    batch_run_indices = [
        idx for idx in run_indices
        if mcp_df.loc[idx, "url"] not in already_done_batch_urls
    ]
    print(f"Resuming: skipping {len(run_indices) - len(batch_run_indices)} already-classified MCPs")

print(f"Batch output: {BATCH_OUTPUT_PATH}")
print(f"MCPs to process via batch: {len(batch_run_indices)}")


Batch output: ..\data\mcp\results\mcp_results_2026-02-18.csv
MCPs to process via batch: 1183


## Batch 1: Generate DWA Selection JSONL + Submit

In [6]:
# ============================================================
# Write one DWA-selection request per MCP to a JSONL file and
# remember, per request, what is needed to interpret the reply
# ============================================================

# custom_id -> {mcp_idx, title, url, text_for_llm, uploaded_clean, top_dwas}
batch_mcp_meta = {}

with open(DWA_JSONL_PATH, "w", encoding="utf-8") as f:
    for mcp_idx in batch_run_indices:
        row = mcp_df.iloc[mcp_idx]
        custom_id = f"dwa-{mcp_idx}"

        # Candidate DWAs ranked by cosine similarity
        top_dwas = get_top_dwas(mcp_idx)

        # Same prompt text as the synchronous pipeline
        dwa_prompt = DWA_SELECTION_PROMPT.format(
            mcp_desc=row["text_for_llm"],
            dwas=format_dwa_list(top_dwas)
        )

        # Chat-completions request body, wrapped in the batch envelope
        request_body = {
            "model": MODEL,
            "messages": [{"role": "user", "content": dwa_prompt}],
            "temperature": TEMPERATURE,
            "max_tokens": 4096,
        }
        batch_line = {
            "custom_id": custom_id,
            "method": "POST",
            "url": "/v1/chat/completions",
            "body": request_body,
        }
        f.write(json.dumps(batch_line) + "\n")

        # Keep what is needed to map the reply back to this MCP later; the
        # numbered DWA list is stored so the answer's numbers can be resolved.
        batch_mcp_meta[custom_id] = dict(
            mcp_idx=mcp_idx,
            title=row["title"],
            url=row["url"],
            text_for_llm=row["text_for_llm"],
            uploaded_clean=row.get("uploaded_clean", ""),
            top_dwas=top_dwas,
        )

print(f"DWA selection JSONL written: {DWA_JSONL_PATH}")
print(f"Requests: {len(batch_mcp_meta)}")

DWA selection JSONL written: ..\data\batch\dwa_selection_batch_2026-02-18.jsonl
Requests: 1183


In [7]:
# ============================================================
# Upload JSONL + create batch
# ============================================================

# Upload the file. The handle is opened in a `with` block so it is closed as
# soon as the upload call returns (it was previously left open).
with open(DWA_JSONL_PATH, "rb") as jsonl_file:
    dwa_batch_file = client.files.create(
        file=jsonl_file,
        purpose="batch"
    )
print(f"Uploaded file: {dwa_batch_file.id}")

# Create the batch
dwa_batch = client.batches.create(
    input_file_id=dwa_batch_file.id,
    endpoint="/v1/chat/completions",
    completion_window="24h",
    metadata={"description": f"DWA selection for {len(batch_mcp_meta)} MCPs"}
)
print(f"Batch created: {dwa_batch.id}")
print(f"Status: {dwa_batch.status}")

# Save batch ID for retrieval (the poll cell reads DWA_BATCH_ID)
DWA_BATCH_ID = dwa_batch.id

Uploaded file: file-MZiLdckErAkvDWrt7JeMdj
Batch created: batch_69960a0aeb88819084985f8c8a1447b5
Status: validating


## Batch 1: Poll Status + Retrieve DWA Results

In [8]:
# ============================================================
# Poll DWA batch until complete (auto-waits, checks every 30s)
# If you restarted the kernel, set DWA_BATCH_ID manually first:
#   DWA_BATCH_ID = "batch_..."
# ============================================================

import pickle

POLL_INTERVAL = 30  # seconds between status checks

while True:
    dwa_batch_status = client.batches.retrieve(DWA_BATCH_ID)
    status = dwa_batch_status.status
    counts = dwa_batch_status.request_counts
    print(f"[{datetime.now().strftime('%H:%M:%S')}] Status: {status} | "
          f"{counts.completed}/{counts.total} completed, {counts.failed} failed")

    if status == "completed":
        # Download output. A batch can complete without an output file (for
        # example when no request succeeded); in that case there is nothing
        # to download and only the error file below carries information.
        if dwa_batch_status.output_file_id:
            dwa_output_content = client.files.content(dwa_batch_status.output_file_id).text
            # One JSON document per line; blank lines are skipped so an empty
            # or newline-padded file does not break json.loads.
            dwa_output_lines = [
                json.loads(line)
                for line in dwa_output_content.split("\n")
                if line.strip()
            ]
        else:
            dwa_output_content = ""
            dwa_output_lines = []
            print("\nBatch reported no output file — no successful responses to download.")
        print(f"\nDWA batch complete! Downloaded {len(dwa_output_lines)} results.")

        if dwa_batch_status.error_file_id:
            dwa_error_content = client.files.content(dwa_batch_status.error_file_id).text
            print(f"Error file:\n{dwa_error_content[:2000]}")
        break

    elif status in ("failed", "expired", "cancelled"):
        # Terminal failure: report why and stop polling. No results are
        # downloaded in this branch.
        print(f"\nBatch {status}!")
        if dwa_batch_status.errors:
            for err in dwa_batch_status.errors.data:
                print(f"  {err.code}: {err.message}")
        break

    else:
        time.sleep(POLL_INTERVAL)

[11:50:54] Status: validating | 0/0 completed, 0 failed
[11:51:24] Status: validating | 0/0 completed, 0 failed
[11:51:55] Status: in_progress | 0/1183 completed, 0 failed
[11:52:25] Status: in_progress | 0/1183 completed, 0 failed
[11:52:55] Status: in_progress | 74/1183 completed, 0 failed
[11:53:25] Status: in_progress | 677/1183 completed, 0 failed
[11:53:55] Status: in_progress | 1153/1183 completed, 0 failed
[11:54:26] Status: finalizing | 1183/1183 completed, 0 failed
[11:54:56] Status: completed | 1183/1183 completed, 0 failed

DWA batch complete! Downloaded 1183 results.


In [9]:
# ============================================================
# Parse DWA batch results + save intermediate state to disk
#
# Reads dwa_output_lines (from the polling cell) and annotates
# every entry of batch_mcp_meta in place with:
#   dwa_status, selected_dwas, dwa_response_raw
# Requests with no usable completion are marked "api_error".
# ============================================================

# Map custom_id -> response text
dwa_responses = {}
for line in dwa_output_lines:
    custom_id = line["custom_id"]
    if line.get("error"):
        print(f"  Error for {custom_id}: {line['error']}")
        continue

    # Walk the nested payload defensively: a line whose body carries no
    # "choices" (or a null message content) must not abort the whole cell.
    response = line.get("response") or {}
    body = response.get("body") or {}
    choices = body.get("choices") or []
    message = (choices[0].get("message") or {}) if choices else {}
    content = message.get("content")
    if content is None:
        print(f"  No completion text for {custom_id} "
              f"(status_code={response.get('status_code')})")
        continue  # left out of dwa_responses -> counted as api_error below
    dwa_responses[custom_id] = content

print(f"Successful DWA responses: {len(dwa_responses)}/{len(dwa_output_lines)}")

# Parse each response using existing parse_dwa_response function
dwa_parse_summary = {"selected": 0, "none": 0, "not_enough_info": 0, "not_occ_relevant": 0, "parse_error": 0, "api_error": 0}

for custom_id, meta in batch_mcp_meta.items():
    if custom_id not in dwa_responses:
        # Nothing to parse for this MCP; record an empty selection.
        meta["dwa_status"] = "api_error"
        meta["selected_dwas"] = []
        meta["dwa_response_raw"] = ""
        dwa_parse_summary["api_error"] += 1
        continue

    response_text = dwa_responses[custom_id]
    selected_dwas, dwa_status = parse_dwa_response(response_text, meta["top_dwas"])

    meta["dwa_status"] = dwa_status
    meta["selected_dwas"] = selected_dwas
    meta["dwa_response_raw"] = response_text
    # .get() tolerates a status label that is not pre-seeded in the summary dict.
    dwa_parse_summary[dwa_status] = dwa_parse_summary.get(dwa_status, 0) + 1

print(f"\nDWA parsing summary:")
for status, count in sorted(dwa_parse_summary.items(), key=lambda x: -x[1]):
    if count > 0:
        print(f"  {status}: {count}")

# Save intermediate state so Batch 2 survives a kernel restart
BATCH_META_PATH = BATCH_DIR / f"batch_mcp_meta_{date_str}.pkl"
with open(BATCH_META_PATH, "wb") as f:
    pickle.dump(batch_mcp_meta, f)
print(f"\nIntermediate state saved to: {BATCH_META_PATH}")
print("(If you restart the kernel before Batch 2, this will be reloaded automatically)")

Successful DWA responses: 1183/1183

DWA parsing summary:
  selected: 1182
  not_enough_info: 1

Intermediate state saved to: ..\data\batch\batch_mcp_meta_2026-02-18.pkl
(If you restart the kernel before Batch 2, this will be reloaded automatically)


## Batch 2: Generate Task Rating JSONL + Submit

In [10]:
# ============================================================
# Batch 2 input: write one task-rating request per MCP whose
# DWA selection succeeded and yielded at least one task.
# batch_mcp_meta is restored from its pickle when it is not
# (or no longer) in memory, e.g. after a kernel restart.
# ============================================================
import pickle

BATCH_META_PATH = BATCH_DIR / f"batch_mcp_meta_{date_str}.pkl"

# Restore the Batch 1 state from disk if the name is missing or empty.
if "batch_mcp_meta" not in dir() or not batch_mcp_meta:
    if not BATCH_META_PATH.exists():
        raise FileNotFoundError(f"No saved state found at {BATCH_META_PATH}. "
                                "Run Batch 1 cells first.")
    with open(BATCH_META_PATH, "rb") as meta_fh:
        batch_mcp_meta = pickle.load(meta_fh)
    print(f"Reloaded batch_mcp_meta from disk ({len(batch_mcp_meta)} MCPs)")

task_batch_meta = {}  # custom_id -> {dwa_custom_id, tasks, used_dwas, n_tasks_sent}
n_skipped = 0         # MCPs that produce no request (no selection or no tasks)

with open(TASK_JSONL_PATH, "w", encoding="utf-8") as jsonl_out:
    for dwa_custom_id, meta in batch_mcp_meta.items():
        # Only MCPs with a successful, non-empty DWA selection get rated.
        has_selection = meta["dwa_status"] == "selected" and len(meta["selected_dwas"]) > 0
        if not has_selection:
            n_skipped += 1
            continue

        # Expand the selected DWAs into candidate tasks (same logic as synchronous pipeline)
        tasks, used_dwas = get_tasks_for_dwas(meta["selected_dwas"], max_tasks=MAX_TASKS)
        if len(tasks) == 0:
            n_skipped += 1
            continue

        task_custom_id = f"task-{meta['mcp_idx']}"

        # Fill the rating prompt with the MCP description and the task list.
        task_prompt = TASK_RATING_PROMPT.format(
            mcp_desc=meta["text_for_llm"],
            tasks_with_occupations=format_task_list(tasks)
        )

        request_body = {
            "model": MODEL,
            "messages": [{"role": "user", "content": task_prompt}],
            "temperature": TEMPERATURE,
            "max_tokens": 4096,
        }
        request_line = {
            "custom_id": task_custom_id,
            "method": "POST",
            "url": "/v1/chat/completions",
            "body": request_body,
        }
        jsonl_out.write(json.dumps(request_line) + "\n")

        # Remember what was sent so the ratings can be matched back later.
        task_batch_meta[task_custom_id] = {
            "dwa_custom_id": dwa_custom_id,
            "tasks": tasks,
            "used_dwas": used_dwas,
            "n_tasks_sent": len(tasks),
        }

# Persist task_batch_meta as well so the final parse cell survives a restart
TASK_BATCH_META_PATH = BATCH_DIR / f"task_batch_meta_{date_str}.pkl"
with open(TASK_BATCH_META_PATH, "wb") as task_meta_fh:
    pickle.dump(task_batch_meta, task_meta_fh)

print(f"Task rating JSONL written: {TASK_JSONL_PATH}")
print(f"Requests: {len(task_batch_meta)} (skipped {n_skipped} MCPs with no selected DWAs/tasks)")

Task rating JSONL written: ..\data\batch\task_rating_batch_2026-02-18.jsonl
Requests: 1182 (skipped 1 MCPs with no selected DWAs/tasks)


In [11]:
# ============================================================
# Upload JSONL + create task rating batch
# ============================================================

# The handle is opened in a context manager so it is closed as soon as the
# upload call returns instead of staying open in the kernel.
with open(TASK_JSONL_PATH, "rb") as task_jsonl_fh:
    task_batch_file = client.files.create(
        file=task_jsonl_fh,
        purpose="batch"
    )
print(f"Uploaded file: {task_batch_file.id}")

# Create the batch against the chat-completions endpoint, pointing at the
# file that was just uploaded.
task_batch = client.batches.create(
    input_file_id=task_batch_file.id,
    endpoint="/v1/chat/completions",
    completion_window="24h",
    metadata={"description": f"Task rating for {len(task_batch_meta)} MCPs"}
)
print(f"Batch created: {task_batch.id}")
print(f"Status: {task_batch.status}")

# Keep the batch ID for the polling cell (it reads TASK_BATCH_ID)
TASK_BATCH_ID = task_batch.id

Uploaded file: file-SP6AvxLpCjv7pW6c6rSDWZ
Batch created: batch_69960b27279c819080efabb0a983631a
Status: validating


## Batch 2: Poll Status + Retrieve Task Results + Save

In [12]:
# ============================================================
# Poll task rating batch until complete (auto-waits, checks every 30s)
# If you restarted the kernel, set TASK_BATCH_ID manually first:
#   TASK_BATCH_ID = "batch_..."
#
# On "completed" this cell defines task_output_lines (one parsed
# JSON object per output line). On failed/expired/cancelled it
# only prints the batch errors and leaves task_output_lines
# untouched, so do not run the parse cell after such an outcome.
# ============================================================

while True:
    task_batch_status = client.batches.retrieve(TASK_BATCH_ID)
    status = task_batch_status.status
    counts = task_batch_status.request_counts
    print(f"[{datetime.now().strftime('%H:%M:%S')}] Status: {status} | "
          f"{counts.completed}/{counts.total} completed, {counts.failed} failed")

    if status == "completed":
        # output_file_id may be unset (presumably when no request produced a
        # result), so only fetch when it is present.
        if task_batch_status.output_file_id:
            task_output_content = client.files.content(task_batch_status.output_file_id).text
        else:
            task_output_content = ""
            print("\nBatch completed without an output file.")

        # Split on "\n" only (JSON strings may legally contain other Unicode
        # line separators) and skip blank lines so empty content parses to [].
        task_output_lines = [
            json.loads(line)
            for line in task_output_content.split("\n")
            if line.strip()
        ]
        print(f"\nTask batch complete! Downloaded {len(task_output_lines)} results.")

        if task_batch_status.error_file_id:
            task_error_content = client.files.content(task_batch_status.error_file_id).text
            # Only the first 2000 characters are shown to keep the cell output readable.
            print(f"Error file:\n{task_error_content[:2000]}")
        break

    elif status in ("failed", "expired", "cancelled"):
        # Terminal state without results: report and stop polling.
        print(f"\nBatch {status}!")
        if task_batch_status.errors:
            for err in task_batch_status.errors.data:
                print(f"  {err.code}: {err.message}")
        break

    else:
        # Any other status is treated as still running; wait and poll again.
        # POLL_INTERVAL is defined in the DWA polling cell.
        time.sleep(POLL_INTERVAL)

[11:55:37] Status: validating | 0/0 completed, 0 failed
[11:56:07] Status: in_progress | 0/1182 completed, 0 failed
[11:56:38] Status: in_progress | 0/1182 completed, 0 failed
[11:57:08] Status: in_progress | 64/1182 completed, 0 failed
[11:57:38] Status: in_progress | 88/1182 completed, 0 failed
[11:58:08] Status: in_progress | 115/1182 completed, 0 failed
[11:58:38] Status: in_progress | 189/1182 completed, 0 failed
[11:59:08] Status: in_progress | 553/1182 completed, 0 failed
[11:59:39] Status: in_progress | 882/1182 completed, 0 failed
[12:00:09] Status: finalizing | 1182/1182 completed, 0 failed
[12:00:39] Status: finalizing | 1182/1182 completed, 0 failed
[12:01:09] Status: finalizing | 1182/1182 completed, 0 failed
[12:01:39] Status: finalizing | 1182/1182 completed, 0 failed
[12:02:10] Status: completed | 1182/1182 completed, 0 failed

Task batch complete! Downloaded 1182 results.


In [13]:
# ============================================================
# Parse task rating results + combine with DWA results + merge
# with historical results + save final output.
# Reloads batch_mcp_meta / task_batch_meta from disk if the kernel
# was restarted. NOTE: task_output_lines is NOT reloaded here; it
# must still be in memory from the polling cell.
# ============================================================
import pickle

BATCH_META_PATH      = BATCH_DIR / f"batch_mcp_meta_{date_str}.pkl"
TASK_BATCH_META_PATH = BATCH_DIR / f"task_batch_meta_{date_str}.pkl"

# Reload if needed (kernel restart)
if "batch_mcp_meta" not in dir() or not batch_mcp_meta:
    with open(BATCH_META_PATH, "rb") as f:
        batch_mcp_meta = pickle.load(f)
    print(f"Reloaded batch_mcp_meta from disk ({len(batch_mcp_meta)} MCPs)")

if "task_batch_meta" not in dir() or not task_batch_meta:
    with open(TASK_BATCH_META_PATH, "rb") as f:
        task_batch_meta = pickle.load(f)
    print(f"Reloaded task_batch_meta from disk ({len(task_batch_meta)} MCPs)")

# Map custom_id -> response text for task ratings
task_responses = {}
for line in task_output_lines:
    custom_id = line["custom_id"]
    if line.get("error"):
        print(f"  Error for {custom_id}: {line['error']}")
        continue

    # Walk the nested payload defensively: a line whose body carries no
    # "choices" (or a null message content) must not abort the whole cell.
    response = line.get("response") or {}
    body = response.get("body") or {}
    choices = body.get("choices") or []
    message = (choices[0].get("message") or {}) if choices else {}
    content = message.get("content")
    if content is None:
        print(f"  No completion text for {custom_id} "
              f"(status_code={response.get('status_code')})")
        continue  # the MCP keeps its empty task-rating fields below
    task_responses[custom_id] = content

print(f"Successful task responses: {len(task_responses)}/{len(task_output_lines)}")

# ---- Build results from this batch ----
# One row per MCP in batch_mcp_meta, whether or not it reached task rating.
batch_results = []

for dwa_custom_id, meta in batch_mcp_meta.items():
    mcp_idx = meta["mcp_idx"]
    task_custom_id = f"task-{mcp_idx}"  # same ID scheme used when the task JSONL was written

    result = {
        "title": meta["title"],
        "url": meta["url"],
        "text_for_llm": meta["text_for_llm"],
        "uploaded_clean": meta["uploaded_clean"],
        "dwa_status": meta["dwa_status"],
        # First element of each selected-DWA entry is what gets serialized.
        "dwas_selected": "; ".join([d[0] for d in meta["selected_dwas"]]) if meta["selected_dwas"] else "",
        "n_dwas_selected": len(meta["selected_dwas"]),
        "dwa_response_raw": meta["dwa_response_raw"],
        "task_ratings": "",
        "task_rating_response_raw": "",
        "n_tasks_rated": 0,
    }

    # Fill the task fields only when a request was sent AND a response came back.
    if task_custom_id in task_batch_meta and task_custom_id in task_responses:
        tmeta = task_batch_meta[task_custom_id]
        task_response_raw = task_responses[task_custom_id]
        rated_tasks = parse_task_ratings(task_response_raw, tmeta["tasks"])

        # Serialized as "task (occupation): rating" entries joined by "; ";
        # the third tuple element is not written out.
        task_ratings_str = "; ".join(
            [f"{t} ({o}): {r}" for t, o, _d, r in rated_tasks]
        )
        result["task_ratings"] = task_ratings_str
        result["task_rating_response_raw"] = task_response_raw
        result["n_tasks_rated"] = len(rated_tasks)
        result["n_tasks_sent"] = tmeta["n_tasks_sent"]
        result["dwas_used_for_tasks"] = "; ".join(tmeta["used_dwas"])

    batch_results.append(result)

# ---- Combine: partial resume rows + new batch rows ----
# existing_batch_df is defined in an earlier cell of this notebook.
batch_results_df = pd.DataFrame(batch_results)
if not existing_batch_df.empty:
    combined_new = pd.concat([existing_batch_df, batch_results_df], ignore_index=True)
else:
    combined_new = batch_results_df

# ---- Merge with historical results from prior runs ----
# Rows are appended as-is; no de-duplication is performed here.
if EXISTING_RESULTS_FILE != "none":
    historical_df = pd.read_csv(RESULTS_DIR / EXISTING_RESULTS_FILE)
    print(f"Historical results: {len(historical_df)} MCPs")
    save_batch_df = pd.concat([historical_df, combined_new], ignore_index=True)
else:
    save_batch_df = combined_new

save_batch_df.to_csv(BATCH_OUTPUT_PATH, index=False)

print(f"\n{'='*60}")
print(f"Batch classification complete.")
print(f"  New MCPs processed this batch: {len(batch_results)}")
print(f"  Total MCPs in output:          {len(save_batch_df)}")
print(f"Results saved to: {BATCH_OUTPUT_PATH.name}")
print(f"\nBreakdown (new MCPs only):")
print(f"  DWAs selected:    {sum(1 for r in batch_results if r['dwa_status'] == 'selected')}")
print(f"  None:             {sum(1 for r in batch_results if r['dwa_status'] == 'none')}")
print(f"  Not enough info:  {sum(1 for r in batch_results if r['dwa_status'] == 'not_enough_info')}")
print(f"  Not occ relevant: {sum(1 for r in batch_results if r['dwa_status'] == 'not_occ_relevant')}")
print(f"  Total tasks rated:{sum(r['n_tasks_rated'] for r in batch_results)}")


Successful task responses: 1182/1182
Historical results: 8957 MCPs

Batch classification complete.
  New MCPs processed this batch: 1183
  Total MCPs in output:          10140
Results saved to: mcp_results_2026-02-18.csv

Breakdown (new MCPs only):
  DWAs selected:    1182
  None:             0
  Not enough info:  1
  Not occ relevant: 0
  Total tasks rated:140563


## Batch Task-Level Aggregation

In [14]:
# ============================================================
# Task-level aggregation for batch results
# Loads the combined (historical + new) results file, explodes
# each MCP's "task (occupation): rating; ..." string into one
# row per rating, and aggregates per (task, occupation).
# ============================================================

batch_full_df = pd.read_csv(BATCH_OUTPUT_PATH)
print(f"Loaded {len(batch_full_df)} MCPs from {BATCH_OUTPUT_PATH.name}")

# Compiled once; both are applied to every entry of every row.
RATING_ENTRY_RE = re.compile(r"^(.+):\s*(\d)$")          # "<task (occ)>: <single digit>"
TASK_OCC_RE     = re.compile(r"^(.+)\s*\(([^)]+)\)$")    # "<task> (<occupation>)"


def _iter_rating_entries(ratings_str):
    """Yield one RATING_ENTRY_RE match per serialized rating entry.

    Entries are joined with "; ", but a task statement can itself contain
    "; ". A plain split would cut such a task in two, drop the head and
    aggregate the tail under a truncated task name. Fragments are therefore
    accumulated until the joined text ends in ": <digit>", which marks the
    real end of an entry. A trailing fragment that never reaches a rating
    is discarded.
    """
    pending = []
    for fragment in ratings_str.split("; "):
        if not pending and not fragment.strip():
            continue  # stray empty piece between entries
        pending.append(fragment)
        match = RATING_ENTRY_RE.match("; ".join(pending).strip())
        if match:
            yield match
            pending = []


all_batch_task_ratings = []

for _, row in batch_full_df.iterrows():
    ratings_str = row.get("task_ratings", "")
    # Empty cells come back from read_csv as NaN (a float), not as "".
    if not isinstance(ratings_str, str) or not ratings_str.strip():
        continue

    mcp_title = row["title"]
    mcp_url = row["url"]

    for match in _iter_rating_entries(ratings_str):
        task_occ_str = match.group(1).strip()
        rating = int(match.group(2))

        # Separate the trailing "(occupation)" from the task text when present.
        occ_match = TASK_OCC_RE.match(task_occ_str)
        if occ_match:
            task_text = occ_match.group(1).strip()
            occupation = occ_match.group(2).strip()
        else:
            task_text = task_occ_str
            occupation = ""

        all_batch_task_ratings.append({
            "task": task_text,
            "occupation": occupation,
            "rating": rating,
            "mcp_title": mcp_title,
            "mcp_url": mcp_url,
        })

# Explicit columns keep the groupby below valid even when no rating was parsed
# (an empty list would otherwise give a column-less frame and a KeyError).
batch_task_flat = pd.DataFrame(
    all_batch_task_ratings,
    columns=["task", "occupation", "rating", "mcp_title", "mcp_url"],
)
print(f"Total task-rating pairs: {len(batch_task_flat):,}")
print(f"Unique (task, occupation) pairs: {batch_task_flat.groupby(['task', 'occupation']).ngroups:,}")

batch_task_agg = batch_task_flat.groupby(["task", "occupation"]).agg(
    n_ratings=("rating", "count"),
    mean_rating=("rating", "mean"),
    median_rating=("rating", "median"),
    max_rating=("rating", "max"),
    min_rating=("rating", "min"),
    p25_rating=("rating", lambda x: np.percentile(x, 25)),
    p75_rating=("rating", lambda x: np.percentile(x, 75)),
).reset_index()

for col in ["mean_rating", "median_rating", "p25_rating", "p75_rating"]:
    batch_task_agg[col] = batch_task_agg[col].round(2)

# Most frequently rated tasks first; ties broken by higher mean rating.
batch_task_agg = batch_task_agg.sort_values(
    ["n_ratings", "mean_rating"], ascending=[False, False]
).reset_index(drop=True)

BATCH_TASK_AGG_PATH = RESULTS_DIR / f"task_results_{date_str}.csv"
batch_task_agg.to_csv(BATCH_TASK_AGG_PATH, index=False)

print(f"\nTask aggregation saved: {BATCH_TASK_AGG_PATH.name}  ({len(batch_task_agg):,} rows)")
batch_task_agg.head(10)


Loaded 10140 MCPs from mcp_results_2026-02-18.csv
Total task-rating pairs: 1,205,077
Unique (task, occupation) pairs: 11,567

Task aggregation saved: task_results_2026-02-18.csv  (11,567 rows)


Unnamed: 0,task,occupation,n_ratings,mean_rating,median_rating,max_rating,min_rating,p25_rating,p75_rating
0,"Document, design, code, or test Geographic Inf...",Geographic Information Systems Technologists a...,4776,2.22,2.0,5,1,2.0,3.0
1,"Plan, install, repair, or troubleshoot telehea...",Health Informatics Specialists,4350,1.36,1.0,3,1,1.0,2.0
2,Develop and document database architectures.,Database Architects,4301,2.3,2.0,5,1,2.0,3.0
3,Provide training or technical assistance in We...,Web Administrators,4103,1.58,2.0,4,1,1.0,2.0
4,Provide technical support for existing reports...,Business Intelligence Analysts,4082,1.92,2.0,4,1,1.0,2.0
5,Provide technical guidance or support for the ...,Computer Systems Engineers/Architects,4082,1.82,2.0,4,1,1.0,2.0
6,"Inform Web site users of problems, problem res...",Web Administrators,4082,1.79,2.0,4,1,1.0,2.0
7,Provide staff and users with assistance solvin...,Computer Systems Analysts,4082,1.73,2.0,4,1,1.0,2.0
8,Answer user inquiries regarding computer softw...,Computer User Support Specialists,4082,1.72,2.0,4,1,1.0,2.0
9,Provide technical support to junior staff or c...,Database Administrators,4082,1.7,2.0,4,1,1.0,2.0


## Batch Utilities

In [None]:
# ============================================================
# Utility: show the most recent batches (and, optionally,
# cancel one by hand)
# ============================================================

# Print one summary line per batch: id, status, progress, metadata.
print("Recent batches:")
recent_batches = client.batches.list(limit=10)
for recent in recent_batches:
    progress = recent.request_counts
    print(f"  {recent.id} | {recent.status} | {progress.completed}/{progress.total} done | {recent.metadata}")

# To cancel a batch, uncomment:
# client.batches.cancel("batch_...")