In [None]:
# %% [code]
# 1. Setup & Dependencies

!pip install -q python-dotenv
!pip install -q -U google-adk
!pip install -q google-api-python-client google-auth google-auth-httplib2 requests



In [None]:
# %% [code]
# 2. Imports & Configuration

from dotenv import load_dotenv
import os
import json
import base64
import time
from typing import Dict, Any, List

from email.mime.text import MIMEText

import requests

from google.oauth2 import service_account
from googleapiclient.discovery import build

import asyncio
from datetime import datetime

from google.genai import types

from google.adk.agents import Agent, LlmAgent
from google.adk.runners import Runner
from google.adk.sessions import InMemorySessionService
from google.adk.tools import google_search, load_memory
from google.adk.memory import InMemoryMemoryService
from google.adk.code_executors import BuiltInCodeExecutor

# Load variables from a .env file (if one is present) before reading config.
load_dotenv()

# Fail fast when the API key is missing instead of erroring later on.
GOOGLE_API_KEY = os.getenv("GOOGLE_API_KEY")
if not GOOGLE_API_KEY:
    raise ValueError("GOOGLE_API_KEY not found in environment. Please set it before running.")

os.environ["GOOGLE_API_KEY"] = GOOGLE_API_KEY

# Model used by every agent defined in this notebook.
MODEL_NAME = "gemini-2.5-flash-lite"

# Only the exact string "true" (case-insensitive) enables real API calls;
# otherwise the Slack/Sheets/Gmail/Calendar tools log a NO-OP and return "skipped".
USE_REAL_APIS = os.getenv("USE_REAL_APIS", "false").lower() == "true"

# Identifiers shared by the sessions and runners created below.
APP_NAME = "flowgenie"
USER_ID = "demo_user"
SESSION_ID = "flowgenie_session_001"

print("✅ Environment ready. Model:", MODEL_NAME, "| USE_REAL_APIS:", USE_REAL_APIS)



In [None]:
# %% [code]
# 3. Shared Auth Helper for Google APIs (Sheets, Gmail, Calendar)

def get_service_account_credentials(scopes: List[str]):
    """
    Build service-account credentials from the GOOGLE_SERVICE_ACCOUNT_JSON env var.

    The variable may hold any of:
      - a JSON document (double-quoted keys/values),
      - a path to a JSON key file,
      - a Python-literal dict (parsed with ast.literal_eval as a fallback).

    If GMAIL_SENDER_EMAIL is set, the credentials are additionally bound to
    that address with ``with_subject`` (for Gmail / Calendar when domain-wide
    delegation is configured).

    Args:
        scopes: OAuth scopes to request for the credentials.

    Returns:
        The object returned by
        ``service_account.Credentials.from_service_account_info`` (optionally
        re-bound with ``with_subject``).

    Raises:
        RuntimeError: if the env var is unset, cannot be parsed, or parses to
            something other than a dict.
    """
    import ast

    json_str = os.getenv("GOOGLE_SERVICE_ACCOUNT_JSON")
    if not json_str:
        raise RuntimeError("GOOGLE_SERVICE_ACCOUNT_JSON env var not set.")

    # Treat the value as a key-file path only when it names a regular file;
    # a directory would otherwise make open() fail with an unrelated error.
    if os.path.isfile(json_str):
        with open(json_str, "r", encoding="utf-8") as f:
            json_str = f.read()

    # Try strict JSON first, then a Python literal as fallback.
    try:
        info = json.loads(json_str)
    except ValueError:
        try:
            info = ast.literal_eval(json_str)
        except Exception as e:
            raise RuntimeError(
                "Failed to parse GOOGLE_SERVICE_ACCOUNT_JSON. "
                "Provide a valid JSON string or a path to a JSON file."
            ) from e

    # Valid JSON that is not an object (e.g. "[1, 2]" or "42") would otherwise
    # surface as an obscure failure inside the credentials constructor.
    if not isinstance(info, dict):
        raise RuntimeError(
            "GOOGLE_SERVICE_ACCOUNT_JSON must describe a JSON object, "
            f"got {type(info).__name__}."
        )

    creds = service_account.Credentials.from_service_account_info(info, scopes=scopes)

    # Optional: domain-wide delegation (Workspace)
    delegated_user = os.getenv("GMAIL_SENDER_EMAIL")
    if delegated_user:
        creds = creds.with_subject(delegated_user)

    return creds

print("✅ Auth helper loaded.")



In [None]:
# %% [code]
# 4. Workflow Representation (Schema Prompt)

