Skip to content

Commit

Permalink
Update to Tokio 1.0 (#628)
Browse files Browse the repository at this point in the history
* WIP: Update to Tokio 1.0

* ipc: Migrate remaining Unix test code to Tokio 1.0

* core-client: Don't depend on unused hyper/server feature

* http: Fix used feature set by hyper

* WIP: Bump to the transferred parity-tokio-ipc repo

* Use newly released version of parity-tokio-ipc

* Remove extra newline in Cargo.toml

* Refactor suspension slightly in SuspendableStream
  • Loading branch information
Xanewok committed Jul 7, 2021
1 parent 609d7a6 commit ac72c85
Show file tree
Hide file tree
Showing 11 changed files with 44 additions and 35 deletions.
8 changes: 4 additions & 4 deletions core-client/transports/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,11 @@ serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
url = "1.7"

hyper = { version = "0.13", optional = true }
hyper-tls = { version = "0.4", optional = true }
hyper = { version = "0.14", features = ["client", "http1"], optional = true }
hyper-tls = { version = "0.5", optional = true }
jsonrpc-server-utils = { version = "17.1", path = "../../server-utils", optional = true }
parity-tokio-ipc = { version = "0.8", optional = true }
tokio = { version = "0.2", optional = true }
parity-tokio-ipc = { version = "0.9", optional = true }
tokio = { version = "1", optional = true }
websocket = { version = "0.24", optional = true }

[dev-dependencies]
Expand Down
2 changes: 1 addition & 1 deletion http/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ version = "17.1.0"

[dependencies]
futures = "0.3"
hyper = "0.13"
hyper = { version = "0.14", features = ["http1", "tcp", "server", "stream"] }
jsonrpc-core = { version = "17.1", path = "../core" }
jsonrpc-server-utils = { version = "17.1", path = "../server-utils" }
log = "0.4"
Expand Down
4 changes: 2 additions & 2 deletions ipc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,15 @@ log = "0.4"
tower-service = "0.3"
jsonrpc-core = { version = "17.1", path = "../core" }
jsonrpc-server-utils = { version = "17.1", path = "../server-utils", default-features = false }
parity-tokio-ipc = "0.8"
parity-tokio-ipc = "0.9"
parking_lot = "0.11.0"

[dev-dependencies]
env_logger = "0.7"
lazy_static = "1.0"

[target.'cfg(not(windows))'.dev-dependencies]
tokio = { version = "0.2", default-features = false, features = ["uds", "time", "rt-threaded", "io-driver"] }
tokio = { version = "1", default-features = false, features = ["net", "time", "rt-multi-thread"] }

[badges]
travis-ci = { repository = "paritytech/jsonrpc", branch = "master"}
7 changes: 4 additions & 3 deletions ipc/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -360,7 +360,7 @@ mod tests {
reply.expect("there should be one reply")
};

let mut rt = tokio::runtime::Runtime::new().unwrap();
let rt = tokio::runtime::Runtime::new().unwrap();
rt.block_on(reply).expect("wait for reply")
}

Expand Down Expand Up @@ -609,9 +609,10 @@ mod tests {
tx.send(true).expect("failed to report that the server has stopped");
});

