diff --git a/lib/src/internal/pubsub_client.dart b/lib/src/internal/pubsub_client.dart index 76fc1bc..877533c 100644 --- a/lib/src/internal/pubsub_client.dart +++ b/lib/src/internal/pubsub_client.dart @@ -1,6 +1,7 @@ import 'package:client_sdk_dart/generated/cachepubsub.pbgrpc.dart'; import 'package:client_sdk_dart/src/auth/credential_provider.dart'; import 'package:client_sdk_dart/src/errors/errors.dart'; +import 'package:client_sdk_dart/src/messages/responses/topics/topic_subscribe.dart'; import 'package:grpc/grpc.dart'; import '../messages/values.dart'; @@ -9,15 +10,31 @@ import '../messages/responses/topics/topic_publish.dart'; abstract class AbstractPubsubClient { Future publish( String cacheName, String topicName, Value value); + + Future subscribe(String cacheName, String topicName); } class ClientPubsub implements AbstractPubsubClient { late ClientChannel _channel; late PubsubClient _client; + ClientPubsub(CredentialProvider credentialProvider) { _channel = ClientChannel(credentialProvider.cacheEndpoint); _client = PubsubClient(_channel); } + + TopicValue_ _valueToTopicValue(Value v) { + var topicValue = TopicValue_(); + switch (v) { + case StringValue(): + topicValue.text = v.value; + return topicValue; + case BinaryValue(): + topicValue.binary = v.value; + return topicValue; + } + } + @override Future publish( String cacheName, String topicName, Value value) async { @@ -37,15 +54,20 @@ class ClientPubsub implements AbstractPubsubClient { } } - TopicValue_ _valueToTopicValue(Value v) { - var topicValue = TopicValue_(); - switch (v) { - case StringValue(): - topicValue.text = v.value; - return topicValue; - case BinaryValue(): - topicValue.binary = v.value; - return topicValue; + @override + Future subscribe(String cacheName, String topicName) async { + var request = SubscriptionRequest_(); + request.cacheName = cacheName; + request.topic = topicName; + try { + var stream = _client.subscribe(request); + return TopicSubscription(stream); + } catch (e) { + if (e is SdkException) { + return TopicSubscribeError(e); + } + return TopicSubscribeError( + UnknownException("Unexpected error: $e", null, null)); } } } diff --git a/lib/src/messages/responses/topics/topic_subscribe.dart b/lib/src/messages/responses/topics/topic_subscribe.dart new file mode 100644 index 0000000..2a16868 --- /dev/null +++ b/lib/src/messages/responses/topics/topic_subscribe.dart @@ -0,0 +1,36 @@ +import 'package:client_sdk_dart/generated/cachepubsub.pbgrpc.dart'; +import 'package:client_sdk_dart/src/messages/responses/responses_base.dart'; +import 'package:grpc/grpc.dart'; +import 'package:logging/logging.dart'; + +import 'topic_subscription_item.dart'; + +sealed class TopicSubscribeResponse {} + +class TopicSubscription implements TopicSubscribeResponse { + final ResponseStream _stream; + + TopicSubscription(this._stream); + + Stream get stream => _stream.map(_processResult).where((item) => item != null); + + TopicSubscriptionItemResponse? _processResult(SubscriptionItem_ item) { + final logger = Logger("TopicSubscribeResponse"); + switch (item.runtimeType) { + case TopicItem_: + return createTopicItemResponse(item as TopicItem_); + case Heartbeat_: + logger.info("topic client received a heartbeat"); + case Discontinuity_: + logger.info("topic client received a discontinuity"); + default: + logger.shout("topic client received unknown subscription item: ", item.runtimeType); + } + return null; + } +} + +class TopicSubscribeError extends ErrorResponseBase + implements TopicSubscribeResponse { + TopicSubscribeError(super.exception); +} diff --git a/lib/src/messages/responses/topics/topic_subscription_item.dart b/lib/src/messages/responses/topics/topic_subscription_item.dart new file mode 100644 index 0000000..e467c04 --- /dev/null +++ b/lib/src/messages/responses/topics/topic_subscription_item.dart @@ -0,0 +1,33 @@ +import 'package:client_sdk_dart/generated/cachepubsub.pb.dart'; +import 'package:client_sdk_dart/src/errors/errors.dart'; +import 'package:client_sdk_dart/src/messages/responses/responses_base.dart'; + +sealed class TopicSubscriptionItemResponse {} + +class TopicSubscriptionItemText implements TopicSubscriptionItemResponse { + final String _value; + TopicSubscriptionItemText(this._value); + String get value => _value; +} + +class TopicSubscriptionItemBinary implements TopicSubscriptionItemResponse { + final List _value; + TopicSubscriptionItemBinary(this._value); + List get value => _value; +} + +class TopicSubscriptionItemError extends ErrorResponseBase + implements TopicSubscriptionItemResponse { + TopicSubscriptionItemError(super.exception); +} + +TopicSubscriptionItemResponse createTopicItemResponse(TopicItem_ item) { + switch (item.value.whichKind()) { + case TopicValue__Kind.text: + return TopicSubscriptionItemText(item.value.text); + case TopicValue__Kind.binary: + return TopicSubscriptionItemBinary(item.value.binary); + default: + return TopicSubscriptionItemError(UnknownException("unknown TopicItemResponse value: $item.value", null, null)); + } +} \ No newline at end of file diff --git a/lib/src/topic_client.dart b/lib/src/topic_client.dart index 1086274..bb95bf4 100644 --- a/lib/src/topic_client.dart +++ b/lib/src/topic_client.dart @@ -1,12 +1,15 @@ import 'package:client_sdk_dart/src/auth/credential_provider.dart'; import 'package:logging/logging.dart'; import 'internal/pubsub_client.dart'; +import 'messages/responses/topics/topic_subscribe.dart'; import 'messages/values.dart'; import 'messages/responses/topics/topic_publish.dart'; abstract class ITopicClient { Future publish( String cacheName, String topicName, Value value); + + Future subscribe(String cacheName, String topicName); } class TopicClient implements ITopicClient { @@ -20,7 +23,12 @@ class TopicClient implements ITopicClient { @override Future publish( - String cacheName, String topicName, Value value) { - return _pubsubClient.publish(cacheName, topicName, value); + String cacheName, String topicName, Value value) async { + return await _pubsubClient.publish(cacheName, topicName, value); + } + + @override + Future subscribe(String cacheName, String topicName) async { + return await _pubsubClient.subscribe(cacheName, topicName); } }