# Schema description embedded verbatim into the planner agent's instruction.
WORKFLOW_JSON_GUIDE = """
You are designing an automation workflow.

Always output a JSON object with this structure:

{
  "name": "<short human-friendly name>",
  "trigger": {
    "type": "<trigger_type>",
    "source": "<where the event originates>",
    "schedule": "<cron or time-based trigger, or null>",
    "conditions": [
      "<optional condition 1>",
      "<optional condition 2>"
    ]
  },
  "actions": [
    {
      "type": "<action_type>",
      "target": "<system or destination>",
      "description": "<what this step does>",
      "inputs": [
        "<critical input or data needed>"
      ]
    }
  ]
}

Rules:
- Use lowercase snake_case for types when possible.
- If something is unknown, put null or an empty list instead of guessing wildly.
- The workflow should be as simple as possible while still solving the user's request.
"""

print("✅ Workflow JSON guide defined.")



In [None]:
# %% [code]
# 5. Planner & Evaluator Agents

# Planner: turns a natural-language request into exactly one workflow JSON
# object following WORKFLOW_JSON_GUIDE.
planner_agent = Agent(
    name="workflow_planner",
    model=MODEL_NAME,
    description="Plans automation workflows from natural-language requests.",
    instruction=(
        "You are an expert workflow automation designer.\n"
        "Given a user's natural-language request, generate exactly ONE automation workflow.\n"
        "DO NOT return a list of workflows.\n"
        "DO NOT return multiple possible interpretations.\n"
        "Return ONLY ONE JSON object that follows this schema:\n"
        f"{WORKFLOW_JSON_GUIDE}\n\n"
        "Respond with ONLY the JSON object. Never return a list or surrounding quotes."
    ),
)

# Evaluator: scores a workflow JSON and returns a JSON verdict
# (overall_score, verdict, strengths, risks, suggested_changes).
evaluator_agent = Agent(
    name="workflow_evaluator",
    model=MODEL_NAME,
    description="Evaluates workflow quality, safety, and completeness.",
    instruction=(
        "You are evaluating an automation workflow JSON.\n"
        "Check for:\n"
        "- Clarity of trigger\n"
        "- Completeness of actions\n"
        "- Edge cases and failure modes\n"
        "- Privacy or safety concerns\n\n"
        "Return a JSON object with:\n"
        "{\n"
        '  \"overall_score\": <0-10>,\n'
        '  \"verdict\": \"<ACCEPT or IMPROVE>\",\n'
        '  \"strengths\": [\"...\"],\n'
        '  \"risks\": [\"...\"],\n'
        '  \"suggested_changes\": [\"...\"]\n'
        "}\n"
        "Be concise but specific. Respond with ONLY JSON."
    ),
)

print("✅ Planner & evaluator agents ready.")



In [None]:
# %% [code]
# 6. Real Tools (Slack, Sheets, Gmail, Calendar)

from urllib.parse import urlparse

def slack_send_notification(channel: str, message: str) -> Dict[str, Any]:
    """
    Send a Slack notification via an incoming webhook.

    The webhook URL is read from SLACK_WEBHOOK_URL. When USE_REAL_APIS is
    false or the URL is unset, nothing is sent and a "skipped" result is
    returned.

    Args:
        channel: Channel label. It is only echoed back in the result; the
            payload posted to the webhook contains just the message text.
        message: Text to post.

    Returns:
        A dict that always contains "channel", "message" and "status"
        ("skipped", "error" or "sent"), plus a "reason" or the HTTP details.
    """
    webhook_url = os.getenv("SLACK_WEBHOOK_URL")
    if not USE_REAL_APIS or not webhook_url:
        print(f"[SLACK NO-OP] Channel={channel} | Message={message}")
        return {
            "channel": channel,
            "message": message,
            "status": "skipped",
            "reason": "USE_REAL_APIS is false or SLACK_WEBHOOK_URL not set",
        }

    # Basic URL sanity check. The scheme is compared explicitly so the check
    # enforces what the error message below promises (http or https only).
    try:
        parsed = urlparse(webhook_url)
        url_ok = parsed.scheme in ("http", "https") and bool(parsed.netloc)
    except ValueError:
        url_ok = False

    if not url_ok:
        return {
            "channel": channel,
            "message": message,
            "status": "error",
            "reason": "SLACK_WEBHOOK_URL malformed (must start with http(s)://)",
            # NOTE(review): this echoes part of a secret URL back to the caller
            # (and so to the LLM); consider redacting.
            "webhook_preview": (webhook_url[:60] + "...") if webhook_url else "<empty>",
        }

    payload = {"text": message}
    try:
        resp = requests.post(webhook_url, json=payload, timeout=10)
    except requests.RequestException as e:
        return {
            "channel": channel,
            "message": message,
            "status": "error",
            "reason": f"request_exception: {type(e).__name__}: {e}",
        }

    return {
        "channel": channel,
        "message": message,
        "http_status": resp.status_code,
        "status": "sent" if resp.status_code in (200, 204) else "error",
        "response_text": resp.text[:200],
    }


