Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 50 additions & 2 deletions src/momento/config/topic_configuration.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
from __future__ import annotations

from abc import ABC, abstractmethod
from datetime import timedelta

from momento.config.transport.topic_transport_strategy import TopicTransportStrategy


class TopicConfigurationBase(ABC):
Expand All @@ -12,24 +15,69 @@ def get_max_subscriptions(self) -> int:
def with_max_subscriptions(self, max_subscriptions: int) -> TopicConfiguration:
pass

@abstractmethod
def get_transport_strategy(self) -> TopicTransportStrategy:
pass

@abstractmethod
def with_transport_strategy(self, transport_strategy: TopicTransportStrategy) -> TopicConfiguration:
pass

@abstractmethod
def with_client_timeout(self, client_timeout: timedelta) -> TopicConfiguration:
pass


class TopicConfiguration(TopicConfigurationBase):
"""Configuration options for Momento topic client."""

def __init__(self, max_subscriptions: int = 0):
def __init__(self, transport_strategy: TopicTransportStrategy, max_subscriptions: int = 0):
"""Instantiate a configuration.

Args:
transport_strategy (TopicTransportStrategy): Configuration options for networking with
the Momento pubsub service.
max_subscriptions (int): The maximum number of subscriptions the client is expected
to handle. Because each gRPC channel can handle 100 connections, we must explicitly
open multiple channels to accommodate the load. NOTE: if the number of connection
attempts exceeds the number the channels can support, program execution will block
and hang.
"""
self._max_subscriptions = max_subscriptions
self._transport_strategy = transport_strategy

def get_max_subscriptions(self) -> int:
return self._max_subscriptions

def with_max_subscriptions(self, max_subscriptions: int) -> TopicConfiguration:
return TopicConfiguration(max_subscriptions)
return TopicConfiguration(self._transport_strategy, max_subscriptions)

def get_transport_strategy(self) -> TopicTransportStrategy:
"""Access the transport strategy.

Returns:
TopicTransportStrategy: the current configuration options for wire interactions with the Momento pubsub service.
"""
return self._transport_strategy

def with_transport_strategy(self, transport_strategy: TopicTransportStrategy) -> TopicConfiguration:
"""Copy constructor for overriding TopicTransportStrategy.

Args:
transport_strategy (TopicTransportStrategy): the new TopicTransportStrategy.

Returns:
TopicConfiguration: the new TopicConfiguration with the specified TopicTransportStrategy.
"""
return TopicConfiguration(transport_strategy, self._max_subscriptions)

def with_client_timeout(self, client_timeout: timedelta) -> TopicConfiguration:
"""Copies the TopicConfiguration and sets the new client-side timeout in the copy's TopicTransportStrategy.

Args:
client_timeout (timedelta): the new client-side timeout.

Return:
TopicConfiguration: the new TopicConfiguration.
"""
return TopicConfiguration(self._transport_strategy.with_client_timeout(client_timeout), self._max_subscriptions)
9 changes: 8 additions & 1 deletion src/momento/config/topic_configurations.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
from __future__ import annotations

from datetime import timedelta

from momento.config.transport.topic_transport_strategy import StaticTopicGrpcConfiguration, StaticTopicTransportStrategy

from .topic_configuration import TopicConfiguration


Expand All @@ -21,4 +25,7 @@ def v1() -> TopicConfigurations.Default:

This configuration is guaranteed not to change in future releases of the Momento Python SDK.
"""
return TopicConfigurations.Default(max_subscriptions=0)
return TopicConfigurations.Default(
StaticTopicTransportStrategy(StaticTopicGrpcConfiguration(deadline=timedelta(seconds=5))),
max_subscriptions=0,
)
35 changes: 35 additions & 0 deletions src/momento/config/transport/topic_grpc_configuration.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
from __future__ import annotations

from abc import ABC, abstractmethod
from datetime import timedelta
from typing import Optional


class TopicGrpcConfiguration(ABC):
@abstractmethod
def get_deadline(self) -> timedelta:
pass

@abstractmethod
def with_deadline(self, deadline: timedelta) -> TopicGrpcConfiguration:
pass

@abstractmethod
def get_max_send_message_length(self) -> Optional[int]:
pass

@abstractmethod
def get_max_receive_message_length(self) -> Optional[int]:
pass

@abstractmethod
def get_keepalive_permit_without_calls(self) -> Optional[int]:
pass

@abstractmethod
def get_keepalive_time(self) -> Optional[timedelta]:
pass

@abstractmethod
def get_keepalive_timeout(self) -> Optional[timedelta]:
pass
99 changes: 99 additions & 0 deletions src/momento/config/transport/topic_transport_strategy.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
from __future__ import annotations

from abc import ABC, abstractmethod
from datetime import timedelta
from typing import Optional

from momento.config.transport.topic_grpc_configuration import TopicGrpcConfiguration
from momento.internal._utilities import _validate_request_timeout


class TopicTransportStrategy(ABC):
"""Configures the network options for communicating with the Momento pubsub service."""

@abstractmethod
def get_grpc_configuration(self) -> TopicGrpcConfiguration:
"""Access the gRPC configuration.

