Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 45 additions & 13 deletions http/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ mod utils;
mod tests;

use std::io;
use std::sync::{mpsc, Arc};
use std::sync::{mpsc, Arc, Mutex};
use std::net::SocketAddr;
use std::thread;

Expand Down Expand Up @@ -430,7 +430,7 @@ impl<M: jsonrpc::Metadata, S: jsonrpc::Middleware<M>> ServerBuilder<M, S> {
Ok(Server {
address: local_addr?,
executor: Some(executors),
close: Some(close),
close: Arc::new(Mutex::new(Some(close))),
})
}
}
Expand Down Expand Up @@ -554,7 +554,7 @@ fn configure_port(_reuse: bool, _tcp: &net2::TcpBuilder) -> io::Result<()> {
pub struct Server {
address: SocketAddr,
executor: Option<Vec<Executor>>,
Copy link
Contributor

@tomusdrw tomusdrw Oct 30, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

executor should most likely be wrapped in Arc<Mutex< as wel, so that it can be passed to CloseHandle. Actually it would be simpler to store CloseHandle directly in Server and just clone it when requested. CloseHandle should handle the case if Options are None.
To simplify it further you can just have:

pub struct Server {
   address: SocketAddr,
  close_handle: CloseHandle,
}

impl Server {
  ...
  pub fn close_handle(&self) -> CloseHandle {
    self.close_handle.clone()
  }
}

#[derive(Clone)]
pub struct CloseHandle {
  handle: Arc<Mutex<Option<(Executor, Vec<oneshot::Sender<()>)>>>,
}

impl CloseHandle {
  ///  <docs ...>
  /// returns `false` if the server was closed earlier
  pub fn close(self) -> bool {
     if let Some((executor, senders)) = self.handle.lock().take() {
        executor.close();
        for s in senders { s.send(()).expect(...) } 
        true
     } else { false }
  }
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure how this would work: once the server starts waiting it has to take the lock on the handle, so close() as written above will hang on the lock?

close: Option<Vec<oneshot::Sender<()>>>,
close: Arc<Mutex<Option<Vec<oneshot::Sender<()>>>>>,
}

const PROOF: &'static str = "Server is always Some until self is consumed.";
Expand All @@ -566,13 +566,7 @@ impl Server {

/// Closes the server.
pub fn close(mut self) {
for close in self.close.take().expect(PROOF) {
let _ = close.send(());
}

for executor in self.executor.take().expect(PROOF) {
executor.close();
}
self.close_handle().close();
}

/// Will block, waiting for the server to finish.
Expand All @@ -581,12 +575,50 @@ impl Server {
executor.wait();
}
}

/// Returns a handle to the server that can be used to close it while another thread is
/// blocking in `wait`.
pub fn close_handle(&mut self) -> CloseHandle {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The function shouldn't mutate anything, so it should be &self.

Suggested change
pub fn close_handle(&mut self) -> CloseHandle {
pub fn close_handle(&self) -> CloseHandle {
let executor_close: Option<Vec<_>> = self.executor.as_ref().map(|executors| {
executors
.iter()
.map(|executor| executor.close_handle())
.collect()
});
...
}

let executor_close: Option<Vec<_>> = self.executor.as_mut().map(|executors| {
executors
.iter_mut()
.map(|executor| executor.close_handle())
.collect()
});

CloseHandle {
executor_close: Arc::new(Mutex::new(executor_close)),
close: self.close.clone(),
}
}
}

impl Drop for Server {
fn drop(&mut self) {
self.executor.take().map(|executors| {
for executor in executors { executor.close(); }
});
self.close_handle().close();
}
}

/// A handle that allows closing of a server even if it owned by a thread blocked in `wait`.
#[derive(Clone)]
pub struct CloseHandle {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should implement/derive Clone for the CloseHandle.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

futures::Sender, doesn't implement Clone(), so how should we approach this?

Copy link
Member

@bkchr bkchr Oct 25, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you mean the executor_close can not be cloned?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@carllin it's wrapped in Arc<Mutex anyway, so it doesn't really matter. If derive doesn't work then you need to implement Clone manually (and just clone the Arc)

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@bkchr, @tomusdrw, yes the executor_close cannot be cloned.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So let's wrap it into Arc as well. I also don't quite understand why do we need to close the executor when we close the server? It wasn't the case for WS or IPC server?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@tomusdrw, in the current WS implementation, close() also closes the executor: https://github.com/paritytech/jsonrpc/blob/master/ws/src/server.rs#L149

close: Arc<Mutex<Option<Vec<oneshot::Sender<()>>>>>,
executor_close: Arc<Mutex<Option<Vec<Option<futures::Complete<()>>>>>>,
}

impl CloseHandle {
/// Closes the `Server`.
pub fn close(self) {
self.close.lock().unwrap().take().map(|close_vector| {
for close in close_vector {
let _ = close.send(());
}
});

self.executor_close.lock().unwrap().take().map(|executor_close_vector| {
for e in executor_close_vector {
e.map(|v| v.send(()));
}
});
}
}
16 changes: 16 additions & 0 deletions http/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ extern crate jsonrpc_core;
use std::str::Lines;
use std::net::TcpStream;
use std::io::{Read, Write};
use std::time::Duration;
use self::jsonrpc_core::{IoHandler, Params, Value, Error, ErrorCode};

use self::jsonrpc_core::futures::{self, Future};
Expand Down Expand Up @@ -1166,3 +1167,18 @@ fn world_5() -> String {
fn world_batch() -> String {
"[{\"jsonrpc\":\"2.0\",\"result\":\"world\",\"id\":1}]\n".into()
}

#[test]
fn close_handle_makes_wait_return() {
let mut server = serve(id);
let close_handle = server.close_handle();

let (tx, rx) = mpsc::channel();
thread::spawn(move || {
tx.send(server.wait()).unwrap();
});
thread::sleep(Duration::from_secs(1));
close_handle.close();

rx.recv_timeout(Duration::from_secs(10)).expect("Expected server to close");
}
15 changes: 15 additions & 0 deletions server-utils/src/reactor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,15 @@ impl Executor {
self.executor().spawn(future)
}

/// Returns a handle to close the underlying event loop
pub fn close_handle(&mut self) -> Option<futures::Complete<()>> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

mut not needed

Suggested change
pub fn close_handle(&mut self) -> Option<futures::Complete<()>> {
pub fn close_handle(&self) -> Option<futures::Complete<()>> {

if let Executor::Spawned(ref mut eloop) = self {
eloop.close_handle()
} else {
None
}
}

/// Closes underlying event loop (if any!).
pub fn close(self) {
if let Executor::Spawned(eloop) = self {
Expand Down Expand Up @@ -163,4 +172,10 @@ impl RpcEventLoop {
warn!("Event Loop is already finished. {:?}", e);
});
}

/// Returns the close signal that can be used to close the event loop even while
/// another thread is blocked on the event loop in "wait"
pub fn close_handle(&mut self) -> Option<futures::Complete<()>>{
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This function doesn't make much sense - the whole point of handle is to be able to have multiple instance of it.
This function here will take() the close handle and only return it for the first caller. Subsequent calls will always yield None.
What's more it violates the assumption in the rest of this struct (see expect("..") calls) - so every other function like close() or wait() will actually panic.

I still believe that having this function is completely unnecessary. See how it's done in WebSockets server - we own Executor via Arc<Mutex<Option<Executor>>> and close it completely inside CloseHandle (actually it's not perfect there, cause cloning the handle and calling close() on a clone would panic, but it's a separate issue).

self.close.take()
}
}