def sheets_append_row(row_values: List[str]) -> Dict[str, Any]:
    """
    Append one row to a Google Sheet.

    The destination comes from the environment:
      - SHEETS_SPREADSHEET_ID (required; an "error" result is returned when it
        is missing, even if USE_REAL_APIS is false)
      - SHEETS_TAB_NAME (defaults to 'Sheet1')

    Returns a dict whose "status" is "error", "skipped" (USE_REAL_APIS is
    false) or "appended" (with the API's "updates" section).
    """
    spreadsheet_id = os.getenv("SHEETS_SPREADSHEET_ID")
    sheet_name = os.getenv("SHEETS_TAB_NAME", "Sheet1")

    if not spreadsheet_id:
        return {
            "status": "error",
            "reason": "SHEETS_SPREADSHEET_ID not set in environment",
        }

    # Fields echoed back in both the dry-run and the real result.
    echo = {
        "spreadsheet_id": spreadsheet_id,
        "sheet_name": sheet_name,
        "row_values": row_values,
    }

    if not USE_REAL_APIS:
        print(f"[SHEETS NO-OP] Spreadsheet={spreadsheet_id} | Sheet={sheet_name} | Row={row_values}")
        return {**echo, "status": "skipped", "reason": "USE_REAL_APIS is false"}

    creds = get_service_account_credentials(
        ["https://www.googleapis.com/auth/spreadsheets"]
    )
    values_api = build("sheets", "v4", credentials=creds).spreadsheets().values()

    append_request = values_api.append(
        spreadsheetId=spreadsheet_id,
        range=f"{sheet_name}!A1",
        valueInputOption="USER_ENTERED",
        insertDataOption="INSERT_ROWS",
        body={"values": [row_values]},
    )
    api_result = append_request.execute()

    return {**echo, "status": "appended", "updates": api_result.get("updates", {})}


def gmail_send_email(to: str, subject: str, body: str) -> Dict[str, Any]:
    """
    Send a plain-text email through the Gmail API using service-account
    credentials (optionally with domain-wide delegation).

    The 'from' address is GMAIL_SENDER_EMAIL, falling back to
    "me@example.com". When USE_REAL_APIS is false nothing is sent and a
    "skipped" result is returned.
    """
    sender = os.getenv("GMAIL_SENDER_EMAIL") or "me@example.com"

    # Fields echoed back in both the dry-run and the real result.
    envelope = {"from": sender, "to": to, "subject": subject}

    if not USE_REAL_APIS:
        print(f"[GMAIL NO-OP] From={sender} | To={to} | Subject={subject}")
        return {**envelope, "status": "skipped", "reason": "USE_REAL_APIS is false"}

    creds = get_service_account_credentials(
        ["https://www.googleapis.com/auth/gmail.send"]
    )
    gmail = build("gmail", "v1", credentials=creds)

    mime = MIMEText(body)
    for header, value in (("to", to), ("from", sender), ("subject", subject)):
        mime[header] = value

    # The API takes the whole RFC 822 message as URL-safe base64 text.
    encoded = base64.urlsafe_b64encode(mime.as_bytes()).decode("utf-8")

    response = (
        gmail.users()
        .messages()
        .send(userId="me", body={"raw": encoded})
        .execute()
    )

    return {**envelope, "status": "sent", "message_id": response.get("id")}


def calendar_create_event(calendar_id: str, title: str, start_time: str, end_time: str) -> Dict[str, Any]:
    """
    Create an event on a Google Calendar.

    `start_time` / `end_time` are passed through unchanged as the event's
    "dateTime" values. When USE_REAL_APIS is false nothing is created and a
    "skipped" result is returned.
    """
    # Fields echoed back in both the dry-run and the real result.
    echo = {
        "calendar_id": calendar_id,
        "title": title,
        "start_time": start_time,
        "end_time": end_time,
    }

    if USE_REAL_APIS:
        creds = get_service_account_credentials(
            ["https://www.googleapis.com/auth/calendar"]
        )
        calendar_api = build("calendar", "v3", credentials=creds)

        payload = {
            "summary": title,
            "start": {"dateTime": start_time},
            "end": {"dateTime": end_time},
        }
        inserted = (
            calendar_api.events()
            .insert(calendarId=calendar_id, body=payload)
            .execute()
        )
        return {**echo, "status": "created", "event_id": inserted.get("id")}

    print(f"[CALENDAR NO-OP] Calendar={calendar_id} | Title={title} | {start_time} -> {end_time}")
    return {**echo, "status": "skipped", "reason": "USE_REAL_APIS is false"}

