In [None]:
# Install dependencies
%pip install z3-solver langgraph langchain-openai ipywidgets requests -q

In [None]:
from dataclasses import dataclass, field
from typing import Literal, Optional, Any
from enum import IntEnum
import requests
from z3 import *

## Part 1: Permission Model (mirrors ACL2 spec)

File access and code execution are orthogonal permissions:
- File access: one of three ordered levels: none (0), read (1), read-write (2)
- Execute: a separate boolean flag, independent of the file-access level

In [None]:
class AccessLevel(IntEnum):
    """Ordered file-access levels; a larger value includes the smaller ones.

    Matches ACL2 *access-none*, *access-read*, *access-read-write*.
    """
    NONE = 0        # no file access
    READ = 1        # read-only access
    READ_WRITE = 2  # read and write access

def access_sufficient(required: int, granted: int) -> bool:
    """Return True when the granted level is at least the required level.

    Mirrors ACL2 access-sufficient-p.
    """
    return required <= granted

def tool_permitted(required_access: int, requires_execute: bool,
                   granted_access: int, execute_allowed: bool) -> bool:
    """Decide whether a tool may be invoked (mirrors ACL2 tool-permitted-p).

    Both conditions must hold: the granted file access covers the required
    level, and execute permission is present whenever the tool needs it.
    """
    has_access = access_sufficient(required_access, granted_access)
    if not has_access:
        return has_access
    # Access is fine; execute permission only matters if the tool asks for it.
    if requires_execute:
        return execute_allowed
    return True

## Part 2: LLM Model Specification

In [None]:
@dataclass
class LLMModelSpec:
    """Throughput and pricing profile of one LLM (mirrors ACL2 llm-model-spec-p).

    Attributes:
        name: Model name.
        tokens_per_second: Throughput used for time estimation.
        cost_per_1k_input: Cost in millicents per 1000 input tokens.
        cost_per_1k_output: Cost in millicents per 1000 output tokens.
    """
    name: str
    tokens_per_second: float
    cost_per_1k_input: float
    cost_per_1k_output: float

    def call_cost(self, input_tokens: int, output_tokens: int) -> float:
        """Return the cost of one call in millicents (mirrors ACL2 llm-call-cost)."""
        input_cost = (input_tokens / 1000) * self.cost_per_1k_input
        output_cost = (output_tokens / 1000) * self.cost_per_1k_output
        return input_cost + output_cost

    def call_time_ms(self, input_tokens: int, output_tokens: int) -> float:
        """Return the estimated call time in milliseconds (mirrors ACL2 llm-call-time-ms).

        Input and output tokens are both charged at ``tokens_per_second``.
        """
        total_tokens = input_tokens + output_tokens
        return 1000 * total_tokens / self.tokens_per_second

# Manual model registry (to be populated from LM Studio).
# Maps a model name to its LLMModelSpec; starts empty and is filled by register_model().
MODEL_REGISTRY: dict[str, LLMModelSpec] = {}

In [None]:
def fetch_lm_studio_models(base_url: str = "http://host.docker.internal:1234") -> list[str]:
    """Return the model ids listed by LM Studio's ``/v1/models`` endpoint.

    Best-effort: any failure (connection, HTTP status, JSON decoding or an
    unexpected payload) is printed and an empty list is returned instead of
    raising.
    """
    try:
        reply = requests.get(f"{base_url}/v1/models", timeout=5)
        reply.raise_for_status()
        payload = reply.json()
        model_ids = []
        for entry in payload.get("data", []):
            model_ids.append(entry["id"])
        print(f"Found {len(model_ids)} models: {model_ids}")
        return model_ids
    except Exception as exc:
        print(f"Could not connect to LM Studio: {exc}")
        return []

def register_model(name: str, tps: float = 50.0,
                   cost_in: float = 0.0, cost_out: float = 0.0):
    """Add (or overwrite) a model entry in MODEL_REGISTRY.

    Args:
        name: Registry key and model name.
        tps: Throughput in tokens per second.
        cost_in: Cost per 1000 input tokens.
        cost_out: Cost per 1000 output tokens.
    """
    spec = LLMModelSpec(
        name=name,
        tokens_per_second=tps,
        cost_per_1k_input=cost_in,
        cost_per_1k_output=cost_out,
    )
    MODEL_REGISTRY[name] = spec
    print(f"Registered model: {name} ({tps} tok/s)")

# Fetch and register models.
# Runs when the cell executes: asks LM Studio for its models and registers each one.
available_models = fetch_lm_studio_models()
for model in available_models:
    # Default: 50 tok/s, free (local)
    register_model(model, tps=50.0, cost_in=0.0, cost_out=0.0)

# Fallback default model if LM Studio not available
# (fetch_lm_studio_models returns [] on failure, which leaves the registry empty).
if not MODEL_REGISTRY:
    register_model("default-local", tps=50.0, cost_in=0.0, cost_out=0.0)

## Part 3: Tool Specification

In [None]:
@dataclass
class ToolSpec:
    """Tool specification (mirrors ACL2 tool-spec-p)

    Bundles the permissions a tool needs with the estimated cost, time and
    context tokens of one invocation. Fields are positional, so their order
    is part of the constructor interface.
    """
    name: str
    required_access: AccessLevel  # minimum file-access level the tool needs
    requires_execute: bool        # True when the tool needs execute permission
    base_cost: int          # millicents
    time_estimate_ms: int   # milliseconds
    token_estimate: int     # tokens added to context

# Example tool registry, keyed by each tool's own name.
TOOL_REGISTRY: dict[str, ToolSpec] = {
    spec.name: spec
    for spec in (
        ToolSpec(name="read_file", required_access=AccessLevel.READ,
                 requires_execute=False, base_cost=0,
                 time_estimate_ms=100, token_estimate=500),
        ToolSpec(name="write_file", required_access=AccessLevel.READ_WRITE,
                 requires_execute=False, base_cost=0,
                 time_estimate_ms=200, token_estimate=100),
        ToolSpec(name="run_python", required_access=AccessLevel.NONE,
                 requires_execute=True, base_cost=0,
                 time_estimate_ms=5000, token_estimate=200),
        ToolSpec(name="web_search", required_access=AccessLevel.NONE,
                 requires_execute=False, base_cost=10,
                 time_estimate_ms=2000, token_estimate=1000),
    )
}

## Part 4: Agent State

In [None]:
@dataclass
class AgentState:
    """Agent state (mirrors ACL2 agent-state-p).

    Holds the loop counter, the remaining budgets, the granted permissions
    and the current satisfaction score. The transition helpers in this
    notebook never mutate a state in place; they call ``copy()`` and modify
    the copy.
    """
    iteration: int = 0
    max_iterations: int = 10
    token_budget: int = 100000      # remaining tokens
    cost_budget: int = 100000       # remaining millicents ($1.00)
    time_budget_ms: int = 60000     # remaining time (60 seconds)
    file_access: AccessLevel = AccessLevel.READ
    execute_allowed: bool = False
    satisfaction: float = 0.0       # 0.0 to 1.0
    done: bool = False

    # Messages for LangGraph
    messages: list = field(default_factory=list)

    def copy(self) -> 'AgentState':
        """Return a copy of this state with its own ``messages`` list.

        Uses ``dataclasses.replace`` so that every field is carried over
        automatically; a hand-written field list would silently reset any
        field added later to its default. The message list is copied
        shallowly: the list is new, the message objects are shared.
        """
        # Function-scope import: the notebook's import cell only brings in
        # ``dataclass`` and ``field``.
        from dataclasses import replace

        return replace(self, messages=self.messages.copy())

## Part 5: Z3 Constraint Checking

This is where Z3 enforces the constraints proven in ACL2.

In [None]:
# Constants matching ACL2 spec.
# The three MIN_* values are floors: z3_must_respond forces a response as soon
# as the matching remaining budget falls below its floor.
MIN_LLM_TOKENS = 100            # compared against token_budget
MIN_ITERATION_COST = 10         # compared against cost_budget (millicents)
MIN_ITERATION_TIME = 1000       # compared against time_budget_ms (milliseconds)
SATISFACTION_THRESHOLD = 0.9    # the loop continues only while satisfaction is below this

def z3_must_respond(state: AgentState) -> bool:
    """Decide whether the agent has to respond now (mirrors ACL2 must-respond-p).

    Every Z3 variable is pinned to the concrete value read from ``state`` and
    the must-respond disjunction is asserted on top. Since no variable is left
    free, the solver reports ``sat`` exactly when the disjunction is true for
    this state.
    """
    solver = Solver()

    # Symbolic counterparts of the state fields
    done = Bool('done')
    iteration, max_iter = Int('iteration'), Int('max_iter')
    token_budget = Int('token_budget')
    cost_budget = Int('cost_budget')
    time_budget = Int('time_budget')

    # Pin each symbol to the current concrete value
    solver.add(
        done == state.done,
        iteration == state.iteration,
        max_iter == state.max_iterations,
        token_budget == state.token_budget,
        cost_budget == state.cost_budget,
        time_budget == state.time_budget_ms,
    )

    # Any single condition forces a response (mirrors ACL2)
    solver.add(Or(
        done,
        iteration >= max_iter,
        token_budget < MIN_LLM_TOKENS,
        cost_budget < MIN_ITERATION_COST,
        time_budget < MIN_ITERATION_TIME,
    ))

    outcome = solver.check()
    return outcome == sat

def z3_should_continue(state: AgentState) -> bool:
    """Decide whether the agent loop should run again (mirrors ACL2 should-continue-p).

    True only when the agent is not forced to respond (checked through Z3 by
    ``z3_must_respond``) and its satisfaction is still below the threshold.
    """
    forced_to_respond = z3_must_respond(state)
    return (not forced_to_respond) and state.satisfaction < SATISFACTION_THRESHOLD

def z3_can_invoke_tool(state: AgentState, tool: ToolSpec) -> bool:
    """Decide whether ``tool`` may be invoked in ``state`` (mirrors ACL2 can-invoke-tool-p).

    Permission and budget requirements are encoded as Z3 constraints over
    variables pinned to the concrete tool and state values, so the solver
    reports ``sat`` exactly when both requirements hold.
    """
    solver = Solver()

    # --- Permission side ---------------------------------------------------
    required_access = Int('required_access')
    requires_execute = Bool('requires_execute')
    granted_access = Int('granted_access')
    execute_allowed = Bool('execute_allowed')

    solver.add(
        required_access == tool.required_access.value,
        requires_execute == tool.requires_execute,
        granted_access == state.file_access.value,
        execute_allowed == state.execute_allowed,
    )

    # Mirrors ACL2 tool-permitted-p
    permission_ok = And(
        granted_access >= required_access,
        Or(Not(requires_execute), execute_allowed),
    )

    # --- Budget side -------------------------------------------------------
    tool_cost = Int('tool_cost')
    tool_time = Int('tool_time')
    tool_tokens = Int('tool_tokens')
    cost_budget = Int('cost_budget')
    time_budget = Int('time_budget')
    token_budget = Int('token_budget')

    solver.add(
        tool_cost == tool.base_cost,
        tool_time == tool.time_estimate_ms,
        tool_tokens == tool.token_estimate,
        cost_budget == state.cost_budget,
        time_budget == state.time_budget_ms,
        token_budget == state.token_budget,
    )

    # Mirrors ACL2 tool-budget-sufficient-p
    budget_ok = And(
        tool_cost <= cost_budget,
        tool_time <= time_budget,
        tool_tokens <= token_budget,
    )

    # The tool is invocable only if both groups hold together
    solver.add(And(permission_ok, budget_ok))
    verdict = solver.check()
    return verdict == sat

## Part 6: State Transitions

In [None]:
def deduct_tool_cost(state: AgentState, tool: ToolSpec) -> AgentState:
    """Return a copy of ``state`` charged for one use of ``tool`` (mirrors ACL2 deduct-tool-cost).

    Each budget is reduced by the tool's estimate and floored at zero; the
    input state is left untouched.
    """
    tokens_left = state.token_budget - tool.token_estimate
    cost_left = state.cost_budget - tool.base_cost
    time_left = state.time_budget_ms - tool.time_estimate_ms

    charged = state.copy()
    charged.token_budget = max(0, tokens_left)
    charged.cost_budget = max(0, cost_left)
    charged.time_budget_ms = max(0, time_left)
    return charged

def deduct_llm_cost(state: AgentState, model: LLMModelSpec,
                    input_tokens: int, output_tokens: int) -> AgentState:
    """Return a copy of ``state`` charged for one LLM call.

    The iteration counter advances by one and the token, cost and time
    budgets are reduced by the call's usage, each floored at zero. The cost
    and time figures come from ``model`` and may be fractional, so the
    remaining budgets are truncated to ints.
    """
    call_cost = model.call_cost(input_tokens, output_tokens)
    call_ms = model.call_time_ms(input_tokens, output_tokens)
    tokens_used = input_tokens + output_tokens

    charged = state.copy()
    charged.iteration += 1
    charged.token_budget = max(0, state.token_budget - tokens_used)
    charged.cost_budget = max(0, int(state.cost_budget - call_cost))
    charged.time_budget_ms = max(0, int(state.time_budget_ms - call_ms))
    return charged

def update_satisfaction(state: AgentState, score: float) -> AgentState:
    """Return a copy of ``state`` with its satisfaction set to ``score`` (mirrors ACL2 update-satisfaction).

    The score is clamped to the range 0.0 to 1.0 before it is stored.
    """
    capped = min(1.0, score)
    clamped = max(0.0, capped)

    updated = state.copy()
    updated.satisfaction = clamped
    return updated

def mark_done(state: AgentState) -> AgentState:
    """Return a copy of ``state`` whose ``done`` flag is set (mirrors ACL2 mark-done)."""
    finished = state.copy()
    finished.done = True
    return finished

## Part 7: UI Controls

In [None]:
import ipywidgets as widgets
from IPython.display import display

# Permission controls
# Read by get_permissions_from_ui(): "Write files" maps to READ_WRITE even when
# "Read files" is unchecked.
read_permission = widgets.Checkbox(value=True, description='Read files')
write_permission = widgets.Checkbox(value=False, description='Write files')
execute_permission = widgets.Checkbox(value=False, description='Execute code')

# Budget controls
# Units are those shown to the user (dollars, seconds); create_initial_state()
# converts them to millicents and milliseconds.
max_tokens = widgets.IntSlider(value=100000, min=1000, max=500000, step=1000, 
                                description='Max tokens:')
max_cost = widgets.FloatSlider(value=1.0, min=0.01, max=10.0, step=0.01,
                                description='Max cost ($):')
max_time = widgets.IntSlider(value=60, min=5, max=300, step=5,
                              description='Max time (s):')
max_iterations = widgets.IntSlider(value=10, min=1, max=50, step=1,
                                    description='Max iterations:')

# Model selection
# Options are a snapshot of MODEL_REGISTRY taken when this cell runs.
model_dropdown = widgets.Dropdown(
    options=list(MODEL_REGISTRY.keys()) or ['default-local'],
    description='LLM Model:'
)

# Display controls
# Three labelled columns laid out side by side.
permissions_box = widgets.VBox([widgets.Label('Permissions:'), 
                                 read_permission, write_permission, execute_permission])
budget_box = widgets.VBox([widgets.Label('Budgets:'),
                           max_tokens, max_cost, max_time, max_iterations])
model_box = widgets.VBox([widgets.Label('Model:'), model_dropdown])

display(widgets.HBox([permissions_box, budget_box, model_box]))

In [None]:
def get_permissions_from_ui() -> tuple[AccessLevel, bool]:
    """Read the permission checkboxes and return ``(file_access, execute_allowed)``.

    The write checkbox takes precedence: when it is ticked the level is
    READ_WRITE regardless of the read checkbox.
    """
    if write_permission.value:
        return AccessLevel.READ_WRITE, execute_permission.value
    if read_permission.value:
        return AccessLevel.READ, execute_permission.value
    return AccessLevel.NONE, execute_permission.value

def create_initial_state() -> AgentState:
    """Build the starting AgentState from the current UI control values.

    Budgets are converted from the units shown in the UI (dollars, seconds)
    to the units stored in the state (millicents, milliseconds).

    Returns:
        A fresh state at iteration 0 with zero satisfaction and no messages.
    """
    access, execute = get_permissions_from_ui()
    return AgentState(
        iteration=0,
        max_iterations=max_iterations.value,
        token_budget=max_tokens.value,
        # dollars to millicents. round(), not int(): the slider value is a
        # float, and a product such as 0.29 * 100000 can land a hair below
        # the intended integer, which int() would truncate to one millicent
        # short.
        cost_budget=round(max_cost.value * 100000),
        time_budget_ms=max_time.value * 1000,       # seconds to ms
        file_access=access,
        execute_allowed=execute,
        satisfaction=0.0,
        done=False,
        messages=[]
    )

## Part 8: LLM-as-Judge for Satisfaction Assessment

In [None]:
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage, AIMessage, SystemMessage

# LM Studio connection
# Base URL of the OpenAI-compatible API, including the /v1 prefix.
# NOTE(review): fetch_lm_studio_models() hard-codes the same host and port
# (without /v1) as its default; keep the two in sync.
LM_STUDIO_BASE_URL = "http://host.docker.internal:1234/v1"

def get_llm(model_name: str = None) -> ChatOpenAI:
    """Build a ChatOpenAI client that talks to LM Studio.

    Args:
        model_name: Model to use; when empty or None, the model currently
            selected in the UI dropdown is used instead.
    """
    chosen_model = model_name if model_name else model_dropdown.value
    client_options = dict(
        base_url=LM_STUDIO_BASE_URL,
        api_key="lm-studio",  # LM Studio doesn't require a real key
        model=chosen_model,
        temperature=0.7,
    )
    return ChatOpenAI(**client_options)

# Prompt template for the LLM-as-judge call. It is filled with str.format and
# expects two placeholders: {question} and {response}. The judge is asked to
# reply with a bare number from 0 to 10.
JUDGE_PROMPT = """You are evaluating how well the current response answers the original question.

Original question: {question}

Current response/progress:
{response}

Rate the satisfaction on a scale of 0-10, where:
- 0: No progress toward answering
- 5: Partial answer, significant gaps
- 10: Complete, accurate answer

Respond with ONLY a single number 0-10."""

def _parse_judge_score(text: str) -> float:
    """Extract the judge's 0-10 rating from its raw reply text.

    A reply that is a bare integer is parsed exactly as before. Otherwise the
    first unsigned number in the reply is used, so that answers such as
    "8/10", "7.5" or "Score: 8" still yield a rating.

    Raises:
        ValueError: if the reply contains no number at all.
    """
    import re  # function-scope import: `re` is not imported at notebook level

    cleaned = text.strip()
    try:
        return float(int(cleaned))
    except ValueError:
        pass

    # No leading "-" in the pattern: inside free text a dash is far more
    # likely to be a range or separator ("0-10") than a sign.
    match = re.search(r"\d+(?:\.\d+)?", cleaned)
    if match is None:
        raise ValueError(f"no numeric score in judge reply: {cleaned!r}")
    return float(match.group())


def assess_satisfaction(question: str, response: str, llm: ChatOpenAI = None) -> float:
    """Use LLM-as-judge to assess response satisfaction (0.0 to 1.0).

    Args:
        question: The original user question.
        response: The answer (or progress so far) to be rated.
        llm: Client used for the judge call; a new one is created with
            ``get_llm()`` when omitted.

    Returns:
        The judge's 0-10 rating scaled to 0.0-1.0 and clamped to that range.
        If the call fails or the reply contains no number, the failure is
        printed and the neutral default 0.5 is returned.
    """
    if llm is None:
        llm = get_llm()

    try:
        prompt = JUDGE_PROMPT.format(question=question, response=response)
        result = llm.invoke([HumanMessage(content=prompt)])
        score = _parse_judge_score(result.content)
        return max(0.0, min(1.0, score / 10.0))
    except Exception as e:
        # Deliberate best-effort: a failed judgement must not abort the agent loop.
        print(f"Satisfaction assessment failed: {e}")
        return 0.5  # default middle score on error

## Part 9: LangGraph Agent with Z3 Routing

In [None]:
from langgraph.graph import StateGraph, END
from typing import TypedDict, Annotated
import operator

class GraphState(TypedDict):
    """State for LangGraph

    Dictionary passed between graph nodes: the chat history, the budget and
    permission state, and the original question.
    """
    # Annotated with operator.add, presumably so LangGraph appends a node's
    # returned messages to the existing list instead of replacing it
    # (confirm against the LangGraph reducer documentation).
    messages: Annotated[list, operator.add]
    agent_state: AgentState  # budgets, permissions, iteration counter, satisfaction
    question: str            # original user question, handed to the judge

def call_llm(state: GraphState) -> GraphState:
    """Graph node: call the LLM once, charge its cost, and rate the answer.

    Steps:
      1. Invoke the selected model on the accumulated messages.
      2. Deduct the estimated token, cost and time usage from the agent
         state, which also advances the iteration counter.
      3. Ask the judge for a satisfaction score and store it.

    Returns:
        A state update holding the new AI message, the updated agent state
        and the unchanged question.
    """
    # Read the dropdown once so the client and the cost spec always refer to
    # the same model.
    model_name = model_dropdown.value
    llm = get_llm(model_name)
    agent_st = state["agent_state"]

    # Estimate tokens (rough: about 4 characters per token)
    input_tokens = sum(len(m.content) // 4 for m in state["messages"])

    # Get model spec; unknown models fall back to a free 50 tok/s profile
    model_spec = MODEL_REGISTRY.get(model_name)
    if model_spec is None:
        model_spec = LLMModelSpec("default", 50.0, 0.0, 0.0)

    # Call LLM
    response = llm.invoke(state["messages"])
    actual_output = len(response.content) // 4

    # Update state with costs
    new_agent_st = deduct_llm_cost(agent_st, model_spec, input_tokens, actual_output)

    # Assess satisfaction
    # NOTE(review): the judge call is a second LLM invocation whose tokens,
    # cost and time are not charged against the budgets.
    satisfaction = assess_satisfaction(
        state["question"],
        response.content,
        llm
    )
    new_agent_st = update_satisfaction(new_agent_st, satisfaction)

    print(f"Iteration {new_agent_st.iteration}: satisfaction={satisfaction:.2f}, "
          f"tokens_left={new_agent_st.token_budget}, "
          f"time_left={new_agent_st.time_budget_ms}ms")

    return {
        "messages": [response],
        "agent_state": new_agent_st,
        "question": state["question"]
    }

def should_continue(state: GraphState) -> Literal["continue", "end"]:
    """Route the graph after an LLM call using the Z3 check (mirrors ACL2 should-continue-p).

    Returns "continue" to loop back into the LLM node and "end" to stop. The
    reason for stopping is printed; when several reasons apply, the first in
    this order is reported: done, max iterations, satisfaction met, budget
    exhausted.
    """
    agent_st = state["agent_state"]

    if not z3_should_continue(agent_st):
        if agent_st.done:
            reason = "done"
        elif agent_st.iteration >= agent_st.max_iterations:
            reason = "max iterations"
        elif agent_st.satisfaction >= SATISFACTION_THRESHOLD:
            reason = "satisfaction met"
        else:
            reason = "budget exhausted"
        print(f"  -> Z3: must respond ({reason})")
        return "end"

    print(f"  -> Z3: should continue (satisfaction={agent_st.satisfaction:.2f} < {SATISFACTION_THRESHOLD})")
    return "continue"

In [None]:
def build_agent_graph() -> StateGraph:
    """Assemble and compile the single-node agent graph with Z3 routing.

    The graph has one node, "llm", which is also the entry point. After each
    run of that node, ``should_continue`` decides whether to loop back into
    it or to finish. The value returned is whatever ``graph.compile()``
    produces.
    """
    routes = {
        "continue": "llm",   # loop: call the model again
        "end": END,          # stop and return the final state
    }

    workflow = StateGraph(GraphState)
    workflow.add_node("llm", call_llm)
    workflow.set_entry_point("llm")
    workflow.add_conditional_edges("llm", should_continue, routes)
    return workflow.compile()

# Compile the graph once when this cell runs; run_agent() invokes this instance.
agent = build_agent_graph()

## Part 10: Run the Agent

In [None]:
def run_agent(question: str) -> str:
    """Run the verified ReAct agent on ``question`` and return its final answer.

    The starting budgets and permissions come from the UI controls. A summary
    is printed before and after the run. The return value is the content of
    the most recent AI message, or "No response generated" if there is none.
    """
    initial_state = create_initial_state()

    banner_lines = [
        "Starting agent with:",
        f"  - File access: {initial_state.file_access.name}",
        f"  - Execute allowed: {initial_state.execute_allowed}",
        f"  - Token budget: {initial_state.token_budget}",
        f"  - Cost budget: ${initial_state.cost_budget / 100000:.2f}",
        f"  - Time budget: {initial_state.time_budget_ms / 1000}s",
        f"  - Max iterations: {initial_state.max_iterations}",
    ]
    for line in banner_lines:
        print(line)
    print()

    # Seed conversation: fixed system prompt followed by the user's question
    seed_messages = [
        SystemMessage(content="You are a helpful assistant. Think step by step."),
        HumanMessage(content=question),
    ]
    graph_state: GraphState = {
        "messages": seed_messages,
        "agent_state": initial_state,
        "question": question,
    }

    result = agent.invoke(graph_state)

    final_state = result["agent_state"]
    print("\nFinal state:")
    print(f"  - Iterations used: {final_state.iteration}")
    print(f"  - Final satisfaction: {final_state.satisfaction:.2f}")
    print(f"  - Tokens remaining: {final_state.token_budget}")

    # Walk the history backwards to find the last AI message
    last_ai = next(
        (msg for msg in reversed(result["messages"]) if isinstance(msg, AIMessage)),
        None,
    )
    if last_ai is None:
        return "No response generated"
    return last_ai.content

In [None]:
# Example: Run the agent
# Adjust the UI controls above before running: the budgets, permissions and
# model are read from the widgets at the moment run_agent() is called.

question = "What is the capital of France and what is it known for?"
response = run_agent(question)
# Print the final answer under a banner
print("\n" + "="*50)
print("FINAL RESPONSE:")
print("="*50)
print(response)

## Verification Notes

This notebook implements the same constraints proven in `experiment-01-react-verified.lisp`:

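1. **Permission Safety** (`permission-safety` theorem): `z3_can_invoke_tool` ensures tool permissions are satisfied. Note that the graph built in Part 9 has only an LLM node, so this check is defined here but not yet invoked by the agent loop.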
2. **Budget Non-negativity** (`tool-deduction-preserves-budget-nonneg`): `deduct_tool_cost` uses `max(0, ...)` 
3. **Iteration Increases** (`iteration-increases`): `deduct_llm_cost` increments iteration
4. **Termination** (`termination-by-iteration`): `z3_must_respond` checks iteration bound

The Z3 solver provides runtime enforcement of the constraints proven correct in ACL2.