From 13cfb7813a54dca25f4bb538f877f7d9a560fb84 Mon Sep 17 00:00:00 2001 From: anitarua Date: Tue, 4 Mar 2025 10:59:42 -0800 Subject: [PATCH 1/5] feat: add topic grpc config and transport strategy and make sure publish deadline is set --- src/momento/config/topic_configuration.py | 52 +++++++++- src/momento/config/topic_configurations.py | 9 +- .../transport/topic_grpc_configuration.py | 35 +++++++ .../transport/topic_transport_strategy.py | 99 +++++++++++++++++++ .../_utilities/_grpc_channel_options.py | 39 ++++++++ src/momento/internal/aio/_scs_grpc_manager.py | 13 +-- .../internal/aio/_scs_pubsub_client.py | 6 ++ .../internal/synchronous/_scs_grpc_manager.py | 13 +-- .../synchronous/_scs_pubsub_client.py | 6 ++ 9 files changed, 249 insertions(+), 23 deletions(-) create mode 100644 src/momento/config/transport/topic_grpc_configuration.py create mode 100644 src/momento/config/transport/topic_transport_strategy.py diff --git a/src/momento/config/topic_configuration.py b/src/momento/config/topic_configuration.py index 8c45ff3a..d45eb7f2 100644 --- a/src/momento/config/topic_configuration.py +++ b/src/momento/config/topic_configuration.py @@ -1,6 +1,9 @@ from __future__ import annotations from abc import ABC, abstractmethod +from datetime import timedelta + +from momento.config.transport.topic_transport_strategy import TopicTransportStrategy class TopicConfigurationBase(ABC): @@ -12,14 +15,28 @@ def get_max_subscriptions(self) -> int: def with_max_subscriptions(self, max_subscriptions: int) -> TopicConfiguration: pass + @abstractmethod + def get_transport_strategy(self) -> TopicTransportStrategy: + pass + + @abstractmethod + def with_transport_strategy(self, transport_strategy: TopicTransportStrategy) -> TopicConfiguration: + pass + + @abstractmethod + def with_client_timeout(self, client_timeout: timedelta) -> TopicConfiguration: + pass + class TopicConfiguration(TopicConfigurationBase): """Configuration options for Momento topic client.""" - def __init__(self, max_subscriptions: int = 0): + def __init__(self, transport_strategy: TopicTransportStrategy, max_subscriptions: int = 0): """Instantiate a configuration. Args: + transport_strategy (TopicTransportStrategy): Configuration options for networking with + the Momento pubsub service. max_subscriptions (int): The maximum number of subscriptions the client is expected to handle. Because each gRPC channel can handle 100 connections, we must explicitly open multiple channels to accommodate the load. NOTE: if the number of connection @@ -27,9 +44,40 @@ def __init__(self, max_subscriptions: int = 0): and hang. """ self._max_subscriptions = max_subscriptions + self._transport_strategy = transport_strategy def get_max_subscriptions(self) -> int: return self._max_subscriptions def with_max_subscriptions(self, max_subscriptions: int) -> TopicConfiguration: - return TopicConfiguration(max_subscriptions) + return TopicConfiguration(self._transport_strategy, max_subscriptions) + + def get_transport_strategy(self) -> TopicTransportStrategy: + """Access the transport strategy. + + Returns: + TopicTransportStrategy: the current configuration options for wire interactions with the Momento pubsub service. + """ + return self._transport_strategy + + def with_transport_strategy(self, transport_strategy: TopicTransportStrategy) -> TopicConfiguration: + """Copy constructor for overriding TopicTransportStrategy. + + Args: + transport_strategy (TopicTransportStrategy): the new TopicTransportStrategy. + + Returns: + TopicConfiguration: the new TopicConfiguration with the specified TopicTransportStrategy. + """ + return TopicConfiguration(transport_strategy, self._max_subscriptions) + + def with_client_timeout(self, client_timeout: timedelta) -> TopicConfiguration: + """Copies the TopicConfiguration and sets the new client-side timeout in the copy's TopicTransportStrategy. + + Args: + client_timeout (timedelta): the new client-side timeout. + + Return: + TopicConfiguration: the new TopicConfiguration. + """ + return TopicConfiguration(self._transport_strategy.with_client_timeout(client_timeout), self._max_subscriptions) diff --git a/src/momento/config/topic_configurations.py b/src/momento/config/topic_configurations.py index 229e0920..3b729b63 100644 --- a/src/momento/config/topic_configurations.py +++ b/src/momento/config/topic_configurations.py @@ -1,5 +1,9 @@ from __future__ import annotations +from datetime import timedelta + +from momento.config.transport.topic_transport_strategy import StaticTopicGrpcConfiguration, StaticTopicTransportStrategy + from .topic_configuration import TopicConfiguration @@ -21,4 +25,7 @@ def v1() -> TopicConfigurations.Default: This configuration is guaranteed not to change in future releases of the Momento Python SDK. """ - return TopicConfigurations.Default(max_subscriptions=0) + return TopicConfigurations.Default( + StaticTopicTransportStrategy(StaticTopicGrpcConfiguration(deadline=timedelta(seconds=5))), + max_subscriptions=0, + ) diff --git a/src/momento/config/transport/topic_grpc_configuration.py b/src/momento/config/transport/topic_grpc_configuration.py new file mode 100644 index 00000000..101a03c5 --- /dev/null +++ b/src/momento/config/transport/topic_grpc_configuration.py @@ -0,0 +1,35 @@ +from __future__ import annotations + +from abc import ABC, abstractmethod +from datetime import timedelta +from typing import Optional + + +class TopicGrpcConfiguration(ABC): + @abstractmethod + def get_deadline(self) -> timedelta: + pass + + @abstractmethod + def with_deadline(self, deadline: timedelta) -> TopicGrpcConfiguration: + pass + + @abstractmethod + def get_max_send_message_length(self) -> Optional[int]: + pass + + @abstractmethod + def get_max_receive_message_length(self) -> Optional[int]: + pass + + @abstractmethod + def get_keepalive_permit_without_calls(self) -> Optional[int]: + pass + + @abstractmethod + def get_keepalive_time(self) -> Optional[timedelta]: + pass + + @abstractmethod + def get_keepalive_timeout(self) -> Optional[timedelta]: + pass diff --git a/src/momento/config/transport/topic_transport_strategy.py b/src/momento/config/transport/topic_transport_strategy.py new file mode 100644 index 00000000..d81ecec7 --- /dev/null +++ b/src/momento/config/transport/topic_transport_strategy.py @@ -0,0 +1,99 @@ +from __future__ import annotations + +from abc import ABC, abstractmethod +from datetime import timedelta +from typing import Optional + +from momento.config.transport.topic_grpc_configuration import TopicGrpcConfiguration +from momento.internal._utilities import _validate_request_timeout + + +class TopicTransportStrategy(ABC): + """Configures the network options for communicating with the Momento pubsub service.""" + + @abstractmethod + def get_grpc_configuration(self) -> TopicGrpcConfiguration: + """Access the gRPC configuration. + + Returns: + TopicGrpcConfiguration: low-level gRPC settings for the Momento client's communication. + """ + pass + + @abstractmethod + def with_grpc_configuration(self, grpc_configuration: TopicGrpcConfiguration) -> TopicTransportStrategy: + """Copy constructor for overriding the gRPC configuration. + + Args: + grpc_configuration (TopicGrpcConfiguration): the new gRPC configuration. + + Returns: + TopicTransportStrategy: a new TopicTransportStrategy with the specified gRPC config. + """ + pass + + @abstractmethod + def with_client_timeout(self, client_timeout: timedelta) -> TopicTransportStrategy: + """Copies the TopicTransportStrategy and updates the copy's client-side timeout. + + Args: + client_timeout (timedelta): the new client-side timeout. + + Returns: + TopicTransportStrategy: the new TopicTransportStrategy. + """ + pass + + +class StaticTopicGrpcConfiguration(TopicGrpcConfiguration): + def __init__( + self, + deadline: timedelta, + max_send_message_length: Optional[int] = None, + max_receive_message_length: Optional[int] = None, + keepalive_permit_without_calls: Optional[bool] = True, + keepalive_time: Optional[timedelta] = timedelta(milliseconds=5000), + keepalive_timeout: Optional[timedelta] = timedelta(milliseconds=1000), + ): + self._deadline = deadline + self._max_send_message_length = max_send_message_length + self._max_receive_message_length = max_receive_message_length + self._keepalive_permit_without_calls = keepalive_permit_without_calls + self._keepalive_time = keepalive_time + self._keepalive_timeout = keepalive_timeout + + def get_deadline(self) -> timedelta: + return self._deadline + + def with_deadline(self, deadline: timedelta) -> TopicGrpcConfiguration: + _validate_request_timeout(deadline) + return StaticTopicGrpcConfiguration(deadline) + + def get_max_send_message_length(self) -> Optional[int]: + return self._max_send_message_length + + def get_max_receive_message_length(self) -> Optional[int]: + return self._max_receive_message_length + + def get_keepalive_permit_without_calls(self) -> Optional[int]: + return int(self._keepalive_permit_without_calls) if self._keepalive_permit_without_calls is not None else None + + def get_keepalive_time(self) -> Optional[timedelta]: + return self._keepalive_time + + def get_keepalive_timeout(self) -> Optional[timedelta]: + return self._keepalive_timeout + + +class StaticTopicTransportStrategy(TopicTransportStrategy): + def __init__(self, grpc_configuration: TopicGrpcConfiguration): + self._grpc_configuration = grpc_configuration + + def get_grpc_configuration(self) -> TopicGrpcConfiguration: + return self._grpc_configuration + + def with_grpc_configuration(self, grpc_configuration: TopicGrpcConfiguration) -> TopicTransportStrategy: + return StaticTopicTransportStrategy(grpc_configuration) + + def with_client_timeout(self, client_timeout: timedelta) -> TopicTransportStrategy: + return self.with_grpc_configuration(self._grpc_configuration.with_deadline(client_timeout)) diff --git a/src/momento/internal/_utilities/_grpc_channel_options.py b/src/momento/internal/_utilities/_grpc_channel_options.py index ca5ad583..80bdfa11 100644 --- a/src/momento/internal/_utilities/_grpc_channel_options.py +++ b/src/momento/internal/_utilities/_grpc_channel_options.py @@ -3,6 +3,7 @@ from typing import Sequence, Tuple, Union from momento.config.transport.grpc_configuration import GrpcConfiguration +from momento.config.transport.topic_grpc_configuration import TopicGrpcConfiguration from momento.config.transport.transport_strategy import StaticGrpcConfiguration from momento.internal._utilities import _timedelta_to_ms @@ -11,6 +12,44 @@ ChannelArguments = Sequence[Tuple[str, Union[int, None]]] +def grpc_topic_channel_options_from_grpc_config(grpc_config: TopicGrpcConfiguration) -> ChannelArguments: + """Create gRPC channel options from a TopicGrpcConfiguration. + + Args: + grpc_config (TopicGrpcConfiguration): the gRPC configuration. + + Returns: + grpc.aio.ChannelArgumentType: a list of gRPC channel options as key-value tuples. + """ + channel_options = [] + + max_send_length = grpc_config.get_max_send_message_length() + channel_options.append( + ("grpc.max_send_message_length", max_send_length if max_send_length is not None else DEFAULT_MAX_MESSAGE_SIZE) + ) + + max_receive_length = grpc_config.get_max_receive_message_length() + channel_options.append( + ( + "grpc.max_receive_message_length", + max_receive_length if max_receive_length is not None else DEFAULT_MAX_MESSAGE_SIZE, + ) + ) + + keepalive_permit = grpc_config.get_keepalive_permit_without_calls() + if keepalive_permit is not None: + channel_options.append(("grpc.keepalive_permit_without_calls", keepalive_permit)) + + keepalive_time = grpc_config.get_keepalive_time() + if keepalive_time is not None: + channel_options.append(("grpc.keepalive_time_ms", _timedelta_to_ms(keepalive_time))) + + keepalive_timeout = grpc_config.get_keepalive_timeout() + if keepalive_timeout is not None: + channel_options.append(("grpc.keepalive_timeout_ms", _timedelta_to_ms(keepalive_timeout))) + + return channel_options + def grpc_data_channel_options_from_grpc_config(grpc_config: GrpcConfiguration) -> ChannelArguments: """Create gRPC channel options from a GrpcConfiguration. diff --git a/src/momento/internal/aio/_scs_grpc_manager.py b/src/momento/internal/aio/_scs_grpc_manager.py index 066a7bf4..6295a209 100644 --- a/src/momento/internal/aio/_scs_grpc_manager.py +++ b/src/momento/internal/aio/_scs_grpc_manager.py @@ -1,7 +1,6 @@ from __future__ import annotations import asyncio -from datetime import timedelta from typing import Optional import grpc @@ -13,7 +12,6 @@ from momento.auth import CredentialProvider from momento.config import Configuration, TopicConfiguration from momento.config.auth_configuration import AuthConfiguration -from momento.config.transport.transport_strategy import StaticGrpcConfiguration from momento.errors.exceptions import ConnectionException from momento.internal._utilities import PYTHON_RUNTIME_VERSION, ClientType from momento.internal._utilities._channel_credentials import ( @@ -22,6 +20,7 @@ from momento.internal._utilities._grpc_channel_options import ( grpc_control_channel_options_from_grpc_config, grpc_data_channel_options_from_grpc_config, + grpc_topic_channel_options_from_grpc_config, ) from momento.internal.services import Service from momento.retry import RetryStrategy @@ -136,14 +135,11 @@ class _PubsubGrpcManager: """Internal gRPC pubsub manager.""" def __init__(self, configuration: TopicConfiguration, credential_provider: CredentialProvider): - # NOTE: This is hard-coded for now but we may want to expose it via TopicConfiguration in the future, as we do with some of the other clients. - grpc_config = StaticGrpcConfiguration(deadline=timedelta(milliseconds=1100)) - self._secure_channel = grpc.aio.secure_channel( target=credential_provider.cache_endpoint, credentials=grpc.ssl_channel_credentials(), interceptors=_interceptors(credential_provider.auth_token, ClientType.TOPIC, None), - options=grpc_data_channel_options_from_grpc_config(grpc_config), + options=grpc_topic_channel_options_from_grpc_config(configuration.get_transport_strategy().get_grpc_configuration()), ) async def close(self) -> None: @@ -157,14 +153,11 @@ class _PubsubGrpcStreamManager: """Internal gRPC pubsub stream manager.""" def __init__(self, configuration: TopicConfiguration, credential_provider: CredentialProvider): - # NOTE: This is hard-coded for now but we may want to expose it via TopicConfiguration in the future, as we do with some of the other clients. - grpc_config = StaticGrpcConfiguration(deadline=timedelta(milliseconds=1100)) - self._secure_channel = grpc.aio.secure_channel( target=credential_provider.cache_endpoint, credentials=grpc.ssl_channel_credentials(), interceptors=_stream_interceptors(credential_provider.auth_token, ClientType.TOPIC), - options=grpc_data_channel_options_from_grpc_config(grpc_config), + options=grpc_topic_channel_options_from_grpc_config(configuration.get_transport_strategy().get_grpc_configuration()), ) async def close(self) -> None: diff --git a/src/momento/internal/aio/_scs_pubsub_client.py b/src/momento/internal/aio/_scs_pubsub_client.py index b3f03980..ddb7e107 100644 --- a/src/momento/internal/aio/_scs_pubsub_client.py +++ b/src/momento/internal/aio/_scs_pubsub_client.py @@ -1,6 +1,7 @@ from __future__ import annotations import math +from datetime import timedelta from momento_wire_types import cachepubsub_pb2 as pubsub_pb from momento_wire_types import cachepubsub_pb2_grpc as pubsub_grpc @@ -34,6 +35,10 @@ def __init__(self, configuration: TopicConfiguration, credential_provider: Crede self._logger.debug("Pubsub client instantiated with endpoint: %s", endpoint) self._endpoint = endpoint + default_deadline: timedelta = configuration.get_transport_strategy().get_grpc_configuration().get_deadline() + self._default_deadline_seconds = default_deadline.total_seconds() + print("aio deadline: ", self._default_deadline_seconds) + num_subscriptions = configuration.get_max_subscriptions() # Default to a single channel and scale up if necessary. Each channel can support # 100 subscriptions. Issuing more subscribe requests than you have channels to handle @@ -70,6 +75,7 @@ async def publish(self, cache_name: str, topic_name: str, value: str | bytes) -> await self._get_stub().Publish( # type: ignore[misc] request, + timeout=self._default_deadline_seconds, ) return TopicPublish.Success() except Exception as e: diff --git a/src/momento/internal/synchronous/_scs_grpc_manager.py b/src/momento/internal/synchronous/_scs_grpc_manager.py index 624a2f58..6b177e3b 100644 --- a/src/momento/internal/synchronous/_scs_grpc_manager.py +++ b/src/momento/internal/synchronous/_scs_grpc_manager.py @@ -1,6 +1,5 @@ from __future__ import annotations -from datetime import timedelta from threading import Event from typing import Optional @@ -14,7 +13,6 @@ from momento.auth import CredentialProvider from momento.config import Configuration, TopicConfiguration from momento.config.auth_configuration import AuthConfiguration -from momento.config.transport.transport_strategy import StaticGrpcConfiguration from momento.errors.exceptions import ConnectionException from momento.internal._utilities import PYTHON_RUNTIME_VERSION, ClientType from momento.internal._utilities._channel_credentials import ( @@ -23,6 +21,7 @@ from momento.internal._utilities._grpc_channel_options import ( grpc_control_channel_options_from_grpc_config, grpc_data_channel_options_from_grpc_config, + grpc_topic_channel_options_from_grpc_config, ) from momento.internal.services import Service from momento.internal.synchronous._add_header_client_interceptor import ( @@ -153,13 +152,10 @@ class _PubsubGrpcManager: """Internal gRPC pubsub manager.""" def __init__(self, configuration: TopicConfiguration, credential_provider: CredentialProvider): - # NOTE: This is hard-coded for now but we may want to expose it via TopicConfiguration in the future, as we do with some of the other clients. - grpc_config = StaticGrpcConfiguration(deadline=timedelta(milliseconds=1100)) - self._secure_channel = grpc.secure_channel( target=credential_provider.cache_endpoint, credentials=grpc.ssl_channel_credentials(), - options=grpc_data_channel_options_from_grpc_config(grpc_config), + options=grpc_topic_channel_options_from_grpc_config(configuration.get_transport_strategy().get_grpc_configuration()), ) intercept_channel = grpc.intercept_channel( self._secure_channel, *_interceptors(credential_provider.auth_token, ClientType.TOPIC, None) @@ -177,13 +173,10 @@ class _PubsubGrpcStreamManager: """Internal gRPC pubsub stream manager.""" def __init__(self, configuration: TopicConfiguration, credential_provider: CredentialProvider): - # NOTE: This is hard-coded for now but we may want to expose it via TopicConfiguration in the future, as we do with some of the other clients. - grpc_config = StaticGrpcConfiguration(deadline=timedelta(milliseconds=1100)) - self._secure_channel = grpc.secure_channel( target=credential_provider.cache_endpoint, credentials=grpc.ssl_channel_credentials(), - options=grpc_data_channel_options_from_grpc_config(grpc_config), + options=grpc_topic_channel_options_from_grpc_config(configuration.get_transport_strategy().get_grpc_configuration()), ) intercept_channel = grpc.intercept_channel( self._secure_channel, *_stream_interceptors(credential_provider.auth_token, ClientType.TOPIC) diff --git a/src/momento/internal/synchronous/_scs_pubsub_client.py b/src/momento/internal/synchronous/_scs_pubsub_client.py index b0347a09..943ea7c8 100644 --- a/src/momento/internal/synchronous/_scs_pubsub_client.py +++ b/src/momento/internal/synchronous/_scs_pubsub_client.py @@ -1,6 +1,7 @@ from __future__ import annotations import math +from datetime import timedelta from momento_wire_types import cachepubsub_pb2 as pubsub_pb from momento_wire_types import cachepubsub_pb2_grpc as pubsub_grpc @@ -34,6 +35,10 @@ def __init__(self, configuration: TopicConfiguration, credential_provider: Crede self._logger.debug("Pubsub client instantiated with endpoint: %s", endpoint) self._endpoint = endpoint + default_deadline: timedelta = configuration.get_transport_strategy().get_grpc_configuration().get_deadline() + self._default_deadline_seconds = default_deadline.total_seconds() + print("sync deadline: ", self._default_deadline_seconds) + num_subscriptions = configuration.get_max_subscriptions() # Default to a single channel and scale up if necessary. Each channel can support # 100 subscriptions. Issuing more subscribe requests than you have channels to handle @@ -70,6 +75,7 @@ def publish(self, cache_name: str, topic_name: str, value: str | bytes) -> Topic self._get_stub().Publish( request, + timeout=self._default_deadline_seconds, ) return TopicPublish.Success() except Exception as e: From 435a61c62e96bec469180c3725310bd891e6a89d Mon Sep 17 00:00:00 2001 From: anitarua Date: Tue, 4 Mar 2025 11:35:02 -0800 Subject: [PATCH 2/5] needed higher default timeout --- src/momento/config/topic_configurations.py | 2 +- src/momento/internal/aio/_scs_pubsub_client.py | 1 - src/momento/internal/synchronous/_scs_pubsub_client.py | 3 +-- 3 files changed, 2 insertions(+), 4 deletions(-) diff --git a/src/momento/config/topic_configurations.py b/src/momento/config/topic_configurations.py index 3b729b63..08d9b7d2 100644 --- a/src/momento/config/topic_configurations.py +++ b/src/momento/config/topic_configurations.py @@ -26,6 +26,6 @@ def v1() -> TopicConfigurations.Default: This configuration is guaranteed not to change in future releases of the Momento Python SDK. """ return TopicConfigurations.Default( - StaticTopicTransportStrategy(StaticTopicGrpcConfiguration(deadline=timedelta(seconds=5))), + StaticTopicTransportStrategy(StaticTopicGrpcConfiguration(deadline=timedelta(seconds=10))), max_subscriptions=0, ) diff --git a/src/momento/internal/aio/_scs_pubsub_client.py b/src/momento/internal/aio/_scs_pubsub_client.py index ddb7e107..72249f61 100644 --- a/src/momento/internal/aio/_scs_pubsub_client.py +++ b/src/momento/internal/aio/_scs_pubsub_client.py @@ -37,7 +37,6 @@ def __init__(self, configuration: TopicConfiguration, credential_provider: Crede default_deadline: timedelta = configuration.get_transport_strategy().get_grpc_configuration().get_deadline() self._default_deadline_seconds = default_deadline.total_seconds() - print("aio deadline: ", self._default_deadline_seconds) num_subscriptions = configuration.get_max_subscriptions() # Default to a single channel and scale up if necessary. Each channel can support diff --git a/src/momento/internal/synchronous/_scs_pubsub_client.py b/src/momento/internal/synchronous/_scs_pubsub_client.py index 943ea7c8..5ec7c974 100644 --- a/src/momento/internal/synchronous/_scs_pubsub_client.py +++ b/src/momento/internal/synchronous/_scs_pubsub_client.py @@ -1,7 +1,7 @@ from __future__ import annotations import math -from datetime import timedelta +from datetime import datetime, timedelta from momento_wire_types import cachepubsub_pb2 as pubsub_pb from momento_wire_types import cachepubsub_pb2_grpc as pubsub_grpc @@ -37,7 +37,6 @@ def __init__(self, configuration: TopicConfiguration, credential_provider: Crede default_deadline: timedelta = configuration.get_transport_strategy().get_grpc_configuration().get_deadline() self._default_deadline_seconds = default_deadline.total_seconds() - print("sync deadline: ", self._default_deadline_seconds) num_subscriptions = configuration.get_max_subscriptions() # Default to a single channel and scale up if necessary. Each channel can support From 5423e35539b8ed549aa54912f47b01c2a3f56ee9 Mon Sep 17 00:00:00 2001 From: anitarua Date: Tue, 4 Mar 2025 11:36:04 -0800 Subject: [PATCH 3/5] fix lint errors --- src/momento/internal/_utilities/_grpc_channel_options.py | 1 + src/momento/internal/aio/_scs_grpc_manager.py | 8 ++++++-- src/momento/internal/synchronous/_scs_grpc_manager.py | 8 ++++++-- src/momento/internal/synchronous/_scs_pubsub_client.py | 2 +- 4 files changed, 14 insertions(+), 5 deletions(-) diff --git a/src/momento/internal/_utilities/_grpc_channel_options.py b/src/momento/internal/_utilities/_grpc_channel_options.py index 80bdfa11..300db846 100644 --- a/src/momento/internal/_utilities/_grpc_channel_options.py +++ b/src/momento/internal/_utilities/_grpc_channel_options.py @@ -50,6 +50,7 @@ def grpc_topic_channel_options_from_grpc_config(grpc_config: TopicGrpcConfigurat return channel_options + def grpc_data_channel_options_from_grpc_config(grpc_config: GrpcConfiguration) -> ChannelArguments: """Create gRPC channel options from a GrpcConfiguration. diff --git a/src/momento/internal/aio/_scs_grpc_manager.py b/src/momento/internal/aio/_scs_grpc_manager.py index 6295a209..77cf1eb3 100644 --- a/src/momento/internal/aio/_scs_grpc_manager.py +++ b/src/momento/internal/aio/_scs_grpc_manager.py @@ -139,7 +139,9 @@ def __init__(self, configuration: TopicConfiguration, credential_provider: Crede target=credential_provider.cache_endpoint, credentials=grpc.ssl_channel_credentials(), interceptors=_interceptors(credential_provider.auth_token, ClientType.TOPIC, None), - options=grpc_topic_channel_options_from_grpc_config(configuration.get_transport_strategy().get_grpc_configuration()), + options=grpc_topic_channel_options_from_grpc_config( + configuration.get_transport_strategy().get_grpc_configuration() + ), ) async def close(self) -> None: @@ -157,7 +159,9 @@ def __init__(self, configuration: TopicConfiguration, credential_provider: Crede target=credential_provider.cache_endpoint, credentials=grpc.ssl_channel_credentials(), interceptors=_stream_interceptors(credential_provider.auth_token, ClientType.TOPIC), - options=grpc_topic_channel_options_from_grpc_config(configuration.get_transport_strategy().get_grpc_configuration()), + options=grpc_topic_channel_options_from_grpc_config( + configuration.get_transport_strategy().get_grpc_configuration() + ), ) async def close(self) -> None: diff --git a/src/momento/internal/synchronous/_scs_grpc_manager.py b/src/momento/internal/synchronous/_scs_grpc_manager.py index 6b177e3b..7240d30f 100644 --- a/src/momento/internal/synchronous/_scs_grpc_manager.py +++ b/src/momento/internal/synchronous/_scs_grpc_manager.py @@ -155,7 +155,9 @@ def __init__(self, configuration: TopicConfiguration, credential_provider: Crede self._secure_channel = grpc.secure_channel( target=credential_provider.cache_endpoint, credentials=grpc.ssl_channel_credentials(), - options=grpc_topic_channel_options_from_grpc_config(configuration.get_transport_strategy().get_grpc_configuration()), + options=grpc_topic_channel_options_from_grpc_config( + configuration.get_transport_strategy().get_grpc_configuration() + ), ) intercept_channel = grpc.intercept_channel( self._secure_channel, *_interceptors(credential_provider.auth_token, ClientType.TOPIC, None) @@ -176,7 +178,9 @@ def __init__(self, configuration: TopicConfiguration, credential_provider: Crede self._secure_channel = grpc.secure_channel( target=credential_provider.cache_endpoint, credentials=grpc.ssl_channel_credentials(), - options=grpc_topic_channel_options_from_grpc_config(configuration.get_transport_strategy().get_grpc_configuration()), + options=grpc_topic_channel_options_from_grpc_config( + configuration.get_transport_strategy().get_grpc_configuration() + ), ) intercept_channel = grpc.intercept_channel( self._secure_channel, *_stream_interceptors(credential_provider.auth_token, ClientType.TOPIC) diff --git a/src/momento/internal/synchronous/_scs_pubsub_client.py b/src/momento/internal/synchronous/_scs_pubsub_client.py index 5ec7c974..3e9e9d52 100644 --- a/src/momento/internal/synchronous/_scs_pubsub_client.py +++ b/src/momento/internal/synchronous/_scs_pubsub_client.py @@ -1,7 +1,7 @@ from __future__ import annotations import math -from datetime import datetime, timedelta +from datetime import timedelta from momento_wire_types import cachepubsub_pb2 as pubsub_pb from momento_wire_types import cachepubsub_pb2_grpc as pubsub_grpc From c594756ac91df77304f30530ece27ef2112b65ee Mon Sep 17 00:00:00 2001 From: anitarua Date: Wed, 5 Mar 2025 11:23:50 -0800 Subject: [PATCH 4/5] make sure recently added grpc option also in topic grpc config helper function --- src/momento/internal/_utilities/_grpc_channel_options.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/momento/internal/_utilities/_grpc_channel_options.py b/src/momento/internal/_utilities/_grpc_channel_options.py index 300db846..9eb178e6 100644 --- a/src/momento/internal/_utilities/_grpc_channel_options.py +++ b/src/momento/internal/_utilities/_grpc_channel_options.py @@ -36,6 +36,8 @@ def grpc_topic_channel_options_from_grpc_config(grpc_config: TopicGrpcConfigurat ) ) + channel_options.append(("grpc.service_config_disable_resolution", 1)) + keepalive_permit = grpc_config.get_keepalive_permit_without_calls() if keepalive_permit is not None: channel_options.append(("grpc.keepalive_permit_without_calls", keepalive_permit)) From 87410e24e1b30c6333ba9d246bf0ccb4fed2358e Mon Sep 17 00:00:00 2001 From: anitarua Date: Thu, 6 Mar 2025 09:49:27 -0800 Subject: [PATCH 5/5] set higher timeout only in test topics config --- src/momento/config/topic_configurations.py | 2 +- tests/conftest.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/momento/config/topic_configurations.py b/src/momento/config/topic_configurations.py index 08d9b7d2..3b729b63 100644 --- a/src/momento/config/topic_configurations.py +++ b/src/momento/config/topic_configurations.py @@ -26,6 +26,6 @@ def v1() -> TopicConfigurations.Default: This configuration is guaranteed not to change in future releases of the Momento Python SDK. """ return TopicConfigurations.Default( - StaticTopicTransportStrategy(StaticTopicGrpcConfiguration(deadline=timedelta(seconds=10))), + StaticTopicTransportStrategy(StaticTopicGrpcConfiguration(deadline=timedelta(seconds=5))), max_subscriptions=0, ) diff --git a/tests/conftest.py b/tests/conftest.py index f1e039d8..e56cf925 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -52,7 +52,7 @@ ####################### TEST_CONFIGURATION = Configurations.Laptop.latest() -TEST_TOPIC_CONFIGURATION = TopicConfigurations.Default.latest() +TEST_TOPIC_CONFIGURATION = TopicConfigurations.Default.latest().with_client_timeout(timedelta(seconds=10)) TEST_AUTH_CONFIGURATION = AuthConfigurations.Laptop.latest()