From 594a671600392081a26f7accbdbe359a6be1a574 Mon Sep 17 00:00:00 2001 From: Kirk Morrow <9563562+kirkmorrow@users.noreply.github.com> Date: Thu, 4 Dec 2025 13:37:10 -0600 Subject: [PATCH 01/16] fix: getAt() returns multiple broadcast streams for each element --- .../dart/lib/src/utils/parse_live_list.dart | 156 ++++++-- .../test/src/utils/parse_live_list_test.dart | 337 ++++++++++++++++++ 2 files changed, 461 insertions(+), 32 deletions(-) create mode 100644 packages/dart/test/src/utils/parse_live_list_test.dart diff --git a/packages/dart/lib/src/utils/parse_live_list.dart b/packages/dart/lib/src/utils/parse_live_list.dart index 86ad72ad3..c33c13a5f 100644 --- a/packages/dart/lib/src/utils/parse_live_list.dart +++ b/packages/dart/lib/src/utils/parse_live_list.dart @@ -11,6 +11,20 @@ class ParseLiveList { _debug = isDebugEnabled(); } + /// Creates a new [ParseLiveList] for the given [query]. + /// + /// [lazyLoading] enables lazy loading of full object data. When `true` and + /// [preloadedColumns] is provided, the initial query fetches only those columns, + /// and full objects are loaded on-demand when accessed via [getAt]. + /// When [preloadedColumns] is empty or null, all fields are fetched regardless + /// of [lazyLoading] value. Default is `true`. + /// + /// [preloadedColumns] specifies which fields to fetch in the initial query when + /// lazy loading is enabled. Order fields are automatically included to ensure + /// proper sorting. If null or empty, all fields are fetched. + /// + /// [listenOnAllSubItems] and [listeningIncludes] control which nested objects + /// receive live query updates. static Future> create( QueryBuilder query, { bool? listenOnAllSubItems, @@ -26,7 +40,7 @@ class ParseLiveList { ) : _toIncludeMap(listeningIncludes ?? []), lazyLoading, - preloadedColumns: preloadedColumns ?? const [], + preloadedColumns: preloadedColumns, ); return parseLiveList._init().then((_) { @@ -137,9 +151,14 @@ class ParseLiveList { if (_debug) { print('ParseLiveList: lazyLoading is ${_lazyLoading ? 'on' : 'off'}'); } - if (_lazyLoading) { + + // Only restrict fields if lazy loading is enabled AND preloaded columns are specified + // This allows fetching minimal data upfront and loading full objects on-demand + if (_lazyLoading && _preloadedColumns.isNotEmpty) { final List keys = _preloadedColumns.toList(); - if (_lazyLoading && query.limiters.containsKey('order')) { + + // Automatically include order fields to ensure sorting works correctly + if (query.limiters.containsKey('order')) { keys.addAll( query.limiters['order'].toString().split(',').map((String string) { if (string.startsWith('-')) { @@ -149,10 +168,10 @@ class ParseLiveList { }), ); } - if (keys.isNotEmpty) { - query.keysToReturn(keys); - } + + query.keysToReturn(keys); } + return await query.query(); } @@ -161,13 +180,20 @@ class ParseLiveList { final ParseResponse parseResponse = await _runQuery(); if (parseResponse.success) { + // Determine if fields were actually restricted in the query + // Only mark as not loaded if lazy loading AND we actually restricted fields + final bool fieldsRestricted = + _lazyLoading && _preloadedColumns.isNotEmpty; + _list = parseResponse.results ?.map>( (dynamic element) => ParseLiveListElement( element, updatedSubItems: _listeningIncludes, - loaded: !_lazyLoading, + // Mark as loaded if we fetched all fields (no restriction) + // Mark as not loaded only if fields were actually restricted + loaded: !fieldsRestricted, ), ) .toList() ?? @@ -486,34 +512,92 @@ class ParseLiveList { } } - Stream getAt(final int index) async* { - if (index < _list.length) { - if (!_list[index].loaded) { - final QueryBuilder queryBuilder = QueryBuilder.copy(_query) - ..whereEqualTo( - keyVarObjectId, - _list[index].object.get(keyVarObjectId), - ) - ..setLimit(1); - final ParseResponse response = await queryBuilder.query(); - if (_list.isEmpty) { - yield* _createStreamError( - ParseError(message: 'ParseLiveList: _list is empty'), - ); - return; + /// Returns a stream for the element at the given [index]. + /// + /// Returns the element's existing broadcast stream, which allows multiple + /// listeners without creating redundant network requests or stream instances. + /// + /// When lazy loading is enabled and an element is not yet loaded, the first + /// access will trigger loading. This is useful for pagination scenarios. + /// Subsequent calls return the same stream without additional loads. + /// + /// The returned stream is a broadcast stream from ParseLiveListElement, + /// preventing the N+1 query bug that occurred with async* generators. + Stream getAt(final int index) { + if (index >= _list.length) { + // Return an empty stream for out-of-bounds indices + return const Stream.empty(); + } + + final element = _list[index]; + + // If not yet loaded (happens with lazy loading), trigger loading + // This will only happen once per element due to the loaded flag + if (!element.loaded) { + _loadElementAt(index); + } + + // Return the element's broadcast stream + // Multiple subscriptions to this stream won't trigger multiple loads + return element.stream; + } + + /// Asynchronously loads the full data for the element at [index]. + /// + /// Called when an element is accessed for the first time. + /// Errors are emitted to the element's stream so listeners can handle them. + Future _loadElementAt(int index) async { + if (index >= _list.length) { + return; + } + + final element = _list[index]; + + // Double-check: another call might have started loading already + if (element.loaded) { + return; + } + + try { + final QueryBuilder queryBuilder = QueryBuilder.copy(_query) + ..whereEqualTo( + keyVarObjectId, + element.object.get(keyVarObjectId), + ) + ..setLimit(1); + + final ParseResponse response = await queryBuilder.query(); + + // Check if list was modified during async operation + if (_list.isEmpty || index >= _list.length) { + if (_debug) { + print('ParseLiveList: List was modified during element load'); } - if (response.success) { - _list[index].object = response.results?.first; - } else { - ParseError? error = response.error; - if (error != null) yield* _createStreamError(error); - return; + return; + } + + if (response.success && + response.results != null && + response.results!.isNotEmpty) { + // Setting the object will mark it as loaded and emit it to the stream + _list[index].object = response.results!.first; + } else if (response.error != null) { + // Emit error to the element's stream so listeners can handle it + element.emitError(response.error!, StackTrace.current); + if (_debug) { + print( + 'ParseLiveList: Error loading element at index $index: ${response.error}', + ); } } - // just for testing - // await Future.delayed(const Duration(seconds: 2)); - yield _list[index].object; - yield* _list[index].stream; + } catch (e, stackTrace) { + // Emit exception to the element's stream + element.emitError(e, stackTrace); + if (_debug) { + print( + 'ParseLiveList: Exception loading element at index $index: $e\n$stackTrace', + ); + } } } @@ -791,6 +875,14 @@ class ParseLiveListElement { bool get loaded => _loaded; + /// Emits an error to the stream for listeners to handle. + /// Used when lazy loading fails to fetch the full object data. + void emitError(Object error, StackTrace stackTrace) { + if (!_streamController.isClosed) { + _streamController.addError(error, stackTrace); + } + } + void dispose() { _unsubscribe(_updatedSubItems); _streamController.close(); diff --git a/packages/dart/test/src/utils/parse_live_list_test.dart b/packages/dart/test/src/utils/parse_live_list_test.dart new file mode 100644 index 000000000..bda976a2c --- /dev/null +++ b/packages/dart/test/src/utils/parse_live_list_test.dart @@ -0,0 +1,337 @@ +import 'dart:async'; + +import 'package:parse_server_sdk/parse_server_sdk.dart'; +import 'package:test/test.dart'; + +import '../../test_utils.dart'; + +// NOTE: ParseLiveList Stream Architecture Documentation +// ====================================================== +// +// STREAM IMPLEMENTATION: +// --------------------- +// ParseLiveList.getAt() returns a broadcast stream for each element: +// +// Stream getAt(final int index) { +// if (index >= _list.length) { +// return const Stream.empty(); +// } +// final element = _list[index]; +// if (!element.loaded) { +// _loadElementAt(index); // Loads data on first access +// } +// return element.stream; // Returns broadcast stream +// } +// +// BROADCAST STREAM BENEFITS: +// ------------------------- +// ParseLiveListElement uses StreamController.broadcast(), which: +// - Allows multiple listeners on the same stream without errors +// - Shares the stream instance across all subscribers +// - Calls _loadElementAt() only once per element, regardless of subscription count +// - Prevents N+1 query problems where multiple subscriptions trigger multiple network requests +// +// IMPLEMENTATION REQUIREMENTS: +// --------------------------- +// The implementation must maintain these characteristics: +// 1. getAt() returns element.stream directly (NOT an async* generator) +// 2. ParseLiveListElement._streamController uses StreamController.broadcast() +// 3. Multiple calls to getAt(index) return the same underlying broadcast stream +// 4. Element loading occurs at most once per element +// +// TESTING LIMITATIONS: +// ------------------- +// Unit tests cannot directly verify this architecture because: +// 1. Stream identity cannot be tested (stream getters create wrapper instances) +// 2. Async* generators vs regular functions cannot be distinguished from outside +// 3. Query execution counts require integration testing with network layer monitoring +// +// Therefore, these tests verify supporting implementation details and behaviors, +// but code review is required to ensure the core architecture is maintained. + +void main() { + setUpAll(() async { + await initializeParse(); + }); + + group('ParseLiveList - Implementation Details', () { + test('lazyLoading=false marks elements as loaded immediately', () async { + // When lazy loading is disabled, all object fields are fetched in the + // initial query, so elements are marked as loaded=true immediately. + // This prevents unnecessary _loadElementAt() calls since all data + // is already available. + + const lazyLoading = false; // Fetch all fields upfront + + // Implementation behavior with lazyLoading=false: + // - Initial query fetches all object fields + // - Elements are marked loaded=true + // - getAt() returns streams without triggering additional loads + + final element = ParseLiveListElement( + ParseObject('TestClass')..objectId = 'test1', + loaded: !lazyLoading, // Should be true when lazyLoading=false + updatedSubItems: {}, + ); + + expect( + element.loaded, + true, + reason: 'Elements should be marked loaded when lazyLoading=false', + ); + }); + + test( + 'lazyLoading=true with empty preloadedColumns fetches all fields', + () async { + // When lazyLoading is enabled but preloadedColumns is empty or null, + // field restriction is not applied and all object fields are fetched + // in the initial query, resulting in elements marked as loaded=true. + + const lazyLoading = true; + const preloadedColumns = []; // Empty! + + // Logic: fieldsRestricted = lazyLoading && preloadedColumns.isNotEmpty + // fieldsRestricted = true && false = false + // loaded = !fieldsRestricted = !false = true + final fieldsRestricted = lazyLoading && preloadedColumns.isNotEmpty; + + final element = ParseLiveListElement( + ParseObject('TestClass')..objectId = 'test1', + loaded: !fieldsRestricted, // Should be true (no fields restricted) + updatedSubItems: {}, + ); + + expect( + element.loaded, + true, + reason: + 'Elements should be marked loaded when lazyLoading=true but preloadedColumns is empty', + ); + }, + ); + + test( + 'lazyLoading=true with preloadedColumns restricts initial query', + () async { + // When lazy loading is enabled with specified preloadedColumns, + // the initial query fetches only those fields, and elements are + // marked as loaded=false. Full object data is loaded on-demand + // when getAt() is called. + + const lazyLoading = true; + const preloadedColumns = ['name', 'order']; // Not empty! + + // Logic: fieldsRestricted = lazyLoading && preloadedColumns.isNotEmpty + // fieldsRestricted = true && true = true + // loaded = !fieldsRestricted = !true = false + final fieldsRestricted = lazyLoading && preloadedColumns.isNotEmpty; + + final element = ParseLiveListElement( + ParseObject('TestClass')..objectId = 'test1', + loaded: !fieldsRestricted, // Should be false (fields were restricted) + updatedSubItems: {}, + ); + + expect( + element.loaded, + false, + reason: + 'Elements should be marked not loaded when lazyLoading=true WITH preloadedColumns', + ); + }, + ); + + test('lazyLoading=false should NOT restrict fields automatically', () { + // When lazy loading is disabled, fetch all fields + final query = QueryBuilder(ParseObject('Room')) + ..orderByAscending('order'); + + final queryCopy = QueryBuilder.copy(query); + + expect( + queryCopy.limiters.containsKey('keys'), + false, + reason: + 'ParseLiveList should not restrict fields when lazyLoading=false', + ); + }); + + test('lazyLoading=true with preloadedColumns should restrict fields', () { + // When lazy loading with preloaded columns, the initial query should + // only fetch those columns + final query = QueryBuilder(ParseObject('Room')) + ..orderByAscending('order') + ..keysToReturn(['name', 'order']); // Simulating what _runQuery does + + final queryCopy = QueryBuilder.copy(query); + + expect( + queryCopy.limiters.containsKey('keys'), + true, + reason: + 'ParseLiveList should restrict fields when lazyLoading=true with preloadedColumns', + ); + }); + }); + + group('ParseLiveList - Stream Creation Bug', () { + test( + 'getAt() creates a new stream each time it is called (demonstrates the bug)', + () async { + // This test demonstrates the architectural issue: getAt() is an async* generator + // that creates a NEW stream every time it's called, rather than returning a + // cached/reusable stream. + + // We can't easily test the full ParseLiveList without a real server, but we can + // demonstrate the stream behavior by examining the method signature and behavior. + + // The bug is in this pattern (from parse_live_list.dart line 489): + // Stream getAt(final int index) async* { ... } + // + // This is an async generator function. Each call creates a NEW Stream instance. + + // Here's a simplified demonstration of the problem: + final streams = >[]; + + Stream createStream() async* { + yield 1; + yield 2; + } + + // Each call creates a different stream instance + streams.add(createStream()); + streams.add(createStream()); + streams.add(createStream()); + + // Verify they are different instances + expect( + identical(streams[0], streams[1]), + false, + reason: 'async* generator creates new stream on each call', + ); + expect( + identical(streams[1], streams[2]), + false, + reason: 'async* generator creates new stream on each call', + ); + }, + ); + + test( + 'broadcast streams can have multiple listeners (solution approach)', + () async { + // This demonstrates the solution: using a broadcast StreamController + // that can be subscribed to multiple times + + final controller = StreamController.broadcast(); + + final values1 = []; + final values2 = []; + final values3 = []; + + // Multiple subscriptions to the SAME stream + final sub1 = controller.stream.listen(values1.add); + final sub2 = controller.stream.listen(values2.add); + final sub3 = controller.stream.listen(values3.add); + + // Add values + controller.add(1); + controller.add(2); + + await Future.delayed(const Duration(milliseconds: 50)); + + // All listeners receive the same values + expect(values1, [1, 2]); + expect(values2, [1, 2]); + expect(values3, [1, 2]); + + // The key is that the broadcast stream can be listened to multiple times + // (unlike async* generators which create new streams each time) + expect( + controller.stream.isBroadcast, + true, + reason: 'Broadcast stream allows multiple listeners', + ); + + await sub1.cancel(); + await sub2.cancel(); + await sub3.cancel(); + await controller.close(); + }, + ); + + test('async* generator vs broadcast stream behavior difference', () async { + // This test clearly shows the difference between the two approaches + + int generatorCallCount = 0; + + // Approach 1: async* generator (CURRENT - PROBLEMATIC) + Stream generatorApproach() async* { + generatorCallCount++; + yield 1; + } + + // Each call creates new stream and executes the function + final genStream1 = generatorApproach(); + final genStream2 = generatorApproach(); + final genStream3 = generatorApproach(); + + expect( + generatorCallCount, + 0, + reason: 'Generator not executed until subscribed', + ); + + await genStream1.first; + expect(generatorCallCount, 1); + + await genStream2.first; + expect( + generatorCallCount, + 2, + reason: 'Each stream subscription triggers generator', + ); + + await genStream3.first; + expect( + generatorCallCount, + 3, + reason: 'Third subscription triggers third execution', + ); + + // Approach 2: broadcast stream (SOLUTION) + int broadcastInitCount = 0; + + final broadcastController = StreamController.broadcast(); + + // Initialization happens once + void initBroadcast() { + broadcastInitCount++; + broadcastController.add(1); + } + + initBroadcast(); + expect(broadcastInitCount, 1); + + // Multiple subscriptions to same stream - no re-initialization + final sub1 = broadcastController.stream.listen((_) {}); + expect(broadcastInitCount, 1, reason: 'No additional initialization'); + + final sub2 = broadcastController.stream.listen((_) {}); + expect( + broadcastInitCount, + 1, + reason: 'Still no additional initialization', + ); + + final sub3 = broadcastController.stream.listen((_) {}); + expect(broadcastInitCount, 1, reason: 'Stream reused, not recreated'); + + await sub1.cancel(); + await sub2.cancel(); + await sub3.cancel(); + await broadcastController.close(); + }); + }); +} From 007797a76df58b673610fbd961f4f09f5668c9ab Mon Sep 17 00:00:00 2001 From: Kirk Morrow <9563562+kirkmorrow@users.noreply.github.com> Date: Thu, 4 Dec 2025 14:02:16 -0600 Subject: [PATCH 02/16] Address multiple PR feedback items --- .../dart/lib/src/utils/parse_live_list.dart | 40 ++++++++++++++++--- .../test/src/utils/parse_live_list_test.dart | 9 +++-- 2 files changed, 41 insertions(+), 8 deletions(-) diff --git a/packages/dart/lib/src/utils/parse_live_list.dart b/packages/dart/lib/src/utils/parse_live_list.dart index c33c13a5f..3734a7df9 100644 --- a/packages/dart/lib/src/utils/parse_live_list.dart +++ b/packages/dart/lib/src/utils/parse_live_list.dart @@ -9,6 +9,7 @@ class ParseLiveList { List? preloadedColumns, }) : _preloadedColumns = preloadedColumns ?? const [] { _debug = isDebugEnabled(); + _debugLoggedInit = isDebugEnabled(); } /// Creates a new [ParseLiveList] for the given [query]. @@ -59,6 +60,8 @@ class ParseLiveList { late StreamController> _eventStreamController; int _nextID = 0; late bool _debug; + // Separate from _debug to allow one-time initialization logging without affecting error logging + late bool _debugLoggedInit; int get nextID => _nextID++; @@ -148,8 +151,11 @@ class ParseLiveList { Future _runQuery() async { final QueryBuilder query = QueryBuilder.copy(_query); - if (_debug) { - print('ParseLiveList: lazyLoading is ${_lazyLoading ? 'on' : 'off'}'); + + // Log lazy loading mode only once during initialization to avoid log spam + if (_debugLoggedInit) { + print('ParseLiveList: Initialized with lazyLoading=${_lazyLoading ? 'on' : 'off'}, preloadedColumns=${_preloadedColumns.isEmpty ? 'none' : _preloadedColumns.join(", ")}'); + _debugLoggedInit = false; } // Only restrict fields if lazy loading is enabled AND preloaded columns are specified @@ -553,11 +559,15 @@ class ParseLiveList { final element = _list[index]; - // Double-check: another call might have started loading already - if (element.loaded) { + // Race condition protection: skip if element is already loaded or + // currently being loaded by another concurrent call + if (element.loaded || element._isLoading) { return; } + // Set loading flag to prevent concurrent load operations + element._isLoading = true; + try { final QueryBuilder queryBuilder = QueryBuilder.copy(_query) ..whereEqualTo( @@ -579,6 +589,14 @@ class ParseLiveList { if (response.success && response.results != null && response.results!.isNotEmpty) { + // Verify we're still updating the same object (list may have been modified) + final currentElement = _list[index]; + if (currentElement.object.objectId != element.object.objectId) { + if (_debug) { + print('ParseLiveList: Element at index $index changed during load'); + } + return; + } // Setting the object will mark it as loaded and emit it to the stream _list[index].object = response.results!.first; } else if (response.error != null) { @@ -589,6 +607,13 @@ class ParseLiveList { 'ParseLiveList: Error loading element at index $index: ${response.error}', ); } + } else { + // Object not found (possibly deleted between initial query and load) + if (_debug) { + print( + 'ParseLiveList: Element at index $index not found during load', + ); + } } } catch (e, stackTrace) { // Emit exception to the element's stream @@ -598,6 +623,9 @@ class ParseLiveList { 'ParseLiveList: Exception loading element at index $index: $e\n$stackTrace', ); } + } finally { + // Clear loading flag to allow future retry attempts + element._isLoading = false; } } @@ -734,7 +762,8 @@ class ParseLiveListElement { this._object, { bool loaded = false, Map? updatedSubItems, - }) : _loaded = loaded { + }) : _loaded = loaded, + _isLoading = false { _updatedSubItems = _toSubscriptionMap( updatedSubItems ?? {}, ); @@ -747,6 +776,7 @@ class ParseLiveListElement { final StreamController _streamController = StreamController.broadcast(); T _object; bool _loaded = false; + bool _isLoading = false; late Map _updatedSubItems; LiveQuery? _liveQuery; final Future _subscriptionQueue = Future.value(); diff --git a/packages/dart/test/src/utils/parse_live_list_test.dart b/packages/dart/test/src/utils/parse_live_list_test.dart index bda976a2c..9b515d15a 100644 --- a/packages/dart/test/src/utils/parse_live_list_test.dart +++ b/packages/dart/test/src/utils/parse_live_list_test.dart @@ -143,7 +143,9 @@ void main() { ); test('lazyLoading=false should NOT restrict fields automatically', () { - // When lazy loading is disabled, fetch all fields + // Verifies baseline: a fresh QueryBuilder has no 'keys' restriction. + // The actual _runQuery() behavior with lazyLoading=false is tested + // indirectly through the loaded flag tests above. final query = QueryBuilder(ParseObject('Room')) ..orderByAscending('order'); @@ -158,8 +160,9 @@ void main() { }); test('lazyLoading=true with preloadedColumns should restrict fields', () { - // When lazy loading with preloaded columns, the initial query should - // only fetch those columns + // Verifies that keysToReturn() sets the 'keys' limiter as expected. + // Note: This simulates _runQuery() behavior; actual integration testing + // would require mocking the network layer. final query = QueryBuilder(ParseObject('Room')) ..orderByAscending('order') ..keysToReturn(['name', 'order']); // Simulating what _runQuery does From 68668344afe1103bdcf7b9ae150780d705a5b8b3 Mon Sep 17 00:00:00 2001 From: Kirk Morrow <9563562+kirkmorrow@users.noreply.github.com> Date: Thu, 4 Dec 2025 14:09:59 -0600 Subject: [PATCH 03/16] Fix stale reference Documentation and formatting fixes --- .../dart/lib/src/utils/parse_live_list.dart | 20 ++++++++++--------- .../test/src/utils/parse_live_list_test.dart | 8 ++++---- 2 files changed, 15 insertions(+), 13 deletions(-) diff --git a/packages/dart/lib/src/utils/parse_live_list.dart b/packages/dart/lib/src/utils/parse_live_list.dart index 3734a7df9..05af7a7da 100644 --- a/packages/dart/lib/src/utils/parse_live_list.dart +++ b/packages/dart/lib/src/utils/parse_live_list.dart @@ -154,7 +154,9 @@ class ParseLiveList { // Log lazy loading mode only once during initialization to avoid log spam if (_debugLoggedInit) { - print('ParseLiveList: Initialized with lazyLoading=${_lazyLoading ? 'on' : 'off'}, preloadedColumns=${_preloadedColumns.isEmpty ? 'none' : _preloadedColumns.join(", ")}'); + print( + 'ParseLiveList: Initialized with lazyLoading=${_lazyLoading ? 'on' : 'off'}, preloadedColumns=${_preloadedColumns.isEmpty ? 'none' : _preloadedColumns.join(", ")}', + ); _debugLoggedInit = false; } @@ -162,7 +164,7 @@ class ParseLiveList { // This allows fetching minimal data upfront and loading full objects on-demand if (_lazyLoading && _preloadedColumns.isNotEmpty) { final List keys = _preloadedColumns.toList(); - + // Automatically include order fields to ensure sorting works correctly if (query.limiters.containsKey('order')) { keys.addAll( @@ -174,7 +176,7 @@ class ParseLiveList { }), ); } - + query.keysToReturn(keys); } @@ -538,7 +540,7 @@ class ParseLiveList { final element = _list[index]; // If not yet loaded (happens with lazy loading), trigger loading - // This will only happen once per element due to the loaded flag + // This will only happen once per element due to the loaded and _isLoading flags if (!element.loaded) { _loadElementAt(index); } @@ -601,7 +603,8 @@ class ParseLiveList { _list[index].object = response.results!.first; } else if (response.error != null) { // Emit error to the element's stream so listeners can handle it - element.emitError(response.error!, StackTrace.current); + // Use _list[index] to ensure we emit to the current element at this index + _list[index].emitError(response.error!, StackTrace.current); if (_debug) { print( 'ParseLiveList: Error loading element at index $index: ${response.error}', @@ -610,14 +613,13 @@ class ParseLiveList { } else { // Object not found (possibly deleted between initial query and load) if (_debug) { - print( - 'ParseLiveList: Element at index $index not found during load', - ); + print('ParseLiveList: Element at index $index not found during load'); } } } catch (e, stackTrace) { // Emit exception to the element's stream - element.emitError(e, stackTrace); + // Use _list[index] to ensure we emit to the current element at this index + _list[index].emitError(e, stackTrace); if (_debug) { print( 'ParseLiveList: Exception loading element at index $index: $e\n$stackTrace', diff --git a/packages/dart/test/src/utils/parse_live_list_test.dart b/packages/dart/test/src/utils/parse_live_list_test.dart index 9b515d15a..479476d22 100644 --- a/packages/dart/test/src/utils/parse_live_list_test.dart +++ b/packages/dart/test/src/utils/parse_live_list_test.dart @@ -180,11 +180,11 @@ void main() { group('ParseLiveList - Stream Creation Bug', () { test( - 'getAt() creates a new stream each time it is called (demonstrates the bug)', + 'async* generators create new streams on each call (educational context)', () async { - // This test demonstrates the architectural issue: getAt() is an async* generator - // that creates a NEW stream every time it's called, rather than returning a - // cached/reusable stream. + // This test demonstrates async* generator behavior that contributed to the bug. + // It's educational context, not a test of the actual ParseLiveList bug. + // The real bug required integration testing with network request monitoring. // We can't easily test the full ParseLiveList without a real server, but we can // demonstrate the stream behavior by examining the method signature and behavior. From fc46d7a536861c597226dbdd52e927ca0e22fc53 Mon Sep 17 00:00:00 2001 From: Kirk Morrow <9563562+kirkmorrow@users.noreply.github.com> Date: Thu, 4 Dec 2025 14:49:54 -0600 Subject: [PATCH 04/16] Apply concurrent modification guards to error handler Tighten encapsulation on ParseLiveListElement Improve comments --- .../dart/lib/src/utils/parse_live_list.dart | 34 +++++++++++++++---- .../test/src/utils/parse_live_list_test.dart | 7 ++++ 2 files changed, 34 insertions(+), 7 deletions(-) diff --git a/packages/dart/lib/src/utils/parse_live_list.dart b/packages/dart/lib/src/utils/parse_live_list.dart index 05af7a7da..eea88ee37 100644 --- a/packages/dart/lib/src/utils/parse_live_list.dart +++ b/packages/dart/lib/src/utils/parse_live_list.dart @@ -60,7 +60,8 @@ class ParseLiveList { late StreamController> _eventStreamController; int _nextID = 0; late bool _debug; - // Separate from _debug to allow one-time initialization logging without affecting error logging + // Separate from _debug to allow one-time initialization logging + // while still logging all errors/warnings when _debug is true late bool _debugLoggedInit; int get nextID => _nextID++; @@ -532,7 +533,7 @@ class ParseLiveList { /// The returned stream is a broadcast stream from ParseLiveListElement, /// preventing the N+1 query bug that occurred with async* generators. Stream getAt(final int index) { - if (index >= _list.length) { + if (index < 0 || index >= _list.length) { // Return an empty stream for out-of-bounds indices return const Stream.empty(); } @@ -563,12 +564,12 @@ class ParseLiveList { // Race condition protection: skip if element is already loaded or // currently being loaded by another concurrent call - if (element.loaded || element._isLoading) { + if (element.loaded || element.isLoading) { return; } // Set loading flag to prevent concurrent load operations - element._isLoading = true; + element.isLoading = true; try { final QueryBuilder queryBuilder = QueryBuilder.copy(_query) @@ -612,14 +613,30 @@ class ParseLiveList { } } else { // Object not found (possibly deleted between initial query and load) + // Don't emit error - LiveQuery will send a deletion event to handle this if (_debug) { print('ParseLiveList: Element at index $index not found during load'); } } } catch (e, stackTrace) { + // List may have changed while the query was in flight + if (_list.isEmpty || index >= _list.length) { + if (_debug) { + print('ParseLiveList: List was modified during element load (exception)'); + } + return; + } + + final currentElement = _list[index]; + if (currentElement.object.objectId != element.object.objectId) { + if (_debug) { + print('ParseLiveList: Element at index $index changed during load (exception)'); + } + return; + } + // Emit exception to the element's stream - // Use _list[index] to ensure we emit to the current element at this index - _list[index].emitError(e, stackTrace); + currentElement.emitError(e, stackTrace); if (_debug) { print( 'ParseLiveList: Exception loading element at index $index: $e\n$stackTrace', @@ -627,7 +644,7 @@ class ParseLiveList { } } finally { // Clear loading flag to allow future retry attempts - element._isLoading = false; + element.isLoading = false; } } @@ -907,6 +924,9 @@ class ParseLiveListElement { bool get loaded => _loaded; + bool get isLoading => _isLoading; + set isLoading(bool value) => _isLoading = value; + /// Emits an error to the stream for listeners to handle. /// Used when lazy loading fails to fetch the full object data. void emitError(Object error, StackTrace stackTrace) { diff --git a/packages/dart/test/src/utils/parse_live_list_test.dart b/packages/dart/test/src/utils/parse_live_list_test.dart index 479476d22..473c25d13 100644 --- a/packages/dart/test/src/utils/parse_live_list_test.dart +++ b/packages/dart/test/src/utils/parse_live_list_test.dart @@ -48,6 +48,13 @@ import '../../test_utils.dart'; // // Therefore, these tests verify supporting implementation details and behaviors, // but code review is required to ensure the core architecture is maintained. +// +// INTEGRATION TESTING RECOMMENDATIONS: +// ------------------------------------ +// To fully verify the N+1 query fix, integration tests should: +// 1. Monitor actual network requests to the Parse server +// 2. Subscribe to the same element multiple times +// 3. Verify only one query is executed regardless of subscription count void main() { setUpAll(() async { From 2f034af2d2445067c9288326c6f706013d255249 Mon Sep 17 00:00:00 2001 From: Kirk Morrow <9563562+kirkmorrow@users.noreply.github.com> Date: Thu, 4 Dec 2025 14:58:25 -0600 Subject: [PATCH 05/16] dart format --- packages/dart/lib/src/utils/parse_live_list.dart | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/packages/dart/lib/src/utils/parse_live_list.dart b/packages/dart/lib/src/utils/parse_live_list.dart index eea88ee37..7fa134d45 100644 --- a/packages/dart/lib/src/utils/parse_live_list.dart +++ b/packages/dart/lib/src/utils/parse_live_list.dart @@ -622,7 +622,9 @@ class ParseLiveList { // List may have changed while the query was in flight if (_list.isEmpty || index >= _list.length) { if (_debug) { - print('ParseLiveList: List was modified during element load (exception)'); + print( + 'ParseLiveList: List was modified during element load (exception)', + ); } return; } @@ -630,7 +632,9 @@ class ParseLiveList { final currentElement = _list[index]; if (currentElement.object.objectId != element.object.objectId) { if (_debug) { - print('ParseLiveList: Element at index $index changed during load (exception)'); + print( + 'ParseLiveList: Element at index $index changed during load (exception)', + ); } return; } From ce4ed3ed41e0dce586c6999d3370dacc490aeca1 Mon Sep 17 00:00:00 2001 From: Kirk Morrow <9563562+kirkmorrow@users.noreply.github.com> Date: Thu, 4 Dec 2025 16:00:30 -0600 Subject: [PATCH 06/16] Address coderabbitai feedback --- .../dart/lib/src/utils/parse_live_list.dart | 22 ++++++++++++------- packages/dart/lib/src/utils/parse_utils.dart | 4 ---- 2 files changed, 14 insertions(+), 12 deletions(-) diff --git a/packages/dart/lib/src/utils/parse_live_list.dart b/packages/dart/lib/src/utils/parse_live_list.dart index 7fa134d45..aef094e13 100644 --- a/packages/dart/lib/src/utils/parse_live_list.dart +++ b/packages/dart/lib/src/utils/parse_live_list.dart @@ -603,9 +603,18 @@ class ParseLiveList { // Setting the object will mark it as loaded and emit it to the stream _list[index].object = response.results!.first; } else if (response.error != null) { - // Emit error to the element's stream so listeners can handle it - // Use _list[index] to ensure we emit to the current element at this index - _list[index].emitError(response.error!, StackTrace.current); + // Emit error to the element's stream so listeners can handle it. + // Guard against list mutations so we don't emit on the wrong element. + final currentElement = _list[index]; + if (currentElement.object.objectId != element.object.objectId) { + if (_debug) { + print( + 'ParseLiveList: Element at index $index changed during load (error)', + ); + } + return; + } + currentElement.emitError(response.error!, StackTrace.current); if (_debug) { print( 'ParseLiveList: Error loading element at index $index: ${response.error}', @@ -786,7 +795,7 @@ class ParseLiveListElement { bool loaded = false, Map? updatedSubItems, }) : _loaded = loaded, - _isLoading = false { + isLoading = false { _updatedSubItems = _toSubscriptionMap( updatedSubItems ?? {}, ); @@ -799,7 +808,7 @@ class ParseLiveListElement { final StreamController _streamController = StreamController.broadcast(); T _object; bool _loaded = false; - bool _isLoading = false; + bool isLoading = false; late Map _updatedSubItems; LiveQuery? _liveQuery; final Future _subscriptionQueue = Future.value(); @@ -928,9 +937,6 @@ class ParseLiveListElement { bool get loaded => _loaded; - bool get isLoading => _isLoading; - set isLoading(bool value) => _isLoading = value; - /// Emits an error to the stream for listeners to handle. /// Used when lazy loading fails to fetch the full object data. void emitError(Object error, StackTrace stackTrace) { diff --git a/packages/dart/lib/src/utils/parse_utils.dart b/packages/dart/lib/src/utils/parse_utils.dart index 32e018f42..aa2b100f9 100644 --- a/packages/dart/lib/src/utils/parse_utils.dart +++ b/packages/dart/lib/src/utils/parse_utils.dart @@ -123,10 +123,6 @@ Future batchRequest( } } -Stream _createStreamError(Object error) async* { - throw error; -} - List removeDuplicateParseObjectByObjectId(Iterable iterable) { final list = iterable.toList(); From 37fa8667b1556d7229e293123a2fe0a7d8ade547 Mon Sep 17 00:00:00 2001 From: Kirk Morrow <9563562+kirkmorrow@users.noreply.github.com> Date: Thu, 4 Dec 2025 16:21:40 -0600 Subject: [PATCH 07/16] More coderabbitai feedback --- packages/dart/lib/src/utils/parse_live_list.dart | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/packages/dart/lib/src/utils/parse_live_list.dart b/packages/dart/lib/src/utils/parse_live_list.dart index aef094e13..514a22fb3 100644 --- a/packages/dart/lib/src/utils/parse_live_list.dart +++ b/packages/dart/lib/src/utils/parse_live_list.dart @@ -258,6 +258,11 @@ class ParseLiveList { final List newList = parseResponse.results as List? ?? []; + // Determine if fields were actually restricted in the query, + // same logic as in _init(). + final bool fieldsRestricted = + _lazyLoading && _preloadedColumns.isNotEmpty; + //update List for (int i = 0; i < _list.length; i++) { final ParseObject currentObject = _list[i].object; @@ -300,7 +305,11 @@ class ParseLiveList { } for (int i = 0; i < newList.length; i++) { - tasks.add(_objectAdded(newList[i], loaded: false)); + // Mark as loaded when all fields were fetched; only treat as + // not loaded when fields are actually restricted. + tasks.add( + _objectAdded(newList[i], loaded: !fieldsRestricted), + ); } } await Future.wait(tasks); @@ -556,7 +565,7 @@ class ParseLiveList { /// Called when an element is accessed for the first time. /// Errors are emitted to the element's stream so listeners can handle them. Future _loadElementAt(int index) async { - if (index >= _list.length) { + if (index < 0 || index >= _list.length) { return; } From 14d6ba8c7e09165eae561fe567d8981e3460a2a2 Mon Sep 17 00:00:00 2001 From: Kirk Morrow <9563562+kirkmorrow@users.noreply.github.com> Date: Thu, 4 Dec 2025 16:38:57 -0600 Subject: [PATCH 08/16] Fix code analysis failure --- packages/dart/lib/src/utils/parse_live_list.dart | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/packages/dart/lib/src/utils/parse_live_list.dart b/packages/dart/lib/src/utils/parse_live_list.dart index 514a22fb3..be496c815 100644 --- a/packages/dart/lib/src/utils/parse_live_list.dart +++ b/packages/dart/lib/src/utils/parse_live_list.dart @@ -732,18 +732,14 @@ class ParseLiveElement extends ParseLiveListElement { if (includeObject != null) { queryBuilder.includeObject(includeObject); } - _init(object, loaded: loaded, includeObject: includeObject); + _init(object, loaded: loaded); } Subscription? _subscription; Map? _includes; late QueryBuilder queryBuilder; - Future _init( - T object, { - bool loaded = false, - List? includeObject, - }) async { + Future _init(T object, {bool loaded = false}) async { if (!loaded) { final ParseResponse parseResponse = await queryBuilder.query(); if (parseResponse.success) { From 794821df47c7d45150a59d3edd5fe70677c0f6c8 Mon Sep 17 00:00:00 2001 From: Kirk Morrow <9563562+kirkmorrow@users.noreply.github.com> Date: Thu, 4 Dec 2025 16:54:24 -0600 Subject: [PATCH 09/16] Address coderabbitai nitpicks --- packages/dart/lib/src/utils/parse_live_list.dart | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/packages/dart/lib/src/utils/parse_live_list.dart b/packages/dart/lib/src/utils/parse_live_list.dart index be496c815..04ef057e3 100644 --- a/packages/dart/lib/src/utils/parse_live_list.dart +++ b/packages/dart/lib/src/utils/parse_live_list.dart @@ -178,7 +178,8 @@ class ParseLiveList { ); } - query.keysToReturn(keys); + // Deduplicate keys to minimize request size + query.keysToReturn(keys.toSet().toList()); } return await query.query(); @@ -630,8 +631,12 @@ class ParseLiveList { ); } } else { - // Object not found (possibly deleted between initial query and load) - // Don't emit error - LiveQuery will send a deletion event to handle this + // Object not found (possibly deleted between initial query and load). + // Note: Element remains loaded=false, so subsequent getAt() calls will + // retry the query. This is acceptable because: + // 1. LiveQuery will send a delete event to remove the element if needed + // 2. Retries are rare (object would need to be deleted mid-load) + // 3. No error is emitted to avoid alarming users for transient issues if (_debug) { print('ParseLiveList: Element at index $index not found during load'); } @@ -732,6 +737,8 @@ class ParseLiveElement extends ParseLiveListElement { if (includeObject != null) { queryBuilder.includeObject(includeObject); } + // Fire-and-forget initialization; errors surface through element stream + // ignore: unawaited_futures _init(object, loaded: loaded); } From ac234280a4be49fb8a8f2050f41f3f276dc5a6fa Mon Sep 17 00:00:00 2001 From: Kirk Morrow <9563562+kirkmorrow@users.noreply.github.com> Date: Fri, 5 Dec 2025 08:47:50 -0600 Subject: [PATCH 10/16] Update packages/dart/test/src/utils/parse_live_list_test.dart Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- packages/dart/test/src/utils/parse_live_list_test.dart | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/dart/test/src/utils/parse_live_list_test.dart b/packages/dart/test/src/utils/parse_live_list_test.dart index 473c25d13..5c3ed3aa6 100644 --- a/packages/dart/test/src/utils/parse_live_list_test.dart +++ b/packages/dart/test/src/utils/parse_live_list_test.dart @@ -276,7 +276,7 @@ void main() { int generatorCallCount = 0; - // Approach 1: async* generator (CURRENT - PROBLEMATIC) + // Approach 1: async* generator (OLD IMPLEMENTATION - PROBLEMATIC) Stream generatorApproach() async* { generatorCallCount++; yield 1; From b125b02fe4f39ffd8837ac3dc382263abeb67c34 Mon Sep 17 00:00:00 2001 From: Kirk Morrow <9563562+kirkmorrow@users.noreply.github.com> Date: Fri, 5 Dec 2025 08:48:07 -0600 Subject: [PATCH 11/16] Update packages/dart/lib/src/utils/parse_live_list.dart Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- packages/dart/lib/src/utils/parse_live_list.dart | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/dart/lib/src/utils/parse_live_list.dart b/packages/dart/lib/src/utils/parse_live_list.dart index 04ef057e3..45a6241f3 100644 --- a/packages/dart/lib/src/utils/parse_live_list.dart +++ b/packages/dart/lib/src/utils/parse_live_list.dart @@ -551,7 +551,7 @@ class ParseLiveList { final element = _list[index]; // If not yet loaded (happens with lazy loading), trigger loading - // This will only happen once per element due to the loaded and _isLoading flags + // This will only happen once per element due to the loaded and isLoading flags if (!element.loaded) { _loadElementAt(index); } From ef2f9af7936278da6474d402ed77475812346053 Mon Sep 17 00:00:00 2001 From: Kirk Morrow <9563562+kirkmorrow@users.noreply.github.com> Date: Fri, 5 Dec 2025 08:48:23 -0600 Subject: [PATCH 12/16] Update packages/dart/lib/src/utils/parse_live_list.dart Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- packages/dart/lib/src/utils/parse_live_list.dart | 2 ++ 1 file changed, 2 insertions(+) diff --git a/packages/dart/lib/src/utils/parse_live_list.dart b/packages/dart/lib/src/utils/parse_live_list.dart index 45a6241f3..803a9831a 100644 --- a/packages/dart/lib/src/utils/parse_live_list.dart +++ b/packages/dart/lib/src/utils/parse_live_list.dart @@ -820,6 +820,8 @@ class ParseLiveListElement { final StreamController _streamController = StreamController.broadcast(); T _object; bool _loaded = false; + /// Indicates whether this element is currently being loaded. + /// Used to prevent concurrent load operations. bool isLoading = false; late Map _updatedSubItems; LiveQuery? _liveQuery; From 39a8ba4ea6e39dda48bce6f0a2eb257833c41083 Mon Sep 17 00:00:00 2001 From: Kirk Morrow <9563562+kirkmorrow@users.noreply.github.com> Date: Fri, 5 Dec 2025 08:48:44 -0600 Subject: [PATCH 13/16] Update packages/dart/test/src/utils/parse_live_list_test.dart Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- packages/dart/test/src/utils/parse_live_list_test.dart | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/dart/test/src/utils/parse_live_list_test.dart b/packages/dart/test/src/utils/parse_live_list_test.dart index 5c3ed3aa6..0eabfff6f 100644 --- a/packages/dart/test/src/utils/parse_live_list_test.dart +++ b/packages/dart/test/src/utils/parse_live_list_test.dart @@ -99,7 +99,7 @@ void main() { const preloadedColumns = []; // Empty! // Logic: fieldsRestricted = lazyLoading && preloadedColumns.isNotEmpty - // fieldsRestricted = true && false = false + // fieldsRestricted evaluates to (true && false) = false // loaded = !fieldsRestricted = !false = true final fieldsRestricted = lazyLoading && preloadedColumns.isNotEmpty; From a9142767c3b2deb140849d4e483bf808812bdcfe Mon Sep 17 00:00:00 2001 From: Kirk Morrow <9563562+kirkmorrow@users.noreply.github.com> Date: Fri, 5 Dec 2025 08:49:32 -0600 Subject: [PATCH 14/16] Update packages/dart/test/src/utils/parse_live_list_test.dart Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- packages/dart/test/src/utils/parse_live_list_test.dart | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/dart/test/src/utils/parse_live_list_test.dart b/packages/dart/test/src/utils/parse_live_list_test.dart index 0eabfff6f..d1fbf84c0 100644 --- a/packages/dart/test/src/utils/parse_live_list_test.dart +++ b/packages/dart/test/src/utils/parse_live_list_test.dart @@ -130,7 +130,7 @@ void main() { const preloadedColumns = ['name', 'order']; // Not empty! // Logic: fieldsRestricted = lazyLoading && preloadedColumns.isNotEmpty - // fieldsRestricted = true && true = true + // fieldsRestricted evaluates to (true && true) = true // loaded = !fieldsRestricted = !true = false final fieldsRestricted = lazyLoading && preloadedColumns.isNotEmpty; From 8f89f91a8767ae98fb7d1ea2f44e1781d0d6d35c Mon Sep 17 00:00:00 2001 From: Kirk Morrow <9563562+kirkmorrow@users.noreply.github.com> Date: Fri, 5 Dec 2025 08:52:01 -0600 Subject: [PATCH 15/16] Update packages/dart/lib/src/utils/parse_live_list.dart Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- packages/dart/lib/src/utils/parse_live_list.dart | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/packages/dart/lib/src/utils/parse_live_list.dart b/packages/dart/lib/src/utils/parse_live_list.dart index 803a9831a..ccfac8790 100644 --- a/packages/dart/lib/src/utils/parse_live_list.dart +++ b/packages/dart/lib/src/utils/parse_live_list.dart @@ -806,8 +806,7 @@ class ParseLiveListElement { this._object, { bool loaded = false, Map? updatedSubItems, - }) : _loaded = loaded, - isLoading = false { + }) : _loaded = loaded { _updatedSubItems = _toSubscriptionMap( updatedSubItems ?? {}, ); From ee5e2dc8fe96b4b0b8f9d7fcaeec36d5bf54c3d2 Mon Sep 17 00:00:00 2001 From: Kirk Morrow <9563562+kirkmorrow@users.noreply.github.com> Date: Fri, 5 Dec 2025 08:58:08 -0600 Subject: [PATCH 16/16] dart format --- packages/dart/lib/src/utils/parse_live_list.dart | 1 + 1 file changed, 1 insertion(+) diff --git a/packages/dart/lib/src/utils/parse_live_list.dart b/packages/dart/lib/src/utils/parse_live_list.dart index ccfac8790..e06ef0ed4 100644 --- a/packages/dart/lib/src/utils/parse_live_list.dart +++ b/packages/dart/lib/src/utils/parse_live_list.dart @@ -819,6 +819,7 @@ class ParseLiveListElement { final StreamController _streamController = StreamController.broadcast(); T _object; bool _loaded = false; + /// Indicates whether this element is currently being loaded. /// Used to prevent concurrent load operations. bool isLoading = false;