## <b><font color='darkblue'>Custom Agents</font></b>
([source](https://google.github.io/adk-docs/agents/custom-agents/)) <font size='3ptx'><b>Custom agents provide the ultimate flexibility in ADK, allowing you to define arbitrary orchestration logic by inheriting directly from `BaseAgent` and implementing your own control flow.</b> This goes beyond the [predefined patterns of `SequentialAgent`, `LoopAgent`, and `ParallelAgent`](https://github.com/johnklee/ml_articles/blob/master/google/agent_development_kit/Workflow_Agent.ipynb), enabling you to build highly specific and complex agentic workflows.</font>

In [1]:
from IPython.display import display, Markdown, Latex

def show_source_code(src_path: str):
    source_code = !cat $src_path
    source_code = [line.replace('```python', "'''python") for line in source_code]
    source_code = [line.replace('```', "'''") for line in source_code]
    display(Markdown(f"""
```python
{'\n'.join(source_code)}
```"""))

### <b><font color='darkgreen'>Introduction: Beyond Predefined Workflows</font></b>

#### <b><font size='3ptx'>What is a Custom Agent?</font></b>
<font size='3ptx'><b>A Custom Agent is any class you create that inherits from <font color='blue'>google.adk.agents.BaseAgent</font> and implements its core execution logic within the asynchronous `_run_async_impl` method.</b> You have complete control over how this method calls other agents (sub-agents), manages state, and handles events.</font>

#### <b>Why Use Them?</b>
While the standard Workflow Agents ([SequentialAgent](https://google.github.io/adk-docs/agents/workflow-agents/sequential-agents/), [LoopAgent](https://google.github.io/adk-docs/agents/workflow-agents/loop-agents/), [ParallelAgent](https://google.github.io/adk-docs/agents/workflow-agents/parallel-agents/)) cover common orchestration patterns, <b><font size='3ptx'>you'll need a Custom agent when your requirements include</font></b>:
* <b><font size='3ptx'>Conditional Logic</font></b>: Executing different sub-agents or taking different paths based on runtime conditions or the results of previous steps.
* <b><font size='3ptx'>Complex State Management</font></b>: Implementing intricate logic for maintaining and updating state throughout the workflow beyond simple sequential passing.
* <b><font size='3ptx'>External Integrations</font></b>: Incorporating calls to external APIs, databases, or custom libraries directly within the orchestration flow control.
* <b><font size='3ptx'>Dynamic Agent Selection</font></b>: Choosing which sub-agent(s) to run next based on dynamic evaluation of the situation or input.
* <b><font size='3ptx'>Unique Workflow Patterns</font></b>: Implementing orchestration logic that doesn't fit the standard sequential, parallel, or loop structures.

![flow](https://google.github.io/adk-docs/assets/custom-agent-flow.png)

### <b><font color='darkgreen'>Implementing Custom Logic:</font></b>
<font size='3ptx'><b>The core of any custom agent is the method where you define its unique asynchronous behavior</b>. This method allows you to orchestrate sub-agents and manage the flow of execution.</font>

In Python, that method is `_run_async_impl`:
* <b><font size='3ptx'>Signature</font></b>: `async def _run_async_impl(self, ctx: InvocationContext) -> AsyncGenerator[Event, None]:`
* <b><font size='3ptx'>Asynchronous Generator</font></b>: It must be an `async def` function and return an `AsyncGenerator`. This allows it to yield events produced by sub-agents or its own logic back to the runner.
* <b><font size='3ptx'>`ctx` (InvocationContext)</font></b>: Provides access to crucial runtime information, most importantly `ctx.session.state`, which is the primary way to share data between steps orchestrated by your custom agent.
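
To make the asynchronous-generator requirement concrete, here is a minimal, library-free sketch. `FakeEvent`, `child_agent`, `parent_agent`, and `collect` are hypothetical stand-ins invented for this illustration, not ADK classes or functions. They only show how an `async def` function containing `yield` produces an async generator, and how a caller drains it with `async for`.

```python
import asyncio
from dataclasses import dataclass
from typing import AsyncGenerator, List


@dataclass
class FakeEvent:
    """Hypothetical stand-in for an ADK Event (not the real class)."""
    author: str
    text: str


async def child_agent(name: str) -> AsyncGenerator[FakeEvent, None]:
    # An `async def` function that contains `yield` is an async generator function.
    yield FakeEvent(author=name, text="step 1")
    await asyncio.sleep(0)  # Hand control back to the event loop between events.
    yield FakeEvent(author=name, text="step 2")


async def parent_agent() -> AsyncGenerator[FakeEvent, None]:
    # Re-yield every event from the child, the way a custom agent
    # passes sub-agent events up to its caller.
    async for event in child_agent("child"):
        yield event
    # The parent can also emit events of its own.
    yield FakeEvent(author="parent", text="done")


async def collect() -> List[FakeEvent]:
    # The caller (the runner, in ADK terms) consumes the stream with `async for`.
    return [event async for event in parent_agent()]


events = asyncio.run(collect())
for event in events:
    print(f"{event.author}: {event.text}")
```

In a notebook, where an event loop is already running, `await collect()` would take the place of `asyncio.run(collect())`.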

#### <b><font size='3ptx'>Key Capabilities within the Core Asynchronous Method:</font></b>

1. <font size='3ptx'><b>Calling Sub-Agents:</b></font> You invoke sub-agents (<font color='brown'>which are typically stored as instance attributes like `self.my_llm_agent`</font>) using their `run_async` method and yield their events:

```python
async for event in self.some_sub_agent.run_async(ctx):
    # Optionally inspect or log the event
    yield event # Pass the event up
```

2. <b><font size='3ptx'>Managing State</font></b>: Read from and write to the session state dictionary (`ctx.session.state`) to pass data between sub-agent calls or make decisions:

```python
# Read data set by a previous agent
previous_result = ctx.session.state.get("some_key")

# Make a decision based on state
if previous_result == "some_value":
    pass  # ... call a specific sub-agent ...
else:
    pass  # ... call another sub-agent ...

# Store a result for a later step (often done via a sub-agent's output_key)
# ctx.session.state["my_custom_result"] = "calculated_value"
```

3. <b><font size='3ptx'>Implementing Control Flow</font></b>: Use standard Python constructs (<font color='brown'>if/elif/else, for/while loops, try/except</font>) to create sophisticated, conditional, or iterative workflows involving your sub-agents.
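
As a rough illustration of combining these constructs, the sketch below retries a failing step with `for` and `try/except`, then branches on shared state. It is deliberately library-free: `FlakyStep` and `orchestrate` are hypothetical names, plain strings stand in for events, and a plain `dict` stands in for `ctx.session.state`.

```python
import asyncio
from typing import AsyncGenerator, Dict, List, Tuple


class FlakyStep:
    """Hypothetical sub-agent stand-in that fails on its first call."""

    def __init__(self) -> None:
        self.calls = 0

    async def run_async(self, state: Dict[str, str]) -> AsyncGenerator[str, None]:
        self.calls += 1
        if self.calls == 1:
            raise RuntimeError("transient failure")
        state["draft"] = "hello"  # Record the result in shared state.
        yield f"draft written on attempt {self.calls}"


async def orchestrate(
    step: FlakyStep, state: Dict[str, str], max_attempts: int = 3
) -> AsyncGenerator[str, None]:
    # Loop + try/except: retry the step until it succeeds or attempts run out.
    for attempt in range(1, max_attempts + 1):
        try:
            async for event in step.run_async(state):
                yield event
            break  # The step finished without raising.
        except RuntimeError as exc:
            yield f"attempt {attempt} failed: {exc}"

    # Conditional branch based on what ended up in state.
    if "draft" in state:
        yield "draft ready"
    else:
        yield "giving up"


async def main() -> Tuple[List[str], Dict[str, str]]:
    state: Dict[str, str] = {}
    collected = [event async for event in orchestrate(FlakyStep(), state)]
    return collected, state


events, state = asyncio.run(main())
print(events)
print(state)
```

Whether a retry is appropriate depends on the sub-agent. The point here is only that ordinary Python control flow wraps the `async for` loop without any special machinery.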

### <b><font color='darkgreen'>Managing Sub-Agents and State</font></b>
Typically, a custom agent orchestrates other agents (like [**LlmAgent**](https://google.github.io/adk-docs/agents/llm-agents/), [**LoopAgent**](https://google.github.io/adk-docs/agents/workflow-agents/loop-agents/), etc.).

* <b><font size='3ptx'>Initialization</font></b>: You usually pass instances of these sub-agents into your custom agent's constructor and store them as instance attributes (<font color='brown'>e.g., `self.story_generator = story_generator_instance`</font>). This makes them accessible within the custom agent's core asynchronous execution logic (<font color='brown'>the `_run_async_impl` method</font>).
* <b><font size='3ptx'>Sub Agents List</font></b>: When initializing the `BaseAgent` through its `super().__init__()` constructor, you should <b>pass a `sub_agents` list. This list tells the ADK framework about the agents that are part of this custom agent's immediate hierarchy</b>. It's important for framework features like lifecycle management, introspection, and potentially future routing capabilities, even if your core execution logic (`_run_async_impl`) calls the agents directly via `self.xxx_agent`. Include the agents that your custom logic directly invokes at the top level.
* <b><font size='3ptx'>State</font></b>: As mentioned, `ctx.session.state` is the standard way sub-agents (<font color='brown'>especially `LlmAgent`s using `output_key`</font>) communicate results back to the orchestrator and how the orchestrator passes necessary inputs down.
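
The three points above can be sketched without the framework. In this illustration `StubAgent` and `Orchestrator` are hypothetical classes, a plain `dict` plays the role of `ctx.session.state`, and the way `StubAgent` writes its result under `output_key` is a simplified assumption about the idea, not ADK's implementation.

```python
import asyncio
from dataclasses import dataclass
from typing import AsyncGenerator, Callable, Dict, List


@dataclass
class StubAgent:
    """Hypothetical stand-in for a sub-agent that has an output key."""
    name: str
    output_key: str
    produce: Callable[[Dict[str, str]], str]

    async def run_async(self, state: Dict[str, str]) -> AsyncGenerator[str, None]:
        # Read inputs from shared state, then store the result under output_key.
        state[self.output_key] = self.produce(state)
        yield f"{self.name} wrote {self.output_key}"


class Orchestrator:
    """Hypothetical orchestrator: keeps sub-agents as attributes and in a list."""

    def __init__(self, writer: StubAgent, shouter: StubAgent) -> None:
        self.writer = writer                  # Direct handles used by the run logic.
        self.shouter = shouter
        self.sub_agents = [writer, shouter]   # Declared hierarchy.

    async def run_async(self, state: Dict[str, str]) -> AsyncGenerator[str, None]:
        async for event in self.writer.run_async(state):
            yield event
        # The second agent consumes what the first one left in state.
        async for event in self.shouter.run_async(state):
            yield event


writer = StubAgent("Writer", "draft", lambda s: f"A story about {s['topic']}.")
shouter = StubAgent("Shouter", "final", lambda s: s["draft"].upper())


async def main() -> List[str]:
    return [event async for event in Orchestrator(writer, shouter).run_async(state)]


state: Dict[str, str] = {"topic": "cats"}
events = asyncio.run(main())
print(events)
print(state)
```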

### <b><font color='darkgreen'>Design Pattern Example: StoryFlowAgent</font></b>
<font size='3ptx'>Let's illustrate the power of custom agents with an example pattern: <b>a multi-stage content generation workflow with conditional logic</b>.</font>

<font size='3ptx'><b>Goal</b></font>: Create a system that generates a story, iteratively refines it through critique and revision, performs final checks, and crucially, regenerates the story if the final tone check fails.

<font size='3ptx'><b>Why Custom?</b></font> The core requirement driving the need for a custom agent here is the <b>conditional regeneration based on the tone check. Standard workflow agents don't have built-in conditional branching based on the outcome of a sub-agent's task</b>. We need custom logic (`if tone == "negative": ...`) within the orchestrator.

#### <b><font size='3ptx'>Part 1: Simplified custom agent Initialization</font></b>
<font size='3ptx'><b>We define the <font color='blue'>StoryFlowAgent</font> inheriting from `BaseAgent`</b>. In `__init__`, we store the necessary sub-agents (passed in) as instance attributes and tell the `BaseAgent` framework about the top-level agents this custom agent will directly orchestrate</font>.

```python
class StoryFlowAgent(BaseAgent):
    """
    Custom agent for a story generation and refinement workflow.

    This agent orchestrates a sequence of LLM agents to generate a story,
    critique it, revise it, check grammar and tone, and potentially
    regenerate the story if the tone is negative.
    """

    # --- Field Declarations for Pydantic ---
    # Declare the agents passed during initialization as class attributes with type hints
    story_generator: LlmAgent
    critic: LlmAgent
    reviser: LlmAgent
    grammar_check: LlmAgent
    tone_check: LlmAgent

    loop_agent: LoopAgent
    sequential_agent: SequentialAgent

    # model_config allows setting Pydantic configurations if needed, e.g., arbitrary_types_allowed
    model_config = {"arbitrary_types_allowed": True}

    def __init__(
        self,
        name: str,
        story_generator: LlmAgent,
        critic: LlmAgent,
        reviser: LlmAgent,
        grammar_check: LlmAgent,
        tone_check: LlmAgent,
    ):
        """
        Initializes the StoryFlowAgent.

        Args:
            name: The name of the agent.
            story_generator: An LlmAgent to generate the initial story.
            critic: An LlmAgent to critique the story.
            reviser: An LlmAgent to revise the story based on criticism.
            grammar_check: An LlmAgent to check the grammar.
            tone_check: An LlmAgent to analyze the tone.
        """
        # Create internal agents *before* calling super().__init__
        loop_agent = LoopAgent(
            name="CriticReviserLoop", sub_agents=[critic, reviser], max_iterations=2
        )
        sequential_agent = SequentialAgent(
            name="PostProcessing", sub_agents=[grammar_check, tone_check]
        )

        # Define the sub_agents list for the framework
        sub_agents_list = [
            story_generator,
            loop_agent,
            sequential_agent,
        ]

        # Pydantic will validate and assign them based on the class annotations.
        super().__init__(
            name=name,
            story_generator=story_generator,
            critic=critic,
            reviser=reviser,
            grammar_check=grammar_check,
            tone_check=tone_check,
            loop_agent=loop_agent,
            sequential_agent=sequential_agent,
            sub_agents=sub_agents_list, # Pass the sub_agents list directly
        )
```

#### <b><font size='3ptx'>Part 2: Defining the Custom Execution Logic</font></b>
This method orchestrates the sub-agents using standard Python async/await and control flow.

```python
@override
async def _run_async_impl(
    self, ctx: InvocationContext
) -> AsyncGenerator[Event, None]:
    """
    Implements the custom orchestration logic for the story workflow.
    Uses the instance attributes assigned by Pydantic (e.g., self.story_generator).
    """
    logger.info(f"[{self.name}] Starting story generation workflow.")

    # 1. Initial Story Generation
    logger.info(f"[{self.name}] Running StoryGenerator...")
    async for event in self.story_generator.run_async(ctx):
        logger.info(f"[{self.name}] Event from StoryGenerator: {event.model_dump_json(indent=2, exclude_none=True)}")
        yield event

    # Check if story was generated before proceeding
    if "current_story" not in ctx.session.state or not ctx.session.state["current_story"]:
        logger.error(f"[{self.name}] Failed to generate initial story. Aborting workflow.")
        return  # Stop processing if initial story failed

    logger.info(f"[{self.name}] Story state after generator: {ctx.session.state.get('current_story')}")


    # 2. Critic-Reviser Loop
    logger.info(f"[{self.name}] Running CriticReviserLoop...")
    # Use the loop_agent instance attribute assigned during init
    async for event in self.loop_agent.run_async(ctx):
        logger.info(f"[{self.name}] Event from CriticReviserLoop: {event.model_dump_json(indent=2, exclude_none=True)}")
        yield event

    logger.info(f"[{self.name}] Story state after loop: {ctx.session.state.get('current_story')}")

    # 3. Sequential Post-Processing (Grammar and Tone Check)
    logger.info(f"[{self.name}] Running PostProcessing...")
    # Use the sequential_agent instance attribute assigned during init
    async for event in self.sequential_agent.run_async(ctx):
        logger.info(f"[{self.name}] Event from PostProcessing: {event.model_dump_json(indent=2, exclude_none=True)}")
        yield event

    # 4. Tone-Based Conditional Logic
    tone_check_result = ctx.session.state.get("tone_check_result")
    logger.info(f"[{self.name}] Tone check result: {tone_check_result}")

    if tone_check_result == "negative":
        logger.info(f"[{self.name}] Tone is negative. Regenerating story...")
        async for event in self.story_generator.run_async(ctx):
            logger.info(f"[{self.name}] Event from StoryGenerator (Regen): {event.model_dump_json(indent=2, exclude_none=True)}")
            yield event
    else:
        logger.info(f"[{self.name}] Tone is not negative. Keeping current story.")

    logger.info(f"[{self.name}] Workflow finished.")
```

Explanation of Logic:
1. The initial `story_generator` runs. Its output is expected to be in `ctx.session.state["current_story"]`.
2. The `loop_agent` runs, which internally calls the `critic` and `reviser` in sequence for up to `max_iterations` rounds (2 in this example). They read/write `current_story` and `criticism` from/to the state.
3. The `sequential_agent` runs, calling `grammar_check` then `tone_check`, reading `current_story` and writing `grammar_suggestions` and `tone_check_result` to the state.
4. <b><font size='3ptx'>Custom Part</font></b>: The `if` statement checks the `tone_check_result` from the state. If it's "negative", the `story_generator` is called again, overwriting the `current_story` in the state. Otherwise, the flow ends.
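
The check in step 4 compares the stored text with the exact string `"negative"`. Model output is not guaranteed to be that bare word (it could plausibly arrive as `"Negative\n"`), so one optional hardening step is to normalize the value before comparing. The helper names below, `normalize_tone` and `should_regenerate`, are hypothetical and not part of the original example or of ADK.

```python
from typing import Mapping, Optional

KNOWN_TONES = {"positive", "negative", "neutral"}


def normalize_tone(raw: Optional[str]) -> Optional[str]:
    """Return a lowercase tone label, or None if the text is not a known label."""
    if not raw:
        return None
    # Drop surrounding whitespace, then stray quotes or punctuation, then lowercase.
    cleaned = raw.strip().strip(".!'\"").lower()
    return cleaned if cleaned in KNOWN_TONES else None


def should_regenerate(state: Mapping[str, str]) -> bool:
    """True only when the stored tone result normalizes to 'negative'."""
    return normalize_tone(state.get("tone_check_result")) == "negative"


print(normalize_tone("Negative\n"))                               # negative
print(should_regenerate({"tone_check_result": " 'negative'. "}))  # True
print(should_regenerate({"tone_check_result": "somewhat sad"}))   # False
```

Inside the orchestrator, the condition would then read `if should_regenerate(ctx.session.state):`. Anything that is not a recognized label is treated as "do not regenerate", which is a design choice of this sketch.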

#### <b><font size='3ptx'>Part 3: Defining the LLM Sub-Agents</font></b>
<font size='3ptx'><b>These are standard [LlmAgent](https://google.github.io/adk-docs/agents/llm-agents/) definitions, responsible for specific tasks</b>. Their `output_key` parameter is crucial: it places each agent's result into `session.state`, where other agents or the custom orchestrator can access it</font>.

```python
GEMINI_2_FLASH = "gemini-2.0-flash" # Define model constant
# --- Define the individual LLM agents ---
story_generator = LlmAgent(
    name="StoryGenerator",
    model=GEMINI_2_FLASH,
    instruction="""You are a story writer. Write a short story (around 100 words), on the following topic: {topic}""",
    input_schema=None,
    output_key="current_story",  # Key for storing output in session state
)

critic = LlmAgent(
    name="Critic",
    model=GEMINI_2_FLASH,
    instruction="""You are a story critic. Review the story provided: {{current_story}}. Provide 1-2 sentences of constructive criticism
on how to improve it. Focus on plot or character.""",
    input_schema=None,
    output_key="criticism",  # Key for storing criticism in session state
)

reviser = LlmAgent(
    name="Reviser",
    model=GEMINI_2_FLASH,
    instruction="""You are a story reviser. Revise the story provided: {{current_story}}, based on the criticism in
{{criticism}}. Output only the revised story.""",
    input_schema=None,
    output_key="current_story",  # Overwrites the original story
)

grammar_check = LlmAgent(
    name="GrammarCheck",
    model=GEMINI_2_FLASH,
    instruction="""You are a grammar checker. Check the grammar of the story provided: {current_story}. Output only the suggested
corrections as a list, or output 'Grammar is good!' if there are no errors.""",
    input_schema=None,
    output_key="grammar_suggestions",
)

tone_check = LlmAgent(
    name="ToneCheck",
    model=GEMINI_2_FLASH,
    instruction="""You are a tone analyzer. Analyze the tone of the story provided: {current_story}. Output only one word: 'positive' if
the tone is generally positive, 'negative' if the tone is generally negative, or 'neutral'
otherwise.""",
    input_schema=None,
    output_key="tone_check_result", # This agent's output determines the conditional flow
)
```
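
The instructions above contain placeholders such as `{topic}` and `{current_story}` whose names match keys in the session state. As a mental model only, the sketch below fills such placeholders from a plain `dict`. `fill_instruction` is a hypothetical helper, not ADK's implementation; treating doubled braces like single ones and raising on a missing key are assumptions made for this illustration.

```python
import re
from typing import Mapping

# One or more opening braces, an identifier, one or more closing braces.
_PLACEHOLDER = re.compile(r"\{+\s*([A-Za-z_][A-Za-z0-9_]*)\s*\}+")


def fill_instruction(template: str, state: Mapping[str, object]) -> str:
    """Replace {key} (or {{key}}) in the template with str(state[key])."""

    def replace(match: "re.Match[str]") -> str:
        key = match.group(1)
        if key not in state:
            # Assumption of this sketch: fail loudly rather than leave a gap.
            raise KeyError(f"state has no value for placeholder {key!r}")
        return str(state[key])

    return _PLACEHOLDER.sub(replace, template)


state = {"topic": "a brave kitten", "current_story": "Once upon a time."}
print(fill_instruction("Write a short story on the following topic: {topic}", state))
print(fill_instruction("Review the story provided: {{current_story}}.", state))
```

Seen this way, the data flow between agents is explicit: `output_key` decides where a result is written, and the placeholder names decide which stored values a later instruction reads.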

#### <b><font size='3ptx'>Part 4: Instantiating and Running the custom agent</font></b>
Finally, you instantiate your <b><font color='blue'>StoryFlowAgent</font></b> and use the `Runner` as usual.

```python
# --- Create the custom agent instance ---
story_flow_agent = StoryFlowAgent(
    name="StoryFlowAgent",
    story_generator=story_generator,
    critic=critic,
    reviser=reviser,
    grammar_check=grammar_check,
    tone_check=tone_check,
)

INITIAL_STATE = {"topic": "a brave kitten exploring a haunted house"}

# --- Setup Runner and Session ---
async def setup_session_and_runner():
    session_service = InMemorySessionService()
    session = await session_service.create_session(app_name=APP_NAME, user_id=USER_ID, session_id=SESSION_ID, state=INITIAL_STATE)
    logger.info(f"Initial session state: {session.state}")
    runner = Runner(
        agent=story_flow_agent, # Pass the custom orchestrator agent
        app_name=APP_NAME,
        session_service=session_service
    )
    return session_service, runner

# --- Function to Interact with the Agent ---
async def call_agent_async(user_input_topic: str):
    """
    Sends a new topic to the agent (overwriting the initial one if needed)
    and runs the workflow.
    """

    session_service, runner = await setup_session_and_runner()

    current_session = await session_service.get_session(app_name=APP_NAME, 
                                                  user_id=USER_ID, 
                                                  session_id=SESSION_ID)
    if not current_session:
        logger.error("Session not found!")
        return

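    # NOTE (assumption to verify against your ADK version): some session services
    # return a copy from get_session(). In that case this in-place update is not
    # persisted, and the agent still sees the topic from INITIAL_STATE.
    current_session.state["topic"] = user_input_topic
    logger.info(f"Updated session state topic to: {user_input_topic}")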

    content = types.Content(role='user', parts=[types.Part(text=f"Generate a story about: {user_input_topic}")])
    events = runner.run_async(user_id=USER_ID, session_id=SESSION_ID, new_message=content)

    final_response = "No final response captured."
    async for event in events:
        if event.is_final_response() and event.content and event.content.parts:
            logger.info(f"Potential final response from [{event.author}]: {event.content.parts[0].text}")
            final_response = event.content.parts[0].text

    print("\n--- Agent Interaction Result ---")
    print("Agent Final Response: ", final_response)

    final_session = await session_service.get_session(app_name=APP_NAME, 
                                                user_id=USER_ID, 
                                                session_id=SESSION_ID)
    print("Final Session State:")
    import json
    print(json.dumps(final_session.state, indent=2))
    print("-------------------------------\n")

# --- Run the Agent ---
# Note: In Colab, you can directly use 'await' at the top level.
# If running this code as a standalone Python script, you'll need to use asyncio.run() or manage the event loop.
await call_agent_async("a lonely robot finding a friend in a junkyard")
```

### <b><font color='darkgreen'>Full Code Example</font></b>

In [2]:
show_source_code('custom_agent_ex1.py')


```python
#!/usr/bin/env python
# Full runnable code for the StoryFlowAgent example
# Copyright 2025 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import asyncio
import logging
from typing import AsyncGenerator
from typing_extensions import override

from google.adk.agents import LlmAgent, BaseAgent, LoopAgent, SequentialAgent
from google.adk.agents.invocation_context import InvocationContext
from google.genai import types
from google.adk.sessions import InMemorySessionService
from google.adk.runners import Runner
from google.adk.events import Event
from pydantic import BaseModel, Field


# --- Constants ---
APP_NAME = "story_app"
USER_ID = "12345"
SESSION_ID = "123344"
GEMINI_2_FLASH = "gemini-2.0-flash"

# --- Configure Logging ---
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


# --- Define the individual LLM agents ---
story_generator = LlmAgent(
    name="StoryGenerator",
    model=GEMINI_2_FLASH,
    instruction=(
        "You are a story writer. Write a short story (around 100 words), "
        "on the following topic: {topic}"),
    input_schema=None,
    output_key="current_story",  # Key for storing output in session state
)

critic = LlmAgent(
    name="Critic",
    model=GEMINI_2_FLASH,
    instruction=(
        "You are a story critic. Review the story provided: {{current_story}}. "
        "Provide 1-2 sentences of constructive criticism on how to improve it. "
        "Focus on plot or character."),
    input_schema=None,
    output_key="criticism",  # Key for storing criticism in session state
)

reviser = LlmAgent(
    name="Reviser",
    model=GEMINI_2_FLASH,
    instruction=(
        "You are a story reviser. Revise the story provided: {{current_story}}, "
        "based on the criticism in {{criticism}}. Output only the revised story."),
    input_schema=None,
    output_key="current_story",  # Overwrites the original story
)

grammar_check = LlmAgent(
    name="GrammarCheck",
    model=GEMINI_2_FLASH,
    instruction=(
        "You are a grammar checker. Check the grammar of the story provided: {current_story}. "
        "Output only the suggested corrections as a list, or output 'Grammar is good!' "
        "if there are no errors."),
    input_schema=None,
    output_key="grammar_suggestions",
)

tone_check = LlmAgent(
    name="ToneCheck",
    model=GEMINI_2_FLASH,
    instruction=(
        "You are a tone analyzer. Analyze the tone of the story provided: {current_story}. "
        "Output only one word: 'positive' if the tone is generally positive, "
        "'negative' if the tone is generally negative, or 'neutral' otherwise."),
    input_schema=None,
    output_key="tone_check_result", # This agent's output determines the conditional flow
)

# --- Custom Orchestrator Agent ---
class StoryFlowAgent(BaseAgent):
    """
    Custom agent for a story generation and refinement workflow.

    This agent orchestrates a sequence of LLM agents to generate a story,
    critique it, revise it, check grammar and tone, and potentially
    regenerate the story if the tone is negative.
    """

    # --- Field Declarations for Pydantic ---
    # Declare the agents passed during initialization as class attributes with type hints
    story_generator: LlmAgent
    critic: LlmAgent
    reviser: LlmAgent
    grammar_check: LlmAgent
    tone_check: LlmAgent

    loop_agent: LoopAgent
    sequential_agent: SequentialAgent

    # model_config allows setting Pydantic configurations if needed, e.g., arbitrary_types_allowed
    model_config = {"arbitrary_types_allowed": True}

    def __init__(
        self,
        name: str,
        story_generator: LlmAgent,
        critic: LlmAgent,
        reviser: LlmAgent,
        grammar_check: LlmAgent,
        tone_check: LlmAgent,
    ):
        """
        Initializes the StoryFlowAgent.

        Args:
            name: The name of the agent.
            story_generator: An LlmAgent to generate the initial story.
            critic: An LlmAgent to critique the story.
            reviser: An LlmAgent to revise the story based on criticism.
            grammar_check: An LlmAgent to check the grammar.
            tone_check: An LlmAgent to analyze the tone.
        """
        # Create internal agents *before* calling super().__init__
        loop_agent = LoopAgent(
            name="CriticReviserLoop",
            sub_agents=[critic, reviser], max_iterations=2)
        sequential_agent = SequentialAgent(
            name="PostProcessing",
            sub_agents=[grammar_check, tone_check])

        # Define the sub_agents list for the framework
        sub_agents_list = [
            story_generator,
            loop_agent,
            sequential_agent,
        ]

        # Pydantic will validate and assign them based on the class annotations.
        super().__init__(
            name=name,
            story_generator=story_generator,
            critic=critic,
            reviser=reviser,
            grammar_check=grammar_check,
            tone_check=tone_check,
            loop_agent=loop_agent,
            sequential_agent=sequential_agent,
            sub_agents=sub_agents_list, # Pass the sub_agents list directly
        )

    @override
    async def _run_async_impl(
        self, ctx: InvocationContext
    ) -> AsyncGenerator[Event, None]:
        """
        Implements the custom orchestration logic for the story workflow.
        Uses the instance attributes assigned by Pydantic (e.g., self.story_generator).
        """
        logger.info(f"[{self.name}] Starting story generation workflow.")

        # 1. Initial Story Generation
        logger.info(f"[{self.name}] Running StoryGenerator...")
        async for event in self.story_generator.run_async(ctx):
            logger.info(
                f"[{self.name}] Event from StoryGenerator: "
                f"{event.model_dump_json(indent=2, exclude_none=True)}")
            yield event

        # Check if story was generated before proceeding
        if "current_story" not in ctx.session.state or not ctx.session.state["current_story"]:
            logger.error(
                f"[{self.name}] Failed to generate initial story. "
                "Aborting workflow.")
            return  # Stop processing if initial story failed

        logger.info(
            f"[{self.name}] Story state after generator: "
            f"{ctx.session.state.get('current_story')}")


        # 2. Critic-Reviser Loop
        logger.info(f"[{self.name}] Running CriticReviserLoop...")
        # Use the loop_agent instance attribute assigned during init
        async for event in self.loop_agent.run_async(ctx):
            logger.info(
                f"[{self.name}] Event from CriticReviserLoop: "
                f"{event.model_dump_json(indent=2, exclude_none=True)}")
            yield event

        logger.info(
            f"[{self.name}] Story state after loop: "
            f"{ctx.session.state.get('current_story')}")

        # 3. Sequential Post-Processing (Grammar and Tone Check)
        logger.info(f"[{self.name}] Running PostProcessing...")
        # Use the sequential_agent instance attribute assigned during init
        async for event in self.sequential_agent.run_async(ctx):
            logger.info(
                f"[{self.name}] Event from PostProcessing: "
                f"{event.model_dump_json(indent=2, exclude_none=True)}")
            yield event

        # 4. Tone-Based Conditional Logic
        tone_check_result = ctx.session.state.get("tone_check_result")
        logger.info(f"[{self.name}] Tone check result: {tone_check_result}")

        if tone_check_result == "negative":
            logger.info(f"[{self.name}] Tone is negative. Regenerating story...")
            async for event in self.story_generator.run_async(ctx):
                logger.info(
                    f"[{self.name}] Event from StoryGenerator (Regen): "
                    f"{event.model_dump_json(indent=2, exclude_none=True)}")
                yield event
        else:
            logger.info(f"[{self.name}] Tone is not negative. Keeping current story.")

        logger.info(f"[{self.name}] Workflow finished.")


# --- Create the custom agent instance ---
story_flow_agent = StoryFlowAgent(
    name="StoryFlowAgent",
    story_generator=story_generator,
    critic=critic,
    reviser=reviser,
    grammar_check=grammar_check,
    tone_check=tone_check,
)

INITIAL_STATE = {"topic": "a brave kitten exploring a haunted house"}

# --- Setup Runner and Session ---
async def setup_session_and_runner():
    session_service = InMemorySessionService()
    session = await session_service.create_session(app_name=APP_NAME, user_id=USER_ID, session_id=SESSION_ID, state=INITIAL_STATE)
    logger.info(f"Initial session state: {session.state}")
    runner = Runner(
        agent=story_flow_agent, # Pass the custom orchestrator agent
        app_name=APP_NAME,
        session_service=session_service
    )
    return session_service, runner


# --- Function to Interact with the Agent ---
async def call_agent_async(user_input_topic: str):
    """
    Sends a new topic to the agent (overwriting the initial one if needed)
    and runs the workflow.
    """

    session_service, runner = await setup_session_and_runner()

    current_session = await session_service.get_session(app_name=APP_NAME,
                                                  user_id=USER_ID,
                                                  session_id=SESSION_ID)
    if not current_session:
        logger.error("Session not found!")
        return

    current_session.state["topic"] = user_input_topic
    logger.info(f"Updated session state topic to: {user_input_topic}")

    content = types.Content(role='user', parts=[types.Part(text=f"Generate a story about: {user_input_topic}")])
    events = runner.run_async(user_id=USER_ID, session_id=SESSION_ID, new_message=content)

    final_response = "No final response captured."
    async for event in events:
        if event.is_final_response() and event.content and event.content.parts:
            logger.info(f"Potential final response from [{event.author}]: {event.content.parts[0].text}")
            final_response = event.content.parts[0].text

    print("\n--- Agent Interaction Result ---")
    print("Agent Final Response: ", final_response)

    final_session = await session_service.get_session(app_name=APP_NAME,
                                                user_id=USER_ID,
                                                session_id=SESSION_ID)
    print("Final Session State:")
    import json
    print(json.dumps(final_session.state, indent=2))
    print("-------------------------------\n")


async def main():
    # --- Run the Agent ---
    # Note: In Colab, you can directly use 'await' at the top level.
    # If running this code as a standalone Python script, you'll need to use asyncio.run() or manage the event loop.
    await call_agent_async("a lonely robot finding a friend in a junkyard")


if __name__ == '__main__':
    asyncio.run(main())
```