In [None]:
import asyncio
import os
from pathlib import Path
import tempfile
import dspy
from roma_dspy.config import ConfigManager
from roma_dspy.core.engine.solve import RecursiveSolver
from roma_dspy.core.modules.recursive_solver import RecursiveSolverModule

# Derive the project root from where the roma_dspy package lives:
# it is three directory levels above the package's module file.
import roma_dspy

module_file = Path(roma_dspy.__file__)
package_path = module_file.parent.parent.parent
project_root = package_path

print(f"Project root: {project_root}")

# Point the config manager at <project root>/config and load the
# "general" profile.
config_dir = project_root / "config"
config_manager = ConfigManager(config_dir=config_dir)
config = config_manager.load_config(profile="general")

# Optional programmatic overrides applied on top of the loaded profile.
runtime_cfg = config.runtime
runtime_cfg.max_depth = 2
runtime_cfg.verbose = True
runtime_cfg.enable_logging = True

# Keep the disk cache under the system temp directory so notebooks do not
# run into permission issues.
cache_dir = Path(tempfile.gettempdir()) / "roma_dspy_cache"
runtime_cfg.cache.disk_cache_dir = str(cache_dir)

# Build the solver from the config; checkpoints are disabled for notebook usage.
solver = RecursiveSolver(config=config, enable_checkpoints=False)

# DSPy module wrapper around the solver, for the .forward() interface.
dspy_module = RecursiveSolverModule(solver=solver)

In [None]:
task_goal = "Write me a blog post about the benefits of using DSPy."

# Use async_event_solve in Jupyter notebooks (which run in an event loop)
completed_task = await solver.async_event_solve(task=task_goal)

# Display result
print(f"Status: {completed_task.status}")
print(f"\nResult:\n{completed_task.result}")

In [None]:
# Display the execution trace (text format)
from roma_dspy.core.utils.trace_formatter import format_solver_trace

# Format the trace from the `solver` object created in the setup cell and
# print it. Presumably only meaningful after a solve call has run in an
# earlier cell — TODO confirm what is returned otherwise.
trace = format_solver_trace(solver)
print(trace)

In [None]:
# Pull the DAG stored on the solver (`last_dag`) for detailed inspection
from roma_dspy.core.utils.trace_formatter import format_dag_summary

dag = solver.last_dag
dag_summary = format_dag_summary(dag)
print(dag_summary)

# The execution ID is what the TUI/API use to refer to this run
execution_id = dag.get_execution_id()
print(f"\nExecution ID: {execution_id}")

In [None]:
# Inspect individual tasks: summarize the root task, then each direct child.
print(f"Root Task ID: {completed_task.task_id}")
print(f"Status: {completed_task.status}")
print(f"Subtasks: {len(completed_task.children)}")

# Longest goal prefix shown per child before the text is cut off.
GOAL_PREVIEW_CHARS = 60

# Iterate through children; `dag` is the DAG fetched from `solver.last_dag`
# in the DAG-inspection cell.
for child_id in completed_task.children:
    child = dag.get_node(child_id)
    if not child:
        # The DAG returned nothing usable for this ID: skip it.
        continue
    goal = child.goal
    # Add the ellipsis only when the goal was actually truncated, so short
    # goals are not shown as if text had been cut off.
    if len(goal) > GOAL_PREVIEW_CHARS:
        preview = f"{goal[:GOAL_PREVIEW_CHARS]}..."
    else:
        preview = goal
    print(f"\n  Child: {preview}")
    print(f"  Status: {child.status}")