print("✅ Real tools defined (Slack, Sheets, Gmail, Calendar).")



In [None]:
# %% [code]
# 7. Tool Agents Wrapping the Real Tools

# One thin LLM agent per tool: each is instructed to call its single tool and
# return the tool's JSON unchanged.
slack_agent = Agent(
    name="slack_agent",
    model=MODEL_NAME,
    description="Agent that sends Slack notifications via a tool.",
    instruction=(
        "You send notifications to Slack channels.\n"
        "ALWAYS call the tool slack_send_notification(channel=<channel>, message=<message>).\n"
        "Return ONLY the JSON returned by the tool, no extra text."
    ),
    tools=[slack_send_notification],
)

sheets_agent = Agent(
    name="sheets_agent",
    model=MODEL_NAME,
    description="Agent that appends rows to a Google Sheet via a tool.",
    instruction=(
        "You receive a request to append data to a spreadsheet.\n"
        "Extract ONLY the row_values from the input.\n"
        "ALWAYS call sheets_append_row(row_values=<list of strings>).\n"
        "Spreadsheet ID and sheet name are loaded from environment inside the tool.\n"
        "Return ONLY the JSON returned by the tool."
    ),
    tools=[sheets_append_row],
)

gmail_agent = Agent(
    name="gmail_agent",
    model=MODEL_NAME,
    description="Agent that sends emails via a tool.",
    instruction=(
        "You send emails.\n"
        "ALWAYS call gmail_send_email(to=<email>, subject=<subject>, body=<body>).\n"
        "Return ONLY the JSON returned by the tool."
    ),
    tools=[gmail_send_email],
)

calendar_agent = Agent(
    name="calendar_agent",
    model=MODEL_NAME,
    description="Agent that creates calendar events via a tool.",
    instruction=(
        "You create calendar events.\n"
        "ALWAYS call calendar_create_event(calendar_id=<calendar_id>, title=<title>, "
        "start_time=<start>, end_time=<end>).\n"
        "Return ONLY the JSON returned by the tool."
    ),
    tools=[calendar_create_event],
)

print("✅ Tool agents (Slack / Sheets / Gmail / Calendar) created.")



In [None]:
# %% [code]
# 8. Sessions, Memory & Runners (Core ADK Pattern)

session_service = InMemorySessionService()
memory_service = InMemoryMemoryService()

# Create a base session for FlowGenie (used by debug runs)
base_session = await session_service.create_session(
    app_name=APP_NAME,
    user_id=USER_ID,
    session_id=SESSION_ID,
    state={},
)

planner_runner = Runner(
    agent=planner_agent,
    app_name=APP_NAME,
    session_service=session_service,
    memory_service=memory_service,
)

evaluator_runner = Runner(
    agent=evaluator_agent,
    app_name=APP_NAME,
    session_service=session_service,
    memory_service=memory_service,
)

slack_runner = Runner(
    agent=slack_agent,
    app_name=APP_NAME,
    session_service=session_service,
    memory_service=memory_service,
)

sheets_runner = Runner(
    agent=sheets_agent,
    app_name=APP_NAME,
    session_service=session_service,
    memory_service=memory_service,
)

gmail_runner = Runner(
    agent=gmail_agent,
    app_name=APP_NAME,
    session_service=session_service,
    memory_service=memory_service,
)

calendar_runner = Runner(
    agent=calendar_agent,
    app_name=APP_NAME,
    session_service=session_service,
    memory_service=memory_service,
)

print("‚úÖ Sessions + runners + memory service initialized.")



In [None]:
# %% [code]
# 9. Robust JSON Output Parser for ADK Events