Returns:
TopicGrpcConfiguration: low-level gRPC settings for the Momento client's communication.
"""
pass

@abstractmethod
def with_grpc_configuration(self, grpc_configuration: TopicGrpcConfiguration) -> TopicTransportStrategy:
"""Copy constructor for overriding the gRPC configuration.

Args:
grpc_configuration (TopicGrpcConfiguration): the new gRPC configuration.

Returns:
TopicTransportStrategy: a new TopicTransportStrategy with the specified gRPC config.
"""
pass

@abstractmethod
def with_client_timeout(self, client_timeout: timedelta) -> TopicTransportStrategy:
"""Copies the TopicTransportStrategy and updates the copy's client-side timeout.

Args:
client_timeout (timedelta): the new client-side timeout.

Returns:
TopicTransportStrategy: the new TopicTransportStrategy.
"""
pass


class StaticTopicGrpcConfiguration(TopicGrpcConfiguration):
def __init__(
self,
deadline: timedelta,
max_send_message_length: Optional[int] = None,
max_receive_message_length: Optional[int] = None,
keepalive_permit_without_calls: Optional[bool] = True,
keepalive_time: Optional[timedelta] = timedelta(milliseconds=5000),
keepalive_timeout: Optional[timedelta] = timedelta(milliseconds=1000),
):
self._deadline = deadline
self._max_send_message_length = max_send_message_length
self._max_receive_message_length = max_receive_message_length
self._keepalive_permit_without_calls = keepalive_permit_without_calls
self._keepalive_time = keepalive_time
self._keepalive_timeout = keepalive_timeout

def get_deadline(self) -> timedelta:
return self._deadline

def with_deadline(self, deadline: timedelta) -> TopicGrpcConfiguration:
_validate_request_timeout(deadline)
return StaticTopicGrpcConfiguration(deadline)

def get_max_send_message_length(self) -> Optional[int]:
return self._max_send_message_length

def get_max_receive_message_length(self) -> Optional[int]:
return self._max_receive_message_length

def get_keepalive_permit_without_calls(self) -> Optional[int]:
return int(self._keepalive_permit_without_calls) if self._keepalive_permit_without_calls is not None else None

def get_keepalive_time(self) -> Optional[timedelta]:
return self._keepalive_time

def get_keepalive_timeout(self) -> Optional[timedelta]:
return self._keepalive_timeout


class StaticTopicTransportStrategy(TopicTransportStrategy):
def __init__(self, grpc_configuration: TopicGrpcConfiguration):
self._grpc_configuration = grpc_configuration

def get_grpc_configuration(self) -> TopicGrpcConfiguration:
return self._grpc_configuration

def with_grpc_configuration(self, grpc_configuration: TopicGrpcConfiguration) -> TopicTransportStrategy:
return StaticTopicTransportStrategy(grpc_configuration)

def with_client_timeout(self, client_timeout: timedelta) -> TopicTransportStrategy:
return self.with_grpc_configuration(self._grpc_configuration.with_deadline(client_timeout))
42 changes: 42 additions & 0 deletions src/momento/internal/_utilities/_grpc_channel_options.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from typing import Sequence, Tuple, Union

from momento.config.transport.grpc_configuration import GrpcConfiguration
from momento.config.transport.topic_grpc_configuration import TopicGrpcConfiguration
from momento.config.transport.transport_strategy import StaticGrpcConfiguration
from momento.internal._utilities import _timedelta_to_ms

Expand All @@ -11,6 +12,47 @@
ChannelArguments = Sequence[Tuple[str, Union[int, None]]]


def grpc_topic_channel_options_from_grpc_config(grpc_config: TopicGrpcConfiguration) -> ChannelArguments:
"""Create gRPC channel options from a TopicGrpcConfiguration.

Args:
grpc_config (TopicGrpcConfiguration): the gRPC configuration.

