From c680ac03179813d4f327fbc0f566bd8b397f7ffc Mon Sep 17 00:00:00 2001 From: Josh Matthews Date: Sun, 21 Sep 2025 17:25:24 -0400 Subject: [PATCH 1/5] Make Router::add_route private. Signed-off-by: Josh Matthews --- src/router.rs | 10 +--------- src/test.rs | 7 +++---- 2 files changed, 4 insertions(+), 13 deletions(-) diff --git a/src/router.rs b/src/router.rs index 449dae5f..81a78713 100644 --- a/src/router.rs +++ b/src/router.rs @@ -62,11 +62,7 @@ impl RouterProxy { /// Add a new (receiver, callback) pair to the router, and send a wakeup message /// to the router. - /// - /// Consider using [add_typed_route](Self::add_typed_route) instead, which prevents - /// mismatches between the receiver and callback types. - #[deprecated(since = "0.19.0", note = "please use 'add_typed_route' instead")] - pub fn add_route(&self, receiver: OpaqueIpcReceiver, callback: RouterHandler) { + fn add_route(&self, receiver: OpaqueIpcReceiver, callback: RouterHandler) { let comm = self.comm.lock().unwrap(); if comm.shutdown { @@ -81,9 +77,6 @@ impl RouterProxy { /// Add a new `(receiver, callback)` pair to the router, and send a wakeup message /// to the router. - /// - /// Unlike [add_route](Self::add_route) this method is strongly typed and guarantees - /// that the `receiver` and the `callback` use the same message type. pub fn add_typed_route(&self, receiver: IpcReceiver, mut callback: TypedRouterHandler) where T: Serialize + for<'de> Deserialize<'de> + 'static, @@ -94,7 +87,6 @@ impl RouterProxy { callback(typed_message) }; - #[allow(deprecated)] self.add_route(receiver.to_opaque(), Box::new(modified_callback)); } diff --git a/src/test.rs b/src/test.rs index 11d2eb06..3862103d 100644 --- a/src/test.rs +++ b/src/test.rs @@ -274,11 +274,10 @@ fn router_simple_global() { tx.send(person.clone()).unwrap(); let (callback_fired_sender, callback_fired_receiver) = crossbeam_channel::unbounded::(); - #[allow(deprecated)] - ROUTER.add_route( - rx.to_opaque(), + ROUTER.add_typed_route( + rx, Box::new(move |person| { - callback_fired_sender.send(person.to().unwrap()).unwrap(); + callback_fired_sender.send(person.unwrap()).unwrap(); }), ); let received_person = callback_fired_receiver.recv().unwrap(); From a269da5da6a9e4eeeaa94df80f3b46a1e41a0e1f Mon Sep 17 00:00:00 2001 From: Josh Matthews Date: Sun, 21 Sep 2025 17:27:07 -0400 Subject: [PATCH 2/5] Add one-shot handlers to Router API. Signed-off-by: Josh Matthews --- src/router.rs | 63 +++++++++++++++++++++++++++++++++++++++++++++------ src/test.rs | 23 +++++++++++++++++++ 2 files changed, 79 insertions(+), 7 deletions(-) diff --git a/src/router.rs b/src/router.rs index 81a78713..feab154c 100644 --- a/src/router.rs +++ b/src/router.rs @@ -77,8 +77,32 @@ impl RouterProxy { /// Add a new `(receiver, callback)` pair to the router, and send a wakeup message /// to the router. - pub fn add_typed_route(&self, receiver: IpcReceiver, mut callback: TypedRouterHandler) - where + pub fn add_typed_route( + &self, + receiver: IpcReceiver, + mut callback: TypedRouterMultiHandler, + ) where + T: Serialize + for<'de> Deserialize<'de> + 'static, + { + // Before passing the message on to the callback, turn it into the appropriate type + let modified_callback = move |msg: IpcMessage| { + let typed_message = msg.to::(); + callback(typed_message) + }; + + self.add_route( + receiver.to_opaque(), + RouterHandler::Multi(Box::new(modified_callback)), + ); + } + + /// Add a new `(receiver, callback)` pair to the router, and send a wakeup message + /// to the router. + pub fn add_typed_one_shot_route( + &self, + receiver: IpcReceiver, + callback: TypedRouterOneShotHandler, + ) where T: Serialize + for<'de> Deserialize<'de> + 'static, { // Before passing the message on to the callback, turn it into the appropriate type @@ -87,7 +111,10 @@ impl RouterProxy { callback(typed_message) }; - self.add_route(receiver.to_opaque(), Box::new(modified_callback)); + self.add_route( + receiver.to_opaque(), + RouterHandler::Once(Box::new(modified_callback)), + ); } /// Send a shutdown message to the router containing a ACK sender, @@ -218,7 +245,18 @@ impl Router { }, // Event from one of our registered receivers, call callback. IpcSelectionResult::MessageReceived(id, message) => { - self.handlers.get_mut(&id).unwrap()(message) + match self.handlers.get_mut(&id).unwrap() { + RouterHandler::Once(_) => {}, + RouterHandler::Multi(handler) => { + (handler)(message); + continue; + }, + }; + let RouterHandler::Once(handler) = self.handlers.remove(&id).unwrap() + else { + continue; + }; + (handler)(message); }, IpcSelectionResult::ChannelClosed(id) => { let _ = self.handlers.remove(&id).unwrap(); @@ -238,7 +276,18 @@ enum RouterMsg { } /// Function to call when a new event is received from the corresponding receiver. -pub type RouterHandler = Box; +pub type RouterMultiHandler = Box; + +/// Function to call once when a new event is received from the corresponding receiver. +pub type RouterOneShotHandler = Box; + +enum RouterHandler { + Once(RouterOneShotHandler), + Multi(RouterMultiHandler), +} + +/// Like [RouterMultiHandler] but includes the type that will be passed to the callback +pub type TypedRouterMultiHandler = Box) + Send>; -/// Like [RouterHandler] but includes the type that will be passed to the callback -pub type TypedRouterHandler = Box) + Send>; +/// Like [RouterOneShotHandler] but includes the type that will be passed to the callback +pub type TypedRouterOneShotHandler = Box) + Send>; diff --git a/src/test.rs b/src/test.rs index 3862103d..3cefecd3 100644 --- a/src/test.rs +++ b/src/test.rs @@ -356,6 +356,29 @@ fn router_routing_to_new_crossbeam_receiver() { assert_eq!(received_person, person); } +#[test] +fn router_once_handler() { + let person = ("Patrick Walton".to_owned(), 29); + let (tx, rx) = ipc::channel().unwrap(); + let (tx2, rx2) = ipc::channel().unwrap(); + + let router = RouterProxy::new(); + let mut once_tx2 = Some(tx2); + router.add_typed_one_shot_route( + rx, + Box::new(move |_msg| once_tx2.take().unwrap().send(()).unwrap()), + ); + + // Send one single event. + tx.send(person.clone()).unwrap(); + // Wait for acknowledgement that the callback ran. + rx2.recv().unwrap(); + // This send should succeed but no handler should run. If it does run, + // a panic will occur. + tx.send(person.clone()).unwrap(); + assert!(rx2.recv().is_err()); +} + #[test] fn router_multiplexing() { let person = ("Patrick Walton".to_owned(), 29); From 745e7f653c5240002d26dc0e9f96253294c8d393 Mon Sep 17 00:00:00 2001 From: Josh Matthews Date: Sun, 21 Sep 2025 17:27:43 -0400 Subject: [PATCH 3/5] Publish 0.21.0. Signed-off-by: Josh Matthews --- Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Cargo.toml b/Cargo.toml index 5a3b54e3..2992ab8f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "ipc-channel" -version = "0.20.2" +version = "0.21.0" description = "A multiprocess drop-in replacement for Rust channels" authors = ["The Servo Project Developers"] license = "MIT OR Apache-2.0" From d3591a75534924b038777a7b83422086414a3702 Mon Sep 17 00:00:00 2001 From: Josh Matthews Date: Fri, 26 Sep 2025 23:21:19 -0400 Subject: [PATCH 4/5] Ensure tests shut down router thread. Signed-off-by: Josh Matthews --- src/router.rs | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/src/router.rs b/src/router.rs index feab154c..6a85bb34 100644 --- a/src/router.rs +++ b/src/router.rs @@ -38,6 +38,12 @@ pub struct RouterProxy { comm: Mutex, } +impl Drop for RouterProxy { + fn drop(&mut self) { + self.shutdown(); + } +} + #[allow(clippy::new_without_default)] impl RouterProxy { pub fn new() -> RouterProxy { From 563fc07cc4a57faaaac7d9b59f03471ebc9263ec Mon Sep 17 00:00:00 2001 From: Josh Matthews Date: Fri, 26 Sep 2025 23:22:18 -0400 Subject: [PATCH 5/5] Ignore additional messages associated with one-shot handlers. Signed-off-by: Josh Matthews --- src/router.rs | 22 ++++++++++------------ 1 file changed, 10 insertions(+), 12 deletions(-) diff --git a/src/router.rs b/src/router.rs index 6a85bb34..297097b0 100644 --- a/src/router.rs +++ b/src/router.rs @@ -119,7 +119,7 @@ impl RouterProxy { self.add_route( receiver.to_opaque(), - RouterHandler::Once(Box::new(modified_callback)), + RouterHandler::Once(Some(Box::new(modified_callback))), ); } @@ -252,17 +252,15 @@ impl Router { // Event from one of our registered receivers, call callback. IpcSelectionResult::MessageReceived(id, message) => { match self.handlers.get_mut(&id).unwrap() { - RouterHandler::Once(_) => {}, - RouterHandler::Multi(handler) => { + RouterHandler::Once(handler) => { + if let Some(handler) = handler.take() { + (handler)(message); + } + }, + RouterHandler::Multi(ref mut handler) => { (handler)(message); - continue; }, - }; - let RouterHandler::Once(handler) = self.handlers.remove(&id).unwrap() - else { - continue; - }; - (handler)(message); + } }, IpcSelectionResult::ChannelClosed(id) => { let _ = self.handlers.remove(&id).unwrap(); @@ -284,11 +282,11 @@ enum RouterMsg { /// Function to call when a new event is received from the corresponding receiver. pub type RouterMultiHandler = Box; -/// Function to call once when a new event is received from the corresponding receiver. +/// Function to call the first time that a message is received from the corresponding receiver. pub type RouterOneShotHandler = Box; enum RouterHandler { - Once(RouterOneShotHandler), + Once(Option), Multi(RouterMultiHandler), }