Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add WebSocketClient::new_with_config to specify the WebSocket connection settings #975

Merged
merged 4 commits into from
Sep 16, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .changelog/unreleased/improvements/ws-config.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
- `[tendermint-light-client]` Add `WebSocketClient::new_with_config` to specify
the WebSocket connection settings ([#974](https://github.com/informalsystems/tendermint-rs/issues/974))
4 changes: 3 additions & 1 deletion rpc/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@ pub use transport::mock::{MockClient, MockRequestMatcher, MockRequestMethodMatch
#[cfg(feature = "http-client")]
pub use transport::http::{HttpClient, HttpClientUrl};
#[cfg(feature = "websocket-client")]
pub use transport::websocket::{WebSocketClient, WebSocketClientDriver, WebSocketClientUrl};
pub use transport::websocket::{
WebSocketClient, WebSocketClientDriver, WebSocketClientUrl, WebSocketConfig,
};

use crate::endpoint::validators::DEFAULT_VALIDATORS_PER_PAGE;
use crate::endpoint::*;
Expand Down
96 changes: 69 additions & 27 deletions rpc/src/client/transport/websocket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@ const RECV_TIMEOUT: Duration = Duration::from_secs(RECV_TIMEOUT_SECONDS);
// Taken from https://github.com/tendermint/tendermint/blob/309e29c245a01825fc9630103311fd04de99fa5e/rpc/jsonrpc/server/ws_handler.go#L28
const PING_INTERVAL: Duration = Duration::from_secs((RECV_TIMEOUT_SECONDS * 9) / 10);

/// Low-level WebSocket configuration
pub use async_tungstenite::tungstenite::protocol::WebSocketConfig;

/// Tendermint RPC client that provides access to all RPC functionality
/// (including [`Event`] subscription) over a WebSocket connection.
///
Expand Down Expand Up @@ -133,15 +136,31 @@ impl WebSocketClient {
///
/// Supports both `ws://` and `wss://` protocols.
pub async fn new<U>(url: U) -> Result<(Self, WebSocketClientDriver), Error>
where
U: TryInto<WebSocketClientUrl, Error = Error>,
{
Self::new_with_config(url, None).await
}

/// Construct a new WebSocket-based client connecting to the given
/// Tendermint node's RPC endpoint.
///
/// Supports both `ws://` and `wss://` protocols.
pub async fn new_with_config<U>(
url: U,
config: Option<WebSocketConfig>,
) -> Result<(Self, WebSocketClientDriver), Error>
where
U: TryInto<WebSocketClientUrl, Error = Error>,
{
let url = url.try_into()?;

let (inner, driver) = if url.0.is_secure() {
sealed::WebSocketClient::new_secure(url.0).await?
sealed::WebSocketClient::new_secure(url.0, config).await?
} else {
sealed::WebSocketClient::new_unsecure(url.0).await?
sealed::WebSocketClient::new_unsecure(url.0, config).await?
};

Ok((Self { inner }, driver))
}
}
Expand Down Expand Up @@ -236,12 +255,18 @@ mod sealed {
DriverCommand, SimpleRequestCommand, SubscribeCommand, UnsubscribeCommand,
WebSocketClientDriver,
};

use crate::client::sync::{unbounded, ChannelTx};
use crate::query::Query;
use crate::request::Wrapper;
use crate::utils::uuid_str;
use crate::{Error, Response, SimpleRequest, Subscription, Url};
use async_tungstenite::tokio::{connect_async, connect_async_with_tls_connector};

use async_tungstenite::{
tokio::{connect_async_with_config, connect_async_with_tls_connector_and_config},
tungstenite::protocol::WebSocketConfig,
};

use tracing::debug;

/// Marker for the [`AsyncTungsteniteClient`] for clients operating over
Expand Down Expand Up @@ -275,19 +300,25 @@ mod sealed {
/// this driver becomes the responsibility of the client owner, and must be
/// executed in a separate asynchronous context to the client to ensure it
/// doesn't block the client.
pub async fn new(url: Url) -> Result<(Self, WebSocketClientDriver), Error> {
pub async fn new(
url: Url,
config: Option<WebSocketConfig>,
) -> Result<(Self, WebSocketClientDriver), Error> {
let url = url.to_string();
debug!("Connecting to unsecure WebSocket endpoint: {}", url);
let (stream, _response) = connect_async(url).await.map_err(Error::tungstenite)?;

let (stream, _response) = connect_async_with_config(url, config)
.await
.map_err(Error::tungstenite)?;

let (cmd_tx, cmd_rx) = unbounded();
let driver = WebSocketClientDriver::new(stream, cmd_rx);
Ok((
Self {
cmd_tx,
_client_type: Default::default(),
},
driver,
))
let client = Self {
cmd_tx,
_client_type: Default::default(),
};

Ok((client, driver))
}
}

Expand All @@ -301,23 +332,28 @@ mod sealed {
/// this driver becomes the responsibility of the client owner, and must be
/// executed in a separate asynchronous context to the client to ensure it
/// doesn't block the client.
pub async fn new(url: Url) -> Result<(Self, WebSocketClientDriver), Error> {
pub async fn new(
url: Url,
config: Option<WebSocketConfig>,
) -> Result<(Self, WebSocketClientDriver), Error> {
let url = url.to_string();
debug!("Connecting to secure WebSocket endpoint: {}", url);

// Not supplying a connector means async_tungstenite will create the
// connector for us.
let (stream, _response) = connect_async_with_tls_connector(url, None)
.await
.map_err(Error::tungstenite)?;
let (stream, _response) =
connect_async_with_tls_connector_and_config(url, None, config)
.await
.map_err(Error::tungstenite)?;

let (cmd_tx, cmd_rx) = unbounded();
let driver = WebSocketClientDriver::new(stream, cmd_rx);
Ok((
Self {
cmd_tx,
_client_type: Default::default(),
},
driver,
))
let client = Self {
cmd_tx,
_client_type: Default::default(),
};

Ok((client, driver))
}
}

Expand Down Expand Up @@ -391,13 +427,19 @@ mod sealed {
}

impl WebSocketClient {
pub async fn new_unsecure(url: Url) -> Result<(Self, WebSocketClientDriver), Error> {
let (client, driver) = AsyncTungsteniteClient::<Unsecure>::new(url).await?;
pub async fn new_unsecure(
url: Url,
config: Option<WebSocketConfig>,
) -> Result<(Self, WebSocketClientDriver), Error> {
let (client, driver) = AsyncTungsteniteClient::<Unsecure>::new(url, config).await?;
Ok((Self::Unsecure(client), driver))
}

pub async fn new_secure(url: Url) -> Result<(Self, WebSocketClientDriver), Error> {
let (client, driver) = AsyncTungsteniteClient::<Secure>::new(url).await?;
pub async fn new_secure(
url: Url,
config: Option<WebSocketConfig>,
) -> Result<(Self, WebSocketClientDriver), Error> {
let (client, driver) = AsyncTungsteniteClient::<Secure>::new(url, config).await?;
Ok((Self::Secure(client), driver))
}

Expand Down
2 changes: 1 addition & 1 deletion rpc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ pub use client::{
#[cfg(feature = "http-client")]
pub use client::{HttpClient, HttpClientUrl};
#[cfg(feature = "websocket-client")]
pub use client::{WebSocketClient, WebSocketClientDriver, WebSocketClientUrl};
pub use client::{WebSocketClient, WebSocketClientDriver, WebSocketClientUrl, WebSocketConfig};

pub mod endpoint;
pub mod error;
Expand Down