# Parallel Labeling Tutorial: ThreadPoolExecutor for LLM Calls

## Learning Objectives

- ✅ Use ThreadPoolExecutor for parallel LLM API calls
- ✅ Implement incremental labeling with resume capability
- ✅ Calculate and optimize labeling costs
- ✅ Track progress with tqdm
- ✅ Handle API failures gracefully
- ✅ Use Pydantic for structured LLM outputs

## Estimated Time

**Execution:** 20-30 minutes (depends on API rate limits)

**⚠️ API Cost Warning:** Running on the full dataset (~200 conversations) costs ~$0.50-1.00

## Setup

In [None]:
from pathlib import Path
import json
import os
from typing import List, Dict, Any
from concurrent.futures import ThreadPoolExecutor, as_completed

# LLM and utilities
import litellm
from pydantic import BaseModel
from dotenv import load_dotenv
from tqdm.notebook import tqdm

load_dotenv()
print("✓ Imports successful")

In [None]:
# ========================================
# CONFIGURATION: Demo vs Full Mode
# ========================================

# Set DEMO_MODE = False to label full dataset
DEMO_MODE = True  # Default: Quick demo for tutorial

if DEMO_MODE:
    DEMO_SIZE = 5  # Label 5 traces for demo
    MAX_WORKERS = 5
    print("🚀 DEMO MODE: Labeling small sample")
    print(f"   Traces: {DEMO_SIZE} | Workers: {MAX_WORKERS}")
    print("   Estimated cost: $0.05-0.10 | Time: ~10-15 seconds")
else:
    DEMO_SIZE = None  # Use full dataset (~200 traces)
    MAX_WORKERS = 20
    print("📊 FULL MODE: Labeling complete dataset")
    print(f"   Traces: ~200 | Workers: {MAX_WORKERS}")
    print("   Estimated cost: $0.50-1.00 | Time: ~5-8 minutes")

print("\n💡 To switch modes, change DEMO_MODE in this cell and re-run the notebook")

## 1. Define Structured Output Schema

In [None]:
class SubstantiationLabel(BaseModel):
    """Pydantic model for LLM output.
    
    Ensures consistent JSON structure from GPT-4o.
    """
    all_responses_substantiated: bool
    rationale: str

# Test instantiation
example = SubstantiationLabel(
    all_responses_substantiated=True,
    rationale="All claims verified by tool outputs"
)
print(f"✓ Schema: {example.model_dump()}")
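
To make concrete what the schema guards against, here is a stdlib-only sketch of the checks involved. `parse_label` and `REQUIRED_FIELDS` are hypothetical names for illustration; this version is stricter than Pydantic's default (lax) mode, which may coerce some values such as the string `"true"` to a bool.

```python
import json
from typing import Any, Dict

# Hypothetical: the field names and types that SubstantiationLabel declares
REQUIRED_FIELDS = {"all_responses_substantiated": bool, "rationale": str}

def parse_label(raw: str) -> Dict[str, Any]:
    """Strictly validate a raw JSON reply against the expected label shape."""
    data = json.loads(raw)  # json.JSONDecodeError is a subclass of ValueError
    if not isinstance(data, dict):
        raise ValueError("expected a JSON object")
    for field, expected_type in REQUIRED_FIELDS.items():
        if field not in data:
            raise ValueError(f"missing field: {field}")
        if not isinstance(data[field], expected_type):
            raise ValueError(f"{field} should be {expected_type.__name__}")
    return data

ok = parse_label('{"all_responses_substantiated": false, "rationale": "Price not in tool output"}')
print(ok["all_responses_substantiated"])  # False

try:
    parse_label('{"all_responses_substantiated": "yes", "rationale": "x"}')
except ValueError as e:
    bad_error = str(e)
print(bad_error)  # all_responses_substantiated should be bool
```

Catching `ValueError` covers both malformed JSON and shape errors, which is the same failure path `label_parallel` handles with its `try/except`.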

## 2. Load Cleaned Traces

In [None]:
DATA_FILE = Path("nurtureboss_traces.json")

if not DATA_FILE.exists():
    raise FileNotFoundError(
        f"Run clean_logs.py first to generate {DATA_FILE}"
    )

with open(DATA_FILE) as f:
    traces = json.load(f)

print(f"✓ Loaded {len(traces)} conversations")
print(f"✓ Sample ID: {traces[0]['id']}")
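
The rest of the notebook assumes each trace is a dict with an `id`, a `messages` list of `{"role", "content"}` entries, and any other keys treated as metadata (for example tool outputs) that the judge may use as evidence. A minimal sketch of that split follows. `split_trace` is a hypothetical helper, and the `tool_outputs` key in the made-up trace is invented for illustration.

```python
from typing import Any, Dict, List, Tuple

def split_trace(trace: Dict[str, Any]) -> Tuple[List[Dict[str, str]], Dict[str, Any]]:
    """Separate the conversation from its metadata, as build_prompt expects."""
    messages = trace.get("messages", [])
    # Every key except 'id' (bookkeeping) and 'messages' counts as metadata
    metadata = {k: v for k, v in trace.items() if k not in ("id", "messages")}
    return messages, metadata

example_trace = {
    "id": "demo-001",
    "messages": [
        {"role": "user", "content": "Is unit 4B available?"},
        {"role": "assistant", "content": "Yes, 4B is available from June 1."},
    ],
    "tool_outputs": [{"unit": "4B", "available_from": "2024-06-01"}],
}
msgs, meta = split_trace(example_trace)
print(len(msgs), list(meta))  # 2 ['tool_outputs']
```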

## 3. Build Evaluation Prompt

In [None]:
def build_prompt(messages: List[Dict], metadata: Dict) -> str:
    """Construct substantiation evaluation prompt."""
    
    # Format conversation
    convo = "\n".join(
        f"{msg['role'].upper()}: {msg['content']}"
        for msg in messages
    )
    
    # Format metadata
    meta_str = json.dumps(metadata, indent=2) if metadata else "<none>"
    
    return f"""
You are evaluating substantiation in AI conversations.

TASK: Determine if every factual claim can be verified by:
1. User-provided information
2. Tool outputs in metadata
3. Tool capabilities in metadata

PASS = All claims substantiated
FAIL = At least one unsubstantiated claim

Ignore courtesy statements ("How can I help?").

=== CONVERSATION ===
{convo}

=== METADATA ===
{meta_str}

Return JSON: {{"all_responses_substantiated": bool, "rationale": str}}
""".strip()

# Test on first trace
sample_prompt = build_prompt(
    traces[0].get('messages', []),
    {k:v for k,v in traces[0].items() if k not in ['id', 'messages']}
)
print(f"Prompt length: {len(sample_prompt)} chars")
print(f"\nFirst 500 chars:\n{sample_prompt[:500]}...")
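
The prompt is an f-string, so `{convo}` and `{meta_str}` are interpolated, while the doubled braces in the `Return JSON:` line produce literal `{` and `}`. With single braces, Python would parse that text as a replacement field instead. A small check of that behavior:

```python
convo = "USER: hi"
template = f"""=== CONVERSATION ===
{convo}

Return JSON: {{"all_responses_substantiated": bool, "rationale": str}}"""

print(template.splitlines()[1])   # USER: hi
print(template.splitlines()[-1])  # Return JSON: {"all_responses_substantiated": bool, "rationale": str}
```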

## 4. Label Single Conversation (Sequential)

In [None]:
def label_one_conversation(
    trace: Dict,
    model: str = "gpt-4o"
) -> SubstantiationLabel:
    """Call LLM to label one conversation."""
    
    messages = trace.get('messages', [])
    metadata = {k:v for k,v in trace.items() if k not in ['id', 'messages']}
    
    prompt = build_prompt(messages, metadata)
    
    response = litellm.completion(
        model=model,
        messages=[{"role": "user", "content": prompt}],
        response_format=SubstantiationLabel,
        temperature=0
    )
    
    result = json.loads(response.choices[0].message.content)
    return SubstantiationLabel(**result)

# Test on one conversation
print("Testing on first trace (may take 3-5 seconds)...")
test_label = label_one_conversation(traces[0])
print(f"\n✓ Result: {test_label.all_responses_substantiated}")
print(f"  Rationale: {test_label.rationale}")


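`label_one_conversation` makes a single attempt, so a transient error such as a rate limit or timeout leaves the trace unlabeled. One way to make this more forgiving is a retry wrapper with exponential backoff, sketched below. `with_retries` is a hypothetical helper, not a LiteLLM API. The `sleep` parameter is injectable so the logic can be exercised without actually waiting. Retrying on every `Exception` is deliberately broad here; in practice you may want to retry only transient error types.

```python
import random
import time
from typing import Callable, TypeVar

T = TypeVar("T")

def with_retries(
    fn: Callable[[], T],
    max_attempts: int = 3,
    base_delay: float = 1.0,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call fn(), retrying failures with exponential backoff.

    Waits base_delay * 2**(attempt - 1) seconds, plus up to 10% random jitter,
    after each failed attempt, and re-raises the last exception once
    max_attempts calls have failed.
    """
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    for attempt in range(1, max_attempts + 1):
        try:
            return fn()
        except Exception:
            if attempt == max_attempts:
                raise  # out of attempts: let the caller record the failure
            delay = base_delay * 2 ** (attempt - 1)
            sleep(delay * (1 + 0.1 * random.random()))
    raise RuntimeError("unreachable")

# Usage with a stand-in that fails twice, then succeeds
calls = {"n": 0}

def flaky() -> str:
    calls["n"] += 1
    if calls["n"] < 3:
        raise ConnectionError("simulated transient failure")
    return "ok"

waits: list = []
result = with_retries(flaky, max_attempts=3, base_delay=0.5, sleep=waits.append)
print(result, calls["n"], len(waits))  # ok 3 2
```

Inside a thread pool, a call could be wrapped as `executor.submit(with_retries, lambda t=trace: label_one_conversation(t, model))`. The default argument pins each lambda to its own trace.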
## 5. Label Conversations in Parallel (ThreadPoolExecutor)

In [None]:
def label_parallel(
    traces_to_label: List[Dict],
    max_workers: int = 10,
    model: str = "gpt-4o"
) -> List[Dict]:
    """Label conversations in parallel using ThreadPoolExecutor.
    
    Args:
        traces_to_label: Conversations to label (updated in place)
        max_workers: Parallel threads (adjust for API rate limits)
        model: LLM model name
    
    Returns:
        The same trace dicts, in input order, with 'all_responses_substantiated'
        and 'substantiation_rationale' added. Traces whose API call failed are
        left unlabeled so a later run can retry them.
    """
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # Submit all tasks, remembering which trace each future belongs to
        future_to_trace = {
            executor.submit(label_one_conversation, trace, model): trace
            for trace in traces_to_label
        }
        
        # Collect results as they finish (completion order) with a progress bar
        for future in tqdm(
            as_completed(future_to_trace),
            total=len(traces_to_label),
            desc="Labeling"
        ):
            trace = future_to_trace[future]
            
            try:
                result = future.result()
                trace['all_responses_substantiated'] = result.all_responses_substantiated
                trace['substantiation_rationale'] = result.rationale
            except Exception as e:
                # Keep the trace unlabeled so label_with_resume retries it later
                print(f"\n⚠️  Error on {trace.get('id')}: {e}")
    
    # Labels were written into the dicts in place; return them in input order
    return list(traces_to_label)

# Demo on a subset, using DEMO_SIZE / MAX_WORKERS from the configuration cell
demo_traces = traces[:DEMO_SIZE] if DEMO_SIZE else traces

print(f"Labeling {len(demo_traces)} traces in parallel ({MAX_WORKERS} workers)...\n")
demo_labeled = label_parallel(demo_traces, max_workers=MAX_WORKERS)

print(f"\n✓ Labeled {len(demo_labeled)} conversations")
print("\nSample result:")
print(f"  ID: {demo_labeled[0]['id']}")
print(f"  Substantiated: {demo_labeled[0].get('all_responses_substantiated')}")
print(f"  Rationale: {demo_labeled[0].get('substantiation_rationale')}")
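
`as_completed` yields futures in the order they *finish*, not the order they were submitted. That is why `label_parallel` maps each future back to its trace. The sketch below swaps the API call for a sleep so it runs offline. `fake_label` and `label_in_order` are hypothetical stand-ins. It shows two things: results can be slotted back into input order by index, and wall time tracks the slowest call rather than the sum of all calls.

```python
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Dict, List, Tuple

def fake_label(trace: Dict) -> bool:
    """Stand-in for label_one_conversation: sleeps instead of calling an API."""
    time.sleep(trace["latency_s"])
    return trace["latency_s"] < 0.15  # arbitrary fake verdict

def label_in_order(traces: List[Dict], max_workers: int) -> Tuple[List[bool], List[int]]:
    """Run fake_label in parallel; return results in input order plus the finish order."""
    results: List[bool] = [False] * len(traces)
    finish_order: List[int] = []
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # Map each future to the index of the trace it is labeling
        future_to_index = {executor.submit(fake_label, t): i for i, t in enumerate(traces)}
        for future in as_completed(future_to_index):  # yields in finish order
            i = future_to_index[future]
            finish_order.append(i)
            results[i] = future.result()  # put the result back at its input position
    return results, finish_order

fake_traces = [{"latency_s": 0.3}, {"latency_s": 0.1}, {"latency_s": 0.2}]
start = time.perf_counter()
results, finish_order = label_in_order(fake_traces, max_workers=3)
elapsed = time.perf_counter() - start

print(results)       # [False, True, False]
print(finish_order)  # indices in the order the calls finished (fastest first)
print(f"{elapsed:.2f}s vs ~0.60s if run sequentially")
```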

## 6. Cost Estimation

In [None]:
def estimate_cost(
    num_conversations: int,
    avg_tokens_per_call: int = 800,
    model: str = "gpt-4o"
) -> dict:
    """Estimate labeling cost.
    
    Pricing (as of 2024):
    - GPT-4o: ~$2.50 / 1M input tokens, ~$10 / 1M output tokens
    - GPT-4o-mini: ~$0.15 / 1M input tokens, ~$0.60 / 1M output tokens
    """
    
    # Rough estimates
    pricing = {
        "gpt-4o": {"input": 2.50, "output": 10.0},
        "gpt-4o-mini": {"input": 0.15, "output": 0.60}
    }
    
    if model not in pricing:
        return {"error": f"Unknown model: {model}"}
    
    input_tokens = num_conversations * avg_tokens_per_call
    output_tokens = num_conversations * 50  # ~50 tokens for JSON response
    
    input_cost = (input_tokens / 1_000_000) * pricing[model]["input"]
    output_cost = (output_tokens / 1_000_000) * pricing[model]["output"]
    total_cost = input_cost + output_cost
    
    return {
        "conversations": num_conversations,
        "input_tokens": input_tokens,
        "output_tokens": output_tokens,
        "total_cost_usd": round(total_cost, 4),
        # Guard against division by zero for an empty dataset
        "cost_per_label": round(total_cost / num_conversations, 4) if num_conversations else 0.0
    }

# Estimate for full dataset
full_estimate = estimate_cost(len(traces), model="gpt-4o")
mini_estimate = estimate_cost(len(traces), model="gpt-4o-mini")

print("Cost Estimates:\n")
print(f"GPT-4o ({len(traces)} conversations):")
print(f"  Total: ${full_estimate['total_cost_usd']}")
print(f"  Per label: ${full_estimate['cost_per_label']}")
print()
print(f"GPT-4o-mini ({len(traces)} conversations):")
print(f"  Total: ${mini_estimate['total_cost_usd']}")
print(f"  Per label: ${mini_estimate['cost_per_label']}")
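savings = full_estimate['total_cost_usd'] - mini_estimate['total_cost_usd']
savings_pct = 100 * savings / full_estimate['total_cost_usd'] if full_estimate['total_cost_usd'] else 0.0
print(f"  Savings: ${savings:.2f} ({savings_pct:.0f}% reduction)")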
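
Here is the `estimate_cost` arithmetic worked by hand. It assumes about 200 conversations and uses the function's defaults of 800 input tokens and 50 output tokens per call. Real token counts depend on conversation length.

$$
\begin{aligned}
C &= N \cdot \frac{T_{\text{in}}\,p_{\text{in}} + T_{\text{out}}\,p_{\text{out}}}{10^{6}} \\
C_{\text{4o}} &= 200 \cdot \frac{800 \cdot 2.50 + 50 \cdot 10.00}{10^{6}} = 200 \cdot \frac{2500}{10^{6}} = \$0.50 \\
C_{\text{mini}} &= 200 \cdot \frac{800 \cdot 0.15 + 50 \cdot 0.60}{10^{6}} = 200 \cdot \frac{150}{10^{6}} = \$0.03 \\
1 - \frac{C_{\text{mini}}}{C_{\text{4o}}} &= 1 - \frac{150}{2500} = 0.94
\end{aligned}
$$

At these list prices, both GPT-4o-mini rates are 6% of the GPT-4o rates. The reduction is therefore 94% regardless of $N$ or the input/output token mix.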

## 7. Incremental Labeling (Resume Capability)

In [None]:
def label_with_resume(
    all_traces: List[Dict],
    output_file: Path,
    max_workers: int = 10,
    save_every: int = 50
) -> List[Dict]:
    """Label traces with checkpoint saving.
    
    If output_file already exists, its contents replace all_traces, which is
    how an interrupted run resumes. Traces that already have an
    'all_responses_substantiated' field are skipped, and a checkpoint is
    saved after every batch of `save_every` traces.
    """
    
    # Load existing labels if file exists
    if output_file.exists():
        with open(output_file) as f:
            all_traces = json.load(f)
        print(f"✓ Loaded existing file: {output_file}")
    
    # Filter unlabeled (includes traces whose earlier API call failed)
    unlabeled = [
        t for t in all_traces
        if 'all_responses_substantiated' not in t
    ]
    
    labeled_count = len(all_traces) - len(unlabeled)
    print(f"Already labeled: {labeled_count}")
    print(f"To label: {len(unlabeled)}")
    
    if not unlabeled:
        print("✓ All traces already labeled!")
        return all_traces
    
    # Label in batches
    for i in range(0, len(unlabeled), save_every):
        batch = unlabeled[i:i+save_every]
        print(f"\nBatch {i//save_every + 1}: Labeling {len(batch)} traces...")
        
        # label_parallel writes labels into these dicts in place, and `batch`
        # holds references to dicts inside all_traces, so no copy-back is needed
        label_parallel(batch, max_workers=max_workers)
        
        # Save checkpoint
        with open(output_file, 'w') as f:
            json.dump(all_traces, f, indent=2)
        print(f"✓ Saved checkpoint to {output_file}")
    
    return all_traces

# Demo: Label with checkpointing (small batch)
OUTPUT_FILE = Path("nurtureboss_traces_labeled_demo.json")

print("Demo: Incremental labeling with resume capability\n")
demo_result = label_with_resume(
    traces[:10],  # Only 10 for demo; traces already labeled in place above are skipped
    OUTPUT_FILE,
    max_workers=5,
    save_every=5
)

print(f"\n✓ Final count: {len(demo_result)} traces")
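
`label_with_resume` writes each checkpoint directly over `output_file`. If the process is killed mid-write, the file can be left truncated, and the next `json.load` fails, which defeats the resume logic. A common remedy, not part of the original notebook, is to write to a temporary file and then swap it into place. `save_checkpoint_atomic` is a hypothetical helper. `os.replace` is atomic on POSIX when both paths are on the same filesystem, which is why the temp file is created in the target's directory.

```python
import json
import os
import tempfile
from pathlib import Path
from typing import Any

def save_checkpoint_atomic(data: Any, output_file: Path) -> None:
    """Write JSON so an interrupted run never leaves a truncated checkpoint."""
    output_file = Path(output_file)
    # Temp file in the same directory, so os.replace stays on one filesystem
    fd, tmp_name = tempfile.mkstemp(dir=str(output_file.parent), suffix=".tmp")
    try:
        with os.fdopen(fd, "w") as f:
            json.dump(data, f, indent=2)
            f.flush()
            os.fsync(f.fileno())  # make sure bytes reach disk before the swap
        os.replace(tmp_name, output_file)
    except BaseException:
        Path(tmp_name).unlink(missing_ok=True)  # remove the partial temp file
        raise

# Usage: write a checkpoint into a throwaway directory and read it back
with tempfile.TemporaryDirectory() as d:
    target = Path(d) / "labels.json"
    save_checkpoint_atomic([{"id": "a", "all_responses_substantiated": True}], target)
    loaded = json.loads(target.read_text())
    leftovers = sorted(p.name for p in Path(d).iterdir())

print(loaded)     # [{'id': 'a', 'all_responses_substantiated': True}]
print(leftovers)  # ['labels.json']
```

In `label_with_resume`, the `with open(output_file, 'w') ...` checkpoint block could be swapped for `save_checkpoint_atomic(all_traces, output_file)`.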

## 8. Run Full Labeling (Optional - Costs Money!)

In [None]:
# ⚠️  UNCOMMENT TO RUN ON FULL DATASET
# This will cost ~$0.50-1.00 depending on model

# FULL_OUTPUT = Path("nurtureboss_traces_labeled.json")
#
# final_traces = label_with_resume(
#     traces,
#     FULL_OUTPUT,
#     max_workers=20,  # Adjust based on API rate limits
#     save_every=50
# )
#
# print(f"\n‚úì Labeled {len(final_traces)} total conversations")
# print(f"  Saved to: {FULL_OUTPUT}")

print("⏭️  Skipped full labeling (uncomment code above to run)")
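
The full-run cell says to adjust `max_workers` for API rate limits. One rough way to pick a starting value is Little's law: requests in flight = throughput × latency. To stay under a requests-per-minute (RPM) limit, throughput must be at most RPM / 60 requests per second. `suggest_max_workers` and its `headroom` factor are hypothetical. The 500 RPM and 4-second latency in the usage line are illustrative numbers, not the limits of any particular account. Token-per-minute limits may bind first and are not modeled here.

```python
import math

def suggest_max_workers(rpm_limit: int, avg_latency_s: float, headroom: float = 0.8) -> int:
    """Size a thread pool so steady-state throughput stays under rpm_limit.

    Little's law: in-flight requests = throughput * latency, with throughput
    capped at rpm_limit / 60 requests per second. `headroom` keeps a margin
    for latency variance. Always returns at least 1.
    """
    workers = (rpm_limit / 60) * avg_latency_s * headroom
    return max(1, math.floor(workers))

print(suggest_max_workers(rpm_limit=500, avg_latency_s=4.0))  # 26
```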

## Summary

### What You Learned

1. **Parallel processing with ThreadPoolExecutor** - Overlap I/O-bound API calls; speedup grows with `max_workers` until API rate limits cap it
2. **Structured outputs with Pydantic** - Type-safe LLM responses
3. **Incremental labeling** - Resume after interruptions, save checkpoints
4. **Cost optimization** - Estimate before running, choose model wisely
5. **Progress tracking** - tqdm for user feedback

### Performance Comparison

| Approach | Approx. time for 200 traces | Cost (GPT-4o) |
|----------|-----------------------------|---------------|
| Sequential | ~40 min | ~$0.50-1.00 |
| Parallel (10 workers) | ~5-8 min | ~$0.50-1.00 |
| Parallel (64 workers) | ~2-3 min | ~$0.50-1.00 |

**Key Insight:** Parallelization saves time, not money. Same API calls = same cost.
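
The key insight can be made concrete with an idealized model. Assume every call takes the same time and nothing slows the pool down. The number of API calls, and therefore the cost, stays at `num_calls` either way; only wall time changes. `ideal_wall_time_s` is a hypothetical helper, and the 4-second latency is an assumed value. Measured times, such as those in the table, will typically be longer because this model ignores rate limits, retries, and variance in per-call latency.

```python
import math

def ideal_wall_time_s(num_calls: int, avg_latency_s: float, max_workers: int = 1) -> float:
    """Best-case wall time when calls run in rounds of max_workers equal-length calls."""
    rounds = math.ceil(num_calls / max_workers)
    return rounds * avg_latency_s

print(ideal_wall_time_s(200, 4.0))                  # 800.0  (sequential)
print(ideal_wall_time_s(200, 4.0, max_workers=10))  # 80.0
print(ideal_wall_time_s(200, 4.0, max_workers=64))  # 16.0   (4 rounds)
```

The 64-worker case also shows why speedup saturates: 200 calls still need 4 rounds. Beyond that point, adding workers buys nothing once the rate limit is reached.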

### Next Steps

- Run on the full dataset (costs ~$0.50-1.00)
- Proceed to [Judge Evaluation Pipeline](judge_evaluation_pipeline_tutorial.ipynb)
- Apply bias correction with the judgy library

---

**Tutorial Status:** ✅ Complete  
**Last Updated:** 2025-10-30