From 8bbf7c60cfbc157ba829c088e0b58ba0e119bafc Mon Sep 17 00:00:00 2001 From: Jack Amadeo Date: Wed, 19 Nov 2025 15:14:47 -0500 Subject: [PATCH] fix: don't block on creating the SSE stream --- .../src/transport/streamable_http_client.rs | 84 ++++++++++--------- 1 file changed, 44 insertions(+), 40 deletions(-) diff --git a/crates/rmcp/src/transport/streamable_http_client.rs b/crates/rmcp/src/transport/streamable_http_client.rs index 8d076c71..4db461a4 100644 --- a/crates/rmcp/src/transport/streamable_http_client.rs +++ b/crates/rmcp/src/transport/streamable_http_client.rs @@ -391,47 +391,51 @@ impl Worker for StreamableHttpClientWorker { } let mut streams = tokio::task::JoinSet::new(); if let Some(session_id) = &session_id { - match self - .client - .get_stream( - config.uri.clone(), - session_id.clone(), - None, - config.auth_header.clone(), - ) - .await - { - Ok(stream) => { - let sse_stream = SseAutoReconnectStream::new( - stream, - StreamableHttpClientReconnect { - client: self.client.clone(), - session_id: session_id.clone(), - uri: config.uri.clone(), - auth_header: config.auth_header.clone(), - }, - self.config.retry_config.clone(), - ); - streams.spawn(Self::execute_sse_stream( - sse_stream, - sse_worker_tx.clone(), - false, - transport_task_ct.child_token(), - )); - tracing::debug!("got common stream"); - } - Err(StreamableHttpError::ServerDoesNotSupportSse) => { - tracing::debug!("server doesn't support sse, skip common stream"); - } - Err(e) => { - // fail to get common stream - tracing::error!("fail to get common stream: {e}"); - return Err(WorkerQuitReason::fatal( - e, - "get general purpose event stream", - )); + let client = self.client.clone(); + let uri = config.uri.clone(); + let session_id = session_id.clone(); + let auth_header = config.auth_header.clone(); + let retry_config = self.config.retry_config.clone(); + let sse_worker_tx = sse_worker_tx.clone(); + let transport_task_ct = transport_task_ct.clone(); + let config_uri = config.uri.clone(); + let config_auth_header = config.auth_header.clone(); + + streams.spawn(async move { + match client + .get_stream(uri.clone(), session_id.clone(), None, auth_header.clone()) + .await + { + Ok(stream) => { + let sse_stream = SseAutoReconnectStream::new( + stream, + StreamableHttpClientReconnect { + client: client.clone(), + session_id: session_id.clone(), + uri: config_uri, + auth_header: config_auth_header, + }, + retry_config, + ); + Self::execute_sse_stream( + sse_stream, + sse_worker_tx, + false, + transport_task_ct.child_token(), + ) + .await + } + Err(StreamableHttpError::ServerDoesNotSupportSse) => { + tracing::debug!("server doesn't support sse, skip common stream"); + Ok(()) + } + Err(e) => { + // fail to get common stream + tracing::error!("fail to get common stream: {e}"); + Err(e) + } } - } + }); } loop { let event = tokio::select! {