Skip to content

Commit

Permalink
Merge pull request #157 from moka-rs/crossbeam-epoch-0.9
Browse files Browse the repository at this point in the history
Prevent segmentation fault in `sync` and `future` caches, and also upgrade crossbeam-epoch to v0.9.9
  • Loading branch information
tatsuya6502 committed Jul 19, 2022
2 parents 3713c89 + e773115 commit 5240b94
Show file tree
Hide file tree
Showing 4 changed files with 107 additions and 45 deletions.
17 changes: 17 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,21 @@
# Moka Cache — Change Log

## Version 0.9.2

### Fixed

- Fix segmentation faults in `sync` and `future` caches under heavy loads on a
  many-core machine ([#34][gh-issue-0034]):
- NOTE: Although this issue was found in our testing environment 10 months ago
(v0.5.1), no user reported that they had the same issue.
- NOTE: In [v0.8.4](#version-084), we added a mitigation to reduce the chance of
the segfaults occurring.

### Changed

- Upgrade crossbeam-epoch from v0.8.2 to v0.9.9 ([#157][gh-pull-0157]).


## Version 0.9.1

### Fixed
Expand Down Expand Up @@ -427,6 +443,7 @@ The minimum supported Rust version (MSRV) is now 1.51.0 (2021-03-25).

[gh-pull-0167]: https://github.com/moka-rs/moka/pull/167/
[gh-pull-0159]: https://github.com/moka-rs/moka/pull/159/
[gh-pull-0157]: https://github.com/moka-rs/moka/pull/157/
[gh-pull-0145]: https://github.com/moka-rs/moka/pull/145/
[gh-pull-0143]: https://github.com/moka-rs/moka/pull/143/
[gh-pull-0138]: https://github.com/moka-rs/moka/pull/138/
Expand Down
10 changes: 3 additions & 7 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "moka"
version = "0.9.1"
version = "0.9.2"
edition = "2018"
rust-version = "1.51"

Expand Down Expand Up @@ -47,7 +47,7 @@ atomic64 = []
unstable-debug-counters = ["future"]

[dependencies]
crossbeam-channel = "0.5.4"
crossbeam-channel = "0.5.5"
crossbeam-utils = "0.8"
num_cpus = "1.13"
once_cell = "1.7"
Expand All @@ -61,15 +61,11 @@ tagptr = "0.2"
triomphe = { version = "0.1.3", default-features = false }

# Optional dependencies (enabled by default)
crossbeam-epoch = { version = "0.9.9", optional = true }
quanta = { version = "0.10.0", optional = true }
thiserror = { version = "1.0", optional = true }
uuid = { version = "1.1", features = ["v4"], optional = true }

# Although v0.8.2 is not the current version (v0.9.x), we will keep using it until
# we perform enough tests to get comfortable with memory safety.
# See: https://github.com/moka-rs/moka/issues/34
crossbeam-epoch = { version = "0.8.2", optional = true }

# Optional dependencies (dashmap)
dashmap = { version = "5.2", optional = true }

Expand Down
64 changes: 45 additions & 19 deletions src/cht/map/bucket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,24 @@ use std::{
hash::{BuildHasher, Hash, Hasher},
mem::{self, MaybeUninit},
ptr,
sync::atomic::{self, AtomicUsize, Ordering},
sync::{
atomic::{self, AtomicUsize, Ordering},
Arc, Mutex, TryLockError,
},
};

#[cfg(feature = "unstable-debug-counters")]
use crate::common::concurrent::debug_counters;

use crossbeam_epoch::{Atomic, CompareAndSetError, Guard, Owned, Shared};
use crossbeam_epoch::{Atomic, CompareExchangeError, Guard, Owned, Shared};

pub(crate) const BUCKET_ARRAY_DEFAULT_LENGTH: usize = 128;

pub(crate) struct BucketArray<K, V> {
pub(crate) buckets: Box<[Atomic<Bucket<K, V>>]>,
pub(crate) next: Atomic<BucketArray<K, V>>,
pub(crate) epoch: usize,
pub(crate) rehash_lock: Arc<Mutex<()>>,
pub(crate) tombstone_count: AtomicUsize,
}

Expand Down Expand Up @@ -49,6 +53,7 @@ impl<K, V> BucketArray<K, V> {
buckets,
next: Atomic::null(),
epoch,
rehash_lock: Arc::new(Mutex::new(())),
tombstone_count: Default::default(),
}
}
Expand Down Expand Up @@ -147,10 +152,11 @@ impl<'g, K: 'g + Eq, V: 'g> BucketArray<K, V> {

let new_bucket_ptr = this_bucket_ptr.with_tag(TOMBSTONE_TAG);

match this_bucket.compare_and_set_weak(
match this_bucket.compare_exchange_weak(
this_bucket_ptr,
new_bucket_ptr,
(Ordering::Release, Ordering::Relaxed),
Ordering::AcqRel,
Ordering::Relaxed,
guard,
) {
// Succeeded. Return the removed value. (can be null)
Expand Down Expand Up @@ -200,10 +206,11 @@ impl<'g, K: 'g + Eq, V: 'g> BucketArray<K, V> {

let new_bucket = state.into_insert_bucket();

if let Err(CompareAndSetError { new, .. }) = this_bucket.compare_and_set_weak(
if let Err(CompareExchangeError { new, .. }) = this_bucket.compare_exchange_weak(
this_bucket_ptr,
new_bucket,
(Ordering::Release, Ordering::Relaxed),
Ordering::AcqRel,
Ordering::Relaxed,
guard,
) {
maybe_state = Some(InsertOrModifyState::from_bucket_value(new, None));
Expand Down Expand Up @@ -265,10 +272,11 @@ impl<'g, K: 'g + Eq, V: 'g> BucketArray<K, V> {
(state.into_insert_bucket(), None)
};

if let Err(CompareAndSetError { new, .. }) = this_bucket.compare_and_set_weak(
if let Err(CompareExchangeError { new, .. }) = this_bucket.compare_exchange_weak(
this_bucket_ptr,
new_bucket,
(Ordering::Release, Ordering::Relaxed),
Ordering::AcqRel,
Ordering::Relaxed,
guard,
) {
// Failed. Reload to retry.
Expand Down Expand Up @@ -314,10 +322,11 @@ impl<'g, K: 'g + Eq, V: 'g> BucketArray<K, V> {
if this_bucket_ptr.is_null() && is_tombstone(bucket_ptr) {
ProbeLoopAction::Return(None)
} else if this_bucket
.compare_and_set_weak(
.compare_exchange_weak(
this_bucket_ptr,
bucket_ptr,
(Ordering::Release, Ordering::Relaxed),
Ordering::AcqRel,
Ordering::Relaxed,
guard,
)
.is_ok()
Expand Down Expand Up @@ -394,11 +403,24 @@ impl<'g, K: 'g, V: 'g> BucketArray<K, V> {
guard: &'g Guard,
build_hasher: &H,
rehash_op: RehashOp,
) -> &'g BucketArray<K, V>
) -> Option<&'g BucketArray<K, V>>
where
K: Hash + Eq,
H: BuildHasher,
{
// Ensure that the rehashing is not performed concurrently.
let lock;
match self.rehash_lock.try_lock() {
Ok(lk) => lock = lk,
Err(TryLockError::WouldBlock) => {
            // Wait until the lock becomes available.
std::mem::drop(self.rehash_lock.lock());
// We need to return here to see if rehashing is still needed.
return None;
}
Err(e @ TryLockError::Poisoned(_)) => panic!("{:?}", e),
};

let next_array = self.next_array(guard, rehash_op);

for this_bucket in self.buckets.iter() {
Expand All @@ -420,10 +442,11 @@ impl<'g, K: 'g, V: 'g> BucketArray<K, V> {

while is_borrowed(next_bucket_ptr)
&& next_bucket
.compare_and_set_weak(
.compare_exchange_weak(
next_bucket_ptr,
to_put_ptr,
(Ordering::Release, Ordering::Relaxed),
Ordering::AcqRel,
Ordering::Relaxed,
guard,
)
.is_err()
Expand All @@ -440,10 +463,11 @@ impl<'g, K: 'g, V: 'g> BucketArray<K, V> {
}

if this_bucket
.compare_and_set_weak(
.compare_exchange_weak(
this_bucket_ptr,
Shared::null().with_tag(SENTINEL_TAG),
(Ordering::Release, Ordering::Relaxed),
Ordering::AcqRel,
Ordering::Relaxed,
guard,
)
.is_ok()
Expand All @@ -460,8 +484,9 @@ impl<'g, K: 'g, V: 'g> BucketArray<K, V> {
}
}
}
std::mem::drop(lock);

next_array
Some(next_array)
}

fn next_array(&self, guard: &'g Guard, rehash_op: RehashOp) -> &'g BucketArray<K, V> {
Expand All @@ -479,14 +504,15 @@ impl<'g, K: 'g, V: 'g> BucketArray<K, V> {
Owned::new(BucketArray::with_length(self.epoch + 1, new_length))
});

match self.next.compare_and_set_weak(
match self.next.compare_exchange_weak(
Shared::null(),
new_next,
(Ordering::Release, Ordering::Relaxed),
Ordering::AcqRel,
Ordering::Relaxed,
guard,
) {
Ok(p) => return unsafe { p.deref() },
Err(CompareAndSetError { new, .. }) => {
Err(CompareExchangeError { new, .. }) => {
maybe_new_next = Some(new);
}
}
Expand Down
61 changes: 42 additions & 19 deletions src/cht/map/bucket_array_ref.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use std::{
sync::atomic::{AtomicUsize, Ordering},
};

use crossbeam_epoch::{Atomic, CompareAndSetError, Guard, Owned, Shared};
use crossbeam_epoch::{Atomic, CompareExchangeError, Guard, Owned, Shared};

pub(crate) struct BucketArrayRef<'a, K, V, S> {
pub(crate) bucket_array: &'a Atomic<BucketArray<K, V>>,
Expand Down Expand Up @@ -47,8 +47,11 @@ where
break;
}
Err(_) => {
bucket_array_ref =
bucket_array_ref.rehash(guard, self.build_hasher, RehashOp::Expand);
if let Some(r) =
bucket_array_ref.rehash(guard, self.build_hasher, RehashOp::Expand)
{
bucket_array_ref = r;
}
}
}
}
Expand Down Expand Up @@ -81,7 +84,9 @@ where
if rehash_op.is_skip() {
break;
}
bucket_array_ref = bucket_array_ref.rehash(guard, self.build_hasher, rehash_op);
if let Some(r) = bucket_array_ref.rehash(guard, self.build_hasher, rehash_op) {
bucket_array_ref = r;
}
}

match bucket_array_ref.remove_if(guard, hash, &mut eq, condition) {
Expand All @@ -106,8 +111,11 @@ where
}
Err(c) => {
condition = c;
bucket_array_ref =
bucket_array_ref.rehash(guard, self.build_hasher, RehashOp::Expand);
if let Some(r) =
bucket_array_ref.rehash(guard, self.build_hasher, RehashOp::Expand)
{
bucket_array_ref = r;
}
}
}
}
Expand Down Expand Up @@ -143,7 +151,9 @@ where
if rehash_op.is_skip() {
break;
}
bucket_array_ref = bucket_array_ref.rehash(guard, self.build_hasher, rehash_op);
if let Some(r) = bucket_array_ref.rehash(guard, self.build_hasher, rehash_op) {
bucket_array_ref = r;
}
}

match bucket_array_ref.insert_if_not_present(guard, hash, state) {
Expand Down Expand Up @@ -171,8 +181,11 @@ where
}
Err(s) => {
state = s;
bucket_array_ref =
bucket_array_ref.rehash(guard, self.build_hasher, RehashOp::Expand);
if let Some(r) =
bucket_array_ref.rehash(guard, self.build_hasher, RehashOp::Expand)
{
bucket_array_ref = r;
}
}
}
}
Expand Down Expand Up @@ -207,7 +220,9 @@ where
if rehash_op.is_skip() {
break;
}
bucket_array_ref = bucket_array_ref.rehash(guard, self.build_hasher, rehash_op);
if let Some(r) = bucket_array_ref.rehash(guard, self.build_hasher, rehash_op) {
bucket_array_ref = r;
}
}

