From c0aff4506b63f338c2007b085af746914aad6d5b Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Wed, 6 May 2026 18:43:36 +0000 Subject: [PATCH 1/6] fix: replace infinite retry loop in _execOverDatabase with counterBefore signal for eventual consistency Agent-Logs-Url: https://github.com/pubkey/rxdb/sessions/e3b3eb19-a9bb-4a2e-ade7-4a0d97dab92f Co-authored-by: pubkey <8926560+pubkey@users.noreply.github.com> --- src/rx-query.ts | 37 ++++++++++++++++++++++++------------- 1 file changed, 24 insertions(+), 13 deletions(-) diff --git a/src/rx-query.ts b/src/rx-query.ts index 5362e829bb0..96c5ff30fcd 100644 --- a/src/rx-query.ts +++ b/src/rx-query.ts @@ -21,8 +21,7 @@ import { PROMISE_RESOLVE_FALSE, RXJS_SHARE_REPLAY_DEFAULTS, ensureNotFalsy, - areRxDocumentArraysEqual, - promiseWait + areRxDocumentArraysEqual } from './plugins/utils/index.ts'; import { newRxError, @@ -272,7 +271,7 @@ export class RxQueryBase< * executes the query on the database * @return results-array with document-data */ - async _execOverDatabase(rerunCount = 0): Promise<{ + async _execOverDatabase(): Promise<{ result: RxDocumentData[] | number; counter: number; }> { @@ -283,14 +282,9 @@ export class RxQueryBase< }; /** - * @performance - * Instead of subscribing to eventBulks$ to detect concurrent writes, - * we snapshot the change event counter before and after the query. - * If the counter changed, a write happened during execution and - * we must re-run the query to ensure correct results. - * This avoids the overhead of RxJS Subject subscribe/unsubscribe per query. - * - * @link https://github.com/pubkey/rxdb/issues/7067 + * Snapshot the counter before running the query. + * If a write commits during the query, the counter will be higher + * when we check it below, and we need to signal re-evaluation. */ const counterBefore = this.collection._changeEventBuffer.getCounter(); @@ -343,9 +337,26 @@ export class RxQueryBase< }; } + /** + * If a write committed concurrently during our query, our result may be + * stale: the write's change event was already buffered so getCounter() + * inside queryCollection() advanced _latestChangeEvent past the event, + * which would cause subsequent _ensureEqual() calls to see + * _isResultsInSync()=true and skip re-evaluation entirely. + * + * By returning counter=counterBefore instead of the actual (advanced) + * counter, we ensure the change events already queued in + * _ensureEqualQueue will see _isResultsInSync()=false and re-run the + * query with fresh data. This avoids an unbounded retry loop while still + * guaranteeing eventual correctness through the live-query mechanism. + * + * @link https://github.com/pubkey/rxdb/issues/7067 + */ if (this.collection._changeEventBuffer.getCounter() !== counterBefore) { - await promiseWait(rerunCount * 20); - return this._execOverDatabase(rerunCount + 1); + return { + result: result.result, + counter: counterBefore + }; } return result; From 172fa427f8305fcf236a3719ea32dd3d060a34f9 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Wed, 6 May 2026 21:40:03 +0000 Subject: [PATCH 2/6] test: add regression test for #7067 first-emission stall under concurrent writes Agent-Logs-Url: https://github.com/pubkey/rxdb/sessions/18b5bac4-27b1-4b4f-a295-1f30c3108f7a Co-authored-by: pubkey <8926560+pubkey@users.noreply.github.com> --- test/unit/rx-query.test.ts | 96 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 96 insertions(+) diff --git a/test/unit/rx-query.test.ts b/test/unit/rx-query.test.ts index ff661e337df..b09a4b4649d 100644 --- a/test/unit/rx-query.test.ts +++ b/test/unit/rx-query.test.ts @@ -1417,6 +1417,102 @@ describe('rx-query.test.ts', () => { db.close(); }); }); + /** + * Regression test for https://github.com/pubkey/rxdb/issues/7067 + * + * _execOverDatabase previously contained an unbounded retry loop: + * whenever getCounter() advanced during the query it would re-run + * with `promiseWait(rerunCount * 20)` backoff. Under a continuous + * write loop the loop never exited, so query.$ never emitted. + * + * The fix returns counter=counterBefore when a concurrent write is + * detected, letting the existing _ensureEqualQueue mechanism + * re-evaluate once — no loop, no growing delay. + * + * We verify the fix by running a tight write loop while subscribing + * and asserting that the first emission arrives within a strict + * deadline (2 s). With the old retry loop the test would time out. + */ + it('#7067 query.$ emits first result within deadline under continuous concurrent writes', async () => { + const memStorage = getRxStorageMemory(); + + /** + * Wrap query() so each call takes ~10 ms, simulating a real + * async storage (Dexie/IndexedDB) where reads are never instant. + * This makes concurrent writes reliably advance the counter before + * the query returns, triggering the counter-mismatch code path. + */ + const slowStorage = Object.assign({}, memStorage, { + name: 'slow-' + memStorage.name, + async createStorageInstance(params: any) { + const instance = await memStorage.createStorageInstance(params); + if (params.collectionName === 'humans') { + const orig = instance.query.bind(instance); + instance.query = (preparedQuery: any) => + new Promise(res => setTimeout(() => res(orig(preparedQuery)), 10)); + } + return instance; + } + }); + + const db = await createRxDatabase({ + name: randomToken(10), + storage: wrappedValidateAjvStorage({ storage: slowStorage }), + eventReduce: false + }); + const { humans: c } = await db.addCollections({ + humans: { schema: schemas.human } + }); + + // Pre-insert one document so the subscription has something to emit. + await c.insert(schemaObjects.humanData('seed')); + + // Fire concurrent writes without awaiting each one so they truly + // overlap with the subscription initialisation and advance the + // ChangeEventBuffer counter while _execOverDatabase is in flight. + // A fixed number of inserts keeps teardown bounded. + const WRITE_COUNT = isFastMode() ? 10 : 30; + const writePromises = Array.from({ length: WRITE_COUNT }, () => + c.insert(schemaObjects.humanData(randomToken(10))).catch(() => { /* ignore teardown errors */ }) + ); + + // Give the first writes a tiny head-start before subscribing. + await promiseWait(5); + + let firstEmissionDocCount = -1; + const DEADLINE_MS = 2000; + const firstEmissionReceived = new Promise(res => { + const sub = c.find().$.subscribe(docs => { + if (firstEmissionDocCount < 0 && docs.length > 0) { + firstEmissionDocCount = docs.length; + sub.unsubscribe(); + res(); + } + }); + }); + + // Assert that the first emission arrives within the deadline. + let timedOut = false; + await Promise.race([ + firstEmissionReceived, + promiseWait(DEADLINE_MS).then(() => { + timedOut = true; + }) + ]); + + // Wait for all writes to finish before tearing down. + await Promise.all(writePromises); + + assert.strictEqual( + timedOut, + false, + 'query.$ did not emit within ' + DEADLINE_MS + ' ms under ' + WRITE_COUNT + ' concurrent writes. ' + + 'This indicates the retry-loop regression from #7067.' + ); + assert.ok(firstEmissionDocCount > 0, 'emission should contain at least one document'); + + db.close(); + }); /** * @link https://github.com/pubkey/rxdb/pull/7497 */ From 810fbd58de8c89030d9d4a6ab0ac37a94a3091af Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Mon, 18 May 2026 12:12:16 +0000 Subject: [PATCH 3/6] fix: restore fixed wait in query rerun path Agent-Logs-Url: https://github.com/pubkey/rxdb/sessions/e5ba458c-8692-4a6e-9c4c-fc4c9a0c0251 Co-authored-by: pubkey <8926560+pubkey@users.noreply.github.com> --- src/rx-query.ts | 20 +++++++++----------- test/unit/rx-query.test.ts | 15 +++++++-------- 2 files changed, 16 insertions(+), 19 deletions(-) diff --git a/src/rx-query.ts b/src/rx-query.ts index 96c5ff30fcd..83925f67896 100644 --- a/src/rx-query.ts +++ b/src/rx-query.ts @@ -21,7 +21,8 @@ import { PROMISE_RESOLVE_FALSE, RXJS_SHARE_REPLAY_DEFAULTS, ensureNotFalsy, - areRxDocumentArraysEqual + areRxDocumentArraysEqual, + promiseWait } from './plugins/utils/index.ts'; import { newRxError, @@ -271,7 +272,7 @@ export class RxQueryBase< * executes the query on the database * @return results-array with document-data */ - async _execOverDatabase(): Promise<{ + async _execOverDatabase(rerunCount = 0): Promise<{ result: RxDocumentData[] | number; counter: number; }> { @@ -344,19 +345,16 @@ export class RxQueryBase< * which would cause subsequent _ensureEqual() calls to see * _isResultsInSync()=true and skip re-evaluation entirely. * - * By returning counter=counterBefore instead of the actual (advanced) - * counter, we ensure the change events already queued in - * _ensureEqualQueue will see _isResultsInSync()=false and re-run the - * query with fresh data. This avoids an unbounded retry loop while still - * guaranteeing eventual correctness through the live-query mechanism. + * Re-run the query when a write commits concurrently. We still yield + * with a fixed wait so non-async storages do not monopolize the event + * loop, but we avoid the old growing backoff that could delay + * convergence more and more under write pressure. * * @link https://github.com/pubkey/rxdb/issues/7067 */ if (this.collection._changeEventBuffer.getCounter() !== counterBefore) { - return { - result: result.result, - counter: counterBefore - }; + await promiseWait(20); + return this._execOverDatabase(rerunCount + 1); } return result; diff --git a/test/unit/rx-query.test.ts b/test/unit/rx-query.test.ts index b09a4b4649d..2dc61837c76 100644 --- a/test/unit/rx-query.test.ts +++ b/test/unit/rx-query.test.ts @@ -1420,14 +1420,13 @@ describe('rx-query.test.ts', () => { /** * Regression test for https://github.com/pubkey/rxdb/issues/7067 * - * _execOverDatabase previously contained an unbounded retry loop: - * whenever getCounter() advanced during the query it would re-run - * with `promiseWait(rerunCount * 20)` backoff. Under a continuous - * write loop the loop never exited, so query.$ never emitted. + * _execOverDatabase previously contained a retry loop with + * `promiseWait(rerunCount * 20)` backoff. Under a continuous write + * loop the growing delay could keep pushing the first emission out. * - * The fix returns counter=counterBefore when a concurrent write is - * detected, letting the existing _ensureEqualQueue mechanism - * re-evaluate once — no loop, no growing delay. + * The fix keeps the retry, but switches it to a fixed + * `promiseWait(20)` so the event loop still gets a chance to run on + * fast storages without compounding the delay on every re-run. * * We verify the fix by running a tight write loop while subscribing * and asserting that the first emission arrives within a strict @@ -1507,7 +1506,7 @@ describe('rx-query.test.ts', () => { timedOut, false, 'query.$ did not emit within ' + DEADLINE_MS + ' ms under ' + WRITE_COUNT + ' concurrent writes. ' + - 'This indicates the retry-loop regression from #7067.' + 'This indicates the retry-delay regression from #7067.' ); assert.ok(firstEmissionDocCount > 0, 'emission should contain at least one document'); From 324897219376006e85ee6c29a47109d564c48973 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Mon, 18 May 2026 12:21:30 +0000 Subject: [PATCH 4/6] test: remove complex #7067 regression case Agent-Logs-Url: https://github.com/pubkey/rxdb/sessions/7f3e4920-b04a-47bf-9b2c-a369a840a9ee Co-authored-by: pubkey <8926560+pubkey@users.noreply.github.com> --- test/unit/rx-query.test.ts | 95 -------------------------------------- 1 file changed, 95 deletions(-) diff --git a/test/unit/rx-query.test.ts b/test/unit/rx-query.test.ts index 2dc61837c76..ff661e337df 100644 --- a/test/unit/rx-query.test.ts +++ b/test/unit/rx-query.test.ts @@ -1417,101 +1417,6 @@ describe('rx-query.test.ts', () => { db.close(); }); }); - /** - * Regression test for https://github.com/pubkey/rxdb/issues/7067 - * - * _execOverDatabase previously contained a retry loop with - * `promiseWait(rerunCount * 20)` backoff. Under a continuous write - * loop the growing delay could keep pushing the first emission out. - * - * The fix keeps the retry, but switches it to a fixed - * `promiseWait(20)` so the event loop still gets a chance to run on - * fast storages without compounding the delay on every re-run. - * - * We verify the fix by running a tight write loop while subscribing - * and asserting that the first emission arrives within a strict - * deadline (2 s). With the old retry loop the test would time out. - */ - it('#7067 query.$ emits first result within deadline under continuous concurrent writes', async () => { - const memStorage = getRxStorageMemory(); - - /** - * Wrap query() so each call takes ~10 ms, simulating a real - * async storage (Dexie/IndexedDB) where reads are never instant. - * This makes concurrent writes reliably advance the counter before - * the query returns, triggering the counter-mismatch code path. - */ - const slowStorage = Object.assign({}, memStorage, { - name: 'slow-' + memStorage.name, - async createStorageInstance(params: any) { - const instance = await memStorage.createStorageInstance(params); - if (params.collectionName === 'humans') { - const orig = instance.query.bind(instance); - instance.query = (preparedQuery: any) => - new Promise(res => setTimeout(() => res(orig(preparedQuery)), 10)); - } - return instance; - } - }); - - const db = await createRxDatabase({ - name: randomToken(10), - storage: wrappedValidateAjvStorage({ storage: slowStorage }), - eventReduce: false - }); - const { humans: c } = await db.addCollections({ - humans: { schema: schemas.human } - }); - - // Pre-insert one document so the subscription has something to emit. - await c.insert(schemaObjects.humanData('seed')); - - // Fire concurrent writes without awaiting each one so they truly - // overlap with the subscription initialisation and advance the - // ChangeEventBuffer counter while _execOverDatabase is in flight. - // A fixed number of inserts keeps teardown bounded. - const WRITE_COUNT = isFastMode() ? 10 : 30; - const writePromises = Array.from({ length: WRITE_COUNT }, () => - c.insert(schemaObjects.humanData(randomToken(10))).catch(() => { /* ignore teardown errors */ }) - ); - - // Give the first writes a tiny head-start before subscribing. - await promiseWait(5); - - let firstEmissionDocCount = -1; - const DEADLINE_MS = 2000; - const firstEmissionReceived = new Promise(res => { - const sub = c.find().$.subscribe(docs => { - if (firstEmissionDocCount < 0 && docs.length > 0) { - firstEmissionDocCount = docs.length; - sub.unsubscribe(); - res(); - } - }); - }); - - // Assert that the first emission arrives within the deadline. - let timedOut = false; - await Promise.race([ - firstEmissionReceived, - promiseWait(DEADLINE_MS).then(() => { - timedOut = true; - }) - ]); - - // Wait for all writes to finish before tearing down. - await Promise.all(writePromises); - - assert.strictEqual( - timedOut, - false, - 'query.$ did not emit within ' + DEADLINE_MS + ' ms under ' + WRITE_COUNT + ' concurrent writes. ' + - 'This indicates the retry-delay regression from #7067.' - ); - assert.ok(firstEmissionDocCount > 0, 'emission should contain at least one document'); - - db.close(); - }); /** * @link https://github.com/pubkey/rxdb/pull/7497 */ From e270637c0abe71036a0e43558d3bd8b2498b2424 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Mon, 18 May 2026 13:43:18 +0000 Subject: [PATCH 5/6] docs: add changelog entry for issue 8444 Agent-Logs-Url: https://github.com/pubkey/rxdb/sessions/e10a28bf-0e68-41a2-ad64-b8fc905af251 Co-authored-by: pubkey <8926560+pubkey@users.noreply.github.com> --- orga/changelog/fix-live-query-rerun-race-8444.md | 1 + 1 file changed, 1 insertion(+) create mode 100644 orga/changelog/fix-live-query-rerun-race-8444.md diff --git a/orga/changelog/fix-live-query-rerun-race-8444.md b/orga/changelog/fix-live-query-rerun-race-8444.md new file mode 100644 index 00000000000..71409e33dee --- /dev/null +++ b/orga/changelog/fix-live-query-rerun-race-8444.md @@ -0,0 +1 @@ +- FIX live queries missing the required re-evaluation when writes race `_execOverDatabase()`, by retrying when the `ChangeEventBuffer` counter changes during the storage read [#8444](https://github.com/pubkey/rxdb/issues/8444) From aac3a2bbf7f68326bfab4e983933620265bd57a0 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Mon, 18 May 2026 13:52:00 +0000 Subject: [PATCH 6/6] docs: correct changelog wording for issue 8444 Agent-Logs-Url: https://github.com/pubkey/rxdb/sessions/698d5cce-d56a-45ec-8dd7-34d782157358 Co-authored-by: pubkey <8926560+pubkey@users.noreply.github.com> --- orga/changelog/fix-live-query-rerun-race-8444.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/orga/changelog/fix-live-query-rerun-race-8444.md b/orga/changelog/fix-live-query-rerun-race-8444.md index 71409e33dee..addee40012d 100644 --- a/orga/changelog/fix-live-query-rerun-race-8444.md +++ b/orga/changelog/fix-live-query-rerun-race-8444.md @@ -1 +1 @@ -- FIX live queries missing the required re-evaluation when writes race `_execOverDatabase()`, by retrying when the `ChangeEventBuffer` counter changes during the storage read [#8444](https://github.com/pubkey/rxdb/issues/8444) +- FIX live-query subscriptions being delayed under concurrent writes, by using a fixed `promiseWait(20)` in the `_execOverDatabase()` rerun path instead of increasing the wait time with each rerun [#8444](https://github.com/pubkey/rxdb/issues/8444)