diff --git a/lib/src/client/streamable_https.dart b/lib/src/client/streamable_https.dart index 4902427..f0a943e 100644 --- a/lib/src/client/streamable_https.dart +++ b/lib/src/client/streamable_https.dart @@ -378,7 +378,10 @@ class StreamableHttpClientTransport implements Transport { // Process the buffer line by line while (buffer.contains('\n')) { final index = buffer.indexOf('\n'); - final line = buffer.substring(0, index); + var line = buffer.substring(0, index); + if (line.endsWith('\r')) { + line = line.substring(0, line.length - 1); + } buffer = buffer.substring(index + 1); if (line.isEmpty) { diff --git a/test/client/streamable_https_test.dart b/test/client/streamable_https_test.dart index d581520..34a6fa5 100644 --- a/test/client/streamable_https_test.dart +++ b/test/client/streamable_https_test.dart @@ -476,5 +476,65 @@ void main() { // server received and processed our DELETE request expect(true, isTrue); }); + + test('handles CRLF line endings in SSE events', () async { + transport = StreamableHttpClientTransport(serverUrl); + + final messageCompleter = Completer(); + transport.onmessage = (message) { + print('Transport received message: ${jsonEncode(message.toJson())}'); + messageCompleter.complete(message); + }; + + transport.onerror = (error) { + print('Transport error: $error'); + }; + + await transport.start(); + + final notification = JsonRpcInitializedNotification(); + await transport.send(notification); + + await Future.delayed(Duration(milliseconds: 1000)); + + if (currentSseConnections.isEmpty) { + fail('No SSE connections established'); + } + + print( + 'About to send SSE event, active connections: ${currentSseConnections.length}'); + + for (final connection in List.from(currentSseConnections)) { + try { + final message = { + 'jsonrpc': '2.0', + 'method': 'notifications/initialized', + }; + + final data = jsonEncode(message); + print('Sending SSE event with data: $data'); + + connection.write('event: message\r\n\r\n'); + connection.write('data: $data\r\n\r\n'); + await connection.flush(); + print('Sent SSE event'); + } catch (e) { + print('Error sending SSE event: $e'); + fail('Failed to send SSE event: $e'); + } + } + + final message = await messageCompleter.future.timeout( + Duration(seconds: 5), + onTimeout: () { + print('*** TIMEOUT: No message received via SSE after 5 seconds'); + throw TimeoutException('No message received via SSE'); + }, + ); + + expect(message, isA()); + expect((message as JsonRpcNotification).method, + equals('notifications/initialized')); + }, timeout: Timeout(Duration(seconds: 10))); }); }