From 9b285b8a59b7a967def285b5e2b298f1a675cfc8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Linus=20F=C3=A4rnstrand?= Date: Tue, 30 May 2017 13:01:28 +0200 Subject: [PATCH 1/2] Add CloseHandle feature to ws server --- ws/src/lib.rs | 2 +- ws/src/server.rs | 39 +++++++++++++++++++++++++++++++-------- 2 files changed, 32 insertions(+), 9 deletions(-) diff --git a/ws/src/lib.rs b/ws/src/lib.rs index 4ca051deb..a3b569e32 100644 --- a/ws/src/lib.rs +++ b/ws/src/lib.rs @@ -16,8 +16,8 @@ mod session; mod tests; pub use self::metadata::{RequestContext, MetaExtractor, NoopExtractor}; -pub use self::server::Server; pub use self::session::{RequestMiddleware, MiddlewareAction}; +pub use self::server::{CloseHandle, Server}; pub use self::server_builder::{ServerBuilder, Error}; pub use self::server_utils::cors::Origin; pub use self::server_utils::hosts::{Host, DomainsValidation}; diff --git a/ws/src/server.rs b/ws/src/server.rs index 544faac0f..2e780684d 100644 --- a/ws/src/server.rs +++ b/ws/src/server.rs @@ -1,5 +1,5 @@ use std::net::SocketAddr; -use std::sync::Arc; +use std::sync::{Arc, Mutex}; use std::thread; use core; @@ -17,7 +17,7 @@ use {Error}; pub struct Server { addr: SocketAddr, handle: Option>>, - remote: Option, + remote: Arc>>, broadcaster: ws::Sender, } @@ -81,7 +81,7 @@ impl Server { Ok(Server { addr: local_addr, handle: Some(handle), - remote: Some(eloop), + remote: Arc::new(Mutex::new(Some(eloop))), broadcaster: broadcaster, }) } @@ -94,16 +94,39 @@ impl Server { } /// Closes the server and waits for it to finish - pub fn close(mut self) { - let _ = self.broadcaster.shutdown(); - self.remote.take().expect("Remote is always Some at start.").close(); + pub fn close(self) { + self.close_handle().close(); + } + + /// Returns a handle to the server that can be used to close it while another thread is + /// blocking in `wait`. + pub fn close_handle(&self) -> CloseHandle { + CloseHandle { + remote: self.remote.clone(), + broadcaster: self.broadcaster.clone(), + } } } impl Drop for Server { fn drop(&mut self) { - let _ = self.broadcaster.shutdown(); - self.remote.take().map(|remote| remote.close()); + self.close_handle().close(); self.handle.take().map(|handle| handle.join()); } } + + +/// A handle that allows closing of a server even if it owned by a thread blocked in `wait`. +#[derive(Clone)] +pub struct CloseHandle { + remote: Arc>>, + broadcaster: ws::Sender, +} + +impl CloseHandle { + /// Closes the `Server`. + pub fn close(self) { + let _ = self.broadcaster.shutdown(); + self.remote.lock().unwrap().take().map(|remote| remote.close()); + } +} From e5da3f069fb14f9a90278f09a1841f9149af541b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Linus=20F=C3=A4rnstrand?= Date: Thu, 8 Jun 2017 09:37:01 +0200 Subject: [PATCH 2/2] Add CloseHandle test --- ws/src/tests.rs | 23 +++++++++++++++++++++-- 1 file changed, 21 insertions(+), 2 deletions(-) diff --git a/ws/src/tests.rs b/ws/src/tests.rs index 7e8e7f5e2..ea9f05dad 100644 --- a/ws/src/tests.rs +++ b/ws/src/tests.rs @@ -1,6 +1,9 @@ -use std::str::Lines; -use std::net::{TcpStream, Ipv4Addr}; use std::io::{Read, Write}; +use std::net::{TcpStream, Ipv4Addr}; +use std::str::Lines; +use std::sync::mpsc; +use std::thread; +use std::time::Duration; use core; use core::futures::Future; @@ -170,3 +173,19 @@ fn bind_port_zero_should_give_random_port() { assert_eq!(Ipv4Addr::new(127, 0, 0, 1), server.addr().ip()); assert_ne!(0, server.addr().port()); } + +#[test] +fn close_handle_makes_wait_return() { + let server = serve(0); + let close_handle = server.close_handle(); + + let (tx, rx) = mpsc::channel(); + thread::spawn(move || { + tx.send(server.wait()).unwrap(); + }); + thread::sleep(Duration::from_secs(1)); + close_handle.close(); + + let result = rx.recv_timeout(Duration::from_secs(10)).expect("Expected server to close"); + assert!(result.is_ok()); +}