Skip to content

Commit

Permalink
[ws client]: use max_payload_size in transport (#198)
Browse files Browse the repository at this point in the history
* fix examples

* [ws client]: transport use `max_payload_size`

* fix nits

* fix more nits

* Update ws-client/src/client.rs

* fix doc tests

* [ws client]: all transport config configurable

* Update ws-client/src/client.rs

Co-authored-by: David <dvdplm@gmail.com>

* Update ws-client/src/manager.rs

Co-authored-by: David <dvdplm@gmail.com>

* address grumbles

* grumbles: WsDnsError -> WsHandshakeError

* grumbles: remove needless clone

* fix lint: remove explicit lifetime

* fix nits: channel capacity + docs

* clippy nits

* clippy nits

* fix grumbles: channel capacity 256

As Maciej pointed out the capacity is the number of messages (not bytes) and each message is 96 bytes.
Thus, 256 * 96 = ~24kB which is reasonable default value.

* Update ws-client/src/client.rs

Co-authored-by: David <dvdplm@gmail.com>

* grumbles: docs

Co-authored-by: David <dvdplm@gmail.com>
  • Loading branch information
niklasad1 and dvdplm committed Feb 2, 2021
1 parent 1507126 commit 7dc9435
Show file tree
Hide file tree
Showing 10 changed files with 138 additions and 106 deletions.
4 changes: 3 additions & 1 deletion benches/benches/bench.rs
Expand Up @@ -78,7 +78,9 @@ pub fn websocket_requests(c: &mut criterion::Criterion) {
let (tx_addr, rx_addr) = oneshot::channel::<SocketAddr>();
async_std::task::spawn(ws_server(tx_addr));
let server_addr = block_on(rx_addr).unwrap();
let client = Arc::new(block_on(WsClient::new(&format!("ws://{}", server_addr), WsConfig::default())).unwrap());
let url = format!("ws://{}", server_addr);
let config = WsConfig::with_url(&url);
let client = Arc::new(block_on(WsClient::new(config)).unwrap());

c.bench_function("synchronous_websocket_round_trip", |b| {
b.iter(|| {
Expand Down
3 changes: 2 additions & 1 deletion examples/examples/ws.rs
Expand Up @@ -43,7 +43,8 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
});

server_started_rx.await?;
let client = WsClient::new(SERVER_URI, WsConfig::default()).await?;
let config = WsConfig::with_url(SERVER_URI);
let client = WsClient::new(config).await?;
let response: JsonValue = client.request("say_hello", Params::None).await?;
println!("r: {:?}", response);

Expand Down
3 changes: 2 additions & 1 deletion examples/examples/ws_subscription.rs
Expand Up @@ -44,7 +44,8 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
});

server_started_rx.await?;
let client = WsClient::new(SERVER_URI, WsConfig::default()).await?;
let config = WsConfig::with_url(SERVER_URI);
let client = WsClient::new(config).await?;
let mut subscribe_hello: WsSubscription<JsonValue> =
client.subscribe("subscribe_hello", Params::None, "unsubscribe_hello").await?;

Expand Down
38 changes: 22 additions & 16 deletions tests/src/lib.rs
Expand Up @@ -42,8 +42,9 @@ async fn ws_subscription_works() {
let (server_started_tx, server_started_rx) = oneshot::channel::<SocketAddr>();
websocket_server(server_started_tx);
let server_addr = server_started_rx.await.unwrap();
let uri = format!("ws://{}", server_addr);
let client = WsClient::new(&uri, WsConfig::default()).await.unwrap();
let server_url = format!("ws://{}", server_addr);
let config = WsConfig::with_url(&server_url);
let client = WsClient::new(config).await.unwrap();
let mut hello_sub: WsSubscription<JsonValue> =
client.subscribe("subscribe_hello", Params::None, "unsubscribe_hello").await.unwrap();
let mut foo_sub: WsSubscription<JsonValue> =
Expand All @@ -62,8 +63,9 @@ async fn ws_method_call_works() {
let (server_started_tx, server_started_rx) = oneshot::channel::<SocketAddr>();
websocket_server(server_started_tx);
let server_addr = server_started_rx.await.unwrap();
let uri = format!("ws://{}", server_addr);
let client = WsClient::new(&uri, WsConfig::default()).await.unwrap();
let server_url = format!("ws://{}", server_addr);
let config = WsConfig::with_url(&server_url);
let client = WsClient::new(config).await.unwrap();
let response: JsonValue = client.request("say_hello", Params::None).await.unwrap();
assert_eq!(response, JsonValue::String("hello".into()));
}
Expand All @@ -84,11 +86,12 @@ async fn ws_subscription_several_clients() {
let (server_started_tx, server_started_rx) = oneshot::channel::<SocketAddr>();
websocket_server(server_started_tx);
let server_addr = server_started_rx.await.unwrap();
let server_url = format!("ws://{}", server_addr);

let mut clients = Vec::with_capacity(10);
for _ in 0..10 {
let uri = format!("ws://{}", server_addr);
let client = WsClient::new(&uri, WsConfig::default()).await.unwrap();
let config = WsConfig::with_url(&server_url);
let client = WsClient::new(config).await.unwrap();
let hello_sub: WsSubscription<JsonValue> =
client.subscribe("subscribe_hello", Params::None, "unsubscribe_hello").await.unwrap();
let foo_sub: WsSubscription<JsonValue> =
Expand All @@ -102,14 +105,14 @@ async fn ws_subscription_several_clients_with_drop() {
let (server_started_tx, server_started_rx) = oneshot::channel::<SocketAddr>();
websocket_server(server_started_tx);
let server_addr = server_started_rx.await.unwrap();
let server_url = format!("ws://{}", server_addr);

let mut clients = Vec::with_capacity(10);
for _ in 0..10 {
let uri = format!("ws://{}", server_addr);
let client =
WsClient::new(&uri, WsConfig { subscription_channel_capacity: u32::MAX as usize, ..Default::default() })
.await
.unwrap();
let mut config = WsConfig::with_url(&server_url);
config.max_subscription_capacity = u32::MAX as usize;

let client = WsClient::new(config).await.unwrap();
let hello_sub: WsSubscription<JsonValue> =
client.subscribe("subscribe_hello", Params::None, "unsubscribe_hello").await.unwrap();
let foo_sub: WsSubscription<JsonValue> =
Expand Down Expand Up @@ -149,10 +152,11 @@ async fn ws_subscription_without_polling_doesnt_make_client_unuseable() {
let (server_started_tx, server_started_rx) = oneshot::channel::<SocketAddr>();
websocket_server(server_started_tx);
let server_addr = server_started_rx.await.unwrap();
let server_url = format!("ws://{}", server_addr);

let uri = format!("ws://{}", server_addr);
let client =
WsClient::new(&uri, WsConfig { subscription_channel_capacity: 4, ..Default::default() }).await.unwrap();
let mut config = WsConfig::with_url(&server_url);
config.max_subscription_capacity = 4;
let client = WsClient::new(config).await.unwrap();
let mut hello_sub: WsSubscription<JsonValue> =
client.subscribe("subscribe_hello", Params::None, "unsubscribe_hello").await.unwrap();

Expand Down Expand Up @@ -183,9 +187,11 @@ async fn ws_more_request_than_buffer_should_not_deadlock() {
let (concurrent_tx, concurrent_rx) = oneshot::channel::<()>();
websocket_server_with_wait_period(server_started_tx, concurrent_rx);
let server_addr = server_started_rx.await.unwrap();
let server_url = format!("ws://{}", server_addr);

let uri = format!("ws://{}", server_addr);
let client = WsClient::new(&uri, WsConfig { request_channel_capacity: 2, ..Default::default() }).await.unwrap();
let mut config = WsConfig::with_url(&server_url);
config.max_subscription_capacity = 2;
let client = WsClient::new(config).await.unwrap();

let mut requests = Vec::new();
//NOTE: we use less than 8 because of https://github.com/paritytech/jsonrpsee/issues/168.
Expand Down
84 changes: 54 additions & 30 deletions ws-client/src/client.rs
Expand Up @@ -35,48 +35,71 @@ use jsonrpsee_types::{
error::Error,
jsonrpc::{self, JsonValue, SubscriptionId},
};
use std::convert::TryInto;
use std::time::Duration;
use std::{borrow::Cow, convert::TryInto};
use std::{io, marker::PhantomData};

/// Client that can be cloned.
///
/// > **Note**: This struct is designed to be easy to use, but it works by maintaining a background
/// > task running in parallel. If this is not desirable, you are encouraged to use the
/// > [`RawClient`] struct instead.
/// > **Note**: This struct is designed to be easy to use, but it works by maintaining a background task running in parallel.
#[derive(Clone)]
pub struct WsClient {
/// Channel to send requests to the background task.
to_back: mpsc::Sender<FrontToBack>,
/// Config.
config: WsConfig,
/// Request timeout
request_timeout: Option<Duration>,
}

#[derive(Copy, Clone, Debug)]
/// Configuration.
pub struct WsConfig {
/// Backend channel for serving requests and notifications.
pub request_channel_capacity: usize,
/// Backend channel for each unique subscription.
pub subscription_channel_capacity: usize,
#[derive(Clone, Debug)]
pub struct WsConfig<'a> {
/// URL to connect to.
pub url: &'a str,
/// Max request body size
pub max_request_body_size: usize,
/// Request timeout
pub request_timeout: Option<Duration>,
/// Connection timeout
pub connection_timeout: Duration,
/// `Origin` header to pass during the HTTP handshake. If `None`, no
/// `Origin` header was passed.
pub origin: Option<Cow<'a, str>>,
/// Url to send during the HTTP handshake.
pub handshake_url: Cow<'a, str>,
/// Max concurrent request capacity.
///
/// **Note**: The actual capacity is `num_senders + max_concurrent_requests_capacity`
/// because it is passed to [`futures::channel::mpsc::channel`]
/// and the capacity may increase because the sender is cloned when new
/// requests, notifications and subscriptions are created.
pub max_concurrent_requests_capacity: usize,
/// Max concurrent capacity for each subscription; when the capacity is exceeded the subscription will be dropped.
///
/// You can also prevent the subscription being dropped by calling [`WsSubscription::next()`] frequently enough
/// such that the buffer capacity doesn't exceeds.
///
/// **Note**: The actual capacity is `num_senders + max_subscription_capacity`
/// because it is passed to [`futures::channel::mpsc::channel`].
pub max_subscription_capacity: usize,
}

impl Default for WsConfig {
fn default() -> Self {
impl<'a> WsConfig<'a> {
/// Default WebSocket configuration with a specified URL to connect to.
pub fn with_url(url: &'a str) -> Self {
Self {
request_channel_capacity: 100,
subscription_channel_capacity: 4,
url,
max_request_body_size: 10 * 1024 * 1024,
request_timeout: None,
connection_timeout: Duration::from_secs(10),
origin: None,
handshake_url: From::from("/"),
max_concurrent_requests_capacity: 256,
max_subscription_capacity: 4,
}
}
}

/// Active subscription on a [`Client`].
/// Active subscription on a [`WsClient`].
pub struct WsSubscription<Notif> {
/// Channel to send requests to the background task.
to_back: mpsc::Sender<FrontToBack>,
Expand Down Expand Up @@ -134,17 +157,18 @@ impl WsClient {
/// Initializes a new WebSocket client
///
/// Fails when the URL is invalid.
pub async fn new(remote_addr: impl AsRef<str>, config: WsConfig) -> Result<Self, Error> {
let (sender, receiver) = jsonrpc_transport::websocket_connection(remote_addr.as_ref())
.await
.map_err(|e| Error::TransportError(Box::new(e)))?;
pub async fn new(config: WsConfig<'_>) -> Result<WsClient, Error> {
let max_capacity_per_subscription = config.max_subscription_capacity;
let request_timeout = config.request_timeout;
let (to_back, from_front) = mpsc::channel(config.max_concurrent_requests_capacity);

let (to_back, from_front) = mpsc::channel(config.request_channel_capacity);
let (sender, receiver) =
jsonrpc_transport::websocket_connection(config).await.map_err(|e| Error::TransportError(Box::new(e)))?;

async_std::task::spawn(async move {
background_task(sender, receiver, from_front, config).await;
background_task(sender, receiver, from_front, max_capacity_per_subscription).await;
});
Ok(Self { to_back, config })
Ok(Self { to_back, request_timeout })
}

/// Send a notification to the server.
Expand Down Expand Up @@ -178,7 +202,7 @@ impl WsClient {
.await
.map_err(Error::Internal)?;

let send_back_rx_out = if let Some(duration) = self.config.request_timeout {
let send_back_rx_out = if let Some(duration) = self.request_timeout {
let timeout = async_std::task::sleep(duration);
futures::pin_mut!(send_back_rx, timeout);
match future::select(send_back_rx, timeout).await {
Expand Down Expand Up @@ -280,7 +304,7 @@ async fn background_task(
mut sender: jsonrpc_transport::Sender,
receiver: jsonrpc_transport::Receiver,
mut frontend: mpsc::Receiver<FrontToBack>,
config: WsConfig,
max_capacity_per_subscription: usize,
) {
let mut manager = RequestManager::new();

Expand Down Expand Up @@ -363,7 +387,7 @@ async fn background_task(
break;
}
Some(Ok(jsonrpc::Response::Single(response))) => {
match process_response(&mut manager, response, config.subscription_channel_capacity) {
match process_response(&mut manager, response, max_capacity_per_subscription) {
Ok(Some((unsubscribe, params))) => {
if let Err(e) = sender.start_request(unsubscribe, params).await {
log::error!("Failed to send unsubscription response: {:?}", e);
Expand All @@ -379,7 +403,7 @@ async fn background_task(
Some(Ok(jsonrpc::Response::Batch(responses))) => {
// if any request fails, throw away entire batch.
for response in responses {
match process_response(&mut manager, response, config.subscription_channel_capacity) {
match process_response(&mut manager, response, max_capacity_per_subscription) {
Ok(Some((unsubscribe, params))) => {
if let Err(e) = sender.start_request(unsubscribe, params).await {
log::error!("Failed to send unsubscription response: {:?}", e);
Expand Down Expand Up @@ -431,7 +455,7 @@ async fn background_task(
fn process_response(
manager: &mut RequestManager,
response: jsonrpc::Output,
subscription_capacity: usize,
max_capacity_per_subscription: usize,
) -> Result<Option<(String, jsonrpc::Params)>, Error> {
let response_id = *response.id().as_number().ok_or(Error::InvalidRequestId)?;

Expand Down Expand Up @@ -470,7 +494,7 @@ fn process_response(
}
};

let (subscribe_tx, subscribe_rx) = mpsc::channel(subscription_capacity);
let (subscribe_tx, subscribe_rx) = mpsc::channel(max_capacity_per_subscription);
if manager.insert_subscription(response_id, sub_id.clone(), subscribe_tx, unsubscribe_method).is_ok() {
match send_back_oneshot.send(Ok((subscribe_rx, sub_id.clone()))) {
Ok(_) => Ok(None),
Expand Down
7 changes: 4 additions & 3 deletions ws-client/src/jsonrpc_transport.rs
Expand Up @@ -2,12 +2,13 @@
//!
//! Wraps the underlying WebSocket transport with specific JSONRPC details.

use crate::transport::{self, WsConnectError, WsNewDnsError};
use crate::transport::{self, WsConnectError, WsHandshakeError};
use crate::WsConfig;
use jsonrpsee_types::jsonrpc;

/// Creates a new JSONRPC WebSocket connection, represented as a Sender and Receiver pair.
pub async fn websocket_connection(target: impl AsRef<str>) -> Result<(Sender, Receiver), WsNewDnsError> {
let (sender, receiver) = transport::websocket_connection(target.as_ref()).await?;
pub async fn websocket_connection(config: WsConfig<'_>) -> Result<(Sender, Receiver), WsHandshakeError> {
let (sender, receiver) = transport::websocket_connection(config).await?;
Ok((Sender::new(sender), Receiver::new(receiver)))
}

Expand Down
2 changes: 1 addition & 1 deletion ws-client/src/lib.rs
@@ -1,4 +1,4 @@
/// Client.
/// WebSocket Client.
pub mod client;
/// JSONRPC WebSocket transport.
pub mod jsonrpc_transport;
Expand Down
7 changes: 4 additions & 3 deletions ws-client/src/manager.rs
@@ -1,9 +1,10 @@
//! Handles and monitors JSONRPC v2 method calls and subscriptions
//!
//! Definitions:
//! * Request ID - request ID in JSONRPC-v2 specification
//! (the specs allow number, string or null but this crate only supports numbers)
//! * Subscription ID - unique ID generated by server
//!
//! - RequestId: request ID in the JSONRPC-v2 specification
//! > **Note**: The spec allow number, string or null but this crate only supports numbers.
//! - SubscriptionId: unique ID generated by server

use fnv::FnvHashMap;
use futures::channel::{mpsc, oneshot};
Expand Down

0 comments on commit 7dc9435

Please sign in to comment.