Skip to content

Commit

Permalink
m: Optimize block_on by caching Parker and Waker
Browse files Browse the repository at this point in the history
  • Loading branch information
Pixelstormer committed Sep 11, 2023
1 parent 1b1466a commit 8c3c3bd
Show file tree
Hide file tree
Showing 2 changed files with 301 additions and 88 deletions.
211 changes: 123 additions & 88 deletions src/driver.rs
@@ -1,13 +1,15 @@
use std::cell::Cell;
use std::cell::{Cell, RefCell};
use std::future::Future;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::Arc;
use std::task::Waker;
use std::task::{Context, Poll};
use std::thread;
use std::time::{Duration, Instant};

use async_lock::OnceCell;
use futures_lite::pin;
use parking::Parker;
use waker_fn::waker_fn;

use crate::reactor::Reactor;
Expand Down Expand Up @@ -120,112 +122,145 @@ pub fn block_on<T>(future: impl Future<Output = T>) -> T {
unparker().unpark();
});

// Parker and unparker for notifying the current thread.
let (p, u) = parking::pair();
// This boolean is set to `true` when the current thread is blocked on I/O.
let io_blocked = Arc::new(AtomicBool::new(false));
// Creates a parker and an associated waker that unparks it.
fn parker_and_waker() -> (Parker, Waker, Arc<AtomicBool>) {
// Parker and unparker for notifying the current thread.
let (p, u) = parking::pair();

// This boolean is set to `true` when the current thread is blocked on I/O.
let io_blocked = Arc::new(AtomicBool::new(false));

// Prepare the waker.
let waker = waker_fn({
let io_blocked = io_blocked.clone();
move || {
if u.unpark() {
// Check if waking from another thread and if currently blocked on I/O.
if !IO_POLLING.with(Cell::get) && io_blocked.load(Ordering::SeqCst) {
Reactor::get().notify();
}
}
}
});

(p, waker, io_blocked)
}

thread_local! {
// Cached parker and waker for efficiency.
static CACHE: RefCell<(Parker, Waker, Arc<AtomicBool>)> = RefCell::new(parker_and_waker());

// Indicates that the current thread is polling I/O, but not necessarily blocked on it.
static IO_POLLING: Cell<bool> = Cell::new(false);
}

