diff --git a/packages/agentcore-strands/agent-container/container-sources/_boot_assert.py b/packages/agentcore-strands/agent-container/container-sources/_boot_assert.py index fcda01216..f7201c7e4 100644 --- a/packages/agentcore-strands/agent-container/container-sources/_boot_assert.py +++ b/packages/agentcore-strands/agent-container/container-sources/_boot_assert.py @@ -43,6 +43,7 @@ "server", "skill_dispatcher", "skill_inputs", + "skill_meta_tool", "skill_runner", "skill_session_pool", "update_agent_name_tool", diff --git a/packages/agentcore-strands/agent-container/container-sources/skill_meta_tool.py b/packages/agentcore-strands/agent-container/container-sources/skill_meta_tool.py new file mode 100644 index 000000000..0a1574743 --- /dev/null +++ b/packages/agentcore-strands/agent-container/container-sources/skill_meta_tool.py @@ -0,0 +1,314 @@ +"""The single `Skill(name, args)` meta-tool. + +This is the invocation surface every skill-with-scripts goes through once +U6 flips the cutover. The model calls ``Skill("sales-prep", {...})``; this +module validates the name against the per-session allowlist, then either: + + * hands off to ``skill_dispatcher.dispatch_skill_script`` when the bundle + ships a ``scripts/`` directory, or + * returns the SKILL.md body as a string when the bundle is pure-context + (no scripts — the model consumes the SKILL.md inline). + +Why a meta-tool instead of per-skill named tools: + 4 enterprises × 100+ agents × ~5 templates ⇒ hundreds of skills per + session if each were a registered tool. Strands' tool registry would + swell beyond the model's token budget for tool-use schemas. The + meta-tool has a fixed schema (``name`` + ``args``); the AgentSkills + plugin does Level-1 progressive disclosure into the system prompt so + the model knows what's available without paying the per-schema cost. + +Why not AgentSkills' own ``skills`` invocation tool: + AgentSkills from the Strands SDK registers a built-in tool named + ``skills`` that reads ``SKILL.md`` on demand. It overlaps with our + meta-tool in a confusing way — two mechanisms to invoke a skill, with + different semantics for scripts. V1 keeps AgentSkills' disclosure + side and suppresses its invocation side; our ``Skill`` meta-tool is + the sole invocation path (plan #007 §Key Technical Decisions). + +Session allowlist invariant (plan R6/R7): + ``tenant_skills ∩ template_skills ∩ ¬template_blocks ∩ ¬tenant_kill_switches`` + A template cannot widen past what the tenant enabled. ``allowed-tools`` + frontmatter on the skill intersects with the session's tool allowlist + at registration time — narrow-only, never widens. +""" + +from __future__ import annotations + +import logging +from collections.abc import Awaitable, Callable +from dataclasses import dataclass, field +from typing import Any, Protocol + +from skill_dispatcher import ( + SkillDispatchError, + SkillNotFound, + TurnCounters, + dispatch_skill_script, +) +from skill_session_pool import SkillSessionPool + +logger = logging.getLogger(__name__) + + +class SkillUnauthorized(SkillDispatchError): + """Slug is in the catalog but not in this session's allowlist. + + Distinct from ``SkillNotFound`` — the model should not learn that a + catalog slug exists if the current session can't invoke it. The + error message stays generic at the tool surface; the full context + goes to the audit log. + """ + + +@dataclass(frozen=True) +class AllowlistInput: + """Raw inputs the session allowlist intersects at Agent(tools=...) time.""" + + tenant_skills: frozenset[str] + template_skills: frozenset[str] + template_blocked_tools: frozenset[str] + tenant_disabled_builtin_tools: frozenset[str] + + +@dataclass(frozen=True) +class SessionAllowlist: + """Resolved set of skill slugs the current session may invoke. + + The intersection is pre-computed once at Agent construction — runtime + lookups are O(1). ``warnings`` captures slugs the template *named* that + got filtered out, so operators can spot misconfiguration without the + dispatcher raising on every call. + """ + + slugs: frozenset[str] + warnings: tuple[str, ...] = () + + def contains(self, slug: str) -> bool: + return slug in self.slugs + + @classmethod + def from_inputs(cls, inputs: AllowlistInput) -> "SessionAllowlist": + # Start with template slugs — the template author decides what the + # agent *can* see. Narrow by tenant-library (tenant hasn't installed + # it? not available), template-blocks (explicit opt-out), and + # tenant-kill-switches (tenant-wide disable; strongest precedence). + resolved: set[str] = set(inputs.template_skills) & set(inputs.tenant_skills) + blocked = inputs.template_blocked_tools | inputs.tenant_disabled_builtin_tools + resolved -= blocked + + warnings: list[str] = [] + missing_from_tenant = inputs.template_skills - inputs.tenant_skills + if missing_from_tenant: + warnings.append( + f"template names {len(missing_from_tenant)} skill(s) the tenant has " + f"not installed: {sorted(missing_from_tenant)}" + ) + unblocked_by_tenant = ( + inputs.template_skills & inputs.tenant_disabled_builtin_tools + ) + if unblocked_by_tenant: + warnings.append( + f"tenant kill-switch filters {len(unblocked_by_tenant)} template-named " + f"skill(s): {sorted(unblocked_by_tenant)}" + ) + + return cls(slugs=frozenset(resolved), warnings=tuple(warnings)) + + +@dataclass +class SkillMetaResponse: + """Shape the meta-tool returns to the Strands runtime.""" + + kind: str # 'script-result' | 'skill-md-body' + slug: str + result: Any = None + body: str | None = None + duration_ms: int | None = None + + def to_dict(self) -> dict[str, Any]: + out: dict[str, Any] = {"kind": self.kind, "slug": self.slug} + if self.result is not None: + out["result"] = self.result + if self.body is not None: + out["body"] = self.body + if self.duration_ms is not None: + out["duration_ms"] = self.duration_ms + return out + + +class BundleLoader(Protocol): + """Extends skill_dispatcher.SkillBundleLoader with SKILL.md body access.""" + + def load_bundle(self, slug: str) -> Any: ... + + def has_scripts(self, slug: str) -> bool: + """True if the bundle ships ``scripts//entrypoint.py``.""" + ... + + def skill_md_body(self, slug: str) -> str: + """Return the SKILL.md body for pure-context skills.""" + ... + + +@dataclass +class SkillMetaContext: + """Per-turn wiring passed to every Skill() call. + + Constructed once at Agent(tools=...) construction and captured by the + tool closure. Split into its own type so test harnesses can swap + individual parts without rebuilding the whole tool. + """ + + tenant_id: str + user_id: str + environment: str + allowlist: SessionAllowlist + pool: SkillSessionPool + catalog: BundleLoader + runner: Any # duck-typed as skill_dispatcher.SandboxRunner + counters: TurnCounters = field(default_factory=TurnCounters) + on_audit: Callable[[dict[str, Any]], Awaitable[None]] | None = None + + +# --------------------------------------------------------------------------- +# Tool factory +# --------------------------------------------------------------------------- + + +async def invoke_skill( + name: str, + args: dict[str, Any] | None, + *, + ctx: SkillMetaContext, +) -> dict[str, Any]: + """Pure entry point the Strands @tool wrapper calls. + + Kept decoupled from the Strands SDK so unit tests exercise the full + decision tree without importing strands. ``build_skill_meta_tool`` + below wraps this for the runtime. + """ + call_args = dict(args or {}) + + # SkillNotFound vs SkillUnauthorized: + # * not in the catalog at all → NotFound (returned to the model so it + # learns the slug doesn't exist anywhere). + # * in the catalog but the current session can't see it → Unauthorized + # (returned with a generic message so the model cannot enumerate + # tenant-scoped catalog membership by probing slugs). + try: + ctx.catalog.load_bundle(name) + except KeyError as e: + raise SkillNotFound(f"skill '{name}' is not registered in the catalog") from e + + if not ctx.allowlist.contains(name): + logger.info( + "skill-meta: %s blocked — not in session allowlist " + "(tenant=%s user=%s env=%s)", + name, + ctx.tenant_id, + ctx.user_id, + ctx.environment, + ) + raise SkillUnauthorized( + f"skill '{name}' is not available in this session" + ) + + # Pure-SKILL.md skills never touch the sandbox. Returning the body lets + # the model consume the instructions inline; no quota, no pool slot. + if not ctx.catalog.has_scripts(name): + body = ctx.catalog.skill_md_body(name) + logger.info("skill-meta: %s served from SKILL.md body (%d chars)", name, len(body)) + return SkillMetaResponse(kind="skill-md-body", slug=name, body=body).to_dict() + + # Script-bundle skills flow through U4's unified dispatcher. + dispatch_result = await dispatch_skill_script( + tenant_id=ctx.tenant_id, + user_id=ctx.user_id, + skill_slug=name, + args=call_args, + environment=ctx.environment, + pool=ctx.pool, + catalog=ctx.catalog, + runner=ctx.runner, + counters=ctx.counters, + on_audit=ctx.on_audit, + ) + return SkillMetaResponse( + kind="script-result", + slug=name, + result=dispatch_result.result, + duration_ms=dispatch_result.duration_ms, + ).to_dict() + + +def build_skill_meta_tool(ctx: SkillMetaContext) -> Callable[..., Awaitable[dict[str, Any]]]: + """Return a coroutine the Strands @tool decorator can wrap. + + Caller is expected to wrap the returned function with ``@strands.tool`` + (or the equivalent ``strands_tool(...)`` decorator). The factory itself + avoids importing strands so tests can exercise the tool without + triggering the SDK's side effects. + """ + + async def Skill(name: str, args: dict[str, Any] | None = None) -> dict[str, Any]: + """Invoke a registered skill by slug. + + Args: + name: Skill slug, e.g. "sales-prep". + args: Keyword arguments forwarded to the skill's ``run()`` function. + + Returns: + Structured result with kind='script-result' or 'skill-md-body'. + """ + return await invoke_skill(name, args, ctx=ctx) + + return Skill + + +# --------------------------------------------------------------------------- +# allowed-tools intersection (frontmatter advisory → session allowlist) +# --------------------------------------------------------------------------- + + +def intersect_allowed_tools( + declared: list[str] | None, + session_tools: frozenset[str], +) -> tuple[frozenset[str], list[str]]: + """Narrow a skill's declared ``allowed-tools`` against the session's + effective tool allowlist. + + Anthropic's Agent Skills spec treats ``allowed-tools`` as the set of + tools the skill is allowed to use *within its own execution*. The + Claude Code CLI honours it by clamping the skill's tool access; we + honour it as an informational hint that narrows toward the session's + harness-constructed allowlist (plan #007 §Key Technical Decisions). + + Returns (effective_tools, warnings). Warnings list any tool the skill + declared but the session doesn't grant — logged at registration time + so operators can spot tenants that disabled a dependency. + """ + if not declared: + return session_tools, [] + declared_set = frozenset(declared) + missing = declared_set - session_tools + effective = declared_set & session_tools + warnings: list[str] = [] + if missing: + warnings.append( + "declared tools absent from session allowlist — narrowing. " + f"missing={sorted(missing)}" + ) + return effective, warnings + + +__all__ = [ + "AllowlistInput", + "BundleLoader", + "SessionAllowlist", + "SkillMetaContext", + "SkillMetaResponse", + "SkillUnauthorized", + "build_skill_meta_tool", + "intersect_allowed_tools", + "invoke_skill", +] diff --git a/packages/agentcore-strands/agent-container/test_skill_meta_tool.py b/packages/agentcore-strands/agent-container/test_skill_meta_tool.py new file mode 100644 index 000000000..5aa5cb622 --- /dev/null +++ b/packages/agentcore-strands/agent-container/test_skill_meta_tool.py @@ -0,0 +1,336 @@ +"""Tests for the Skill meta-tool + session allowlist intersection. + +Run with: + uv run --no-project --with pytest --with pytest-asyncio \ + pytest packages/agentcore-strands/agent-container/test_skill_meta_tool.py +""" + +from __future__ import annotations + +import asyncio +from dataclasses import dataclass, field +from typing import Any + +import pytest +import pytest_asyncio # noqa: F401 +from skill_dispatcher import SkillNotFound, TurnCounters +from skill_meta_tool import ( + AllowlistInput, + SessionAllowlist, + SkillMetaContext, + SkillUnauthorized, + build_skill_meta_tool, + intersect_allowed_tools, + invoke_skill, +) +from skill_session_pool import SkillSessionPool + +# --------------------------------------------------------------------------- +# Fake catalog + runner — extend the U4 test shape with SKILL.md body access. +# --------------------------------------------------------------------------- + + +@dataclass +class FakeBundle: + slug: str + files: list[dict[str, str]] = field(default_factory=list) + timeout_s: int = 30 + scripts: bool = True + body: str = "" + + def files_for_interpreter(self) -> list[dict[str, str]]: + return list(self.files) + + +@dataclass +class FakeCatalog: + bundles: dict[str, FakeBundle] = field(default_factory=dict) + + def load_bundle(self, slug: str) -> FakeBundle: + if slug not in self.bundles: + raise KeyError(slug) + return self.bundles[slug] + + def has_scripts(self, slug: str) -> bool: + return self.bundles[slug].scripts + + def skill_md_body(self, slug: str) -> str: + return self.bundles[slug].body + + +class FakeRunner: + def __init__(self) -> None: + self.execute_code_calls: list[str] = [] + self.write_files_calls: list[list[dict[str, str]]] = [] + self.responses: list[dict[str, Any]] = [] + + def queue(self, **resp) -> None: + self.responses.append( + { + "stdout": resp.get("stdout", "{}\n"), + "stderr": resp.get("stderr", ""), + "exit_code": resp.get("exit_code", 0), + "timed_out": resp.get("timed_out", False), + } + ) + + async def write_files(self, handle, files): + self.write_files_calls.append(files) + + async def execute_code(self, handle, code, *, timeout_s): + self.execute_code_calls.append(code) + return self.responses.pop(0) if self.responses else { + "stdout": "{}\n", + "stderr": "", + "exit_code": 0, + "timed_out": False, + } + + +async def _fake_start(_ipi: str, _timeout_s: int) -> str: + await asyncio.sleep(0) + return "sess-meta" + + +async def _fake_stop(_ipi: str, _sess: str) -> None: + await asyncio.sleep(0) + + +def _pool() -> SkillSessionPool: + return SkillSessionPool( + interpreter_id="ipi-meta", + start_session=_fake_start, + stop_session=_fake_stop, + ) + + +def _ctx( + *, + allowlist_slugs: set[str], + catalog: FakeCatalog, + runner: FakeRunner, + counters: TurnCounters | None = None, +) -> SkillMetaContext: + return SkillMetaContext( + tenant_id="tenant-a", + user_id="user-a", + environment="dev", + allowlist=SessionAllowlist(slugs=frozenset(allowlist_slugs)), + pool=_pool(), + catalog=catalog, + runner=runner, + counters=counters or TurnCounters(), + ) + + +# --------------------------------------------------------------------------- +# Happy path — covers plan AE4 +# --------------------------------------------------------------------------- + + +@pytest.mark.asyncio +async def test_happy_path_script_skill_routes_to_dispatcher(): + catalog = FakeCatalog( + bundles={"sales-prep": FakeBundle(slug="sales-prep", scripts=True)} + ) + runner = FakeRunner() + runner.queue(stdout='{"brief": "ready"}\n') + ctx = _ctx(allowlist_slugs={"sales-prep"}, catalog=catalog, runner=runner) + + result = await invoke_skill("sales-prep", {"account": "Acme"}, ctx=ctx) + + assert result["kind"] == "script-result" + assert result["slug"] == "sales-prep" + assert result["result"] == {"brief": "ready"} + # Confirm the dispatcher's exec template was actually sent. + assert runner.execute_code_calls, "dispatcher must hit the runner" + assert "scripts.sales-prep.entrypoint" in runner.execute_code_calls[0] + + +@pytest.mark.asyncio +async def test_nested_skill_invocations_share_the_same_turn_counters(): + catalog = FakeCatalog( + bundles={ + "sales-prep": FakeBundle(slug="sales-prep"), + "gather-crm-context": FakeBundle(slug="gather-crm-context"), + } + ) + runner = FakeRunner() + runner.queue() + runner.queue() + counters = TurnCounters() + ctx = _ctx( + allowlist_slugs={"sales-prep", "gather-crm-context"}, + catalog=catalog, + runner=runner, + counters=counters, + ) + + await invoke_skill("sales-prep", {}, ctx=ctx) + await invoke_skill("gather-crm-context", {}, ctx=ctx) + + # Both calls ran; both counted against the same per-turn budget. + assert counters.total == 2 + assert counters.history == ["sales-prep", "gather-crm-context"] + + +# --------------------------------------------------------------------------- +# Pure-SKILL.md path +# --------------------------------------------------------------------------- + + +@pytest.mark.asyncio +async def test_pure_skill_md_returns_body_without_sandbox(): + catalog = FakeCatalog( + bundles={ + "synthesize": FakeBundle( + slug="synthesize", + scripts=False, + body="# Synthesize\n\nProse the model reads inline.", + ) + } + ) + runner = FakeRunner() + ctx = _ctx(allowlist_slugs={"synthesize"}, catalog=catalog, runner=runner) + + result = await invoke_skill("synthesize", None, ctx=ctx) + + assert result["kind"] == "skill-md-body" + assert "Prose the model reads inline." in result["body"] + # No sandbox roundtrip when the bundle has no scripts. + assert runner.execute_code_calls == [] + assert runner.write_files_calls == [] + + +# --------------------------------------------------------------------------- +# Authorization errors +# --------------------------------------------------------------------------- + + +@pytest.mark.asyncio +async def test_unknown_slug_raises_skill_not_found(): + catalog = FakeCatalog(bundles={}) + runner = FakeRunner() + ctx = _ctx(allowlist_slugs=set(), catalog=catalog, runner=runner) + + with pytest.raises(SkillNotFound): + await invoke_skill("missing", {}, ctx=ctx) + + +@pytest.mark.asyncio +async def test_in_catalog_but_not_in_session_raises_skill_unauthorized(): + """Distinct from SkillNotFound — the model must not learn a slug exists + in the catalog if the current session can't invoke it.""" + catalog = FakeCatalog(bundles={"admin-op": FakeBundle(slug="admin-op")}) + runner = FakeRunner() + ctx = _ctx(allowlist_slugs={"some-other-skill"}, catalog=catalog, runner=runner) + + with pytest.raises(SkillUnauthorized): + await invoke_skill("admin-op", {}, ctx=ctx) + # Skipped dispatcher entirely — no sandbox contact. + assert runner.execute_code_calls == [] + + +# --------------------------------------------------------------------------- +# Session allowlist intersection (plan R6 / R7) +# --------------------------------------------------------------------------- + + +def test_allowlist_intersects_tenant_and_template_then_subtracts_blocks(): + inputs = AllowlistInput( + tenant_skills=frozenset({"a", "b", "c", "d"}), + template_skills=frozenset({"a", "b", "c"}), + template_blocked_tools=frozenset({"b"}), + tenant_disabled_builtin_tools=frozenset({"c"}), + ) + allowlist = SessionAllowlist.from_inputs(inputs) + + # 'a' is tenant ∩ template; 'b' removed by template block; 'c' removed + # by tenant kill-switch; 'd' not in template so not in session. + assert allowlist.slugs == frozenset({"a"}) + + +def test_allowlist_warns_when_template_names_skills_tenant_has_not_installed(): + inputs = AllowlistInput( + tenant_skills=frozenset({"a"}), + template_skills=frozenset({"a", "b"}), + template_blocked_tools=frozenset(), + tenant_disabled_builtin_tools=frozenset(), + ) + allowlist = SessionAllowlist.from_inputs(inputs) + + assert allowlist.slugs == frozenset({"a"}) + # Warning captures the tenant-missing slug so operators can spot + # misconfigured templates without the dispatcher raising per-call. + assert any("b" in w for w in allowlist.warnings) + + +def test_allowlist_template_cannot_unblock_a_tenant_kill_switch(): + inputs = AllowlistInput( + tenant_skills=frozenset({"critical-skill"}), + template_skills=frozenset({"critical-skill"}), + template_blocked_tools=frozenset(), + tenant_disabled_builtin_tools=frozenset({"critical-skill"}), + ) + allowlist = SessionAllowlist.from_inputs(inputs) + + # Tenant disable trumps template enablement — the session sees no + # skill, period. + assert allowlist.slugs == frozenset() + assert any("critical-skill" in w for w in allowlist.warnings) + + +# --------------------------------------------------------------------------- +# allowed-tools intersection (narrow-only) +# --------------------------------------------------------------------------- + + +def test_allowed_tools_narrows_to_session_intersection(): + effective, warnings = intersect_allowed_tools( + declared=["Read", "Grep", "Bash"], + session_tools=frozenset({"Read", "Grep"}), + ) + assert effective == frozenset({"Read", "Grep"}) + # Bash was declared but not granted — surfaced as a warning. + assert any("Bash" in w for w in warnings) + + +def test_allowed_tools_never_widens_past_session(): + """Even if a skill declares a tool outside the session, it's filtered out.""" + effective, _ = intersect_allowed_tools( + declared=["Shell"], + session_tools=frozenset({"Read"}), + ) + assert effective == frozenset() + + +def test_allowed_tools_returns_full_session_when_declared_is_absent(): + """Per spec, absent allowed-tools means the skill doesn't opt into + narrowing — the session's own allowlist applies unchanged.""" + effective, warnings = intersect_allowed_tools( + declared=None, + session_tools=frozenset({"Read", "Grep"}), + ) + assert effective == frozenset({"Read", "Grep"}) + assert warnings == [] + + +# --------------------------------------------------------------------------- +# build_skill_meta_tool wiring +# --------------------------------------------------------------------------- + + +@pytest.mark.asyncio +async def test_build_skill_meta_tool_returns_callable_that_invokes_correctly(): + """The factory closure must capture ctx and route through invoke_skill.""" + catalog = FakeCatalog(bundles={"demo": FakeBundle(slug="demo")}) + runner = FakeRunner() + runner.queue(stdout='{"ok": 1}\n') + ctx = _ctx(allowlist_slugs={"demo"}, catalog=catalog, runner=runner) + + Skill = build_skill_meta_tool(ctx) + result = await Skill("demo", {"x": 1}) + + assert result["kind"] == "script-result" + assert result["slug"] == "demo" + assert result["result"] == {"ok": 1}