diff --git a/CHANGELOG.md b/CHANGELOG.md index b75074b29..d44e89309 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,10 +1,18 @@ ## Unreleased +### BREAKING CHANGES + +- `[tendermint-rpc, tendermint-light-client]` Upgrade Tokio to version 0.3.0 ([#683]) + - Upgrade `hyper` to `v0.14-dev` + - Upgrade `async-tungstenite` to `v0.10` + ### IMPROVEMENTS: - `[light-client]` Only require Tokio when `rpc-client` feature is enabled ([#425]) [#425]: https://github.com/informalsystems/tendermint-rs/issues/425 +[#683]: https://github.com/informalsystems/tendermint-rs/issues/683 + ## v0.17.0-rc3 diff --git a/light-client/Cargo.toml b/light-client/Cargo.toml index 0484ba28a..2e643befb 100644 --- a/light-client/Cargo.toml +++ b/light-client/Cargo.toml @@ -41,7 +41,7 @@ serde_derive = "1.0.106" sled = "0.34.3" static_assertions = "1.1.0" thiserror = "1.0.15" -tokio = { version = "0.2.20", optional = true } +tokio = { version = "0.3", features = ["rt"], optional = true } [dev-dependencies] tendermint-testgen = { path = "../testgen"} diff --git a/light-client/src/components/io.rs b/light-client/src/components/io.rs index 18b3ac961..fd869ee17 100644 --- a/light-client/src/components/io.rs +++ b/light-client/src/components/io.rs @@ -43,6 +43,10 @@ pub enum IoError { /// Task timed out. #[error("task timed out after {} ms", .0.as_millis())] Timeout(Duration), + + /// Failed to initialize runtime + #[error("failed to initialize runtime")] + Runtime, } impl IoError { diff --git a/light-client/src/utils/block_on.rs b/light-client/src/utils/block_on.rs index 164dc5bec..77a7f7fc9 100644 --- a/light-client/src/utils/block_on.rs +++ b/light-client/src/utils/block_on.rs @@ -11,11 +11,10 @@ where F::Output: Send, { std::thread::spawn(move || { - let mut rt = tokio::runtime::Builder::new() - .basic_scheduler() + let rt = tokio::runtime::Builder::new_current_thread() .enable_all() .build() - .unwrap(); + .map_err(|_| IoError::Runtime)?; if let Some(timeout) = timeout { let task = async { tokio::time::timeout(timeout, f).await }; diff --git a/light-node/Cargo.toml b/light-node/Cargo.toml index 46d230f2e..439b7929e 100644 --- a/light-node/Cargo.toml +++ b/light-node/Cargo.toml @@ -27,7 +27,6 @@ name = "tendermint-light-node" path = "src/bin/tendermint-light-node/main.rs" [dependencies] -abscissa_tokio = "0.5" anomaly = { version = "0.2", features = [ "serializer" ] } async-trait = "0.1" gumdrop = "0.7" @@ -42,7 +41,7 @@ tendermint = { version = "0.17.0-rc3", path = "../tendermint" } tendermint-light-client = { version = "0.17.0-rc3", path = "../light-client" } tendermint-rpc = { version = "0.17.0-rc3", path = "../rpc", features = [ "http-client" ] } thiserror = "1.0" -tokio = { version = "0.2", features = ["full"] } +tokio = { version = "0.3", features = ["full"] } [dependencies.abscissa_core] version = "0.5.0" diff --git a/light-node/src/application.rs b/light-node/src/application.rs index 1ab05ac9e..36b32fc6a 100644 --- a/light-node/src/application.rs +++ b/light-node/src/application.rs @@ -5,7 +5,6 @@ use abscissa_core::{ application::{self, AppCell}, config, trace, Application, EntryPoint, FrameworkError, StandardPaths, }; -use abscissa_tokio::TokioComponent; /// Application state pub static APPLICATION: AppCell = AppCell::new(); @@ -83,8 +82,7 @@ impl Application for LightNodeApp { /// beyond the default ones provided by the framework, this is the place /// to do so. fn register_components(&mut self, command: &Self::Cmd) -> Result<(), FrameworkError> { - let mut components = self.framework_components(command)?; - components.push(Box::new(TokioComponent::new()?)); + let components = self.framework_components(command)?; self.state.components.register(components) } diff --git a/light-node/src/commands/start.rs b/light-node/src/commands/start.rs index 5e7da06a4..2beedf709 100644 --- a/light-node/src/commands/start.rs +++ b/light-node/src/commands/start.rs @@ -1,8 +1,6 @@ //! `start` subcommand - start the light node. -use std::process; - -use crate::application::{app_config, APPLICATION}; +use crate::application::app_config; use crate::config::{LightClientConfig, LightNodeConfig}; use crate::rpc; use crate::rpc::Server; @@ -44,42 +42,37 @@ pub struct StartCmd { impl Runnable for StartCmd { /// Start the application. fn run(&self) { - if let Err(err) = abscissa_tokio::run(&APPLICATION, async { - if let Err(e) = StartCmd::assert_init_was_run() { + if let Err(e) = StartCmd::assert_init_was_run() { + status_err!(&e); + panic!(e); + } + + let supervisor = match self.construct_supervisor() { + Ok(supervisor) => supervisor, + Err(e) => { status_err!(&e); panic!(e); } + }; + + let rpc_handler = supervisor.handle(); + StartCmd::start_rpc_server(rpc_handler); - let supervisor = match self.construct_supervisor() { - Ok(supervisor) => supervisor, - Err(e) => { - status_err!(&e); - panic!(e); + let handle = supervisor.handle(); + std::thread::spawn(|| supervisor.run()); + + loop { + match handle.verify_to_highest() { + Ok(light_block) => { + status_info!("synced to block:", light_block.height().to_string()); } - }; - - let rpc_handler = supervisor.handle(); - StartCmd::start_rpc_server(rpc_handler); - - let handle = supervisor.handle(); - std::thread::spawn(|| supervisor.run()); - - loop { - match handle.verify_to_highest() { - Ok(light_block) => { - status_info!("synced to block:", light_block.height().to_string()); - } - Err(err) => { - status_err!("sync failed: {}", err); - } + Err(err) => { + status_err!("sync failed: {}", err); } - - // TODO(liamsi): use ticks and make this configurable: - std::thread::sleep(Duration::from_millis(800)); } - }) { - status_err!("Unexpected error while running application: {}", err); - process::exit(1); + + // TODO(liamsi): use ticks and make this configurable: + std::thread::sleep(Duration::from_millis(800)); } } } diff --git a/rpc/Cargo.toml b/rpc/Cargo.toml index de946d971..94b197ef2 100644 --- a/rpc/Cargo.toml +++ b/rpc/Cargo.toml @@ -31,6 +31,8 @@ websocket-client = [ "async-trait", "async-tungstenite", "futures", + "tokio/rt", + "tokio/rt-multi-thread", "tokio/fs", "tokio/macros", "tokio/stream", @@ -53,9 +55,15 @@ uuid = { version = "0.8", default-features = false } subtle-encoding = { version = "0.5", features = ["bech32-preview"] } async-trait = { version = "0.1", optional = true } -async-tungstenite = { version="0.8", features = ["tokio-runtime"], optional = true } +async-tungstenite = { version = "0.10", features = ["tokio-runtime"], optional = true } futures = { version = "0.3", optional = true } http = { version = "0.2", optional = true } -hyper = { version = "0.13", optional = true } -tokio = { version = "0.2", optional = true } +tokio = { version = "0.3", optional = true } tracing = { version = "0.1", optional = true } +pin-project = "1.0.1" + +[dependencies.hyper] +version = "0.14.0-dev" +git = "https://github.com/hyperium/hyper/" +rev = "2a19ab74ed69bc776da25544e98979c9fb6e1834" +optional = true diff --git a/rpc/src/client/subscription.rs b/rpc/src/client/subscription.rs index a7b7adf99..926963704 100644 --- a/rpc/src/client/subscription.rs +++ b/rpc/src/client/subscription.rs @@ -11,6 +11,7 @@ use async_trait::async_trait; use futures::task::{Context, Poll}; use futures::Stream; use getrandom::getrandom; +use pin_project::pin_project; use std::collections::HashMap; use std::convert::TryInto; use std::pin::Pin; @@ -53,14 +54,19 @@ pub trait SubscriptionClient { /// ``` /// /// [`Event`]: ./event/struct.Event.html +#[pin_project] #[derive(Debug)] pub struct Subscription { /// The query for which events will be produced. pub query: Query, + /// The ID of this subscription (automatically assigned). pub id: SubscriptionId, + // Our internal result event receiver for this subscription. + #[pin] event_rx: ChannelRx>, + // Allows us to interact with the subscription driver (exclusively to // terminate this subscription). cmd_tx: ChannelTx, @@ -69,8 +75,8 @@ pub struct Subscription { impl Stream for Subscription { type Item = Result; - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - self.event_rx.poll_recv(cx) + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.project().event_rx.poll_next(cx) } } @@ -509,7 +515,7 @@ mod test { } async fn must_recv(ch: &mut ChannelRx, timeout_ms: u64) -> T { - let mut delay = time::delay_for(Duration::from_millis(timeout_ms)); + let mut delay = time::sleep(Duration::from_millis(timeout_ms)); tokio::select! { _ = &mut delay, if !delay.is_elapsed() => panic!("timed out waiting for recv"), Some(v) = ch.recv() => v, @@ -520,7 +526,7 @@ mod test { where T: std::fmt::Debug, { - let mut delay = time::delay_for(Duration::from_millis(timeout_ms)); + let mut delay = time::sleep(Duration::from_millis(timeout_ms)); tokio::select! { _ = &mut delay, if !delay.is_elapsed() => (), Some(v) = ch.recv() => panic!("got unexpected result from channel: {:?}", v), diff --git a/rpc/src/client/sync.rs b/rpc/src/client/sync.rs index fa0d7a6a1..e100425f7 100644 --- a/rpc/src/client/sync.rs +++ b/rpc/src/client/sync.rs @@ -4,10 +4,15 @@ //! convenience methods. We also only implement unbounded channels at present. //! In future, if RPC consumers need it, we will implement bounded channels. -use crate::{Error, Result}; +use std::pin::Pin; + use futures::task::{Context, Poll}; +use futures::Stream; +use pin_project::pin_project; use tokio::sync::mpsc; +use crate::{Error, Result}; + /// Constructor for an unbounded channel. pub fn unbounded() -> (ChannelTx, ChannelRx) { let (tx, rx) = mpsc::unbounded_channel(); @@ -33,8 +38,9 @@ impl ChannelTx { } /// Receiver interface for a channel. +#[pin_project] #[derive(Debug)] -pub struct ChannelRx(mpsc::UnboundedReceiver); +pub struct ChannelRx(#[pin] mpsc::UnboundedReceiver); impl ChannelRx { /// Wait indefinitely until we receive a value from the channel (or the @@ -42,8 +48,12 @@ impl ChannelRx { pub async fn recv(&mut self) -> Option { self.0.recv().await } +} + +impl Stream for ChannelRx { + type Item = T; - pub fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll> { - self.0.poll_recv(cx) + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + self.project().0.poll_next(cx) } } diff --git a/rpc/src/client/transport/websocket.rs b/rpc/src/client/transport/websocket.rs index a485beca2..e8d412c63 100644 --- a/rpc/src/client/transport/websocket.rs +++ b/rpc/src/client/transport/websocket.rs @@ -201,7 +201,7 @@ impl WebSocketClientDriver { pub async fn run(mut self) -> Result<()> { let mut ping_interval = tokio::time::interval_at(Instant::now().add(PING_INTERVAL), PING_INTERVAL); - let mut recv_timeout = tokio::time::delay_for(PING_INTERVAL); + let mut recv_timeout = tokio::time::sleep(PING_INTERVAL); loop { tokio::select! { Some(res) = self.stream.next() => match res { diff --git a/tendermint/Cargo.toml b/tendermint/Cargo.toml index 9923c5c87..9bee5b3b9 100644 --- a/tendermint/Cargo.toml +++ b/tendermint/Cargo.toml @@ -61,8 +61,8 @@ k256 = { version = "0.5", optional = true, features = ["ecdsa"] } ripemd160 = { version = "0.9", optional = true } [dev-dependencies] -tendermint-rpc = { path = "../rpc", features = [ "http-client", "websocket-client" ] } -tokio = { version = "0.2", features = [ "macros" ] } +tendermint-rpc = { path = "../rpc", features = ["http-client", "websocket-client"] } +tokio = { version = "0.3", features = ["macros"] } [features] secp256k1 = ["k256", "ripemd160"] diff --git a/tendermint/tests/integration.rs b/tendermint/tests/integration.rs index c428aa2dc..2ea3b399a 100644 --- a/tendermint/tests/integration.rs +++ b/tendermint/tests/integration.rs @@ -249,7 +249,7 @@ mod rpc { let mut cur_tx_id = 0_u32; while !expected_tx_values.is_empty() { - let mut delay = tokio::time::delay_for(Duration::from_secs(3)); + let mut delay = tokio::time::sleep(Duration::from_secs(3)); tokio::select! { Some(res) = subs.next() => { let ev = res.unwrap(); @@ -314,7 +314,7 @@ mod rpc { .broadcast_tx_async(Transaction::from(tx.into_bytes())) .await .unwrap(); - tokio::time::delay_for(Duration::from_millis(100)).await; + tokio::time::sleep(Duration::from_millis(100)).await; } }); @@ -327,7 +327,7 @@ mod rpc { ); while expected_new_blocks > 0 && !expected_tx_values.is_empty() { - let mut timeout = tokio::time::delay_for(Duration::from_secs(3)); + let mut timeout = tokio::time::sleep(Duration::from_secs(3)); tokio::select! { Some(res) = combined_subs.next() => { let ev: Event = res.unwrap();