Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Linux OsIpcReceiverSet - Switch to use mio #94

Merged
merged 2 commits into from
Dec 25, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,5 +19,8 @@ serde = "0.8"
uuid = {version = "0.3", features = ["v4"]}
fnv = "1.0.3"

[target.'cfg(any(target_os = "linux", target_os = "freebsd"))'.dependencies]
mio = "0.6.1"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You are missing the FreeBSD case here, aren't you?...

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point. Hadn't updated this since that landed.


[dev-dependencies]
crossbeam = "0.2"
140 changes: 140 additions & 0 deletions benches/bench.rs
Original file line number Diff line number Diff line change
Expand Up @@ -172,3 +172,143 @@ fn size_22_4m(b: &mut test::Bencher) {
fn size_23_8m(b: &mut test::Bencher) {
bench_size(b, 8 * 1024 * 1024);
}

mod receiver_set {
use ipc_channel::ipc::{self, IpcReceiverSet};
use test;

fn gen_select_test(b: &mut test::Bencher, to_send: usize, n: usize) -> () {
let mut active = Vec::with_capacity(to_send);
let mut dormant = Vec::with_capacity(n - to_send);
let mut rx_set = IpcReceiverSet::new().unwrap();
for _ in 0..to_send {
let (tx, rx) = ipc::channel().unwrap();
rx_set.add(rx).unwrap();
active.push(tx);
}
for _ in to_send..n {
let (tx, rx) = ipc::channel::<()>().unwrap();
rx_set.add(rx).unwrap();
dormant.push(tx);
}
b.iter(|| {
for tx in active.iter() {
tx.send(()).unwrap();
}
let mut received = 0;
while received < to_send {
for result in rx_set.select().unwrap().into_iter() {
let (_, _) = result.unwrap();
received += 1;
}
}
});
}

fn create_empty_set() -> Result<IpcReceiverSet, ()> {
Ok(IpcReceiverSet::new().unwrap())
}

fn add_n_rxs(rx_set: &mut IpcReceiverSet, n: usize) -> () {
for _ in 0..n {
let (_, rx) = ipc::channel::<()>().unwrap();
rx_set.add(rx).unwrap();
}
}

#[bench]
fn send_on_1_of_1(b: &mut test::Bencher) -> () {
gen_select_test(b, 1, 1);
}

#[bench]
fn send_on_1_of_5(b: &mut test::Bencher) -> () {
gen_select_test(b, 1, 5);
}

#[bench]
fn send_on_2_of_5(b: &mut test::Bencher) -> () {
gen_select_test(b, 2, 5);
}

#[bench]
fn send_on_5_of_5(b: &mut test::Bencher) -> () {
gen_select_test(b, 5, 5);
}

#[bench]
fn send_on_1_of_20(b: &mut test::Bencher) -> () {
gen_select_test(b, 1, 20);
}

#[bench]
fn send_on_5_of_20(b: &mut test::Bencher) -> () {
gen_select_test(b, 5, 20);
}

#[bench]
fn send_on_20_of_20(b: &mut test::Bencher) -> () {
gen_select_test(b, 20, 20);
}

#[bench]
fn send_on_1_of_100(b: &mut test::Bencher) -> () {
gen_select_test(b, 1, 100);
}

#[bench]
fn send_on_5_of_100(b: &mut test::Bencher) -> () {
gen_select_test(b, 5, 100);
}
#[bench]
fn send_on_20_of_100(b: &mut test::Bencher) -> () {
gen_select_test(b, 20, 100);
}

#[bench]
fn send_on_100_of_100(b: &mut test::Bencher) -> () {
gen_select_test(b, 100, 100);
}

#[bench]
fn create_and_destroy_empty_set(b: &mut test::Bencher) -> () {
b.iter(|| {
create_empty_set().unwrap();
});
}

#[bench]
fn create_and_destroy_set_of_10(b: &mut test::Bencher) -> () {
b.iter(|| {
let mut rx_set = IpcReceiverSet::new().unwrap();
add_n_rxs(&mut rx_set, 10);
});
}

#[bench]
fn create_and_destroy_set_of_5(b: &mut test::Bencher) -> () {
b.iter(|| {
let mut rx_set = IpcReceiverSet::new().unwrap();
add_n_rxs(&mut rx_set, 5);
});
}

#[bench]
// Benchmark adding and removing closed receivers from the set
fn add_and_remove_closed_receivers(b: &mut test::Bencher) -> () {
b.iter(|| {
let mut rx_set = IpcReceiverSet::new().unwrap();
{
{
let (_, rx) = ipc::channel::<()>().unwrap();
rx_set.add(rx).unwrap();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you could just reuse add_n_rxs() here?

}
// On select Receivers with a "ClosedChannel" event
// will be closed
rx_set.select().unwrap();
let (_, rx) = ipc::channel::<()>().unwrap();
rx_set.add(rx).unwrap();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure why you are still adding another channel here, rather than making this a test only for selecting over closed channels?...

Not really a show-stopper though. In fact I'm pondering other ideas I could try to make some of these tests more significant -- so maybe I should just submit a follow-up PR myself :-)

}
});
}
}
3 changes: 3 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ extern crate rand;
extern crate serde;
#[cfg(any(feature = "force-inprocess", target_os = "windows", target_os = "android"))]
extern crate uuid;
#[cfg(all(not(feature = "force-inprocess"), any(target_os = "linux",
target_os = "freebsd")))]
extern crate mio;
#[cfg(all(not(feature = "force-inprocess"), any(target_os = "linux",
target_os = "freebsd")))]
extern crate fnv;
Expand Down
111 changes: 66 additions & 45 deletions src/platform/unix/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@

