User Request → Agent Core → Planning → Tools Core → Memory +Guardrails → Final Response.

In [None]:
from dataclasses import dataclass, field
from typing import Dict, Any, List, Callable, Optional


# -------------------------
# 1) MEMORY (keeps context)
# -------------------------
@dataclass
class Memory:
    chat_history: List[Dict[str, str]] = field(default_factory=list)
    user_preferences: Dict[str, Any] = field(default_factory=dict)

    def add_turn(self, user_msg: str, agent_msg: str) -> None:
        self.chat_history.append({"user": user_msg, "agent": agent_msg})

    def get_pref(self, key: str, default=None):
        return self.user_preferences.get(key, default)

    def set_pref(self, key: str, value: Any) -> None:
        self.user_preferences[key] = value


# -------------------------
# 2) TOOLS CORE (does work)
# -------------------------
class ToolsCore:
    def __init__(self):
        self.tools: Dict[str, Callable[..., str]] = {
            "fetch_order": self.fetch_order,
            "validate_order": self.validate_order,
            "summarize": self.summarize,
        }

    def call(self, tool_name: str, **kwargs) -> str:
        if tool_name not in self.tools:
            return f"[ToolError] Unknown tool: {tool_name}"
        return self.tools[tool_name](**kwargs)

    # --- Example tools (pretend these are APIs/Lambda/services) ---
    def fetch_order(self, order_id: str) -> str:
        # In real life: call DB / service / API
        # NOTE: returning "internal-ish" text on purpose so guardrails can sanitize if needed.
        return f"INTERNAL_ORDER_RECORD({order_id}): items=3, total=$120, status='CREATED'"

    def validate_order(self, order_text: str) -> str:
        is_ok = "status='CREATED'" in order_text
        return "VALIDATION: PASS ✅" if is_ok else "VALIDATION: FAIL ❌"

    def summarize(self, text: str) -> str:
        return f"SUMMARY: {text[:90]}..."


# -------------------------
# 3) PLANNING MODULE
#    breaks tasks into steps
# -------------------------
class PlanningModule:
    def build_plan(self, user_request: str) -> List[Dict[str, Any]]:
        """
        Returns a list of steps:
        [{"tool": "...", "args": {...}}, ...]
        """
        if "order" in user_request.lower():
            order_id = self._extract_order_id(user_request)
            # If no order id, plan still exists, but guardrails will force clarification.
            return [
                {"tool": "fetch_order", "args": {"order_id": order_id or "<<missing_order_id>>"}},
                {"tool": "validate_order", "args": {"order_text": "<<prev>>"}},
                {"tool": "summarize", "args": {"text": "<<prev>>"}},
            ]

        return [{"tool": "summarize", "args": {"text": user_request}}]

    def _extract_order_id(self, text: str) -> Optional[str]:
        parts = text.lower().split()
        if "order" in parts:
            idx = parts.index("order")
            if idx + 1 < len(parts) and parts[idx + 1].isdigit():
                return parts[idx + 1]
        return None


