diff --git a/http/src/lib.rs b/http/src/lib.rs index 514b4e2c5..bdd03eaef 100644 --- a/http/src/lib.rs +++ b/http/src/lib.rs @@ -40,7 +40,7 @@ mod utils; mod tests; use std::io; -use std::sync::{mpsc, Arc}; +use std::sync::{mpsc, Arc, Mutex}; use std::net::SocketAddr; use std::thread; @@ -430,7 +430,7 @@ impl> ServerBuilder { Ok(Server { address: local_addr?, executor: Some(executors), - close: Some(close), + close: Arc::new(Mutex::new(Some(close))), }) } } @@ -554,7 +554,7 @@ fn configure_port(_reuse: bool, _tcp: &net2::TcpBuilder) -> io::Result<()> { pub struct Server { address: SocketAddr, executor: Option>, - close: Option>>, + close: Arc>>>>, } const PROOF: &'static str = "Server is always Some until self is consumed."; @@ -566,13 +566,7 @@ impl Server { /// Closes the server. pub fn close(mut self) { - for close in self.close.take().expect(PROOF) { - let _ = close.send(()); - } - - for executor in self.executor.take().expect(PROOF) { - executor.close(); - } + self.close_handle().close(); } /// Will block, waiting for the server to finish. @@ -581,12 +575,50 @@ impl Server { executor.wait(); } } + + /// Returns a handle to the server that can be used to close it while another thread is + /// blocking in `wait`. + pub fn close_handle(&mut self) -> CloseHandle { + let executor_close: Option> = self.executor.as_mut().map(|executors| { + executors + .iter_mut() + .map(|executor| executor.close_handle()) + .collect() + }); + + CloseHandle { + executor_close: Arc::new(Mutex::new(executor_close)), + close: self.close.clone(), + } + } } impl Drop for Server { fn drop(&mut self) { - self.executor.take().map(|executors| { - for executor in executors { executor.close(); } - }); + self.close_handle().close(); } } + +/// A handle that allows closing of a server even if it owned by a thread blocked in `wait`. +#[derive(Clone)] +pub struct CloseHandle { + close: Arc>>>>, + executor_close: Arc>>>>>, +} + +impl CloseHandle { + /// Closes the `Server`. + pub fn close(self) { + self.close.lock().unwrap().take().map(|close_vector| { + for close in close_vector { + let _ = close.send(()); + } + }); + + self.executor_close.lock().unwrap().take().map(|executor_close_vector| { + for e in executor_close_vector { + e.map(|v| v.send(())); + } + }); + } +} \ No newline at end of file diff --git a/http/src/tests.rs b/http/src/tests.rs index 3fea71680..a7d3dfb66 100644 --- a/http/src/tests.rs +++ b/http/src/tests.rs @@ -3,6 +3,7 @@ extern crate jsonrpc_core; use std::str::Lines; use std::net::TcpStream; use std::io::{Read, Write}; +use std::time::Duration; use self::jsonrpc_core::{IoHandler, Params, Value, Error, ErrorCode}; use self::jsonrpc_core::futures::{self, Future}; @@ -1166,3 +1167,18 @@ fn world_5() -> String { fn world_batch() -> String { "[{\"jsonrpc\":\"2.0\",\"result\":\"world\",\"id\":1}]\n".into() } + +#[test] +fn close_handle_makes_wait_return() { + let mut server = serve(id); + let close_handle = server.close_handle(); + + let (tx, rx) = mpsc::channel(); + thread::spawn(move || { + tx.send(server.wait()).unwrap(); + }); + thread::sleep(Duration::from_secs(1)); + close_handle.close(); + + rx.recv_timeout(Duration::from_secs(10)).expect("Expected server to close"); +} \ No newline at end of file diff --git a/server-utils/src/reactor.rs b/server-utils/src/reactor.rs index 63083d734..05a22f793 100644 --- a/server-utils/src/reactor.rs +++ b/server-utils/src/reactor.rs @@ -62,6 +62,15 @@ impl Executor { self.executor().spawn(future) } + /// Returns a handle to close the underlying event loop + pub fn close_handle(&mut self) -> Option> { + if let Executor::Spawned(ref mut eloop) = self { + eloop.close_handle() + } else { + None + } + } + /// Closes underlying event loop (if any!). pub fn close(self) { if let Executor::Spawned(eloop) = self { @@ -163,4 +172,10 @@ impl RpcEventLoop { warn!("Event Loop is already finished. {:?}", e); }); } + + /// Returns the close signal that can be used to close the event loop even while + /// another thread is blocked on the event loop in "wait" + pub fn close_handle(&mut self) -> Option>{ + self.close.take() + } }