In [1]:
from __future__ import annotations

import os
import re
import shlex
import shutil
import subprocess
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple

from langchain_openai import ChatOpenAI
from langchain_core.tools import tool
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_classic.agents import AgentExecutor
from langchain_classic.agents import create_tool_calling_agent


In [None]:
# Keep a key that is already exported in the environment; the literal below is
# only a placeholder fallback, not a usable credential. Never commit a real
# key in this cell -- export OPENROUTER_API_KEY before starting the kernel.
os.environ.setdefault("OPENROUTER_API_KEY", "<API_KEY_HERE>")


In [3]:
# Notebook-level defaults. setdefault() is used so that values exported before
# the kernel started are respected instead of being overwritten here.
os.environ.setdefault("AGENT_ALLOWED_ROOT", "./")
# Optional: NuSMV location if not on PATH
os.environ.setdefault("NUSMV_BIN", "./NuSMV")
# Optional: timeout (in seconds) per NuSMV run
os.environ.setdefault("NUSMV_TIMEOUT_S", "60")

In [4]:
# =============================================================================
# Configuration / Guardrails
# =============================================================================

# Root directory for every path the tools accept; anything that resolves
# outside it is rejected. Point AGENT_ALLOWED_ROOT at a narrow directory for
# safety (the current working directory is used when it is unset).
ALLOWED_ROOT = Path(
    os.environ.get("AGENT_ALLOWED_ROOT", str(Path.cwd()))
).resolve()

# Name (looked up on PATH) or explicit location of the NuSMV executable.
NUSMV_BIN = os.environ.get("NUSMV_BIN", "NuSMV")

# Seconds allowed for a single NuSMV invocation.
DEFAULT_TIMEOUT_S = int(os.environ.get("NUSMV_TIMEOUT_S", "60"))

# Cap on captured output size, to avoid huge tool payloads.
MAX_CHARS = 30_000


def _resolve_under_allowed_root(p: Path) -> Path:
    """
    Return the fully resolved form of ``p`` if it lies inside ALLOWED_ROOT.

    Raises:
        ValueError: if the resolved path is not ALLOWED_ROOT itself or one of
            its descendants.
    """
    resolved = p.resolve()
    try:
        # relative_to() raises ValueError when `resolved` is outside the root.
        resolved.relative_to(ALLOWED_ROOT)
    except ValueError as exc:
        message = f"Path must be under allowed root: {ALLOWED_ROOT}"
        raise ValueError(message) from exc
    else:
        return resolved


def _find_nusmv_executable() -> str:
    """
    Locate the NuSMV binary named by NUSMV_BIN.

    A non-absolute setting is looked up with ``shutil.which``; an absolute
    setting must exist and be executable.

    Raises:
        FileNotFoundError: the binary cannot be found.
        PermissionError: an absolute path exists but is not executable.
    """
    configured = Path(NUSMV_BIN)

    if not configured.is_absolute():
        located = shutil.which(NUSMV_BIN)
        if located:
            return located
        raise FileNotFoundError(
            "NuSMV executable not found on PATH. "
            "Add NuSMV to PATH or set NUSMV_BIN to its absolute path."
        )

    # Absolute path: verify it directly instead of searching PATH.
    if not configured.exists():
        raise FileNotFoundError(f"NuSMV not found at: {configured}")
    if not os.access(configured, os.X_OK):
        raise PermissionError(f"NuSMV is not executable: {configured}")
    return str(configured)

In [5]:
# =============================================================================
# Parsing helpers (lightweight, robust enough for summaries)
# =============================================================================

_SPEC_LINE_RE = re.compile(r"^\s*--\s*specification\s+(.*?)\s+is\s+(true|false)\s*$", re.IGNORECASE)
_COUNTEREX_RE = re.compile(r"^\s*--\s*as\s+demonstrated\s+by\s+the\s+following\s+execution\s+sequence", re.IGNORECASE)


@dataclass
class SpecResult:
    """Outcome of one property result line found in NuSMV's output."""
    # Property text captured from the result line.
    spec: str
    # True when the line reported "true", False when it reported "false".
    is_true: bool

