Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
134 changes: 60 additions & 74 deletions src/shims/unix/linux_like/epoll.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use std::cell::RefCell;
use std::collections::{BTreeMap, VecDeque};
use std::collections::{BTreeMap, BTreeSet, VecDeque};
use std::io;
use std::time::Duration;

Expand All @@ -18,12 +18,13 @@ type EpollEventKey = (FdId, FdNum);
#[derive(Debug, Default)]
struct Epoll {
/// A map of EpollEventInterests registered under this epoll instance. Each entry is
/// differentiated using FdId and file descriptor value. Note that we do not have a separate
/// "ready" list; instead, a boolean flag in this list tracks which subset is ready. This makes
/// `epoll_wait` less efficient, but also requires less bookkeeping.
/// differentiated using FdId and file descriptor value.
interest_list: RefCell<BTreeMap<EpollEventKey, EpollEventInterest>>,
/// The queue of threads blocked on this epoll instance, and how many events they'd like to get.
queue: RefCell<VecDeque<(ThreadId, u32)>>,
/// The subset of interests that is currently considered "ready". Stored separately so we
/// can access it more efficiently.
ready_set: RefCell<BTreeSet<EpollEventKey>>,
/// The queue of threads blocked on this epoll instance.
queue: RefCell<VecDeque<ThreadId>>,
}