def parse_json_output(raw):
    """
    Extract a JSON value from an ADK response object or from raw LLM text.

    Accepted inputs:
      - an object with ``.content.parts``: the first part carrying text is used;
      - an object with ``.text``, ``.parts`` or ``.data``;
      - a list: the first item that parses successfully wins;
      - a dict: returned unchanged;
      - a string: optionally wrapped in a ``` / ```json fence and/or surrounded
        by prose.

    Returns:
        The decoded JSON value (normally a dict).

    Raises:
        ValueError: if the input type is unsupported or no list item parses.
        json.JSONDecodeError: if the text contains no decodable JSON.
    """
    import re  # kept for parity with the original local imports
    from json.decoder import JSONDecodeError

    def _first_text(parts):
        """Return the text of the first part that has any, else None."""
        for part in parts:
            text = getattr(part, "text", None)
            if text:
                return text
        return None

    # --- Unwrap response objects down to plain data ------------------------
    content_parts = getattr(getattr(raw, "content", None), "parts", None)
    if content_parts:
        # A response may lead with a non-text part (e.g. a function call),
        # so scan every part instead of looking only at parts[0].
        text = _first_text(content_parts)
        if text:
            raw = text
    elif hasattr(raw, "text"):
        raw = raw.text
    elif hasattr(raw, "parts") and raw.parts:
        raw = _first_text(raw.parts)
    elif hasattr(raw, "data"):
        raw = raw.data

    # If list → return the first item that parses
    if isinstance(raw, list):
        for item in raw:
            try:
                return parse_json_output(item)
            except Exception:
                continue
        raise ValueError(f"No valid JSON found in list: {raw}")

    # If dict → already JSON-like
    if isinstance(raw, dict):
        return raw

    if not isinstance(raw, str):
        raise ValueError(f"Unsupported type: {type(raw).__name__}")

    txt = raw.strip()

    # Remove ```json ... ``` fences
    if txt.startswith("```"):
        txt = txt.strip("`")
        if txt.lower().startswith("json"):
            # Drop the language-tag line. When the payload shares that line
            # (```json {...}```), drop only the tag so the payload survives.
            _tag_line, newline, remainder = txt.partition("\n")
            txt = remainder if newline else txt[len("json"):]

    # Direct JSON attempt
    try:
        return json.loads(txt)
    except ValueError:
        pass

    # Decode the object that starts at the first "{". raw_decode stops at the
    # end of that object, so trailing prose (even prose containing braces)
    # does not break parsing.
    start = txt.find("{")
    if start != -1:
        try:
            obj, _end = json.JSONDecoder().raw_decode(txt, start)
            return obj
        except ValueError:
            pass

    # If still failing, raise clear error
    raise JSONDecodeError(
        f"LLM did not return valid JSON. Raw:\n{txt[:500]}",
        txt,
        0,
    )

print("✅ JSON parser ready.")



In [None]:
# %% [code]
# 10. Executor Agent (Plans Which Tool Agent Handles Each Action)

# Executor: maps each workflow action to one of the tool agents and emits a
# {"plan": [...], "summary": "..."} JSON object; it does not call tools itself.
executor_agent = Agent(
    name="workflow_executor",
    model=MODEL_NAME,
    description="Analyzes workflow JSON and creates a structured step-by-step execution plan for tool agents.",
    instruction=(
        "You receive a workflow JSON object with a list of actions.\n"
        "Your job is to produce a structured PLAN listing which tool agent should execute each action.\n\n"
        "For each action in the workflow, inspect action['type'], action['target'], action['description'], action['inputs'].\n"
        "Create parameters for the tool agent:\n"
        "- For Slack-like actions (type includes 'slack', 'notification', 'alert'):\n"
        "    agent: 'slack_agent'\n"
        "    parameters: {\"channel\": action['target'] or '#general', \"message\": <short text>}\n"
        "- For Sheets-like actions (type includes 'sheet', 'spreadsheet', 'row'):\n"
        "    agent: 'sheets_agent'\n"
        "    parameters: {\"row_values\": <list of stringified inputs>}\n"
        "- For email-like actions (type includes 'email', 'gmail', 'mail'):\n"
        "    agent: 'gmail_agent'\n"
        "    parameters: {\"to\": action['target'] or 'user@example.com', "
        "\"subject\": action['description'] or 'Automated notification', "
        "\"body\": concatenated inputs}\n"
        "- For calendar-like actions (type includes 'calendar', 'event', 'schedule'):\n"
        "    agent: 'calendar_agent'\n"
        "    parameters: {\"calendar_id\": action['target'] or 'primary', "
        "\"title\": action['description'] or 'Automated event', "
        "\"start_time\": simple ISO time like '2025-01-01T10:00:00Z', "
        "\"end_time\": '2025-01-01T11:00:00Z'}\n"
        "- Otherwise, mark agent: 'skipped'.\n\n"
        "Return ONLY this JSON structure:\n"
        "{\n"
        "  \"plan\": [\n"
        "    {\n"
        "      \"action_index\": <1-based index>,\n"
        "      \"agent\": \"slack_agent | sheets_agent | gmail_agent | calendar_agent | skipped\",\n"
        "      \"parameters\": { ... }\n"
        "    }\n"
        "  ],\n"
        "  \"summary\": \"<short human-friendly summary of the plan>\"\n"
        "}\n"
        "Do not wrap response in quotes or code fences. Output ONLY JSON."
    ),
)

executor_runner = Runner(
    agent=executor_agent,
    app_name=APP_NAME,
    session_service=session_service,
    memory_service=memory_service,
)

print("✅ Executor agent + runner ready.")



In [None]:
# %% [code]
# 11. Simulated Workflow Execution (local Python, not an LLM tool)

