Skip to content

Commit

Permalink
Merge pull request #1240 from apackin/setForPollingQueries
Browse files Browse the repository at this point in the history
fix(graphql): deduplicate pollers
  • Loading branch information
vincenzopalazzo committed Oct 20, 2022
2 parents ba2cd33 + 3cc0942 commit a2d068b
Show file tree
Hide file tree
Showing 3 changed files with 164 additions and 3 deletions.
2 changes: 1 addition & 1 deletion packages/graphql/lib/src/cache/cache.dart
Expand Up @@ -138,7 +138,7 @@ class GraphQLCache extends NormalizingDataProxy {
/// Write normalized data into the cache,
/// deeply merging maps with existing values
///
/// Called from [witeQuery] and [writeFragment].
/// Called from [writeQuery] and [writeFragment].
void writeNormalized(String dataId, Map<String, dynamic>? value) {
if (value is Map<String, Object>) {
final existing = store.get(dataId);
Expand Down
4 changes: 2 additions & 2 deletions packages/graphql/lib/src/scheduler/scheduler.dart
Expand Up @@ -18,7 +18,7 @@ class QueryScheduler {

/// Map going from poling interval to the query ids that fire on that interval.
/// These query ids are associated with a [ObservableQuery] in the registeredQueries.
Map<Duration?, List<String>> intervalQueries = <Duration?, List<String>>{};
Map<Duration?, Set<String>> intervalQueries = <Duration?, Set<String>>{};

/// Map going from polling interval durations to polling timers.
final Map<Duration?, Timer> _pollingTimers = <Duration?, Timer>{};
Expand Down Expand Up @@ -75,7 +75,7 @@ class QueryScheduler {
if (intervalQueries.containsKey(interval)) {
intervalQueries[interval]!.add(queryId);
} else {
intervalQueries[interval] = <String>[queryId];
intervalQueries[interval] = Set<String>.of([queryId]);

_pollingTimers[interval] = Timer.periodic(
interval!,
Expand Down
161 changes: 161 additions & 0 deletions packages/graphql/test/observable_query_test.dart
@@ -0,0 +1,161 @@
import 'package:test/test.dart';
import 'package:mockito/mockito.dart';

import 'package:graphql/client.dart';
import 'package:gql/language.dart';

import './helpers.dart';

void main() {
const String readSingle = r'''
query ReadSingle() {
single() {
id,
__typename,
name
}
}
''';
const data = {
'single': {
'id': '1',
'__typename': 'Single',
'name': 'initialQueryName',
},
};
const pollDuration = Duration(milliseconds: 20);

final queryResponseMatcher = isA<QueryResult>().having(
(result) => result.data!['single']['name'],
'name',
'initialQueryName',
);

late MockLink link;
late GraphQLClient client;

group('observable query ', () {
setUp(() {
link = MockLink();

client = GraphQLClient(
cache: getTestCache(),
link: link,
);

final queryResponse = Response(data: data, response: {});
when(
link.request(any),
).thenAnswer(
(_) => Stream.fromIterable(
[queryResponse],
),
);
});
test('can start poller', () async {
final observable = client.watchQuery(
WatchQueryOptions(
document: parseString(readSingle),
),
);
expect(
observable.stream,
emitsInOrder(
[queryResponseMatcher, queryResponseMatcher, emitsDone],
),
);
observable.startPolling(pollDuration);
await Future<void>.delayed(pollDuration * 2.1);
observable.close();
}, timeout: Timeout(Duration(seconds: 1)));
test('can stop poller', () async {
final observable = client.watchQuery(
WatchQueryOptions(
document: parseString(readSingle),
),
);
expect(
observable.stream,
neverEmits(
isA<QueryResult>().having(
(result) => result.data!['single']['name'],
'name',
'initialQueryName',
),
),
);
observable.startPolling(pollDuration);
observable.stopPolling();
await Future<void>.delayed(pollDuration * 2.1);
observable.close();
}, timeout: Timeout(Duration(seconds: 1)));
test('can deduplicate startPolling calls with the same duration', () async {
final observable = client.watchQuery(
WatchQueryOptions(
document: parseString(readSingle),
),
);
expect(
observable.stream,
emitsInOrder(
[queryResponseMatcher, queryResponseMatcher, emitsDone],
),
);
observable.startPolling(pollDuration);
observable.startPolling(pollDuration);
observable.startPolling(pollDuration);
await Future<void>.delayed(pollDuration * 2.1);
observable.close();
}, timeout: Timeout(Duration(seconds: 1)));
test('can deduplicate startPolling calls with different durations',
() async {
final observable = client.watchQuery(
WatchQueryOptions(
document: parseString(readSingle),
),
);
expect(
observable.stream,
emitsInOrder(
[
queryResponseMatcher,
queryResponseMatcher,
queryResponseMatcher,
emitsDone
],
),
);
observable.startPolling(Duration(milliseconds: 10));
observable.startPolling(Duration(milliseconds: 20));
observable.startPolling(Duration(milliseconds: 30));
observable.startPolling(pollDuration);
await Future<void>.delayed(pollDuration * 3.1);
observable.close();
}, timeout: Timeout(Duration(seconds: 1)));
test('can stop pollers in quick succession', () async {
final observable = client.watchQuery(
WatchQueryOptions(
document: parseString(readSingle),
),
);
expect(
observable.stream,
emitsInOrder(
[
queryResponseMatcher,
queryResponseMatcher,
queryResponseMatcher,
emitsDone
],
),
);
observable.startPolling(pollDuration);
observable.stopPolling();
observable.startPolling(pollDuration);
observable.stopPolling();
observable.startPolling(pollDuration);
await Future<void>.delayed(pollDuration * 3.1);
observable.close();
}, timeout: Timeout(Duration(seconds: 1)));
});
}

0 comments on commit a2d068b

Please sign in to comment.