Skip to content

Commit

Permalink
feat(graphql): add support for graphql-transport-ws
Browse files Browse the repository at this point in the history
  • Loading branch information
maximilianmaihoefner authored and vincenzopalazzo committed May 8, 2022
1 parent 3f8e0e0 commit b73bb36
Show file tree
Hide file tree
Showing 3 changed files with 152 additions and 21 deletions.
93 changes: 73 additions & 20 deletions packages/graphql/lib/src/links/websocket_link/websocket_client.dart
Expand Up @@ -3,20 +3,17 @@ import 'dart:collection';
import 'dart:convert';
import 'dart:typed_data';

import 'package:gql_exec/gql_exec.dart';
import 'package:graphql/src/core/query_options.dart' show WithType;
import 'package:graphql/src/links/gql_links.dart';
import 'package:graphql/src/utilities/platform.dart';
import 'package:meta/meta.dart';

import 'package:graphql/src/core/query_options.dart' show WithType;
import 'package:gql_exec/gql_exec.dart';

import 'package:stream_channel/stream_channel.dart';
import 'package:web_socket_channel/web_socket_channel.dart';
import 'package:web_socket_channel/status.dart' as ws_status;

import 'package:rxdart/rxdart.dart';
import 'package:stream_channel/stream_channel.dart';
import 'package:uuid/uuid.dart';
import 'package:uuid/uuid_util.dart';
import 'package:web_socket_channel/status.dart' as ws_status;
import 'package:web_socket_channel/web_socket_channel.dart';

import './websocket_messages.dart';

Expand Down Expand Up @@ -144,6 +141,13 @@ class SocketClientConfig {
}
}

class SocketSubProtocol {
SocketSubProtocol._();

static const String graphqlWs = "graphql-ws";
static const String graphqlTransportWs = "graphql-transport-ws";
}

