Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion ws/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ mod session;
mod tests;

pub use self::metadata::{RequestContext, MetaExtractor, NoopExtractor};
pub use self::server::Server;
pub use self::session::{RequestMiddleware, MiddlewareAction};
pub use self::server::{CloseHandle, Server};
pub use self::server_builder::{ServerBuilder, Error};
pub use self::server_utils::cors::Origin;
pub use self::server_utils::hosts::{Host, DomainsValidation};
Expand Down
39 changes: 31 additions & 8 deletions ws/src/server.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use std::net::SocketAddr;
use std::sync::Arc;
use std::sync::{Arc, Mutex};
use std::thread;

use core;
Expand All @@ -17,7 +17,7 @@ use {Error};
pub struct Server {
addr: SocketAddr,
handle: Option<thread::JoinHandle<Result<(), Error>>>,
remote: Option<Remote>,
remote: Arc<Mutex<Option<Remote>>>,
broadcaster: ws::Sender,
}

Expand Down Expand Up @@ -81,7 +81,7 @@ impl Server {
Ok(Server {
addr: local_addr,
handle: Some(handle),
remote: Some(eloop),
remote: Arc::new(Mutex::new(Some(eloop))),
broadcaster: broadcaster,
})
}
Expand All @@ -94,16 +94,39 @@ impl Server {
}

/// Closes the server and waits for it to finish
pub fn close(mut self) {
let _ = self.broadcaster.shutdown();
self.remote.take().expect("Remote is always Some at start.").close();
pub fn close(self) {
self.close_handle().close();
}

/// Returns a handle to the server that can be used to close it while another thread is
/// blocking in `wait`.
pub fn close_handle(&self) -> CloseHandle {
CloseHandle {
remote: self.remote.clone(),
broadcaster: self.broadcaster.clone(),
}
}
}

impl Drop for Server {
fn drop(&mut self) {
let _ = self.broadcaster.shutdown();
self.remote.take().map(|remote| remote.close());
self.close_handle().close();
self.handle.take().map(|handle| handle.join());
}
}


/// A handle that allows closing of a server even if it owned by a thread blocked in `wait`.
#[derive(Clone)]
pub struct CloseHandle {
remote: Arc<Mutex<Option<Remote>>>,
broadcaster: ws::Sender,
}

impl CloseHandle {
/// Closes the `Server`.
pub fn close(self) {
let _ = self.broadcaster.shutdown();
self.remote.lock().unwrap().take().map(|remote| remote.close());
}
}
23 changes: 21 additions & 2 deletions ws/src/tests.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
use std::str::Lines;
use std::net::{TcpStream, Ipv4Addr};
use std::io::{Read, Write};
use std::net::{TcpStream, Ipv4Addr};
use std::str::Lines;
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

use core;
use core::futures::Future;
Expand Down Expand Up @@ -170,3 +173,19 @@ fn bind_port_zero_should_give_random_port() {
assert_eq!(Ipv4Addr::new(127, 0, 0, 1), server.addr().ip());
assert_ne!(0, server.addr().port());
}

#[test]
fn close_handle_makes_wait_return() {
let server = serve(0);
let close_handle = server.close_handle();

let (tx, rx) = mpsc::channel();
thread::spawn(move || {
tx.send(server.wait()).unwrap();
});
thread::sleep(Duration::from_secs(1));
close_handle.close();

let result = rx.recv_timeout(Duration::from_secs(10)).expect("Expected server to close");
assert!(result.is_ok());
}