diff --git a/Cargo.toml b/Cargo.toml index 5a3b54e3..2992ab8f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "ipc-channel" -version = "0.20.2" +version = "0.21.0" description = "A multiprocess drop-in replacement for Rust channels" authors = ["The Servo Project Developers"] license = "MIT OR Apache-2.0" diff --git a/src/router.rs b/src/router.rs index 449dae5f..297097b0 100644 --- a/src/router.rs +++ b/src/router.rs @@ -38,6 +38,12 @@ pub struct RouterProxy { comm: Mutex, } +impl Drop for RouterProxy { + fn drop(&mut self) { + self.shutdown(); + } +} + #[allow(clippy::new_without_default)] impl RouterProxy { pub fn new() -> RouterProxy { @@ -62,11 +68,7 @@ impl RouterProxy { /// Add a new (receiver, callback) pair to the router, and send a wakeup message /// to the router. - /// - /// Consider using [add_typed_route](Self::add_typed_route) instead, which prevents - /// mismatches between the receiver and callback types. - #[deprecated(since = "0.19.0", note = "please use 'add_typed_route' instead")] - pub fn add_route(&self, receiver: OpaqueIpcReceiver, callback: RouterHandler) { + fn add_route(&self, receiver: OpaqueIpcReceiver, callback: RouterHandler) { let comm = self.comm.lock().unwrap(); if comm.shutdown { @@ -81,11 +83,11 @@ impl RouterProxy { /// Add a new `(receiver, callback)` pair to the router, and send a wakeup message /// to the router. - /// - /// Unlike [add_route](Self::add_route) this method is strongly typed and guarantees - /// that the `receiver` and the `callback` use the same message type. - pub fn add_typed_route(&self, receiver: IpcReceiver, mut callback: TypedRouterHandler) - where + pub fn add_typed_route( + &self, + receiver: IpcReceiver, + mut callback: TypedRouterMultiHandler, + ) where T: Serialize + for<'de> Deserialize<'de> + 'static, { // Before passing the message on to the callback, turn it into the appropriate type @@ -94,8 +96,31 @@ impl RouterProxy { callback(typed_message) }; - #[allow(deprecated)] - self.add_route(receiver.to_opaque(), Box::new(modified_callback)); + self.add_route( + receiver.to_opaque(), + RouterHandler::Multi(Box::new(modified_callback)), + ); + } + + /// Add a new `(receiver, callback)` pair to the router, and send a wakeup message + /// to the router. + pub fn add_typed_one_shot_route( + &self, + receiver: IpcReceiver, + callback: TypedRouterOneShotHandler, + ) where + T: Serialize + for<'de> Deserialize<'de> + 'static, + { + // Before passing the message on to the callback, turn it into the appropriate type + let modified_callback = move |msg: IpcMessage| { + let typed_message = msg.to::(); + callback(typed_message) + }; + + self.add_route( + receiver.to_opaque(), + RouterHandler::Once(Some(Box::new(modified_callback))), + ); } /// Send a shutdown message to the router containing a ACK sender, @@ -226,7 +251,16 @@ impl Router { }, // Event from one of our registered receivers, call callback. IpcSelectionResult::MessageReceived(id, message) => { - self.handlers.get_mut(&id).unwrap()(message) + match self.handlers.get_mut(&id).unwrap() { + RouterHandler::Once(handler) => { + if let Some(handler) = handler.take() { + (handler)(message); + } + }, + RouterHandler::Multi(ref mut handler) => { + (handler)(message); + }, + } }, IpcSelectionResult::ChannelClosed(id) => { let _ = self.handlers.remove(&id).unwrap(); @@ -246,7 +280,18 @@ enum RouterMsg { } /// Function to call when a new event is received from the corresponding receiver. -pub type RouterHandler = Box; +pub type RouterMultiHandler = Box; + +/// Function to call the first time that a message is received from the corresponding receiver. +pub type RouterOneShotHandler = Box; + +enum RouterHandler { + Once(Option), + Multi(RouterMultiHandler), +} + +/// Like [RouterMultiHandler] but includes the type that will be passed to the callback +pub type TypedRouterMultiHandler = Box) + Send>; -/// Like [RouterHandler] but includes the type that will be passed to the callback -pub type TypedRouterHandler = Box) + Send>; +/// Like [RouterOneShotHandler] but includes the type that will be passed to the callback +pub type TypedRouterOneShotHandler = Box) + Send>; diff --git a/src/test.rs b/src/test.rs index 11d2eb06..3cefecd3 100644 --- a/src/test.rs +++ b/src/test.rs @@ -274,11 +274,10 @@ fn router_simple_global() { tx.send(person.clone()).unwrap(); let (callback_fired_sender, callback_fired_receiver) = crossbeam_channel::unbounded::(); - #[allow(deprecated)] - ROUTER.add_route( - rx.to_opaque(), + ROUTER.add_typed_route( + rx, Box::new(move |person| { - callback_fired_sender.send(person.to().unwrap()).unwrap(); + callback_fired_sender.send(person.unwrap()).unwrap(); }), ); let received_person = callback_fired_receiver.recv().unwrap(); @@ -357,6 +356,29 @@ fn router_routing_to_new_crossbeam_receiver() { assert_eq!(received_person, person); } +#[test] +fn router_once_handler() { + let person = ("Patrick Walton".to_owned(), 29); + let (tx, rx) = ipc::channel().unwrap(); + let (tx2, rx2) = ipc::channel().unwrap(); + + let router = RouterProxy::new(); + let mut once_tx2 = Some(tx2); + router.add_typed_one_shot_route( + rx, + Box::new(move |_msg| once_tx2.take().unwrap().send(()).unwrap()), + ); + + // Send one single event. + tx.send(person.clone()).unwrap(); + // Wait for acknowledgement that the callback ran. + rx2.recv().unwrap(); + // This send should succeed but no handler should run. If it does run, + // a panic will occur. + tx.send(person.clone()).unwrap(); + assert!(rx2.recv().is_err()); +} + #[test] fn router_multiplexing() { let person = ("Patrick Walton".to_owned(), 29);