Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 49 additions & 11 deletions awkernel_async_lib/src/pubsub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -155,17 +155,7 @@ impl<T: Clone + Send> Future for Receiver<'_, T> {
self: core::pin::Pin<&mut Self>,
cx: &mut core::task::Context<'_>,
) -> core::task::Poll<Self::Output> {
let data = self.subscriber.try_recv();

if let Some(data) = data {
Poll::Ready(data)
} else {
let mut node = MCSNode::new();
let mut inner = self.subscriber.inner.lock(&mut node);
inner.waker_subscriber = Some(cx.waker().clone());

Poll::Pending
}
self.subscriber.recv_or_register_waker(cx)
}
}

Expand All @@ -176,6 +166,54 @@ impl<T: Clone + Send> Subscriber<T> {
receiver.await
}

/// Takes the next message from the queue, or stores the caller's waker if
/// the queue is empty, doing both under one acquisition of `self.inner`'s lock.
///
/// # The Lost Wakeup Problem
///
/// An implementation that first calls `try_recv()` and only afterwards takes
/// the lock to store its waker can interleave with a sender as follows:
///
/// 1. **Receiver:** `try_recv()` reports that the queue is empty.
/// 2. **Sender:** pushes data, looks for a waker, finds `None` because the
///    receiver has not stored one yet, and continues without waking anyone.
/// 3. **Receiver:** takes the lock, stores its waker, and returns
///    `Poll::Pending`.
///
/// The receiver is now asleep although data is sitting in the queue; the
/// wakeup has been "lost" and nothing notifies the receiver until a later
/// send happens.
///
/// # Solution
///
/// The emptiness check and the waker registration are performed inside the
/// same critical section, so a sender cannot push data between the two steps
/// and no wakeup is lost.
///
/// # Returns
///
/// * `Poll::Ready(data)` when a message was popped. Before returning, up to
///   `queue_size() - len()` wakers are taken from `waker_publishers` and woken
///   (presumably one per slot that is now free in the queue).
/// * `Poll::Pending` when the queue was empty; a clone of `cx`'s waker has
///   been stored in `waker_subscriber`.
pub(super) fn recv_or_register_waker(
    &self,
    cx: &mut core::task::Context<'_>,
) -> core::task::Poll<Data<T>> {
    let mut node = MCSNode::new();
    let mut guard = self.inner.lock(&mut node);

    // Run the queue's garbage collection with the configured lifespan before
    // looking at the queue.
    guard.garbage_collect(&self.subscribers.attribute.lifespan);

    match guard.queue.pop() {
        None => {
            // Still holding the lock: register the waker in the same
            // critical section as the emptiness check.
            guard.waker_subscriber = Some(cx.waker().clone());
            core::task::Poll::Pending
        }
        Some(message) => {
            // Computed once, after the pop, exactly like the bound of a
            // `for` range would be.
            let vacancy = guard.queue.queue_size() - guard.queue.len();
            let mut woken = 0;

            while woken < vacancy {
                match guard.waker_publishers.pop_front() {
                    Some(publisher) => publisher.wake(),
                    // No more publishers are waiting.
                    None => break,
                }
                woken += 1;
            }

            core::task::Poll::Ready(message)
        }
    }
}

/// Non-blocking data receive.
/// If there is no data, return `None`.
pub fn try_recv(&self) -> Option<Data<T>> {
Expand Down