gracefully stop AgentTask and parent agents when session close #4730

Merged
longcw merged 10 commits into main from longc/graceful-stop-agent-task
Feb 18, 2026

Conversation

@longcw
Contributor

@longcw longcw commented Feb 6, 2026

  • clean up the old agent's activity when the session is closed during an AgentTask
  • fix a deadlock when the session closes during tool execution but before the AgentTask is awaited
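
The second bullet describes a hang: a tool is still running when the session closes, and only afterwards does it await the task, so nothing is left to resolve that await. Below is a minimal, library-independent sketch of that failure mode and one way to avoid it with plain asyncio. `ToySession`, `ToyTask`, `track`, and `tool` are hypothetical names invented for illustration; they are not the livekit-agents API and not necessarily how this PR implements the fix.

```python
import asyncio


class ToySession:
    """Hypothetical stand-in for a session that can be closed at any time."""

    def __init__(self) -> None:
        self.closed = False
        self._pending = set()

    def track(self, fut: asyncio.Future) -> None:
        # Remember futures that some coroutine may still be awaiting.
        self._pending.add(fut)
        fut.add_done_callback(self._pending.discard)

    def close(self) -> None:
        self.closed = True
        # Fail every unresolved future so awaiters wake up instead of hanging.
        for fut in list(self._pending):
            if not fut.done():
                fut.set_exception(RuntimeError("session closed"))


class ToyTask:
    """Hypothetical awaitable task whose result arrives through a future."""

    def __init__(self, session: ToySession) -> None:
        self._fut = asyncio.get_running_loop().create_future()
        if session.closed:
            # Created after close: fail fast rather than wait forever.
            self._fut.set_exception(RuntimeError("session closed"))
        else:
            session.track(self._fut)

    def __await__(self):
        return self._fut.__await__()


async def tool(session: ToySession) -> str:
    await asyncio.sleep(0.01)  # tool work; the session is already closed by now
    try:
        return await ToyTask(session)
    except RuntimeError as exc:
        return f"stopped: {exc}"


async def main() -> str:
    session = ToySession()
    running = asyncio.create_task(tool(session))
    session.close()  # close before the tool reaches `await ToyTask(...)`
    # The timeout would fire if the await inside the tool never resolved.
    return await asyncio.wait_for(running, timeout=1.0)


result = asyncio.run(main())
print(result)  # stopped: session closed
```

Without the `session.closed` check in `ToyTask.__init__`, the future would never be resolved and `wait_for` would raise `TimeoutError`, which is the toy equivalent of the deadlock.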


@chenghao-mou chenghao-mou requested a review from a team February 6, 2026 06:14
Contributor

@devin-ai-integration devin-ai-integration bot left a comment

✅ Devin Review: No Issues Found

Devin Review analyzed this PR and found no potential bugs to report.

View in Devin Review to see 6 additional findings.

Contributor

@devin-ai-integration devin-ai-integration bot left a comment

Devin Review found 1 new potential issue.

View 14 additional findings in Devin Review.

Member

@chenghao-mou chenghao-mou left a comment

lgtm.

Did three tests: normal task, nested task, and long-running task, all exited normally when disconnected.

Code

```python
import asyncio
import logging
from dataclasses import dataclass

from dotenv import load_dotenv

from livekit.agents import (
    Agent,
    AgentServer,
    AgentSession,
    AgentTask,
    JobContext,
    JobProcess,
    MetricsCollectedEvent,
    RunContext,
    cli,
    inference,
    metrics,
    room_io,
)
from livekit.agents.llm import function_tool
from livekit.plugins import silero
from livekit.plugins.turn_detector.multilingual import MultilingualModel

# uncomment to enable Krisp background voice/noise cancellation
# from livekit.plugins import noise_cancellation

logger = logging.getLogger("basic-agent")

load_dotenv()


# --- AgentTask definitions for testing graceful shutdown ---


@dataclass
class DetailedReport:
    topic: str
    findings: str


class DeepResearchTask(AgentTask[DetailedReport]):
    """A nested sub-task that takes a while, used to test nested task cancellation."""

    def __init__(self, topic: str) -> None:
        super().__init__(
            instructions=(
                f"You are a research specialist investigating '{topic}'. "
                "Ask the user 2-3 follow-up questions to understand what specific aspects "
                "they care about, then call record_findings with your results."
            ),
        )
        self._topic = topic

    async def on_enter(self) -> None:
        logger.info(f"DeepResearchTask on_enter: starting research on '{self._topic}'")
        self.session.generate_reply(
            instructions=f"Start by telling the user you're now doing a deep dive on '{self._topic}' "
            "and ask them a specific follow-up question.",
        )

    async def on_exit(self) -> None:
        logger.info(f"DeepResearchTask on_exit: '{self._topic}'")

    @function_tool()
    async def record_findings(self, context: RunContext, findings: str) -> None:
        """Call when you have gathered enough information about the topic.

        Args:
            findings: A summary of findings from the research
        """
        logger.info(f"DeepResearchTask completed: {findings[:80]}...")
        self.complete(DetailedReport(topic=self._topic, findings=findings))


@dataclass
class ResearchResult:
    topic: str
    summary: str


class ResearchTask(AgentTask[ResearchResult]):
    """A task that can optionally spawn a nested DeepResearchTask.
    Useful for testing both single-level and nested task cancellation during shutdown.
    """

    def __init__(self, topic: str) -> None:
        super().__init__(
            instructions=(
                f"You are a research assistant investigating '{topic}'. "
                "Have a conversation with the user about this topic. "
                "When you have enough information, call submit_summary. "
                "If the user asks you to go deeper, call go_deeper to start a deep research sub-task."
            ),
        )
        self._topic = topic

    async def on_enter(self) -> None:
        logger.info(f"ResearchTask on_enter: '{self._topic}'")
        self.session.generate_reply(
            instructions=f"Tell the user you're starting research on '{self._topic}'. "
            "Ask them what they already know or what angle they'd like to explore.",
        )

    async def on_exit(self) -> None:
        logger.info(f"ResearchTask on_exit: '{self._topic}'")

    @function_tool()
    async def go_deeper(self, context: RunContext) -> DeepResearchTask:
        """Call when the user wants a more thorough investigation of the topic."""
        logger.info(f"ResearchTask: spawning DeepResearchTask for '{self._topic}'")
        deep_task = DeepResearchTask(self._topic)
        result = await deep_task
        logger.info(f"DeepResearchTask returned: {result.findings[:80]}...")
        # incorporate the deep research findings and complete
        self.complete(ResearchResult(topic=self._topic, summary=result.findings))

    @function_tool()
    async def submit_summary(self, context: RunContext, summary: str) -> None:
        """Call when you have a good summary of the research topic.

        Args:
            summary: The research summary
        """
        logger.info(f"ResearchTask completed: {summary[:80]}...")
        self.complete(ResearchResult(topic=self._topic, summary=summary))


class SlowTask(AgentTask[str]):
    """A task that deliberately takes a long time (sleeps), useful for testing
    cancellation when the session closes while the task is blocked.
    """

    def __init__(self) -> None:
        super().__init__(
            instructions=(
                "You are processing a long-running operation. "
                "Tell the user it will take about 30 seconds and ask them to wait."
            ),
        )

    async def on_enter(self) -> None:
        logger.info("SlowTask on_enter: starting long operation")
        self.session.generate_reply(
            instructions="Tell the user you're starting a long operation that takes about 30 seconds."
        )

    async def on_exit(self) -> None:
        logger.info("SlowTask on_exit")

    @function_tool()
    async def start_processing(self, context: RunContext) -> str:
        """Call to begin the long-running processing operation."""
        logger.info("SlowTask: sleeping for 30s to simulate long work")
        try:
            await asyncio.sleep(30)
            self.complete("processing finished successfully")
        except asyncio.CancelledError:
            # re-raise so the cancellation propagates during shutdown
            logger.info("SlowTask: sleep was cancelled")
            raise
        return "processing started"


# --- Main agent ---


class MyAgent(Agent):
    def __init__(self) -> None:
        super().__init__(
            instructions="Your name is Kelly. You would interact with users via voice. "
            "with that in mind keep your responses concise and to the point. "
            "do not use emojis, asterisks, markdown, or other special characters in your responses. "
            "You are curious and friendly, and have a sense of humor. "
            "you will speak english to the user."
            "\n\nYou have three special tools available:"
            "\n1. research_topic - starts a research task (can also go deeper with a nested sub-task)"
            "\n2. start_slow_task - starts a deliberately slow task (good for testing cancellation)"
            "\n3. lookup_weather - looks up weather"
            "\nWhen the user asks to research something, use research_topic."
            "\nWhen the user asks you to do something slow or test cancellation, use start_slow_task.",
        )

    async def on_enter(self):
        logger.info("MyAgent on_enter")
        self.session.generate_reply(allow_interruptions=False)

    async def on_exit(self):
        logger.info("MyAgent on_exit")

    @function_tool
    async def lookup_weather(
        self, context: RunContext, location: str, latitude: str, longitude: str
    ):
        """Called when the user asks for weather related information.

        Args:
            location: The location they are asking for
            latitude: The latitude of the location, do not ask user for it
            longitude: The longitude of the location, do not ask user for it
        """
        logger.info(f"Looking up weather for {location}")
        return "sunny with a temperature of 70 degrees."

    @function_tool()
    async def research_topic(self, context: RunContext, topic: str) -> ResearchTask:
        """Start a research task on a given topic. Use this when the user asks to
        research or investigate something.

        Args:
            topic: The topic to research
        """
        logger.info(f"MyAgent: starting ResearchTask for '{topic}'")
        task = ResearchTask(topic)
        result = await task
        logger.info(f"ResearchTask returned: topic={result.topic}, summary={result.summary[:80]}...")
        return task

    @function_tool()
    async def start_slow_task(self, context: RunContext) -> SlowTask:
        """Start a deliberately slow task. Use when the user asks to test
        cancellation or do something slow."""
        logger.info("MyAgent: starting SlowTask")
        task = SlowTask()
        result = await task
        logger.info(f"SlowTask returned: {result}")
        return task


server = AgentServer()


def prewarm(proc: JobProcess):
    proc.userdata["vad"] = silero.VAD.load()


server.setup_fnc = prewarm


@server.rtc_session()
async def entrypoint(ctx: JobContext):
    # each log entry will include these fields
    ctx.log_context_fields = {
        "room": ctx.room.name,
    }
    session = AgentSession(
        stt=inference.STT("deepgram/nova-3", language="multi"),
        llm=inference.LLM("openai/gpt-4.1-mini"),
        tts=inference.TTS("cartesia/sonic-3", voice="9626c31c-bec5-4cca-baa8-f8ba9e84c8bc"),
        turn_detection=MultilingualModel(),
        vad=ctx.proc.userdata["vad"],
        preemptive_generation=True,
        resume_false_interruption=True,
        false_interruption_timeout=1.0,
    )

    # log metrics as they are emitted, and total usage after session is over
    usage_collector = metrics.UsageCollector()

    @session.on("metrics_collected")
    def _on_metrics_collected(ev: MetricsCollectedEvent):
        metrics.log_metrics(ev.metrics)
        usage_collector.collect(ev.metrics)

    async def log_usage():
        summary = usage_collector.get_summary()
        logger.info(f"Usage: {summary}")

    # shutdown callbacks are triggered when the session is over
    ctx.add_shutdown_callback(log_usage)

    await session.start(
        agent=MyAgent(),
        room=ctx.room,
        room_options=room_io.RoomOptions(
            audio_input=room_io.AudioInputOptions(
                # uncomment to enable the Krisp BVC noise cancellation
                # noise_cancellation=noise_cancellation.BVC(),
            ),
        ),
    )


if __name__ == "__main__":
    cli.run_app(server)
```

logger.warning(
    f"session is closing, skipping {new_activity} activity of {self._next_activity.agent.id}",
)
self._next_activity = None

Member

nit: should we drain and close the new next activity first?

Contributor Author

When new_activity is "start", the next activity has been created (self._next_activity = AgentActivity(agent, self)) but not started, so there should be no need to close it?

Member

It should be fine I guess, unless we later change the AgentActivity init to consume some resources.
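
The point of this exchange can be illustrated with a small, library-independent sketch: if an object acquires nothing in its constructor and only takes resources in `start()`, then dropping an unstarted instance leaves nothing to drain or close. `ToyActivity` and `needs_close` are hypothetical names for illustration only; this says nothing certain about what the real `AgentActivity` constructor does beyond what is stated in the thread.

```python
class ToyActivity:
    """Hypothetical activity: cheap to construct, acquires resources in start()."""

    def __init__(self, agent_id: str) -> None:
        self.agent_id = agent_id
        self.started = False
        self.resources = []  # nothing is acquired at construction time

    def start(self) -> None:
        # Resources are only taken once the activity actually starts.
        self.resources.append(f"audio-stream:{self.agent_id}")
        self.started = True

    def close(self) -> None:
        # Release everything acquired in start().
        self.resources.clear()
        self.started = False


def needs_close(activity: ToyActivity) -> bool:
    """An activity only needs closing if it is holding resources."""
    return bool(activity.resources)


pending = ToyActivity("next-agent")     # created, never started
running = ToyActivity("current-agent")
running.start()

print(needs_close(pending))  # False
print(needs_close(running))  # True
running.close()
print(needs_close(running))  # False
```

The reviewer's caveat maps directly onto this sketch: if `__init__` ever began appending to `resources`, `needs_close(pending)` would become true and simply setting the reference to `None` would leak.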

@longcw longcw merged commit d3e4279 into main Feb 18, 2026
50 of 54 checks passed
@longcw longcw deleted the longc/graceful-stop-agent-task branch February 18, 2026 15:14
3 participants