Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "ipc-channel"
version = "0.20.2"
version = "0.21.0"
description = "A multiprocess drop-in replacement for Rust channels"
authors = ["The Servo Project Developers"]
license = "MIT OR Apache-2.0"
Expand Down
77 changes: 61 additions & 16 deletions src/router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,12 @@ pub struct RouterProxy {
comm: Mutex<RouterProxyComm>,
}

impl Drop for RouterProxy {
fn drop(&mut self) {
self.shutdown();
}
}

#[allow(clippy::new_without_default)]
impl RouterProxy {
pub fn new() -> RouterProxy {
Expand All @@ -62,11 +68,7 @@ impl RouterProxy {

/// Add a new (receiver, callback) pair to the router, and send a wakeup message
/// to the router.
///
/// Consider using [add_typed_route](Self::add_typed_route) instead, which prevents
/// mismatches between the receiver and callback types.
#[deprecated(since = "0.19.0", note = "please use 'add_typed_route' instead")]
pub fn add_route(&self, receiver: OpaqueIpcReceiver, callback: RouterHandler) {
fn add_route(&self, receiver: OpaqueIpcReceiver, callback: RouterHandler) {
let comm = self.comm.lock().unwrap();

if comm.shutdown {
Expand All @@ -81,11 +83,11 @@ impl RouterProxy {

/// Add a new `(receiver, callback)` pair to the router, and send a wakeup message
/// to the router.
///
/// Unlike [add_route](Self::add_route) this method is strongly typed and guarantees
/// that the `receiver` and the `callback` use the same message type.
pub fn add_typed_route<T>(&self, receiver: IpcReceiver<T>, mut callback: TypedRouterHandler<T>)
where
pub fn add_typed_route<T>(
&self,
receiver: IpcReceiver<T>,
mut callback: TypedRouterMultiHandler<T>,
) where
T: Serialize + for<'de> Deserialize<'de> + 'static,
{
// Before passing the message on to the callback, turn it into the appropriate type
Expand All @@ -94,8 +96,31 @@ impl RouterProxy {
callback(typed_message)
};

#[allow(deprecated)]
self.add_route(receiver.to_opaque(), Box::new(modified_callback));
self.add_route(
receiver.to_opaque(),
RouterHandler::Multi(Box::new(modified_callback)),
);
}

/// Add a new `(receiver, callback)` pair to the router, and send a wakeup message
/// to the router.
pub fn add_typed_one_shot_route<T>(
&self,
receiver: IpcReceiver<T>,
callback: TypedRouterOneShotHandler<T>,
) where
T: Serialize + for<'de> Deserialize<'de> + 'static,
{
// Before passing the message on to the callback, turn it into the appropriate type
let modified_callback = move |msg: IpcMessage| {
let typed_message = msg.to::<T>();
callback(typed_message)
};

self.add_route(
receiver.to_opaque(),
RouterHandler::Once(Some(Box::new(modified_callback))),
);
}

/// Send a shutdown message to the router containing a ACK sender,
Expand Down Expand Up @@ -226,7 +251,16 @@ impl Router {
},
// Event from one of our registered receivers, call callback.
IpcSelectionResult::MessageReceived(id, message) => {
self.handlers.get_mut(&id).unwrap()(message)
match self.handlers.get_mut(&id).unwrap() {
RouterHandler::Once(handler) => {
if let Some(handler) = handler.take() {
(handler)(message);
}
},
RouterHandler::Multi(ref mut handler) => {
(handler)(message);
},
}
},
IpcSelectionResult::ChannelClosed(id) => {
let _ = self.handlers.remove(&id).unwrap();
Expand All @@ -246,7 +280,18 @@ enum RouterMsg {
}

/// Function to call when a new event is received from the corresponding receiver.
pub type RouterHandler = Box<dyn FnMut(IpcMessage) + Send>;
pub type RouterMultiHandler = Box<dyn FnMut(IpcMessage) + Send>;

/// Function to call the first time that a message is received from the corresponding receiver.
pub type RouterOneShotHandler = Box<dyn FnOnce(IpcMessage) + Send>;

enum RouterHandler {
Once(Option<RouterOneShotHandler>),
Multi(RouterMultiHandler),
}

/// Like [RouterMultiHandler] but includes the type that will be passed to the callback
pub type TypedRouterMultiHandler<T> = Box<dyn FnMut(Result<T, bincode::Error>) + Send>;

/// Like [RouterHandler] but includes the type that will be passed to the callback
pub type TypedRouterHandler<T> = Box<dyn FnMut(Result<T, bincode::Error>) + Send>;
/// Like [RouterOneShotHandler] but includes the type that will be passed to the callback
pub type TypedRouterOneShotHandler<T> = Box<dyn FnOnce(Result<T, bincode::Error>) + Send>;
30 changes: 26 additions & 4 deletions src/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -274,11 +274,10 @@ fn router_simple_global() {
tx.send(person.clone()).unwrap();

let (callback_fired_sender, callback_fired_receiver) = crossbeam_channel::unbounded::<Person>();
#[allow(deprecated)]
ROUTER.add_route(
rx.to_opaque(),
ROUTER.add_typed_route(
rx,
Box::new(move |person| {
callback_fired_sender.send(person.to().unwrap()).unwrap();
callback_fired_sender.send(person.unwrap()).unwrap();
}),
);
let received_person = callback_fired_receiver.recv().unwrap();
Expand Down Expand Up @@ -357,6 +356,29 @@ fn router_routing_to_new_crossbeam_receiver() {
assert_eq!(received_person, person);
}

#[test]
fn router_once_handler() {
let person = ("Patrick Walton".to_owned(), 29);
let (tx, rx) = ipc::channel().unwrap();
let (tx2, rx2) = ipc::channel().unwrap();

let router = RouterProxy::new();
let mut once_tx2 = Some(tx2);
router.add_typed_one_shot_route(
rx,
Box::new(move |_msg| once_tx2.take().unwrap().send(()).unwrap()),
);

// Send one single event.
tx.send(person.clone()).unwrap();
// Wait for acknowledgement that the callback ran.
rx2.recv().unwrap();
// This send should succeed but no handler should run. If it does run,
// a panic will occur.
tx.send(person.clone()).unwrap();
assert!(rx2.recv().is_err());
}

#[test]
fn router_multiplexing() {
let person = ("Patrick Walton".to_owned(), 29);
Expand Down
Loading