Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Prevent segmentation fault in sync and future caches, and also upgrade crossbeam-epoch to v0.9.9 #157

Merged
merged 6 commits into from
Jul 19, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,21 @@
# Moka Cache — Change Log

## Version 0.9.2

### Fixed

- Fix segmentation faults in `sync` and `future` caches under heavy loads on
many-core machine ([#34][gh-issue-0034]):
- NOTE: Although this issue was found in our testing environment 10 months ago
(v0.5.1), no user reported that they had the same issue.
- NOTE: In [v0.8.4](#version-084), we added a mitigation to reduce the chance of
the segfaults occurring.

### Changed

- Upgrade crossbeam-epoch from v0.8.2 to v0.9.9 ([#157][gh-pull-0157]).


## Version 0.9.1

### Fixed
Expand Down Expand Up @@ -427,6 +443,7 @@ The minimum supported Rust version (MSRV) is now 1.51.0 (2021-03-25).

[gh-pull-0167]: https://github.com/moka-rs/moka/pull/167/
[gh-pull-0159]: https://github.com/moka-rs/moka/pull/159/
[gh-pull-0157]: https://github.com/moka-rs/moka/pull/157/
[gh-pull-0145]: https://github.com/moka-rs/moka/pull/145/
[gh-pull-0143]: https://github.com/moka-rs/moka/pull/143/
[gh-pull-0138]: https://github.com/moka-rs/moka/pull/138/
Expand Down
10 changes: 3 additions & 7 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "moka"
version = "0.9.1"
version = "0.9.2"
edition = "2018"
rust-version = "1.51"

Expand Down Expand Up @@ -47,7 +47,7 @@ atomic64 = []
unstable-debug-counters = ["future"]

[dependencies]
crossbeam-channel = "0.5.4"
crossbeam-channel = "0.5.5"
crossbeam-utils = "0.8"
num_cpus = "1.13"
once_cell = "1.7"
Expand All @@ -61,15 +61,11 @@ tagptr = "0.2"
triomphe = { version = "0.1.3", default-features = false }

# Optional dependencies (enabled by default)
crossbeam-epoch = { version = "0.9.9", optional = true }
quanta = { version = "0.10.0", optional = true }
thiserror = { version = "1.0", optional = true }
uuid = { version = "1.1", features = ["v4"], optional = true }

# Although v0.8.2 is not the current version (v0.9.x), we will keep using it until
# we perform enough tests to get conformable with memory safety.
# See: https://github.com/moka-rs/moka/issues/34
crossbeam-epoch = { version = "0.8.2", optional = true }

# Optional dependencies (dashmap)
dashmap = { version = "5.2", optional = true }

Expand Down
64 changes: 45 additions & 19 deletions src/cht/map/bucket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,24 @@ use std::{
hash::{BuildHasher, Hash, Hasher},
mem::{self, MaybeUninit},
ptr,
sync::atomic::{self, AtomicUsize, Ordering},
sync::{
atomic::{self, AtomicUsize, Ordering},
Arc, Mutex, TryLockError,
},
};

#[cfg(feature = "unstable-debug-counters")]
use crate::common::concurrent::debug_counters;

use crossbeam_epoch::{Atomic, CompareAndSetError, Guard, Owned, Shared};
use crossbeam_epoch::{Atomic, CompareExchangeError, Guard, Owned, Shared};

pub(crate) const BUCKET_ARRAY_DEFAULT_LENGTH: usize = 128;

pub(crate) struct BucketArray<K, V> {
pub(crate) buckets: Box<[Atomic<Bucket<K, V>>]>,
pub(crate) next: Atomic<BucketArray<K, V>>,
pub(crate) epoch: usize,
pub(crate) rehash_lock: Arc<Mutex<()>>,
pub(crate) tombstone_count: AtomicUsize,
}

Expand Down Expand Up @@ -49,6 +53,7 @@ impl<K, V> BucketArray<K, V> {
buckets,
next: Atomic::null(),
epoch,
rehash_lock: Arc::new(Mutex::new(())),
tombstone_count: Default::default(),
}
}
Expand Down Expand Up @@ -147,10 +152,11 @@ impl<'g, K: 'g + Eq, V: 'g> BucketArray<K, V> {

let new_bucket_ptr = this_bucket_ptr.with_tag(TOMBSTONE_TAG);

match this_bucket.compare_and_set_weak(
match this_bucket.compare_exchange_weak(
this_bucket_ptr,
new_bucket_ptr,
(Ordering::Release, Ordering::Relaxed),
Ordering::AcqRel,
Ordering::Relaxed,
guard,
) {
// Succeeded. Return the removed value. (can be null)
Expand Down Expand Up @@ -200,10 +206,11 @@ impl<'g, K: 'g + Eq, V: 'g> BucketArray<K, V> {

let new_bucket = state.into_insert_bucket();

if let Err(CompareAndSetError { new, .. }) = this_bucket.compare_and_set_weak(
if let Err(CompareExchangeError { new, .. }) = this_bucket.compare_exchange_weak(
this_bucket_ptr,
new_bucket,
(Ordering::Release, Ordering::Relaxed),
Ordering::AcqRel,
Ordering::Relaxed,
guard,
) {
maybe_state = Some(InsertOrModifyState::from_bucket_value(new, None));
Expand Down Expand Up @@ -265,10 +272,11 @@ impl<'g, K: 'g + Eq, V: 'g> BucketArray<K, V> {
(state.into_insert_bucket(), None)
};

if let Err(CompareAndSetError { new, .. }) = this_bucket.compare_and_set_weak(
if let Err(CompareExchangeError { new, .. }) = this_bucket.compare_exchange_weak(
this_bucket_ptr,
new_bucket,
(Ordering::Release, Ordering::Relaxed),
Ordering::AcqRel,
Ordering::Relaxed,
guard,
) {
// Failed. Reload to retry.
Expand Down Expand Up @@ -314,10 +322,11 @@ impl<'g, K: 'g + Eq, V: 'g> BucketArray<K, V> {
if this_bucket_ptr.is_null() && is_tombstone(bucket_ptr) {
ProbeLoopAction::Return(None)
} else if this_bucket
.compare_and_set_weak(
.compare_exchange_weak(
this_bucket_ptr,
bucket_ptr,
(Ordering::Release, Ordering::Relaxed),
Ordering::AcqRel,
Ordering::Relaxed,
guard,
)
.is_ok()
Expand Down Expand Up @@ -394,11 +403,24 @@ impl<'g, K: 'g, V: 'g> BucketArray<K, V> {
guard: &'g Guard,
build_hasher: &H,
rehash_op: RehashOp,
) -> &'g BucketArray<K, V>
) -> Option<&'g BucketArray<K, V>>
where
K: Hash + Eq,
H: BuildHasher,
{
// Ensure that the rehashing is not performed concurrently.
let lock;
match self.rehash_lock.try_lock() {
Ok(lk) => lock = lk,
Err(TryLockError::WouldBlock) => {
// Wait until the lock become available.
std::mem::drop(self.rehash_lock.lock());
// We need to return here to see if rehashing is still needed.
return None;
}
Err(e @ TryLockError::Poisoned(_)) => panic!("{:?}", e),
};

let next_array = self.next_array(guard, rehash_op);

for this_bucket in self.buckets.iter() {
Expand All @@ -420,10 +442,11 @@ impl<'g, K: 'g, V: 'g> BucketArray<K, V> {

while is_borrowed(next_bucket_ptr)
&& next_bucket
.compare_and_set_weak(
.compare_exchange_weak(
next_bucket_ptr,
to_put_ptr,
(Ordering::Release, Ordering::Relaxed),
Ordering::AcqRel,
Ordering::Relaxed,
guard,
)
.is_err()
Expand All @@ -440,10 +463,11 @@ impl<'g, K: 'g, V: 'g> BucketArray<K, V> {
}

if this_bucket
.compare_and_set_weak(
.compare_exchange_weak(
this_bucket_ptr,
Shared::null().with_tag(SENTINEL_TAG),
(Ordering::Release, Ordering::Relaxed),
Ordering::AcqRel,
Ordering::Relaxed,
guard,
)
.is_ok()
Expand All @@ -460,8 +484,9 @@ impl<'g, K: 'g, V: 'g> BucketArray<K, V> {
}
}
}
std::mem::drop(lock);

next_array
Some(next_array)
}

fn next_array(&self, guard: &'g Guard, rehash_op: RehashOp) -> &'g BucketArray<K, V> {
Expand All @@ -479,14 +504,15 @@ impl<'g, K: 'g, V: 'g> BucketArray<K, V> {
Owned::new(BucketArray::with_length(self.epoch + 1, new_length))
});

match self.next.compare_and_set_weak(
match self.next.compare_exchange_weak(
Shared::null(),
new_next,
(Ordering::Release, Ordering::Relaxed),
Ordering::AcqRel,
Ordering::Relaxed,
guard,
) {
Ok(p) => return unsafe { p.deref() },
Err(CompareAndSetError { new, .. }) => {
Err(CompareExchangeError { new, .. }) => {
maybe_new_next = Some(new);
}
}
Expand Down
61 changes: 42 additions & 19 deletions src/cht/map/bucket_array_ref.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use std::{
sync::atomic::{AtomicUsize, Ordering},
};

use crossbeam_epoch::{Atomic, CompareAndSetError, Guard, Owned, Shared};
use crossbeam_epoch::{Atomic, CompareExchangeError, Guard, Owned, Shared};

pub(crate) struct BucketArrayRef<'a, K, V, S> {
pub(crate) bucket_array: &'a Atomic<BucketArray<K, V>>,
Expand Down Expand Up @@ -47,8 +47,11 @@ where
break;
}
Err(_) => {
bucket_array_ref =
bucket_array_ref.rehash(guard, self.build_hasher, RehashOp::Expand);
if let Some(r) =
bucket_array_ref.rehash(guard, self.build_hasher, RehashOp::Expand)
{
bucket_array_ref = r;
}
}
}
}
Expand Down Expand Up @@ -81,7 +84,9 @@ where
if rehash_op.is_skip() {
break;
}
bucket_array_ref = bucket_array_ref.rehash(guard, self.build_hasher, rehash_op);
if let Some(r) = bucket_array_ref.rehash(guard, self.build_hasher, rehash_op) {
bucket_array_ref = r;
}
}

match bucket_array_ref.remove_if(guard, hash, &mut eq, condition) {
Expand All @@ -106,8 +111,11 @@ where
}
Err(c) => {
condition = c;
bucket_array_ref =
bucket_array_ref.rehash(guard, self.build_hasher, RehashOp::Expand);
if let Some(r) =
bucket_array_ref.rehash(guard, self.build_hasher, RehashOp::Expand)
{
bucket_array_ref = r;
}
}
}
}
Expand Down Expand Up @@ -143,7 +151,9 @@ where
if rehash_op.is_skip() {
break;
}
bucket_array_ref = bucket_array_ref.rehash(guard, self.build_hasher, rehash_op);
if let Some(r) = bucket_array_ref.rehash(guard, self.build_hasher, rehash_op) {
bucket_array_ref = r;
}
}

match bucket_array_ref.insert_if_not_present(guard, hash, state) {
Expand Down Expand Up @@ -171,8 +181,11 @@ where
}
Err(s) => {
state = s;
bucket_array_ref =
bucket_array_ref.rehash(guard, self.build_hasher, RehashOp::Expand);
if let Some(r) =
bucket_array_ref.rehash(guard, self.build_hasher, RehashOp::Expand)
{
bucket_array_ref = r;
}
}
}
}
Expand Down Expand Up @@ -207,7 +220,9 @@ where
if rehash_op.is_skip() {
break;
}
bucket_array_ref = bucket_array_ref.rehash(guard, self.build_hasher, rehash_op);
if let Some(r) = bucket_array_ref.rehash(guard, self.build_hasher, rehash_op) {
bucket_array_ref = r;
}
}

