Eliminate Rx Merge gate in queue-serialized operators #1097
Open
dwcullop wants to merge 11 commits into
reactivemarbles#1079 moved cross-cache operators from Synchronize(lock) to SynchronizeSafe, which routes deliveries through a SharedDeliveryQueue that releases the lock before invoking downstream observers. The intent was that the lock would no longer be held across cross-cache calls, so two operators on a bidirectional pipeline could not form an ABBA cycle.

Six operators (Page, Virtualise, AutoRefresh, Sort, GroupOnImmutable, and QueryWhenChanged) routed every input through the queue but then combined the inputs with Observable.Merge before delivery. Rx's Merge installs its own private gate and holds it for the full duration of every downstream OnNext. When downstream delivery walks into another cache's writer lock, the two Merge gates on the two operators reconstruct the ABBA cycle that the queue-drain design was supposed to eliminate. DeadlockTortureTest.Page_DoesNotDeadlock caught this for Page; the other five had the same latent bug.

This adds IObservable<T>.UnsynchronizedMerge, a drop-in alternative to Observable.Merge that performs no synchronization of its own. It is safe to use only when every input is already serialized (in this library, by routing through the same SharedDeliveryQueue). All six operators now use it. Sort's three-source case becomes a single UnsynchronizedMerge call instead of nested .Merge().Merge(), removing one of the two gates that the chained form created.

FullJoin uses the same Merge syntax, but its two inputs come from independently materialized AsObservableCache().Connect() streams that share no queue. The Merge gate is the only thing serializing them, so this PR leaves FullJoin alone.

DeadlockTortureTest grows three new cases (GroupWithImmutableState, QueryWhenChanged, and Virtualise added to the stacked + multi-pair scenarios) so a future regression in any of the six operators is caught by the existing torture fixture.
Verified: 14/14 DeadlockTortureTest pass at MaxParallelThreads=16 across 10 iterations; 422/422 Sort/Virtualise/Page/AutoRefresh/Group/QueryWhenChanged unit tests pass; full Cache + List suite passes (2321 passed, 1 skipped).
Initial implementation subscribed every source to a single shared Observer.Create instance. The instance is an AnonymousObserver, which derives from ObserverBase and tracks a one-shot _isStopped flag inside its OnCompleted/OnError. Once any source's terminal notification flips that flag, every subsequent OnCompleted from the remaining sources is silently dropped before reaching the pending counter, so the merged observable never emits OnCompleted downstream.

CrossCacheDeadlockStressTest.AllOperators_CrossCache_NoDeadlock_CorrectResults caught this consistently in CI: the sourceB.Sort.Virtualise pipeline received OnCompleted from virtBRequests (its first source), but the matching OnCompleted from sourceB.Dispose arrived at a stopped observer and was discarded, leaving LastOrDefaultAsync waiting forever.

Each source now subscribes through its own Observer.Create instance. The OnNextSafe/OnErrorSafe/OnCompletedSafe actions close over the same shared pending and terminated counters, so the all-must-complete and first-error-wins semantics are unchanged; only the per-observer one-shot state is now isolated per source. This matches the per-InnerObserver pattern that Rx's own Observable.Merge uses internally.

Also apply UnsynchronizedMerge to TransformWithForcedTransform, which was missed in the original survey. Its shared.Merge(refresher) routed both inputs through the same SharedDeliveryQueue but kept Rx's gate, giving the same latent ABBA exposure that DeadlockTortureTest.TransformWithForce_DoesNotDeadlock flagged in CI.

Verified: CrossCacheDeadlockStressTest plus the full DeadlockTortureTest fixture pass 10/10 at xUnit.MaxParallelThreads=16; full test suite passes 2323/2323 at xUnit.MaxParallelThreads=4.
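The per-source observer arrangement described above can be sketched roughly as follows. This is a minimal illustration, not the library's implementation: the class and method name `UnsynchronizedMergeSketch` are hypothetical, and the sketch assumes (as the commit message requires) that every input is already serialized upstream, so `OnNext` calls never overlap.

```csharp
using System;
using System.Reactive;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Threading;

internal static class MergeSketch
{
    // Hypothetical name, for illustration only. Assumes all inputs are already
    // serialized by the caller; no gate is taken around downstream delivery.
    public static IObservable<T> UnsynchronizedMergeSketch<T>(
        this IObservable<T> first,
        params IObservable<T>[] others) =>
        Observable.Create<T>(downstream =>
        {
            var pending = others.Length + 1; // sources that have not completed yet
            var terminated = 0;              // 1 once a terminal notification was forwarded

            void OnNextSafe(T value)
            {
                if (Volatile.Read(ref terminated) == 0)
                {
                    downstream.OnNext(value);
                }
            }

            void OnErrorSafe(Exception error)
            {
                // First error wins; later terminal notifications are dropped.
                if (Interlocked.Exchange(ref terminated, 1) == 0)
                {
                    downstream.OnError(error);
                }
            }

            void OnCompletedSafe()
            {
                // Only the last source to complete forwards OnCompleted.
                if (Interlocked.Decrement(ref pending) == 0
                    && Interlocked.Exchange(ref terminated, 1) == 0)
                {
                    downstream.OnCompleted();
                }
            }

            // One Observer.Create instance per source, so one source's terminal
            // notification cannot stop the observer used by another source.
            var subscriptions = new CompositeDisposable(others.Length + 1);
            subscriptions.Add(first.Subscribe(
                Observer.Create<T>(OnNextSafe, OnErrorSafe, OnCompletedSafe)));
            foreach (var other in others)
            {
                subscriptions.Add(other.Subscribe(
                    Observer.Create<T>(OnNextSafe, OnErrorSafe, OnCompletedSafe)));
            }

            return subscriptions;
        });
}
```

The three local functions share `pending` and `terminated`, while each `Observer.Create` call yields a separate observer with its own one-shot stopped state, which is the isolation the fix relies on.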
Six of the operators changed in this branch followed the same shape:
    var queue = new SharedDeliveryQueue();
    var s1 = source1.SynchronizeSafe(queue).Select(projection1);
    var s2 = source2.SynchronizeSafe(queue).Select(projection2);
    return new CompositeDisposable(s1.UnsynchronizedMerge(s2)... , queue);
Every site allocates its own queue, threads it through each input, and
unwinds it in the disposable. The pattern is mechanical and easy to get
wrong: the queue must outlive the subscription, every input must be
serialized through the same queue, and the merge must skip Rx's gate.
DeliveryQueueMerge wraps that pattern as one operator. Each overload
owns its own SharedDeliveryQueue, routes every input through it via
SynchronizeSafe(queue), and combines the serialized streams with
UnsynchronizedMerge. The returned disposable tears down the merge
before the queue so terminal notifications still flow through the
still-active queue.
Two flavours, three overloads:
  DeliveryQueueMerge(IObservable<T>, params IObservable<T>[])
    same-type merge, no projection (AutoRefresh)
  DeliveryQueueMerge(IObservable<T1>, Func<T1,TOut>, IObservable<T2>, Func<T2,TOut>)
    heterogeneous two-source merge with projections invoked inside the drain
    (Page, Virtualise, GroupOnImmutable, QueryWhenChanged)
  DeliveryQueueMerge(IObservable<T1>, ..., IObservable<T2>, ..., IObservable<T3>, ...)
    heterogeneous three-source merge (Sort, non-early-return branch)
TransformWithForcedTransform keeps its current shape: its queue is shared
with a Publish()/cacheLoader subscription that lives outside the merge,
so the queue cannot be encapsulated by a merge operator. UnsynchronizedMerge
remains the helper there.
Verified locally: 437/437 unit tests across the six affected operators pass;
DeadlockTortureTest plus CrossCacheDeadlockStressTest pass 10/10 at
xUnit.MaxParallelThreads=16; full test suite passes at MaxParallelThreads=4.
The heterogeneous DeliveryQueueMerge overloads pushed too much into
each call site to read like idiomatic Rx, and at five of the six
operators the projections had to run inside the shared delivery queue
to preserve Rx semantics, which the operator-level signature could not
express without exposing the queue type to the caller.
Keep the same-type extension overload only:
    public static IObservable<T> DeliveryQueueMerge<T>(
        this IObservable<T> first,
        params IObservable<T>[] others)
This reads as a drop-in for Observable.Merge at AutoRefresh's call
site, which is the only place all inputs are already the same type
and need no per-input projection inside the drain.
Page, Virtualise, Sort, GroupOnImmutable, and QueryWhenChanged keep
the explicit SharedDeliveryQueue + SynchronizeSafe(queue) + UnsynchronizedMerge
shape introduced earlier in this branch. Each call site shows the
queue plumbing because the projections must execute inside the drain;
making that visible matches the rest of the code in the file.
Tests with subject inputs (Page, Virtualise, BatchIf, TransformWithForce, AllDangerous_Stacked, MultiplePairs) created the subject but nothing ever called OnNext on it. The bidirectional source writes still flowed through the operator's Merge gate, so the original deadlock was triggered, but the operator's subject-driven branch (refresher, request changes, pause toggle) was never invoked during the race. A regression that broke only that branch would not be caught.

Add an optional subjectPusher callback to RunBidirectionalDeadlockTest that runs on a third worker thread, gated by the same Barrier as the two writer threads, and have each subject-bearing test push its own pattern on the subject while sources are writing. For the Page/Virtualise/BatchIf inline subjects in MultiplePairs, lift them to named locals so they can be referenced from the pusher closure.

Also collapse the vertical layout introduced in the previous commits for DeliveryQueueMerge's CompositeDisposable construction and the UnsynchronizedMerge OnCompleted predicate.
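A Barrier-gated harness with an optional third worker, as described above, might look roughly like the sketch below. The type name `DeadlockHarnessSketch`, the `Run` signature, and the use of `Task.WaitAll` with a timeout are assumptions made for illustration; the actual `RunBidirectionalDeadlockTest` is not shown in this PR text and may differ.

```csharp
#nullable enable
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical harness for illustration; not the fixture's real helper.
internal static class DeadlockHarnessSketch
{
    // Returns false when the workers do not all finish within the timeout,
    // which the caller treats as a suspected deadlock.
    public static bool Run(Action writerA, Action writerB, Action? subjectPusher, TimeSpan timeout)
    {
        var work = new List<Action> { writerA, writerB };
        if (subjectPusher != null)
        {
            work.Add(subjectPusher);
        }

        // Every worker waits on the same barrier so all of them start racing together.
        // Deliberately not disposed: a hung worker may still reference it after a timeout.
        var barrier = new Barrier(work.Count);

        var tasks = new Task[work.Count];
        for (var i = 0; i < work.Count; i++)
        {
            var action = work[i];
            tasks[i] = Task.Factory.StartNew(
                () =>
                {
                    barrier.SignalAndWait();
                    action();
                },
                TaskCreationOptions.LongRunning);
        }

        return Task.WaitAll(tasks, timeout);
    }
}
```

Passing `null` for `subjectPusher` keeps the two-writer behaviour, so tests without a subject input would not need to change.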
Every input has the same element type T, so the type-erased
SharedDeliveryQueue with its per-source DeliverySubQueue<T> wrappers
was carrying machinery (bitset, sub-queue list, type-erased StageNext/
DeliverStaged dispatch) that the same-type merge never used.
Replace the implementation with one DeliveryQueue<T> and per-source
Observer.Create instances:
- OnNext: forwarded directly to queue.OnNext. The queue's gate
serializes concurrent calls from multiple producers; the drain
delivers items in arrival order outside the lock, so a downstream
observer that walks into another cache's writer lock cannot
deadlock with this serialization point.
- OnError: forwarded directly to queue.OnError. The queue marks
itself terminated at the first error reaching the drain, so a
second concurrent error from another source is dropped at enqueue
and the downstream observer sees OnError exactly once.
- OnCompleted: counter-gated; only the last surviving source's
completion calls queue.OnCompleted, matching Observable.Merge's
all-must-complete semantic. If a source has already errored, the
queue is terminated and the eventual OnCompleted at the counter's
floor is dropped at enqueue.
The per-source Observer.Create instance is required for the same
reason it is in UnsynchronizedMerge: Rx's ObserverBase sets a one-shot
stopped flag on the first OnCompleted/OnError, and a single shared
observer would silently drop terminal notifications from every source
after the first.
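The enqueue-under-lock, deliver-outside-lock idea behind the bullets above can be sketched as follows. `DrainQueueSketch<T>` is a hypothetical stand-in, not DynamicData's `DeliveryQueue<T>`; as a simplification it marks itself terminated when a terminal notification is enqueued rather than when it reaches the drain, and it does not handle a downstream observer that throws.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for illustration; NOT DynamicData's DeliveryQueue<T>.
internal sealed class DrainQueueSketch<T> : IObserver<T>
{
    private readonly object _gate = new object();
    private readonly Queue<Action<IObserver<T>>> _items = new Queue<Action<IObserver<T>>>();
    private readonly IObserver<T> _downstream;
    private bool _draining;   // true while some thread is delivering
    private bool _terminated; // true once a terminal notification was accepted

    public DrainQueueSketch(IObserver<T> downstream) => _downstream = downstream;

    public void OnNext(T value) => Enqueue(o => o.OnNext(value), terminal: false);

    public void OnError(Exception error) => Enqueue(o => o.OnError(error), terminal: true);

    public void OnCompleted() => Enqueue(o => o.OnCompleted(), terminal: true);

    private void Enqueue(Action<IObserver<T>> deliver, bool terminal)
    {
        lock (_gate)
        {
            if (_terminated)
            {
                return; // dropped at enqueue once a terminal has been accepted
            }

            _terminated = terminal;
            _items.Enqueue(deliver);

            if (_draining)
            {
                return; // the thread that is already draining will deliver this item
            }

            _draining = true;
        }

        while (true)
        {
            Action<IObserver<T>> next;
            lock (_gate)
            {
                if (_items.Count == 0)
                {
                    _draining = false;
                    return;
                }

                next = _items.Dequeue();
            }

            // The gate is released here, so a downstream observer that takes
            // another lock never does so while holding this one.
            next(_downstream);
        }
    }
}
```

Because the gate is only held for the enqueue and dequeue steps, concurrent producers are still ordered by arrival, but no lock owned by the queue is held while downstream code runs.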
AutoRefresh is the only consumer of DeliveryQueueMerge. All tests
across AutoRefresh, DeadlockTortureTest, and CrossCacheDeadlockStressTest
pass; deadlock fixture passes 5/5 at xUnit.MaxParallelThreads=16.
PR build failed AllDangerous_Stacked_DoNotDeadlock after 27s on a single iteration (the per-iteration TimeoutSeconds=15 budget was exceeded, then RunBidirectionalDeadlockTest returned false). It was not a deadlock; the pipeline was just doing too much work.

Each force.OnNext in this test triggers TransformWithForcedTransform's refresher, which scans cache.KeyValues and emits a refresh changeset that flows through the full 10-operator stack (GroupWithImmutableState, TransformMany, AutoRefresh, Filter, Transform, OnItemRemoved, DisposeMany, Sort, Virtualise, Page). At ItemCount=200 pusher iterations with three subjects pushed per iteration (force, pageReq, virtReq), the pusher thread did ~600 push operations per test iteration on top of the two writer threads' 200 source AddOrUpdates each. The other torture tests have a single-operator pipeline and one pusher and fit well within the budget; only the stacked case combines a heavy pipeline with three concurrent pushers.

Reduce StackedPushCount to ItemCount/4 = 50, three subjects each. That keeps the subject branches under contention (still 150 pushes per test iteration, still well above source-write rate) while bringing each iteration's worst case comfortably under TimeoutSeconds. The other subject-bearing tests are unchanged.
Previous commit reduced the AllDangerous_Stacked pusher load to fit the 15s per-iteration budget on the CI runner. That was the wrong trade: the test is a torture test, and shaving load to match the slowest hardware costs coverage. The CI runners are deliberately stripped down; the test budget should account for them. Raise TimeoutSeconds from 15 to 60 across the fixture and restore the full ItemCount pusher loop in AllDangerous_Stacked. The timeout still catches an actual deadlock (which hangs forever, not 60s), and the extra budget covers worst-case scheduling on a small VM.
Pull request overview
This PR continues the queue-drain deadlock work by replacing selected Observable.Merge usages that reintroduced downstream-held gates after SynchronizeSafe serialization. It adds internal merge helpers and updates affected cache operators plus deadlock torture coverage.
Changes:
- Added gate-free `UnsynchronizedMerge` and queue-backed `DeliveryQueueMerge` helpers.
- Replaced `Merge` in several cache operators already using `SharedDeliveryQueue`/queue-drain delivery.
- Expanded deadlock torture tests to exercise subject-driven merge branches and `GroupWithImmutableState`/`QueryWhenChanged`.
Reviewed changes
Copilot reviewed 11 out of 11 changed files in this pull request and generated 3 comments.
| File | Description |
|---|---|
| src/DynamicData/Internal/SynchronizeSafeExtensions.cs | Adds UnsynchronizedMerge for already-serialized inputs. |
| src/DynamicData/Internal/DeliveryQueueMergeExtensions.cs | Adds queue-backed merge for same-type inputs needing serialization. |
| src/DynamicData/Internal/SharedDeliveryQueue.cs | Restores BOM only. |
| src/DynamicData/Cache/Internal/Virtualise.cs | Replaces Merge with UnsynchronizedMerge. |
| src/DynamicData/Cache/Internal/TransformWithForcedTransform.cs | Replaces forced refresh merge with UnsynchronizedMerge. |
| src/DynamicData/Cache/Internal/Sort.cs | Collapses chained merges into one UnsynchronizedMerge. |
| src/DynamicData/Cache/Internal/QueryWhenChanged.cs | Replaces item-trigger branch merge with UnsynchronizedMerge. |
| src/DynamicData/Cache/Internal/Page.cs | Replaces page request/data merge with UnsynchronizedMerge. |
| src/DynamicData/Cache/Internal/GroupOnImmutable.cs | Replaces group/regroup merge with UnsynchronizedMerge. |
| src/DynamicData/Cache/Internal/AutoRefresh.cs | Uses DeliveryQueueMerge for source and refresh changes. |
| src/DynamicData.Tests/Cache/DeadlockTortureTest.cs | Expands deadlock stress scenarios and subject-push branches. |
…cused helper coverage

Address reviewer feedback on reactivemarbles#1097.

SortAndPage and SortAndVirtualize had the same shape that motivated the rest of this PR: three queue-serialized inputs combined with Observable.Merge, which reinstates the gate we removed elsewhere. Replace the Merge with UnsynchronizedMerge at both sites. SortAndPage drops the static Observable.Merge form for the extension-method form; SortAndVirtualize collapses chained .Merge().Merge() into a single UnsynchronizedMerge call, removing the second redundant gate too.

DeadlockTortureTest now covers both new operators alongside the older Sort().Page() and Sort().Virtualise() forms. Each test pushes on its own subject during the race so the request branch of the merge fires under contention. MultiplePairs_Simultaneous_NoDeadlock gains two more parallel lanes (SortAndPage, SortAndVirtualize) wired through separate BehaviorSubjects so all four request streams are pushed concurrently.

Add focused unit tests for the two helpers:
- UnsynchronizedMergeFixture covers the Rx Merge-compatible contract: arrival-order forwarding, all-must-complete OnCompleted, first-error-wins, late-terminal-after-error suppression, argument-order subscription, synchronous Empty/Throw sources at subscribe, and the no-others fallback.
- DeliveryQueueMergeFixture covers the same behavioural contract for the queue-backed variant plus a serialization check: two producers race 1000 items each through the merged stream while the observer asserts a max of one in-flight OnNext, with the full bag delivered exactly once.

Verification:
- 36/36 helper + DeadlockTortureTest pass in a single run.
- DeadlockTortureTest 16/16 pass 5/5 consecutive runs at xUnit.MaxParallelThreads=16.
- 422/422 affected operator tests pass.
Problem

PR #1079 moved cross-cache operators from `Synchronize(lock)` to `SynchronizeSafe`, which routes every notification through a `SharedDeliveryQueue` that releases the lock before invoking the downstream observer. The goal was that no operator-level lock would be held across a cross-cache call, so two operators on a bidirectional pipeline could not form an ABBA cycle.

Several operators completed the queue routing but then combined their already-serialized inputs with `Observable.Merge` before delivery:

- `Page`
- `Virtualise`
- `AutoRefresh`
- `Sort` (the branch where `comparerChangedObservable` or `resorter` is present)
- `GroupOnImmutable`
- `QueryWhenChanged` (the `itemChangedTrigger` branch)
- `TransformWithForcedTransform`

Rx's `Observable.Merge` installs a private `_gate` and holds it for the full duration of every downstream `OnNext`. When that downstream delivery walks into another cache's writer lock, the two `Merge` gates on the two operators reconstruct the ABBA cycle that the queue-drain design was supposed to eliminate.

`DeadlockTortureTest.Page_DoesNotDeadlock` (added in #1079) caught this for `Page` as an intermittent CI failure. The same latent bug existed in the other six operators; the existing torture fixture did not exercise their merge branches under cross-cache writes.

Fix
Introduce two internal Rx helpers and apply them at the seven sites:

- `IObservable<T>.UnsynchronizedMerge<T>(this IObservable<T>, params IObservable<T>[])` in `SynchronizeSafeExtensions.cs`. Drop-in alternative to `Observable.Merge` that performs no synchronization of its own. Preserves Merge's terminal semantics (completes only after every source completes; first error terminates; subscription happens in argument order) but does not install a gate. Each source is subscribed through its own `Observer.Create` instance because Rx's `ObserverBase` sets a one-shot stopped flag on the first `OnCompleted`/`OnError`; a single shared observer would silently drop terminal notifications from every source after the first. The `OnCompleted` handler decrements a shared counter; only the last surviving source's completion fires downstream.

  Safe to use only when every input is already serialized; in this library that precondition is satisfied by routing each input through the same `SharedDeliveryQueue` via `SynchronizeSafe(queue)` before the merge.

- `IObservable<T>.DeliveryQueueMerge<T>(this IObservable<T>, params IObservable<T>[])` in `DeliveryQueueMergeExtensions.cs`. Same-type Rx merge that owns its own `DeliveryQueue<T>` so the call site reads like an ordinary `Observable.Merge` and never has to mention queue plumbing. Used where every input has the same element type and no per-input projection is needed inside the drain (`AutoRefresh`).

Applied to the seven sites:

- `AutoRefresh`: `shared.DeliveryQueueMerge(refreshChanges)`
- `Page`: `SharedDeliveryQueue` + `SynchronizeSafe(queue).Select(projection)` per input + `UnsynchronizedMerge`
- `Virtualise`: same shape as `Page`
- `Sort` (conditional branch): same shape as `Page`
- `GroupOnImmutable`: same shape as `Page`
- `QueryWhenChanged` (`itemChangedTrigger` branch): same shape as `Page`
- `TransformWithForcedTransform`: `UnsynchronizedMerge` directly (queue shared with a `Publish`/cacheLoader subscription that lives outside the merge)

`Sort`'s three-source case becomes a single `UnsynchronizedMerge(a, b, c)` call instead of nested `.Merge().Merge()`, which also removes one of the two gates the chained form would have created.

Why `FullJoin` is unchanged

`FullJoin` uses the same `Merge` syntax, but its two inputs come from `leftCache.Connect()` and `rightCache.Connect()` on independently materialized `AsObservableCache()` stages that share no queue. There, the `Merge` gate is the only thing serializing the two cache deliveries before they mutate `joinedCache`. Removing it without alternative serialization would race the joined cache. `FullJoin` is left alone.

Test coverage
`DeadlockTortureTest` is expanded so the same fixture catches a future regression in any of the seven operators:

- `[Fact] GroupWithImmutableState_DoesNotDeadlock`.
- `[Fact] QueryWhenChanged_DoesNotDeadlock`: uses a side-channel `Subscribe(_ => otherCache.AddOrUpdate(...))` to close the ABBA cycle, since `QueryWhenChanged` does not produce a changeset that `PopulateInto` can consume.
- `AllDangerous_Stacked_DoNotDeadlock` now stacks `GroupWithImmutableState` and `Virtualise` into the kitchen-sink pipeline.
- `MultiplePairs_Simultaneous_NoDeadlock` gains a `GroupWithImmutableState` lane.

Tests that pass a subject input (`Page`, `Virtualise`, `BatchIf`, `TransformWithForce`, `AllDangerous_Stacked`, `MultiplePairs`) previously created the subject and let it sit idle. `BehaviorSubject` initial values reached the operator, but no test ever pushed during the race, so the operator's subject-driven branch was never exercised. The deadlock still formed via source-side flow alone, which is why `Page`/`Virtualise` did fail on main, but the test could miss a regression that only manifested on the subject-driven path. `RunBidirectionalDeadlockTest` now takes an optional `Action? subjectPusher` that runs on a third worker gated by the same `Barrier`, and each subject-bearing test pushes its own pattern on the subject while the writer threads are pushing on the sources.

The per-iteration `TimeoutSeconds` is raised from 15 to 60. The CI runner is intentionally stripped down; the test budget should accommodate it. A real deadlock hangs forever, not 60s, so the timeout still distinguishes deadlock from slow.

Verification

- `DeadlockTortureTest` fixture: 14/14 pass at `xUnit.MaxParallelThreads=16`, 10 consecutive runs, zero failures.
- `Sort` + `Virtualise` + `Page` + `AutoRefresh` + `Group*` + `QueryWhenChanged`: 437/437 pass.
- Full test suite at `xUnit.MaxParallelThreads=4`: 2323 passed, 0 failed, 1 skipped.