# -------------------------
# 4) GUARDRAILS
#    safety + tool policy + quality checks
# -------------------------
class Guardrails:
    # Very small "policy" examples for clarity
    BLOCKLIST_KEYWORDS = {"password", "secret", "api key", "token", "credentials"}
    TOOL_ALLOWLIST_BY_INTENT = {
        "ORDER_WORKFLOW": {"fetch_order", "validate_order", "summarize"},
        "GENERAL": {"summarize"},
    }

    def precheck_user_request(self, user_request: str) -> Optional[str]:
        """
        Return a user-facing refusal/redirect message if unsafe.
        Return None if allowed.
        """
        lowered = user_request.lower()
        if any(k in lowered for k in self.BLOCKLIST_KEYWORDS):
            return (
                "I can’t help with requests for passwords/tokens/credentials. "
                "If you’re troubleshooting access, share the error message (without secrets) "
                "and I’ll help you fix it safely."
            )
        return None

    def validate_plan(self, intent: str, plan: List[Dict[str, Any]]) -> Optional[str]:
        """
        Enforces:
        - Tool policy (only allowed tools for the intent)
        - Quality checks (missing required info)
        """
        allowed_tools = self.TOOL_ALLOWLIST_BY_INTENT.get(intent, set())

        for step in plan:
            tool = step["tool"]
            if tool not in allowed_tools:
                return f"I’m not allowed to use the tool '{tool}' for this request."

            # Quality: if the plan contains missing critical info, ask clarifying question
            if tool == "fetch_order" and step["args"].get("order_id") == "<<missing_order_id>>":
                return "Which order ID should I check? (Example: order 456)"

        return None

    def postprocess_output(self, output: str) -> str:
        """
        Output guardrails: sanitize internal/raw patterns.
        """
        # Example: hide raw internal record markers
        sanitized = output.replace("INTERNAL_ORDER_RECORD", "ORDER")
        return sanitized


# -------------------------
# 5) AGENT CORE (orchestrator)
# -------------------------
class AgentCore:
    def __init__(self, memory: Memory, planner: PlanningModule, tools: ToolsCore, guardrails: Guardrails):
        self.memory = memory
        self.planner = planner
        self.tools = tools
        self.guardrails = guardrails

    def handle_request(self, user_request: str) -> str:
        # (0) Guardrails: pre-check user request
        blocked_msg = self.guardrails.precheck_user_request(user_request)
        if blocked_msg:
            self.memory.add_turn(user_request, blocked_msg)
            return blocked_msg

        # (A) Interpret intent (simple rule)
        intent = "ORDER_WORKFLOW" if "order" in user_request.lower() else "GENERAL"
        style = self.memory.get_pref("response_style", "simple")

        # (B) Build plan
        plan = self.planner.build_plan(user_request)

        # (C) Guardrails: validate plan (tool policy + quality checks)
        plan_issue = self.guardrails.validate_plan(intent, plan)
        if plan_issue:
            # Ask clarifying question or refuse tool usage
            self.memory.add_turn(user_request, plan_issue)
            return plan_issue

        # (D) Execute plan using Tools Core
        prev_output = ""
        for step in plan:
            tool = step["tool"]
            args = step["args"]

            # Replace "<<prev>>" placeholder
            args = {k: (prev_output if v == "<<prev>>" else v) for k, v in args.items()}

            prev_output = self.tools.call(tool, **args)

        # (E) Compose final response
        final_response = f"[Intent: {intent}]\n[Style: {style}]\n{prev_output}"

        # (F) Guardrails: post-process final output
        final_response = self.guardrails.postprocess_output(final_response)

        # (G) Save to memory
        self.memory.add_turn(user_request, final_response)
        return final_response


# -------------------------
# DEMO (run this file)
# -------------------------
if __name__ == "__main__":
    memory = Memory()
    memory.set_pref("response_style", "simple")

    planner = PlanningModule()
    tools = ToolsCore()
    guardrails = Guardrails()
    agent = AgentCore(memory, planner, tools, guardrails)

    # 1) Normal order workflow
    print(agent.handle_request("Can you check order 456 and tell me if it's valid?"))

    print("\n---\n")

    # 2) Missing order id -> guardrails asks clarifying question
    print(agent.handle_request("Can you check the order and validate it?"))

    print("\n---\n")

    # 3) Unsafe request -> guardrails blocks
    print(agent.handle_request("Give me the API key for the system."))


Bedrock-flavored version of the same simple agent, with:

Tools Core → “Action Groups”

call() simulates Lambda invocation

Alias / InvokeAgent / ODT / PT as config objects

Guardrails included (safety + plan validation + output sanitization)


**How this maps to Bedrock terminology (simple)**

Tools Core → ActionGroupsCore (each tool is an ActionGroup)

