From ba3d8ef047243cfafa73970b00f76dcd6ab9670c Mon Sep 17 00:00:00 2001 From: Adam Jones Date: Tue, 25 Nov 2025 20:03:29 +0000 Subject: [PATCH] feat!: remove SSE transport support SSE transport has been removed from the MCP specification in favor of streamable HTTP. This removes all SSE-specific transport code: - Remove `transport-sse-client` and `transport-sse-server` features - Remove `SseClientTransport` and `SseServer` types - Remove SSE-specific examples (`counter_sse`, `counter_sse_directly`) - Migrate auth examples from SSE to streamable HTTP - Update tests to remove SSE transport usage - Update documentation BREAKING CHANGE: The following have been removed: - `transport-sse-client` feature - `transport-sse-client-reqwest` feature - `transport-sse-server` feature - `SseClientTransport` type - `SseServer` type - `sse_client` and `sse_server` modules Users should migrate to streamable HTTP transport which provides equivalent functionality. See `StreamableHttpClientTransport` and `StreamableHttpService` for the replacement APIs. Ref: https://github.com/modelcontextprotocol/rust-sdk/pull/561#issuecomment-3576551699 --- crates/rmcp/Cargo.toml | 19 +- crates/rmcp/README.md | 7 +- crates/rmcp/src/transport.rs | 17 +- crates/rmcp/src/transport/auth.rs | 4 +- crates/rmcp/src/transport/common.rs | 8 +- crates/rmcp/src/transport/common/auth.rs | 4 - .../src/transport/common/auth/sse_client.rs | 45 -- crates/rmcp/src/transport/common/reqwest.rs | 4 - .../transport/common/reqwest/sse_client.rs | 118 ----- crates/rmcp/src/transport/sse_client.rs | 458 ------------------ crates/rmcp/src/transport/sse_server.rs | 343 ------------- crates/rmcp/tests/test_with_js.rs | 35 +- crates/rmcp/tests/test_with_python.rs | 82 +--- docs/OAUTH_SUPPORT.md | 51 +- examples/clients/Cargo.toml | 5 - examples/clients/README.md | 18 +- examples/clients/src/auth/oauth_client.rs | 15 +- examples/clients/src/progress_client.rs | 72 +-- examples/clients/src/sse.rs | 52 -- examples/rig-integration/Cargo.toml | 1 - examples/rig-integration/src/config/mcp.rs | 7 - examples/servers/Cargo.toml | 25 +- examples/servers/README.md | 65 +-- ...auth_sse.rs => complex_auth_streamhttp.rs} | 88 ++-- examples/servers/src/counter_sse.rs | 54 --- examples/servers/src/counter_sse_directly.rs | 29 -- examples/servers/src/html/sse_auth_index.html | 33 -- examples/servers/src/progress_demo.rs | 82 +--- ..._auth_sse.rs => simple_auth_streamhttp.rs} | 102 ++-- examples/simple-chat-client/Cargo.toml | 1 - examples/simple-chat-client/src/config.rs | 8 - 31 files changed, 164 insertions(+), 1688 deletions(-) delete mode 100644 crates/rmcp/src/transport/common/auth/sse_client.rs delete mode 100644 crates/rmcp/src/transport/common/reqwest/sse_client.rs delete mode 100644 crates/rmcp/src/transport/sse_client.rs delete mode 100644 crates/rmcp/src/transport/sse_server.rs delete mode 100644 examples/clients/src/sse.rs rename examples/servers/src/{complex_auth_sse.rs => complex_auth_streamhttp.rs} (91%) delete mode 100644 examples/servers/src/counter_sse.rs delete mode 100644 examples/servers/src/counter_sse_directly.rs delete mode 100644 examples/servers/src/html/sse_auth_index.html rename examples/servers/src/{simple_auth_sse.rs => simple_auth_streamhttp.rs} (64%) diff --git a/crates/rmcp/Cargo.toml b/crates/rmcp/Cargo.toml index 16a813c0..4211e360 100644 --- a/crates/rmcp/Cargo.toml +++ b/crates/rmcp/Cargo.toml @@ -34,7 +34,7 @@ schemars = { version = "1.0", optional = true, features = ["chrono04"] } # for image encoding base64 = { version = "0.22", optional = true } -# for SSE client +# for HTTP client reqwest = { version = "0.12", default-features = false, features = [ "json", "stream", @@ -100,20 +100,16 @@ server-side-http = [ "dep:sse-stream", "tower", ] -# SSE client -client-side-sse = ["dep:sse-stream", "dep:http"] - -transport-sse-client = ["client-side-sse", "transport-worker"] -transport-sse-client-reqwest = ["transport-sse-client", "reqwest"] transport-worker = ["dep:tokio-stream"] +# SSE stream parsing utilities (used by streamable HTTP client for SSE-formatted responses) +client-side-sse = ["dep:sse-stream", "dep:http"] # Streamable HTTP client transport-streamable-http-client = ["client-side-sse", "transport-worker"] transport-streamable-http-client-reqwest = ["transport-streamable-http-client", "reqwest"] - transport-async-rw = ["tokio/io-util", "tokio-util/codec"] transport-io = ["transport-async-rw", "tokio/io-std"] transport-child-process = [ @@ -121,12 +117,6 @@ transport-child-process = [ "tokio/process", "dep:process-wrap", ] -transport-sse-server = [ - "transport-async-rw", - "transport-worker", - "server-side-http", - "dep:axum", -] transport-streamable-http-server = [ "transport-streamable-http-server-session", "server-side-http", @@ -163,8 +153,6 @@ required-features = [ "reqwest", "server", "client", - "transport-sse-server", - "transport-sse-client", "transport-child-process", ] path = "tests/test_with_python.rs" @@ -174,7 +162,6 @@ name = "test_with_js" required-features = [ "server", "client", - "transport-sse-server", "transport-child-process", "transport-streamable-http-server", "transport-streamable-http-client", diff --git a/crates/rmcp/README.md b/crates/rmcp/README.md index 057295aa..8f021d33 100644 --- a/crates/rmcp/README.md +++ b/crates/rmcp/README.md @@ -203,8 +203,6 @@ RMCP uses feature flags to control which components are included: - `transport-async-rw`: Async read/write support - `transport-io`: I/O stream support - `transport-child-process`: Child process support - - `transport-sse-client` / `transport-sse-server`: SSE support (client agnostic) - - `transport-sse-client-reqwest`: a default `reqwest` implementation of the SSE client - `transport-streamable-http-client` / `transport-streamable-http-server`: HTTP streaming (client agnostic, see [`StreamableHttpClientTransport`] for details) - `transport-streamable-http-client-reqwest`: a default `reqwest` implementation of the streamable http client - `auth`: OAuth2 authentication support @@ -214,22 +212,19 @@ RMCP uses feature flags to control which components are included: ## Transports - `transport-io`: Server stdio transport -- `transport-sse-server`: Server SSE transport - `transport-child-process`: Client stdio transport -- `transport-sse-client`: Client sse transport - `transport-streamable-http-server` streamable http server transport - `transport-streamable-http-client` streamable http client transport
Transport The transport type must implemented [`Transport`] trait, which allow it send message concurrently and receive message sequentially. -There are 3 pairs of standard transport types: +There are 2 pairs of standard transport types: | transport | client | server | |:-: |:-: |:-: | | std IO | [`child_process::TokioChildProcess`] | [`io::stdio`] | | streamable http | [`streamable_http_client::StreamableHttpClientTransport`] | [`streamable_http_server::session::create_session`] | -| sse | [`sse_client::SseClientTransport`] | [`sse_server::SseServer`] | #### [IntoTransport](`IntoTransport`) trait [`IntoTransport`] is a helper trait that implicitly convert a type into a transport type. diff --git a/crates/rmcp/src/transport.rs b/crates/rmcp/src/transport.rs index 81286fed..aeb8c795 100644 --- a/crates/rmcp/src/transport.rs +++ b/crates/rmcp/src/transport.rs @@ -2,13 +2,12 @@ //! The transport type must implemented [`Transport`] trait, which allow it send message concurrently and receive message sequentially. //! //! ## Standard Transport Types -//! There are 3 pairs of standard transport types: +//! There are 2 pairs of standard transport types: //! //! | transport | client | server | //! |:-: |:-: |:-: | //! | std IO | [`child_process::TokioChildProcess`] | [`io::stdio`] | //! | streamable http | [`streamable_http_client::StreamableHttpClientTransport`] | [`streamable_http_server::StreamableHttpService`] | -//! | sse | [`sse_client::SseClientTransport`] | [`sse_server::SseServer`] | //! //!## Helper Transport Types //! Thers are several helper transport types that can help you to create transport quickly. @@ -95,20 +94,6 @@ pub mod io; #[cfg_attr(docsrs, doc(cfg(feature = "transport-io")))] pub use io::stdio; -#[cfg(feature = "transport-sse-client")] -#[cfg_attr(docsrs, doc(cfg(feature = "transport-sse-client")))] -pub mod sse_client; -#[cfg(feature = "transport-sse-client")] -#[cfg_attr(docsrs, doc(cfg(feature = "transport-sse-client")))] -pub use sse_client::SseClientTransport; - -#[cfg(feature = "transport-sse-server")] -#[cfg_attr(docsrs, doc(cfg(feature = "transport-sse-server")))] -pub mod sse_server; -#[cfg(feature = "transport-sse-server")] -#[cfg_attr(docsrs, doc(cfg(feature = "transport-sse-server")))] -pub use sse_server::SseServer; - #[cfg(feature = "auth")] #[cfg_attr(docsrs, doc(cfg(feature = "auth")))] pub mod auth; diff --git a/crates/rmcp/src/transport/auth.rs b/crates/rmcp/src/transport/auth.rs index f1ae1e0a..68dfc208 100644 --- a/crates/rmcp/src/transport/auth.rs +++ b/crates/rmcp/src/transport/auth.rs @@ -74,7 +74,7 @@ impl CredentialStore for InMemoryCredentialStore { } } -/// sse client with oauth2 authorization +/// HTTP client with OAuth 2.0 authorization #[derive(Clone)] pub struct AuthClient { pub http_client: C, @@ -91,7 +91,7 @@ impl std::fmt::Debug for AuthClient { } impl AuthClient { - /// create new authorized sse client + /// Create a new authorized HTTP client pub fn new(http_client: C, auth_manager: AuthorizationManager) -> Self { Self { http_client, diff --git a/crates/rmcp/src/transport/common.rs b/crates/rmcp/src/transport/common.rs index 401c6f2d..e78d4a6e 100644 --- a/crates/rmcp/src/transport/common.rs +++ b/crates/rmcp/src/transport/common.rs @@ -1,7 +1,4 @@ -#[cfg(any( - feature = "transport-streamable-http-server", - feature = "transport-sse-server" -))] +#[cfg(feature = "transport-streamable-http-server")] pub mod server_side_http; pub mod http_header; @@ -10,6 +7,9 @@ pub mod http_header; #[cfg_attr(docsrs, doc(cfg(feature = "reqwest")))] mod reqwest; +// Note: This module provides SSE stream parsing and auto-reconnect utilities. +// It's used by the streamable HTTP client (which receives SSE-formatted responses), +// not the removed SSE transport. The name is historical. #[cfg(feature = "client-side-sse")] #[cfg_attr(docsrs, doc(cfg(feature = "client-side-sse")))] pub mod client_side_sse; diff --git a/crates/rmcp/src/transport/common/auth.rs b/crates/rmcp/src/transport/common/auth.rs index 5395d571..f9e3a071 100644 --- a/crates/rmcp/src/transport/common/auth.rs +++ b/crates/rmcp/src/transport/common/auth.rs @@ -1,7 +1,3 @@ #[cfg(feature = "transport-streamable-http-client")] #[cfg_attr(docsrs, doc(cfg(feature = "transport-streamable-http-client")))] mod streamable_http_client; - -#[cfg(feature = "transport-sse-client")] -#[cfg_attr(docsrs, doc(cfg(feature = "transport-sse-client")))] -mod sse_client; diff --git a/crates/rmcp/src/transport/common/auth/sse_client.rs b/crates/rmcp/src/transport/common/auth/sse_client.rs deleted file mode 100644 index 009593e1..00000000 --- a/crates/rmcp/src/transport/common/auth/sse_client.rs +++ /dev/null @@ -1,45 +0,0 @@ -use http::Uri; - -use crate::transport::{ - auth::AuthClient, - sse_client::{SseClient, SseTransportError}, -}; -impl SseClient for AuthClient -where - C: SseClient, -{ - type Error = SseTransportError; - - async fn post_message( - &self, - uri: Uri, - message: crate::model::ClientJsonRpcMessage, - mut auth_token: Option, - ) -> Result<(), SseTransportError> { - if auth_token.is_none() { - auth_token = Some(self.get_access_token().await?); - } - self.http_client - .post_message(uri, message, auth_token) - .await - .map_err(SseTransportError::Client) - } - - async fn get_stream( - &self, - uri: Uri, - last_event_id: Option, - mut auth_token: Option, - ) -> Result< - crate::transport::common::client_side_sse::BoxedSseResponse, - SseTransportError, - > { - if auth_token.is_none() { - auth_token = Some(self.get_access_token().await?); - } - self.http_client - .get_stream(uri, last_event_id, auth_token) - .await - .map_err(SseTransportError::Client) - } -} diff --git a/crates/rmcp/src/transport/common/reqwest.rs b/crates/rmcp/src/transport/common/reqwest.rs index 4f9dc0dc..a51fa955 100644 --- a/crates/rmcp/src/transport/common/reqwest.rs +++ b/crates/rmcp/src/transport/common/reqwest.rs @@ -1,7 +1,3 @@ #[cfg(feature = "transport-streamable-http-client-reqwest")] #[cfg_attr(docsrs, doc(cfg(feature = "transport-streamable-http-client-reqwest")))] mod streamable_http_client; - -#[cfg(feature = "transport-sse-client-reqwest")] -#[cfg_attr(docsrs, doc(cfg(feature = "transport-sse-client-reqwest")))] -mod sse_client; diff --git a/crates/rmcp/src/transport/common/reqwest/sse_client.rs b/crates/rmcp/src/transport/common/reqwest/sse_client.rs deleted file mode 100644 index a5362d79..00000000 --- a/crates/rmcp/src/transport/common/reqwest/sse_client.rs +++ /dev/null @@ -1,118 +0,0 @@ -use std::sync::Arc; - -use futures::StreamExt; -use http::Uri; -use reqwest::header::ACCEPT; -use sse_stream::SseStream; - -use crate::transport::{ - SseClientTransport, - common::http_header::{EVENT_STREAM_MIME_TYPE, HEADER_LAST_EVENT_ID}, - sse_client::{SseClient, SseClientConfig, SseTransportError}, -}; - -impl From for SseTransportError { - fn from(e: reqwest::Error) -> Self { - SseTransportError::Client(e) - } -} - -impl SseClient for reqwest::Client { - type Error = reqwest::Error; - - async fn post_message( - &self, - uri: Uri, - message: crate::model::ClientJsonRpcMessage, - auth_token: Option, - ) -> Result<(), SseTransportError> { - let mut request_builder = self.post(uri.to_string()).json(&message); - if let Some(auth_header) = auth_token { - request_builder = request_builder.bearer_auth(auth_header); - } - request_builder - .send() - .await - .and_then(|resp| resp.error_for_status()) - .map_err(SseTransportError::from) - .map(drop) - } - - async fn get_stream( - &self, - uri: Uri, - last_event_id: Option, - auth_token: Option, - ) -> Result< - crate::transport::common::client_side_sse::BoxedSseResponse, - SseTransportError, - > { - let mut request_builder = self - .get(uri.to_string()) - .header(ACCEPT, EVENT_STREAM_MIME_TYPE); - if let Some(auth_header) = auth_token { - request_builder = request_builder.bearer_auth(auth_header); - } - if let Some(last_event_id) = last_event_id { - request_builder = request_builder.header(HEADER_LAST_EVENT_ID, last_event_id); - } - let response = request_builder.send().await?; - let response = response.error_for_status()?; - match response.headers().get(reqwest::header::CONTENT_TYPE) { - Some(ct) => { - if !ct.as_bytes().starts_with(EVENT_STREAM_MIME_TYPE.as_bytes()) { - return Err(SseTransportError::UnexpectedContentType(Some( - String::from_utf8_lossy(ct.as_bytes()).to_string(), - ))); - } - } - None => { - return Err(SseTransportError::UnexpectedContentType(None)); - } - } - let event_stream = SseStream::from_byte_stream(response.bytes_stream()).boxed(); - Ok(event_stream) - } -} - -impl SseClientTransport { - /// Creates a new transport using reqwest with the specified SSE endpoint. - /// - /// This is a convenience method that creates a transport using the default - /// reqwest client. This method is only available when the - /// `transport-sse-client-reqwest` feature is enabled. - /// - /// # Arguments - /// - /// * `uri` - The SSE endpoint to connect to - /// - /// # Example - /// - /// ```rust - /// use rmcp::transport::SseClientTransport; - /// - /// // Enable the reqwest feature in Cargo.toml: - /// // rmcp = { version = "0.5", features = ["transport-sse-client-reqwest"] } - /// - /// # async fn example() -> Result<(), Box> { - /// let transport = SseClientTransport::start("http://localhost:8000/sse").await?; - /// # Ok(()) - /// # } - /// ``` - /// - /// # Feature requirement - /// - /// This method requires the `transport-sse-client-reqwest` feature. - pub async fn start( - uri: impl Into>, - ) -> Result> { - SseClientTransport::start_with_client( - reqwest::Client::default(), - SseClientConfig { - sse_endpoint: uri.into(), - ..Default::default() - }, - ) - .await - } -} diff --git a/crates/rmcp/src/transport/sse_client.rs b/crates/rmcp/src/transport/sse_client.rs deleted file mode 100644 index 7b61e3d8..00000000 --- a/crates/rmcp/src/transport/sse_client.rs +++ /dev/null @@ -1,458 +0,0 @@ -//! Reference: -use std::{ - pin::Pin, - sync::{Arc, RwLock}, -}; - -use futures::{StreamExt, future::BoxFuture}; -use http::Uri; -use sse_stream::{Error as SseError, Sse}; -use thiserror::Error; - -use super::{ - Transport, - common::client_side_sse::{BoxedSseResponse, SseRetryPolicy, SseStreamReconnect}, -}; -use crate::{ - RoleClient, - model::{ClientJsonRpcMessage, ServerJsonRpcMessage}, - transport::common::client_side_sse::SseAutoReconnectStream, -}; - -#[derive(Error, Debug)] -pub enum SseTransportError { - #[error("SSE error: {0}")] - Sse(#[from] SseError), - #[error("IO error: {0}")] - Io(#[from] std::io::Error), - #[error("Client error: {0}")] - Client(E), - #[error("unexpected end of stream")] - UnexpectedEndOfStream, - #[error("Unexpected content type: {0:?}")] - UnexpectedContentType(Option), - #[cfg(feature = "auth")] - #[cfg_attr(docsrs, doc(cfg(feature = "auth")))] - #[error("Auth error: {0}")] - Auth(#[from] crate::transport::auth::AuthError), - #[error("Invalid uri: {0}")] - InvalidUri(#[from] http::uri::InvalidUri), - #[error("Invalid uri parts: {0}")] - InvalidUriParts(#[from] http::uri::InvalidUriParts), -} - -pub trait SseClient: Clone + Send + Sync + 'static { - type Error: std::error::Error + Send + Sync + 'static; - fn post_message( - &self, - uri: Uri, - message: ClientJsonRpcMessage, - auth_token: Option, - ) -> impl Future>> + Send + '_; - fn get_stream( - &self, - uri: Uri, - last_event_id: Option, - auth_token: Option, - ) -> impl Future>> + Send + '_; -} - -/// Helper that refreshes the POST endpoint whenever the server emits -/// control frames during SSE reconnect; used together with -/// [`SseAutoReconnectStream`]. -struct SseClientReconnect { - pub client: C, - pub uri: Uri, - pub message_endpoint: Arc>, -} - -impl SseStreamReconnect for SseClientReconnect { - type Error = SseTransportError; - type Future = BoxFuture<'static, Result>; - fn retry_connection(&mut self, last_event_id: Option<&str>) -> Self::Future { - let client = self.client.clone(); - let uri = self.uri.clone(); - let last_event_id = last_event_id.map(|s| s.to_owned()); - Box::pin(async move { client.get_stream(uri, last_event_id, None).await }) - } - - fn handle_control_event(&mut self, event: &Sse) -> Result<(), Self::Error> { - if event.event.as_deref() != Some("endpoint") { - return Ok(()); - } - let Some(data) = event.data.as_ref() else { - return Ok(()); - }; - // Servers typically resend the message POST endpoint (often with a new - // sessionId) when a stream reconnects. Reuse `message_endpoint` helper - // to resolve it and update the shared URI. - let new_endpoint = message_endpoint(self.uri.clone(), data.clone()) - .map_err(SseTransportError::InvalidUri)?; - *self - .message_endpoint - .write() - .expect("message endpoint lock poisoned") = new_endpoint; - Ok(()) - } - - fn handle_stream_error( - &mut self, - error: &(dyn std::error::Error + 'static), - last_event_id: Option<&str>, - ) { - tracing::warn!( - uri = %self.uri, - last_event_id = last_event_id.unwrap_or(""), - "sse stream error: {error}" - ); - } -} -type ServerMessageStream = Pin>>>; - -/// A client-agnostic SSE transport for RMCP that supports Server-Sent Events. -/// -/// This transport allows you to choose your preferred HTTP client implementation -/// by implementing the [`SseClient`] trait. The transport handles SSE streaming -/// and automatic reconnection. -/// -/// # Usage -/// -/// ## Using reqwest -/// -/// ```rust,ignore -/// use rmcp::transport::SseClientTransport; -/// -/// // Enable the reqwest feature in Cargo.toml: -/// // rmcp = { version = "0.5", features = ["transport-sse-client-reqwest"] } -/// -/// # async fn example() -> Result<(), Box> { -/// let transport = SseClientTransport::start("http://localhost:8000/sse").await?; -/// # Ok(()) -/// # } -/// ``` -/// -/// ## Using a custom HTTP client -/// -/// ```rust,ignore -/// use rmcp::transport::sse_client::{SseClient, SseClientTransport, SseClientConfig}; -/// use std::sync::Arc; -/// use futures::stream::BoxStream; -/// use rmcp::model::ClientJsonRpcMessage; -/// use sse_stream::{Sse, Error as SseError}; -/// use http::Uri; -/// -/// #[derive(Clone)] -/// struct MyHttpClient; -/// -/// #[derive(Debug, thiserror::Error)] -/// struct MyError; -/// -/// impl std::fmt::Display for MyError { -/// fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { -/// write!(f, "MyError") -/// } -/// } -/// -/// impl SseClient for MyHttpClient { -/// type Error = MyError; -/// -/// async fn post_message( -/// &self, -/// _uri: Uri, -/// _message: ClientJsonRpcMessage, -/// _auth_token: Option, -/// ) -> Result<(), rmcp::transport::sse_client::SseTransportError> { -/// todo!() -/// } -/// -/// async fn get_stream( -/// &self, -/// _uri: Uri, -/// _last_event_id: Option, -/// _auth_token: Option, -/// ) -> Result>, rmcp::transport::sse_client::SseTransportError> { -/// todo!() -/// } -/// } -/// -/// # async fn example() -> Result<(), Box> { -/// let config = SseClientConfig { -/// sse_endpoint: "http://localhost:8000/sse".into(), -/// ..Default::default() -/// }; -/// let transport = SseClientTransport::start_with_client(MyHttpClient, config).await?; -/// # Ok(()) -/// # } -/// ``` -/// -/// # Feature Flags -/// -/// - `transport-sse-client`: Base feature providing the generic transport infrastructure -/// - `transport-sse-client-reqwest`: Includes reqwest HTTP client support with convenience methods -pub struct SseClientTransport { - client: C, - config: SseClientConfig, - /// Current POST endpoint; refreshed when the server sends new endpoint - /// control frames. - message_endpoint: Arc>, - stream: Option>, -} - -impl Transport for SseClientTransport { - type Error = SseTransportError; - async fn receive(&mut self) -> Option { - self.stream.as_mut()?.next().await?.ok() - } - fn send( - &mut self, - item: crate::service::TxJsonRpcMessage, - ) -> impl Future> + Send + 'static { - let client = self.client.clone(); - let message_endpoint = self.message_endpoint.clone(); - async move { - let uri = { - let guard = message_endpoint - .read() - .expect("message endpoint lock poisoned"); - guard.clone() - }; - client.post_message(uri, item, None).await - } - } - async fn close(&mut self) -> Result<(), Self::Error> { - self.stream.take(); - Ok(()) - } -} - -impl std::fmt::Debug for SseClientTransport { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.debug_struct("SseClientWorker") - .field("client", &self.client) - .field("config", &self.config) - .finish() - } -} - -impl SseClientTransport { - pub async fn start_with_client( - client: C, - config: SseClientConfig, - ) -> Result> { - let sse_endpoint = config.sse_endpoint.as_ref().parse::()?; - - let mut sse_stream = client.get_stream(sse_endpoint.clone(), None, None).await?; - let initial_message_endpoint = if let Some(endpoint) = config.use_message_endpoint.clone() { - let ep = endpoint.parse::()?; - let mut sse_endpoint_parts = sse_endpoint.clone().into_parts(); - sse_endpoint_parts.path_and_query = ep.into_parts().path_and_query; - Uri::from_parts(sse_endpoint_parts)? - } else { - // wait the endpoint event - loop { - let sse = sse_stream - .next() - .await - .ok_or(SseTransportError::UnexpectedEndOfStream)??; - let Some("endpoint") = sse.event.as_deref() else { - continue; - }; - let ep = sse.data.unwrap_or_default(); - - break message_endpoint(sse_endpoint.clone(), ep)?; - } - }; - let message_endpoint = Arc::new(RwLock::new(initial_message_endpoint)); - - let stream = Box::pin(SseAutoReconnectStream::new( - sse_stream, - SseClientReconnect { - client: client.clone(), - uri: sse_endpoint.clone(), - message_endpoint: message_endpoint.clone(), - }, - config.retry_policy.clone(), - )); - Ok(Self { - client, - config, - message_endpoint, - stream: Some(stream), - }) - } -} - -fn message_endpoint(base: http::Uri, endpoint: String) -> Result { - // If endpoint is a full URL, parse and return it directly - if endpoint.starts_with("http://") || endpoint.starts_with("https://") { - return endpoint.parse::(); - } - - let mut base_parts = base.into_parts(); - let endpoint_clone = endpoint.clone(); - - if endpoint.starts_with("?") { - // Query only - keep base path and append query - if let Some(base_path_and_query) = &base_parts.path_and_query { - let base_path = base_path_and_query.path(); - base_parts.path_and_query = Some(format!("{}{}", base_path, endpoint).parse()?); - } else { - base_parts.path_and_query = Some(format!("/{}", endpoint).parse()?); - } - } else { - // Path (with optional query) - replace entire path_and_query - let path_to_use = if endpoint.starts_with("/") { - endpoint // Use absolute path as-is - } else { - format!("/{}", endpoint) // Make relative path absolute - }; - base_parts.path_and_query = Some(path_to_use.parse()?); - } - - http::Uri::from_parts(base_parts).map_err(|_| endpoint_clone.parse::().unwrap_err()) -} - -#[derive(Debug, Clone)] -pub struct SseClientConfig { - /// client sse endpoint - /// - /// # How this client resolve the message endpoint - /// if sse_endpoint has this format: ``, - /// then the message endpoint will be ``. - /// - /// For example, if you config the sse_endpoint as `http://example.com/some_path/sse`, - /// and the server send the message endpoint event as `message?session_id=123`, - /// then the message endpoint will be `http://example.com/message`. - /// - /// This follows the rules of JavaScript's [`new URL(url, base)`](https://developer.mozilla.org/en-US/docs/Web/API/URL/URL) - pub sse_endpoint: Arc, - pub retry_policy: Arc, - /// if this is settled, the client will use this endpoint to send message and skip get the endpoint event - pub use_message_endpoint: Option, -} - -impl Default for SseClientConfig { - fn default() -> Self { - Self { - sse_endpoint: "".into(), - retry_policy: Arc::new(super::common::client_side_sse::FixedInterval::default()), - use_message_endpoint: None, - } - } -} - -#[cfg(test)] -mod tests { - use futures::StreamExt; - use serde_json::{Value, json}; - - use super::*; - - #[derive(Clone)] - struct DummyClient; - - #[derive(Debug, thiserror::Error)] - #[error("dummy error")] - struct DummyError; - - impl SseClient for DummyClient { - type Error = DummyError; - - async fn post_message( - &self, - _uri: Uri, - _message: ClientJsonRpcMessage, - _auth_token: Option, - ) -> Result<(), SseTransportError> { - Ok(()) - } - - async fn get_stream( - &self, - _uri: Uri, - _last_event_id: Option, - _auth_token: Option, - ) -> Result> { - unreachable!("get_stream should not be called in this test") - } - } - - #[test] - fn test_message_endpoint() { - let base_url = "https://localhost/sse".parse::().unwrap(); - - // Query only - let result = message_endpoint(base_url.clone(), "?sessionId=x".to_string()).unwrap(); - assert_eq!(result.to_string(), "https://localhost/sse?sessionId=x"); - - // Relative path with query - let result = message_endpoint(base_url.clone(), "mypath?sessionId=x".to_string()).unwrap(); - assert_eq!(result.to_string(), "https://localhost/mypath?sessionId=x"); - - // Absolute path with query - let result = message_endpoint(base_url.clone(), "/xxx?sessionId=x".to_string()).unwrap(); - assert_eq!(result.to_string(), "https://localhost/xxx?sessionId=x"); - - // Full URL - let result = message_endpoint( - base_url.clone(), - "http://example.com/xxx?sessionId=x".to_string(), - ) - .unwrap(); - assert_eq!(result.to_string(), "http://example.com/xxx?sessionId=x"); - } - - #[test] - fn handle_endpoint_control_event_updates_uri() { - let initial_endpoint = "https://example.com/message?sessionId=old" - .parse::() - .unwrap(); - let shared_endpoint = Arc::new(RwLock::new(initial_endpoint)); - let mut reconnect = SseClientReconnect { - client: DummyClient, - uri: "https://example.com/sse".parse::().unwrap(), - message_endpoint: shared_endpoint.clone(), - }; - - let control_event = Sse::default() - .event("endpoint") - .data("/message?sessionId=new"); - - reconnect.handle_control_event(&control_event).unwrap(); - - let guard = shared_endpoint.read().expect("lock poisoned"); - assert_eq!( - guard.to_string(), - "https://example.com/message?sessionId=new" - ); - } - - #[tokio::test] - async fn control_event_frames_are_skipped() { - let payload = json!({ - "jsonrpc": "2.0", - "id": 1, - "result": {"ok": true} - }) - .to_string(); - - let events = vec![ - Ok(Sse::default() - .event("endpoint") - .data("/message?sessionId=reconnect")), - Ok(Sse::default().event("message").data(payload.clone())), - ]; - - let sse_src: BoxedSseResponse = futures::stream::iter(events).boxed(); - let reconn_stream = SseAutoReconnectStream::never_reconnect(sse_src, DummyError); - futures::pin_mut!(reconn_stream); - - let message = reconn_stream.next().await.expect("stream item").unwrap(); - let actual: Value = serde_json::to_value(message).expect("serialize actual message"); - // We only need to assert that a valid JSON-RPC response came through after - // skipping control frames. The exact `result` shape depends on the SDK's - // typed result enums and is not asserted here. - assert_eq!(actual.get("jsonrpc"), Some(&Value::String("2.0".into()))); - assert_eq!(actual.get("id"), Some(&Value::Number(1u64.into()))); - } -} diff --git a/crates/rmcp/src/transport/sse_server.rs b/crates/rmcp/src/transport/sse_server.rs deleted file mode 100644 index 15a65cb5..00000000 --- a/crates/rmcp/src/transport/sse_server.rs +++ /dev/null @@ -1,343 +0,0 @@ -use std::{collections::HashMap, io, net::SocketAddr, sync::Arc, time::Duration}; - -use axum::{ - Extension, Json, Router, - extract::{NestedPath, Query, State}, - http::{StatusCode, request::Parts}, - response::{ - Response, - sse::{Event, KeepAlive, Sse}, - }, - routing::{get, post}, -}; -use futures::{Sink, SinkExt, Stream}; -use tokio_stream::wrappers::ReceiverStream; -use tokio_util::sync::{CancellationToken, PollSender}; -use tracing::Instrument; - -use crate::{ - RoleServer, Service, - model::ClientJsonRpcMessage, - service::{RxJsonRpcMessage, TxJsonRpcMessage, serve_directly_with_ct}, - transport::common::server_side_http::{DEFAULT_AUTO_PING_INTERVAL, SessionId, session_id}, -}; - -type TxStore = - Arc>>>; -pub type TransportReceiver = ReceiverStream>; - -#[derive(Clone)] -struct App { - txs: TxStore, - transport_tx: tokio::sync::mpsc::UnboundedSender, - post_path: Arc, - sse_ping_interval: Duration, -} - -impl App { - pub fn new( - post_path: String, - sse_ping_interval: Duration, - ) -> ( - Self, - tokio::sync::mpsc::UnboundedReceiver, - ) { - let (transport_tx, transport_rx) = tokio::sync::mpsc::unbounded_channel(); - ( - Self { - txs: Default::default(), - transport_tx, - post_path: post_path.into(), - sse_ping_interval, - }, - transport_rx, - ) - } -} - -#[derive(Debug, serde::Deserialize)] -#[serde(rename_all = "camelCase")] -pub struct PostEventQuery { - pub session_id: String, -} - -async fn post_event_handler( - State(app): State, - Query(PostEventQuery { session_id }): Query, - parts: Parts, - Json(mut message): Json, -) -> Result { - tracing::debug!(session_id, ?parts, ?message, "new client message"); - let tx = { - let rg = app.txs.read().await; - rg.get(session_id.as_str()) - .ok_or(StatusCode::NOT_FOUND)? - .clone() - }; - message.insert_extension(parts); - if tx.send(message).await.is_err() { - tracing::error!("send message error"); - return Err(StatusCode::GONE); - } - Ok(StatusCode::ACCEPTED) -} - -async fn sse_handler( - State(app): State, - nested_path: Option>, - parts: Parts, -) -> Result>>, Response> { - let session = session_id(); - tracing::info!(%session, ?parts, "sse connection"); - use tokio_stream::{StreamExt, wrappers::ReceiverStream}; - use tokio_util::sync::PollSender; - let (from_client_tx, from_client_rx) = tokio::sync::mpsc::channel(64); - let (to_client_tx, to_client_rx) = tokio::sync::mpsc::channel(64); - let to_client_tx_clone = to_client_tx.clone(); - - app.txs - .write() - .await - .insert(session.clone(), from_client_tx); - let session = session.clone(); - let stream = ReceiverStream::new(from_client_rx); - let sink = PollSender::new(to_client_tx); - let transport = SseServerTransport { - stream, - sink, - session_id: session.clone(), - tx_store: app.txs.clone(), - }; - let transport_send_result = app.transport_tx.send(transport); - if transport_send_result.is_err() { - tracing::warn!("send transport out error"); - let mut response = - Response::new("fail to send out transport, it seems server is closed".to_string()); - *response.status_mut() = StatusCode::INTERNAL_SERVER_ERROR; - return Err(response); - } - let nested_path = nested_path.as_deref().map(NestedPath::as_str).unwrap_or(""); - let post_path = app.post_path.as_ref(); - let ping_interval = app.sse_ping_interval; - let stream = futures::stream::once(futures::future::ok( - Event::default() - .event("endpoint") - .data(format!("{nested_path}{post_path}?sessionId={session}")), - )) - .chain(ReceiverStream::new(to_client_rx).map(|message| { - match serde_json::to_string(&message) { - Ok(bytes) => Ok(Event::default().event("message").data(&bytes)), - Err(e) => Err(io::Error::new(io::ErrorKind::InvalidData, e)), - } - })); - - tokio::spawn(async move { - // Wait for connection closure - to_client_tx_clone.closed().await; - - // Clean up session - let session_id = session.clone(); - let tx_store = app.txs.clone(); - let mut txs = tx_store.write().await; - txs.remove(&session_id); - tracing::debug!(%session_id, "Closed session and cleaned up resources"); - }); - - Ok(Sse::new(stream).keep_alive(KeepAlive::new().interval(ping_interval))) -} - -pub struct SseServerTransport { - stream: ReceiverStream>, - sink: PollSender>, - session_id: SessionId, - tx_store: TxStore, -} - -impl Sink> for SseServerTransport { - type Error = io::Error; - - fn poll_ready( - mut self: std::pin::Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - ) -> std::task::Poll> { - self.sink - .poll_ready_unpin(cx) - .map_err(std::io::Error::other) - } - - fn start_send( - mut self: std::pin::Pin<&mut Self>, - item: TxJsonRpcMessage, - ) -> Result<(), Self::Error> { - self.sink - .start_send_unpin(item) - .map_err(std::io::Error::other) - } - - fn poll_flush( - mut self: std::pin::Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - ) -> std::task::Poll> { - self.sink - .poll_flush_unpin(cx) - .map_err(std::io::Error::other) - } - - fn poll_close( - mut self: std::pin::Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - ) -> std::task::Poll> { - let inner_close_result = self - .sink - .poll_close_unpin(cx) - .map_err(std::io::Error::other); - if inner_close_result.is_ready() { - let session_id = self.session_id.clone(); - let tx_store = self.tx_store.clone(); - tokio::spawn(async move { - tx_store.write().await.remove(&session_id); - }); - } - inner_close_result - } -} - -impl Stream for SseServerTransport { - type Item = RxJsonRpcMessage; - - fn poll_next( - mut self: std::pin::Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - ) -> std::task::Poll> { - use futures::StreamExt; - self.stream.poll_next_unpin(cx) - } -} - -#[derive(Debug, Clone)] -pub struct SseServerConfig { - pub bind: SocketAddr, - pub sse_path: String, - pub post_path: String, - pub ct: CancellationToken, - pub sse_keep_alive: Option, -} - -#[derive(Debug)] -pub struct SseServer { - transport_rx: tokio::sync::mpsc::UnboundedReceiver, - pub config: SseServerConfig, -} - -impl SseServer { - pub async fn serve(bind: SocketAddr) -> io::Result { - Self::serve_with_config(SseServerConfig { - bind, - sse_path: "/sse".to_string(), - post_path: "/message".to_string(), - ct: CancellationToken::new(), - sse_keep_alive: None, - }) - .await - } - pub async fn serve_with_config(config: SseServerConfig) -> io::Result { - let (sse_server, service) = Self::new(config); - let listener = tokio::net::TcpListener::bind(sse_server.config.bind).await?; - let ct = sse_server.config.ct.child_token(); - let server = axum::serve(listener, service).with_graceful_shutdown(async move { - ct.cancelled().await; - tracing::info!("sse server cancelled"); - }); - tokio::spawn( - async move { - if let Err(e) = server.await { - tracing::error!(error = %e, "sse server shutdown with error"); - } - } - .instrument(tracing::info_span!("sse-server", bind_address = %sse_server.config.bind)), - ); - Ok(sse_server) - } - - pub fn new(config: SseServerConfig) -> (SseServer, Router) { - let (app, transport_rx) = App::new( - config.post_path.clone(), - config.sse_keep_alive.unwrap_or(DEFAULT_AUTO_PING_INTERVAL), - ); - let router = Router::new() - .route(&config.sse_path, get(sse_handler)) - .route(&config.post_path, post(post_event_handler)) - .with_state(app); - - let server = SseServer { - transport_rx, - config, - }; - - (server, router) - } - - pub fn with_service(mut self, service_provider: F) -> CancellationToken - where - S: Service, - F: Fn() -> S + Send + 'static, - { - use crate::service::ServiceExt; - let ct = self.config.ct.clone(); - tokio::spawn(async move { - while let Some(transport) = self.next_transport().await { - let service = service_provider(); - let ct = self.config.ct.child_token(); - tokio::spawn(async move { - let server = service - .serve_with_ct(transport, ct) - .await - .map_err(std::io::Error::other)?; - server.waiting().await?; - tokio::io::Result::Ok(()) - }); - } - }); - ct - } - - /// This allows you to skip the initialization steps for incoming request. - pub fn with_service_directly(mut self, service_provider: F) -> CancellationToken - where - S: Service, - F: Fn() -> S + Send + 'static, - { - let ct = self.config.ct.clone(); - tokio::spawn(async move { - while let Some(transport) = self.next_transport().await { - let service = service_provider(); - let ct = self.config.ct.child_token(); - tokio::spawn(async move { - let server = serve_directly_with_ct(service, transport, None, ct); - server.waiting().await?; - tokio::io::Result::Ok(()) - }); - } - }); - ct - } - - pub fn cancel(&self) { - self.config.ct.cancel(); - } - - pub async fn next_transport(&mut self) -> Option { - self.transport_rx.recv().await - } -} - -impl Stream for SseServer { - type Item = SseServerTransport; - - fn poll_next( - mut self: std::pin::Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - ) -> std::task::Poll> { - self.transport_rx.poll_recv(cx) - } -} diff --git a/crates/rmcp/tests/test_with_js.rs b/crates/rmcp/tests/test_with_js.rs index 3f2761cd..a9fc5c90 100644 --- a/crates/rmcp/tests/test_with_js.rs +++ b/crates/rmcp/tests/test_with_js.rs @@ -2,7 +2,7 @@ use rmcp::{ ServiceExt, service::QuitReason, transport::{ - ConfigureCommandExt, SseServer, StreamableHttpClientTransport, StreamableHttpServerConfig, + ConfigureCommandExt, StreamableHttpClientTransport, StreamableHttpServerConfig, TokioChildProcess, streamable_http_server::{ session::local::LocalSessionManager, tower::StreamableHttpService, @@ -14,42 +14,11 @@ use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; mod common; use common::calculator::Calculator; -const SSE_BIND_ADDRESS: &str = "127.0.0.1:8000"; const STREAMABLE_HTTP_BIND_ADDRESS: &str = "127.0.0.1:8001"; const STREAMABLE_HTTP_JS_BIND_ADDRESS: &str = "127.0.0.1:8002"; #[tokio::test] -async fn test_with_js_client() -> anyhow::Result<()> { - let _ = tracing_subscriber::registry() - .with( - tracing_subscriber::EnvFilter::try_from_default_env() - .unwrap_or_else(|_| "debug".to_string().into()), - ) - .with(tracing_subscriber::fmt::layer()) - .try_init(); - tokio::process::Command::new("npm") - .arg("install") - .current_dir("tests/test_with_js") - .spawn()? - .wait() - .await?; - - let ct = SseServer::serve(SSE_BIND_ADDRESS.parse()?) - .await? - .with_service(Calculator::default); - - let exit_status = tokio::process::Command::new("node") - .arg("tests/test_with_js/client.js") - .spawn()? - .wait() - .await?; - assert!(exit_status.success()); - ct.cancel(); - Ok(()) -} - -#[tokio::test] -async fn test_with_js_server() -> anyhow::Result<()> { +async fn test_with_js_stdio_server() -> anyhow::Result<()> { let _ = tracing_subscriber::registry() .with( tracing_subscriber::EnvFilter::try_from_default_env() diff --git a/crates/rmcp/tests/test_with_python.rs b/crates/rmcp/tests/test_with_python.rs index 8971a327..3f883c96 100644 --- a/crates/rmcp/tests/test_with_python.rs +++ b/crates/rmcp/tests/test_with_python.rs @@ -1,15 +1,12 @@ use std::process::Stdio; -use axum::Router; use rmcp::{ ServiceExt, - transport::{ConfigureCommandExt, SseServer, TokioChildProcess, sse_server::SseServerConfig}, + transport::{ConfigureCommandExt, TokioChildProcess}, }; -use tokio::{io::AsyncReadExt, time::timeout}; -use tokio_util::sync::CancellationToken; +use tokio::io::AsyncReadExt; use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; mod common; -use common::calculator::Calculator; async fn init() -> anyhow::Result<()> { let _ = tracing_subscriber::registry() @@ -28,81 +25,6 @@ async fn init() -> anyhow::Result<()> { Ok(()) } -#[tokio::test] -async fn test_with_python_client() -> anyhow::Result<()> { - init().await?; - - const BIND_ADDRESS: &str = "127.0.0.1:8000"; - - let ct = SseServer::serve(BIND_ADDRESS.parse()?) - .await? - .with_service(Calculator::default); - - let status = tokio::process::Command::new("uv") - .arg("run") - .arg("client.py") - .arg(format!("http://{BIND_ADDRESS}/sse")) - .current_dir("tests/test_with_python") - .spawn()? - .wait() - .await?; - assert!(status.success()); - ct.cancel(); - Ok(()) -} - -/// Test the SSE server in a nested Axum router. -#[tokio::test] -async fn test_nested_with_python_client() -> anyhow::Result<()> { - init().await?; - - const BIND_ADDRESS: &str = "127.0.0.1:8001"; - - // Create an SSE router - let sse_config = SseServerConfig { - bind: BIND_ADDRESS.parse()?, - sse_path: "/sse".to_string(), - post_path: "/message".to_string(), - ct: CancellationToken::new(), - sse_keep_alive: None, - }; - - let listener = tokio::net::TcpListener::bind(&sse_config.bind).await?; - - let (sse_server, sse_router) = SseServer::new(sse_config); - let ct = sse_server.with_service(Calculator::default); - - let main_router = Router::new().nest("/nested", sse_router); - - let server_ct = ct.clone(); - let server = axum::serve(listener, main_router).with_graceful_shutdown(async move { - server_ct.cancelled().await; - tracing::info!("sse server cancelled"); - }); - - tokio::spawn(async move { - let _ = server.await; - tracing::info!("sse server shutting down"); - }); - - // Spawn the process with timeout, as failure to access the '/message' URL - // causes the client to never exit. - let status = timeout( - tokio::time::Duration::from_secs(5), - tokio::process::Command::new("uv") - .arg("run") - .arg("client.py") - .arg(format!("http://{BIND_ADDRESS}/nested/sse")) - .current_dir("tests/test_with_python") - .spawn()? - .wait(), - ) - .await?; - assert!(status?.success()); - ct.cancel(); - Ok(()) -} - #[tokio::test] async fn test_with_python_server() -> anyhow::Result<()> { init().await?; diff --git a/docs/OAUTH_SUPPORT.md b/docs/OAUTH_SUPPORT.md index 4b585786..152e535c 100644 --- a/docs/OAUTH_SUPPORT.md +++ b/docs/OAUTH_SUPPORT.md @@ -9,8 +9,8 @@ This document describes the OAuth 2.1 authorization implementation for Model Con - Authorization server metadata discovery - Dynamic client registration - Automatic token refresh -- Authorized SSE transport implementation - Authorized HTTP Client implementation + ## Usage Guide ### 1. Enable Features @@ -19,7 +19,7 @@ Enable the auth feature in Cargo.toml: ```toml [dependencies] -rmcp = { version = "0.1", features = ["auth", "transport-sse-client"] } +rmcp = { version = "0.1", features = ["auth", "transport-streamable-http-client-reqwest"] } ``` ### 2. Use OAuthState @@ -42,29 +42,27 @@ rmcp = { version = "0.1", features = ["auth", "transport-sse-client"] } // Get authorization URL and guide user to open it let auth_url = oauth_state.get_authorization_url().await?; println!("Please open the following URL in your browser for authorization:\n{}", auth_url); - + // Handle callback - In real applications, this is typically done in a callback server let auth_code = "Authorization code (`code` param) obtained from browser after user authorization"; let csrf_token = "CSRF token (`state` param) obtained from browser after user authorization"; let credentials = oauth_state.handle_callback(auth_code, csrf_token).await?; - + println!("Authorization successful, access token: {}", credentials.access_token); ``` -### 4. Use Authorized SSE Transport and create client +### 4. Use Authorized Streamable HTTP Transport and create client ```rust ignore - let transport = - match create_authorized_transport(MCP_SSE_URL.to_string(), oauth_state, Some(retry_config)) - .await - { - Ok(t) => t, - Err(e) => { - tracing::error!("Failed to create authorized transport: {}", e); - return Err(anyhow::anyhow!("Connection failed: {}", e)); - } - }; + let am = oauth_state + .into_authorization_manager() + .ok_or_else(|| anyhow::anyhow!("Failed to get authorization manager"))?; + let client = AuthClient::new(reqwest::Client::default(), am); + let transport = StreamableHttpClientTransport::with_client( + client, + StreamableHttpClientTransportConfig::with_uri(MCP_SERVER_URL), + ); // Create client and connect to MCP server let client_service = ClientInfo::default(); @@ -77,20 +75,19 @@ rmcp = { version = "0.1", features = ["auth", "transport-sse-client"] } let client = oauth_state.to_authorized_http_client().await?; ``` -## Complete Example -client: Please refer to `examples/clients/src/auth/oauth_client.rs` for a complete usage example. -server: Please refer to `examples/servers/src/complex_auth_sse.rs` for a complete usage example. -### Running the Example in server -```bash -# Run example -cargo run --example complex_auth_sse -``` +## Complete Examples + +- **Client**: `examples/clients/src/auth/oauth_client.rs` +- **Server**: `examples/servers/src/complex_auth_streamhttp.rs` -### Running the Example in client +### Running the Examples ```bash -# Run example -cargo run --example oauth-client +# Run the OAuth server +cargo run --example servers_complex_auth_streamhttp + +# Run the OAuth client (in another terminal) +cargo run --example clients_oauth_client ``` ## Authorization Flow Description @@ -123,4 +120,4 @@ If you encounter authorization issues, check the following: - [MCP Authorization Specification](https://spec.modelcontextprotocol.io/specification/2025-03-26/basic/authorization/) - [OAuth 2.1 Specification Draft](https://oauth.net/2.1/) - [RFC 8414: OAuth 2.0 Authorization Server Metadata](https://datatracker.ietf.org/doc/html/rfc8414) -- [RFC 7591: OAuth 2.0 Dynamic Client Registration Protocol](https://datatracker.ietf.org/doc/html/rfc7591) +- [RFC 7591: OAuth 2.0 Dynamic Client Registration Protocol](https://datatracker.ietf.org/doc/html/rfc7591) diff --git a/examples/clients/Cargo.toml b/examples/clients/Cargo.toml index edc9d15d..e086e811 100644 --- a/examples/clients/Cargo.toml +++ b/examples/clients/Cargo.toml @@ -9,7 +9,6 @@ publish = false [dependencies] rmcp = { workspace = true, features = [ "client", - "transport-sse-client-reqwest", "reqwest", "transport-streamable-http-client-reqwest", "transport-child-process", @@ -30,10 +29,6 @@ axum = "0.8" reqwest = "0.12" clap = { version = "4.0", features = ["derive"] } -[[example]] -name = "clients_sse" -path = "src/sse.rs" - [[example]] name = "clients_git_stdio" path = "src/git_stdio.rs" diff --git a/examples/clients/README.md b/examples/clients/README.md index bd937001..1012867f 100644 --- a/examples/clients/README.md +++ b/examples/clients/README.md @@ -4,15 +4,6 @@ This directory contains Model Context Protocol (MCP) client examples implemented ## Example List - -### SSE Client (`sse.rs`) - -A client that communicates with an MCP server using Server-Sent Events (SSE) transport. - -- Connects to an MCP server running at `http://localhost:8000/sse` -- Retrieves server information and list of available tools -- Calls a tool named "increment" - ### Git Standard I/O Client (`git_stdio.rs`) A client that communicates with a Git-related MCP server using standard input/output. @@ -68,14 +59,14 @@ A client demonstrating how to use the sampling tool. - Retrieves server information and list of available tools - Calls the `ask_llm` tool -### Progress Test Client (`progress_test_client.rs`) +### Progress Test Client (`progress_client.rs`) A client that communicates with an MCP server using progress notifications. -- Launches the `cargo run --example clients_progress_client -- --transport {stdio|sse|http|all}` to test the progress notifications +- Launches the `cargo run --example clients_progress_client -- --transport {stdio|http|all}` to test the progress notifications - Connects to the server using different transport methods - Tests the progress notifications -- The sse and http should run the server first +- The http transport should run the server first ## How to Run @@ -83,9 +74,6 @@ A client that communicates with an MCP server using progress notifications. Each example can be run using Cargo: ```bash -# Run the SSE client example -cargo run --example clients_sse - # Run the Git standard I/O client example cargo run --example clients_git_stdio diff --git a/examples/clients/src/auth/oauth_client.rs b/examples/clients/src/auth/oauth_client.rs index ea7a1d3d..53fa5cca 100644 --- a/examples/clients/src/auth/oauth_client.rs +++ b/examples/clients/src/auth/oauth_client.rs @@ -11,9 +11,9 @@ use rmcp::{ ServiceExt, model::ClientInfo, transport::{ - SseClientTransport, + StreamableHttpClientTransport, auth::{AuthClient, OAuthState}, - sse_client::SseClientConfig, + streamable_http_client::StreamableHttpClientTransportConfig, }, }; use serde::Deserialize; @@ -25,7 +25,6 @@ use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; const MCP_SERVER_URL: &str = "http://localhost:3000/mcp"; const MCP_REDIRECT_URI: &str = "http://localhost:8080/callback"; -const MCP_SSE_URL: &str = "http://localhost:3000/mcp/sse"; const CALLBACK_PORT: u16 = 8080; const CALLBACK_HTML: &str = include_str!("callback.html"); @@ -150,14 +149,10 @@ async fn main() -> Result<()> { .into_authorization_manager() .ok_or_else(|| anyhow::anyhow!("Failed to get authorization manager"))?; let client = AuthClient::new(reqwest::Client::default(), am); - let transport = SseClientTransport::start_with_client( + let transport = StreamableHttpClientTransport::with_client( client, - SseClientConfig { - sse_endpoint: MCP_SSE_URL.into(), - ..Default::default() - }, - ) - .await?; + StreamableHttpClientTransportConfig::with_uri(MCP_SERVER_URL), + ); // Create client and connect to MCP server let client_service = ClientInfo::default(); diff --git a/examples/clients/src/progress_client.rs b/examples/clients/src/progress_client.rs index 1ab7d85f..ddf18b2f 100644 --- a/examples/clients/src/progress_client.rs +++ b/examples/clients/src/progress_client.rs @@ -12,7 +12,7 @@ use rmcp::{ ProgressNotificationParam, }, service::{NotificationContext, RoleClient}, - transport::{SseClientTransport, StreamableHttpClientTransport, TokioChildProcess}, + transport::{StreamableHttpClientTransport, TokioChildProcess}, }; use tokio::{process::Command, time::sleep}; use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; @@ -20,7 +20,6 @@ use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; #[derive(Debug, Clone, ValueEnum)] enum TransportType { Stdio, - Sse, Http, All, } @@ -37,10 +36,6 @@ struct Args { #[arg(short, long, default_value_t = 10)] records: u32, - /// SSE server URL - #[arg(long, default_value = "http://127.0.0.1:8000/sse")] - sse_url: String, - /// HTTP server URL #[arg(long, default_value = "http://127.0.0.1:8001/mcp")] http_url: String, @@ -203,59 +198,7 @@ async fn test_stdio_transport(records: u32) -> Result<()> { tracing::info!("STDIO transport test completed successfully!"); Ok(()) } -// Test SSE transport, must run the server with `cargo run --example servers_progress_demo -- sse` in the servers directory -async fn test_sse_transport(sse_url: &str, records: u32) -> Result<()> { - tracing::info!("Testing SSE Transport"); - tracing::info!("========================="); - tracing::info!("SSE URL: {}", sse_url); - - // Wait a bit for server to be ready - sleep(Duration::from_secs(1)).await; - - let transport = SseClientTransport::start(sse_url).await?; - - // Create progress-aware client handler - let client_handler = ProgressAwareClient::new(); - client_handler.start_tracking(); - let client_handler_clone = client_handler.clone(); - - let client = client_handler.serve(transport).await.inspect_err(|e| { - tracing::error!("SSE client error: {:?}", e); - })?; - - // Initialize - let server_info = client.peer_info(); - if let Some(info) = server_info { - tracing::info!("Connected to server: {:?}", info.server_info.name); - } - - // List tools - let tools = client.list_tools(Default::default()).await?; - tracing::info!( - "Available tools: {:?}", - tools.tools.iter().map(|t| &t.name).collect::>() - ); - // Call stream processor tool - tracing::info!("Starting to process {} records...", records); - let tool_result = client - .call_tool(CallToolRequestParam { - name: "stream_processor".into(), - arguments: None, - }) - .await?; - - if let Some(content) = tool_result.content.first() { - if let Some(text) = content.as_text() { - tracing::info!("Processing completed: {}", text.text); - } - } - - client.cancel().await?; - client_handler_clone.stop_tracking(); - tracing::info!("SSE transport test completed successfully!"); - Ok(()) -} // Test HTTP transport, must run the server with `cargo run --example servers_progress_demo -- http` in the servers directory async fn test_http_transport(http_url: &str, records: u32) -> Result<()> { tracing::info!("Testing HTTP Streaming Transport"); @@ -316,13 +259,6 @@ async fn run_single_test(transport_type: &TransportType, args: &Args) -> Result< test_stdio_transport(args.records).await?; Ok(true) } - TransportType::Sse => match test_sse_transport(&args.sse_url, args.records).await { - Ok(_) => Ok(true), - Err(e) => { - tracing::error!("SSE test failed: {}", e); - Ok(false) - } - }, TransportType::Http => match test_http_transport(&args.http_url, args.records).await { Ok(_) => Ok(true), Err(e) => { @@ -357,11 +293,7 @@ async fn main() -> Result<()> { // Test all transport types let mut results = std::collections::HashMap::new(); - for transport_type in [ - TransportType::Stdio, - TransportType::Sse, - TransportType::Http, - ] { + for transport_type in [TransportType::Stdio, TransportType::Http] { let transport_name = format!("{:?}", transport_type).to_uppercase(); tracing::info!("\n"); diff --git a/examples/clients/src/sse.rs b/examples/clients/src/sse.rs deleted file mode 100644 index 11e5f3c5..00000000 --- a/examples/clients/src/sse.rs +++ /dev/null @@ -1,52 +0,0 @@ -use anyhow::Result; -use rmcp::{ - ServiceExt, - model::{CallToolRequestParam, ClientCapabilities, ClientInfo, Implementation}, - transport::SseClientTransport, -}; -use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; - -#[tokio::main] -async fn main() -> Result<()> { - // Initialize logging - tracing_subscriber::registry() - .with( - tracing_subscriber::EnvFilter::try_from_default_env() - .unwrap_or_else(|_| format!("info,{}=debug", env!("CARGO_CRATE_NAME")).into()), - ) - .with(tracing_subscriber::fmt::layer()) - .init(); - let transport = SseClientTransport::start("http://localhost:8000/sse").await?; - let client_info = ClientInfo { - protocol_version: Default::default(), - capabilities: ClientCapabilities::default(), - client_info: Implementation { - name: "test sse client".to_string(), - title: None, - version: "0.0.1".to_string(), - website_url: None, - icons: None, - }, - }; - let client = client_info.serve(transport).await.inspect_err(|e| { - tracing::error!("client error: {:?}", e); - })?; - - // Initialize - let server_info = client.peer_info(); - tracing::info!("Connected to server: {server_info:#?}"); - - // List tools - let tools = client.list_tools(Default::default()).await?; - tracing::info!("Available tools: {tools:#?}"); - - let tool_result = client - .call_tool(CallToolRequestParam { - name: "increment".into(), - arguments: serde_json::json!({}).as_object().cloned(), - }) - .await?; - tracing::info!("Tool result: {tool_result:#?}"); - client.cancel().await?; - Ok(()) -} diff --git a/examples/rig-integration/Cargo.toml b/examples/rig-integration/Cargo.toml index a472086d..079ef0dc 100644 --- a/examples/rig-integration/Cargo.toml +++ b/examples/rig-integration/Cargo.toml @@ -18,7 +18,6 @@ tokio = { version = "1", features = ["full"] } rmcp = { workspace = true, features = [ "client", "transport-child-process", - "transport-sse-client-reqwest", "transport-streamable-http-client-reqwest" ] } anyhow = "1.0" diff --git a/examples/rig-integration/src/config/mcp.rs b/examples/rig-integration/src/config/mcp.rs index d6bd31b6..45e4c23c 100644 --- a/examples/rig-integration/src/config/mcp.rs +++ b/examples/rig-integration/src/config/mcp.rs @@ -17,9 +17,6 @@ pub enum McpServerTransportConfig { Streamable { url: String, }, - Sse { - url: String, - }, Stdio { command: String, #[serde(default)] @@ -68,10 +65,6 @@ impl McpServerTransportConfig { rmcp::transport::StreamableHttpClientTransport::from_uri(url.to_string()); ().serve(transport).await? } - McpServerTransportConfig::Sse { url } => { - let transport = rmcp::transport::SseClientTransport::start(url.to_string()).await?; - ().serve(transport).await? - } McpServerTransportConfig::Stdio { command, args, diff --git a/examples/servers/Cargo.toml b/examples/servers/Cargo.toml index 8bf97f2a..8979a934 100644 --- a/examples/servers/Cargo.toml +++ b/examples/servers/Cargo.toml @@ -9,7 +9,6 @@ rmcp = { workspace = true, features = [ "server", "macros", "client", - "transport-sse-server", "transport-io", "transport-streamable-http-server", "auth", @@ -54,14 +53,6 @@ tokio-util = { version = "0.7", features = ["codec"] } name = "servers_counter_stdio" path = "src/counter_stdio.rs" -[[example]] -name = "servers_counter_sse" -path = "src/counter_sse.rs" - -[[example]] -name = "servers_counter_sse_directly" -path = "src/counter_sse_directly.rs" - [[example]] name = "servers_memory_stdio" path = "src/memory_stdio.rs" @@ -70,14 +61,6 @@ path = "src/memory_stdio.rs" name = "servers_counter_streamhttp" path = "src/counter_streamhttp.rs" -[[example]] -name = "servers_complex_auth_sse" -path = "src/complex_auth_sse.rs" - -[[example]] -name = "servers_simple_auth_sse" -path = "src/simple_auth_sse.rs" - [[example]] name = "servers_prompt_stdio" path = "src/prompt_stdio.rs" @@ -105,3 +88,11 @@ path = "src/completion_stdio.rs" [[example]] name = "servers_progress_demo" path = "src/progress_demo.rs" + +[[example]] +name = "servers_simple_auth_streamhttp" +path = "src/simple_auth_streamhttp.rs" + +[[example]] +name = "servers_complex_auth_streamhttp" +path = "src/complex_auth_streamhttp.rs" diff --git a/examples/servers/README.md b/examples/servers/README.md index 089ff4c6..f2e877ba 100644 --- a/examples/servers/README.md +++ b/examples/servers/README.md @@ -61,52 +61,26 @@ A server that demonstrates progress notifications during long-running operations - Provides a stream_processor tool that generates progress notifications - Demonstrates progress notifications during long-running operations -- Can be run with `cargo run --example servers_progress_demo -- {stdio|sse|http|all}` +- Can be run with `cargo run --example servers_progress_demo -- {stdio|http|all}` -
-Deprecated: SSE Transport Examples +### Simple Auth Streamable HTTP Server (`simple_auth_streamhttp.rs`) -> **Note:** SSE (Server-Sent Events) transport has been removed from newer versions of the MCP specification. Streamable HTTP is the preferred transport for HTTP-based MCP servers as it is more reliable. These examples are kept for reference but should not be used for new implementations. +A server demonstrating simple token-based authentication with streamable HTTP transport. -### Counter SSE Server (`counter_sse.rs`) +- Uses bearer token authentication via Authorization header +- Provides `/api/token/{id}` endpoint to get demo tokens +- Protected MCP endpoint at `/mcp` +- Shows how to add auth middleware to streamable HTTP services -A server that provides counter functionality using Server-Sent Events (SSE) transport. +### Complex Auth Streamable HTTP Server (`complex_auth_streamhttp.rs`) -- Runs on `http://127.0.0.1:8000/sse` by default -- Provides the same counter tools as the stdio version -- Demonstrates SSE transport setup with graceful shutdown -- Can be accessed via web browsers or SSE-compatible clients +A full OAuth 2.0 authorization server implementation with streamable HTTP MCP transport. -### Counter SSE Direct Server (`counter_sse_directly.rs`) - -A minimal SSE server implementation showing direct SSE server usage. - -- Simplified version of the SSE server -- Demonstrates basic SSE server configuration -- Provides counter functionality with minimal setup - -### Complex OAuth SSE Server (`complex_auth_sse.rs`) - -A comprehensive example demonstrating OAuth 2.0 integration with MCP servers. - -- Full OAuth 2.0 authorization server implementation -- Client registration and token management -- User authorization flow with web interface -- Token validation middleware -- Integrated with MCP SSE transport -- Demonstrates enterprise-grade authentication patterns - -### Simple OAuth SSE Server (`simple_auth_sse.rs`) - -A simplified OAuth example showing basic token-based authentication. - -- Basic token store and validation -- Authorization middleware for SSE endpoints -- Token generation API -- Simplified authentication flow -- Good starting point for adding authentication to MCP servers - -
+- Complete OAuth 2.0 authorization code flow +- Client registration endpoint +- Authorization server metadata discovery +- Protected MCP endpoint with token validation +- Demonstrates building a production-like auth server ## How to Run @@ -127,6 +101,12 @@ cargo run --example servers_elicitation_stdio # Run the prompt standard I/O server cargo run --example servers_prompt_stdio + +# Run the simple auth streamable HTTP server +cargo run --example servers_simple_auth_streamhttp + +# Run the complex auth streamable HTTP server +cargo run --example servers_complex_auth_streamhttp ``` ## Testing with MCP Inspector @@ -145,11 +125,6 @@ These examples use the following main dependencies: - `anyhow`: Error handling - `axum`: Web framework for HTTP-based transports - `tokio-util`: Utilities for async programming -- `askama`: Template engine (used in OAuth examples) -- `tower-http`: HTTP middleware (used for CORS in OAuth examples) -- `uuid`: UUID generation (used in OAuth examples) -- `chrono`: Date and time handling (used in OAuth examples) -- `rand`: Random number generation (used in OAuth examples) - `schemars`: JSON Schema generation (used in elicitation examples) ## Common Module diff --git a/examples/servers/src/complex_auth_sse.rs b/examples/servers/src/complex_auth_streamhttp.rs similarity index 91% rename from examples/servers/src/complex_auth_sse.rs rename to examples/servers/src/complex_auth_streamhttp.rs index c3f9fc36..cd5ee04e 100644 --- a/examples/servers/src/complex_auth_sse.rs +++ b/examples/servers/src/complex_auth_streamhttp.rs @@ -1,4 +1,4 @@ -use std::{collections::HashMap, net::SocketAddr, sync::Arc, time::Duration}; +use std::{collections::HashMap, net::SocketAddr, sync::Arc}; use anyhow::Result; use askama::Template; @@ -13,17 +13,13 @@ use axum::{ }; use rand::{Rng, distr::Alphanumeric}; use rmcp::transport::{ - SseServer, - auth::{ - AuthorizationMetadata, ClientRegistrationRequest, ClientRegistrationResponse, - OAuthClientConfig, - }, - sse_server::SseServerConfig, + StreamableHttpServerConfig, + auth::{AuthorizationMetadata, ClientRegistrationResponse, OAuthClientConfig}, + streamable_http_server::{session::local::LocalSessionManager, tower::StreamableHttpService}, }; use serde::{Deserialize, Serialize}; use serde_json::Value; use tokio::sync::RwLock; -use tokio_util::sync::CancellationToken; use tower_http::cors::{Any, CorsLayer}; use tracing::{debug, error, info, warn}; use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; @@ -35,6 +31,13 @@ use common::counter::Counter; const BIND_ADDRESS: &str = "127.0.0.1:3000"; const INDEX_HTML: &str = include_str!("html/mcp_oauth_index.html"); +// Local registration request - only uses fields needed for this demo +#[derive(Debug, Deserialize)] +struct LocalClientRegistrationRequest { + client_name: String, + redirect_uris: Vec, +} + // A easy way to manage MCP OAuth Store for managing tokens and sessions #[derive(Clone, Debug)] struct McpOAuthStore { @@ -480,7 +483,7 @@ async fn oauth_token( } } -// Auth middleware for SSE connections +// Auth middleware for MCP connections async fn validate_token_middleware( State(token_store): State>, request: Request, @@ -537,7 +540,7 @@ async fn oauth_authorization_server() -> impl IntoResponse { // handle client registration request async fn oauth_register( State(state): State>, - Json(req): Json, + Json(req): Json, ) -> impl IntoResponse { debug!("register request: {:?}", req); if req.redirect_uris.is_empty() { @@ -640,23 +643,22 @@ async fn main() -> Result<()> { // Set up port let addr = BIND_ADDRESS.parse::()?; - // Create SSE server configuration for MCP - let sse_config = SseServerConfig { - bind: addr, - sse_path: "/mcp/sse".to_string(), - post_path: "/mcp/message".to_string(), - ct: CancellationToken::new(), - sse_keep_alive: Some(Duration::from_secs(15)), - }; - - // Create SSE server - let (sse_server, sse_router) = SseServer::new(sse_config); + // Create streamable HTTP service for MCP + let mcp_service: StreamableHttpService = + StreamableHttpService::new( + || Ok(Counter::new()), + LocalSessionManager::default().into(), + StreamableHttpServerConfig::default(), + ); - // Create protected SSE routes (require authorization) - let protected_sse_router = sse_router.layer(middleware::from_fn_with_state( - oauth_store.clone(), - validate_token_middleware, - )); + // Create protected MCP routes (require authorization) + let protected_mcp_router = + Router::new() + .nest_service("/mcp", mcp_service) + .layer(middleware::from_fn_with_state( + oauth_store.clone(), + validate_token_middleware, + )); // Create CORS layer for the oauth authorization server endpoint let cors_layer = CorsLayer::new() @@ -681,43 +683,23 @@ async fn main() -> Result<()> { // Create HTTP router with request logging middleware let app = Router::new() .route("/", get(index)) - .route("/mcp", get(index)) .route("/oauth/authorize", get(oauth_authorize)) .route("/oauth/approve", post(oauth_approve)) .merge(oauth_server_router) // Merge the CORS-enabled oauth server router - // .merge(protected_sse_router) + .merge(protected_mcp_router) .with_state(oauth_store.clone()) .layer(middleware::from_fn(log_request)); - let app = app.merge(protected_sse_router); - // Register token validation middleware for SSE - let cancel_token = sse_server.config.ct.clone(); - // Handle Ctrl+C - let cancel_token2 = sse_server.config.ct.clone(); - // Start SSE server with Counter service - sse_server.with_service(Counter::new); - // Start HTTP server info!("MCP OAuth Server started on {}", addr); let listener = tokio::net::TcpListener::bind(addr).await?; - let server = axum::serve(listener, app).with_graceful_shutdown(async move { - cancel_token.cancelled().await; - info!("Server is shutting down"); - }); - - tokio::spawn(async move { - match tokio::signal::ctrl_c().await { - Ok(()) => { - info!("Received Ctrl+C, shutting down"); - cancel_token2.cancel(); - } - Err(e) => error!("Failed to listen for Ctrl+C: {}", e), - } - }); - if let Err(e) = server.await { - error!("Server error: {}", e); - } + axum::serve(listener, app) + .with_graceful_shutdown(async { + tokio::signal::ctrl_c().await.ok(); + info!("Shutting down..."); + }) + .await?; Ok(()) } diff --git a/examples/servers/src/counter_sse.rs b/examples/servers/src/counter_sse.rs deleted file mode 100644 index 373a8a6d..00000000 --- a/examples/servers/src/counter_sse.rs +++ /dev/null @@ -1,54 +0,0 @@ -use rmcp::transport::sse_server::{SseServer, SseServerConfig}; -use tracing_subscriber::{ - layer::SubscriberExt, - util::SubscriberInitExt, - {self}, -}; -mod common; -use common::counter::Counter; - -const BIND_ADDRESS: &str = "127.0.0.1:8000"; - -#[tokio::main] -async fn main() -> anyhow::Result<()> { - tracing_subscriber::registry() - .with( - tracing_subscriber::EnvFilter::try_from_default_env() - .unwrap_or_else(|_| "debug".to_string().into()), - ) - .with(tracing_subscriber::fmt::layer()) - .init(); - - let config = SseServerConfig { - bind: BIND_ADDRESS.parse()?, - sse_path: "/sse".to_string(), - post_path: "/message".to_string(), - ct: tokio_util::sync::CancellationToken::new(), - sse_keep_alive: None, - }; - - let (sse_server, router) = SseServer::new(config); - - // Do something with the router, e.g., add routes or middleware - - let listener = tokio::net::TcpListener::bind(sse_server.config.bind).await?; - - let ct = sse_server.config.ct.child_token(); - - let server = axum::serve(listener, router).with_graceful_shutdown(async move { - ct.cancelled().await; - tracing::info!("sse server cancelled"); - }); - - tokio::spawn(async move { - if let Err(e) = server.await { - tracing::error!(error = %e, "sse server shutdown with error"); - } - }); - - let ct = sse_server.with_service(Counter::new); - - tokio::signal::ctrl_c().await?; - ct.cancel(); - Ok(()) -} diff --git a/examples/servers/src/counter_sse_directly.rs b/examples/servers/src/counter_sse_directly.rs deleted file mode 100644 index f70320e2..00000000 --- a/examples/servers/src/counter_sse_directly.rs +++ /dev/null @@ -1,29 +0,0 @@ -use rmcp::transport::sse_server::SseServer; -use tracing_subscriber::{ - layer::SubscriberExt, - util::SubscriberInitExt, - {self}, -}; -mod common; -use common::counter::Counter; - -const BIND_ADDRESS: &str = "127.0.0.1:8000"; - -#[tokio::main] -async fn main() -> anyhow::Result<()> { - tracing_subscriber::registry() - .with( - tracing_subscriber::EnvFilter::try_from_default_env() - .unwrap_or_else(|_| "debug".to_string().into()), - ) - .with(tracing_subscriber::fmt::layer()) - .init(); - - let ct = SseServer::serve(BIND_ADDRESS.parse()?) - .await? - .with_service_directly(Counter::new); - - tokio::signal::ctrl_c().await?; - ct.cancel(); - Ok(()) -} diff --git a/examples/servers/src/html/sse_auth_index.html b/examples/servers/src/html/sse_auth_index.html deleted file mode 100644 index 7acfa3b1..00000000 --- a/examples/servers/src/html/sse_auth_index.html +++ /dev/null @@ -1,33 +0,0 @@ - - - - RMCP Authorized SSE Server - - - -

