From aaaed575df6c7d8856ce27657690e051ffcc5a21 Mon Sep 17 00:00:00 2001 From: David Palm Date: Sat, 25 May 2019 10:50:13 +0200 Subject: [PATCH 1/7] Fix a few compiler warnings --- http/src/tests.rs | 5 ++--- ipc/src/logger.rs | 2 +- tcp/src/logger.rs | 2 +- ws/src/tests.rs | 1 - 4 files changed, 4 insertions(+), 6 deletions(-) diff --git a/http/src/tests.rs b/http/src/tests.rs index 28cd772ee..94a97e021 100644 --- a/http/src/tests.rs +++ b/http/src/tests.rs @@ -4,6 +4,7 @@ use self::jsonrpc_core::{Error, ErrorCode, IoHandler, Params, Value}; use std::io::{Read, Write}; use std::net::TcpStream; use std::str::Lines; +use std::time::Duration; use self::jsonrpc_core::futures::{self, Future}; use super::*; @@ -52,8 +53,6 @@ fn serve_allow_headers(cors_allow_headers: cors::AccessControlAllowHeaders) -> S } fn io() -> IoHandler { - use std::{thread, time}; - let mut io = IoHandler::default(); io.add_method("hello", |params: Params| match params.parse::<(u64,)>() { Ok((num,)) => Ok(Value::String(format!("world: {}", num))), @@ -66,7 +65,7 @@ fn io() -> IoHandler { io.add_method("hello_async2", |_params: Params| { let (c, p) = futures::oneshot(); thread::spawn(move || { - thread::sleep(time::Duration::from_millis(10)); + thread::sleep(Duration::from_millis(10)); c.send(Value::String("world".into())).unwrap(); }); p.map_err(|_| Error::invalid_request()) diff --git a/ipc/src/logger.rs b/ipc/src/logger.rs index c599dc34b..9b885a72d 100644 --- a/ipc/src/logger.rs +++ b/ipc/src/logger.rs @@ -10,7 +10,7 @@ lazy_static! { builder.filter(None, LevelFilter::Info); if let Ok(log) = env::var("RUST_LOG") { - builder.parse(&log); + builder.parse_filters(&log); } if let Ok(_) = builder.try_init() { diff --git a/tcp/src/logger.rs b/tcp/src/logger.rs index 8502fe870..6edd87759 100644 --- a/tcp/src/logger.rs +++ b/tcp/src/logger.rs @@ -8,7 +8,7 @@ lazy_static! { builder.filter(None, LevelFilter::Info); if let Ok(log) = env::var("RUST_LOG") { - builder.parse(&log); + builder.parse_filters(&log); } if let Ok(_) = builder.try_init() { diff --git a/ws/src/tests.rs b/ws/src/tests.rs index 1f4014637..d46e978f7 100644 --- a/ws/src/tests.rs +++ b/ws/src/tests.rs @@ -62,7 +62,6 @@ fn request(server: Server, request: &str) -> Response { fn serve(port: u16) -> (Server, Arc) { use crate::core::futures::sync::oneshot; - use std::time::Duration; let pending = Arc::new(AtomicUsize::new(0)); From 91f72a8907d98c1ce432f27df6592e50884a67d1 Mon Sep 17 00:00:00 2001 From: David Palm Date: Sat, 25 May 2019 22:36:55 +0200 Subject: [PATCH 2/7] WIP --- http/Cargo.toml | 2 +- http/src/lib.rs | 153 +++++++++++++++++++++++++++++++++++----------- http/src/tests.rs | 25 ++++++++ 3 files changed, 144 insertions(+), 36 deletions(-) diff --git a/http/Cargo.toml b/http/Cargo.toml index 6b18ddaa2..1cefc7b31 100644 --- a/http/Cargo.toml +++ b/http/Cargo.toml @@ -17,6 +17,6 @@ jsonrpc-server-utils = { version = "11.0", path = "../server-utils" } log = "0.4" net2 = "0.2" unicase = "2.0" - +parking_lot = "0.8.0" [badges] travis-ci = { repository = "paritytech/jsonrpc", branch = "master"} diff --git a/http/src/lib.rs b/http/src/lib.rs index 0848b6f77..dc8e45984 100644 --- a/http/src/lib.rs +++ b/http/src/lib.rs @@ -40,6 +40,8 @@ use std::net::SocketAddr; use std::sync::{mpsc, Arc}; use std::thread; +use parking_lot::Mutex; + use crate::jsonrpc::futures::sync::oneshot; use crate::jsonrpc::futures::{self, future, Future, Stream}; use crate::jsonrpc::MetaIoHandler; @@ -377,10 +379,11 @@ impl> ServerBuilder { let (local_addr_tx, local_addr_rx) = mpsc::channel(); let (close, shutdown_signal) = oneshot::channel(); + let (is_done_tx, is_done_rx) = oneshot::channel(); let eloop = self.executor.init_with_name("http.worker0")?; let req_max_size = self.max_request_body_size; serve( - (shutdown_signal, local_addr_tx), + (shutdown_signal, local_addr_tx, is_done_tx), eloop.executor(), addr.to_owned(), cors_domains.clone(), @@ -399,9 +402,10 @@ impl> ServerBuilder { .map(|i| { let (local_addr_tx, local_addr_rx) = mpsc::channel(); let (close, shutdown_signal) = oneshot::channel(); + let (is_done_tx, is_done_rx) = oneshot::channel(); let eloop = UninitializedExecutor::Unspawned.init_with_name(format!("http.worker{}", i + 1))?; serve( - (shutdown_signal, local_addr_tx), + (shutdown_signal, local_addr_tx, is_done_tx), eloop.executor(), addr.to_owned(), cors_domains.clone(), @@ -416,27 +420,36 @@ impl> ServerBuilder { reuse_port, req_max_size, ); - Ok((eloop, close, local_addr_rx)) + Ok((eloop, close, local_addr_rx, is_done_rx)) }) .collect::>>()?; // Wait for server initialization let local_addr = recv_address(local_addr_rx); // Wait for other threads as well. - let mut handles = handles + let mut handles: Vec<(Executor, oneshot::Sender<()>, oneshot::Receiver<()>)> = handles .into_iter() - .map(|(eloop, close, local_addr_rx)| { + .map(|(eloop, close, local_addr_rx, is_done_rx)| { let _ = recv_address(local_addr_rx)?; - Ok((eloop, close)) + Ok((eloop, close, is_done_rx)) }) .collect::)>>()?; - handles.push((eloop, close)); - let (executors, close) = handles.into_iter().unzip(); + handles.push((eloop, close, is_done_rx)); + + let mut executors = vec![]; + let mut closers = vec![]; + let mut is_done_rxs = vec![]; + for handle in handles { + executors.push(handle.0); + closers.push(handle.1); + is_done_rxs.push(handle.2); + } Ok(Server { address: local_addr?, - executor: Some(executors), - close: Some(close), + executors: Arc::new( Mutex::new(Some(executors))), + close: Arc::new(Mutex::new(Some(closers))), + done_sig: Some(is_done_rxs), }) } } @@ -448,7 +461,7 @@ fn recv_address(local_addr_rx: mpsc::Receiver>) -> io::Re } fn serve>( - signals: (oneshot::Receiver<()>, mpsc::Sender>), + signals: (oneshot::Receiver<()>, mpsc::Sender>, oneshot::Sender<()>), executor: tokio::runtime::TaskExecutor, addr: SocketAddr, cors_domains: CorsDomains, @@ -463,7 +476,7 @@ fn serve>( reuse_port: bool, max_request_body_size: usize, ) { - let (shutdown_signal, local_addr_tx) = signals; + let (shutdown_signal, local_addr_tx, is_done_tx) = signals; executor.spawn(future::lazy(move || { let handle = tokio::reactor::Handle::default(); @@ -535,14 +548,23 @@ fn serve>( Ok(()) }) .map_err(|e| { + println!("Incoming streams error, closing server"); warn!("Incoming streams error, closing sever: {:?}", e); }) - .select(shutdown_signal.map_err(|e| { - debug!("Shutdown signaller dropped, closing server: {:?}", e); - })) + .select(shutdown_signal + .inspect(|x| { + println!("Shutdown signaller got traffic {:?}", x); + }) + .map_err(|e| { + println!("Shutdown signaller dropped, closing server"); + debug!("Shutdown signaller dropped, closing server: {:?}", e); + })) .map(|_| ()) .map_err(|_| ()) }) + }).and_then(|_| { + println!("[serve] DONE!"); + is_done_tx.send(()) })); } @@ -562,11 +584,47 @@ fn configure_port(_reuse: bool, _tcp: &net2::TcpBuilder) -> io::Result<()> { Ok(()) } +#[derive(Clone)] +pub struct CloseHandle { + executors: Arc>>>, + closers: Arc>>>>, +} + +impl CloseHandle { + pub fn close(mut self) { + println!("[CLoseHandle, close] closing executors"); + if let Some(executors) = self.executors.lock().take() { + for executor in executors { executor.close() } + } + println!("[CLoseHandle, close] sending () to closers"); + if let Some(closers) = self.closers.lock().take() { + for closer in closers { let _ = closer.send(()); } + } + +// let execs = Arc::get_mut(&mut self.executors).expect("CloseHandle.close").take(); +// if let Some(executors) = execs { +// for executor in executors { +// executor.close() +// } +// } +// for executor in self.executors.take() { +// executor.close() +// } + } + +// pub fn wait(mut self) { +// if let Some(executors) = self.executors.lock().take() { +// for executor in executors { executor.wait() } +// } +// } +} + /// jsonrpc http server instance pub struct Server { address: SocketAddr, - executor: Option>, - close: Option>>, + executors: Arc>>>, + close: Arc>>>>, + done_sig: Option>>, } const PROOF: &str = "Server is always Some until self is consumed."; @@ -577,30 +635,55 @@ impl Server { } /// Closes the server. - pub fn close(mut self) { - for close in self.close.take().expect(PROOF) { - let _ = close.send(()); - } - - for executor in self.executor.take().expect(PROOF) { - executor.close(); - } + pub fn close(self) { + self.close_handle().close() +// for close in self.close.take().expect(PROOF) { +// let _ = close.send(()); +// } +// +// for executor in self.executors.take().expect(PROOF) { +// executor.close(); +// } } /// Will block, waiting for the server to finish. pub fn wait(mut self) { - for executor in self.executor.take().expect(PROOF) { - executor.wait(); + if let Some(ch) = self.done_sig.take() { + for c in ch { + c.map(|_|{println!("[Server.wait] Got signal")}).wait(); + } +// ch[0].map(|_| { +// println!("[Server.wait] got msg"); +// }).wait(); } +// self.done_sig.unwrap()[0].map(move |_| { +// println!("[Server.wait] got msg"); +// }).wait(); +// self.done_signal.recv() + +// self.close_handle().wait() +// let execs = Arc::get_mut(&mut self.executors).expect("Server.wait"); +// +//// for executor in self.executors.take().expect(PROOF) { +// for executor in execs.take().expect(PROOF) { +// executor.wait(); +// } } -} -impl Drop for Server { - fn drop(&mut self) { - if let Some(executors) = self.executor.take() { - for executor in executors { - executor.close(); - } - }; + pub fn close_handle(&self) -> CloseHandle { + CloseHandle { + executors: self.executors.clone(), + closers: self.close.clone(), + } } } + +//impl Drop for Server { +// fn drop(&mut self) { +// if let Some(executors) = self.executors.take() { +// for executor in executors { +// executor.close(); +// } +// }; +// } +//} diff --git a/http/src/tests.rs b/http/src/tests.rs index 94a97e021..84c8059d7 100644 --- a/http/src/tests.rs +++ b/http/src/tests.rs @@ -1405,6 +1405,31 @@ fn should_return_connection_header() { assert_eq!(response.body, world_batch()); } +#[test] +fn close_handle_makes_wait_return() { + let server = serve(id); + let close_handle = server.close_handle(); + let close_handle2 = close_handle.clone(); + + let (tx, rx) = mpsc::channel(); + + thread::spawn(move || { + println!("[tst thr] SENDING"); + tx.send(server.wait()).unwrap(); + println!("[tst thr] SENT"); + }); + + thread::sleep(Duration::from_secs(3)); + + println!("[tst] WAKING UP"); + + thread::spawn(move || {close_handle.close() }); + thread::spawn(move || {close_handle2.close() }); +// close_handle.close(); + + rx.recv_timeout(Duration::from_secs(10)).expect("Expected server to close"); +} + #[test] fn should_close_connection_without_keep_alive() { // given From ea8250bd78b066fb41c5c485a88ed922c40a7a56 Mon Sep 17 00:00:00 2001 From: David Palm Date: Sun, 26 May 2019 00:22:22 +0200 Subject: [PATCH 3/7] cleanup --- http/src/lib.rs | 112 ++++++++++++++++-------------------------------- 1 file changed, 37 insertions(+), 75 deletions(-) diff --git a/http/src/lib.rs b/http/src/lib.rs index dc8e45984..968a1faf0 100644 --- a/http/src/lib.rs +++ b/http/src/lib.rs @@ -244,7 +244,8 @@ impl> ServerBuilder { rest_api: RestApi::Disabled, health_api: None, keep_alive: true, - threads: 1, + threads: 4, +// threads: 1, max_request_body_size: 5 * 1024 * 1024, } } @@ -379,11 +380,12 @@ impl> ServerBuilder { let (local_addr_tx, local_addr_rx) = mpsc::channel(); let (close, shutdown_signal) = oneshot::channel(); - let (is_done_tx, is_done_rx) = oneshot::channel(); + let (done_tx, done_rx) = oneshot::channel(); let eloop = self.executor.init_with_name("http.worker0")?; let req_max_size = self.max_request_body_size; + // First thread `Executor` is initialised differently from the others serve( - (shutdown_signal, local_addr_tx, is_done_tx), + (shutdown_signal, local_addr_tx, done_tx), eloop.executor(), addr.to_owned(), cors_domains.clone(), @@ -402,10 +404,10 @@ impl> ServerBuilder { .map(|i| { let (local_addr_tx, local_addr_rx) = mpsc::channel(); let (close, shutdown_signal) = oneshot::channel(); - let (is_done_tx, is_done_rx) = oneshot::channel(); + let (done_tx, done_rx) = oneshot::channel(); let eloop = UninitializedExecutor::Unspawned.init_with_name(format!("http.worker{}", i + 1))?; serve( - (shutdown_signal, local_addr_tx, is_done_tx), + (shutdown_signal, local_addr_tx, done_tx), eloop.executor(), addr.to_owned(), cors_domains.clone(), @@ -420,7 +422,7 @@ impl> ServerBuilder { reuse_port, req_max_size, ); - Ok((eloop, close, local_addr_rx, is_done_rx)) + Ok((eloop, close, local_addr_rx, done_rx)) }) .collect::>>()?; @@ -429,27 +431,27 @@ impl> ServerBuilder { // Wait for other threads as well. let mut handles: Vec<(Executor, oneshot::Sender<()>, oneshot::Receiver<()>)> = handles .into_iter() - .map(|(eloop, close, local_addr_rx, is_done_rx)| { + .map(|(eloop, close, local_addr_rx, done_rx)| { let _ = recv_address(local_addr_rx)?; - Ok((eloop, close, is_done_rx)) + Ok((eloop, close, done_rx)) }) .collect::)>>()?; - handles.push((eloop, close, is_done_rx)); - - let mut executors = vec![]; - let mut closers = vec![]; - let mut is_done_rxs = vec![]; - for handle in handles { - executors.push(handle.0); - closers.push(handle.1); - is_done_rxs.push(handle.2); - } + handles.push((eloop, close, done_rx)); + + let (executors, closers, done_rxs) = handles + .into_iter() + .fold((vec![], vec![], vec![]), |mut acc, (eloop, closer, done_rx)| { + acc.0.push(eloop); + acc.1.push(closer); + acc.2.push(done_rx); + acc + }); Ok(Server { address: local_addr?, executors: Arc::new( Mutex::new(Some(executors))), close: Arc::new(Mutex::new(Some(closers))), - done_sig: Some(is_done_rxs), + done: Some(done_rxs), }) } } @@ -476,7 +478,7 @@ fn serve>( reuse_port: bool, max_request_body_size: usize, ) { - let (shutdown_signal, local_addr_tx, is_done_tx) = signals; + let (shutdown_signal, local_addr_tx, done_tx) = signals; executor.spawn(future::lazy(move || { let handle = tokio::reactor::Handle::default(); @@ -564,7 +566,7 @@ fn serve>( }) }).and_then(|_| { println!("[serve] DONE!"); - is_done_tx.send(()) + done_tx.send(()) })); } @@ -591,7 +593,7 @@ pub struct CloseHandle { } impl CloseHandle { - pub fn close(mut self) { + pub fn close(self) { println!("[CLoseHandle, close] closing executors"); if let Some(executors) = self.executors.lock().take() { for executor in executors { executor.close() } @@ -600,23 +602,7 @@ impl CloseHandle { if let Some(closers) = self.closers.lock().take() { for closer in closers { let _ = closer.send(()); } } - -// let execs = Arc::get_mut(&mut self.executors).expect("CloseHandle.close").take(); -// if let Some(executors) = execs { -// for executor in executors { -// executor.close() -// } -// } -// for executor in self.executors.take() { -// executor.close() -// } - } - -// pub fn wait(mut self) { -// if let Some(executors) = self.executors.lock().take() { -// for executor in executors { executor.wait() } -// } -// } + } } /// jsonrpc http server instance @@ -624,10 +610,9 @@ pub struct Server { address: SocketAddr, executors: Arc>>>, close: Arc>>>>, - done_sig: Option>>, + done: Option>>, } -const PROOF: &str = "Server is always Some until self is consumed."; impl Server { /// Returns address of this server pub fn address(&self) -> &SocketAddr { @@ -637,39 +622,20 @@ impl Server { /// Closes the server. pub fn close(self) { self.close_handle().close() -// for close in self.close.take().expect(PROOF) { -// let _ = close.send(()); -// } -// -// for executor in self.executors.take().expect(PROOF) { -// executor.close(); -// } } /// Will block, waiting for the server to finish. pub fn wait(mut self) { - if let Some(ch) = self.done_sig.take() { - for c in ch { - c.map(|_|{println!("[Server.wait] Got signal")}).wait(); + if let Some(receivers) = self.done.take() { + for receiver in receivers { + receiver.map(|_| { println!("[Server.wait] Got signal") }).wait(); +// let _ = receiver.wait(); } -// ch[0].map(|_| { -// println!("[Server.wait] got msg"); -// }).wait(); } -// self.done_sig.unwrap()[0].map(move |_| { -// println!("[Server.wait] got msg"); -// }).wait(); -// self.done_signal.recv() - -// self.close_handle().wait() -// let execs = Arc::get_mut(&mut self.executors).expect("Server.wait"); -// -//// for executor in self.executors.take().expect(PROOF) { -// for executor in execs.take().expect(PROOF) { -// executor.wait(); -// } } + /// Get a handle that allows us to close the server from a different thread and/or while the + /// server is `wait()`ing. pub fn close_handle(&self) -> CloseHandle { CloseHandle { executors: self.executors.clone(), @@ -678,12 +644,8 @@ impl Server { } } -//impl Drop for Server { -// fn drop(&mut self) { -// if let Some(executors) = self.executors.take() { -// for executor in executors { -// executor.close(); -// } -// }; -// } -//} +impl Drop for Server { + fn drop(&mut self) { + self.close_handle().close(); + } +} From 1d155a911a4a91f33dede3912328e58457361dfd Mon Sep 17 00:00:00 2001 From: David Palm Date: Sun, 26 May 2019 00:29:23 +0200 Subject: [PATCH 4/7] Cleanup and docs --- http/src/lib.rs | 18 ++++++------------ http/src/tests.rs | 9 +-------- 2 files changed, 7 insertions(+), 20 deletions(-) diff --git a/http/src/lib.rs b/http/src/lib.rs index 968a1faf0..51a6b638a 100644 --- a/http/src/lib.rs +++ b/http/src/lib.rs @@ -244,8 +244,7 @@ impl> ServerBuilder { rest_api: RestApi::Disabled, health_api: None, keep_alive: true, - threads: 4, -// threads: 1, + threads: 1, max_request_body_size: 5 * 1024 * 1024, } } @@ -550,22 +549,16 @@ fn serve>( Ok(()) }) .map_err(|e| { - println!("Incoming streams error, closing server"); warn!("Incoming streams error, closing sever: {:?}", e); }) .select(shutdown_signal - .inspect(|x| { - println!("Shutdown signaller got traffic {:?}", x); - }) .map_err(|e| { - println!("Shutdown signaller dropped, closing server"); debug!("Shutdown signaller dropped, closing server: {:?}", e); })) .map(|_| ()) .map_err(|_| ()) }) }).and_then(|_| { - println!("[serve] DONE!"); done_tx.send(()) })); } @@ -586,6 +579,8 @@ fn configure_port(_reuse: bool, _tcp: &net2::TcpBuilder) -> io::Result<()> { Ok(()) } +/// Handle used to close the server. Can be cloned and passed around to different threads and be used +/// to close a server that is `wait()`ing. #[derive(Clone)] pub struct CloseHandle { executors: Arc>>>, @@ -593,12 +588,12 @@ pub struct CloseHandle { } impl CloseHandle { + /// Shutdown a running server pub fn close(self) { - println!("[CLoseHandle, close] closing executors"); if let Some(executors) = self.executors.lock().take() { for executor in executors { executor.close() } } - println!("[CLoseHandle, close] sending () to closers"); + if let Some(closers) = self.closers.lock().take() { for closer in closers { let _ = closer.send(()); } } @@ -628,8 +623,7 @@ impl Server { pub fn wait(mut self) { if let Some(receivers) = self.done.take() { for receiver in receivers { - receiver.map(|_| { println!("[Server.wait] Got signal") }).wait(); -// let _ = receiver.wait(); + let _ = receiver.wait(); } } } diff --git a/http/src/tests.rs b/http/src/tests.rs index 84c8059d7..1730be94d 100644 --- a/http/src/tests.rs +++ b/http/src/tests.rs @@ -1409,23 +1409,16 @@ fn should_return_connection_header() { fn close_handle_makes_wait_return() { let server = serve(id); let close_handle = server.close_handle(); - let close_handle2 = close_handle.clone(); let (tx, rx) = mpsc::channel(); thread::spawn(move || { - println!("[tst thr] SENDING"); tx.send(server.wait()).unwrap(); - println!("[tst thr] SENT"); }); thread::sleep(Duration::from_secs(3)); - println!("[tst] WAKING UP"); - - thread::spawn(move || {close_handle.close() }); - thread::spawn(move || {close_handle2.close() }); -// close_handle.close(); + close_handle.close(); rx.recv_timeout(Duration::from_secs(10)).expect("Expected server to close"); } From 375380673801278a3fad449bba75e762a74222d9 Mon Sep 17 00:00:00 2001 From: David Palm Date: Sun, 26 May 2019 00:40:16 +0200 Subject: [PATCH 5/7] Fix whitespace and grammar --- http/src/lib.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/http/src/lib.rs b/http/src/lib.rs index 51a6b638a..48fd001e8 100644 --- a/http/src/lib.rs +++ b/http/src/lib.rs @@ -382,7 +382,7 @@ impl> ServerBuilder { let (done_tx, done_rx) = oneshot::channel(); let eloop = self.executor.init_with_name("http.worker0")?; let req_max_size = self.max_request_body_size; - // First thread `Executor` is initialised differently from the others + // The first threads `Executor` is initialised differently from the others serve( (shutdown_signal, local_addr_tx, done_tx), eloop.executor(), @@ -552,9 +552,9 @@ fn serve>( warn!("Incoming streams error, closing sever: {:?}", e); }) .select(shutdown_signal - .map_err(|e| { - debug!("Shutdown signaller dropped, closing server: {:?}", e); - })) + .map_err(|e| { + debug!("Shutdown signaller dropped, closing server: {:?}", e); + })) .map(|_| ()) .map_err(|_| ()) }) From 3d42fd2f690a8257b571e804fee1793d42af4bbd Mon Sep 17 00:00:00 2001 From: David Date: Mon, 27 May 2019 14:32:19 +0200 Subject: [PATCH 6/7] Update http/src/lib.rs MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-Authored-By: Tomasz Drwięga --- http/src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/http/src/lib.rs b/http/src/lib.rs index 48fd001e8..51b2a296e 100644 --- a/http/src/lib.rs +++ b/http/src/lib.rs @@ -448,7 +448,7 @@ impl> ServerBuilder { Ok(Server { address: local_addr?, - executors: Arc::new( Mutex::new(Some(executors))), + executors: Arc::new(Mutex::new(Some(executors))), close: Arc::new(Mutex::new(Some(closers))), done: Some(done_rxs), }) From b271b119bedf3bf215d381657404d0d9c55ab9dd Mon Sep 17 00:00:00 2001 From: David Palm Date: Mon, 27 May 2019 14:56:46 +0200 Subject: [PATCH 7/7] Now that wait() does not use the eventloop/signalling chans, just bundle them up in a tuple --- http/src/lib.rs | 35 +++++++++++++---------------------- 1 file changed, 13 insertions(+), 22 deletions(-) diff --git a/http/src/lib.rs b/http/src/lib.rs index 51b2a296e..00545d755 100644 --- a/http/src/lib.rs +++ b/http/src/lib.rs @@ -437,19 +437,17 @@ impl> ServerBuilder { .collect::)>>()?; handles.push((eloop, close, done_rx)); - let (executors, closers, done_rxs) = handles + let (executors, done_rxs) = handles .into_iter() - .fold((vec![], vec![], vec![]), |mut acc, (eloop, closer, done_rx)| { - acc.0.push(eloop); - acc.1.push(closer); - acc.2.push(done_rx); + .fold((vec![], vec![]), |mut acc, (eloop, closer, done_rx)| { + acc.0.push((eloop, closer)); + acc.1.push(done_rx); acc }); Ok(Server { address: local_addr?, executors: Arc::new(Mutex::new(Some(executors))), - close: Arc::new(Mutex::new(Some(closers))), done: Some(done_rxs), }) } @@ -581,21 +579,18 @@ fn configure_port(_reuse: bool, _tcp: &net2::TcpBuilder) -> io::Result<()> { /// Handle used to close the server. Can be cloned and passed around to different threads and be used /// to close a server that is `wait()`ing. + #[derive(Clone)] -pub struct CloseHandle { - executors: Arc>>>, - closers: Arc>>>>, -} +pub struct CloseHandle(Arc)>>>>); impl CloseHandle { /// Shutdown a running server pub fn close(self) { - if let Some(executors) = self.executors.lock().take() { - for executor in executors { executor.close() } - } - - if let Some(closers) = self.closers.lock().take() { - for closer in closers { let _ = closer.send(()); } + if let Some(executors) = self.0.lock().take() { + for (executor, closer) in executors { + executor.close(); + let _ = closer.send(()); + } } } } @@ -603,8 +598,7 @@ impl CloseHandle { /// jsonrpc http server instance pub struct Server { address: SocketAddr, - executors: Arc>>>, - close: Arc>>>>, + executors: Arc)>>>>, done: Option>>, } @@ -631,10 +625,7 @@ impl Server { /// Get a handle that allows us to close the server from a different thread and/or while the /// server is `wait()`ing. pub fn close_handle(&self) -> CloseHandle { - CloseHandle { - executors: self.executors.clone(), - closers: self.close.clone(), - } + CloseHandle(self.executors.clone()) } }