def simulate_workflow_execution(workflow: Dict[str, Any]) -> Dict[str, Any]:
    """
    Pure-Python simulation: walk the workflow's actions and mark each one as
    'success'. No tool is called; this only produces an execution log.

    Args:
        workflow: Workflow dict; "name" and "actions" are read. "actions" may
            be missing or null (the schema prompt tells the model to use null
            for unknowns), and is then treated as an empty list.

    Returns:
        A dict with "workflow_name", "total_steps", "completed_steps",
        "status" ("completed" or "no_actions") and a per-step "log".
    """
    from datetime import timezone

    name = workflow.get("name", "unnamed_workflow")
    # `or []` also covers an explicit `"actions": null`.
    actions = workflow.get("actions") or []

    log = []
    for idx, action in enumerate(actions, start=1):
        # A malformed (non-dict) entry is still logged, with empty details.
        details = action if isinstance(action, dict) else {}
        # Same "...Z" format as before, built without the deprecated utcnow().
        stamp = datetime.now(timezone.utc).replace(tzinfo=None).isoformat() + "Z"
        log.append(
            {
                "step": idx,
                "type": details.get("type"),
                "target": details.get("target"),
                "status": "success",
                "timestamp": stamp,
                "note": f"Simulated execution of step {idx}",
            }
        )

    return {
        "workflow_name": name,
        "total_steps": len(actions),
        "completed_steps": len(actions),
        "status": "completed" if actions else "no_actions",
        "log": log,
    }

print("✅ Simulation helper defined.")



In [None]:
# %% [code]
# 12. Execute the Tool Plan (A2A-style routing)

async def execute_action_plan(plan: List[Dict[str, Any]]):
    """
    Run every step of an executor-agent plan through its tool-agent runner.

    Each step is a dict with "action_index", "agent" and "parameters". Steps
    whose agent is not one of the four tool agents are recorded as skipped.
    For the rest, the parameters are sent to the runner as a JSON string and
    the response is parsed with parse_json_output; a parse failure is recorded
    as {"error": ..., "raw_response": ...} instead of being raised.

    Returns a list of {"action_index", "agent", "result"} dicts in plan order.
    """
    routable = ("slack_agent", "sheets_agent", "gmail_agent", "calendar_agent")
    outcomes = []

    for entry in plan:
        target = entry.get("agent")
        arguments = entry.get("parameters", {})
        position = entry.get("action_index", None)

        # Guard clause: anything we cannot route is logged and skipped.
        if target not in routable:
            outcomes.append({
                "action_index": position,
                "agent": target,
                "result": {"status": "skipped_no_matching_agent"},
            })
            continue

        runner = {
            "slack_agent": slack_runner,
            "sheets_agent": sheets_runner,
            "gmail_agent": gmail_runner,
            "calendar_agent": calendar_runner,
        }[target]
        reply = await runner.run_debug(json.dumps(arguments), verbose=False)

        # The reply is handed to parse_json_output as-is (it accepts a single
        # response object or a list of them).
        try:
            parsed = parse_json_output(reply)
        except Exception as e:
            parsed = {"error": str(e), "raw_response": str(reply)}

        outcomes.append({
            "action_index": position,
            "agent": target,
            "result": parsed,
        })

    return outcomes

print("✅ Action plan executor ready.")



In [None]:
# %% [code]
# 13. Memory Agents (Capture + Recall via load_memory)

# Agent to capture utterances that will be stored into long-term memory
# Agent to capture utterances that will be stored into long-term memory
memory_capture_agent = LlmAgent(
    name="memory_capture_agent",
    model=MODEL_NAME,
    instruction=(
        "You are a simple assistant that acknowledges what the user says. "
        "Your conversation will be stored into long-term memory. "
        "Do NOT invent extra information."
    ),
)

# Agent to recall using load_memory built-in tool
memory_recall_agent = LlmAgent(
    name="memory_recall_agent",
    model=MODEL_NAME,
    instruction=(
        "Answer the user's question. If you need past context, call load_memory "
        "with a short query and use its result."
    ),
    tools=[load_memory],
)

# Both runners share the notebook-wide session and memory services, so what
# the capture runner stores is visible to the recall runner.
memory_capture_runner = Runner(
    agent=memory_capture_agent,
    app_name=APP_NAME,
    session_service=session_service,
    memory_service=memory_service,
)

memory_recall_runner = Runner(
    agent=memory_recall_agent,
    app_name=APP_NAME,
    session_service=session_service,
    memory_service=memory_service,
)

print("✅ Memory capture & recall agents ready.")



In [None]:
# %% [code]
# 14. Helper Functions: Store & Recall Memory