def parse_nusmv_output(stdout: str, stderr: str) -> Dict[str, Any]:
    """
    Summarise the raw output of one NuSMV run.

    Args:
        stdout: Captured standard output (``None`` is treated as empty).
        stderr: Captured standard error (``None`` is treated as empty).

    Returns:
        A dict with:
          - "specs": list of {"spec", "is_true"} for every line matching
            _SPEC_LINE_RE, in output order
          - "counterexample_present": True if any line matches _COUNTEREX_RE
          - "stderr_nonempty": True if stderr holds non-whitespace text
          - "excerpt": the first and last 30 stdout lines (the whole output
            when it has at most 60 lines), capped at MAX_CHARS characters
    """
    edge = 30  # number of lines kept from each end of stdout in the excerpt

    lines = (stdout or "").splitlines()
    specs: List[SpecResult] = []
    counterexample_present = False

    for line in lines:
        m = _SPEC_LINE_RE.match(line)
        if m:
            specs.append(
                SpecResult(spec=m.group(1).strip(), is_true=m.group(2).lower() == "true")
            )
        if _COUNTEREX_RE.match(line):
            counterexample_present = True

    # Only elide the middle when head and tail cannot overlap; otherwise the
    # excerpt would repeat the lines shared by both slices.
    if len(lines) <= 2 * edge:
        excerpt = "\n".join(lines)
    else:
        excerpt = "\n".join(lines[:edge]) + "\n...\n" + "\n".join(lines[-edge:])

    return {
        "specs": [{"spec": s.spec, "is_true": s.is_true} for s in specs],
        "counterexample_present": counterexample_present,
        "stderr_nonempty": bool((stderr or "").strip()),
        "excerpt": excerpt[:MAX_CHARS],
    }

In [6]:
# =============================================================================
# Fix A: Pure Python functions (callable) + thin @tool wrappers
# =============================================================================

def list_smv_files_fn(directory: str) -> Dict[str, Any]:
    """
    List the .smv files directly inside ``directory`` (no recursion).

    Returns:
        {"directory": "<resolved dir>", "files": [<sorted paths>], "count": N}

    Raises:
        ValueError: the directory (or a listed file once resolved) is outside
            the allowed root, or the path is not a directory.
        FileNotFoundError: the directory does not exist.
    """
    root = _resolve_under_allowed_root(Path(directory))
    if not root.exists():
        raise FileNotFoundError(f"Directory not found: {root}")
    if not root.is_dir():
        raise ValueError(f"Not a directory: {root}")

    matches = []
    for entry in root.iterdir():
        if entry.is_file() and entry.suffix.lower() == ".smv":
            matches.append(entry.resolve())
    matches.sort()

    # Re-check each resolved file so a link pointing outside the root is rejected.
    checked = [str(_resolve_under_allowed_root(match)) for match in matches]
    return {"directory": str(root), "files": checked, "count": len(checked)}


def _as_text(data: Any) -> str:
    """Coerce captured subprocess output (str, bytes or None) to str."""
    if data is None:
        return ""
    if isinstance(data, bytes):
        return data.decode(errors="replace")
    return data


def run_nusmv_on_file_fn(
    smv_file: str,
    extra_args: Optional[str] = None,
    timeout_s: int = DEFAULT_TIMEOUT_S,
) -> Dict[str, Any]:
    """
    Run NuSMV on a single .smv file and return its captured output.

    Args:
        smv_file: Path to the model; must resolve under the allowed root and
            have an ``.smv`` suffix.
        extra_args: Optional extra NuSMV options as one shell-style string
            (split with ``shlex.split``).
        timeout_s: Seconds to wait before giving up on the run.

    Returns:
        On completion: {"file", "command", "exit_code", "stdout", "stderr",
        "parsed"}; "stdout"/"stderr" are capped at MAX_CHARS while "parsed" is
        computed from the full output.
        On timeout: {"file", "command", "exit_code": None, "stdout", "stderr",
        "error"} with whatever output was captured before the timeout.

    Raises:
        FileNotFoundError: NuSMV or the model file cannot be found.
        PermissionError: the configured NuSMV path is not executable.
        ValueError: the path is outside the allowed root, is not a regular
            file, or is not an ``.smv`` file.
    """
    exe = _find_nusmv_executable()

    f = _resolve_under_allowed_root(Path(smv_file))
    if not f.exists():
        raise FileNotFoundError(f"File not found: {f}")
    if not f.is_file():
        raise ValueError(f"Not a regular file: {f}")
    if f.suffix.lower() != ".smv":
        raise ValueError(f"Not an .smv file: {f}")

    # NuSMV's synopsis is `NuSMV [options] [input_file]`, so options are placed
    # before the model file.
    # NOTE(review): extra_args is forwarded unvalidated; options that take file
    # paths (e.g. -source) are not checked against the allowed root. Consider
    # an allow-list if this input is untrusted.
    cmd = [exe]
    if extra_args:
        cmd.extend(shlex.split(extra_args))
    cmd.append(str(f))

    try:
        p = subprocess.run(
            cmd,
            text=True,
            errors="replace",  # undecodable bytes must not abort the run
            capture_output=True,
            timeout=timeout_s,
            check=False,
        )
    except subprocess.TimeoutExpired as e:
        # TimeoutExpired carries bytes even when text=True was requested.
        return {
            "file": str(f),
            "command": cmd,
            "exit_code": None,
            "stdout": _as_text(e.stdout)[:MAX_CHARS],
            "stderr": _as_text(e.stderr)[:MAX_CHARS],
            "error": f"Timed out after {timeout_s} seconds",
        }

    full_stdout = _as_text(p.stdout)
    full_stderr = _as_text(p.stderr)
    # Parse before truncating so results past MAX_CHARS are still counted.
    parsed = parse_nusmv_output(stdout=full_stdout, stderr=full_stderr)

    return {
        "file": str(f),
        "command": cmd,
        "exit_code": p.returncode,
        "stdout": full_stdout[:MAX_CHARS],
        "stderr": full_stderr[:MAX_CHARS],
        "parsed": parsed,
    }


