diff --git a/src/router.rs b/src/router.rs index 70163416c..aa8654da1 100644 --- a/src/router.rs +++ b/src/router.rs @@ -7,6 +7,12 @@ // option. This file may not be copied, modified, or distributed // except according to those terms. +//! Routers allow converting IPC channels to crossbeam channels. +//! The [RouterProxy](router::RouterProxy) provides various methods to register `IpcReceiver`s. +//! The router will then either call the appropriate callback +//! or route the message to a crossbeam `Sender` or `Receiver`. +//! You should use the global `ROUTER` to access the `RouterProxy` methods (via `ROUTER`'s +//! `Deref` for `RouterProxy`. use std::collections::HashMap; use std::sync::Mutex; use std::thread; @@ -19,15 +25,26 @@ use crossbeam_channel::{self, Receiver, Sender}; use serde::{Deserialize, Serialize}; lazy_static! { + /// Global object wrapping a `RouterProxy`. + /// Add routes ([add_route](RouterProxy::add_route)), or convert IpcReceiver + /// to crossbeam channels (e.g. [route_ipc_receiver_to_new_crossbeam_receiver](RouterProxy::route_ipc_receiver_to_new_crossbeam_receiver)) pub static ref ROUTER: RouterProxy = RouterProxy::new(); } +/// A `RouterProxy` provides methods for talking to the router. Calling +/// [new](RouterProxy::new) automatically spins up a router thread which +/// waits for events on its registered `IpcReceiver`s. The `RouterProxy`'s +/// methods communicate with the running router thread to register new +/// `IpcReceiver`'s pub struct RouterProxy { comm: Mutex, } impl RouterProxy { pub fn new() -> RouterProxy { + // Router acts like a receiver, running in its own thread with both + // receiver ends. + // Router proxy takes both sending ends. let (msg_sender, msg_receiver) = crossbeam_channel::unbounded(); let (wakeup_sender, wakeup_receiver) = ipc::channel().unwrap(); thread::spawn(move || Router::new(msg_receiver, wakeup_receiver).run()); @@ -39,6 +56,8 @@ impl RouterProxy { } } + /// Add a new (receiver, callback) pair to the router, and send a wakeup message + /// to the router. pub fn add_route(&self, receiver: OpaqueIpcReceiver, callback: RouterHandler) { let comm = self.comm.lock().unwrap(); comm.msg_sender @@ -81,13 +100,22 @@ struct RouterProxyComm { wakeup_sender: IpcSender<()>, } +/// Router runs in its own thread listening for events. Adds events to its IpcReceiverSet +/// and listens for events using select(). struct Router { + /// Get messages from RouterProxy. Currently, the only message supported is: + /// `AddRoute(OpaqueIpcReceiver, RouterHandler)`. Add a new receiver which will + /// call function: RouterHandler when new event arrives. msg_receiver: Receiver, + /// The ID/index of the special channel we use to identify messages from msg_receiver. msg_wakeup_id: u64, + /// Set of all receivers which have been registered for us to select on. ipc_receiver_set: IpcReceiverSet, + /// Maps ids to their handler functions. handlers: HashMap, } + impl Router { fn new(msg_receiver: Receiver, wakeup_receiver: IpcReceiver<()>) -> Router { let mut ipc_receiver_set = IpcReceiverSet::new().unwrap(); @@ -100,14 +128,27 @@ impl Router { } } + /// Continously loop waiting for wakeup signals from router proxy. + /// Iterate over events either: + /// 1) If a message comes in from our special `wakeup_receiver` (identified through + /// msg_wakeup_id. Read message from `msg_receiver` and add a new receiver + /// to our receiver set. + /// 2) Call appropriate handler based on message id. + /// 3) Remove handler once channel closes. fn run(&mut self) { loop { + // Wait for events to come from our select() new channels are added to + // our ReceiverSet below. let results = match self.ipc_receiver_set.select() { Ok(results) => results, Err(_) => break, }; + + // Iterate over numerous events that were ready at this time. for result in results.into_iter() { match result { + // Message came from the RouterProxy. Listen on our `msg_receiver` + // channel. IpcSelectionResult::MessageReceived(id, _) if id == self.msg_wakeup_id => match self.msg_receiver.recv().unwrap() { RouterMsg::AddRoute(receiver, handler) => { @@ -116,6 +157,7 @@ impl Router { self.handlers.insert(new_receiver_id, handler); }, }, + // Event from one of our registered receivers, call callback. IpcSelectionResult::MessageReceived(id, message) => self.handlers.get_mut(&id).unwrap()(message), IpcSelectionResult::ChannelClosed(id) => { @@ -128,7 +170,10 @@ impl Router { } enum RouterMsg { + /// Register the receiver OpaqueIpcReceiver for listening for events on. + /// When a message comes from this receiver, call RouterHandler. AddRoute(OpaqueIpcReceiver, RouterHandler), } +/// Function to call when a new event is received from the corresponding receiver. pub type RouterHandler = Box;