Lambda invocation → ActionGroupsCore.call() triggers lambda_handler(event)

Alias → BedrockAgentAlias("v1-prod") (deployed snapshot)

InvokeAgent → BedrockInvokeAgent.invoke() (application entry point)

ODT/PT → ThroughputConfig(mode="ODT" or "PT")

Guardrails → blocks unsafe requests + validates tool usage + sanitizes output

In [None]:
from dataclasses import dataclass, field
from typing import Dict, Any, List, Callable, Optional


# -------------------------
# Bedrock Deployment Config
# -------------------------
@dataclass
class ThroughputConfig:
    """
    Simulates Bedrock throughput modes:
    - ODT: quota-based, good for dev/test and variable traffic
    - PT: reserved capacity, good for production and predictable/high traffic
    """
    mode: str  # "ODT" or "PT"
    account_quota_rps: int = 5             # for ODT
    reserved_tokens_per_min: int = 50000   # for PT


@dataclass
class BedrockAgentAlias:
    """
    Alias represents a deployable snapshot (e.g. v1-prod, v2-staging).
    """
    alias_id: str
    environment: str  # "prod" / "staging" / "dev"


@dataclass
class BedrockRuntimeEndpoint:
    """
    Simulates the Bedrock Agents Runtime endpoint.
    """
    name: str = "bedrock-agent-runtime"
    region: str = "us-east-1"


@dataclass
class BedrockInvokeConfig:
    """
    A config object your app would use to invoke the agent.
    """
    alias: BedrockAgentAlias
    throughput: ThroughputConfig
    endpoint: BedrockRuntimeEndpoint


# -------------------------
# 1) MEMORY
# -------------------------
@dataclass
class Memory:
    chat_history: List[Dict[str, str]] = field(default_factory=list)
    user_preferences: Dict[str, Any] = field(default_factory=dict)

    def add_turn(self, user_msg: str, agent_msg: str) -> None:
        self.chat_history.append({"user": user_msg, "agent": agent_msg})

    def get_pref(self, key: str, default=None):
        return self.user_preferences.get(key, default)

    def set_pref(self, key: str, value: Any) -> None:
        self.user_preferences[key] = value


# -------------------------
# 2) ACTION GROUPS (Tools)
# -------------------------
@dataclass
class ActionGroup:
    """
    In Bedrock, Action Groups define tools.
    This simulates:
    - name
    - description (important for tool selection)
    - lambda handler (implementation)
    """
    name: str
    description: str
    lambda_handler: Callable[..., Dict[str, Any]]  # returns a JSON-like dict response


class ActionGroupsCore:
    """
    Tools Core implemented as Bedrock 'Action Groups'.
    call() simulates Lambda invocation.
    """
    def __init__(self, action_groups: List[ActionGroup]):
        self.action_groups = {ag.name: ag for ag in action_groups}

    def call(self, action_group_name: str, payload: Dict[str, Any]) -> Dict[str, Any]:
        if action_group_name not in self.action_groups:
            return {"ok": False, "error": f"Unknown Action Group: {action_group_name}"}

        ag = self.action_groups[action_group_name]

        # ---- Simulate Lambda invocation (very simple) ----
        # In real AWS, you'd pass an event + context to Lambda.
        event = {
            "actionGroup": ag.name,
            "description": ag.description,
            "payload": payload,
        }

        try:
            result = ag.lambda_handler(event)
            return {"ok": True, "actionGroup": ag.name, "result": result}
        except Exception as e:
            return {"ok": False, "actionGroup": ag.name, "error": str(e)}