RMCP Authorized SSE Server

-

This is a Server-Sent Events server example that requires OAuth authorization.

- -

Available Endpoints:

-
    -
  • /api/health - Health check
  • -
  • /api/token/{token_id} - Get test token (available: demo, test)
  • -
  • /sse - SSE connection endpoint (requires authorization)
  • -
  • /message - Message sending endpoint (requires authorization)
  • -
- -

Usage:

-
-        # Get a token
-        curl http://127.0.0.1:8000/api/token/demo
-
-        # Connect to SSE using the token
-        curl -H "Authorization: Bearer demo-token" http://127.0.0.1:8000/sse
-    
- - \ No newline at end of file diff --git a/examples/servers/src/progress_demo.rs b/examples/servers/src/progress_demo.rs index dd27eec7..e9e147ea 100644 --- a/examples/servers/src/progress_demo.rs +++ b/examples/servers/src/progress_demo.rs @@ -3,7 +3,6 @@ use std::env; use rmcp::{ ServiceExt, transport::{ - sse_server::{SseServer, SseServerConfig}, stdio, streamable_http_server::{StreamableHttpService, session::local::LocalSessionManager}, }, @@ -12,7 +11,6 @@ use rmcp::{ mod common; use common::progress_demo::ProgressDemo; -const SSE_BIND_ADDRESS: &str = "127.0.0.1:8000"; const HTTP_BIND_ADDRESS: &str = "127.0.0.1:8001"; #[tokio::main] @@ -24,14 +22,10 @@ async fn main() -> anyhow::Result<()> { match transport_mode.as_str() { "stdio" => run_stdio().await, - "sse" => run_sse().await, "http" | "streamhttp" => run_streamable_http().await, "all" => run_all_transports().await, _ => { - eprintln!( - "Usage: {} [stdio|sse|http|all]", - env::args().next().unwrap() - ); + eprintln!("Usage: {} [stdio|http|all]", env::args().next().unwrap()); std::process::exit(1); } } @@ -47,47 +41,6 @@ async fn run_stdio() -> anyhow::Result<()> { Ok(()) } -async fn run_sse() -> anyhow::Result<()> { - println!("Running SSE server"); - let config = SseServerConfig { - bind: SSE_BIND_ADDRESS.parse()?, - sse_path: "/sse".to_string(), - post_path: "/message".to_string(), - ct: tokio_util::sync::CancellationToken::new(), - sse_keep_alive: None, - }; - - let (sse_server, router) = SseServer::new(config); - - // Start the HTTP server for SSE - let listener = tokio::net::TcpListener::bind(sse_server.config.bind).await?; - let ct = sse_server.config.ct.child_token(); - - let server = axum::serve(listener, router).with_graceful_shutdown(async move { - ct.cancelled().await; - tracing::info!("SSE server cancelled"); - }); - - tokio::spawn(async move { - if let Err(e) = server.await { - tracing::error!(error = %e, "SSE server shutdown with error"); - } - }); - - // Start the MCP service with SSE transport - let ct = sse_server.with_service(ProgressDemo::new); - - tracing::info!( - "Progress Demo SSE server started at http://{}/sse", - SSE_BIND_ADDRESS - ); - tracing::info!("Press Ctrl+C to shutdown"); - - tokio::signal::ctrl_c().await?; - ct.cancel(); - Ok(()) -} - async fn run_streamable_http() -> anyhow::Result<()> { println!("Running Streamable HTTP server"); let service = StreamableHttpService::new( @@ -114,18 +67,6 @@ async fn run_streamable_http() -> anyhow::Result<()> { async fn run_all_transports() -> anyhow::Result<()> { println!("Running all transports"); - // Start SSE server - let sse_config = SseServerConfig { - bind: SSE_BIND_ADDRESS.parse()?, - sse_path: "/sse".to_string(), - post_path: "/message".to_string(), - ct: tokio_util::sync::CancellationToken::new(), - sse_keep_alive: None, - }; - - let (sse_server, sse_router) = SseServer::new(sse_config); - let sse_listener = tokio::net::TcpListener::bind(sse_server.config.bind).await?; - let sse_ct = sse_server.config.ct.child_token(); // Start Streamable HTTP server let http_service = StreamableHttpService::new( @@ -136,19 +77,6 @@ async fn run_all_transports() -> anyhow::Result<()> { let http_router = axum::Router::new().nest_service("/mcp", http_service); let http_listener = tokio::net::TcpListener::bind(HTTP_BIND_ADDRESS).await?; - // Start SSE HTTP server - let sse_http_server = - axum::serve(sse_listener, sse_router).with_graceful_shutdown(async move { - sse_ct.cancelled().await; - tracing::info!("SSE server cancelled"); - }); - - tokio::spawn(async move { - if let Err(e) = sse_http_server.await { - tracing::error!(error = %e, "SSE server shutdown with error"); - } - }); - // Start Streamable HTTP server tokio::spawn(async move { let _ = axum::serve(http_listener, http_router) @@ -156,11 +84,13 @@ async fn run_all_transports() -> anyhow::Result<()> { .await; }); - // Start MCP service with SSE - let mcp_sse_ct = sse_server.with_service(ProgressDemo::new); + tracing::info!( + "Progress Demo HTTP server started at http://{}/mcp", + HTTP_BIND_ADDRESS + ); + tracing::info!("Press Ctrl+C to shutdown"); tokio::signal::ctrl_c().await?; - mcp_sse_ct.cancel(); Ok(()) } diff --git a/examples/servers/src/simple_auth_sse.rs b/examples/servers/src/simple_auth_streamhttp.rs similarity index 64% rename from examples/servers/src/simple_auth_sse.rs rename to examples/servers/src/simple_auth_streamhttp.rs index 6514c161..f68ed894 100644 --- a/examples/servers/src/simple_auth_sse.rs +++ b/examples/servers/src/simple_auth_streamhttp.rs @@ -1,11 +1,13 @@ -/// This example shows how to use the RMCP SSE server with OAuth authorization. +/// This example shows how to use the RMCP streamable HTTP server with simple token authorization. /// Use the inspector to view this server https://github.com/modelcontextprotocol/inspector /// The default index page is available at http://127.0.0.1:8000/ /// # Get a token /// curl http://127.0.0.1:8000/api/token/demo -/// # Connect to SSE using the token -/// curl -H "Authorization: Bearer demo-token" http://127.0.0.1:8000/sse -use std::{net::SocketAddr, sync::Arc, time::Duration}; +/// # Connect using the token +/// curl -X POST -H "Authorization: Bearer demo-token" -H "Content-Type: application/json" \ +/// -d '{"jsonrpc":"2.0","method":"initialize","params":{},"id":1}' \ +/// http://127.0.0.1:8000/mcp +use std::{net::SocketAddr, sync::Arc}; use anyhow::Result; use axum::{ @@ -16,14 +18,27 @@ use axum::{ response::{Html, Response}, routing::get, }; -use rmcp::transport::{SseServer, sse_server::SseServerConfig}; -use tokio_util::sync::CancellationToken; +use rmcp::transport::{ + StreamableHttpServerConfig, + streamable_http_server::{session::local::LocalSessionManager, tower::StreamableHttpService}, +}; mod common; use common::counter::Counter; use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; const BIND_ADDRESS: &str = "127.0.0.1:8000"; -const INDEX_HTML: &str = include_str!("html/sse_auth_index.html"); +const INDEX_HTML: &str = r#" + + + MCP Streamable HTTP Auth Server + + +

MCP Streamable HTTP Server with Auth

+

Get a token: curl http://127.0.0.1:8000/api/token/demo

+

Connect: curl -X POST -H "Authorization: Bearer demo-token" -H "Content-Type: application/json" -d '{"jsonrpc":"2.0","method":"initialize","params":{},"id":1}' http://127.0.0.1:8000/mcp

+ +"#; + // A simple token store struct TokenStore { valid_tokens: Vec, @@ -115,69 +130,44 @@ async fn main() -> Result<()> { // Set up port let addr = BIND_ADDRESS.parse::()?; - // Create SSE server configuration - let sse_config = SseServerConfig { - bind: addr, - sse_path: "/sse".to_string(), - post_path: "/message".to_string(), - ct: CancellationToken::new(), - sse_keep_alive: Some(Duration::from_secs(15)), - }; - - // Create SSE server - let (sse_server, sse_router) = SseServer::new(sse_config); + // Create streamable HTTP service + let mcp_service: StreamableHttpService = + StreamableHttpService::new( + || Ok(Counter::new()), + LocalSessionManager::default().into(), + StreamableHttpServerConfig::default(), + ); // Create API routes let api_routes = Router::new() .route("/health", get(health_check)) .route("/token/{token_id}", get(get_token)); - // Create protected SSE routes (require authorization) - let protected_sse_router = sse_router.layer(middleware::from_fn_with_state( - token_store.clone(), - auth_middleware, - )); + // Create protected MCP routes (require authorization) + let protected_mcp_router = + Router::new() + .nest_service("/mcp", mcp_service) + .layer(middleware::from_fn_with_state( + token_store.clone(), + auth_middleware, + )); // Create main router, public endpoints don't require authorization let app = Router::new() .route("/", get(index)) .nest("/api", api_routes) - .merge(protected_sse_router) - .with_state(()); - - // Start server and register service - let listener = tokio::net::TcpListener::bind(addr).await?; - let ct = sse_server.config.ct.clone(); - - // Start SSE server with Counter service - sse_server.with_service(Counter::new); - - // Handle signals for graceful shutdown - let cancel_token = ct.clone(); - tokio::spawn(async move { - match tokio::signal::ctrl_c().await { - Ok(()) => { - println!("Received Ctrl+C, shutting down server..."); - cancel_token.cancel(); - } - Err(err) => { - eprintln!("Unable to listen for Ctrl+C signal: {}", err); - } - } - }); + .merge(protected_mcp_router); // Start HTTP server + let listener = tokio::net::TcpListener::bind(addr).await?; tracing::info!("Server started on {}", addr); - let server = axum::serve(listener, app).with_graceful_shutdown(async move { - // Wait for cancellation signal - ct.cancelled().await; - println!("Server is shutting down..."); - }); - - if let Err(e) = server.await { - eprintln!("Server error: {}", e); - } - println!("Server has been shut down"); + axum::serve(listener, app) + .with_graceful_shutdown(async { + tokio::signal::ctrl_c().await.ok(); + println!("Shutting down..."); + }) + .await?; + Ok(()) } diff --git a/examples/simple-chat-client/Cargo.toml b/examples/simple-chat-client/Cargo.toml index cdebbfa8..db8cdda4 100644 --- a/examples/simple-chat-client/Cargo.toml +++ b/examples/simple-chat-client/Cargo.toml @@ -17,7 +17,6 @@ toml = "0.9" rmcp = { workspace = true, features = [ "client", "transport-child-process", - "transport-sse-client-reqwest", "transport-streamable-http-client-reqwest" ] } clap = { version = "4.0", features = ["derive"] } diff --git a/examples/simple-chat-client/src/config.rs b/examples/simple-chat-client/src/config.rs index 6f0350b4..e469c280 100644 --- a/examples/simple-chat-client/src/config.rs +++ b/examples/simple-chat-client/src/config.rs @@ -32,9 +32,6 @@ pub enum McpServerTransportConfig { Streamable { url: String, }, - Sse { - url: String, - }, Stdio { command: String, #[serde(default)] @@ -52,11 +49,6 @@ impl McpServerTransportConfig { rmcp::transport::StreamableHttpClientTransport::from_uri(url.to_string()); ().serve(transport).await? } - McpServerTransportConfig::Sse { url } => { - let transport = - rmcp::transport::sse_client::SseClientTransport::start(url.to_owned()).await?; - ().serve(transport).await? - } McpServerTransportConfig::Stdio { command, args,