diff --git a/examples/topics.dart b/examples/topics.dart index 8c5be63..f590b21 100644 --- a/examples/topics.dart +++ b/examples/topics.dart @@ -1,3 +1,4 @@ +import 'dart:async'; import 'dart:io'; import 'package:client_sdk_dart/client_sdk_dart.dart'; @@ -13,32 +14,63 @@ void main() async { CredentialProvider.fromEnvironmentVariable("MOMENTO_API_KEY"), Mobile.latest()); - var result = await topicClient.publish("cache", "topic", StringValue("hi")); - switch (result) { - case TopicPublishSuccess(): - print("Successful publish!"); - case TopicPublishError(): - print("Publish error: ${result.errorCode} ${result.message}"); - } + // start publishing messages in 2 seconds + Timer(const Duration(seconds: 2), () async { + // publish 10 messages spaced 1 second apart + for (final _ in Iterable.generate(10)) { + var result = + await topicClient.publish("cache", "topic", StringValue("hi")); + switch (result) { + case TopicPublishSuccess(): + print("Successful publish!"); + case TopicPublishError(): + print("Publish error: ${result.errorCode} ${result.message}"); + } + sleep(Duration(seconds: 1)); + } + }); var sub = topicClient.subscribe("cache", "topic"); switch (sub) { case TopicSubscription(): print("Successful subscription!"); - await for (final msg in sub.stream) { - switch (msg) { - case TopicSubscriptionItemBinary(): - print("Binary value: ${msg.value}"); - case TopicSubscriptionItemText(): - print("String value: ${msg.value}"); - case TopicSubscriptionItemError(): - print("Error receiving message: ${msg.errorCode}"); + + // cancel subscription 10 seconds from now + Timer(const Duration(seconds: 10), () { + print("Cancelling subscription!"); + sub.unsubscribe(); + }); + + try { + await for (final msg in sub.stream) { + switch (msg) { + case TopicSubscriptionItemBinary(): + print("Binary value: ${msg.value}"); + case TopicSubscriptionItemText(): + print("String value: ${msg.value}"); + case TopicSubscriptionItemError(): + print("Error receiving message: ${msg.errorCode}"); + } } + } catch (e) { + print("Runtime type: ${e.runtimeType}"); + print("Error with await for loop: $e"); } case TopicSubscribeError(): print("Subscribe error: ${sub.errorCode} ${sub.message}"); } + // unsubscribing should not affect the topic client's ability + // to subscribe to another topic afterwards + var sub2 = topicClient.subscribe("cache", "topic"); + switch (sub2) { + case TopicSubscription(): + print("Successful 2nd subscription!"); + case TopicSubscribeError(): + print("Subscribe error: ${sub2.errorCode} ${sub2.message}"); + } + + topicClient.close(); print("End of Momento topics example"); exit(0); } diff --git a/lib/src/internal/pubsub_client.dart b/lib/src/internal/pubsub_client.dart index bf33598..f63182b 100644 --- a/lib/src/internal/pubsub_client.dart +++ b/lib/src/internal/pubsub_client.dart @@ -1,7 +1,9 @@ import 'package:client_sdk_dart/generated/cachepubsub.pbgrpc.dart'; import 'package:client_sdk_dart/src/auth/credential_provider.dart'; import 'package:client_sdk_dart/src/errors/errors.dart'; +import 'package:client_sdk_dart/src/internal/topics_grpc_manager.dart'; import 'package:client_sdk_dart/src/messages/responses/topics/topic_subscribe.dart'; +import 'package:fixnum/fixnum.dart'; import 'package:grpc/grpc.dart'; import '../config/topic_configuration.dart'; @@ -13,22 +15,16 @@ abstract class AbstractPubsubClient { String cacheName, String topicName, Value value); TopicSubscribeResponse subscribe(String cacheName, String topicName); + + void close(); } class ClientPubsub implements AbstractPubsubClient { - late ClientChannel _channel; - late PubsubClient _client; - late TopicConfiguration _configuration; + final TopicConfiguration _configuration; + late final TopicGrpcManager _grpcManager; - ClientPubsub( - CredentialProvider credentialProvider, TopicConfiguration configuration) { - _channel = ClientChannel(credentialProvider.cacheEndpoint); - _client = PubsubClient(_channel, - options: CallOptions(metadata: { - 'authorization': credentialProvider.apiKey, - 'agent': 'dart:0.1.0' - })); - _configuration = configuration; + ClientPubsub(CredentialProvider credentialProvider, this._configuration) { + _grpcManager = TopicGrpcManager(credentialProvider); } TopicValue_ _valueToTopicValue(Value v) { @@ -51,7 +47,7 @@ class ClientPubsub implements AbstractPubsubClient { request.topic = topicName; request.value = _valueToTopicValue(value); try { - await _client.publish(request, + await _grpcManager.client.publish(request, options: CallOptions( timeout: _configuration.transportStrategy.grpcConfig.deadline)); return TopicPublishSuccess(); @@ -65,14 +61,19 @@ class ClientPubsub implements AbstractPubsubClient { } @override - TopicSubscribeResponse subscribe(String cacheName, String topicName) { + TopicSubscribeResponse subscribe(String cacheName, String topicName, + {Int64? resumeAtTopicSequenceNumber}) { var request = SubscriptionRequest_(); request.cacheName = cacheName; request.topic = topicName; + request.resumeAtTopicSequenceNumber = + resumeAtTopicSequenceNumber ?? Int64(0); try { - var stream = _client.subscribe(request); - return TopicSubscription(stream); + var stream = _grpcManager.client.subscribe(request); + return TopicSubscription(stream, request.resumeAtTopicSequenceNumber, + this, cacheName, topicName); } catch (e) { + print("Error in pubsubclient.subscribe: $e"); if (e is SdkException) { return TopicSubscribeError(e); } @@ -80,4 +81,9 @@ class ClientPubsub implements AbstractPubsubClient { UnknownException("Unexpected error: $e", null, null)); } } + + @override + void close() { + _grpcManager.close(); + } } diff --git a/lib/src/internal/topics_grpc_manager.dart b/lib/src/internal/topics_grpc_manager.dart new file mode 100644 index 0000000..63dcb85 --- /dev/null +++ b/lib/src/internal/topics_grpc_manager.dart @@ -0,0 +1,30 @@ +import 'package:client_sdk_dart/client_sdk_dart.dart'; +import 'package:client_sdk_dart/generated/cachepubsub.pbgrpc.dart'; +import 'package:grpc/grpc.dart'; +import 'package:logging/logging.dart'; + +class TopicGrpcManager { + late final ClientChannel _channel; + late final PubsubClient _client; + final _logger = Logger('MomentoTopicClient'); + + TopicGrpcManager(CredentialProvider credentialProvider) { + _channel = ClientChannel(credentialProvider.cacheEndpoint); + _client = PubsubClient(_channel, + options: CallOptions(metadata: { + 'authorization': credentialProvider.apiKey, + 'agent': 'dart:0.1.0' + })); + } + + PubsubClient get client => _client; + ClientChannel get channel => _channel; + + void close() { + try { + _channel.shutdown(); + } catch (e) { + _logger.warning("Error shutting down channel: $e"); + } + } +} diff --git a/lib/src/messages/responses/topics/topic_subscribe.dart b/lib/src/messages/responses/topics/topic_subscribe.dart index 4999b0c..48a0213 100644 --- a/lib/src/messages/responses/topics/topic_subscribe.dart +++ b/lib/src/messages/responses/topics/topic_subscribe.dart @@ -1,24 +1,64 @@ import 'package:client_sdk_dart/generated/cachepubsub.pbgrpc.dart'; +import 'package:client_sdk_dart/src/errors/errors.dart'; import 'package:client_sdk_dart/src/messages/responses/responses_base.dart'; import 'package:grpc/grpc.dart'; import 'package:logging/logging.dart'; +import 'package:fixnum/fixnum.dart'; +import '../../../internal/pubsub_client.dart'; import 'topic_subscription_item.dart'; sealed class TopicSubscribeResponse {} class TopicSubscription implements TopicSubscribeResponse { - final ResponseStream _stream; + ResponseStream _stream; + Int64 lastSequenceNumber; + final ClientPubsub _client; + String cacheName; + String topicName; + bool retry = true; + final logger = Logger("TopicSubscribeResponse"); - TopicSubscription(this._stream); + TopicSubscription(this._stream, this.lastSequenceNumber, this._client, + this.cacheName, this.topicName); - Stream get stream => - _stream.map(_processResult).where((item) => item != null).cast(); + Stream get stream { + return _handleStream(); + } + + Stream _handleStream() async* { + while (retry) { + try { + await for (final msg in _stream) { + var result = _processResult(msg); + if (result != null) { + yield result; + } + } + } catch (e) { + if (e is CancelledException || + (e is GrpcError && e.codeName == "CANCELLED")) { + logger.fine("Subscription was cancelled, not reconnecting"); + retry = false; + } else { + logger.fine("Attempting to reconnect after receiving error: $e"); + var result = _client.subscribe(cacheName, topicName, + resumeAtTopicSequenceNumber: lastSequenceNumber); + if (result is TopicSubscription) { + _stream = result._stream; + lastSequenceNumber = result.lastSequenceNumber; + } else if (result is TopicSubscribeError) { + logger.fine("Error reconnecting: ${result.message}"); + } + } + } + } + } TopicSubscriptionItemResponse? _processResult(SubscriptionItem_ item) { - final logger = Logger("TopicSubscribeResponse"); switch (item.whichKind()) { case SubscriptionItem__Kind.item: + lastSequenceNumber = item.item.topicSequenceNumber; return createTopicItemResponse(item.item); case SubscriptionItem__Kind.heartbeat: logger.fine("topic client received a heartbeat"); @@ -30,6 +70,10 @@ class TopicSubscription implements TopicSubscribeResponse { } return null; } + + void unsubscribe() async { + await _stream.cancel(); + } } class TopicSubscribeError extends ErrorResponseBase diff --git a/lib/src/topic_client.dart b/lib/src/topic_client.dart index 74f0d41..dfd8cde 100644 --- a/lib/src/topic_client.dart +++ b/lib/src/topic_client.dart @@ -1,5 +1,5 @@ import 'package:client_sdk_dart/src/auth/credential_provider.dart'; -import 'package:client_sdk_dart/src/config/logger.dart'; +// import 'package:client_sdk_dart/src/config/logger.dart'; import 'package:logging/logging.dart'; import 'config/topic_configuration.dart'; import 'internal/pubsub_client.dart'; @@ -12,6 +12,8 @@ abstract class ITopicClient { String cacheName, String topicName, Value value); TopicSubscribeResponse subscribe(String cacheName, String topicName); + + void close(); } class TopicClient implements ITopicClient { @@ -21,7 +23,7 @@ class TopicClient implements ITopicClient { TopicClient( CredentialProvider credentialProvider, TopicConfiguration configuration) : _pubsubClient = ClientPubsub(credentialProvider, configuration) { - _logger.level = determineLoggerLevel(configuration.logLevel); + // _logger.level = determineLoggerLevel(configuration.logLevel); _logger.finest("initializing topic client"); } @@ -35,4 +37,9 @@ class TopicClient implements ITopicClient { TopicSubscribeResponse subscribe(String cacheName, String topicName) { return _pubsubClient.subscribe(cacheName, topicName); } + + @override + void close() { + _pubsubClient.close(); + } }