From fe826d16f09364117dcafa1355c2a9792c0ee84f Mon Sep 17 00:00:00 2001 From: Vaiz <4908982+Vaiz@users.noreply.github.com> Date: Thu, 30 Apr 2026 12:55:56 +0100 Subject: [PATCH] fix(thread_aware): dedup factory call on first relocate when source == destination MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Arc::with_closure eagerly builds self.value for the calling thread's affinity but does not record which affinity it was. The first `relocated(source, destination)` call observes `source` from the caller — by contract the affinity of the thread that ran `with_closure`. When that first relocate has `source == destination`, the eager `self.value` is already the correct per-affinity value, so reuse it instead of running the factory again. Without this shortcut, `scheduler.spawn_multiple(Instantiation::All, ...)` called from a worker thread races on the storage write lock: the `source==destination` task can win first and factory-build (with no source-seed step, since the existing `source != destination` guard elides it) before any peer task populates the source slot via the end-of-function fallback. The result is one extra factory invocation non-deterministically, observable as `counts > len` in `fetch::isolated_on_oxidizer` (e.g. `9 > 8`). This complements PR #310 (which fixed storage corruption from the `source == destination` path) by also eliminating the extra factory call on that same path. Replaces the previous `test_relocated_source_equals_destination_does_not_corrupt_storage` regression test, which was asserting the now-fixed behavior (factory re-run on first `relocate(s, s)`). Adds a deterministic `Arc::ptr_eq` test covering the new invariant. AB#7312281 Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- crates/thread_aware/src/cell/mod.rs | 41 ++++++++++---- crates/thread_aware/src/cell/tests.rs | 81 ++++++++++++++++++++++----- 2 files changed, 97 insertions(+), 25 deletions(-) diff --git a/crates/thread_aware/src/cell/mod.rs b/crates/thread_aware/src/cell/mod.rs index e9a7677ae..6ad2390ca 100644 --- a/crates/thread_aware/src/cell/mod.rs +++ b/crates/thread_aware/src/cell/mod.rs @@ -497,17 +497,36 @@ impl ThreadAware for Arc { let (data, factory) = match &self.factory { // We can use the closure to create new data Factory::Closure(factory, factory_source_affinity) => { - let factory_clone = (**factory).clone(); - - // In case factory source is stored in factory, use that - it means we already transferred the factory - // once, so we know the original source affinity. Otherwise, use source as that means this is the first - // time we're transferring the Arc, so source is the source affinity of the factory as well. - let factory_source = factory_source_affinity.unwrap_or(source); - - ( - sync::Arc::new(factory_clone.relocated(factory_source, destination).call_once()), - Factory::Closure(sync::Arc::clone(factory), Some(factory_source)), - ) + // First-call dedup: `with_closure` eagerly built `self.value` for the + // calling thread's affinity but did not record which one. The first + // `relocated()` observes `source` from the caller, which is by contract + // the affinity of the thread that called `with_closure`. When that source + // equals the destination, the eager `self.value` is already the correct + // per-affinity value — reuse it instead of running the factory again. + // + // Without this shortcut, `Instantiation::All` on the calling thread races + // on the storage write lock: if the source==destination task wins first it + // factory-builds (counter +=1) before any peer task can populate the source + // slot via the end-of-function fallback below, leading to a non-deterministic + // factory call count. + if factory_source_affinity.is_none() && source == MemoryAffinity::Pinned(destination) { + ( + sync::Arc::clone(&self.value), + Factory::Closure(sync::Arc::clone(factory), Some(source)), + ) + } else { + let factory_clone = (**factory).clone(); + + // In case factory source is stored in factory, use that - it means we already transferred the factory + // once, so we know the original source affinity. Otherwise, use source as that means this is the first + // time we're transferring the Arc, so source is the source affinity of the factory as well. + let factory_source = factory_source_affinity.unwrap_or(source); + + ( + sync::Arc::new(factory_clone.relocated(factory_source, destination).call_once()), + Factory::Closure(sync::Arc::clone(factory), Some(factory_source)), + ) + } } // We can clone and transfer the data diff --git a/crates/thread_aware/src/cell/tests.rs b/crates/thread_aware/src/cell/tests.rs index 116c14bf0..9281bc995 100644 --- a/crates/thread_aware/src/cell/tests.rs +++ b/crates/thread_aware/src/cell/tests.rs @@ -460,31 +460,84 @@ fn test_strong_count_independent_across_affinities() { } #[test] -fn test_relocated_source_equals_destination_does_not_corrupt_storage() { - // Regression test: when source == destination, Arc::relocated() must NOT overwrite the - // newly-created value in storage with the stale pre-relocation value. +fn test_relocated_source_equals_destination_first_call_reuses_eager_value() { + // Regression test for two coupled invariants of `Arc::relocated` when source == destination: + // + // 1. First-call dedup: the eager value built inside `with_closure` for the calling + // thread's affinity must be reused. Calling the factory again would violate the + // "factory runs at most once per affinity" contract that callers like + // `Instantiation::All` rely on, producing the off-by-one factory-call count that + // caused the original flake. + // + // 2. No storage corruption: after the relocate completes, the storage slot for that + // affinity must hold the value that was actually returned, not a stale pre-relocation + // value written by the source-restore branch. let affinities = pinned_affinities(&[2]); let affinity = affinities[0]; - // Create an Arc whose Counter starts at zero, then advance it. + // `Counter`'s inner atomic is shared across clones, so mutating it here lets us tell + // "reused eager value" (sees 42) apart from "fresh factory build" (Counter::new ⇒ 0). let arc = PerCore::new(Counter::new); arc.increment_by(42); assert_eq!(arc.value(), 42); - // Relocate with source == destination. The ThreadAware impl always creates a *new* - // Counter (value resets to 0), so `relocated` must return 0 and must also leave - // storage holding 0 (not the stale 42). + // First relocation with source == destination must reuse the eager value, NOT rerun + // the factory. This is the fix for the `counts > len` flake. let relocated = arc.relocated(affinity.into(), affinity); - assert_eq!(relocated.value(), 0, "relocated value should come from factory"); + assert_eq!( + relocated.value(), + 42, + "first relocated(source, source) must reuse the eager value from with_closure" + ); - // A second relocation from the same slot must find the factory-created value (0) in - // storage, not the stale pre-relocation value (42). Before the bug fix, the first - // relocated() call wrote the stale Arc back into the storage slot, - // so the second call's `get_clone` fast-path would return 42 instead of 0. + // Subsequent relocations from the same slot must read from storage (cached), not + // resurrect a stale pre-relocation value via the source-restore branch. let relocated_again = relocated.relocated(affinity.into(), affinity); assert_eq!( relocated_again.value(), - 0, - "subsequent relocation must not see stale pre-relocation value from storage" + 42, + "subsequent relocation must see the cached storage value, not a stale or fresh one" + ); +} + +#[test] +fn test_with_closure_relocate_to_source_reuses_eager_value() { + // Regression test for the flaky `Instantiation::All` factory-call count. + // + // `with_closure` eagerly invokes the factory once to materialize `self.value` for the + // calling thread's affinity, but does not record which affinity that was. The first + // `relocated()` call observes the caller's affinity as `source`, so when source equals + // the destination, the eager value must be reused and the factory must NOT be called + // again. Without this shortcut, parallel relocations to all worker affinities — where + // one task targets the source affinity itself — race on the storage write lock and + // the factory-call count becomes non-deterministic. + let affinities = pinned_affinities(&[2]); + let affinity_a = affinities[0]; + let affinity_b = affinities[1]; + + let trc = PerCore::with_closure(relocate((), |()| 42)); + + // Capture the eagerly-built inner Arc so we can compare identity later. + let pre_arc = trc.clone().into_arc(); + assert_eq!(*pre_arc, 42, "with_closure should eagerly build the value"); + + // First relocation has source == destination. With the fix, this must reuse the eager + // value rather than calling the factory again — verified via Arc::ptr_eq on the inner + // sync::Arc. + let trc_a = trc.clone().relocated(affinity_a.into(), affinity_a); + let post_arc = trc_a.into_arc(); + assert!( + sync::Arc::ptr_eq(&pre_arc, &post_arc), + "relocated(source, source) must reuse the eager sync::Arc from with_closure" + ); + + // Relocating to a different affinity still requires a fresh per-affinity value, so + // the inner Arc must differ (factory was invoked for the new affinity). + let trc_b = trc.relocated(affinity_a.into(), affinity_b); + let other_arc = trc_b.into_arc(); + assert!( + !sync::Arc::ptr_eq(&pre_arc, &other_arc), + "relocated(source, other) must build a fresh per-affinity value" ); + assert_eq!(*other_arc, 42); }