Skip to content

Commit

Permalink
fix(client): always only pull broadcast stream once
Browse files Browse the repository at this point in the history
  • Loading branch information
micimize committed Mar 6, 2021
1 parent 5ee18eb commit 1b6a9e6
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 31 deletions.
2 changes: 1 addition & 1 deletion .circleci/config.yml
Expand Up @@ -184,7 +184,7 @@ workflows:
nightly:
triggers:
- schedule:
cron: "3 * * * *"
cron: "41 * * * *"
filters:
branches:
only:
Expand Down
45 changes: 22 additions & 23 deletions packages/graphql/lib/src/links/websocket_link/websocket_client.dart
Expand Up @@ -167,6 +167,9 @@ class SocketClient {
@visibleForTesting
WebSocketChannel socketChannel;

@visibleForTesting
Stream<dynamic> socketStream;

@visibleForTesting
void Function(GraphQLSocketMessage) onMessage;

Expand All @@ -184,19 +187,19 @@ class SocketClient {
Response Function(Map<String, dynamic>) get parse =>
config.parser.parseResponse;

void disconnectOnKeepAliveTimeout(Stream<GraphQLSocketMessage> messages) =>
_keepAliveSubscription =
messages.whereType<ConnectionKeepAlive>().timeout(
config.inactivityTimeout,
onTimeout: (EventSink<ConnectionKeepAlive> event) {
print(
"Haven't received keep alive message for ${config.inactivityTimeout.inSeconds} seconds. Disconnecting..",
);
event.close();
socketChannel.sink.close(ws_status.goingAway);
_connectionStateController.add(SocketConnectionState.notConnected);
},
).listen(null);
void _disconnectOnKeepAliveTimeout(Stream<GraphQLSocketMessage> messages) {
_keepAliveSubscription = messages.whereType<ConnectionKeepAlive>().timeout(
config.inactivityTimeout,
onTimeout: (EventSink<ConnectionKeepAlive> event) {
print(
"Haven't received keep alive message for ${config.inactivityTimeout.inSeconds} seconds. Disconnecting..",
);
event.close();
socketChannel.sink.close(ws_status.goingAway);
_connectionStateController.add(SocketConnectionState.notConnected);
},
).listen(null);
}

/// Connects to the server.
///
Expand All @@ -219,10 +222,13 @@ class SocketClient {
print('Connected to websocket.');
_write(initOperation);

_messages = socketChannel.graphQLMessageStream;
socketStream = socketChannel.stream.asBroadcastStream();
_messages = socketStream.map<GraphQLSocketMessage>(
GraphQLSocketMessage.parse,
);

if (config.inactivityTimeout != null) {
disconnectOnKeepAliveTimeout(_messages);
_disconnectOnKeepAliveTimeout(_messages);
}

_messageSubscription = _messages.listen(
Expand Down Expand Up @@ -449,14 +455,7 @@ class SocketClient {
_connectionStateController.stream;
}

extension GraphQLWebsocket on WebSocketChannel {
/// Multi-subscription stream of messages from the other endpoint.
/// GraphQLSocketMessage
///
Stream<GraphQLSocketMessage> get graphQLMessageStream => stream
.asBroadcastStream()
.map<GraphQLSocketMessage>(GraphQLSocketMessage.parse);
}
extension GraphQLWebsocket on WebSocketChannel {}

void _defaultOnStreamError(Object error, StackTrace st) {
print('[SocketClient] message stream ecnountered error: $error\n'
Expand Down
27 changes: 20 additions & 7 deletions packages/graphql/test/websocket_test.dart
Expand Up @@ -167,7 +167,7 @@ void main() {
.first;

// ignore: unawaited_futures
socketClient.socketChannel.stream
socketClient.socketStream
.where((message) => message == expectedMessage)
.first
.then((_) {
Expand Down Expand Up @@ -205,14 +205,27 @@ void main() {
final subscriptionDataStream =
socketClient.subscribe(payload, waitForConnection);

await expectLater(
socketClient.connectionState,
emitsInOrder([
SocketConnectionState.connecting,
SocketConnectionState.connected,
]),
);

socketClient.onConnectionLost();

await socketClient.connectionState
.where((state) => state == SocketConnectionState.connected)
.first;
await expectLater(
socketClient.connectionState,
emitsInOrder([
SocketConnectionState.notConnected,
SocketConnectionState.connecting,
SocketConnectionState.connected,
]),
);

// ignore: unawaited_futures
socketClient.socketChannel.stream
socketClient.socketStream
.where((message) => message == expectedMessage)
.first
.then((_) {
Expand Down Expand Up @@ -265,7 +278,7 @@ void main() {
.where((state) => state == SocketConnectionState.connected)
.first;

await expectLater(socketClient.socketChannel.stream.map((s) {
await expectLater(socketClient.socketStream.map((s) {
return jsonDecode(s)['payload'];
}), emits(initPayload));
});
Expand Down Expand Up @@ -297,7 +310,7 @@ void main() {
.where((state) => state == SocketConnectionState.connected)
.first;

await expectLater(socketClient.socketChannel.stream.map((s) {
await expectLater(socketClient.socketStream.map((s) {
return jsonDecode(s)['payload'];
}), emits(initPayload));
});
Expand Down

0 comments on commit 1b6a9e6

Please sign in to comment.