Implement Server-to-Client Bidirectional Communication (Notifications, Sampling, Elicitation) #87

@avrabe

Summary

Implement full bidirectional communication support to pass the remaining 5 conformance tests:

  • tools-call-with-logging - Server sends notifications/message to client
  • tools-call-with-progress - Server sends notifications/progress to client
  • tools-call-sampling - Server makes sampling/createMessage request to client
  • tools-call-elicitation - Server makes elicitation/create request to client
  • elicitation-sep1034-defaults - Elicitation with default values

Current State

Conformance: 21/26 tests passing (81%)

The framework currently supports only client → server request/response patterns. The MCP spec also requires:

  1. Server → Client notifications (fire-and-forget)
  2. Server → Client requests (with response correlation)

MCP Specification Requirements

1. Logging Notifications (notifications/message)

Servers must be able to send log messages during tool execution:

{
  "jsonrpc": "2.0",
  "method": "notifications/message",
  "params": {
    "level": "info",
    "logger": "tool_name",
    "data": { "message": "Processing step 1..." }
  }
}

Spec Reference: https://modelcontextprotocol.io/specification/2025-06-18/server/utilities/logging
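
The level field takes one of the eight syslog-style severities listed on the linked logging page. A possible shape for the LogLevel type used later in this issue, sketched with std only; the should_send helper is an assumption about how a client-requested minimum level (logging/setLevel) might be applied:

```rust
/// Severities for `notifications/message`, ordered from least to most severe.
/// The variant order matters because `PartialOrd` is derived from it.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum LogLevel {
    Debug,
    Info,
    Notice,
    Warning,
    Error,
    Critical,
    Alert,
    Emergency,
}

impl LogLevel {
    pub const ALL: [LogLevel; 8] = [
        LogLevel::Debug,
        LogLevel::Info,
        LogLevel::Notice,
        LogLevel::Warning,
        LogLevel::Error,
        LogLevel::Critical,
        LogLevel::Alert,
        LogLevel::Emergency,
    ];

    /// Wire name placed in the `level` field.
    pub fn as_str(self) -> &'static str {
        match self {
            LogLevel::Debug => "debug",
            LogLevel::Info => "info",
            LogLevel::Notice => "notice",
            LogLevel::Warning => "warning",
            LogLevel::Error => "error",
            LogLevel::Critical => "critical",
            LogLevel::Alert => "alert",
            LogLevel::Emergency => "emergency",
        }
    }

    /// Parse a wire name; unknown names yield `None`.
    pub fn parse(name: &str) -> Option<LogLevel> {
        LogLevel::ALL.iter().copied().find(|level| level.as_str() == name)
    }
}

/// Hypothetical filter: forward a message only if it meets the minimum level.
pub fn should_send(message: LogLevel, minimum: LogLevel) -> bool {
    message >= minimum
}
```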

2. Progress Notifications (notifications/progress)

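Servers can report progress during long-running operations. The progressToken must be the one the client attached to the originating request (in _meta.progressToken), and progress must increase with each notification: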

{
  "jsonrpc": "2.0",
  "method": "notifications/progress",
  "params": {
    "progressToken": "unique-token",
    "progress": 50,
    "total": 100
  }
}

Spec Reference: https://modelcontextprotocol.io/specification/2025-06-18/server/utilities/progress
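
As I read the linked progress page, the progress value has to increase from one notification to the next, and total is optional. A small sketch of a guard that enforces this before anything is sent; ProgressTracker and ProgressError are hypothetical names, and the token is assumed to be a string with no characters that need JSON escaping:

```rust
#[derive(Debug, PartialEq)]
pub enum ProgressError {
    /// The new value did not exceed the previously reported one.
    NotIncreasing { last: u64, attempted: u64 },
}

/// Tracks the last reported value for one progress token.
pub struct ProgressTracker {
    token: String,
    total: Option<u64>,
    last: Option<u64>,
}

impl ProgressTracker {
    pub fn new(token: &str, total: Option<u64>) -> Self {
        ProgressTracker { token: token.to_string(), total, last: None }
    }

    /// Returns the `params` JSON for the next `notifications/progress`,
    /// or an error if `progress` does not move forward.
    pub fn advance(&mut self, progress: u64) -> Result<String, ProgressError> {
        if let Some(last) = self.last {
            if progress <= last {
                return Err(ProgressError::NotIncreasing { last, attempted: progress });
            }
        }
        self.last = Some(progress);
        // `total` is omitted entirely when unknown.
        let total = match self.total {
            Some(t) => format!(r#","total":{}"#, t),
            None => String::new(),
        };
        Ok(format!(
            r#"{{"progressToken":"{}","progress":{}{}}}"#,
            self.token, progress, total
        ))
    }
}
```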

3. Sampling Requests (sampling/createMessage)

Servers can request LLM completions from clients:

// Server → Client Request
{
  "jsonrpc": "2.0",
  "id": 1,
  "method": "sampling/createMessage",
  "params": {
    "messages": [{ "role": "user", "content": { "type": "text", "text": "Summarize this" } }],
    "maxTokens": 100
  }
}

// Client → Server Response
{
  "jsonrpc": "2.0",
  "id": 1,
  "result": {
    "role": "assistant",
    "content": { "type": "text", "text": "Here's the summary..." },
    "model": "claude-3-sonnet",
    "stopReason": "endTurn"
  }
}

Spec Reference: https://modelcontextprotocol.io/specification/2025-06-18/client/sampling

4. Elicitation Requests (elicitation/create)

Servers can request structured user input from clients:

// Server → Client Request
{
  "jsonrpc": "2.0",
  "id": 2,
  "method": "elicitation/create",
  "params": {
    "message": "Please provide your name",
    "requestedSchema": { "type": "object", "properties": { "name": { "type": "string" } } }
  }
}

Spec Reference: https://modelcontextprotocol.io/specification/2025-06-18/client/elicitation
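
Only the request is shown above. As I understand the linked elicitation page, the client's result carries an action of accept, decline, or cancel, with the submitted content present on accept. A sketch of how ElicitationResult might model that; the enum and function names are assumptions, and content is kept as raw JSON text to stay dependency-free:

```rust
/// Hypothetical model of the client's answer to `elicitation/create`.
#[derive(Debug, PartialEq)]
pub enum ElicitationOutcome {
    /// User submitted the form; holds the raw JSON `content` object.
    Accept(String),
    /// User explicitly declined to answer.
    Decline,
    /// User dismissed the prompt without choosing.
    Cancel,
}

#[derive(Debug, PartialEq)]
pub enum OutcomeError {
    UnknownAction(String),
    /// `accept` arrived without a `content` object.
    MissingContent,
}

/// Build an outcome from the `action` string and optional `content` JSON.
pub fn outcome_from_parts(
    action: &str,
    content: Option<&str>,
) -> Result<ElicitationOutcome, OutcomeError> {
    match action {
        "accept" => content
            .map(|c| ElicitationOutcome::Accept(c.to_string()))
            .ok_or(OutcomeError::MissingContent),
        "decline" => Ok(ElicitationOutcome::Decline),
        "cancel" => Ok(ElicitationOutcome::Cancel),
        other => Err(OutcomeError::UnknownAction(other.to_string())),
    }
}
```

Keeping decline and cancel distinct lets a tool decide whether to ask again or give up.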

Implementation Plan

Phase 1: Core Infrastructure

1.1 Define ToolContext Trait

Create a context object that tool handlers can use to communicate with clients:

// mcp-server/src/context.rs
use async_trait::async_trait;
use serde_json::Value;

/// Context provided to tool handlers for bidirectional communication
#[async_trait]
pub trait ToolContext: Send + Sync {
    /// Send a log notification to the client
    async fn send_log(&self, level: LogLevel, logger: Option<&str>, data: Value) -> Result<()>;
    
    /// Send a progress notification to the client
    async fn send_progress(&self, token: &str, progress: u64, total: Option<u64>) -> Result<()>;
    
    /// Request LLM sampling from the client (resolves once the client responds)
    async fn request_sampling(&self, params: CreateMessageRequest) -> Result<CreateMessageResult>;
    
    /// Request user input from the client (resolves once the client responds)
    async fn request_elicitation(&self, params: ElicitationRequest) -> Result<ElicitationResult>;
    
    /// Get the progress token for this request (if provided by client)
    fn progress_token(&self) -> Option<&str>;
}

1.2 Transport Layer Changes

Add notification/request sending capability to transports:

// mcp-transport/src/lib.rs
use async_trait::async_trait;
use serde_json::Value;
use std::time::Duration;

#[async_trait]
pub trait TransportSender: Send + Sync {
    /// Send a notification (no response expected)
    async fn send_notification(&self, method: &str, params: Value) -> Result<()>;
    
    /// Send a request and await response (with timeout)
    async fn send_request(&self, method: &str, params: Value, timeout: Duration) -> Result<Value>;
}
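
The timeout on send_request matters because a client may never answer a sampling or elicitation request. A blocking, std-only sketch of the wait step, assuming the response arrives on a channel; RequestError and await_response are hypothetical names, and an async implementation would more likely wrap a oneshot receiver in tokio::time::timeout:

```rust
use std::sync::mpsc::{Receiver, RecvTimeoutError};
use std::time::Duration;

#[derive(Debug, PartialEq)]
pub enum RequestError {
    /// No response arrived within the allowed time.
    Timeout,
    /// The sending side went away (e.g. the session was closed).
    ConnectionClosed,
}

/// Wait for the response to one outstanding request, giving up after `timeout`.
pub fn await_response(rx: &Receiver<String>, timeout: Duration) -> Result<String, RequestError> {
    match rx.recv_timeout(timeout) {
        Ok(value) => Ok(value),
        Err(RecvTimeoutError::Timeout) => Err(RequestError::Timeout),
        Err(RecvTimeoutError::Disconnected) => Err(RequestError::ConnectionClosed),
    }
}
```

Distinguishing the two failures lets the caller retry after a timeout but abort when the session is gone.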

1.3 Session-Aware Request Handling

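For HTTP/SSE transport, notifications and server-initiated requests must be sent on the correct session's SSE stream, and each client response must be routed back to the tool call that is waiting on that request ID: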

// Track active sessions and their notification channels
pub struct SessionManager {
    sessions: DashMap<SessionId, mpsc::Sender<Notification>>,
    pending_requests: DashMap<RequestId, oneshot::Sender<Value>>,
}
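
The pending_requests map is what makes response correlation work: register an ID before sending, then complete the matching entry when the client's reply arrives. A simplified sketch using std channels and a Mutex in place of DashMap and oneshot; PendingRequests and its methods are illustrative names, not the planned API:

```rust
use std::collections::HashMap;
use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::Mutex;

#[derive(Default)]
struct Inner {
    next_id: u64,
    waiting: HashMap<u64, Sender<String>>,
}

/// Table of server-initiated requests that are still awaiting a response.
#[derive(Default)]
pub struct PendingRequests {
    inner: Mutex<Inner>,
}

impl PendingRequests {
    /// Allocate a fresh request ID and a receiver for its eventual response.
    pub fn register(&self) -> (u64, Receiver<String>) {
        let (tx, rx) = channel();
        let mut inner = self.inner.lock().unwrap();
        inner.next_id += 1;
        let id = inner.next_id;
        inner.waiting.insert(id, tx);
        (id, rx)
    }

    /// Deliver a response. Returns false for unknown or already-resolved IDs,
    /// or when the waiter has stopped listening.
    pub fn resolve(&self, id: u64, result: String) -> bool {
        // Remove first so the lock is released before sending.
        let sender = self.inner.lock().unwrap().waiting.remove(&id);
        match sender {
            Some(tx) => tx.send(result).is_ok(),
            None => false,
        }
    }

    /// Forget a request, e.g. after a timeout, so the table does not grow.
    pub fn cancel(&self, id: u64) {
        self.inner.lock().unwrap().waiting.remove(&id);
    }
}
```

Removing the entry on resolve means a duplicate or late response is rejected rather than delivered twice.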

Phase 2: Backend API Changes

2.1 Update McpBackend Trait

Option A: Add context parameter (breaking change)

async fn call_tool(
    &self,
    ctx: &dyn ToolContext,
    request: CallToolRequestParam,
) -> Result<CallToolResult, Self::Error>;

Option B: Use context injection via task-local storage (non-breaking)

// Tool handlers access context via:
let ctx = mcp_context::current();
ctx.send_progress("token", 50, Some(100)).await?;
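
To illustrate the scoping behaviour Option B relies on, here is a std-only sketch that uses thread_local! as a stand-in. This is an analogy, not the proposed design: thread-locals do not follow an async task across worker threads, so a real implementation would more likely use something like tokio::task_local!. RequestContext, with_context, and current are hypothetical names:

```rust
use std::cell::RefCell;

/// Hypothetical per-request data a tool handler might want to read.
#[derive(Debug, Clone, PartialEq)]
pub struct RequestContext {
    pub progress_token: Option<String>,
}

thread_local! {
    static CURRENT: RefCell<Option<RequestContext>> = RefCell::new(None);
}

/// Restores the previous context on drop, even if the closure panics.
struct Restore(Option<RequestContext>);

impl Drop for Restore {
    fn drop(&mut self) {
        let previous = self.0.take();
        CURRENT.with(|slot| *slot.borrow_mut() = previous);
    }
}

/// Run `f` with `ctx` installed as the current context.
pub fn with_context<R>(ctx: RequestContext, f: impl FnOnce() -> R) -> R {
    let _restore = Restore(CURRENT.with(|slot| slot.replace(Some(ctx))));
    f()
}

/// Context of the request being handled, or `None` outside a handler.
pub fn current() -> Option<RequestContext> {
    CURRENT.with(|slot| slot.borrow().clone())
}
```

The trade-off against Option A is visible here: the handler signature stays unchanged, but the dependency on a context becomes implicit and current() can return None when called outside a request.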

2.2 Protocol Types

Add missing types to mcp-protocol/src/model.rs:

/// Sampling request parameters
// rename_all keeps every field camelCase on the wire (modelPreferences,
// systemPrompt, maxTokens); Default enables `..Default::default()`.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct CreateMessageRequest {
    pub messages: Vec<SamplingMessage>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub model_preferences: Option<ModelPreferences>,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub system_prompt: Option<String>,
    pub max_tokens: u32,
}

/// Sampling response
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CreateMessageResult {
    pub role: String,
    pub content: SamplingContent,
    pub model: String,
    #[serde(rename = "stopReason")]
    pub stop_reason: String,
}

Phase 3: Macro Support

3.1 Update #[mcp_tool] Macro

Add context support to the tool macro:

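#[mcp_tool(
    name = "process_data",
    description = "Process data with progress reporting"
)]
async fn process_data(ctx: &dyn ToolContext, input: String) -> Result<String> {
    // ToolContext is a trait, so it is taken by reference here.
    // Progress must reuse the token the client supplied, if any.
    let token = ctx.progress_token();
    for i in 1..=100 {
        // ... processing logic
        if let Some(token) = token {
            ctx.send_progress(token, i, Some(100)).await?;
        }
    }
    Ok("Done".to_string())
}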

3.2 Context-Aware Tool Signature Detection

The macro should detect whether the first parameter is ToolContext and adjust code generation accordingly.
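
A rough sketch of the detection rule, operating on type names as plain strings so that it stays dependency-free. A real proc macro would inspect the parsed signature (for example via syn) instead; both function names are hypothetical, and lifetimes such as &'a dyn ToolContext are not handled:

```rust
/// True when the type text names `ToolContext`, allowing for a leading
/// reference, `dyn`/`impl`, and a module path.
pub fn is_context_type(ty: &str) -> bool {
    let mut t = ty.trim();
    t = t.strip_prefix('&').unwrap_or(t).trim_start();
    t = t.strip_prefix("mut ").unwrap_or(t).trim_start();
    for keyword in ["dyn ", "impl "] {
        t = t.strip_prefix(keyword).unwrap_or(t).trim_start();
    }
    // Compare only the last path segment, so `mcp_server::ToolContext` matches.
    t.rsplit("::").next() == Some("ToolContext")
}

/// Split `(name, type)` parameters into a context flag and the remaining
/// parameters, which are the ones that belong in the tool's input schema.
pub fn split_params<'a>(params: &[(&'a str, &'a str)]) -> (bool, Vec<(&'a str, &'a str)>) {
    match params.first() {
        Some((_, ty)) if is_context_type(ty) => (true, params[1..].to_vec()),
        _ => (false, params.to_vec()),
    }
}
```

Excluding the context parameter from the generated input schema is the part that matters for clients, since they should never see ctx as a tool argument.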

Phase 4: Testing

4.1 Unit Tests

#[tokio::test]
async fn test_tool_context_send_log() {
    let (ctx, mut receiver) = MockToolContext::new();
    ctx.send_log(LogLevel::Info, Some("test"), json!({"msg": "hello"})).await.unwrap();
    
    let notification = receiver.recv().await.unwrap();
    assert_eq!(notification.method, "notifications/message");
}

#[tokio::test]
async fn test_tool_context_request_sampling() {
    let (ctx, _handler) = MockToolContext::with_sampling_handler(|_req| {
        CreateMessageResult {
            role: "assistant".to_string(),
            content: SamplingContent::text("Response"),
            model: "test-model".to_string(),
            stop_reason: "endTurn".to_string(),
        }
    });
    
    let result = ctx.request_sampling(CreateMessageRequest { ... }).await.unwrap();
    assert_eq!(result.role, "assistant");
}

4.2 Integration Tests

#[tokio::test]
async fn test_conformance_logging() {
    let server = start_test_server().await;
    let client = TestClient::connect(&server).await;
    
    // Call tool that sends log notifications
    let (result, notifications) = client.call_tool_with_notifications(
        "test_tool_with_logging",
        json!({})
    ).await;
    
    assert!(!notifications.is_empty());
    assert_eq!(notifications[0].method, "notifications/message");
}

4.3 Conformance Test Coverage

Update conformance-server example to use new APIs:

"test_tool_with_logging" => {
    ctx.send_log(LogLevel::Info, Some("tool"), json!({"step": "starting"})).await?;
    ctx.send_log(LogLevel::Info, Some("tool"), json!({"step": "completed"})).await?;
    Ok(CallToolResult::text("Logged successfully"))
}

"test_tool_with_progress" => {
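    // Echo the client's progress token; send nothing if none was provided
    if let Some(token) = ctx.progress_token() {
        for i in 0..=100 {
            ctx.send_progress(token, i, Some(100)).await?;
        }
    }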
    Ok(CallToolResult::text("Progress complete"))
}

"test_sampling" => {
    let response = ctx.request_sampling(CreateMessageRequest {
        messages: vec![SamplingMessage::user("Summarize: test data")],
        max_tokens: 100,
        ..Default::default()
    }).await?;
    Ok(CallToolResult::text(format!("LLM said: {}", response.content.text())))
}

Phase 5: Documentation

  • Update McpBackend trait documentation
  • Add "Bidirectional Communication" guide to docs
  • Update macro documentation with context examples
  • Add migration guide for existing tools

Files to Modify/Create

New Files

  • mcp-server/src/context.rs - ToolContext trait and implementations
  • mcp-server/src/session.rs - Session management for request correlation
  • mcp-protocol/src/sampling.rs - Sampling types
  • mcp-protocol/src/elicitation.rs - Elicitation types (if not already complete)

Modified Files

  • mcp-server/src/backend.rs - Update trait signature
  • mcp-server/src/handler.rs - Wire up context to tool calls
  • mcp-transport/src/streamable_http.rs - Add notification sending
  • mcp-transport/src/stdio.rs - Add notification sending
  • mcp-macros/src/tool.rs - Support context parameter
  • examples/conformance-server/src/main.rs - Implement remaining tools

Success Criteria

$ npx @modelcontextprotocol/conformance server --url http://localhost:3000/mcp

=== SUMMARY ===
✓ All 26 scenarios passed
Total: 26 passed, 0 failed

Breaking Changes

This will be a breaking change for existing McpBackend implementations if we choose Option A (explicit context parameter). We should:

  1. Document the migration path clearly
  2. Consider a deprecation period with Option B (task-local storage)
  3. Update all examples and documentation
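
One possible shape for the migration path, sketched with simplified synchronous traits: keep the existing method and add a context-aware one whose default implementation delegates to it, so current backends compile unchanged. This is a third variant that the options above do not spell out, and every name here (Backend, call_tool_with_context, RecordingContext) is hypothetical:

```rust
use std::cell::RefCell;

/// Simplified, synchronous stand-in for the issue's `ToolContext`.
pub trait ToolContext {
    fn send_log(&self, message: &str);
}

/// Context that records log messages in memory.
#[derive(Default)]
pub struct RecordingContext {
    pub logs: RefCell<Vec<String>>,
}

impl ToolContext for RecordingContext {
    fn send_log(&self, message: &str) {
        self.logs.borrow_mut().push(message.to_string());
    }
}

pub trait Backend {
    /// Existing entry point; current implementations only provide this.
    fn call_tool(&self, name: &str) -> Result<String, String>;

    /// New entry point. The default ignores the context and delegates,
    /// so adding it does not break existing implementations.
    fn call_tool_with_context(&self, _ctx: &dyn ToolContext, name: &str) -> Result<String, String> {
        self.call_tool(name)
    }
}

/// A backend written before the change: compiles without modification.
pub struct LegacyBackend;

impl Backend for LegacyBackend {
    fn call_tool(&self, name: &str) -> Result<String, String> {
        Ok(format!("ran {}", name))
    }
}

/// A migrated backend that opts in to the context.
pub struct ContextBackend;

impl Backend for ContextBackend {
    fn call_tool(&self, name: &str) -> Result<String, String> {
        Ok(format!("ran {}", name))
    }

    fn call_tool_with_context(&self, ctx: &dyn ToolContext, name: &str) -> Result<String, String> {
        ctx.send_log("starting");
        self.call_tool(name)
    }
}
```

The framework would always call the context-aware method, and the old one could be deprecated once backends have migrated.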
