From ae5d98313eb8a46fd14b4f464a975fcd239fab78 Mon Sep 17 00:00:00 2001 From: anitarua Date: Thu, 4 Jan 2024 17:11:01 -0800 Subject: [PATCH 1/8] fix: return topic subscribe error immediately if subscription limit reached --- example/topics/basic_subscriber.dart | 2 +- lib/src/internal/pubsub_client.dart | 17 ++++++-- lib/src/internal/topics_grpc_manager.dart | 5 ++- .../responses/topics/topic_subscribe.dart | 43 ++++++++++++++----- lib/src/topic_client.dart | 7 +-- 5 files changed, 54 insertions(+), 20 deletions(-) diff --git a/example/topics/basic_subscriber.dart b/example/topics/basic_subscriber.dart index e8e51e3..71c62ea 100644 --- a/example/topics/basic_subscriber.dart +++ b/example/topics/basic_subscriber.dart @@ -7,7 +7,7 @@ void main() async { CredentialProvider.fromEnvironmentVariable("MOMENTO_API_KEY"), MobileTopicConfiguration.latest()); - var subscription = topicClient.subscribe("cache", "topic"); + var subscription = await topicClient.subscribe("cache", "topic"); var messageStream = switch (subscription) { TopicSubscription() => subscription.stream, TopicSubscribeError() => throw Exception( diff --git a/lib/src/internal/pubsub_client.dart b/lib/src/internal/pubsub_client.dart index 3684d19..ecd95cd 100644 --- a/lib/src/internal/pubsub_client.dart +++ b/lib/src/internal/pubsub_client.dart @@ -14,7 +14,7 @@ abstract class AbstractPubsubClient { Future publish( String cacheName, String topicName, Value value); - TopicSubscribeResponse subscribe(String cacheName, String topicName); + Future subscribe(String cacheName, String topicName); void close(); } @@ -61,8 +61,8 @@ class ClientPubsub implements AbstractPubsubClient { } @override - TopicSubscribeResponse subscribe(String cacheName, String topicName, - {Int64? resumeAtTopicSequenceNumber}) { + Future subscribe(String cacheName, String topicName, + {Int64? resumeAtTopicSequenceNumber}) async { var request = SubscriptionRequest_(); request.cacheName = cacheName; request.topic = topicName; @@ -70,11 +70,20 @@ class ClientPubsub implements AbstractPubsubClient { resumeAtTopicSequenceNumber ?? Int64(0); try { var stream = _grpcManager.client.subscribe(request); - return TopicSubscription(stream, request.resumeAtTopicSequenceNumber, + final subscription = TopicSubscription(stream, request.resumeAtTopicSequenceNumber, this, cacheName, topicName); + + try { + await subscription.init(); + return subscription; + } catch (e) { + rethrow; + } } catch (e) { if (e is GrpcError) { return TopicSubscribeError(grpcStatusToSdkException(e)); + } else if (e is SdkException) { + return TopicSubscribeError(e); } return TopicSubscribeError( UnknownException("Unexpected error: $e", null, null)); diff --git a/lib/src/internal/topics_grpc_manager.dart b/lib/src/internal/topics_grpc_manager.dart index 8459883..d3e657c 100644 --- a/lib/src/internal/topics_grpc_manager.dart +++ b/lib/src/internal/topics_grpc_manager.dart @@ -12,8 +12,9 @@ class TopicGrpcManager { _channel = ClientChannel(credentialProvider.cacheEndpoint, options: ChannelOptions( keepAlive: ClientKeepAliveOptions( - pingInterval: Duration(seconds: 10), - timeout: Duration(seconds: 5)))); + pingInterval: Duration(seconds: 20), + // timeout: Duration(seconds: 15), + permitWithoutCalls: true))); _client = PubsubClient(_channel, options: CallOptions(metadata: { 'authorization': credentialProvider.apiKey, diff --git a/lib/src/messages/responses/topics/topic_subscribe.dart b/lib/src/messages/responses/topics/topic_subscribe.dart index 7375bf5..e6231a9 100644 --- a/lib/src/messages/responses/topics/topic_subscribe.dart +++ b/lib/src/messages/responses/topics/topic_subscribe.dart @@ -18,9 +18,21 @@ class TopicSubscription implements TopicSubscribeResponse { String topicName; bool retry = true; final logger = Logger("TopicSubscribeResponse"); + late Stream _broadcastStream; TopicSubscription(this._stream, this.lastSequenceNumber, this._client, - this.cacheName, this.topicName); + this.cacheName, this.topicName) { + _broadcastStream = _stream.asBroadcastStream(); + } + + Future init() async { + await for (final firstItem in _broadcastStream) { + if (firstItem.whichKind() != SubscriptionItem__Kind.heartbeat) { + throw InternalServerException("Expected heartbeat message for topic $topicName on cache $cacheName, got ${firstItem.whichKind()}", null, null); + } + break; + } + } Stream get stream { return _handleStream(); @@ -29,7 +41,7 @@ class TopicSubscription implements TopicSubscribeResponse { Stream _handleStream() async* { while (retry) { try { - await for (final msg in _stream) { + await for (final msg in _broadcastStream) { var result = _processResult(msg); if (result != null) { yield result; @@ -39,17 +51,28 @@ class TopicSubscription implements TopicSubscribeResponse { if (e is CancelledException || (e is GrpcError && e.codeName == "CANCELLED")) { logger.fine("Subscription was cancelled, not reconnecting"); + await _stream.cancel(); + retry = false; + } else if (e is ClientResourceExhaustedException || (e is GrpcError && e.codeName == "RESOURCE_EXHAUSTED")) { + logger.fine("Subscription limit reached, not reconnecting"); + await _stream.cancel(); retry = false; } else { logger.fine("Attempting to reconnect after receiving error: $e"); - var result = _client.subscribe(cacheName, topicName, - resumeAtTopicSequenceNumber: lastSequenceNumber); - if (result is TopicSubscription) { - _stream = result._stream; - lastSequenceNumber = result.lastSequenceNumber; - } else if (result is TopicSubscribeError) { - logger.fine("Error reconnecting: ${result.message}"); - } + } + } + + if (retry) { + logger.fine("retry is still true"); + await _stream.cancel(); + var result = await _client.subscribe(cacheName, topicName, + resumeAtTopicSequenceNumber: lastSequenceNumber); + if (result is TopicSubscription) { + _stream = result._stream; + _broadcastStream = result._broadcastStream; + lastSequenceNumber = result.lastSequenceNumber; + } else if (result is TopicSubscribeError) { + logger.fine("Error reconnecting: ${result.message}"); } } } diff --git a/lib/src/topic_client.dart b/lib/src/topic_client.dart index 681dc4c..daa7bda 100644 --- a/lib/src/topic_client.dart +++ b/lib/src/topic_client.dart @@ -10,7 +10,7 @@ abstract class ITopicClient { Future publish( String cacheName, String topicName, Value value); - TopicSubscribeResponse subscribe(String cacheName, String topicName); + Future subscribe(String cacheName, String topicName); void close(); } @@ -33,8 +33,9 @@ class TopicClient implements ITopicClient { } @override - TopicSubscribeResponse subscribe(String cacheName, String topicName) { - return _pubsubClient.subscribe(cacheName, topicName); + Future subscribe( + String cacheName, String topicName) async { + return await _pubsubClient.subscribe(cacheName, topicName); } @override From 2875cefb65b573f29133502e3820f0498ff2ec93 Mon Sep 17 00:00:00 2001 From: anitarua Date: Thu, 4 Jan 2024 17:16:05 -0800 Subject: [PATCH 2/8] dart format --- lib/src/internal/pubsub_client.dart | 4 ++-- .../messages/responses/topics/topic_subscribe.dart | 14 +++++++++----- 2 files changed, 11 insertions(+), 7 deletions(-) diff --git a/lib/src/internal/pubsub_client.dart b/lib/src/internal/pubsub_client.dart index ecd95cd..22b41cc 100644 --- a/lib/src/internal/pubsub_client.dart +++ b/lib/src/internal/pubsub_client.dart @@ -70,8 +70,8 @@ class ClientPubsub implements AbstractPubsubClient { resumeAtTopicSequenceNumber ?? Int64(0); try { var stream = _grpcManager.client.subscribe(request); - final subscription = TopicSubscription(stream, request.resumeAtTopicSequenceNumber, - this, cacheName, topicName); + final subscription = TopicSubscription(stream, + request.resumeAtTopicSequenceNumber, this, cacheName, topicName); try { await subscription.init(); diff --git a/lib/src/messages/responses/topics/topic_subscribe.dart b/lib/src/messages/responses/topics/topic_subscribe.dart index e6231a9..5ab0ce9 100644 --- a/lib/src/messages/responses/topics/topic_subscribe.dart +++ b/lib/src/messages/responses/topics/topic_subscribe.dart @@ -22,13 +22,16 @@ class TopicSubscription implements TopicSubscribeResponse { TopicSubscription(this._stream, this.lastSequenceNumber, this._client, this.cacheName, this.topicName) { - _broadcastStream = _stream.asBroadcastStream(); - } - + _broadcastStream = _stream.asBroadcastStream(); + } + Future init() async { await for (final firstItem in _broadcastStream) { if (firstItem.whichKind() != SubscriptionItem__Kind.heartbeat) { - throw InternalServerException("Expected heartbeat message for topic $topicName on cache $cacheName, got ${firstItem.whichKind()}", null, null); + throw InternalServerException( + "Expected heartbeat message for topic $topicName on cache $cacheName, got ${firstItem.whichKind()}", + null, + null); } break; } @@ -53,7 +56,8 @@ class TopicSubscription implements TopicSubscribeResponse { logger.fine("Subscription was cancelled, not reconnecting"); await _stream.cancel(); retry = false; - } else if (e is ClientResourceExhaustedException || (e is GrpcError && e.codeName == "RESOURCE_EXHAUSTED")) { + } else if (e is ClientResourceExhaustedException || + (e is GrpcError && e.codeName == "RESOURCE_EXHAUSTED")) { logger.fine("Subscription limit reached, not reconnecting"); await _stream.cancel(); retry = false; From 7fa5434aff63f728947e0dab0e048f6e988b26ec Mon Sep 17 00:00:00 2001 From: anitarua Date: Fri, 5 Jan 2024 10:08:37 -0800 Subject: [PATCH 3/8] remove keepalive settings --- lib/src/internal/topics_grpc_manager.dart | 7 +---- .../responses/topics/topic_subscribe.dart | 28 ++++++++++++------- 2 files changed, 19 insertions(+), 16 deletions(-) diff --git a/lib/src/internal/topics_grpc_manager.dart b/lib/src/internal/topics_grpc_manager.dart index d3e657c..13dbfc4 100644 --- a/lib/src/internal/topics_grpc_manager.dart +++ b/lib/src/internal/topics_grpc_manager.dart @@ -9,12 +9,7 @@ class TopicGrpcManager { final _logger = Logger('MomentoTopicClient'); TopicGrpcManager(CredentialProvider credentialProvider) { - _channel = ClientChannel(credentialProvider.cacheEndpoint, - options: ChannelOptions( - keepAlive: ClientKeepAliveOptions( - pingInterval: Duration(seconds: 20), - // timeout: Duration(seconds: 15), - permitWithoutCalls: true))); + _channel = ClientChannel(credentialProvider.cacheEndpoint); _client = PubsubClient(_channel, options: CallOptions(metadata: { 'authorization': credentialProvider.apiKey, diff --git a/lib/src/messages/responses/topics/topic_subscribe.dart b/lib/src/messages/responses/topics/topic_subscribe.dart index 5ab0ce9..14ea5ce 100644 --- a/lib/src/messages/responses/topics/topic_subscribe.dart +++ b/lib/src/messages/responses/topics/topic_subscribe.dart @@ -68,18 +68,26 @@ class TopicSubscription implements TopicSubscribeResponse { if (retry) { logger.fine("retry is still true"); - await _stream.cancel(); - var result = await _client.subscribe(cacheName, topicName, - resumeAtTopicSequenceNumber: lastSequenceNumber); - if (result is TopicSubscription) { - _stream = result._stream; - _broadcastStream = result._broadcastStream; - lastSequenceNumber = result.lastSequenceNumber; - } else if (result is TopicSubscribeError) { - logger.fine("Error reconnecting: ${result.message}"); - } + retry = await attemptReconnect(); + } + } + } + + Future attemptReconnect() async { + await _stream.cancel(); + var result = await _client.subscribe(cacheName, topicName, + resumeAtTopicSequenceNumber: lastSequenceNumber); + if (result is TopicSubscription) { + _stream = result._stream; + _broadcastStream = result._broadcastStream; + lastSequenceNumber = result.lastSequenceNumber; + } else if (result is TopicSubscribeError) { + logger.fine("Error reconnecting: ${result.message}"); + if (result.errorCode == MomentoErrorCode.limitExceededError || result.errorCode == MomentoErrorCode.cancelledError) { + return false; } } + return true; } TopicSubscriptionItemResponse? _processResult(SubscriptionItem_ item) { From 3907044d86ec06a5a24551c9affcc8c007d5ad69 Mon Sep 17 00:00:00 2001 From: anitarua Date: Fri, 5 Jan 2024 10:27:44 -0800 Subject: [PATCH 4/8] update examples to await subscribe --- example/flutter_chat_app/lib/main.dart | 10 +++++++--- example/flutter_chat_app/pubspec.yaml | 2 +- example/topics/advanced.dart | 4 ++-- lib/src/messages/responses/topics/topic_subscribe.dart | 3 ++- pubspec.yaml | 2 +- 5 files changed, 13 insertions(+), 8 deletions(-) diff --git a/example/flutter_chat_app/lib/main.dart b/example/flutter_chat_app/lib/main.dart index 165f486..a7cf615 100644 --- a/example/flutter_chat_app/lib/main.dart +++ b/example/flutter_chat_app/lib/main.dart @@ -1,6 +1,6 @@ import 'dart:io'; -import 'package:client_sdk_dart/momento.dart'; +import 'package:momento/momento.dart'; import 'package:flutter/material.dart'; void main() { @@ -35,7 +35,7 @@ class MyHomePage extends StatefulWidget { class _MyHomePageState extends State { static const String _momentoApiKey = "your-api-key-here"; TopicClient _topicClient = TopicClient( - CredentialProvider.fromString(_momentoApiKey), Mobile.latest()); + CredentialProvider.fromString(_momentoApiKey), MobileTopicConfiguration.latest()); List _messages = ["Welcome to Momento Topics!"]; final TextEditingController _textInputController = TextEditingController(); @@ -45,8 +45,12 @@ class _MyHomePageState extends State { @override void initState() { super.initState(); + establishSubscription(); + return; + } - final subscribeResult = _topicClient.subscribe("cache", "topic"); + Future establishSubscription() async { + final subscribeResult = await _topicClient.subscribe("cache", "topic"); switch (subscribeResult) { case TopicSubscription(): print("Successful subscription"); diff --git a/example/flutter_chat_app/pubspec.yaml b/example/flutter_chat_app/pubspec.yaml index fcb0b31..fbf3c77 100644 --- a/example/flutter_chat_app/pubspec.yaml +++ b/example/flutter_chat_app/pubspec.yaml @@ -18,7 +18,7 @@ environment: dependencies: flutter: sdk: flutter - client_sdk_dart: + momento: path: ../../../client-sdk-dart diff --git a/example/topics/advanced.dart b/example/topics/advanced.dart index 29415d8..76cc533 100644 --- a/example/topics/advanced.dart +++ b/example/topics/advanced.dart @@ -30,7 +30,7 @@ void main() async { } }); - var subscription = topicClient.subscribe("cache", "topic"); + var subscription = await topicClient.subscribe("cache", "topic"); var messageStream = switch (subscription) { TopicSubscription() => subscription.stream, TopicSubscribeError() => throw Exception( @@ -61,7 +61,7 @@ void main() async { // unsubscribing should not affect the topic client's ability // to subscribe to another topic afterwards - var sub2 = topicClient.subscribe("cache", "topic"); + var sub2 = await topicClient.subscribe("cache", "topic"); switch (sub2) { case TopicSubscription(): print("Successful 2nd subscription!"); diff --git a/lib/src/messages/responses/topics/topic_subscribe.dart b/lib/src/messages/responses/topics/topic_subscribe.dart index 14ea5ce..36b1762 100644 --- a/lib/src/messages/responses/topics/topic_subscribe.dart +++ b/lib/src/messages/responses/topics/topic_subscribe.dart @@ -83,7 +83,8 @@ class TopicSubscription implements TopicSubscribeResponse { lastSequenceNumber = result.lastSequenceNumber; } else if (result is TopicSubscribeError) { logger.fine("Error reconnecting: ${result.message}"); - if (result.errorCode == MomentoErrorCode.limitExceededError || result.errorCode == MomentoErrorCode.cancelledError) { + if (result.errorCode == MomentoErrorCode.limitExceededError || + result.errorCode == MomentoErrorCode.cancelledError) { return false; } } diff --git a/pubspec.yaml b/pubspec.yaml index c3f8940..5fb8e5d 100644 --- a/pubspec.yaml +++ b/pubspec.yaml @@ -14,7 +14,7 @@ dependencies: logging: ^1.2.0 protobuf: ^3.1.0 string_validator: ^1.0.2 - uuid: ^4.3.1 + uuid: ^4.2.2 # path: ^1.8.0 dev_dependencies: From 9c4ebc422159ab1696da0251baf1ac17840b25bd Mon Sep 17 00:00:00 2001 From: anitarua Date: Fri, 5 Jan 2024 10:28:50 -0800 Subject: [PATCH 5/8] dart format --- example/flutter_chat_app/lib/main.dart | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/example/flutter_chat_app/lib/main.dart b/example/flutter_chat_app/lib/main.dart index a7cf615..45bdff3 100644 --- a/example/flutter_chat_app/lib/main.dart +++ b/example/flutter_chat_app/lib/main.dart @@ -35,7 +35,8 @@ class MyHomePage extends StatefulWidget { class _MyHomePageState extends State { static const String _momentoApiKey = "your-api-key-here"; TopicClient _topicClient = TopicClient( - CredentialProvider.fromString(_momentoApiKey), MobileTopicConfiguration.latest()); + CredentialProvider.fromString(_momentoApiKey), + MobileTopicConfiguration.latest()); List _messages = ["Welcome to Momento Topics!"]; final TextEditingController _textInputController = TextEditingController(); From 81931bcc00cd1e876a7390ee0641a12ae39ae832 Mon Sep 17 00:00:00 2001 From: anitarua Date: Fri, 5 Jan 2024 11:35:54 -0800 Subject: [PATCH 6/8] use env var in flutter app instead of hardcoded api key --- example/flutter_chat_app/README.md | 12 +++++++----- example/flutter_chat_app/lib/main.dart | 2 +- 2 files changed, 8 insertions(+), 6 deletions(-) diff --git a/example/flutter_chat_app/README.md b/example/flutter_chat_app/README.md index 823027b..8cabb2d 100644 --- a/example/flutter_chat_app/README.md +++ b/example/flutter_chat_app/README.md @@ -8,12 +8,14 @@ ## Running the Flutter Demo App -1. Provide your Momento API key in [`lib/main.dart`](./lib/main.dart) at this line: - ``` - static const String _momentoApiKey = "your-api-key-here"; - ``` +You can run Flutter apps in [VSCode](https://docs.flutter.dev/tools/vs-code) or [Android Studio or IntelliJ](https://docs.flutter.dev/tools/android-studio) using built-in tooling, just make sure to provide your Momento API key as an environment variable named `MOMENTO_API_KEY`. -2. Run in [VSCode](https://docs.flutter.dev/tools/vs-code) or [Android Studio or IntelliJ](https://docs.flutter.dev/tools/android-studio). If using the command line, you can run using a command like: `flutter run -d `. You can use `flutter devices` to see all connected devices and their IDs. +You can also use the terminal to list your devides and select one to run. + +```bash +flutter devices +flutter run -d --dart-define="MOMENTO_API_KEY=" +``` ## Flutter Resources diff --git a/example/flutter_chat_app/lib/main.dart b/example/flutter_chat_app/lib/main.dart index 45bdff3..05747e3 100644 --- a/example/flutter_chat_app/lib/main.dart +++ b/example/flutter_chat_app/lib/main.dart @@ -33,7 +33,7 @@ class MyHomePage extends StatefulWidget { } class _MyHomePageState extends State { - static const String _momentoApiKey = "your-api-key-here"; + static const String _momentoApiKey = String.fromEnvironment('MOMENTO_API_KEY', defaultValue: ''); TopicClient _topicClient = TopicClient( CredentialProvider.fromString(_momentoApiKey), MobileTopicConfiguration.latest()); From 81e6ea6c99739b381d5fd38cd131a216c37a688e Mon Sep 17 00:00:00 2001 From: anitarua Date: Fri, 5 Jan 2024 11:40:09 -0800 Subject: [PATCH 7/8] dart format --- example/flutter_chat_app/lib/main.dart | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/example/flutter_chat_app/lib/main.dart b/example/flutter_chat_app/lib/main.dart index 05747e3..eabf84f 100644 --- a/example/flutter_chat_app/lib/main.dart +++ b/example/flutter_chat_app/lib/main.dart @@ -33,7 +33,8 @@ class MyHomePage extends StatefulWidget { } class _MyHomePageState extends State { - static const String _momentoApiKey = String.fromEnvironment('MOMENTO_API_KEY', defaultValue: ''); + static const String _momentoApiKey = + String.fromEnvironment('MOMENTO_API_KEY', defaultValue: ''); TopicClient _topicClient = TopicClient( CredentialProvider.fromString(_momentoApiKey), MobileTopicConfiguration.latest()); From 5ac6dfda7b39e65714ad1066d12f49b02e867ccf Mon Sep 17 00:00:00 2001 From: anitarua Date: Fri, 5 Jan 2024 17:01:42 -0800 Subject: [PATCH 8/8] flutter app can use fromEnvVar credential provider instead --- example/flutter_chat_app/lib/main.dart | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/example/flutter_chat_app/lib/main.dart b/example/flutter_chat_app/lib/main.dart index eabf84f..2873e46 100644 --- a/example/flutter_chat_app/lib/main.dart +++ b/example/flutter_chat_app/lib/main.dart @@ -33,10 +33,8 @@ class MyHomePage extends StatefulWidget { } class _MyHomePageState extends State { - static const String _momentoApiKey = - String.fromEnvironment('MOMENTO_API_KEY', defaultValue: ''); TopicClient _topicClient = TopicClient( - CredentialProvider.fromString(_momentoApiKey), + CredentialProvider.fromEnvironmentVariable('MOMENTO_API_KEY'), MobileTopicConfiguration.latest()); List _messages = ["Welcome to Momento Topics!"];