Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: topics subscription resiliency #38

Merged
merged 4 commits into from
Dec 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 47 additions & 15 deletions examples/topics.dart
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import 'dart:async';
import 'dart:io';

import 'package:client_sdk_dart/client_sdk_dart.dart';
Expand All @@ -13,32 +14,63 @@ void main() async {
CredentialProvider.fromEnvironmentVariable("MOMENTO_API_KEY"),
Mobile.latest());

var result = await topicClient.publish("cache", "topic", StringValue("hi"));
switch (result) {
case TopicPublishSuccess():
print("Successful publish!");
case TopicPublishError():
print("Publish error: ${result.errorCode} ${result.message}");
}
// start publishing messages in 2 seconds
Timer(const Duration(seconds: 2), () async {
// publish 10 messages spaced 1 second apart
for (final _ in Iterable.generate(10)) {
var result =
await topicClient.publish("cache", "topic", StringValue("hi"));
switch (result) {
case TopicPublishSuccess():
print("Successful publish!");
case TopicPublishError():
print("Publish error: ${result.errorCode} ${result.message}");
}
sleep(Duration(seconds: 1));
}
});

var sub = topicClient.subscribe("cache", "topic");
switch (sub) {
case TopicSubscription():
print("Successful subscription!");
await for (final msg in sub.stream) {
switch (msg) {
case TopicSubscriptionItemBinary():
print("Binary value: ${msg.value}");
case TopicSubscriptionItemText():
print("String value: ${msg.value}");
case TopicSubscriptionItemError():
print("Error receiving message: ${msg.errorCode}");

// cancel subscription 10 seconds from now
Timer(const Duration(seconds: 10), () {
print("Cancelling subscription!");
sub.unsubscribe();
});

try {
await for (final msg in sub.stream) {
switch (msg) {
case TopicSubscriptionItemBinary():
print("Binary value: ${msg.value}");
case TopicSubscriptionItemText():
print("String value: ${msg.value}");
case TopicSubscriptionItemError():
print("Error receiving message: ${msg.errorCode}");
}
}
} catch (e) {
print("Runtime type: ${e.runtimeType}");
print("Error with await for loop: $e");
}
case TopicSubscribeError():
print("Subscribe error: ${sub.errorCode} ${sub.message}");
}

// unsubscribing should not affect the topic client's ability

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is really useful, I imagine it was great as almost a TDD approach to working on this feature. In the long run though, we probably want to keep these examples as simple as humanly possible, so we might want to move this to an "advanced topics example" or something.

Not in any way, shape, or form a blocker for this PR :) just calling it out as something to be aware of for future touching up on these examples/docs and stuff.

// to subscribe to another topic afterwards
var sub2 = topicClient.subscribe("cache", "topic");
switch (sub2) {
case TopicSubscription():
print("Successful 2nd subscription!");
case TopicSubscribeError():
print("Subscribe error: ${sub2.errorCode} ${sub2.message}");
}

topicClient.close();
print("End of Momento topics example");
exit(0);
}
38 changes: 22 additions & 16 deletions lib/src/internal/pubsub_client.dart
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
import 'package:client_sdk_dart/generated/cachepubsub.pbgrpc.dart';
import 'package:client_sdk_dart/src/auth/credential_provider.dart';
import 'package:client_sdk_dart/src/errors/errors.dart';
import 'package:client_sdk_dart/src/internal/topics_grpc_manager.dart';
import 'package:client_sdk_dart/src/messages/responses/topics/topic_subscribe.dart';
import 'package:fixnum/fixnum.dart';
import 'package:grpc/grpc.dart';

import '../config/topic_configuration.dart';
Expand All @@ -13,22 +15,16 @@ abstract class AbstractPubsubClient {
String cacheName, String topicName, Value value);

TopicSubscribeResponse subscribe(String cacheName, String topicName);

void close();
}

