diff --git a/http/Cargo.toml b/http/Cargo.toml index 6b18ddaa2..1cefc7b31 100644 --- a/http/Cargo.toml +++ b/http/Cargo.toml @@ -17,6 +17,6 @@ jsonrpc-server-utils = { version = "11.0", path = "../server-utils" } log = "0.4" net2 = "0.2" unicase = "2.0" - +parking_lot = "0.8.0" [badges] travis-ci = { repository = "paritytech/jsonrpc", branch = "master"} diff --git a/http/src/lib.rs b/http/src/lib.rs index 0848b6f77..00545d755 100644 --- a/http/src/lib.rs +++ b/http/src/lib.rs @@ -40,6 +40,8 @@ use std::net::SocketAddr; use std::sync::{mpsc, Arc}; use std::thread; +use parking_lot::Mutex; + use crate::jsonrpc::futures::sync::oneshot; use crate::jsonrpc::futures::{self, future, Future, Stream}; use crate::jsonrpc::MetaIoHandler; @@ -377,10 +379,12 @@ impl> ServerBuilder { let (local_addr_tx, local_addr_rx) = mpsc::channel(); let (close, shutdown_signal) = oneshot::channel(); + let (done_tx, done_rx) = oneshot::channel(); let eloop = self.executor.init_with_name("http.worker0")?; let req_max_size = self.max_request_body_size; + // The first threads `Executor` is initialised differently from the others serve( - (shutdown_signal, local_addr_tx), + (shutdown_signal, local_addr_tx, done_tx), eloop.executor(), addr.to_owned(), cors_domains.clone(), @@ -399,9 +403,10 @@ impl> ServerBuilder { .map(|i| { let (local_addr_tx, local_addr_rx) = mpsc::channel(); let (close, shutdown_signal) = oneshot::channel(); + let (done_tx, done_rx) = oneshot::channel(); let eloop = UninitializedExecutor::Unspawned.init_with_name(format!("http.worker{}", i + 1))?; serve( - (shutdown_signal, local_addr_tx), + (shutdown_signal, local_addr_tx, done_tx), eloop.executor(), addr.to_owned(), cors_domains.clone(), @@ -416,27 +421,34 @@ impl> ServerBuilder { reuse_port, req_max_size, ); - Ok((eloop, close, local_addr_rx)) + Ok((eloop, close, local_addr_rx, done_rx)) }) .collect::>>()?; // Wait for server initialization let local_addr = recv_address(local_addr_rx); // Wait for other threads as well. - let mut handles = handles + let mut handles: Vec<(Executor, oneshot::Sender<()>, oneshot::Receiver<()>)> = handles .into_iter() - .map(|(eloop, close, local_addr_rx)| { + .map(|(eloop, close, local_addr_rx, done_rx)| { let _ = recv_address(local_addr_rx)?; - Ok((eloop, close)) + Ok((eloop, close, done_rx)) }) .collect::)>>()?; - handles.push((eloop, close)); - let (executors, close) = handles.into_iter().unzip(); + handles.push((eloop, close, done_rx)); + + let (executors, done_rxs) = handles + .into_iter() + .fold((vec![], vec![]), |mut acc, (eloop, closer, done_rx)| { + acc.0.push((eloop, closer)); + acc.1.push(done_rx); + acc + }); Ok(Server { address: local_addr?, - executor: Some(executors), - close: Some(close), + executors: Arc::new(Mutex::new(Some(executors))), + done: Some(done_rxs), }) } } @@ -448,7 +460,7 @@ fn recv_address(local_addr_rx: mpsc::Receiver>) -> io::Re } fn serve>( - signals: (oneshot::Receiver<()>, mpsc::Sender>), + signals: (oneshot::Receiver<()>, mpsc::Sender>, oneshot::Sender<()>), executor: tokio::runtime::TaskExecutor, addr: SocketAddr, cors_domains: CorsDomains, @@ -463,7 +475,7 @@ fn serve>( reuse_port: bool, max_request_body_size: usize, ) { - let (shutdown_signal, local_addr_tx) = signals; + let (shutdown_signal, local_addr_tx, done_tx) = signals; executor.spawn(future::lazy(move || { let handle = tokio::reactor::Handle::default(); @@ -537,12 +549,15 @@ fn serve>( .map_err(|e| { warn!("Incoming streams error, closing sever: {:?}", e); }) - .select(shutdown_signal.map_err(|e| { + .select(shutdown_signal + .map_err(|e| { debug!("Shutdown signaller dropped, closing server: {:?}", e); })) .map(|_| ()) .map_err(|_| ()) }) + }).and_then(|_| { + done_tx.send(()) })); } @@ -562,14 +577,31 @@ fn configure_port(_reuse: bool, _tcp: &net2::TcpBuilder) -> io::Result<()> { Ok(()) } +/// Handle used to close the server. Can be cloned and passed around to different threads and be used +/// to close a server that is `wait()`ing. + +#[derive(Clone)] +pub struct CloseHandle(Arc)>>>>); + +impl CloseHandle { + /// Shutdown a running server + pub fn close(self) { + if let Some(executors) = self.0.lock().take() { + for (executor, closer) in executors { + executor.close(); + let _ = closer.send(()); + } + } + } +} + /// jsonrpc http server instance pub struct Server { address: SocketAddr, - executor: Option>, - close: Option>>, + executors: Arc)>>>>, + done: Option>>, } -const PROOF: &str = "Server is always Some until self is consumed."; impl Server { /// Returns address of this server pub fn address(&self) -> &SocketAddr { @@ -577,30 +609,28 @@ impl Server { } /// Closes the server. - pub fn close(mut self) { - for close in self.close.take().expect(PROOF) { - let _ = close.send(()); - } - - for executor in self.executor.take().expect(PROOF) { - executor.close(); - } + pub fn close(self) { + self.close_handle().close() } /// Will block, waiting for the server to finish. pub fn wait(mut self) { - for executor in self.executor.take().expect(PROOF) { - executor.wait(); + if let Some(receivers) = self.done.take() { + for receiver in receivers { + let _ = receiver.wait(); + } } } + + /// Get a handle that allows us to close the server from a different thread and/or while the + /// server is `wait()`ing. + pub fn close_handle(&self) -> CloseHandle { + CloseHandle(self.executors.clone()) + } } impl Drop for Server { fn drop(&mut self) { - if let Some(executors) = self.executor.take() { - for executor in executors { - executor.close(); - } - }; + self.close_handle().close(); } } diff --git a/http/src/tests.rs b/http/src/tests.rs index 28cd772ee..1730be94d 100644 --- a/http/src/tests.rs +++ b/http/src/tests.rs @@ -4,6 +4,7 @@ use self::jsonrpc_core::{Error, ErrorCode, IoHandler, Params, Value}; use std::io::{Read, Write}; use std::net::TcpStream; use std::str::Lines; +use std::time::Duration; use self::jsonrpc_core::futures::{self, Future}; use super::*; @@ -52,8 +53,6 @@ fn serve_allow_headers(cors_allow_headers: cors::AccessControlAllowHeaders) -> S } fn io() -> IoHandler { - use std::{thread, time}; - let mut io = IoHandler::default(); io.add_method("hello", |params: Params| match params.parse::<(u64,)>() { Ok((num,)) => Ok(Value::String(format!("world: {}", num))), @@ -66,7 +65,7 @@ fn io() -> IoHandler { io.add_method("hello_async2", |_params: Params| { let (c, p) = futures::oneshot(); thread::spawn(move || { - thread::sleep(time::Duration::from_millis(10)); + thread::sleep(Duration::from_millis(10)); c.send(Value::String("world".into())).unwrap(); }); p.map_err(|_| Error::invalid_request()) @@ -1406,6 +1405,24 @@ fn should_return_connection_header() { assert_eq!(response.body, world_batch()); } +#[test] +fn close_handle_makes_wait_return() { + let server = serve(id); + let close_handle = server.close_handle(); + + let (tx, rx) = mpsc::channel(); + + thread::spawn(move || { + tx.send(server.wait()).unwrap(); + }); + + thread::sleep(Duration::from_secs(3)); + + close_handle.close(); + + rx.recv_timeout(Duration::from_secs(10)).expect("Expected server to close"); +} + #[test] fn should_close_connection_without_keep_alive() { // given diff --git a/ipc/src/logger.rs b/ipc/src/logger.rs index c599dc34b..9b885a72d 100644 --- a/ipc/src/logger.rs +++ b/ipc/src/logger.rs @@ -10,7 +10,7 @@ lazy_static! { builder.filter(None, LevelFilter::Info); if let Ok(log) = env::var("RUST_LOG") { - builder.parse(&log); + builder.parse_filters(&log); } if let Ok(_) = builder.try_init() { diff --git a/tcp/src/logger.rs b/tcp/src/logger.rs index 8502fe870..6edd87759 100644 --- a/tcp/src/logger.rs +++ b/tcp/src/logger.rs @@ -8,7 +8,7 @@ lazy_static! { builder.filter(None, LevelFilter::Info); if let Ok(log) = env::var("RUST_LOG") { - builder.parse(&log); + builder.parse_filters(&log); } if let Ok(_) = builder.try_init() { diff --git a/ws/src/tests.rs b/ws/src/tests.rs index 1f4014637..d46e978f7 100644 --- a/ws/src/tests.rs +++ b/ws/src/tests.rs @@ -62,7 +62,6 @@ fn request(server: Server, request: &str) -> Response { fn serve(port: u16) -> (Server, Arc) { use crate::core::futures::sync::oneshot; - use std::time::Duration; let pending = Arc::new(AtomicUsize::new(0));