diff --git a/CHANGELOG.md b/CHANGELOG.md index b75074b29..d59df838f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,8 +3,12 @@ ### IMPROVEMENTS: - `[light-client]` Only require Tokio when `rpc-client` feature is enabled ([#425]) +- `[rpc]` The `WebSocketClient` now adds support for all remaining RPC requests + by way of implementing the `Client` trait ([#646]) [#425]: https://github.com/informalsystems/tendermint-rs/issues/425 +[#646]: https://github.com/informalsystems/tendermint-rs/pull/646 + ## v0.17.0-rc3 diff --git a/docs/architecture/adr-008-event-subscription.md b/docs/architecture/adr-008-event-subscription.md index a390c1143..14ad2b927 100644 --- a/docs/architecture/adr-008-event-subscription.md +++ b/docs/architecture/adr-008-event-subscription.md @@ -218,7 +218,7 @@ WebSocket connection to provide subscription functionality (the #[async_trait] pub trait SubscriptionClient { /// `/subscribe`: subscribe to receive events produced by the given query. - async fn subscribe(&mut self, query: String) -> Result; + async fn subscribe(&mut self, query: Query) -> Result; } ``` @@ -323,19 +323,23 @@ pub enum EventType { ValidatorSetUpdates, } -pub struct Condition { - key: String, - op: Operation, -} - -pub enum Operation { - Eq(Operand), - Lt(Operand), - Lte(Operand), - Gt(Operand), - Gte(Operand), - Contains(Operand), - Exists, +// A condition specifies a key (first parameter) and, depending on the +// operation, an value which is an operand of some kind. +pub enum Condition { + // Equals + Eq(String, Operand), + // Less than + Lt(String, Operand), + // Less than or equal to + Lte(String, Operand), + // Greater than + Gt(String, Operand), + // Greater than or equal to + Gte(String, Operand), + // Contains (to check if a key contains a certain sub-string) + Contains(String, String), + // Exists (to check if a key exists) + Exists(String), } // According to https://docs.tendermint.com/master/rpc/#/Websocket/subscribe, @@ -346,7 +350,8 @@ pub enum Operation { // operand types to the `Operand` enum, as this would improve ergonomics. pub enum Operand { String(String), - Integer(i64), + Signed(i64), + Unsigned(u64), Float(f64), Date(chrono::Date), DateTime(chrono::DateTime), @@ -361,7 +366,7 @@ track of all of the queries relating to a particular client. ```rust pub struct SubscriptionRouter { // A map of queries -> (map of subscription IDs -> result event tx channels) - subscriptions: HashMap>>>, + subscriptions: HashMap>, } ``` @@ -372,21 +377,61 @@ server [drops subscription IDs from events][tendermint-2949], which is likely if we want to conform more strictly to the [JSON-RPC standard for notifications][jsonrpc-notifications]. -#### Two-Phase Subscribe/Unsubscribe +### Handling Mixed Events and Responses + +Since a full client needs to implement both the `Client` and +`SubscriptionClient` traits, for certain transports (like a WebSocket +connection) we could end up receiving a mixture of events from subscriptions +and responses to RPC requests. To disambiguate these different types of +incoming messages, a simple mechanism is proposed for the +`WebSocketClientDriver` that keeps track of pending requests and only matures +them once it receives its corresponding response. -Due to the fact that a WebSocket connection lacks request/response semantics, -when managing multiple subscriptions from a single client we need to implement a -**two-phase subscription creation/removal process**: +```rust +pub struct WebSocketClientDriver { + // ... -1. An outgoing, but unconfirmed, subscribe/unsubscribe request is tracked. -2. The subscribe/unsubscribe request is confirmed or cancelled by a response - from the remote WebSocket server. + // Commands we've received but have not yet completed, indexed by their ID. + // A Terminate command is executed immediately. + pending_commands: HashMap, +} -The need for this two-phase subscribe/unsubscribe process is more clearly -illustrated in the following sequence diagram: +// The different types of requests that the WebSocketClient can send to its +// driver. +// +// Each of SubscribeCommand, UnsubscribeCommand and SimpleRequestCommand keep +// a response channel that allows for the driver to send a response later on +// when it receives a relevant one. +enum DriverCommand { + // Initiate a subscription request. + Subscribe(SubscribeCommand), + // Initiate an unsubscribe request. + Unsubscribe(UnsubscribeCommand), + // For non-subscription-related requests. + SimpleRequest(SimpleRequestCommand), + Terminate, +} +``` -![RPC client two-phase -subscribe/unsubscribe](./assets/rpc-client-two-phase-subscribe.png) +IDs of outgoing requests are randomly generated [UUIDv4] strings. + +The logic here is as follows: + +1. A call is made to `WebSocketClient::subscribe` or + `WebSocketClient::perform`. +2. The client sends the relevant `DriverCommand` to its driver via its internal + communication channel. +3. The driver receives the command, sends the relevant simple or subscription + request, and keeps track of the command in its `pending_commands` member + along with its ID. This allows the driver to continue handling outgoing + requests and incoming responses in the meantime. +4. If the driver receives a JSON-RPC message whose ID corresponds to an ID in + its `pending_commands` member, it assumes that response is relevant to that + command and sends back to the original caller by way of a channel stored in + one of the `SubscribeCommand`, `UnsubscribeCommand` or + `SimpleRequestCommand` structs. Failures are also communicated through this + same mechanism. +5. The pending command is evicted from the `pending_commands` member. ## Status @@ -433,4 +478,4 @@ None [futures-stream-mod]: https://docs.rs/futures/*/futures/stream/index.html [tendermint-2949]: https://github.com/tendermint/tendermint/issues/2949 [jsonrpc-notifications]: https://www.jsonrpc.org/specification#notification - +[UUIDv4]: https://en.wikipedia.org/wiki/Universally_unique_identifier#Version_4_(random) diff --git a/docs/architecture/assets/rpc-client-erd.graphml b/docs/architecture/assets/rpc-client-erd.graphml index 3e4e86dd8..2e3bdd713 100644 --- a/docs/architecture/assets/rpc-client-erd.graphml +++ b/docs/architecture/assets/rpc-client-erd.graphml @@ -94,10 +94,10 @@ - + - TwoPhaseSubscriptionRouter + SubscriptionRouter @@ -216,18 +216,7 @@ Detail) - - - - Operation - - - - - - - - + Operand @@ -235,7 +224,7 @@ Detail) - + @@ -305,16 +294,14 @@ Detail) - + - - - + - Has/ -Uses + Has/ +Uses @@ -385,26 +372,15 @@ Uses - - - - Has - - - - - - - - + - Has + Has - + @@ -417,8 +393,7 @@ Uses - - + diff --git a/docs/architecture/assets/rpc-client-erd.png b/docs/architecture/assets/rpc-client-erd.png index f5f7678d7..11aa05910 100644 Binary files a/docs/architecture/assets/rpc-client-erd.png and b/docs/architecture/assets/rpc-client-erd.png differ diff --git a/rpc/Cargo.toml b/rpc/Cargo.toml index c3fae0b8f..624ec18c1 100644 --- a/rpc/Cargo.toml +++ b/rpc/Cargo.toml @@ -25,7 +25,15 @@ all-features = true [features] default = [] -http-client = [ "async-trait", "futures", "http", "hyper", "tokio/fs", "tokio/macros" ] +http-client = [ + "async-trait", + "futures", + "http", + "hyper", + "tokio/fs", + "tokio/macros", + "tracing" +] secp256k1 = [ "tendermint/secp256k1" ] websocket-client = [ "async-trait", diff --git a/rpc/README.md b/rpc/README.md index 240be746d..f35f68f77 100644 --- a/rpc/README.md +++ b/rpc/README.md @@ -21,25 +21,23 @@ This crate optionally provides access to different types of RPC client functionality and different client transports based on which features you select when using it. -Two features are provided at present: +Two features are provided at present. * `http-client` - Provides `HttpClient`, which is a basic RPC client that interacts with remote Tendermint nodes via **JSON-RPC over HTTP**. This - client does not provide `Event` subscription functionality. See the + client does not provide `Event` subscription functionality. See the [Tendermint RPC] for more details. -* `websocket-client` - Provides `WebSocketClient`, which currently only - provides `Event` subscription functionality over a WebSocket connection. See - the [`/subscribe` endpoint] in the Tendermint RPC for more details. This - client does not yet provide access to the RPC methods provided by the - `Client` trait (this is planned for a future release). +* `websocket-client` - Provides `WebSocketClient`, which provides full + client functionality, including general RPC functionality (such as that + provided by `HttpClient`) as well as `Event` subscription + functionality. ### Mock Clients Mock clients are included when either of the `http-client` or `websocket-client` features are enabled to aid in testing. This includes -`MockClient`, which only implements `Client` (no subscription -functionality), and `MockSubscriptionClient`, which helps you simulate -subscriptions to events being generated by a Tendermint node. +`MockClient`, which implements both `Client` and `SubscriptionClient` +traits. ### Related diff --git a/rpc/src/client.rs b/rpc/src/client.rs index 0a574be1d..d49828851 100644 --- a/rpc/src/client.rs +++ b/rpc/src/client.rs @@ -1,13 +1,11 @@ //! Tendermint RPC client. mod subscription; -pub use subscription::{Subscription, SubscriptionClient, SubscriptionId}; +pub use subscription::{Subscription, SubscriptionClient}; pub mod sync; mod transport; -pub use transport::mock::{ - MockClient, MockRequestMatcher, MockRequestMethodMatcher, MockSubscriptionClient, -}; +pub use transport::mock::{MockClient, MockRequestMatcher, MockRequestMethodMatcher}; #[cfg(feature = "http-client")] pub use transport::http::HttpClient; @@ -15,7 +13,7 @@ pub use transport::http::HttpClient; pub use transport::websocket::{WebSocketClient, WebSocketClientDriver}; use crate::endpoint::*; -use crate::{Request, Result}; +use crate::{Result, SimpleRequest}; use async_trait::async_trait; use tendermint::abci::{self, Transaction}; use tendermint::block::Height; @@ -164,5 +162,5 @@ pub trait Client { /// Perform a request against the RPC endpoint async fn perform(&self, request: R) -> Result where - R: Request; + R: SimpleRequest; } diff --git a/rpc/src/client/subscription.rs b/rpc/src/client/subscription.rs index 6d76c334c..07fb2805c 100644 --- a/rpc/src/client/subscription.rs +++ b/rpc/src/client/subscription.rs @@ -1,21 +1,14 @@ //! Subscription- and subscription management-related functionality. -#[cfg(feature = "websocket-client")] -pub use two_phase_router::{SubscriptionState, TwoPhaseSubscriptionRouter}; - -use crate::client::sync::{unbounded, ChannelRx, ChannelTx}; +use crate::client::sync::{ChannelRx, ChannelTx}; use crate::event::Event; use crate::query::Query; -use crate::{Error, Id, Result}; +use crate::Result; use async_trait::async_trait; use futures::task::{Context, Poll}; use futures::Stream; -use getrandom::getrandom; use pin_project::pin_project; -use std::collections::HashMap; -use std::convert::TryInto; use std::pin::Pin; -use std::str::FromStr; /// A client that exclusively provides [`Event`] subscription capabilities, /// without any other RPC method support. @@ -24,16 +17,32 @@ use std::str::FromStr; #[async_trait] pub trait SubscriptionClient { /// `/subscribe`: subscribe to receive events produced by the given query. - async fn subscribe(&mut self, query: Query) -> Result; + async fn subscribe(&self, query: Query) -> Result; + + /// `/unsubscribe`: unsubscribe from events relating to the given query. + /// + /// This method is particularly useful when you want to terminate multiple + /// [`Subscription`]s to the same [`Query`] simultaneously, or if you've + /// joined multiple `Subscription`s together using [`select_all`] and you + /// no longer have access to the individual `Subscription` instances to + /// terminate them separately. + /// + /// [`Subscription`]: struct.Subscription.html + /// [`Query`]: struct.Query.html + /// [`select_all`]: https://docs.rs/futures/*/futures/stream/fn.select_all.html + async fn unsubscribe(&self, query: Query) -> Result<()>; } +pub(crate) type SubscriptionTx = ChannelTx>; +pub(crate) type SubscriptionRx = ChannelRx>; + /// An interface that can be used to asynchronously receive [`Event`]s for a /// particular subscription. /// /// ## Examples /// /// ``` -/// use tendermint_rpc::{SubscriptionId, Subscription}; +/// use tendermint_rpc::Subscription; /// use futures::StreamExt; /// /// /// Prints `count` events from the given subscription. @@ -57,602 +66,34 @@ pub trait SubscriptionClient { #[pin_project] #[derive(Debug)] pub struct Subscription { - /// The query for which events will be produced. - pub query: Query, - - /// The ID of this subscription (automatically assigned). - pub id: SubscriptionId, - + // A unique identifier for this subscription. + id: String, + // The query for which events will be produced. + query: Query, // Our internal result event receiver for this subscription. #[pin] - event_rx: ChannelRx>, - - // Allows us to interact with the subscription driver (exclusively to - // terminate this subscription). - cmd_tx: ChannelTx, + rx: SubscriptionRx, } impl Stream for Subscription { type Item = Result; fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - self.project().event_rx.poll_next(cx) + self.project().rx.poll_next(cx) } } impl Subscription { - pub(crate) fn new( - id: SubscriptionId, - query: Query, - event_rx: ChannelRx>, - cmd_tx: ChannelTx, - ) -> Self { - Self { - id, - query, - event_rx, - cmd_tx, - } - } - - /// Gracefully terminate this subscription and consume it. - /// - /// The `Subscription` can be moved to any asynchronous context, and this - /// method provides a way to terminate it from that same context. - pub async fn terminate(mut self) -> Result<()> { - let (result_tx, mut result_rx) = unbounded(); - self.cmd_tx - .send(SubscriptionDriverCmd::Unsubscribe { - id: self.id.clone(), - query: self.query.clone(), - result_tx, - }) - .await?; - result_rx.recv().await.ok_or_else(|| { - Error::client_internal_error( - "failed to hear back from subscription termination request".to_string(), - ) - })? + pub(crate) fn new(id: String, query: Query, rx: SubscriptionRx) -> Self { + Self { id, query, rx } } -} - -/// A command that can be sent to the subscription driver. -/// -/// It is assumed that all [`SubscriptionClient`] implementations will follow a -/// handle/driver concurrency model, where the client itself will just be a -/// handle to a driver that runs in a separate coroutine. -/// -/// [`SubscriptionClient`]: trait.SubscriptionClient.html -#[derive(Debug, Clone)] -pub enum SubscriptionDriverCmd { - /// Initiate a new subscription. - Subscribe { - /// The desired ID for the new subscription. - id: SubscriptionId, - /// The query for which to initiate the subscription. - query: Query, - /// Where to send events received for this subscription. - event_tx: ChannelTx>, - /// Where to send the result of this subscription command. - result_tx: ChannelTx>, - }, - /// Terminate an existing subscription. - Unsubscribe { - /// The ID of the subscription to terminate. - id: SubscriptionId, - /// The query associated with the subscription we want to terminate. - query: Query, - /// Where to send the result of this unsubscribe command. - result_tx: ChannelTx>, - }, - /// Terminate the subscription driver entirely. - Terminate, -} - -/// Each new subscription is automatically assigned an ID. -/// -/// By default, we generate random [UUIDv4] IDs for each subscription to -/// minimize chances of collision. -/// -/// [UUIDv4]: https://en.wikipedia.org/wiki/Universally_unique_identifier#Version_4_(random) -#[derive(Debug, Clone, PartialEq, Eq, Hash)] -pub struct SubscriptionId(String); - -impl Default for SubscriptionId { - fn default() -> Self { - let mut bytes = [0; 16]; - getrandom(&mut bytes).expect("RNG failure!"); - - let uuid = uuid::Builder::from_bytes(bytes) - .set_variant(uuid::Variant::RFC4122) - .set_version(uuid::Version::Random) - .build(); - - Self(uuid.to_string()) - } -} - -impl std::fmt::Display for SubscriptionId { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "{}", self.0) - } -} - -impl Into for SubscriptionId { - fn into(self) -> Id { - Id::Str(self.0) - } -} - -impl TryInto for Id { - type Error = Error; - - fn try_into(self) -> std::result::Result { - match self { - Id::Str(s) => Ok(SubscriptionId(s)), - Id::Num(i) => Ok(SubscriptionId(format!("{}", i))), - Id::None => Err(Error::client_internal_error( - "cannot convert an empty JSON-RPC ID into a subscription ID", - )), - } - } -} - -impl FromStr for SubscriptionId { - type Err = (); - - fn from_str(s: &str) -> std::result::Result { - Ok(Self(s.to_string())) - } -} - -impl SubscriptionId { - pub fn as_str(&self) -> &str { - self.0.as_str() - } -} - -/// Provides a mechanism for tracking [`Subscription`]s and routing [`Event`]s -/// to those subscriptions. -/// -/// [`Subscription`]: struct.Subscription.html -/// [`Event`]: ./event/struct.Event.html -#[derive(Debug)] -pub struct SubscriptionRouter { - // A map of subscription queries to collections of subscription IDs and - // their result channels. Used for publishing events relating to a specific - // query. - subscriptions: HashMap>>>, -} - -impl SubscriptionRouter { - /// Publishes the given event to all of the subscriptions to which the - /// event is relevant. At present, it matches purely based on the query - /// associated with the event, and only queries that exactly match that of - /// the event's. - pub async fn publish(&mut self, ev: Event) { - let subs_for_query = match self.subscriptions.get_mut(&ev.query) { - Some(s) => s, - None => return, - }; - // We assume here that any failure to publish an event is an indication - // that the receiver end of the channel has been dropped, which allows - // us to safely stop tracking the subscription. - let mut disconnected = Vec::::new(); - for (id, event_tx) in subs_for_query { - if event_tx.send(Ok(ev.clone())).await.is_err() { - disconnected.push(id.clone()); - } - } - // Obtain a mutable reference because the previous reference was - // consumed in the above for loop. We should panic if there are no - // longer any subscriptions for this query. - let subs_for_query = self.subscriptions.get_mut(&ev.query).unwrap(); - for id in disconnected { - subs_for_query.remove(&id); - } - } - - /// Immediately add a new subscription to the router without waiting for - /// confirmation. - pub fn add(&mut self, id: &SubscriptionId, query: String, event_tx: ChannelTx>) { - let subs_for_query = match self.subscriptions.get_mut(&query) { - Some(s) => s, - None => { - self.subscriptions.insert(query.clone(), HashMap::new()); - self.subscriptions.get_mut(&query).unwrap() - } - }; - subs_for_query.insert(id.clone(), event_tx); - } - - /// Immediately remove the subscription with the given query and ID. - pub fn remove(&mut self, id: &SubscriptionId, query: String) { - let subs_for_query = match self.subscriptions.get_mut(&query) { - Some(s) => s, - None => return, - }; - subs_for_query.remove(id); - } -} -impl Default for SubscriptionRouter { - fn default() -> Self { - Self { - subscriptions: HashMap::new(), - } + /// Return this subscription's ID for informational purposes. + pub fn id(&self) -> &str { + &self.id } -} - -#[cfg(feature = "websocket-client")] -mod two_phase_router { - use super::*; - - /// A subscription router that can manage pending subscribe and unsubscribe - /// requests, as well as their confirmation/cancellation. - /// - /// This is useful in instances where the underlying transport is complex, - /// e.g. WebSocket connections, where many messages are multiplexed on the - /// same communication line. In such cases, a response from the remote - /// endpoint immediately after a subscribe/unsubscribe request may not be - /// relevant to that request. - #[derive(Debug)] - pub struct TwoPhaseSubscriptionRouter { - // The underlying router that exclusively keeps track of confirmed and - // active subscriptions. - router: SubscriptionRouter, - // A map of JSON-RPC request IDs (for `/subscribe` requests) to pending - // subscription requests. - pending_subscribe: HashMap, - // A map of JSON-RPC request IDs (for the `/unsubscribe` requests) to pending - // unsubscribe requests. - pending_unsubscribe: HashMap, - } - - impl Default for TwoPhaseSubscriptionRouter { - fn default() -> Self { - Self { - router: SubscriptionRouter::default(), - pending_subscribe: HashMap::new(), - pending_unsubscribe: HashMap::new(), - } - } - } - - impl TwoPhaseSubscriptionRouter { - /// Publishes the given event to all of the subscriptions to which the - /// event is relevant. - pub async fn publish(&mut self, ev: Event) { - self.router.publish(ev).await - } - - /// Keep track of a pending subscription, which can either be confirmed or - /// cancelled. - /// - /// `req_id` must be a unique identifier for this particular pending - /// subscription request operation, where `subs_id` must be the unique ID - /// of the subscription we eventually want added. - pub fn pending_add( - &mut self, - req_id: &str, - subs_id: &SubscriptionId, - query: String, - event_tx: ChannelTx>, - result_tx: ChannelTx>, - ) { - self.pending_subscribe.insert( - req_id.to_string(), - PendingSubscribe { - id: subs_id.clone(), - query, - event_tx, - result_tx, - }, - ); - } - - /// Attempts to confirm the pending subscription request with the given ID. - /// - /// Returns an error if it fails to respond to the original caller to - /// indicate success. - pub async fn confirm_add(&mut self, req_id: &str) -> Result<()> { - match self.pending_subscribe.remove(req_id) { - Some(mut pending_subscribe) => { - self.router.add( - &pending_subscribe.id, - pending_subscribe.query.clone(), - pending_subscribe.event_tx, - ); - Ok(pending_subscribe.result_tx.send(Ok(())).await?) - } - None => Ok(()), - } - } - - /// Attempts to cancel the pending subscription with the given ID, sending - /// the specified error to the original creator of the attempted - /// subscription. - pub async fn cancel_add(&mut self, req_id: &str, err: impl Into) -> Result<()> { - match self.pending_subscribe.remove(req_id) { - Some(mut pending_subscribe) => Ok(pending_subscribe - .result_tx - .send(Err(err.into())) - .await - .map_err(|_| { - Error::client_internal_error(format!( - "failed to communicate result of pending subscription with ID: {}", - pending_subscribe.id, - )) - })?), - None => Ok(()), - } - } - - /// Keeps track of a pending unsubscribe request, which can either be - /// confirmed or cancelled. - pub fn pending_remove( - &mut self, - req_id: &str, - subs_id: &SubscriptionId, - query: String, - result_tx: ChannelTx>, - ) { - self.pending_unsubscribe.insert( - req_id.to_string(), - PendingUnsubscribe { - id: subs_id.clone(), - query, - result_tx, - }, - ); - } - - /// Confirm the pending unsubscribe request for the subscription with the - /// given ID. - pub async fn confirm_remove(&mut self, req_id: &str) -> Result<()> { - match self.pending_unsubscribe.remove(req_id) { - Some(mut pending_unsubscribe) => { - self.router - .remove(&pending_unsubscribe.id, pending_unsubscribe.query.clone()); - Ok(pending_unsubscribe.result_tx.send(Ok(())).await?) - } - None => Ok(()), - } - } - - /// Cancel the pending unsubscribe request for the subscription with the - /// given ID, responding with the given error. - pub async fn cancel_remove(&mut self, req_id: &str, err: impl Into) -> Result<()> { - match self.pending_unsubscribe.remove(req_id) { - Some(mut pending_unsubscribe) => { - Ok(pending_unsubscribe.result_tx.send(Err(err.into())).await?) - } - None => Ok(()), - } - } - - /// Helper to check whether the subscription with the given ID is - /// currently active. - pub fn is_active(&self, id: &SubscriptionId) -> bool { - self.router - .subscriptions - .iter() - .any(|(_query, subs_for_query)| subs_for_query.contains_key(id)) - } - - /// Obtain a mutable reference to the subscription with the given ID (if it - /// exists). - pub fn get_active_subscription_mut( - &mut self, - id: &SubscriptionId, - ) -> Option<&mut ChannelTx>> { - self.router - .subscriptions - .iter_mut() - .find(|(_query, subs_for_query)| subs_for_query.contains_key(id)) - .and_then(|(_query, subs_for_query)| subs_for_query.get_mut(id)) - } - - /// Utility method to determine the current state of the subscription with - /// the given ID. - pub fn subscription_state(&self, req_id: &str) -> Option { - if self.pending_subscribe.contains_key(req_id) { - Some(SubscriptionState::Pending) - } else if self.pending_unsubscribe.contains_key(req_id) { - Some(SubscriptionState::Cancelling) - } else if self.is_active(&SubscriptionId::from_str(req_id).unwrap()) { - Some(SubscriptionState::Active) - } else { - None - } - } - } - - #[derive(Debug)] - struct PendingSubscribe { - id: SubscriptionId, - query: String, - event_tx: ChannelTx>, - result_tx: ChannelTx>, - } - - #[derive(Debug)] - struct PendingUnsubscribe { - id: SubscriptionId, - query: String, - result_tx: ChannelTx>, - } - - /// The current state of a subscription. - #[derive(Debug, Clone, PartialEq)] - pub enum SubscriptionState { - Pending, - Active, - Cancelling, - } -} - -#[cfg(test)] -mod test { - use super::*; - use crate::client::sync::unbounded; - use crate::event::{Event, WrappedEvent}; - use std::path::PathBuf; - use tokio::fs; - use tokio::time::{self, Duration}; - - async fn read_json_fixture(name: &str) -> String { - fs::read_to_string(PathBuf::from("./tests/support/").join(name.to_owned() + ".json")) - .await - .unwrap() - } - - async fn read_event(name: &str) -> Event { - serde_json::from_str::(read_json_fixture(name).await.as_str()) - .unwrap() - .into_result() - .unwrap() - } - - async fn must_recv(ch: &mut ChannelRx, timeout_ms: u64) -> T { - let mut delay = time::delay_for(Duration::from_millis(timeout_ms)); - tokio::select! { - _ = &mut delay, if !delay.is_elapsed() => panic!("timed out waiting for recv"), - Some(v) = ch.recv() => v, - } - } - - async fn must_not_recv(ch: &mut ChannelRx, timeout_ms: u64) - where - T: std::fmt::Debug, - { - let mut delay = time::delay_for(Duration::from_millis(timeout_ms)); - tokio::select! { - _ = &mut delay, if !delay.is_elapsed() => (), - Some(v) = ch.recv() => panic!("got unexpected result from channel: {:?}", v), - } - } - - #[tokio::test] - async fn router_basic_pub_sub() { - let mut router = SubscriptionRouter::default(); - - let (subs1_id, subs2_id, subs3_id) = ( - SubscriptionId::default(), - SubscriptionId::default(), - SubscriptionId::default(), - ); - let (subs1_event_tx, mut subs1_event_rx) = unbounded(); - let (subs2_event_tx, mut subs2_event_rx) = unbounded(); - let (subs3_event_tx, mut subs3_event_rx) = unbounded(); - - // Two subscriptions with the same query - router.add(&subs1_id, "query1".into(), subs1_event_tx); - router.add(&subs2_id, "query1".into(), subs2_event_tx); - // Another subscription with a different query - router.add(&subs3_id, "query2".into(), subs3_event_tx); - - let mut ev = read_event("event_new_block_1").await; - ev.query = "query1".into(); - router.publish(ev.clone()).await; - - let subs1_ev = must_recv(&mut subs1_event_rx, 500).await.unwrap(); - let subs2_ev = must_recv(&mut subs2_event_rx, 500).await.unwrap(); - must_not_recv(&mut subs3_event_rx, 50).await; - assert_eq!(ev, subs1_ev); - assert_eq!(ev, subs2_ev); - - ev.query = "query2".into(); - router.publish(ev.clone()).await; - - must_not_recv(&mut subs1_event_rx, 50).await; - must_not_recv(&mut subs2_event_rx, 50).await; - let subs3_ev = must_recv(&mut subs3_event_rx, 500).await.unwrap(); - assert_eq!(ev, subs3_ev); - } - - #[cfg(feature = "websocket-client")] - #[tokio::test] - async fn router_pending_subscription() { - let mut router = TwoPhaseSubscriptionRouter::default(); - let subs_id = SubscriptionId::default(); - let (event_tx, mut event_rx) = unbounded(); - let (result_tx, mut result_rx) = unbounded(); - let query = "query".to_string(); - let mut ev = read_event("event_new_block_1").await; - ev.query = query.clone(); - - assert!(router.subscription_state(&subs_id.to_string()).is_none()); - router.pending_add( - subs_id.as_str(), - &subs_id, - query.clone(), - event_tx, - result_tx, - ); - assert_eq!( - SubscriptionState::Pending, - router.subscription_state(subs_id.as_str()).unwrap() - ); - router.publish(ev.clone()).await; - must_not_recv(&mut event_rx, 50).await; - - router.confirm_add(subs_id.as_str()).await.unwrap(); - assert_eq!( - SubscriptionState::Active, - router.subscription_state(subs_id.as_str()).unwrap() - ); - must_not_recv(&mut event_rx, 50).await; - let _ = must_recv(&mut result_rx, 500).await; - - router.publish(ev.clone()).await; - let received_ev = must_recv(&mut event_rx, 500).await.unwrap(); - assert_eq!(ev, received_ev); - - let (result_tx, mut result_rx) = unbounded(); - router.pending_remove(subs_id.as_str(), &subs_id, query.clone(), result_tx); - assert_eq!( - SubscriptionState::Cancelling, - router.subscription_state(subs_id.as_str()).unwrap(), - ); - - router.confirm_remove(subs_id.as_str()).await.unwrap(); - assert!(router.subscription_state(subs_id.as_str()).is_none()); - router.publish(ev.clone()).await; - if must_recv(&mut result_rx, 500).await.is_err() { - panic!("we should have received successful confirmation of the unsubscribe request") - } - } - - #[cfg(feature = "websocket-client")] - #[tokio::test] - async fn router_cancel_pending_subscription() { - let mut router = TwoPhaseSubscriptionRouter::default(); - let subs_id = SubscriptionId::default(); - let (event_tx, mut event_rx) = unbounded::>(); - let (result_tx, mut result_rx) = unbounded::>(); - let query = "query".to_string(); - let mut ev = read_event("event_new_block_1").await; - ev.query = query.clone(); - - assert!(router.subscription_state(subs_id.as_str()).is_none()); - router.pending_add(subs_id.as_str(), &subs_id, query, event_tx, result_tx); - assert_eq!( - SubscriptionState::Pending, - router.subscription_state(subs_id.as_str()).unwrap() - ); - router.publish(ev.clone()).await; - must_not_recv(&mut event_rx, 50).await; - - let cancel_error = Error::client_internal_error("cancelled"); - router - .cancel_add(subs_id.as_str(), cancel_error.clone()) - .await - .unwrap(); - assert!(router.subscription_state(subs_id.as_str()).is_none()); - assert_eq!(Err(cancel_error), must_recv(&mut result_rx, 500).await); - router.publish(ev.clone()).await; - must_not_recv(&mut event_rx, 50).await; + pub fn query(&self) -> &Query { + &self.query } } diff --git a/rpc/src/client/sync.rs b/rpc/src/client/sync.rs index e100425f7..275a9b8b8 100644 --- a/rpc/src/client/sync.rs +++ b/rpc/src/client/sync.rs @@ -27,7 +27,7 @@ pub fn unbounded() -> (ChannelTx, ChannelRx) { pub struct ChannelTx(mpsc::UnboundedSender); impl ChannelTx { - pub async fn send(&mut self, value: T) -> Result<()> { + pub fn send(&self, value: T) -> Result<()> { self.0.send(value).map_err(|e| { Error::client_internal_error(format!( "failed to send message to internal channel: {}", @@ -45,6 +45,7 @@ pub struct ChannelRx(#[pin] mpsc::UnboundedReceiver); impl ChannelRx { /// Wait indefinitely until we receive a value from the channel (or the /// channel is closed). + #[allow(dead_code)] pub async fn recv(&mut self) -> Option { self.0.recv().await } diff --git a/rpc/src/client/transport.rs b/rpc/src/client/transport.rs index edd65bc2b..3aa8c649b 100644 --- a/rpc/src/client/transport.rs +++ b/rpc/src/client/transport.rs @@ -1,24 +1,10 @@ //! Tendermint RPC client implementations for different transports. pub mod mock; +mod router; +mod utils; #[cfg(feature = "http-client")] pub mod http; #[cfg(feature = "websocket-client")] pub mod websocket; - -use crate::{Error, Result}; -use tendermint::net; - -// TODO(thane): Should we move this into a separate module? -/// Convenience method to extract the host and port associated with the given -/// address, but only if it's a TCP address (it fails otherwise). -pub fn get_tcp_host_port(address: net::Address) -> Result<(String, u16)> { - match address { - net::Address::Tcp { host, port, .. } => Ok((host, port)), - other => Err(Error::invalid_params(&format!( - "invalid RPC address: {:?}", - other - ))), - } -} diff --git a/rpc/src/client/transport/http.rs b/rpc/src/client/transport/http.rs index 98846f1e7..cfb52ae71 100644 --- a/rpc/src/client/transport/http.rs +++ b/rpc/src/client/transport/http.rs @@ -1,7 +1,7 @@ //! HTTP-based transport for Tendermint RPC Client. -use crate::client::transport::get_tcp_host_port; -use crate::{Client, Request, Response, Result}; +use crate::client::transport::utils::get_tcp_host_port; +use crate::{Client, Response, Result, SimpleRequest}; use async_trait::async_trait; use bytes::buf::ext::BufExt; use hyper::header; @@ -43,9 +43,29 @@ pub struct HttpClient { impl Client for HttpClient { async fn perform(&self, request: R) -> Result where - R: Request, + R: SimpleRequest, { - http_request(&self.host, self.port, request).await + let request_body = request.into_json(); + + let mut request = hyper::Request::builder() + .method("POST") + .uri(&format!("http://{}:{}/", self.host, self.port)) + .body(hyper::Body::from(request_body.into_bytes()))?; + + { + let headers = request.headers_mut(); + headers.insert(header::CONTENT_TYPE, "application/json".parse().unwrap()); + headers.insert( + header::USER_AGENT, + format!("tendermint.rs/{}", env!("CARGO_PKG_VERSION")) + .parse() + .unwrap(), + ); + } + let http_client = hyper::Client::builder().build_http(); + let response = http_client.request(request).await?; + let response_body = hyper::body::aggregate(response.into_body()).await?; + R::Response::from_reader(response_body.reader()) } } @@ -56,30 +76,3 @@ impl HttpClient { Ok(HttpClient { host, port }) } } - -pub async fn http_request(host: &str, port: u16, request: R) -> Result -where - R: Request, -{ - let request_body = request.into_json(); - - let mut request = hyper::Request::builder() - .method("POST") - .uri(&format!("http://{}:{}/", host, port)) - .body(hyper::Body::from(request_body.into_bytes()))?; - - { - let headers = request.headers_mut(); - headers.insert(header::CONTENT_TYPE, "application/json".parse().unwrap()); - headers.insert( - header::USER_AGENT, - format!("tendermint.rs/{}", env!("CARGO_PKG_VERSION")) - .parse() - .unwrap(), - ); - } - let http_client = hyper::Client::builder().build_http(); - let response = http_client.request(request).await?; - let response_body = hyper::body::aggregate(response.into_body()).await?; - R::Response::from_reader(response_body.reader()) -} diff --git a/rpc/src/client/transport/mock.rs b/rpc/src/client/transport/mock.rs index 775f7d2f4..04bad190e 100644 --- a/rpc/src/client/transport/mock.rs +++ b/rpc/src/client/transport/mock.rs @@ -1,9 +1,12 @@ //! Mock client implementation for use in testing. -mod subscription; -pub use subscription::MockSubscriptionClient; - -use crate::{Client, Error, Method, Request, Response, Result}; +use crate::client::subscription::SubscriptionTx; +use crate::client::sync::{unbounded, ChannelRx, ChannelTx}; +use crate::client::transport::router::SubscriptionRouter; +use crate::event::Event; +use crate::query::Query; +use crate::utils::uuid_str; +use crate::{Client, Error, Method, Request, Response, Result, Subscription, SubscriptionClient}; use async_trait::async_trait; use std::collections::HashMap; @@ -32,16 +35,21 @@ use std::collections::HashMap; /// async fn main() { /// let matcher = MockRequestMethodMatcher::default() /// .map(Method::AbciInfo, Ok(ABCI_INFO_RESPONSE.to_string())); -/// let client = MockClient::new(matcher); +/// let (client, driver) = MockClient::new(matcher); +/// let driver_hdl = tokio::spawn(async move { driver.run().await }); /// /// let abci_info = client.abci_info().await.unwrap(); /// println!("Got mock ABCI info: {:?}", abci_info); /// assert_eq!("GaiaApp".to_string(), abci_info.data); +/// +/// client.close(); +/// driver_hdl.await.unwrap(); /// } /// ``` #[derive(Debug)] pub struct MockClient { matcher: M, + driver_tx: ChannelTx, } #[async_trait] @@ -58,8 +66,117 @@ impl Client for MockClient { impl MockClient { /// Create a new mock RPC client using the given request matcher. - pub fn new(matcher: M) -> Self { - Self { matcher } + pub fn new(matcher: M) -> (Self, MockClientDriver) { + let (driver_tx, driver_rx) = unbounded(); + ( + Self { matcher, driver_tx }, + MockClientDriver::new(driver_rx), + ) + } + + /// Publishes the given event to all subscribers whose query exactly + /// matches that of the event. + pub fn publish(&self, ev: &Event) { + self.driver_tx + .send(DriverCommand::Publish(Box::new(ev.clone()))) + .unwrap(); + } + + /// Signal to the mock client's driver to terminate. + pub fn close(self) { + self.driver_tx.send(DriverCommand::Terminate).unwrap(); + } +} + +#[async_trait] +impl SubscriptionClient for MockClient { + async fn subscribe(&self, query: Query) -> Result { + let id = uuid_str(); + let (subs_tx, subs_rx) = unbounded(); + let (result_tx, mut result_rx) = unbounded(); + self.driver_tx.send(DriverCommand::Subscribe { + id: id.clone(), + query: query.clone(), + subscription_tx: subs_tx, + result_tx, + })?; + result_rx.recv().await.unwrap()?; + Ok(Subscription::new(id, query, subs_rx)) + } + + async fn unsubscribe(&self, query: Query) -> Result<()> { + let (result_tx, mut result_rx) = unbounded(); + self.driver_tx + .send(DriverCommand::Unsubscribe { query, result_tx })?; + result_rx.recv().await.unwrap() + } +} + +#[derive(Debug)] +pub enum DriverCommand { + Subscribe { + id: String, + query: Query, + subscription_tx: SubscriptionTx, + result_tx: ChannelTx>, + }, + Unsubscribe { + query: Query, + result_tx: ChannelTx>, + }, + Publish(Box), + Terminate, +} + +#[derive(Debug)] +pub struct MockClientDriver { + router: SubscriptionRouter, + rx: ChannelRx, +} + +impl MockClientDriver { + pub fn new(rx: ChannelRx) -> Self { + Self { + router: SubscriptionRouter::default(), + rx, + } + } + + pub async fn run(mut self) -> Result<()> { + loop { + tokio::select! { + Some(cmd) = self.rx.recv() => match cmd { + DriverCommand::Subscribe { id, query, subscription_tx, result_tx } => { + self.subscribe(id, query, subscription_tx, result_tx); + } + DriverCommand::Unsubscribe { query, result_tx } => { + self.unsubscribe(query, result_tx); + } + DriverCommand::Publish(event) => self.publish(event.as_ref()), + DriverCommand::Terminate => return Ok(()), + } + } + } + } + + fn subscribe( + &mut self, + id: String, + query: Query, + subscription_tx: SubscriptionTx, + result_tx: ChannelTx>, + ) { + self.router.add(id, query, subscription_tx); + result_tx.send(Ok(())).unwrap(); + } + + fn unsubscribe(&mut self, query: Query, result_tx: ChannelTx>) { + self.router.remove_by_query(query); + result_tx.send(Ok(())).unwrap(); + } + + fn publish(&mut self, event: &Event) { + self.router.publish(event); } } @@ -118,6 +235,8 @@ impl MockRequestMethodMatcher { #[cfg(test)] mod test { use super::*; + use crate::query::EventType; + use futures::StreamExt; use std::path::PathBuf; use tendermint::block::Height; use tendermint::chain::Id; @@ -129,6 +248,10 @@ mod test { .unwrap() } + async fn read_event(name: &str) -> Event { + Event::from_string(&read_json_fixture(name).await).unwrap() + } + #[tokio::test] async fn mock_client() { let abci_info_fixture = read_json_fixture("abci_info").await; @@ -136,7 +259,8 @@ mod test { let matcher = MockRequestMethodMatcher::default() .map(Method::AbciInfo, Ok(abci_info_fixture)) .map(Method::Block, Ok(block_fixture)); - let client = MockClient::new(matcher); + let (client, driver) = MockClient::new(matcher); + let driver_hdl = tokio::spawn(async move { driver.run().await }); let abci_info = client.abci_info().await.unwrap(); assert_eq!("GaiaApp".to_string(), abci_info.data); @@ -145,5 +269,45 @@ mod test { let block = client.block(Height::from(10_u32)).await.unwrap().block; assert_eq!(Height::from(10_u32), block.header.height); assert_eq!("cosmoshub-2".parse::().unwrap(), block.header.chain_id); + + client.close(); + driver_hdl.await.unwrap().unwrap(); + } + + #[tokio::test] + async fn mock_subscription_client() { + let (client, driver) = MockClient::new(MockRequestMethodMatcher::default()); + let driver_hdl = tokio::spawn(async move { driver.run().await }); + + let event1 = read_event("event_new_block_1").await; + let event2 = read_event("event_new_block_2").await; + let event3 = read_event("event_new_block_3").await; + let events = vec![event1, event2, event3]; + + let subs1 = client.subscribe(EventType::NewBlock.into()).await.unwrap(); + let subs2 = client.subscribe(EventType::NewBlock.into()).await.unwrap(); + assert_ne!(subs1.id().to_string(), subs2.id().to_string()); + + // We can do this because the underlying channels can buffer the + // messages as we publish them. + let subs1_events = subs1.take(3); + let subs2_events = subs2.take(3); + for ev in &events { + client.publish(ev); + } + + // Here each subscription's channel is drained. + let subs1_events = subs1_events.collect::>>().await; + let subs2_events = subs2_events.collect::>>().await; + + assert_eq!(3, subs1_events.len()); + assert_eq!(3, subs2_events.len()); + + for i in 0..3 { + assert!(events[i].eq(subs1_events[i].as_ref().unwrap())); + } + + client.close(); + driver_hdl.await.unwrap().unwrap(); } } diff --git a/rpc/src/client/transport/mock/subscription.rs b/rpc/src/client/transport/mock/subscription.rs deleted file mode 100644 index 8f18386b1..000000000 --- a/rpc/src/client/transport/mock/subscription.rs +++ /dev/null @@ -1,212 +0,0 @@ -//! Subscription functionality for the Tendermint RPC mock client. - -use crate::client::subscription::{SubscriptionDriverCmd, SubscriptionRouter}; -use crate::client::sync::{unbounded, ChannelRx, ChannelTx}; -use crate::event::Event; -use crate::query::Query; -use crate::{Error, Result, Subscription, SubscriptionClient, SubscriptionId}; -use async_trait::async_trait; -use tokio::task::JoinHandle; - -/// A mock client that facilitates [`Event`] subscription. -/// -/// Creating a `MockSubscriptionClient` will immediately spawn an asynchronous -/// driver task that handles routing of incoming [`Event`]s. The -/// `MockSubscriptionClient` then effectively becomes a handle to the -/// asynchronous driver. -/// -/// [`Event`]: event/struct.Event.html -#[derive(Debug)] -pub struct MockSubscriptionClient { - driver_hdl: JoinHandle>, - event_tx: ChannelTx, - cmd_tx: ChannelTx, -} - -#[async_trait] -impl SubscriptionClient for MockSubscriptionClient { - async fn subscribe(&mut self, query: Query) -> Result { - let (event_tx, event_rx) = unbounded(); - let (result_tx, mut result_rx) = unbounded(); - let id = SubscriptionId::default(); - self.send_cmd(SubscriptionDriverCmd::Subscribe { - id: id.clone(), - query: query.clone(), - event_tx, - result_tx, - }) - .await?; - result_rx.recv().await.ok_or_else(|| { - Error::client_internal_error( - "failed to receive subscription confirmation from mock client driver", - ) - })??; - - Ok(Subscription::new(id, query, event_rx, self.cmd_tx.clone())) - } -} - -impl MockSubscriptionClient { - /// Publish the given event to all subscribers whose queries match that of - /// the event. - pub async fn publish(&mut self, ev: Event) -> Result<()> { - self.event_tx.send(ev).await - } - - async fn send_cmd(&mut self, cmd: SubscriptionDriverCmd) -> Result<()> { - self.cmd_tx.send(cmd).await - } - - /// Attempt to gracefully close this client. - pub async fn close(mut self) -> Result<()> { - self.send_cmd(SubscriptionDriverCmd::Terminate).await?; - self.driver_hdl.await.map_err(|e| { - Error::client_internal_error(format!( - "failed to terminate mock client driver task: {}", - e - )) - })? - } -} - -impl Default for MockSubscriptionClient { - fn default() -> Self { - let (event_tx, event_rx) = unbounded(); - let (cmd_tx, cmd_rx) = unbounded(); - let driver = MockSubscriptionClientDriver::new(event_rx, cmd_rx); - let driver_hdl = tokio::spawn(async move { driver.run().await }); - Self { - driver_hdl, - event_tx, - cmd_tx, - } - } -} - -#[derive(Debug)] -struct MockSubscriptionClientDriver { - event_rx: ChannelRx, - cmd_rx: ChannelRx, - router: SubscriptionRouter, -} - -impl MockSubscriptionClientDriver { - fn new(event_rx: ChannelRx, cmd_rx: ChannelRx) -> Self { - Self { - event_rx, - cmd_rx, - router: SubscriptionRouter::default(), - } - } - - async fn run(mut self) -> Result<()> { - loop { - tokio::select! { - Some(ev) = self.event_rx.recv() => self.router.publish(ev).await, - Some(cmd) = self.cmd_rx.recv() => match cmd { - SubscriptionDriverCmd::Subscribe { - id, - query, - event_tx, - result_tx, - } => self.subscribe(id, query, event_tx, result_tx).await?, - SubscriptionDriverCmd::Unsubscribe { - id, - query, - result_tx, - } => self.unsubscribe(id, query, result_tx).await?, - SubscriptionDriverCmd::Terminate => return Ok(()), - }, - } - } - } - - async fn subscribe( - &mut self, - id: SubscriptionId, - query: impl ToString, - event_tx: ChannelTx>, - mut result_tx: ChannelTx>, - ) -> Result<()> { - self.router.add(&id, query.to_string(), event_tx); - result_tx.send(Ok(())).await - } - - async fn unsubscribe( - &mut self, - id: SubscriptionId, - query: impl ToString, - mut result_tx: ChannelTx>, - ) -> Result<()> { - self.router.remove(&id, query.to_string()); - result_tx.send(Ok(())).await - } -} - -#[cfg(test)] -mod test { - use super::*; - use crate::query::EventType; - use crate::Response; - use futures::StreamExt; - use std::path::PathBuf; - use tokio::fs; - - async fn read_json_fixture(name: &str) -> String { - fs::read_to_string(PathBuf::from("./tests/support/").join(name.to_owned() + ".json")) - .await - .unwrap() - } - - async fn read_event(name: &str) -> Event { - Event::from_string(&read_json_fixture(name).await).unwrap() - } - - fn take_from_subs_and_terminate( - mut subs: Subscription, - count: usize, - ) -> JoinHandle>> { - tokio::spawn(async move { - let mut res = Vec::new(); - while let Some(res_ev) = subs.next().await { - res.push(res_ev); - if res.len() >= count { - break; - } - } - subs.terminate().await.unwrap(); - res - }) - } - - #[tokio::test] - async fn mock_subscription_client() { - let mut client = MockSubscriptionClient::default(); - let event1 = read_event("event_new_block_1").await; - let event2 = read_event("event_new_block_2").await; - let event3 = read_event("event_new_block_3").await; - let events = vec![event1, event2, event3]; - - let subs1 = client.subscribe(EventType::NewBlock.into()).await.unwrap(); - let subs2 = client.subscribe(EventType::NewBlock.into()).await.unwrap(); - assert_ne!(subs1.id, subs2.id); - - let subs1_events = take_from_subs_and_terminate(subs1, 3); - let subs2_events = take_from_subs_and_terminate(subs2, 3); - for ev in &events { - client.publish(ev.clone()).await.unwrap(); - } - - let subs1_events = subs1_events.await.unwrap(); - let subs2_events = subs2_events.await.unwrap(); - - assert_eq!(3, subs1_events.len()); - assert_eq!(3, subs2_events.len()); - - for i in 0..3 { - assert!(events[i].eq(subs1_events[i].as_ref().unwrap())); - } - - client.close().await.unwrap(); - } -} diff --git a/rpc/src/client/transport/router.rs b/rpc/src/client/transport/router.rs new file mode 100644 index 000000000..f13a029cb --- /dev/null +++ b/rpc/src/client/transport/router.rs @@ -0,0 +1,179 @@ +//! Event routing for subscriptions. + +use crate::client::subscription::SubscriptionTx; +use crate::event::Event; +use std::borrow::BorrowMut; +use std::collections::{HashMap, HashSet}; +use tracing::debug; + +/// Provides a mechanism for tracking [`Subscription`]s and routing [`Event`]s +/// to those subscriptions. +/// +/// [`Subscription`]: struct.Subscription.html +/// [`Event`]: ./event/struct.Event.html +#[derive(Debug)] +pub struct SubscriptionRouter { + // A map of subscription queries to collections of subscription IDs and + // their result channels. Used for publishing events relating to a specific + // query. + subscriptions: HashMap>, +} + +impl SubscriptionRouter { + /// Publishes the given event to all of the subscriptions to which the + /// event is relevant. At present, it matches purely based on the query + /// associated with the event, and only queries that exactly match that of + /// the event's. + pub fn publish(&mut self, ev: &Event) -> PublishResult { + let subs_for_query = match self.subscriptions.get_mut(&ev.query) { + Some(s) => s, + None => return PublishResult::NoSubscribers, + }; + // We assume here that any failure to publish an event is an indication + // that the receiver end of the channel has been dropped, which allows + // us to safely stop tracking the subscription. + let mut disconnected = HashSet::new(); + for (id, event_tx) in subs_for_query.borrow_mut() { + if let Err(e) = event_tx.send(Ok(ev.clone())) { + disconnected.insert(id.clone()); + debug!( + "Automatically disconnecting subscription with ID {} for query \"{}\" due to failure to publish to it: {}", + id, ev.query, e + ); + } + } + for id in disconnected { + subs_for_query.remove(&id); + } + if subs_for_query.is_empty() { + PublishResult::AllDisconnected + } else { + PublishResult::Success + } + } + + /// Immediately add a new subscription to the router without waiting for + /// confirmation. + pub fn add(&mut self, id: impl ToString, query: impl ToString, tx: SubscriptionTx) { + let query = query.to_string(); + let subs_for_query = match self.subscriptions.get_mut(&query) { + Some(s) => s, + None => { + self.subscriptions.insert(query.clone(), HashMap::new()); + self.subscriptions.get_mut(&query).unwrap() + } + }; + subs_for_query.insert(id.to_string(), tx); + } + + /// Removes all the subscriptions relating to the given query. + pub fn remove_by_query(&mut self, query: impl ToString) -> usize { + self.subscriptions + .remove(&query.to_string()) + .map(|subs_for_query| subs_for_query.len()) + .unwrap_or(0) + } +} + +#[cfg(feature = "websocket-client")] +impl SubscriptionRouter { + /// Returns the number of active subscriptions for the given query. + pub fn num_subscriptions_for_query(&self, query: impl ToString) -> usize { + self.subscriptions + .get(&query.to_string()) + .map(|subs_for_query| subs_for_query.len()) + .unwrap_or(0) + } +} + +impl Default for SubscriptionRouter { + fn default() -> Self { + Self { + subscriptions: HashMap::new(), + } + } +} + +#[derive(Debug, Clone)] +pub enum PublishResult { + Success, + NoSubscribers, + AllDisconnected, +} + +#[cfg(test)] +mod test { + use super::*; + use crate::client::sync::{unbounded, ChannelRx}; + use crate::event::{Event, WrappedEvent}; + use crate::utils::uuid_str; + use std::path::PathBuf; + use tokio::fs; + use tokio::time::{self, Duration}; + + async fn read_json_fixture(name: &str) -> String { + fs::read_to_string(PathBuf::from("./tests/support/").join(name.to_owned() + ".json")) + .await + .unwrap() + } + + async fn read_event(name: &str) -> Event { + serde_json::from_str::(read_json_fixture(name).await.as_str()) + .unwrap() + .into_result() + .unwrap() + } + + async fn must_recv(ch: &mut ChannelRx, timeout_ms: u64) -> T { + let mut delay = time::delay_for(Duration::from_millis(timeout_ms)); + tokio::select! { + _ = &mut delay, if !delay.is_elapsed() => panic!("timed out waiting for recv"), + Some(v) = ch.recv() => v, + } + } + + async fn must_not_recv(ch: &mut ChannelRx, timeout_ms: u64) + where + T: std::fmt::Debug, + { + let mut delay = time::delay_for(Duration::from_millis(timeout_ms)); + tokio::select! { + _ = &mut delay, if !delay.is_elapsed() => (), + Some(v) = ch.recv() => panic!("got unexpected result from channel: {:?}", v), + } + } + + #[tokio::test] + async fn router_basic_pub_sub() { + let mut router = SubscriptionRouter::default(); + + let (subs1_id, subs2_id, subs3_id) = (uuid_str(), uuid_str(), uuid_str()); + let (subs1_event_tx, mut subs1_event_rx) = unbounded(); + let (subs2_event_tx, mut subs2_event_rx) = unbounded(); + let (subs3_event_tx, mut subs3_event_rx) = unbounded(); + + // Two subscriptions with the same query + router.add(subs1_id, "query1", subs1_event_tx); + router.add(subs2_id, "query1", subs2_event_tx); + // Another subscription with a different query + router.add(subs3_id, "query2", subs3_event_tx); + + let mut ev = read_event("event_new_block_1").await; + ev.query = "query1".into(); + router.publish(&ev); + + let subs1_ev = must_recv(&mut subs1_event_rx, 500).await.unwrap(); + let subs2_ev = must_recv(&mut subs2_event_rx, 500).await.unwrap(); + must_not_recv(&mut subs3_event_rx, 50).await; + assert_eq!(ev, subs1_ev); + assert_eq!(ev, subs2_ev); + + ev.query = "query2".into(); + router.publish(&ev); + + must_not_recv(&mut subs1_event_rx, 50).await; + must_not_recv(&mut subs2_event_rx, 50).await; + let subs3_ev = must_recv(&mut subs3_event_rx, 500).await.unwrap(); + assert_eq!(ev, subs3_ev); + } +} diff --git a/rpc/src/client/transport/utils.rs b/rpc/src/client/transport/utils.rs new file mode 100644 index 000000000..16419b1e0 --- /dev/null +++ b/rpc/src/client/transport/utils.rs @@ -0,0 +1,16 @@ +//! Client transport-related utilities. + +use crate::{Error, Result}; +use tendermint::net; + +/// Convenience method to extract the host and port associated with the given +/// address, but only if it's a TCP address (it fails otherwise). +pub(crate) fn get_tcp_host_port(address: net::Address) -> Result<(String, u16)> { + match address { + net::Address::Tcp { host, port, .. } => Ok((host, port)), + other => Err(Error::invalid_params(&format!( + "invalid RPC address: {:?}", + other + ))), + } +} diff --git a/rpc/src/client/transport/websocket.rs b/rpc/src/client/transport/websocket.rs index a485beca2..e8e97e469 100644 --- a/rpc/src/client/transport/websocket.rs +++ b/rpc/src/client/transport/websocket.rs @@ -1,15 +1,17 @@ //! WebSocket-based clients for accessing Tendermint RPC functionality. -use crate::client::subscription::{ - SubscriptionDriverCmd, SubscriptionState, TwoPhaseSubscriptionRouter, -}; +use crate::client::subscription::SubscriptionTx; use crate::client::sync::{unbounded, ChannelRx, ChannelTx}; -use crate::client::transport::get_tcp_host_port; +use crate::client::transport::router::{PublishResult, SubscriptionRouter}; +use crate::client::transport::utils::get_tcp_host_port; use crate::endpoint::{subscribe, unsubscribe}; use crate::event::Event; use crate::query::Query; +use crate::request::Wrapper; +use crate::utils::uuid_str; use crate::{ - request, response, Error, Response, Result, Subscription, SubscriptionClient, SubscriptionId, + response, Client, Error, Id, Request, Response, Result, SimpleRequest, Subscription, + SubscriptionClient, }; use async_trait::async_trait; use async_tungstenite::tokio::{connect_async, TokioAdapter}; @@ -20,12 +22,12 @@ use async_tungstenite::WebSocketStream; use futures::{SinkExt, StreamExt}; use serde::{Deserialize, Serialize}; use std::borrow::Cow; +use std::collections::HashMap; use std::ops::Add; -use std::str::FromStr; use tendermint::net; use tokio::net::TcpStream; use tokio::time::{Duration, Instant}; -use tracing::debug; +use tracing::{debug, error}; // WebSocket connection times out if we haven't heard anything at all from the // server in this long. @@ -40,15 +42,23 @@ const RECV_TIMEOUT: Duration = Duration::from_secs(RECV_TIMEOUT_SECONDS); // Taken from https://github.com/tendermint/tendermint/blob/309e29c245a01825fc9630103311fd04de99fa5e/rpc/jsonrpc/server/ws_handler.go#L28 const PING_INTERVAL: Duration = Duration::from_secs((RECV_TIMEOUT_SECONDS * 9) / 10); -/// Tendermint RPC client that provides [`Event`] subscription capabilities -/// over JSON-RPC over a WebSocket connection. +/// Tendermint RPC client that provides access to all RPC functionality +/// (including [`Event`] subscription) over a WebSocket connection. +/// +/// The `WebSocketClient` itself is effectively just a handle to its driver +/// (see the [`new`] method). The driver is the component of the client that +/// actually interacts with the remote RPC over the WebSocket connection. +/// The `WebSocketClient` can therefore be cloned into different asynchronous +/// contexts, effectively allowing for asynchronous access to the driver. /// -/// In order to not block the calling task, this client spawns an asynchronous -/// driver that continuously interacts with the actual WebSocket connection. -/// The `WebSocketClient` itself is effectively just a handle to this driver. -/// This driver is spawned as the client is created. +/// It is the caller's responsibility to spawn an asynchronous task in which to +/// execute the driver's [`run`] method. See the example below. /// -/// To terminate the client and the driver, simply use its [`close`] method. +/// Dropping [`Subscription`]s will automatically terminate them (the +/// `WebSocketClientDriver` detects a disconnected channel and removes the +/// subscription from its internal routing table). When all subscriptions to a +/// particular query have disconnected, the driver will automatically issue an +/// unsubscribe request to the remote RPC endpoint. /// /// ### Timeouts /// @@ -70,17 +80,23 @@ const PING_INTERVAL: Duration = Duration::from_secs((RECV_TIMEOUT_SECONDS * 9) / /// ## Examples /// /// ```rust,ignore -/// use tendermint_rpc::{WebSocketClient, SubscriptionClient}; +/// use tendermint::abci::Transaction; +/// use tendermint_rpc::{WebSocketClient, SubscriptionClient, Client}; /// use tendermint_rpc::query::EventType; /// use futures::StreamExt; /// /// #[tokio::main] /// async fn main() { -/// let (mut client, driver) = WebSocketClient::new("tcp://127.0.0.1:26657".parse().unwrap()) +/// let (client, driver) = WebSocketClient::new("tcp://127.0.0.1:26657".parse().unwrap()) /// .await /// .unwrap(); /// let driver_handle = tokio::spawn(async move { driver.run().await }); /// +/// // Standard client functionality +/// let tx = format!("some-key=some-value"); +/// client.broadcast_tx_async(Transaction::from(tx.into_bytes())).await.unwrap(); +/// +/// // Subscription functionality /// let mut subs = client.subscribe(EventType::NewBlock.into()) /// .await /// .unwrap(); @@ -93,16 +109,12 @@ const PING_INTERVAL: Duration = Duration::from_secs((RECV_TIMEOUT_SECONDS * 9) / /// println!("Got event: {:?}", ev); /// ev_count -= 1; /// if ev_count < 0 { -/// break +/// break; /// } /// } /// -/// // Sends an unsubscribe request via the WebSocket connection, but keeps -/// // the connection open. -/// subs.terminate().await.unwrap(); -/// /// // Signal to the driver to terminate. -/// client.close().await.unwrap(); +/// client.close().unwrap(); /// // Await the driver's termination to ensure proper connection closure. /// let _ = driver_handle.await.unwrap(); /// } @@ -110,10 +122,13 @@ const PING_INTERVAL: Duration = Duration::from_secs((RECV_TIMEOUT_SECONDS * 9) / /// /// [`Event`]: ./event/struct.Event.html /// [`close`]: struct.WebSocketClient.html#method.close +/// [`new`]: struct.WebSocketClient.html#method.new +/// [`run`]: struct.WebSocketClientDriver.html#method.run +/// [`Subscription`]: struct.Subscription.html /// [tendermint-websocket-ping]: https://github.com/tendermint/tendermint/blob/309e29c245a01825fc9630103311fd04de99fa5e/rpc/jsonrpc/server/ws_handler.go#L28 #[derive(Debug, Clone)] pub struct WebSocketClient { - cmd_tx: ChannelTx, + cmd_tx: ChannelTx, } impl WebSocketClient { @@ -134,44 +149,122 @@ impl WebSocketClient { Ok((Self { cmd_tx }, driver)) } - async fn send_cmd(&mut self, cmd: SubscriptionDriverCmd) -> Result<()> { - self.cmd_tx.send(cmd).await.map_err(|e| { + fn send_cmd(&self, cmd: DriverCommand) -> Result<()> { + self.cmd_tx.send(cmd).map_err(|e| { Error::client_internal_error(format!("failed to send command to client driver: {}", e)) }) } /// Signals to the driver that it must terminate. - pub async fn close(mut self) -> Result<()> { - self.send_cmd(SubscriptionDriverCmd::Terminate).await + pub fn close(self) -> Result<()> { + self.send_cmd(DriverCommand::Terminate) + } +} + +#[async_trait] +impl Client for WebSocketClient { + async fn perform(&self, request: R) -> Result + where + R: SimpleRequest, + { + let wrapper = Wrapper::new(request); + let id = wrapper.id().clone().to_string(); + let wrapped_request = wrapper.into_json(); + let (response_tx, mut response_rx) = unbounded(); + self.send_cmd(DriverCommand::SimpleRequest(SimpleRequestCommand { + id, + wrapped_request, + response_tx, + }))?; + let response = response_rx.recv().await.ok_or_else(|| { + Error::client_internal_error("failed to hear back from WebSocket driver".to_string()) + })??; + R::Response::from_string(response) } } #[async_trait] impl SubscriptionClient for WebSocketClient { - async fn subscribe(&mut self, query: Query) -> Result { - let (event_tx, event_rx) = unbounded(); - let (result_tx, mut result_rx) = unbounded::>(); - let id = SubscriptionId::default(); - self.send_cmd(SubscriptionDriverCmd::Subscribe { - id: id.clone(), - query: query.clone(), - event_tx, - result_tx, - }) - .await?; - // Wait to make sure our subscription request went through - // successfully. - result_rx.recv().await.ok_or_else(|| { + async fn subscribe(&self, query: Query) -> Result { + let (subscription_tx, subscription_rx) = unbounded(); + let (response_tx, mut response_rx) = unbounded(); + // By default we use UUIDs to differentiate subscriptions + let id = uuid_str(); + self.send_cmd(DriverCommand::Subscribe(SubscribeCommand { + id: id.to_string(), + query: query.to_string(), + subscription_tx, + response_tx, + }))?; + // Make sure our subscription request went through successfully. + let _ = response_rx.recv().await.ok_or_else(|| { + Error::client_internal_error("failed to hear back from WebSocket driver".to_string()) + })??; + Ok(Subscription::new(id, query, subscription_rx)) + } + + async fn unsubscribe(&self, query: Query) -> Result<()> { + let (response_tx, mut response_rx) = unbounded(); + self.send_cmd(DriverCommand::Unsubscribe(UnsubscribeCommand { + query: query.to_string(), + response_tx, + }))?; + let _ = response_rx.recv().await.ok_or_else(|| { Error::client_internal_error("failed to hear back from WebSocket driver".to_string()) })??; - Ok(Subscription::new(id, query, event_rx, self.cmd_tx.clone())) + Ok(()) } } +// The different types of commands that can be sent from the WebSocketClient to +// the driver. +#[derive(Debug, Clone)] +enum DriverCommand { + // Initiate a subscription request. + Subscribe(SubscribeCommand), + // Initiate an unsubscribe request. + Unsubscribe(UnsubscribeCommand), + // For non-subscription-related requests. + SimpleRequest(SimpleRequestCommand), + Terminate, +} + +#[derive(Debug, Clone)] +struct SubscribeCommand { + // The desired ID for the outgoing JSON-RPC request. + id: String, + // The query for which we want to receive events. + query: String, + // Where to send subscription events. + subscription_tx: SubscriptionTx, + // Where to send the result of the subscription request. + response_tx: ChannelTx>, +} + +#[derive(Debug, Clone)] +struct UnsubscribeCommand { + // The query from which to unsubscribe. + query: String, + // Where to send the result of the unsubscribe request. + response_tx: ChannelTx>, +} + +#[derive(Debug, Clone)] +struct SimpleRequestCommand { + // The desired ID for the outgoing JSON-RPC request. Technically we + // could extract this from the wrapped request, but that would mean + // additional unnecessary computational resources for deserialization. + id: String, + // The wrapped and serialized JSON-RPC request. + wrapped_request: String, + // Where to send the result of the simple request. + response_tx: ChannelTx>, +} + #[derive(Serialize, Deserialize, Debug, Clone)] -struct GenericJSONResponse(serde_json::Value); +struct GenericJsonResponse(serde_json::Value); -impl Response for GenericJSONResponse {} +impl Response for GenericJsonResponse {} /// Drives the WebSocket connection for a `WebSocketClient` instance. /// @@ -179,20 +272,27 @@ impl Response for GenericJSONResponse {} /// with the remote WebSocket endpoint. #[derive(Debug)] pub struct WebSocketClientDriver { + // The underlying WebSocket network connection. stream: WebSocketStream>, - router: TwoPhaseSubscriptionRouter, - cmd_rx: ChannelRx, + // Facilitates routing of events to their respective subscriptions. + router: SubscriptionRouter, + // How we receive incoming commands from the WebSocketClient. + cmd_rx: ChannelRx, + // Commands we've received but have not yet completed, indexed by their ID. + // A Terminate command is executed immediately. + pending_commands: HashMap, } impl WebSocketClientDriver { fn new( stream: WebSocketStream>, - cmd_rx: ChannelRx, + cmd_rx: ChannelRx, ) -> Self { Self { stream, - router: TwoPhaseSubscriptionRouter::default(), + router: SubscriptionRouter::default(), cmd_rx, + pending_commands: HashMap::new(), } } @@ -206,6 +306,8 @@ impl WebSocketClientDriver { tokio::select! { Some(res) = self.stream.next() => match res { Ok(msg) => { + // Reset the receive timeout every time we successfully + // receive a message from the remote endpoint. recv_timeout.reset(Instant::now().add(PING_INTERVAL)); self.handle_incoming_msg(msg).await? }, @@ -216,18 +318,10 @@ impl WebSocketClientDriver { ), }, Some(cmd) = self.cmd_rx.recv() => match cmd { - SubscriptionDriverCmd::Subscribe { - id, - query, - event_tx, - result_tx, - } => self.subscribe(id, query, event_tx, result_tx).await?, - SubscriptionDriverCmd::Unsubscribe { - id, - query, - result_tx, - } => self.unsubscribe(id, query, result_tx).await?, - SubscriptionDriverCmd::Terminate => return self.close().await, + DriverCommand::Subscribe(subs_cmd) => self.subscribe(subs_cmd).await?, + DriverCommand::Unsubscribe(unsubs_cmd) => self.unsubscribe(unsubs_cmd).await?, + DriverCommand::SimpleRequest(req_cmd) => self.simple_request(req_cmd).await?, + DriverCommand::Terminate => return self.close().await, }, _ = ping_interval.next() => self.ping().await?, _ = &mut recv_timeout => { @@ -246,124 +340,146 @@ impl WebSocketClientDriver { }) } - async fn send_request( - &mut self, - req: request::Wrapper, - result_tx: &mut ChannelTx>, - ) -> Result<()> + async fn send_request(&mut self, wrapper: Wrapper) -> Result<()> where - R: request::Request, + R: Request, { - if let Err(e) = self - .send_msg(Message::Text(serde_json::to_string_pretty(&req).unwrap())) - .await - { - let _ = result_tx.send(Err(e.clone())).await; + self.send_msg(Message::Text( + serde_json::to_string_pretty(&wrapper).unwrap(), + )) + .await + } + + async fn subscribe(&mut self, cmd: SubscribeCommand) -> Result<()> { + // If we already have an active subscription for the given query, + // there's no need to initiate another one. Just add this subscription + // to the router. + if self.router.num_subscriptions_for_query(cmd.query.clone()) > 0 { + let (id, query, subscription_tx, response_tx) = + (cmd.id, cmd.query, cmd.subscription_tx, cmd.response_tx); + self.router.add(id, query, subscription_tx); + return response_tx.send(Ok(())); + } + + // Otherwise, we need to initiate a subscription request. + let wrapper = Wrapper::new_with_id( + Id::Str(cmd.id.clone()), + subscribe::Request::new(cmd.query.clone()), + ); + if let Err(e) = self.send_request(wrapper).await { + cmd.response_tx.send(Err(e.clone()))?; return Err(e); } + self.pending_commands + .insert(cmd.id.clone(), DriverCommand::Subscribe(cmd)); Ok(()) } - async fn subscribe( - &mut self, - id: SubscriptionId, - query: impl ToString, - event_tx: ChannelTx>, - mut result_tx: ChannelTx>, - ) -> Result<()> { - let query = query.to_string(); - let req = request::Wrapper::new_with_id( - id.clone().into(), - subscribe::Request::new(query.clone()), - ); - let _ = self.send_request(req, &mut result_tx).await; - self.router - .pending_add(id.as_str(), &id, query, event_tx, result_tx); + async fn unsubscribe(&mut self, cmd: UnsubscribeCommand) -> Result<()> { + // Terminate all subscriptions for this query immediately. This + // prioritizes acknowledgement of the caller's wishes over networking + // problems. + if self.router.remove_by_query(cmd.query.clone()) == 0 { + // If there were no subscriptions for this query, respond + // immediately. + cmd.response_tx.send(Ok(()))?; + return Ok(()); + } + + // Unsubscribe requests can (and probably should) have distinct + // JSON-RPC IDs as compared to their subscription IDs. + let wrapper = Wrapper::new(unsubscribe::Request::new(cmd.query.clone())); + let req_id = wrapper.id().clone(); + if let Err(e) = self.send_request(wrapper).await { + cmd.response_tx.send(Err(e.clone()))?; + return Err(e); + } + self.pending_commands + .insert(req_id.to_string(), DriverCommand::Unsubscribe(cmd)); Ok(()) } - async fn unsubscribe( - &mut self, - id: SubscriptionId, - query: impl ToString, - mut result_tx: ChannelTx>, - ) -> Result<()> { - let query = query.to_string(); - let req = request::Wrapper::new(unsubscribe::Request::new(query.clone())); - let req_id = req.id().to_string(); - let _ = self.send_request(req, &mut result_tx).await; - self.router.pending_remove(&req_id, &id, query, result_tx); + async fn simple_request(&mut self, cmd: SimpleRequestCommand) -> Result<()> { + if let Err(e) = self + .send_msg(Message::Text(cmd.wrapped_request.clone())) + .await + { + cmd.response_tx.send(Err(e.clone()))?; + return Err(e); + } + self.pending_commands + .insert(cmd.id.clone(), DriverCommand::SimpleRequest(cmd)); Ok(()) } async fn handle_incoming_msg(&mut self, msg: Message) -> Result<()> { match msg { Message::Text(s) => self.handle_text_msg(s).await, - Message::Ping(v) => self.pong(v).await?, - _ => (), + Message::Ping(v) => self.pong(v).await, + _ => Ok(()), } - Ok(()) } - async fn handle_text_msg(&mut self, msg: String) { - match Event::from_string(&msg) { - Ok(ev) => { - self.router.publish(ev).await; - } + async fn handle_text_msg(&mut self, msg: String) -> Result<()> { + if let Ok(ev) = Event::from_string(&msg) { + self.publish_event(ev).await; + return Ok(()); + } + + let wrapper = match serde_json::from_str::>(&msg) { + Ok(w) => w, Err(e) => { - debug!("Received incoming JSON:\n{}", msg); - debug!("error {:?}", e); - if let Ok(wrapper) = - serde_json::from_str::>(&msg) - { - self.handle_generic_response(wrapper).await; - } + error!( + "Failed to deserialize incoming message as a JSON-RPC message: {}", + e + ); + debug!("JSON-RPC message: {}", msg); + return Ok(()); + } + }; + let id = wrapper.id().to_string(); + if let Some(pending_cmd) = self.pending_commands.remove(&id) { + return self.respond_to_pending_command(pending_cmd, msg).await; + }; + // We ignore incoming messages whose ID we don't recognize (could be + // relating to a fire-and-forget unsubscribe request - see the + // publish_event() method below). + Ok(()) + } + + async fn publish_event(&mut self, ev: Event) { + if let PublishResult::AllDisconnected = self.router.publish(&ev) { + debug!( + "All subscribers for query \"{}\" have disconnected. Unsubscribing from query...", + ev.query + ); + // If all subscribers have disconnected for this query, we need to + // unsubscribe from it. We issue a fire-and-forget unsubscribe + // message. + if let Err(e) = self + .send_request(Wrapper::new(unsubscribe::Request::new(ev.query.clone()))) + .await + { + error!("Failed to send unsubscribe request: {}", e); } } } - async fn handle_generic_response(&mut self, wrapper: response::Wrapper) { - let req_id = wrapper.id().to_string(); - if let Some(state) = self.router.subscription_state(&req_id) { - match wrapper.into_result() { - Ok(_) => match state { - SubscriptionState::Pending => { - let _ = self.router.confirm_add(&req_id).await; - } - SubscriptionState::Cancelling => { - let _ = self.router.confirm_remove(&req_id).await; - } - SubscriptionState::Active => { - if let Some(event_tx) = self.router.get_active_subscription_mut( - &SubscriptionId::from_str(&req_id).unwrap(), - ) { - let _ = event_tx.send( - Err(Error::websocket_error( - "failed to parse incoming response from remote WebSocket endpoint - does this client support the remote's RPC version?", - )), - ).await; - } - } - }, - Err(e) => match state { - SubscriptionState::Pending => { - let _ = self.router.cancel_add(&req_id, e).await; - } - SubscriptionState::Cancelling => { - let _ = self.router.cancel_remove(&req_id, e).await; - } - // This is important to allow the remote endpoint to - // arbitrarily send error responses back to specific - // subscriptions. - SubscriptionState::Active => { - if let Some(event_tx) = self.router.get_active_subscription_mut( - &SubscriptionId::from_str(&req_id).unwrap(), - ) { - let _ = event_tx.send(Err(e)).await; - } - } - }, + async fn respond_to_pending_command( + &mut self, + pending_cmd: DriverCommand, + response: String, + ) -> Result<()> { + match pending_cmd { + DriverCommand::Subscribe(cmd) => { + let (id, query, subscription_tx, response_tx) = + (cmd.id, cmd.query, cmd.subscription_tx, cmd.response_tx); + self.router.add(id, query, subscription_tx); + response_tx.send(Ok(())) } + DriverCommand::Unsubscribe(cmd) => cmd.response_tx.send(Ok(())), + DriverCommand::SimpleRequest(cmd) => cmd.response_tx.send(Ok(response)), + _ => Ok(()), } } @@ -395,11 +511,10 @@ impl WebSocketClientDriver { mod test { use super::*; use crate::query::EventType; - use crate::{Id, Method}; + use crate::{request, Id, Method}; use async_tungstenite::tokio::accept_async; use futures::StreamExt; use std::collections::HashMap; - use std::convert::TryInto; use std::path::PathBuf; use std::str::FromStr; use tokio::fs; @@ -435,12 +550,12 @@ mod test { } } - async fn publish_event(&mut self, ev: Event) -> Result<()> { - self.event_tx.send(ev).await + fn publish_event(&mut self, ev: Event) -> Result<()> { + self.event_tx.send(ev) } - async fn terminate(mut self) -> Result<()> { - self.terminate_tx.send(Ok(())).await.unwrap(); + async fn terminate(self) -> Result<()> { + self.terminate_tx.send(Ok(())).unwrap(); self.driver_hdl.await.unwrap() } } @@ -470,7 +585,7 @@ mod test { async fn run(mut self) -> Result<()> { loop { tokio::select! { - Some(ev) = self.event_rx.recv() => self.publish_event(ev).await, + Some(ev) = self.event_rx.recv() => self.publish_event(ev), Some(res) = self.listener.next() => self.handle_incoming(res.unwrap()).await, Some(res) = self.terminate_rx.recv() => { self.terminate().await; @@ -482,9 +597,9 @@ mod test { // Publishes the given event to all subscribers for the query relating // to the event. - async fn publish_event(&mut self, ev: Event) { + fn publish_event(&mut self, ev: Event) { for handler in &mut self.handlers { - handler.publish_event(ev.clone()).await; + handler.publish_event(ev.clone()); } } @@ -526,12 +641,12 @@ mod test { } } - async fn publish_event(&mut self, ev: Event) { - let _ = self.event_tx.send(ev).await; + fn publish_event(&mut self, ev: Event) { + let _ = self.event_tx.send(ev); } - async fn terminate(mut self) -> Result<()> { - self.terminate_tx.send(Ok(())).await?; + async fn terminate(self) -> Result<()> { + self.terminate_tx.send(Ok(()))?; self.driver_hdl.await.unwrap() } } @@ -543,7 +658,7 @@ mod test { terminate_rx: ChannelRx>, // A mapping of subscription queries to subscription IDs for this // connection. - subscriptions: HashMap, + subscriptions: HashMap, } impl TestServerHandlerDriver { @@ -582,7 +697,7 @@ mod test { Some(id) => id.clone(), None => return, }; - let _ = self.send(subs_id.into(), ev).await; + let _ = self.send(Id::Str(subs_id), ev).await; } async fn handle_incoming_msg(&mut self, msg: Message) -> Option> { @@ -614,7 +729,7 @@ mod test { self.add_subscription( req.params().query.clone(), - req.id().clone().try_into().unwrap(), + req.id().to_string(), ); self.send(req.id().clone(), subscribe::Response {}).await; } @@ -647,7 +762,7 @@ mod test { None } - fn add_subscription(&mut self, query: String, id: SubscriptionId) { + fn add_subscription(&mut self, query: String, id: String) { println!("Adding subscription with ID {} for query: {}", &id, &query); self.subscriptions.insert(query, id); } @@ -702,7 +817,7 @@ mod test { println!("Starting WebSocket server..."); let mut server = TestServer::new("127.0.0.1:0").await; println!("Creating client RPC WebSocket connection..."); - let (mut client, driver) = WebSocketClient::new(server.node_addr.clone()) + let (client, driver) = WebSocketClient::new(server.node_addr.clone()) .await .unwrap(); let driver_handle = tokio::spawn(async move { driver.run().await }); @@ -719,21 +834,19 @@ mod test { break; } } - println!("Terminating subscription..."); - subs.terminate().await.unwrap(); results }); println!("Publishing events"); // Publish the events from this context for ev in &test_events { - server.publish_event(ev.clone()).await.unwrap(); + server.publish_event(ev.clone()).unwrap(); } println!("Collecting results from subscription..."); let collected_results = subs_collector_hdl.await.unwrap(); - client.close().await.unwrap(); + client.close().unwrap(); server.terminate().await.unwrap(); let _ = driver_handle.await.unwrap(); println!("Closed client and terminated server"); diff --git a/rpc/src/endpoint/abci_info.rs b/rpc/src/endpoint/abci_info.rs index 248398d36..f28a2f98c 100644 --- a/rpc/src/endpoint/abci_info.rs +++ b/rpc/src/endpoint/abci_info.rs @@ -19,6 +19,8 @@ impl crate::Request for Request { } } +impl crate::SimpleRequest for Request {} + /// ABCI information response #[derive(Clone, Debug, Deserialize, Serialize)] pub struct Response { diff --git a/rpc/src/endpoint/abci_query.rs b/rpc/src/endpoint/abci_query.rs index 5712de146..f97eb8392 100644 --- a/rpc/src/endpoint/abci_query.rs +++ b/rpc/src/endpoint/abci_query.rs @@ -49,6 +49,8 @@ impl crate::Request for Request { } } +impl crate::SimpleRequest for Request {} + /// ABCI query response wrapper #[derive(Clone, Debug, Deserialize, Serialize)] pub struct Response { diff --git a/rpc/src/endpoint/block.rs b/rpc/src/endpoint/block.rs index 2442b0b08..9933f8ad2 100644 --- a/rpc/src/endpoint/block.rs +++ b/rpc/src/endpoint/block.rs @@ -30,6 +30,8 @@ impl crate::Request for Request { } } +impl crate::SimpleRequest for Request {} + /// Block responses #[derive(Clone, Debug, Deserialize, Serialize)] pub struct Response { diff --git a/rpc/src/endpoint/block_results.rs b/rpc/src/endpoint/block_results.rs index 7b8d066a6..5e3b70eed 100644 --- a/rpc/src/endpoint/block_results.rs +++ b/rpc/src/endpoint/block_results.rs @@ -30,6 +30,8 @@ impl crate::Request for Request { } } +impl crate::SimpleRequest for Request {} + /// ABCI result response. #[derive(Clone, Debug, Deserialize, Serialize)] pub struct Response { diff --git a/rpc/src/endpoint/blockchain.rs b/rpc/src/endpoint/blockchain.rs index a150aadb6..a334ae1f6 100644 --- a/rpc/src/endpoint/blockchain.rs +++ b/rpc/src/endpoint/blockchain.rs @@ -41,6 +41,8 @@ impl crate::Request for Request { } } +impl crate::SimpleRequest for Request {} + /// Block responses #[derive(Clone, Debug, Deserialize, Serialize)] pub struct Response { diff --git a/rpc/src/endpoint/broadcast/tx_async.rs b/rpc/src/endpoint/broadcast/tx_async.rs index b2e294a21..2cd5bd8ae 100644 --- a/rpc/src/endpoint/broadcast/tx_async.rs +++ b/rpc/src/endpoint/broadcast/tx_async.rs @@ -26,6 +26,8 @@ impl crate::Request for Request { } } +impl crate::SimpleRequest for Request {} + /// Response from either an async or sync transaction broadcast request. #[derive(Clone, Debug, Deserialize, Serialize)] pub struct Response { diff --git a/rpc/src/endpoint/broadcast/tx_commit.rs b/rpc/src/endpoint/broadcast/tx_commit.rs index f3e201c5a..3a8fe1c1c 100644 --- a/rpc/src/endpoint/broadcast/tx_commit.rs +++ b/rpc/src/endpoint/broadcast/tx_commit.rs @@ -34,6 +34,8 @@ impl crate::Request for Request { } } +impl crate::SimpleRequest for Request {} + /// Response from `/broadcast_tx_commit`. #[derive(Clone, Debug, Deserialize, Serialize)] pub struct Response { diff --git a/rpc/src/endpoint/broadcast/tx_sync.rs b/rpc/src/endpoint/broadcast/tx_sync.rs index 57120b8e9..706d9d125 100644 --- a/rpc/src/endpoint/broadcast/tx_sync.rs +++ b/rpc/src/endpoint/broadcast/tx_sync.rs @@ -26,6 +26,8 @@ impl crate::Request for Request { } } +impl crate::SimpleRequest for Request {} + /// Response from either an async or sync transaction broadcast request. #[derive(Clone, Debug, Deserialize, Serialize)] pub struct Response { diff --git a/rpc/src/endpoint/commit.rs b/rpc/src/endpoint/commit.rs index 1b8a222ee..d08fc06b9 100644 --- a/rpc/src/endpoint/commit.rs +++ b/rpc/src/endpoint/commit.rs @@ -27,6 +27,8 @@ impl crate::Request for Request { } } +impl crate::SimpleRequest for Request {} + /// Commit responses #[derive(Clone, Debug, Deserialize, Serialize)] pub struct Response { diff --git a/rpc/src/endpoint/evidence.rs b/rpc/src/endpoint/evidence.rs index 2a0390103..98bcb3158 100644 --- a/rpc/src/endpoint/evidence.rs +++ b/rpc/src/endpoint/evidence.rs @@ -1,8 +1,6 @@ //! `/broadcast_evidence`: broadcast an evidence. use crate::Method; -use crate::Request as RpcRequest; -use crate::Response as RpcResponse; use serde::{Deserialize, Serialize}; use tendermint::{abci::transaction, evidence::Evidence}; @@ -21,7 +19,7 @@ impl Request { } } -impl RpcRequest for Request { +impl crate::Request for Request { type Response = Response; fn method(&self) -> Method { @@ -29,6 +27,8 @@ impl RpcRequest for Request { } } +impl crate::SimpleRequest for Request {} + /// Response from either an evidence broadcast request. #[derive(Clone, Debug, Deserialize, Serialize)] pub struct Response { @@ -37,4 +37,4 @@ pub struct Response { pub hash: transaction::Hash, } -impl RpcResponse for Response {} +impl crate::Response for Response {} diff --git a/rpc/src/endpoint/genesis.rs b/rpc/src/endpoint/genesis.rs index a09f28f16..bbc75ab7e 100644 --- a/rpc/src/endpoint/genesis.rs +++ b/rpc/src/endpoint/genesis.rs @@ -16,6 +16,8 @@ impl crate::Request for Request { } } +impl crate::SimpleRequest for Request {} + /// Block responses #[derive(Clone, Debug, Deserialize, Serialize)] pub struct Response { diff --git a/rpc/src/endpoint/health.rs b/rpc/src/endpoint/health.rs index a4f24dc94..677dee9e1 100644 --- a/rpc/src/endpoint/health.rs +++ b/rpc/src/endpoint/health.rs @@ -14,6 +14,8 @@ impl crate::Request for Request { } } +impl crate::SimpleRequest for Request {} + /// Healthcheck responses #[derive(Clone, Debug, Deserialize, Serialize)] pub struct Response {} diff --git a/rpc/src/endpoint/net_info.rs b/rpc/src/endpoint/net_info.rs index 5be6b08cd..7efd58739 100644 --- a/rpc/src/endpoint/net_info.rs +++ b/rpc/src/endpoint/net_info.rs @@ -19,6 +19,8 @@ impl crate::Request for Request { } } +impl crate::SimpleRequest for Request {} + /// Net info responses #[derive(Clone, Debug, Deserialize, Serialize)] pub struct Response { diff --git a/rpc/src/endpoint/status.rs b/rpc/src/endpoint/status.rs index 59e4b93bd..ae0e807f8 100644 --- a/rpc/src/endpoint/status.rs +++ b/rpc/src/endpoint/status.rs @@ -16,6 +16,8 @@ impl crate::Request for Request { } } +impl crate::SimpleRequest for Request {} + /// Status responses #[derive(Clone, Debug, Deserialize, Serialize)] pub struct Response { diff --git a/rpc/src/endpoint/subscribe.rs b/rpc/src/endpoint/subscribe.rs index b2040972e..1bd84d10b 100644 --- a/rpc/src/endpoint/subscribe.rs +++ b/rpc/src/endpoint/subscribe.rs @@ -1,9 +1,13 @@ //! `/subscribe` endpoint JSON-RPC wrapper use serde::{Deserialize, Serialize}; -use std::io::Read; /// Subscription request for events. +/// +/// A subscription request is not a [`SimpleRequest`], because it does not +/// return a simple, singular response. +/// +/// [`SimpleRequest`]: ../trait.SimpleRequest.html #[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)] pub struct Request { pub query: String, @@ -29,16 +33,4 @@ impl crate::Request for Request { #[derive(Clone, Debug, Deserialize, Serialize)] pub struct Response {} -/// Subscribe does not have a meaningful response at the moment. -impl crate::Response for Response { - /// We throw away response data JSON string so swallow errors and return the empty Response - fn from_string(_response: impl AsRef<[u8]>) -> Result { - Ok(Response {}) - } - - /// We throw away responses in `subscribe` to swallow errors from the `io::Reader` and provide - /// the Response - fn from_reader(_reader: impl Read) -> Result { - Ok(Response {}) - } -} +impl crate::Response for Response {} diff --git a/rpc/src/endpoint/unsubscribe.rs b/rpc/src/endpoint/unsubscribe.rs index c496201f3..0bae0dc66 100644 --- a/rpc/src/endpoint/unsubscribe.rs +++ b/rpc/src/endpoint/unsubscribe.rs @@ -1,9 +1,8 @@ //! `/unsubscribe` endpoint JSON-RPC wrapper use serde::{Deserialize, Serialize}; -use std::io::Read; -/// Subscription request for events. +/// Request to unsubscribe from events relating to a given query. #[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)] pub struct Request { pub query: String, @@ -28,17 +27,4 @@ impl crate::Request for Request { #[derive(Clone, Debug, Deserialize, Serialize)] pub struct Response {} -/// Unsubscribe does not have a meaningful response at the moment. -impl crate::Response for Response { - /// We throw away response data JSON string so swallow errors and return - /// the empty Response - fn from_string(_response: impl AsRef<[u8]>) -> Result { - Ok(Response {}) - } - - /// We throw away responses in `unsubscribe` to swallow errors from the - /// `io::Reader` and provide the Response - fn from_reader(_reader: impl Read) -> Result { - Ok(Response {}) - } -} +impl crate::Response for Response {} diff --git a/rpc/src/endpoint/validators.rs b/rpc/src/endpoint/validators.rs index 3decf346c..f0fa70865 100644 --- a/rpc/src/endpoint/validators.rs +++ b/rpc/src/endpoint/validators.rs @@ -25,6 +25,8 @@ impl crate::Request for Request { } } +impl crate::SimpleRequest for Request {} + /// Validator responses #[derive(Clone, Debug, Deserialize, Serialize)] pub struct Response { diff --git a/rpc/src/id.rs b/rpc/src/id.rs index 8519aba96..1a21acb89 100644 --- a/rpc/src/id.rs +++ b/rpc/src/id.rs @@ -1,7 +1,8 @@ //! JSON-RPC IDs -use getrandom::getrandom; +use crate::utils::uuid_str; use serde::{Deserialize, Serialize}; +use std::fmt; /// JSON-RPC ID: request-specific identifier #[derive(Clone, Debug, Deserialize, Serialize, Eq, PartialEq, Ord, PartialOrd)] @@ -18,24 +19,16 @@ pub enum Id { impl Id { /// Create a JSON-RPC ID containing a UUID v4 (i.e. random) pub fn uuid_v4() -> Self { - let mut bytes = [0; 16]; - getrandom(&mut bytes).expect("RNG failure!"); - - let uuid = uuid::Builder::from_bytes(bytes) - .set_variant(uuid::Variant::RFC4122) - .set_version(uuid::Version::Random) - .build(); - - Id::Str(uuid.to_string()) + Self::Str(uuid_str()) } } -impl ToString for Id { - fn to_string(&self) -> String { +impl fmt::Display for Id { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match self { - Id::Num(i) => format!("{}", i), - Id::Str(s) => s.clone(), - Id::None => "none".to_string(), + Id::Num(i) => write!(f, "{}", i), + Id::Str(s) => write!(f, "{}", s), + Id::None => write!(f, ""), } } } diff --git a/rpc/src/lib.rs b/rpc/src/lib.rs index c709416f6..2b692992f 100644 --- a/rpc/src/lib.rs +++ b/rpc/src/lib.rs @@ -6,41 +6,39 @@ //! functionality and different client transports based on which features you //! select when using it. //! -//! Two features are provided at present: +//! Two features are provided at present. //! //! * `http-client` - Provides [`HttpClient`], which is a basic RPC client that //! interacts with remote Tendermint nodes via **JSON-RPC over HTTP**. This //! client does not provide [`Event`] subscription functionality. See the //! [Tendermint RPC] for more details. -//! * `websocket-client` - Provides [`WebSocketClient`], which currently only -//! provides `Event` subscription functionality over a WebSocket connection. -//! See the [`/subscribe` endpoint] in the Tendermint RPC for more details. -//! This client does not yet provide access to the RPC methods provided by -//! the [`Client`] trait (this is planned for a future release). +//! * `websocket-client` - Provides [`WebSocketClient`], which provides full +//! client functionality, including general RPC functionality (such as that +//! provided by `HttpClient`) as well as [`Event`] subscription +//! functionality. //! //! ### Mock Clients //! //! Mock clients are included when either of the `http-client` or //! `websocket-client` features are enabled to aid in testing. This includes -//! [`MockClient`], which only implements [`Client`] (no subscription -//! functionality), and [`MockSubscriptionClient`], which helps you simulate -//! subscriptions to events being generated by a Tendermint node. +//! [`MockClient`], which implements both [`Client`] and [`SubscriptionClient`] +//! traits. //! //! [`Client`]: trait.Client.html +//! [`SubscriptionClient`]: trait.SubscriptionClient.html //! [`HttpClient`]: struct.HttpClient.html //! [`Event`]: event/struct.Event.html //! [`WebSocketClient`]: struct.WebSocketClient.html //! [Tendermint RPC]: https://docs.tendermint.com/master/rpc/ //! [`/subscribe` endpoint]: https://docs.tendermint.com/master/rpc/#/Websocket/subscribe //! [`MockClient`]: struct.MockClient.html -//! [`MockSubscriptionClient`]: struct.MockSubscriptionClient.html #[cfg(any(feature = "http-client", feature = "websocket-client"))] mod client; #[cfg(any(feature = "http-client", feature = "websocket-client"))] pub use client::{ - Client, MockClient, MockRequestMatcher, MockRequestMethodMatcher, MockSubscriptionClient, - Subscription, SubscriptionClient, SubscriptionId, + Client, MockClient, MockRequestMatcher, MockRequestMethodMatcher, Subscription, + SubscriptionClient, }; #[cfg(feature = "http-client")] @@ -57,9 +55,10 @@ pub mod query; pub mod request; pub mod response; mod result; +mod utils; mod version; pub use self::{ - error::Error, id::Id, method::Method, request::Request, response::Response, result::Result, - version::Version, + error::Error, id::Id, method::Method, request::Request, request::SimpleRequest, + response::Response, result::Result, version::Version, }; diff --git a/rpc/src/request.rs b/rpc/src/request.rs index 17784e462..9aebd50c9 100644 --- a/rpc/src/request.rs +++ b/rpc/src/request.rs @@ -14,10 +14,20 @@ pub trait Request: Debug + DeserializeOwned + Serialize + Sized + Send { /// Serialize this request as JSON fn into_json(self) -> String { - serde_json::to_string_pretty(&Wrapper::new(self)).unwrap() + Wrapper::new(self).into_json() } } +/// Simple JSON-RPC requests which correlate with a single response from the +/// remote endpoint. +/// +/// An example of a request which is not simple would be the event subscription +/// request, which, on success, returns a [`Subscription`] and not just a +/// simple, singular response. +/// +/// [`Subscription`]: struct.Subscription.html +pub trait SimpleRequest: Request {} + /// JSON-RPC request wrapper (i.e. message envelope) #[derive(Debug, Deserialize, Serialize)] pub struct Wrapper { @@ -63,4 +73,8 @@ where pub fn params(&self) -> &R { &self.params } + + pub fn into_json(self) -> String { + serde_json::to_string_pretty(&self).unwrap() + } } diff --git a/rpc/src/utils.rs b/rpc/src/utils.rs new file mode 100644 index 000000000..76934a967 --- /dev/null +++ b/rpc/src/utils.rs @@ -0,0 +1,18 @@ +//! Utility methods for the Tendermint RPC crate. + +use getrandom::getrandom; + +/// Produce a string containing a UUID. +/// +/// Panics if random number generation fails. +pub fn uuid_str() -> String { + let mut bytes = [0; 16]; + getrandom(&mut bytes).expect("RNG failure!"); + + let uuid = uuid::Builder::from_bytes(bytes) + .set_variant(uuid::Variant::RFC4122) + .set_version(uuid::Version::Random) + .build(); + + uuid.to_string() +} diff --git a/tendermint/tests/integration.rs b/tendermint/tests/integration.rs index c428aa2dc..3c12585c9 100644 --- a/tendermint/tests/integration.rs +++ b/tendermint/tests/integration.rs @@ -186,7 +186,7 @@ mod rpc { #[tokio::test] #[ignore] async fn subscription_interface() { - let (mut client, driver) = WebSocketClient::new("tcp://127.0.0.1:26657".parse().unwrap()) + let (client, driver) = WebSocketClient::new("tcp://127.0.0.1:26657".parse().unwrap()) .await .unwrap(); let driver_handle = tokio::spawn(async move { driver.run().await }); @@ -203,7 +203,7 @@ mod rpc { } } - client.close().await.unwrap(); + client.close().unwrap(); let _ = driver_handle.await.unwrap(); } @@ -219,23 +219,24 @@ mod rpc { } async fn simple_transaction_subscription() { - let rpc_client = HttpClient::new("tcp://127.0.0.1:26657".parse().unwrap()).unwrap(); - let (mut subs_client, driver) = - WebSocketClient::new("tcp://127.0.0.1:26657".parse().unwrap()) - .await - .unwrap(); + let (client, driver) = WebSocketClient::new("tcp://127.0.0.1:26657".parse().unwrap()) + .await + .unwrap(); let driver_handle = tokio::spawn(async move { driver.run().await }); - let mut subs = subs_client.subscribe(EventType::Tx.into()).await.unwrap(); + let mut subs = client.subscribe(EventType::Tx.into()).await.unwrap(); // We use Id::uuid_v4() here as a quick hack to generate a random value. let mut expected_tx_values = (0..10_u32) .map(|_| Id::uuid_v4().to_string()) .collect::>(); let broadcast_tx_values = expected_tx_values.clone(); + // We can clone the WebSocket client, because it's just a handle to the + // driver. + let inner_client = client.clone(); tokio::spawn(async move { for (tx_count, val) in broadcast_tx_values.into_iter().enumerate() { let tx = format!("tx{}={}", tx_count, val); - rpc_client + inner_client .broadcast_tx_async(Transaction::from(tx.into_bytes())) .await .unwrap(); @@ -283,22 +284,17 @@ mod rpc { } } - subs_client.close().await.unwrap(); + client.close().unwrap(); let _ = driver_handle.await.unwrap(); } async fn concurrent_subscriptions() { - let rpc_client = HttpClient::new("tcp://127.0.0.1:26657".parse().unwrap()).unwrap(); - let (mut subs_client, driver) = - WebSocketClient::new("tcp://127.0.0.1:26657".parse().unwrap()) - .await - .unwrap(); - let driver_handle = tokio::spawn(async move { driver.run().await }); - let new_block_subs = subs_client - .subscribe(EventType::NewBlock.into()) + let (client, driver) = WebSocketClient::new("tcp://127.0.0.1:26657".parse().unwrap()) .await .unwrap(); - let tx_subs = subs_client.subscribe(EventType::Tx.into()).await.unwrap(); + let driver_handle = tokio::spawn(async move { driver.run().await }); + let new_block_subs = client.subscribe(EventType::NewBlock.into()).await.unwrap(); + let tx_subs = client.subscribe(EventType::Tx.into()).await.unwrap(); // We use Id::uuid_v4() here as a quick hack to generate a random value. let mut expected_tx_values = (0..10_u32) @@ -307,10 +303,11 @@ mod rpc { let broadcast_tx_values = expected_tx_values.clone(); let mut expected_new_blocks = 5_i32; + let inner_client = client.clone(); tokio::spawn(async move { for (tx_count, val) in broadcast_tx_values.into_iter().enumerate() { let tx = format!("tx{}={}", tx_count, val); - rpc_client + inner_client .broadcast_tx_async(Transaction::from(tx.into_bytes())) .await .unwrap(); @@ -348,7 +345,7 @@ mod rpc { } } - subs_client.close().await.unwrap(); + client.close().unwrap(); let _ = driver_handle.await.unwrap(); } }