Skip to content

Commit

Permalink
fix: return topic subscribe error immediately if subscription limit r…
Browse files Browse the repository at this point in the history
…eached (#62)

* fix: return topic subscribe error immediately if subscription limit reached

* dart format

* remove keepalive settings

* update examples to await subscribe

* dart format

* use env var in flutter app instead of hardcoded api key

* dart format

* flutter app can use fromEnvVar credential provider instead
  • Loading branch information
anitarua committed Jan 9, 2024
1 parent 425a620 commit 30b9f49
Show file tree
Hide file tree
Showing 10 changed files with 85 additions and 37 deletions.
12 changes: 7 additions & 5 deletions example/flutter_chat_app/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,14 @@

## Running the Flutter Demo App

1. Provide your Momento API key in [`lib/main.dart`](./lib/main.dart) at this line:
```
static const String _momentoApiKey = "your-api-key-here";
```
You can run Flutter apps in [VSCode](https://docs.flutter.dev/tools/vs-code) or [Android Studio or IntelliJ](https://docs.flutter.dev/tools/android-studio) using built-in tooling, just make sure to provide your Momento API key as an environment variable named `MOMENTO_API_KEY`.

2. Run in [VSCode](https://docs.flutter.dev/tools/vs-code) or [Android Studio or IntelliJ](https://docs.flutter.dev/tools/android-studio). If using the command line, you can run using a command like: `flutter run -d <device_id>`. You can use `flutter devices` to see all connected devices and their IDs.
You can also use the terminal to list your devides and select one to run.

```bash
flutter devices
flutter run -d <device_id> --dart-define="MOMENTO_API_KEY=<your-api-key>"
```

## Flutter Resources

Expand Down
12 changes: 8 additions & 4 deletions example/flutter_chat_app/lib/main.dart
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import 'dart:io';

import 'package:client_sdk_dart/momento.dart';
import 'package:momento/momento.dart';
import 'package:flutter/material.dart';

void main() {
Expand Down Expand Up @@ -33,9 +33,9 @@ class MyHomePage extends StatefulWidget {
}

class _MyHomePageState extends State<MyHomePage> {
static const String _momentoApiKey = "your-api-key-here";
TopicClient _topicClient = TopicClient(
CredentialProvider.fromString(_momentoApiKey), Mobile.latest());
CredentialProvider.fromEnvironmentVariable('MOMENTO_API_KEY'),
MobileTopicConfiguration.latest());

List<String> _messages = ["Welcome to Momento Topics!"];
final TextEditingController _textInputController = TextEditingController();
Expand All @@ -45,8 +45,12 @@ class _MyHomePageState extends State<MyHomePage> {
@override
void initState() {
super.initState();
establishSubscription();
return;
}

final subscribeResult = _topicClient.subscribe("cache", "topic");
Future<void> establishSubscription() async {
final subscribeResult = await _topicClient.subscribe("cache", "topic");
switch (subscribeResult) {
case TopicSubscription():
print("Successful subscription");
Expand Down
2 changes: 1 addition & 1 deletion example/flutter_chat_app/pubspec.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ environment:
dependencies:
flutter:
sdk: flutter
client_sdk_dart:
momento:
path: ../../../client-sdk-dart


Expand Down
4 changes: 2 additions & 2 deletions example/topics/advanced.dart
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ void main() async {
}
});

var subscription = topicClient.subscribe("cache", "topic");
var subscription = await topicClient.subscribe("cache", "topic");
var messageStream = switch (subscription) {
TopicSubscription() => subscription.stream,
TopicSubscribeError() => throw Exception(
Expand Down Expand Up @@ -59,7 +59,7 @@ void main() async {

// unsubscribing should not affect the topic client's ability
// to subscribe to another topic afterwards
var sub2 = topicClient.subscribe("cache", "topic");
var sub2 = await topicClient.subscribe("cache", "topic");
switch (sub2) {
case TopicSubscription():
print("Successful 2nd subscription!");
Expand Down
2 changes: 1 addition & 1 deletion example/topics/basic_subscriber.dart
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ void main() async {
CredentialProvider.fromEnvironmentVariable("MOMENTO_API_KEY"),
TopicClientConfigurations.latest());

var subscription = topicClient.subscribe("cache", "topic");
var subscription = await topicClient.subscribe("cache", "topic");
var messageStream = switch (subscription) {
TopicSubscription() => subscription.stream,
TopicSubscribeError() => throw Exception(
Expand Down
19 changes: 14 additions & 5 deletions lib/src/internal/pubsub_client.dart
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ abstract class AbstractPubsubClient {
Future<TopicPublishResponse> publish(
String cacheName, String topicName, Value value);

TopicSubscribeResponse subscribe(String cacheName, String topicName);
Future<TopicSubscribeResponse> subscribe(String cacheName, String topicName);

void close();
}
Expand Down Expand Up @@ -61,20 +61,29 @@ class ClientPubsub implements AbstractPubsubClient {
}

@override
TopicSubscribeResponse subscribe(String cacheName, String topicName,
{Int64? resumeAtTopicSequenceNumber}) {
Future<TopicSubscribeResponse> subscribe(String cacheName, String topicName,
{Int64? resumeAtTopicSequenceNumber}) async {
var request = SubscriptionRequest_();
request.cacheName = cacheName;
request.topic = topicName;
request.resumeAtTopicSequenceNumber =
resumeAtTopicSequenceNumber ?? Int64(0);
try {
var stream = _grpcManager.client.subscribe(request);
return TopicSubscription(stream, request.resumeAtTopicSequenceNumber,
this, cacheName, topicName);
final subscription = TopicSubscription(stream,
request.resumeAtTopicSequenceNumber, this, cacheName, topicName);

try {
await subscription.init();
return subscription;
} catch (e) {
rethrow;
}
} catch (e) {
if (e is GrpcError) {
return TopicSubscribeError(grpcStatusToSdkException(e));
} else if (e is SdkException) {
return TopicSubscribeError(e);
}
return TopicSubscribeError(
UnknownException("Unexpected error: $e", null, null));
Expand Down
6 changes: 1 addition & 5 deletions lib/src/internal/topics_grpc_manager.dart
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,7 @@ class TopicGrpcManager {
final _logger = Logger('MomentoTopicClient');

TopicGrpcManager(CredentialProvider credentialProvider) {
_channel = ClientChannel(credentialProvider.cacheEndpoint,
options: ChannelOptions(
keepAlive: ClientKeepAliveOptions(
pingInterval: Duration(seconds: 10),
timeout: Duration(seconds: 5))));
_channel = ClientChannel(credentialProvider.cacheEndpoint);
_client = PubsubClient(_channel,
options: CallOptions(metadata: {
'authorization': credentialProvider.apiKey,
Expand Down
56 changes: 46 additions & 10 deletions lib/src/messages/responses/topics/topic_subscribe.dart
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,24 @@ class TopicSubscription implements TopicSubscribeResponse {
String topicName;
bool retry = true;
final logger = Logger("TopicSubscribeResponse");
late Stream _broadcastStream;

TopicSubscription(this._stream, this.lastSequenceNumber, this._client,
this.cacheName, this.topicName);
this.cacheName, this.topicName) {
_broadcastStream = _stream.asBroadcastStream();
}

Future<void> init() async {
await for (final firstItem in _broadcastStream) {
if (firstItem.whichKind() != SubscriptionItem__Kind.heartbeat) {
throw InternalServerException(
"Expected heartbeat message for topic $topicName on cache $cacheName, got ${firstItem.whichKind()}",
null,
null);
}
break;
}
}

Stream<TopicSubscriptionItemResponse> get stream {
return _handleStream();
Expand All @@ -29,7 +44,7 @@ class TopicSubscription implements TopicSubscribeResponse {
Stream<TopicSubscriptionItemResponse> _handleStream() async* {
while (retry) {
try {
await for (final msg in _stream) {
await for (final msg in _broadcastStream) {
var result = _processResult(msg);
if (result != null) {
yield result;
Expand All @@ -39,20 +54,41 @@ class TopicSubscription implements TopicSubscribeResponse {
if (e is CancelledException ||
(e is GrpcError && e.codeName == "CANCELLED")) {
logger.fine("Subscription was cancelled, not reconnecting");
await _stream.cancel();
retry = false;
} else if (e is ClientResourceExhaustedException ||
(e is GrpcError && e.codeName == "RESOURCE_EXHAUSTED")) {
logger.fine("Subscription limit reached, not reconnecting");
await _stream.cancel();
retry = false;
} else {
logger.fine("Attempting to reconnect after receiving error: $e");
var result = _client.subscribe(cacheName, topicName,
resumeAtTopicSequenceNumber: lastSequenceNumber);
if (result is TopicSubscription) {
_stream = result._stream;
lastSequenceNumber = result.lastSequenceNumber;
} else if (result is TopicSubscribeError) {
logger.fine("Error reconnecting: ${result.message}");
}
}
}

if (retry) {
logger.fine("retry is still true");
retry = await attemptReconnect();
}
}
}

Future<bool> attemptReconnect() async {
await _stream.cancel();
var result = await _client.subscribe(cacheName, topicName,
resumeAtTopicSequenceNumber: lastSequenceNumber);
if (result is TopicSubscription) {
_stream = result._stream;
_broadcastStream = result._broadcastStream;
lastSequenceNumber = result.lastSequenceNumber;
} else if (result is TopicSubscribeError) {
logger.fine("Error reconnecting: ${result.message}");
if (result.errorCode == MomentoErrorCode.limitExceededError ||
result.errorCode == MomentoErrorCode.cancelledError) {
return false;
}
}
return true;
}

TopicSubscriptionItemResponse? _processResult(SubscriptionItem_ item) {
Expand Down
7 changes: 4 additions & 3 deletions lib/src/topic_client.dart
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ abstract class ITopicClient {
Future<TopicPublishResponse> publish(
String cacheName, String topicName, Value value);

TopicSubscribeResponse subscribe(String cacheName, String topicName);
Future<TopicSubscribeResponse> subscribe(String cacheName, String topicName);

void close();
}
Expand All @@ -33,8 +33,9 @@ class TopicClient implements ITopicClient {
}

@override
TopicSubscribeResponse subscribe(String cacheName, String topicName) {
return _pubsubClient.subscribe(cacheName, topicName);
Future<TopicSubscribeResponse> subscribe(
String cacheName, String topicName) async {
return await _pubsubClient.subscribe(cacheName, topicName);
}

@override
Expand Down
2 changes: 1 addition & 1 deletion pubspec.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ dependencies:
logging: ^1.2.0
protobuf: ^3.1.0
string_validator: ^1.0.2
uuid: ^4.3.1
uuid: ^4.2.2
# path: ^1.8.0

dev_dependencies:
Expand Down

0 comments on commit 30b9f49

Please sign in to comment.