/// Wraps a standard web socket instance to marshal and un-marshal the server /
/// client payloads into dart object representation.
///
Expand All @@ -155,7 +159,7 @@ class SocketClientConfig {
class SocketClient {
SocketClient(
this.url, {
this.protocols = const ['graphql-ws'],
this.protocol = SocketSubProtocol.graphqlWs,
this.config = const SocketClientConfig(),
@visibleForTesting this.randomBytesForUuid,
@visibleForTesting this.onMessage,
Expand All @@ -166,7 +170,7 @@ class SocketClient {

Uint8List? randomBytesForUuid;
final String url;
final Iterable<String>? protocols;
final String protocol;
final SocketClientConfig config;

final BehaviorSubject<SocketConnectionState> _connectionStateController =
Expand All @@ -179,6 +183,7 @@ class SocketClient {
bool _wasDisposed = false;

Timer? _reconnectTimer;
Timer? _pingTimer;

@visibleForTesting
GraphQLWebSocketChannel? socketChannel;
Expand Down Expand Up @@ -239,17 +244,34 @@ class SocketClient {
// Even though config.connect is sync, we call async in order to make the
// SocketConnectionState.connected attribution not overload SocketConnectionState.connecting
var connection =
await config.connect(uri: Uri.parse(url), protocols: protocols);
await config.connect(uri: Uri.parse(url), protocols: [protocol]);
socketChannel = connection.forGraphQL();
_connectionStateController.add(SocketConnectionState.connected);
_write(initOperation);

if (config.inactivityTimeout != null) {
_disconnectOnKeepAliveTimeout(_messages);
if (protocol == SocketSubProtocol.graphqlWs) {
_disconnectOnKeepAliveTimeout(_messages);
}
if (protocol == SocketSubProtocol.graphqlTransportWs) {
_enqueuePing();
}
}

_messageSubscription = _messages.listen(
onMessage,
(message) {
if (onMessage != null) {
onMessage!(message);
}

if (protocol == SocketSubProtocol.graphqlTransportWs) {
if (message.type == 'ping') {
_write(PongMessage());
} else if (message.type == 'pong') {
_enqueuePing();
}
}
},
onDone: onConnectionLost,
// onDone will not be triggered if the subscription is
// auto-cancelled on error; make sure to pass false
Expand All @@ -276,6 +298,7 @@ class SocketClient {
}
print('Disconnected from websocket.');
_reconnectTimer?.cancel();
_pingTimer?.cancel();
_keepAliveSubscription?.cancel();
_messageSubscription?.cancel();

Expand All @@ -302,6 +325,14 @@ class SocketClient {
}
}

void _enqueuePing() {
_pingTimer?.cancel();
_pingTimer = new Timer(
config.inactivityTimeout!,
() => _write(PingMessage()),
);
}

/// Closes the underlying socket if connected, and stops reconnection attempts.
/// After calling this method, this [SocketClient] instance must be considered
/// unusable. Instead, create a new instance of this class.
Expand All @@ -314,6 +345,7 @@ class SocketClient {
_wasDisposed = true;
print('Disposing socket client..');
_reconnectTimer?.cancel();
_pingTimer?.cancel();
_keepAliveSubscription?.cancel();

await Future.wait([
Expand Down Expand Up @@ -385,6 +417,10 @@ class SocketClient {
return message.id == id;
}

if (message is SubscriptionNext) {
return message.id == id;
}

if (message is SubscriptionError) {
return message.id == id;
}
Expand Down Expand Up @@ -422,18 +458,34 @@ class SocketClient {
parse(message.toJson()),
));

dataErrorComplete
.where((message) => message is SubscriptionNext)
.cast<SubscriptionNext>()
.listen((message) => response.add(
parse(message.toJson()),
));

dataErrorComplete
.where((message) => message is SubscriptionError)
.cast<SubscriptionError>()
.listen((message) => response.addError(message));

if (!_subscriptionInitializers[id]!.hasBeenTriggered) {
_write(
StartOperation(
id,
serialize(payload),
),
);
if (protocol == SocketSubProtocol.graphqlTransportWs) {
_write(
SubscribeOperation(
id,
serialize(payload),
),
);
} else {
_write(
StartOperation(
id,
serialize(payload),
),
);
}
_subscriptionInitializers[id]!.hasBeenTriggered = true;
}
});
Expand All @@ -445,7 +497,8 @@ class SocketClient {
_subscriptionInitializers.remove(id);

sub?.cancel();
if (_connectionStateController.value == SocketConnectionState.connected &&
if (protocol == SocketSubProtocol.graphqlWs &&
_connectionStateController.value == SocketConnectionState.connected &&
socketChannel != null) {
_write(StopOperation(id));
}
Expand Down
@@ -1,5 +1,5 @@
import 'package:gql_link/gql_link.dart';
import 'package:gql_exec/gql_exec.dart';
import 'package:gql_link/gql_link.dart';

import './websocket_client.dart';

Expand All @@ -16,9 +16,11 @@ class WebSocketLink extends Link {
WebSocketLink(
this.url, {
this.config = const SocketClientConfig(),
this.subProtocol = SocketSubProtocol.graphqlWs,
});

final String url;
final String subProtocol;
final SocketClientConfig config;

// cannot be final because we're changing the instance upon a header change.
Expand All @@ -39,6 +41,7 @@ class WebSocketLink extends Link {
_socketClient = SocketClient(
url,
config: config,
protocol: subProtocol,
);
}

Expand Down
Expand Up @@ -20,11 +20,16 @@ class MessageTypes {
static const String connectionKeepAlive = "ka";

// client operations
static const String subscribe = "subscribe";
static const String start = "start";
static const String stop = "stop";

static const String ping = "ping";
static const String pong = "pong";

// server operations
static const String data = "data";
static const String next = "next";
static const String error = "error";
static const String complete = "complete";

Expand Down Expand Up @@ -71,13 +76,21 @@ abstract class GraphQLSocketMessage extends JsonSerializable {
return ConnectionKeepAlive();

// for completeness
case MessageTypes.subscribe:
return SubscribeOperation(id, payload);
case MessageTypes.start:
return StartOperation(id, payload);
case MessageTypes.stop:
return StopOperation(id);
case MessageTypes.ping:
return PingMessage(payload);
case MessageTypes.pong:
return PongMessage(payload);

case MessageTypes.data:
return SubscriptionData(id, payload['data'], payload['errors']);
case MessageTypes.next:
return SubscriptionNext(id, payload['data'], payload['errors']);
case MessageTypes.error:
return SubscriptionError(id, payload);
case MessageTypes.complete:
Expand Down Expand Up @@ -131,6 +144,46 @@ class QueryPayload extends JsonSerializable {
};
}

class SubscribeOperation extends GraphQLSocketMessage {
SubscribeOperation(this.id, this.payload) : super(MessageTypes.subscribe);

final String id;

final Map<String, dynamic> payload;

@override
toJson() => {
"type": type,
"id": id,
"payload": payload,
};
}

class PingMessage extends GraphQLSocketMessage {
PingMessage([this.payload = const <String, dynamic>{}])
: super(MessageTypes.ping);

final Map<String, dynamic> payload;

@override
toJson() => {
"type": type,
"payload": payload,
};
}

class PongMessage extends GraphQLSocketMessage {
PongMessage([this.payload]) : super(MessageTypes.pong);

final Map<String, dynamic>? payload;

@override
toJson() => {
"type": type,
"payload": payload,
};
}

/// A message to tell the server to create a subscription. The contents of the
/// query will be defined by the payload request. The id provided will be used
/// to tag messages such that they can be identified for this subscription
Expand Down Expand Up @@ -209,6 +262,28 @@ class SubscriptionData extends GraphQLSocketMessage {
other is SubscriptionData && jsonEncode(other) == jsonEncode(this);
}

class SubscriptionNext extends GraphQLSocketMessage {
SubscriptionNext(this.id, this.data, this.errors) : super(MessageTypes.next);

final String id;
final dynamic data;
final dynamic errors;

@override
toJson() => {
"type": type,
"data": data,
"errors": errors,
};

@override
int get hashCode => toJson().hashCode;

@override
bool operator ==(dynamic other) =>
other is SubscriptionNext && jsonEncode(other) == jsonEncode(this);
}

/// Errors sent from the server to the client if the subscription operation was
/// not successful, usually due to GraphQL validation errors.
class SubscriptionError extends GraphQLSocketMessage {
Expand Down

0 comments on commit b73bb36

Please sign in to comment.