use fnv::FnvHasher;
use bincode::serde::DeserializeError;
use libc::{self, MAP_FAILED, MAP_SHARED, POLLIN, PROT_READ, PROT_WRITE, SOCK_SEQPACKET, SOL_SOCKET};
use libc::{self, MAP_FAILED, MAP_SHARED, PROT_READ, PROT_WRITE, SOCK_SEQPACKET, SOL_SOCKET};
use libc::{SO_LINGER, S_IFMT, S_IFSOCK, c_char, c_int, c_void, getsockopt};
use libc::{iovec, mkstemp, mode_t, msghdr, nfds_t, off_t, poll, pollfd, recvmsg, sendmsg};
use libc::{iovec, mkstemp, mode_t, msghdr, off_t, recvmsg, sendmsg};
use libc::{setsockopt, size_t, sockaddr, sockaddr_un, socketpair, socklen_t, sa_family_t};
use std::cell::Cell;
use std::cmp;
Expand All @@ -27,6 +27,8 @@ use std::ptr;
use std::slice;
use std::sync::Arc;
use std::thread;
use mio::unix::EventedFd;
use mio::{Poll, Token, Events, Ready, PollOpt};

const MAX_FDS_IN_CMSG: u32 = 64;

Expand Down Expand Up @@ -79,6 +81,12 @@ pub fn channel() -> Result<(OsIpcSender, OsIpcReceiver),UnixError> {
}
}

#[derive(Clone, Copy)]
struct PollEntry {
pub id: u64,
pub fd: c_int
}