def run_nusmv_on_directory_fn(
    directory: str,
    extra_args: Optional[str] = None,
    timeout_s: int = DEFAULT_TIMEOUT_S,
    max_files: int = 50,
) -> Dict[str, Any]:
    """
    Run NuSMV sequentially on the .smv files found directly in ``directory``.

    Args:
        directory: Directory to scan (non-recursive).
        extra_args: Extra NuSMV options passed to every run.
        timeout_s: Per-file timeout in seconds.
        max_files: Maximum number of files to run; values below zero are
            treated as zero.

    Returns:
        {"directory", "count", "results", "total_found", "truncated"} where
        "count" is the number of files actually run, "total_found" the number
        of .smv files listed, and "truncated" is True when the ``max_files``
        limit left some files unprocessed.
    """
    listing = list_smv_files_fn(directory)
    all_files: List[str] = listing["files"]

    # Clamp so a negative limit cannot silently drop files from the end.
    limit = max(0, max_files)
    selected = all_files[:limit]

    results: List[Dict[str, Any]] = [
        run_nusmv_on_file_fn(fp, extra_args=extra_args, timeout_s=timeout_s)
        for fp in selected
    ]

    return {
        "directory": listing["directory"],
        "count": len(results),
        "results": results,
        # Make the cut-off visible so a summary is not mistaken for complete.
        "total_found": len(all_files),
        "truncated": len(all_files) > len(selected),
    }


# =============================================================================
# Tool wrappers (thin)
# =============================================================================

# Agent-facing wrapper registered under the tool name "list_smv_files"; it
# only forwards to list_smv_files_fn.
# NOTE(review): langchain's @tool presumably exposes the docstring below to the
# model as the tool description -- confirm before rewording it.
@tool("list_smv_files")
def list_smv_files(directory: str) -> Dict[str, Any]:
    """List .smv files under a directory (non-recursive)."""
    return list_smv_files_fn(directory)


# Agent-facing wrapper registered under the tool name "run_nusmv_on_file"; it
# only forwards its arguments to run_nusmv_on_file_fn.
# NOTE(review): langchain's @tool presumably exposes the docstring below to the
# model as the tool description -- confirm before rewording it.
@tool("run_nusmv_on_file")
def run_nusmv_on_file(
    smv_file: str,
    extra_args: Optional[str] = None,
    timeout_s: int = DEFAULT_TIMEOUT_S,
) -> Dict[str, Any]:
    """Run NuSMV on a single .smv file."""
    return run_nusmv_on_file_fn(smv_file, extra_args=extra_args, timeout_s=timeout_s)


# Agent-facing wrapper registered under the tool name "run_nusmv_on_directory";
# it only forwards its arguments to run_nusmv_on_directory_fn.
# NOTE(review): langchain's @tool presumably exposes the docstring below to the
# model as the tool description -- confirm before rewording it.
@tool("run_nusmv_on_directory")
def run_nusmv_on_directory(
    directory: str,
    extra_args: Optional[str] = None,
    timeout_s: int = DEFAULT_TIMEOUT_S,
    max_files: int = 50,
) -> Dict[str, Any]:
    """Run NuSMV on all .smv files under a directory."""
    return run_nusmv_on_directory_fn(directory, extra_args=extra_args, timeout_s=timeout_s, max_files=max_files)


# Tool objects handed to both the agent and its executor.
TOOLS = [list_smv_files, run_nusmv_on_file, run_nusmv_on_directory]

In [7]:
# =============================================================================
# Agent (OpenRouter + tool calling)
# =============================================================================