match bucket_array_ref.insert_or_modify(guard, hash, state, on_modify) {
Expand Down Expand Up @@ -235,8 +250,11 @@ where
Err((s, f)) => {
state = s;
on_modify = f;
bucket_array_ref =
bucket_array_ref.rehash(guard, self.build_hasher, RehashOp::Expand);
if let Some(r) =
bucket_array_ref.rehash(guard, self.build_hasher, RehashOp::Expand)
{
bucket_array_ref = r;
}
}
}
}
Expand All @@ -260,8 +278,11 @@ where
break;
}
Err(_) => {
bucket_array_ref =
bucket_array_ref.rehash(guard, self.build_hasher, RehashOp::Expand);
if let Some(r) =
bucket_array_ref.rehash(guard, self.build_hasher, RehashOp::Expand)
{
bucket_array_ref = r;
}
}
}
}
Expand All @@ -286,14 +307,15 @@ impl<'a, 'g, K, V, S> BucketArrayRef<'a, K, V, S> {
let new_bucket_array =
maybe_new_bucket_array.unwrap_or_else(|| Owned::new(BucketArray::default()));

match self.bucket_array.compare_and_set_weak(
match self.bucket_array.compare_exchange_weak(
Shared::null(),
new_bucket_array,
(Ordering::Release, Ordering::Relaxed),
Ordering::AcqRel,
Ordering::Relaxed,
guard,
) {
Ok(b) => return unsafe { b.as_ref() }.unwrap(),
Err(CompareAndSetError { new, .. }) => maybe_new_bucket_array = Some(new),
Err(CompareExchangeError { new, .. }) => maybe_new_bucket_array = Some(new),
}
}
}
Expand All @@ -314,10 +336,11 @@ impl<'a, 'g, K, V, S> BucketArrayRef<'a, K, V, S> {
return;
}

match self.bucket_array.compare_and_set_weak(
match self.bucket_array.compare_exchange_weak(
current_ptr,
min_ptr,
(Ordering::Release, Ordering::Relaxed),
Ordering::AcqRel,
Ordering::Relaxed,
guard,
) {
Ok(_) => unsafe { bucket::defer_acquire_destroy(guard, current_ptr) },
Expand Down

0 comments on commit 5240b94

Please sign in to comment.