-
Notifications
You must be signed in to change notification settings - Fork 32
/
io.dart
96 lines (83 loc) · 2.78 KB
/
io.dart
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
import 'dart:async';
import 'dart:convert';
import 'package:http/http.dart' as http;
import 'package:pool/pool.dart';
import 'package:sse_channel/sse_channel.dart';
import 'package:stream_channel/stream_channel.dart';
import 'package:uuid/uuid.dart';
import 'src/event_source_transformer.dart';
final _requestPool = Pool(1000);
typedef OnConnected = void Function();
class IOSseChannel extends StreamChannelMixin implements SseChannel {
int _lastMessageId = -1;
final Uri _serverUrl;
final String _clientId;
late final StreamController<String?> _incomingController;
late final StreamController<String?> _outgoingController;
final _onConnected = Completer();
IOSseChannel._(
Uri serverUrl,
) : _serverUrl = serverUrl,
_clientId = Uuid().v4(),
_outgoingController = StreamController<String?>() {
final client = http.Client();
_incomingController =
StreamController<String?>.broadcast(onListen: () async {
final request = http.Request(
'GET',
_serverUrl.replace(
queryParameters: {
'sseClientId': _clientId,
},
),
)..headers['Accept'] = 'text/event-stream';
await client.send(request).then((response) {
if (response.statusCode == 200) {
response.stream.transform(EventSourceTransformer()).listen((event) {
_incomingController.sink.add(event.data);
});
_onConnected.complete();
} else {
//incomingController.addError(
// SseClientException('Failed to connect to ${uri.toString()}'));
}
});
}, onCancel: () {
_incomingController.close();
});
_onConnected.future.whenComplete(
() => _outgoingController.stream.listen(_onOutgoingMessage),
);
}
factory IOSseChannel.connect(Uri url) {
return IOSseChannel._(
url,
);
}
@override
StreamSink get sink => _outgoingController.sink;
@override
Stream get stream => _incomingController.stream;
Future<void> _onOutgoingMessage(String? message) async {
String? encodedMessage;
await _requestPool.withResource(() async {
try {
encodedMessage = jsonEncode(message);
} on JsonUnsupportedObjectError {
//_logger.warning('[$_clientId] Unable to encode outgoing message: $e');
} on ArgumentError {
//_logger.warning('[$_clientId] Invalid argument: $e');
}
try {
final url =
'$_serverUrl?sseClientId=$_clientId&messageId=${_lastMessageId++}';
await http.post(Uri.parse(url), body: encodedMessage);
} catch (error) {
//final augmentedError =
// '[$_clientId] SSE client failed to send $message:\n $error';
//_logger.severe(augmentedError);
//_closeWithError(augmentedError);
}
});
}
}