Skip to content

Commit

Permalink
Upgrade Tokio to version 0.3.0 (informalsystems#681)
Browse files Browse the repository at this point in the history
* Upgrade Tokio to version 0.3.0

* Use development version of `hyper` for Tokio 0.3 support

* Enable runtime features in light client

* Use single threaded Tokio runtime in block_on

* Update changelog

* Remove unsafe pin projection code and replace it with safe pin_project crate

* Propagate error when we fail to initialize Tokio runtime in block_on

Co-authored-by: Thane Thomson <thane@informal.systems>
  • Loading branch information
romac and thanethomson committed Nov 19, 2020
1 parent 62406f6 commit 0fc7f40
Show file tree
Hide file tree
Showing 13 changed files with 83 additions and 58 deletions.
8 changes: 8 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,10 +1,18 @@
## Unreleased

### BREAKING CHANGES

- `[tendermint-rpc, tendermint-light-client]` Upgrade Tokio to version 0.3.0 ([#683])
- Upgrade `hyper` to `v0.14-dev`
- Upgrade `async-tungstenite` to `v0.10`

### IMPROVEMENTS:

- `[light-client]` Only require Tokio when `rpc-client` feature is enabled ([#425])

[#425]: https://github.com/informalsystems/tendermint-rs/issues/425
[#683]: https://github.com/informalsystems/tendermint-rs/issues/683


## v0.17.0-rc3

Expand Down
2 changes: 1 addition & 1 deletion light-client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ serde_derive = "1.0.106"
sled = "0.34.3"
static_assertions = "1.1.0"
thiserror = "1.0.15"
tokio = { version = "0.2.20", optional = true }
tokio = { version = "0.3", features = ["rt"], optional = true }

[dev-dependencies]
tendermint-testgen = { path = "../testgen"}
Expand Down
4 changes: 4 additions & 0 deletions light-client/src/components/io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,10 @@ pub enum IoError {
/// Task timed out.
#[error("task timed out after {} ms", .0.as_millis())]
Timeout(Duration),

/// Failed to initialize runtime
#[error("failed to initialize runtime")]
Runtime,
}

impl IoError {
Expand Down
5 changes: 2 additions & 3 deletions light-client/src/utils/block_on.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,10 @@ where
F::Output: Send,
{
std::thread::spawn(move || {
let mut rt = tokio::runtime::Builder::new()
.basic_scheduler()
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap();
.map_err(|_| IoError::Runtime)?;

if let Some(timeout) = timeout {
let task = async { tokio::time::timeout(timeout, f).await };
Expand Down
3 changes: 1 addition & 2 deletions light-node/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ name = "tendermint-light-node"
path = "src/bin/tendermint-light-node/main.rs"

[dependencies]
abscissa_tokio = "0.5"
anomaly = { version = "0.2", features = [ "serializer" ] }
async-trait = "0.1"
gumdrop = "0.7"
Expand All @@ -42,7 +41,7 @@ tendermint = { version = "0.17.0-rc3", path = "../tendermint" }
tendermint-light-client = { version = "0.17.0-rc3", path = "../light-client" }
tendermint-rpc = { version = "0.17.0-rc3", path = "../rpc", features = [ "http-client" ] }
thiserror = "1.0"
tokio = { version = "0.2", features = ["full"] }
tokio = { version = "0.3", features = ["full"] }

[dependencies.abscissa_core]
version = "0.5.0"
Expand Down
4 changes: 1 addition & 3 deletions light-node/src/application.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ use abscissa_core::{
application::{self, AppCell},
config, trace, Application, EntryPoint, FrameworkError, StandardPaths,
};
use abscissa_tokio::TokioComponent;

/// Application state
pub static APPLICATION: AppCell<LightNodeApp> = AppCell::new();
Expand Down Expand Up @@ -83,8 +82,7 @@ impl Application for LightNodeApp {
/// beyond the default ones provided by the framework, this is the place
/// to do so.
fn register_components(&mut self, command: &Self::Cmd) -> Result<(), FrameworkError> {
let mut components = self.framework_components(command)?;
components.push(Box::new(TokioComponent::new()?));
let components = self.framework_components(command)?;
self.state.components.register(components)
}

Expand Down
57 changes: 25 additions & 32 deletions light-node/src/commands/start.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
//! `start` subcommand - start the light node.

use std::process;

use crate::application::{app_config, APPLICATION};
use crate::application::app_config;
use crate::config::{LightClientConfig, LightNodeConfig};
use crate::rpc;
use crate::rpc::Server;
Expand Down Expand Up @@ -44,42 +42,37 @@ pub struct StartCmd {
impl Runnable for StartCmd {
/// Start the application.
fn run(&self) {
if let Err(err) = abscissa_tokio::run(&APPLICATION, async {
if let Err(e) = StartCmd::assert_init_was_run() {
if let Err(e) = StartCmd::assert_init_was_run() {
status_err!(&e);
panic!(e);
}

let supervisor = match self.construct_supervisor() {
Ok(supervisor) => supervisor,
Err(e) => {
status_err!(&e);
panic!(e);
}
};

let rpc_handler = supervisor.handle();
StartCmd::start_rpc_server(rpc_handler);

let supervisor = match self.construct_supervisor() {
Ok(supervisor) => supervisor,
Err(e) => {
status_err!(&e);
panic!(e);
let handle = supervisor.handle();
std::thread::spawn(|| supervisor.run());

loop {
match handle.verify_to_highest() {
Ok(light_block) => {
status_info!("synced to block:", light_block.height().to_string());
}
};

let rpc_handler = supervisor.handle();
StartCmd::start_rpc_server(rpc_handler);

let handle = supervisor.handle();
std::thread::spawn(|| supervisor.run());

loop {
match handle.verify_to_highest() {
Ok(light_block) => {
status_info!("synced to block:", light_block.height().to_string());
}
Err(err) => {
status_err!("sync failed: {}", err);
}
Err(err) => {
status_err!("sync failed: {}", err);
}

// TODO(liamsi): use ticks and make this configurable:
std::thread::sleep(Duration::from_millis(800));
}
}) {
status_err!("Unexpected error while running application: {}", err);
process::exit(1);

// TODO(liamsi): use ticks and make this configurable:
std::thread::sleep(Duration::from_millis(800));
}
}
}
Expand Down
14 changes: 11 additions & 3 deletions rpc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ websocket-client = [
"async-trait",
"async-tungstenite",
"futures",
"tokio/rt",
"tokio/rt-multi-thread",
"tokio/fs",
"tokio/macros",
"tokio/stream",
Expand All @@ -53,9 +55,15 @@ uuid = { version = "0.8", default-features = false }
subtle-encoding = { version = "0.5", features = ["bech32-preview"] }

async-trait = { version = "0.1", optional = true }
async-tungstenite = { version="0.8", features = ["tokio-runtime"], optional = true }
async-tungstenite = { version = "0.10", features = ["tokio-runtime"], optional = true }
futures = { version = "0.3", optional = true }
http = { version = "0.2", optional = true }
hyper = { version = "0.13", optional = true }
tokio = { version = "0.2", optional = true }
tokio = { version = "0.3", optional = true }
tracing = { version = "0.1", optional = true }
pin-project = "1.0.1"

[dependencies.hyper]
version = "0.14.0-dev"
git = "https://github.com/hyperium/hyper/"
rev = "2a19ab74ed69bc776da25544e98979c9fb6e1834"
optional = true
14 changes: 10 additions & 4 deletions rpc/src/client/subscription.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use async_trait::async_trait;
use futures::task::{Context, Poll};
use futures::Stream;
use getrandom::getrandom;
use pin_project::pin_project;
use std::collections::HashMap;
use std::convert::TryInto;
use std::pin::Pin;
Expand Down Expand Up @@ -53,14 +54,19 @@ pub trait SubscriptionClient {
/// ```
///
/// [`Event`]: ./event/struct.Event.html
#[pin_project]
#[derive(Debug)]
pub struct Subscription {
/// The query for which events will be produced.
pub query: Query,

/// The ID of this subscription (automatically assigned).
pub id: SubscriptionId,

// Our internal result event receiver for this subscription.
#[pin]
event_rx: ChannelRx<Result<Event>>,

// Allows us to interact with the subscription driver (exclusively to
// terminate this subscription).
cmd_tx: ChannelTx<SubscriptionDriverCmd>,
Expand All @@ -69,8 +75,8 @@ pub struct Subscription {
impl Stream for Subscription {
type Item = Result<Event>;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
self.event_rx.poll_recv(cx)
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
self.project().event_rx.poll_next(cx)
}
}

Expand Down Expand Up @@ -509,7 +515,7 @@ mod test {
}

async fn must_recv<T>(ch: &mut ChannelRx<T>, timeout_ms: u64) -> T {
let mut delay = time::delay_for(Duration::from_millis(timeout_ms));
let mut delay = time::sleep(Duration::from_millis(timeout_ms));
tokio::select! {
_ = &mut delay, if !delay.is_elapsed() => panic!("timed out waiting for recv"),
Some(v) = ch.recv() => v,
Expand All @@ -520,7 +526,7 @@ mod test {
where
T: std::fmt::Debug,
{
let mut delay = time::delay_for(Duration::from_millis(timeout_ms));
let mut delay = time::sleep(Duration::from_millis(timeout_ms));
tokio::select! {
_ = &mut delay, if !delay.is_elapsed() => (),
Some(v) = ch.recv() => panic!("got unexpected result from channel: {:?}", v),
Expand Down
18 changes: 14 additions & 4 deletions rpc/src/client/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,15 @@
//! convenience methods. We also only implement unbounded channels at present.
//! In future, if RPC consumers need it, we will implement bounded channels.

use crate::{Error, Result};
use std::pin::Pin;

use futures::task::{Context, Poll};
use futures::Stream;
use pin_project::pin_project;
use tokio::sync::mpsc;

use crate::{Error, Result};

/// Constructor for an unbounded channel.
pub fn unbounded<T>() -> (ChannelTx<T>, ChannelRx<T>) {
let (tx, rx) = mpsc::unbounded_channel();
Expand All @@ -33,17 +38,22 @@ impl<T> ChannelTx<T> {
}

/// Receiver interface for a channel.
#[pin_project]
#[derive(Debug)]
pub struct ChannelRx<T>(mpsc::UnboundedReceiver<T>);
pub struct ChannelRx<T>(#[pin] mpsc::UnboundedReceiver<T>);

impl<T> ChannelRx<T> {
/// Wait indefinitely until we receive a value from the channel (or the
/// channel is closed).
pub async fn recv(&mut self) -> Option<T> {
self.0.recv().await
}
}

impl<T> Stream for ChannelRx<T> {
type Item = T;

pub fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<T>> {
self.0.poll_recv(cx)
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
self.project().0.poll_next(cx)
}
}
2 changes: 1 addition & 1 deletion rpc/src/client/transport/websocket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,7 @@ impl WebSocketClientDriver {
pub async fn run(mut self) -> Result<()> {
let mut ping_interval =
tokio::time::interval_at(Instant::now().add(PING_INTERVAL), PING_INTERVAL);
let mut recv_timeout = tokio::time::delay_for(PING_INTERVAL);
let mut recv_timeout = tokio::time::sleep(PING_INTERVAL);
loop {
tokio::select! {
Some(res) = self.stream.next() => match res {
Expand Down
4 changes: 2 additions & 2 deletions tendermint/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,8 @@ k256 = { version = "0.5", optional = true, features = ["ecdsa"] }
ripemd160 = { version = "0.9", optional = true }

[dev-dependencies]
tendermint-rpc = { path = "../rpc", features = [ "http-client", "websocket-client" ] }
tokio = { version = "0.2", features = [ "macros" ] }
tendermint-rpc = { path = "../rpc", features = ["http-client", "websocket-client"] }
tokio = { version = "0.3", features = ["macros"] }

[features]
secp256k1 = ["k256", "ripemd160"]
6 changes: 3 additions & 3 deletions tendermint/tests/integration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,7 @@ mod rpc {
let mut cur_tx_id = 0_u32;

while !expected_tx_values.is_empty() {
let mut delay = tokio::time::delay_for(Duration::from_secs(3));
let mut delay = tokio::time::sleep(Duration::from_secs(3));
tokio::select! {
Some(res) = subs.next() => {
let ev = res.unwrap();
Expand Down Expand Up @@ -314,7 +314,7 @@ mod rpc {
.broadcast_tx_async(Transaction::from(tx.into_bytes()))
.await
.unwrap();
tokio::time::delay_for(Duration::from_millis(100)).await;
tokio::time::sleep(Duration::from_millis(100)).await;
}
});

Expand All @@ -327,7 +327,7 @@ mod rpc {
);

while expected_new_blocks > 0 && !expected_tx_values.is_empty() {
let mut timeout = tokio::time::delay_for(Duration::from_secs(3));
let mut timeout = tokio::time::sleep(Duration::from_secs(3));
tokio::select! {
Some(res) = combined_subs.next() => {
let ev: Event = res.unwrap();
Expand Down

0 comments on commit 0fc7f40

Please sign in to comment.