Skip to content

Commit

Permalink
feat: topic subscribe
Browse files Browse the repository at this point in the history
  • Loading branch information
anitarua committed Dec 18, 2023
1 parent 8bfd286 commit fa2b3be
Show file tree
Hide file tree
Showing 4 changed files with 110 additions and 11 deletions.
40 changes: 31 additions & 9 deletions lib/src/internal/pubsub_client.dart
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import 'package:client_sdk_dart/generated/cachepubsub.pbgrpc.dart';
import 'package:client_sdk_dart/src/auth/credential_provider.dart';
import 'package:client_sdk_dart/src/errors/errors.dart';
import 'package:client_sdk_dart/src/messages/responses/topics/topic_subscribe.dart';
import 'package:grpc/grpc.dart';

import '../messages/values.dart';
Expand All @@ -9,15 +10,31 @@ import '../messages/responses/topics/topic_publish.dart';
abstract class AbstractPubsubClient {
Future<TopicPublishResponse> publish(
String cacheName, String topicName, Value value);

Future<TopicSubscribeResponse> subscribe(String cacheName, String topicName);
}

class ClientPubsub implements AbstractPubsubClient {
late ClientChannel _channel;
late PubsubClient _client;

ClientPubsub(CredentialProvider credentialProvider) {
_channel = ClientChannel(credentialProvider.cacheEndpoint);
_client = PubsubClient(_channel);
}

TopicValue_ _valueToTopicValue(Value v) {
var topicValue = TopicValue_();
switch (v) {
case StringValue():
topicValue.text = v.value;
return topicValue;
case BinaryValue():
topicValue.binary = v.value;
return topicValue;
}
}

@override
Future<TopicPublishResponse> publish(
String cacheName, String topicName, Value value) async {
Expand All @@ -37,15 +54,20 @@ class ClientPubsub implements AbstractPubsubClient {
}
}

TopicValue_ _valueToTopicValue(Value v) {
var topicValue = TopicValue_();
switch (v) {
case StringValue():
topicValue.text = v.value;
return topicValue;
case BinaryValue():
topicValue.binary = v.value;
return topicValue;
@override
Future<TopicSubscribeResponse> subscribe(String cacheName, String topicName) async {
var request = SubscriptionRequest_();
request.cacheName = cacheName;
request.topic = topicName;
try {
var stream = _client.subscribe(request);
return TopicSubscription(stream);
} catch (e) {
if (e is SdkException) {
return TopicSubscribeError(e);
}
return TopicSubscribeError(
UnknownException("Unexpected error: $e", null, null));
}
}
}
36 changes: 36 additions & 0 deletions lib/src/messages/responses/topics/topic_subscribe.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
import 'package:client_sdk_dart/generated/cachepubsub.pbgrpc.dart';
import 'package:client_sdk_dart/src/messages/responses/responses_base.dart';
import 'package:grpc/grpc.dart';
import 'package:logging/logging.dart';

import 'topic_subscription_item.dart';

sealed class TopicSubscribeResponse {}

class TopicSubscription implements TopicSubscribeResponse {
final ResponseStream<SubscriptionItem_> _stream;

TopicSubscription(this._stream);

Stream<TopicSubscriptionItemResponse?> get stream => _stream.map(_processResult).where((item) => item != null);

TopicSubscriptionItemResponse? _processResult(SubscriptionItem_ item) {
final logger = Logger("TopicSubscribeResponse");
switch (item.runtimeType) {
case TopicItem_:
return createTopicItemResponse(item as TopicItem_);
case Heartbeat_:
logger.info("topic client received a heartbeat");
case Discontinuity_:
logger.info("topic client received a discontinuity");
default:
logger.shout("topic client received unknown subscription item: ", item.runtimeType);
}
return null;
}
}

class TopicSubscribeError extends ErrorResponseBase
implements TopicSubscribeResponse {
TopicSubscribeError(super.exception);
}
33 changes: 33 additions & 0 deletions lib/src/messages/responses/topics/topic_subscription_item.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
import 'package:client_sdk_dart/generated/cachepubsub.pb.dart';
import 'package:client_sdk_dart/src/errors/errors.dart';
import 'package:client_sdk_dart/src/messages/responses/responses_base.dart';

sealed class TopicSubscriptionItemResponse {}

class TopicSubscriptionItemText implements TopicSubscriptionItemResponse {
final String _value;
TopicSubscriptionItemText(this._value);
String get value => _value;
}

class TopicSubscriptionItemBinary implements TopicSubscriptionItemResponse {
final List<int> _value;
TopicSubscriptionItemBinary(this._value);
List<int> get value => _value;
}

class TopicSubscriptionItemError extends ErrorResponseBase
implements TopicSubscriptionItemResponse {
TopicSubscriptionItemError(super.exception);
}

TopicSubscriptionItemResponse createTopicItemResponse(TopicItem_ item) {
switch (item.value.whichKind()) {
case TopicValue__Kind.text:
return TopicSubscriptionItemText(item.value.text);
case TopicValue__Kind.binary:
return TopicSubscriptionItemBinary(item.value.binary);
default:
return TopicSubscriptionItemError(UnknownException("unknown TopicItemResponse value: $item.value", null, null));
}
}
12 changes: 10 additions & 2 deletions lib/src/topic_client.dart
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
import 'package:client_sdk_dart/src/auth/credential_provider.dart';
import 'package:logging/logging.dart';
import 'internal/pubsub_client.dart';
import 'messages/responses/topics/topic_subscribe.dart';
import 'messages/values.dart';
import 'messages/responses/topics/topic_publish.dart';

abstract class ITopicClient {
Future<TopicPublishResponse> publish(
String cacheName, String topicName, Value value);

Future<TopicSubscribeResponse> subscribe(String cacheName, String topicName);
}

class TopicClient implements ITopicClient {
Expand All @@ -20,7 +23,12 @@ class TopicClient implements ITopicClient {

@override
Future<TopicPublishResponse> publish(
String cacheName, String topicName, Value value) {
return _pubsubClient.publish(cacheName, topicName, value);
String cacheName, String topicName, Value value) async {
return await _pubsubClient.publish(cacheName, topicName, value);
}

@override
Future<TopicSubscribeResponse> subscribe(String cacheName, String topicName) async {
return await _pubsubClient.subscribe(cacheName, topicName);
}
}

0 comments on commit fa2b3be

Please sign in to comment.