Skip to content

Commit

Permalink
Linux: Switch from using poll to epoll
Browse files Browse the repository at this point in the history
Switch from using poll to a level-triggered use of epoll for the Linux
OSIpcReceiverSet. The use of epoll should perform better when ther are a
large number of watched fd's.
  • Loading branch information
dlrobertson committed Aug 20, 2016
1 parent 4753ffe commit b720d67
Showing 1 changed file with 49 additions and 24 deletions.
73 changes: 49 additions & 24 deletions src/platform/linux/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,10 @@
// except according to those terms.

use bincode::serde::DeserializeError;
use libc::{self, MAP_FAILED, MAP_SHARED, POLLIN, PROT_READ, PROT_WRITE, SOCK_SEQPACKET, SOL_SOCKET};
use libc::{self, MAP_FAILED, MAP_SHARED, PROT_READ, PROT_WRITE, SOCK_SEQPACKET, SOL_SOCKET};
use libc::{SO_LINGER, S_IFMT, S_IFSOCK, c_char, c_int, c_short, c_ushort, c_void, getsockopt};
use libc::{iovec, mkstemp, mode_t, msghdr, nfds_t, off_t, poll, pollfd, recvmsg, sendmsg};
use libc::{iovec, mkstemp, mode_t, msghdr, off_t, recvmsg, sendmsg};
use libc::{epoll_wait, epoll_event, epoll_create1, epoll_ctl, EPOLLIN, EPOLL_CTL_ADD, EPOLL_CTL_DEL};
use libc::{setsockopt, size_t, sockaddr, sockaddr_un, socketpair, socklen_t};
use std::cmp;
use std::collections::HashSet;
Expand All @@ -25,6 +26,7 @@ use std::sync::Arc;
use std::thread;

const MAX_FDS_IN_CMSG: u32 = 64;
const MAX_EVENTS: i32 = 10;

lazy_static! {
static ref SYSTEM_SENDBUF_SIZE: usize = {
Expand Down Expand Up @@ -386,70 +388,93 @@ impl OsIpcChannel {
}

pub struct OsIpcReceiverSet {
pollfds: Vec<pollfd>,
epollfd: c_int,
allfds: Vec<c_int>
}

impl Drop for OsIpcReceiverSet {
fn drop(&mut self) {
unsafe {
for pollfd in self.pollfds.iter() {
let result = libc::close(pollfd.fd);
for fd in self.allfds.iter() {
let result = libc::close(*fd);
assert!(thread::panicking() || result == 0);
}
let result = libc::close(self.epollfd);
assert!(thread::panicking() || result == 0);
}
}
}

impl OsIpcReceiverSet {
pub fn new() -> Result<OsIpcReceiverSet,UnixError> {
Ok(OsIpcReceiverSet {
pollfds: Vec::new(),
epollfd: unsafe { epoll_create1(0) },
allfds: Vec::new()
})
}

pub fn add(&mut self, receiver: OsIpcReceiver) -> Result<i64,UnixError> {
let fd = receiver.consume_fd();
self.pollfds.push(pollfd {
fd: fd,
events: POLLIN,
revents: 0,
});
self.allfds.push(fd);
let mut ev = epoll_event {
events: EPOLLIN as u32,
u64: fd as u64
};
unsafe {
epoll_ctl(self.epollfd, EPOLL_CTL_ADD, fd, &mut ev as *mut epoll_event);
}
Ok(fd as i64)
}

pub fn select(&mut self) -> Result<Vec<OsIpcSelectionResult>,UnixError> {
let mut selection_results = Vec::new();
let result = unsafe {
poll(self.pollfds.as_mut_ptr(), self.pollfds.len() as nfds_t, -1)
let events: Vec<epoll_event> = unsafe {
let mut events = Vec::with_capacity(MAX_EVENTS as usize);
let nfds = epoll_wait(self.epollfd, events.as_mut_ptr(), MAX_EVENTS, -1);
if nfds <= 0 {
return Err(UnixError::last())
}
events.set_len(nfds as usize);
events
};
if result <= 0 {
return Err(UnixError::last())
}

let mut hangups = HashSet::new();
for pollfd in self.pollfds.iter_mut() {
if (pollfd.revents & POLLIN) != 0 {
match recv(pollfd.fd, BlockingMode::Blocking) {
for evt in events.iter() {
if (evt.events & EPOLLIN as u32) != 0 {
let fd = evt.u64 as c_int;
match recv(fd, BlockingMode::Blocking) {
Ok((data, channels, shared_memory_regions)) => {
selection_results.push(OsIpcSelectionResult::DataReceived(
pollfd.fd as i64,
fd as i64,
data,
channels,
shared_memory_regions));
}
Err(err) if err.channel_is_closed() => {
hangups.insert(pollfd.fd);
hangups.insert(fd);
selection_results.push(OsIpcSelectionResult::ChannelClosed(
pollfd.fd as i64))
fd as i64))
}
Err(err) => return Err(err),
}
pollfd.revents = pollfd.revents & !POLLIN
}
}

if !hangups.is_empty() {
self.pollfds.retain(|pollfd| !hangups.contains(&pollfd.fd));
for hangup in hangups.iter() {
unsafe {
// NB: See the BUGS portion of the epoll_ctl man page
// for the perpose of the event variable
let mut event = epoll_event {
events: 0,
u64: 0
};
epoll_ctl(self.epollfd, EPOLL_CTL_DEL, *hangup,
&mut event as *mut epoll_event);
libc::close(*hangup);
}
}
self.allfds.retain(|fd| !hangups.contains(&fd));
}

Ok(selection_results)
Expand Down

0 comments on commit b720d67

Please sign in to comment.