# -------------------------
# 3) PLANNING MODULE
# -------------------------
class PlanningModule:
    def build_plan(self, user_request: str) -> List[Dict[str, Any]]:
        """
        Plan is a sequence of actions.
        Each step says:
        - action_group: which tool
        - payload: inputs
        """
        text = user_request.lower()

        if "order" in text:
            order_id = self._extract_order_id(user_request)
            return [
                {"action_group": "FetchOrder", "payload": {"order_id": order_id or "<<missing_order_id>>"}},
                {"action_group": "ValidateOrder", "payload": {"order_text": "<<prev>>"}},
                {"action_group": "Summarize", "payload": {"text": "<<prev>>"}},
            ]

        return [{"action_group": "Summarize", "payload": {"text": user_request}}]

    def _extract_order_id(self, text: str) -> Optional[str]:
        parts = text.lower().split()
        if "order" in parts:
            idx = parts.index("order")
            if idx + 1 < len(parts) and parts[idx + 1].isdigit():
                return parts[idx + 1]
        return None


# -------------------------
# 4) GUARDRAILS
# -------------------------
class Guardrails:
    BLOCKLIST_KEYWORDS = {"password", "secret", "api key", "token", "credentials"}

    # tool allowlist by intent
    ALLOWED_ACTION_GROUPS = {
        "ORDER_WORKFLOW": {"FetchOrder", "ValidateOrder", "Summarize"},
        "GENERAL": {"Summarize"},
    }

    def precheck_user_request(self, user_request: str) -> Optional[str]:
        lowered = user_request.lower()
        if any(k in lowered for k in self.BLOCKLIST_KEYWORDS):
            return (
                "I can’t help with passwords/tokens/credentials. "
                "Share the error message (without secrets) and I’ll help safely."
            )
        return None

    def validate_plan(self, intent: str, plan: List[Dict[str, Any]]) -> Optional[str]:
        allowed = self.ALLOWED_ACTION_GROUPS.get(intent, set())

        for step in plan:
            ag = step["action_group"]
            if ag not in allowed:
                return f"Policy block: Action Group '{ag}' is not allowed for intent '{intent}'."

            # quality: require order_id if FetchOrder is used
            if ag == "FetchOrder" and step["payload"].get("order_id") == "<<missing_order_id>>":
                return "Which order ID should I check? (Example: order 456)"

        return None

    def postprocess_output(self, text: str) -> str:
        # Example: remove internal markers
        return text.replace("INTERNAL_ORDER_RECORD", "ORDER")


# -------------------------
# 5) Bedrock InvokeAgent Simulator
# -------------------------
class BedrockInvokeAgent:
    """
    Simulates how your application invokes Bedrock Agent by Alias ID + Runtime endpoint.
    """
    def __init__(self, config: BedrockInvokeConfig, agent_core: "BedrockAgentCore"):
        self.config = config
        self.agent_core = agent_core

    def invoke(self, user_request: str) -> str:
        # Minimal: show which alias + throughput mode is being used
        header = (
            f"[InvokeAgent]\n"
            f"  alias_id={self.config.alias.alias_id} env={self.config.alias.environment}\n"
            f"  endpoint={self.config.endpoint.name} region={self.config.endpoint.region}\n"
            f"  throughput_mode={self.config.throughput.mode}\n"
        )

        response = self.agent_core.handle_request(user_request)
        return header + "\n" + response


