From be882855182a40e1cbfe60418cd8f87f3278346b Mon Sep 17 00:00:00 2001
From: Dan Robertson
Date: Wed, 10 Aug 2016 19:18:49 +0000
Subject: [PATCH 1/2] Add bench for IpcReceiverSet

Add benchmark tests for IpcReceiverSet:

 - benchmarks for select():
   - the extreme case where only a small number of the registered
     receivers have an incoming event
   - the extreme case where every registered receiver has an
     incoming event
   - ordinary cases (a few registered receivers and a few events)
 - benchmarks for the creation and destruction of a set
---
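A quick usage sketch for reviewers: this is the select() pattern the
benchmarks drive, written against the crate's public API. A minimal sketch
assuming the selection surface of the time
(IpcSelectionResult::{MessageReceived, ChannelClosed} and
OpaqueIpcMessage::to); the u32 payload and the single channel are
illustrative only, not part of the patch.

    extern crate ipc_channel;

    use ipc_channel::ipc::{self, IpcReceiverSet, IpcSelectionResult};

    fn main() {
        let mut rx_set = IpcReceiverSet::new().unwrap();
        let (tx, rx) = ipc::channel::<u32>().unwrap();
        // add() consumes the receiver and returns a u64 id that later
        // selection results refer back to.
        let id = rx_set.add(rx).unwrap();

        tx.send(42).unwrap();

        // select() blocks until at least one registered receiver is ready
        // and returns one result per ready receiver.
        for result in rx_set.select().unwrap() {
            match result {
                IpcSelectionResult::MessageReceived(recv_id, msg) => {
                    assert_eq!(recv_id, id);
                    let value: u32 = msg.to().unwrap();
                    assert_eq!(value, 42);
                }
                IpcSelectionResult::ChannelClosed(recv_id) => {
                    println!("receiver {} closed", recv_id);
                }
            }
        }
    }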
 benches/bench.rs | 140 +++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 140 insertions(+)

diff --git a/benches/bench.rs b/benches/bench.rs
index 56933d50f..a75699a6d 100644
--- a/benches/bench.rs
+++ b/benches/bench.rs
@@ -172,3 +172,143 @@ fn size_22_4m(b: &mut test::Bencher) {
 fn size_23_8m(b: &mut test::Bencher) {
     bench_size(b, 8 * 1024 * 1024);
 }
+
+mod receiver_set {
+    use ipc_channel::ipc::{self, IpcReceiverSet};
+    use test;
+
+    // Register `n` receivers with the set, then measure one send on each
+    // of the first `to_send` of them followed by select() calls until all
+    // sent messages have been received.
+    fn gen_select_test(b: &mut test::Bencher, to_send: usize, n: usize) {
+        let mut active = Vec::with_capacity(to_send);
+        let mut dormant = Vec::with_capacity(n - to_send);
+        let mut rx_set = IpcReceiverSet::new().unwrap();
+        for _ in 0..to_send {
+            let (tx, rx) = ipc::channel().unwrap();
+            rx_set.add(rx).unwrap();
+            active.push(tx);
+        }
+        for _ in to_send..n {
+            let (tx, rx) = ipc::channel::<()>().unwrap();
+            rx_set.add(rx).unwrap();
+            dormant.push(tx);
+        }
+        b.iter(|| {
+            for tx in active.iter() {
+                tx.send(()).unwrap();
+            }
+            let mut received = 0;
+            while received < to_send {
+                for result in rx_set.select().unwrap().into_iter() {
+                    let (_, _) = result.unwrap();
+                    received += 1;
+                }
+            }
+        });
+    }
+
+    fn create_empty_set() -> Result<IpcReceiverSet, std::io::Error> {
+        IpcReceiverSet::new()
+    }
+
+    fn add_n_rxs(rx_set: &mut IpcReceiverSet, n: usize) {
+        for _ in 0..n {
+            let (_, rx) = ipc::channel::<()>().unwrap();
+            rx_set.add(rx).unwrap();
+        }
+    }
+
+    #[bench]
+    fn send_on_1_of_1(b: &mut test::Bencher) {
+        gen_select_test(b, 1, 1);
+    }
+
+    #[bench]
+    fn send_on_1_of_5(b: &mut test::Bencher) {
+        gen_select_test(b, 1, 5);
+    }
+
+    #[bench]
+    fn send_on_2_of_5(b: &mut test::Bencher) {
+        gen_select_test(b, 2, 5);
+    }
+
+    #[bench]
+    fn send_on_5_of_5(b: &mut test::Bencher) {
+        gen_select_test(b, 5, 5);
+    }
+
+    #[bench]
+    fn send_on_1_of_20(b: &mut test::Bencher) {
+        gen_select_test(b, 1, 20);
+    }
+
+    #[bench]
+    fn send_on_5_of_20(b: &mut test::Bencher) {
+        gen_select_test(b, 5, 20);
+    }
+
+    #[bench]
+    fn send_on_20_of_20(b: &mut test::Bencher) {
+        gen_select_test(b, 20, 20);
+    }
+
+    #[bench]
+    fn send_on_1_of_100(b: &mut test::Bencher) {
+        gen_select_test(b, 1, 100);
+    }
+
+    #[bench]
+    fn send_on_5_of_100(b: &mut test::Bencher) {
+        gen_select_test(b, 5, 100);
+    }
+
+    #[bench]
+    fn send_on_20_of_100(b: &mut test::Bencher) {
+        gen_select_test(b, 20, 100);
+    }
+
+    #[bench]
+    fn send_on_100_of_100(b: &mut test::Bencher) {
+        gen_select_test(b, 100, 100);
+    }
+
+    #[bench]
+    fn create_and_destroy_empty_set(b: &mut test::Bencher) {
+        b.iter(|| {
+            create_empty_set().unwrap();
+        });
+    }
+
+    #[bench]
+    fn create_and_destroy_set_of_10(b: &mut test::Bencher) {
+        b.iter(|| {
+            let mut rx_set = IpcReceiverSet::new().unwrap();
+            add_n_rxs(&mut rx_set, 10);
+        });
+    }
+
+    #[bench]
+    fn create_and_destroy_set_of_5(b: &mut test::Bencher) {
+        b.iter(|| {
+            let mut rx_set = IpcReceiverSet::new().unwrap();
+            add_n_rxs(&mut rx_set, 5);
+        });
+    }
+
+    // Benchmark adding and removing closed receivers from the set
+    #[bench]
+    fn add_and_remove_closed_receivers(b: &mut test::Bencher) {
+        b.iter(|| {
+            let mut rx_set = IpcReceiverSet::new().unwrap();
+            {
+                // The sender is dropped immediately, closing the channel.
+                let (_, rx) = ipc::channel::<()>().unwrap();
+                rx_set.add(rx).unwrap();
+            }
+            // On select(), receivers that report a ChannelClosed event
+            // are closed and removed from the set.
+            rx_set.select().unwrap();
+            let (_, rx) = ipc::channel::<()>().unwrap();
+            rx_set.add(rx).unwrap();
+        });
+    }
+}

From f6cefc5650e5cca57629f3a9610c20dcc0289fa7 Mon Sep 17 00:00:00 2001
From: Dan Robertson
Date: Wed, 10 Aug 2016 23:34:28 +0000
Subject: [PATCH 2/2] Linux: Switch from using poll to mio

Switch the Unix implementation of OsIpcReceiverSet from poll(2) to mio.
mio is backed by epoll on Linux, which should perform better than poll
when a large number of descriptors is being watched.
---
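A standalone sketch of the level-triggered registration/poll pattern this
patch adopts, for reviewers unfamiliar with mio 0.6. It mirrors the shape
of OsIpcReceiverSet::add()/select() but is not code from the patch; the
pipe, the Token(fd as usize) keying, and the event capacity of 16 are
illustrative assumptions.

    extern crate libc;
    extern crate mio;

    use mio::unix::EventedFd;
    use mio::{Events, Poll, PollOpt, Ready, Token};

    fn main() {
        // A readable descriptor to watch: the read end of a pipe.
        let mut fds = [0 as libc::c_int; 2];
        assert_eq!(unsafe { libc::pipe(fds.as_mut_ptr()) }, 0);
        let (read_fd, write_fd) = (fds[0], fds[1]);

        let poll = Poll::new().unwrap();
        let mut events = Events::with_capacity(16);

        // Register the raw fd for level-triggered readable events, keyed
        // by a Token derived from the fd, just as add() does.
        poll.register(&EventedFd(&read_fd),
                      Token(read_fd as usize),
                      Ready::readable(),
                      PollOpt::level()).unwrap();

        // Make the fd readable, then block in poll(), as select() does
        // when it passes a timeout of None.
        assert_eq!(unsafe { libc::write(write_fd, b"x".as_ptr() as *const libc::c_void, 1) }, 1);
        poll.poll(&mut events, None).unwrap();

        for event in events.iter() {
            // The token maps the event back to per-receiver bookkeeping.
            assert_eq!(event.token(), Token(read_fd as usize));
            assert!(event.kind().is_readable());
        }
    }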
 Cargo.toml               |   3 ++
 src/lib.rs               |   3 ++
 src/platform/unix/mod.rs | 111 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++---------------------------------------------
 3 files changed, 72 insertions(+), 45 deletions(-)

diff --git a/Cargo.toml b/Cargo.toml
index eb4795da4..46cc71d40 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -19,5 +19,8 @@ serde = "0.8"
 uuid = {version = "0.3", features = ["v4"]}
 fnv = "1.0.3"
 
+[target.'cfg(any(target_os = "linux", target_os = "freebsd"))'.dependencies]
+mio = "0.6.1"
+
 [dev-dependencies]
 crossbeam = "0.2"

diff --git a/src/lib.rs b/src/lib.rs
index 00ede753b..013e55692 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -20,6 +20,9 @@ extern crate rand;
 extern crate serde;
 #[cfg(any(feature = "force-inprocess", target_os = "windows", target_os = "android"))]
 extern crate uuid;
+#[cfg(all(not(feature = "force-inprocess"), any(target_os = "linux",
+                                                target_os = "freebsd")))]
+extern crate mio;
 #[cfg(all(not(feature = "force-inprocess"), any(target_os = "linux",
                                                 target_os = "freebsd")))]
 extern crate fnv;

diff --git a/src/platform/unix/mod.rs b/src/platform/unix/mod.rs
index 0bdd8eecb..c24cba1a7 100644
--- a/src/platform/unix/mod.rs
+++ b/src/platform/unix/mod.rs
@@ -9,9 +9,9 @@
 use fnv::FnvHasher;
 
 use bincode::serde::DeserializeError;
-use libc::{self, MAP_FAILED, MAP_SHARED, POLLIN, PROT_READ, PROT_WRITE, SOCK_SEQPACKET, SOL_SOCKET};
+use libc::{self, MAP_FAILED, MAP_SHARED, PROT_READ, PROT_WRITE, SOCK_SEQPACKET, SOL_SOCKET};
 use libc::{SO_LINGER, S_IFMT, S_IFSOCK, c_char, c_int, c_void, getsockopt};
-use libc::{iovec, mkstemp, mode_t, msghdr, nfds_t, off_t, poll, pollfd, recvmsg, sendmsg};
+use libc::{iovec, mkstemp, mode_t, msghdr, off_t, recvmsg, sendmsg};
 use libc::{setsockopt, size_t, sockaddr, sockaddr_un, socketpair, socklen_t, sa_family_t};
 use std::cell::Cell;
 use std::cmp;
@@ -27,6 +27,8 @@
 use std::ptr;
 use std::slice;
 use std::sync::Arc;
 use std::thread;
+use mio::unix::EventedFd;
+use mio::{Poll, Token, Events, Ready, PollOpt};
 
 const MAX_FDS_IN_CMSG: u32 = 64;
 
@@ -79,6 +81,12 @@ pub fn channel() -> Result<(OsIpcSender, OsIpcReceiver),UnixError> {
     }
 }
 
+#[derive(Clone, Copy)]
+struct PollEntry {
+    pub id: u64,
+    pub fd: c_int
+}
+
 #[derive(PartialEq, Debug)]
 pub struct OsIpcReceiver {
     fd: c_int,
@@ -415,17 +423,18 @@ impl OsIpcChannel {
 
 pub struct OsIpcReceiverSet {
     incrementor: Incrementor,
-    pollfds: Vec<pollfd>,
-    fdids: HashMap<c_int, u64, BuildHasherDefault<FnvHasher>>
+    poll: Poll,
+    pollfds: HashMap<Token, PollEntry, BuildHasherDefault<FnvHasher>>,
+    events: Events
 }
 
 impl Drop for OsIpcReceiverSet {
     fn drop(&mut self) {
-        unsafe {
-            for pollfd in self.pollfds.iter() {
-                let result = libc::close(pollfd.fd);
-                assert!(thread::panicking() || result == 0);
-            }
+        for &PollEntry { id: _, fd } in self.pollfds.values() {
+            let result = unsafe {
+                libc::close(fd)
+            };
+            assert!(thread::panicking() || result == 0);
         }
     }
 }
@@ -435,61 +444,67 @@ impl OsIpcReceiverSet {
         let fnv = BuildHasherDefault::<FnvHasher>::default();
         Ok(OsIpcReceiverSet {
             incrementor: Incrementor::new(),
-            pollfds: Vec::new(),
-            fdids: HashMap::with_hasher(fnv)
+            poll: try!(Poll::new()),
+            pollfds: HashMap::with_hasher(fnv),
+            events: Events::with_capacity(10)
         })
     }
 
     pub fn add(&mut self, receiver: OsIpcReceiver) -> Result<u64,UnixError> {
         let last_index = self.incrementor.increment();
         let fd = receiver.consume_fd();
-        self.pollfds.push(pollfd {
-            fd: fd,
-            events: POLLIN,
-            revents: 0,
-        });
-        self.fdids.insert(fd, last_index);
+        let io = EventedFd(&fd);
+        let fd_token = Token(fd as usize);
+        let poll_entry = PollEntry {
+            id: last_index,
+            fd: fd
+        };
+        try!(self.poll.register(&io,
+                                fd_token,
+                                Ready::readable(),
+                                PollOpt::level()));
+        self.pollfds.insert(fd_token, poll_entry);
         Ok(last_index)
     }
 
     pub fn select(&mut self) -> Result<Vec<OsIpcSelectionResult>,UnixError> {
         let mut selection_results = Vec::new();
-        let result = unsafe {
-            poll(self.pollfds.as_mut_ptr(), self.pollfds.len() as nfds_t, -1)
+        match self.poll.poll(&mut self.events, None) {
+            Ok(sz) if sz > 0 => {},
+            _ => { return Err(UnixError::last()); }
         };
-        if result <= 0 {
-            return Err(UnixError::last())
-        }
 
-        for pollfd in self.pollfds.iter_mut() {
-            if (pollfd.revents & POLLIN) != 0 {
-                match recv(pollfd.fd, BlockingMode::Blocking) {
-                    Ok((data, channels, shared_memory_regions)) => {
-                        selection_results.push(OsIpcSelectionResult::DataReceived(
-                            *self.fdids.get(&pollfd.fd).unwrap(),
-                            data,
-                            channels,
-                            shared_memory_regions));
-                    }
-                    Err(err) if err.channel_is_closed() => {
-                        let id = self.fdids.remove(&pollfd.fd).unwrap();
-                        unsafe {
-                            libc::close(pollfd.fd);
+        for evt in self.events.iter() {
+            let evt_token = evt.token();
+            // Copy the entry out of the map so that the borrow taken by
+            // get() does not conflict with the remove() below.
+            match (evt.kind().is_readable(), self.pollfds.get(&evt_token).map(|e| *e)) {
+                (true, Some(poll_entry)) => {
+                    match recv(poll_entry.fd, BlockingMode::Blocking) {
+                        Ok((data, channels, shared_memory_regions)) => {
+                            selection_results.push(OsIpcSelectionResult::DataReceived(
+                                poll_entry.id,
+                                data,
+                                channels,
+                                shared_memory_regions));
                         }
-                        selection_results.push(OsIpcSelectionResult::ChannelClosed(id))
+                        Err(err) if err.channel_is_closed() => {
+                            self.pollfds.remove(&evt_token).unwrap();
+                            unsafe {
+                                libc::close(poll_entry.fd);
+                            }
+                            selection_results.push(OsIpcSelectionResult::ChannelClosed(poll_entry.id))
+                        }
+                        Err(err) => return Err(err),
                     }
-                    Err(err) => return Err(err),
+                },
+                (true, None) => {
+                    panic!("Readable event for unknown token: {:?}", evt_token)
+                },
+                (false, _) => {
+                    panic!("Received an event that was not readable for token: {:?}", evt_token)
                 }
-                pollfd.revents = pollfd.revents & !POLLIN
             }
         }
 
-        // File descriptors not in fdids are closed channels, and the descriptor
-        // has been closed. This must be done after we have finished iterating over
-        // the pollfds vec.
-        let fdids = &self.fdids;
-        self.pollfds.retain(|pollfd| fdids.contains_key(&pollfd.fd));
-
         Ok(selection_results)
     }
 }
@@ -752,6 +767,12 @@ impl From<UnixError> for Error {
     }
 }
 
+impl From<Error> for UnixError {
+    fn from(e: Error) -> UnixError {
+        UnixError(e.raw_os_error().unwrap())
+    }
+}
+
 #[derive(Copy, Clone)]
 enum BlockingMode {
     Blocking,