From e61797a2286446c49b955648217538cdc615be23 Mon Sep 17 00:00:00 2001 From: Carl Date: Tue, 23 Oct 2018 18:40:15 -0700 Subject: [PATCH 1/3] Add ability to call close on another thread after calling wait() on main server thread --- http/src/lib.rs | 54 ++++++++++++++++++++++++++++--------- http/src/tests.rs | 16 +++++++++++ server-utils/src/reactor.rs | 12 +++++++++ 3 files changed, 69 insertions(+), 13 deletions(-) diff --git a/http/src/lib.rs b/http/src/lib.rs index 514b4e2c5..26c98e755 100644 --- a/http/src/lib.rs +++ b/http/src/lib.rs @@ -40,7 +40,7 @@ mod utils; mod tests; use std::io; -use std::sync::{mpsc, Arc}; +use std::sync::{mpsc, Arc, Mutex}; use std::net::SocketAddr; use std::thread; @@ -430,7 +430,7 @@ impl> ServerBuilder { Ok(Server { address: local_addr?, executor: Some(executors), - close: Some(close), + close: Arc::new(Mutex::new(Some(close))), }) } } @@ -554,7 +554,7 @@ fn configure_port(_reuse: bool, _tcp: &net2::TcpBuilder) -> io::Result<()> { pub struct Server { address: SocketAddr, executor: Option>, - close: Option>>, + close: Arc>>>>, } const PROOF: &'static str = "Server is always Some until self is consumed."; @@ -566,13 +566,7 @@ impl Server { /// Closes the server. pub fn close(mut self) { - for close in self.close.take().expect(PROOF) { - let _ = close.send(()); - } - - for executor in self.executor.take().expect(PROOF) { - executor.close(); - } + self.close_handle().close(); } /// Will block, waiting for the server to finish. @@ -581,12 +575,46 @@ impl Server { executor.wait(); } } + + /// Returns a handle to the server that can be used to close it while another thread is + /// blocking in `wait`. + pub fn close_handle(&mut self) -> CloseHandle { + let executor_close: Option> = self.executor.as_mut().map(|executors| { + executors.iter_mut().map(|executor| executor.close_handle()).collect() + }); + + CloseHandle { + executor_close, + close: self.close.clone(), + } + } } impl Drop for Server { fn drop(&mut self) { - self.executor.take().map(|executors| { - for executor in executors { executor.close(); } - }); + self.close_handle().close(); } } + +/// A handle that allows closing of a server even if it owned by a thread blocked in `wait`. +pub struct CloseHandle { + close: Arc>>>>, + executor_close: Option>>>, +} + +impl CloseHandle { + /// Closes the `Server`. + pub fn close(mut self) { + self.close.lock().unwrap().take().map(|close_vector| { + for close in close_vector { + let _ = close.send(()); + } + }); + + self.executor_close.take().map(|executor_close_vector| { + for e in executor_close_vector { + e.map(|v| v.send(())); + } + }); + } +} \ No newline at end of file diff --git a/http/src/tests.rs b/http/src/tests.rs index 3fea71680..a7d3dfb66 100644 --- a/http/src/tests.rs +++ b/http/src/tests.rs @@ -3,6 +3,7 @@ extern crate jsonrpc_core; use std::str::Lines; use std::net::TcpStream; use std::io::{Read, Write}; +use std::time::Duration; use self::jsonrpc_core::{IoHandler, Params, Value, Error, ErrorCode}; use self::jsonrpc_core::futures::{self, Future}; @@ -1166,3 +1167,18 @@ fn world_5() -> String { fn world_batch() -> String { "[{\"jsonrpc\":\"2.0\",\"result\":\"world\",\"id\":1}]\n".into() } + +#[test] +fn close_handle_makes_wait_return() { + let mut server = serve(id); + let close_handle = server.close_handle(); + + let (tx, rx) = mpsc::channel(); + thread::spawn(move || { + tx.send(server.wait()).unwrap(); + }); + thread::sleep(Duration::from_secs(1)); + close_handle.close(); + + rx.recv_timeout(Duration::from_secs(10)).expect("Expected server to close"); +} \ No newline at end of file diff --git a/server-utils/src/reactor.rs b/server-utils/src/reactor.rs index 63083d734..9439d9283 100644 --- a/server-utils/src/reactor.rs +++ b/server-utils/src/reactor.rs @@ -62,6 +62,14 @@ impl Executor { self.executor().spawn(future) } + pub fn close_handle(&mut self) -> Option> { + if let Executor::Spawned(ref mut eloop) = self { + eloop.close_handle() + } else { + None + } + } + /// Closes underlying event loop (if any!). pub fn close(self) { if let Executor::Spawned(eloop) = self { @@ -163,4 +171,8 @@ impl RpcEventLoop { warn!("Event Loop is already finished. {:?}", e); }); } + + pub fn close_handle(&mut self) -> Option>{ + self.close.take() + } } From c4bc260d66e5811d0f700f5ddeb3f2f7ff7fe920 Mon Sep 17 00:00:00 2001 From: Carl Date: Wed, 24 Oct 2018 13:05:40 -0700 Subject: [PATCH 2/3] Change spaces to tabs, add missing documentation --- http/src/lib.rs | 18 +++++++++--------- server-utils/src/reactor.rs | 3 +++ 2 files changed, 12 insertions(+), 9 deletions(-) diff --git a/http/src/lib.rs b/http/src/lib.rs index 26c98e755..f869146f3 100644 --- a/http/src/lib.rs +++ b/http/src/lib.rs @@ -577,17 +577,17 @@ impl Server { } /// Returns a handle to the server that can be used to close it while another thread is - /// blocking in `wait`. - pub fn close_handle(&mut self) -> CloseHandle { + /// blocking in `wait`. + pub fn close_handle(&mut self) -> CloseHandle { let executor_close: Option> = self.executor.as_mut().map(|executors| { executors.iter_mut().map(|executor| executor.close_handle()).collect() }); - CloseHandle { - executor_close, - close: self.close.clone(), - } - } + CloseHandle { + executor_close, + close: self.close.clone(), + } + } } impl Drop for Server { @@ -598,8 +598,8 @@ impl Drop for Server { /// A handle that allows closing of a server even if it owned by a thread blocked in `wait`. pub struct CloseHandle { - close: Arc>>>>, - executor_close: Option>>>, + close: Arc>>>>, + executor_close: Option>>>, } impl CloseHandle { diff --git a/server-utils/src/reactor.rs b/server-utils/src/reactor.rs index 9439d9283..05a22f793 100644 --- a/server-utils/src/reactor.rs +++ b/server-utils/src/reactor.rs @@ -62,6 +62,7 @@ impl Executor { self.executor().spawn(future) } + /// Returns a handle to close the underlying event loop pub fn close_handle(&mut self) -> Option> { if let Executor::Spawned(ref mut eloop) = self { eloop.close_handle() @@ -172,6 +173,8 @@ impl RpcEventLoop { }); } + /// Returns the close signal that can be used to close the event loop even while + /// another thread is blocked on the event loop in "wait" pub fn close_handle(&mut self) -> Option>{ self.close.take() } From a08a0c8f2eebbef80d4c786c66e03666420b2b62 Mon Sep 17 00:00:00 2001 From: Carl Date: Fri, 26 Oct 2018 12:11:13 -0700 Subject: [PATCH 3/3] Make CloseHandle cloneable --- http/src/lib.rs | 22 +++++++++++++--------- 1 file changed, 13 insertions(+), 9 deletions(-) diff --git a/http/src/lib.rs b/http/src/lib.rs index f869146f3..bdd03eaef 100644 --- a/http/src/lib.rs +++ b/http/src/lib.rs @@ -580,11 +580,14 @@ impl Server { /// blocking in `wait`. pub fn close_handle(&mut self) -> CloseHandle { let executor_close: Option> = self.executor.as_mut().map(|executors| { - executors.iter_mut().map(|executor| executor.close_handle()).collect() + executors + .iter_mut() + .map(|executor| executor.close_handle()) + .collect() }); CloseHandle { - executor_close, + executor_close: Arc::new(Mutex::new(executor_close)), close: self.close.clone(), } } @@ -597,24 +600,25 @@ impl Drop for Server { } /// A handle that allows closing of a server even if it owned by a thread blocked in `wait`. +#[derive(Clone)] pub struct CloseHandle { close: Arc>>>>, - executor_close: Option>>>, + executor_close: Arc>>>>>, } impl CloseHandle { - /// Closes the `Server`. - pub fn close(mut self) { + /// Closes the `Server`. + pub fn close(self) { self.close.lock().unwrap().take().map(|close_vector| { for close in close_vector { let _ = close.send(()); } }); - self.executor_close.take().map(|executor_close_vector| { + self.executor_close.lock().unwrap().take().map(|executor_close_vector| { for e in executor_close_vector { - e.map(|v| v.send(())); - } + e.map(|v| v.send(())); + } }); - } + } } \ No newline at end of file