impl VisitProvenance for Epoll {
Expand All @@ -44,8 +45,6 @@ pub struct EpollEventInterest {
relevant_events: u32,
/// The currently active events for this file descriptor.
active_events: u32,
/// Whether this interest is in the "ready" set.
ready: bool,
/// The vector clock for wakeups.
clock: VClock,
/// User-defined data associated with this interest.
Expand Down Expand Up @@ -176,13 +175,19 @@ impl EpollInterestTable {
if let Some(epolls) = self.0.remove(&id) {
for epoll in epolls.iter().filter_map(|(_id, epoll)| epoll.upgrade()) {
// This is a still-live epoll with interest in this FD. Remove all
// relevent interests.
// relevent interests (including from the ready set).
epoll
.interest_list
.borrow_mut()
.extract_if(range_for_id(id), |_, _| true)
// Consume the iterator.
.for_each(|_| ());
epoll
.ready_set
.borrow_mut()
.extract_if(range_for_id(id), |_| true)
// Consume the iterator.
.for_each(|_| ());
}
}
}
Expand Down Expand Up @@ -326,7 +331,6 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> {
relevant_events: events,
data,
active_events: 0,
ready: false,
clock: VClock::default(),
};
if interest_list.try_insert(epoll_key, new_interest).is_err() {
Expand All @@ -351,19 +355,20 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> {
move |callback| {
// Need to release the RefCell when this closure returns, so we have to move
// it into the closure, so we have to do a re-lookup here.
callback(interest_list.get_mut(&epoll_key).unwrap())
callback(epoll_key, interest_list.get_mut(&epoll_key).unwrap())
},
)?;

interp_ok(Scalar::from_i32(0))
} else if op == epoll_ctl_del {
let epoll_key = (id, fd);

// Remove epoll_event_interest from interest_list.
// Remove epoll_event_interest from interest_list and ready_set.
if interest_list.remove(&epoll_key).is_none() {
// We did not have interest in this.
return this.set_last_error_and_return_i32(LibcError("ENOENT"));
};
epfd.ready_set.borrow_mut().remove(&epoll_key);
// If this was the last interest in this FD, remove us from the global list
// of who is interested in this FD.
if interest_list.range(range_for_id(id)).next().is_none() {
Expand Down Expand Up @@ -441,7 +446,7 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> {
return this.set_last_error_and_return(LibcError("EBADF"), dest);
};

if timeout == 0 || epfd.interest_list.borrow().values().any(|i| i.ready) {
if timeout == 0 || !epfd.ready_set.borrow().is_empty() {
// If the timeout is 0 or there is a ready event, we can return immediately.
return_ready_list(&epfd, dest, &event, this)?;
} else {
Expand All @@ -459,9 +464,7 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> {
}
};
// Record this thread as blocked.
epfd.queue
.borrow_mut()
.push_back((this.active_thread(), maxevents.try_into().unwrap()));
epfd.queue.borrow_mut().push_back(this.active_thread());
// And block it.
let dest = dest.clone();
// We keep a strong ref to the underlying `Epoll` to make sure it sticks around.
Expand All @@ -487,7 +490,7 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> {
// Remove the current active thread_id from the blocked thread_id list.
epfd
.queue.borrow_mut()
.retain(|&(id, _events)| id != this.active_thread());
.retain(|&id| id != this.active_thread());
this.write_int(0, &dest)?;
interp_ok(())
},
Expand Down Expand Up @@ -526,9 +529,9 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> {
let active_events = fd_ref.as_unix(this).epoll_active_events()?.get_event_bitmask(this);
for epoll in epolls {
update_readiness(this, &epoll, active_events, force_edge, |callback| {
for (_key, interest) in epoll.interest_list.borrow_mut().range_mut(range_for_id(id))
for (&key, interest) in epoll.interest_list.borrow_mut().range_mut(range_for_id(id))
{
callback(interest)?;
callback(key, interest)?;
}
interp_ok(())
})?;
Expand All @@ -550,46 +553,34 @@ fn update_readiness<'tcx>(
active_events: u32,
force_edge: bool,
for_each_interest: impl FnOnce(
&mut dyn FnMut(&mut EpollEventInterest) -> InterpResult<'tcx>,
&mut dyn FnMut(EpollEventKey, &mut EpollEventInterest) -> InterpResult<'tcx>,
) -> InterpResult<'tcx>,
) -> InterpResult<'tcx> {
let mut num_ready = 0u32; // how many events we have ready to deliver
for_each_interest(&mut |interest| {
let mut ready_set = epoll.ready_set.borrow_mut();
for_each_interest(&mut |key, interest| {
// Update the ready events tracked in this interest.
let new_readiness = interest.relevant_events & active_events;
let prev_readiness = std::mem::replace(&mut interest.active_events, new_readiness);
if new_readiness == 0 {
// Un-trigger this, there's nothing left to report here.
interest.ready = false;
ready_set.remove(&key);
} else if force_edge || new_readiness != prev_readiness & new_readiness {
// Either we force an "edge" to be detected, or there's a bit set in `new`
// that was not set in `prev`.
interest.ready = true;
// that was not set in `prev`. In both cases, this is ready now.
ready_set.insert(key);
ecx.release_clock(|clock| {
interest.clock.join(clock);
})?;
num_ready = num_ready.saturating_add(1);
}
interp_ok(())
})?;
// Edge-triggered notifications only wake up as many threads as are needed to deliver
// all the events.
while num_ready > 0
&& let Some((thread_id, events)) = epoll.queue.borrow_mut().pop_front()
// While there are events ready to be delivered, wake up a thread to receive them.
while !ready_set.is_empty()
&& let Some(thread_id) = epoll.queue.borrow_mut().pop_front()
{
drop(ready_set); // release the "lock" so the unblocked thread can have it
ecx.unblock_thread(thread_id, BlockReason::Epoll)?;
// Keep track of how many events we have left to deliver (except if we saturated;
// in that case we just wake up everybody).
if num_ready != u32::MAX {
num_ready = num_ready.saturating_sub(events);
}
}
// Sanity-check: if there are threads left to wake up, then there are no more ready events.
if !epoll.queue.borrow().is_empty() {
assert!(
epoll.interest_list.borrow().values().all(|i| !i.ready),
"there are unconsumed ready events and threads ready to take them"
);
ready_set = epoll.ready_set.borrow_mut();
}

interp_ok(())
Expand All @@ -604,43 +595,38 @@ fn return_ready_list<'tcx>(
ecx: &mut MiriInterpCx<'tcx>,
) -> InterpResult<'tcx, i32> {
let mut interest_list = epfd.interest_list.borrow_mut();
let mut ready_set = epfd.ready_set.borrow_mut();
let mut num_of_events: i32 = 0;
let mut array_iter = ecx.project_array_fields(events)?;

let mut interests = interest_list.iter_mut();
while let Some(slot) = array_iter.next(ecx)? {
// Search for next ready event that we are intersted in. This is an inefficient linear scan.
// We could make it efficient by tracking the set of triggered events in a BTreeSet or so,
// but since this is very unlikely to be a bottleneck we prefer cleaner code.
for (key, interest) in interests.by_ref() {
// Sanity-check to ensure that the info about this event is up-to-date.
if cfg!(debug_assertions) {
// Ensure this matches the latest readiness of this FD.
// We have to do an FD lookup by ID for this. The FdNum might be already closed.
let fd = &ecx.machine.fds.fds.values().find(|fd| fd.id() == key.0).unwrap();
let current_readiness =
fd.as_unix(ecx).epoll_active_events()?.get_event_bitmask(ecx);
assert_eq!(interest.active_events, current_readiness & interest.relevant_events);
}
// Skip event if it has not been triggered.
if !interest.ready {
continue;
}
// Deliver event to caller.
ecx.write_int_fields_named(
&[("events", interest.active_events.into()), ("u64", interest.data.into())],
&slot.1,
)?;
num_of_events = num_of_events.strict_add(1);
// Synchronize waking thread with the event of interest.
ecx.acquire_clock(&interest.clock)?;
// Mark this interest as no-longer-ready, since it has been delivered (and we only
// support ET).
interest.ready = false;
// Skip out of this loop so that we go to the next slot in the array.
break;
// Sanity-check to ensure that all event info is up-to-date.
if cfg!(debug_assertions) {
for (key, interest) in interest_list.iter() {
// Ensure this matches the latest readiness of this FD.
// We have to do an FD lookup by ID for this. The FdNum might be already closed.
let fd = &ecx.machine.fds.fds.values().find(|fd| fd.id() == key.0).unwrap();
let current_active = fd.as_unix(ecx).epoll_active_events()?.get_event_bitmask(ecx);
assert_eq!(interest.active_events, current_active & interest.relevant_events);
}
}

// While there is a slot to store another event, and an event to store, deliver that event.
while let Some(slot) = array_iter.next(ecx)?
&& let Some(&key) = ready_set.first()
{
let interest = interest_list.get_mut(&key).expect("non-existent event in ready set");
// Deliver event to caller.
ecx.write_int_fields_named(
&[("events", interest.active_events.into()), ("u64", interest.data.into())],
&slot.1,
)?;
num_of_events = num_of_events.strict_add(1);
// Synchronize receiving thread with the event of interest.
ecx.acquire_clock(&interest.clock)?;
// Since currently, all events are edge-triggered, we remove them from the ready set when
// they get delivered.
ready_set.remove(&key);
}
ecx.write_int(num_of_events, dest)?;
interp_ok(num_of_events)
}