### Next Steps

1. **Try different LLMs**: Stronger models may improve accuracy
2. **Increase iterations**: Complex questions may need more exploration steps
3. **Optimize prompts**: The RLM prompts can be tuned for code understanding
4. **Run full evaluation**: Use `longbench_testset` for complete benchmark results

### Analysis

LongBench-v2 Code Repository Understanding is challenging because:

1. **Massive contexts**: Code repositories can be 1-2M characters
2. **Deep understanding required**: Questions require understanding code structure, function signatures, and logic
3. **Human expert baseline is 53.7%**: Even experts struggle with these questions

RLM helps by:
- Allowing the LLM to programmatically search through the codebase
- Using `llm_query()` to reason about specific code snippets
- Building up understanding iteratively through multiple code executions

In [None]:
print(f"\nLongBench-v2 Code Repository Understanding Results:")
print(f"  Examples: {len(longbench_eval_set)}")
print(f"  Accuracy: {longbench_results.score:.1f}%")
print(f"  LLM: {dspy.settings.lm.model}")

# Trajectory statistics
if longbench_results.results:
    trajectory_lengths = [len(r[1].trajectory) for r in longbench_results.results if hasattr(r[1], 'trajectory')]
    if trajectory_lengths:
        print(f"  Avg trajectory length: {sum(trajectory_lengths)/len(trajectory_lengths):.1f}")

In [None]:
# For evaluation, create a non-verbose RLM
longbench_rlm_eval = RLM(
    CodeQA,
    max_iterations=15,
    verbose=False,
)

# Run evaluation
longbench_eval_set = longbench_devset

longbench_evaluate = dspy.Evaluate(
    devset=longbench_eval_set,
    metric=longbench_code_metric,
    num_threads=1,
    display_progress=True,
    display_table=5,
    provide_traceback=True,
)

print(f"Evaluating on {len(longbench_eval_set)} examples...")
longbench_results = longbench_evaluate(longbench_rlm_eval)

### Run Evaluation on Devset

Use `dspy.Evaluate` to run RLM on the devset. The evaluation cell sets `num_threads=1`, so examples run sequentially; raise it to evaluate in parallel.

In [None]:
# Show the trajectory
print_trajectory(longbench_result.trajectory)

In [None]:
# Create RLM module with typed signature
longbench_rlm = RLM(
    CodeQA,
    max_iterations=15,
    verbose=True,
)

# Pick an example (prefer shorter context for faster demo)
longbench_example = min(longbench_devset, key=lambda x: len(x.context))

print(f"Example:")
print(f"  Difficulty: {longbench_example.difficulty}")
print(f"  Context length: {len(longbench_example.context):,} chars")
print(f"  Question: {longbench_example.query}")
print(f"  Expected answer: {longbench_example.answer}")
print(f"\nRunning RLM...")

longbench_result = longbench_rlm(context=longbench_example.context, query=longbench_example.query)

print(f"\n{'='*60}")
print(f"RESULT")
print(f"{'='*60}")
print(f"Predicted: {longbench_result.answer}")
print(f"Expected: {longbench_example.answer}")
print(f"Correct: {longbench_result.answer == longbench_example.answer}")
print(f"Steps: {len(longbench_result.trajectory)}")

### Run on a Single Example

Let's test RLM on one example to see how it explores the code repository.

In [None]:
def longbench_code_metric(example, pred, trace=None):
    """Multiple choice accuracy metric."""
    gold = example.answer.strip().upper()
    predicted = pred.answer.strip().upper() if pred.answer else ""
    return 1.0 if predicted == gold else 0.0

# Define signature with Literal type for answer
class CodeQA(dspy.Signature):
    """Answer a multiple choice question about a code repository."""
    context: str = dspy.InputField(desc="The code repository contents")
    query: str = dspy.InputField(desc="The question with choices A, B, C, D")
    answer: Literal["A", "B", "C", "D"] = dspy.OutputField(desc="The answer: A, B, C, or D")

### Define the Metric and Signature

Simple multiple-choice accuracy: exact match on the letter (A, B, C, or D).

We use a `Literal` type in the signature to constrain the output to valid choices.
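
To make the constraint concrete, the sketch below shows, using only the standard library, how a `Literal` annotation enumerates the allowed values. The `Choice` alias and `normalize_choice` helper are hypothetical and only illustrate the idea; they are not how DSPy enforces the type internally.

```python
from typing import Literal, get_args

Choice = Literal["A", "B", "C", "D"]

def normalize_choice(raw: str) -> str:
    """Normalize a raw answer and check it against the Literal's allowed values."""
    candidate = raw.strip().upper()
    allowed = get_args(Choice)  # ('A', 'B', 'C', 'D')
    if candidate not in allowed:
        raise ValueError(f"{raw!r} is not one of {allowed}")
    return candidate

print(normalize_choice(" b "))  # B
```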

In [None]:
def make_longbench_example(row):
    """Convert a LongBench-v2 row to a DSPy example."""
    # Format choices as part of the query - emphasize single letter answer
    choices_text = f"""

Choices:
A) {row['choice_A']}
B) {row['choice_B']}
C) {row['choice_C']}
D) {row['choice_D']}"""
    
    return dspy.Example(
        id=row["_id"],
        context=row["context"],
        query=row["question"] + choices_text,
        answer=row["answer"],  # A, B, C, or D
        difficulty=row["difficulty"],
        length=row["length"],
        choice_A=row["choice_A"],
        choice_B=row["choice_B"],
        choice_C=row["choice_C"],
        choice_D=row["choice_D"],
    ).with_inputs("context", "query")

# Create examples and shuffle
longbench_examples = [make_longbench_example(row) for row in code_examples]
random.shuffle(longbench_examples)

# Split: first 25 as devset, rest as testset
longbench_devset = longbench_examples[:25]
longbench_testset = longbench_examples[25:]

print(f"Devset: {len(longbench_devset)} examples")
print(f"Testset: {len(longbench_testset)} examples")

# Show an example
ex = longbench_devset[0]
print(f"\nExample:")
print(f"  Difficulty: {ex.difficulty}")
print(f"  Length: {ex.length}")
print(f"  Context: {len(ex.context):,} chars")
print(f"  Question: {ex.query[:200]}...")
print(f"  Answer: {ex.answer}")

### Prepare Examples for Evaluation

Convert the dataset to `dspy.Example` format. Each question has 4 choices (A, B, C, D) and one correct answer.

In [None]:
# Filter for Code Repository Understanding
code_examples = [row for row in longbench_dataset if row['domain'] == 'Code Repository Understanding']

print(f"Code Repository Understanding: {len(code_examples)} examples")
print(f"\nDifficulty distribution:")
print(f"  {dict(Counter(row['difficulty'] for row in code_examples))}")
print(f"\nLength distribution:")
print(f"  {dict(Counter(row['length'] for row in code_examples))}")

# Show context length statistics
context_lens = [len(row['context']) for row in code_examples]
print(f"\nContext lengths:")
print(f"  Min: {min(context_lens):,} chars")
print(f"  Max: {max(context_lens):,} chars")
print(f"  Mean: {sum(context_lens)//len(context_lens):,} chars")

In [None]:
longbench_dataset = load_dataset('THUDM/LongBench-v2', split='train')

print(f"Total examples: {len(longbench_dataset)}")
print(f"\nDomains:")
for domain, count in sorted(Counter(row['domain'] for row in longbench_dataset).items()):
    print(f"  {domain}: {count}")

### Load the LongBench-v2 Dataset

LongBench-v2 contains 503 challenging multiple-choice questions across 6 domains:
- Single-Document QA (175)
- Multi-Document QA (125)
- Long In-context Learning (81)
- Code Repository Understanding (50)
- Long-dialogue History Understanding (39)
- Long Structured Data Understanding (33)

We focus on **Code Repository Understanding** - questions about real code repositories with contexts up to 2M characters.

# Tutorial: Recursive Language Models (RLM)

This tutorial demonstrates **Recursive Language Models (RLM)**, an inference strategy where LLMs treat long contexts as part of an external environment rather than feeding them directly to the model. The LLM writes Python code to programmatically examine, decompose, and recursively call sub-LLMs over snippets.

Reference: ["Recursive Language Models" (Zhang, Kraska, Khattab, 2025)](https://arxiv.org/abs/placeholder)

## Use Cases

RLMs suit use cases where you want an LLM to delegate work to potentially recursive "subagents". The primary benefit is that the main agent's context is not polluted with intermediate output.

This lets the LLM operate symbolically on the outputs of the subagents.

A simple example is a task that needs multiple sequential operations, such as a map-reduce.

If you ask a typical coding agent to summarize all of the files in a codebase relevant to feature X, it generally needs a gathering step and a synthesis step, both in context.

If a subagent is built in, such as the explore tool in Claude Code, it might return a list of files with summaries to the main agent's context.

To be more concrete, when you call the explore subagent in Claude Code, it returns a string in the format that the main agent specifies. But this is just a string!

To perform any operations on this string, the agent must either write it to a file or rewrite it itself.
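
A minimal sketch of the map-reduce pattern described above, in which sub-agent outputs stay in Python variables instead of the main agent's context. The `fake_sub_llm` function is a stand-in stub, not a real model call, and the file contents are made up for illustration.

```python
from typing import Callable

def map_reduce(files: dict[str, str], sub_llm: Callable[[str], str], feature: str) -> str:
    """Map: summarize each relevant file with a sub-LLM. Reduce: combine the summaries."""
    # Gathering step done in code, not in the main agent's context.
    relevant = {path: src for path, src in files.items() if feature in src}
    # Map step: each sub-call sees one file only.
    summaries = {
        path: sub_llm(f"Summarize for {feature}:\n{src}")
        for path, src in relevant.items()
    }
    # Reduce step: the summaries are ordinary values we can sort, filter, or join.
    return "\n".join(f"{path}: {summary}" for path, summary in sorted(summaries.items()))

def fake_sub_llm(prompt: str) -> str:
    """Stub standing in for a sub-LLM: reports the prompt's line count."""
    return f"{len(prompt.splitlines())} lines seen"

# Made-up files used only to exercise the pattern.
files = {
    "auth.py": "def login():\n    pass  # auth",
    "cart.py": "def add_item():\n    pass",
    "session.py": "# auth tokens\nTOKENS = {}",
}
report = map_reduce(files, fake_sub_llm, feature="auth")
print(report)
```

Because `summaries` is a dictionary rather than a block of text in the prompt, the reduce step can be changed (for example, to filter or rank) without re-running the map step.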

## Benchmarking

We'll evaluate RLM on the [Oolong benchmark](https://huggingface.co/datasets/oolongbench/oolong-synth), which tests long context reasoning and aggregation capabilities.

Install dependencies: `pip install dspy datasets matplotlib`

## Setup

Configure DSPy with an LLM. RLM uses this LLM both for generating code and for the `llm_query()` tool inside the sandbox.

In [None]:
%load_ext autoreload
%autoreload 2

In [None]:
import dspy

lm = dspy.LM("openai/gpt-5")
dspy.configure(lm=lm)

In [None]:
import warnings
warnings.filterwarnings('ignore', category=UserWarning, module='pydantic')


## Load the Oolong Dataset

Oolong is a benchmark for evaluating long context reasoning and aggregation. Tasks include counting labels, finding needles in haystacks, and comparing frequencies.

The dataset has two splits with different source datasets:
- **validation** (1,300 examples): `spam` (650), `trec_coarse` (650)
- **test** (5,200 examples): `agnews`, `app_reviews`, `formality`, `imdb`, `metaphors`, `multinli`, `negation`, `yahoo` (650 each)

In [None]:
from datasets import load_dataset
from collections import Counter

# Load both splits
dataset = load_dataset("oolongbench/oolong-synth")
val_data = dataset["validation"]
test_data = dataset["test"]

print(f"Validation: {len(val_data)} examples")
print(f"  Datasets: {dict(Counter(val_data['dataset']))}")

print(f"\nTest: {len(test_data)} examples")
print(f"  Datasets: {dict(Counter(test_data['dataset']))}")

## Prepare Examples for Evaluation

Convert the dataset to `dspy.Example` format, organized by source dataset.

In [None]:
import random
random.seed(42)

def make_example(row):
    return dspy.Example(
        id=row["id"],
        context_window_id=row.get("context_window_id"),
        context=row["context_window_text"],
        query=row["question"],
        answer=row["answer"],
        task=row.get("task", "unknown"),
        answer_type=row.get("answer_type", "LABEL"),
        dataset=row.get("dataset", "unknown"),
        context_len=row.get("context_len", len(row["context_window_text"])),
    ).with_inputs("context", "query")

# Separate validation by source dataset
val_spam = [make_example(row) for row in val_data if row["dataset"] == "spam"]
val_trec = [make_example(row) for row in val_data if row["dataset"] == "trec_coarse"]

# IMPORTANT: Shuffle before splitting! The dataset is ordered by context_len,
# so without shuffling, devset would only contain the shortest (easiest) examples.
random.shuffle(val_spam)
random.shuffle(val_trec)

# For each validation dataset: first 50 as dev, rest as val
devset_spam, valset_spam = val_spam[:50], val_spam[50:]
devset_trec, valset_trec = val_trec[:50], val_trec[50:]

# Combined devset and valset
devset = devset_spam + devset_trec
valset = valset_spam + valset_trec

# Separate test by source dataset
test_by_dataset = {}
for row in test_data:
    ds_name = row["dataset"]
    if ds_name not in test_by_dataset:
        test_by_dataset[ds_name] = []
    test_by_dataset[ds_name].append(make_example(row))

# Full testset (also shuffle for fair sampling)
testset = [make_example(row) for row in test_data]
random.shuffle(testset)

print("Validation splits (randomly shuffled):")
print(f"  devset_spam: {len(devset_spam)}, valset_spam: {len(valset_spam)}")
print(f"  devset_trec: {len(devset_trec)}, valset_trec: {len(valset_trec)}")
print(f"  devset (combined): {len(devset)}, valset (combined): {len(valset)}")

# Show context_len distribution in devset to verify shuffling worked
from collections import Counter
devset_lens = Counter(ex.context_len for ex in devset)
print(f"\n  devset context_len distribution: {dict(sorted(devset_lens.items()))}")

print(f"\nTest splits:")
for name, examples in sorted(test_by_dataset.items()):
    print(f"  {name}: {len(examples)}")
print(f"  testset (combined): {len(testset)}")

## Define the Metric

The metric matches the [official Oolong benchmark evaluation](https://github.com/abertsch72/oolong/blob/main/src/eval/eval_helpers.py):
- Parses answers by taking text after the last `:`
- Exact string matching for labels
- Partial credit for numeric answers: `0.75 ** abs(gold - predicted)`
- Special handling for comparison answers ("more common", "less common", "same frequency")
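
As a quick worked example of the last-colon parsing and the numeric partial-credit rule listed above, here is a minimal standalone sketch. The helper names `after_last_colon` and `numeric_credit` are hypothetical and cover only those two rules, not the full official parsing logic.

```python
def after_last_colon(answer: str) -> str:
    """Return the text after the last ':' (or the whole answer if there is none)."""
    return answer.split(":")[-1].strip()

def numeric_credit(gold: int, predicted: int) -> float:
    """Partial credit that decays by a factor of 0.75 per unit of error."""
    return 0.75 ** abs(gold - predicted)

parsed = after_last_colon("Reasoning: counted the labels. Answer: 12")
print(parsed)                           # 12
print(numeric_credit(12, int(parsed)))  # 1.0
print(numeric_credit(12, 10))           # 0.5625
```

An answer that is off by two therefore still earns a little over half credit, while larger errors decay quickly toward zero.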

In [None]:
import re
import ast

def parse_answer(answer: str) -> tuple[str, str]:
    """Parse model answer, returning (parsed_answer, confidence).
    
    Matches the official Oolong benchmark parsing logic.
    """
    parse_confidence = "low"
    
    if ":" not in answer:
        if len(answer) < 20:
            return answer, parse_confidence
        else:
            return answer.split()[-1], parse_confidence
    
    # Take text after last ":"
    candidate = answer.split(":")[-1].strip()
    
    # Remove markdown bolding and brackets
    candidate = candidate.replace("*", "")
    candidate = candidate.replace("[", "").replace("]", "")
    
    parse_confidence = "med"
    
    # Higher confidence if answer follows expected format
    if any(marker in answer for marker in ["User:", "Answer:", "Date:", "Label"]):
        parse_confidence = "high"
    
    if len(candidate) < 20:
        parse_confidence = "vhigh"
    elif "more common" in candidate:
        candidate = "more common"
    elif "less common" in candidate:
        candidate = "less common"
    elif "same frequency" in candidate:
        candidate = "same frequency"
    
    return candidate, parse_confidence


def oolong_metric(example, pred, trace=None):
    """Official Oolong benchmark metric with partial credit for numeric answers."""
    
    # Parse gold answer
    expected = example.answer
    if isinstance(expected, list):
        gold = str(expected[0])
    else:
        expected_str = str(expected).strip()
        if expected_str.startswith("[") and expected_str.endswith("]"):
            try:
                parsed = ast.literal_eval(expected_str)
                gold = str(parsed[0]) if isinstance(parsed, list) else expected_str
            except Exception:  # malformed literal or empty list: fall back to the raw string
                gold = expected_str
        else:
            gold = expected_str
    
    # Parse model answer
    trimmed_output, _ = parse_answer(pred.answer)
    
    # Exact match
    if str(trimmed_output).strip().lower() == str(gold).strip().lower():
        return 1.0
    
    # Comparison answers (more/less/same common)
    if trimmed_output in ["more common", "less common", "same frequency"]:
        if trimmed_output in str(gold).lower():
            return 1.0
    
    # Numeric: partial credit with exponential decay
    answer_type = getattr(example, "answer_type", "")
    if answer_type == "ANSWER_TYPE.NUMERIC" or answer_type == "NUMERIC":
        try:
            pred_num = int(trimmed_output)
            gold_num = int(gold)
            return 0.75 ** abs(gold_num - pred_num)
        except (ValueError, TypeError):
            pass
    
    return 0.0

## Initialize RLM

RLM executes the generated code in a secure Deno/Pyodide/WASM sandbox. Within it, the LLM can:
- Access the `context` variable containing the input data
- Call `llm_query(prompt)` to query a sub-LLM for semantic analysis
- Use standard Python libraries (re, json, collections, etc.)
- Build up answers iteratively through multiple code executions
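
To give a feel for what the generated code might look like, the sketch below imitates one sandbox step outside of RLM. Here `context` and `llm_query` are local stand-ins defined only for this illustration (the stub `llm_query` applies a keyword rule instead of calling a model), and the data format is invented.

```python
import re
from collections import Counter

# Stand-ins for what the sandbox would provide (assumptions for this sketch).
context = "\n".join([
    "id=1 || text: win a free prize now",
    "id=2 || text: meeting moved to 3pm",
    "id=3 || text: free tickets, click here",
])

def llm_query(prompt: str) -> str:
    """Stub sub-LLM: labels text as spam if it mentions 'free'."""
    return "spam" if "free" in prompt else "ham"

# Step 1: parse the structure with plain Python.
records = re.findall(r"id=(\d+) \|\| text: (.*)", context)

# Step 2: call the sub-LLM only for the semantic part (classification).
labels = Counter(llm_query(f"Classify as spam or ham: {text}") for _, text in records)

print(dict(labels))
```

The split matters: parsing and counting are cheap and exact in code, so the sub-LLM is reserved for the one judgment that code cannot make.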

In [None]:
def llm_query_nano(prompt: str = "") -> str:
    """Query gpt-5-nano with a prompt."""
    if not prompt:
        raise ValueError("prompt is required")
    nano = dspy.LM("openai/gpt-5-nano")
    response = nano(prompt)
    return response[0] if isinstance(response, list) else str(response)

In [None]:
from dspy.predict.rlm import RLM

rlm = RLM("context, query -> answer", max_iterations=20, sub_lm=dspy.LM("openai/gpt-5-nano"), verbose=True)

## Run Evaluation on Devset

Use `dspy.Evaluate` to run RLM on the devset with parallelism. For full evaluation, replace `devset` with `valset` or `testset`.

In [None]:
# Available splits: devset_spam, devset_trec, valset_spam, valset_trec, test_by_dataset[name], testset
dspy.configure(lm=dspy.LM("openai/gpt-5"))

# Use the pre-shuffled devset (now includes mixed context lengths)
current_devset = devset_trec[:20]

evaluate = dspy.Evaluate(
    devset=current_devset,
    metric=oolong_metric,
    num_threads=10,
    display_progress=True,
    display_table=10,
    provide_traceback=True,
)

results = evaluate(rlm)

# Show results with context_len breakdown
print("\nOolong RLM Evaluation Results:")
print(f"Split: devset_trec (shuffled), {len(current_devset)} examples")
print(f"  LLM: {dspy.settings.lm.model}")
print(f"  Alex RLM: 56.5%")
print(f"  DSPy RLM: {results.score:.2f}%")

avg_trajectory_length = sum(len(r[1].trajectory) for r in results.results) / len(results.results)
print(f"  Avg trajectory length: {avg_trajectory_length:.2f}")

In [None]:
avg_trajectory_length = sum(len(result[1].trajectory) for result in results.results) / len(results.results)
print(f"  Avg trajectory length: {avg_trajectory_length:.2f}")

import matplotlib.pyplot as plt

# Collect all trajectory lengths
trajectory_lengths = [len(result[1].trajectory) for result in results.results]

plt.figure(figsize=(8, 5))
counts, bins, patches = plt.hist(
    trajectory_lengths,
    bins=range(1, max(trajectory_lengths) + 2),
    edgecolor='black',
    align='left'
)
plt.xlabel('Trajectory Length')
plt.ylabel('Frequency')
plt.title('Histogram of Trajectory Lengths')
plt.xticks(range(1, max(trajectory_lengths)+1))

# Add counts above each bar
for count, bin_left, patch in zip(counts, bins[:-1], patches):
    if count > 0:
        plt.text(
            bin_left + patch.get_width() / 2,
            count + 0.02 * max(counts),  # A little above the bar
            f"{int(count)}",
            ha='center',
            va='bottom',
            fontsize=10
        )

plt.show()



## Inspect a Single Trajectory

Let's run RLM on one example and examine the trajectory (the sequence of code executions).

In [None]:
# for i, ex in enumerate(current_devset):
#     print(f"{i}: context_len={ex.context_len:,} tokens")

In [None]:
example = current_devset[8]
print(example.context_len)

def print_trajectory(trajectory):
    """Pretty-print an RLM trajectory."""
    for i, step in enumerate(trajectory):
        print(f"\n{'='*60}")
        print(f"Step {i+1}")
        print(f"{'='*60}")
        
        if step.get("reasoning"):
            reasoning = step['reasoning']
            print(f"\nReasoning: {reasoning}")
        
        print(f"\nCode:")
        print(f"```python")
        print(step["code"])
        print(f"```")
        
        print(f"\nOutput:")
        output = step["output"]
        if len(output) > 500:
            print(output[:500] + "\n... (truncated)")
        else:
            print(output if output else "(no output)")

print(f"Query: {example.query}")
print(f"Expected: {example.answer}")
print(f"Context length: {len(example.context):,} chars, First 100 chars: {example.context[:100]}")
print("\nRunning RLM...")

gpt5 = dspy.LM("openai/gpt-5")
dspy.configure(lm=gpt5)

from dspy.predict.rlm import RLM

rlm = RLM("context, query -> answer", max_iterations=10, verbose=True)

result = rlm(context=example.context, query=example.query)

print_trajectory(result.trajectory)

## Visualize the Trajectory

The trajectory shows each iteration: the code executed and the output received.
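
Beyond printing, a trajectory can be summarized programmatically. The sketch below assumes each step is a dict with `code` and `output` keys, as the `print_trajectory` helper in this notebook does; the sample trajectory is fabricated for illustration and `summarize_trajectory` is a hypothetical helper.

```python
def summarize_trajectory(trajectory: list[dict]) -> list[dict]:
    """Return one compact record per step: index, code line count, output size."""
    summary = []
    for i, step in enumerate(trajectory, start=1):
        code = step.get("code", "")
        output = step.get("output", "")
        summary.append({
            "step": i,
            "code_lines": len(code.splitlines()),
            "output_chars": len(output),
        })
    return summary

# Made-up trajectory used only to exercise the helper.
sample_trajectory = [
    {"reasoning": "Peek at the data", "code": "print(context[:20])", "output": "id=1 || text: hello"},
    {"code": "n = context.count('id=')\nprint(n)", "output": "3"},
]

for row in summarize_trajectory(sample_trajectory):
    print(row)
```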

In [None]:


# print_trajectory(result.trajectory)

## When to Use RLM

RLM is particularly useful when:

1. **Long contexts**: The context is too large to fit in the LLM's context window, or would be expensive to process directly
2. **Aggregation tasks**: You need to count, compare, or aggregate information across a large document
3. **Structured data**: The context has structure (JSON, tables, sections) that can be programmatically navigated
4. **Iterative exploration**: The answer requires examining the data from multiple angles

The key insight is that LLMs can write code to efficiently process data, calling back to sub-LLMs only when semantic understanding is needed.
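
One way to picture the long-context case is chunking: code splits the context into pieces small enough for a sub-LLM, and only those pieces are sent for semantic analysis. The sketch below is a generic illustration with a hypothetical `chunk_text` helper; it does not reflect how RLM itself decides to split a context.

```python
def chunk_text(text: str, max_chars: int, overlap: int = 0) -> list[str]:
    """Split text into chunks of at most max_chars, with optional overlap between neighbors."""
    if max_chars <= 0:
        raise ValueError("max_chars must be positive")
    if not 0 <= overlap < max_chars:
        raise ValueError("overlap must be in [0, max_chars)")
    step = max_chars - overlap
    return [text[start:start + max_chars] for start in range(0, len(text), step)]

chunks = chunk_text("abcdefghij", max_chars=4, overlap=1)
print(chunks)  # ['abcd', 'defg', 'ghij', 'j']
```

The overlap keeps text that straddles a boundary visible in both neighboring chunks, at the cost of a few repeated characters per chunk.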

## Sanity Tests: Complex Signatures

The tests below validate RLM's ability to parse complex signatures with:
- Multiple typed output fields (`list[T]`, `dict[K,V]`, `Literal[]`, `bool`, `int`, `float`)
- Detailed docstrings with multi-paragraph instructions
- Field-level constraints (`min_length`, `max_length`, `ge`, `le`)
- Nested structures (`list[dict[str, str]]`, `dict[str, list[str]]`)

Each test covers a different use case: RAG document analysis, needle-in-haystack search, and codebase understanding.

### Test 1: RAG Document Analysis

This test validates a complex signature with 5 output fields, including a nested type (`dict[str, list[str]]`), a `Literal` type, and a `float` whose 0.0-1.0 range is stated in the field description.

In [None]:
from typing import Literal

class DocumentAnalysisSig(dspy.Signature):
    """Analyze documents to extract structured information including topics, entities, 
    sentiment, and key facts. You must carefully read through all documents and 
    aggregate findings across the corpus.
    
    Requirements:
    - topics: List the main topics discussed, each as a short phrase
    - entities: Extract named entities grouped by their type (PERSON, ORG, LOCATION, etc.)
    - sentiment: Overall sentiment assessment based on tone and content
    - key_facts: List 3-5 most important facts as complete sentences
    - confidence: Your confidence in the analysis accuracy (0.0 to 1.0)
    """
    
    documents: list[str] = dspy.InputField(
        desc="Collection of documents to analyze, each representing a separate text source"
    )
    query: str = dspy.InputField(
        desc="Specific analysis question or focus area"
    )
    
    topics: list[str] = dspy.OutputField(
        desc="Main topics identified across all documents"
    )
    entities: dict[str, list[str]] = dspy.OutputField(
        desc="Named entities grouped by type, e.g., {'PERSON': ['Alice', 'Bob'], 'ORG': ['Acme Corp']}"
    )
    sentiment: Literal["positive", "negative", "neutral", "mixed"] = dspy.OutputField(
        desc="Overall sentiment of the document collection"
    )
    key_facts: list[str] = dspy.OutputField(
        desc="Most important factual statements extracted from documents (3-5 items)"
    )
    confidence: float = dspy.OutputField(
        desc="Confidence score for the analysis between 0.0 and 1.0"
    )

In [None]:
# Sample documents for RAG analysis
rag_documents = [
    "Acme Corporation announced record profits today. CEO Alice Johnson credited the company's innovative AI products. The stock rose 15% on the news.",
    "Industry analysts remain skeptical about Acme's long-term growth. Bob Smith from TechAnalytics noted that competition from GlobalTech is intensifying.",
    "Acme's new AI assistant product received positive reviews from early adopters. Users praised its accuracy and ease of use.",
    "The company plans to expand into European markets next quarter. CFO Carol Williams stated that Acme has secured $50M in additional funding."
]

rag_query = "Analyze the business outlook and key stakeholders for Acme Corporation"

# Configure and run RLM
dspy.configure(lm=dspy.LM("openai/gpt-5-nano"))

rag_rlm = RLM(DocumentAnalysisSig, max_iterations=15, verbose=True)
rag_result = rag_rlm(documents=rag_documents, query=rag_query)

print(f"\n{'='*60}")
print("RAG Analysis Results:")
print(f"{'='*60}")
print(f"Topics: {rag_result.topics}")
print(f"Entities: {rag_result.entities}")
print(f"Sentiment: {rag_result.sentiment}")
print(f"Key Facts: {rag_result.key_facts}")
print(f"Confidence: {rag_result.confidence}")

In [None]:
# Validate RAG test outputs
def validate_rag_result(result):
    """Validate all output types and constraints for RAG analysis."""
    # Type checks
    assert isinstance(result.topics, list), f"topics should be list, got {type(result.topics)}"
    assert all(isinstance(t, str) for t in result.topics), "all topics should be strings"
    
    assert isinstance(result.entities, dict), f"entities should be dict, got {type(result.entities)}"
    for key, values in result.entities.items():
        assert isinstance(key, str), f"entity type should be str, got {type(key)}"
        assert isinstance(values, list), f"entity values should be list, got {type(values)}"
        assert all(isinstance(v, str) for v in values), "all entity names should be strings"
    
    assert result.sentiment in ["positive", "negative", "neutral", "mixed"], \
        f"sentiment should be one of the Literal values, got {result.sentiment}"
    
    assert isinstance(result.key_facts, list), f"key_facts should be list, got {type(result.key_facts)}"
    assert all(isinstance(f, str) for f in result.key_facts), "all key_facts should be strings"
    
    assert isinstance(result.confidence, (int, float)), f"confidence should be numeric, got {type(result.confidence)}"
    assert 0.0 <= result.confidence <= 1.0, f"confidence should be 0.0-1.0, got {result.confidence}"
    
    print("RAG Test: All validations passed!")

validate_rag_result(rag_result)

### Test 2: Needle in Haystack

This test validates precise extraction with `bool`, `int` (non-negative, with 0 meaning "not found"), multiple `str` outputs, and a `Literal` type. The signature has 6 output fields.

In [None]:
class NeedleSearchSig(dspy.Signature):
    """Search through a large text corpus to find specific hidden information.
    
    The text contains random filler content with a single "needle" - a specific 
    piece of information you must locate. Use programmatic search combined with 
    semantic understanding to find the needle efficiently.
    
    You MUST return the exact value found, with its location and surrounding context.
    The search should be systematic - consider using regex, substring matching, or
    line-by-line analysis depending on what you're looking for.
    """
    
    haystack: str = dspy.InputField(
        desc="Large text corpus containing random content and one hidden needle"
    )
    needle_description: str = dspy.InputField(
        desc="Description of what to search for, e.g., 'a 7-digit magic number'"
    )
    
    found: bool = dspy.OutputField(
        desc="Whether the needle was successfully located"
    )
    needle_value: str = dspy.OutputField(
        desc="The exact value of the needle if found, empty string if not found"
    )
    line_number: int = dspy.OutputField(
        desc="1-indexed line number where needle was found, 0 if not found"
    )
    context_before: str = dspy.OutputField(
        desc="The text immediately before the needle on the same line"
    )
    context_after: str = dspy.OutputField(
        desc="The text immediately after the needle on the same line"
    )
    search_method: Literal["regex", "substring", "semantic", "hybrid"] = dspy.OutputField(
        desc="The primary method used to locate the needle"
    )


def generate_haystack(num_lines=200, needle_position=0.7, seed=42):
    """Generate a haystack with a hidden needle."""
    import random
    random.seed(seed)
    
    words = ["lorem", "ipsum", "dolor", "sit", "amet", "consectetur", "adipiscing", 
             "elit", "sed", "do", "eiusmod", "tempor", "incididunt", "ut", "labore"]
    lines = []
    for i in range(num_lines):
        line = " ".join(random.choices(words, k=random.randint(8, 15)))
        lines.append(line)
    
    needle_line = int(num_lines * needle_position)
    magic_number = "4827391"
    lines[needle_line] = f"lorem ipsum dolor SECRET_CODE={magic_number} amet consectetur"
    
    return "\n".join(lines), magic_number, needle_line + 1  # 1-indexed

In [None]:
# Generate haystack and run needle search
haystack_text, expected_needle, expected_line = generate_haystack()
needle_desc = "a 7-digit secret code in the format SECRET_CODE=XXXXXXX"

print(f"Haystack: {len(haystack_text):,} chars, {len(haystack_text.splitlines())} lines")
print(f"Expected needle: {expected_needle} at line {expected_line}")

# Run RLM
needle_rlm = RLM(NeedleSearchSig, max_iterations=15, verbose=True)
needle_result = needle_rlm(haystack=haystack_text, needle_description=needle_desc)

print(f"\n{'='*60}")
print("Needle Search Results:")
print(f"{'='*60}")
print(f"Found: {needle_result.found}")
print(f"Needle value: {needle_result.needle_value}")
print(f"Line number: {needle_result.line_number}")
print(f"Context before: {needle_result.context_before}")
print(f"Context after: {needle_result.context_after}")
print(f"Search method: {needle_result.search_method}")

In [None]:
# Validate Needle Search outputs
def validate_needle_result(result, expected_needle, expected_line):
    """Validate all output types and constraints for needle search."""
    # Type checks
    assert isinstance(result.found, bool), f"found should be bool, got {type(result.found)}"
    
    assert isinstance(result.needle_value, str), f"needle_value should be str, got {type(result.needle_value)}"
    
    assert isinstance(result.line_number, int), f"line_number should be int, got {type(result.line_number)}"
    assert result.line_number >= 0, f"line_number should be >= 0, got {result.line_number}"
    
    assert isinstance(result.context_before, str), f"context_before should be str, got {type(result.context_before)}"
    assert isinstance(result.context_after, str), f"context_after should be str, got {type(result.context_after)}"
    
    assert result.search_method in ["regex", "substring", "semantic", "hybrid"], \
        f"search_method should be one of the Literal values, got {result.search_method}"
    
    # Value checks (the needle should be found correctly)
    assert result.found is True, f"Needle should be found, got found={result.found}"
    assert expected_needle in result.needle_value, \
        f"needle_value should contain {expected_needle}, got {result.needle_value}"
    
    print("Needle Search Test: All validations passed!")

validate_needle_result(needle_result, expected_needle, expected_line)

### Test 3: Codebase Understanding

This test validates deeply nested types with 7 output fields including `list[dict[str, str]]`, `dict[str, list[str]]`, and complex nested structures that require careful parsing.

In [None]:
class CodeAnalysisSig(dspy.Signature):
    """Analyze a codebase to extract structural information about modules, classes,
    and functions. Identify dependencies, complexity metrics, and potential issues.
    
    This requires both syntactic parsing (examining code structure) and semantic
    understanding (comprehending what the code does).
    
    You should:
    1. Parse each file to identify classes, functions, and imports
    2. Build a class hierarchy showing inheritance relationships
    3. Map internal dependencies between modules
    4. Identify potential code quality issues
    5. Provide an executive summary of the codebase
    
    Return a comprehensive analysis with properly nested structure information.
    """
    
    codebase: dict[str, str] = dspy.InputField(
        desc="Mapping of file paths to their source code contents"
    )
    analysis_focus: str = dspy.InputField(
        desc="Specific aspect to focus on: 'architecture', 'quality', 'dependencies', or 'all'"
    )
    
    modules: list[dict[str, str]] = dspy.OutputField(
        desc="List of modules, each with keys: 'name', 'path', 'purpose' (brief description)"
    )
    class_hierarchy: dict[str, list[str]] = dspy.OutputField(
        desc="Mapping of base class names to list of derived class names"
    )
    function_count: int = dspy.OutputField(
        desc="Total number of functions/methods across all files"
    )
    complexity_rating: Literal["low", "medium", "high", "very_high"] = dspy.OutputField(
        desc="Overall complexity assessment of the codebase"
    )
    issues: list[dict[str, str]] = dspy.OutputField(
        desc="Potential issues found, each with keys: 'severity' (low/medium/high), 'location', 'description'"
    )
    dependencies: dict[str, list[str]] = dspy.OutputField(
        desc="Internal dependencies: maps module name to list of modules it imports"
    )
    summary: str = dspy.OutputField(
        desc="2-3 sentence executive summary of the codebase analysis"
    )
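
Steps 1 to 3 of the docstring (identify classes, functions, and imports; build the class hierarchy; map dependencies) are mechanical enough to compute deterministically, which gives a reference to compare the RLM output against. The following is a minimal sketch using the standard-library `ast` module. The helper name `static_ground_truth` and the toy codebase are hypothetical and not part of the notebook. It also records every import, including standard-library ones, so it would need filtering before being compared to the `dependencies` field.

```python
import ast
from collections import defaultdict


def static_ground_truth(codebase: dict[str, str]) -> dict:
    """Derive function count, class hierarchy, and imports via static parsing.

    Hypothetical helper: an independent reference, not how RLM works internally.
    """
    function_count = 0
    class_hierarchy: dict[str, list[str]] = defaultdict(list)
    imports: dict[str, list[str]] = {}

    for path, source in codebase.items():
        tree = ast.parse(source, filename=path)
        imported: set[str] = set()
        for node in ast.walk(tree):
            if isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)):
                # Counts top-level functions and methods alike
                function_count += 1
            elif isinstance(node, ast.ClassDef):
                # Only simple base names (e.g. `Base`), not dotted ones
                for base in node.bases:
                    if isinstance(base, ast.Name):
                        class_hierarchy[base.id].append(node.name)
            elif isinstance(node, ast.ImportFrom) and node.module:
                imported.add(node.module)
            elif isinstance(node, ast.Import):
                imported.update(alias.name for alias in node.names)
        imports[path] = sorted(imported)

    return {
        "function_count": function_count,
        "class_hierarchy": dict(class_hierarchy),
        "imports": imports,
    }


# Toy input, deliberately smaller than the notebook's sample codebase
toy_codebase = {
    "base.py": "class Base:\n    def save(self):\n        pass\n",
    "child.py": (
        "from base import Base\n\n"
        "class Child(Base):\n    def run(self):\n        pass\n\n"
        "def helper():\n    pass\n"
    ),
}

truth = static_ground_truth(toy_codebase)
print(truth)
```

Because `ast.parse` raises `SyntaxError` on malformed source, this kind of check also confirms that every file in the input mapping is valid Python before it is handed to the model.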

In [None]:
# Sample codebase for analysis
sample_codebase = {
    "models/user.py": '''
class BaseModel:
    """Base class for all models."""
    def save(self):
        pass
    
    def delete(self):
        pass

class User(BaseModel):
    """User model with authentication."""
    def __init__(self, name, email):
        self.name = name
        self.email = email
    
    def authenticate(self, password):
        return self._check_password(password)
    
    def _check_password(self, password):
        # TODO: implement proper hashing
        return password == "secret"
''',
    "models/product.py": '''
from models.user import BaseModel

class Product(BaseModel):
    """Product catalog item."""
    def __init__(self, name, price):
        self.name = name
        self.price = price
    
    def apply_discount(self, percent):
        self.price *= (1 - percent / 100)
    
    def get_display_price(self):
        return f"${self.price:.2f}"
''',
    "services/auth.py": '''
from models.user import User

def login(email, password):
    """Authenticate a user."""
    user = User("test", email)
    if user.authenticate(password):
        return create_session(user)
    return None

def create_session(user):
    """Create a session token."""
    return f"session_{user.email}"

def logout(session_token):
    """Invalidate a session."""
    pass
''',
    "utils/helpers.py": '''
import re
from collections import Counter

def validate_email(email):
    """Validate email format."""
    pattern = r"^[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\\.[a-zA-Z0-9-.]+$"
    return bool(re.match(pattern, email))

def count_words(text):
    """Count word frequencies."""
    words = text.lower().split()
    return dict(Counter(words))

def sanitize_input(text):
    """Remove potentially dangerous characters."""
    return re.sub(r"[<>\\"']", "", text)
'''
}

# Run RLM
code_rlm = RLM(CodeAnalysisSig, max_iterations=15, verbose=True)
code_result = code_rlm(codebase=sample_codebase, analysis_focus="all")

print(f"\n{'='*60}")
print("Codebase Analysis Results:")
print(f"{'='*60}")
print(f"Modules: {code_result.modules}")
print(f"Class hierarchy: {code_result.class_hierarchy}")
print(f"Function count: {code_result.function_count}")
print(f"Complexity: {code_result.complexity_rating}")
print(f"Issues: {code_result.issues}")
print(f"Dependencies: {code_result.dependencies}")
print(f"Summary: {code_result.summary}")

In [None]:
# Validate Codebase Analysis outputs
def validate_code_result(result):
    """Validate all output types and nested structures for code analysis."""
    # modules: list[dict[str, str]]
    assert isinstance(result.modules, list), f"modules should be list, got {type(result.modules)}"
    for module in result.modules:
        assert isinstance(module, dict), f"each module should be dict, got {type(module)}"
        assert all(isinstance(k, str) and isinstance(v, str) for k, v in module.items()), \
            "module dict should have str keys and values"
    
    # class_hierarchy: dict[str, list[str]]
    assert isinstance(result.class_hierarchy, dict), f"class_hierarchy should be dict, got {type(result.class_hierarchy)}"
    for base, derived in result.class_hierarchy.items():
        assert isinstance(base, str), f"base class should be str, got {type(base)}"
        assert isinstance(derived, list), f"derived classes should be list, got {type(derived)}"
        assert all(isinstance(d, str) for d in derived), "all derived class names should be strings"
    
    # function_count: int
    assert isinstance(result.function_count, int), f"function_count should be int, got {type(result.function_count)}"
    assert result.function_count >= 0, f"function_count should be >= 0, got {result.function_count}"
    
    # complexity_rating: Literal
    assert result.complexity_rating in ["low", "medium", "high", "very_high"], \
        f"complexity_rating should be one of the Literal values, got {result.complexity_rating}"
    
    # issues: list[dict[str, str]]
    assert isinstance(result.issues, list), f"issues should be list, got {type(result.issues)}"
    for issue in result.issues:
        assert isinstance(issue, dict), f"each issue should be dict, got {type(issue)}"
        assert all(isinstance(k, str) and isinstance(v, str) for k, v in issue.items()), \
            "issue dict should have str keys and values"
    
    # dependencies: dict[str, list[str]]
    assert isinstance(result.dependencies, dict), f"dependencies should be dict, got {type(result.dependencies)}"
    for module, deps in result.dependencies.items():
        assert isinstance(module, str), f"module name should be str, got {type(module)}"
        assert isinstance(deps, list), f"deps should be list, got {type(deps)}"
        assert all(isinstance(d, str) for d in deps), "all dependency names should be strings"
    
    # summary: str
    assert isinstance(result.summary, str), f"summary should be str, got {type(result.summary)}"
    assert len(result.summary) > 0, "summary should not be empty"
    
    print("Codebase Analysis Test: All validations passed!")

validate_code_result(code_result)
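
The per-field assertions above all follow the same recursive pattern: check the container type, then check each element against the inner type. A minimal sketch of a generic checker built on `typing.get_origin` and `typing.get_args` is shown below. The name `conforms` is hypothetical, and the sketch handles only `list`, `dict`, `Literal`, and plain classes, which is the subset this notebook uses. Unlike the assertions above, it deliberately rejects `bool` where `int` is expected.

```python
from typing import Any, Literal, get_args, get_origin


def conforms(value: Any, annotation: Any) -> bool:
    """Return True if `value` structurally matches `annotation` (hypothetical helper)."""
    origin = get_origin(annotation)
    args = get_args(annotation)

    if origin is Literal:
        return value in args
    if origin is list:
        return isinstance(value, list) and all(conforms(v, args[0]) for v in value)
    if origin is dict:
        key_type, value_type = args
        return isinstance(value, dict) and all(
            conforms(k, key_type) and conforms(v, value_type)
            for k, v in value.items()
        )
    if annotation is int:
        # bool is a subclass of int; reject it so True is not accepted as a count
        return isinstance(value, int) and not isinstance(value, bool)
    return isinstance(value, annotation)


Complexity = Literal["low", "medium", "high", "very_high"]

checks = {
    "modules_ok": conforms([{"name": "user", "path": "models/user.py"}], list[dict[str, str]]),
    "modules_bad_value": conforms([{"name": 1}], list[dict[str, str]]),
    "hierarchy_ok": conforms({"BaseModel": ["User", "Product"]}, dict[str, list[str]]),
    "rating_ok": conforms("high", Complexity),
    "rating_bad": conforms("extreme", Complexity),
    "count_bool": conforms(True, int),
}
print(checks)
```

With such a helper, each field check could reduce to one call against the annotation declared on the signature, at the cost of less specific failure messages than the hand-written assertions.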

### Test 4: Instructions-Only Signature

This test validates that `signature.instructions` (the docstring) is properly passed through to RLM, even when the fields have **no descriptions**. The signature relies entirely on the docstring to convey the task.

In [None]:
class CountXSig(dspy.Signature):
    """Count the number of times the letter 'x' appears in the input word.
    
    Return the exact count as an integer. Case-insensitive: both 'x' and 'X' should be counted.
    """
    
    # No field descriptions - relies entirely on docstring instructions
    word: str = dspy.InputField()
    count: int = dspy.OutputField()

In [None]:
# Test cases with known expected counts
test_cases = [
    ("xerox", 2),           # x at start and end
    ("example", 1),         # x in middle
    ("FOXBOX", 2),          # uppercase X's
    ("python", 0),          # no x
    ("xXxXx", 5),           # mixed case, multiple
]

count_rlm = RLM(CountXSig, max_iterations=10, verbose=True)

results_instructions_test = []
for word, expected in test_cases:
    result = count_rlm(word=word)
    passed = result.count == expected
    results_instructions_test.append({
        "word": word,
        "expected": expected,
        "got": result.count,
        "passed": passed
    })
    print(f"word='{word}' | expected={expected} | got={result.count} | {'PASS' if passed else 'FAIL'}")

In [None]:
# Validate instructions-only test
all_passed = all(r["passed"] for r in results_instructions_test)
pass_count = sum(1 for r in results_instructions_test if r["passed"])

print(f"\nInstructions-Only Test: {pass_count}/{len(results_instructions_test)} passed")
assert all_passed, f"Some test cases failed: {[r for r in results_instructions_test if not r['passed']]}"
print("Instructions-Only Test: All validations passed!")
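
The expected counts in the test cases are written by hand, so a mistake there would show up as a model failure. As a hedged sketch, the expectations can instead be derived from a one-line reference implementation of the rule stated in the docstring (case-insensitive count of 'x'). The name `reference_x_count` is hypothetical.

```python
def reference_x_count(word: str) -> int:
    """Case-insensitive count of the letter 'x', mirroring the docstring's rule."""
    return word.lower().count("x")


words = ["xerox", "example", "FOXBOX", "python", "xXxXx"]

# Pair each word with its derived expected count
derived_cases = [(word, reference_x_count(word)) for word in words]
print(derived_cases)
```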

## Benchmark: LongBench-v2 Code Repository Understanding

This section demonstrates RLM on the [LongBench-v2](https://huggingface.co/datasets/THUDM/LongBench-v2) Code Repository Understanding benchmark.

LongBench-v2 is a challenging benchmark with 503 multiple-choice questions requiring deep understanding and reasoning over contexts ranging from 8K to 2M words. The **Code Repository Understanding** split contains 50 questions about real code repositories.

Reference: ["LongBench v2: Towards Deeper Understanding and Reasoning on Realistic Long-context Multitasks" (Bai et al., 2024)](https://arxiv.org/abs/2412.15204)