From b73bb3653c652d747161a541dc984fa293a8605c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Maximilian=20Maih=C3=B6fner?= Date: Wed, 9 Mar 2022 22:43:24 +0100 Subject: [PATCH] feat(graphql): add support for `graphql-transport-ws` --- .../websocket_link/websocket_client.dart | 93 +++++++++++++++---- .../links/websocket_link/websocket_link.dart | 5 +- .../websocket_link/websocket_messages.dart | 75 +++++++++++++++ 3 files changed, 152 insertions(+), 21 deletions(-) diff --git a/packages/graphql/lib/src/links/websocket_link/websocket_client.dart b/packages/graphql/lib/src/links/websocket_link/websocket_client.dart index f762e3a0..fa2d174e 100644 --- a/packages/graphql/lib/src/links/websocket_link/websocket_client.dart +++ b/packages/graphql/lib/src/links/websocket_link/websocket_client.dart @@ -3,20 +3,17 @@ import 'dart:collection'; import 'dart:convert'; import 'dart:typed_data'; +import 'package:gql_exec/gql_exec.dart'; +import 'package:graphql/src/core/query_options.dart' show WithType; import 'package:graphql/src/links/gql_links.dart'; import 'package:graphql/src/utilities/platform.dart'; import 'package:meta/meta.dart'; - -import 'package:graphql/src/core/query_options.dart' show WithType; -import 'package:gql_exec/gql_exec.dart'; - -import 'package:stream_channel/stream_channel.dart'; -import 'package:web_socket_channel/web_socket_channel.dart'; -import 'package:web_socket_channel/status.dart' as ws_status; - import 'package:rxdart/rxdart.dart'; +import 'package:stream_channel/stream_channel.dart'; import 'package:uuid/uuid.dart'; import 'package:uuid/uuid_util.dart'; +import 'package:web_socket_channel/status.dart' as ws_status; +import 'package:web_socket_channel/web_socket_channel.dart'; import './websocket_messages.dart'; @@ -144,6 +141,13 @@ class SocketClientConfig { } } +class SocketSubProtocol { + SocketSubProtocol._(); + + static const String graphqlWs = "graphql-ws"; + static const String graphqlTransportWs = "graphql-transport-ws"; +} + /// Wraps a standard web socket instance to marshal and un-marshal the server / /// client payloads into dart object representation. /// @@ -155,7 +159,7 @@ class SocketClientConfig { class SocketClient { SocketClient( this.url, { - this.protocols = const ['graphql-ws'], + this.protocol = SocketSubProtocol.graphqlWs, this.config = const SocketClientConfig(), @visibleForTesting this.randomBytesForUuid, @visibleForTesting this.onMessage, @@ -166,7 +170,7 @@ class SocketClient { Uint8List? randomBytesForUuid; final String url; - final Iterable? protocols; + final String protocol; final SocketClientConfig config; final BehaviorSubject _connectionStateController = @@ -179,6 +183,7 @@ class SocketClient { bool _wasDisposed = false; Timer? _reconnectTimer; + Timer? _pingTimer; @visibleForTesting GraphQLWebSocketChannel? socketChannel; @@ -239,17 +244,34 @@ class SocketClient { // Even though config.connect is sync, we call async in order to make the // SocketConnectionState.connected attribution not overload SocketConnectionState.connecting var connection = - await config.connect(uri: Uri.parse(url), protocols: protocols); + await config.connect(uri: Uri.parse(url), protocols: [protocol]); socketChannel = connection.forGraphQL(); _connectionStateController.add(SocketConnectionState.connected); _write(initOperation); if (config.inactivityTimeout != null) { - _disconnectOnKeepAliveTimeout(_messages); + if (protocol == SocketSubProtocol.graphqlWs) { + _disconnectOnKeepAliveTimeout(_messages); + } + if (protocol == SocketSubProtocol.graphqlTransportWs) { + _enqueuePing(); + } } _messageSubscription = _messages.listen( - onMessage, + (message) { + if (onMessage != null) { + onMessage!(message); + } + + if (protocol == SocketSubProtocol.graphqlTransportWs) { + if (message.type == 'ping') { + _write(PongMessage()); + } else if (message.type == 'pong') { + _enqueuePing(); + } + } + }, onDone: onConnectionLost, // onDone will not be triggered if the subscription is // auto-cancelled on error; make sure to pass false @@ -276,6 +298,7 @@ class SocketClient { } print('Disconnected from websocket.'); _reconnectTimer?.cancel(); + _pingTimer?.cancel(); _keepAliveSubscription?.cancel(); _messageSubscription?.cancel(); @@ -302,6 +325,14 @@ class SocketClient { } } + void _enqueuePing() { + _pingTimer?.cancel(); + _pingTimer = new Timer( + config.inactivityTimeout!, + () => _write(PingMessage()), + ); + } + /// Closes the underlying socket if connected, and stops reconnection attempts. /// After calling this method, this [SocketClient] instance must be considered /// unusable. Instead, create a new instance of this class. @@ -314,6 +345,7 @@ class SocketClient { _wasDisposed = true; print('Disposing socket client..'); _reconnectTimer?.cancel(); + _pingTimer?.cancel(); _keepAliveSubscription?.cancel(); await Future.wait([ @@ -385,6 +417,10 @@ class SocketClient { return message.id == id; } + if (message is SubscriptionNext) { + return message.id == id; + } + if (message is SubscriptionError) { return message.id == id; } @@ -422,18 +458,34 @@ class SocketClient { parse(message.toJson()), )); + dataErrorComplete + .where((message) => message is SubscriptionNext) + .cast() + .listen((message) => response.add( + parse(message.toJson()), + )); + dataErrorComplete .where((message) => message is SubscriptionError) .cast() .listen((message) => response.addError(message)); if (!_subscriptionInitializers[id]!.hasBeenTriggered) { - _write( - StartOperation( - id, - serialize(payload), - ), - ); + if (protocol == SocketSubProtocol.graphqlTransportWs) { + _write( + SubscribeOperation( + id, + serialize(payload), + ), + ); + } else { + _write( + StartOperation( + id, + serialize(payload), + ), + ); + } _subscriptionInitializers[id]!.hasBeenTriggered = true; } }); @@ -445,7 +497,8 @@ class SocketClient { _subscriptionInitializers.remove(id); sub?.cancel(); - if (_connectionStateController.value == SocketConnectionState.connected && + if (protocol == SocketSubProtocol.graphqlWs && + _connectionStateController.value == SocketConnectionState.connected && socketChannel != null) { _write(StopOperation(id)); } diff --git a/packages/graphql/lib/src/links/websocket_link/websocket_link.dart b/packages/graphql/lib/src/links/websocket_link/websocket_link.dart index 8fa02a5b..2590277e 100644 --- a/packages/graphql/lib/src/links/websocket_link/websocket_link.dart +++ b/packages/graphql/lib/src/links/websocket_link/websocket_link.dart @@ -1,5 +1,5 @@ -import 'package:gql_link/gql_link.dart'; import 'package:gql_exec/gql_exec.dart'; +import 'package:gql_link/gql_link.dart'; import './websocket_client.dart'; @@ -16,9 +16,11 @@ class WebSocketLink extends Link { WebSocketLink( this.url, { this.config = const SocketClientConfig(), + this.subProtocol = SocketSubProtocol.graphqlWs, }); final String url; + final String subProtocol; final SocketClientConfig config; // cannot be final because we're changing the instance upon a header change. @@ -39,6 +41,7 @@ class WebSocketLink extends Link { _socketClient = SocketClient( url, config: config, + protocol: subProtocol, ); } diff --git a/packages/graphql/lib/src/links/websocket_link/websocket_messages.dart b/packages/graphql/lib/src/links/websocket_link/websocket_messages.dart index 0ddcaeb9..25cd6d75 100644 --- a/packages/graphql/lib/src/links/websocket_link/websocket_messages.dart +++ b/packages/graphql/lib/src/links/websocket_link/websocket_messages.dart @@ -20,11 +20,16 @@ class MessageTypes { static const String connectionKeepAlive = "ka"; // client operations + static const String subscribe = "subscribe"; static const String start = "start"; static const String stop = "stop"; + static const String ping = "ping"; + static const String pong = "pong"; + // server operations static const String data = "data"; + static const String next = "next"; static const String error = "error"; static const String complete = "complete"; @@ -71,13 +76,21 @@ abstract class GraphQLSocketMessage extends JsonSerializable { return ConnectionKeepAlive(); // for completeness + case MessageTypes.subscribe: + return SubscribeOperation(id, payload); case MessageTypes.start: return StartOperation(id, payload); case MessageTypes.stop: return StopOperation(id); + case MessageTypes.ping: + return PingMessage(payload); + case MessageTypes.pong: + return PongMessage(payload); case MessageTypes.data: return SubscriptionData(id, payload['data'], payload['errors']); + case MessageTypes.next: + return SubscriptionNext(id, payload['data'], payload['errors']); case MessageTypes.error: return SubscriptionError(id, payload); case MessageTypes.complete: @@ -131,6 +144,46 @@ class QueryPayload extends JsonSerializable { }; } +class SubscribeOperation extends GraphQLSocketMessage { + SubscribeOperation(this.id, this.payload) : super(MessageTypes.subscribe); + + final String id; + + final Map payload; + + @override + toJson() => { + "type": type, + "id": id, + "payload": payload, + }; +} + +class PingMessage extends GraphQLSocketMessage { + PingMessage([this.payload = const {}]) + : super(MessageTypes.ping); + + final Map payload; + + @override + toJson() => { + "type": type, + "payload": payload, + }; +} + +class PongMessage extends GraphQLSocketMessage { + PongMessage([this.payload]) : super(MessageTypes.pong); + + final Map? payload; + + @override + toJson() => { + "type": type, + "payload": payload, + }; +} + /// A message to tell the server to create a subscription. The contents of the /// query will be defined by the payload request. The id provided will be used /// to tag messages such that they can be identified for this subscription @@ -209,6 +262,28 @@ class SubscriptionData extends GraphQLSocketMessage { other is SubscriptionData && jsonEncode(other) == jsonEncode(this); } +class SubscriptionNext extends GraphQLSocketMessage { + SubscriptionNext(this.id, this.data, this.errors) : super(MessageTypes.next); + + final String id; + final dynamic data; + final dynamic errors; + + @override + toJson() => { + "type": type, + "data": data, + "errors": errors, + }; + + @override + int get hashCode => toJson().hashCode; + + @override + bool operator ==(dynamic other) => + other is SubscriptionNext && jsonEncode(other) == jsonEncode(this); +} + /// Errors sent from the server to the client if the subscription operation was /// not successful, usually due to GraphQL validation errors. class SubscriptionError extends GraphQLSocketMessage {