diff --git a/packages/dart/lib/src/utils/parse_live_list.dart b/packages/dart/lib/src/utils/parse_live_list.dart index 86ad72ad3..e06ef0ed4 100644 --- a/packages/dart/lib/src/utils/parse_live_list.dart +++ b/packages/dart/lib/src/utils/parse_live_list.dart @@ -9,8 +9,23 @@ class ParseLiveList { List? preloadedColumns, }) : _preloadedColumns = preloadedColumns ?? const [] { _debug = isDebugEnabled(); + _debugLoggedInit = isDebugEnabled(); } + /// Creates a new [ParseLiveList] for the given [query]. + /// + /// [lazyLoading] enables lazy loading of full object data. When `true` and + /// [preloadedColumns] is provided, the initial query fetches only those columns, + /// and full objects are loaded on-demand when accessed via [getAt]. + /// When [preloadedColumns] is empty or null, all fields are fetched regardless + /// of [lazyLoading] value. Default is `true`. + /// + /// [preloadedColumns] specifies which fields to fetch in the initial query when + /// lazy loading is enabled. Order fields are automatically included to ensure + /// proper sorting. If null or empty, all fields are fetched. + /// + /// [listenOnAllSubItems] and [listeningIncludes] control which nested objects + /// receive live query updates. static Future> create( QueryBuilder query, { bool? listenOnAllSubItems, @@ -26,7 +41,7 @@ class ParseLiveList { ) : _toIncludeMap(listeningIncludes ?? []), lazyLoading, - preloadedColumns: preloadedColumns ?? const [], + preloadedColumns: preloadedColumns, ); return parseLiveList._init().then((_) { @@ -45,6 +60,9 @@ class ParseLiveList { late StreamController> _eventStreamController; int _nextID = 0; late bool _debug; + // Separate from _debug to allow one-time initialization logging + // while still logging all errors/warnings when _debug is true + late bool _debugLoggedInit; int get nextID => _nextID++; @@ -134,12 +152,22 @@ class ParseLiveList { Future _runQuery() async { final QueryBuilder query = QueryBuilder.copy(_query); - if (_debug) { - print('ParseLiveList: lazyLoading is ${_lazyLoading ? 'on' : 'off'}'); + + // Log lazy loading mode only once during initialization to avoid log spam + if (_debugLoggedInit) { + print( + 'ParseLiveList: Initialized with lazyLoading=${_lazyLoading ? 'on' : 'off'}, preloadedColumns=${_preloadedColumns.isEmpty ? 'none' : _preloadedColumns.join(", ")}', + ); + _debugLoggedInit = false; } - if (_lazyLoading) { + + // Only restrict fields if lazy loading is enabled AND preloaded columns are specified + // This allows fetching minimal data upfront and loading full objects on-demand + if (_lazyLoading && _preloadedColumns.isNotEmpty) { final List keys = _preloadedColumns.toList(); - if (_lazyLoading && query.limiters.containsKey('order')) { + + // Automatically include order fields to ensure sorting works correctly + if (query.limiters.containsKey('order')) { keys.addAll( query.limiters['order'].toString().split(',').map((String string) { if (string.startsWith('-')) { @@ -149,10 +177,11 @@ class ParseLiveList { }), ); } - if (keys.isNotEmpty) { - query.keysToReturn(keys); - } + + // Deduplicate keys to minimize request size + query.keysToReturn(keys.toSet().toList()); } + return await query.query(); } @@ -161,13 +190,20 @@ class ParseLiveList { final ParseResponse parseResponse = await _runQuery(); if (parseResponse.success) { + // Determine if fields were actually restricted in the query + // Only mark as not loaded if lazy loading AND we actually restricted fields + final bool fieldsRestricted = + _lazyLoading && _preloadedColumns.isNotEmpty; + _list = parseResponse.results ?.map>( (dynamic element) => ParseLiveListElement( element, updatedSubItems: _listeningIncludes, - loaded: !_lazyLoading, + // Mark as loaded if we fetched all fields (no restriction) + // Mark as not loaded only if fields were actually restricted + loaded: !fieldsRestricted, ), ) .toList() ?? @@ -223,6 +259,11 @@ class ParseLiveList { final List newList = parseResponse.results as List? ?? []; + // Determine if fields were actually restricted in the query, + // same logic as in _init(). + final bool fieldsRestricted = + _lazyLoading && _preloadedColumns.isNotEmpty; + //update List for (int i = 0; i < _list.length; i++) { final ParseObject currentObject = _list[i].object; @@ -265,7 +306,11 @@ class ParseLiveList { } for (int i = 0; i < newList.length; i++) { - tasks.add(_objectAdded(newList[i], loaded: false)); + // Mark as loaded when all fields were fetched; only treat as + // not loaded when fields are actually restricted. + tasks.add( + _objectAdded(newList[i], loaded: !fieldsRestricted), + ); } } await Future.wait(tasks); @@ -486,34 +531,147 @@ class ParseLiveList { } } - Stream getAt(final int index) async* { - if (index < _list.length) { - if (!_list[index].loaded) { - final QueryBuilder queryBuilder = QueryBuilder.copy(_query) - ..whereEqualTo( - keyVarObjectId, - _list[index].object.get(keyVarObjectId), - ) - ..setLimit(1); - final ParseResponse response = await queryBuilder.query(); - if (_list.isEmpty) { - yield* _createStreamError( - ParseError(message: 'ParseLiveList: _list is empty'), - ); + /// Returns a stream for the element at the given [index]. + /// + /// Returns the element's existing broadcast stream, which allows multiple + /// listeners without creating redundant network requests or stream instances. + /// + /// When lazy loading is enabled and an element is not yet loaded, the first + /// access will trigger loading. This is useful for pagination scenarios. + /// Subsequent calls return the same stream without additional loads. + /// + /// The returned stream is a broadcast stream from ParseLiveListElement, + /// preventing the N+1 query bug that occurred with async* generators. + Stream getAt(final int index) { + if (index < 0 || index >= _list.length) { + // Return an empty stream for out-of-bounds indices + return const Stream.empty(); + } + + final element = _list[index]; + + // If not yet loaded (happens with lazy loading), trigger loading + // This will only happen once per element due to the loaded and isLoading flags + if (!element.loaded) { + _loadElementAt(index); + } + + // Return the element's broadcast stream + // Multiple subscriptions to this stream won't trigger multiple loads + return element.stream; + } + + /// Asynchronously loads the full data for the element at [index]. + /// + /// Called when an element is accessed for the first time. + /// Errors are emitted to the element's stream so listeners can handle them. + Future _loadElementAt(int index) async { + if (index < 0 || index >= _list.length) { + return; + } + + final element = _list[index]; + + // Race condition protection: skip if element is already loaded or + // currently being loaded by another concurrent call + if (element.loaded || element.isLoading) { + return; + } + + // Set loading flag to prevent concurrent load operations + element.isLoading = true; + + try { + final QueryBuilder queryBuilder = QueryBuilder.copy(_query) + ..whereEqualTo( + keyVarObjectId, + element.object.get(keyVarObjectId), + ) + ..setLimit(1); + + final ParseResponse response = await queryBuilder.query(); + + // Check if list was modified during async operation + if (_list.isEmpty || index >= _list.length) { + if (_debug) { + print('ParseLiveList: List was modified during element load'); + } + return; + } + + if (response.success && + response.results != null && + response.results!.isNotEmpty) { + // Verify we're still updating the same object (list may have been modified) + final currentElement = _list[index]; + if (currentElement.object.objectId != element.object.objectId) { + if (_debug) { + print('ParseLiveList: Element at index $index changed during load'); + } return; } - if (response.success) { - _list[index].object = response.results?.first; - } else { - ParseError? error = response.error; - if (error != null) yield* _createStreamError(error); + // Setting the object will mark it as loaded and emit it to the stream + _list[index].object = response.results!.first; + } else if (response.error != null) { + // Emit error to the element's stream so listeners can handle it. + // Guard against list mutations so we don't emit on the wrong element. + final currentElement = _list[index]; + if (currentElement.object.objectId != element.object.objectId) { + if (_debug) { + print( + 'ParseLiveList: Element at index $index changed during load (error)', + ); + } return; } + currentElement.emitError(response.error!, StackTrace.current); + if (_debug) { + print( + 'ParseLiveList: Error loading element at index $index: ${response.error}', + ); + } + } else { + // Object not found (possibly deleted between initial query and load). + // Note: Element remains loaded=false, so subsequent getAt() calls will + // retry the query. This is acceptable because: + // 1. LiveQuery will send a delete event to remove the element if needed + // 2. Retries are rare (object would need to be deleted mid-load) + // 3. No error is emitted to avoid alarming users for transient issues + if (_debug) { + print('ParseLiveList: Element at index $index not found during load'); + } + } + } catch (e, stackTrace) { + // List may have changed while the query was in flight + if (_list.isEmpty || index >= _list.length) { + if (_debug) { + print( + 'ParseLiveList: List was modified during element load (exception)', + ); + } + return; + } + + final currentElement = _list[index]; + if (currentElement.object.objectId != element.object.objectId) { + if (_debug) { + print( + 'ParseLiveList: Element at index $index changed during load (exception)', + ); + } + return; + } + + // Emit exception to the element's stream + currentElement.emitError(e, stackTrace); + if (_debug) { + print( + 'ParseLiveList: Exception loading element at index $index: $e\n$stackTrace', + ); } - // just for testing - // await Future.delayed(const Duration(seconds: 2)); - yield _list[index].object; - yield* _list[index].stream; + } finally { + // Clear loading flag to allow future retry attempts + element.isLoading = false; } } @@ -579,18 +737,16 @@ class ParseLiveElement extends ParseLiveListElement { if (includeObject != null) { queryBuilder.includeObject(includeObject); } - _init(object, loaded: loaded, includeObject: includeObject); + // Fire-and-forget initialization; errors surface through element stream + // ignore: unawaited_futures + _init(object, loaded: loaded); } Subscription? _subscription; Map? _includes; late QueryBuilder queryBuilder; - Future _init( - T object, { - bool loaded = false, - List? includeObject, - }) async { + Future _init(T object, {bool loaded = false}) async { if (!loaded) { final ParseResponse parseResponse = await queryBuilder.query(); if (parseResponse.success) { @@ -663,6 +819,10 @@ class ParseLiveListElement { final StreamController _streamController = StreamController.broadcast(); T _object; bool _loaded = false; + + /// Indicates whether this element is currently being loaded. + /// Used to prevent concurrent load operations. + bool isLoading = false; late Map _updatedSubItems; LiveQuery? _liveQuery; final Future _subscriptionQueue = Future.value(); @@ -791,6 +951,14 @@ class ParseLiveListElement { bool get loaded => _loaded; + /// Emits an error to the stream for listeners to handle. + /// Used when lazy loading fails to fetch the full object data. + void emitError(Object error, StackTrace stackTrace) { + if (!_streamController.isClosed) { + _streamController.addError(error, stackTrace); + } + } + void dispose() { _unsubscribe(_updatedSubItems); _streamController.close(); diff --git a/packages/dart/lib/src/utils/parse_utils.dart b/packages/dart/lib/src/utils/parse_utils.dart index 32e018f42..aa2b100f9 100644 --- a/packages/dart/lib/src/utils/parse_utils.dart +++ b/packages/dart/lib/src/utils/parse_utils.dart @@ -123,10 +123,6 @@ Future batchRequest( } } -Stream _createStreamError(Object error) async* { - throw error; -} - List removeDuplicateParseObjectByObjectId(Iterable iterable) { final list = iterable.toList(); diff --git a/packages/dart/test/src/utils/parse_live_list_test.dart b/packages/dart/test/src/utils/parse_live_list_test.dart new file mode 100644 index 000000000..d1fbf84c0 --- /dev/null +++ b/packages/dart/test/src/utils/parse_live_list_test.dart @@ -0,0 +1,347 @@ +import 'dart:async'; + +import 'package:parse_server_sdk/parse_server_sdk.dart'; +import 'package:test/test.dart'; + +import '../../test_utils.dart'; + +// NOTE: ParseLiveList Stream Architecture Documentation +// ====================================================== +// +// STREAM IMPLEMENTATION: +// --------------------- +// ParseLiveList.getAt() returns a broadcast stream for each element: +// +// Stream getAt(final int index) { +// if (index >= _list.length) { +// return const Stream.empty(); +// } +// final element = _list[index]; +// if (!element.loaded) { +// _loadElementAt(index); // Loads data on first access +// } +// return element.stream; // Returns broadcast stream +// } +// +// BROADCAST STREAM BENEFITS: +// ------------------------- +// ParseLiveListElement uses StreamController.broadcast(), which: +// - Allows multiple listeners on the same stream without errors +// - Shares the stream instance across all subscribers +// - Calls _loadElementAt() only once per element, regardless of subscription count +// - Prevents N+1 query problems where multiple subscriptions trigger multiple network requests +// +// IMPLEMENTATION REQUIREMENTS: +// --------------------------- +// The implementation must maintain these characteristics: +// 1. getAt() returns element.stream directly (NOT an async* generator) +// 2. ParseLiveListElement._streamController uses StreamController.broadcast() +// 3. Multiple calls to getAt(index) return the same underlying broadcast stream +// 4. Element loading occurs at most once per element +// +// TESTING LIMITATIONS: +// ------------------- +// Unit tests cannot directly verify this architecture because: +// 1. Stream identity cannot be tested (stream getters create wrapper instances) +// 2. Async* generators vs regular functions cannot be distinguished from outside +// 3. Query execution counts require integration testing with network layer monitoring +// +// Therefore, these tests verify supporting implementation details and behaviors, +// but code review is required to ensure the core architecture is maintained. +// +// INTEGRATION TESTING RECOMMENDATIONS: +// ------------------------------------ +// To fully verify the N+1 query fix, integration tests should: +// 1. Monitor actual network requests to the Parse server +// 2. Subscribe to the same element multiple times +// 3. Verify only one query is executed regardless of subscription count + +void main() { + setUpAll(() async { + await initializeParse(); + }); + + group('ParseLiveList - Implementation Details', () { + test('lazyLoading=false marks elements as loaded immediately', () async { + // When lazy loading is disabled, all object fields are fetched in the + // initial query, so elements are marked as loaded=true immediately. + // This prevents unnecessary _loadElementAt() calls since all data + // is already available. + + const lazyLoading = false; // Fetch all fields upfront + + // Implementation behavior with lazyLoading=false: + // - Initial query fetches all object fields + // - Elements are marked loaded=true + // - getAt() returns streams without triggering additional loads + + final element = ParseLiveListElement( + ParseObject('TestClass')..objectId = 'test1', + loaded: !lazyLoading, // Should be true when lazyLoading=false + updatedSubItems: {}, + ); + + expect( + element.loaded, + true, + reason: 'Elements should be marked loaded when lazyLoading=false', + ); + }); + + test( + 'lazyLoading=true with empty preloadedColumns fetches all fields', + () async { + // When lazyLoading is enabled but preloadedColumns is empty or null, + // field restriction is not applied and all object fields are fetched + // in the initial query, resulting in elements marked as loaded=true. + + const lazyLoading = true; + const preloadedColumns = []; // Empty! + + // Logic: fieldsRestricted = lazyLoading && preloadedColumns.isNotEmpty + // fieldsRestricted evaluates to (true && false) = false + // loaded = !fieldsRestricted = !false = true + final fieldsRestricted = lazyLoading && preloadedColumns.isNotEmpty; + + final element = ParseLiveListElement( + ParseObject('TestClass')..objectId = 'test1', + loaded: !fieldsRestricted, // Should be true (no fields restricted) + updatedSubItems: {}, + ); + + expect( + element.loaded, + true, + reason: + 'Elements should be marked loaded when lazyLoading=true but preloadedColumns is empty', + ); + }, + ); + + test( + 'lazyLoading=true with preloadedColumns restricts initial query', + () async { + // When lazy loading is enabled with specified preloadedColumns, + // the initial query fetches only those fields, and elements are + // marked as loaded=false. Full object data is loaded on-demand + // when getAt() is called. + + const lazyLoading = true; + const preloadedColumns = ['name', 'order']; // Not empty! + + // Logic: fieldsRestricted = lazyLoading && preloadedColumns.isNotEmpty + // fieldsRestricted evaluates to (true && true) = true + // loaded = !fieldsRestricted = !true = false + final fieldsRestricted = lazyLoading && preloadedColumns.isNotEmpty; + + final element = ParseLiveListElement( + ParseObject('TestClass')..objectId = 'test1', + loaded: !fieldsRestricted, // Should be false (fields were restricted) + updatedSubItems: {}, + ); + + expect( + element.loaded, + false, + reason: + 'Elements should be marked not loaded when lazyLoading=true WITH preloadedColumns', + ); + }, + ); + + test('lazyLoading=false should NOT restrict fields automatically', () { + // Verifies baseline: a fresh QueryBuilder has no 'keys' restriction. + // The actual _runQuery() behavior with lazyLoading=false is tested + // indirectly through the loaded flag tests above. + final query = QueryBuilder(ParseObject('Room')) + ..orderByAscending('order'); + + final queryCopy = QueryBuilder.copy(query); + + expect( + queryCopy.limiters.containsKey('keys'), + false, + reason: + 'ParseLiveList should not restrict fields when lazyLoading=false', + ); + }); + + test('lazyLoading=true with preloadedColumns should restrict fields', () { + // Verifies that keysToReturn() sets the 'keys' limiter as expected. + // Note: This simulates _runQuery() behavior; actual integration testing + // would require mocking the network layer. + final query = QueryBuilder(ParseObject('Room')) + ..orderByAscending('order') + ..keysToReturn(['name', 'order']); // Simulating what _runQuery does + + final queryCopy = QueryBuilder.copy(query); + + expect( + queryCopy.limiters.containsKey('keys'), + true, + reason: + 'ParseLiveList should restrict fields when lazyLoading=true with preloadedColumns', + ); + }); + }); + + group('ParseLiveList - Stream Creation Bug', () { + test( + 'async* generators create new streams on each call (educational context)', + () async { + // This test demonstrates async* generator behavior that contributed to the bug. + // It's educational context, not a test of the actual ParseLiveList bug. + // The real bug required integration testing with network request monitoring. + + // We can't easily test the full ParseLiveList without a real server, but we can + // demonstrate the stream behavior by examining the method signature and behavior. + + // The bug is in this pattern (from parse_live_list.dart line 489): + // Stream getAt(final int index) async* { ... } + // + // This is an async generator function. Each call creates a NEW Stream instance. + + // Here's a simplified demonstration of the problem: + final streams = >[]; + + Stream createStream() async* { + yield 1; + yield 2; + } + + // Each call creates a different stream instance + streams.add(createStream()); + streams.add(createStream()); + streams.add(createStream()); + + // Verify they are different instances + expect( + identical(streams[0], streams[1]), + false, + reason: 'async* generator creates new stream on each call', + ); + expect( + identical(streams[1], streams[2]), + false, + reason: 'async* generator creates new stream on each call', + ); + }, + ); + + test( + 'broadcast streams can have multiple listeners (solution approach)', + () async { + // This demonstrates the solution: using a broadcast StreamController + // that can be subscribed to multiple times + + final controller = StreamController.broadcast(); + + final values1 = []; + final values2 = []; + final values3 = []; + + // Multiple subscriptions to the SAME stream + final sub1 = controller.stream.listen(values1.add); + final sub2 = controller.stream.listen(values2.add); + final sub3 = controller.stream.listen(values3.add); + + // Add values + controller.add(1); + controller.add(2); + + await Future.delayed(const Duration(milliseconds: 50)); + + // All listeners receive the same values + expect(values1, [1, 2]); + expect(values2, [1, 2]); + expect(values3, [1, 2]); + + // The key is that the broadcast stream can be listened to multiple times + // (unlike async* generators which create new streams each time) + expect( + controller.stream.isBroadcast, + true, + reason: 'Broadcast stream allows multiple listeners', + ); + + await sub1.cancel(); + await sub2.cancel(); + await sub3.cancel(); + await controller.close(); + }, + ); + + test('async* generator vs broadcast stream behavior difference', () async { + // This test clearly shows the difference between the two approaches + + int generatorCallCount = 0; + + // Approach 1: async* generator (OLD IMPLEMENTATION - PROBLEMATIC) + Stream generatorApproach() async* { + generatorCallCount++; + yield 1; + } + + // Each call creates new stream and executes the function + final genStream1 = generatorApproach(); + final genStream2 = generatorApproach(); + final genStream3 = generatorApproach(); + + expect( + generatorCallCount, + 0, + reason: 'Generator not executed until subscribed', + ); + + await genStream1.first; + expect(generatorCallCount, 1); + + await genStream2.first; + expect( + generatorCallCount, + 2, + reason: 'Each stream subscription triggers generator', + ); + + await genStream3.first; + expect( + generatorCallCount, + 3, + reason: 'Third subscription triggers third execution', + ); + + // Approach 2: broadcast stream (SOLUTION) + int broadcastInitCount = 0; + + final broadcastController = StreamController.broadcast(); + + // Initialization happens once + void initBroadcast() { + broadcastInitCount++; + broadcastController.add(1); + } + + initBroadcast(); + expect(broadcastInitCount, 1); + + // Multiple subscriptions to same stream - no re-initialization + final sub1 = broadcastController.stream.listen((_) {}); + expect(broadcastInitCount, 1, reason: 'No additional initialization'); + + final sub2 = broadcastController.stream.listen((_) {}); + expect( + broadcastInitCount, + 1, + reason: 'Still no additional initialization', + ); + + final sub3 = broadcastController.stream.listen((_) {}); + expect(broadcastInitCount, 1, reason: 'Stream reused, not recreated'); + + await sub1.cancel(); + await sub2.cancel(); + await sub3.cancel(); + await broadcastController.close(); + }); + }); +}