From 27e0793fa17a61e62c498bb268393bda15c41bbf Mon Sep 17 00:00:00 2001 From: Ryan Lamb <4955475+kinyoklion@users.noreply.github.com> Date: Fri, 29 Aug 2025 15:33:32 -0700 Subject: [PATCH 01/12] feat: Add support for getting headers on event source connection. --- .../lib/launchdarkly_event_source_client.dart | 18 ++-- .../event_source_client/lib/src/events.dart | 98 +++++++++++++++++++ .../lib/src/message_event.dart | 31 ------ .../lib/src/sse_client_html.dart | 10 +- .../lib/src/sse_client_http.dart | 12 ++- .../lib/src/state_connecting.dart | 2 + .../lib/src/state_value_object.dart | 7 +- .../lib/src/stateful_sse_parser.dart | 2 +- .../event_source_client/test/events_test.dart | 71 ++++++++++++++ .../test/sse_client_http_test.dart | 2 +- .../test/state_connected_test.dart | 2 +- .../test/stateful_sse_parser_test.dart | 2 +- .../event_source_client/test/test_utils.dart | 2 +- 13 files changed, 209 insertions(+), 50 deletions(-) create mode 100644 packages/event_source_client/lib/src/events.dart delete mode 100644 packages/event_source_client/lib/src/message_event.dart create mode 100644 packages/event_source_client/test/events_test.dart diff --git a/packages/event_source_client/lib/launchdarkly_event_source_client.dart b/packages/event_source_client/lib/launchdarkly_event_source_client.dart index 2ca0bef7..2f14eb5a 100644 --- a/packages/event_source_client/lib/launchdarkly_event_source_client.dart +++ b/packages/event_source_client/lib/launchdarkly_event_source_client.dart @@ -4,12 +4,12 @@ library launchdarkly_sse; import 'dart:async'; import 'src/http_consts.dart'; -import 'src/message_event.dart'; +import 'src/events.dart'; import 'src/sse_client_stub.dart' if (dart.library.io) 'src/sse_client_http.dart' if (dart.library.js_interop) 'src/sse_client_html.dart'; -export 'src/message_event.dart' show MessageEvent; +export 'src/events.dart' show Event, MessageEvent, ConnectedEvent; /// HTTP methods supported by the event source client. enum SseHttpMethod { @@ -29,10 +29,10 @@ enum SseHttpMethod { /// An [SSEClient] that works to maintain a SSE connection to a server. /// -/// You can receive [MessageEvent]s by listening to the [stream] object. The SSEClient will -/// connect when there is a nonzero number of subscribers on [stream] and will disconnect when -/// there are zero subscribers on [stream]. In certain cases, unrecoverable errors will be -/// reported on the [stream] at which point the stream will be done. +/// You can receive [Events]s by listening to the [allEvents] object. The SSEClient will +/// connect when there is a nonzero number of subscribers on [allEvents] and will disconnect when +/// there are zero subscribers on [allEvents]. In certain cases, unrecoverable errors will be +/// reported on the [allEvents] at which point the stream will be done. /// /// The [SSEClient] will make best effort to maintain the streaming connection. abstract class SSEClient { @@ -46,8 +46,14 @@ abstract class SSEClient { /// Subscribe to this [stream] to receive events and sometimes errors. The first /// subscribe triggers the connection, so expect network delay initially. + /// The [allEvents] stream includes message events as well as other event types. + @Deprecated('[allEvents] instead') Stream get stream; + /// Subscribe to [allEvents] to receive events and sometimes errors. The first + /// subscribe triggers the connection, so expect network delay initially. + Stream get allEvents => StreamController().stream; + /// Closes the SSEClient and tears down connections and resources. Do not use the /// SSEClient after close is called, behavior is undefined at that point. Future close(); diff --git a/packages/event_source_client/lib/src/events.dart b/packages/event_source_client/lib/src/events.dart new file mode 100644 index 00000000..1b7e5bc9 --- /dev/null +++ b/packages/event_source_client/lib/src/events.dart @@ -0,0 +1,98 @@ +import 'dart:collection'; + +final class Event {} + +/// Represents a message that came across the SSE stream. +final class MessageEvent implements Event { + /// The type of the message. + final String type; + + /// The data sent in the message. + final String data; + + /// An optional message id that was provided. + final String? id; + + /// Creates the message with the provided values. + const MessageEvent(this.type, this.data, this.id); + + @override + String toString() { + return 'MessageEvent{type:$type,data:$data,id:$id}'; + } + + @override + bool operator ==(Object other) => + identical(this, other) || + other is MessageEvent && + runtimeType == other.runtimeType && + type == other.type && + data == other.data && + id == other.id; + + @override + int get hashCode => type.hashCode ^ data.hashCode ^ id.hashCode; +} + +/// Event emitted when the SSE client connects. +final class ConnectedEvent implements Event { + /// Any headers associated with the connection. + final UnmodifiableMapView? headers; + + /// Create a connected event with the specified headers. + const ConnectedEvent({this.headers}); + + @override + String toString() { + return 'ConnectedEvent{headers:$headers}'; + } + + bool _compareHeaders(UnmodifiableMapView? otherHeaders) { + if (headers == null && otherHeaders == null) { + return true; + } + if (headers != null && otherHeaders == null) { + return false; + } + if (headers == null && otherHeaders != null) { + return false; + } + var self = headers!; + var other = otherHeaders!; + if (self.length != other.length) { + return false; + } + for (var pair in self.entries) { + if (!other.containsKey(pair.key)) { + return false; + } + if (pair.value != other[pair.key]) { + return false; + } + } + return true; + } + + @override + bool operator ==(Object other) { + return identical(this, other) || + other is ConnectedEvent && _compareHeaders(other.headers); + } + + @override + int get hashCode => headers != null + ? Object.hashAllUnordered( + headers!.entries.map((item) => Object.hash(item.key, item.value))) + : null.hashCode; +} + +bool isMessageEvent(Event event) { + { + switch (event) { + case MessageEvent(): + return true; + default: + return false; + } + } +} diff --git a/packages/event_source_client/lib/src/message_event.dart b/packages/event_source_client/lib/src/message_event.dart deleted file mode 100644 index 0913a31e..00000000 --- a/packages/event_source_client/lib/src/message_event.dart +++ /dev/null @@ -1,31 +0,0 @@ -/// Represents a message that came across the SSE stream. -class MessageEvent { - /// The type of the message. - final String type; - - /// The data sent in the message. - final String data; - - /// An optional message id that was provided. - final String? id; - - /// Creates the message with the provided values. - MessageEvent(this.type, this.data, this.id); - - @override - String toString() { - return '{type:$type,data:$data,id:$id}'; - } - - @override - bool operator ==(Object other) => - identical(this, other) || - other is MessageEvent && - runtimeType == other.runtimeType && - type == other.type && - data == other.data && - id == other.id; - - @override - int get hashCode => type.hashCode ^ data.hashCode ^ id.hashCode; -} diff --git a/packages/event_source_client/lib/src/sse_client_html.dart b/packages/event_source_client/lib/src/sse_client_html.dart index 6fbcd7c1..1ca0dd80 100644 --- a/packages/event_source_client/lib/src/sse_client_html.dart +++ b/packages/event_source_client/lib/src/sse_client_html.dart @@ -6,7 +6,7 @@ import 'dart:math' as math; import '../launchdarkly_event_source_client.dart'; import 'backoff.dart'; -import 'message_event.dart' as ld_message_event; +import 'events.dart' as ld_message_event; /// An [SSEClient] that uses the [web.EventSource] available on most browsers for web platform support. class HtmlSseClient implements SSEClient { @@ -82,8 +82,12 @@ class HtmlSseClient implements SSEClient { /// Subscribe to this [stream] to receive events and sometimes errors. The first /// subscribe triggers the connection, so expect a network delay initially. @override - Stream get stream => - _messageEventsController.stream; + Stream get stream => _messageEventsController.stream + .where((t) => ld_message_event.isMessageEvent(t)) + .cast(); + + @override + Stream get allEvents => _messageEventsController.stream; @override Future close() => _messageEventsController.close(); diff --git a/packages/event_source_client/lib/src/sse_client_http.dart b/packages/event_source_client/lib/src/sse_client_http.dart index 8b9c5f26..5423b253 100644 --- a/packages/event_source_client/lib/src/sse_client_http.dart +++ b/packages/event_source_client/lib/src/sse_client_http.dart @@ -4,6 +4,7 @@ import 'dart:math' as math; import 'package:http/http.dart' as http; import '../launchdarkly_event_source_client.dart'; +import 'events.dart' show isMessageEvent; import 'state_idle.dart'; import 'state_value_object.dart'; @@ -18,7 +19,7 @@ class HttpSseClient implements SSEClient { static const defaultReadTimeout = Duration(minutes: 5); /// This controller is for the events going to the subscribers of this client. - late final StreamController _messageEventsController; + late final StreamController _messageEventsController; /// This controller is for controlling the internal state machine when subscribers /// subscribe / unsubscribe. @@ -53,7 +54,7 @@ class HttpSseClient implements SSEClient { math.Random random, String? body, String httpMethod) { - _messageEventsController = StreamController.broadcast( + _messageEventsController = StreamController.broadcast( // this is triggered when first listener subscribes onListen: () => _connectionDesiredStateController.add(true), // this is triggered when last listener unsubscribes @@ -79,7 +80,12 @@ class HttpSseClient implements SSEClient { /// Subscribe to this [stream] to receive events and sometimes errors. The first /// subscribe triggers the connection, so expect a network delay initially. @override - Stream get stream => _messageEventsController.stream; + Stream get stream => _messageEventsController.stream + .where((t) => isMessageEvent(t)) + .cast(); + + @override + Stream get allEvents => _messageEventsController.stream; @override Future close() async { diff --git a/packages/event_source_client/lib/src/state_connecting.dart b/packages/event_source_client/lib/src/state_connecting.dart index e2864d9d..9e02bcb0 100644 --- a/packages/event_source_client/lib/src/state_connecting.dart +++ b/packages/event_source_client/lib/src/state_connecting.dart @@ -73,6 +73,8 @@ class StateConnecting { return () => StateBackoff.run(svo); } + svo.connectHeaders = response.headers; + return () => StateConnected.run(svo, client, response.stream); } on TimeoutException { // didn't connect in a timely manner, so backoff then we'll try again diff --git a/packages/event_source_client/lib/src/state_value_object.dart b/packages/event_source_client/lib/src/state_value_object.dart index d48b2a33..7a147bc4 100644 --- a/packages/event_source_client/lib/src/state_value_object.dart +++ b/packages/event_source_client/lib/src/state_value_object.dart @@ -3,7 +3,7 @@ import 'package:http/http.dart' as http; import 'dart:math' as math; import 'backoff.dart'; -import 'message_event.dart'; +import 'events.dart'; typedef ClientFactory = http.Client Function(); @@ -31,7 +31,7 @@ class StateValues { // This is a broadcast stream, and the request is only posted if there are // listeners, so it is an ephemeral trigger. final Stream resetRequest; - final EventSink eventSink; + final EventSink eventSink; final Sink transitionSink; // for testing transitions final ClientFactory clientFactory; final math.Random random; @@ -45,6 +45,9 @@ class StateValues { /// The most recently received event ID from the server. Used for resumption. String lastId = ''; + /// Headers received from the connection. + Map? connectHeaders; + /// Creates a [_StateValues] instance. Used by the state machine. StateValues( this.uri, diff --git a/packages/event_source_client/lib/src/stateful_sse_parser.dart b/packages/event_source_client/lib/src/stateful_sse_parser.dart index 5147cda0..53da41eb 100644 --- a/packages/event_source_client/lib/src/stateful_sse_parser.dart +++ b/packages/event_source_client/lib/src/stateful_sse_parser.dart @@ -1,6 +1,6 @@ import 'dart:async'; -import 'message_event.dart'; +import 'events.dart'; /// Enum for tracking the category of the last parsed rune. enum _LastParsed { diff --git a/packages/event_source_client/test/events_test.dart b/packages/event_source_client/test/events_test.dart new file mode 100644 index 00000000..3ebb1adb --- /dev/null +++ b/packages/event_source_client/test/events_test.dart @@ -0,0 +1,71 @@ +import 'dart:collection'; + +import 'package:test/test.dart'; +import 'package:launchdarkly_event_source_client/src/events.dart'; + +class CompareCase { + final Event a; + final Event b; + final bool result; + + const CompareCase(this.a, this.b, this.result); + + @override + String toString() { + return '$a, $b, $result'; + } +} + +void main() { + group('given different connect messages', () { + var cases = [ + CompareCase(ConnectedEvent(), ConnectedEvent(), true), + CompareCase( + ConnectedEvent(), + ConnectedEvent(headers: UnmodifiableMapView({'key': 'value'})), + false), + CompareCase( + ConnectedEvent(headers: UnmodifiableMapView({'key': 'value'})), + ConnectedEvent(), + false), + CompareCase( + ConnectedEvent(headers: UnmodifiableMapView({'key': 'value'})), + ConnectedEvent(headers: UnmodifiableMapView({'key': 'value'})), + true), + CompareCase( + ConnectedEvent(headers: UnmodifiableMapView({'key': 'valueA'})), + ConnectedEvent(headers: UnmodifiableMapView({'key': 'value'})), + false), + CompareCase( + ConnectedEvent( + headers: + UnmodifiableMapView({'key': 'value', 'second': 'value'})), + ConnectedEvent(headers: UnmodifiableMapView({'key': 'value'})), + false), + CompareCase( + ConnectedEvent( + headers: + UnmodifiableMapView({'key': 'value', 'second': 'value'})), + ConnectedEvent( + headers: + UnmodifiableMapView({'second': 'value', 'key': 'value'})), + true), + ]; + + for (var testCase in cases) { + test('Compare $testCase', () { + expect(testCase.a == testCase.b, equals(testCase.result)); + }); + + test('HashCode $testCase', () { + var codeA = testCase.a.hashCode; + var codeB = testCase.b.hashCode; + if (testCase.result) { + expect(codeA, equals(codeB)); + } else { + expect(codeA, isNot(equals(codeB))); + } + }); + } + }); +} diff --git a/packages/event_source_client/test/sse_client_http_test.dart b/packages/event_source_client/test/sse_client_http_test.dart index 91e593d0..1cb7057a 100644 --- a/packages/event_source_client/test/sse_client_http_test.dart +++ b/packages/event_source_client/test/sse_client_http_test.dart @@ -7,7 +7,7 @@ import 'dart:math' as math; import 'package:http/http.dart'; import 'package:launchdarkly_event_source_client/launchdarkly_event_source_client.dart'; import 'package:launchdarkly_event_source_client/src/http_consts.dart'; -import 'package:launchdarkly_event_source_client/src/message_event.dart'; +import 'package:launchdarkly_event_source_client/src/events.dart'; import 'package:launchdarkly_event_source_client/src/sse_client_http.dart'; import 'package:launchdarkly_event_source_client/src/state_connected.dart'; import 'package:launchdarkly_event_source_client/src/state_connecting.dart'; diff --git a/packages/event_source_client/test/state_connected_test.dart b/packages/event_source_client/test/state_connected_test.dart index f19a7d88..7e4a9e03 100644 --- a/packages/event_source_client/test/state_connected_test.dart +++ b/packages/event_source_client/test/state_connected_test.dart @@ -4,7 +4,7 @@ import 'dart:async'; import 'dart:convert'; import 'package:http/http.dart'; -import 'package:launchdarkly_event_source_client/src/message_event.dart'; +import 'package:launchdarkly_event_source_client/src/events.dart'; import 'package:launchdarkly_event_source_client/src/state_backoff.dart'; import 'package:launchdarkly_event_source_client/src/state_connected.dart'; import 'package:launchdarkly_event_source_client/src/state_idle.dart'; diff --git a/packages/event_source_client/test/stateful_sse_parser_test.dart b/packages/event_source_client/test/stateful_sse_parser_test.dart index f1dffbd1..510aba6d 100644 --- a/packages/event_source_client/test/stateful_sse_parser_test.dart +++ b/packages/event_source_client/test/stateful_sse_parser_test.dart @@ -1,6 +1,6 @@ import 'dart:async'; -import 'package:launchdarkly_event_source_client/src/message_event.dart'; +import 'package:launchdarkly_event_source_client/src/events.dart'; import 'package:launchdarkly_event_source_client/src/stateful_sse_parser.dart'; import 'package:test/test.dart'; import 'package:mocktail/mocktail.dart'; diff --git a/packages/event_source_client/test/test_utils.dart b/packages/event_source_client/test/test_utils.dart index 032ccfa5..0597018a 100644 --- a/packages/event_source_client/test/test_utils.dart +++ b/packages/event_source_client/test/test_utils.dart @@ -7,7 +7,7 @@ import 'dart:math' as math; import 'package:http/http.dart'; import 'package:http/testing.dart'; import 'package:launchdarkly_event_source_client/src/http_consts.dart'; -import 'package:launchdarkly_event_source_client/src/message_event.dart'; +import 'package:launchdarkly_event_source_client/src/events.dart'; import 'package:launchdarkly_event_source_client/src/state_value_object.dart'; class TestUtils { From ace28b830d03b895964e0584aad506502367e0dc Mon Sep 17 00:00:00 2001 From: Ryan Lamb <4955475+kinyoklion@users.noreply.github.com> Date: Tue, 2 Sep 2025 09:18:49 -0700 Subject: [PATCH 02/12] Remove event source from melow workspace. --- melos.yaml | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/melos.yaml b/melos.yaml index 303547cb..9dee3418 100644 --- a/melos.yaml +++ b/melos.yaml @@ -4,7 +4,9 @@ environment: sdk: '>=3.4.0 <4.0.0' packages: - - packages/* + # Remove the event_source_client from the workspace temporarily to allow a breaking change. + - packages/common + - packages/common_client - packages/flutter_client_sdk/example - apps/* From e885ac4fad969f2e03500a9d2f62c00fa1ee1ac9 Mon Sep 17 00:00:00 2001 From: Ryan Lamb <4955475+kinyoklion@users.noreply.github.com> Date: Tue, 2 Sep 2025 12:19:19 -0700 Subject: [PATCH 03/12] Add test implementation. --- .../pubspec.lock | 8 +- .../streaming_data_source_test.dart | 19 +--- .../lib/launchdarkly_event_source_client.dart | 32 +++++++ .../lib/src/message_event.dart | 2 + .../lib/src/sse_client_test.dart | 86 +++++++++++++++++++ 5 files changed, 126 insertions(+), 21 deletions(-) create mode 100644 packages/event_source_client/lib/src/sse_client_test.dart diff --git a/apps/flutter_client_contract_test_service/pubspec.lock b/apps/flutter_client_contract_test_service/pubspec.lock index 792bb2f5..655c70ab 100644 --- a/apps/flutter_client_contract_test_service/pubspec.lock +++ b/apps/flutter_client_contract_test_service/pubspec.lock @@ -414,7 +414,7 @@ packages: path: "../../packages/flutter_client_sdk" relative: true source: path - version: "4.11.0" + version: "4.11.1" lints: dependency: "direct dev" description: @@ -872,10 +872,10 @@ packages: dependency: transitive description: name: vector_math - sha256: "80b3257d1492ce4d091729e3a67a60407d227c27241d6927be0130c98e741803" + sha256: d530bd74fea330e6e364cda7a85019c434070188383e1cd8d9777ee586914c5b url: "https://pub.dev" source: hosted - version: "2.1.4" + version: "2.2.0" vm_service: dependency: transitive description: @@ -957,5 +957,5 @@ packages: source: hosted version: "3.1.3" sdks: - dart: ">=3.7.0-0 <4.0.0" + dart: ">=3.8.0-0 <4.0.0" flutter: ">=3.22.0" diff --git a/packages/common_client/test/data_sources/streaming_data_source_test.dart b/packages/common_client/test/data_sources/streaming_data_source_test.dart index 4ae337c8..f9cca2c7 100644 --- a/packages/common_client/test/data_sources/streaming_data_source_test.dart +++ b/packages/common_client/test/data_sources/streaming_data_source_test.dart @@ -12,21 +12,6 @@ import 'package:launchdarkly_common_client/src/flag_manager/flag_manager.dart'; import 'package:launchdarkly_event_source_client/launchdarkly_event_source_client.dart'; import 'package:test/test.dart'; -class MockSseClient implements SSEClient { - final Stream mockStream; - - MockSseClient(this.mockStream); - - @override - Future close() async {} - - @override - void restart() {} - - @override - Stream get stream => mockStream; -} - ( StreamingDataSource, FlagManager, @@ -45,7 +30,6 @@ class MockSseClient implements SSEClient { final logger = LDLogger(); final httpProperties = inProperties ?? HttpProperties(); - final client = MockSseClient(mockStream); const sdkKey = 'dummy-key'; final flagManager = FlagManager(sdkKey: sdkKey, logger: logger, maxCachedContexts: 5); @@ -62,7 +46,8 @@ class MockSseClient implements SSEClient { clientFactory: (Uri uri, HttpProperties properties, String? body, SseHttpMethod? method) { factoryCallback?.call(uri, properties, body, method); - return client; + return SSEClient.testClient(uri, {'put', 'patch', 'delete'}, + sourceStream: mockStream); }); streaming.events.asyncMap((event) async { diff --git a/packages/event_source_client/lib/launchdarkly_event_source_client.dart b/packages/event_source_client/lib/launchdarkly_event_source_client.dart index 2ca0bef7..608f9c15 100644 --- a/packages/event_source_client/lib/launchdarkly_event_source_client.dart +++ b/packages/event_source_client/lib/launchdarkly_event_source_client.dart @@ -2,14 +2,17 @@ library launchdarkly_sse; import 'dart:async'; +import 'dart:collection'; import 'src/http_consts.dart'; import 'src/message_event.dart'; import 'src/sse_client_stub.dart' if (dart.library.io) 'src/sse_client_http.dart' if (dart.library.js_interop) 'src/sse_client_html.dart'; +import 'src/sse_client_test.dart'; export 'src/message_event.dart' show MessageEvent; +export 'src/sse_client_test.dart' show TestSseClient; /// HTTP methods supported by the event source client. enum SseHttpMethod { @@ -90,4 +93,33 @@ abstract class SSEClient { return getSSEClient(uri, eventTypes, mergedHeaders, connectTimeout, readTimeout, body, httpMethod.toString()); } + + /// Get an SSE client for use in unit tests. + /// + /// Most parameters are the same as those of the main SSEClient factory, but + /// the test client supports an additional property which is the [sourceStream]. + /// Events sent to the [sourceStream] will also be emitted by the event source + /// if the event source has listeners. When a user unsubscribes from the event + /// stream, then the test client will unsubscribe from the source stream. + /// + /// This method is primarily for use the the LaunchDarkly SDK implementation. + /// Changes may be made to this API without following semantic conventions. + static TestSseClient testClient( + Uri uri, + Set eventTypes, { + Map headers = defaultHeaders, + Duration connectTimeout = defaultConnectTimeout, + Duration readTimeout = defaultReadTimeout, + String? body, + SseHttpMethod httpMethod = SseHttpMethod.get, + Stream? sourceStream, + }) { + return TestSseClient.internal( + headers: UnmodifiableMapView(headers), + connectTimeout: connectTimeout, + readTimeout: readTimeout, + body: body, + httpMethod: httpMethod, + sourceStream: sourceStream); + } } diff --git a/packages/event_source_client/lib/src/message_event.dart b/packages/event_source_client/lib/src/message_event.dart index 0913a31e..1cab45d8 100644 --- a/packages/event_source_client/lib/src/message_event.dart +++ b/packages/event_source_client/lib/src/message_event.dart @@ -10,6 +10,8 @@ class MessageEvent { final String? id; /// Creates the message with the provided values. + /// Implementation note: Any new parameters should be added as new optional + /// parameters unless added in a major version. MessageEvent(this.type, this.data, this.id); @override diff --git a/packages/event_source_client/lib/src/sse_client_test.dart b/packages/event_source_client/lib/src/sse_client_test.dart new file mode 100644 index 00000000..0e090b22 --- /dev/null +++ b/packages/event_source_client/lib/src/sse_client_test.dart @@ -0,0 +1,86 @@ +import 'dart:async'; +import 'dart:collection'; + +import '../launchdarkly_event_source_client.dart'; + +const String _simulatedErrorString = + 'an error has occurred, any potential string may be provided here and it should not be treated as an interface'; + +/// An SSE client to use for testing. +/// +/// Changes may be made to this class without following semantic conventions. +final class TestSseClient implements SSEClient { + final UnmodifiableMapView headers; + final Duration connectTimeout; + final Duration readTimeout; + final String? body; + final SseHttpMethod httpMethod; + late final Stream? _sourceStream; + StreamSubscription? _sourceStreamSubscription; + + /// This controller is for the events going to the subscribers of this client. + late final StreamController _messageEventsController; + + @override + Future close() async { + _messageEventsController.close(); + } + + @override + void restart() {} + + @override + Stream get stream => _messageEventsController.stream; + + /// Emit an event on the stream. + /// Has no effect if the client has been closed. + /// + /// [event] The event to emit. + void emitEvent(MessageEvent event) { + if (_messageEventsController.isClosed) { + return; + } + _messageEventsController.sink.add(event); + } + + /// Emit an error event. + /// + /// [error] The error to emit. The event source makes no contract about the + /// type of errors it will emit. If not error is provided, then a default + /// error will be emitted. + void emitError({Object? error}) { + if (_messageEventsController.isClosed) { + return; + } + if (error != null) { + _messageEventsController.sink.addError(error); + } else { + _messageEventsController.sink.addError(Exception(_simulatedErrorString)); + } + } + + TestSseClient.internal({ + required this.headers, + required this.connectTimeout, + required this.readTimeout, + required this.body, + required this.httpMethod, + Stream? sourceStream, + }) { + _sourceStream = sourceStream; + _messageEventsController = StreamController.broadcast( + onListen: () { + _sourceStreamSubscription = _sourceStream?.listen((event) { + emitEvent(event); + }); + _sourceStreamSubscription?.onError((error) { + emitError(); + }); + }, + onCancel: () { + _sourceStreamSubscription?.cancel(); + close(); + }, + ); + } +} From ed5eb103f484641203d4876c76378bb109121b26 Mon Sep 17 00:00:00 2001 From: Ryan Lamb <4955475+kinyoklion@users.noreply.github.com> Date: Tue, 2 Sep 2025 14:07:57 -0700 Subject: [PATCH 04/12] Fix existing tests. --- .../lib/launchdarkly_event_source_client.dart | 4 ++-- .../lib/src/{sse_client_test.dart => test_sse_client.dart} | 0 packages/event_source_client/test/sse_client_http_test.dart | 4 ++-- 3 files changed, 4 insertions(+), 4 deletions(-) rename packages/event_source_client/lib/src/{sse_client_test.dart => test_sse_client.dart} (100%) diff --git a/packages/event_source_client/lib/launchdarkly_event_source_client.dart b/packages/event_source_client/lib/launchdarkly_event_source_client.dart index 7a50418d..6512652e 100644 --- a/packages/event_source_client/lib/launchdarkly_event_source_client.dart +++ b/packages/event_source_client/lib/launchdarkly_event_source_client.dart @@ -9,10 +9,10 @@ import 'src/events.dart'; import 'src/sse_client_stub.dart' if (dart.library.io) 'src/sse_client_http.dart' if (dart.library.js_interop) 'src/sse_client_html.dart'; -import 'src/sse_client_test.dart'; +import 'src/test_sse_client.dart'; export 'src/events.dart' show Event, MessageEvent, OpenEvent; -export 'src/sse_client_test.dart' show TestSseClient; +export 'src/test_sse_client.dart' show TestSseClient; /// HTTP methods supported by the event source client. enum SseHttpMethod { diff --git a/packages/event_source_client/lib/src/sse_client_test.dart b/packages/event_source_client/lib/src/test_sse_client.dart similarity index 100% rename from packages/event_source_client/lib/src/sse_client_test.dart rename to packages/event_source_client/lib/src/test_sse_client.dart diff --git a/packages/event_source_client/test/sse_client_http_test.dart b/packages/event_source_client/test/sse_client_http_test.dart index 8d6c1bea..3cb1c9cb 100644 --- a/packages/event_source_client/test/sse_client_http_test.dart +++ b/packages/event_source_client/test/sse_client_http_test.dart @@ -32,7 +32,7 @@ void main() { Uri.parse('/path'), {'put'}, {}, - Duration(days: 99), + Duration(days: 99), Duration(days: 99), transitionController.sink, TestUtils.makeMockHttpClient, @@ -94,7 +94,7 @@ void main() { // this expect statement will register a listener on the stream triggering the client to // connect to the mock client. var messageEvent = await sseClientUnderTest.stream.first; - expect(messageEvent.data, equals('helloworld')); + expect((messageEvent as MessageEvent).data, equals('helloworld')); sseClientUnderTest.close(); }); From cfa03b461913a02a40931216a0bca7400002b3c0 Mon Sep 17 00:00:00 2001 From: Ryan Lamb <4955475+kinyoklion@users.noreply.github.com> Date: Tue, 2 Sep 2025 14:09:58 -0700 Subject: [PATCH 05/12] Revert change from common client. --- .../streaming_data_source_test.dart | 19 +++++++++++++++++-- 1 file changed, 17 insertions(+), 2 deletions(-) diff --git a/packages/common_client/test/data_sources/streaming_data_source_test.dart b/packages/common_client/test/data_sources/streaming_data_source_test.dart index f9cca2c7..4ae337c8 100644 --- a/packages/common_client/test/data_sources/streaming_data_source_test.dart +++ b/packages/common_client/test/data_sources/streaming_data_source_test.dart @@ -12,6 +12,21 @@ import 'package:launchdarkly_common_client/src/flag_manager/flag_manager.dart'; import 'package:launchdarkly_event_source_client/launchdarkly_event_source_client.dart'; import 'package:test/test.dart'; +class MockSseClient implements SSEClient { + final Stream mockStream; + + MockSseClient(this.mockStream); + + @override + Future close() async {} + + @override + void restart() {} + + @override + Stream get stream => mockStream; +} + ( StreamingDataSource, FlagManager, @@ -30,6 +45,7 @@ import 'package:test/test.dart'; final logger = LDLogger(); final httpProperties = inProperties ?? HttpProperties(); + final client = MockSseClient(mockStream); const sdkKey = 'dummy-key'; final flagManager = FlagManager(sdkKey: sdkKey, logger: logger, maxCachedContexts: 5); @@ -46,8 +62,7 @@ import 'package:test/test.dart'; clientFactory: (Uri uri, HttpProperties properties, String? body, SseHttpMethod? method) { factoryCallback?.call(uri, properties, body, method); - return SSEClient.testClient(uri, {'put', 'patch', 'delete'}, - sourceStream: mockStream); + return client; }); streaming.events.asyncMap((event) async { From 91984b8e0b0c98d301652dcbbd38589885a73b53 Mon Sep 17 00:00:00 2001 From: Ryan Lamb <4955475+kinyoklion@users.noreply.github.com> Date: Tue, 2 Sep 2025 14:51:30 -0700 Subject: [PATCH 06/12] Make sure everything uses Event and not MessageEvent. --- .../event_source_client/lib/src/events.dart | 2 +- .../lib/src/sse_client_html.dart | 3 +-- .../lib/src/state_connected.dart | 6 +++++ .../lib/src/stateful_sse_parser.dart | 4 ++-- .../event_source_client/test/events_test.dart | 24 +++++++------------ .../test/sse_client_http_test.dart | 4 ++-- .../test/state_connected_test.dart | 2 +- .../test/stateful_sse_parser_test.dart | 4 ++-- .../event_source_client/test/test_utils.dart | 4 ++-- 9 files changed, 25 insertions(+), 28 deletions(-) diff --git a/packages/event_source_client/lib/src/events.dart b/packages/event_source_client/lib/src/events.dart index 37ea75cb..633c5fc1 100644 --- a/packages/event_source_client/lib/src/events.dart +++ b/packages/event_source_client/lib/src/events.dart @@ -48,7 +48,7 @@ final class OpenEvent implements Event { @override String toString() { - return 'ConnectedEvent{headers:$headers}'; + return 'OpenEvent{headers:$headers}'; } bool _compareHeaders(UnmodifiableMapView? otherHeaders) { diff --git a/packages/event_source_client/lib/src/sse_client_html.dart b/packages/event_source_client/lib/src/sse_client_html.dart index 64d545d7..707ba24a 100644 --- a/packages/event_source_client/lib/src/sse_client_html.dart +++ b/packages/event_source_client/lib/src/sse_client_html.dart @@ -14,8 +14,7 @@ class HtmlSseClient implements SSEClient { web.EventSource? _eventSource; /// This controller is for the events going to the subscribers of this client. - late final StreamController - _messageEventsController; + late final StreamController _messageEventsController; Backoff _backoff = Backoff(math.Random()); diff --git a/packages/event_source_client/lib/src/state_connected.dart b/packages/event_source_client/lib/src/state_connected.dart index 6610cd05..4cba2a36 100644 --- a/packages/event_source_client/lib/src/state_connected.dart +++ b/packages/event_source_client/lib/src/state_connected.dart @@ -1,7 +1,9 @@ import 'dart:async'; +import 'dart:collection'; import 'dart:convert'; import 'package:http/http.dart' as http; +import 'events.dart'; import 'state_backoff.dart'; import 'state_idle.dart'; import 'state_value_object.dart'; @@ -15,6 +17,10 @@ class StateConnected { StateValues svo, http.Client client, Stream> stream) async { // record transition to this state for testing/logging svo.transitionSink.add(StateConnected); + svo.eventSink.add(OpenEvent( + headers: svo.connectHeaders != null + ? UnmodifiableMapView(svo.connectHeaders!) + : null)); // wait for either the stream to terminate or desired connection change to transition final transition = await Future.any([ diff --git a/packages/event_source_client/lib/src/stateful_sse_parser.dart b/packages/event_source_client/lib/src/stateful_sse_parser.dart index 53da41eb..c0565d9d 100644 --- a/packages/event_source_client/lib/src/stateful_sse_parser.dart +++ b/packages/event_source_client/lib/src/stateful_sse_parser.dart @@ -47,7 +47,7 @@ class StatefulSSEParser { /// This function will iterate over the SSE stream [chunk] provided and statefully process it. /// When warranted, [MessageEvent]s may be sent to the provided [sink]. Subsequent calls /// with subsequent [chunk]s provided will be treated as a continuation of the data stream. - void parse(String chunk, EventSink sink) { + void parse(String chunk, EventSink sink) { // switch statements are used instead of a state machine for memory and performance reasons for (var rune in chunk.runes) { switch (_lastParsed) { @@ -207,7 +207,7 @@ class StatefulSSEParser { /// This function is intended to send a [MessageEvent] to the provided [sink] when invoked. /// There are some edge cases in which dispatching will be cancelled, such as invalid data. - void _dispatchEvent(EventSink sink) { + void _dispatchEvent(EventSink sink) { // if data is empty, ignore this dispatch and reset, but don't clear id if (_dataBuffer.isEmpty) { _eventType = ''; diff --git a/packages/event_source_client/test/events_test.dart b/packages/event_source_client/test/events_test.dart index 32da87f9..98fe5460 100644 --- a/packages/event_source_client/test/events_test.dart +++ b/packages/event_source_client/test/events_test.dart @@ -20,22 +20,14 @@ void main() { group('given different connect messages', () { var cases = [ CompareCase(OpenEvent(), OpenEvent(), true), - CompareCase( - OpenEvent(), - OpenEvent(headers: UnmodifiableMapView({'key': 'value'})), - false), - CompareCase( - OpenEvent(headers: UnmodifiableMapView({'key': 'value'})), - OpenEvent(), - false), - CompareCase( - OpenEvent(headers: UnmodifiableMapView({'key': 'value'})), - OpenEvent(headers: UnmodifiableMapView({'key': 'value'})), - true), - CompareCase( - OpenEvent(headers: UnmodifiableMapView({'key': 'valueA'})), - OpenEvent(headers: UnmodifiableMapView({'key': 'value'})), - false), + CompareCase(OpenEvent(), + OpenEvent(headers: UnmodifiableMapView({'key': 'value'})), false), + CompareCase(OpenEvent(headers: UnmodifiableMapView({'key': 'value'})), + OpenEvent(), false), + CompareCase(OpenEvent(headers: UnmodifiableMapView({'key': 'value'})), + OpenEvent(headers: UnmodifiableMapView({'key': 'value'})), true), + CompareCase(OpenEvent(headers: UnmodifiableMapView({'key': 'valueA'})), + OpenEvent(headers: UnmodifiableMapView({'key': 'value'})), false), CompareCase( OpenEvent( headers: diff --git a/packages/event_source_client/test/sse_client_http_test.dart b/packages/event_source_client/test/sse_client_http_test.dart index 3cb1c9cb..862b8fa4 100644 --- a/packages/event_source_client/test/sse_client_http_test.dart +++ b/packages/event_source_client/test/sse_client_http_test.dart @@ -32,7 +32,7 @@ void main() { Uri.parse('/path'), {'put'}, {}, - Duration(days: 99), + Duration(days: 99), Duration(days: 99), transitionController.sink, TestUtils.makeMockHttpClient, @@ -68,7 +68,7 @@ void main() { // this expect statement will register a listener on the stream triggering the client to // connect to the mock client. The mock client is set up to send a message. var messageEvent = await sseClientUnderTest.stream.first; - expect(messageEvent, isA()); + expect(messageEvent, isA()); expect((messageEvent as MessageEvent).data, equals('helloworld')); }); diff --git a/packages/event_source_client/test/state_connected_test.dart b/packages/event_source_client/test/state_connected_test.dart index 7e4a9e03..a027aae9 100644 --- a/packages/event_source_client/test/state_connected_test.dart +++ b/packages/event_source_client/test/state_connected_test.dart @@ -18,7 +18,7 @@ class MockClient extends Mock implements Client {} void main() { test('Test connected emits events', () async { final transitionController = StreamController.broadcast(); - final eventController = StreamController.broadcast(); + final eventController = StreamController.broadcast(); final dataController = StreamController>.broadcast(); final mockClient = MockClient(); // this mock client doesn't do anything in this test case diff --git a/packages/event_source_client/test/stateful_sse_parser_test.dart b/packages/event_source_client/test/stateful_sse_parser_test.dart index 510aba6d..3f2dc1ad 100644 --- a/packages/event_source_client/test/stateful_sse_parser_test.dart +++ b/packages/event_source_client/test/stateful_sse_parser_test.dart @@ -5,14 +5,14 @@ import 'package:launchdarkly_event_source_client/src/stateful_sse_parser.dart'; import 'package:test/test.dart'; import 'package:mocktail/mocktail.dart'; -class MockSink extends Mock implements EventSink {} +class MockSink extends Mock implements EventSink {} void main() { setUpAll(() { registerFallbackValue(MessageEvent('fallback', 'fallback', 'fallback')); }); - void testCase(String input, List expected) { + void testCase(String input, List expected) { final parserUnderTest = StatefulSSEParser(); final mockSink = MockSink(); parserUnderTest.parse(input, mockSink); diff --git a/packages/event_source_client/test/test_utils.dart b/packages/event_source_client/test/test_utils.dart index 0597018a..62fc019a 100644 --- a/packages/event_source_client/test/test_utils.dart +++ b/packages/event_source_client/test/test_utils.dart @@ -24,7 +24,7 @@ class TestUtils { Duration? connectTimeout, Duration? readTimeout, Stream? connectionDesired, - EventSink? eventSink, + EventSink? eventSink, Sink? transitionSink, ClientFactory? clientFactory, math.Random? random, @@ -36,7 +36,7 @@ class TestUtils { connectTimeout ?? Duration.zero, readTimeout ?? Duration.zero, connectionDesired ?? StreamController.broadcast().stream, - eventSink ?? StreamController.broadcast(), + eventSink ?? StreamController.broadcast(), transitionSink ?? StreamController.broadcast(), clientFactory ?? makeMockHttpClient, math.Random(), From 92616690a7fd396da5ba52162cadd26532a7c9b3 Mon Sep 17 00:00:00 2001 From: Ryan Lamb <4955475+kinyoklion@users.noreply.github.com> Date: Tue, 2 Sep 2025 15:04:45 -0700 Subject: [PATCH 07/12] Update parser and connected state to handle Event type. --- .../lib/src/state_connected.dart | 18 +++++++++++++----- .../test/stateful_sse_parser_test.dart | 4 +++- 2 files changed, 16 insertions(+), 6 deletions(-) diff --git a/packages/event_source_client/lib/src/state_connected.dart b/packages/event_source_client/lib/src/state_connected.dart index 4cba2a36..5cd9eb2c 100644 --- a/packages/event_source_client/lib/src/state_connected.dart +++ b/packages/event_source_client/lib/src/state_connected.dart @@ -52,12 +52,20 @@ class StateConnected { recordedActiveSince = true; } - // hold on to most recent id if there is one so we can use it for session resumption - svo.lastId = event.id ?? svo.lastId; + // Implementation note: Currently only message events are supported + // by the parser, but we could potentially extend that to support + // emitting comment events. + switch (event) { + case MessageEvent(): + // hold on to most recent id if there is one so we can use it for session resumption + svo.lastId = event.id ?? svo.lastId; - // only emit events that have event types the sse client was configured to use - if (svo.eventTypes.contains(event.type)) { - svo.eventSink.add(event); + // only emit events that have event types the sse client was configured to use + if (svo.eventTypes.contains(event.type)) { + svo.eventSink.add(event); + } + default: + break; } } diff --git a/packages/event_source_client/test/stateful_sse_parser_test.dart b/packages/event_source_client/test/stateful_sse_parser_test.dart index 3f2dc1ad..e528584a 100644 --- a/packages/event_source_client/test/stateful_sse_parser_test.dart +++ b/packages/event_source_client/test/stateful_sse_parser_test.dart @@ -23,8 +23,10 @@ void main() { reason: 'Captured:{$captured}, Expected:{$expected}'); for (var i = 0; i < captured.length; i++) { - final expectedMessageEvent = expected[i]; + expect(expected[i], isA()); + final expectedMessageEvent = expected[i] as MessageEvent; final messageEvent = captured[i] as MessageEvent; + expect(messageEvent, isA()); expect(messageEvent.type, equals(expectedMessageEvent.type)); expect(messageEvent.data, equals(expectedMessageEvent.data)); expect(messageEvent.id, equals(expectedMessageEvent.id)); From 4c2dc50f9540daeb02ddb3092ef64e6ae2b6c6ca Mon Sep 17 00:00:00 2001 From: Ryan Lamb <4955475+kinyoklion@users.noreply.github.com> Date: Tue, 2 Sep 2025 15:18:59 -0700 Subject: [PATCH 08/12] Tests --- .../test/state_connected_test.dart | 87 ++++++++++++++++++- 1 file changed, 86 insertions(+), 1 deletion(-) diff --git a/packages/event_source_client/test/state_connected_test.dart b/packages/event_source_client/test/state_connected_test.dart index a027aae9..73a24272 100644 --- a/packages/event_source_client/test/state_connected_test.dart +++ b/packages/event_source_client/test/state_connected_test.dart @@ -1,6 +1,7 @@ // ignore_for_file: close_sinks import 'dart:async'; +import 'dart:collection'; import 'dart:convert'; import 'package:http/http.dart'; @@ -16,6 +17,54 @@ import 'test_utils.dart'; class MockClient extends Mock implements Client {} void main() { + test('Test connected emits OpenEvent without headers when entered', () async { + final transitionController = StreamController.broadcast(); + final eventController = StreamController.broadcast(); + final dataController = StreamController>.broadcast(); + final mockClient = MockClient(); + + final svo = TestUtils.makeMockStateValues( + eventTypes: {'put'}, + transitionSink: transitionController, + eventSink: eventController.sink, + clientFactory: () => mockClient); + + // connectHeaders is null, so OpenEvent should have no headers + svo.connectHeaders = null; + + expectLater(transitionController.stream, emitsInOrder([StateConnected])); + expectLater(eventController.stream, emitsInOrder([OpenEvent()])); + + StateConnected.run(svo, mockClient, dataController.stream); + }); + + test('Test connected emits OpenEvent with headers when entered', () async { + final transitionController = StreamController.broadcast(); + final eventController = StreamController.broadcast(); + final dataController = StreamController>.broadcast(); + final mockClient = MockClient(); + + final svo = TestUtils.makeMockStateValues( + eventTypes: {'put'}, + transitionSink: transitionController, + eventSink: eventController.sink, + clientFactory: () => mockClient); + + // Set connectHeaders to simulate headers received from connection + svo.connectHeaders = { + 'x-custom-header': 'custom-value', + 'content-type': 'text/event-stream' + }; + + final expectedOpenEvent = + OpenEvent(headers: UnmodifiableMapView(svo.connectHeaders!)); + + expectLater(transitionController.stream, emitsInOrder([StateConnected])); + expectLater(eventController.stream, emitsInOrder([expectedOpenEvent])); + + StateConnected.run(svo, mockClient, dataController.stream); + }); + test('Test connected emits events', () async { final transitionController = StreamController.broadcast(); final eventController = StreamController.broadcast(); @@ -32,7 +81,7 @@ void main() { expectLater(transitionController.stream, emitsInOrder([StateConnected])); expectLater(eventController.stream, - emitsInOrder([MessageEvent('put', 'helloworld', '')])); + emitsInOrder([OpenEvent(), MessageEvent('put', 'helloworld', '')])); StateConnected.run(svo, mockClient, dataController.stream); dataController.add(utf8.encode('event:put\ndata:helloworld\n\n')); }); @@ -78,4 +127,40 @@ void main() { resetController.sink.add(null); await StateConnected.run(svo, mockClient, dataController.stream); }); + + test('Test OpenEvent is emitted before MessageEvents in order', () async { + final transitionController = StreamController.broadcast(); + final eventController = StreamController.broadcast(); + final dataController = StreamController>.broadcast(); + final mockClient = MockClient(); + + final svo = TestUtils.makeMockStateValues( + eventTypes: {'put', 'patch'}, + transitionSink: transitionController, + eventSink: eventController.sink, + clientFactory: () => mockClient); + + // Set connectHeaders to simulate headers received from connection + svo.connectHeaders = {'x-request-id': 'abc123', 'server': 'nginx/1.18.0'}; + + final expectedOpenEvent = + OpenEvent(headers: UnmodifiableMapView(svo.connectHeaders!)); + + // Verify that OpenEvent comes first, followed by MessageEvents in order + expectLater(transitionController.stream, emitsInOrder([StateConnected])); + expectLater( + eventController.stream, + emitsInOrder([ + expectedOpenEvent, + MessageEvent('put', 'first-message', '1'), + MessageEvent('patch', 'second-message', '2') + ])); + + StateConnected.run(svo, mockClient, dataController.stream); + + // Send multiple events to verify ordering + dataController.add(utf8.encode('id:1\nevent:put\ndata:first-message\n\n')); + dataController + .add(utf8.encode('id:2\nevent:patch\ndata:second-message\n\n')); + }); } From 0a7a73a99c0a679bf2198b3040a1b8ea6552e43e Mon Sep 17 00:00:00 2001 From: Ryan Lamb <4955475+kinyoklion@users.noreply.github.com> Date: Tue, 2 Sep 2025 15:21:25 -0700 Subject: [PATCH 09/12] Doc --- .../lib/launchdarkly_event_source_client.dart | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/packages/event_source_client/lib/launchdarkly_event_source_client.dart b/packages/event_source_client/lib/launchdarkly_event_source_client.dart index 6512652e..b663c72a 100644 --- a/packages/event_source_client/lib/launchdarkly_event_source_client.dart +++ b/packages/event_source_client/lib/launchdarkly_event_source_client.dart @@ -32,10 +32,11 @@ enum SseHttpMethod { /// An [SSEClient] that works to maintain a SSE connection to a server. /// -/// You can receive [Events]s by listening to the [allEvents] object. The SSEClient will -/// connect when there is a nonzero number of subscribers on [allEvents] and will disconnect when -/// there are zero subscribers on [allEvents]. In certain cases, unrecoverable errors will be -/// reported on the [allEvents] at which point the stream will be done. +/// You can receive [Events]s by listening to the [stream] object. The SSEClient +/// will connect when there is a nonzero number of subscribers on the [stream] +/// and will disconnect when there are zero subscribers on the [stream]. +/// In certain cases, unrecoverable errors will be reported on the [stream] at +/// which point the stream will be done. /// /// The [SSEClient] will make best effort to maintain the streaming connection. abstract class SSEClient { From 3883d520d566d54fe66fbd8b4d60c35431e968e6 Mon Sep 17 00:00:00 2001 From: Ryan Lamb <4955475+kinyoklion@users.noreply.github.com> Date: Wed, 3 Sep 2025 12:50:33 -0700 Subject: [PATCH 10/12] Address PR feedback. --- melos.yaml | 4 ++- .../lib/src/test_sse_client.dart | 1 - .../test/sse_client_http_test.dart | 31 ++++++++++++++----- 3 files changed, 27 insertions(+), 9 deletions(-) diff --git a/melos.yaml b/melos.yaml index 9dee3418..9fbc4a45 100644 --- a/melos.yaml +++ b/melos.yaml @@ -21,7 +21,9 @@ scripts: # Add more packages as more of them have tests. # Tests are ran with flutter as it supports coverage. Some packages may also include flutter # dependencies. - run: MELOS_PACKAGES="launchdarkly_dart_common,launchdarkly_common_client,launchdarkly_flutter_client_sdk" melos exec -- flutter test . --coverage + run: > + MELOS_PACKAGES="launchdarkly_dart_common,launchdarkly_common_client,launchdarkly_flutter_client_sdk" melos exec -- flutter test . --coverage && + cd packages/event_source_client && dart test merge-trace-files: description: Merge all packages coverage trace files ignoring data related to generated files. diff --git a/packages/event_source_client/lib/src/test_sse_client.dart b/packages/event_source_client/lib/src/test_sse_client.dart index 9026db3a..e58f0d3a 100644 --- a/packages/event_source_client/lib/src/test_sse_client.dart +++ b/packages/event_source_client/lib/src/test_sse_client.dart @@ -79,7 +79,6 @@ final class TestSseClient implements SSEClient { }, onCancel: () { _sourceStreamSubscription?.cancel(); - close(); }, ); } diff --git a/packages/event_source_client/test/sse_client_http_test.dart b/packages/event_source_client/test/sse_client_http_test.dart index 862b8fa4..380517ee 100644 --- a/packages/event_source_client/test/sse_client_http_test.dart +++ b/packages/event_source_client/test/sse_client_http_test.dart @@ -42,8 +42,10 @@ void main() { // this expect statement will register a listener on the stream triggering the client to // connect to the mock client. The mock client is set up to send a message. - expectLater(sseClientUnderTest.stream, - emitsInOrder([MessageEvent('put', 'helloworld', '')])); + expectLater( + sseClientUnderTest.stream, + emitsInOrder( + [isA(), MessageEvent('put', 'helloworld', '')])); }); test('Test disconnects when stream.first unsubscribes', () async { @@ -67,9 +69,16 @@ void main() { // this expect statement will register a listener on the stream triggering the client to // connect to the mock client. The mock client is set up to send a message. - var messageEvent = await sseClientUnderTest.stream.first; - expect(messageEvent, isA()); - expect((messageEvent as MessageEvent).data, equals('helloworld')); + var events = []; + await for (final event in sseClientUnderTest.stream) { + events.add(event); + if (events.length >= 2) break; // Collect OpenEvent and MessageEvent + } + + expect(events.length, equals(2)); + expect(events[0], isA()); + expect(events[1], isA()); + expect((events[1] as MessageEvent).data, equals('helloworld')); }); test('Test close', () async { @@ -93,8 +102,16 @@ void main() { // this expect statement will register a listener on the stream triggering the client to // connect to the mock client. - var messageEvent = await sseClientUnderTest.stream.first; - expect((messageEvent as MessageEvent).data, equals('helloworld')); + var events = []; + await for (final event in sseClientUnderTest.stream) { + events.add(event); + if (events.length >= 2) break; // Collect OpenEvent and MessageEvent + } + + expect(events.length, equals(2)); + expect(events[0], isA()); + expect(events[1], isA()); + expect((events[1] as MessageEvent).data, equals('helloworld')); sseClientUnderTest.close(); }); From 22b6107401649951a90dd6ef8ae547e96e25859d Mon Sep 17 00:00:00 2001 From: Ryan Lamb <4955475+kinyoklion@users.noreply.github.com> Date: Wed, 3 Sep 2025 12:58:45 -0700 Subject: [PATCH 11/12] Namespace --- packages/event_source_client/lib/src/sse_client_html.dart | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/event_source_client/lib/src/sse_client_html.dart b/packages/event_source_client/lib/src/sse_client_html.dart index f20804e7..55c91b9c 100644 --- a/packages/event_source_client/lib/src/sse_client_html.dart +++ b/packages/event_source_client/lib/src/sse_client_html.dart @@ -90,7 +90,7 @@ class HtmlSseClient implements SSEClient { /// Subscribe to this [stream] to receive events and sometimes errors. The first /// subscribe triggers the connection, so expect a network delay initially. @override - Stream get stream => _messageEventsController.stream; + Stream get stream => _messageEventsController.stream; @override Future close() async { From b493ec8a528259617a1187012828d41726833dc7 Mon Sep 17 00:00:00 2001 From: Ryan Lamb <4955475+kinyoklion@users.noreply.github.com> Date: Wed, 3 Sep 2025 13:03:22 -0700 Subject: [PATCH 12/12] Fix SSE test. --- .../bin/sse_contract_test_service.dart | 19 ++++++++++++------- 1 file changed, 12 insertions(+), 7 deletions(-) diff --git a/apps/sse_contract_test_service/bin/sse_contract_test_service.dart b/apps/sse_contract_test_service/bin/sse_contract_test_service.dart index 071fa353..b34943af 100644 --- a/apps/sse_contract_test_service/bin/sse_contract_test_service.dart +++ b/apps/sse_contract_test_service/bin/sse_contract_test_service.dart @@ -65,13 +65,18 @@ class TestApiImpl extends TestApi { httpMethod: method, headers: headers); final subscription = client.stream.listen((event) { - callbackClient.callbackNumberPost( - PostCallback( - kind: 'event', - event: PostCallbackEvent( - type: event.type, data: event.data, id: event.id)), - callbackNumber: callbackId); - callbackId++; + switch (event) { + case MessageEvent(): + callbackClient.callbackNumberPost( + PostCallback( + kind: 'event', + event: PostCallbackEvent( + type: event.type, data: event.data, id: event.id)), + callbackNumber: callbackId); + callbackId++; + default: + break; + } }, onError: (error) { callbackClient.callbackNumberPost( PostCallback(kind: 'error', comment: error.toString()),