Skip to content

Commit

Permalink
Merge pull request #31 from momentohq/publish-timeout
Browse files Browse the repository at this point in the history
chore: pass client timeout to publish as call option
  • Loading branch information
anitarua committed Dec 19, 2023
2 parents cb7619b + 5766520 commit b62c48c
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 5 deletions.
4 changes: 3 additions & 1 deletion example/topics.dart
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import 'dart:io';

import 'package:client_sdk_dart/client_sdk_dart.dart';
import 'package:client_sdk_dart/src/auth/credential_provider.dart';
import 'package:client_sdk_dart/src/config/topic_configurations.dart';
import 'package:client_sdk_dart/src/messages/responses/topics/topic_publish.dart';
import 'package:client_sdk_dart/src/messages/responses/topics/topic_subscribe.dart';
import 'package:client_sdk_dart/src/messages/responses/topics/topic_subscription_item.dart';
Expand All @@ -15,7 +16,8 @@ void main() async {
});

var topicClient = TopicClient(
CredentialProvider.fromEnvironmentVariable("MOMENTO_API_KEY"));
CredentialProvider.fromEnvironmentVariable("MOMENTO_API_KEY"),
Mobile.latest());

var result = await topicClient.publish("cache", "topic", StringValue("hi"));
switch (result) {
Expand Down
10 changes: 8 additions & 2 deletions lib/src/internal/pubsub_client.dart
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import 'package:client_sdk_dart/src/errors/errors.dart';
import 'package:client_sdk_dart/src/messages/responses/topics/topic_subscribe.dart';
import 'package:grpc/grpc.dart';

import '../config/topic_configuration.dart';
import '../messages/values.dart';
import '../messages/responses/topics/topic_publish.dart';

Expand All @@ -17,14 +18,17 @@ abstract class AbstractPubsubClient {
class ClientPubsub implements AbstractPubsubClient {
late ClientChannel _channel;
late PubsubClient _client;
late TopicConfiguration _configuration;

ClientPubsub(CredentialProvider credentialProvider) {
ClientPubsub(
CredentialProvider credentialProvider, TopicConfiguration configuration) {
_channel = ClientChannel(credentialProvider.cacheEndpoint);
_client = PubsubClient(_channel,
options: CallOptions(metadata: {
'authorization': credentialProvider.apiKey,
'agent': 'dart:0.1.0'
}));
_configuration = configuration;
}

TopicValue_ _valueToTopicValue(Value v) {
Expand All @@ -47,7 +51,9 @@ class ClientPubsub implements AbstractPubsubClient {
request.topic = topicName;
request.value = _valueToTopicValue(value);
try {
await _client.publish(request);
await _client.publish(request,
options: CallOptions(
timeout: _configuration.transportStrategy.grpcConfig.deadline));
return TopicPublishSuccess();
} catch (e) {
if (e is SdkException) {
Expand Down
6 changes: 4 additions & 2 deletions lib/src/topic_client.dart
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import 'package:client_sdk_dart/src/auth/credential_provider.dart';
import 'package:logging/logging.dart';
import 'config/topic_configuration.dart';
import 'internal/pubsub_client.dart';
import 'messages/responses/topics/topic_subscribe.dart';
import 'messages/values.dart';
Expand All @@ -16,8 +17,9 @@ class TopicClient implements ITopicClient {
final ClientPubsub _pubsubClient;
final Logger _logger = Logger('MomentoTopicClient');

TopicClient(CredentialProvider credentialProvider)
: _pubsubClient = ClientPubsub(credentialProvider) {
TopicClient(
CredentialProvider credentialProvider, TopicConfiguration configuration)
: _pubsubClient = ClientPubsub(credentialProvider, configuration) {
_logger.finest("initializing topic client");
}

Expand Down

0 comments on commit b62c48c

Please sign in to comment.