Skip to content

Commit

Permalink
Merge pull request #18 from momentohq/subscribe
Browse files Browse the repository at this point in the history
feat: topic subscribe
  • Loading branch information
anitarua committed Dec 19, 2023
2 parents 8bfd286 + 7aeedc8 commit cc786f0
Show file tree
Hide file tree
Showing 6 changed files with 168 additions and 16 deletions.
4 changes: 0 additions & 4 deletions example/client_sdk_dart_example.dart

This file was deleted.

48 changes: 48 additions & 0 deletions example/topics.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
import 'dart:io';

import 'package:client_sdk_dart/client_sdk_dart.dart';
import 'package:client_sdk_dart/src/auth/credential_provider.dart';
import 'package:client_sdk_dart/src/messages/responses/topics/topic_publish.dart';
import 'package:client_sdk_dart/src/messages/responses/topics/topic_subscribe.dart';
import 'package:client_sdk_dart/src/messages/responses/topics/topic_subscription_item.dart';
import 'package:client_sdk_dart/src/messages/values.dart';
import 'package:logging/logging.dart';

void main() async {
Logger.root.level = Level.ALL; // defaults to Level.INFO
Logger.root.onRecord.listen((record) {
print('${record.level.name}: ${record.time}: ${record.message}');
});

var topicClient = TopicClient(
CredentialProvider.fromEnvironmentVariable("MOMENTO_API_KEY"));

var result = await topicClient.publish("cache", "topic", StringValue("hi"));
switch (result) {
case TopicPublishSuccess():
print("Successful publish!");
case TopicPublishError():
print("Publish error: ${result.errorCode} ${result.message}");
}

var sub = await topicClient.subscribe("cache", "topic");
switch (sub) {
case TopicSubscription():
print("Successful subscription!");
await for (final msg in sub.stream) {
switch (msg) {
case TopicSubscriptionItemBinary():
print("Binary value: ${msg.value}");
case TopicSubscriptionItemText():
print("String value: ${msg.value}");
case TopicSubscriptionItemError():
print("Error receiving message: ${msg.errorCode}");
}
}
case TopicSubscribeError():
print("Subscribe error: ${sub.errorCode} ${sub.message}");
}

print("End of Momento topics example");
exit(0);
}
47 changes: 37 additions & 10 deletions lib/src/internal/pubsub_client.dart
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import 'package:client_sdk_dart/generated/cachepubsub.pbgrpc.dart';
import 'package:client_sdk_dart/src/auth/credential_provider.dart';
import 'package:client_sdk_dart/src/errors/errors.dart';
import 'package:client_sdk_dart/src/messages/responses/topics/topic_subscribe.dart';
import 'package:grpc/grpc.dart';

import '../messages/values.dart';
Expand All @@ -9,15 +10,35 @@ import '../messages/responses/topics/topic_publish.dart';
abstract class AbstractPubsubClient {
Future<TopicPublishResponse> publish(
String cacheName, String topicName, Value value);

Future<TopicSubscribeResponse> subscribe(String cacheName, String topicName);
}