// Prepare the waker.
let waker = waker_fn({
let io_blocked = io_blocked.clone();
move || {
if u.unpark() {
// Check if waking from another thread and if currently blocked on I/O.
if !IO_POLLING.with(Cell::get) && io_blocked.load(Ordering::SeqCst) {
Reactor::get().notify();
}
CACHE.with(|cache| {
// Try grabbing the cached parker and waker.
let tmp_cached;
let tmp_fresh;
let (p, waker, io_blocked) = match cache.try_borrow_mut() {
Ok(cache) => {
// Use the cached parker and waker.
tmp_cached = cache;
&*tmp_cached
}
}
});
let cx = &mut Context::from_waker(&waker);
pin!(future);
Err(_) => {
// Looks like this is a recursive `block_on()` call.
// Create a fresh parker and waker.
tmp_fresh = parker_and_waker();
&tmp_fresh
}
};

loop {
// Poll the future.
if let Poll::Ready(t) = future.as_mut().poll(cx) {
tracing::trace!("completed");
return t;
}
pin!(future);

// Check if a notification was received.
if p.park_timeout(Duration::from_secs(0)) {
tracing::trace!("notified");
let cx = &mut Context::from_waker(waker);

// Try grabbing a lock on the reactor to process I/O events.
if let Some(mut reactor_lock) = Reactor::get().try_lock() {
// First let wakers know this parker is processing I/O events.
IO_POLLING.with(|io| io.set(true));
let _guard = CallOnDrop(|| {
IO_POLLING.with(|io| io.set(false));
});

// Process available I/O events.
reactor_lock.react(Some(Duration::from_secs(0))).ok();
loop {
// Poll the future.
if let Poll::Ready(t) = future.as_mut().poll(cx) {
// Ensure the cached parker is reset to the unnotified state for future block_on calls,
// in case this future called wake and then immediately returned Poll::Ready.
p.park_timeout(Duration::from_secs(0));
tracing::trace!("completed");
return t;
}
continue;
}

// Try grabbing a lock on the reactor to wait on I/O.
if let Some(mut reactor_lock) = Reactor::get().try_lock() {
// Record the instant at which the lock was grabbed.
let start = Instant::now();

loop {
// First let wakers know this parker is blocked on I/O.
IO_POLLING.with(|io| io.set(true));
io_blocked.store(true, Ordering::SeqCst);
let _guard = CallOnDrop(|| {
IO_POLLING.with(|io| io.set(false));
io_blocked.store(false, Ordering::SeqCst);
});

// Check if a notification has been received before `io_blocked` was updated
// because in that case the reactor won't receive a wakeup.
if p.park_timeout(Duration::from_secs(0)) {
tracing::trace!("notified");
break;
}
// Check if a notification was received.
if p.park_timeout(Duration::from_secs(0)) {
tracing::trace!("notified");

// Wait for I/O events.
tracing::trace!("waiting on I/O");
reactor_lock.react(None).ok();
// Try grabbing a lock on the reactor to process I/O events.
if let Some(mut reactor_lock) = Reactor::get().try_lock() {
// First let wakers know this parker is processing I/O events.
IO_POLLING.with(|io| io.set(true));
let _guard = CallOnDrop(|| {
IO_POLLING.with(|io| io.set(false));
});

// Check if a notification has been received.
if p.park_timeout(Duration::from_secs(0)) {
tracing::trace!("notified");
break;
// Process available I/O events.
reactor_lock.react(Some(Duration::from_secs(0))).ok();
}
continue;
}

// Check if this thread been handling I/O events for a long time.
if start.elapsed() > Duration::from_micros(500) {
tracing::trace!("stops hogging the reactor");

// This thread is clearly processing I/O events for some other threads
// because it didn't get a notification yet. It's best to stop hogging the
// reactor and give other threads a chance to process I/O events for
// themselves.
drop(reactor_lock);

// Unpark the "async-io" thread in case no other thread is ready to start
// processing I/O events. This way we prevent a potential latency spike.
unparker().unpark();

// Wait for a notification.
p.park();
break;
// Try grabbing a lock on the reactor to wait on I/O.
if let Some(mut reactor_lock) = Reactor::get().try_lock() {
// Record the instant at which the lock was grabbed.
let start = Instant::now();

loop {
// First let wakers know this parker is blocked on I/O.
IO_POLLING.with(|io| io.set(true));
io_blocked.store(true, Ordering::SeqCst);
let _guard = CallOnDrop(|| {
IO_POLLING.with(|io| io.set(false));
io_blocked.store(false, Ordering::SeqCst);
});

// Check if a notification has been received before `io_blocked` was updated
// because in that case the reactor won't receive a wakeup.
if p.park_timeout(Duration::from_secs(0)) {
tracing::trace!("notified");
break;
}

// Wait for I/O events.
tracing::trace!("waiting on I/O");
reactor_lock.react(None).ok();

// Check if a notification has been received.
if p.park_timeout(Duration::from_secs(0)) {
tracing::trace!("notified");
break;
}

// Check if this thread been handling I/O events for a long time.
if start.elapsed() > Duration::from_micros(500) {
tracing::trace!("stops hogging the reactor");

// This thread is clearly processing I/O events for some other threads
// because it didn't get a notification yet. It's best to stop hogging the
// reactor and give other threads a chance to process I/O events for
// themselves.
drop(reactor_lock);

// Unpark the "async-io" thread in case no other thread is ready to start
// processing I/O events. This way we prevent a potential latency spike.
unparker().unpark();

// Wait for a notification.
p.park();
break;
}
}
} else {
// Wait for an actual notification.
tracing::trace!("sleep until notification");
p.park();
}
} else {
// Wait for an actual notification.
tracing::trace!("sleep until notification");
p.park();
}
}
})
}

/// Runs a closure when dropped.
Expand Down

0 comments on commit 8c3c3bd

Please sign in to comment.