#[derive(PartialEq, Debug)]
pub struct OsIpcReceiver {
fd: c_int,
Expand Down Expand Up @@ -415,17 +423,18 @@ impl OsIpcChannel {

pub struct OsIpcReceiverSet {
incrementor: Incrementor,
pollfds: Vec<pollfd>,
fdids: HashMap<c_int, u64, BuildHasherDefault<FnvHasher>>
poll: Poll,
pollfds: HashMap<Token, PollEntry, BuildHasherDefault<FnvHasher>>,
events: Events
}

impl Drop for OsIpcReceiverSet {
fn drop(&mut self) {
unsafe {
for pollfd in self.pollfds.iter() {
let result = libc::close(pollfd.fd);
assert!(thread::panicking() || result == 0);
}
for &PollEntry { id: _, fd } in self.pollfds.values() {
let result = unsafe {
libc::close(fd)
};
assert!(thread::panicking() || result == 0);
}
}
}
Expand All @@ -435,61 +444,67 @@ impl OsIpcReceiverSet {
let fnv = BuildHasherDefault::<FnvHasher>::default();
Ok(OsIpcReceiverSet {
incrementor: Incrementor::new(),
pollfds: Vec::new(),
fdids: HashMap::with_hasher(fnv)
poll: try!(Poll::new()),
pollfds: HashMap::with_hasher(fnv),
events: Events::with_capacity(10)
})
}

pub fn add(&mut self, receiver: OsIpcReceiver) -> Result<u64,UnixError> {
let last_index = self.incrementor.increment();
let fd = receiver.consume_fd();
self.pollfds.push(pollfd {
fd: fd,
events: POLLIN,
revents: 0,
});
self.fdids.insert(fd, last_index);
let io = EventedFd(&fd);
let fd_token = Token(fd as usize);
let poll_entry = PollEntry {
id: last_index,
fd: fd
};
try!(self.poll.register(&io,
fd_token,
Ready::readable(),
PollOpt::level()));
self.pollfds.insert(fd_token, poll_entry);
Ok(last_index)
}

pub fn select(&mut self) -> Result<Vec<OsIpcSelectionResult>,UnixError> {
let mut selection_results = Vec::new();
let result = unsafe {
poll(self.pollfds.as_mut_ptr(), self.pollfds.len() as nfds_t, -1)
match self.poll.poll(&mut self.events, None) {
Ok(sz) if sz > 0 => {},
_ => { return Err(UnixError::last()); }
};
if result <= 0 {
return Err(UnixError::last())
}

for pollfd in self.pollfds.iter_mut() {
if (pollfd.revents & POLLIN) != 0 {
match recv(pollfd.fd, BlockingMode::Blocking) {
Ok((data, channels, shared_memory_regions)) => {
selection_results.push(OsIpcSelectionResult::DataReceived(
*self.fdids.get(&pollfd.fd).unwrap(),
data,
channels,
shared_memory_regions));
}
Err(err) if err.channel_is_closed() => {
let id = self.fdids.remove(&pollfd.fd).unwrap();
unsafe {
libc::close(pollfd.fd);
for evt in self.events.iter() {
let evt_token = evt.token();
match (evt.kind().is_readable(), self.pollfds.get(&evt_token)) {
(true, Some(&poll_entry)) => {
match recv(poll_entry.fd, BlockingMode::Blocking) {
Ok((data, channels, shared_memory_regions)) => {
selection_results.push(OsIpcSelectionResult::DataReceived(
poll_entry.id,
data,
channels,
shared_memory_regions));
}
selection_results.push(OsIpcSelectionResult::ChannelClosed(id))
Err(err) if err.channel_is_closed() => {
self.pollfds.remove(&evt_token).unwrap();
unsafe {
libc::close(poll_entry.fd);
}
selection_results.push(OsIpcSelectionResult::ChannelClosed(poll_entry.id))
}
Err(err) => return Err(err),
}
Err(err) => return Err(err),
},
(true, None) => {
panic!("Readable event for unknown token: {:?}", evt_token)
},
(false, _) => {
panic!("Received an event that was not readable for token: {:?}", evt_token)
}
pollfd.revents = pollfd.revents & !POLLIN
}
}

// File descriptors not in fdids are closed channels, and the descriptor
// has been closed. This must be done after we have finished iterating over
// the pollfds vec.
let fdids = &self.fdids;
self.pollfds.retain(|pollfd| fdids.contains_key(&pollfd.fd));

Ok(selection_results)
}
}
Expand Down Expand Up @@ -752,6 +767,12 @@ impl From<UnixError> for Error {
}
}

impl From<Error> for UnixError {
fn from(e: Error) -> UnixError {
UnixError(e.raw_os_error().unwrap())
}
}

#[derive(Copy, Clone)]
enum BlockingMode {
Blocking,
Expand Down