# Run NaviTrace Evaluation

This notebook describes the process of evaluating models on our benchmark [NaviTrace](https://leggedrobotics.github.io/navitrace_webpage/), including model inference via API and the score calculation.
The benchmark consists of a validation split and a test split with hidden ground-truths.
If you want to see how your model scores on the test set or want to submit your model to the leaderboard, check out this [Hugging Face Space](https://huggingface.co/spaces/leggedrobotics/navitrace_leaderboard).

## Setup

1. Create and activate a Python 3.10 environment with your preferred tools
2. `pip install -r ./requirements.txt`
3. Prepare an API key and base URL for the model that you want to evaluate

## Load NaviTrace

In [None]:
from datasets import load_dataset
from itables import show

In [None]:
# (Optional) Log in to Hugging Face
from huggingface_hub import login
login()

In [None]:
# Load dataset
dataset = load_dataset("leggedrobotics/navitrace")

Have a look at the [dataset card](https://huggingface.co/datasets/leggedrobotics/navitrace) for information about the available columns. You can also explore the dataset with the following code:

In [None]:
# Explore dataset (without images)
df = dataset["validation"].to_pandas().drop(columns=["image"])
show(df.head(5))

In [None]:
# Display images
img = dataset["validation"][42]["image"]
img.show()

## Model Inference

In [None]:
import base64
import io
import json
import re
import time
from typing import Dict, List, Any
from datetime import datetime
from getpass import getpass
import os
from openai import OpenAI
import pandas as pd
from PIL.Image import Image
from tqdm import tqdm

### Setup Model API

In [None]:
# API key
os.environ['MODEL_API_KEY'] = getpass("Enter your model API key: ")

# Settings
# Note: We use OpenRouter to access multiple models but you can also call a provider directly
model_name = "google/gemini-2.5-pro"
base_url = "https://openrouter.ai/api/v1"

### Define Prompts

In [None]:
system_prompt = """You are a navigation expert for various embodiments including robots and humans. Given an image of the current scenario, a specified embodiment (e.g., legged robot, wheeled robot, human, or bike), and a navigation task (e.g., "Go down the road"), you will predict a feasible future trajectory as a sequence of 2D points in normalized image coordinates (ranging from 0 to 1, where [0,0] is the top-left and [1,1] is the bottom-right).

- The image shows a first-person view of the navigation scenario
- Start your trajectory near the bottom center of the image, which corresponds approximately to normalized coordinate [0.5, 0.95] (representing the current position of the embodiment)
- The trajectory should be adapted to the embodiment's abilities and limitations
- Plan the path forward from this starting position based on what the embodiment can see and navigate
- The trajectory should extend all the way to the goal if the path is visible. If the path is occluded, the trajectory should end where the path becomes fully obscured, unless the path can be reasonably inferred from the visible context.
- If a red traffic light is visible and affects the planned path, or if crossing traffic or moving vehicles are present that make it unsafe to proceed, stop at an appropriate waiting position (e.g., just before the intersection or curb) and end the trajectory there.
- All tasks that you are given have a solution
- Output **only** the list of 2D points in normalized image coordinates (values between 0 and 1) in the following format: `[[x1, y1], [x2, y2], ..., [xn, yn]]`
- Do not include any explanation or additional output

### Embodiment Movement Characteristics

- **Human**: A standard pedestrian. Can navigate stairs and ramps but cannot climb tall obstacles.
- **Legged Robot**: A quadruped like ANYmal. Behaves similarly to a human, but it is shorter. It can handle stairs and escalators.
- **Wheeled Robot**: A wheeled delivery robot. Behaves like a wheelchair, preferring smooth surfaces such as walkways and ramps. It cannot use stairs or escalators.
- **Bicycle**: A standard cyclist. Follows traffic regulations and prefers bike lanes or streets. Cannot navigate stairs."""

user_prompt = """**Embodiment**: {embodiment}
**Task**: {task}

The image shows a first-person view from the embodiment's current position. Begin your trajectory near the bottom center of the image (around normalized coordinate [0.5, 0.95]) and predict the path forward as a list of 2D points in normalized coordinates (values from 0 to 1) according to the embodiment and the scenario shown in the image."""


### Define API Model Class

Processing a single sample produces a dict with the form:

| Column | Type | Description |
| --- | ---- | ----------- |
| sample_id | `str` | Unique identifier of a scenario |
| embodiment | `str` | Selected embodiment |
| category | `List[str]` | Scenario categories |
| raw_response | `str` | Raw text response of the model |
| reasoning | `str` | If available, the reasoning output of the model |
| prediction | `List[List[float]]` | List of [x, y] points in pixel coordinates representing the predicted trace |
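
As an illustration, a single result record might look like the sketch below. All values are invented (the sample ID, category name, and coordinates are hypothetical); only the keys and types follow the table above. The helper `is_valid_result` is likewise an assumption and not part of the benchmark code.

```python
from typing import Any, Dict

# Hypothetical example record; every value here is made up for illustration
example_result: Dict[str, Any] = {
    "sample_id": "example_0001",
    "embodiment": "Legged Robot",
    "category": ["example_category"],
    "raw_response": "[[0.5, 0.95], [0.5, 0.7], [0.55, 0.5]]",
    "reasoning": "",
    "prediction": [[960, 1026], [960, 756], [1056, 540]],
}


def is_valid_result(result: Dict[str, Any]) -> bool:
    """Check that a record has the keys and basic types listed in the table."""
    string_keys = ["sample_id", "embodiment", "raw_response", "reasoning"]
    if not all(isinstance(result.get(key), str) for key in string_keys):
        return False
    if not isinstance(result.get("category"), list):
        return False
    prediction = result.get("prediction")
    if not isinstance(prediction, list):
        return False
    # Every point must be an [x, y] pair; an empty list marks a failed prediction
    return all(isinstance(p, list) and len(p) == 2 for p in prediction)


print(is_valid_result(example_result))
```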

In [None]:
def encode_image_to_base64(image: Image) -> str:
    """Encode a PIL image as a base64 JPEG string."""

    # Convert any non-RGB image (e.g., RGBA, P, LA) so it can be saved as JPEG
    if image.mode != "RGB":
        image = image.convert("RGB")
    # Save image to a bytes buffer as JPEG
    buffer = io.BytesIO()
    image.save(buffer, format="JPEG")
    buffer.seek(0)
    # Encode buffer in base64
    img_bytes = buffer.read()
    img_b64 = base64.b64encode(img_bytes).decode("utf-8")

    return img_b64

def parse_trace(text: str) -> List[List[float]]:
    """
    Parse point trace from model response.
    Expected format: [[x1, y1], [x2, y2], ...] or similar variations.
    Returns a list of [x, y] coordinate pairs, or an empty list if parsing fails.
    """

    try:
        # Try to find JSON-like array in response
        patterns = [
            r"\[\s*\[[\d\s,.-]+\]\s*(?:,\s*\[[\d\s,.-]+\]\s*)*\]",  # [[x,y], [x,y], ...]
            r"\(\s*\([\d\s,.-]+\)\s*(?:,\s*\([\d\s,.-]+\)\s*)*\)",  # ((x,y), (x,y), ...)
        ]

        for pattern in patterns:
            matches = re.findall(pattern, text)
            if matches:
                # Parse the first match
                match = matches[0]
                # Convert to proper JSON format
                match = match.replace("(", "[").replace(")", "]")
                points = json.loads(match)
                # Validate format
                if all(isinstance(p, list) and len(p) == 2 for p in points):
                    return [[float(p[0]), float(p[1])] for p in points]

        # If no pattern matches, try to extract numbers and pair them
        numbers = re.findall(r"-?\d+\.?\d*", text)
        if len(numbers) >= 2 and len(numbers) % 2 == 0:
            return [
                [float(numbers[i]), float(numbers[i + 1])]
                for i in range(0, len(numbers), 2)
            ]

    except (json.JSONDecodeError, ValueError, IndexError) as e:
        print(f"Failed to parse trace: {e}")

    # Nothing could be parsed, so return an empty trace as documented
    return []


class ApiModel():

    def __init__(
        self,
        model_name: str,
        base_url: str,
        user_prompt: str,
        system_prompt: str,
        normalized_coordinates: bool = True,
        request_delay: float = 0.5,
        retry_delay: float = 2,
        max_retries: int = 3,
        max_tokens: int = 5000,
        temperature: float = 1.0,
    ):

        self.model_name = model_name
        self.user_prompt = user_prompt
        self.system_prompt = system_prompt
        self.normalized_coordinates = normalized_coordinates
        self.request_delay = request_delay
        self.retry_delay = retry_delay
        self.max_retries = max_retries
        self.max_tokens = max_tokens
        self.temperature = temperature

        # Configure OpenAI client
        api_key = os.environ.get("MODEL_API_KEY")
        self.client = OpenAI(api_key=api_key, base_url=base_url)


    def process_sample(self, sample: Dict, embodiment: str) -> Dict[str, Any]:

        # Extract fields
        sample_id = sample["sample_id"]
        category = sample["category"]
        image = sample["image"]
        task = sample["task"]

        # Format prompt
        prompt = self.user_prompt.format(task=task, embodiment=embodiment)

        # Encode image to base64
        image_b64 = encode_image_to_base64(image)

        # Prepare message content
        content = [
            {
                "type": "image_url",
                "image_url": {"url": f"data:image/jpeg;base64,{image_b64}"},
            },
            {"type": "text", "text": prompt},
        ]

        # Limit request rate
        time.sleep(self.request_delay)

        # Make API request with retries
        for attempt in range(self.max_retries):
            try:
                response = self.client.chat.completions.create(
                    model=self.model_name,
                    messages=[
                        {"role": "system", "content": self.system_prompt},
                        {"role": "user", "content": content},
                    ],
                    max_tokens=self.max_tokens,
                    temperature=self.temperature,
                )

                # Extract the trace
                response_text = response.choices[0].message.content
                predicted_trace = parse_trace(response_text)

                # Unnormalize coordinates
                if self.normalized_coordinates:
                    width, height = image.size
                    predicted_trace = [
                        [int(x * width), int(y * height)] for x, y in predicted_trace
                    ]

                # Extract reasoning if available (the attribute may be missing or None)
                reasoning_text = (
                    getattr(response.choices[0].message, "reasoning", None) or ""
                )

                return {
                    "sample_id": sample_id,
                    "embodiment": embodiment,
                    "category": category,
                    "raw_response": response_text,
                    "reasoning": reasoning_text,
                    "prediction": predicted_trace,
                }

            except Exception as e:
                if attempt < self.max_retries - 1:
                    print(f"Attempt {attempt + 1} failed: {e}. Retrying in {self.retry_delay} seconds...")
                    time.sleep(self.retry_delay)
                else:
                    print(f"Failed after {self.max_retries} attempts: {e}")
                    return {
                        "sample_id": sample_id,
                        "embodiment": embodiment,
                        "category": category,
                        "raw_response": "",
                        "reasoning": "",
                        "prediction": [],
                    }
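
The following is a minimal sketch of the conversion from normalized to pixel coordinates that `process_sample` applies when `normalized_coordinates=True`. The helper name `to_pixel_coordinates` and the 1920x1080 image size are assumptions chosen for illustration. Note that `int()` truncates toward zero rather than rounding to the nearest pixel.

```python
from typing import List, Tuple


def to_pixel_coordinates(
    trace: List[List[float]], image_size: Tuple[int, int]
) -> List[List[int]]:
    """Scale normalized [x, y] points by the image width and height."""
    width, height = image_size  # Same order as PIL's Image.size
    return [[int(x * width), int(y * height)] for x, y in trace]


# Hypothetical normalized trace starting near the bottom center of the image
normalized_trace = [[0.5, 0.75], [0.5, 0.5], [0.625, 0.25]]
pixel_trace = to_pixel_coordinates(normalized_trace, (1920, 1080))
print(pixel_trace)  # [[960, 810], [960, 540], [1200, 270]]
```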

### Validation Split

In [None]:
# Create Model
model = ApiModel(model_name, base_url, user_prompt, system_prompt)

# Iterate over dataset
results = []
validation_split = dataset["validation"]  # Keep `dataset` intact so the test split stays accessible
for sample in tqdm(validation_split, desc=f"Inference with {model_name}", total=len(validation_split)):

    # Iterate over embodiments of a sample
    embodiments = sample["embodiments"]
    for embodiment in embodiments:
        result = model.process_sample(sample, embodiment)
        results.append(result)

results_df = pd.DataFrame(results)

# Save results
model_safe_name = model_name.replace("/", "_").replace(":", "_")
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
results_path = f"./{model_safe_name}_validation_{timestamp}.tsv"
results_df.to_csv(
    results_path,
    sep="\t",
    index=False,
    encoding="utf-8",
)

### Test Split

In [None]:
# Create Model
model = ApiModel(model_name, base_url, user_prompt, system_prompt)

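# Iterate over dataset
results = []
# Load the split directly so this cell does not depend on the state of `dataset`
test_split = load_dataset("leggedrobotics/navitrace", split="test")
for sample in tqdm(test_split, desc=f"Inference with {model_name}", total=len(test_split)):

    # Iterate over embodiments of a sample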
    embodiments = sample["embodiments"]
    for embodiment in embodiments:
        result = model.process_sample(sample, embodiment)
        results.append(result)

results_df = pd.DataFrame(results)

# Save results
model_safe_name = model_name.replace("/", "_").replace(":", "_")
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
results_path = f"./{model_safe_name}_test_{timestamp}.tsv"
results_df.to_csv(
    results_path,
    sep="\t",
    index=False,
    encoding="utf-8",
)
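
When the results are written to a TSV file, list-valued columns such as `category` and `prediction` are stored as their string representation and must be decoded after loading. The sketch below illustrates this round trip with the standard library's `csv` module instead of pandas so that it runs on its own; the rows are hypothetical. One consequence is that an empty prediction comes back as the two-character string `"[]"`, so emptiness should be checked after decoding.

```python
import ast
import csv
import io
import json

# Hypothetical rows shaped like the inference results
rows = [
    {"sample_id": "example_0001", "category": ["example_category"], "prediction": [[960, 810], [960, 540]]},
    {"sample_id": "example_0002", "category": [], "prediction": []},
]

# Write to an in-memory TSV; non-string values are written via str()
buffer = io.StringIO()
writer = csv.DictWriter(
    buffer, fieldnames=["sample_id", "category", "prediction"], delimiter="\t"
)
writer.writeheader()
writer.writerows(rows)

# Read the file back; every field is now a plain string
buffer.seek(0)
loaded = list(csv.DictReader(buffer, delimiter="\t"))
raw_empty_prediction = loaded[1]["prediction"]
print(repr(raw_empty_prediction), len(raw_empty_prediction))

for row in loaded:
    # Numeric nested lists are valid JSON
    row["prediction"] = json.loads(row["prediction"])
    # String lists use single quotes, so parse them as Python literals
    row["category"] = ast.literal_eval(row["category"])

print(loaded[0]["prediction"], loaded[1]["prediction"])
```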

## Calculate Score

In [None]:
import ast
import functools
import json
import multiprocessing
from pathlib import Path
from typing import Dict, List, Tuple
from datasets import load_dataset
import numpy as np
import pandas as pd
from scipy.spatial import KDTree
from skimage.draw import line_aa
from skimage.draw import line as sk_line
from tqdm import tqdm

### Define Penalty Masks

In [None]:
PENALTY_SCORES_PATH = "./category_penalty.tsv"
M2F_CONFIG_PATH = "./mask2former_config.json"

@functools.lru_cache(maxsize=4)
def create_penalty_lookup(embodiment: str) -> Dict[int, float]:
    """Creates a direct mapping from a category ID (`label_id`) to its penalty factor."""

    # Load fixed penalty values
    penalty_values_df = pd.read_csv(PENALTY_SCORES_PATH, sep="\t")

    # Load Mask2Former mapping from IDs to labels
    with open(M2F_CONFIG_PATH, "r") as f:
        config = json.load(f)
    id2label = {int(k): v for k, v in config["id2label"].items()}

    label_id_to_penalty = {}
    for label_id, category_name in id2label.items():

        # Look up the penalty value; skip labels without an entry in the penalty table
        row = penalty_values_df[
            penalty_values_df["category"] == category_name
        ]
        if row.empty:
            continue
        penalty_value = float(row.iloc[0][embodiment]) * 0.8  # Adjust scale
        label_id_to_penalty[label_id] = penalty_value

    return label_id_to_penalty

def rasterize_gt_trace(
    gt_trace: List[List[float]], height: int, width: int
) -> np.ndarray:
    """Converts a line trace into a dense array of pixel coordinates."""

    gt_trace_np = np.array(gt_trace)
    gt_line_pixels = []
    if len(gt_trace_np) > 1:
        for i in range(len(gt_trace_np) - 1):
            p1, p2 = gt_trace_np[i], gt_trace_np[i + 1]
            r0, c0, r1, c1 = (
                int(round(p1[1])),
                int(round(p1[0])),
                int(round(p2[1])),
                int(round(p2[0])),
            )
            rr, cc, _ = line_aa(r0, c0, r1, c1)
            valid = (rr >= 0) & (rr < height) & (cc >= 0) & (cc < width)
            gt_line_pixels.extend(zip(rr[valid], cc[valid]))
    elif len(gt_trace_np) == 1:
        r, c = int(round(gt_trace_np[0][1])), int(round(gt_trace_np[0][0]))
        if 0 <= r < height and 0 <= c < width:
            gt_line_pixels.append((r, c))

    return np.array(gt_line_pixels)

def create_penalty_mask(
    segmentation_mask: np.ndarray,
    gt_trace: List[List[float]],
    embodiment: str,
    distance_threshold: float = 35,
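) -> np.ndarray:
    """
    Builds a per-pixel penalty mask. Pixels of a penalized class that lie farther than
    `distance_threshold` pixels from the ground-truth trace get that class's penalty;
    all other pixels are 0.
    """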

    # Initialize mask with default no penalty
    height, width = segmentation_mask.shape
    penalty_mask = np.full((height, width), 0, dtype=float)

    # Create a KDTree from ground truth pixels for efficient distance queries
    gt_line_pixels = rasterize_gt_trace(gt_trace, height, width)
    gt_tree = KDTree(gt_line_pixels)

    # Create a more efficient lookup for segment info and penalty values
    label_id_to_penalty = create_penalty_lookup(embodiment)

    # Get label IDs for all pixels
    all_label_ids = segmentation_mask.ravel()

    # Identify pixels that belong to undesired segments
    undesired_mask = np.isin(all_label_ids, list(label_id_to_penalty.keys()))
    undesired_indices = np.where(undesired_mask)[0]
    if undesired_indices.size == 0:
        return penalty_mask

    # Map indices to coordinates
    rows, cols = np.unravel_index(undesired_indices, (height, width))
    undesired_coords = np.vstack((rows, cols)).T

    # Perform a single batch query for distances for all undesired pixels
    distances, _ = gt_tree.query(undesired_coords)

    # Filter for pixels that are beyond the distance threshold
    coords_to_penalize = undesired_coords[distances > distance_threshold]

    if coords_to_penalize.size > 0:
        # Apply penalties
        rows_pen, cols_pen = coords_to_penalize[:, 0], coords_to_penalize[:, 1]
        label_ids_to_penalize = segmentation_mask[rows_pen, cols_pen]
        penalties = np.vectorize(label_id_to_penalty.get)(label_ids_to_penalize)
        penalty_mask[rows_pen, cols_pen] = penalties

    return penalty_mask
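
To make the masking rule concrete, here is a small brute-force sketch on a toy grid. It uses plain lists and a nearest-distance loop rather than NumPy and a KDTree, and the label ID `7`, the penalty `0.8`, and the threshold `1.0` are invented for illustration. It is meant to show the idea, not to reproduce `create_penalty_mask` exactly.

```python
import math
from typing import Dict, List, Tuple


def toy_penalty_mask(
    segmentation: List[List[int]],
    gt_pixels: List[Tuple[int, int]],
    label_to_penalty: Dict[int, float],
    distance_threshold: float,
) -> List[List[float]]:
    """Penalize pixels of listed classes that are far from the ground-truth trace."""
    height, width = len(segmentation), len(segmentation[0])
    mask = [[0.0] * width for _ in range(height)]
    for r in range(height):
        for c in range(width):
            label = segmentation[r][c]
            if label not in label_to_penalty:
                continue  # Classes without a penalty entry are never penalized
            nearest = min(math.hypot(r - gr, c - gc) for gr, gc in gt_pixels)
            if nearest > distance_threshold:
                mask[r][c] = label_to_penalty[label]
    return mask


# 3x5 grid: label 0 is unpenalized, label 7 is a hypothetical penalized class
segmentation = [
    [7, 7, 0, 7, 7],
    [7, 7, 0, 7, 7],
    [7, 7, 0, 7, 7],
]
# Vertical ground-truth trace along the middle column, as (row, col) pixels
gt_pixels = [(0, 2), (1, 2), (2, 2)]

mask = toy_penalty_mask(segmentation, gt_pixels, {7: 0.8}, distance_threshold=1.0)
for row in mask:
    print(row)  # [0.8, 0.0, 0.0, 0.0, 0.8]
```

Pixels directly next to the trace stay unpenalized even though they belong to the penalized class, because they fall within the distance threshold.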

### Define Score Function

In [None]:
def resample_to_match_length(
    trace_1: np.ndarray, trace_2: np.ndarray
) -> Tuple[np.ndarray, np.ndarray]:
    """Resamples the shorter trace so that both traces have the same number of points."""

    if len(trace_1) == len(trace_2):
        return trace_1, trace_2
    elif len(trace_1) > len(trace_2):
        longer, shorter = (trace_1, trace_2)
    else:
        shorter, longer = (trace_1, trace_2)

    # Parameterize shorter trajectory by cumulative distance
    dists = np.cumsum(
        [0]
        + [np.linalg.norm(shorter[i] - shorter[i - 1]) for i in range(1, len(shorter))]
    )

    if dists[-1] == 0:
        # Single point or zero-length trace: repeat the point to match the longer trace
        shorter = np.repeat(shorter[:1], len(longer), axis=0)
    else:
        dists = dists / dists[-1]  # Normalize to [0,1]

        # Create new parameter values matching longer trajectory length
        new_params = np.linspace(0, 1, len(longer))

        # Interpolate x and y coordinates separately
        new_x = np.interp(new_params, dists, shorter[:, 0])
        new_y = np.interp(new_params, dists, shorter[:, 1])
        shorter = np.column_stack([new_x, new_y])

    # Return the traces in their original argument order
    if len(trace_1) > len(trace_2):
        return longer, shorter
    else:
        return shorter, longer

def calculate_semantic_penalty(
    prediction: np.ndarray, penalty_mask: np.ndarray
) -> float:
    """Returns the mean penalty over all in-bounds pixels along the predicted trace."""

    height, width = penalty_mask.shape
    penalties = []
    for i in range(len(prediction) - 1):
        x1, y1 = int(round(prediction[i][0])), int(round(prediction[i][1]))
        x2, y2 = int(round(prediction[i + 1][0])), int(round(prediction[i + 1][1]))

        # Use scikit-image's optimized line drawing algorithm
        rr, cc = sk_line(y1, x1, y2, x2)

        # Keep only pixels inside the mask, indexed as (row, col) = (y, x)
        valid_indices = (rr >= 0) & (rr < height) & (cc >= 0) & (cc < width)
        penalties.extend(penalty_mask[rr[valid_indices], cc[valid_indices]].tolist())

    return float(np.mean(penalties))

def calculate_fde(prediction: np.ndarray, ground_truth: np.ndarray):

    return np.linalg.norm(prediction[-1] - ground_truth[-1])

def calculate_dtw(prediction: np.ndarray, ground_truth: np.ndarray):

    # Create cost matrix
    n, m = len(prediction), len(ground_truth)
    cost_matrix = np.full((n + 1, m + 1), np.inf)
    cost_matrix[0, 0] = 0

    for i in range(1, n + 1):
        for j in range(1, m + 1):
            euclidean_distance = np.linalg.norm(prediction[i - 1] - ground_truth[j - 1])

            # Find the minimum from the three possible previous cells
            min_prev_cost = min(
                cost_matrix[i - 1, j],  # Insertion
                cost_matrix[i, j - 1],  # Deletion
                cost_matrix[i - 1, j - 1],  # Match
            )

            cost_matrix[i, j] = euclidean_distance + min_prev_cost

    return cost_matrix[n, m]

def score(
    prediction: List[List[float]],
    ground_truths: List[List[List[float]]],
    segmentation_mask: np.ndarray,
    embodiment: str,
) -> float:
    """Scores a prediction against every ground-truth trace and returns the lowest score."""

    # Iterate over all ground-truths
    scores = []
    for ground_truth in ground_truths:

        # Create penalty mask
        penalty_mask = create_penalty_mask(segmentation_mask, ground_truth, embodiment)

        # Convert to NumPy under new names so that the original prediction
        # is reused unchanged for every ground-truth
        pred_np, gt_np = np.array(prediction), np.array(ground_truth)

        # Resample if necessary
        if len(pred_np) != len(gt_np):
            pred_np, gt_np = resample_to_match_length(pred_np, gt_np)

        # Calculate score function
        sem_penalty = calculate_semantic_penalty(pred_np, penalty_mask)
        fde = calculate_fde(pred_np, gt_np)
        dtw = calculate_dtw(pred_np, gt_np)
        scores.append(dtw + fde + sem_penalty)

    # Select the best score
    return min(scores)
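
As a worked example of two of the score components, the sketch below computes dynamic time warping (DTW) and the final displacement error (FDE) for two short, hypothetical traces. It is a pure-Python restatement of the same recurrence used in `calculate_dtw`, written with `math.dist` so it runs without NumPy; the function names and the example traces are assumptions for illustration.

```python
import math
from typing import List, Tuple

Point = Tuple[float, float]


def dtw_distance(a: List[Point], b: List[Point]) -> float:
    """Accumulated Euclidean cost of the cheapest monotonic alignment of a and b."""
    n, m = len(a), len(b)
    cost = [[math.inf] * (m + 1) for _ in range(n + 1)]
    cost[0][0] = 0.0
    for i in range(1, n + 1):
        for j in range(1, m + 1):
            step = math.dist(a[i - 1], b[j - 1])
            cost[i][j] = step + min(
                cost[i - 1][j], cost[i][j - 1], cost[i - 1][j - 1]
            )
    return cost[n][m]


def final_displacement_error(a: List[Point], b: List[Point]) -> float:
    """Euclidean distance between the last points of both traces."""
    return math.dist(a[-1], b[-1])


# Two parallel vertical traces that are one pixel apart
prediction = [(0.0, 0.0), (0.0, 1.0), (0.0, 2.0)]
ground_truth = [(1.0, 0.0), (1.0, 1.0), (1.0, 2.0)]

print(dtw_distance(prediction, ground_truth))              # 3.0
print(final_displacement_error(prediction, ground_truth))  # 1.0
```

The cheapest alignment pairs each point with its direct neighbor, so the DTW cost is three steps of length 1. Because DTW sums the cost over all aligned points, it grows with the number of points in the traces.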

### Parallelized Score Calculation

In [None]:
def _initialize_worker(results_path, dataset_id, split_name):

    global _results_df, _get_sample
    
    # Load data
    _results_df = pd.read_csv(results_path, sep="\t")
    data_split = load_dataset(dataset_id)[split_name]

    # Build lookup index for efficient sample retrieval
    id_to_index = {sample_id: i for i, sample_id in enumerate(data_split["sample_id"])}
    
    def get_sample(sample_id):
        idx = id_to_index[sample_id]
        return data_split[idx]
    
    _get_sample = get_sample


def _score_chunk(indices: List[int]) -> List[Tuple[int, float]]:

    results = []
    for idx in indices:
        row = _results_df.loc[idx]
        
        # Decode the prediction, which is stored as a string in the TSV file
        prediction_raw = row["prediction"]
        prediction = json.loads(prediction_raw) if isinstance(prediction_raw, str) else []

        # Skip invalid predictions
        if len(prediction) == 0:
            results.append((idx, np.nan))
            continue

        # Extract ground truth
        sample = _get_sample(row["sample_id"])
        embodiment = row["embodiment"]
        ground_truths = sample["ground_truth"][embodiment]
        segmentation_mask = np.array(sample["segmentation_mask"])

        # Check that ground-truth is not hidden as it is for the test split
        if ground_truths is None:
            raise ValueError(f"The sample {row['sample_id']} has hidden ground-truths")

        # Calculate score
        s = score(prediction, ground_truths, segmentation_mask, embodiment)
        results.append((idx, s))
    
    return results


def score_predictions_parallel(results_path, dataset_id, split_name, num_processes=4):

    # Load results file
    results_df = pd.read_csv(results_path, sep='\t')

    # Split work into chunks
    total_rows = len(results_df)
    chunk_size = (total_rows + num_processes - 1) // num_processes  # Ceiling division
    indices_chunks = [
        list(range(i, min(i + chunk_size, total_rows)))
        for i in range(0, total_rows, chunk_size)
    ]
        
    # Process chunks in parallel
    scored_df = results_df.copy()
    scored_df["score"] = np.nan
    with multiprocessing.Pool(
        processes=num_processes,
        initializer=_initialize_worker,
        initargs=(
            results_path,
            dataset_id,
            split_name,
        ),
    ) as pool:
        with tqdm(total=total_rows, desc="Scoring predictions") as pbar:
            for chunk_results in pool.imap_unordered(_score_chunk, indices_chunks):
                for idx, s in chunk_results:
                    scored_df.at[idx, "score"] = s
                pbar.update(len(chunk_results))
    
    return scored_df
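
The chunking step in `score_predictions_parallel` can be sketched in isolation as follows. The helper name `split_into_chunks` is an assumption, and the guard for an empty results file is an addition of this sketch (a step of 0 would otherwise make `range` raise a `ValueError`).

```python
from typing import List


def split_into_chunks(total_rows: int, num_processes: int) -> List[List[int]]:
    """Split row indices into at most `num_processes` contiguous chunks."""
    if total_rows == 0:
        return []  # Guard added in this sketch: nothing to distribute
    chunk_size = (total_rows + num_processes - 1) // num_processes  # Ceiling division
    return [
        list(range(start, min(start + chunk_size, total_rows)))
        for start in range(0, total_rows, chunk_size)
    ]


chunks = split_into_chunks(total_rows=10, num_processes=4)
print(chunks)  # [[0, 1, 2], [3, 4, 5], [6, 7, 8], [9]]
```

Ceiling division keeps the number of chunks at or below the number of processes, at the cost of a smaller final chunk.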

### Validation Split

In [None]:
# (Optional) Select results file manually
results_path = "./google_gemini-2.5-pro_validation_20251010_162842.tsv"

# Calculate score in parallel
scored_df = score_predictions_parallel(results_path, "leggedrobotics/navitrace", "validation", num_processes=4)

# Save results with scores
score_path = Path(results_path)
score_path = score_path.parent / f"score_{score_path.name}"
scored_df.to_csv(score_path, sep="\t", index=False, encoding="utf-8")

### Visualize Score

In [None]:
# Total Score
scored_df = pd.read_csv(score_path, sep="\t")
total_score = scored_df['score'].mean()

# Score per embodiment
score_per_embodiment = scored_df.groupby('embodiment')['score'].mean()

# Score per category
scored_df['category'] = scored_df['category'].apply(ast.literal_eval)
exploded_df = scored_df.explode('category')
score_per_category = exploded_df.groupby('category')['score'].mean()

# Number of scores that are np.nan
nan_count = scored_df['score'].isna().sum()

# Print score summary
width = 32
title_len = 15
print(" " * ((width - title_len) // 2) + "🥇 \033[1mSCORE SUMMARY\033[0m")
print(width * "━")
print(f"\033[1mTotal Score\033[0m           : {total_score:>8.2f}")
print(f"\033[1mInvalid Predictions\033[0m   : {nan_count:>8}")
print(f"\033[1mScore per Embodiment\033[0m")
# Use `mean_score` as the loop variable to avoid shadowing the `score` function
for embodiment, mean_score in score_per_embodiment.items():
    print(f"- {embodiment:<20}: {mean_score:>8.2f}")
print(f"\033[1mScore per Category\033[0m")
for category, mean_score in score_per_category.items():
    print(f"- {category:<20}: {mean_score:>8.2f}")
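
The per-category aggregation can be illustrated without pandas. In the sketch below, a row with several categories contributes its score to each of them, which mirrors the `explode` and `groupby` steps, and NaN scores are skipped, as pandas does by default when averaging. The rows and category names are hypothetical.

```python
import math
from collections import defaultdict
from typing import Dict, List

# Hypothetical scored rows; NaN marks an invalid prediction
rows = [
    {"category": ["A", "B"], "score": 10.0},
    {"category": ["A"], "score": 20.0},
    {"category": ["B"], "score": math.nan},
]


def mean_score_per_category(scored_rows: List[dict]) -> Dict[str, float]:
    """Average the scores per category, ignoring rows with a NaN score."""
    grouped = defaultdict(list)
    for row in scored_rows:
        if math.isnan(row["score"]):
            continue
        for category in row["category"]:
            grouped[category].append(row["score"])
    return {category: sum(s) / len(s) for category, s in grouped.items()}


per_category = mean_score_per_category(rows)
print(per_category)  # {'A': 15.0, 'B': 10.0}
```

Because NaN scores are skipped, invalid predictions do not affect the averages, so the count of invalid predictions should be read alongside the scores.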

### Test Split

We hide the ground-truths for the test split to keep the leaderboard fair and to prevent models from being trained on them.
But you can use this [Hugging Face Space](https://huggingface.co/spaces/leggedrobotics/navitrace_leaderboard) to calculate your test scores and optionally submit your model to the leaderboard.