diff --git a/src/momento/config/topic_configuration.py b/src/momento/config/topic_configuration.py index 8c45ff3a..d45eb7f2 100644 --- a/src/momento/config/topic_configuration.py +++ b/src/momento/config/topic_configuration.py @@ -1,6 +1,9 @@ from __future__ import annotations from abc import ABC, abstractmethod +from datetime import timedelta + +from momento.config.transport.topic_transport_strategy import TopicTransportStrategy class TopicConfigurationBase(ABC): @@ -12,14 +15,28 @@ def get_max_subscriptions(self) -> int: def with_max_subscriptions(self, max_subscriptions: int) -> TopicConfiguration: pass + @abstractmethod + def get_transport_strategy(self) -> TopicTransportStrategy: + pass + + @abstractmethod + def with_transport_strategy(self, transport_strategy: TopicTransportStrategy) -> TopicConfiguration: + pass + + @abstractmethod + def with_client_timeout(self, client_timeout: timedelta) -> TopicConfiguration: + pass + class TopicConfiguration(TopicConfigurationBase): """Configuration options for Momento topic client.""" - def __init__(self, max_subscriptions: int = 0): + def __init__(self, transport_strategy: TopicTransportStrategy, max_subscriptions: int = 0): """Instantiate a configuration. Args: + transport_strategy (TopicTransportStrategy): Configuration options for networking with + the Momento pubsub service. max_subscriptions (int): The maximum number of subscriptions the client is expected to handle. Because each gRPC channel can handle 100 connections, we must explicitly open multiple channels to accommodate the load. NOTE: if the number of connection @@ -27,9 +44,40 @@ def __init__(self, max_subscriptions: int = 0): and hang. """ self._max_subscriptions = max_subscriptions + self._transport_strategy = transport_strategy def get_max_subscriptions(self) -> int: return self._max_subscriptions def with_max_subscriptions(self, max_subscriptions: int) -> TopicConfiguration: - return TopicConfiguration(max_subscriptions) + return TopicConfiguration(self._transport_strategy, max_subscriptions) + + def get_transport_strategy(self) -> TopicTransportStrategy: + """Access the transport strategy. + + Returns: + TopicTransportStrategy: the current configuration options for wire interactions with the Momento pubsub service. + """ + return self._transport_strategy + + def with_transport_strategy(self, transport_strategy: TopicTransportStrategy) -> TopicConfiguration: + """Copy constructor for overriding TopicTransportStrategy. + + Args: + transport_strategy (TopicTransportStrategy): the new TopicTransportStrategy. + + Returns: + TopicConfiguration: the new TopicConfiguration with the specified TopicTransportStrategy. + """ + return TopicConfiguration(transport_strategy, self._max_subscriptions) + + def with_client_timeout(self, client_timeout: timedelta) -> TopicConfiguration: + """Copies the TopicConfiguration and sets the new client-side timeout in the copy's TopicTransportStrategy. + + Args: + client_timeout (timedelta): the new client-side timeout. + + Return: + TopicConfiguration: the new TopicConfiguration. + """ + return TopicConfiguration(self._transport_strategy.with_client_timeout(client_timeout), self._max_subscriptions) diff --git a/src/momento/config/topic_configurations.py b/src/momento/config/topic_configurations.py index 229e0920..3b729b63 100644 --- a/src/momento/config/topic_configurations.py +++ b/src/momento/config/topic_configurations.py @@ -1,5 +1,9 @@ from __future__ import annotations +from datetime import timedelta + +from momento.config.transport.topic_transport_strategy import StaticTopicGrpcConfiguration, StaticTopicTransportStrategy + from .topic_configuration import TopicConfiguration @@ -21,4 +25,7 @@ def v1() -> TopicConfigurations.Default: This configuration is guaranteed not to change in future releases of the Momento Python SDK. """ - return TopicConfigurations.Default(max_subscriptions=0) + return TopicConfigurations.Default( + StaticTopicTransportStrategy(StaticTopicGrpcConfiguration(deadline=timedelta(seconds=5))), + max_subscriptions=0, + ) diff --git a/src/momento/config/transport/topic_grpc_configuration.py b/src/momento/config/transport/topic_grpc_configuration.py new file mode 100644 index 00000000..101a03c5 --- /dev/null +++ b/src/momento/config/transport/topic_grpc_configuration.py @@ -0,0 +1,35 @@ +from __future__ import annotations + +from abc import ABC, abstractmethod +from datetime import timedelta +from typing import Optional + + +class TopicGrpcConfiguration(ABC): + @abstractmethod + def get_deadline(self) -> timedelta: + pass + + @abstractmethod + def with_deadline(self, deadline: timedelta) -> TopicGrpcConfiguration: + pass + + @abstractmethod + def get_max_send_message_length(self) -> Optional[int]: + pass + + @abstractmethod + def get_max_receive_message_length(self) -> Optional[int]: + pass + + @abstractmethod + def get_keepalive_permit_without_calls(self) -> Optional[int]: + pass + + @abstractmethod + def get_keepalive_time(self) -> Optional[timedelta]: + pass + + @abstractmethod + def get_keepalive_timeout(self) -> Optional[timedelta]: + pass diff --git a/src/momento/config/transport/topic_transport_strategy.py b/src/momento/config/transport/topic_transport_strategy.py new file mode 100644 index 00000000..d81ecec7 --- /dev/null +++ b/src/momento/config/transport/topic_transport_strategy.py @@ -0,0 +1,99 @@ +from __future__ import annotations + +from abc import ABC, abstractmethod +from datetime import timedelta +from typing import Optional + +from momento.config.transport.topic_grpc_configuration import TopicGrpcConfiguration +from momento.internal._utilities import _validate_request_timeout + + +class TopicTransportStrategy(ABC): + """Configures the network options for communicating with the Momento pubsub service.""" + + @abstractmethod + def get_grpc_configuration(self) -> TopicGrpcConfiguration: + """Access the gRPC configuration. + + Returns: + TopicGrpcConfiguration: low-level gRPC settings for the Momento client's communication. + """ + pass + + @abstractmethod + def with_grpc_configuration(self, grpc_configuration: TopicGrpcConfiguration) -> TopicTransportStrategy: + """Copy constructor for overriding the gRPC configuration. + + Args: + grpc_configuration (TopicGrpcConfiguration): the new gRPC configuration. + + Returns: + TopicTransportStrategy: a new TopicTransportStrategy with the specified gRPC config. + """ + pass + + @abstractmethod + def with_client_timeout(self, client_timeout: timedelta) -> TopicTransportStrategy: + """Copies the TopicTransportStrategy and updates the copy's client-side timeout. + + Args: + client_timeout (timedelta): the new client-side timeout. + + Returns: + TopicTransportStrategy: the new TopicTransportStrategy. + """ + pass + + +class StaticTopicGrpcConfiguration(TopicGrpcConfiguration): + def __init__( + self, + deadline: timedelta, + max_send_message_length: Optional[int] = None, + max_receive_message_length: Optional[int] = None, + keepalive_permit_without_calls: Optional[bool] = True, + keepalive_time: Optional[timedelta] = timedelta(milliseconds=5000), + keepalive_timeout: Optional[timedelta] = timedelta(milliseconds=1000), + ): + self._deadline = deadline + self._max_send_message_length = max_send_message_length + self._max_receive_message_length = max_receive_message_length + self._keepalive_permit_without_calls = keepalive_permit_without_calls + self._keepalive_time = keepalive_time + self._keepalive_timeout = keepalive_timeout + + def get_deadline(self) -> timedelta: + return self._deadline + + def with_deadline(self, deadline: timedelta) -> TopicGrpcConfiguration: + _validate_request_timeout(deadline) + return StaticTopicGrpcConfiguration(deadline) + + def get_max_send_message_length(self) -> Optional[int]: + return self._max_send_message_length + + def get_max_receive_message_length(self) -> Optional[int]: + return self._max_receive_message_length + + def get_keepalive_permit_without_calls(self) -> Optional[int]: + return int(self._keepalive_permit_without_calls) if self._keepalive_permit_without_calls is not None else None + + def get_keepalive_time(self) -> Optional[timedelta]: + return self._keepalive_time + + def get_keepalive_timeout(self) -> Optional[timedelta]: + return self._keepalive_timeout + + +class StaticTopicTransportStrategy(TopicTransportStrategy): + def __init__(self, grpc_configuration: TopicGrpcConfiguration): + self._grpc_configuration = grpc_configuration + + def get_grpc_configuration(self) -> TopicGrpcConfiguration: + return self._grpc_configuration + + def with_grpc_configuration(self, grpc_configuration: TopicGrpcConfiguration) -> TopicTransportStrategy: + return StaticTopicTransportStrategy(grpc_configuration) + + def with_client_timeout(self, client_timeout: timedelta) -> TopicTransportStrategy: + return self.with_grpc_configuration(self._grpc_configuration.with_deadline(client_timeout)) diff --git a/src/momento/internal/_utilities/_grpc_channel_options.py b/src/momento/internal/_utilities/_grpc_channel_options.py index ca5ad583..9eb178e6 100644 --- a/src/momento/internal/_utilities/_grpc_channel_options.py +++ b/src/momento/internal/_utilities/_grpc_channel_options.py @@ -3,6 +3,7 @@ from typing import Sequence, Tuple, Union from momento.config.transport.grpc_configuration import GrpcConfiguration +from momento.config.transport.topic_grpc_configuration import TopicGrpcConfiguration from momento.config.transport.transport_strategy import StaticGrpcConfiguration from momento.internal._utilities import _timedelta_to_ms @@ -11,6 +12,47 @@ ChannelArguments = Sequence[Tuple[str, Union[int, None]]] +def grpc_topic_channel_options_from_grpc_config(grpc_config: TopicGrpcConfiguration) -> ChannelArguments: + """Create gRPC channel options from a TopicGrpcConfiguration. + + Args: + grpc_config (TopicGrpcConfiguration): the gRPC configuration. + + Returns: + grpc.aio.ChannelArgumentType: a list of gRPC channel options as key-value tuples. + """ + channel_options = [] + + max_send_length = grpc_config.get_max_send_message_length() + channel_options.append( + ("grpc.max_send_message_length", max_send_length if max_send_length is not None else DEFAULT_MAX_MESSAGE_SIZE) + ) + + max_receive_length = grpc_config.get_max_receive_message_length() + channel_options.append( + ( + "grpc.max_receive_message_length", + max_receive_length if max_receive_length is not None else DEFAULT_MAX_MESSAGE_SIZE, + ) + ) + + channel_options.append(("grpc.service_config_disable_resolution", 1)) + + keepalive_permit = grpc_config.get_keepalive_permit_without_calls() + if keepalive_permit is not None: + channel_options.append(("grpc.keepalive_permit_without_calls", keepalive_permit)) + + keepalive_time = grpc_config.get_keepalive_time() + if keepalive_time is not None: + channel_options.append(("grpc.keepalive_time_ms", _timedelta_to_ms(keepalive_time))) + + keepalive_timeout = grpc_config.get_keepalive_timeout() + if keepalive_timeout is not None: + channel_options.append(("grpc.keepalive_timeout_ms", _timedelta_to_ms(keepalive_timeout))) + + return channel_options + + def grpc_data_channel_options_from_grpc_config(grpc_config: GrpcConfiguration) -> ChannelArguments: """Create gRPC channel options from a GrpcConfiguration. diff --git a/src/momento/internal/aio/_scs_grpc_manager.py b/src/momento/internal/aio/_scs_grpc_manager.py index 066a7bf4..77cf1eb3 100644 --- a/src/momento/internal/aio/_scs_grpc_manager.py +++ b/src/momento/internal/aio/_scs_grpc_manager.py @@ -1,7 +1,6 @@ from __future__ import annotations import asyncio -from datetime import timedelta from typing import Optional import grpc @@ -13,7 +12,6 @@ from momento.auth import CredentialProvider from momento.config import Configuration, TopicConfiguration from momento.config.auth_configuration import AuthConfiguration -from momento.config.transport.transport_strategy import StaticGrpcConfiguration from momento.errors.exceptions import ConnectionException from momento.internal._utilities import PYTHON_RUNTIME_VERSION, ClientType from momento.internal._utilities._channel_credentials import ( @@ -22,6 +20,7 @@ from momento.internal._utilities._grpc_channel_options import ( grpc_control_channel_options_from_grpc_config, grpc_data_channel_options_from_grpc_config, + grpc_topic_channel_options_from_grpc_config, ) from momento.internal.services import Service from momento.retry import RetryStrategy @@ -136,14 +135,13 @@ class _PubsubGrpcManager: """Internal gRPC pubsub manager.""" def __init__(self, configuration: TopicConfiguration, credential_provider: CredentialProvider): - # NOTE: This is hard-coded for now but we may want to expose it via TopicConfiguration in the future, as we do with some of the other clients. - grpc_config = StaticGrpcConfiguration(deadline=timedelta(milliseconds=1100)) - self._secure_channel = grpc.aio.secure_channel( target=credential_provider.cache_endpoint, credentials=grpc.ssl_channel_credentials(), interceptors=_interceptors(credential_provider.auth_token, ClientType.TOPIC, None), - options=grpc_data_channel_options_from_grpc_config(grpc_config), + options=grpc_topic_channel_options_from_grpc_config( + configuration.get_transport_strategy().get_grpc_configuration() + ), ) async def close(self) -> None: @@ -157,14 +155,13 @@ class _PubsubGrpcStreamManager: """Internal gRPC pubsub stream manager.""" def __init__(self, configuration: TopicConfiguration, credential_provider: CredentialProvider): - # NOTE: This is hard-coded for now but we may want to expose it via TopicConfiguration in the future, as we do with some of the other clients. - grpc_config = StaticGrpcConfiguration(deadline=timedelta(milliseconds=1100)) - self._secure_channel = grpc.aio.secure_channel( target=credential_provider.cache_endpoint, credentials=grpc.ssl_channel_credentials(), interceptors=_stream_interceptors(credential_provider.auth_token, ClientType.TOPIC), - options=grpc_data_channel_options_from_grpc_config(grpc_config), + options=grpc_topic_channel_options_from_grpc_config( + configuration.get_transport_strategy().get_grpc_configuration() + ), ) async def close(self) -> None: diff --git a/src/momento/internal/aio/_scs_pubsub_client.py b/src/momento/internal/aio/_scs_pubsub_client.py index b3f03980..72249f61 100644 --- a/src/momento/internal/aio/_scs_pubsub_client.py +++ b/src/momento/internal/aio/_scs_pubsub_client.py @@ -1,6 +1,7 @@ from __future__ import annotations import math +from datetime import timedelta from momento_wire_types import cachepubsub_pb2 as pubsub_pb from momento_wire_types import cachepubsub_pb2_grpc as pubsub_grpc @@ -34,6 +35,9 @@ def __init__(self, configuration: TopicConfiguration, credential_provider: Crede self._logger.debug("Pubsub client instantiated with endpoint: %s", endpoint) self._endpoint = endpoint + default_deadline: timedelta = configuration.get_transport_strategy().get_grpc_configuration().get_deadline() + self._default_deadline_seconds = default_deadline.total_seconds() + num_subscriptions = configuration.get_max_subscriptions() # Default to a single channel and scale up if necessary. Each channel can support # 100 subscriptions. Issuing more subscribe requests than you have channels to handle @@ -70,6 +74,7 @@ async def publish(self, cache_name: str, topic_name: str, value: str | bytes) -> await self._get_stub().Publish( # type: ignore[misc] request, + timeout=self._default_deadline_seconds, ) return TopicPublish.Success() except Exception as e: diff --git a/src/momento/internal/synchronous/_scs_grpc_manager.py b/src/momento/internal/synchronous/_scs_grpc_manager.py index 624a2f58..7240d30f 100644 --- a/src/momento/internal/synchronous/_scs_grpc_manager.py +++ b/src/momento/internal/synchronous/_scs_grpc_manager.py @@ -1,6 +1,5 @@ from __future__ import annotations -from datetime import timedelta from threading import Event from typing import Optional @@ -14,7 +13,6 @@ from momento.auth import CredentialProvider from momento.config import Configuration, TopicConfiguration from momento.config.auth_configuration import AuthConfiguration -from momento.config.transport.transport_strategy import StaticGrpcConfiguration from momento.errors.exceptions import ConnectionException from momento.internal._utilities import PYTHON_RUNTIME_VERSION, ClientType from momento.internal._utilities._channel_credentials import ( @@ -23,6 +21,7 @@ from momento.internal._utilities._grpc_channel_options import ( grpc_control_channel_options_from_grpc_config, grpc_data_channel_options_from_grpc_config, + grpc_topic_channel_options_from_grpc_config, ) from momento.internal.services import Service from momento.internal.synchronous._add_header_client_interceptor import ( @@ -153,13 +152,12 @@ class _PubsubGrpcManager: """Internal gRPC pubsub manager.""" def __init__(self, configuration: TopicConfiguration, credential_provider: CredentialProvider): - # NOTE: This is hard-coded for now but we may want to expose it via TopicConfiguration in the future, as we do with some of the other clients. - grpc_config = StaticGrpcConfiguration(deadline=timedelta(milliseconds=1100)) - self._secure_channel = grpc.secure_channel( target=credential_provider.cache_endpoint, credentials=grpc.ssl_channel_credentials(), - options=grpc_data_channel_options_from_grpc_config(grpc_config), + options=grpc_topic_channel_options_from_grpc_config( + configuration.get_transport_strategy().get_grpc_configuration() + ), ) intercept_channel = grpc.intercept_channel( self._secure_channel, *_interceptors(credential_provider.auth_token, ClientType.TOPIC, None) @@ -177,13 +175,12 @@ class _PubsubGrpcStreamManager: """Internal gRPC pubsub stream manager.""" def __init__(self, configuration: TopicConfiguration, credential_provider: CredentialProvider): - # NOTE: This is hard-coded for now but we may want to expose it via TopicConfiguration in the future, as we do with some of the other clients. - grpc_config = StaticGrpcConfiguration(deadline=timedelta(milliseconds=1100)) - self._secure_channel = grpc.secure_channel( target=credential_provider.cache_endpoint, credentials=grpc.ssl_channel_credentials(), - options=grpc_data_channel_options_from_grpc_config(grpc_config), + options=grpc_topic_channel_options_from_grpc_config( + configuration.get_transport_strategy().get_grpc_configuration() + ), ) intercept_channel = grpc.intercept_channel( self._secure_channel, *_stream_interceptors(credential_provider.auth_token, ClientType.TOPIC) diff --git a/src/momento/internal/synchronous/_scs_pubsub_client.py b/src/momento/internal/synchronous/_scs_pubsub_client.py index b0347a09..3e9e9d52 100644 --- a/src/momento/internal/synchronous/_scs_pubsub_client.py +++ b/src/momento/internal/synchronous/_scs_pubsub_client.py @@ -1,6 +1,7 @@ from __future__ import annotations import math +from datetime import timedelta from momento_wire_types import cachepubsub_pb2 as pubsub_pb from momento_wire_types import cachepubsub_pb2_grpc as pubsub_grpc @@ -34,6 +35,9 @@ def __init__(self, configuration: TopicConfiguration, credential_provider: Crede self._logger.debug("Pubsub client instantiated with endpoint: %s", endpoint) self._endpoint = endpoint + default_deadline: timedelta = configuration.get_transport_strategy().get_grpc_configuration().get_deadline() + self._default_deadline_seconds = default_deadline.total_seconds() + num_subscriptions = configuration.get_max_subscriptions() # Default to a single channel and scale up if necessary. Each channel can support # 100 subscriptions. Issuing more subscribe requests than you have channels to handle @@ -70,6 +74,7 @@ def publish(self, cache_name: str, topic_name: str, value: str | bytes) -> Topic self._get_stub().Publish( request, + timeout=self._default_deadline_seconds, ) return TopicPublish.Success() except Exception as e: diff --git a/tests/conftest.py b/tests/conftest.py index f1e039d8..e56cf925 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -52,7 +52,7 @@ ####################### TEST_CONFIGURATION = Configurations.Laptop.latest() -TEST_TOPIC_CONFIGURATION = TopicConfigurations.Default.latest() +TEST_TOPIC_CONFIGURATION = TopicConfigurations.Default.latest().with_client_timeout(timedelta(seconds=10)) TEST_AUTH_CONFIGURATION = AuthConfigurations.Laptop.latest()