Skip to content

Commit

Permalink
chore: pass client timeout to publish as call option
Browse files Browse the repository at this point in the history
  • Loading branch information
anitarua committed Dec 19, 2023
1 parent cc786f0 commit 7b0f0d9
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 4 deletions.
9 changes: 7 additions & 2 deletions lib/src/internal/pubsub_client.dart
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import 'package:client_sdk_dart/src/errors/errors.dart';
import 'package:client_sdk_dart/src/messages/responses/topics/topic_subscribe.dart';
import 'package:grpc/grpc.dart';

import '../config/topic_configuration.dart';
import '../messages/values.dart';
import '../messages/responses/topics/topic_publish.dart';

Expand All @@ -17,14 +18,16 @@ abstract class AbstractPubsubClient {
class ClientPubsub implements AbstractPubsubClient {
late ClientChannel _channel;
late PubsubClient _client;
late TopicConfiguration _configuration;

ClientPubsub(CredentialProvider credentialProvider) {
ClientPubsub(CredentialProvider credentialProvider, TopicConfiguration configuration) {
_channel = ClientChannel(credentialProvider.cacheEndpoint);
_client = PubsubClient(_channel,
options: CallOptions(metadata: {
'authorization': credentialProvider.apiKey,
'agent': 'dart:0.1.0'
}));
_configuration = configuration;
}

TopicValue_ _valueToTopicValue(Value v) {
Expand All @@ -47,7 +50,9 @@ class ClientPubsub implements AbstractPubsubClient {
request.topic = topicName;
request.value = _valueToTopicValue(value);
try {
await _client.publish(request);
await _client.publish(request,
options: CallOptions(
timeout: _configuration.transportStrategy.grpcConfig.deadline));
return TopicPublishSuccess();
} catch (e) {
if (e is SdkException) {
Expand Down
6 changes: 4 additions & 2 deletions lib/src/topic_client.dart
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import 'package:client_sdk_dart/src/auth/credential_provider.dart';
import 'package:logging/logging.dart';
import 'config/topic_configuration.dart';
import 'internal/pubsub_client.dart';
import 'messages/responses/topics/topic_subscribe.dart';
import 'messages/values.dart';
Expand All @@ -16,8 +17,9 @@ class TopicClient implements ITopicClient {
final ClientPubsub _pubsubClient;
final Logger _logger = Logger('MomentoTopicClient');

TopicClient(CredentialProvider credentialProvider)
: _pubsubClient = ClientPubsub(credentialProvider) {
TopicClient(
CredentialProvider credentialProvider, TopicConfiguration configuration)
: _pubsubClient = ClientPubsub(credentialProvider, configuration) {
_logger.finest("initializing topic client");
}

Expand Down

0 comments on commit 7b0f0d9

Please sign in to comment.