class ClientPubsub implements AbstractPubsubClient {
late ClientChannel _channel;
late PubsubClient _client;
late TopicConfiguration _configuration;
final TopicConfiguration _configuration;
late final TopicGrpcManager _grpcManager;

ClientPubsub(
CredentialProvider credentialProvider, TopicConfiguration configuration) {
_channel = ClientChannel(credentialProvider.cacheEndpoint);
_client = PubsubClient(_channel,
options: CallOptions(metadata: {
'authorization': credentialProvider.apiKey,
'agent': 'dart:0.1.0'
}));
_configuration = configuration;
ClientPubsub(CredentialProvider credentialProvider, this._configuration) {
_grpcManager = TopicGrpcManager(credentialProvider);
}

TopicValue_ _valueToTopicValue(Value v) {
Expand All @@ -51,7 +47,7 @@ class ClientPubsub implements AbstractPubsubClient {
request.topic = topicName;
request.value = _valueToTopicValue(value);
try {
await _client.publish(request,
await _grpcManager.client.publish(request,
options: CallOptions(
timeout: _configuration.transportStrategy.grpcConfig.deadline));
return TopicPublishSuccess();
Expand All @@ -65,19 +61,29 @@ class ClientPubsub implements AbstractPubsubClient {
}

@override
TopicSubscribeResponse subscribe(String cacheName, String topicName) {
TopicSubscribeResponse subscribe(String cacheName, String topicName,
{Int64? resumeAtTopicSequenceNumber}) {
var request = SubscriptionRequest_();
request.cacheName = cacheName;
request.topic = topicName;
request.resumeAtTopicSequenceNumber =
resumeAtTopicSequenceNumber ?? Int64(0);
try {
var stream = _client.subscribe(request);
return TopicSubscription(stream);
var stream = _grpcManager.client.subscribe(request);
return TopicSubscription(stream, request.resumeAtTopicSequenceNumber,
this, cacheName, topicName);
} catch (e) {
print("Error in pubsubclient.subscribe: $e");
if (e is SdkException) {
return TopicSubscribeError(e);
}
return TopicSubscribeError(
UnknownException("Unexpected error: $e", null, null));
}
}

@override
void close() {
_grpcManager.close();
}
}
30 changes: 30 additions & 0 deletions lib/src/internal/topics_grpc_manager.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
import 'package:client_sdk_dart/client_sdk_dart.dart';
import 'package:client_sdk_dart/generated/cachepubsub.pbgrpc.dart';
import 'package:grpc/grpc.dart';
import 'package:logging/logging.dart';

class TopicGrpcManager {
late final ClientChannel _channel;
late final PubsubClient _client;
final _logger = Logger('MomentoTopicClient');

TopicGrpcManager(CredentialProvider credentialProvider) {
_channel = ClientChannel(credentialProvider.cacheEndpoint);
_client = PubsubClient(_channel,
options: CallOptions(metadata: {
'authorization': credentialProvider.apiKey,
'agent': 'dart:0.1.0'
}));
}

PubsubClient get client => _client;
ClientChannel get channel => _channel;

void close() {
try {
_channel.shutdown();
} catch (e) {
_logger.warning("Error shutting down channel: $e");
}
}
}
54 changes: 49 additions & 5 deletions lib/src/messages/responses/topics/topic_subscribe.dart
Original file line number Diff line number Diff line change
@@ -1,24 +1,64 @@
import 'package:client_sdk_dart/generated/cachepubsub.pbgrpc.dart';
import 'package:client_sdk_dart/src/errors/errors.dart';
import 'package:client_sdk_dart/src/messages/responses/responses_base.dart';
import 'package:grpc/grpc.dart';
import 'package:logging/logging.dart';
import 'package:fixnum/fixnum.dart';

import '../../../internal/pubsub_client.dart';
import 'topic_subscription_item.dart';

sealed class TopicSubscribeResponse {}

class TopicSubscription implements TopicSubscribeResponse {
final ResponseStream<SubscriptionItem_> _stream;
ResponseStream<SubscriptionItem_> _stream;
Int64 lastSequenceNumber;
final ClientPubsub _client;
String cacheName;
String topicName;
bool retry = true;
final logger = Logger("TopicSubscribeResponse");

TopicSubscription(this._stream);
TopicSubscription(this._stream, this.lastSequenceNumber, this._client,
this.cacheName, this.topicName);

Stream<TopicSubscriptionItemResponse> get stream =>
_stream.map(_processResult).where((item) => item != null).cast();
Stream<TopicSubscriptionItemResponse> get stream {
return _handleStream();
}

Stream<TopicSubscriptionItemResponse> _handleStream() async* {
while (retry) {
try {
await for (final msg in _stream) {
var result = _processResult(msg);
if (result != null) {
yield result;
}
}
} catch (e) {
if (e is CancelledException ||
(e is GrpcError && e.codeName == "CANCELLED")) {
logger.fine("Subscription was cancelled, not reconnecting");
retry = false;
} else {
logger.fine("Attempting to reconnect after receiving error: $e");
var result = _client.subscribe(cacheName, topicName,
resumeAtTopicSequenceNumber: lastSequenceNumber);
if (result is TopicSubscription) {
_stream = result._stream;
lastSequenceNumber = result.lastSequenceNumber;
} else if (result is TopicSubscribeError) {
logger.fine("Error reconnecting: ${result.message}");
}
}
}
}
}

TopicSubscriptionItemResponse? _processResult(SubscriptionItem_ item) {
final logger = Logger("TopicSubscribeResponse");
switch (item.whichKind()) {
case SubscriptionItem__Kind.item:
lastSequenceNumber = item.item.topicSequenceNumber;
return createTopicItemResponse(item.item);
case SubscriptionItem__Kind.heartbeat:
logger.fine("topic client received a heartbeat");
Expand All @@ -30,6 +70,10 @@ class TopicSubscription implements TopicSubscribeResponse {
}
return null;
}

void unsubscribe() async {
await _stream.cancel();

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

}
}

class TopicSubscribeError extends ErrorResponseBase
Expand Down
11 changes: 9 additions & 2 deletions lib/src/topic_client.dart
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import 'package:client_sdk_dart/src/auth/credential_provider.dart';
import 'package:client_sdk_dart/src/config/logger.dart';
// import 'package:client_sdk_dart/src/config/logger.dart';
import 'package:logging/logging.dart';
import 'config/topic_configuration.dart';
import 'internal/pubsub_client.dart';
Expand All @@ -12,6 +12,8 @@ abstract class ITopicClient {
String cacheName, String topicName, Value value);

TopicSubscribeResponse subscribe(String cacheName, String topicName);

void close();
}

class TopicClient implements ITopicClient {
Expand All @@ -21,7 +23,7 @@ class TopicClient implements ITopicClient {
TopicClient(
CredentialProvider credentialProvider, TopicConfiguration configuration)
: _pubsubClient = ClientPubsub(credentialProvider, configuration) {
_logger.level = determineLoggerLevel(configuration.logLevel);
// _logger.level = determineLoggerLevel(configuration.logLevel);

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

errant comments, maybe remove in a later PR.

_logger.finest("initializing topic client");
}

Expand All @@ -35,4 +37,9 @@ class TopicClient implements ITopicClient {
TopicSubscribeResponse subscribe(String cacheName, String topicName) {
return _pubsubClient.subscribe(cacheName, topicName);
}

@override
void close() {
_pubsubClient.close();

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

:thu

}
}