Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions docs/experimental/tasks-server.md
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,29 @@ That's it. `enable_tasks()` automatically:
- Registers handlers for `tasks/get`, `tasks/result`, `tasks/list`, `tasks/cancel`
- Updates server capabilities

## Task Visibility

Task IDs generated by `run_task()` embed an opaque marker identifying the session that
created the task, and the default handlers use it to restrict each session to its own
tasks: `tasks/get`, `tasks/result`, and `tasks/cancel` respond with "task not found" for
another session's task, and `tasks/list` returns only the requesting session's tasks. A
client that reconnects gets a new session and can no longer reach tasks it created on the
previous one.

A task ID has no session marker when it was passed to `run_task()` explicitly, when the
task was created directly through the `TaskStore`, or when the server runs in stateless
mode (each request gets a fresh session, so tasks must remain reachable across requests).
Such tasks are accessible to any requestor that presents the exact task ID, and are never
included in `tasks/list` responses because the server cannot tell which session they
belong to. Treat these task IDs as capabilities: generate them with enough entropy that
they cannot be guessed, share them only with the intended recipient, and prefer short
TTLs. Passing an explicit `task_id` to `run_task()` is deprecated for this reason.

To scope tasks to something other than the session — for example a user identity from your
authorization layer — register your own handlers with `@server.experimental.get_task()`,
`@server.experimental.get_task_result()`, `@server.experimental.list_tasks()`, and
`@server.experimental.cancel_task()` instead of relying on the defaults.

## Tool Declaration

Tools declare task support via the `execution.taskSupport` field:
Expand Down
1 change: 1 addition & 0 deletions src/mcp/server/experimental/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,4 +8,5 @@
- mcp.server.experimental.task_support.TaskSupport
- mcp.server.experimental.task_result_handler.TaskResultHandler
- mcp.server.experimental.request_context.Experimental
- mcp.server.experimental.task_scope (session scoping of task IDs)
"""
50 changes: 48 additions & 2 deletions src/mcp/server/experimental/request_context.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,15 @@
WARNING: These APIs are experimental and may change without notice.
"""

import warnings
from collections.abc import Awaitable, Callable
from dataclasses import dataclass, field
from typing import Any
from typing import Any, overload

from typing_extensions import deprecated

from mcp.server.experimental.task_context import ServerTaskContext
from mcp.server.experimental.task_scope import scoped_task_id
from mcp.server.experimental.task_support import TaskSupport
from mcp.server.session import ServerSession
from mcp.shared.exceptions import McpError
Expand All @@ -29,6 +33,14 @@
Tool,
)

EXPLICIT_TASK_ID_DEPRECATION = (
"Passing an explicit task_id to run_task is deprecated. A task created with an "
"explicit ID is not associated with the session that created it: any requestor "
"that presents the ID can read its status and result or cancel it, and it never "
"appears in tasks/list. Omit task_id to let the SDK generate an ID associated "
"with the creating session."
)


@dataclass
class Experimental:
Expand Down Expand Up @@ -143,6 +155,25 @@ def can_use_tool(self, tool_task_mode: TaskExecutionMode | None) -> bool:
return False
return True

@overload
async def run_task(
self,
work: Callable[[ServerTaskContext], Awaitable[Result]],
*,
task_id: None = None,
model_immediate_response: str | None = None,
) -> CreateTaskResult: ...

@overload
@deprecated(EXPLICIT_TASK_ID_DEPRECATION)
async def run_task(
self,
work: Callable[[ServerTaskContext], Awaitable[Result]],
*,
task_id: str,
model_immediate_response: str | None = None,
) -> CreateTaskResult: ...