let mut rt = tokio::runtime::Runtime::new().unwrap();
let rt = tokio::runtime::Runtime::new().unwrap();
rt.block_on(async move {
let timeout = tokio::time::delay_for(Duration::from_millis(500));
let timeout = tokio::time::sleep(Duration::from_millis(500));
futures::pin_mut!(timeout);

match futures::future::select(rx, timeout).await {
futures::future::Either::Left((result, _)) => {
Expand Down
7 changes: 4 additions & 3 deletions server-utils/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,15 @@ repository = "https://github.com/paritytech/jsonrpc"
version = "17.1.0"

[dependencies]
bytes = "0.5"
bytes = "1.0"
futures = "0.3"
globset = "0.4"
jsonrpc-core = { version = "17.1", path = "../core" }
lazy_static = "1.1.0"
log = "0.4"
tokio = { version = "0.2", features = ["rt-threaded", "io-driver", "io-util", "time", "tcp"] }
tokio-util = { version = "0.3", features = ["codec"] }
tokio = { version = "1", features = ["rt-multi-thread", "io-util", "time", "net"] }
tokio-util = { version = "0.6", features = ["codec"] }
tokio-stream = { version = "0.1", features = ["net"] }

unicase = "2.0"

Expand Down
1 change: 1 addition & 0 deletions server-utils/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ extern crate log;
extern crate lazy_static;

pub use tokio;
pub use tokio_stream;
pub use tokio_util;

pub mod cors;
Expand Down
5 changes: 2 additions & 3 deletions server-utils/src/reactor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,9 +96,8 @@ impl RpcEventLoop {
pub fn with_name(name: Option<String>) -> io::Result<Self> {
let (stop, stopped) = futures::channel::oneshot::channel();

let mut tb = runtime::Builder::new();
tb.core_threads(1);
tb.threaded_scheduler();
let mut tb = runtime::Builder::new_multi_thread();
tb.worker_threads(1);
tb.enable_all();

if let Some(name) = name {
Expand Down
23 changes: 14 additions & 9 deletions server-utils/src/suspendable_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,7 @@ use std::future::Future;
use std::io;
use std::pin::Pin;
use std::task::Poll;
use std::time::Duration;

use tokio::time::Delay;
use std::time::{Duration, Instant};

/// `Incoming` is a stream of incoming sockets
/// Polling the stream may return a temporary io::Error (for instance if we can't open the connection because of "too many open files" limit)
Expand All @@ -19,7 +17,7 @@ pub struct SuspendableStream<S> {
next_delay: Duration,
initial_delay: Duration,
max_delay: Duration,
timeout: Option<Delay>,
suspended_until: Option<Instant>,
}

impl<S> SuspendableStream<S> {
Expand All @@ -31,7 +29,7 @@ impl<S> SuspendableStream<S> {
next_delay: Duration::from_millis(20),
initial_delay: Duration::from_millis(10),
max_delay: Duration::from_secs(5),
timeout: None,
suspended_until: None,
}
}
}
Expand All @@ -44,10 +42,17 @@ where

fn poll_next(mut self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> std::task::Poll<Option<Self::Item>> {
loop {
if let Some(timeout) = self.timeout.as_mut() {
match Pin::new(timeout).poll(cx) {
// If we encountered a connection error before then we suspend
// polling from the underlying stream for a bit
if let Some(deadline) = &mut self.suspended_until {
let deadline = tokio::time::Instant::from_std(*deadline);
let sleep = tokio::time::sleep_until(deadline);
futures::pin_mut!(sleep);
match sleep.poll(cx) {
Poll::Pending => return Poll::Pending,
Poll::Ready(()) => {}
Poll::Ready(()) => {
self.suspended_until = None;
}
}
}

Expand Down Expand Up @@ -78,7 +83,7 @@ where
};
debug!("Error accepting connection: {}", err);
debug!("The server will stop accepting connections for {:?}", self.next_delay);
self.timeout = Some(tokio::time::delay_for(self.next_delay));
self.suspended_until = Some(Instant::now() + self.next_delay);
}
}
}
Expand Down
6 changes: 3 additions & 3 deletions stdio/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,11 @@ version = "17.1.0"
futures = "0.3"
jsonrpc-core = { version = "17.1", path = "../core" }
log = "0.4"
tokio = { version = "0.2", features = ["io-std", "io-driver", "io-util"] }
tokio-util = { version = "0.3", features = ["codec"] }
tokio = { version = "1", features = ["io-std", "io-util"] }
tokio-util = { version = "0.6", features = ["codec"] }

[dev-dependencies]
tokio = { version = "0.2", features = ["rt-core", "macros"] }
tokio = { version = "1", features = ["rt", "macros"] }
lazy_static = "1.0"
env_logger = "0.7"

Expand Down
2 changes: 2 additions & 0 deletions tcp/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use tower_service::Service as _;

use crate::futures::{self, future};
use crate::jsonrpc::{middleware, MetaIoHandler, Metadata, Middleware};
use crate::server_utils::tokio_stream::wrappers::TcpListenerStream;
use crate::server_utils::{codecs, reactor, tokio, tokio_util::codec::Framed, SuspendableStream};

use crate::dispatch::{Dispatcher, PeerMessageQueue, SenderChannels};
Expand Down Expand Up @@ -94,6 +95,7 @@ where
executor.executor().spawn(async move {
let start = async {
let listener = tokio::net::TcpListener::bind(&address).await?;
let listener = TcpListenerStream::new(listener);
let connections = SuspendableStream::new(listener);

let server = connections.map(|socket| {
Expand Down
14 changes: 7 additions & 7 deletions tcp/src/tests.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::net::{Shutdown, SocketAddr};
use std::net::SocketAddr;
use std::str::FromStr;
use std::sync::Arc;
use std::time::Duration;
Expand All @@ -23,7 +23,7 @@ fn casual_server() -> ServerBuilder {
}

fn run_future<O>(fut: impl std::future::Future<Output = O> + Send) -> O {
let mut rt = tokio::runtime::Runtime::new().unwrap();
let rt = tokio::runtime::Runtime::new().unwrap();
rt.block_on(fut)
}

Expand Down Expand Up @@ -60,9 +60,9 @@ fn disconnect() {
let _server = server.start(&addr).expect("Server must run with no issues");

run_future(async move {
let stream = TcpStream::connect(&addr).await.unwrap();
let mut stream = TcpStream::connect(&addr).await.unwrap();
assert_eq!(stream.peer_addr().unwrap(), addr);
stream.shutdown(::std::net::Shutdown::Both).unwrap();
stream.shutdown().await.unwrap();
});

::std::thread::sleep(::std::time::Duration::from_millis(50));
Expand All @@ -76,7 +76,7 @@ fn dummy_request(addr: &SocketAddr, data: Vec<u8>) -> Vec<u8> {
let stream = async move {
let mut stream = TcpStream::connect(addr).await?;
stream.write_all(&data).await?;
stream.shutdown(Shutdown::Write)?;
stream.shutdown().await?;
let mut read_buf = vec![];
let _ = stream.read_to_end(&mut read_buf).await;

Expand Down Expand Up @@ -243,7 +243,7 @@ fn message() {

let client = async move {
let stream = TcpStream::connect(&addr);
let delay = tokio::time::delay_for(Duration::from_millis(500));
let delay = tokio::time::sleep(Duration::from_millis(500));
let (stream, _) = futures::join!(stream, delay);
let mut stream = stream?;

Expand Down Expand Up @@ -272,7 +272,7 @@ fn message() {
let data = b"{\"jsonrpc\": \"2.0\", \"method\": \"say_hello\", \"params\": [42, 23], \"id\": 1}\n";
stream.write_all(&data[..]).await?;

stream.shutdown(Shutdown::Write).unwrap();
stream.shutdown().await.unwrap();
let mut read_buf = vec![];
let _ = stream.read_to_end(&mut read_buf).await?;

Expand Down

0 comments on commit ac72c85

Please sign in to comment.