async def store_memory_utterance(text: str) -> dict:
    """
    Run a one-turn session with the capture agent and add that session to
    long-term memory.

    Args:
        text: The user utterance to remember.

    Returns:
        {"session_id": <id of the throw-away session>, "status": "stored_in_memory"}
    """
    import uuid

    # A timestamp alone has one-second resolution, so two calls within the
    # same second would try to create the same session; the random suffix
    # keeps ids unique.
    session_id = f"mem_store_{int(time.time())}_{uuid.uuid4().hex[:8]}"

    # Create a dedicated session
    await memory_capture_runner.session_service.create_session(
        app_name=APP_NAME,
        user_id=USER_ID,
        session_id=session_id,
        state={},
    )

    user_content = types.Content(
        parts=[types.Part(text=text)],
        role="user",
    )

    # The reply itself is irrelevant; draining the generator is what runs the
    # turn and records it in the session.
    async for _ in memory_capture_runner.run_async(
        user_id=USER_ID,
        session_id=session_id,
        new_message=user_content,
    ):
        pass

    # Re-fetch the session so the copy handed to memory includes the turn.
    completed = await memory_capture_runner.session_service.get_session(
        app_name=APP_NAME,
        user_id=USER_ID,
        session_id=session_id,
    )
    await memory_service.add_session_to_memory(completed)

    return {
        "session_id": session_id,
        "status": "stored_in_memory",
    }


async def recall_from_memory(query: str) -> str:
    """
    Ask the memory_recall_agent a question; the agent has the load_memory tool
    available for looking up past context.

    Args:
        query: The user's question.

    Returns:
        The text of the agent's final response, or "(no answer)" if no final
        response carried any text.
    """
    import uuid

    # A timestamp alone has one-second resolution, so two calls within the
    # same second would try to create the same session; the random suffix
    # keeps ids unique.
    session_id = f"mem_recall_{int(time.time())}_{uuid.uuid4().hex[:8]}"

    await memory_recall_runner.session_service.create_session(
        app_name=APP_NAME,
        user_id=USER_ID,
        session_id=session_id,
        state={},
    )

    user_content = types.Content(
        parts=[types.Part(text=query)],
        role="user",
    )

    final_answer = "(no answer)"
    async for event in memory_recall_runner.run_async(
        user_id=USER_ID,
        session_id=session_id,
        new_message=user_content,
    ):
        if event.is_final_response() and event.content and event.content.parts:
            # Take the first text part of this final-response event.
            for part in event.content.parts:
                if part.text:
                    final_answer = part.text
                    break

    return final_answer

print("✅ Memory helpers ready (store & recall).")



In [None]:
# %% [code]
# 15. Main Orchestrator: run_flowgenie (Memory + Automation)

import re

async def run_flowgenie(user_prompt: str):
    """
    Main entrypoint:
    - If prompt looks like 'remember that ...'  -> store into long-term memory.
    - If prompt looks like a recall question    -> query memory and answer.
    - Else                                      -> normal FlowGenie automation pipeline.

    Returns:
        A dict whose "mode" is "memory_store", "memory_recall" or
        "automation". The automation result carries the planned workflow, its
        evaluation, the executor's action plan, the tool results, a local
        simulation log and the executor's summary.

    Raises:
        Whatever parse_json_output raises when the planner, evaluator or
        executor reply contains no decodable JSON.
    """

    # --- 0) Memory intent routing -----------------------------------------
    store_pattern = r"(remember that|save this|note that|store this)"
    recall_pattern = r"(what did i say earlier|what did i tell you earlier|who is my|what is my)"

    # 0a) STORE memory
    if re.search(store_pattern, user_prompt, re.IGNORECASE):
        store_info = await store_memory_utterance(user_prompt)
        return {
            "mode": "memory_store",
            "prompt": user_prompt,
            "storage": store_info,
            "message": "✅ I've stored this in FlowGenie's long-term memory (in-memory demo).",
        }

    # 0b) RECALL memory
    if re.search(recall_pattern, user_prompt, re.IGNORECASE):
        answer = await recall_from_memory(user_prompt)
        return {
            "mode": "memory_recall",
            "prompt": user_prompt,
            "answer": answer,
        }

    # --- 1) Normal FlowGenie automation pipeline --------------------------

    # Step 1 — Planning
    plan_resp = await planner_runner.run_debug(user_prompt, verbose=True)
    workflow = parse_json_output(plan_resp)

    # Step 2 — Evaluation
    eval_resp = await evaluator_runner.run_debug(json.dumps(workflow), verbose=True)
    evaluation = parse_json_output(eval_resp)

    # Step 3 — Executor: produce a plan of tool calls
    exec_resp = await executor_runner.run_debug(json.dumps(workflow), verbose=True)
    exec_obj = parse_json_output(exec_resp)

    # The model may emit `"plan": null` / `"summary": null`; a default passed
    # to .get() would not cover that, so normalise with `or`.
    action_plan = exec_obj.get("plan") or []
    summary = exec_obj.get("summary") or ""

    # Step 4 — Execute the plan (A2A)
    action_results = await execute_action_plan(action_plan)

    # Step 5 — Local simulation
    simulation = simulate_workflow_execution(workflow)

    return {
        "mode": "automation",
        "workflow": workflow,
        "evaluation": evaluation,
        "action_plan": action_plan,
        "action_results": action_results,
        "simulation": simulation,
        "summary": summary,
    }

