diff --git a/CHANGELOG.md b/CHANGELOG.md index 9916945b2..2e3673954 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,17 @@ ## Unreleased +### BREAKING CHANGES + +* `[tendermint-rpc]` The `SubscriptionClient` trait now requires a `close` + method, since it assumes that subscription clients will, in general, use + long-running connections. This should not, however, break any downstream + usage of the clients ([#820]) +* `[tendermint-rpc]` The `HttpClient` and `WebSocketClient` constructors now + take any input that can be converted to a `tendermint_rpc::Url`. This should + hopefully have minimal impact on projects using the code, but it might + require some minor code changes in some cases - see the crate docs for more + details ([#820]) + ### FEATURES * `[tendermint-abci]` Release minimal framework for building ABCI applications @@ -10,9 +22,15 @@ method at present, exclusively provides access to block verification. This does not include network access or the Light Client's bisection algorithm ([#812]) +* `[tendermint-rpc]` Support for secure connections (`https://` and `wss://`) + has been added to the Tendermint RPC clients, as well as support for HTTP + proxies for HTTP clients ([#820]) +* `[tendermint-rpc]` A `tendermint-rpc` CLI has been added to simplify + interaction with RPC endpoints from the command line ([#820]) [#794]: https://github.com/informalsystems/tendermint-rs/pull/794 [#812]: https://github.com/informalsystems/tendermint-rs/pull/812 +[#820]: https://github.com/informalsystems/tendermint-rs/pull/820 ## v0.18.1 diff --git a/abci/Cargo.toml b/abci/Cargo.toml index 4318ad2c4..f930db86a 100644 --- a/abci/Cargo.toml +++ b/abci/Cargo.toml @@ -25,7 +25,7 @@ binary = [ "structopt", "tracing-subscriber" ] bytes = "1.0" eyre = "0.6" prost = "0.7" -tendermint-proto = { version = "0.18.0", path = "../proto" } +tendermint-proto = { version = "0.18.1", path = "../proto" } thiserror = "1.0" tracing = "0.1" diff --git a/rpc/Cargo.toml b/rpc/Cargo.toml index 4cfa8d4d5..2320291e4 100644 --- a/rpc/Cargo.toml +++ b/rpc/Cargo.toml @@ -23,16 +23,30 @@ description = """ [package.metadata.docs.rs] all-features = true +[[bin]] +name = "tendermint-rpc" +path = "src/client/bin/main.rs" +required-features = [ "cli" ] + [features] default = [] +cli = [ + "http-client", + "structopt", + "tracing-subscriber", + "websocket-client" +] http-client = [ "async-trait", "futures", "http", "hyper", + "hyper-proxy", + "hyper-rustls", "tokio/fs", "tokio/macros", - "tracing" + "tracing", + "url" ] secp256k1 = [ "tendermint/secp256k1" ] websocket-client = [ @@ -44,13 +58,18 @@ websocket-client = [ "tokio/macros", "tokio/sync", "tokio/time", - "tracing" + "tracing", + "url" ] [dependencies] bytes = "1.0" chrono = "0.4" getrandom = "0.1" +# TODO(thane): Use a released version once support for inverted patterns is released. +# See https://github.com/kevinmehall/rust-peg/pull/245 +peg = { git = "https://github.com/kevinmehall/rust-peg.git", rev = "ba6019539b2cf80289190cbb9537c94113b6b7d1" } +pin-project = "1.0.1" serde = { version = "1", features = [ "derive" ] } serde_bytes = "0.11" serde_json = "1" @@ -62,10 +81,14 @@ subtle-encoding = { version = "0.5", features = ["bech32-preview"] } walkdir = "2.3" async-trait = { version = "0.1", optional = true } -async-tungstenite = { version = "0.12", features = ["tokio-runtime"], optional = true } +async-tungstenite = { version = "0.12", features = ["tokio-runtime", "tokio-rustls"], optional = true } futures = { version = "0.3", optional = true } http = { version = "0.2", optional = true } -hyper = { version = "0.14", optional = true, features = ["client", "http1", "tcp"] } +hyper = { version = "0.14", optional = true, features = ["client", "http1", "http2", "tcp"] } +hyper-proxy = { version = "0.9", optional = true } +hyper-rustls = { version = "0.22.1", optional = true } +structopt = { version = "0.3", optional = true } tokio = { version = "1.0", optional = true } tracing = { version = "0.1", optional = true } -pin-project = "1.0.1" +tracing-subscriber = { version = "0.2", optional = true } +url = { version = "2.2", optional = true } diff --git a/rpc/README.md b/rpc/README.md index f35f68f77..50ba0584b 100644 --- a/rpc/README.md +++ b/rpc/README.md @@ -21,16 +21,72 @@ This crate optionally provides access to different types of RPC client functionality and different client transports based on which features you select when using it. -Two features are provided at present. +Several client-related features are provided at present: * `http-client` - Provides `HttpClient`, which is a basic RPC client that - interacts with remote Tendermint nodes via **JSON-RPC over HTTP**. This - client does not provide `Event` subscription functionality. See the - [Tendermint RPC] for more details. + interacts with remote Tendermint nodes via **JSON-RPC over HTTP or + HTTPS**. This client does not provide `Event` subscription + functionality. See the [Tendermint RPC] for more details. * `websocket-client` - Provides `WebSocketClient`, which provides full - client functionality, including general RPC functionality (such as that - provided by `HttpClient`) as well as `Event` subscription - functionality. + client functionality, including general RPC functionality as well as + `Event`] subscription functionality. Can be used over secure + (`wss://`) and unsecure (`ws://`) connections. + +### CLI + +A `tendermint-rpc` console application is provided for testing/experimentation +purposes. To build this application: + +```bash +# From the tendermint-rpc crate's directory +cd rpc +cargo build --bin tendermint-rpc --features cli + +# To run directly and show usage information +cargo run --bin tendermint-rpc --features cli -- --help + +# To install the binary to your Cargo binaries path +# (should be globally accessible) +cargo install --bin tendermint-rpc --features cli --path . +``` + +The application sends its logs to **stderr** and its output to **stdout**, so +it's relatively easy to capture RPC output. + +**Usage examples:** (assuming you've installed the binary) + +```bash +# Check which RPC commands/endpoints are supported. +tendermint-rpc --help + +# Query the status of the Tendermint node bound to tcp://127.0.0.1:26657 +tendermint-rpc status + +# Submit a transaction to the key/value store ABCI app via a Tendermint node +# bound to tcp://127.0.0.1:26657 +tendermint-rpc broadcast-tx-async somekey=somevalue + +# Query the value associated with key "somekey" (still assuming a key/value +# store ABCI app) +tendermint-rpc abci-query somekey + +# To use an HTTP/S proxy to access your RPC endpoint +tendermint-rpc --proxy-url http://yourproxy:8080 abci-query somekey + +# To set your HTTP/S proxy for multiple subsequent queries +export HTTP_PROXY=http://yourproxy:8080 +tendermint-rpc abci-query somekey + +# Subscribe to receive new blocks (must use the WebSocket endpoint) +# Prints out all incoming events +tendermint-rpc -u ws://127.0.0.1:26657/websocket subscribe "tm.event='NewBlock'" + +# If you want to execute a number of queries against a specific endpoint and +# don't feel like re-typing the URL over and over again, just set the +# TENDERMINT_RPC_URL environment variable +export TENDERMINT_RPC_URL=ws://127.0.0.1:26657/websocket +tendermint-rpc subscribe "tm.event='Tx'" +``` ### Mock Clients diff --git a/rpc/src/client.rs b/rpc/src/client.rs index 5ad9a1cf4..39dab71b5 100644 --- a/rpc/src/client.rs +++ b/rpc/src/client.rs @@ -8,9 +8,9 @@ mod transport; pub use transport::mock::{MockClient, MockRequestMatcher, MockRequestMethodMatcher}; #[cfg(feature = "http-client")] -pub use transport::http::HttpClient; +pub use transport::http::{HttpClient, HttpClientUrl}; #[cfg(feature = "websocket-client")] -pub use transport::websocket::{WebSocketClient, WebSocketClientDriver}; +pub use transport::websocket::{WebSocketClient, WebSocketClientDriver, WebSocketClientUrl}; use crate::endpoint::*; use crate::query::Query; @@ -104,8 +104,8 @@ pub trait Client { self.perform(broadcast::tx_sync::Request::new(tx)).await } - /// `/broadcast_tx_sync`: broadcast a transaction, returning the response - /// from `CheckTx`. + /// `/broadcast_tx_commit`: broadcast a transaction, returning the response + /// from `DeliverTx`. async fn broadcast_tx_commit(&self, tx: Transaction) -> Result { self.perform(broadcast::tx_commit::Request::new(tx)).await } diff --git a/rpc/src/client/bin/main.rs b/rpc/src/client/bin/main.rs new file mode 100644 index 000000000..ed90c5d00 --- /dev/null +++ b/rpc/src/client/bin/main.rs @@ -0,0 +1,400 @@ +//! CLI for performing simple interactions against a Tendermint node's RPC. + +use futures::StreamExt; +use std::str::FromStr; +use std::time::Duration; +use structopt::StructOpt; +use tendermint::abci::{Path, Transaction}; +use tendermint_rpc::query::Query; +use tendermint_rpc::{ + Client, Error, HttpClient, Order, Result, Scheme, Subscription, SubscriptionClient, Url, + WebSocketClient, +}; +use tracing::level_filters::LevelFilter; +use tracing::{error, info, warn}; + +/// CLI for performing simple interactions against a Tendermint node's RPC. +/// +/// Supports HTTP, HTTPS, WebSocket and secure WebSocket (wss://) URLs. +#[derive(Debug, StructOpt)] +struct Opt { + /// The URL of the Tendermint node's RPC endpoint. + #[structopt( + short, + long, + default_value = "http://127.0.0.1:26657", + env = "TENDERMINT_RPC_URL" + )] + url: Url, + + /// An optional HTTP/S proxy through which to submit requests to the + /// Tendermint node's RPC endpoint. Only available for HTTP/HTTPS endpoints + /// (i.e. WebSocket proxies are not supported). + #[structopt(long)] + proxy_url: Option, + + /// Increase output logging verbosity to DEBUG level. + #[structopt(short, long)] + verbose: bool, + + #[structopt(subcommand)] + req: Request, +} + +#[derive(Debug, StructOpt)] +enum Request { + #[structopt(flatten)] + ClientRequest(ClientRequest), + /// Subscribe to receive events produced by a specific query. + Subscribe { + /// The query against which events will be matched. + query: Query, + /// The maximum number of events to receive before terminating. + #[structopt(long)] + max_events: Option, + /// The maximum amount of time (in seconds) to listen for events before + /// terminating. + #[structopt(long)] + max_time: Option, + }, +} + +#[derive(Debug, StructOpt)] +enum ClientRequest { + /// Request information about the ABCI application. + AbciInfo, + /// Query the ABCI application. + AbciQuery { + /// The path for which you want to query, if any. + #[structopt(long)] + path: Option, + /// The data for which you want to query. + data: String, + /// The block height at which to query. + #[structopt(long)] + height: Option, + #[structopt(long)] + prove: bool, + }, + /// Get a block at a given height. + Block { height: u32 }, + /// Get block headers between two heights (min <= height <= max). + Blockchain { + /// The minimum height + min: u32, + /// The maximum height. + max: u32, + }, + /// Request the block results at a given height. + BlockResults { + /// The height of the block you want. + height: u32, + }, + // TODO(thane): Implement evidence broadcast + /// Broadcast a transaction asynchronously (without waiting for the ABCI + /// app to check it or for it to be committed). + BroadcastTxAsync { + /// The transaction to broadcast. + tx: String, + }, + /// Broadcast a transaction, waiting for it to be fully committed before + /// returning. + BroadcastTxCommit { + /// The transaction to broadcast. + tx: String, + }, + /// Broadcast a transaction synchronously (waiting for the ABCI app to + /// check it, but not for it to be committed). + BroadcastTxSync { + /// The transaction to broadcast. + tx: String, + }, + /// Get the commit for the given height. + Commit { height: u32 }, + /// Get the current consensus state. + ConsensusState, + /// Get the node's genesis data. + Genesis, + /// Get the node's health. + Health, + /// Request the latest block. + LatestBlock, + /// Request the results for the latest block. + LatestBlockResults, + /// Request the latest commit. + LatestCommit, + /// Obtain information about the P2P stack and other network connections. + NetInfo, + /// Get Tendermint status (node info, public key, latest block hash, etc.). + Status, + /// Search for transactions with their results. + TxSearch { + /// The query against which transactions should be matched. + query: Query, + #[structopt(long, default_value = "1")] + page: u32, + #[structopt(long, default_value = "10")] + per_page: u8, + #[structopt(long, default_value = "asc")] + order: Order, + #[structopt(long)] + prove: bool, + }, + /// Get the validators at the given height. + Validators { height: u32 }, +} + +#[tokio::main] +async fn main() { + let opt: Opt = Opt::from_args(); + let log_level = if opt.verbose { + LevelFilter::DEBUG + } else { + LevelFilter::INFO + }; + // All our logging goes to stderr, so our output can go to stdout + tracing_subscriber::fmt() + .with_max_level(log_level) + .with_writer(std::io::stderr) + .init(); + + let proxy_url = match get_http_proxy_url(opt.url.scheme(), opt.proxy_url.clone()) { + Ok(u) => u, + Err(e) => { + error!("Failed to obtain proxy URL: {}", e); + std::process::exit(-1); + } + }; + let result = match opt.url.scheme() { + Scheme::Http | Scheme::Https => http_request(opt.url, proxy_url, opt.req).await, + Scheme::WebSocket | Scheme::SecureWebSocket => match opt.proxy_url { + Some(_) => Err(Error::invalid_params( + "proxies are only supported for use with HTTP clients at present", + )), + None => websocket_request(opt.url, opt.req).await, + }, + }; + if let Err(e) = result { + error!("Failed: {}", e); + std::process::exit(-1); + } +} + +// Retrieve the proxy URL with precedence: +// 1. If supplied, that's the proxy URL used. +// 2. If not supplied, but environment variable HTTP_PROXY or HTTPS_PROXY are +// supplied, then use the appropriate variable for the URL in question. +fn get_http_proxy_url(url_scheme: Scheme, proxy_url: Option) -> Result> { + match proxy_url { + Some(u) => Ok(Some(u)), + None => match url_scheme { + Scheme::Http => std::env::var("HTTP_PROXY").ok(), + Scheme::Https => std::env::var("HTTPS_PROXY") + .ok() + .or_else(|| std::env::var("HTTP_PROXY").ok()), + _ => { + if std::env::var("HTTP_PROXY").is_ok() || std::env::var("HTTPS_PROXY").is_ok() { + warn!( + "Ignoring HTTP proxy environment variables for non-HTTP client connection" + ); + } + None + } + } + .map(|u| u.parse()) + .transpose(), + } +} + +async fn http_request(url: Url, proxy_url: Option, req: Request) -> Result<()> { + let client = match proxy_url { + Some(proxy_url) => { + info!( + "Using HTTP client with proxy {} to submit request to {}", + proxy_url, url + ); + HttpClient::new_with_proxy(url, proxy_url) + } + None => { + info!("Using HTTP client to submit request to: {}", url); + HttpClient::new(url) + } + }?; + + match req { + Request::ClientRequest(r) => client_request(&client, r).await, + _ => Err(Error::invalid_params("HTTP/S clients do not support subscription capabilities (please use the WebSocket client instead)")) + } +} + +async fn websocket_request(url: Url, req: Request) -> Result<()> { + info!("Using WebSocket client to submit request to: {}", url); + let (client, driver) = WebSocketClient::new(url).await?; + let driver_hdl = tokio::spawn(async move { driver.run().await }); + + let result = match req { + Request::ClientRequest(r) => client_request(&client, r).await, + Request::Subscribe { + query, + max_events, + max_time, + } => subscription_client_request(&client, query, max_events, max_time).await, + }; + + client.close()?; + driver_hdl + .await + .map_err(|e| Error::client_internal_error(e.to_string()))??; + result +} + +async fn client_request(client: &C, req: ClientRequest) -> Result<()> +where + C: Client + Sync, +{ + let result = match req { + ClientRequest::AbciInfo => serde_json::to_string_pretty(&client.abci_info().await?)?, + ClientRequest::AbciQuery { + path, + data, + height, + prove, + } => serde_json::to_string_pretty( + &client + .abci_query( + path.map(|s| Path::from_str(&s)).transpose()?, + data, + height.map(Into::into), + prove, + ) + .await?, + )?, + ClientRequest::Block { height } => { + serde_json::to_string_pretty(&client.block(height).await?)? + } + ClientRequest::Blockchain { min, max } => { + serde_json::to_string_pretty(&client.blockchain(min, max).await?)? + } + ClientRequest::BlockResults { height } => { + serde_json::to_string_pretty(&client.block_results(height).await?)? + } + ClientRequest::BroadcastTxAsync { tx } => serde_json::to_string_pretty( + &client + .broadcast_tx_async(Transaction::from(tx.into_bytes())) + .await?, + )?, + ClientRequest::BroadcastTxCommit { tx } => serde_json::to_string_pretty( + &client + .broadcast_tx_commit(Transaction::from(tx.into_bytes())) + .await?, + )?, + ClientRequest::BroadcastTxSync { tx } => serde_json::to_string_pretty( + &client + .broadcast_tx_sync(Transaction::from(tx.into_bytes())) + .await?, + )?, + ClientRequest::Commit { height } => { + serde_json::to_string_pretty(&client.commit(height).await?)? + } + ClientRequest::LatestBlock => serde_json::to_string_pretty(&client.latest_block().await?)?, + ClientRequest::LatestBlockResults => { + serde_json::to_string_pretty(&client.latest_block_results().await?)? + } + ClientRequest::LatestCommit => { + serde_json::to_string_pretty(&client.latest_commit().await?)? + } + ClientRequest::ConsensusState => { + serde_json::to_string_pretty(&client.consensus_state().await?)? + } + ClientRequest::Genesis => serde_json::to_string_pretty(&client.genesis().await?)?, + ClientRequest::Health => serde_json::to_string_pretty(&client.health().await?)?, + ClientRequest::NetInfo => serde_json::to_string_pretty(&client.net_info().await?)?, + ClientRequest::Status => serde_json::to_string_pretty(&client.status().await?)?, + ClientRequest::TxSearch { + query, + page, + per_page, + order, + prove, + } => serde_json::to_string_pretty( + &client + .tx_search(query, prove, page, per_page, order) + .await?, + )?, + ClientRequest::Validators { height } => { + serde_json::to_string_pretty(&client.validators(height).await?)? + } + }; + println!("{}", result); + Ok(()) +} + +async fn subscription_client_request( + client: &C, + query: Query, + max_events: Option, + max_time: Option, +) -> Result<()> +where + C: SubscriptionClient, +{ + info!("Creating subcription for query: {}", query); + let subs = client.subscribe(query).await?; + match max_time { + Some(secs) => recv_events_with_timeout(subs, max_events, secs).await, + None => recv_events(subs, max_events).await, + } +} + +async fn recv_events_with_timeout( + mut subs: Subscription, + max_events: Option, + timeout_secs: u32, +) -> Result<()> { + let timeout = tokio::time::sleep(Duration::from_secs(timeout_secs as u64)); + let mut event_count = 0u64; + tokio::pin!(timeout); + loop { + tokio::select! { + result_opt = subs.next() => { + let result = match result_opt { + Some(r) => r, + None => { + info!("The server terminated the subscription"); + return Ok(()); + } + }; + let event = result?; + println!("{}", serde_json::to_string_pretty(&event)?); + event_count += 1; + if let Some(me) = max_events { + if event_count >= (me as u64) { + info!("Reached maximum number of events: {}", me); + return Ok(()); + } + } + } + _ = &mut timeout => { + info!("Reached event receive timeout of {} seconds", timeout_secs); + return Ok(()) + } + } + } +} + +async fn recv_events(mut subs: Subscription, max_events: Option) -> Result<()> { + let mut event_count = 0u64; + while let Some(result) = subs.next().await { + let event = result?; + println!("{}", serde_json::to_string_pretty(&event)?); + event_count += 1; + if let Some(me) = max_events { + if event_count >= (me as u64) { + info!("Reached maximum number of events: {}", me); + return Ok(()); + } + } + } + info!("The server terminated the subscription"); + Ok(()) +} diff --git a/rpc/src/client/subscription.rs b/rpc/src/client/subscription.rs index 07fb2805c..4d69d601d 100644 --- a/rpc/src/client/subscription.rs +++ b/rpc/src/client/subscription.rs @@ -12,8 +12,6 @@ use std::pin::Pin; /// A client that exclusively provides [`Event`] subscription capabilities, /// without any other RPC method support. -/// -/// [`Event`]: event/struct.Event.html #[async_trait] pub trait SubscriptionClient { /// `/subscribe`: subscribe to receive events produced by the given query. @@ -27,10 +25,12 @@ pub trait SubscriptionClient { /// no longer have access to the individual `Subscription` instances to /// terminate them separately. /// - /// [`Subscription`]: struct.Subscription.html - /// [`Query`]: struct.Query.html /// [`select_all`]: https://docs.rs/futures/*/futures/stream/fn.select_all.html async fn unsubscribe(&self, query: Query) -> Result<()>; + + /// Subscription clients will usually have long-running underlying + /// transports that will need to be closed at some point. + fn close(self) -> Result<()>; } pub(crate) type SubscriptionTx = ChannelTx>; @@ -61,8 +61,6 @@ pub(crate) type SubscriptionRx = ChannelRx>; /// } /// } /// ``` -/// -/// [`Event`]: ./event/struct.Event.html #[pin_project] #[derive(Debug)] pub struct Subscription { diff --git a/rpc/src/client/transport.rs b/rpc/src/client/transport.rs index 3aa8c649b..aa00e6241 100644 --- a/rpc/src/client/transport.rs +++ b/rpc/src/client/transport.rs @@ -2,7 +2,6 @@ pub mod mock; mod router; -mod utils; #[cfg(feature = "http-client")] pub mod http; diff --git a/rpc/src/client/transport/http.rs b/rpc/src/client/transport/http.rs index d773dff5c..29e4b9604 100644 --- a/rpc/src/client/transport/http.rs +++ b/rpc/src/client/transport/http.rs @@ -1,19 +1,20 @@ //! HTTP-based transport for Tendermint RPC Client. +use crate::client::Client; +use crate::{Error, Result, Scheme, SimpleRequest, Url}; use async_trait::async_trait; -use hyper::body::Buf; -use hyper::header; - +use std::convert::{TryFrom, TryInto}; +use std::str::FromStr; use tendermint::net; -use crate::client::transport::utils::get_tcp_host_port; -use crate::{Client, Error, Response, Result, SimpleRequest}; -use std::io::Read; - -/// A JSON-RPC/HTTP Tendermint RPC client (implements [`Client`]). +/// A JSON-RPC/HTTP Tendermint RPC client (implements [`crate::Client`]). +/// +/// Supports both HTTP and HTTPS connections to Tendermint RPC endpoints, and +/// allows for the use of HTTP proxies (see [`HttpClient::new_with_proxy`] for +/// details). /// -/// Does not provide [`Event`] subscription facilities (see [`WebSocketClient`] -/// for a client that does provide [`Event`] subscription facilities). +/// Does not provide [`crate::event::Event`] subscription facilities (see +/// [`crate::WebSocketClient`] for a client that does). /// /// ## Examples /// @@ -22,7 +23,7 @@ use std::io::Read; /// /// #[tokio::main] /// async fn main() { -/// let client = HttpClient::new("tcp://127.0.0.1:26657".parse().unwrap()) +/// let client = HttpClient::new("http://127.0.0.1:26657") /// .unwrap(); /// /// let abci_info = client.abci_info() @@ -32,14 +33,50 @@ use std::io::Read; /// println!("Got ABCI info: {:?}", abci_info); /// } /// ``` -/// -/// [`Client`]: trait.Client.html -/// [`Event`]: ./event/struct.Event.html -/// [`WebSocketClient`]: struct.WebSocketClient.html #[derive(Debug, Clone)] pub struct HttpClient { - host: String, - port: u16, + inner: sealed::HttpClient, +} + +impl HttpClient { + /// Construct a new Tendermint RPC HTTP/S client connecting to the given + /// URL. + pub fn new(url: U) -> Result + where + U: TryInto, + { + let url = url.try_into()?; + Ok(Self { + inner: if url.0.is_secure() { + sealed::HttpClient::new_https(url.try_into()?) + } else { + sealed::HttpClient::new_http(url.try_into()?) + }, + }) + } + + /// Construct a new Tendermint RPC HTTP/S client connecting to the given + /// URL, but via the specified proxy's URL. + /// + /// If the RPC endpoint is secured (HTTPS), the proxy will automatically + /// attempt to connect using the [HTTP CONNECT] method. + /// + /// [HTTP CONNECT]: https://en.wikipedia.org/wiki/HTTP_tunnel + pub fn new_with_proxy(url: U, proxy_url: P) -> Result + where + U: TryInto, + P: TryInto, + { + let url = url.try_into()?; + let proxy_url = proxy_url.try_into()?; + Ok(Self { + inner: if proxy_url.0.is_secure() { + sealed::HttpClient::new_https_proxy(url.try_into()?, proxy_url.try_into()?)? + } else { + sealed::HttpClient::new_http_proxy(url.try_into()?, proxy_url.try_into()?)? + }, + }) + } } #[async_trait] @@ -48,41 +85,203 @@ impl Client for HttpClient { where R: SimpleRequest, { - let request_body = request.into_json(); + self.inner.perform(request).await + } +} + +/// A URL limited to use with HTTP clients. +/// +/// Facilitates useful type conversions and inferences. +#[derive(Debug, Clone)] +pub struct HttpClientUrl(Url); + +impl TryFrom for HttpClientUrl { + type Error = Error; + + fn try_from(value: Url) -> Result { + match value.scheme() { + Scheme::Http | Scheme::Https => Ok(Self(value)), + _ => Err(Error::invalid_params(&format!( + "cannot use URL {} with HTTP clients", + value + ))), + } + } +} + +impl FromStr for HttpClientUrl { + type Err = Error; + + fn from_str(s: &str) -> Result { + let url: Url = s.parse()?; + url.try_into() + } +} + +impl TryFrom<&str> for HttpClientUrl { + type Error = Error; + + fn try_from(value: &str) -> Result { + value.parse() + } +} + +impl TryFrom for HttpClientUrl { + type Error = Error; + + fn try_from(value: net::Address) -> Result { + match value { + net::Address::Tcp { + peer_id: _, + host, + port, + } => format!("http://{}:{}", host, port).parse(), + net::Address::Unix { .. } => Err(Error::invalid_params( + "only TCP-based node addresses are supported", + )), + } + } +} - let mut request = hyper::Request::builder() - .method("POST") - .uri(&format!("http://{}:{}/", self.host, self.port)) - .body(hyper::Body::from(request_body.into_bytes()))?; +impl TryFrom for hyper::Uri { + type Error = Error; + fn try_from(value: HttpClientUrl) -> Result { + Ok(value.0.to_string().parse()?) + } +} + +mod sealed { + use crate::{Error, Response, Result, SimpleRequest}; + use hyper::body::Buf; + use hyper::client::connect::Connect; + use hyper::client::HttpConnector; + use hyper::{header, Uri}; + use hyper_proxy::{Intercept, Proxy, ProxyConnector}; + use hyper_rustls::HttpsConnector; + use std::io::Read; + + /// A wrapper for a `hyper`-based client, generic over the connector type. + #[derive(Debug, Clone)] + pub struct HyperClient { + uri: Uri, + inner: hyper::Client, + } + + impl HyperClient { + pub fn new(uri: Uri, inner: hyper::Client) -> Self { + Self { uri, inner } + } + } + + impl HyperClient + where + C: Connect + Clone + Send + Sync + 'static, + { + pub async fn perform(&self, request: R) -> Result + where + R: SimpleRequest, + { + let request = self.build_request(request)?; + let response = self.inner.request(request).await?; + let response_body = response_to_string(response).await?; + tracing::debug!("Incoming response: {}", response_body); + R::Response::from_string(&response_body) + } + } + + impl HyperClient { + /// Build a request using the given Tendermint RPC request. + pub fn build_request( + &self, + request: R, + ) -> Result> { + let request_body = request.into_json(); + + let mut request = hyper::Request::builder() + .method("POST") + .uri(&self.uri) + .body(hyper::Body::from(request_body.into_bytes()))?; + + { + let headers = request.headers_mut(); + headers.insert(header::CONTENT_TYPE, "application/json".parse().unwrap()); + headers.insert( + header::USER_AGENT, + format!("tendermint.rs/{}", env!("CARGO_PKG_VERSION")) + .parse() + .unwrap(), + ); + } + + Ok(request) + } + } + + /// We offer several variations of `hyper`-based client. + /// + /// Here we erase the type signature of the underlying `hyper`-based + /// client, allowing the higher-level HTTP client to operate via HTTP or + /// HTTPS, and with or without a proxy. + #[derive(Debug, Clone)] + pub enum HttpClient { + Http(HyperClient), + Https(HyperClient>), + HttpProxy(HyperClient>), + HttpsProxy(HyperClient>>), + } + + impl HttpClient { + pub fn new_http(uri: Uri) -> Self { + Self::Http(HyperClient::new(uri, hyper::Client::new())) + } + + pub fn new_https(uri: Uri) -> Self { + Self::Https(HyperClient::new( + uri, + hyper::Client::builder().build(HttpsConnector::with_native_roots()), + )) + } + + pub fn new_http_proxy(uri: Uri, proxy_uri: Uri) -> Result { + let proxy = Proxy::new(Intercept::All, proxy_uri); + let proxy_connector = ProxyConnector::from_proxy(HttpConnector::new(), proxy)?; + Ok(Self::HttpProxy(HyperClient::new( + uri, + hyper::Client::builder().build(proxy_connector), + ))) + } + + pub fn new_https_proxy(uri: Uri, proxy_uri: Uri) -> Result { + let proxy = Proxy::new(Intercept::All, proxy_uri); + let proxy_connector = + ProxyConnector::from_proxy(HttpsConnector::with_native_roots(), proxy)?; + Ok(Self::HttpsProxy(HyperClient::new( + uri, + hyper::Client::builder().build(proxy_connector), + ))) + } + + pub async fn perform(&self, request: R) -> Result + where + R: SimpleRequest, { - let headers = request.headers_mut(); - headers.insert(header::CONTENT_TYPE, "application/json".parse().unwrap()); - headers.insert( - header::USER_AGENT, - format!("tendermint.rs/{}", env!("CARGO_PKG_VERSION")) - .parse() - .unwrap(), - ); + match self { + HttpClient::Http(c) => c.perform(request).await, + HttpClient::Https(c) => c.perform(request).await, + HttpClient::HttpProxy(c) => c.perform(request).await, + HttpClient::HttpsProxy(c) => c.perform(request).await, + } } + } - let http_client = hyper::Client::new(); - let response = http_client.request(request).await?; + async fn response_to_string(response: hyper::Response) -> Result { let mut response_body = String::new(); hyper::body::aggregate(response.into_body()) .await? .reader() .read_to_string(&mut response_body) .map_err(|_| Error::client_internal_error("failed to read response body to string"))?; - tracing::debug!("Incoming response: {}", response_body); - R::Response::from_string(&response_body) - } -} - -impl HttpClient { - /// Create a new JSON-RPC/HTTP Tendermint RPC client. - pub fn new(address: net::Address) -> Result { - let (host, port) = get_tcp_host_port(address)?; - Ok(HttpClient { host, port }) + Ok(response_body) } } diff --git a/rpc/src/client/transport/mock.rs b/rpc/src/client/transport/mock.rs index 04bad190e..9bd46fc6c 100644 --- a/rpc/src/client/transport/mock.rs +++ b/rpc/src/client/transport/mock.rs @@ -110,6 +110,10 @@ impl SubscriptionClient for MockClient { .send(DriverCommand::Unsubscribe { query, result_tx })?; result_rx.recv().await.unwrap() } + + fn close(self) -> Result<()> { + Ok(()) + } } #[derive(Debug)] diff --git a/rpc/src/client/transport/utils.rs b/rpc/src/client/transport/utils.rs deleted file mode 100644 index 16419b1e0..000000000 --- a/rpc/src/client/transport/utils.rs +++ /dev/null @@ -1,16 +0,0 @@ -//! Client transport-related utilities. - -use crate::{Error, Result}; -use tendermint::net; - -/// Convenience method to extract the host and port associated with the given -/// address, but only if it's a TCP address (it fails otherwise). -pub(crate) fn get_tcp_host_port(address: net::Address) -> Result<(String, u16)> { - match address { - net::Address::Tcp { host, port, .. } => Ok((host, port)), - other => Err(Error::invalid_params(&format!( - "invalid RPC address: {:?}", - other - ))), - } -} diff --git a/rpc/src/client/transport/websocket.rs b/rpc/src/client/transport/websocket.rs index 750aba6e5..7e728ef35 100644 --- a/rpc/src/client/transport/websocket.rs +++ b/rpc/src/client/transport/websocket.rs @@ -1,20 +1,18 @@ //! WebSocket-based clients for accessing Tendermint RPC functionality. use crate::client::subscription::SubscriptionTx; -use crate::client::sync::{unbounded, ChannelRx, ChannelTx}; +use crate::client::sync::{ChannelRx, ChannelTx}; use crate::client::transport::router::{PublishResult, SubscriptionRouter}; -use crate::client::transport::utils::get_tcp_host_port; use crate::endpoint::{subscribe, unsubscribe}; use crate::event::Event; use crate::query::Query; use crate::request::Wrapper; -use crate::utils::uuid_str; use crate::{ - response, Client, Error, Id, Request, Response, Result, SimpleRequest, Subscription, - SubscriptionClient, + response, Client, Error, Id, Request, Response, Result, Scheme, SimpleRequest, Subscription, + SubscriptionClient, Url, }; use async_trait::async_trait; -use async_tungstenite::tokio::{connect_async, TokioAdapter}; +use async_tungstenite::tokio::ConnectStream; use async_tungstenite::tungstenite::protocol::frame::coding::CloseCode; use async_tungstenite::tungstenite::protocol::CloseFrame; use async_tungstenite::tungstenite::Message; @@ -23,9 +21,10 @@ use futures::{SinkExt, StreamExt}; use serde::{Deserialize, Serialize}; use std::borrow::Cow; use std::collections::HashMap; +use std::convert::{TryFrom, TryInto}; use std::ops::Add; +use std::str::FromStr; use tendermint::net; -use tokio::net::TcpStream; use tokio::time::{Duration, Instant}; use tracing::{debug, error}; @@ -46,13 +45,13 @@ const PING_INTERVAL: Duration = Duration::from_secs((RECV_TIMEOUT_SECONDS * 9) / /// (including [`Event`] subscription) over a WebSocket connection. /// /// The `WebSocketClient` itself is effectively just a handle to its driver -/// (see the [`new`] method). The driver is the component of the client that -/// actually interacts with the remote RPC over the WebSocket connection. -/// The `WebSocketClient` can therefore be cloned into different asynchronous -/// contexts, effectively allowing for asynchronous access to the driver. +/// The driver is the component of the client that actually interacts with the +/// remote RPC over the WebSocket connection. The `WebSocketClient` can +/// therefore be cloned into different asynchronous contexts, effectively +/// allowing for asynchronous access to the driver. /// /// It is the caller's responsibility to spawn an asynchronous task in which to -/// execute the driver's [`run`] method. See the example below. +/// execute the [`WebSocketClientDriver::run`] method. See the example below. /// /// Dropping [`Subscription`]s will automatically terminate them (the /// `WebSocketClientDriver` detects a disconnected channel and removes the @@ -72,7 +71,7 @@ const PING_INTERVAL: Duration = Duration::from_secs((RECV_TIMEOUT_SECONDS * 9) / /// /// The WebSocket client implements a keep-alive mechanism whereby it sends a /// PING message to the server every 27 seconds, matching the PING cadence of -/// the Tendermint server (see [this code](tendermint-websocket-ping) for +/// the Tendermint server (see [this code][tendermint-websocket-ping] for /// details). /// /// This is not configurable at present. @@ -87,7 +86,7 @@ const PING_INTERVAL: Duration = Duration::from_secs((RECV_TIMEOUT_SECONDS * 9) / /// /// #[tokio::main] /// async fn main() { -/// let (client, driver) = WebSocketClient::new("tcp://127.0.0.1:26657".parse().unwrap()) +/// let (client, driver) = WebSocketClient::new("ws://127.0.0.1:26657/websocket") /// .await /// .unwrap(); /// let driver_handle = tokio::spawn(async move { driver.run().await }); @@ -120,100 +119,319 @@ const PING_INTERVAL: Duration = Duration::from_secs((RECV_TIMEOUT_SECONDS * 9) / /// } /// ``` /// -/// [`Event`]: ./event/struct.Event.html -/// [`close`]: struct.WebSocketClient.html#method.close -/// [`new`]: struct.WebSocketClient.html#method.new -/// [`run`]: struct.WebSocketClientDriver.html#method.run -/// [`Subscription`]: struct.Subscription.html /// [tendermint-websocket-ping]: https://github.com/tendermint/tendermint/blob/309e29c245a01825fc9630103311fd04de99fa5e/rpc/jsonrpc/server/ws_handler.go#L28 #[derive(Debug, Clone)] pub struct WebSocketClient { - cmd_tx: ChannelTx, + inner: sealed::WebSocketClient, } impl WebSocketClient { - /// Construct a WebSocket client. Immediately attempts to open a WebSocket - /// connection to the node with the given address. + /// Construct a new WebSocket-based client connecting to the given + /// Tendermint node's RPC endpoint. /// - /// On success, this returns both a client handle (a `WebSocketClient` - /// instance) as well as the WebSocket connection driver. The execution of - /// this driver becomes the responsibility of the client owner, and must be - /// executed in a separate asynchronous context to the client to ensure it - /// doesn't block the client. - pub async fn new(address: net::Address) -> Result<(Self, WebSocketClientDriver)> { - let (host, port) = get_tcp_host_port(address)?; - let (stream, _response) = - connect_async(&format!("ws://{}:{}/websocket", &host, port)).await?; - let (cmd_tx, cmd_rx) = unbounded(); - let driver = WebSocketClientDriver::new(stream, cmd_rx); - Ok((Self { cmd_tx }, driver)) - } - - fn send_cmd(&self, cmd: DriverCommand) -> Result<()> { - self.cmd_tx.send(cmd).map_err(|e| { - Error::client_internal_error(format!("failed to send command to client driver: {}", e)) - }) - } - - /// Signals to the driver that it must terminate. - pub fn close(self) -> Result<()> { - self.send_cmd(DriverCommand::Terminate) + /// Supports both `ws://` and `wss://` protocols. + pub async fn new(url: U) -> Result<(Self, WebSocketClientDriver)> + where + U: TryInto, + { + let url = url.try_into()?; + let (inner, driver) = if url.0.is_secure() { + sealed::WebSocketClient::new_secure(url.0).await? + } else { + sealed::WebSocketClient::new_unsecure(url.0).await? + }; + Ok((Self { inner }, driver)) } } #[async_trait] impl Client for WebSocketClient { - async fn perform(&self, request: R) -> Result + async fn perform(&self, request: R) -> Result<::Response> where R: SimpleRequest, { - let wrapper = Wrapper::new(request); - let id = wrapper.id().clone().to_string(); - let wrapped_request = wrapper.into_json(); - let (response_tx, mut response_rx) = unbounded(); - self.send_cmd(DriverCommand::SimpleRequest(SimpleRequestCommand { - id, - wrapped_request, - response_tx, - }))?; - let response = response_rx.recv().await.ok_or_else(|| { - Error::client_internal_error("failed to hear back from WebSocket driver".to_string()) - })??; - tracing::debug!("Incoming response: {}", response); - R::Response::from_string(response) + self.inner.perform(request).await } } #[async_trait] impl SubscriptionClient for WebSocketClient { async fn subscribe(&self, query: Query) -> Result { - let (subscription_tx, subscription_rx) = unbounded(); - let (response_tx, mut response_rx) = unbounded(); - // By default we use UUIDs to differentiate subscriptions - let id = uuid_str(); - self.send_cmd(DriverCommand::Subscribe(SubscribeCommand { - id: id.to_string(), - query: query.to_string(), - subscription_tx, - response_tx, - }))?; - // Make sure our subscription request went through successfully. - let _ = response_rx.recv().await.ok_or_else(|| { - Error::client_internal_error("failed to hear back from WebSocket driver".to_string()) - })??; - Ok(Subscription::new(id, query, subscription_rx)) + self.inner.subscribe(query).await } async fn unsubscribe(&self, query: Query) -> Result<()> { - let (response_tx, mut response_rx) = unbounded(); - self.send_cmd(DriverCommand::Unsubscribe(UnsubscribeCommand { - query: query.to_string(), - response_tx, - }))?; - let _ = response_rx.recv().await.ok_or_else(|| { - Error::client_internal_error("failed to hear back from WebSocket driver".to_string()) - })??; - Ok(()) + self.inner.unsubscribe(query).await + } + + fn close(self) -> Result<()> { + self.inner.close() + } +} + +/// A URL limited to use with WebSocket clients. +/// +/// Facilitates useful type conversions and inferences. +#[derive(Debug, Clone)] +pub struct WebSocketClientUrl(Url); + +impl TryFrom for WebSocketClientUrl { + type Error = Error; + + fn try_from(value: Url) -> Result { + match value.scheme() { + Scheme::WebSocket | Scheme::SecureWebSocket => Ok(Self(value)), + _ => Err(Error::invalid_params(&format!( + "cannot use URL {} with WebSocket clients", + value + ))), + } + } +} + +impl FromStr for WebSocketClientUrl { + type Err = Error; + + fn from_str(s: &str) -> Result { + let url: Url = s.parse()?; + url.try_into() + } +} + +impl TryFrom<&str> for WebSocketClientUrl { + type Error = Error; + + fn try_from(value: &str) -> Result { + value.parse() + } +} + +impl TryFrom for WebSocketClientUrl { + type Error = Error; + + fn try_from(value: net::Address) -> Result { + match value { + net::Address::Tcp { + peer_id: _, + host, + port, + } => format!("ws://{}:{}/websocket", host, port).parse(), + net::Address::Unix { .. } => Err(Error::invalid_params( + "only TCP-based node addresses are supported", + )), + } + } +} + +mod sealed { + use super::{ + DriverCommand, SimpleRequestCommand, SubscribeCommand, UnsubscribeCommand, + WebSocketClientDriver, + }; + use crate::client::sync::{unbounded, ChannelTx}; + use crate::query::Query; + use crate::request::Wrapper; + use crate::utils::uuid_str; + use crate::{Error, Response, Result, SimpleRequest, Subscription, Url}; + use async_tungstenite::tokio::{connect_async, connect_async_with_tls_connector}; + use tracing::debug; + + /// Marker for the [`AsyncTungsteniteClient`] for clients operating over + /// unsecure connections. + #[derive(Debug, Clone)] + pub struct Unsecure; + + /// Marker for the [`AsyncTungsteniteClient`] for clients operating over + /// secure connections. + #[derive(Debug, Clone)] + pub struct Secure; + + /// An [`async-tungstenite`]-based WebSocket client. + /// + /// Different modes of operation (secure and unsecure) are facilitated by + /// different variants of this type. + /// + /// [`async-tungstenite`]: https://crates.io/crates/async-tungstenite + #[derive(Debug, Clone)] + pub struct AsyncTungsteniteClient { + cmd_tx: ChannelTx, + _client_type: std::marker::PhantomData, + } + + impl AsyncTungsteniteClient { + /// Construct a WebSocket client. Immediately attempts to open a WebSocket + /// connection to the node with the given address. + /// + /// On success, this returns both a client handle (a `WebSocketClient` + /// instance) as well as the WebSocket connection driver. The execution of + /// this driver becomes the responsibility of the client owner, and must be + /// executed in a separate asynchronous context to the client to ensure it + /// doesn't block the client. + pub async fn new(url: Url) -> Result<(Self, WebSocketClientDriver)> { + let url = url.to_string(); + debug!("Connecting to unsecure WebSocket endpoint: {}", url); + let (stream, _response) = connect_async(url).await?; + let (cmd_tx, cmd_rx) = unbounded(); + let driver = WebSocketClientDriver::new(stream, cmd_rx); + Ok(( + Self { + cmd_tx, + _client_type: Default::default(), + }, + driver, + )) + } + } + + impl AsyncTungsteniteClient { + /// Construct a WebSocket client. Immediately attempts to open a WebSocket + /// connection to the node with the given address, but over a secure + /// connection. + /// + /// On success, this returns both a client handle (a `WebSocketClient` + /// instance) as well as the WebSocket connection driver. The execution of + /// this driver becomes the responsibility of the client owner, and must be + /// executed in a separate asynchronous context to the client to ensure it + /// doesn't block the client. + pub async fn new(url: Url) -> Result<(Self, WebSocketClientDriver)> { + let url = url.to_string(); + debug!("Connecting to secure WebSocket endpoint: {}", url); + // Not supplying a connector means async_tungstenite will create the + // connector for us. + let (stream, _response) = connect_async_with_tls_connector(url, None).await?; + let (cmd_tx, cmd_rx) = unbounded(); + let driver = WebSocketClientDriver::new(stream, cmd_rx); + Ok(( + Self { + cmd_tx, + _client_type: Default::default(), + }, + driver, + )) + } + } + + impl AsyncTungsteniteClient { + fn send_cmd(&self, cmd: DriverCommand) -> Result<()> { + self.cmd_tx.send(cmd).map_err(|e| { + Error::client_internal_error(format!( + "failed to send command to client driver: {}", + e + )) + }) + } + + pub async fn perform(&self, request: R) -> Result + where + R: SimpleRequest, + { + let wrapper = Wrapper::new(request); + let id = wrapper.id().clone().to_string(); + let wrapped_request = wrapper.into_json(); + let (response_tx, mut response_rx) = unbounded(); + self.send_cmd(DriverCommand::SimpleRequest(SimpleRequestCommand { + id, + wrapped_request, + response_tx, + }))?; + let response = response_rx.recv().await.ok_or_else(|| { + Error::client_internal_error( + "failed to hear back from WebSocket driver".to_string(), + ) + })??; + tracing::debug!("Incoming response: {}", response); + R::Response::from_string(response) + } + + pub async fn subscribe(&self, query: Query) -> Result { + let (subscription_tx, subscription_rx) = unbounded(); + let (response_tx, mut response_rx) = unbounded(); + // By default we use UUIDs to differentiate subscriptions + let id = uuid_str(); + self.send_cmd(DriverCommand::Subscribe(SubscribeCommand { + id: id.to_string(), + query: query.to_string(), + subscription_tx, + response_tx, + }))?; + // Make sure our subscription request went through successfully. + let _ = response_rx.recv().await.ok_or_else(|| { + Error::client_internal_error( + "failed to hear back from WebSocket driver".to_string(), + ) + })??; + Ok(Subscription::new(id, query, subscription_rx)) + } + + pub async fn unsubscribe(&self, query: Query) -> Result<()> { + let (response_tx, mut response_rx) = unbounded(); + self.send_cmd(DriverCommand::Unsubscribe(UnsubscribeCommand { + query: query.to_string(), + response_tx, + }))?; + let _ = response_rx.recv().await.ok_or_else(|| { + Error::client_internal_error( + "failed to hear back from WebSocket driver".to_string(), + ) + })??; + Ok(()) + } + + /// Signals to the driver that it must terminate. + pub fn close(self) -> Result<()> { + self.send_cmd(DriverCommand::Terminate) + } + } + + /// Allows us to erase the type signatures associated with the different + /// WebSocket client variants. + #[derive(Debug, Clone)] + pub enum WebSocketClient { + Unsecure(AsyncTungsteniteClient), + Secure(AsyncTungsteniteClient), + } + + impl WebSocketClient { + pub async fn new_unsecure(url: Url) -> Result<(Self, WebSocketClientDriver)> { + let (client, driver) = AsyncTungsteniteClient::::new(url).await?; + Ok((Self::Unsecure(client), driver)) + } + + pub async fn new_secure(url: Url) -> Result<(Self, WebSocketClientDriver)> { + let (client, driver) = AsyncTungsteniteClient::::new(url).await?; + Ok((Self::Secure(client), driver)) + } + + pub async fn perform(&self, request: R) -> Result + where + R: SimpleRequest, + { + match self { + WebSocketClient::Unsecure(c) => c.perform(request).await, + WebSocketClient::Secure(c) => c.perform(request).await, + } + } + + pub async fn subscribe(&self, query: Query) -> Result { + match self { + WebSocketClient::Unsecure(c) => c.subscribe(query).await, + WebSocketClient::Secure(c) => c.subscribe(query).await, + } + } + + pub async fn unsubscribe(&self, query: Query) -> Result<()> { + match self { + WebSocketClient::Unsecure(c) => c.unsubscribe(query).await, + WebSocketClient::Secure(c) => c.unsubscribe(query).await, + } + } + + pub fn close(self) -> Result<()> { + match self { + WebSocketClient::Unsecure(c) => c.close(), + WebSocketClient::Secure(c) => c.close(), + } + } } } @@ -271,10 +489,9 @@ impl Response for GenericJsonResponse {} /// /// This is the primary component responsible for transport-level interaction /// with the remote WebSocket endpoint. -#[derive(Debug)] pub struct WebSocketClientDriver { // The underlying WebSocket network connection. - stream: WebSocketStream>, + stream: WebSocketStream, // Facilitates routing of events to their respective subscriptions. router: SubscriptionRouter, // How we receive incoming commands from the WebSocketClient. @@ -285,10 +502,7 @@ pub struct WebSocketClientDriver { } impl WebSocketClientDriver { - fn new( - stream: WebSocketStream>, - cmd_rx: ChannelRx, - ) -> Self { + fn new(stream: WebSocketStream, cmd_rx: ChannelRx) -> Self { Self { stream, router: SubscriptionRouter::default(), @@ -514,15 +728,17 @@ impl WebSocketClientDriver { #[cfg(test)] mod test { use super::*; + use crate::client::sync::unbounded; use crate::query::EventType; use crate::{request, Id, Method}; - use async_tungstenite::tokio::accept_async; + use async_tungstenite::tokio::{accept_async, TokioAdapter}; use futures::StreamExt; use std::collections::HashMap; use std::path::PathBuf; use std::str::FromStr; + use tendermint::net; use tokio::fs; - use tokio::net::TcpListener; + use tokio::net::{TcpListener, TcpStream}; use tokio::task::JoinHandle; // Interface to a driver that manages all incoming WebSocket connections. diff --git a/rpc/src/error.rs b/rpc/src/error.rs index 18d8e8725..83d23a9db 100644 --- a/rpc/src/error.rs +++ b/rpc/src/error.rs @@ -109,6 +109,19 @@ impl Display for Error { } } +impl From for Error { + fn from(e: std::io::Error) -> Self { + Error::client_internal_error(e.to_string()) + } +} + +#[cfg(any(feature = "http-client", feature = "websocket-client"))] +impl From for Error { + fn from(e: url::ParseError) -> Self { + Error::invalid_params(&e.to_string()) + } +} + #[cfg(feature = "http-client")] impl From for Error { fn from(http_error: http::Error) -> Error { @@ -123,6 +136,13 @@ impl From for Error { } } +#[cfg(feature = "http-client")] +impl From for Error { + fn from(e: http::uri::InvalidUri) -> Self { + Error::http_error(e.to_string()) + } +} + #[cfg(feature = "websocket-client")] impl From for Error { fn from(websocket_error: WSError) -> Error { @@ -130,6 +150,20 @@ impl From for Error { } } +#[cfg(feature = "cli")] +impl From for Error { + fn from(e: serde_json::Error) -> Self { + Error::client_internal_error(e.to_string()) + } +} + +#[cfg(feature = "cli")] +impl From for Error { + fn from(e: tendermint::Error) -> Self { + Error::client_internal_error(e.to_string()) + } +} + /// Tendermint RPC error codes. /// /// See `func RPC*Error()` definitions in: diff --git a/rpc/src/lib.rs b/rpc/src/lib.rs index eff517e69..c53ae8df6 100644 --- a/rpc/src/lib.rs +++ b/rpc/src/lib.rs @@ -6,16 +6,16 @@ //! functionality and different client transports based on which features you //! select when using it. //! -//! Two features are provided at present: +//! Several client-related features are provided at present: //! //! * `http-client` - Provides [`HttpClient`], which is a basic RPC client that -//! interacts with remote Tendermint nodes via **JSON-RPC over HTTP**. This -//! client does not provide [`Event`] subscription functionality. See the -//! [Tendermint RPC] for more details. +//! interacts with remote Tendermint nodes via **JSON-RPC over HTTP or +//! HTTPS**. This client does not provide [`event::Event`] subscription +//! functionality. See the [Tendermint RPC] for more details. //! * `websocket-client` - Provides [`WebSocketClient`], which provides full -//! client functionality, including general RPC functionality (such as that -//! provided by `HttpClient`) as well as [`Event`] subscription -//! functionality. +//! client functionality, including general RPC functionality as well as +//! [`event::Event`] subscription functionality. Can be used over secure +//! (`wss://`) and unsecure (`ws://`) connections. //! //! ### Mock Clients //! @@ -24,27 +24,25 @@ //! [`MockClient`], which implements both [`Client`] and [`SubscriptionClient`] //! traits. //! -//! [`Client`]: trait.Client.html -//! [`SubscriptionClient`]: trait.SubscriptionClient.html -//! [`HttpClient`]: struct.HttpClient.html -//! [`Event`]: event/struct.Event.html -//! [`WebSocketClient`]: struct.WebSocketClient.html //! [Tendermint RPC]: https://docs.tendermint.com/master/rpc/ //! [`/subscribe` endpoint]: https://docs.tendermint.com/master/rpc/#/Websocket/subscribe -//! [`MockClient`]: struct.MockClient.html #[cfg(any(feature = "http-client", feature = "websocket-client"))] mod client; #[cfg(any(feature = "http-client", feature = "websocket-client"))] +mod rpc_url; +#[cfg(any(feature = "http-client", feature = "websocket-client"))] pub use client::{ Client, MockClient, MockRequestMatcher, MockRequestMethodMatcher, Subscription, SubscriptionClient, }; +#[cfg(any(feature = "http-client", feature = "websocket-client"))] +pub use rpc_url::{Scheme, Url}; #[cfg(feature = "http-client")] -pub use client::HttpClient; +pub use client::{HttpClient, HttpClientUrl}; #[cfg(feature = "websocket-client")] -pub use client::{WebSocketClient, WebSocketClientDriver}; +pub use client::{WebSocketClient, WebSocketClientDriver, WebSocketClientUrl}; pub mod endpoint; pub mod error; diff --git a/rpc/src/order.rs b/rpc/src/order.rs index b9294464e..79ff77142 100644 --- a/rpc/src/order.rs +++ b/rpc/src/order.rs @@ -1,6 +1,8 @@ //! Ordering of paginated RPC responses. +use crate::Error; use serde::{Deserialize, Serialize}; +use std::str::FromStr; /// Ordering of paginated RPC responses. #[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)] @@ -13,3 +15,18 @@ pub enum Order { #[serde(rename = "desc")] Descending, } + +impl FromStr for Order { + type Err = Error; + + fn from_str(s: &str) -> Result { + match s { + "asc" => Ok(Self::Ascending), + "desc" => Ok(Self::Descending), + _ => Err(Error::invalid_params(&format!( + "invalid order type: {} (must be \"asc\" or \"desc\")", + s + ))), + } + } +} diff --git a/rpc/src/query.rs b/rpc/src/query.rs index 8afcd94ba..61d34b0bc 100644 --- a/rpc/src/query.rs +++ b/rpc/src/query.rs @@ -4,9 +4,13 @@ //! //! [`Query`]: struct.Query.html -use std::fmt; +// TODO(thane): These warnings are generated by the PEG for some reason. Try to fix and remove. +#![allow(clippy::redundant_closure_call, clippy::unit_arg)] -use chrono::{Date, DateTime, FixedOffset, Utc}; +use crate::{Error, Result}; +use chrono::{Date, DateTime, FixedOffset, NaiveDate, Utc}; +use std::fmt; +use std::str::FromStr; /// A structured query for use in interacting with the Tendermint RPC event /// subscription system. @@ -17,6 +21,8 @@ use chrono::{Date, DateTime, FixedOffset, Utc}; /// /// ## Examples /// +/// ### Direct construction of queries +/// /// ```rust /// use tendermint_rpc::query::{Query, EventType}; /// @@ -26,10 +32,25 @@ use chrono::{Date, DateTime, FixedOffset, Utc}; /// let query = Query::from(EventType::Tx).and_eq("tx.hash", "XYZ"); /// assert_eq!("tm.event = 'Tx' AND tx.hash = 'XYZ'", query.to_string()); /// -/// let query = Query::from(EventType::Tx).and_gte("tx.height", 100_i64); +/// let query = Query::from(EventType::Tx).and_gte("tx.height", 100_u64); /// assert_eq!("tm.event = 'Tx' AND tx.height >= 100", query.to_string()); /// ``` /// +/// ### Query parsing +/// +/// ```rust +/// use tendermint_rpc::query::{Query, EventType}; +/// +/// let query: Query = "tm.event = 'NewBlock'".parse().unwrap(); +/// assert_eq!(query, Query::from(EventType::NewBlock)); +/// +/// let query: Query = "tm.event = 'Tx' AND tx.hash = 'XYZ'".parse().unwrap(); +/// assert_eq!(query, Query::from(EventType::Tx).and_eq("tx.hash", "XYZ")); +/// +/// let query: Query = "tm.event = 'Tx' AND tx.height >= 100".parse().unwrap(); +/// assert_eq!(query, Query::from(EventType::Tx).and_gte("tx.height", 100_u64)); +/// ``` +/// /// [subscribe endpoint documentation]: https://docs.tendermint.com/master/rpc/#/Websocket/subscribe #[derive(Debug, Clone, PartialEq)] pub struct Query { @@ -185,6 +206,172 @@ impl fmt::Display for Query { } } +peg::parser! { + grammar query_parser() for str { + // Some or no whitespace. + rule _() = quiet!{[' ']*} + + // At least some whitespace. + rule __() = quiet!{[' ']+} + + rule string() -> &'input str + = "'" s:$([^'\'']*) "'" { s } + + rule unsigned() -> u64 + = s:$(['0'..='9']+) {? + u64::from_str(s) + .map_err(|_| "failed to parse as an unsigned integer") + } + + rule signed() -> i64 + = s:$("-" ['1'..='9'] ['0'..='9']*) {? + i64::from_str(s) + .map_err(|_| "failed to parse as a signed integer") + } + + rule year() -> &'input str + = $(['0'..='9']*<4>) + + rule month() -> &'input str + = $(['0' | '1'] ['0'..='9']) + + rule day() -> &'input str + = $(['0'..='3'] ['0'..='9']) + + rule date() -> &'input str + = $(year() "-" month() "-" day()) + + rule hour() -> &'input str + = $(['0'..='2'] ['0'..='9']) + + rule min_sec() -> &'input str + = $(['0'..='5'] ['0'..='9']) + + rule nanosec() -> &'input str + = $("." ['0'..='9']+) + + rule time() -> &'input str + = $(hour() ":" min_sec() ":" min_sec() nanosec()? "Z") + + rule datetime() -> &'input str + = dt:$(date() "T" time()) { dt } + + rule float() -> f64 + = s:$("-"? ['0'..='9']+ "." ['0'..='9']+) {? + f64::from_str(s) + .map_err(|_| "failed to parse as a 64-bit floating point number") + } + + rule string_op() -> Operand + = s:string() { Operand::String(s.to_owned()) } + + rule unsigned_op() -> Operand + = u:unsigned() { Operand::Unsigned(u) } + + rule signed_op() -> Operand + = s:signed() { Operand::Signed(s) } + + rule datetime_op() -> Operand + = "TIME" __ dt:datetime() {? + DateTime::parse_from_rfc3339(dt) + .map(|dt| Operand::DateTime(dt.with_timezone(&Utc))) + .map_err(|_| "failed to parse as RFC3339-compatible date/time") + } + + rule date_op() -> Operand + = "DATE" __ dt:date() {? + let naive_date = NaiveDate::parse_from_str(dt, "%Y-%m-%d") + .map_err(|_| "failed to parse as RFC3339-compatible date")?; + Ok(Operand::Date(Date::from_utc(naive_date, Utc))) + } + + rule float_op() -> Operand + = f:float() { Operand::Float(f) } + + rule tag() -> &'input str + = $(['a'..='z' | 'A'..='Z'] ['a'..='z' | 'A'..='Z' | '0'..='9' | '_' | '.']*) + + rule operand() -> Operand + = datetime_op() / date_op() / string_op() / float_op() / signed_op() / unsigned_op() + + rule eq() -> Condition + = t:tag() _ "=" _ op:operand() { Condition::Eq(t.to_owned(), op) } + + rule lte() -> Condition + = t:tag() _ "<=" _ op:operand() { Condition::Lte(t.to_owned(), op) } + + rule lt() -> Condition + = t:tag() _ "<" _ op:operand() { Condition::Lt(t.to_owned(), op) } + + rule gte() -> Condition + = t:tag() _ ">=" _ op:operand() { Condition::Gte(t.to_owned(), op) } + + rule gt() -> Condition + = t:tag() _ ">" _ op:operand() { Condition::Gt(t.to_owned(), op) } + + rule contains() -> Condition + = t:tag() __ "CONTAINS" __ op:string() { Condition::Contains(t.to_owned(), op.to_owned()) } + + rule exists() -> Condition + = t:tag() __ "EXISTS" { Condition::Exists(t.to_owned()) } + + rule event_type() -> Term + = "tm.event" _ "=" _ "'" et:$("NewBlock" / "Tx") "'" { + Term::EventType(EventType::from_str(et).unwrap()) + } + + rule condition() -> Term + = c:(eq() / lte() / lt() / gte() / gt() / contains() / exists()) { Term::Condition(c) } + + rule term() -> Term + = event_type() / condition() + + pub rule query() -> Vec + = t:term() ** ( __ "AND" __ ) { t } + } +} + +/// A term in a query is either an event type or a general condition. +/// Exclusively used for query parsing. +#[derive(Debug)] +pub enum Term { + EventType(EventType), + Condition(Condition), +} + +// Separate a list of terms into lists of each type of term. +fn separate_terms(terms: Vec) -> (Vec, Vec) { + terms + .into_iter() + .fold((Vec::new(), Vec::new()), |mut v, t| { + match t { + Term::EventType(et) => v.0.push(et), + Term::Condition(c) => v.1.push(c), + } + v + }) +} + +impl FromStr for Query { + type Err = Error; + + fn from_str(s: &str) -> Result { + let (event_types, conditions) = separate_terms( + query_parser::query(s) + .map_err(|e| Error::invalid_params(&format!("failed to parse query: {}", e)))?, + ); + if event_types.len() > 1 { + return Err(Error::invalid_params( + "tm.event can only be used once in a query", + )); + } + Ok(Query { + event_type: event_types.first().cloned(), + conditions, + }) + } +} + fn join(f: &mut fmt::Formatter<'_>, separator: S, iterable: I) -> fmt::Result where S: fmt::Display, @@ -219,6 +406,21 @@ impl fmt::Display for EventType { } } +impl FromStr for EventType { + type Err = Error; + + fn from_str(s: &str) -> Result { + match s { + "NewBlock" => Ok(Self::NewBlock), + "Tx" => Ok(Self::Tx), + invalid => Err(Error::invalid_params(&format!( + "unrecognized event type: {}", + invalid + ))), + } + } +} + /// The different types of conditions supported by a [`Query`]. /// /// [`Query`]: struct.Query.html @@ -279,8 +481,8 @@ impl fmt::Display for Operand { Operand::Signed(i) => write!(f, "{}", i), Operand::Unsigned(u) => write!(f, "{}", u), Operand::Float(h) => write!(f, "{}", h), - Operand::Date(d) => write!(f, "{}", escape(&d.format("%Y-%m-%d").to_string())), - Operand::DateTime(dt) => write!(f, "{}", escape(&dt.to_rfc3339())), + Operand::Date(d) => write!(f, "DATE {}", d.format("%Y-%m-%d").to_string()), + Operand::DateTime(dt) => write!(f, "TIME {}", dt.to_rfc3339()), } } } @@ -402,7 +604,7 @@ fn escape(s: &str) -> String { #[cfg(test)] mod test { use super::*; - use chrono::NaiveDate; + use chrono::{NaiveDate, NaiveDateTime, NaiveTime}; #[test] fn empty_query() { @@ -464,7 +666,7 @@ mod test { "some_date", Date::from_utc(NaiveDate::from_ymd(2020, 9, 24), Utc), ); - assert_eq!("some_date = '2020-09-24'", query.to_string()); + assert_eq!("some_date = DATE 2020-09-24", query.to_string()); } #[test] @@ -474,7 +676,7 @@ mod test { DateTime::parse_from_rfc3339("2020-09-24T10:17:23-04:00").unwrap(), ); assert_eq!( - "some_date_time = '2020-09-24T14:17:23+00:00'", + "some_date_time = TIME 2020-09-24T14:17:23+00:00", query.to_string() ); } @@ -500,4 +702,246 @@ mod test { query.to_string() ); } + + #[test] + fn query_event_type_parsing() { + // Test the empty query (that matches all possible events) + let query = Query::from_str("").unwrap(); + assert_eq!(query, Query::default()); + + // With just one event type + let query = Query::from_str("tm.event='Tx'").unwrap(); + assert_eq!(query.event_type, Some(EventType::Tx)); + assert!(query.conditions.is_empty()); + let query = Query::from_str("tm.event='NewBlock'").unwrap(); + assert_eq!(query.event_type, Some(EventType::NewBlock)); + assert!(query.conditions.is_empty()); + + // One event type, with whitespace + let query = Query::from_str("tm.event = 'NewBlock'").unwrap(); + assert_eq!(query.event_type, Some(EventType::NewBlock)); + assert!(query.conditions.is_empty()); + + // Two event types are not allowed + assert!(Query::from_str("tm.event='Tx' AND tm.event='NewBlock'").is_err()); + } + + #[test] + fn query_string_term_parsing() { + // Query with string term + let query = Query::from_str("tm.event='Tx' AND transfer.sender='AddrA'").unwrap(); + assert_eq!(query.event_type, Some(EventType::Tx)); + assert_eq!( + query.conditions, + vec![Condition::Eq( + "transfer.sender".to_owned(), + Operand::String("AddrA".to_owned()), + )] + ); + // Query with string term, with extra whitespace + let query = Query::from_str("tm.event = 'Tx' AND transfer.sender = 'AddrA'").unwrap(); + assert_eq!(query.event_type, Some(EventType::Tx)); + assert_eq!( + query.conditions, + vec![Condition::Eq( + "transfer.sender".to_owned(), + Operand::String("AddrA".to_owned()), + )] + ); + } + + #[test] + fn query_unsigned_term_parsing() { + let query = Query::from_str("tm.event = 'Tx' AND tx.height = 10").unwrap(); + assert_eq!(query.event_type, Some(EventType::Tx)); + assert_eq!( + query.conditions, + vec![Condition::Eq("tx.height".to_owned(), Operand::Unsigned(10))] + ); + + let query = Query::from_str("tm.event = 'Tx' AND tx.height <= 100").unwrap(); + assert_eq!(query.event_type, Some(EventType::Tx)); + assert_eq!( + query.conditions, + vec![Condition::Lte( + "tx.height".to_owned(), + Operand::Unsigned(100) + )] + ); + } + + #[test] + fn query_signed_term_parsing() { + let query = Query::from_str("tm.event = 'Tx' AND some.value = -1").unwrap(); + assert_eq!(query.event_type, Some(EventType::Tx)); + assert_eq!( + query.conditions, + vec![Condition::Eq("some.value".to_owned(), Operand::Signed(-1))] + ); + + let query = Query::from_str("tm.event = 'Tx' AND some.value <= -100").unwrap(); + assert_eq!(query.event_type, Some(EventType::Tx)); + assert_eq!( + query.conditions, + vec![Condition::Lte( + "some.value".to_owned(), + Operand::Signed(-100) + )] + ); + } + + #[test] + fn query_date_parsing() { + let query = Query::from_str("tm.event = 'Tx' AND some.date <= DATE 2022-02-03").unwrap(); + assert_eq!(query.event_type, Some(EventType::Tx)); + assert_eq!( + query.conditions, + vec![Condition::Lte( + "some.date".to_owned(), + Operand::Date(Date::from_utc(NaiveDate::from_ymd(2022, 2, 3), Utc)) + )] + ); + } + + #[test] + fn query_datetime_parsing() { + let query = + Query::from_str("tm.event = 'Tx' AND some.datetime = TIME 2021-02-26T17:05:02.1495Z") + .unwrap(); + assert_eq!(query.event_type, Some(EventType::Tx)); + assert_eq!( + query.conditions, + vec![Condition::Eq( + "some.datetime".to_owned(), + Operand::DateTime(DateTime::from_utc( + NaiveDateTime::new( + NaiveDate::from_ymd(2021, 2, 26), + NaiveTime::from_hms_nano(17, 5, 2, 149500000) + ), + Utc + )) + )] + ) + } + + #[test] + fn query_float_parsing() { + // Positive floating point number + let query = Query::from_str("short.pi = 3.14159").unwrap(); + assert_eq!(query.conditions.len(), 1); + match &query.conditions[0] { + Condition::Eq(tag, op) => { + assert_eq!(tag, "short.pi"); + match op { + Operand::Float(f) => { + assert!(floats_eq(*f, std::f64::consts::PI, 5)); + } + _ => panic!("unexpected operand: {:?}", op), + } + } + c => panic!("unexpected condition: {:?}", c), + } + + // Negative floating point number + let query = Query::from_str("short.pi = -3.14159").unwrap(); + assert_eq!(query.conditions.len(), 1); + match &query.conditions[0] { + Condition::Eq(tag, op) => { + assert_eq!(tag, "short.pi"); + match op { + Operand::Float(f) => { + assert!(floats_eq(*f, -std::f64::consts::PI, 5)); + } + _ => panic!("unexpected operand: {:?}", op), + } + } + c => panic!("unexpected condition: {:?}", c), + } + } + + // From https://stackoverflow.com/a/41447964/1156132 + fn floats_eq(a: f64, b: f64, precision: u8) -> bool { + let factor = 10.0f64.powi(precision as i32); + let a = (a * factor).trunc(); + let b = (b * factor).trunc(); + a == b + } + + #[test] + fn query_conditions() { + let query = Query::from_str("some.field = 'string'").unwrap(); + assert_eq!( + query, + Query { + event_type: None, + conditions: vec![Condition::Eq( + "some.field".to_owned(), + Operand::String("string".to_owned()) + )] + } + ); + + let query = Query::from_str("some.field < 5").unwrap(); + assert_eq!( + query, + Query { + event_type: None, + conditions: vec![Condition::Lt("some.field".to_owned(), Operand::Unsigned(5),)] + } + ); + + let query = Query::from_str("some.field <= 5").unwrap(); + assert_eq!( + query, + Query { + event_type: None, + conditions: vec![Condition::Lte( + "some.field".to_owned(), + Operand::Unsigned(5), + )] + } + ); + + let query = Query::from_str("some.field > 5").unwrap(); + assert_eq!( + query, + Query { + event_type: None, + conditions: vec![Condition::Gt("some.field".to_owned(), Operand::Unsigned(5),)] + } + ); + + let query = Query::from_str("some.field >= 5").unwrap(); + assert_eq!( + query, + Query { + event_type: None, + conditions: vec![Condition::Gte( + "some.field".to_owned(), + Operand::Unsigned(5), + )] + } + ); + + let query = Query::from_str("some.field CONTAINS 'inner'").unwrap(); + assert_eq!( + query, + Query { + event_type: None, + conditions: vec![Condition::Contains( + "some.field".to_owned(), + "inner".to_owned() + )] + } + ); + + let query = Query::from_str("some.field EXISTS").unwrap(); + assert_eq!( + query, + Query { + event_type: None, + conditions: vec![Condition::Exists("some.field".to_owned())] + } + ); + } } diff --git a/rpc/src/rpc_url.rs b/rpc/src/rpc_url.rs new file mode 100644 index 000000000..f727a5fb6 --- /dev/null +++ b/rpc/src/rpc_url.rs @@ -0,0 +1,132 @@ +//! URL representation for RPC clients. + +use crate::{Error, Result}; +use std::convert::TryFrom; +use std::fmt; +use std::str::FromStr; + +/// The various schemes supported by Tendermint RPC clients. +#[derive(Debug, Clone, Copy)] +pub enum Scheme { + Http, + Https, + WebSocket, + SecureWebSocket, +} + +impl fmt::Display for Scheme { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + Scheme::Http => write!(f, "http"), + Scheme::Https => write!(f, "https"), + Scheme::WebSocket => write!(f, "ws"), + Scheme::SecureWebSocket => write!(f, "wss"), + } + } +} + +impl FromStr for Scheme { + type Err = Error; + + fn from_str(s: &str) -> Result { + Ok(match s { + "http" => Scheme::Http, + "https" => Scheme::Https, + "ws" => Scheme::WebSocket, + "wss" => Scheme::SecureWebSocket, + _ => return Err(Error::invalid_params(&format!("unsupported scheme: {}", s))), + }) + } +} + +/// A uniform resource locator (URL), with support for only those +/// schemes/protocols supported by Tendermint RPC clients. +/// +/// Re-implements relevant parts of [`url::Url`]'s interface with convenience +/// mechanisms for transformation to/from other types. +#[derive(Debug, Clone)] +pub struct Url { + inner: url::Url, + scheme: Scheme, + host: String, + port: u16, +} + +impl FromStr for Url { + type Err = Error; + + fn from_str(s: &str) -> Result { + let inner: url::Url = s.parse()?; + let scheme: Scheme = inner.scheme().parse()?; + let host = inner + .host_str() + .ok_or_else(|| Error::invalid_params(&format!("URL is missing its host: {}", s)))? + .to_owned(); + let port = inner.port_or_known_default().ok_or_else(|| { + Error::invalid_params(&format!("cannot determine appropriate port for URL: {}", s)) + })?; + Ok(Self { + inner, + scheme, + host, + port, + }) + } +} + +impl Url { + /// Returns whether or not this URL represents a connection to a secure + /// endpoint. + pub fn is_secure(&self) -> bool { + match self.scheme { + Scheme::Http => false, + Scheme::Https => true, + Scheme::WebSocket => false, + Scheme::SecureWebSocket => true, + } + } + + /// Get the scheme associated with this URL. + pub fn scheme(&self) -> Scheme { + self.scheme + } + + /// Get the username associated with this URL, if any. + pub fn username(&self) -> &str { + self.inner.username() + } + + /// Get the password associated with this URL, if any. + pub fn password(&self) -> Option<&str> { + self.inner.password() + } + + /// Get the host associated with this URL. + pub fn host(&self) -> &str { + &self.host + } + + /// Get the port associated with this URL. + pub fn port(&self) -> u16 { + self.port + } + + /// Get this URL's path. + pub fn path(&self) -> &str { + self.inner.path() + } +} + +impl fmt::Display for Url { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "{}", self.inner) + } +} + +impl TryFrom for Url { + type Error = Error; + + fn try_from(value: url::Url) -> Result { + value.to_string().parse() + } +} diff --git a/rpc/src/serialization.rs b/rpc/src/serialization.rs deleted file mode 100644 index 90d5e8b91..000000000 --- a/rpc/src/serialization.rs +++ /dev/null @@ -1,3 +0,0 @@ -//! Custom serialization/deserialization functionality for the RPC. - -pub mod timestamp; diff --git a/tools/abci-test/Cargo.toml b/tools/abci-test/Cargo.toml index f9e5dca5c..1907ad648 100644 --- a/tools/abci-test/Cargo.toml +++ b/tools/abci-test/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "abci-test" -version = "0.18.0" +version = "0.18.1" authors = ["Thane Thomson "] edition = "2018" description = """ @@ -12,9 +12,9 @@ description = """ [dependencies] futures = "0.3" -log = "0.4" -simple_logger = "1.11" structopt = "0.3" -tendermint = { version = "0.18.0", path = "../../tendermint" } -tendermint-rpc = { version = "0.18.0", path = "../../rpc", features = [ "websocket-client" ] } +tendermint = { version = "0.18.1", path = "../../tendermint" } +tendermint-rpc = { version = "0.18.1", path = "../../rpc", features = [ "websocket-client" ] } +tracing = "0.1" +tracing-subscriber = "0.2" tokio = { version = "1", features = ["full"] } diff --git a/tools/abci-test/src/main.rs b/tools/abci-test/src/main.rs index 35f225583..6dfd072e1 100644 --- a/tools/abci-test/src/main.rs +++ b/tools/abci-test/src/main.rs @@ -1,8 +1,6 @@ //! ABCI key/value store integration test application. use futures::StreamExt; -use log::{debug, error, info, LevelFilter}; -use simple_logger::SimpleLogger; use structopt::StructOpt; use tendermint::abci::Transaction; use tendermint::net::Address; @@ -10,6 +8,8 @@ use tendermint_rpc::event::EventData; use tendermint_rpc::query::EventType; use tendermint_rpc::{Client, SubscriptionClient, WebSocketClient}; use tokio::time::Duration; +use tracing::level_filters::LevelFilter; +use tracing::{debug, error, info}; #[derive(Debug, StructOpt)] /// A harness for testing tendermint-abci through a full Tendermint node @@ -30,14 +30,13 @@ struct Opt { #[tokio::main] async fn main() -> Result<(), Box> { let opt: Opt = Opt::from_args(); - SimpleLogger::new() - .with_level(if opt.verbose { - LevelFilter::Debug + tracing_subscriber::fmt() + .with_max_level(if opt.verbose { + LevelFilter::DEBUG } else { - LevelFilter::Info + LevelFilter::INFO }) - .init() - .unwrap(); + .init(); info!("Connecting to Tendermint node at {}:{}", opt.host, opt.port); let (mut client, driver) = WebSocketClient::new(Address::Tcp { @@ -78,7 +77,7 @@ async fn run_tests(client: &mut WebSocketClient) -> Result<(), Box HttpClient { init_logging(); - HttpClient::new("tcp://127.0.0.1:26657".parse().unwrap()).unwrap() + HttpClient::new("http://127.0.0.1:26657").unwrap() } pub async fn localhost_websocket_client() -> (WebSocketClient, WebSocketClientDriver) { init_logging(); - WebSocketClient::new("tcp://127.0.0.1:26657".parse().unwrap()) + WebSocketClient::new("ws://127.0.0.1:26657/websocket") .await .unwrap() }