diff --git a/src/momento/internal/aio/_scs_pubsub_client.py b/src/momento/internal/aio/_scs_pubsub_client.py index 1fe07c89..b3f03980 100644 --- a/src/momento/internal/aio/_scs_pubsub_client.py +++ b/src/momento/internal/aio/_scs_pubsub_client.py @@ -76,7 +76,13 @@ async def publish(self, cache_name: str, topic_name: str, value: str | bytes) -> self._log_request_error("publish", e) return TopicPublish.Error(convert_error(e, Service.TOPICS)) - async def subscribe(self, cache_name: str, topic_name: str) -> TopicSubscribeResponse: + async def subscribe( + self, + cache_name: str, + topic_name: str, + resume_at_topic_sequence_number: int = 0, + resume_at_topic_sequence_page: int = 0, + ) -> TopicSubscribeResponse: try: _validate_cache_name(cache_name) _validate_topic_name(topic_name) @@ -84,7 +90,8 @@ async def subscribe(self, cache_name: str, topic_name: str) -> TopicSubscribeRes request = pubsub_pb._SubscriptionRequest( cache_name=cache_name, topic=topic_name, - # TODO: resume_at_topic_sequence_number + resume_at_topic_sequence_number=resume_at_topic_sequence_number, + sequence_page=resume_at_topic_sequence_page, ) stream = self._get_stream_stub().Subscribe( # type: ignore[misc] request, diff --git a/src/momento/internal/synchronous/_scs_pubsub_client.py b/src/momento/internal/synchronous/_scs_pubsub_client.py index 075f65f1..b0347a09 100644 --- a/src/momento/internal/synchronous/_scs_pubsub_client.py +++ b/src/momento/internal/synchronous/_scs_pubsub_client.py @@ -76,7 +76,13 @@ def publish(self, cache_name: str, topic_name: str, value: str | bytes) -> Topic self._log_request_error("publish", e) return TopicPublish.Error(convert_error(e, Service.TOPICS)) - def subscribe(self, cache_name: str, topic_name: str) -> TopicSubscribeResponse: + def subscribe( + self, + cache_name: str, + topic_name: str, + resume_at_topic_sequence_number: int = 0, + resume_at_topic_sequence_page: int = 0, + ) -> TopicSubscribeResponse: try: _validate_cache_name(cache_name) _validate_topic_name(topic_name) @@ -84,7 +90,8 @@ def subscribe(self, cache_name: str, topic_name: str) -> TopicSubscribeResponse: request = pubsub_pb._SubscriptionRequest( cache_name=cache_name, topic=topic_name, - # TODO: resume_at_topic_sequence_number + resume_at_topic_sequence_number=resume_at_topic_sequence_number, + sequence_page=resume_at_topic_sequence_page, ) stream = self._get_stream_stub().Subscribe( # type: ignore[misc] request, diff --git a/src/momento/responses/pubsub/subscribe.py b/src/momento/responses/pubsub/subscribe.py index d2f361f9..ae6e5c07 100644 --- a/src/momento/responses/pubsub/subscribe.py +++ b/src/momento/responses/pubsub/subscribe.py @@ -37,17 +37,23 @@ class SubscriptionBase(TopicSubscribeResponse): _logger = logs.logger _last_known_sequence_number: Optional[int] = None + _last_known_sequence_page: Optional[int] = None def _process_result(self, result: cachepubsub_pb2._SubscriptionItem) -> Optional[TopicSubscriptionItemResponse]: msg_type: str = result.WhichOneof("kind") if msg_type == "item": self._last_known_sequence_number = result.item.topic_sequence_number + self._last_known_sequence_page = result.item.sequence_page value = result.item.value value_type: str = value.WhichOneof("kind") if value_type == "text": - return TopicSubscriptionItem.Text(value.text) + return TopicSubscriptionItem.Text( + value.text, self._last_known_sequence_number, self._last_known_sequence_page + ) elif value_type == "binary": - return TopicSubscriptionItem.Binary(value.binary) + return TopicSubscriptionItem.Binary( + value.binary, self._last_known_sequence_number, self._last_known_sequence_page + ) else: err = SdkException( f"Could not find matching TopicSubscriptionItem response for type: {value_type}", diff --git a/src/momento/responses/pubsub/subscription_item.py b/src/momento/responses/pubsub/subscription_item.py index b95f302a..76427a18 100644 --- a/src/momento/responses/pubsub/subscription_item.py +++ b/src/momento/responses/pubsub/subscription_item.py @@ -37,12 +37,16 @@ class Text(TopicSubscriptionItemResponse): """Indicates the request was successful and value will be returned as a string.""" value: str + sequence_number: int + sequence_page: int @dataclass class Binary(TopicSubscriptionItemResponse): """Indicates the request was successful and value will be returned as bytes.""" value: bytes + sequence_number: int + sequence_page: int class Error(TopicSubscriptionItemResponse, ErrorResponseMixin): """Contains information about an error returned from a request. diff --git a/src/momento/topic_client.py b/src/momento/topic_client.py index fd5e0be7..5624f6f4 100644 --- a/src/momento/topic_client.py +++ b/src/momento/topic_client.py @@ -81,17 +81,29 @@ def publish(self, cache_name: str, topic_name: str, value: str | bytes) -> Topic """ return self._pubsub_client.publish(cache_name, topic_name, value) - def subscribe(self, cache_name: str, topic_name: str) -> TopicSubscribeResponse: + def subscribe( + self, + cache_name: str, + topic_name: str, + resume_at_topic_sequence_number: int = 0, + resume_at_topic_sequence_page: int = 0, + ) -> TopicSubscribeResponse: """Subscribes to a topic. Args: cache_name (str): The cache to subscribe to. topic_name (str): The topic to subscribe to. + resume_at_topic_sequence_number (int): The sequence number to resume at. Omit or use 0 to start at the + latest messages. + resume_at_topic_sequence_page (int): The page number to resume at. Omit or use 0 to start at the + latest messages. Returns: TopicSubscribeResponse """ - return self._pubsub_client.subscribe(cache_name, topic_name) + return self._pubsub_client.subscribe( + cache_name, topic_name, resume_at_topic_sequence_number, resume_at_topic_sequence_page + ) def close(self) -> None: self._pubsub_client.close() diff --git a/src/momento/topic_client_async.py b/src/momento/topic_client_async.py index e7ea6e7a..ea1814af 100644 --- a/src/momento/topic_client_async.py +++ b/src/momento/topic_client_async.py @@ -81,17 +81,29 @@ async def publish(self, cache_name: str, topic_name: str, value: str | bytes) -> """ return await self._pubsub_client.publish(cache_name, topic_name, value) - async def subscribe(self, cache_name: str, topic_name: str) -> TopicSubscribeResponse: + async def subscribe( + self, + cache_name: str, + topic_name: str, + resume_at_topic_sequence_number: int = 0, + resume_at_topic_sequence_page: int = 0, + ) -> TopicSubscribeResponse: """Subscribes to a topic. Args: cache_name (str): The cache to subscribe to. topic_name (str): The topic to subscribe to. + resume_at_topic_sequence_number (int): The sequence number to resume at. Omit or use 0 to start at the + latest messages. + resume_at_topic_sequence_page (int): The page number to resume at. Omit or use 0 to start at the + latest messages. Returns: TopicSubscribeResponse """ - return await self._pubsub_client.subscribe(cache_name, topic_name) + return await self._pubsub_client.subscribe( + cache_name, topic_name, resume_at_topic_sequence_number, resume_at_topic_sequence_page + ) async def close(self) -> None: await self._pubsub_client.close() diff --git a/tests/momento/topic_client/test_topics.py b/tests/momento/topic_client/test_topics.py index 720eda7e..c318291f 100644 --- a/tests/momento/topic_client/test_topics.py +++ b/tests/momento/topic_client/test_topics.py @@ -53,11 +53,45 @@ def subscribe_happy_path_string(client: CacheClient, topic_client: TopicClient, topic = uuid_str() value = uuid_str() + _ = topic_client.publish(cache_name, topic_name=topic, value=value) + subscribe_response = topic_client.subscribe(cache_name, topic_name=topic) assert isinstance(subscribe_response, TopicSubscribe.Subscription) + item_response = next(subscribe_response) + assert isinstance(item_response, TopicSubscriptionItem.Text) + assert item_response.value == value + + def subscribe_happy_path_string_resume_at_sequence( + client: CacheClient, topic_client: TopicClient, cache_name: str + ) -> None: + topic = uuid_str() + value = uuid_str() + + _ = topic_client.publish(cache_name, topic_name=topic, value="foo") + _ = topic_client.publish(cache_name, topic_name=topic, value=value) + _ = topic_client.publish(cache_name, topic_name=topic, value="bar") + + subscribe_response = topic_client.subscribe(cache_name, topic_name=topic, resume_at_topic_sequence_number=2) + assert isinstance(subscribe_response, TopicSubscribe.Subscription) + + item_response = next(subscribe_response) + assert isinstance(item_response, TopicSubscriptionItem.Text) + assert item_response.value == value + + def subscribe_happy_path_string_resume_at_invalid_sequence( + client: CacheClient, topic_client: TopicClient, cache_name: str + ) -> None: + topic = uuid_str() + value = uuid_str() + _ = topic_client.publish(cache_name, topic_name=topic, value=value) + subscribe_response = topic_client.subscribe( + cache_name, topic_name=topic, resume_at_topic_sequence_number=300, resume_at_topic_sequence_page=5435435 + ) + assert isinstance(subscribe_response, TopicSubscribe.Subscription) + item_response = next(subscribe_response) assert isinstance(item_response, TopicSubscriptionItem.Text) assert item_response.value == value diff --git a/tests/momento/topic_client/test_topics_async.py b/tests/momento/topic_client/test_topics_async.py index ee52d157..f6f42cda 100644 --- a/tests/momento/topic_client/test_topics_async.py +++ b/tests/momento/topic_client/test_topics_async.py @@ -57,13 +57,50 @@ async def subscribe_happy_path_string( topic = uuid_str() value = uuid_str() + _ = await topic_client_async.publish(cache_name, topic_name=topic, value=value) + subscribe_response = await topic_client_async.subscribe(cache_name, topic_name=topic) assert isinstance(subscribe_response, TopicSubscribe.SubscriptionAsync) item_task = subscribe_response.__anext__() - publish_response = await topic_client_async.publish(cache_name, topic_name=topic, value=value) + item_response = await item_task + assert isinstance(item_response, TopicSubscriptionItem.Text) + assert item_response.value == value - print(publish_response) + async def subscribe_happy_path_string_resume_at_sequence( + client: CacheClientAsync, topic_client_async: TopicClientAsync, cache_name: str + ) -> None: + topic = uuid_str() + value = uuid_str() + + _ = await topic_client_async.publish(cache_name, topic_name=topic, value="foo") + _ = await topic_client_async.publish(cache_name, topic_name=topic, value=value) + _ = await topic_client_async.publish(cache_name, topic_name=topic, value="bar") + + subscribe_response = await topic_client_async.subscribe( + cache_name, topic_name=topic, resume_at_topic_sequence_number=2 + ) + assert isinstance(subscribe_response, TopicSubscribe.SubscriptionAsync) + + item_task = subscribe_response.__anext__() + item_response = await item_task + assert isinstance(item_response, TopicSubscriptionItem.Text) + assert item_response.value == value + + async def subscribe_happy_path_string_resume_at_invalid_sequence( + client: CacheClientAsync, topic_client_async: TopicClientAsync, cache_name: str + ) -> None: + topic = uuid_str() + value = uuid_str() + + _ = await topic_client_async.publish(cache_name, topic_name=topic, value=value) + + subscribe_response = await topic_client_async.subscribe( + cache_name, topic_name=topic, resume_at_topic_sequence_number=300, resume_at_topic_sequence_page=5435435 + ) + assert isinstance(subscribe_response, TopicSubscribe.SubscriptionAsync) + + item_task = subscribe_response.__anext__() item_response = await item_task assert isinstance(item_response, TopicSubscriptionItem.Text) assert item_response.value == value