Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add reconnecting-rpc-client #1396

Merged
merged 14 commits into from
Feb 8, 2024
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 42 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

11 changes: 11 additions & 0 deletions subxt/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@ web = [
"instant/wasm-bindgen"
]

# Enable this to use the reconnecting rpc client
unstable-reconnecting-rpc-client = ["dep:reconnecting-jsonrpsee-ws-client"]

# Enable this to use jsonrpsee (allowing for example `OnlineClient::from_url`).
jsonrpsee = ["dep:jsonrpsee"]

Expand Down Expand Up @@ -103,6 +106,9 @@ subxt-lightclient = { workspace = true, optional = true, default-features = fals
# Light client support:
tokio-stream = { workspace = true, optional = true }

# Reconnecting jsonrpc ws client
reconnecting-jsonrpsee-ws-client = { version = "0.2", optional = true }

# For parsing urls to disallow insecure schemes
url = { workspace = true }

Expand Down Expand Up @@ -138,6 +144,11 @@ name = "light_client_parachains"
path = "examples/light_client_parachains.rs"
required-features = ["unstable-light-client", "jsonrpsee", "native"]

[[example]]
name = "reconnecting_rpc_client"
path = "examples/reconnecting_rpc_client.rs"
required-features = ["unstable-reconnecting-rpc-client"]

[package.metadata.docs.rs]
features = ["default", "substrate-compat", "unstable-light-client"]
rustdoc-args = ["--cfg", "docsrs"]
Expand Down
73 changes: 73 additions & 0 deletions subxt/examples/reconnecting_rpc_client.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
//! Example to utilize the `reconnecting rpc client` in subxt
//! which hidden behind behind `--feature reconnecting-rpc-client`
//!
//! To utilize full logs from the RPC client use:
//! `RUST_LOG="jsonrpsee=trace,reconnecting_jsonrpsee_ws_client=trace"`

#![allow(missing_docs)]

use std::time::Duration;

use subxt::backend::rpc::reconnecting_rpc_client::{Client, ExponentialBackoff, PingConfig};
use subxt::backend::rpc::RpcClient;
use subxt::error::{Error, RpcError};
use subxt::{OnlineClient, PolkadotConfig};

// Generate an interface that we can use from the node's metadata.
#[subxt::subxt(runtime_metadata_path = "../artifacts/polkadot_metadata_small.scale")]
pub mod polkadot {}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
tracing_subscriber::fmt::init();

// Create a new client with with a reconnecting RPC client.
let rpc = Client::builder()
// Reconnect with exponential backoff
//
// This API is "iterator-like" so one could limit it to only
// reconnect x times and then quit.
.retry_policy(ExponentialBackoff::from_millis(100).max_delay(Duration::from_secs(10)))
// Send period WebSocket pings/pongs every 6th second and if it's not ACK:ed in 30 seconds
// then disconnect.
//
// This is just a way to ensure that the connection isn't idle if no message is sent that often
.enable_ws_ping(
PingConfig::new()
.ping_interval(Duration::from_secs(6))
.inactive_limit(Duration::from_secs(30)),
)
// There are other configurations as well that can be found here:
// <https://docs.rs/reconnecting-jsonrpsee-ws-client/latest/reconnecting_jsonrpsee_ws_client/struct.ClientBuilder.html>
.build("ws://localhost:9944".to_string())
.await?;

let api: OnlineClient<PolkadotConfig> =
OnlineClient::from_rpc_client(RpcClient::new(rpc.clone())).await?;

// Subscribe to all finalized blocks:
let mut blocks_sub = api.blocks().subscribe_finalized().await?;

// For each block, print a bunch of information about it:
while let Some(block) = blocks_sub.next().await {
let block = match block {
Ok(b) => b,
Err(Error::Rpc(RpcError::DisconnectedWillReconnect(e))) => {
jsdw marked this conversation as resolved.
Show resolved Hide resolved
println!("The connection was lost: `{}`; reconnecting", e);
continue;
}
Err(e) => {
return Err(e.into());
}
};

let block_number = block.header().number;
let block_hash = block.hash();

println!("Block #{block_number} ({block_hash})");
}

println!("RPC client reconnected `{}` times", rpc.reconnect_count());

Ok(())
}
8 changes: 6 additions & 2 deletions subxt/src/backend/rpc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,13 @@ crate::macros::cfg_jsonrpsee! {
mod jsonrpsee_impl;
}

