Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 30 additions & 11 deletions crates/thread_aware/src/cell/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -497,17 +497,36 @@ impl<T, S: Strategy> ThreadAware for Arc<T, S> {
let (data, factory) = match &self.factory {
// We can use the closure to create new data
Factory::Closure(factory, factory_source_affinity) => {
let factory_clone = (**factory).clone();

// In case factory source is stored in factory, use that - it means we already transferred the factory
// once, so we know the original source affinity. Otherwise, use source as that means this is the first
// time we're transferring the Arc, so source is the source affinity of the factory as well.
let factory_source = factory_source_affinity.unwrap_or(source);

(
sync::Arc::new(factory_clone.relocated(factory_source, destination).call_once()),
Factory::Closure(sync::Arc::clone(factory), Some(factory_source)),
)
// First-call dedup: `with_closure` eagerly built `self.value` for the
// calling thread's affinity but did not record which one. The first
// `relocated()` observes `source` from the caller, which is by contract
// the affinity of the thread that called `with_closure`. When that source
// equals the destination, the eager `self.value` is already the correct
// per-affinity value — reuse it instead of running the factory again.
//
// Without this shortcut, `Instantiation::All` on the calling thread races
// on the storage write lock: if the source==destination task wins first it
// factory-builds (counter +=1) before any peer task can populate the source
// slot via the end-of-function fallback below, leading to a non-deterministic
// factory call count.
if factory_source_affinity.is_none() && source == MemoryAffinity::Pinned(destination) {
(
sync::Arc::clone(&self.value),
Factory::Closure(sync::Arc::clone(factory), Some(source)),
)
} else {
let factory_clone = (**factory).clone();

// In case factory source is stored in factory, use that - it means we already transferred the factory
// once, so we know the original source affinity. Otherwise, use source as that means this is the first
// time we're transferring the Arc, so source is the source affinity of the factory as well.
let factory_source = factory_source_affinity.unwrap_or(source);

(
sync::Arc::new(factory_clone.relocated(factory_source, destination).call_once()),
Factory::Closure(sync::Arc::clone(factory), Some(factory_source)),
)
}
}

// We can clone and transfer the data
Expand Down
81 changes: 67 additions & 14 deletions crates/thread_aware/src/cell/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -460,31 +460,84 @@ fn test_strong_count_independent_across_affinities() {
}

#[test]
fn test_relocated_source_equals_destination_does_not_corrupt_storage() {
// Regression test: when source == destination, Arc::relocated() must NOT overwrite the
// newly-created value in storage with the stale pre-relocation value.
fn test_relocated_source_equals_destination_first_call_reuses_eager_value() {
// Regression test for two coupled invariants of `Arc::relocated` when source == destination:
//
// 1. First-call dedup: the eager value built inside `with_closure` for the calling
// thread's affinity must be reused. Calling the factory again would violate the
// "factory runs at most once per affinity" contract that callers like
// `Instantiation::All` rely on, producing the off-by-one factory-call count that
// caused the original flake.
//
// 2. No storage corruption: after the relocate completes, the storage slot for that
// affinity must hold the value that was actually returned, not a stale pre-relocation
// value written by the source-restore branch.
let affinities = pinned_affinities(&[2]);
let affinity = affinities[0];

// Create an Arc whose Counter starts at zero, then advance it.
// `Counter`'s inner atomic is shared across clones, so mutating it here lets us tell
// "reused eager value" (sees 42) apart from "fresh factory build" (Counter::new ⇒ 0).
let arc = PerCore::new(Counter::new);
arc.increment_by(42);
assert_eq!(arc.value(), 42);

// Relocate with source == destination. The ThreadAware impl always creates a *new*
// Counter (value resets to 0), so `relocated` must return 0 and must also leave
// storage holding 0 (not the stale 42).
// First relocation with source == destination must reuse the eager value, NOT rerun
// the factory. This is the fix for the `counts > len` flake.
let relocated = arc.relocated(affinity.into(), affinity);
assert_eq!(relocated.value(), 0, "relocated value should come from factory");
assert_eq!(
relocated.value(),
42,
"first relocated(source, source) must reuse the eager value from with_closure"
);

// A second relocation from the same slot must find the factory-created value (0) in
// storage, not the stale pre-relocation value (42). Before the bug fix, the first
// relocated() call wrote the stale Arc<Counter(42)> back into the storage slot,
// so the second call's `get_clone` fast-path would return 42 instead of 0.
// Subsequent relocations from the same slot must read from storage (cached), not
// resurrect a stale pre-relocation value via the source-restore branch.
let relocated_again = relocated.relocated(affinity.into(), affinity);
assert_eq!(
relocated_again.value(),
0,
"subsequent relocation must not see stale pre-relocation value from storage"
42,
"subsequent relocation must see the cached storage value, not a stale or fresh one"
);
}

#[test]
fn test_with_closure_relocate_to_source_reuses_eager_value() {
// Regression test for the flaky `Instantiation::All` factory-call count.
//
// `with_closure` eagerly invokes the factory once to materialize `self.value` for the
// calling thread's affinity, but does not record which affinity that was. The first
// `relocated()` call observes the caller's affinity as `source`, so when source equals
// the destination, the eager value must be reused and the factory must NOT be called
// again. Without this shortcut, parallel relocations to all worker affinities — where
// one task targets the source affinity itself — race on the storage write lock and
// the factory-call count becomes non-deterministic.
let affinities = pinned_affinities(&[2]);
let affinity_a = affinities[0];
let affinity_b = affinities[1];

let trc = PerCore::with_closure(relocate((), |()| 42));

// Capture the eagerly-built inner Arc so we can compare identity later.
let pre_arc = trc.clone().into_arc();
assert_eq!(*pre_arc, 42, "with_closure should eagerly build the value");

// First relocation has source == destination. With the fix, this must reuse the eager
// value rather than calling the factory again — verified via Arc::ptr_eq on the inner
// sync::Arc<T>.
let trc_a = trc.clone().relocated(affinity_a.into(), affinity_a);
let post_arc = trc_a.into_arc();
assert!(
sync::Arc::ptr_eq(&pre_arc, &post_arc),
"relocated(source, source) must reuse the eager sync::Arc from with_closure"
);

// Relocating to a different affinity still requires a fresh per-affinity value, so
// the inner Arc must differ (factory was invoked for the new affinity).
let trc_b = trc.relocated(affinity_a.into(), affinity_b);
let other_arc = trc_b.into_arc();
assert!(
!sync::Arc::ptr_eq(&pre_arc, &other_arc),
"relocated(source, other) must build a fresh per-affinity value"
);
assert_eq!(*other_arc, 42);
}
Loading