Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 44 additions & 40 deletions crates/rmcp/src/transport/streamable_http_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -391,47 +391,51 @@ impl<C: StreamableHttpClient> Worker for StreamableHttpClientWorker<C> {
}
let mut streams = tokio::task::JoinSet::new();
if let Some(session_id) = &session_id {
match self
.client
.get_stream(
config.uri.clone(),
session_id.clone(),
None,
config.auth_header.clone(),
)
.await
{
Ok(stream) => {
let sse_stream = SseAutoReconnectStream::new(
stream,
StreamableHttpClientReconnect {
client: self.client.clone(),
session_id: session_id.clone(),
uri: config.uri.clone(),
auth_header: config.auth_header.clone(),
},
self.config.retry_config.clone(),
);
streams.spawn(Self::execute_sse_stream(
sse_stream,
sse_worker_tx.clone(),
false,
transport_task_ct.child_token(),
));
tracing::debug!("got common stream");
}
Err(StreamableHttpError::ServerDoesNotSupportSse) => {
tracing::debug!("server doesn't support sse, skip common stream");
}
Err(e) => {
// fail to get common stream
tracing::error!("fail to get common stream: {e}");
return Err(WorkerQuitReason::fatal(
e,
"get general purpose event stream",
));
let client = self.client.clone();
let uri = config.uri.clone();
let session_id = session_id.clone();
let auth_header = config.auth_header.clone();
let retry_config = self.config.retry_config.clone();
let sse_worker_tx = sse_worker_tx.clone();
let transport_task_ct = transport_task_ct.clone();
let config_uri = config.uri.clone();
let config_auth_header = config.auth_header.clone();

streams.spawn(async move {
match client
.get_stream(uri.clone(), session_id.clone(), None, auth_header.clone())
.await
{
Ok(stream) => {
let sse_stream = SseAutoReconnectStream::new(
stream,
StreamableHttpClientReconnect {
client: client.clone(),
session_id: session_id.clone(),
uri: config_uri,
auth_header: config_auth_header,
},
retry_config,
);
Self::execute_sse_stream(
sse_stream,
sse_worker_tx,
false,
transport_task_ct.child_token(),
)
.await
}
Err(StreamableHttpError::ServerDoesNotSupportSse) => {
tracing::debug!("server doesn't support sse, skip common stream");
Ok(())
}
Err(e) => {
// fail to get common stream
tracing::error!("fail to get common stream: {e}");
Err(e)
}
}
}
});
}
loop {
let event = tokio::select! {
Expand Down