Skip to content

Commit

Permalink
Auto merge of #237 - gatoWololo:router_docs, r=jdm
Browse files Browse the repository at this point in the history
Add documentation and comments to router.rs

Add documentation and comments detailing how the `RouterProxy`, `Router`, `ROUTER`, interact and work (both for a user reading the rendered docs, and someone trying to modify the code)
  • Loading branch information
bors-servo committed Jul 18, 2019
2 parents ee3ccba + 65199fb commit 3e26817
Showing 1 changed file with 45 additions and 0 deletions.
45 changes: 45 additions & 0 deletions src/router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,12 @@
// option. This file may not be copied, modified, or distributed
// except according to those terms.

//! Routers allow converting IPC channels to crossbeam channels.
//! The [RouterProxy](router::RouterProxy) provides various methods to register `IpcReceiver<T>`s.
//! The router will then either call the appropriate callback
//! or route the message to a crossbeam `Sender<T>` or `Receiver<T>`.
//! You should use the global `ROUTER` to access the `RouterProxy` methods (via `ROUTER`'s
//! `Deref` for `RouterProxy`.
use std::collections::HashMap;
use std::sync::Mutex;
use std::thread;
Expand All @@ -19,15 +25,26 @@ use crossbeam_channel::{self, Receiver, Sender};
use serde::{Deserialize, Serialize};

lazy_static! {
/// Global object wrapping a `RouterProxy`.
/// Add routes ([add_route](RouterProxy::add_route)), or convert IpcReceiver<T>
/// to crossbeam channels (e.g. [route_ipc_receiver_to_new_crossbeam_receiver](RouterProxy::route_ipc_receiver_to_new_crossbeam_receiver))
pub static ref ROUTER: RouterProxy = RouterProxy::new();
}

/// A `RouterProxy` provides methods for talking to the router. Calling
/// [new](RouterProxy::new) automatically spins up a router thread which
/// waits for events on its registered `IpcReceiver<T>`s. The `RouterProxy`'s
/// methods communicate with the running router thread to register new
/// `IpcReceiver<T>`'s
pub struct RouterProxy {
comm: Mutex<RouterProxyComm>,
}

impl RouterProxy {
pub fn new() -> RouterProxy {
// Router acts like a receiver, running in its own thread with both
// receiver ends.
// Router proxy takes both sending ends.
let (msg_sender, msg_receiver) = crossbeam_channel::unbounded();
let (wakeup_sender, wakeup_receiver) = ipc::channel().unwrap();
thread::spawn(move || Router::new(msg_receiver, wakeup_receiver).run());
Expand All @@ -39,6 +56,8 @@ impl RouterProxy {
}
}

/// Add a new (receiver, callback) pair to the router, and send a wakeup message
/// to the router.
pub fn add_route(&self, receiver: OpaqueIpcReceiver, callback: RouterHandler) {
let comm = self.comm.lock().unwrap();
comm.msg_sender
Expand Down Expand Up @@ -81,13 +100,22 @@ struct RouterProxyComm {
wakeup_sender: IpcSender<()>,
}

/// Router runs in its own thread listening for events. Adds events to its IpcReceiverSet
/// and listens for events using select().
struct Router {
/// Get messages from RouterProxy. Currently, the only message supported is:
/// `AddRoute(OpaqueIpcReceiver, RouterHandler)`. Add a new receiver which will
/// call function: RouterHandler when new event arrives.
msg_receiver: Receiver<RouterMsg>,
/// The ID/index of the special channel we use to identify messages from msg_receiver.
msg_wakeup_id: u64,
/// Set of all receivers which have been registered for us to select on.
ipc_receiver_set: IpcReceiverSet,
/// Maps ids to their handler functions.
handlers: HashMap<u64, RouterHandler>,
}


impl Router {
fn new(msg_receiver: Receiver<RouterMsg>, wakeup_receiver: IpcReceiver<()>) -> Router {
let mut ipc_receiver_set = IpcReceiverSet::new().unwrap();
Expand All @@ -100,14 +128,27 @@ impl Router {
}
}

/// Continously loop waiting for wakeup signals from router proxy.
/// Iterate over events either:
/// 1) If a message comes in from our special `wakeup_receiver` (identified through
/// msg_wakeup_id. Read message from `msg_receiver` and add a new receiver
/// to our receiver set.
/// 2) Call appropriate handler based on message id.
/// 3) Remove handler once channel closes.
fn run(&mut self) {
loop {
// Wait for events to come from our select() new channels are added to
// our ReceiverSet below.
let results = match self.ipc_receiver_set.select() {
Ok(results) => results,
Err(_) => break,
};

// Iterate over numerous events that were ready at this time.
for result in results.into_iter() {
match result {
// Message came from the RouterProxy. Listen on our `msg_receiver`
// channel.
IpcSelectionResult::MessageReceived(id, _) if id == self.msg_wakeup_id =>
match self.msg_receiver.recv().unwrap() {
RouterMsg::AddRoute(receiver, handler) => {
Expand All @@ -116,6 +157,7 @@ impl Router {
self.handlers.insert(new_receiver_id, handler);
},
},
// Event from one of our registered receivers, call callback.
IpcSelectionResult::MessageReceived(id, message) =>
self.handlers.get_mut(&id).unwrap()(message),
IpcSelectionResult::ChannelClosed(id) => {
Expand All @@ -128,7 +170,10 @@ impl Router {
}

enum RouterMsg {
/// Register the receiver OpaqueIpcReceiver for listening for events on.
/// When a message comes from this receiver, call RouterHandler.
AddRoute(OpaqueIpcReceiver, RouterHandler),
}

/// Function to call when a new event is received from the corresponding receiver.
pub type RouterHandler = Box<FnMut(OpaqueIpcMessage) + Send>;

0 comments on commit 3e26817

Please sign in to comment.