-
Notifications
You must be signed in to change notification settings - Fork 14
/
websocket_transport_io.dart
158 lines (141 loc) · 5.58 KB
/
websocket_transport_io.dart
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
import 'dart:async';
import 'dart:convert';
import 'dart:io';
import 'dart:typed_data';
import 'package:connectanum/src/message/goodbye.dart';
import 'websocket_transport_serialization.dart';
import '../../message/abstract_message.dart';
import '../../serializer/abstract_serializer.dart';
import '../../transport/abstract_transport.dart';
import '../../serializer/json/serializer.dart' as serializer_json;
import '../../serializer/msgpack/serializer.dart' as serializer_msgpack;
import '../../serializer/cbor/serializer.dart' as serializer_cbor;
/// This transport type is used to connect via web sockets
/// in a dart vm environment. A known issue is that this
/// transport does not support auto reconnnect in the dart vm
/// use [SocketTransport] instead! This may not work if your
/// router does not raw socket transport
class WebSocketTransport extends AbstractTransport {
final String _url;
final AbstractSerializer _serializer;
final String _serializerType;
/// The keys of the map are the header
/// fields and the values are either String or List<String>
final Map<String, dynamic>? _headers;
bool _goodbyeSent = false;
bool _goodbyeReceived = false;
WebSocket? _socket;
Completer? _onConnectionLost;
Completer? _onDisconnect;
late Completer _onReady;
WebSocketTransport(this._url, this._serializer, this._serializerType,
[this._headers])
: assert(_serializerType == WebSocketSerialization.serializationJson ||
_serializerType == WebSocketSerialization.serializationMsgpack ||
_serializerType == WebSocketSerialization.serializationCbor);
factory WebSocketTransport.withJsonSerializer(String url,
[Map<String, dynamic>? headers]) =>
WebSocketTransport(url, serializer_json.Serializer(),
WebSocketSerialization.serializationJson, headers);
factory WebSocketTransport.withMsgpackSerializer(String url,
[Map<String, dynamic>? headers]) =>
WebSocketTransport(url, serializer_msgpack.Serializer(),
WebSocketSerialization.serializationMsgpack, headers);
factory WebSocketTransport.withCborSerializer(String url,
[Map<String, dynamic>? headers]) =>
WebSocketTransport(url, serializer_cbor.Serializer(),
WebSocketSerialization.serializationCbor, headers);
/// Calling close will close the underlying socket connection
@override
Future<void> close({error}) {
_socket?.close();
complete(_onDisconnect, error);
return Future.value();
}
/// on connection lost will only complete if the other end closes unexpectedly
@override
Completer? get onConnectionLost => _onConnectionLost;
/// on disconnect will complete whenever the socket connection closes down
@override
Completer? get onDisconnect => _onDisconnect;
/// This method will return true if the underlying socket has a ready state of open
@override
bool get isOpen {
return _socket != null && _socket!.readyState == WebSocket.open;
}
/// for this transport this is equal to [isOpen]
@override
bool get isReady => isOpen;
/// This future completes as soon as the connection is established and fully initialized
@override
Future<void> get onReady => _onReady.future;
/// This method opens the underlying socket connection and prepares all state completers.
/// Since dart does not handle states for io WebSockets, this class contains a custom ping pong
/// mechanism to achieve connection states. The [pingInterval] controls the ping pong mechanism.
/// As soon as the web socket connection is established, the returning future will complete
/// or fail respectively
@override
Future<void> open({Duration? pingInterval}) async {
_onReady = Completer();
_onDisconnect = Completer();
_onConnectionLost = Completer();
try {
_socket = await WebSocket.connect(_url,
protocols: [_serializerType], headers: _headers);
_onReady.complete();
if (pingInterval != null) {
Timer.periodic(
pingInterval,
(timer) => _socket!.pingInterval = Duration(
milliseconds: (pingInterval.inMilliseconds * 2 / 3).floor()));
}
} on SocketException catch (exception) {
_onConnectionLost!.complete(exception);
}
}
/// This method takes a [message], serializes it to a JSON and passes it to
/// the underlying socket.
@override
void send(AbstractMessage message) {
if (message is Goodbye) {
_goodbyeSent = true;
}
if (_serializerType == WebSocketSerialization.serializationJson) {
_socket!
.addUtf8Text(utf8.encoder.convert(_serializer.serialize(message)));
} else {
_socket!.add(_serializer.serialize(message));
}
}
/// This method return a [Stream] that streams all incoming messages as unserialized
/// objects.
@override
Stream<AbstractMessage?> receive() {
_socket!.done.then((done) {
if ((_socket!.closeCode == null || _socket!.closeCode! > 1000) &&
!_goodbyeSent &&
!_goodbyeReceived) {
_onConnectionLost!.complete();
} else {
_onDisconnect!.complete();
}
}, onError: (error) {
if (!_onDisconnect!.isCompleted) {
_onConnectionLost!.complete(error);
}
});
return _socket!.map((messageEvent) {
AbstractMessage? message;
if (_serializerType == WebSocketSerialization.serializationJson) {
message =
_serializer.deserialize(utf8.encode(messageEvent) as Uint8List?);
} else {
message = _serializer.deserialize(messageEvent);
}
if (message is Goodbye) {
_goodbyeReceived = true;
}
return message;
});
}
}