diff --git a/crates/rmcp/src/transport/streamable_http_client.rs b/crates/rmcp/src/transport/streamable_http_client.rs index 86b9b3a1..aed20d1c 100644 --- a/crates/rmcp/src/transport/streamable_http_client.rs +++ b/crates/rmcp/src/transport/streamable_http_client.rs @@ -5,6 +5,7 @@ pub use sse_stream::Error as SseError; use sse_stream::Sse; use thiserror::Error; use tokio_util::sync::CancellationToken; +use tracing::debug; use super::common::client_side_sse::{ExponentialBackoff, SseRetryPolicy, SseStreamReconnect}; use crate::{ @@ -90,16 +91,28 @@ impl StreamableHttpPostResponse { match self { Self::Json(message, session_id) => Ok((message, session_id)), Self::Sse(mut stream, session_id) => { - let event = - stream - .next() - .await - .ok_or(StreamableHttpError::UnexpectedServerResponse( - "empty sse stream".into(), - ))??; - let message: ServerJsonRpcMessage = - serde_json::from_str(&event.data.unwrap_or_default())?; - Ok((message, session_id)) + while let Some(event) = stream.next().await { + let event = event?; + let payload = event.data.unwrap_or_default(); + if payload.trim().is_empty() { + continue; + } + + let message: ServerJsonRpcMessage = serde_json::from_str(&payload)?; + + if matches!(message, ServerJsonRpcMessage::Response(_)) { + return Ok((message, session_id)); + } + + debug!( + ?message, + "received message before initialize response; continuing to drain stream" + ); + } + + Err(StreamableHttpError::UnexpectedServerResponse( + "empty sse stream".into(), + )) } _ => Err(StreamableHttpError::UnexpectedServerResponse( "expect initialized, accepted".into(),