Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion http/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,6 @@ jsonrpc-server-utils = { version = "11.0", path = "../server-utils" }
log = "0.4"
net2 = "0.2"
unicase = "2.0"

parking_lot = "0.8.0"
[badges]
travis-ci = { repository = "paritytech/jsonrpc", branch = "master"}
92 changes: 61 additions & 31 deletions http/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ use std::net::SocketAddr;
use std::sync::{mpsc, Arc};
use std::thread;

use parking_lot::Mutex;

use crate::jsonrpc::futures::sync::oneshot;
use crate::jsonrpc::futures::{self, future, Future, Stream};
use crate::jsonrpc::MetaIoHandler;
Expand Down Expand Up @@ -377,10 +379,12 @@ impl<M: jsonrpc::Metadata, S: jsonrpc::Middleware<M>> ServerBuilder<M, S> {

let (local_addr_tx, local_addr_rx) = mpsc::channel();
let (close, shutdown_signal) = oneshot::channel();
let (done_tx, done_rx) = oneshot::channel();
let eloop = self.executor.init_with_name("http.worker0")?;
let req_max_size = self.max_request_body_size;
// The first threads `Executor` is initialised differently from the others
serve(
(shutdown_signal, local_addr_tx),
(shutdown_signal, local_addr_tx, done_tx),
eloop.executor(),
addr.to_owned(),
cors_domains.clone(),
Expand All @@ -399,9 +403,10 @@ impl<M: jsonrpc::Metadata, S: jsonrpc::Middleware<M>> ServerBuilder<M, S> {
.map(|i| {
let (local_addr_tx, local_addr_rx) = mpsc::channel();
let (close, shutdown_signal) = oneshot::channel();
let (done_tx, done_rx) = oneshot::channel();
let eloop = UninitializedExecutor::Unspawned.init_with_name(format!("http.worker{}", i + 1))?;
serve(
(shutdown_signal, local_addr_tx),
(shutdown_signal, local_addr_tx, done_tx),
eloop.executor(),
addr.to_owned(),
cors_domains.clone(),
Expand All @@ -416,27 +421,34 @@ impl<M: jsonrpc::Metadata, S: jsonrpc::Middleware<M>> ServerBuilder<M, S> {
reuse_port,
req_max_size,
);
Ok((eloop, close, local_addr_rx))
Ok((eloop, close, local_addr_rx, done_rx))
})
.collect::<io::Result<Vec<_>>>()?;

// Wait for server initialization
let local_addr = recv_address(local_addr_rx);
// Wait for other threads as well.
let mut handles = handles
let mut handles: Vec<(Executor, oneshot::Sender<()>, oneshot::Receiver<()>)> = handles
.into_iter()
.map(|(eloop, close, local_addr_rx)| {
.map(|(eloop, close, local_addr_rx, done_rx)| {
let _ = recv_address(local_addr_rx)?;
Ok((eloop, close))
Ok((eloop, close, done_rx))
})
.collect::<io::Result<(Vec<_>)>>()?;
handles.push((eloop, close));
let (executors, close) = handles.into_iter().unzip();
handles.push((eloop, close, done_rx));

let (executors, done_rxs) = handles
.into_iter()
.fold((vec![], vec![]), |mut acc, (eloop, closer, done_rx)| {
acc.0.push((eloop, closer));
acc.1.push(done_rx);
acc
});

Ok(Server {
address: local_addr?,
executor: Some(executors),
close: Some(close),
executors: Arc::new(Mutex::new(Some(executors))),
done: Some(done_rxs),
})
}
}
Expand All @@ -448,7 +460,7 @@ fn recv_address(local_addr_rx: mpsc::Receiver<io::Result<SocketAddr>>) -> io::Re
}

fn serve<M: jsonrpc::Metadata, S: jsonrpc::Middleware<M>>(
signals: (oneshot::Receiver<()>, mpsc::Sender<io::Result<SocketAddr>>),
signals: (oneshot::Receiver<()>, mpsc::Sender<io::Result<SocketAddr>>, oneshot::Sender<()>),
executor: tokio::runtime::TaskExecutor,
addr: SocketAddr,
cors_domains: CorsDomains,
Expand All @@ -463,7 +475,7 @@ fn serve<M: jsonrpc::Metadata, S: jsonrpc::Middleware<M>>(
reuse_port: bool,
max_request_body_size: usize,
) {
let (shutdown_signal, local_addr_tx) = signals;
let (shutdown_signal, local_addr_tx, done_tx) = signals;
executor.spawn(future::lazy(move || {
let handle = tokio::reactor::Handle::default();

Expand Down Expand Up @@ -537,12 +549,15 @@ fn serve<M: jsonrpc::Metadata, S: jsonrpc::Middleware<M>>(
.map_err(|e| {
warn!("Incoming streams error, closing sever: {:?}", e);
})
.select(shutdown_signal.map_err(|e| {
.select(shutdown_signal
.map_err(|e| {
debug!("Shutdown signaller dropped, closing server: {:?}", e);
}))
.map(|_| ())
.map_err(|_| ())
})
}).and_then(|_| {
done_tx.send(())
}));
}

Expand All @@ -562,45 +577,60 @@ fn configure_port(_reuse: bool, _tcp: &net2::TcpBuilder) -> io::Result<()> {
Ok(())
}

/// Handle used to close the server. Can be cloned and passed around to different threads and be used
/// to close a server that is `wait()`ing.

#[derive(Clone)]
pub struct CloseHandle(Arc<Mutex<Option<Vec<(Executor, oneshot::Sender<()>)>>>>);

impl CloseHandle {
/// Shutdown a running server
pub fn close(self) {
if let Some(executors) = self.0.lock().take() {
for (executor, closer) in executors {
executor.close();
let _ = closer.send(());
}
}
}
}

/// jsonrpc http server instance
pub struct Server {
address: SocketAddr,
executor: Option<Vec<Executor>>,
close: Option<Vec<oneshot::Sender<()>>>,
executors: Arc<Mutex<Option<Vec<(Executor, oneshot::Sender<()>)>>>>,
done: Option<Vec<oneshot::Receiver<()>>>,
}

const PROOF: &str = "Server is always Some until self is consumed.";
impl Server {
/// Returns address of this server
pub fn address(&self) -> &SocketAddr {
&self.address
}

/// Closes the server.
pub fn close(mut self) {
for close in self.close.take().expect(PROOF) {
let _ = close.send(());
}

for executor in self.executor.take().expect(PROOF) {
executor.close();
}
pub fn close(self) {
self.close_handle().close()
}

/// Will block, waiting for the server to finish.
pub fn wait(mut self) {
for executor in self.executor.take().expect(PROOF) {
executor.wait();
if let Some(receivers) = self.done.take() {
for receiver in receivers {
let _ = receiver.wait();
Copy link

@carllin carllin Aug 21, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@dvdplm Is this wait sufficient to guarantee that all the threads owned by the server have cleaned up by the time wait returns? From what I understand it guarantees that the serving threads have been cleaned up here: https://github.com/paritytech/jsonrpc/blob/master/http/src/lib.rs#L565, but the tokio threads owned by the RpcEventLoop aren't guaranteed to have exited until this other wait returns from within the Executor->RpcEventLoop here: https://github.com/paritytech/jsonrpc/blob/master/server-utils/src/reactor.rs#L75

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we can wait for the Executor to finish - it could have been spawned externally or shared for some other tasks. I guess the issue might be that wait() only actually waits for the moment where we stop accepting new requests, not when we stop processing them.
Do you have a specific issue that this behaviour causes for you?

Copy link

@carllin carllin Aug 21, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@tomusdrw, @dvdplm thanks for the reply! I am running into issues writing test code for testing start/ restart behavior on the server. I have specific cleanup behavior that runs on things like filesystem objects in the Drop implementation of certain structs that are owned by Executor tasks. B/c shutting down the server doesn't guarantee the spawned Executor tasks have finished, then on restart of the server, there's potential for data races based on when these Executor tasks finish and these objects get dropped.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@tomusdrw, @dvdplm should I file an issue for this to ensure wait() waits for when all requests are finished processing?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please do, but I think it's a tricky one. We need to distinguish between externally provided executor (where we don't really want to wait until it ends) and a one that we've spawned ourselves.

}
}
}

/// Get a handle that allows us to close the server from a different thread and/or while the
/// server is `wait()`ing.
pub fn close_handle(&self) -> CloseHandle {
CloseHandle(self.executors.clone())
}
}

impl Drop for Server {
fn drop(&mut self) {
if let Some(executors) = self.executor.take() {
for executor in executors {
executor.close();
}
};
self.close_handle().close();
}
}
23 changes: 20 additions & 3 deletions http/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use self::jsonrpc_core::{Error, ErrorCode, IoHandler, Params, Value};
use std::io::{Read, Write};
use std::net::TcpStream;
use std::str::Lines;
use std::time::Duration;

use self::jsonrpc_core::futures::{self, Future};
use super::*;
Expand Down Expand Up @@ -52,8 +53,6 @@ fn serve_allow_headers(cors_allow_headers: cors::AccessControlAllowHeaders) -> S
}

fn io() -> IoHandler {
use std::{thread, time};

let mut io = IoHandler::default();
io.add_method("hello", |params: Params| match params.parse::<(u64,)>() {
Ok((num,)) => Ok(Value::String(format!("world: {}", num))),
Expand All @@ -66,7 +65,7 @@ fn io() -> IoHandler {
io.add_method("hello_async2", |_params: Params| {
let (c, p) = futures::oneshot();
thread::spawn(move || {
thread::sleep(time::Duration::from_millis(10));
thread::sleep(Duration::from_millis(10));
c.send(Value::String("world".into())).unwrap();
});
p.map_err(|_| Error::invalid_request())
Expand Down Expand Up @@ -1406,6 +1405,24 @@ fn should_return_connection_header() {
assert_eq!(response.body, world_batch());
}

#[test]
fn close_handle_makes_wait_return() {
let server = serve(id);
let close_handle = server.close_handle();

let (tx, rx) = mpsc::channel();

thread::spawn(move || {
tx.send(server.wait()).unwrap();
});

thread::sleep(Duration::from_secs(3));

close_handle.close();

rx.recv_timeout(Duration::from_secs(10)).expect("Expected server to close");
}

#[test]
fn should_close_connection_without_keep_alive() {
// given
Expand Down
2 changes: 1 addition & 1 deletion ipc/src/logger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ lazy_static! {
builder.filter(None, LevelFilter::Info);

if let Ok(log) = env::var("RUST_LOG") {
builder.parse(&log);
builder.parse_filters(&log);
}

if let Ok(_) = builder.try_init() {
Expand Down
2 changes: 1 addition & 1 deletion tcp/src/logger.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ lazy_static! {
builder.filter(None, LevelFilter::Info);

if let Ok(log) = env::var("RUST_LOG") {
builder.parse(&log);
builder.parse_filters(&log);
}

if let Ok(_) = builder.try_init() {
Expand Down
1 change: 0 additions & 1 deletion ws/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,6 @@ fn request(server: Server, request: &str) -> Response {

fn serve(port: u16) -> (Server, Arc<AtomicUsize>) {
use crate::core::futures::sync::oneshot;
use std::time::Duration;

let pending = Arc::new(AtomicUsize::new(0));

Expand Down