Auto merge of #94 - danlrobertson:switch_to_epoll, r=pcwalton
Linux OsIpcReceiverSet - Switch to mio

Switch the Linux OsIpcReceiverSet from using poll to a level-triggered use of epoll (via mio). The use of epoll should perform better when there are a large number of watched fds.

Side note: An edge-triggered use of `epoll` would be very easy here, but to make the transition easier I chose level-triggered. Let me know if you think edge-triggered would be better. Comments and critiques are welcome!
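For reference, here is a minimal sketch (not part of this change) of what level- versus edge-triggered registration looks like with the mio 0.6 `Poll`/`EventedFd` API used in the diff below. The `wait_readable` helper, the bare `fd` argument, and the event capacity of 10 are illustrative only.

```rust
extern crate mio;

use mio::unix::EventedFd;
use mio::{Events, Poll, PollOpt, Ready, Token};
use std::io;
use std::os::unix::io::RawFd;

// Illustrative helper: block until `fd` is readable, using a level-triggered
// registration like the one OsIpcReceiverSet::add now performs.
fn wait_readable(fd: RawFd) -> io::Result<()> {
    let poll = try!(Poll::new());

    // Level-triggered: the fd is reported on every poll() call for as long as
    // unread data remains, so a partial read is harmless.
    // Edge-triggered would be `PollOpt::edge()`: the fd is reported only when
    // new data arrives, so the caller must drain it completely each time.
    try!(poll.register(&EventedFd(&fd),
                       Token(fd as usize),
                       Ready::readable(),
                       PollOpt::level()));

    let mut events = Events::with_capacity(10);
    try!(poll.poll(&mut events, None)); // block until at least one fd is ready
    for event in events.iter() {
        if event.kind().is_readable() {
            // recv() from the fd here.
        }
    }
    Ok(())
}
```

With level triggering, an fd that still has queued messages simply shows up again on the next `select()`, so the existing one-`recv`-per-event logic carries over unchanged.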

bors-servo committed Dec 2, 2016
2 parents 1c12453 + 78891e7 commit 703a044
Showing 4 changed files with 212 additions and 45 deletions.
3 changes: 3 additions & 0 deletions Cargo.toml
@@ -19,5 +19,8 @@ serde = "0.8"
uuid = {version = "0.3", features = ["v4"]}
fnv = "1.0.3"

[target.'cfg(any(target_os = "linux", target_os = "freebsd"))'.dependencies]
mio = "0.6.1"

[dev-dependencies]
crossbeam = "0.2"
140 changes: 140 additions & 0 deletions benches/bench.rs
@@ -172,3 +172,143 @@ fn size_22_4m(b: &mut test::Bencher) {
fn size_23_8m(b: &mut test::Bencher) {
bench_size(b, 8 * 1024 * 1024);
}

mod receiver_set {
use ipc_channel::ipc::{self, IpcReceiverSet};
use test;

fn gen_select_test(b: &mut test::Bencher, to_send: usize, n: usize) -> () {
let mut active = Vec::with_capacity(to_send);
let mut dormant = Vec::with_capacity(n - to_send);
let mut rx_set = IpcReceiverSet::new().unwrap();
for _ in 0..to_send {
let (tx, rx) = ipc::channel().unwrap();
rx_set.add(rx).unwrap();
active.push(tx);
}
for _ in to_send..n {
let (tx, rx) = ipc::channel::<()>().unwrap();
rx_set.add(rx).unwrap();
dormant.push(tx);
}
b.iter(|| {
for tx in active.iter() {
tx.send(()).unwrap();
}
let mut received = 0;
while received < to_send {
for result in rx_set.select().unwrap().into_iter() {
let (_, _) = result.unwrap();
received += 1;
}
}
});
}

fn create_empty_set() -> Result<IpcReceiverSet, ()> {
Ok(IpcReceiverSet::new().unwrap())
}

fn add_n_rxs(rx_set: &mut IpcReceiverSet, n: usize) -> () {
for _ in 0..n {
let (_, rx) = ipc::channel::<()>().unwrap();
rx_set.add(rx).unwrap();
}
}

#[bench]
fn send_on_1_of_1(b: &mut test::Bencher) -> () {
gen_select_test(b, 1, 1);
}

#[bench]
fn send_on_1_of_5(b: &mut test::Bencher) -> () {
gen_select_test(b, 1, 5);
}

#[bench]
fn send_on_2_of_5(b: &mut test::Bencher) -> () {
gen_select_test(b, 2, 5);
}

#[bench]
fn send_on_5_of_5(b: &mut test::Bencher) -> () {
gen_select_test(b, 5, 5);
}

#[bench]
fn send_on_1_of_20(b: &mut test::Bencher) -> () {
gen_select_test(b, 1, 20);
}

#[bench]
fn send_on_5_of_20(b: &mut test::Bencher) -> () {
gen_select_test(b, 5, 20);
}

#[bench]
fn send_on_20_of_20(b: &mut test::Bencher) -> () {
gen_select_test(b, 20, 20);
}

#[bench]
fn send_on_1_of_100(b: &mut test::Bencher) -> () {
gen_select_test(b, 1, 100);
}

#[bench]
fn send_on_5_of_100(b: &mut test::Bencher) -> () {
gen_select_test(b, 5, 100);
}
#[bench]
fn send_on_20_of_100(b: &mut test::Bencher) -> () {
gen_select_test(b, 20, 100);
}

#[bench]
fn send_on_100_of_100(b: &mut test::Bencher) -> () {
gen_select_test(b, 100, 100);
}

#[bench]
fn create_and_destroy_empty_set(b: &mut test::Bencher) -> () {
b.iter(|| {
create_empty_set().unwrap();
});
}

#[bench]
fn create_and_destroy_set_of_10(b: &mut test::Bencher) -> () {
b.iter(|| {
let mut rx_set = IpcReceiverSet::new().unwrap();
add_n_rxs(&mut rx_set, 10);
});
}

#[bench]
fn create_and_destroy_set_of_5(b: &mut test::Bencher) -> () {
b.iter(|| {
let mut rx_set = IpcReceiverSet::new().unwrap();
add_n_rxs(&mut rx_set, 5);
});
}

#[bench]
// Benchmark adding and removing closed receivers from the set
fn add_and_remove_closed_receivers(b: &mut test::Bencher) -> () {
b.iter(|| {
let mut rx_set = IpcReceiverSet::new().unwrap();
{
{
let (_, rx) = ipc::channel::<()>().unwrap();
rx_set.add(rx).unwrap();
}
// On select Receivers with a "ClosedChannel" event
// will be closed
rx_set.select().unwrap();
let (_, rx) = ipc::channel::<()>().unwrap();
rx_set.add(rx).unwrap();
}
});
}
}
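(The `#[bench]` functions above rely on the unstable `test` crate, so they are presumably run with `cargo bench` on a nightly toolchain, like the existing benchmarks in this file.)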
3 changes: 3 additions & 0 deletions src/lib.rs
@@ -20,6 +20,9 @@ extern crate rand;
extern crate serde;
#[cfg(any(feature = "force-inprocess", target_os = "windows", target_os = "android"))]
extern crate uuid;
#[cfg(all(not(feature = "force-inprocess"), any(target_os = "linux",
target_os = "freebsd")))]
extern crate mio;
#[cfg(all(not(feature = "force-inprocess"), any(target_os = "linux",
target_os = "freebsd")))]
extern crate fnv;
111 changes: 66 additions & 45 deletions src/platform/unix/mod.rs
@@ -9,9 +9,9 @@

use fnv::FnvHasher;
use bincode::serde::DeserializeError;
-use libc::{self, MAP_FAILED, MAP_SHARED, POLLIN, PROT_READ, PROT_WRITE, SOCK_SEQPACKET, SOL_SOCKET};
+use libc::{self, MAP_FAILED, MAP_SHARED, PROT_READ, PROT_WRITE, SOCK_SEQPACKET, SOL_SOCKET};
use libc::{SO_LINGER, S_IFMT, S_IFSOCK, c_char, c_int, c_void, getsockopt};
-use libc::{iovec, mkstemp, mode_t, msghdr, nfds_t, off_t, poll, pollfd, recvmsg, sendmsg};
+use libc::{iovec, mkstemp, mode_t, msghdr, off_t, recvmsg, sendmsg};
use libc::{setsockopt, size_t, sockaddr, sockaddr_un, socketpair, socklen_t, sa_family_t};
use std::cell::Cell;
use std::cmp;
@@ -27,6 +27,8 @@ use std::ptr;
use std::slice;
use std::sync::Arc;
use std::thread;
+use mio::unix::EventedFd;
+use mio::{Poll, Token, Events, Ready, PollOpt};

const MAX_FDS_IN_CMSG: u32 = 64;

@@ -79,6 +81,12 @@ pub fn channel() -> Result<(OsIpcSender, OsIpcReceiver),UnixError> {
    }
}

+#[derive(Clone, Copy)]
+struct PollEntry {
+    pub id: u64,
+    pub fd: c_int
+}
+
#[derive(PartialEq, Debug)]
pub struct OsIpcReceiver {
    fd: c_int,
@@ -415,17 +423,18 @@ impl OsIpcChannel {

pub struct OsIpcReceiverSet {
    incrementor: Incrementor,
-    pollfds: Vec<pollfd>,
-    fdids: HashMap<c_int, u64, BuildHasherDefault<FnvHasher>>
+    poll: Poll,
+    pollfds: HashMap<Token, PollEntry, BuildHasherDefault<FnvHasher>>,
+    events: Events
}

impl Drop for OsIpcReceiverSet {
    fn drop(&mut self) {
-        unsafe {
-            for pollfd in self.pollfds.iter() {
-                let result = libc::close(pollfd.fd);
-                assert!(thread::panicking() || result == 0);
-            }
+        for &PollEntry { id: _, fd } in self.pollfds.values() {
+            let result = unsafe {
+                libc::close(fd)
+            };
+            assert!(thread::panicking() || result == 0);
        }
    }
}
@@ -435,61 +444,67 @@ impl OsIpcReceiverSet {
        let fnv = BuildHasherDefault::<FnvHasher>::default();
        Ok(OsIpcReceiverSet {
            incrementor: Incrementor::new(),
-            pollfds: Vec::new(),
-            fdids: HashMap::with_hasher(fnv)
+            poll: try!(Poll::new()),
+            pollfds: HashMap::with_hasher(fnv),
+            events: Events::with_capacity(10)
        })
    }

    pub fn add(&mut self, receiver: OsIpcReceiver) -> Result<u64,UnixError> {
        let last_index = self.incrementor.increment();
        let fd = receiver.consume_fd();
-        self.pollfds.push(pollfd {
-            fd: fd,
-            events: POLLIN,
-            revents: 0,
-        });
-        self.fdids.insert(fd, last_index);
+        let io = EventedFd(&fd);
+        let fd_token = Token(fd as usize);
+        let poll_entry = PollEntry {
+            id: last_index,
+            fd: fd
+        };
+        try!(self.poll.register(&io,
+                                fd_token,
+                                Ready::readable(),
+                                PollOpt::level()));
+        self.pollfds.insert(fd_token, poll_entry);
        Ok(last_index)
    }

    pub fn select(&mut self) -> Result<Vec<OsIpcSelectionResult>,UnixError> {
        let mut selection_results = Vec::new();
-        let result = unsafe {
-            poll(self.pollfds.as_mut_ptr(), self.pollfds.len() as nfds_t, -1)
-        };
-        if result <= 0 {
-            return Err(UnixError::last())
-        }
+        match self.poll.poll(&mut self.events, None) {
+            Ok(sz) if sz > 0 => {},
+            _ => { return Err(UnixError::last()); }
+        };

-        for pollfd in self.pollfds.iter_mut() {
-            if (pollfd.revents & POLLIN) != 0 {
-                match recv(pollfd.fd, BlockingMode::Blocking) {
-                    Ok((data, channels, shared_memory_regions)) => {
-                        selection_results.push(OsIpcSelectionResult::DataReceived(
-                            *self.fdids.get(&pollfd.fd).unwrap(),
-                            data,
-                            channels,
-                            shared_memory_regions));
-                    }
-                    Err(err) if err.channel_is_closed() => {
-                        let id = self.fdids.remove(&pollfd.fd).unwrap();
-                        unsafe {
-                            libc::close(pollfd.fd);
-                        }
-                        selection_results.push(OsIpcSelectionResult::ChannelClosed(id))
-                    }
-                    Err(err) => return Err(err),
-                }
-                pollfd.revents = pollfd.revents & !POLLIN
-            }
-        }
-
-        // File descriptors not in fdids are closed channels, and the descriptor
-        // has been closed. This must be done after we have finished iterating over
-        // the pollfds vec.
-        let fdids = &self.fdids;
-        self.pollfds.retain(|pollfd| fdids.contains_key(&pollfd.fd));
+        for evt in self.events.iter() {
+            let evt_token = evt.token();
+            match (evt.kind().is_readable(), self.pollfds.get(&evt_token)) {
+                (true, Some(&poll_entry)) => {
+                    match recv(poll_entry.fd, BlockingMode::Blocking) {
+                        Ok((data, channels, shared_memory_regions)) => {
+                            selection_results.push(OsIpcSelectionResult::DataReceived(
+                                poll_entry.id,
+                                data,
+                                channels,
+                                shared_memory_regions));
+                        }
+                        Err(err) if err.channel_is_closed() => {
+                            self.pollfds.remove(&evt_token).unwrap();
+                            unsafe {
+                                libc::close(poll_entry.fd);
+                            }
+                            selection_results.push(OsIpcSelectionResult::ChannelClosed(poll_entry.id))
+                        }
+                        Err(err) => return Err(err),
+                    }
+                },
+                (true, None) => {
+                    panic!("Readable event for unknown token: {:?}", evt_token)
+                },
+                (false, _) => {
+                    panic!("Received an event that was not readable for token: {:?}", evt_token)
+                }
+            }
+        }

        Ok(selection_results)
    }
}
@@ -752,6 +767,12 @@ impl From<UnixError> for Error {
    }
}

+impl From<Error> for UnixError {
+    fn from(e: Error) -> UnixError {
+        UnixError(e.raw_os_error().unwrap())
+    }
+}
+
#[derive(Copy, Clone)]
enum BlockingMode {
    Blocking,