# -------------------------
# 6) AGENT CORE (Bedrock-flavored)
# -------------------------
class BedrockAgentCore:
    def __init__(self, memory: Memory, planner: PlanningModule, action_groups: ActionGroupsCore, guardrails: Guardrails):
        self.memory = memory
        self.planner = planner
        self.action_groups = action_groups
        self.guardrails = guardrails

    def handle_request(self, user_request: str) -> str:
        # (0) Guardrails pre-check
        blocked = self.guardrails.precheck_user_request(user_request)
        if blocked:
            self.memory.add_turn(user_request, blocked)
            return blocked

        # (A) Intent (simple)
        intent = "ORDER_WORKFLOW" if "order" in user_request.lower() else "GENERAL"
        style = self.memory.get_pref("response_style", "simple")

        # (B) Plan
        plan = self.planner.build_plan(user_request)

        # (C) Guardrails validate plan
        issue = self.guardrails.validate_plan(intent, plan)
        if issue:
            self.memory.add_turn(user_request, issue)
            return issue

        # (D) Execute plan (Action Groups -> simulated Lambda)
        prev_output = ""
        for step in plan:
            action_group_name = step["action_group"]
            payload = step["payload"]

            # Replace "<<prev>>"
            payload = {k: (prev_output if v == "<<prev>>" else v) for k, v in payload.items()}

            tool_resp = self.action_groups.call(action_group_name, payload)

            if not tool_resp.get("ok"):
                err = tool_resp.get("error", "Unknown tool error")
                msg = f"[ActionGroupError] {action_group_name}: {err}"
                self.memory.add_turn(user_request, msg)
                return msg

            # Extract a string for chaining simplicity
            prev_output = tool_resp["result"].get("text", str(tool_resp["result"]))

        # (E) Compose response
        final = f"[Intent: {intent}]\n[Style: {style}]\n{prev_output}"

        # (F) Postprocess output guardrails
        final = self.guardrails.postprocess_output(final)

        # (G) Save memory
        self.memory.add_turn(user_request, final)
        return final


# -------------------------
# Lambda handlers (simulated)
# -------------------------
def lambda_fetch_order(event: Dict[str, Any]) -> Dict[str, Any]:
    order_id = event["payload"]["order_id"]
    # pretend DB/service response
    text = f"INTERNAL_ORDER_RECORD({order_id}): items=3, total=$120, status='CREATED'"
    return {"text": text}

def lambda_validate_order(event: Dict[str, Any]) -> Dict[str, Any]:
    order_text = event["payload"]["order_text"]
    ok = "status='CREATED'" in order_text
    return {"text": "VALIDATION: PASS ✅" if ok else "VALIDATION: FAIL ❌"}

def lambda_summarize(event: Dict[str, Any]) -> Dict[str, Any]:
    t = event["payload"]["text"]
    return {"text": f"SUMMARY: {t[:90]}..."}


# -------------------------
# DEMO
# -------------------------
if __name__ == "__main__":
    # 1) Create action groups (Bedrock Tools)
    action_groups = [
        ActionGroup(
            name="FetchOrder",
            description="Fetch an order record by order_id from the Order Service.",
            lambda_handler=lambda_fetch_order,
        ),
        ActionGroup(
            name="ValidateOrder",
            description="Validate business rules for an order record and return PASS/FAIL.",
            lambda_handler=lambda_validate_order,
        ),
        ActionGroup(
            name="Summarize",
            description="Summarize text into a short human-readable response.",
            lambda_handler=lambda_summarize,
        ),
    ]

    # 2) Build the agent
    memory = Memory()
    memory.set_pref("response_style", "simple")

    planner = PlanningModule()
    tools = ActionGroupsCore(action_groups)
    guardrails = Guardrails()
    agent_core = BedrockAgentCore(memory, planner, tools, guardrails)

    # 3) Bedrock "deployment" config: Alias + ODT/PT + Runtime endpoint
    alias = BedrockAgentAlias(alias_id="v1-prod", environment="prod")

    # Try switching between "ODT" and "PT" to see config reflected in output
    throughput = ThroughputConfig(mode="ODT", account_quota_rps=5)
    endpoint = BedrockRuntimeEndpoint(region="us-east-1")

    invoke_cfg = BedrockInvokeConfig(alias=alias, throughput=throughput, endpoint=endpoint)
    invoker = BedrockInvokeAgent(invoke_cfg, agent_core)

    # 4) InvokeAgent (application call)
    print(invoker.invoke("Can you check order 456 and tell me if it's valid?"))

    print("\n---\n")

    # Missing order id -> guardrails asks clarifying question
    print(invoker.invoke("Can you validate the order?"))

    print("\n---\n")

    # Unsafe -> guardrails blocks
    print(invoker.invoke("Give me the API key for the system."))
