Skip to content

Commit

Permalink
fix: topics subscription resiliency (#38)
Browse files Browse the repository at this point in the history
* fix: topics subscription resiliency

* comment out logging config in topic client

* add topic grpc manager and unsubscribe method

* edit example, dart analyze
  • Loading branch information
anitarua committed Dec 27, 2023
1 parent a632ba9 commit c88060c
Show file tree
Hide file tree
Showing 5 changed files with 157 additions and 38 deletions.
62 changes: 47 additions & 15 deletions examples/topics.dart
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import 'dart:async';
import 'dart:io';

import 'package:client_sdk_dart/client_sdk_dart.dart';
Expand All @@ -13,32 +14,63 @@ void main() async {
CredentialProvider.fromEnvironmentVariable("MOMENTO_API_KEY"),
Mobile.latest());

var result = await topicClient.publish("cache", "topic", StringValue("hi"));
switch (result) {
case TopicPublishSuccess():
print("Successful publish!");
case TopicPublishError():
print("Publish error: ${result.errorCode} ${result.message}");
}
// start publishing messages in 2 seconds
Timer(const Duration(seconds: 2), () async {
// publish 10 messages spaced 1 second apart
for (final _ in Iterable.generate(10)) {
var result =
await topicClient.publish("cache", "topic", StringValue("hi"));
switch (result) {
case TopicPublishSuccess():
print("Successful publish!");
case TopicPublishError():
print("Publish error: ${result.errorCode} ${result.message}");
}
sleep(Duration(seconds: 1));
}
});

var sub = topicClient.subscribe("cache", "topic");
switch (sub) {
case TopicSubscription():
print("Successful subscription!");
await for (final msg in sub.stream) {
switch (msg) {
case TopicSubscriptionItemBinary():
print("Binary value: ${msg.value}");
case TopicSubscriptionItemText():
print("String value: ${msg.value}");
case TopicSubscriptionItemError():
print("Error receiving message: ${msg.errorCode}");

// cancel subscription 10 seconds from now
Timer(const Duration(seconds: 10), () {
print("Cancelling subscription!");
sub.unsubscribe();
});

try {
await for (final msg in sub.stream) {
switch (msg) {
case TopicSubscriptionItemBinary():
print("Binary value: ${msg.value}");
case TopicSubscriptionItemText():
print("String value: ${msg.value}");
case TopicSubscriptionItemError():
print("Error receiving message: ${msg.errorCode}");
}
}
} catch (e) {
print("Runtime type: ${e.runtimeType}");
print("Error with await for loop: $e");
}
case TopicSubscribeError():
print("Subscribe error: ${sub.errorCode} ${sub.message}");
}

// unsubscribing should not affect the topic client's ability
// to subscribe to another topic afterwards
var sub2 = topicClient.subscribe("cache", "topic");
switch (sub2) {
case TopicSubscription():
print("Successful 2nd subscription!");
case TopicSubscribeError():
print("Subscribe error: ${sub2.errorCode} ${sub2.message}");
}

topicClient.close();
print("End of Momento topics example");
exit(0);
}
38 changes: 22 additions & 16 deletions lib/src/internal/pubsub_client.dart
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
import 'package:client_sdk_dart/generated/cachepubsub.pbgrpc.dart';
import 'package:client_sdk_dart/src/auth/credential_provider.dart';
import 'package:client_sdk_dart/src/errors/errors.dart';
import 'package:client_sdk_dart/src/internal/topics_grpc_manager.dart';
import 'package:client_sdk_dart/src/messages/responses/topics/topic_subscribe.dart';
import 'package:fixnum/fixnum.dart';
import 'package:grpc/grpc.dart';

import '../config/topic_configuration.dart';
Expand All @@ -13,22 +15,16 @@ abstract class AbstractPubsubClient {
String cacheName, String topicName, Value value);

TopicSubscribeResponse subscribe(String cacheName, String topicName);

void close();
}

class ClientPubsub implements AbstractPubsubClient {
late ClientChannel _channel;
late PubsubClient _client;
late TopicConfiguration _configuration;
final TopicConfiguration _configuration;
late final TopicGrpcManager _grpcManager;

ClientPubsub(
CredentialProvider credentialProvider, TopicConfiguration configuration) {
_channel = ClientChannel(credentialProvider.cacheEndpoint);
_client = PubsubClient(_channel,
options: CallOptions(metadata: {
'authorization': credentialProvider.apiKey,
'agent': 'dart:0.1.0'
}));
_configuration = configuration;
ClientPubsub(CredentialProvider credentialProvider, this._configuration) {
_grpcManager = TopicGrpcManager(credentialProvider);
}

TopicValue_ _valueToTopicValue(Value v) {
Expand All @@ -51,7 +47,7 @@ class ClientPubsub implements AbstractPubsubClient {
request.topic = topicName;
request.value = _valueToTopicValue(value);
try {
await _client.publish(request,
await _grpcManager.client.publish(request,
options: CallOptions(
timeout: _configuration.transportStrategy.grpcConfig.deadline));
return TopicPublishSuccess();
Expand All @@ -65,19 +61,29 @@ class ClientPubsub implements AbstractPubsubClient {
}

@override
TopicSubscribeResponse subscribe(String cacheName, String topicName) {
TopicSubscribeResponse subscribe(String cacheName, String topicName,
{Int64? resumeAtTopicSequenceNumber}) {
var request = SubscriptionRequest_();
request.cacheName = cacheName;
request.topic = topicName;
request.resumeAtTopicSequenceNumber =
resumeAtTopicSequenceNumber ?? Int64(0);
try {
var stream = _client.subscribe(request);
return TopicSubscription(stream);
var stream = _grpcManager.client.subscribe(request);
return TopicSubscription(stream, request.resumeAtTopicSequenceNumber,
this, cacheName, topicName);
} catch (e) {
print("Error in pubsubclient.subscribe: $e");
if (e is SdkException) {
return TopicSubscribeError(e);
}
return TopicSubscribeError(
UnknownException("Unexpected error: $e", null, null));
}
}

@override
void close() {
_grpcManager.close();
}
}
30 changes: 30 additions & 0 deletions lib/src/internal/topics_grpc_manager.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
import 'package:client_sdk_dart/client_sdk_dart.dart';
import 'package:client_sdk_dart/generated/cachepubsub.pbgrpc.dart';
import 'package:grpc/grpc.dart';
import 'package:logging/logging.dart';

class TopicGrpcManager {
late final ClientChannel _channel;
late final PubsubClient _client;
final _logger = Logger('MomentoTopicClient');

TopicGrpcManager(CredentialProvider credentialProvider) {
_channel = ClientChannel(credentialProvider.cacheEndpoint);
_client = PubsubClient(_channel,
options: CallOptions(metadata: {
'authorization': credentialProvider.apiKey,
'agent': 'dart:0.1.0'
}));
}

PubsubClient get client => _client;
ClientChannel get channel => _channel;

void close() {
try {
_channel.shutdown();
} catch (e) {
_logger.warning("Error shutting down channel: $e");
}
}
}
54 changes: 49 additions & 5 deletions lib/src/messages/responses/topics/topic_subscribe.dart
Original file line number Diff line number Diff line change
@@ -1,24 +1,64 @@
import 'package:client_sdk_dart/generated/cachepubsub.pbgrpc.dart';
import 'package:client_sdk_dart/src/errors/errors.dart';
import 'package:client_sdk_dart/src/messages/responses/responses_base.dart';
import 'package:grpc/grpc.dart';
import 'package:logging/logging.dart';
import 'package:fixnum/fixnum.dart';

import '../../../internal/pubsub_client.dart';
import 'topic_subscription_item.dart';

sealed class TopicSubscribeResponse {}

class TopicSubscription implements TopicSubscribeResponse {
final ResponseStream<SubscriptionItem_> _stream;
ResponseStream<SubscriptionItem_> _stream;
Int64 lastSequenceNumber;
final ClientPubsub _client;
String cacheName;
String topicName;
bool retry = true;
final logger = Logger("TopicSubscribeResponse");

TopicSubscription(this._stream);
TopicSubscription(this._stream, this.lastSequenceNumber, this._client,
this.cacheName, this.topicName);

Stream<TopicSubscriptionItemResponse> get stream =>
_stream.map(_processResult).where((item) => item != null).cast();
Stream<TopicSubscriptionItemResponse> get stream {
return _handleStream();
}

Stream<TopicSubscriptionItemResponse> _handleStream() async* {
while (retry) {
try {
await for (final msg in _stream) {
var result = _processResult(msg);
if (result != null) {
yield result;
}
}
} catch (e) {
if (e is CancelledException ||
(e is GrpcError && e.codeName == "CANCELLED")) {
logger.fine("Subscription was cancelled, not reconnecting");
retry = false;
} else {
logger.fine("Attempting to reconnect after receiving error: $e");
var result = _client.subscribe(cacheName, topicName,
resumeAtTopicSequenceNumber: lastSequenceNumber);
if (result is TopicSubscription) {
_stream = result._stream;
lastSequenceNumber = result.lastSequenceNumber;
} else if (result is TopicSubscribeError) {
logger.fine("Error reconnecting: ${result.message}");
}
}
}
}
}

TopicSubscriptionItemResponse? _processResult(SubscriptionItem_ item) {
final logger = Logger("TopicSubscribeResponse");
switch (item.whichKind()) {
case SubscriptionItem__Kind.item:
lastSequenceNumber = item.item.topicSequenceNumber;
return createTopicItemResponse(item.item);
case SubscriptionItem__Kind.heartbeat:
logger.fine("topic client received a heartbeat");
Expand All @@ -30,6 +70,10 @@ class TopicSubscription implements TopicSubscribeResponse {
}
return null;
}

void unsubscribe() async {
await _stream.cancel();
}
}

class TopicSubscribeError extends ErrorResponseBase
Expand Down
11 changes: 9 additions & 2 deletions lib/src/topic_client.dart
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import 'package:client_sdk_dart/src/auth/credential_provider.dart';
import 'package:client_sdk_dart/src/config/logger.dart';
// import 'package:client_sdk_dart/src/config/logger.dart';
import 'package:logging/logging.dart';
import 'config/topic_configuration.dart';
import 'internal/pubsub_client.dart';
Expand All @@ -12,6 +12,8 @@ abstract class ITopicClient {
String cacheName, String topicName, Value value);

TopicSubscribeResponse subscribe(String cacheName, String topicName);

void close();
}

class TopicClient implements ITopicClient {
Expand All @@ -21,7 +23,7 @@ class TopicClient implements ITopicClient {
TopicClient(
CredentialProvider credentialProvider, TopicConfiguration configuration)
: _pubsubClient = ClientPubsub(credentialProvider, configuration) {
_logger.level = determineLoggerLevel(configuration.logLevel);
// _logger.level = determineLoggerLevel(configuration.logLevel);
_logger.finest("initializing topic client");
}

Expand All @@ -35,4 +37,9 @@ class TopicClient implements ITopicClient {
TopicSubscribeResponse subscribe(String cacheName, String topicName) {
return _pubsubClient.subscribe(cacheName, topicName);
}

@override
void close() {
_pubsubClient.close();
}
}

0 comments on commit c88060c

Please sign in to comment.