Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add documentation and comments to router.rs #237

Merged
merged 1 commit into from
Jul 18, 2019
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 45 additions & 0 deletions src/router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,12 @@
// option. This file may not be copied, modified, or distributed
// except according to those terms.

//! Routers allow converting IPC channels to crossbeam channels.
//! The [RouterProxy](router::RouterProxy) provides various methods to register `IpcReceiver<T>`s.
//! The router will then either call the appropriate callback
//! or route the message to a crossbeam `Sender<T>` or `Receiver<T>`.
//! You should use the global `ROUTER` to access the `RouterProxy` methods (via `ROUTER`'s
//! `Deref` for `RouterProxy`.
use std::collections::HashMap;
use std::sync::Mutex;
use std::thread;
Expand All @@ -19,15 +25,26 @@ use crossbeam_channel::{self, Receiver, Sender};
use serde::{Deserialize, Serialize};

lazy_static! {
/// Global object wrapping a `RouterProxy`.
/// Add routes ([add_route](RouterProxy::add_route)), or convert IpcReceiver<T>
/// to crossbeam channels (e.g. [route_ipc_receiver_to_new_crossbeam_receiver](RouterProxy::route_ipc_receiver_to_new_crossbeam_receiver))
pub static ref ROUTER: RouterProxy = RouterProxy::new();
}

/// A `RouterProxy` provides methods for talking to the router. Calling
/// [new](RouterProxy::new) automatically spins up a router thread which
/// waits for events on its registered `IpcReceiver<T>`s. The `RouterProxy`'s
/// methods communicate with the running router thread to register new
/// `IpcReceiver<T>`'s
pub struct RouterProxy {
comm: Mutex<RouterProxyComm>,
}

impl RouterProxy {
pub fn new() -> RouterProxy {
// Router acts like a receiver, running in its own thread with both
// receiver ends.
// Router proxy takes both sending ends.
let (msg_sender, msg_receiver) = crossbeam_channel::unbounded();
let (wakeup_sender, wakeup_receiver) = ipc::channel().unwrap();
thread::spawn(move || Router::new(msg_receiver, wakeup_receiver).run());
Expand All @@ -39,6 +56,8 @@ impl RouterProxy {
}
}

/// Add a new (receiver, callback) pair to the router, and send a wakeup message
/// to the router.
pub fn add_route(&self, receiver: OpaqueIpcReceiver, callback: RouterHandler) {
let comm = self.comm.lock().unwrap();
comm.msg_sender
Expand Down Expand Up @@ -81,13 +100,22 @@ struct RouterProxyComm {
wakeup_sender: IpcSender<()>,
}

/// Router runs in its own thread listening for events. Adds events to its IpcReceiverSet
/// and listens for events using select().
struct Router {
/// Get messages from RouterProxy. Currently, the only message supported is:
/// `AddRoute(OpaqueIpcReceiver, RouterHandler)`. Add a new receiver which will
/// call function: RouterHandler when new event arrives.
msg_receiver: Receiver<RouterMsg>,
/// The ID/index of the special channel we use to identify messages from msg_receiver.
msg_wakeup_id: u64,
/// Set of all receivers which have been registered for us to select on.
ipc_receiver_set: IpcReceiverSet,
/// Maps ids to their handler functions.
handlers: HashMap<u64, RouterHandler>,
}


impl Router {
fn new(msg_receiver: Receiver<RouterMsg>, wakeup_receiver: IpcReceiver<()>) -> Router {
let mut ipc_receiver_set = IpcReceiverSet::new().unwrap();
Expand All @@ -100,14 +128,27 @@ impl Router {
}
}

/// Continously loop waiting for wakeup signals from router proxy.
/// Iterate over events either:
/// 1) If a message comes in from our special `wakeup_receiver` (identified through
/// msg_wakeup_id. Read message from `msg_receiver` and add a new receiver
/// to our receiver set.
/// 2) Call appropriate handler based on message id.
/// 3) Remove handler once channel closes.
fn run(&mut self) {
loop {
// Wait for events to come from our select() new channels are added to
// our ReceiverSet below.
let results = match self.ipc_receiver_set.select() {
Ok(results) => results,
Err(_) => break,
};

// Iterate over numerous events that were ready at this time.
for result in results.into_iter() {
match result {
// Message came from the RouterProxy. Listen on our `msg_receiver`
// channel.
IpcSelectionResult::MessageReceived(id, _) if id == self.msg_wakeup_id =>
match self.msg_receiver.recv().unwrap() {
RouterMsg::AddRoute(receiver, handler) => {
Expand All @@ -116,6 +157,7 @@ impl Router {
self.handlers.insert(new_receiver_id, handler);
},
},
// Event from one of our registered receivers, call callback.
IpcSelectionResult::MessageReceived(id, message) =>
self.handlers.get_mut(&id).unwrap()(message),
IpcSelectionResult::ChannelClosed(id) => {
Expand All @@ -128,7 +170,10 @@ impl Router {
}

enum RouterMsg {
/// Register the receiver OpaqueIpcReceiver for listening for events on.
/// When a message comes from this receiver, call RouterHandler.
AddRoute(OpaqueIpcReceiver, RouterHandler),
}

/// Function to call when a new event is received from the corresponding receiver.
pub type RouterHandler = Box<FnMut(OpaqueIpcMessage) + Send>;