Returns:
grpc.aio.ChannelArgumentType: a list of gRPC channel options as key-value tuples.
"""
channel_options = []

max_send_length = grpc_config.get_max_send_message_length()
channel_options.append(
("grpc.max_send_message_length", max_send_length if max_send_length is not None else DEFAULT_MAX_MESSAGE_SIZE)
)

max_receive_length = grpc_config.get_max_receive_message_length()
channel_options.append(
(
"grpc.max_receive_message_length",
max_receive_length if max_receive_length is not None else DEFAULT_MAX_MESSAGE_SIZE,
)
)

channel_options.append(("grpc.service_config_disable_resolution", 1))

keepalive_permit = grpc_config.get_keepalive_permit_without_calls()
if keepalive_permit is not None:
channel_options.append(("grpc.keepalive_permit_without_calls", keepalive_permit))

keepalive_time = grpc_config.get_keepalive_time()
if keepalive_time is not None:
channel_options.append(("grpc.keepalive_time_ms", _timedelta_to_ms(keepalive_time)))

keepalive_timeout = grpc_config.get_keepalive_timeout()
if keepalive_timeout is not None:
channel_options.append(("grpc.keepalive_timeout_ms", _timedelta_to_ms(keepalive_timeout)))

return channel_options


def grpc_data_channel_options_from_grpc_config(grpc_config: GrpcConfiguration) -> ChannelArguments:
"""Create gRPC channel options from a GrpcConfiguration.

Expand Down
17 changes: 7 additions & 10 deletions src/momento/internal/aio/_scs_grpc_manager.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
from __future__ import annotations

import asyncio
from datetime import timedelta
from typing import Optional

import grpc
Expand All @@ -13,7 +12,6 @@
from momento.auth import CredentialProvider
from momento.config import Configuration, TopicConfiguration
from momento.config.auth_configuration import AuthConfiguration
from momento.config.transport.transport_strategy import StaticGrpcConfiguration
from momento.errors.exceptions import ConnectionException
from momento.internal._utilities import PYTHON_RUNTIME_VERSION, ClientType
from momento.internal._utilities._channel_credentials import (
Expand All @@ -22,6 +20,7 @@
from momento.internal._utilities._grpc_channel_options import (
grpc_control_channel_options_from_grpc_config,
grpc_data_channel_options_from_grpc_config,
grpc_topic_channel_options_from_grpc_config,
)
from momento.internal.services import Service
from momento.retry import RetryStrategy
Expand Down Expand Up @@ -136,14 +135,13 @@ class _PubsubGrpcManager:
"""Internal gRPC pubsub manager."""

def __init__(self, configuration: TopicConfiguration, credential_provider: CredentialProvider):
# NOTE: This is hard-coded for now but we may want to expose it via TopicConfiguration in the future, as we do with some of the other clients.
grpc_config = StaticGrpcConfiguration(deadline=timedelta(milliseconds=1100))

self._secure_channel = grpc.aio.secure_channel(
target=credential_provider.cache_endpoint,
credentials=grpc.ssl_channel_credentials(),
interceptors=_interceptors(credential_provider.auth_token, ClientType.TOPIC, None),
options=grpc_data_channel_options_from_grpc_config(grpc_config),
options=grpc_topic_channel_options_from_grpc_config(
configuration.get_transport_strategy().get_grpc_configuration()
),
)

async def close(self) -> None:
Expand All @@ -157,14 +155,13 @@ class _PubsubGrpcStreamManager:
"""Internal gRPC pubsub stream manager."""

def __init__(self, configuration: TopicConfiguration, credential_provider: CredentialProvider):
# NOTE: This is hard-coded for now but we may want to expose it via TopicConfiguration in the future, as we do with some of the other clients.
grpc_config = StaticGrpcConfiguration(deadline=timedelta(milliseconds=1100))

self._secure_channel = grpc.aio.secure_channel(
target=credential_provider.cache_endpoint,
credentials=grpc.ssl_channel_credentials(),
interceptors=_stream_interceptors(credential_provider.auth_token, ClientType.TOPIC),
options=grpc_data_channel_options_from_grpc_config(grpc_config),
options=grpc_topic_channel_options_from_grpc_config(
configuration.get_transport_strategy().get_grpc_configuration()
),
)

async def close(self) -> None:
Expand Down
5 changes: 5 additions & 0 deletions src/momento/internal/aio/_scs_pubsub_client.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from __future__ import annotations

import math
from datetime import timedelta

from momento_wire_types import cachepubsub_pb2 as pubsub_pb
from momento_wire_types import cachepubsub_pb2_grpc as pubsub_grpc
Expand Down Expand Up @@ -34,6 +35,9 @@ def __init__(self, configuration: TopicConfiguration, credential_provider: Crede
self._logger.debug("Pubsub client instantiated with endpoint: %s", endpoint)
self._endpoint = endpoint

default_deadline: timedelta = configuration.get_transport_strategy().get_grpc_configuration().get_deadline()
self._default_deadline_seconds = default_deadline.total_seconds()

num_subscriptions = configuration.get_max_subscriptions()
# Default to a single channel and scale up if necessary. Each channel can support
# 100 subscriptions. Issuing more subscribe requests than you have channels to handle
Expand Down Expand Up @@ -70,6 +74,7 @@ async def publish(self, cache_name: str, topic_name: str, value: str | bytes) ->

await self._get_stub().Publish( # type: ignore[misc]
request,
timeout=self._default_deadline_seconds,
)
return TopicPublish.Success()
except Exception as e:
Expand Down
Loading
Loading