From 01957e087f08808b333ca1c7de2793b45dbdf72d Mon Sep 17 00:00:00 2001 From: aimable100 <129232709+aimable100@users.noreply.github.com> Date: Tue, 14 Apr 2026 14:43:33 -0700 Subject: [PATCH] Add Tenuo authorization contrib module MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds `temporalio.contrib.tenuo`, a SimplePlugin integration for Tenuo warrant-based authorization in Temporal workflows. The plugin (`TenuoPlugin`) wires client interceptors, worker interceptors, and workflow sandbox passthrough in a single line: from temporalio.contrib.tenuo import TenuoPlugin plugin = TenuoPlugin(config) client = await Client.connect("localhost:7233", plugins=[plugin]) Key design decisions: - Thin adapter: only TenuoPlugin, TENUO_PLUGIN_NAME, and ensure_tenuo_workflow_runner are exported from the contrib module. All other types (TenuoPluginConfig, EnvKeyResolver, etc.) are imported directly from tenuo.temporal. - No private imports: all tenuo.temporal internals used by the plugin are exposed through public lazy-loaded names. - No re-exports of external package types, matching the pattern established by openai_agents and other contrib modules. Files: - temporalio/contrib/tenuo/__init__.py — public API (3 exports) - temporalio/contrib/tenuo/_plugin.py — TenuoPlugin SimplePlugin subclass - temporalio/contrib/tenuo/README.md — multi-agent delegation example - tests/contrib/tenuo/test_tenuo.py — unit + live integration tests - tests/contrib/tenuo/test_tenuo_replay.py — record-and-replay tests - pyproject.toml — tenuo optional dependency --- pyproject.toml | 2 + temporalio/contrib/tenuo/README.md | 371 ++++++++++++++++++ temporalio/contrib/tenuo/__init__.py | 27 ++ temporalio/contrib/tenuo/_plugin.py | 180 +++++++++ tests/contrib/tenuo/__init__.py | 0 tests/contrib/tenuo/test_tenuo.py | 464 +++++++++++++++++++++++ tests/contrib/tenuo/test_tenuo_replay.py | 243 ++++++++++++ 7 files changed, 1287 insertions(+) create mode 100644 temporalio/contrib/tenuo/README.md create mode 100644 temporalio/contrib/tenuo/__init__.py create mode 100644 temporalio/contrib/tenuo/_plugin.py create mode 100644 tests/contrib/tenuo/__init__.py create mode 100644 tests/contrib/tenuo/test_tenuo.py create mode 100644 tests/contrib/tenuo/test_tenuo_replay.py diff --git a/pyproject.toml b/pyproject.toml index 6ea339047..0512c2024 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -31,6 +31,7 @@ pydantic = ["pydantic>=2.0.0,<3"] openai-agents = ["openai-agents>=0.3,<0.7", "mcp>=1.9.4, <2"] google-adk = ["google-adk>=1.27.0,<2"] langsmith = ["langsmith>=0.7.0,<0.8"] +tenuo = ["tenuo>=0.1.0b22"] lambda-worker-otel = [ "opentelemetry-api>=1.11.1,<2", "opentelemetry-sdk>=1.11.1,<2", @@ -84,6 +85,7 @@ dev = [ "opentelemetry-exporter-otlp-proto-grpc>=1.11.1,<2", "opentelemetry-semantic-conventions>=0.40b0,<1", "opentelemetry-sdk-extension-aws>=2.0.0,<3", + "tenuo>=0.1.0b22", ] [tool.poe.tasks] diff --git a/temporalio/contrib/tenuo/README.md b/temporalio/contrib/tenuo/README.md new file mode 100644 index 000000000..d2df4d99f --- /dev/null +++ b/temporalio/contrib/tenuo/README.md @@ -0,0 +1,371 @@ +# Tenuo Authorization for Temporal + +## Introduction + +Temporal workflows are a natural fit for AI agents: the workflow is the +reasoning loop, activities are the tools it calls, and child workflows are +sub-agents it delegates to. Tenuo adds cryptographic authorization to this +model so that every agent — and every sub-agent — can only do what its +warrant allows. + +``` +Control Plane + │ mints warrant: [lookup_customer, search_kb, call_llm, send_response, escalate] + │ + ▼ +Triage Agent (Workflow, 1-hour warrant, 5 tools) + ├── lookup_customer activity ← CRM read + ├── call_llm activity ← LLM decides next step + ├── send_response activity ← restricted to email + chat + │ + └──▶ Research Agent (Child Workflow, 60s warrant, 2 tools) + ├── call_llm activity ← pinned to gpt-4o-mini + └── search_kb activity ← scoped to "support" department +``` + +**What Tenuo adds to Temporal:** + +- **Warrant-scoped tool dispatch.** Each agent workflow carries a signed warrant + specifying which activities (tools) it can call and with what argument constraints. +- **Delegation with monotonic attenuation.** Sub-agent workflows receive narrower + warrants — capabilities can only shrink, never expand. A research agent cannot + call `send_response` even if the parent can. +- **Proof-of-Possession (PoP).** Every tool call is cryptographically signed, + proving the warrant holder initiated it. +- **Zero-trust by default.** Tools reject calls unless all arguments are explicitly + declared in the warrant. + +## Quick Start + +### Installation + +```bash +pip install 'temporalio[tenuo]' +``` + +Or if you already have `temporalio` installed: + +```bash +pip install tenuo +``` + +### Multi-Agent Support System + +A triage agent that looks up customer context, delegates research to a +sub-agent, and responds — each agent scoped to only the tools it needs. + +**Activities (tools):** + +```python +from temporalio import activity + +@activity.defn +async def lookup_customer(customer_id: str) -> dict: + """CRM lookup.""" + ... + +@activity.defn +async def search_knowledge_base(query: str, department: str) -> list[dict]: + """Semantic search over internal docs, scoped to a department.""" + ... + +@activity.defn +async def call_llm(prompt: str, model: str, max_tokens: int) -> str: + """LLM inference — warrant pins the model and token budget.""" + ... + +@activity.defn +async def send_response(channel: str, ticket_id: str, message: str) -> str: + """Send a customer response via the specified channel.""" + ... + +@activity.defn +async def escalate_to_human(ticket_id: str, reason: str, priority: str) -> str: + """Route the ticket to a human agent.""" + ... +``` + +**Research sub-agent (child workflow):** + +```python +from datetime import timedelta +from temporalio import workflow +from tenuo.temporal import tenuo_execute_activity + +from .activities import search_knowledge_base, call_llm + +@workflow.defn +class ResearchAgent: + """Sub-agent that can only search docs and call an LLM — nothing else.""" + + @workflow.run + async def run(self, question: str) -> str: + docs = await tenuo_execute_activity( + search_knowledge_base, + args=[question, "support"], + start_to_close_timeout=timedelta(seconds=15), + ) + return await tenuo_execute_activity( + call_llm, + args=[f"Summarize: {docs}", "gpt-4o-mini", 512], + start_to_close_timeout=timedelta(seconds=30), + ) +``` + +**Triage agent (parent workflow) — delegates to the research sub-agent:** + +```python +from datetime import timedelta + +from temporalio import workflow +from tenuo import Exact, AnyOf +from tenuo.temporal import tenuo_execute_activity, tenuo_execute_child_workflow + +from .activities import lookup_customer, call_llm, send_response +from .research_agent import ResearchAgent + +@workflow.defn +class TriageAgent: + @workflow.run + async def run(self, ticket_id: str, customer_id: str, question: str) -> str: + customer = await tenuo_execute_activity( + lookup_customer, + args=[customer_id], + start_to_close_timeout=timedelta(seconds=10), + ) + + # Delegate to research sub-agent with a narrower warrant: + # only search_knowledge_base + call_llm, 60-second TTL. + summary = await tenuo_execute_child_workflow( + ResearchAgent.run, + args=[question], + tools=["search_knowledge_base", "call_llm"], + constraints={ + "call_llm": {"model": Exact("gpt-4o-mini")}, + "search_knowledge_base": {"department": AnyOf(["support"])}, + }, + ttl_seconds=60, + ) + + answer = await tenuo_execute_activity( + call_llm, + args=[ + f"Customer: {customer['name']}\nResearch: {summary}\nQuestion: {question}", + "gpt-4o-mini", + 512, + ], + start_to_close_timeout=timedelta(seconds=30), + ) + + return await tenuo_execute_activity( + send_response, + args=["email", ticket_id, answer], + start_to_close_timeout=timedelta(seconds=10), + ) +``` + +> **Note:** The plugin configures sandbox passthrough for `tenuo` and `tenuo_core` +> automatically. You do **not** need `workflow.unsafe.imports_passed_through()`. + +**Worker setup:** + +```python +import asyncio +from temporalio.client import Client +from temporalio.worker import Worker +from temporalio.contrib.tenuo import TenuoPlugin + +from tenuo import SigningKey, Warrant, Exact, AnyOf, Wildcard +from tenuo.temporal import TenuoPluginConfig, EnvKeyResolver, execute_workflow_authorized + +from .activities import ( + lookup_customer, search_knowledge_base, + call_llm, send_response, escalate_to_human, +) +from .triage_agent import TriageAgent +from .research_agent import ResearchAgent + +async def main(): + # In production, load the key from env/vault instead of generating each time + signing_key = SigningKey.generate() + + plugin = TenuoPlugin(TenuoPluginConfig( + key_resolver=EnvKeyResolver(), + trusted_roots=[signing_key.public_key], + )) + + client = await Client.connect("localhost:7233", plugins=[plugin]) + + async with Worker( + client, + task_queue="support-agents", + workflows=[TriageAgent, ResearchAgent], + activities=[ + lookup_customer, search_knowledge_base, + call_llm, send_response, escalate_to_human, + ], + ): + # Mint a warrant for the triage agent — all five tools + warrant = ( + Warrant.mint_builder() + .capability("lookup_customer", customer_id=Wildcard()) + .capability("search_knowledge_base", + query=Wildcard(), + department=AnyOf(["support", "billing"]), + ) + .capability("call_llm", + prompt=Wildcard(), + model=Exact("gpt-4o-mini"), + max_tokens=Wildcard(), + ) + .capability("send_response", + channel=AnyOf(["email", "chat"]), + ticket_id=Wildcard(), + message=Wildcard(), + ) + .capability("escalate_to_human", + ticket_id=Wildcard(), + reason=Wildcard(), + priority=AnyOf(["low", "medium"]), + ) + .holder(signing_key.public_key) + .ttl(3600) + .mint(signing_key) + ) + + result = await execute_workflow_authorized( + client, + TriageAgent.run, + args=["TICKET-42", "cust-1001", "How do I reset my password?"], + warrant=warrant, + key_id="default", + id="triage-42", + task_queue="support-agents", + ) + print(result) + +asyncio.run(main()) +``` + +### What Happens at Each Level + +| Agent | Warrant scope | What it **can** do | What it **cannot** do | +|-------|--------------|--------------------|-----------------------| +| Triage Agent | 5 tools, 1-hour TTL | All tools, delegates to sub-agents | Call tools not in warrant | +| Research Agent | 2 tools, 60s TTL | `search_knowledge_base` (support only), `call_llm` (gpt-4o-mini) | `send_response`, `escalate_to_human`, `lookup_customer` | + +The research agent's warrant is automatically attenuated from the parent — it +**cannot** widen scope, switch to a more expensive model, or access billing +department docs. + +## How It Works + +### Plugin Setup + +`TenuoPlugin` is a Temporal `SimplePlugin` that automatically configures: + +1. **Client interceptor:** Injects warrant headers into workflow start requests. +2. **Worker interceptors:** Signs tool calls with PoP (outbound) and verifies authorization (inbound). +3. **Sandbox passthrough:** Adds `tenuo` and `tenuo_core` to the workflow sandbox passthrough list so the native extension loads correctly. + +```python +from temporalio.contrib.tenuo import TenuoPlugin +from tenuo.temporal import TenuoPluginConfig, EnvKeyResolver + +plugin = TenuoPlugin(TenuoPluginConfig( + key_resolver=EnvKeyResolver(), + trusted_roots=[issuer_pubkey], +)) + +client = await Client.connect("localhost:7233", plugins=[plugin]) +``` + +Register the plugin on `Client.connect(plugins=[...])` only. Workers built from that client inherit the plugin automatically — do not pass it again to `Worker(plugins=[...])`. + +### Constraint Types + +Warrants use a **closed-world (zero-trust) model** — every activity parameter must +be declared, even if unconstrained. Available constraint types: + +| Constraint | What it enforces | Example | +|------------|-----------------|---------| +| `Wildcard()` | Any value (parameter declared but unconstrained) | `prompt=Wildcard()` | +| `Exact(value)` | Must match exactly | `model=Exact("gpt-4o-mini")` | +| `AnyOf([...])` | Must be one of the listed values | `priority=AnyOf(["low", "medium"])` | +| `Subpath(prefix)` | String must start with prefix (path scoping) | `path=Subpath("/data/tenant-a")` | +| `Cidr(network)` | IP must be within a CIDR block (IPv4/IPv6) | `source_ip=Cidr("10.0.0.0/8")` | +| `UrlSafe(...)` | URL with allowed schemes, domains, no private IPs | `url=UrlSafe(allow_domains=["api.stripe.com"])` | +| `Range(min, max)` | Numeric bounds | `max_tokens=Range(1, 1024)` | + +See the [Tenuo documentation](https://tenuo.ai/docs) for the full list. + +### Key Resolution + +`EnvKeyResolver` loads signing keys from `TENUO_KEY_` environment variables. Keys are base64 or hex-encoded: + +```bash +export TENUO_KEY_default=$(python -c "from tenuo import SigningKey; print(SigningKey.generate().to_base64())") +``` + +The plugin automatically calls `preload_all()` at worker startup, caching all keys so that `resolve_sync()` never touches `os.environ` inside the workflow sandbox. + +For production, use `VaultKeyResolver`, `AWSSecretsManagerKeyResolver`, or `GCPSecretManagerKeyResolver` from `tenuo.temporal`. + +### Activity Summaries + +The outbound interceptor automatically sets a human-readable summary on every tool dispatch, visible in the Temporal Web UI: + +```text +[tenuo.TenuoTemporalPlugin] call_llm +[tenuo.TenuoTemporalPlugin] lookup_customer: triage cust-1001 +``` + +If you pass a `summary` to `tenuo_execute_activity`, it is preserved and prefixed with the plugin ID and tool name. + +### Replay Safety + +The integration is designed for Temporal replay determinism: + +- **PoP timestamps** use `workflow.now()`, not wall-clock time. +- **PoP signatures** are deterministic: same inputs always produce the same output. +- **No non-deterministic calls** (`os.urandom`, `random`, `uuid4`) in the workflow interceptor code path. +- **Key resolution** uses pre-cached keys inside the sandbox, never `os.environ`. + +## What the Plugin Handles + +| Feature | How | +|---------|-----| +| Sandbox passthrough | Adds `tenuo` and `tenuo_core` to workflow runner passthrough — automatic | +| Client interceptor | Creates `TenuoClientInterceptor` for warrant header injection — automatic | +| Worker interceptors | Registers outbound PoP signer and inbound authorization verifier — automatic | +| Activity auto-discovery | Populates `activity_fns` from the worker's activity list — automatic | +| Key preloading | Calls `preload_all()` on `EnvKeyResolver` at startup — automatic | +| Activity summaries | Prefixes activity summaries with `[tenuo.TenuoTemporalPlugin]` in Web UI — automatic | +| Replay safety | PoP signing uses `workflow.now()`; verified by record-and-replay tests | + +All other features (`execute_workflow_authorized`, `tenuo_execute_activity`, delegation, +`AuthorizedWorkflow`, key resolvers, etc.) live in the `tenuo` package and are imported +from `tenuo.temporal`. + +## Production + +For local development, warrants and keys are created inline (as shown above). +In production, [Tenuo Cloud](https://cloud.tenuo.ai) provides a managed control +plane for warrant issuance, key rotation, audit logging, and policy management. +Workers connect via a connect token — no changes to workflow or activity code: + +```bash +export TENUO_CONNECT_TOKEN="tcp_..." +``` + +Self-hosted deployments can use `VaultKeyResolver`, `AWSSecretsManagerKeyResolver`, +or `GCPSecretManagerKeyResolver` from `tenuo.temporal` for key management without +a managed control plane. + +## Further Reading + +- [Tenuo documentation](https://tenuo.ai/docs) +- [Tenuo Cloud](https://cloud.tenuo.ai) — managed control plane for warrant issuance, key rotation, and audit +- [Tenuo Temporal guide](https://tenuo.ai/docs/temporal) +- [Tenuo Temporal reference](https://tenuo.ai/docs/temporal-reference) +- [Tenuo GitHub](https://github.com/tenuo-ai/tenuo) diff --git a/temporalio/contrib/tenuo/__init__.py b/temporalio/contrib/tenuo/__init__.py new file mode 100644 index 000000000..8d8099bdc --- /dev/null +++ b/temporalio/contrib/tenuo/__init__.py @@ -0,0 +1,27 @@ +"""Tenuo warrant-based authorization for Temporal workflows. + +This module provides :class:`TenuoPlugin`, a Temporal :class:`~temporalio.plugin.SimplePlugin` +that wires Tenuo's warrant-based authorization into a Temporal client and worker. + +All other Tenuo types (``TenuoPluginConfig``, ``EnvKeyResolver``, +``execute_workflow_authorized``, ``tenuo_execute_activity``, etc.) should be +imported directly from ``tenuo.temporal``:: + + from temporalio.contrib.tenuo import TenuoPlugin + from tenuo.temporal import TenuoPluginConfig, EnvKeyResolver + +See the `Tenuo Temporal guide `_ for full +documentation. +""" + +from temporalio.contrib.tenuo._plugin import ( + TENUO_PLUGIN_NAME, + TenuoPlugin, + ensure_tenuo_workflow_runner, +) + +__all__ = [ + "TENUO_PLUGIN_NAME", + "TenuoPlugin", + "ensure_tenuo_workflow_runner", +] diff --git a/temporalio/contrib/tenuo/_plugin.py b/temporalio/contrib/tenuo/_plugin.py new file mode 100644 index 000000000..27044fa4d --- /dev/null +++ b/temporalio/contrib/tenuo/_plugin.py @@ -0,0 +1,180 @@ +"""Temporal SimplePlugin integration for Tenuo warrant-based authorization. + +Provides :class:`TenuoPlugin` which registers client + worker interceptors and +configures the workflow sandbox passthrough for ``tenuo`` / ``tenuo_core`` +(PyO3 native extension). + +Example:: + + from temporalio.client import Client + from temporalio.contrib.tenuo import TenuoPlugin + from tenuo.temporal import TenuoPluginConfig, EnvKeyResolver + + plugin = TenuoPlugin(TenuoPluginConfig( + key_resolver=EnvKeyResolver(), + trusted_roots=[issuer_pubkey], + )) + client = await Client.connect("localhost:7233", plugins=[plugin]) +""" + +from __future__ import annotations + +import dataclasses +import logging +from collections.abc import Callable, Sequence +from typing import TYPE_CHECKING, Any + +from tenuo.temporal import ( + TenuoClientInterceptor, + TenuoPlugin as _TenuoWorkerInterceptor, + TenuoPluginConfig, + build_activity_registry, + set_worker_config, + tenuo_internal_mint_activity, +) + +from temporalio.plugin import SimplePlugin + +if TYPE_CHECKING: + from temporalio.worker import WorkflowRunner + +_logger = logging.getLogger("tenuo.temporal") + +TENUO_PLUGIN_NAME = "tenuo.TenuoTemporalPlugin" + + +def ensure_tenuo_workflow_runner( + existing: WorkflowRunner | None, +) -> WorkflowRunner: + """Return a workflow runner with ``tenuo`` and ``tenuo_core`` sandbox passthrough. + + Args: + existing: The current workflow runner, or ``None``. + + Returns: + A workflow runner with tenuo passthrough modules added. + """ + from temporalio.worker.workflow_sandbox import ( + SandboxedWorkflowRunner, + SandboxRestrictions, + ) + + passthrough = ("tenuo", "tenuo_core") + if existing is None: + return SandboxedWorkflowRunner( + restrictions=SandboxRestrictions.default.with_passthrough_modules( + *passthrough + ) + ) + if isinstance(existing, SandboxedWorkflowRunner): + return dataclasses.replace( + existing, + restrictions=existing.restrictions.with_passthrough_modules(*passthrough), + ) + return existing + + +class TenuoPlugin(SimplePlugin): + """Temporal plugin for Tenuo warrant-based authorization and PoP enforcement. + + Configures: + + - **Client:** :class:`~tenuo.temporal.TenuoClientInterceptor` for warrant + header injection and workflow-ID binding. + - **Worker:** Worker interceptors for outbound PoP signing and inbound + activity authorization. + - **Workflow runner:** Sandbox passthrough for ``tenuo`` and ``tenuo_core``. + + After construction, :attr:`client_interceptor` provides the interceptor + instance registered on the client. + + Example:: + + from temporalio.client import Client + from temporalio.worker import Worker + from temporalio.contrib.tenuo import TenuoPlugin + from tenuo.temporal import TenuoPluginConfig, EnvKeyResolver + + plugin = TenuoPlugin(TenuoPluginConfig( + key_resolver=EnvKeyResolver(), + trusted_roots=[issuer_pubkey], + )) + + client = await Client.connect("localhost:7233", plugins=[plugin]) + async with Worker( + client, + task_queue="my-queue", + workflows=[MyWorkflow], + activities=[my_activity], + ): + pass + + .. warning:: + + Register this plugin on ``Client.connect(plugins=[plugin])`` only. + Workers built from that client inherit the plugin automatically. + Passing the same instance to ``Worker(plugins=[...])`` double-registers + interceptors. + """ + + client_interceptor: TenuoClientInterceptor + """The client interceptor wired into this plugin.""" + + def __init__( + self, + config: TenuoPluginConfig, + *, + client_interceptor: TenuoClientInterceptor | None = None, + ) -> None: + """Create a Tenuo plugin with the given configuration. + + Args: + config: Worker-level configuration for authorization enforcement. + client_interceptor: Optional pre-existing client interceptor. If + ``None``, a new one is created. + """ + worker_interceptor = _TenuoWorkerInterceptor(config) + self.client_interceptor = client_interceptor or TenuoClientInterceptor() + self._tenuo_config = config + self._tenuo_worker_configured = False + + def _add_activities( + activities: Sequence[Callable[..., Any]] | None, + ) -> Sequence[Callable[..., Any]]: + """Append internal activities and auto-discover activity_fns.""" + if self._tenuo_worker_configured: + raise RuntimeError( + "Duplicate Tenuo plugin registration: the same TenuoPlugin " + "instance was used to configure_worker more than once. " + "Create separate instances for each worker." + ) + self._tenuo_worker_configured = True + + set_worker_config(config) + existing = list(activities or []) + + if not config.activity_fns and existing: + config.activity_fns = list(existing) + config._activity_registry = build_activity_registry(config.activity_fns) + _logger.info( + "Tenuo: auto-discovered %d activity function(s)", + len(config.activity_fns), + ) + + _preload_all = getattr(config.key_resolver, "preload_all", None) + if _preload_all is not None: + try: + _preload_all() + except Exception as exc: + _logger.warning("key preload failed: %s", exc) + + if tenuo_internal_mint_activity is not None: + existing.append(tenuo_internal_mint_activity) + return existing + + super().__init__( + TENUO_PLUGIN_NAME, + workflow_runner=ensure_tenuo_workflow_runner, + activities=_add_activities, + interceptors=[self.client_interceptor, worker_interceptor], + ) diff --git a/tests/contrib/tenuo/__init__.py b/tests/contrib/tenuo/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/tests/contrib/tenuo/test_tenuo.py b/tests/contrib/tenuo/test_tenuo.py new file mode 100644 index 000000000..e99b9cbd1 --- /dev/null +++ b/tests/contrib/tenuo/test_tenuo.py @@ -0,0 +1,464 @@ +"""Tests for temporalio.contrib.tenuo.""" + +import inspect +from datetime import timedelta +from unittest.mock import MagicMock, patch + +import pytest + +import tenuo +from tenuo.temporal import ( + EnvKeyResolver, + TenuoPluginConfig, + execute_workflow_authorized, + start_workflow_authorized, + tenuo_execute_activity, +) +from tenuo.temporal._interceptors import _TenuoWorkflowOutboundInterceptor + +from temporalio import activity, workflow +from temporalio.client import Client +from temporalio.contrib.tenuo import TenuoPlugin +from temporalio.contrib.tenuo._plugin import ensure_tenuo_workflow_runner +from temporalio.testing import WorkflowEnvironment +from temporalio.worker import Worker +from temporalio.worker.workflow_sandbox import ( + SandboxedWorkflowRunner, + SandboxRestrictions, +) + + +# --------------------------------------------------------------------------- +# Fixtures +# --------------------------------------------------------------------------- + + +@pytest.fixture +def signing_key(): + """Generate an ephemeral signing key for tests.""" + return tenuo.SigningKey.generate() + + +@pytest.fixture +def config(signing_key): + """Minimal TenuoPluginConfig with an EnvKeyResolver.""" + resolver = EnvKeyResolver() + resolver._key_cache["test-key"] = signing_key + return TenuoPluginConfig( + key_resolver=resolver, + trusted_roots=[signing_key.public_key], + ) + + +# --------------------------------------------------------------------------- +# Plugin construction +# --------------------------------------------------------------------------- + + +class TestPluginConstruction: + """TenuoPlugin initializes correctly as a SimplePlugin.""" + + def test_creates_client_interceptor(self, config): + """Plugin creates a client interceptor.""" + plugin = TenuoPlugin(config) + assert plugin.client_interceptor is not None + + def test_accepts_external_client_interceptor(self, config): + """Plugin accepts an externally provided client interceptor.""" + from tenuo.temporal import TenuoClientInterceptor + + ext = TenuoClientInterceptor() + plugin = TenuoPlugin(config, client_interceptor=ext) + assert plugin.client_interceptor is ext + + def test_duplicate_worker_flag(self, config): + """Plugin tracks whether it has been configured for a worker.""" + plugin = TenuoPlugin(config) + assert plugin._tenuo_worker_configured is False + + def test_duplicate_registration_raises(self, config): + """Using the same plugin for two configure_worker calls raises RuntimeError.""" + plugin = TenuoPlugin(config) + worker_config: dict = {"activities": []} + plugin.configure_worker(worker_config) + assert plugin._tenuo_worker_configured is True + + with pytest.raises(RuntimeError, match="Duplicate Tenuo plugin"): + plugin.configure_worker({"activities": []}) + + +# --------------------------------------------------------------------------- +# Sandbox passthrough +# --------------------------------------------------------------------------- + + +class TestSandboxPassthrough: + """_ensure_tenuo_workflow_runner adds tenuo/tenuo_core passthrough.""" + + def test_creates_sandboxed_runner_from_none(self): + """When no runner exists, creates one with passthrough.""" + runner = ensure_tenuo_workflow_runner(None) + assert isinstance(runner, SandboxedWorkflowRunner) + + def test_adds_passthrough_to_existing_sandboxed_runner(self): + """Adds passthrough modules to an existing SandboxedWorkflowRunner.""" + existing = SandboxedWorkflowRunner(restrictions=SandboxRestrictions.default) + runner = ensure_tenuo_workflow_runner(existing) + assert isinstance(runner, SandboxedWorkflowRunner) + + def test_returns_unsandboxed_runner_unchanged(self): + """Non-sandboxed runners are returned as-is.""" + mock_runner = MagicMock() + result = ensure_tenuo_workflow_runner(mock_runner) + assert result is mock_runner + + +# --------------------------------------------------------------------------- +# Replay safety — determinism properties +# --------------------------------------------------------------------------- + + +class TestReplaySafety: + """Workflow interceptor code is deterministic and safe for replay.""" + + def test_uses_workflow_now_not_time_time(self): + """Outbound interceptor uses workflow.now() for PoP timestamps.""" + source = inspect.getsource(_TenuoWorkflowOutboundInterceptor) + assert "workflow.now()" in source + assert "time.time()" not in source + + def test_uses_workflow_now_not_datetime_now(self): + """Outbound interceptor does not call datetime.now().""" + source = inspect.getsource(_TenuoWorkflowOutboundInterceptor) + assert "datetime.now()" not in source + + def test_no_non_deterministic_calls(self): + """No non-deterministic stdlib calls in the workflow interceptor.""" + source = inspect.getsource(_TenuoWorkflowOutboundInterceptor) + forbidden = ["os.urandom", "random.random", "random.randint", "uuid4()"] + violations = [p for p in forbidden if p in source] + assert not violations, f"Non-deterministic calls found: {violations}" + + def test_no_time_sleep(self): + """Interceptor must not call time.sleep() — blocks the event loop.""" + source = inspect.getsource(_TenuoWorkflowOutboundInterceptor) + assert "time.sleep" not in source, ( + "time.sleep() in workflow interceptor blocks the event loop " + "and breaks Temporal replay." + ) + + def test_no_threading_in_interceptor(self): + """Interceptor must not spawn threads — non-deterministic under replay.""" + source = inspect.getsource(_TenuoWorkflowOutboundInterceptor) + forbidden = ["threading.Thread(", "Thread("] + violations = [p for p in forbidden if p in source] + assert not violations, ( + f"Threading calls found in workflow interceptor: {violations}. " + "Spawning threads is non-deterministic under Temporal replay." + ) + + def test_pop_deterministic_with_fixed_timestamp(self, signing_key): + """Same inputs produce the same PoP signature.""" + warrant = ( + tenuo.Warrant.mint_builder() + .tools(["read_file"]) + .ttl(3600) + .mint(signing_key) + ) + ts = 1704110400 + sig1 = warrant.sign(signing_key, "read_file", {"path": "/tmp"}, ts) + sig2 = warrant.sign(signing_key, "read_file", {"path": "/tmp"}, ts) + assert sig1 == sig2 + + def test_pop_differs_with_different_timestamps(self, signing_key): + """Different timestamps produce different PoP signatures.""" + warrant = ( + tenuo.Warrant.mint_builder() + .tools(["read_file"]) + .ttl(3600) + .mint(signing_key) + ) + sig1 = warrant.sign(signing_key, "read_file", {"path": "/tmp"}, 1704110400) + sig2 = warrant.sign(signing_key, "read_file", {"path": "/tmp"}, 1704110460) + assert sig1 != sig2 + + +# --------------------------------------------------------------------------- +# EnvKeyResolver sandbox safety +# --------------------------------------------------------------------------- + + +class TestEnvKeyResolverSandbox: + """EnvKeyResolver uses cached keys inside the workflow sandbox.""" + + def test_resolve_sync_uses_cache(self, signing_key): + """resolve_sync returns cached key without os.environ access.""" + resolver = EnvKeyResolver() + resolver._key_cache["cached-key"] = signing_key + + with patch.dict("os.environ", {}, clear=True): + resolved = resolver.resolve_sync("cached-key") + + assert resolved is not None + + def test_preload_all_caches_env_keys(self, signing_key, monkeypatch): + """preload_all() reads TENUO_KEY_* vars into cache.""" + import base64 + + key_b64 = base64.b64encode(signing_key.secret_key_bytes()).decode() + monkeypatch.setenv("TENUO_KEY_mykey", key_b64) + + resolver = EnvKeyResolver() + resolver.preload_all() + + assert "mykey" in resolver._key_cache + + +# --------------------------------------------------------------------------- +# Re-exports +# --------------------------------------------------------------------------- + + +class TestReExports: + """Public API is accessible from temporalio.contrib.tenuo.""" + + def test_plugin_importable(self): + """TenuoPlugin is importable from contrib.""" + from temporalio.contrib.tenuo import TenuoPlugin as P + + assert P is not None + + def test_plugin_name_importable(self): + """TENUO_PLUGIN_NAME is importable from contrib.""" + from temporalio.contrib.tenuo import TENUO_PLUGIN_NAME + + assert TENUO_PLUGIN_NAME == "tenuo.TenuoTemporalPlugin" + + def test_ensure_runner_importable(self): + """ensure_tenuo_workflow_runner is importable from contrib.""" + from temporalio.contrib.tenuo import ensure_tenuo_workflow_runner + + assert ensure_tenuo_workflow_runner is not None + + def test_no_tenuo_temporal_reexports(self): + """Contrib does not re-export tenuo.temporal types.""" + import temporalio.contrib.tenuo as mod + + assert not hasattr(mod, "TenuoPluginConfig") + assert not hasattr(mod, "EnvKeyResolver") + assert not hasattr(mod, "execute_workflow_authorized") + assert not hasattr(mod, "tenuo_execute_activity") + + +# --------------------------------------------------------------------------- +# Live integration — full warrant → PoP → authorize → reject flow +# --------------------------------------------------------------------------- + + +@activity.defn +async def lookup_customer(customer_id: str) -> dict: + """Fetch customer record — authorized by warrant.""" + return {"id": customer_id, "name": "Alice", "plan": "pro"} + + +@activity.defn +async def search_knowledge_base(query: str, department: str) -> str: + """Search internal docs — authorized by warrant.""" + return f"docs for '{query}' in {department}" + + +@activity.defn +async def call_llm(prompt: str, model: str, max_tokens: int) -> str: + """Call an LLM — warrant pins the model.""" + return f"LLM response (model={model})" + + +@activity.defn +async def send_response(channel: str, ticket_id: str, message: str) -> str: + """Send a customer response — warrant restricts channels.""" + return f"sent via {channel}: {ticket_id}" + + +@activity.defn +async def drop_customer_table(confirm: str) -> str: + """Destructive activity NOT covered by the warrant.""" + return "table dropped" + + +@workflow.defn +class SupportTriageWorkflow: + """Agent workflow: look up customer, search docs, respond via LLM.""" + + @workflow.run + async def run(self, customer_id: str) -> str: + """Run the workflow.""" + customer = await tenuo_execute_activity( + lookup_customer, + args=[customer_id], + start_to_close_timeout=timedelta(seconds=10), + ) + return f"Helped {customer['name']}" + + +@workflow.defn +class UnauthorizedDropWorkflow: + """Workflow that attempts to call an activity NOT in the warrant.""" + + @workflow.run + async def run(self, confirm: str) -> str: + """Run the workflow.""" + return await tenuo_execute_activity( + drop_customer_table, + args=[confirm], + start_to_close_timeout=timedelta(seconds=10), + ) + + +ALL_ACTIVITIES = [ + lookup_customer, + search_knowledge_base, + call_llm, + send_response, + drop_customer_table, +] + + +def _make_plugin_and_warrant(signing_key): + """Create a plugin and warrant scoped to the support triage tools.""" + resolver = EnvKeyResolver() + resolver._key_cache["test-key"] = signing_key + + config = TenuoPluginConfig( + key_resolver=resolver, + trusted_roots=[signing_key.public_key], + ) + plugin = TenuoPlugin(config) + + warrant = ( + tenuo.Warrant.mint_builder() + .capability("lookup_customer") + .capability("search_knowledge_base") + .capability("call_llm") + .capability("send_response") + .holder(signing_key.public_key) + .ttl(3600) + .mint(signing_key) + ) + return plugin, warrant + + +@pytest.mark.timeout(120) +async def test_authorized_activity_succeeds(): + """Warrant covers the activity — support triage succeeds with PoP.""" + signing_key = tenuo.SigningKey.generate() + plugin, warrant = _make_plugin_and_warrant(signing_key) + + async with await WorkflowEnvironment.start_local() as env: + client = Client( + env.client.service_client, + namespace=env.client.namespace, + data_converter=env.client.data_converter, + plugins=[plugin], + ) + + async with Worker( + client, + task_queue="triage-test-queue", + workflows=[SupportTriageWorkflow], + activities=ALL_ACTIVITIES, + ): + result = await execute_workflow_authorized( + client=client, + workflow_run_fn=SupportTriageWorkflow.run, + args=["cust-1001"], + warrant=warrant, + key_id="test-key", + workflow_id="triage-wf-1", + task_queue="triage-test-queue", + ) + assert result == "Helped Alice" + + +@pytest.mark.timeout(120) +async def test_start_workflow_authorized(): + """start_workflow_authorized starts the workflow and returns a handle.""" + signing_key = tenuo.SigningKey.generate() + plugin, warrant = _make_plugin_and_warrant(signing_key) + + async with await WorkflowEnvironment.start_local() as env: + client = Client( + env.client.service_client, + namespace=env.client.namespace, + data_converter=env.client.data_converter, + plugins=[plugin], + ) + + async with Worker( + client, + task_queue="triage-start-queue", + workflows=[SupportTriageWorkflow], + activities=ALL_ACTIVITIES, + ): + handle = await start_workflow_authorized( + client=client, + workflow_run_fn=SupportTriageWorkflow.run, + args=["cust-1001"], + warrant=warrant, + key_id="test-key", + workflow_id="triage-start-wf-1", + task_queue="triage-start-queue", + ) + result = await handle.result() + assert result == "Helped Alice" + + +@pytest.mark.timeout(120) +async def test_unauthorized_activity_is_non_retryable(): + """Warrant does NOT cover drop_customer_table — non-retryable ApplicationError. + + Authorization failures must be non-retryable so Temporal doesn't retry the + workflow task forever. This test verifies the full chain: the outbound + interceptor detects that ``drop_customer_table`` is not in the warrant, + wraps the error as ``ApplicationError(non_retryable=True)``, and the + workflow surfaces it as a ``WorkflowFailureError`` whose cause is an + ``ApplicationError`` with the non-retryable flag set. + """ + signing_key = tenuo.SigningKey.generate() + plugin, warrant = _make_plugin_and_warrant(signing_key) + + async with await WorkflowEnvironment.start_local() as env: + client = Client( + env.client.service_client, + namespace=env.client.namespace, + data_converter=env.client.data_converter, + plugins=[plugin], + ) + + async with Worker( + client, + task_queue="triage-deny-queue", + workflows=[UnauthorizedDropWorkflow], + activities=ALL_ACTIVITIES, + ): + from temporalio.client import WorkflowFailureError + from temporalio.exceptions import ApplicationError + + with pytest.raises(WorkflowFailureError) as exc_info: + await execute_workflow_authorized( + client=client, + workflow_run_fn=UnauthorizedDropWorkflow.run, + args=["yes"], + warrant=warrant, + key_id="test-key", + workflow_id="triage-deny-wf-1", + task_queue="triage-deny-queue", + ) + + cause = exc_info.value.cause + assert isinstance(cause, ApplicationError), ( + f"Expected ApplicationError, got {type(cause).__name__}" + ) + assert cause.non_retryable, ( + "Authorization failures must be non-retryable to prevent " + "Temporal from retrying the workflow task indefinitely" + ) diff --git a/tests/contrib/tenuo/test_tenuo_replay.py b/tests/contrib/tenuo/test_tenuo_replay.py new file mode 100644 index 000000000..f9b6882af --- /dev/null +++ b/tests/contrib/tenuo/test_tenuo_replay.py @@ -0,0 +1,243 @@ +"""Replay safety tests for temporalio.contrib.tenuo. + +Proves that replaying the recorded history of a completed warranted workflow +does NOT produce non-determinism errors or repeated side effects. + +Each test follows the same pattern the Temporal review team expects: + +1. Run an authorized workflow to completion (warrant + PoP signing). +2. Fetch the workflow history via ``handle.fetch_history()``. +3. Create a **fresh** ``TenuoPlugin`` instance (simulating a restarted worker). +4. Replay the history with ``Replayer(workflows=[...], plugins=[replay_plugin])``. +5. Assert ``replay_failure is None`` — no non-determinism errors. + +This validates that: +- PoP signatures generated via ``tenuo_execute_activity`` (which uses + ``workflow.now()`` for timestamps) are deterministic across execution and + replay. +- Multiple sequential PoP-signed activity calls replay in the same order. +- No side effects (network calls, randomness, wall-clock reads) are repeated. +""" + +from datetime import timedelta + +import pytest + +import tenuo +from tenuo.temporal import ( + KeyResolver, + TenuoPluginConfig, + execute_workflow_authorized, + tenuo_execute_activity, +) + +from temporalio import activity, workflow +from temporalio.client import Client +from temporalio.contrib.tenuo import TenuoPlugin +from temporalio.testing import WorkflowEnvironment +from temporalio.worker import Replayer, Worker + + +KEY_ID = "replay-key" + + +class DictKeyResolver(KeyResolver): + """Pre-loaded key resolver for tests (no os.environ access).""" + + def __init__(self, keys: dict) -> None: + """Initialize with a dict of key_id -> SigningKey.""" + self.keys = keys + + async def resolve(self, key_id: str) -> tenuo.SigningKey: + """Resolve a key by ID.""" + return self.resolve_sync(key_id) + + def resolve_sync(self, key_id: str) -> tenuo.SigningKey: + """Resolve a key by ID synchronously.""" + if key_id not in self.keys: + raise ValueError(f"Key {key_id!r} not found") + return self.keys[key_id] + + +# --------------------------------------------------------------------------- +# Activities — realistic agentic tools +# --------------------------------------------------------------------------- + + +@activity.defn +async def lookup_customer(customer_id: str) -> dict: + """Fetch customer record from the CRM.""" + return {"id": customer_id, "name": "Alice", "plan": "pro"} + + +@activity.defn +async def call_llm(prompt: str, model: str, max_tokens: int) -> str: + """Generate a response from an LLM.""" + return f"LLM response (model={model})" + + +# --------------------------------------------------------------------------- +# Workflows — exercise tenuo_execute_activity (PoP signing on hot path) +# --------------------------------------------------------------------------- + + +@workflow.defn +class SingleToolWorkflow: + """Single authorized activity call with PoP signing.""" + + @workflow.run + async def run(self, customer_id: str) -> str: + """Look up a customer — one PoP signature generated.""" + result = await tenuo_execute_activity( + lookup_customer, + args=[customer_id], + start_to_close_timeout=timedelta(seconds=10), + ) + return result["name"] + + +@workflow.defn +class MultiToolWorkflow: + """Two sequential authorized activities — PoP order must be deterministic.""" + + @workflow.run + async def run(self, customer_id: str) -> str: + """Look up customer then call LLM — two PoP signatures in order.""" + customer = await tenuo_execute_activity( + lookup_customer, + args=[customer_id], + start_to_close_timeout=timedelta(seconds=10), + ) + return await tenuo_execute_activity( + call_llm, + args=[f"Help {customer['name']}", "gpt-4o-mini", 256], + start_to_close_timeout=timedelta(seconds=30), + ) + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + + +def _make_config(signing_key: tenuo.SigningKey) -> TenuoPluginConfig: + return TenuoPluginConfig( + key_resolver=DictKeyResolver({KEY_ID: signing_key}), + trusted_roots=[signing_key.public_key], + ) + + +async def _record_and_replay( + signing_key: tenuo.SigningKey, + workflow_cls: type, + workflow_input: str, + workflow_id: str, + task_queue: str, + activities: list, + tools: list[str], +) -> None: + """Run a warranted workflow, capture history, replay with a fresh plugin. + + Steps: + 1. Start a local Temporal server. + 2. Run the workflow to completion via execute_workflow_authorized(). + 3. Fetch the completed workflow's history. + 4. Create a brand-new TenuoPlugin (fresh interceptor state). + 5. Replay the history through Replayer and assert no failure. + """ + # -- Step 1-3: record ------------------------------------------------------- + plugin = TenuoPlugin(_make_config(signing_key)) + + warrant = ( + tenuo.Warrant.mint_builder() + .tools(tools) + .holder(signing_key.public_key) + .ttl(3600) + .mint(signing_key) + ) + + async with await WorkflowEnvironment.start_local() as env: + client = Client( + env.client.service_client, + namespace=env.client.namespace, + data_converter=env.client.data_converter, + plugins=[plugin], + ) + + async with Worker( + client, + task_queue=task_queue, + workflows=[workflow_cls], + activities=activities, + ): + result = await execute_workflow_authorized( + client=client, + workflow_run_fn=workflow_cls.run, + args=[workflow_input], + warrant=warrant, + key_id=KEY_ID, + workflow_id=workflow_id, + task_queue=task_queue, + ) + assert isinstance(result, str), f"Expected str result, got {type(result)}" + + handle = client.get_workflow_handle(workflow_id) + history = await handle.fetch_history() + + # -- Step 4-5: replay with fresh plugin ------------------------------------ + replay_plugin = TenuoPlugin(_make_config(signing_key)) + + replay_result = await Replayer( + workflows=[workflow_cls], + plugins=[replay_plugin], + ).replay_workflow(history, raise_on_replay_failure=False) + + assert replay_result.replay_failure is None, ( + f"Replay produced a non-determinism error, meaning the TenuoPlugin " + f"interceptor generated different commands on replay than during the " + f"original execution:\n{replay_result.replay_failure}" + ) + + +# --------------------------------------------------------------------------- +# Tests +# --------------------------------------------------------------------------- + + +@pytest.mark.timeout(120) +async def test_single_tool_replay() -> None: + """Record a customer lookup workflow, replay it — no non-determinism. + + Exercises the full PoP signing path: tenuo_execute_activity generates a PoP + header using workflow.now() as the timestamp. On replay, Temporal feeds the + same recorded timestamp back, so the interceptor must produce identical + commands. + """ + await _record_and_replay( + signing_key=tenuo.SigningKey.generate(), + workflow_cls=SingleToolWorkflow, + workflow_input="cust-1001", + workflow_id="replay-single-1", + task_queue="replay-single-queue", + activities=[lookup_customer, call_llm], + tools=["lookup_customer"], + ) + + +@pytest.mark.timeout(120) +async def test_multi_tool_replay_ordering() -> None: + """Record a lookup + LLM workflow, replay it — PoP order is deterministic. + + Two sequential tenuo_execute_activity calls each generate a separate PoP + signature with a (potentially different) workflow.now() timestamp. Replay + must produce commands in the same order with the same PoP payloads. + """ + await _record_and_replay( + signing_key=tenuo.SigningKey.generate(), + workflow_cls=MultiToolWorkflow, + workflow_input="cust-2002", + workflow_id="replay-multi-1", + task_queue="replay-multi-queue", + activities=[lookup_customer, call_llm], + tools=["lookup_customer", "call_llm"], + )