crate::macros::cfg_reconnecting_rpc_client! {
mod reconnecting_jsonrpsee_impl;
pub use reconnecting_jsonrpsee_ws_client as reconnecting_rpc_client;
}

mod rpc_client;
mod rpc_client_t;

pub use rpc_client_t::{RawRpcFuture, RawRpcSubscription, RawValue, RpcClientT};

pub use rpc_client::{rpc_params, RpcClient, RpcParams, RpcSubscription};
pub use rpc_client_t::{RawRpcFuture, RawRpcSubscription, RawValue, RpcClientT};
52 changes: 52 additions & 0 deletions subxt/src/backend/rpc/reconnecting_jsonrpsee_impl.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
// Copyright 2019-2023 Parity Technologies (UK) Ltd.
// This file is dual-licensed as Apache-2.0 or GPL-3.0.
// see LICENSE for license details.

use super::{RawRpcFuture, RawRpcSubscription, RpcClientT};
use crate::error::RpcError;
use futures::{FutureExt, StreamExt, TryStreamExt};
use reconnecting_jsonrpsee_ws_client::SubscriptionId;
use serde_json::value::RawValue;

impl RpcClientT for reconnecting_jsonrpsee_ws_client::Client {
fn request_raw<'a>(
&'a self,
method: &'a str,
params: Option<Box<RawValue>>,
) -> RawRpcFuture<'a, Box<RawValue>> {
async {
self.request_raw(method.to_string(), params)
.await
.map_err(|e| RpcError::ClientError(Box::new(e)))
}
.boxed()
}

fn subscribe_raw<'a>(
&'a self,
sub: &'a str,
params: Option<Box<RawValue>>,
unsub: &'a str,
) -> RawRpcFuture<'a, RawRpcSubscription> {
async {
let sub = self
.subscribe_raw(sub.to_string(), params, unsub.to_string())
.await
.map_err(|e| RpcError::ClientError(Box::new(e)))?;

let id = match sub.id() {
SubscriptionId::Num(n) => n.to_string(),
SubscriptionId::Str(s) => s.to_string(),
};
let stream = sub
.map_err(|e| RpcError::DisconnectedWillReconnect(e.to_string()))
.boxed();

Ok(RawRpcSubscription {
stream,
id: Some(id),
})
}
.boxed()
}
}
12 changes: 10 additions & 2 deletions subxt/src/error/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@

mod dispatch_error;

use core::fmt::Debug;

crate::macros::cfg_unstable_light_client! {
pub use crate::client::LightClientError;
}
Expand Down Expand Up @@ -100,6 +98,13 @@ impl From<std::convert::Infallible> for Error {
}
}

impl Error {
/// Checks whether the error was caused by a RPC re-connection.
pub fn is_disconnected_will_reconnect(&self) -> bool {
matches!(self, Error::Rpc(RpcError::DisconnectedWillReconnect(_)))
}
}

/// An RPC error. Since we are generic over the RPC client that is used,
/// the error is boxed and could be casted.
#[derive(Debug, thiserror::Error)]
Expand All @@ -120,6 +125,9 @@ pub enum RpcError {
/// The requested URL is insecure.
#[error("RPC error: insecure URL: {0}")]
InsecureUrl(String),
/// The connection was lost and automatically reconnected.
#[error("RPC error: the connection was lost `{0}`; reconnect automatically initiated")]
DisconnectedWillReconnect(String),
}

impl RpcError {
Expand Down
16 changes: 15 additions & 1 deletion subxt/src/macros.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,21 @@ macro_rules! cfg_jsonrpsee_web {
}
}

pub(crate) use {cfg_feature, cfg_jsonrpsee, cfg_substrate_compat, cfg_unstable_light_client};
#[allow(unused)]
macro_rules! cfg_reconnecting_rpc_client {
($($item:item)*) => {
$(
#[cfg(all(feature = "unstable-reconnecting-rpc-client"))]
#[cfg_attr(docsrs, doc(cfg(feature = "unstable-reconnecting-rpc-client")))]
$item
)*
}
}

pub(crate) use {
cfg_feature, cfg_jsonrpsee, cfg_reconnecting_rpc_client, cfg_substrate_compat,
cfg_unstable_light_client,
};

// Only used by light-client.
#[allow(unused)]
Expand Down
Loading