class ClientPubsub implements AbstractPubsubClient {
late ClientChannel _channel;
late PubsubClient _client;

ClientPubsub(CredentialProvider credentialProvider) {
_channel = ClientChannel(credentialProvider.cacheEndpoint);
_client = PubsubClient(_channel);
_client = PubsubClient(_channel,
options: CallOptions(metadata: {
'authorization': credentialProvider.apiKey,
'agent': 'dart:0.1.0'
}));
}

TopicValue_ _valueToTopicValue(Value v) {
var topicValue = TopicValue_();
switch (v) {
case StringValue():
topicValue.text = v.value;
return topicValue;
case BinaryValue():
topicValue.binary = v.value;
return topicValue;
}
}

@override
Future<TopicPublishResponse> publish(
String cacheName, String topicName, Value value) async {
Expand All @@ -37,15 +58,21 @@ class ClientPubsub implements AbstractPubsubClient {
}
}

TopicValue_ _valueToTopicValue(Value v) {
var topicValue = TopicValue_();
switch (v) {
case StringValue():
topicValue.text = v.value;
return topicValue;
case BinaryValue():
topicValue.binary = v.value;
return topicValue;
@override
Future<TopicSubscribeResponse> subscribe(
String cacheName, String topicName) async {
var request = SubscriptionRequest_();
request.cacheName = cacheName;
request.topic = topicName;
try {
var stream = _client.subscribe(request);
return TopicSubscription(stream);
} catch (e) {
if (e is SdkException) {
return TopicSubscribeError(e);
}
return TopicSubscribeError(
UnknownException("Unexpected error: $e", null, null));
}
}
}
38 changes: 38 additions & 0 deletions lib/src/messages/responses/topics/topic_subscribe.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
import 'package:client_sdk_dart/generated/cachepubsub.pbgrpc.dart';
import 'package:client_sdk_dart/src/messages/responses/responses_base.dart';
import 'package:grpc/grpc.dart';
import 'package:logging/logging.dart';

import 'topic_subscription_item.dart';

sealed class TopicSubscribeResponse {}

class TopicSubscription implements TopicSubscribeResponse {
final ResponseStream<SubscriptionItem_> _stream;

TopicSubscription(this._stream);

Stream<TopicSubscriptionItemResponse> get stream =>
_stream.map(_processResult).where((item) => item != null).cast();

TopicSubscriptionItemResponse? _processResult(SubscriptionItem_ item) {
final logger = Logger("TopicSubscribeResponse");
switch (item.whichKind()) {
case SubscriptionItem__Kind.item:
return createTopicItemResponse(item.item);
case SubscriptionItem__Kind.heartbeat:
logger.fine("topic client received a heartbeat");
case SubscriptionItem__Kind.discontinuity:
logger.fine("topic client received a discontinuity");
default:
logger.shout(
"topic client received unknown subscription item: ${item.whichKind()}");
}
return null;
}
}

class TopicSubscribeError extends ErrorResponseBase
implements TopicSubscribeResponse {
TopicSubscribeError(super.exception);
}
34 changes: 34 additions & 0 deletions lib/src/messages/responses/topics/topic_subscription_item.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
import 'package:client_sdk_dart/generated/cachepubsub.pb.dart';
import 'package:client_sdk_dart/src/errors/errors.dart';
import 'package:client_sdk_dart/src/messages/responses/responses_base.dart';

sealed class TopicSubscriptionItemResponse {}

class TopicSubscriptionItemText implements TopicSubscriptionItemResponse {
final String _value;
TopicSubscriptionItemText(this._value);
String get value => _value;
}

class TopicSubscriptionItemBinary implements TopicSubscriptionItemResponse {
final List<int> _value;
TopicSubscriptionItemBinary(this._value);
List<int> get value => _value;
}

class TopicSubscriptionItemError extends ErrorResponseBase
implements TopicSubscriptionItemResponse {
TopicSubscriptionItemError(super.exception);
}

TopicSubscriptionItemResponse createTopicItemResponse(TopicItem_ item) {
switch (item.value.whichKind()) {
case TopicValue__Kind.text:
return TopicSubscriptionItemText(item.value.text);
case TopicValue__Kind.binary:
return TopicSubscriptionItemBinary(item.value.binary);
default:
return TopicSubscriptionItemError(UnknownException(
"unknown TopicItemResponse value: ${item.value}", null, null));
}
}
13 changes: 11 additions & 2 deletions lib/src/topic_client.dart
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
import 'package:client_sdk_dart/src/auth/credential_provider.dart';
import 'package:logging/logging.dart';
import 'internal/pubsub_client.dart';
import 'messages/responses/topics/topic_subscribe.dart';
import 'messages/values.dart';
import 'messages/responses/topics/topic_publish.dart';

abstract class ITopicClient {
Future<TopicPublishResponse> publish(
String cacheName, String topicName, Value value);

Future<TopicSubscribeResponse> subscribe(String cacheName, String topicName);
}

class TopicClient implements ITopicClient {
Expand All @@ -20,7 +23,13 @@ class TopicClient implements ITopicClient {

@override
Future<TopicPublishResponse> publish(
String cacheName, String topicName, Value value) {
return _pubsubClient.publish(cacheName, topicName, value);
String cacheName, String topicName, Value value) async {
return await _pubsubClient.publish(cacheName, topicName, value);
}

@override
Future<TopicSubscribeResponse> subscribe(
String cacheName, String topicName) async {
return await _pubsubClient.subscribe(cacheName, topicName);
}
}

0 comments on commit cc786f0

Please sign in to comment.