Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
246 changes: 207 additions & 39 deletions packages/dart/lib/src/utils/parse_live_list.dart
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,23 @@ class ParseLiveList<T extends ParseObject> {
List<String>? preloadedColumns,
}) : _preloadedColumns = preloadedColumns ?? const <String>[] {
_debug = isDebugEnabled();
_debugLoggedInit = isDebugEnabled();
}

/// Creates a new [ParseLiveList] for the given [query].
///
/// [lazyLoading] enables lazy loading of full object data. When `true` and
/// [preloadedColumns] is provided, the initial query fetches only those columns,
/// and full objects are loaded on-demand when accessed via [getAt].
/// When [preloadedColumns] is empty or null, all fields are fetched regardless
/// of [lazyLoading] value. Default is `true`.
///
/// [preloadedColumns] specifies which fields to fetch in the initial query when
/// lazy loading is enabled. Order fields are automatically included to ensure
/// proper sorting. If null or empty, all fields are fetched.
///
/// [listenOnAllSubItems] and [listeningIncludes] control which nested objects
/// receive live query updates.
static Future<ParseLiveList<T>> create<T extends ParseObject>(
QueryBuilder<T> query, {
bool? listenOnAllSubItems,
Expand All @@ -26,7 +41,7 @@ class ParseLiveList<T extends ParseObject> {
)
: _toIncludeMap(listeningIncludes ?? <String>[]),
lazyLoading,
preloadedColumns: preloadedColumns ?? const <String>[],
preloadedColumns: preloadedColumns,
);

return parseLiveList._init().then((_) {
Expand All @@ -45,6 +60,9 @@ class ParseLiveList<T extends ParseObject> {
late StreamController<ParseLiveListEvent<T>> _eventStreamController;
int _nextID = 0;
late bool _debug;
// Separate from _debug to allow one-time initialization logging
// while still logging all errors/warnings when _debug is true
late bool _debugLoggedInit;

int get nextID => _nextID++;

Expand Down Expand Up @@ -134,12 +152,22 @@ class ParseLiveList<T extends ParseObject> {

Future<ParseResponse> _runQuery() async {
final QueryBuilder<T> query = QueryBuilder<T>.copy(_query);
if (_debug) {
print('ParseLiveList: lazyLoading is ${_lazyLoading ? 'on' : 'off'}');

// Log lazy loading mode only once during initialization to avoid log spam
if (_debugLoggedInit) {
print(
'ParseLiveList: Initialized with lazyLoading=${_lazyLoading ? 'on' : 'off'}, preloadedColumns=${_preloadedColumns.isEmpty ? 'none' : _preloadedColumns.join(", ")}',
);
_debugLoggedInit = false;
}
if (_lazyLoading) {

// Only restrict fields if lazy loading is enabled AND preloaded columns are specified
// This allows fetching minimal data upfront and loading full objects on-demand
if (_lazyLoading && _preloadedColumns.isNotEmpty) {
final List<String> keys = _preloadedColumns.toList();
if (_lazyLoading && query.limiters.containsKey('order')) {

// Automatically include order fields to ensure sorting works correctly
if (query.limiters.containsKey('order')) {
keys.addAll(
query.limiters['order'].toString().split(',').map((String string) {
if (string.startsWith('-')) {
Expand All @@ -149,10 +177,11 @@ class ParseLiveList<T extends ParseObject> {
}),
);
}
if (keys.isNotEmpty) {
query.keysToReturn(keys);
}

// Deduplicate keys to minimize request size
query.keysToReturn(keys.toSet().toList());
}

return await query.query<T>();
}

Expand All @@ -161,13 +190,20 @@ class ParseLiveList<T extends ParseObject> {

final ParseResponse parseResponse = await _runQuery();
if (parseResponse.success) {
// Determine if fields were actually restricted in the query
// Only mark as not loaded if lazy loading AND we actually restricted fields
final bool fieldsRestricted =
_lazyLoading && _preloadedColumns.isNotEmpty;

_list =
parseResponse.results
?.map<ParseLiveListElement<T>>(
(dynamic element) => ParseLiveListElement<T>(
element,
updatedSubItems: _listeningIncludes,
loaded: !_lazyLoading,
// Mark as loaded if we fetched all fields (no restriction)
// Mark as not loaded only if fields were actually restricted
loaded: !fieldsRestricted,
),
)
.toList() ??
Expand Down Expand Up @@ -223,6 +259,11 @@ class ParseLiveList<T extends ParseObject> {
final List<T> newList =
parseResponse.results as List<T>? ?? <T>[];

// Determine if fields were actually restricted in the query,
// same logic as in _init().
final bool fieldsRestricted =
_lazyLoading && _preloadedColumns.isNotEmpty;

//update List
for (int i = 0; i < _list.length; i++) {
final ParseObject currentObject = _list[i].object;
Expand Down Expand Up @@ -265,7 +306,11 @@ class ParseLiveList<T extends ParseObject> {
}

for (int i = 0; i < newList.length; i++) {
tasks.add(_objectAdded(newList[i], loaded: false));
// Mark as loaded when all fields were fetched; only treat as
// not loaded when fields are actually restricted.
tasks.add(
_objectAdded(newList[i], loaded: !fieldsRestricted),
);
}
}
await Future.wait(tasks);
Expand Down Expand Up @@ -486,34 +531,147 @@ class ParseLiveList<T extends ParseObject> {
}
}

Stream<T> getAt(final int index) async* {
if (index < _list.length) {
if (!_list[index].loaded) {
final QueryBuilder<T> queryBuilder = QueryBuilder<T>.copy(_query)
..whereEqualTo(
keyVarObjectId,
_list[index].object.get<String>(keyVarObjectId),
)
..setLimit(1);
final ParseResponse response = await queryBuilder.query<T>();
if (_list.isEmpty) {
yield* _createStreamError<T>(
ParseError(message: 'ParseLiveList: _list is empty'),
);
/// Returns a stream for the element at the given [index].
///
/// Returns the element's existing broadcast stream, which allows multiple
/// listeners without creating redundant network requests or stream instances.
///
/// When lazy loading is enabled and an element is not yet loaded, the first
/// access will trigger loading. This is useful for pagination scenarios.
/// Subsequent calls return the same stream without additional loads.
///
/// The returned stream is a broadcast stream from ParseLiveListElement,
/// preventing the N+1 query bug that occurred with async* generators.
Stream<T> getAt(final int index) {
if (index < 0 || index >= _list.length) {
// Return an empty stream for out-of-bounds indices
return const Stream.empty();
}

final element = _list[index];

// If not yet loaded (happens with lazy loading), trigger loading
// This will only happen once per element due to the loaded and isLoading flags
if (!element.loaded) {
_loadElementAt(index);
}

// Return the element's broadcast stream
// Multiple subscriptions to this stream won't trigger multiple loads
return element.stream;
}

/// Asynchronously loads the full data for the element at [index].
///
/// Called when an element is accessed for the first time.
/// Errors are emitted to the element's stream so listeners can handle them.
Future<void> _loadElementAt(int index) async {
if (index < 0 || index >= _list.length) {
return;
}

final element = _list[index];

// Race condition protection: skip if element is already loaded or
// currently being loaded by another concurrent call
if (element.loaded || element.isLoading) {
return;
}

// Set loading flag to prevent concurrent load operations
element.isLoading = true;

try {
final QueryBuilder<T> queryBuilder = QueryBuilder<T>.copy(_query)
..whereEqualTo(
keyVarObjectId,
element.object.get<String>(keyVarObjectId),
)
..setLimit(1);

final ParseResponse response = await queryBuilder.query<T>();

// Check if list was modified during async operation
if (_list.isEmpty || index >= _list.length) {
if (_debug) {
print('ParseLiveList: List was modified during element load');
}
return;
}

if (response.success &&
response.results != null &&
response.results!.isNotEmpty) {
// Verify we're still updating the same object (list may have been modified)
final currentElement = _list[index];
if (currentElement.object.objectId != element.object.objectId) {
if (_debug) {
print('ParseLiveList: Element at index $index changed during load');
}
return;
}
if (response.success) {
_list[index].object = response.results?.first;
} else {
ParseError? error = response.error;
if (error != null) yield* _createStreamError<T>(error);
// Setting the object will mark it as loaded and emit it to the stream
_list[index].object = response.results!.first;
} else if (response.error != null) {
// Emit error to the element's stream so listeners can handle it.
// Guard against list mutations so we don't emit on the wrong element.
final currentElement = _list[index];
if (currentElement.object.objectId != element.object.objectId) {
if (_debug) {
print(
'ParseLiveList: Element at index $index changed during load (error)',
);
}
return;
}
currentElement.emitError(response.error!, StackTrace.current);
if (_debug) {
print(
'ParseLiveList: Error loading element at index $index: ${response.error}',
);
}
} else {
// Object not found (possibly deleted between initial query and load).
// Note: Element remains loaded=false, so subsequent getAt() calls will
// retry the query. This is acceptable because:
// 1. LiveQuery will send a delete event to remove the element if needed
// 2. Retries are rare (object would need to be deleted mid-load)
// 3. No error is emitted to avoid alarming users for transient issues
if (_debug) {
print('ParseLiveList: Element at index $index not found during load');
}
}
} catch (e, stackTrace) {
// List may have changed while the query was in flight
if (_list.isEmpty || index >= _list.length) {
if (_debug) {
print(
'ParseLiveList: List was modified during element load (exception)',
);
}
return;
}

final currentElement = _list[index];
if (currentElement.object.objectId != element.object.objectId) {
if (_debug) {
print(
'ParseLiveList: Element at index $index changed during load (exception)',
);
}
return;
}

// Emit exception to the element's stream
currentElement.emitError(e, stackTrace);
if (_debug) {
print(
'ParseLiveList: Exception loading element at index $index: $e\n$stackTrace',
);
}
// just for testing
// await Future<void>.delayed(const Duration(seconds: 2));
yield _list[index].object;
yield* _list[index].stream;
} finally {
// Clear loading flag to allow future retry attempts
element.isLoading = false;
}
}

Expand Down Expand Up @@ -579,18 +737,16 @@ class ParseLiveElement<T extends ParseObject> extends ParseLiveListElement<T> {
if (includeObject != null) {
queryBuilder.includeObject(includeObject);
}
_init(object, loaded: loaded, includeObject: includeObject);
// Fire-and-forget initialization; errors surface through element stream
// ignore: unawaited_futures
_init(object, loaded: loaded);
}

Subscription<T>? _subscription;
Map<String, dynamic>? _includes;
late QueryBuilder<T> queryBuilder;

Future<void> _init(
T object, {
bool loaded = false,
List<String>? includeObject,
}) async {
Future<void> _init(T object, {bool loaded = false}) async {
if (!loaded) {
final ParseResponse parseResponse = await queryBuilder.query();
if (parseResponse.success) {
Expand Down Expand Up @@ -663,6 +819,10 @@ class ParseLiveListElement<T extends ParseObject> {
final StreamController<T> _streamController = StreamController<T>.broadcast();
T _object;
bool _loaded = false;

/// Indicates whether this element is currently being loaded.
/// Used to prevent concurrent load operations.
bool isLoading = false;
late Map<PathKey, dynamic> _updatedSubItems;
LiveQuery? _liveQuery;
final Future<void> _subscriptionQueue = Future<void>.value();
Expand Down Expand Up @@ -791,6 +951,14 @@ class ParseLiveListElement<T extends ParseObject> {

bool get loaded => _loaded;

/// Emits an error to the stream for listeners to handle.
/// Used when lazy loading fails to fetch the full object data.
void emitError(Object error, StackTrace stackTrace) {
if (!_streamController.isClosed) {
_streamController.addError(error, stackTrace);
}
}

void dispose() {
_unsubscribe(_updatedSubItems);
_streamController.close();
Expand Down
4 changes: 0 additions & 4 deletions packages/dart/lib/src/utils/parse_utils.dart
Original file line number Diff line number Diff line change
Expand Up @@ -123,10 +123,6 @@ Future<ParseResponse> batchRequest(
}
}

Stream<T> _createStreamError<T>(Object error) async* {
throw error;
}

List removeDuplicateParseObjectByObjectId(Iterable iterable) {
final list = iterable.toList();

Expand Down
Loading