diff --git a/core-client/transports/Cargo.toml b/core-client/transports/Cargo.toml index af4b3a62a..f5617df2c 100644 --- a/core-client/transports/Cargo.toml +++ b/core-client/transports/Cargo.toml @@ -44,11 +44,11 @@ serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" url = "1.7" -hyper = { version = "0.13", optional = true } -hyper-tls = { version = "0.4", optional = true } +hyper = { version = "0.14", features = ["client", "http1"], optional = true } +hyper-tls = { version = "0.5", optional = true } jsonrpc-server-utils = { version = "17.1", path = "../../server-utils", optional = true } -parity-tokio-ipc = { version = "0.8", optional = true } -tokio = { version = "0.2", optional = true } +parity-tokio-ipc = { version = "0.9", optional = true } +tokio = { version = "1", optional = true } websocket = { version = "0.24", optional = true } [dev-dependencies] diff --git a/http/Cargo.toml b/http/Cargo.toml index 1e6172abc..8208268f2 100644 --- a/http/Cargo.toml +++ b/http/Cargo.toml @@ -12,7 +12,7 @@ version = "17.1.0" [dependencies] futures = "0.3" -hyper = "0.13" +hyper = { version = "0.14", features = ["http1", "tcp", "server", "stream"] } jsonrpc-core = { version = "17.1", path = "../core" } jsonrpc-server-utils = { version = "17.1", path = "../server-utils" } log = "0.4" diff --git a/ipc/Cargo.toml b/ipc/Cargo.toml index 0eef09da3..6f3d9adaf 100644 --- a/ipc/Cargo.toml +++ b/ipc/Cargo.toml @@ -15,7 +15,7 @@ log = "0.4" tower-service = "0.3" jsonrpc-core = { version = "17.1", path = "../core" } jsonrpc-server-utils = { version = "17.1", path = "../server-utils", default-features = false } -parity-tokio-ipc = "0.8" +parity-tokio-ipc = "0.9" parking_lot = "0.11.0" [dev-dependencies] @@ -23,7 +23,7 @@ env_logger = "0.7" lazy_static = "1.0" [target.'cfg(not(windows))'.dev-dependencies] -tokio = { version = "0.2", default-features = false, features = ["uds", "time", "rt-threaded", "io-driver"] } +tokio = { version = "1", default-features = false, features = ["net", "time", "rt-multi-thread"] } [badges] travis-ci = { repository = "paritytech/jsonrpc", branch = "master"} diff --git a/ipc/src/server.rs b/ipc/src/server.rs index 107150304..1b8fd0969 100644 --- a/ipc/src/server.rs +++ b/ipc/src/server.rs @@ -360,7 +360,7 @@ mod tests { reply.expect("there should be one reply") }; - let mut rt = tokio::runtime::Runtime::new().unwrap(); + let rt = tokio::runtime::Runtime::new().unwrap(); rt.block_on(reply).expect("wait for reply") } @@ -609,9 +609,10 @@ mod tests { tx.send(true).expect("failed to report that the server has stopped"); }); - let mut rt = tokio::runtime::Runtime::new().unwrap(); + let rt = tokio::runtime::Runtime::new().unwrap(); rt.block_on(async move { - let timeout = tokio::time::delay_for(Duration::from_millis(500)); + let timeout = tokio::time::sleep(Duration::from_millis(500)); + futures::pin_mut!(timeout); match futures::future::select(rx, timeout).await { futures::future::Either::Left((result, _)) => { diff --git a/server-utils/Cargo.toml b/server-utils/Cargo.toml index aacbce4f0..5a6e27f18 100644 --- a/server-utils/Cargo.toml +++ b/server-utils/Cargo.toml @@ -11,14 +11,15 @@ repository = "https://github.com/paritytech/jsonrpc" version = "17.1.0" [dependencies] -bytes = "0.5" +bytes = "1.0" futures = "0.3" globset = "0.4" jsonrpc-core = { version = "17.1", path = "../core" } lazy_static = "1.1.0" log = "0.4" -tokio = { version = "0.2", features = ["rt-threaded", "io-driver", "io-util", "time", "tcp"] } -tokio-util = { version = "0.3", features = ["codec"] } +tokio = { version = "1", features = ["rt-multi-thread", "io-util", "time", "net"] } +tokio-util = { version = "0.6", features = ["codec"] } +tokio-stream = { version = "0.1", features = ["net"] } unicase = "2.0" diff --git a/server-utils/src/lib.rs b/server-utils/src/lib.rs index 5c9f52d1a..3d6cd5cc3 100644 --- a/server-utils/src/lib.rs +++ b/server-utils/src/lib.rs @@ -9,6 +9,7 @@ extern crate log; extern crate lazy_static; pub use tokio; +pub use tokio_stream; pub use tokio_util; pub mod cors; diff --git a/server-utils/src/reactor.rs b/server-utils/src/reactor.rs index df8afd408..041328528 100644 --- a/server-utils/src/reactor.rs +++ b/server-utils/src/reactor.rs @@ -96,9 +96,8 @@ impl RpcEventLoop { pub fn with_name(name: Option) -> io::Result { let (stop, stopped) = futures::channel::oneshot::channel(); - let mut tb = runtime::Builder::new(); - tb.core_threads(1); - tb.threaded_scheduler(); + let mut tb = runtime::Builder::new_multi_thread(); + tb.worker_threads(1); tb.enable_all(); if let Some(name) = name { diff --git a/server-utils/src/suspendable_stream.rs b/server-utils/src/suspendable_stream.rs index 8d3179da9..96af9c91a 100644 --- a/server-utils/src/suspendable_stream.rs +++ b/server-utils/src/suspendable_stream.rs @@ -2,9 +2,7 @@ use std::future::Future; use std::io; use std::pin::Pin; use std::task::Poll; -use std::time::Duration; - -use tokio::time::Delay; +use std::time::{Duration, Instant}; /// `Incoming` is a stream of incoming sockets /// Polling the stream may return a temporary io::Error (for instance if we can't open the connection because of "too many open files" limit) @@ -19,7 +17,7 @@ pub struct SuspendableStream { next_delay: Duration, initial_delay: Duration, max_delay: Duration, - timeout: Option, + suspended_until: Option, } impl SuspendableStream { @@ -31,7 +29,7 @@ impl SuspendableStream { next_delay: Duration::from_millis(20), initial_delay: Duration::from_millis(10), max_delay: Duration::from_secs(5), - timeout: None, + suspended_until: None, } } } @@ -44,10 +42,17 @@ where fn poll_next(mut self: Pin<&mut Self>, cx: &mut std::task::Context<'_>) -> std::task::Poll> { loop { - if let Some(timeout) = self.timeout.as_mut() { - match Pin::new(timeout).poll(cx) { + // If we encountered a connection error before then we suspend + // polling from the underlying stream for a bit + if let Some(deadline) = &mut self.suspended_until { + let deadline = tokio::time::Instant::from_std(*deadline); + let sleep = tokio::time::sleep_until(deadline); + futures::pin_mut!(sleep); + match sleep.poll(cx) { Poll::Pending => return Poll::Pending, - Poll::Ready(()) => {} + Poll::Ready(()) => { + self.suspended_until = None; + } } } @@ -78,7 +83,7 @@ where }; debug!("Error accepting connection: {}", err); debug!("The server will stop accepting connections for {:?}", self.next_delay); - self.timeout = Some(tokio::time::delay_for(self.next_delay)); + self.suspended_until = Some(Instant::now() + self.next_delay); } } } diff --git a/stdio/Cargo.toml b/stdio/Cargo.toml index 8c907431f..2e5c91ba9 100644 --- a/stdio/Cargo.toml +++ b/stdio/Cargo.toml @@ -13,11 +13,11 @@ version = "17.1.0" futures = "0.3" jsonrpc-core = { version = "17.1", path = "../core" } log = "0.4" -tokio = { version = "0.2", features = ["io-std", "io-driver", "io-util"] } -tokio-util = { version = "0.3", features = ["codec"] } +tokio = { version = "1", features = ["io-std", "io-util"] } +tokio-util = { version = "0.6", features = ["codec"] } [dev-dependencies] -tokio = { version = "0.2", features = ["rt-core", "macros"] } +tokio = { version = "1", features = ["rt", "macros"] } lazy_static = "1.0" env_logger = "0.7" diff --git a/tcp/src/server.rs b/tcp/src/server.rs index 55a2b8d4a..9e6aad222 100644 --- a/tcp/src/server.rs +++ b/tcp/src/server.rs @@ -7,6 +7,7 @@ use tower_service::Service as _; use crate::futures::{self, future}; use crate::jsonrpc::{middleware, MetaIoHandler, Metadata, Middleware}; +use crate::server_utils::tokio_stream::wrappers::TcpListenerStream; use crate::server_utils::{codecs, reactor, tokio, tokio_util::codec::Framed, SuspendableStream}; use crate::dispatch::{Dispatcher, PeerMessageQueue, SenderChannels}; @@ -94,6 +95,7 @@ where executor.executor().spawn(async move { let start = async { let listener = tokio::net::TcpListener::bind(&address).await?; + let listener = TcpListenerStream::new(listener); let connections = SuspendableStream::new(listener); let server = connections.map(|socket| { diff --git a/tcp/src/tests.rs b/tcp/src/tests.rs index b95e1b288..36d7c45c8 100644 --- a/tcp/src/tests.rs +++ b/tcp/src/tests.rs @@ -1,4 +1,4 @@ -use std::net::{Shutdown, SocketAddr}; +use std::net::SocketAddr; use std::str::FromStr; use std::sync::Arc; use std::time::Duration; @@ -23,7 +23,7 @@ fn casual_server() -> ServerBuilder { } fn run_future(fut: impl std::future::Future + Send) -> O { - let mut rt = tokio::runtime::Runtime::new().unwrap(); + let rt = tokio::runtime::Runtime::new().unwrap(); rt.block_on(fut) } @@ -60,9 +60,9 @@ fn disconnect() { let _server = server.start(&addr).expect("Server must run with no issues"); run_future(async move { - let stream = TcpStream::connect(&addr).await.unwrap(); + let mut stream = TcpStream::connect(&addr).await.unwrap(); assert_eq!(stream.peer_addr().unwrap(), addr); - stream.shutdown(::std::net::Shutdown::Both).unwrap(); + stream.shutdown().await.unwrap(); }); ::std::thread::sleep(::std::time::Duration::from_millis(50)); @@ -76,7 +76,7 @@ fn dummy_request(addr: &SocketAddr, data: Vec) -> Vec { let stream = async move { let mut stream = TcpStream::connect(addr).await?; stream.write_all(&data).await?; - stream.shutdown(Shutdown::Write)?; + stream.shutdown().await?; let mut read_buf = vec![]; let _ = stream.read_to_end(&mut read_buf).await; @@ -243,7 +243,7 @@ fn message() { let client = async move { let stream = TcpStream::connect(&addr); - let delay = tokio::time::delay_for(Duration::from_millis(500)); + let delay = tokio::time::sleep(Duration::from_millis(500)); let (stream, _) = futures::join!(stream, delay); let mut stream = stream?; @@ -272,7 +272,7 @@ fn message() { let data = b"{\"jsonrpc\": \"2.0\", \"method\": \"say_hello\", \"params\": [42, 23], \"id\": 1}\n"; stream.write_all(&data[..]).await?; - stream.shutdown(Shutdown::Write).unwrap(); + stream.shutdown().await.unwrap(); let mut read_buf = vec![]; let _ = stream.read_to_end(&mut read_buf).await?;