diff --git a/src/shims/unix/linux_like/epoll.rs b/src/shims/unix/linux_like/epoll.rs index 04f4e33e1d..52b761fa08 100644 --- a/src/shims/unix/linux_like/epoll.rs +++ b/src/shims/unix/linux_like/epoll.rs @@ -1,5 +1,5 @@ use std::cell::RefCell; -use std::collections::{BTreeMap, VecDeque}; +use std::collections::{BTreeMap, BTreeSet, VecDeque}; use std::io; use std::time::Duration; @@ -18,12 +18,13 @@ type EpollEventKey = (FdId, FdNum); #[derive(Debug, Default)] struct Epoll { /// A map of EpollEventInterests registered under this epoll instance. Each entry is - /// differentiated using FdId and file descriptor value. Note that we do not have a separate - /// "ready" list; instead, a boolean flag in this list tracks which subset is ready. This makes - /// `epoll_wait` less efficient, but also requires less bookkeeping. + /// differentiated using FdId and file descriptor value. interest_list: RefCell>, - /// The queue of threads blocked on this epoll instance, and how many events they'd like to get. - queue: RefCell>, + /// The subset of interests that is currently considered "ready". Stored separately so we + /// can access it more efficiently. + ready_set: RefCell>, + /// The queue of threads blocked on this epoll instance. + queue: RefCell>, } impl VisitProvenance for Epoll { @@ -44,8 +45,6 @@ pub struct EpollEventInterest { relevant_events: u32, /// The currently active events for this file descriptor. active_events: u32, - /// Whether this interest is in the "ready" set. - ready: bool, /// The vector clock for wakeups. clock: VClock, /// User-defined data associated with this interest. @@ -176,13 +175,19 @@ impl EpollInterestTable { if let Some(epolls) = self.0.remove(&id) { for epoll in epolls.iter().filter_map(|(_id, epoll)| epoll.upgrade()) { // This is a still-live epoll with interest in this FD. Remove all - // relevent interests. + // relevant interests (including from the ready set). 
epoll .interest_list .borrow_mut() .extract_if(range_for_id(id), |_, _| true) // Consume the iterator. .for_each(|_| ()); + epoll + .ready_set + .borrow_mut() + .extract_if(range_for_id(id), |_| true) + // Consume the iterator. + .for_each(|_| ()); } } } @@ -326,7 +331,6 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> { relevant_events: events, data, active_events: 0, - ready: false, clock: VClock::default(), }; if interest_list.try_insert(epoll_key, new_interest).is_err() { @@ -351,7 +355,7 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> { move |callback| { // Need to release the RefCell when this closure returns, so we have to move // it into the closure, so we have to do a re-lookup here. - callback(interest_list.get_mut(&epoll_key).unwrap()) + callback(epoll_key, interest_list.get_mut(&epoll_key).unwrap()) }, )?; @@ -359,11 +363,12 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> { } else if op == epoll_ctl_del { let epoll_key = (id, fd); - // Remove epoll_event_interest from interest_list. + // Remove epoll_event_interest from interest_list and ready_set. if interest_list.remove(&epoll_key).is_none() { // We did not have interest in this. return this.set_last_error_and_return_i32(LibcError("ENOENT")); }; + epfd.ready_set.borrow_mut().remove(&epoll_key); // If this was the last interest in this FD, remove us from the global list // of who is interested in this FD. if interest_list.range(range_for_id(id)).next().is_none() { @@ -441,7 +446,7 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> { return this.set_last_error_and_return(LibcError("EBADF"), dest); }; - if timeout == 0 || epfd.interest_list.borrow().values().any(|i| i.ready) { + if timeout == 0 || !epfd.ready_set.borrow().is_empty() { // If the timeout is 0 or there is a ready event, we can return immediately. 
return_ready_list(&epfd, dest, &event, this)?; } else { @@ -459,9 +464,7 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> { } }; // Record this thread as blocked. - epfd.queue - .borrow_mut() - .push_back((this.active_thread(), maxevents.try_into().unwrap())); + epfd.queue.borrow_mut().push_back(this.active_thread()); // And block it. let dest = dest.clone(); // We keep a strong ref to the underlying `Epoll` to make sure it sticks around. @@ -487,7 +490,7 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> { // Remove the current active thread_id from the blocked thread_id list. epfd .queue.borrow_mut() - .retain(|&(id, _events)| id != this.active_thread()); + .retain(|&id| id != this.active_thread()); this.write_int(0, &dest)?; interp_ok(()) }, @@ -526,9 +529,9 @@ pub trait EvalContextExt<'tcx>: crate::MiriInterpCxExt<'tcx> { let active_events = fd_ref.as_unix(this).epoll_active_events()?.get_event_bitmask(this); for epoll in epolls { update_readiness(this, &epoll, active_events, force_edge, |callback| { - for (_key, interest) in epoll.interest_list.borrow_mut().range_mut(range_for_id(id)) + for (&key, interest) in epoll.interest_list.borrow_mut().range_mut(range_for_id(id)) { - callback(interest)?; + callback(key, interest)?; } interp_ok(()) })?; @@ -550,46 +553,34 @@ fn update_readiness<'tcx>( active_events: u32, force_edge: bool, for_each_interest: impl FnOnce( - &mut dyn FnMut(&mut EpollEventInterest) -> InterpResult<'tcx>, + &mut dyn FnMut(EpollEventKey, &mut EpollEventInterest) -> InterpResult<'tcx>, ) -> InterpResult<'tcx>, ) -> InterpResult<'tcx> { - let mut num_ready = 0u32; // how many events we have ready to deliver - for_each_interest(&mut |interest| { + let mut ready_set = epoll.ready_set.borrow_mut(); + for_each_interest(&mut |key, interest| { // Update the ready events tracked in this interest. 
let new_readiness = interest.relevant_events & active_events; let prev_readiness = std::mem::replace(&mut interest.active_events, new_readiness); if new_readiness == 0 { // Un-trigger this, there's nothing left to report here. - interest.ready = false; + ready_set.remove(&key); } else if force_edge || new_readiness != prev_readiness & new_readiness { // Either we force an "edge" to be detected, or there's a bit set in `new` - // that was not set in `prev`. - interest.ready = true; + // that was not set in `prev`. In both cases, this is ready now. + ready_set.insert(key); ecx.release_clock(|clock| { interest.clock.join(clock); })?; - num_ready = num_ready.saturating_add(1); } interp_ok(()) })?; - // Edge-triggered notifications only wake up as many threads as are needed to deliver - // all the events. - while num_ready > 0 - && let Some((thread_id, events)) = epoll.queue.borrow_mut().pop_front() + // While there are events ready to be delivered, wake up a thread to receive them. + while !ready_set.is_empty() + && let Some(thread_id) = epoll.queue.borrow_mut().pop_front() { + drop(ready_set); // release the "lock" so the unblocked thread can have it ecx.unblock_thread(thread_id, BlockReason::Epoll)?; - // Keep track of how many events we have left to deliver (except if we saturated; - // in that case we just wake up everybody). - if num_ready != u32::MAX { - num_ready = num_ready.saturating_sub(events); - } - } - // Sanity-check: if there are threads left to wake up, then there are no more ready events. 
- if !epoll.queue.borrow().is_empty() { - assert!( - epoll.interest_list.borrow().values().all(|i| !i.ready), - "there are unconsumed ready events and threads ready to take them" - ); + ready_set = epoll.ready_set.borrow_mut(); } interp_ok(()) @@ -604,43 +595,38 @@ fn return_ready_list<'tcx>( ecx: &mut MiriInterpCx<'tcx>, ) -> InterpResult<'tcx, i32> { let mut interest_list = epfd.interest_list.borrow_mut(); + let mut ready_set = epfd.ready_set.borrow_mut(); let mut num_of_events: i32 = 0; let mut array_iter = ecx.project_array_fields(events)?; - let mut interests = interest_list.iter_mut(); - while let Some(slot) = array_iter.next(ecx)? { - // Search for next ready event that we are intersted in. This is an inefficient linear scan. - // We could make it efficient by tracking the set of triggered events in a BTreeSet or so, - // but since this is very unlikely to be a bottleneck we prefer cleaner code. - for (key, interest) in interests.by_ref() { - // Sanity-check to ensure that the info about this event is up-to-date. - if cfg!(debug_assertions) { - // Ensure this matches the latest readiness of this FD. - // We have to do an FD lookup by ID for this. The FdNum might be already closed. - let fd = &ecx.machine.fds.fds.values().find(|fd| fd.id() == key.0).unwrap(); - let current_readiness = - fd.as_unix(ecx).epoll_active_events()?.get_event_bitmask(ecx); - assert_eq!(interest.active_events, current_readiness & interest.relevant_events); - } - // Skip event if it has not been triggered. - if !interest.ready { - continue; - } - // Deliver event to caller. - ecx.write_int_fields_named( - &[("events", interest.active_events.into()), ("u64", interest.data.into())], - &slot.1, - )?; - num_of_events = num_of_events.strict_add(1); - // Synchronize waking thread with the event of interest. - ecx.acquire_clock(&interest.clock)?; - // Mark this interest as no-longer-ready, since it has been delivered (and we only - // support ET). 
- interest.ready = false; - // Skip out of this loop so that we go to the next slot in the array. - break; + // Sanity-check to ensure that all event info is up-to-date. + if cfg!(debug_assertions) { + for (key, interest) in interest_list.iter() { + // Ensure this matches the latest readiness of this FD. + // We have to do an FD lookup by ID for this. The FdNum might be already closed. + let fd = &ecx.machine.fds.fds.values().find(|fd| fd.id() == key.0).unwrap(); + let current_active = fd.as_unix(ecx).epoll_active_events()?.get_event_bitmask(ecx); + assert_eq!(interest.active_events, current_active & interest.relevant_events); } } + + // While there is a slot to store another event, and an event to store, deliver that event. + while let Some(slot) = array_iter.next(ecx)? + && let Some(&key) = ready_set.first() + { + let interest = interest_list.get_mut(&key).expect("non-existent event in ready set"); + // Deliver event to caller. + ecx.write_int_fields_named( + &[("events", interest.active_events.into()), ("u64", interest.data.into())], + &slot.1, + )?; + num_of_events = num_of_events.strict_add(1); + // Synchronize receiving thread with the event of interest. + ecx.acquire_clock(&interest.clock)?; + // Since currently, all events are edge-triggered, we remove them from the ready set when + // they get delivered. + ready_set.remove(&key); + } ecx.write_int(num_of_events, dest)?; interp_ok(num_of_events) }