print("✅ run_flowgenie orchestrator ready.")



In [None]:
# %% [code]
# 16. Built-in Tools Example: Code Execution Agent (for Kaggle spec)

# Agent configured with BuiltInCodeExecutor as its code executor; it is
# instructed to generate Python, run it, and return only the final result.
code_exec_agent = LlmAgent(
    name="python_executor_agent",
    model=MODEL_NAME,
    instruction=(
        "You are a code execution agent. "
        "When the user gives a programming task or math expression, "
        "generate Python code and run it using the built-in code executor. "
        "Return only the final result (no markdown)."
    ),
    description="Agent that runs Python code through BuiltInCodeExecutor",
    code_executor=BuiltInCodeExecutor(),
)

async def setup_code_runner():
    """
    Build a Runner for code_exec_agent backed by its own, freshly created
    in-memory session service that already holds the session "code_sess_001".
    """
    sessions = InMemorySessionService()
    await sessions.create_session(
        app_name="code_exec_app",
        user_id=USER_ID,
        session_id="code_sess_001",
        state={},
    )
    return Runner(
        agent=code_exec_agent,
        app_name="code_exec_app",
        session_service=sessions,
    )

async def ask_code_agent(query: str):
    """
    Send `query` to the code-execution agent and print what comes back.

    Any generated code and execution output seen in the event stream is
    printed as it arrives. The function returns (None) after printing the
    first text part of a final response; if none arrives, a warning is
    printed instead.
    """
    runner = await setup_code_runner()
    content = types.Content(role="user", parts=[types.Part(text=query)])

    async for event in runner.run_async(
        user_id=USER_ID,
        session_id="code_sess_001",
        new_message=content,
    ):
        # show any code / result for debugging
        if event.content and event.content.parts:
            for part in event.content.parts:
                if part.executable_code:
                    print("\n--- CODE ---")
                    print(part.executable_code.code)
                if part.code_execution_result:
                    print("\n--- EXECUTION OUTPUT ---")
                    print(part.code_execution_result.output)

        if event.is_final_response():
            if event.content and event.content.parts:
                for part in event.content.parts:
                    if part.text:
                        print("\nFinal Response:", part.text)
                        return

    print("⚠️ No usable response received from code agent.")

print("✅ Code execution agent ready.")

# Example call (uncomment to test):
# await ask_code_agent("What is (8 + 5) * 9?")



In [None]:
# %% [code]
# MCP stub (commented out to avoid runtime errors if MCP is not configured)

# The MCP example below is wrapped in a string literal so it is never executed.
"""
from google.adk.tools.mcp_tool import McpToolset

mcp_tools = McpToolset.from_config(
    name="crm_mcp",
    description="Internal CRM ticketing MCP toolset",
    config_path="mcp/enterprise_crm.json",
)

mcp_agent = Agent(
    name="mcp_agent",
    model=MODEL_NAME,
    instruction="Use MCP tools to interact with the internal CRM and summarize results.",
    tools=[mcp_tools],
)

mcp_runner = Runner(
    agent=mcp_agent,
    app_name=APP_NAME,
    session_service=session_service,
    memory_service=memory_service,
)
"""

print("📌 MCP extension documented for future deployment (stub only).")



In [None]:
# %% [code]
# 18. Optional: Google Search Built-in Tool Example (for completeness)

# Agent that is given the built-in google_search tool and told to summarise
# what it returns.
search_agent = Agent(
    name="search_agent",
    model=MODEL_NAME,
    description="Agent that calls Google Search tool and summarizes results.",
    instruction=(
        "When the user asks to search, ALWAYS call google_search with a short query, "
        "then summarize the results briefly."
    ),
    tools=[google_search],
)

search_runner = Runner(
    agent=search_agent,
    app_name=APP_NAME,
    session_service=session_service,
    memory_service=memory_service,
)

print("✅ Search agent with google_search tool ready.")

# Example (uncomment if search credentials are configured):
# async for e in search_runner.run_async(
#     user_id=USER_ID,
#     session_id="search_sess_1",
#     new_message=types.Content(
#         parts=[types.Part(text="Search: Slack webhook JSON schema")],
#         role="user",
#     ),
# ):
#     if e.is_final_response():
#         print(e.content.parts[0].text)