Skip to content
Merged
11 changes: 9 additions & 2 deletions src/momento/internal/aio/_scs_pubsub_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,15 +76,22 @@ async def publish(self, cache_name: str, topic_name: str, value: str | bytes) ->
self._log_request_error("publish", e)
return TopicPublish.Error(convert_error(e, Service.TOPICS))

async def subscribe(self, cache_name: str, topic_name: str) -> TopicSubscribeResponse:
async def subscribe(
self,
cache_name: str,
topic_name: str,
resume_at_topic_sequence_number: int = 0,
resume_at_topic_sequence_page: int = 0,
) -> TopicSubscribeResponse:
try:
_validate_cache_name(cache_name)
_validate_topic_name(topic_name)

request = pubsub_pb._SubscriptionRequest(
cache_name=cache_name,
topic=topic_name,
# TODO: resume_at_topic_sequence_number
resume_at_topic_sequence_number=resume_at_topic_sequence_number,
sequence_page=resume_at_topic_sequence_page,
)
stream = self._get_stream_stub().Subscribe( # type: ignore[misc]
request,
Expand Down
11 changes: 9 additions & 2 deletions src/momento/internal/synchronous/_scs_pubsub_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,15 +76,22 @@ def publish(self, cache_name: str, topic_name: str, value: str | bytes) -> Topic
self._log_request_error("publish", e)
return TopicPublish.Error(convert_error(e, Service.TOPICS))

def subscribe(self, cache_name: str, topic_name: str) -> TopicSubscribeResponse:
def subscribe(
self,
cache_name: str,
topic_name: str,
resume_at_topic_sequence_number: int = 0,
resume_at_topic_sequence_page: int = 0,
) -> TopicSubscribeResponse:
try:
_validate_cache_name(cache_name)
_validate_topic_name(topic_name)

request = pubsub_pb._SubscriptionRequest(
cache_name=cache_name,
topic=topic_name,
# TODO: resume_at_topic_sequence_number
resume_at_topic_sequence_number=resume_at_topic_sequence_number,
sequence_page=resume_at_topic_sequence_page,
)
stream = self._get_stream_stub().Subscribe( # type: ignore[misc]
request,
Expand Down
10 changes: 8 additions & 2 deletions src/momento/responses/pubsub/subscribe.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,17 +37,23 @@ class SubscriptionBase(TopicSubscribeResponse):

_logger = logs.logger
_last_known_sequence_number: Optional[int] = None
_last_known_sequence_page: Optional[int] = None

def _process_result(self, result: cachepubsub_pb2._SubscriptionItem) -> Optional[TopicSubscriptionItemResponse]:
msg_type: str = result.WhichOneof("kind")
if msg_type == "item":
self._last_known_sequence_number = result.item.topic_sequence_number
self._last_known_sequence_page = result.item.sequence_page
value = result.item.value
value_type: str = value.WhichOneof("kind")
if value_type == "text":
return TopicSubscriptionItem.Text(value.text)
return TopicSubscriptionItem.Text(
value.text, self._last_known_sequence_number, self._last_known_sequence_page
)
elif value_type == "binary":
return TopicSubscriptionItem.Binary(value.binary)
return TopicSubscriptionItem.Binary(
value.binary, self._last_known_sequence_number, self._last_known_sequence_page
)
else:
err = SdkException(
f"Could not find matching TopicSubscriptionItem response for type: {value_type}",
Expand Down
4 changes: 4 additions & 0 deletions src/momento/responses/pubsub/subscription_item.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,16 @@ class Text(TopicSubscriptionItemResponse):
"""Indicates the request was successful and value will be returned as a string."""

value: str
sequence_number: int
sequence_page: int

@dataclass
class Binary(TopicSubscriptionItemResponse):
"""Indicates the request was successful and value will be returned as bytes."""

value: bytes
sequence_number: int
sequence_page: int

class Error(TopicSubscriptionItemResponse, ErrorResponseMixin):
"""Contains information about an error returned from a request.
Expand Down
16 changes: 14 additions & 2 deletions src/momento/topic_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,17 +81,29 @@ def publish(self, cache_name: str, topic_name: str, value: str | bytes) -> Topic
"""
return self._pubsub_client.publish(cache_name, topic_name, value)

def subscribe(self, cache_name: str, topic_name: str) -> TopicSubscribeResponse:
def subscribe(
self,
cache_name: str,
topic_name: str,
resume_at_topic_sequence_number: int = 0,
resume_at_topic_sequence_page: int = 0,
) -> TopicSubscribeResponse:
"""Subscribes to a topic.

Args:
cache_name (str): The cache to subscribe to.
topic_name (str): The topic to subscribe to.
resume_at_topic_sequence_number (int): The sequence number to resume at. Omit or use 0 to start at the
latest messages.
resume_at_topic_sequence_page (int): The page number to resume at. Omit or use 0 to start at the
latest messages.

Returns:
TopicSubscribeResponse
"""
return self._pubsub_client.subscribe(cache_name, topic_name)
return self._pubsub_client.subscribe(
cache_name, topic_name, resume_at_topic_sequence_number, resume_at_topic_sequence_page
)

def close(self) -> None:
self._pubsub_client.close()
16 changes: 14 additions & 2 deletions src/momento/topic_client_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,17 +81,29 @@ async def publish(self, cache_name: str, topic_name: str, value: str | bytes) ->
"""
return await self._pubsub_client.publish(cache_name, topic_name, value)

async def subscribe(self, cache_name: str, topic_name: str) -> TopicSubscribeResponse:
async def subscribe(
self,
cache_name: str,
topic_name: str,
resume_at_topic_sequence_number: int = 0,
resume_at_topic_sequence_page: int = 0,
) -> TopicSubscribeResponse:
"""Subscribes to a topic.

Args:
cache_name (str): The cache to subscribe to.
topic_name (str): The topic to subscribe to.
resume_at_topic_sequence_number (int): The sequence number to resume at. Omit or use 0 to start at the
latest messages.
resume_at_topic_sequence_page (int): The page number to resume at. Omit or use 0 to start at the
latest messages.

Returns:
TopicSubscribeResponse
"""
return await self._pubsub_client.subscribe(cache_name, topic_name)
return await self._pubsub_client.subscribe(
cache_name, topic_name, resume_at_topic_sequence_number, resume_at_topic_sequence_page
)

async def close(self) -> None:
await self._pubsub_client.close()
34 changes: 34 additions & 0 deletions tests/momento/topic_client/test_topics.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,11 +53,45 @@ def subscribe_happy_path_string(client: CacheClient, topic_client: TopicClient,
topic = uuid_str()
value = uuid_str()

_ = topic_client.publish(cache_name, topic_name=topic, value=value)

subscribe_response = topic_client.subscribe(cache_name, topic_name=topic)
assert isinstance(subscribe_response, TopicSubscribe.Subscription)

item_response = next(subscribe_response)
assert isinstance(item_response, TopicSubscriptionItem.Text)
assert item_response.value == value

def subscribe_happy_path_string_resume_at_sequence(
client: CacheClient, topic_client: TopicClient, cache_name: str
) -> None:
topic = uuid_str()
value = uuid_str()

_ = topic_client.publish(cache_name, topic_name=topic, value="foo")
_ = topic_client.publish(cache_name, topic_name=topic, value=value)
_ = topic_client.publish(cache_name, topic_name=topic, value="bar")

subscribe_response = topic_client.subscribe(cache_name, topic_name=topic, resume_at_topic_sequence_number=2)
assert isinstance(subscribe_response, TopicSubscribe.Subscription)

item_response = next(subscribe_response)
assert isinstance(item_response, TopicSubscriptionItem.Text)
assert item_response.value == value

def subscribe_happy_path_string_resume_at_invalid_sequence(
client: CacheClient, topic_client: TopicClient, cache_name: str
) -> None:
topic = uuid_str()
value = uuid_str()

_ = topic_client.publish(cache_name, topic_name=topic, value=value)

subscribe_response = topic_client.subscribe(
cache_name, topic_name=topic, resume_at_topic_sequence_number=300, resume_at_topic_sequence_page=5435435
)
assert isinstance(subscribe_response, TopicSubscribe.Subscription)

item_response = next(subscribe_response)
assert isinstance(item_response, TopicSubscriptionItem.Text)
assert item_response.value == value
Expand Down
41 changes: 39 additions & 2 deletions tests/momento/topic_client/test_topics_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,13 +57,50 @@ async def subscribe_happy_path_string(
topic = uuid_str()
value = uuid_str()

_ = await topic_client_async.publish(cache_name, topic_name=topic, value=value)

subscribe_response = await topic_client_async.subscribe(cache_name, topic_name=topic)
assert isinstance(subscribe_response, TopicSubscribe.SubscriptionAsync)

item_task = subscribe_response.__anext__()
publish_response = await topic_client_async.publish(cache_name, topic_name=topic, value=value)
item_response = await item_task
assert isinstance(item_response, TopicSubscriptionItem.Text)
assert item_response.value == value

print(publish_response)
async def subscribe_happy_path_string_resume_at_sequence(
client: CacheClientAsync, topic_client_async: TopicClientAsync, cache_name: str
) -> None:
topic = uuid_str()
value = uuid_str()

_ = await topic_client_async.publish(cache_name, topic_name=topic, value="foo")
_ = await topic_client_async.publish(cache_name, topic_name=topic, value=value)
_ = await topic_client_async.publish(cache_name, topic_name=topic, value="bar")

subscribe_response = await topic_client_async.subscribe(
cache_name, topic_name=topic, resume_at_topic_sequence_number=2
)
assert isinstance(subscribe_response, TopicSubscribe.SubscriptionAsync)

item_task = subscribe_response.__anext__()
item_response = await item_task
assert isinstance(item_response, TopicSubscriptionItem.Text)
assert item_response.value == value

async def subscribe_happy_path_string_resume_at_invalid_sequence(
client: CacheClientAsync, topic_client_async: TopicClientAsync, cache_name: str
) -> None:
topic = uuid_str()
value = uuid_str()

_ = await topic_client_async.publish(cache_name, topic_name=topic, value=value)

subscribe_response = await topic_client_async.subscribe(
cache_name, topic_name=topic, resume_at_topic_sequence_number=300, resume_at_topic_sequence_page=5435435
)
assert isinstance(subscribe_response, TopicSubscribe.SubscriptionAsync)

item_task = subscribe_response.__anext__()
item_response = await item_task
assert isinstance(item_response, TopicSubscriptionItem.Text)
assert item_response.value == value
Expand Down
Loading