async def run_task(
self,
work: Callable[[ServerTaskContext], Awaitable[Result]],
Expand All @@ -167,9 +198,17 @@ async def run_task(
When work() returns a Result, the task is auto-completed with that result.
If work() raises an exception, the task is auto-failed.

Generated task IDs embed the session's task scope so that the default
task handlers only serve the task to the session that created it. An
explicitly provided `task_id` is used verbatim and is not associated
with the session, so any session can access it through the default
handlers; passing one is deprecated for that reason.

Args:
work: Async function that does the actual work
task_id: Optional task ID (generated if not provided)
task_id: Deprecated. Optional task ID, used verbatim and not
associated with the creating session. Omit it to let the SDK
generate one.
model_immediate_response: Optional string to include in _meta as
io.modelcontextprotocol/model-immediate-response

Expand All @@ -196,6 +235,8 @@ async def work(task: ServerTaskContext) -> CallToolResult:

WARNING: This API is experimental and may change without notice.
"""
if task_id is not None:
warnings.warn(EXPLICIT_TASK_ID_DEPRECATION, DeprecationWarning, stacklevel=2)
if self._task_support is None:
raise RuntimeError("Task support not enabled. Call server.experimental.enable_tasks() first.")
if self._session is None:
Expand All @@ -210,6 +251,11 @@ async def work(task: ServerTaskContext) -> CallToolResult:
# Access task_group via TaskSupport - raises if not in run() context
task_group = support.task_group

if task_id is None:
session_scope = self._session.experimental.task_session_scope
if session_scope is not None:
task_id = scoped_task_id(session_scope)

task = await support.store.create_task(self.task_metadata, task_id)

task_ctx = ServerTaskContext(
Expand Down
6 changes: 6 additions & 0 deletions src/mcp/server/experimental/session_features.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,12 @@ class ExperimentalServerSessionFeatures:

def __init__(self, session: "ServerSession") -> None:
self._session = session
# Opaque marker identifying this session for task scoping. Assigned by
# TaskSupport.configure_session(). Task IDs generated by run_task()
# embed it so the default task handlers can restrict task access to
# the session that created the task. None means tasks created on this
# session are not associated with it (e.g. stateless servers).
self.task_session_scope: str | None = None

async def get_task(self, task_id: str) -> types.GetTaskResult:
"""
Expand Down
8 changes: 5 additions & 3 deletions src/mcp/server/experimental/task_result_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,11 @@ class TaskResultHandler:
4. Blocks until task reaches terminal state
5. Returns the final result

Prefer `server.experimental.enable_tasks()`, whose default tasks/result
handler wraps `handle()` and only serves tasks created by the requesting
session. A custom handler that calls `handle()` directly is responsible
for deciding which requestors may access which tasks.

Usage:
# Create handler with store and queue
handler = TaskResultHandler(task_store, message_queue)
Expand All @@ -55,9 +60,6 @@ class TaskResultHandler:
async def handle_task_result(req: GetTaskPayloadRequest) -> GetTaskPayloadResult:
ctx = server.request_context
return await handler.handle(req, ctx.session, ctx.request_id)

# Or use the convenience method
handler.register(server)
"""

def __init__(
Expand Down
75 changes: 75 additions & 0 deletions src/mcp/server/experimental/task_scope.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
"""
Session scoping for experimental task identifiers.

Task IDs generated by `run_task()` embed an opaque, per-session marker (the
"session scope") so that the default task handlers can tell which session
created a task. The default handlers for tasks/get, tasks/result, tasks/list,
and tasks/cancel only operate on tasks created by the requesting session.

Task IDs without a session scope (explicitly provided IDs, IDs created
directly through a TaskStore, or IDs created in stateless mode) have no known
creator. They can be used with tasks/get, tasks/result, and tasks/cancel from
any session - possession of the ID is what grants access - but they are never
included in tasks/list responses.

WARNING: These APIs are experimental and may change without notice.
"""

import re
from uuid import uuid4

__all__ = [
"new_session_scope",
"scoped_task_id",
"session_scope_of",
"task_in_session_scope",
"task_listable_in_session_scope",
]

# A scoped task ID has the form "<32 hex chars>:<uuid4>". Both halves must
# match exactly so that explicitly chosen task IDs are never mistaken for
# scoped ones. \Z rather than $ so a trailing newline cannot match.
_SCOPED_TASK_ID = re.compile(
r"\A(?P<scope>[0-9a-f]{32}):"
r"[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}\Z"
)


def new_session_scope() -> str:
"""Create a new opaque session scope token."""
return uuid4().hex


def scoped_task_id(session_scope: str) -> str:
"""Generate a task ID associated with the given session scope."""
return f"{session_scope}:{uuid4()}"


def session_scope_of(task_id: str) -> str | None:
"""Return the session scope embedded in a task ID, or None if it has none."""
match = _SCOPED_TASK_ID.match(task_id)
return match.group("scope") if match else None


def task_in_session_scope(task_id: str, session_scope: str | None) -> bool:
"""Whether a task may be used by a requestor with the given session scope.

Used by tasks/get, tasks/result, and tasks/cancel. A task whose ID carries
no session scope has no known creator, so possession of the ID is what
grants access to it: it can be used from any session.
"""
embedded = session_scope_of(task_id)
return embedded is None or embedded == session_scope


def task_listable_in_session_scope(task_id: str, session_scope: str | None) -> bool:
"""Whether a task may be included in a tasks/list response for the given session scope.

Used by tasks/list. Listing is stricter than access by ID: a task is only
listed to the session that created it. Tasks with no session scope are
never listed because they have no known creator, and requestors with no
session scope are never shown any tasks because the server cannot tell
them apart.
"""
embedded = session_scope_of(task_id)
return embedded is not None and embedded == session_scope
13 changes: 12 additions & 1 deletion src/mcp/server/experimental/task_support.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from anyio.abc import TaskGroup

from mcp.server.experimental.task_result_handler import TaskResultHandler
from mcp.server.experimental.task_scope import new_session_scope
from mcp.server.session import ServerSession
from mcp.shared.experimental.tasks.in_memory_task_store import InMemoryTaskStore
from mcp.shared.experimental.tasks.message_queue import InMemoryTaskMessageQueue, TaskMessageQueue
Expand Down Expand Up @@ -83,20 +84,30 @@ async def run(self) -> AsyncIterator[None]:
finally:
self._task_group = None

def configure_session(self, session: ServerSession) -> None:
def configure_session(self, session: ServerSession, *, stateless: bool = False) -> None:
"""
Configure a session for task support.

This registers the result handler as a response router so that
responses to queued requests (elicitation, sampling) are routed
back to the waiting resolvers.

It also assigns the session a task session scope. Task IDs generated
by `run_task()` embed this scope, and the default task handlers only
operate on tasks created by the requesting session. Stateless sessions
are not assigned a scope: each request runs on a fresh session, so a
task created by one request could never be retrieved by a later one if
tasks were bound to the session that created them.

Called automatically by Server.run() for each new session.

Args:
session: The session to configure
stateless: Whether the session belongs to a stateless server run
"""
session.add_response_router(self.handler)
if not stateless and session.experimental.task_session_scope is None:
session.experimental.task_session_scope = new_session_scope()

@classmethod
def in_memory(cls) -> "TaskSupport":
Expand Down
60 changes: 56 additions & 4 deletions src/mcp/server/lowlevel/experimental.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from collections.abc import Awaitable, Callable
from typing import TYPE_CHECKING

from mcp.server.experimental.task_scope import task_in_session_scope, task_listable_in_session_scope
from mcp.server.experimental.task_support import TaskSupport
from mcp.server.lowlevel.func_inspection import create_call_wrapper
from mcp.shared.exceptions import McpError
Expand All @@ -31,6 +32,7 @@
ServerResult,
ServerTasksCapability,
ServerTasksRequestsCapability,
Task,
TasksCallCapability,
TasksCancelCapability,
TasksListCapability,
Expand Down Expand Up @@ -125,15 +127,46 @@ def enable_tasks(

return self._task_support

def _requestor_session_scope(self) -> str | None:
"""Return the task session scope of the session making the current request."""
return self._server.request_context.session.experimental.task_session_scope

def _require_task_in_requestor_scope(self, task_id: str) -> None:
"""Reject task IDs that belong to a different session.

Task IDs generated by `run_task()` embed the creating session's
scope. The default handlers treat a task created by another session
exactly like a task that does not exist, so a requestor cannot tell
whether such a task exists. Task IDs without an embedded scope are
accepted from any session.

Raises:
McpError: With INVALID_PARAMS if the task belongs to another session.
"""
if not task_in_session_scope(task_id, self._requestor_session_scope()):
raise McpError(
ErrorData(
code=INVALID_PARAMS,
message=f"Task not found: {task_id}",
)
)

def _register_default_task_handlers(self) -> None:
"""Register default handlers for task operations."""
"""Register default handlers for task operations.

Each default handler only operates on tasks created by the requesting
session (see `_require_task_in_requestor_scope`), and tasks/list only
returns the requesting session's own tasks (see
`task_listable_in_session_scope`).
"""
assert self._task_support is not None
support = self._task_support

# Register get_task handler if not already registered
if GetTaskRequest not in self._request_handlers:

async def _default_get_task(req: GetTaskRequest) -> ServerResult:
self._require_task_in_requestor_scope(req.params.taskId)
task = await support.store.get_task(req.params.taskId)
if task is None:
raise McpError(
Expand All @@ -160,6 +193,7 @@ async def _default_get_task(req: GetTaskRequest) -> ServerResult:
if GetTaskPayloadRequest not in self._request_handlers:

async def _default_get_task_result(req: GetTaskPayloadRequest) -> ServerResult:
self._require_task_in_requestor_scope(req.params.taskId)
ctx = self._server.request_context
result = await support.handler.handle(req, ctx.session, ctx.request_id)
return ServerResult(result)
Expand All @@ -170,16 +204,34 @@ async def _default_get_task_result(req: GetTaskPayloadRequest) -> ServerResult:
if ListTasksRequest not in self._request_handlers:

async def _default_list_tasks(req: ListTasksRequest) -> ServerResult:
cursor = req.params.cursor if req.params else None
tasks, next_cursor = await support.store.list_tasks(cursor)
return ServerResult(ListTasksResult(tasks=tasks, nextCursor=next_cursor))
requestor_scope = self._requestor_session_scope()
if requestor_scope is None:
# The server cannot tell this requestor apart from any
# other, so there are no tasks it can be shown.
return ServerResult(ListTasksResult(tasks=[]))
# Return every task that belongs to the requesting session in
# a single page. The store's pagination cursor is never sent
# to the requestor: it is derived from the unfiltered listing,
# so it could identify a task belonging to a different
# session. For the same reason the request's cursor is not
# forwarded to the store.
own_tasks: list[Task] = []
cursor: str | None = None
while True:
page, cursor = await support.store.list_tasks(cursor)
own_tasks.extend(
task for task in page if task_listable_in_session_scope(task.taskId, requestor_scope)
)
if cursor is None:
return ServerResult(ListTasksResult(tasks=own_tasks))

self._request_handlers[ListTasksRequest] = _default_list_tasks

# Register cancel_task handler if not already registered
if CancelTaskRequest not in self._request_handlers:

async def _default_cancel_task(req: CancelTaskRequest) -> ServerResult:
self._require_task_in_requestor_scope(req.params.taskId)
result = await cancel_task(support.store, req.params.taskId)
return ServerResult(result)

Expand Down
Loading
Loading