# OpenOT2 — Complete Run Example

This notebook walks through the full workflow for running a protocol on an OT-2 robot:

1. **Environment Setup** — imports and logging
2. **Load Vision Model** — YOLO model for tip/liquid detection
3. **Define Labware** — deck layout configuration
4. **Define Protocol** — task-based protocol with vision checks
5. **Define Recovery** — custom error recovery plans
6. **Precheck OT-2** — verify robot connectivity
7. **Precheck Protocol** — validate and dry-run the protocol
8. **Execute** — run the protocol on the robot

### Prerequisites
```bash
pip install openot2[yolo]
```

---
## 1. Environment Setup

Import all necessary modules and configure logging.

In [None]:
import logging

from openot2 import OT2Client, setup_logging
from openot2.vision import YOLOAdapter, USBCamera
from openot2.protocol import (
    ProtocolExecutor,
    ProtocolGenerator,
    ErrorRecovery,
    get_protocol_prompt,
)

# Enable logging so we can see what's happening
setup_logging(level=logging.INFO)

# Robot IP — change this to your OT-2's address
ROBOT_IP = "169.254.8.56"

print("Environment ready.")

---
## 2. Load Vision Model

Load a YOLO model for tip and liquid detection, and set up the USB camera.

If you don't have a trained model or camera, set `model = None` and `camera = None` — the executor will skip vision checks automatically.

In [None]:
# --- Vision Model ---
# Replace with your trained YOLO model path
MODEL_PATH = "path/to/your/model.pt"

try:
    model = YOLOAdapter(
        model_path=MODEL_PATH,
        num_classes=2,  # Tip, Liquid
    )
    print(f"Model loaded: {MODEL_PATH}")
    print(f"Classes: {model.class_names}")
except Exception as e:
    print(f"Could not load model: {e}")
    print("Continuing without vision checks.")
    model = None

In [None]:
# --- Camera ---
try:
    camera = USBCamera(camera_id=0)
    # Test capture
    frame = camera.capture()
    if frame is not None:
        print(f"Camera ready. Frame shape: {frame.shape}")
    else:
        print("Camera returned empty frame.")
        camera = None
except Exception as e:
    print(f"Could not open camera: {e}")
    print("Continuing without vision checks.")
    camera = None

---
## 3. Define Labware

Configure the deck layout: pipette, tiprack, source reservoirs, imaging plate, and dispense plate.

```
OT-2 Deck Layout:
┌─────────┬─────────┬─────────┐
│ 10      │ 11      │ 12      │
│ Dispense│ Tiprack │ (Trash) │
├─────────┼─────────┼─────────┤
│ 7       │ 8       │ 9       │
│         │ Source   │         │
├─────────┼─────────┼─────────┤
│ 4       │ 5       │ 6       │
│         │         │ Imaging │
├─────────┼─────────┼─────────┤
│ 1       │ 2       │ 3       │
│         │         │         │
└─────────┴─────────┴─────────┘
```

In [None]:
labware_config = {
    "pipette": {
        "name": "p300_multi_gen2",
        "mount": "right",
    },
    "tiprack": {
        "name": "opentrons_96_filtertiprack_200ul",
        "slot": "11",
    },
    "sources": {
        "name": "opentrons_tough_1_reservoir_300ml",
        "slots": ["8"],
    },
    "imaging": {
        "name": "opentrons_96_wellplate_200ul_pcr_full_skirt",
        "slot": "6",
    },
    "dispense": {
        "name": "corning_96_wellplate_360ul_flat",
        "slot": "10",
    },
}

print("Labware configuration:")
for key, val in labware_config.items():
    print(f"  {key}: {val}")

---
## 4. Define Protocol

Build the full protocol config with tasks: pickup tips, transfer liquid, drop tips.

Each task can optionally include a `check` for vision verification and an `on_fail` recovery plan.

In [None]:
protocol_config = {
    "labware": labware_config,
    "settings": {
        "imaging_well": "A1",
        "imaging_offset": (0, 0, 50),
        "base_dir": "output",
    },
    "tasks": [
        # Task 1: Pick up tips with vision verification
        {
            "type": "pickup",
            "well": "A1",
            "check": {
                "type": "tip",
                "expected_tips": 8,
                "conf": 0.6,
            },
        },
        # Task 2: Transfer 100 uL from reservoir to well plate
        {
            "type": "transfer",
            "source_slot": "8",
            "source_well": "A1",
            "dest_slot": "10",
            "dest_well": "A1",
            "volume": 100,
            "origin": "top",
            "offset": (0, 0, -35),
            "check": {
                "type": "liquid",
                "expected_tips": 8,
                "conf": 0.6,
            },
        },
        # Task 3: Drop tips back
        {
            "type": "drop",
            "well": "A1",
        },
    ],
}

print(f"Protocol defined: {len(protocol_config['tasks'])} tasks")
for i, task in enumerate(protocol_config["tasks"]):
    print(f"  Task {i + 1}: {task['type']}")

### 4b. (Alternative) Generate Protocol with LLM

Instead of manually defining the protocol, you can use an LLM to generate it from a natural language description.

