Skip to content

Commit

Permalink
Fix SSE channel library
Browse files Browse the repository at this point in the history
  • Loading branch information
Vladyslav Hupalo committed Aug 30, 2023
1 parent 8c93dac commit 84e583b
Show file tree
Hide file tree
Showing 12 changed files with 338 additions and 30 deletions.
6 changes: 3 additions & 3 deletions example/pubspec.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ description: A new Flutter project.
version: 1.0.0+1
publish_to: none
environment:
sdk: ">=2.3.0 <3.0.0"
sdk: ">=2.12.0 <3.0.0"

dependencies:
flutter:
Expand All @@ -20,12 +20,12 @@ dependencies:
path: ../

logging: ^1.2.0
rxdart: ^0.18.1
rxdart: ^0.27.7
quiver: ^3.2.1

# The following adds the Cupertino Icons font to your application.
# Use with the CupertinoIcons class for iOS style icons.
cupertino_icons: ^0.1.2
cupertino_icons: ^1.0.5

dev_dependencies:
flutter_test:
Expand Down
14 changes: 13 additions & 1 deletion lib/hub_connection.dart
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ class HubConnection {
// connectionStarted is tracked independently from connectionState, so we can check if the
// connection ever did successfully transition from connecting to connected before disconnecting.
late bool _connectionStarted;
GeneralError? _handShakeError;
Future<void>? _startPromise;
Future<void>? _stopPromise;

Expand Down Expand Up @@ -194,6 +195,7 @@ class HubConnection {
HandshakeRequestMessage(_protocol.name, _protocol.version);

_logger?.finer("Sending handshake request.");
_handShakeError = null;

await _sendMessage(
_handshakeProtocol.writeHandshakeRequest(handshakeRequest));
Expand All @@ -205,7 +207,14 @@ class HubConnection {
_resetTimeoutPeriod();
_resetKeepAliveInterval();

await _handshakeCompleter!.future;
if (_handshakeCompleter != null) {
await _handshakeCompleter!.future;
_handShakeError = null;
} else if (_handShakeError != null) {
{
throw _handShakeError!;
}
}

// It's important to check the stopDuringStartError instead of just relying on the handshakePromise
// being rejected on close, because this continuation can run after both the handshake completed successfully
Expand Down Expand Up @@ -587,6 +596,8 @@ class HubConnection {
if (!_handshakeCompleter!.isCompleted) {
_handshakeCompleter?.completeError(error);
}

_handShakeError = error;
_handshakeCompleter = null;
throw error;
}
Expand All @@ -600,6 +611,7 @@ class HubConnection {
if (!_handshakeCompleter!.isCompleted) {
_handshakeCompleter?.completeError(error);
}
_handShakeError = error;
_handshakeCompleter = null;
throw error;
} else {
Expand Down
59 changes: 39 additions & 20 deletions lib/server_sent_events_transport.dart
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import 'dart:async';

import 'package:logging/logging.dart';
import 'package:sse_channel/sse_channel.dart';
import 'package:signalr_netcore/sse_channel/src/channel.dart';

import 'errors.dart';
import 'itransport.dart';
Expand All @@ -16,6 +16,7 @@ class ServerSentEventsTransport implements ITransport {
final Logger? _logger;
final bool _logMessageContent;
SseChannel? _sseClient;
StreamSubscription? _sseStreamSubscription;
String? _url;

@override
Expand All @@ -38,28 +39,38 @@ class ServerSentEventsTransport implements ITransport {
@override
Future<void> connect(String? url, TransferFormat transferFormat) async {
assert(!isStringEmpty(url));

_logger?.finest("(SSE transport) Connecting");

// set url before accessTokenFactory because this.url is only for send and we set the auth header instead of the query string for send
// set url before accessTokenFactory because this.url is only for send and
// we set the auth header instead of the query string for send
_url = url;

if (_accessTokenFactory != null) {
final token = await _accessTokenFactory!();

if (!isStringEmpty(token)) {
final encodedToken = Uri.encodeComponent(token);

url = url! +
(url.indexOf("?") < 0 ? "?" : "&") +
"access_token=$encodedToken";
}
}

var opened = false;

if (transferFormat != TransferFormat.Text) {
return Future.error(GeneralError(
"The Server-Sent Events transport only supports the 'Text' transfer format"));
return Future.error(
GeneralError(
"The Server-Sent Events transport only "
"supports the 'Text' transfer format",
),
);
}

SseChannel client;

try {
client = SseChannel.connect(Uri.parse(url!));
_logger?.finer('(SSE transport) connected to $url');
Expand All @@ -69,23 +80,27 @@ class ServerSentEventsTransport implements ITransport {
return Future.error(e);
}

_sseClient!.stream.listen((data) {
if (onReceive != null) {
try {
_logger?.finest(
'(SSE transport) data received. ${getDataDetail(data, _logMessageContent)}.');
onReceive!(data);
} catch (error) {
_close(error: error);
return;
_sseStreamSubscription = _sseClient!.stream.listen(
(data) {
if (onReceive != null) {
try {
_logger?.finest(
'(SSE transport) data received. ${getDataDetail(data, _logMessageContent)}.');
onReceive!(data);
} catch (error) {
_close(error: error);
return;
}
}
}
}, onError: (e) {
_logger?.severe('(SSE transport) error when listening to stream: $e');
if (opened) {
_close(error: e);
}
});
},
onError: (e) {
_logger?.severe('(SSE transport) error when listening to stream: $e');

if (opened) {
_close(error: e);
}
},
);
}

@override
Expand All @@ -107,12 +122,16 @@ class ServerSentEventsTransport implements ITransport {

@override
Future<void> stop() {
_logger?.finest("(SSE transport) Disconnecting");

_close();

return Future.value(null);
}

_close({dynamic error}) {
if (_sseClient != null) {
_sseStreamSubscription?.cancel();
_sseClient = null;

if (onClose != null) {
Expand Down
21 changes: 21 additions & 0 deletions lib/sse_channel/html.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
import 'dart:async';

import 'package:signalr_netcore/sse_channel/src/channel.dart';
import 'package:sse/client/sse_client.dart';
import 'package:stream_channel/stream_channel.dart';

class HtmlSseChannel extends StreamChannelMixin implements SseChannel {
HtmlSseChannel(this.client);

factory HtmlSseChannel.connect(Uri url) {
return HtmlSseChannel(SseClient(url.toString()));
}

final SseClient client;

@override
StreamSink get sink => client.sink;

@override
Stream get stream => client.stream;
}
122 changes: 122 additions & 0 deletions lib/sse_channel/io.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
import 'dart:async';
import 'dart:convert';

import 'package:http/http.dart';
import 'package:pool/pool.dart';
import 'package:signalr_netcore/sse_channel/src/channel.dart';
import 'package:stream_channel/stream_channel.dart';
import 'package:uuid/uuid.dart';

import 'src/event_source_transformer.dart';

final _requestPool = Pool(1000);

typedef OnConnected = void Function();

class IOSseChannel extends StreamChannelMixin implements SseChannel {
int _lastMessageId = -1;
final Uri _serverUrl;
final String _clientId;
late final StreamController<String?> _incomingController;
late final StreamController<String?> _outgoingController;
final _onConnected = Completer();

StreamSubscription? _incomingSubscription;
StreamSubscription? _outgoingSubscription;

@override
StreamSink get sink => _outgoingController.sink;

@override
Stream get stream => _incomingController.stream;

factory IOSseChannel.connect(Uri url) {
return IOSseChannel._(url);
}

IOSseChannel._(Uri serverUrl)
: _serverUrl = serverUrl,
_clientId = Uuid().v4(),
_outgoingController = StreamController<String?>() {
_incomingController = StreamController<String?>.broadcast(
onListen: () => _initialize(),
onCancel: () => _stop(),
);

_onConnected.future.whenComplete(() {
return _outgoingSubscription =
_outgoingController.stream.listen(_onOutgoingMessage);
});
}

Future<void> _initialize() async {
final queryParameters = Map<String, String>();
queryParameters.addAll({'sseClientId': _clientId});
queryParameters.addAll(_serverUrl.queryParameters);

final request = Request(
'GET',
_serverUrl.replace(queryParameters: queryParameters),
)..headers['Accept'] = 'text/event-stream';

await Client().send(request).then((response) {
if (response.statusCode == 200) {
_incomingSubscription =
response.stream.transform(EventSourceTransformer()).listen((event) {
_incomingController.sink.add(event.data);
});

_onConnected.complete();
} else {
_incomingController.addError(
SseClientException('Failed to connect to $_serverUrl'),
);
}
});
}

void _stop() {
_incomingSubscription?.cancel();
_outgoingSubscription?.cancel();
_incomingController.sink.close();
_incomingController.close();
_outgoingController.sink.close();
_outgoingController.close();
}

Future<void> _onOutgoingMessage(String? message) {
String? encodedMessage;

return _requestPool.withResource(() async {
try {
encodedMessage = jsonEncode(message);
} on JsonUnsupportedObjectError {
//_logger.warning('[$_clientId] Unable to encode outgoing message: $e');
} on ArgumentError {
//_logger.warning('[$_clientId] Invalid argument: $e');
}

try {
final url =
'$_serverUrl?sseClientId=$_clientId&messageId=${_lastMessageId++}';
await post(Uri.parse(url), body: encodedMessage);
} catch (error) {
//final augmentedError =
// '[$_clientId] SSE client failed to send $message:\n $error';
//_logger.severe(augmentedError);
//_closeWithError(augmentedError);
}
});
}
}

class SseClientException implements Exception {
final String message;

const SseClientException(this.message);

@override
String toString() {
return 'SseClientException: $message';
}
}
6 changes: 6 additions & 0 deletions lib/sse_channel/src/_connect_api.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
import 'channel.dart';

/// Creates a new Server Sent Events connection.
SseChannel connect(Uri url) {
throw UnsupportedError('No implementation of the connect api provided');
}
5 changes: 5 additions & 0 deletions lib/sse_channel/src/_connect_html.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
import 'package:signalr_netcore/sse_channel/html.dart';

import 'channel.dart';

SseChannel connect(Uri url) => HtmlSseChannel.connect(url);
6 changes: 6 additions & 0 deletions lib/sse_channel/src/_connect_io.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@

import 'package:signalr_netcore/sse_channel/io.dart';

import 'channel.dart';

SseChannel connect(Uri url) => IOSseChannel.connect(url);
12 changes: 12 additions & 0 deletions lib/sse_channel/src/channel.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
import 'package:stream_channel/stream_channel.dart';

// ignore: uri_does_not_exist
import '_connect_api.dart'
// ignore: uri_does_not_exist
if (dart.library.html) '_connect_html.dart'
// ignore: uri_does_not_exist
if (dart.library.io) '_connect_io.dart' as platform;

abstract class SseChannel extends StreamChannelMixin {
factory SseChannel.connect(Uri url) => platform.connect(url);
}
Loading

0 comments on commit 84e583b

Please sign in to comment.