diff --git a/lib/src/internal/pubsub_client.dart b/lib/src/internal/pubsub_client.dart index 9f9c051..7131dce 100644 --- a/lib/src/internal/pubsub_client.dart +++ b/lib/src/internal/pubsub_client.dart @@ -4,6 +4,7 @@ import 'package:client_sdk_dart/src/errors/errors.dart'; import 'package:client_sdk_dart/src/messages/responses/topics/topic_subscribe.dart'; import 'package:grpc/grpc.dart'; +import '../config/topic_configuration.dart'; import '../messages/values.dart'; import '../messages/responses/topics/topic_publish.dart'; @@ -17,14 +18,16 @@ abstract class AbstractPubsubClient { class ClientPubsub implements AbstractPubsubClient { late ClientChannel _channel; late PubsubClient _client; + late TopicConfiguration _configuration; - ClientPubsub(CredentialProvider credentialProvider) { + ClientPubsub(CredentialProvider credentialProvider, TopicConfiguration configuration) { _channel = ClientChannel(credentialProvider.cacheEndpoint); _client = PubsubClient(_channel, options: CallOptions(metadata: { 'authorization': credentialProvider.apiKey, 'agent': 'dart:0.1.0' })); + _configuration = configuration; } TopicValue_ _valueToTopicValue(Value v) { @@ -47,7 +50,9 @@ class ClientPubsub implements AbstractPubsubClient { request.topic = topicName; request.value = _valueToTopicValue(value); try { - await _client.publish(request); + await _client.publish(request, + options: CallOptions( + timeout: _configuration.transportStrategy.grpcConfig.deadline)); return TopicPublishSuccess(); } catch (e) { if (e is SdkException) { diff --git a/lib/src/topic_client.dart b/lib/src/topic_client.dart index ddfc5b4..ae2bcb0 100644 --- a/lib/src/topic_client.dart +++ b/lib/src/topic_client.dart @@ -1,5 +1,6 @@ import 'package:client_sdk_dart/src/auth/credential_provider.dart'; import 'package:logging/logging.dart'; +import 'config/topic_configuration.dart'; import 'internal/pubsub_client.dart'; import 'messages/responses/topics/topic_subscribe.dart'; import 'messages/values.dart'; @@ -16,8 +17,9 @@ class TopicClient implements ITopicClient { final ClientPubsub _pubsubClient; final Logger _logger = Logger('MomentoTopicClient'); - TopicClient(CredentialProvider credentialProvider) - : _pubsubClient = ClientPubsub(credentialProvider) { + TopicClient( + CredentialProvider credentialProvider, TopicConfiguration configuration) + : _pubsubClient = ClientPubsub(credentialProvider, configuration) { _logger.finest("initializing topic client"); }