match bucket_array_ref.insert_or_modify(guard, hash, state, on_modify) {
Expand Down Expand Up @@ -235,8 +250,11 @@ where
Err((s, f)) => {
state = s;
on_modify = f;
bucket_array_ref =
bucket_array_ref.rehash(guard, self.build_hasher, RehashOp::Expand);
if let Some(r) =
bucket_array_ref.rehash(guard, self.build_hasher, RehashOp::Expand)
{
bucket_array_ref = r;
}
}
}
}
Expand All @@ -260,8 +278,11 @@ where
break;
}
Err(_) => {
bucket_array_ref =
bucket_array_ref.rehash(guard, self.build_hasher, RehashOp::Expand);
if let Some(r) =
bucket_array_ref.rehash(guard, self.build_hasher, RehashOp::Expand)
{
bucket_array_ref = r;
}
}
}
}
Expand All @@ -286,14 +307,15 @@ impl<'a, 'g, K, V, S> BucketArrayRef<'a, K, V, S> {
let new_bucket_array =
maybe_new_bucket_array.unwrap_or_else(|| Owned::new(BucketArray::default()));

match self.bucket_array.compare_and_set_weak(
match self.bucket_array.compare_exchange_weak(
Shared::null(),
new_bucket_array,
(Ordering::Release, Ordering::Relaxed),
Ordering::AcqRel,
Ordering::Relaxed,
guard,
) {
Ok(b) => return unsafe { b.as_ref() }.unwrap(),
Err(CompareAndSetError { new, .. }) => maybe_new_bucket_array = Some(new),
Err(CompareExchangeError { new, .. }) => maybe_new_bucket_array = Some(new),
}
}
}
Expand All @@ -314,10 +336,11 @@ impl<'a, 'g, K, V, S> BucketArrayRef<'a, K, V, S> {
return;
}

match self.bucket_array.compare_and_set_weak(
match self.bucket_array.compare_exchange_weak(
current_ptr,
min_ptr,
(Ordering::Release, Ordering::Relaxed),
Ordering::AcqRel,
Ordering::Relaxed,
guard,
) {
Ok(_) => unsafe { bucket::defer_acquire_destroy(guard, current_ptr) },
Expand Down