From a64cee720895c6632908313699fe0194f5772920 Mon Sep 17 00:00:00 2001 From: Felix Weinberger Date: Wed, 26 Nov 2025 14:16:28 +0000 Subject: [PATCH 1/2] feat: examples for interactive cases --- .../client/simpleTaskInteractiveClient.ts | 204 +++++ .../server/README-simpleTaskInteractive.md | 161 ++++ src/examples/server/simpleTaskInteractive.ts | 744 ++++++++++++++++++ 3 files changed, 1109 insertions(+) create mode 100644 src/examples/client/simpleTaskInteractiveClient.ts create mode 100644 src/examples/server/README-simpleTaskInteractive.md create mode 100644 src/examples/server/simpleTaskInteractive.ts diff --git a/src/examples/client/simpleTaskInteractiveClient.ts b/src/examples/client/simpleTaskInteractiveClient.ts new file mode 100644 index 000000000..06ed0ead1 --- /dev/null +++ b/src/examples/client/simpleTaskInteractiveClient.ts @@ -0,0 +1,204 @@ +/** + * Simple interactive task client demonstrating elicitation and sampling responses. + * + * This client connects to simpleTaskInteractive.ts server and demonstrates: + * - Handling elicitation requests (y/n confirmation) + * - Handling sampling requests (returns a hardcoded haiku) + * - Using task-based tool execution with streaming + */ + +import { Client } from '../../client/index.js'; +import { StreamableHTTPClientTransport } from '../../client/streamableHttp.js'; +import { createInterface } from 'node:readline'; +import { + CallToolResultSchema, + TextContent, + ElicitRequestSchema, + CreateMessageRequestSchema, + CreateMessageRequest, + CreateMessageResult, + ErrorCode, + McpError +} from '../../types.js'; + +// Create readline interface for user input +const readline = createInterface({ + input: process.stdin, + output: process.stdout +}); + +function question(prompt: string): Promise { + return new Promise(resolve => { + readline.question(prompt, answer => { + resolve(answer.trim()); + }); + }); +} + +function getTextContent(result: { content: Array<{ type: string; text?: string }> }): string { + const textContent = result.content.find((c): c is TextContent => c.type === 'text'); + return textContent?.text ?? '(no text)'; +} + +async function elicitationCallback(params: { + mode?: string; + message: string; + requestedSchema?: object; +}): Promise<{ action: string; content?: Record }> { + console.log(`\n[Elicitation] Server asks: ${params.message}`); + + // Simple terminal prompt for y/n + const response = await question('Your response (y/n): '); + const confirmed = ['y', 'yes', 'true', '1'].includes(response.toLowerCase()); + + console.log(`[Elicitation] Responding with: confirm=${confirmed}`); + return { action: 'accept', content: { confirm: confirmed } }; +} + +async function samplingCallback(params: CreateMessageRequest['params']): Promise { + // Get the prompt from the first message + let prompt = 'unknown'; + if (params.messages && params.messages.length > 0) { + const firstMessage = params.messages[0]; + const content = firstMessage.content; + if (typeof content === 'object' && !Array.isArray(content) && content.type === 'text' && 'text' in content) { + prompt = content.text; + } else if (Array.isArray(content)) { + const textPart = content.find(c => c.type === 'text' && 'text' in c); + if (textPart && 'text' in textPart) { + prompt = textPart.text; + } + } + } + + console.log(`\n[Sampling] Server requests LLM completion for: ${prompt}`); + + // Return a hardcoded haiku (in real use, call your LLM here) + const haiku = `Cherry blossoms fall +Softly on the quiet pond +Spring whispers goodbye`; + + console.log('[Sampling] Responding with haiku'); + return { + model: 'mock-haiku-model', + role: 'assistant', + content: { type: 'text', text: haiku } + }; +} + +async function run(url: string): Promise { + console.log('Simple Task Interactive Client'); + console.log('=============================='); + console.log(`Connecting to ${url}...`); + + // Create client with elicitation and sampling capabilities + const client = new Client( + { name: 'simple-task-interactive-client', version: '1.0.0' }, + { + capabilities: { + elicitation: { form: {} }, + sampling: {} + } + } + ); + + // Set up elicitation request handler + client.setRequestHandler(ElicitRequestSchema, async request => { + if (request.params.mode && request.params.mode !== 'form') { + throw new McpError(ErrorCode.InvalidParams, `Unsupported elicitation mode: ${request.params.mode}`); + } + return elicitationCallback(request.params); + }); + + // Set up sampling request handler + client.setRequestHandler(CreateMessageRequestSchema, async request => { + return samplingCallback(request.params) as unknown as ReturnType; + }); + + // Connect to server + const transport = new StreamableHTTPClientTransport(new URL(url)); + await client.connect(transport); + console.log('Connected!\n'); + + // List tools + const toolsResult = await client.listTools(); + console.log(`Available tools: ${toolsResult.tools.map(t => t.name).join(', ')}`); + + // Demo 1: Elicitation (confirm_delete) + console.log('\n--- Demo 1: Elicitation ---'); + console.log('Calling confirm_delete tool...'); + + const confirmStream = client.experimental.tasks.callToolStream( + { name: 'confirm_delete', arguments: { filename: 'important.txt' } }, + CallToolResultSchema, + { task: { ttl: 60000 } } + ); + + for await (const message of confirmStream) { + switch (message.type) { + case 'taskCreated': + console.log(`Task created: ${message.task.taskId}`); + break; + case 'taskStatus': + console.log(`Task status: ${message.task.status}`); + break; + case 'result': + console.log(`Result: ${getTextContent(message.result)}`); + break; + case 'error': + console.error(`Error: ${message.error}`); + break; + } + } + + // Demo 2: Sampling (write_haiku) + console.log('\n--- Demo 2: Sampling ---'); + console.log('Calling write_haiku tool...'); + + const haikuStream = client.experimental.tasks.callToolStream( + { name: 'write_haiku', arguments: { topic: 'autumn leaves' } }, + CallToolResultSchema, + { + task: { ttl: 60000 } + } + ); + + for await (const message of haikuStream) { + switch (message.type) { + case 'taskCreated': + console.log(`Task created: ${message.task.taskId}`); + break; + case 'taskStatus': + console.log(`Task status: ${message.task.status}`); + break; + case 'result': + console.log(`Result:\n${getTextContent(message.result)}`); + break; + case 'error': + console.error(`Error: ${message.error}`); + break; + } + } + + // Cleanup + console.log('\nDemo complete. Closing connection...'); + await transport.close(); + readline.close(); +} + +// Parse command line arguments +const args = process.argv.slice(2); +let url = 'http://localhost:8000/mcp'; + +for (let i = 0; i < args.length; i++) { + if (args[i] === '--url' && args[i + 1]) { + url = args[i + 1]; + i++; + } +} + +// Run the client +run(url).catch(error => { + console.error('Error running client:', error); + process.exit(1); +}); diff --git a/src/examples/server/README-simpleTaskInteractive.md b/src/examples/server/README-simpleTaskInteractive.md new file mode 100644 index 000000000..6e8cd345b --- /dev/null +++ b/src/examples/server/README-simpleTaskInteractive.md @@ -0,0 +1,161 @@ +# Simple Task Interactive Example + +This example demonstrates the MCP Tasks message queue pattern with interactive server-to-client requests (elicitation and sampling). + +## Overview + +The example consists of two components: + +1. **Server** (`simpleTaskInteractive.ts`) - Exposes two task-based tools that require client interaction: + - `confirm_delete` - Uses elicitation to ask the user for confirmation before "deleting" a file + - `write_haiku` - Uses sampling to request an LLM to generate a haiku on a topic + +2. **Client** (`simpleTaskInteractiveClient.ts`) - Connects to the server and handles: + - Elicitation requests with simple y/n terminal prompts + - Sampling requests with a mock haiku generator + +## Key Concepts + +### Task-Based Execution + +Both tools use `execution.taskSupport: 'required'`, meaning they follow the "call-now, fetch-later" pattern: + +1. Client calls tool with `task: { ttl: 60000 }` parameter +2. Server creates a task and returns `CreateTaskResult` immediately +3. Client polls via `tasks/result` to get the final result +4. Server sends elicitation/sampling requests through the task message queue +5. Client handles requests and returns responses +6. Server completes the task with the final result + +### Message Queue Pattern + +When a tool needs to interact with the client (elicitation or sampling), it: + +1. Updates task status to `input_required` +2. Enqueues the request in the task message queue +3. Waits for the response via a Resolver +4. Updates task status back to `working` +5. Continues processing + +The `TaskResultHandler` dequeues messages when the client calls `tasks/result` and routes responses back to waiting Resolvers. + +## Running the Example + +### Start the Server + +```bash +# From the SDK root directory +npx tsx src/examples/server/simpleTaskInteractive.ts + +# Or with a custom port +PORT=9000 npx tsx src/examples/server/simpleTaskInteractive.ts +``` + +The server will start on http://localhost:8000/mcp (or your custom port). + +### Run the Client + +```bash +# From the SDK root directory +npx tsx src/examples/client/simpleTaskInteractiveClient.ts + +# Or connect to a different server +npx tsx src/examples/client/simpleTaskInteractiveClient.ts --url http://localhost:9000/mcp +``` + +## Expected Output + +### Server Output + +``` +Starting server on http://localhost:8000/mcp + +Available tools: + - confirm_delete: Demonstrates elicitation (asks user y/n) + - write_haiku: Demonstrates sampling (requests LLM completion) + +[Server] confirm_delete called, task created: task-abc123 +[Server] confirm_delete: asking about 'important.txt' +[Server] Sending elicitation request to client... +[Server] tasks/result called for task task-abc123 +[Server] Delivering queued request message for task task-abc123 +[Server] Received elicitation response: action=accept, content={"confirm":true} +[Server] Completing task with result: Deleted 'important.txt' + +[Server] write_haiku called, task created: task-def456 +[Server] write_haiku: topic 'autumn leaves' +[Server] Sending sampling request to client... +[Server] tasks/result called for task task-def456 +[Server] Delivering queued request message for task task-def456 +[Server] Received sampling response: Cherry blossoms fall... +[Server] Completing task with haiku +``` + +### Client Output + +``` +Simple Task Interactive Client +============================== +Connecting to http://localhost:8000/mcp... +Connected! + +Available tools: confirm_delete, write_haiku + +--- Demo 1: Elicitation --- +Calling confirm_delete tool... +Task created: task-abc123 +Task status: working + +[Elicitation] Server asks: Are you sure you want to delete 'important.txt'? +Your response (y/n): y +[Elicitation] Responding with: confirm=true +Task status: input_required +Task status: completed +Result: Deleted 'important.txt' + +--- Demo 2: Sampling --- +Calling write_haiku tool... +Task created: task-def456 +Task status: working + +[Sampling] Server requests LLM completion for: Write a haiku about autumn leaves +[Sampling] Responding with haiku +Task status: input_required +Task status: completed +Result: +Haiku: +Cherry blossoms fall +Softly on the quiet pond +Spring whispers goodbye + +Demo complete. Closing connection... +``` + +## Implementation Details + +### Server Components + +- **Resolver**: Promise-like class for passing results between async operations +- **TaskMessageQueueWithResolvers**: Extended message queue that tracks pending requests with their Resolvers +- **TaskStoreWithNotifications**: Extended task store with notification support for status changes +- **TaskResultHandler**: Handles `tasks/result` requests by dequeuing messages and routing responses +- **TaskSession**: Wraps the server to enqueue requests during task execution + +### Client Capabilities + +The client declares these capabilities during initialization: + +```typescript +capabilities: { + elicitation: { form: {} }, + sampling: {} +} +``` + +This tells the server that the client can handle both form-based elicitation and sampling requests. + +## Related Files + +- `src/shared/task.ts` - Core task interfaces (TaskStore, TaskMessageQueue) +- `src/examples/shared/inMemoryTaskStore.ts` - In-memory implementations +- `src/types.ts` - Task-related types (Task, CreateTaskResult, GetTaskRequestSchema, etc.) diff --git a/src/examples/server/simpleTaskInteractive.ts b/src/examples/server/simpleTaskInteractive.ts new file mode 100644 index 000000000..3e0e90646 --- /dev/null +++ b/src/examples/server/simpleTaskInteractive.ts @@ -0,0 +1,744 @@ +/** + * Simple interactive task server demonstrating elicitation and sampling. + * + * This server demonstrates the task message queue pattern from the MCP Tasks spec: + * - confirm_delete: Uses elicitation to ask the user for confirmation + * - write_haiku: Uses sampling to request an LLM to generate content + * + * Both tools use the "call-now, fetch-later" pattern where the initial call + * creates a task, and the result is fetched via tasks/result endpoint. + */ + +import express, { Request, Response } from 'express'; +import { randomUUID } from 'node:crypto'; +import { Server } from '../../server/index.js'; +import { StreamableHTTPServerTransport } from '../../server/streamableHttp.js'; +import { + CallToolResult, + CreateTaskResult, + GetTaskResult, + Tool, + TextContent, + RELATED_TASK_META_KEY, + Task, + Result, + RequestId, + JSONRPCRequest, + SamplingMessage, + ElicitRequestFormParams, + CreateMessageRequest, + ElicitResult, + CreateMessageResult, + PrimitiveSchemaDefinition, + ListToolsRequestSchema, + CallToolRequestSchema, + GetTaskRequestSchema, + GetTaskPayloadRequestSchema +} from '../../types.js'; +import { TaskMessageQueue, QueuedMessage, QueuedRequest, isTerminal, CreateTaskOptions } from '../../experimental/tasks/interfaces.js'; +import { InMemoryTaskStore } from '../../experimental/tasks/stores/in-memory.js'; + +// ============================================================================ +// Resolver - Promise-like for passing results between async operations +// ============================================================================ + +class Resolver { + private _resolve!: (value: T) => void; + private _reject!: (error: Error) => void; + private _promise: Promise; + private _done = false; + + constructor() { + this._promise = new Promise((resolve, reject) => { + this._resolve = resolve; + this._reject = reject; + }); + } + + setResult(value: T): void { + if (this._done) return; + this._done = true; + this._resolve(value); + } + + setException(error: Error): void { + if (this._done) return; + this._done = true; + this._reject(error); + } + + wait(): Promise { + return this._promise; + } + + done(): boolean { + return this._done; + } +} + +// ============================================================================ +// Extended message queue with resolver support and wait functionality +// ============================================================================ + +interface QueuedRequestWithResolver extends QueuedRequest { + resolver?: Resolver>; + originalRequestId?: RequestId; +} + +type QueuedMessageWithResolver = QueuedRequestWithResolver | QueuedMessage; + +class TaskMessageQueueWithResolvers implements TaskMessageQueue { + private queues = new Map(); + private waitResolvers = new Map void)[]>(); + + private getQueue(taskId: string): QueuedMessageWithResolver[] { + let queue = this.queues.get(taskId); + if (!queue) { + queue = []; + this.queues.set(taskId, queue); + } + return queue; + } + + async enqueue(taskId: string, message: QueuedMessage, _sessionId?: string, maxSize?: number): Promise { + const queue = this.getQueue(taskId); + if (maxSize !== undefined && queue.length >= maxSize) { + throw new Error(`Task message queue overflow: queue size (${queue.length}) exceeds maximum (${maxSize})`); + } + queue.push(message); + // Notify any waiters + this.notifyWaiters(taskId); + } + + async enqueueWithResolver( + taskId: string, + message: JSONRPCRequest, + resolver: Resolver>, + originalRequestId: RequestId + ): Promise { + const queue = this.getQueue(taskId); + const queuedMessage: QueuedRequestWithResolver = { + type: 'request', + message, + timestamp: Date.now(), + resolver, + originalRequestId + }; + queue.push(queuedMessage); + this.notifyWaiters(taskId); + } + + async dequeue(taskId: string, _sessionId?: string): Promise { + const queue = this.getQueue(taskId); + return queue.shift(); + } + + async dequeueAll(taskId: string, _sessionId?: string): Promise { + const queue = this.queues.get(taskId) ?? []; + this.queues.delete(taskId); + return queue; + } + + async waitForMessage(taskId: string): Promise { + // Check if there are already messages + const queue = this.getQueue(taskId); + if (queue.length > 0) return; + + // Wait for a message to be added + return new Promise(resolve => { + let waiters = this.waitResolvers.get(taskId); + if (!waiters) { + waiters = []; + this.waitResolvers.set(taskId, waiters); + } + waiters.push(resolve); + }); + } + + private notifyWaiters(taskId: string): void { + const waiters = this.waitResolvers.get(taskId); + if (waiters) { + this.waitResolvers.delete(taskId); + for (const resolve of waiters) { + resolve(); + } + } + } + + cleanup(): void { + this.queues.clear(); + this.waitResolvers.clear(); + } +} + +// ============================================================================ +// Extended task store with wait functionality +// ============================================================================ + +class TaskStoreWithNotifications extends InMemoryTaskStore { + private updateResolvers = new Map void)[]>(); + + async updateTaskStatus(taskId: string, status: Task['status'], statusMessage?: string, sessionId?: string): Promise { + await super.updateTaskStatus(taskId, status, statusMessage, sessionId); + this.notifyUpdate(taskId); + } + + async storeTaskResult(taskId: string, status: 'completed' | 'failed', result: Result, sessionId?: string): Promise { + await super.storeTaskResult(taskId, status, result, sessionId); + this.notifyUpdate(taskId); + } + + async waitForUpdate(taskId: string): Promise { + return new Promise(resolve => { + let waiters = this.updateResolvers.get(taskId); + if (!waiters) { + waiters = []; + this.updateResolvers.set(taskId, waiters); + } + waiters.push(resolve); + }); + } + + private notifyUpdate(taskId: string): void { + const waiters = this.updateResolvers.get(taskId); + if (waiters) { + this.updateResolvers.delete(taskId); + for (const resolve of waiters) { + resolve(); + } + } + } +} + +// ============================================================================ +// Task Result Handler - delivers queued messages and routes responses +// ============================================================================ + +class TaskResultHandler { + private pendingRequests = new Map>>(); + + constructor( + private store: TaskStoreWithNotifications, + private queue: TaskMessageQueueWithResolvers + ) {} + + async handle(taskId: string, server: Server, _sessionId: string): Promise { + while (true) { + // Get fresh task state + const task = await this.store.getTask(taskId); + if (!task) { + throw new Error(`Task not found: ${taskId}`); + } + + // Dequeue and send all pending messages + await this.deliverQueuedMessages(taskId, server, _sessionId); + + // If task is terminal, return result + if (isTerminal(task.status)) { + const result = await this.store.getTaskResult(taskId); + // Add related-task metadata per spec + return { + ...result, + _meta: { + ...(result._meta || {}), + [RELATED_TASK_META_KEY]: { taskId } + } + }; + } + + // Wait for task update or new message + await this.waitForUpdate(taskId); + } + } + + private async deliverQueuedMessages(taskId: string, server: Server, _sessionId: string): Promise { + while (true) { + const message = await this.queue.dequeue(taskId); + if (!message) break; + + console.log(`[Server] Delivering queued ${message.type} message for task ${taskId}`); + + if (message.type === 'request') { + const reqMessage = message as QueuedRequestWithResolver; + // Send the request via the server + // Store the resolver so we can route the response back + if (reqMessage.resolver && reqMessage.originalRequestId) { + this.pendingRequests.set(reqMessage.originalRequestId, reqMessage.resolver); + } + + // Send the message - for elicitation/sampling, we use the server's methods + // But since we're in tasks/result context, we need to send via transport + // This is simplified - in production you'd use proper message routing + try { + const request = reqMessage.message; + let response: ElicitResult | CreateMessageResult; + + if (request.method === 'elicitation/create') { + // Send elicitation request to client + const params = request.params as ElicitRequestFormParams; + response = await server.elicitInput(params); + } else if (request.method === 'sampling/createMessage') { + // Send sampling request to client + const params = request.params as CreateMessageRequest['params']; + response = await server.createMessage(params); + } else { + throw new Error(`Unknown request method: ${request.method}`); + } + + // Route response back to resolver + if (reqMessage.resolver) { + reqMessage.resolver.setResult(response as unknown as Record); + } + } catch (error) { + if (reqMessage.resolver) { + reqMessage.resolver.setException(error instanceof Error ? error : new Error(String(error))); + } + } + } + // For notifications, we'd send them too but this example focuses on requests + } + } + + private async waitForUpdate(taskId: string): Promise { + // Race between store update and queue message + await Promise.race([this.store.waitForUpdate(taskId), this.queue.waitForMessage(taskId)]); + } + + routeResponse(requestId: RequestId, response: Record): boolean { + const resolver = this.pendingRequests.get(requestId); + if (resolver && !resolver.done()) { + this.pendingRequests.delete(requestId); + resolver.setResult(response); + return true; + } + return false; + } + + routeError(requestId: RequestId, error: Error): boolean { + const resolver = this.pendingRequests.get(requestId); + if (resolver && !resolver.done()) { + this.pendingRequests.delete(requestId); + resolver.setException(error); + return true; + } + return false; + } +} + +// ============================================================================ +// Task Session - wraps server to enqueue requests during task execution +// ============================================================================ + +class TaskSession { + private requestCounter = 0; + + constructor( + private server: Server, + private taskId: string, + private store: TaskStoreWithNotifications, + private queue: TaskMessageQueueWithResolvers + ) {} + + private nextRequestId(): string { + return `task-${this.taskId}-${++this.requestCounter}`; + } + + async elicit( + message: string, + requestedSchema: { + type: 'object'; + properties: Record; + required?: string[]; + } + ): Promise<{ action: string; content?: Record }> { + // Update task status to input_required + await this.store.updateTaskStatus(this.taskId, 'input_required'); + + const requestId = this.nextRequestId(); + + // Build the elicitation request with related-task metadata + const params: ElicitRequestFormParams = { + message, + requestedSchema, + mode: 'form', + _meta: { + [RELATED_TASK_META_KEY]: { taskId: this.taskId } + } + }; + + const jsonrpcRequest: JSONRPCRequest = { + jsonrpc: '2.0', + id: requestId, + method: 'elicitation/create', + params + }; + + // Create resolver to wait for response + const resolver = new Resolver>(); + + // Enqueue the request + await this.queue.enqueueWithResolver(this.taskId, jsonrpcRequest, resolver, requestId); + + try { + // Wait for response + const response = await resolver.wait(); + + // Update status back to working + await this.store.updateTaskStatus(this.taskId, 'working'); + + return response as { action: string; content?: Record }; + } catch (error) { + await this.store.updateTaskStatus(this.taskId, 'working'); + throw error; + } + } + + async createMessage( + messages: SamplingMessage[], + maxTokens: number + ): Promise<{ role: string; content: TextContent | { type: string } }> { + // Update task status to input_required + await this.store.updateTaskStatus(this.taskId, 'input_required'); + + const requestId = this.nextRequestId(); + + // Build the sampling request with related-task metadata + const params = { + messages, + maxTokens, + _meta: { + [RELATED_TASK_META_KEY]: { taskId: this.taskId } + } + }; + + const jsonrpcRequest: JSONRPCRequest = { + jsonrpc: '2.0', + id: requestId, + method: 'sampling/createMessage', + params + }; + + // Create resolver to wait for response + const resolver = new Resolver>(); + + // Enqueue the request + await this.queue.enqueueWithResolver(this.taskId, jsonrpcRequest, resolver, requestId); + + try { + // Wait for response + const response = await resolver.wait(); + + // Update status back to working + await this.store.updateTaskStatus(this.taskId, 'working'); + + return response as { role: string; content: TextContent | { type: string } }; + } catch (error) { + await this.store.updateTaskStatus(this.taskId, 'working'); + throw error; + } + } +} + +// ============================================================================ +// Server Setup +// ============================================================================ + +const PORT = process.env.PORT ? parseInt(process.env.PORT, 10) : 8000; + +// Create shared stores +const taskStore = new TaskStoreWithNotifications(); +const messageQueue = new TaskMessageQueueWithResolvers(); +const taskResultHandler = new TaskResultHandler(taskStore, messageQueue); + +// Track active task executions +const activeTaskExecutions = new Map< + string, + { + promise: Promise; + server: Server; + sessionId: string; + } +>(); + +// Create the server +const createServer = (): Server => { + const server = new Server( + { name: 'simple-task-interactive', version: '1.0.0' }, + { + capabilities: { + tools: {}, + tasks: { + requests: { + tools: { call: {} } + } + } + } + } + ); + + // Register tools + server.setRequestHandler(ListToolsRequestSchema, async (): Promise<{ tools: Tool[] }> => { + return { + tools: [ + { + name: 'confirm_delete', + description: 'Asks for confirmation before deleting (demonstrates elicitation)', + inputSchema: { + type: 'object', + properties: { + filename: { type: 'string' } + } + }, + execution: { taskSupport: 'required' } + }, + { + name: 'write_haiku', + description: 'Asks LLM to write a haiku (demonstrates sampling)', + inputSchema: { + type: 'object', + properties: { + topic: { type: 'string' } + } + }, + execution: { taskSupport: 'required' } + } + ] + }; + }); + + // Handle tool calls + server.setRequestHandler(CallToolRequestSchema, async (request, extra): Promise => { + const { name, arguments: args } = request.params; + const taskParams = (request.params._meta?.task || request.params.task) as { ttl?: number; pollInterval?: number } | undefined; + + // Validate task mode - these tools require tasks + if (!taskParams) { + throw new Error(`Tool ${name} requires task mode`); + } + + // Create task + const taskOptions: CreateTaskOptions = { + ttl: taskParams.ttl, + pollInterval: taskParams.pollInterval ?? 1000 + }; + + const task = await taskStore.createTask(taskOptions, extra.requestId, request, extra.sessionId); + + console.log(`\n[Server] ${name} called, task created: ${task.taskId}`); + + // Start background task execution + const taskExecution = (async () => { + try { + const taskSession = new TaskSession(server, task.taskId, taskStore, messageQueue); + + if (name === 'confirm_delete') { + const filename = args?.filename ?? 'unknown.txt'; + console.log(`[Server] confirm_delete: asking about '${filename}'`); + + console.log('[Server] Sending elicitation request to client...'); + const result = await taskSession.elicit(`Are you sure you want to delete '${filename}'?`, { + type: 'object', + properties: { + confirm: { type: 'boolean' } + }, + required: ['confirm'] + }); + + console.log( + `[Server] Received elicitation response: action=${result.action}, content=${JSON.stringify(result.content)}` + ); + + let text: string; + if (result.action === 'accept' && result.content) { + const confirmed = result.content.confirm; + text = confirmed ? `Deleted '${filename}'` : 'Deletion cancelled'; + } else { + text = 'Deletion cancelled'; + } + + console.log(`[Server] Completing task with result: ${text}`); + await taskStore.storeTaskResult(task.taskId, 'completed', { + content: [{ type: 'text', text }] + }); + } else if (name === 'write_haiku') { + const topic = args?.topic ?? 'nature'; + console.log(`[Server] write_haiku: topic '${topic}'`); + + console.log('[Server] Sending sampling request to client...'); + const result = await taskSession.createMessage( + [ + { + role: 'user', + content: { type: 'text', text: `Write a haiku about ${topic}` } + } + ], + 50 + ); + + let haiku = 'No response'; + if (result.content && 'text' in result.content) { + haiku = (result.content as TextContent).text; + } + + console.log(`[Server] Received sampling response: ${haiku.substring(0, 50)}...`); + console.log('[Server] Completing task with haiku'); + await taskStore.storeTaskResult(task.taskId, 'completed', { + content: [{ type: 'text', text: `Haiku:\n${haiku}` }] + }); + } + } catch (error) { + console.error(`[Server] Task ${task.taskId} failed:`, error); + await taskStore.storeTaskResult(task.taskId, 'failed', { + content: [{ type: 'text', text: `Error: ${error}` }], + isError: true + }); + } finally { + activeTaskExecutions.delete(task.taskId); + } + })(); + + activeTaskExecutions.set(task.taskId, { + promise: taskExecution, + server, + sessionId: extra.sessionId ?? '' + }); + + return { task }; + }); + + // Handle tasks/get + server.setRequestHandler(GetTaskRequestSchema, async (request): Promise => { + const { taskId } = request.params; + const task = await taskStore.getTask(taskId); + if (!task) { + throw new Error(`Task ${taskId} not found`); + } + return task; + }); + + // Handle tasks/result + server.setRequestHandler(GetTaskPayloadRequestSchema, async (request, extra): Promise => { + const { taskId } = request.params; + console.log(`[Server] tasks/result called for task ${taskId}`); + return taskResultHandler.handle(taskId, server, extra.sessionId ?? ''); + }); + + return server; +}; + +// ============================================================================ +// Express App Setup +// ============================================================================ + +const app = express(); +app.use(express.json()); + +// Map to store transports by session ID +const transports: { [sessionId: string]: StreamableHTTPServerTransport } = {}; + +// Helper to check if request is initialize +const isInitializeRequest = (body: unknown): boolean => { + return typeof body === 'object' && body !== null && 'method' in body && (body as { method: string }).method === 'initialize'; +}; + +// MCP POST endpoint +app.post('/mcp', async (req: Request, res: Response) => { + const sessionId = req.headers['mcp-session-id'] as string | undefined; + + try { + let transport: StreamableHTTPServerTransport; + + if (sessionId && transports[sessionId]) { + transport = transports[sessionId]; + } else if (!sessionId && isInitializeRequest(req.body)) { + transport = new StreamableHTTPServerTransport({ + sessionIdGenerator: () => randomUUID(), + onsessioninitialized: sid => { + console.log(`Session initialized: ${sid}`); + transports[sid] = transport; + } + }); + + transport.onclose = () => { + const sid = transport.sessionId; + if (sid && transports[sid]) { + console.log(`Transport closed for session ${sid}`); + delete transports[sid]; + } + }; + + const server = createServer(); + await server.connect(transport); + await transport.handleRequest(req, res, req.body); + return; + } else { + res.status(400).json({ + jsonrpc: '2.0', + error: { code: -32000, message: 'Bad Request: No valid session ID' }, + id: null + }); + return; + } + + await transport.handleRequest(req, res, req.body); + } catch (error) { + console.error('Error handling MCP request:', error); + if (!res.headersSent) { + res.status(500).json({ + jsonrpc: '2.0', + error: { code: -32603, message: 'Internal server error' }, + id: null + }); + } + } +}); + +// Handle GET requests for SSE streams +app.get('/mcp', async (req: Request, res: Response) => { + const sessionId = req.headers['mcp-session-id'] as string | undefined; + if (!sessionId || !transports[sessionId]) { + res.status(400).send('Invalid or missing session ID'); + return; + } + + const transport = transports[sessionId]; + await transport.handleRequest(req, res); +}); + +// Handle DELETE requests for session termination +app.delete('/mcp', async (req: Request, res: Response) => { + const sessionId = req.headers['mcp-session-id'] as string | undefined; + if (!sessionId || !transports[sessionId]) { + res.status(400).send('Invalid or missing session ID'); + return; + } + + console.log(`Session termination request: ${sessionId}`); + const transport = transports[sessionId]; + await transport.handleRequest(req, res); +}); + +// Start server +app.listen(PORT, () => { + console.log(`Starting server on http://localhost:${PORT}/mcp`); + console.log('\nAvailable tools:'); + console.log(' - confirm_delete: Demonstrates elicitation (asks user y/n)'); + console.log(' - write_haiku: Demonstrates sampling (requests LLM completion)'); +}); + +// Handle shutdown +process.on('SIGINT', async () => { + console.log('\nShutting down server...'); + for (const sessionId of Object.keys(transports)) { + try { + await transports[sessionId].close(); + delete transports[sessionId]; + } catch (error) { + console.error(`Error closing session ${sessionId}:`, error); + } + } + taskStore.cleanup(); + messageQueue.cleanup(); + console.log('Server shutdown complete'); + process.exit(0); +}); From 9608c035400c4d80e3bdc3433d4660abd282eaef Mon Sep 17 00:00:00 2001 From: Felix Weinberger Date: Thu, 27 Nov 2025 20:09:05 +0000 Subject: [PATCH 2/2] fix: call tasks/result when input_required to deliver side-channel messages Add input_required handling to the polling loop in requestStream(). When task status is input_required, call tasks/result to deliver queued messages (elicitation, sampling) via SSE and block until terminal. --- src/integration-tests/taskLifecycle.test.ts | 73 +++++++++++++++++++++ src/shared/protocol.ts | 8 +++ 2 files changed, 81 insertions(+) diff --git a/src/integration-tests/taskLifecycle.test.ts b/src/integration-tests/taskLifecycle.test.ts index 1a569e485..8b7f942ad 100644 --- a/src/integration-tests/taskLifecycle.test.ts +++ b/src/integration-tests/taskLifecycle.test.ts @@ -1604,4 +1604,77 @@ describe('Task Lifecycle Integration Tests', () => { await transport.close(); }); }); + + describe('callToolStream with elicitation', () => { + it('should deliver elicitation via callToolStream and complete task', async () => { + const client = new Client( + { + name: 'test-client', + version: '1.0.0' + }, + { + capabilities: { + elicitation: {} + } + } + ); + + // Track elicitation request receipt + let elicitationReceived = false; + let elicitationMessage = ''; + + // Set up elicitation handler on client + client.setRequestHandler(ElicitRequestSchema, async request => { + elicitationReceived = true; + elicitationMessage = request.params.message; + + return { + action: 'accept' as const, + content: { + userName: 'StreamUser' + } + }; + }); + + const transport = new StreamableHTTPClientTransport(baseUrl); + await client.connect(transport); + + // Use callToolStream instead of raw request() + const stream = client.experimental.tasks.callToolStream({ name: 'input-task', arguments: {} }, CallToolResultSchema, { + task: { ttl: 60000 } + }); + + // Collect all stream messages + const messages: Array<{ type: string; task?: unknown; result?: unknown; error?: unknown }> = []; + for await (const message of stream) { + messages.push(message); + } + + // Verify stream yielded expected message types + expect(messages.length).toBeGreaterThanOrEqual(2); + + // First message should be taskCreated + expect(messages[0].type).toBe('taskCreated'); + expect(messages[0].task).toBeDefined(); + + // Should have a taskStatus message + const statusMessages = messages.filter(m => m.type === 'taskStatus'); + expect(statusMessages.length).toBeGreaterThanOrEqual(1); + + // Last message should be result + const lastMessage = messages[messages.length - 1]; + expect(lastMessage.type).toBe('result'); + expect(lastMessage.result).toBeDefined(); + + // Verify elicitation was received and processed + expect(elicitationReceived).toBe(true); + expect(elicitationMessage).toContain('What is your name?'); + + // Verify result content + const result = lastMessage.result as { content: Array<{ type: string; text: string }> }; + expect(result.content).toEqual([{ type: 'text', text: 'Hello, StreamUser!' }]); + + await transport.close(); + }, 15000); + }); }); diff --git a/src/shared/protocol.ts b/src/shared/protocol.ts index cac95fcc4..61312926e 100644 --- a/src/shared/protocol.ts +++ b/src/shared/protocol.ts @@ -1021,6 +1021,14 @@ export abstract class Protocol setTimeout(resolve, pollInterval));