# Harbor Framework Tutorial

Learn Harbor by running a simple hello-world example.

**Steps:**
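1. Clone and install Harbor
2. Set your API key
3. Run the hello-world oracle (builds the container and runs the tests)
4. View the task template
5. Run with an AI agent (Claude)
6. View results
7. Create a custom task
8. Run the agent on the custom task
9. View results for the custom task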

## Step 1: Clone and Install Harbor

In [None]:
# Clone the Harbor repository and set task path
import os

!git clone https://github.com/laude-institute/harbor.git

# Define the task path for use throughout the notebook
task_path = os.path.join(".", "harbor", "examples", "tasks", "hello-world")

print("✓ Harbor repository cloned")
print(f"✓ Task path set to: {task_path}")

In [None]:
# Install Harbor
!uv tool install harbor

# Add the current user's ~/.local/bin to PATH so the harbor command is found
import os
os.environ["PATH"] = f"{os.path.expanduser('~/.local/bin')}{os.pathsep}{os.environ.get('PATH', '')}"

print("✓ Harbor installed")
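
Before moving on, it can help to confirm that the `harbor` executable is actually discoverable. The following is a minimal sketch, assuming `uv` placed the executable in `~/.local/bin` on this machine; `find_tool` is a hypothetical helper name, not part of Harbor.

```python
import os
import shutil

def find_tool(name, extra_dirs=()):
    """Return the full path of `name`, searching extra_dirs and then PATH, or None."""
    search_path = os.pathsep.join([*extra_dirs, os.environ.get("PATH", "")])
    return shutil.which(name, path=search_path)

# Assumption: uv installs tool executables into ~/.local/bin here.
local_bin = os.path.join(os.path.expanduser("~"), ".local", "bin")
harbor_path = find_tool("harbor", extra_dirs=[local_bin])
print(harbor_path or "harbor not found; re-check the install step")
```

If this prints a path, the `!harbor ...` cells below should be able to find the command.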

## Step 2: Set Your API Key

In [None]:
import os

gpt_model = "openai/@openai-tbench/gpt-5"
claude_model = "openai/@anthropic-tbench/claude-sonnet-4-5-20250929"
# Replace with your actual API key for the gateway configured in OPENAI_BASE_URL below
os.environ["OPENAI_API_KEY"] = "your-api-key-here"
# os.environ["ANTHROPIC_API_KEY"] = "your-api-key-here"

os.environ["OPENAI_BASE_URL"] = "https://api.portkey.ai/v1"

print("✓ API key set")
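
A forgotten placeholder key tends to surface later as a confusing authentication error. As a hedged sketch, the check below flags a key that is missing or still set to the tutorial's placeholder string; `api_key_is_set` is a hypothetical helper, and it only inspects the environment, it does not validate the key with any service.

```python
import os

PLACEHOLDER = "your-api-key-here"  # the placeholder string used in this tutorial

def api_key_is_set(env, name="OPENAI_API_KEY"):
    """True when `name` holds a non-empty value other than the placeholder."""
    value = env.get(name, "").strip()
    return bool(value) and value != PLACEHOLDER

if not api_key_is_set(os.environ):
    print("OPENAI_API_KEY is missing or still the placeholder")
```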

## Step 3: Run Hello-World Oracle

The **oracle agent** runs the reference solution to verify the task works correctly.

This will:
- Use the local hello-world task from the cloned repository
- Build a Docker container
- Run the reference solution (solution/solve.sh)
- Verify tests pass
- Report whether the run succeeded (expect a 100% pass rate)

**This is your hello-world test - it should always pass!**

In [None]:
# Run oracle agent on the local hello-world task

print("=" * 70)
print(f"TEST ORACLE ({task_path}):")
print("=" * 70)
!harbor run -p {task_path} -a oracle

In [None]:
# Print the command for starting the hello-world environment interactively
# (copy it into a terminal; it is not executed from this cell)
import os

abs_task_path = os.path.abspath(task_path)

print("=" * 70)
print(f"TEST ORACLE INTERACTIVE ({abs_task_path}):")
print("=" * 70)
print(f"harbor tasks start-env -p {abs_task_path} -a -i")

## Step 4: View the Task Template

Now let's look at what's in the hello-world task from the local repository.

In [None]:
# View the task instruction
instruction_path = os.path.join(task_path, "instruction.md")

print("=" * 70)
print("TASK INSTRUCTION (what agents need to do):")
print("=" * 70)
with open(instruction_path, 'r') as f:
    print(f.read())

In [None]:
# View the reference solution
solution_path = os.path.join(task_path, "solution", "solve.sh")

print("=" * 70)
print("REFERENCE SOLUTION (what oracle ran):")
print("=" * 70)
with open(solution_path, 'r') as f:
    print(f.read())

In [None]:
# View the test script
test_path = os.path.join(task_path, "tests", "test.sh")

print("=" * 70)
print("TEST SCRIPT (how solutions are verified):")
print("=" * 70)
with open(test_path, 'r') as f:
    print(f.read())
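
The three cells above read `instruction.md`, `solution/solve.sh`, and `tests/test.sh`. A small sketch for checking that a task directory contains at least those files is shown below. `missing_task_files` is a hypothetical helper, and the file list covers only what this tutorial reads; a real Harbor task may require additional files.

```python
from pathlib import Path

# Only the files this tutorial opens; not a complete task specification.
EXPECTED_FILES = ("instruction.md", "solution/solve.sh", "tests/test.sh")

def missing_task_files(task_dir, expected=EXPECTED_FILES):
    """Return the expected relative paths that are not regular files under task_dir."""
    root = Path(task_dir)
    return [rel for rel in expected if not (root / rel).is_file()]
```

Calling `missing_task_files(task_path)` returns an empty list when all three files are present.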

## Step 5: Run with Claude AI Agent

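Now let's see how an AI agent handles the same task. The first cell runs `harbor tasks check` on the task using the GPT model; the second runs the `terminus-2` agent with the Claude model.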

In [None]:
print("=" * 70)
print(f"RUN CHECK ({gpt_model}) ({task_path}):")
print("=" * 70)
!harbor tasks check -m {gpt_model} {task_path}

In [None]:
# Run the terminus-2 agent, backed by the Claude model, on the local hello-world task
print("=" * 70)
print(f"RUN AGENT ({claude_model}) ({task_path}):")
print("=" * 70)
!harbor run -p {task_path} -m {claude_model} -a terminus-2
# Alternative: use the claude-code agent instead
# !harbor run -p {task_path} -m {claude_model} -a claude-code
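
Outside a notebook, the `!harbor run ...` shell escape is not available. As a sketch, the same command line can be assembled as an argument list for `subprocess.run`. `harbor_run_args` is a hypothetical helper that uses only the `-p`, `-m`, and `-a` flags that appear in this tutorial; it assumes no other options are required.

```python
import shlex

def harbor_run_args(task_path, agent, model=None):
    """Build the argument list for `harbor run`, mirroring the cells in this tutorial."""
    args = ["harbor", "run", "-p", str(task_path)]
    if model is not None:
        args += ["-m", model]
    args += ["-a", agent]
    return args

# prints: harbor run -p harbor/examples/tasks/hello-world -a oracle
print(shlex.join(harbor_run_args("harbor/examples/tasks/hello-world", "oracle")))
```

The returned list can be passed to `subprocess.run(args, check=True)` so that a non-zero exit code raises an exception instead of being silently ignored.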

## Step 6: View Results

In [None]:
import glob
import json
import os
import re
from pathlib import Path
from datetime import datetime

# Find all job runs
job_dirs = sorted(glob.glob("jobs/*"), key=os.path.getmtime, reverse=True)

print(f"Total runs: {len(job_dirs)}\n")

# Show latest run
for i, job in enumerate(job_dirs[:1], 1):
    config_file = f"{job}/config.json"
    result_file = f"{job}/result.json"
    
    if Path(config_file).exists() and Path(result_file).exists():
        try:
            with open(config_file) as f:
                config = json.load(f)
            with open(result_file) as f:
                result = json.load(f)
            
            # Extract Agent and Model info
            # Fall back to an empty dict when 'agents' is missing or an empty list
            agent_info = (config.get('agents') or [{}])[0]
            agent_name = agent_info.get('name', 'N/A')
            model_name = agent_info.get('model_name', 'N/A')
            
            # Extract Status and Score
            stats = result.get('stats', {})
            n_errors = stats.get('n_errors', 0)
            evals = stats.get('evals', {})
            
            status = "Unknown"
            score = "N/A"
            
            if n_errors > 0:
                status = "Error"
                # Check for specific exceptions if available
                for val in evals.values():
                    if val.get('n_errors', 0) > 0:
                        exceptions = val.get('exception_stats', {})
                        if exceptions:
                            status = f"Error ({', '.join(exceptions.keys())})"
            elif evals:
                # Assuming single evaluation key for now or taking the first one
                first_eval = next(iter(evals.values()))
                metrics = first_eval.get('metrics', [{}])
                if metrics:
                    mean_score = metrics[0].get('mean', 0)
                    score = f"{mean_score:.2f}"
                    if mean_score == 1.0:
                        status = "Pass"
                    else:
                        status = "Fail"
            
            # Calculate Duration
            start_time = result.get('started_at')
            end_time = result.get('finished_at')
            duration = "N/A"
            if start_time and end_time:
                try:
                    start = datetime.fromisoformat(start_time)
                    end = datetime.fromisoformat(end_time)
                    duration = str(end - start).split('.')[0] # Remove microseconds
                except ValueError:
                    pass

            print(f"{i}. {Path(job).name}")
            print(f"   Agent:    {agent_name}")
            print(f"   Model:    {model_name}")
            print(f"   Status:   {status}")
            print(f"   Score:    {score}")
            print(f"   Duration: {duration}")
            print()

            # --- List Agent Responses ---
            print("   Agent Responses:")
            # Find task instance directories (e.g., hello-world__...)
            task_dirs = glob.glob(f"{job}/*__*")
            for task_dir in task_dirs:
                task_name = Path(task_dir).name
                print(f"   Task Instance: {task_name}")
                
                # Find episode directories
                episode_dirs = glob.glob(f"{task_dir}/agent/episode-*")
                
                # Sort by episode number
                def get_episode_num(path):
                    match = re.search(r'episode-(\d+)', path)
                    return int(match.group(1)) if match else -1
                
                episode_dirs.sort(key=get_episode_num)
                
                for ep_dir in episode_dirs:
                    ep_num = get_episode_num(ep_dir)
                    response_file = Path(ep_dir) / "response.txt"
                    if response_file.exists():
                        with open(response_file, 'r') as f:
                            response_content = f.read().strip()
                        
                        # Truncate if too long for display
                        display_content = response_content
                        if len(display_content) > 200:
                            display_content = display_content[:200] + "..."
                        
                        print(f"      Episode {ep_num}: {display_content.replace(chr(10), ' ')}") # Replace newlines with spaces for compact view
                print()

            
        except Exception as e:
            print(f"{i}. {Path(job).name} - Error reading results: {e}")
            print()
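
The status and score rules in the cell above can be isolated into a small function, which makes them easier to test. This is a sketch that follows roughly the same rules; the dictionary shape (`n_errors`, `evals`, `metrics`, `mean`, `exception_stats`) is taken from the cell above, while `summarize_stats`, the eval name, and the exception name in the sample data are hypothetical.

```python
def summarize_stats(stats):
    """Return (status, score) from the 'stats' section of a result.json."""
    evals = stats.get("evals", {})
    if stats.get("n_errors", 0) > 0:
        # Collect exception names from every eval that reported errors
        names = sorted({
            name
            for ev in evals.values()
            if ev.get("n_errors", 0) > 0
            for name in ev.get("exception_stats", {})
        })
        return (f"Error ({', '.join(names)})" if names else "Error"), "N/A"
    if not evals:
        return "Unknown", "N/A"
    # As in the cell above, only the first eval is considered
    metrics = next(iter(evals.values())).get("metrics", [{}])
    if not metrics:
        return "Unknown", "N/A"
    mean = metrics[0].get("mean", 0)
    return ("Pass" if mean == 1.0 else "Fail"), f"{mean:.2f}"

# Hypothetical sample data, not output from a real run
sample = {"n_errors": 0, "evals": {"example-eval": {"metrics": [{"mean": 1.0}]}}}
print(summarize_stats(sample))  # ('Pass', '1.00')
```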

## Step 7: Create a Custom Task

Let's create a new task by copying the hello-world template. We'll call it `my-task`.

In [None]:
import shutil
import os

# Define new task path
new_task_name = "my-task"
new_task_path = os.path.join("tasks", new_task_name)

# Source path (hello-world)
source_path = os.path.join("harbor", "examples", "tasks", "hello-world")

# Copy if it doesn't exist
if not os.path.exists(new_task_path):
    shutil.copytree(source_path, new_task_path)
    print(f"✓ Created new task at: {new_task_path}")
else:
    print(f"✓ Task already exists at: {new_task_path}")

# List files
print("\nTask files:")
for f in os.listdir(new_task_path):
    print(f" - {f}")

### Edit Task Instructions

Modify the cell below to change what the agent needs to do.

In [None]:
# Edit the instruction.md file
instruction_content = """
Create a Python script at `/app/calculate.py` that prints the sum of 5 and 7.
The output should be exactly "12".
"""

instruction_file = os.path.join(new_task_path, "instruction.md")

with open(instruction_file, "w") as f:
    f.write(instruction_content.strip())

print(f"✓ Updated instructions in {instruction_file}")
print("-" * 40)
print(instruction_content.strip())
print("-" * 40)

### Update Reference Solution & Tests

Since we changed the task, we need to update the solution and tests.

In [None]:
# Update the solution/solve.sh
solution_content = """#!/bin/bash
echo 'print(5 + 7)' > calculate.py
"""

with open(os.path.join(new_task_path, "solution", "solve.sh"), "w") as f:
    f.write(solution_content)
os.chmod(os.path.join(new_task_path, "solution", "solve.sh"), 0o755)

# Update tests/test_state.py
test_state_content = """from pathlib import Path
import subprocess

def test_calculate_file_exists():
    file_path = Path("/app/calculate.py")
    assert file_path.exists(), f"File {file_path} does not exist"

def test_calculate_output():
    file_path = Path("/app/calculate.py")
    # Run the script and capture output
    result = subprocess.run(["python3", str(file_path)], capture_output=True, text=True)
    content = result.stdout.strip()
    expected_content = "12"

    assert content == expected_content, (
        f"Output is '{content}', expected '{expected_content}'"
    )
"""

with open(os.path.join(new_task_path, "tests", "test_state.py"), "w") as f:
    f.write(test_state_content)

print("✓ Updated solution and tests")
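
Before spending a container build on the oracle run, the core of the reference solution can be sanity-checked locally. The sketch below writes the same one-line script to a temporary directory, runs it with the current Python interpreter, and returns its output. It does not reproduce the `/app` path or the Docker environment, and `run_script_output` is a hypothetical helper.

```python
import subprocess
import sys
import tempfile
from pathlib import Path

def run_script_output(source):
    """Write `source` to a temporary calculate.py, run it, and return stripped stdout."""
    with tempfile.TemporaryDirectory() as tmp:
        script = Path(tmp) / "calculate.py"
        script.write_text(source)
        result = subprocess.run(
            [sys.executable, str(script)],
            capture_output=True,
            text=True,
            check=True,  # raise if the script exits with an error
        )
    return result.stdout.strip()

print(run_script_output("print(5 + 7)"))  # prints 12
```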

In [None]:
# Run oracle on the new task
print("=" * 70)
print(f"TEST ORACLE ({new_task_path}):")
print("=" * 70)
!harbor run -p {new_task_path} -a oracle

## Step 8: Run with Agent

Now that we've verified the task with the oracle, let's run the AI agent on it.

In [None]:
# Run the agent on the new task
print("=" * 70)
print(f"RUN AGENT ({claude_model}) ({new_task_path}):")
print("=" * 70)
!harbor run -p {new_task_path} -m {claude_model} -a terminus-2

## Step 9: View Results

Let's see how the agent performed on our custom task.

In [None]:
import glob
import json
import os
import re
from pathlib import Path
from datetime import datetime

# Find all job runs
job_dirs = sorted(glob.glob("jobs/*"), key=os.path.getmtime, reverse=True)

print(f"Total runs: {len(job_dirs)}\n")

# Show latest run
for i, job in enumerate(job_dirs[:1], 1):
    config_file = f"{job}/config.json"
    result_file = f"{job}/result.json"
    
    if Path(config_file).exists() and Path(result_file).exists():
        try:
            with open(config_file) as f:
                config = json.load(f)
            with open(result_file) as f:
                result = json.load(f)
            
            # Extract Agent and Model info
            # Fall back to an empty dict when 'agents' is missing or an empty list
            agent_info = (config.get('agents') or [{}])[0]
            agent_name = agent_info.get('name', 'N/A')
            model_name = agent_info.get('model_name', 'N/A')
            
            # Extract Status and Score
            stats = result.get('stats', {})
            n_errors = stats.get('n_errors', 0)
            evals = stats.get('evals', {})
            
            status = "Unknown"
            score = "N/A"
            
            if n_errors > 0:
                status = "Error"
                # Check for specific exceptions if available
                for val in evals.values():
                    if val.get('n_errors', 0) > 0:
                        exceptions = val.get('exception_stats', {})
                        if exceptions:
                            status = f"Error ({', '.join(exceptions.keys())})"
            elif evals:
                # Assuming single evaluation key for now or taking the first one
                first_eval = next(iter(evals.values()))
                metrics = first_eval.get('metrics', [{}])
                if metrics:
                    mean_score = metrics[0].get('mean', 0)
                    score = f"{mean_score:.2f}"
                    if mean_score == 1.0:
                        status = "Pass"
                    else:
                        status = "Fail"
            
            # Calculate Duration
            start_time = result.get('started_at')
            end_time = result.get('finished_at')
            duration = "N/A"
            if start_time and end_time:
                try:
                    start = datetime.fromisoformat(start_time)
                    end = datetime.fromisoformat(end_time)
                    duration = str(end - start).split('.')[0] # Remove microseconds
                except ValueError:
                    pass

            print(f"{i}. {Path(job).name}")
            print(f"   Agent:    {agent_name}")
            print(f"   Model:    {model_name}")
            print(f"   Status:   {status}")
            print(f"   Score:    {score}")
            print(f"   Duration: {duration}")
            print()

            # --- List Agent Responses ---
            print("   Agent Responses:")
            # Find task instance directories (e.g., hello-world__...)
            task_dirs = glob.glob(f"{job}/*__*")
            for task_dir in task_dirs:
                task_name = Path(task_dir).name
                print(f"   Task Instance: {task_name}")
                
                # Find episode directories
                episode_dirs = glob.glob(f"{task_dir}/agent/episode-*")
                
                # Sort by episode number
                def get_episode_num(path):
                    match = re.search(r'episode-(\d+)', path)
                    return int(match.group(1)) if match else -1
                
                episode_dirs.sort(key=get_episode_num)
                
                for ep_dir in episode_dirs:
                    ep_num = get_episode_num(ep_dir)
                    response_file = Path(ep_dir) / "response.txt"
                    if response_file.exists():
                        with open(response_file, 'r') as f:
                            response_content = f.read().strip()
                        
                        # Truncate if too long for display
                        display_content = response_content
                        if len(display_content) > 200:
                            display_content = display_content[:200] + "..."
                        
                        print(f"      Episode {ep_num}: {display_content.replace(chr(10), ' ')}") # Replace newlines with spaces for compact view
                print()

            
        except Exception as e:
            print(f"{i}. {Path(job).name} - Error reading results: {e}")
            print()
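
The results cell sorts episode directories by their numeric suffix rather than by name. The short sketch below shows why: a plain string sort places `episode-10` before `episode-2`. The directory names are made-up examples that follow the `episode-*` pattern used in the cell.

```python
import re

def episode_number(path):
    """Extract N from 'episode-N'; return -1 when the pattern is absent."""
    match = re.search(r"episode-(\d+)", path)
    return int(match.group(1)) if match else -1

names = ["episode-10", "episode-2", "episode-1"]
print(sorted(names))                      # ['episode-1', 'episode-10', 'episode-2']
print(sorted(names, key=episode_number))  # ['episode-1', 'episode-2', 'episode-10']
```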