Skip to content

Commit

Permalink
Remove the thread pool from future::Cache
Browse files Browse the repository at this point in the history
- Make `future::Cache`'s internal `schedule_write_op` method to yield to the other
  async tasks when the write op channel is full.
- Strengthen the orderings of a `compare_exchange` operation in Housekeeper to ensure
  that other thread can see the updated value immediately.
  • Loading branch information
tatsuya6502 committed Aug 8, 2023
1 parent 9e83dcc commit c86e675
Show file tree
Hide file tree
Showing 4 changed files with 35 additions and 10 deletions.
4 changes: 2 additions & 2 deletions src/common/concurrent/housekeeper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -134,8 +134,8 @@ impl BlockingHousekeeper {
match self.is_sync_running.compare_exchange(
false,
true,
Ordering::AcqRel,
Ordering::Acquire,
Ordering::Relaxed,
) {
Ok(_) => {
let now = cache.now();
Expand Down Expand Up @@ -298,8 +298,8 @@ where
match self.on_demand_sync_scheduled.compare_exchange(
false,
true,
Ordering::AcqRel,
Ordering::Acquire,
Ordering::Relaxed,
) {
Ok(_) => {
let unsafe_weak_ptr = Arc::clone(&self.inner);
Expand Down
13 changes: 13 additions & 0 deletions src/future.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@
//!
//! To use this module, enable a crate feature called "future".

use async_lock::Mutex;
use futures_util::future::BoxFuture;
use once_cell::sync::Lazy;
use std::{future::Future, hash::Hash, sync::Arc};

mod base_cache;
Expand Down Expand Up @@ -65,3 +67,14 @@ where
self.0.next()
}
}

/// May yield to other async tasks.
pub(crate) async fn may_yield() {
static LOCK: Lazy<Mutex<()>> = Lazy::new(Default::default);

// Acquire the lock then immediately release it. This `await` may yield to other
// tasks.
//
// TODO: This behavior is tested only with Tokio. Check other async runtimes.
let _ = LOCK.lock().await;
}
26 changes: 19 additions & 7 deletions src/future/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1880,22 +1880,34 @@ where
housekeeper: Option<&HouseKeeperArc>,
) -> Result<(), TrySendError<WriteOp<K, V>>> {
let mut op = op;
let mut spin_count = 0u8;
loop {
BaseCache::<K, V, S>::apply_reads_writes_if_needed(inner, ch, now, housekeeper).await;
match ch.try_send(op) {
Ok(()) => break,
Ok(()) => return Ok(()),
Err(TrySendError::Full(op1)) => {
op = op1;
// Wastes some CPU time with a hint to indicate to the CPU that
// we are spinning
for _ in 0..10 {
std::hint::spin_loop();
}
}
Err(e @ TrySendError::Disconnected(_)) => return Err(e),
}

// We have got a `TrySendError::Full` above. Wait for a bit and try
// again.
if spin_count < 10 {
spin_count += 1;
// Wastes some CPU time with a hint to indicate to the CPU that we
// are spinning
for _ in 0..8 {
std::hint::spin_loop();
}
} else {
spin_count = 0;
// Try to yield to other tasks. We have to yield sometimes, otherwise
// other task, which is draining the `ch`, will not make any
// progress. If this happens, we will stuck in this loop forever.
super::may_yield().await;
}
}
Ok(())
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/future/housekeeper.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,8 @@ impl Housekeeper {
match self.is_sync_running.compare_exchange(
false,
true,
Ordering::AcqRel,
Ordering::Acquire,
Ordering::Relaxed,
) {
Ok(_) => {
let now = cache.now();
Expand Down

0 comments on commit c86e675

Please sign in to comment.