In [None]:
# Uncomment to use LLM-based protocol generation:

# generator = ProtocolGenerator(
#     api_key="sk-...",    # or set OPENAI_API_KEY env var
#     model="gpt-4o",
# )
#
# plan = """
# Transfer 100 uL from a 300mL reservoir in slot 8 to a 96-well flat plate
# in slot 10, well A1. Use a p300 multi-channel pipette with 200uL filter tips
# in slot 11. Include tip and liquid vision checks.
# """
#
# protocol_config = generator.generate(plan)
# print("Generated protocol:")
# import json
# print(json.dumps(protocol_config, indent=2))

---
## 5. Define Recovery Plans

Recovery plans tell the executor what to do when a vision check fails.

You can use the built-in defaults or define custom recovery actions.
Values like `"ctx.tiprack_id"` are resolved at runtime from the current execution context.

In [None]:
# View the built-in default recovery plans
print("Default pickup recovery:")
for step in ErrorRecovery.default_pickup_recovery():
    print(f"  {step}")

print("\nDefault liquid recovery:")
for step in ErrorRecovery.default_liquid_recovery():
    print(f"  {step}")

In [None]:
# Custom recovery: add to any task's "on_fail" field to override defaults.
# For example, a custom liquid failure recovery that pauses for manual inspection:

custom_liquid_recovery = [
    {"type": "dispense", "labware_id": "ctx.source_labware_id",
     "well": "ctx.source_well", "volume": "ctx.volume"},
    {"type": "blow_out", "labware_id": "ctx.source_labware_id",
     "well": "ctx.source_well"},
    {"type": "pause", "message": "Liquid check failed — inspect pipette before continuing."},
    {"type": "drop", "labware_id": "ctx.tiprack_id", "well": "ctx.pick_well"},
    {"type": "home"},
]

# Apply custom recovery to the transfer task
protocol_config["tasks"][1]["on_fail"] = custom_liquid_recovery

print("Custom recovery applied to transfer task.")

---
## 6. Precheck OT-2

Verify the robot is reachable before starting the protocol.

In [None]:
import requests

def precheck_robot(robot_ip: str, port: int = 31950) -> bool:
    """Check if the OT-2 robot is reachable."""
    url = f"http://{robot_ip}:{port}/health"
    try:
        r = requests.get(url, headers={"Opentrons-Version": "*"}, timeout=5)
        r.raise_for_status()
        health = r.json()
        print(f"Robot reachable at {robot_ip}")
        print(f"  Name: {health.get('name', 'N/A')}")
        print(f"  API version: {health.get('api_version', 'N/A')}")
        print(f"  FW version: {health.get('fw_version', 'N/A')}")
        return True
    except requests.ConnectionError:
        print(f"Cannot reach robot at {robot_ip}:{port}")
        print("Check that the robot is powered on and connected.")
        return False
    except Exception as e:
        print(f"Health check error: {e}")
        return False

robot_ok = precheck_robot(ROBOT_IP)

if not robot_ok:
    print("\nFix the connection before proceeding.")

---
## 7. Precheck Protocol

Validate the protocol config against the schema, then dry-run it to catch logical errors (e.g., aspirating without a tip, dispensing more than aspirated).

In [None]:
# Schema validation
schema_errors = ProtocolGenerator.validate(protocol_config)

if schema_errors:
    print("Schema errors found:")
    for err in schema_errors:
        print(f"  - {err}")
else:
    print("Schema validation passed.")

In [None]:
# Dry-run simulation
dry_run = ProtocolGenerator.dry_run(protocol_config)

print(f"Dry-run success: {dry_run.success}")
print(f"\nSteps ({len(dry_run.steps_executed)}):")
for step in dry_run.steps_executed:
    print(f"  {step}")

if dry_run.errors:
    print(f"\nErrors:")
    for err in dry_run.errors:
        print(f"  {err}")

if dry_run.warnings:
    print(f"\nWarnings:")
    for warn in dry_run.warnings:
        print(f"  {warn}")

In [None]:
# Final go/no-go check
all_checks_passed = robot_ok and not schema_errors and dry_run.success

if all_checks_passed:
    print("All prechecks passed — ready to execute.")
else:
    print("Prechecks failed. Review the errors above before running.")

---
## 8. Execute Protocol

Connect to the robot and run the protocol. The executor will:
1. Create a new run on the robot
2. Load all labware onto the deck
3. Execute each task sequentially
4. Run vision checks where configured
5. Trigger recovery plans on failure
6. Home the robot when finished

In [None]:
# Connect to the robot
client = OT2Client(ROBOT_IP)

# Build the executor
executor = ProtocolExecutor(
    client=client,
    model=model,       # None = skip vision checks
    camera=camera,     # None = skip vision checks
    calibration_fn=None,  # Set to build_calibration_from_csv(...) for liquid level checks
)

print("Executor ready.")

In [None]:
# Run the protocol
assert all_checks_passed, "Prechecks did not pass — fix errors before running."

success = executor.execute(protocol_config)

if success:
    print("\nProtocol completed successfully!")
else:
    print("\nProtocol stopped due to an error.")