# System instructions, one entry per line of the final system message.
_SYSTEM_LINES = [
    "You are a formal verification assistant.",
    "When asked to analyze a directory of SMV files, call run_nusmv_on_directory.",
    "Use tool output only; do not fabricate NuSMV results.",
    "Summarize per file:",
    " - number of specs found",
    " - which specs are false",
    " - whether a counterexample was indicated",
    "Then provide a short overall summary.",
]

# Chat prompt: system instructions, the user's request, then the scratchpad
# slot that carries the agent's intermediate tool messages.
PROMPT = ChatPromptTemplate.from_messages(
    [
        ("system", "\n".join(_SYSTEM_LINES)),
        ("human", "{input}"),
        MessagesPlaceholder(variable_name="agent_scratchpad"),
    ]
)


def build_agent_executor(
    model: str = "mistralai/devstral-2512:free",
    temperature: float = 0.2,
    max_iterations: int = 6,
) -> AgentExecutor:
    """
    Build the tool-calling agent executor backed by OpenRouter.

    Args:
        model: OpenRouter model identifier passed to ChatOpenAI.
        temperature: Sampling temperature passed to ChatOpenAI.
        max_iterations: Iteration cap passed to the AgentExecutor.

    Returns:
        An AgentExecutor wired with TOOLS and PROMPT.

    Raises:
        RuntimeError: OPENROUTER_API_KEY is unset, blank, or still the
            "<API_KEY_HERE>" placeholder.
    """
    api_key = (os.getenv("OPENROUTER_API_KEY") or "").strip()
    if not api_key:
        raise RuntimeError("Missing OPENROUTER_API_KEY environment variable.")
    # Fail early on the notebook placeholder instead of letting the first API
    # call be rejected later.
    if api_key == "<API_KEY_HERE>":
        raise RuntimeError(
            "OPENROUTER_API_KEY is still the placeholder value; set a real key."
        )

    llm = ChatOpenAI(
        model=model,
        api_key=api_key,
        base_url="https://openrouter.ai/api/v1",
        temperature=temperature,
    )

    agent = create_tool_calling_agent(llm=llm, tools=TOOLS, prompt=PROMPT)

    return AgentExecutor(
        agent=agent,
        tools=TOOLS,
        verbose=True,
        handle_parsing_errors=True,
        max_iterations=max_iterations,
    )

In [8]:
def main() -> None:
    """
    Ask the agent to analyse every .smv file in a directory and print its answer.

    The directory comes from the SMV_DIR environment variable and defaults to
    ALLOWED_ROOT, so the default request always passes the tools' path guard.
    """
    print(f"Allowed root: {ALLOWED_ROOT}")
    print(f"NuSMV bin setting: {NUSMV_BIN}")

    # Defaulting to the CWD would be rejected by the tools whenever
    # AGENT_ALLOWED_ROOT points at a narrower directory.
    target_dir = os.getenv("SMV_DIR", str(ALLOWED_ROOT))
    user_request = (
        f"Analyze all .smv files in directory: {target_dir}. "
        f"Run NuSMV and summarize pass/fail specs per file. "
        f"Include an overall summary for the whole directory."
    )

    executor = build_agent_executor()
    result = executor.invoke({"input": user_request})

    print("\nASSISTANT OUTPUT:\n")
    print(result["output"])


# Run the analysis only when this code is executed as the main module.
if __name__ == "__main__":
    main()

Allowed root: /Users/omersubasi/Downloads/AgentforModelChecking
NuSMV bin setting: ./NuSMV


[1m> Entering new AgentExecutor chain...[0m
[32;1m[1;3m
Invoking: `run_nusmv_on_directory` with `{'directory': '/Users/omersubasi/Downloads/AgentforModelChecking', 'timeout_s': 60}`


[0m[38;5;200m[1;3m{'directory': '/Users/omersubasi/Downloads/AgentforModelChecking', 'count': 2, 'results': [{'file': '/Users/omersubasi/Downloads/AgentforModelChecking/mutex.smv', 'command': ['./NuSMV', '/Users/omersubasi/Downloads/AgentforModelChecking/mutex.smv'], 'exit_code': 0, 'stdout': '*** This is NuSMV 2.7.0 (compiled on Thu Oct 24 15:29:16 2024)\n*** Enabled addons are: compass\n*** For more information on NuSMV see <http://nusmv.fbk.eu>\n*** or email to <nusmv-users@list.fbk.eu>.\n*** Please report bugs to <Please report bugs to <nusmv-users@fbk.eu>>\n\n*** Copyright (c) 2010-2024, Fondazione Bruno Kessler\n\n*** This version of NuSMV is linked to the CUDD library version 2.4.1\n*** Copyright (c)