From 867f7c874e079ef06cfa3e181110023a209a18e7 Mon Sep 17 00:00:00 2001 From: Pete Gautier Date: Tue, 19 Nov 2024 15:51:52 -0800 Subject: [PATCH 01/12] feat: add support for topic sequence number and sequence page --- src/momento/internal/aio/_scs_pubsub_client.py | 11 +++++++++-- .../internal/synchronous/_scs_pubsub_client.py | 11 +++++++++-- src/momento/responses/pubsub/subscribe.py | 2 ++ src/momento/topic_client.py | 14 ++++++++++++-- src/momento/topic_client_async.py | 12 ++++++++++-- 5 files changed, 42 insertions(+), 8 deletions(-) diff --git a/src/momento/internal/aio/_scs_pubsub_client.py b/src/momento/internal/aio/_scs_pubsub_client.py index 1fe07c89..b3f03980 100644 --- a/src/momento/internal/aio/_scs_pubsub_client.py +++ b/src/momento/internal/aio/_scs_pubsub_client.py @@ -76,7 +76,13 @@ async def publish(self, cache_name: str, topic_name: str, value: str | bytes) -> self._log_request_error("publish", e) return TopicPublish.Error(convert_error(e, Service.TOPICS)) - async def subscribe(self, cache_name: str, topic_name: str) -> TopicSubscribeResponse: + async def subscribe( + self, + cache_name: str, + topic_name: str, + resume_at_topic_sequence_number: int = 0, + resume_at_topic_sequence_page: int = 0, + ) -> TopicSubscribeResponse: try: _validate_cache_name(cache_name) _validate_topic_name(topic_name) @@ -84,7 +90,8 @@ async def subscribe(self, cache_name: str, topic_name: str) -> TopicSubscribeRes request = pubsub_pb._SubscriptionRequest( cache_name=cache_name, topic=topic_name, - # TODO: resume_at_topic_sequence_number + resume_at_topic_sequence_number=resume_at_topic_sequence_number, + sequence_page=resume_at_topic_sequence_page, ) stream = self._get_stream_stub().Subscribe( # type: ignore[misc] request, diff --git a/src/momento/internal/synchronous/_scs_pubsub_client.py b/src/momento/internal/synchronous/_scs_pubsub_client.py index 075f65f1..a3d89cef 100644 --- a/src/momento/internal/synchronous/_scs_pubsub_client.py +++ b/src/momento/internal/synchronous/_scs_pubsub_client.py @@ -76,7 +76,13 @@ def publish(self, cache_name: str, topic_name: str, value: str | bytes) -> Topic self._log_request_error("publish", e) return TopicPublish.Error(convert_error(e, Service.TOPICS)) - def subscribe(self, cache_name: str, topic_name: str) -> TopicSubscribeResponse: + def subscribe( + self, + cache_name: str, + topic_name: str, + resume_at_topic_sequence_number: int = 0, + resume_at_topic_sequence_page: int = 0, + ) -> TopicSubscribeResponse: try: _validate_cache_name(cache_name) _validate_topic_name(topic_name) @@ -84,7 +90,8 @@ def subscribe(self, cache_name: str, topic_name: str) -> TopicSubscribeResponse: request = pubsub_pb._SubscriptionRequest( cache_name=cache_name, topic=topic_name, - # TODO: resume_at_topic_sequence_number + resume_at_topic_sequence_number=resume_at_topic_sequence_number, + sequence_page=resume_at_topic_sequence_page, ) stream = self._get_stream_stub().Subscribe( # type: ignore[misc] request, diff --git a/src/momento/responses/pubsub/subscribe.py b/src/momento/responses/pubsub/subscribe.py index d2f361f9..5ce348e0 100644 --- a/src/momento/responses/pubsub/subscribe.py +++ b/src/momento/responses/pubsub/subscribe.py @@ -37,11 +37,13 @@ class SubscriptionBase(TopicSubscribeResponse): _logger = logs.logger _last_known_sequence_number: Optional[int] = None + _last_known_sequence_page: Optional[int] = None def _process_result(self, result: cachepubsub_pb2._SubscriptionItem) -> Optional[TopicSubscriptionItemResponse]: msg_type: str = result.WhichOneof("kind") if msg_type == "item": self._last_known_sequence_number = result.item.topic_sequence_number + self._last_known_sequence_page = result.item.sequence_page value = result.item.value value_type: str = value.WhichOneof("kind") if value_type == "text": diff --git a/src/momento/topic_client.py b/src/momento/topic_client.py index fd5e0be7..5d2d2ed3 100644 --- a/src/momento/topic_client.py +++ b/src/momento/topic_client.py @@ -81,17 +81,27 @@ def publish(self, cache_name: str, topic_name: str, value: str | bytes) -> Topic """ return self._pubsub_client.publish(cache_name, topic_name, value) - def subscribe(self, cache_name: str, topic_name: str) -> TopicSubscribeResponse: + def subscribe( + self, + cache_name: str, + topic_name: str, + resume_at_topic_sequence_number: int = 0, + resume_at_topic_sequence_page: int = 0, + ) -> TopicSubscribeResponse: """Subscribes to a topic. Args: cache_name (str): The cache to subscribe to. topic_name (str): The topic to subscribe to. + resume_at_topic_sequence_number (int): The topic sequence number to resume at. + resume_at_topic_sequence_page (int): The topic sequence page to resume at. Returns: TopicSubscribeResponse """ - return self._pubsub_client.subscribe(cache_name, topic_name) + return self._pubsub_client.subscribe( + cache_name, topic_name, resume_at_topic_sequence_number, resume_at_topic_sequence_page + ) def close(self) -> None: self._pubsub_client.close() diff --git a/src/momento/topic_client_async.py b/src/momento/topic_client_async.py index e7ea6e7a..e17f2c7f 100644 --- a/src/momento/topic_client_async.py +++ b/src/momento/topic_client_async.py @@ -81,17 +81,25 @@ async def publish(self, cache_name: str, topic_name: str, value: str | bytes) -> """ return await self._pubsub_client.publish(cache_name, topic_name, value) - async def subscribe(self, cache_name: str, topic_name: str) -> TopicSubscribeResponse: + async def subscribe( + self, + cache_name: str, + topic_name: str, + resume_at_topic_sequence_number: int = 0, + resume_at_topic_sequence_page: int = 0, + ) -> TopicSubscribeResponse: """Subscribes to a topic. Args: cache_name (str): The cache to subscribe to. topic_name (str): The topic to subscribe to. + resume_at_topic_sequence_number (int): The topic sequence number to resume at. + resume_at_topic_sequence_page (int): The topic sequence page to resume at. Returns: TopicSubscribeResponse """ - return await self._pubsub_client.subscribe(cache_name, topic_name) + return await self._pubsub_client.subscribe(cache_name, topic_name, resume_at_topic_sequence_number, resume_at_topic_sequence_page) async def close(self) -> None: await self._pubsub_client.close() From 06cceab0ff111cb6c992676fe3de2cdc8a43c98f Mon Sep 17 00:00:00 2001 From: Pete Gautier Date: Tue, 19 Nov 2024 16:25:02 -0800 Subject: [PATCH 02/12] chore: add tests --- tests/momento/topic_client/test_topics.py | 45 +++++++++++++++-- .../momento/topic_client/test_topics_async.py | 49 ++++++++++++++++--- 2 files changed, 85 insertions(+), 9 deletions(-) diff --git a/tests/momento/topic_client/test_topics.py b/tests/momento/topic_client/test_topics.py index 720eda7e..7d82f850 100644 --- a/tests/momento/topic_client/test_topics.py +++ b/tests/momento/topic_client/test_topics.py @@ -49,20 +49,59 @@ def topic_validator(topic_client: TopicClient) -> TTopicValidator: cache_name = uuid_str() return partial(topic_client.subscribe, cache_name=cache_name) - def subscribe_happy_path_string(client: CacheClient, topic_client: TopicClient, cache_name: str) -> None: + def subscribe_happy_path_string(topic_client: TopicClient, cache_name: str) -> None: topic = uuid_str() value = uuid_str() + _ = topic_client.publish(cache_name, topic_name=topic, value=value) + subscribe_response = topic_client.subscribe(cache_name, topic_name=topic) assert isinstance(subscribe_response, TopicSubscribe.Subscription) + item_response = next(subscribe_response) + + assert isinstance(item_response, TopicSubscriptionItem.Text) + assert item_response.value == value + + def subscribe_happy_path_string_with_nonzero_resume( + topic_client: TopicClient, cache_name: str + ) -> None: + topic = uuid_str() + value = uuid_str() + + _ = topic_client.publish(cache_name, topic_name=topic, value="1") + _ = topic_client.publish(cache_name, topic_name=topic, value="2") _ = topic_client.publish(cache_name, topic_name=topic, value=value) + subscribe_response = topic_client.subscribe( + cache_name, topic_name=topic, resume_at_topic_sequence_number=3, resume_at_topic_sequence_page=0 + ) + assert isinstance(subscribe_response, TopicSubscribe.Subscription) + item_response = next(subscribe_response) + + assert isinstance(item_response, TopicSubscriptionItem.Text) + assert item_response.value == value + + def subscribe_happy_path_string_with_discontinuity( + topic_client: TopicClient, cache_name: str + ) -> None: + topic = uuid_str() + value = uuid_str() + + _ = topic_client.publish(cache_name, topic_name=topic, value=value) + + subscribe_response = topic_client.subscribe( + cache_name, topic_name=topic, resume_at_topic_sequence_number=5, resume_at_topic_sequence_page=5 + ) + assert isinstance(subscribe_response, TopicSubscribe.Subscription) + + item_response = next(subscribe_response) + assert isinstance(item_response, TopicSubscriptionItem.Text) assert item_response.value == value - def subscribe_happy_path_binary(client: CacheClient, topic_client: TopicClient, cache_name: str) -> None: + def subscribe_happy_path_binary(topic_client: TopicClient, cache_name: str) -> None: topic = uuid_str() value = uuid_bytes() @@ -75,7 +114,7 @@ def subscribe_happy_path_binary(client: CacheClient, topic_client: TopicClient, assert isinstance(item_response, TopicSubscriptionItem.Binary) assert item_response.value == value - def succeeds_with_nonexistent_topic(client: CacheClient, topic_client: TopicClient, cache_name: str) -> None: + def succeeds_with_nonexistent_topic(topic_client: TopicClient, cache_name: str) -> None: topic = uuid_str() resp = topic_client.subscribe(cache_name, topic) diff --git a/tests/momento/topic_client/test_topics_async.py b/tests/momento/topic_client/test_topics_async.py index ee52d157..2351cbd4 100644 --- a/tests/momento/topic_client/test_topics_async.py +++ b/tests/momento/topic_client/test_topics_async.py @@ -30,7 +30,7 @@ def topic_validator(topic_client_async: TopicClientAsync) -> TTopicValidator: return partial(topic_client_async.publish, cache_name=cache_name, value=value) async def publish_happy_path( - client: CacheClientAsync, topic_client_async: TopicClientAsync, cache_name: str + topic_client_async: TopicClientAsync, cache_name: str ) -> None: topic = uuid_str() value = uuid_str() @@ -52,24 +52,61 @@ def topic_validator(topic_client_async: TopicClientAsync) -> TTopicValidator: return partial(topic_client_async.subscribe, cache_name=cache_name) async def subscribe_happy_path_string( - client: CacheClientAsync, topic_client_async: TopicClientAsync, cache_name: str + topic_client_async: TopicClientAsync, cache_name: str ) -> None: topic = uuid_str() value = uuid_str() + _ = await topic_client_async.publish(cache_name, topic_name=topic, value=value) + subscribe_response = await topic_client_async.subscribe(cache_name, topic_name=topic) assert isinstance(subscribe_response, TopicSubscribe.SubscriptionAsync) item_task = subscribe_response.__anext__() - publish_response = await topic_client_async.publish(cache_name, topic_name=topic, value=value) + item_response = await item_task + assert isinstance(item_response, TopicSubscriptionItem.Text) + assert item_response.value == value - print(publish_response) + async def subscribe_happy_path_string_with_nonzero_resume( + topic_client_async: TopicClientAsync, cache_name: str + ) -> None: + topic = uuid_str() + value = uuid_str() + + _ = await topic_client_async.publish(cache_name, topic_name=topic, value="1") + _ = await topic_client_async.publish(cache_name, topic_name=topic, value="2") + _ = await topic_client_async.publish(cache_name, topic_name=topic, value=value) + + subscribe_response = await topic_client_async.subscribe( + cache_name, topic_name=topic, resume_at_topic_sequence_number=3, resume_at_topic_sequence_page=0 + ) + assert isinstance(subscribe_response, TopicSubscribe.SubscriptionAsync) + + item_task = subscribe_response.__anext__() + item_response = await item_task + assert isinstance(item_response, TopicSubscriptionItem.Text) + assert item_response.value == value + + async def subscribe_happy_path_string_with_discontinuity( + topic_client_async: TopicClientAsync, cache_name: str + ) -> None: + topic = uuid_str() + value = uuid_str() + + _ = await topic_client_async.publish(cache_name, topic_name=topic, value=value) + + subscribe_response = await topic_client_async.subscribe( + cache_name, topic_name=topic, resume_at_topic_sequence_number=5, resume_at_topic_sequence_page=5 + ) + assert isinstance(subscribe_response, TopicSubscribe.SubscriptionAsync) + + item_task = subscribe_response.__anext__() item_response = await item_task assert isinstance(item_response, TopicSubscriptionItem.Text) assert item_response.value == value async def subscribe_happy_path_binary( - client: CacheClientAsync, topic_client_async: TopicClientAsync, cache_name: str + topic_client_async: TopicClientAsync, cache_name: str ) -> None: topic = uuid_str() value = uuid_bytes() @@ -86,7 +123,7 @@ async def subscribe_happy_path_binary( assert item_response.value == value async def succeeds_with_nonexistent_topic( - client: CacheClientAsync, topic_client_async: TopicClientAsync, cache_name: str + topic_client_async: TopicClientAsync, cache_name: str ) -> None: topic = uuid_str() From 3811d080527eecda66393e77312281912a0fd2ff Mon Sep 17 00:00:00 2001 From: Pete Gautier Date: Tue, 19 Nov 2024 16:31:57 -0800 Subject: [PATCH 03/12] chore: improve docs --- src/momento/topic_client.py | 6 ++++-- src/momento/topic_client_async.py | 6 ++++-- 2 files changed, 8 insertions(+), 4 deletions(-) diff --git a/src/momento/topic_client.py b/src/momento/topic_client.py index 5d2d2ed3..88d5c151 100644 --- a/src/momento/topic_client.py +++ b/src/momento/topic_client.py @@ -93,8 +93,10 @@ def subscribe( Args: cache_name (str): The cache to subscribe to. topic_name (str): The topic to subscribe to. - resume_at_topic_sequence_number (int): The topic sequence number to resume at. - resume_at_topic_sequence_page (int): The topic sequence page to resume at. + resume_at_topic_sequence_number (int): The topic sequence number to resume at. Omit or set to 0 to start at + the latest messages. + resume_at_topic_sequence_page (int): The topic sequence page to resume at. Omit or set to 0 to start at the + latest page. Returns: TopicSubscribeResponse diff --git a/src/momento/topic_client_async.py b/src/momento/topic_client_async.py index e17f2c7f..e34747ff 100644 --- a/src/momento/topic_client_async.py +++ b/src/momento/topic_client_async.py @@ -93,8 +93,10 @@ async def subscribe( Args: cache_name (str): The cache to subscribe to. topic_name (str): The topic to subscribe to. - resume_at_topic_sequence_number (int): The topic sequence number to resume at. - resume_at_topic_sequence_page (int): The topic sequence page to resume at. + resume_at_topic_sequence_number (int): The topic sequence number to resume at. Omit or set to 0 to start at + the latest messages. + resume_at_topic_sequence_page (int): The topic sequence page to resume at. Omit or set to 0 to start at + the latest page. Returns: TopicSubscribeResponse From 43582606a36ac1a0b50fab9fffb78e23458aa299 Mon Sep 17 00:00:00 2001 From: Pete Gautier Date: Tue, 19 Nov 2024 16:40:58 -0800 Subject: [PATCH 04/12] chore: formatting cleanup --- .../internal/synchronous/_scs_pubsub_client.py | 10 +++++----- src/momento/topic_client_async.py | 4 +++- tests/momento/topic_client/test_topics.py | 12 ++++-------- .../momento/topic_client/test_topics_async.py | 18 +++++------------- 4 files changed, 17 insertions(+), 27 deletions(-) diff --git a/src/momento/internal/synchronous/_scs_pubsub_client.py b/src/momento/internal/synchronous/_scs_pubsub_client.py index a3d89cef..b0347a09 100644 --- a/src/momento/internal/synchronous/_scs_pubsub_client.py +++ b/src/momento/internal/synchronous/_scs_pubsub_client.py @@ -77,11 +77,11 @@ def publish(self, cache_name: str, topic_name: str, value: str | bytes) -> Topic return TopicPublish.Error(convert_error(e, Service.TOPICS)) def subscribe( - self, - cache_name: str, - topic_name: str, - resume_at_topic_sequence_number: int = 0, - resume_at_topic_sequence_page: int = 0, + self, + cache_name: str, + topic_name: str, + resume_at_topic_sequence_number: int = 0, + resume_at_topic_sequence_page: int = 0, ) -> TopicSubscribeResponse: try: _validate_cache_name(cache_name) diff --git a/src/momento/topic_client_async.py b/src/momento/topic_client_async.py index e34747ff..725b2bb1 100644 --- a/src/momento/topic_client_async.py +++ b/src/momento/topic_client_async.py @@ -101,7 +101,9 @@ async def subscribe( Returns: TopicSubscribeResponse """ - return await self._pubsub_client.subscribe(cache_name, topic_name, resume_at_topic_sequence_number, resume_at_topic_sequence_page) + return await self._pubsub_client.subscribe( + cache_name, topic_name, resume_at_topic_sequence_number, resume_at_topic_sequence_page + ) async def close(self) -> None: await self._pubsub_client.close() diff --git a/tests/momento/topic_client/test_topics.py b/tests/momento/topic_client/test_topics.py index 7d82f850..e92c2bbf 100644 --- a/tests/momento/topic_client/test_topics.py +++ b/tests/momento/topic_client/test_topics.py @@ -1,6 +1,6 @@ from functools import partial -from momento import CacheClient, TopicClient +from momento import TopicClient from momento.responses import TopicPublish, TopicSubscribe, TopicSubscriptionItem from pytest import fixture from pytest_describe import behaves_like @@ -29,7 +29,7 @@ def topic_validator(topic_client: TopicClient) -> TTopicValidator: value = uuid_str() return partial(topic_client.publish, cache_name=cache_name, value=value) - def publish_happy_path(client: CacheClient, topic_client: TopicClient, cache_name: str) -> None: + def publish_happy_path(topic_client: TopicClient, cache_name: str) -> None: topic = uuid_str() value = uuid_str() @@ -63,9 +63,7 @@ def subscribe_happy_path_string(topic_client: TopicClient, cache_name: str) -> N assert isinstance(item_response, TopicSubscriptionItem.Text) assert item_response.value == value - def subscribe_happy_path_string_with_nonzero_resume( - topic_client: TopicClient, cache_name: str - ) -> None: + def subscribe_happy_path_string_with_nonzero_resume(topic_client: TopicClient, cache_name: str) -> None: topic = uuid_str() value = uuid_str() @@ -83,9 +81,7 @@ def subscribe_happy_path_string_with_nonzero_resume( assert isinstance(item_response, TopicSubscriptionItem.Text) assert item_response.value == value - def subscribe_happy_path_string_with_discontinuity( - topic_client: TopicClient, cache_name: str - ) -> None: + def subscribe_happy_path_string_with_discontinuity(topic_client: TopicClient, cache_name: str) -> None: topic = uuid_str() value = uuid_str() diff --git a/tests/momento/topic_client/test_topics_async.py b/tests/momento/topic_client/test_topics_async.py index 2351cbd4..682c3418 100644 --- a/tests/momento/topic_client/test_topics_async.py +++ b/tests/momento/topic_client/test_topics_async.py @@ -1,6 +1,6 @@ from functools import partial -from momento import CacheClientAsync, TopicClientAsync +from momento import TopicClientAsync from momento.responses import TopicPublish, TopicSubscribe, TopicSubscriptionItem from pytest import fixture from pytest_describe import behaves_like @@ -29,9 +29,7 @@ def topic_validator(topic_client_async: TopicClientAsync) -> TTopicValidator: value = uuid_str() return partial(topic_client_async.publish, cache_name=cache_name, value=value) - async def publish_happy_path( - topic_client_async: TopicClientAsync, cache_name: str - ) -> None: + async def publish_happy_path(topic_client_async: TopicClientAsync, cache_name: str) -> None: topic = uuid_str() value = uuid_str() @@ -51,9 +49,7 @@ def topic_validator(topic_client_async: TopicClientAsync) -> TTopicValidator: cache_name = uuid_str() return partial(topic_client_async.subscribe, cache_name=cache_name) - async def subscribe_happy_path_string( - topic_client_async: TopicClientAsync, cache_name: str - ) -> None: + async def subscribe_happy_path_string(topic_client_async: TopicClientAsync, cache_name: str) -> None: topic = uuid_str() value = uuid_str() @@ -105,9 +101,7 @@ async def subscribe_happy_path_string_with_discontinuity( assert isinstance(item_response, TopicSubscriptionItem.Text) assert item_response.value == value - async def subscribe_happy_path_binary( - topic_client_async: TopicClientAsync, cache_name: str - ) -> None: + async def subscribe_happy_path_binary(topic_client_async: TopicClientAsync, cache_name: str) -> None: topic = uuid_str() value = uuid_bytes() @@ -122,9 +116,7 @@ async def subscribe_happy_path_binary( assert isinstance(item_response, TopicSubscriptionItem.Binary) assert item_response.value == value - async def succeeds_with_nonexistent_topic( - topic_client_async: TopicClientAsync, cache_name: str - ) -> None: + async def succeeds_with_nonexistent_topic(topic_client_async: TopicClientAsync, cache_name: str) -> None: topic = uuid_str() resp = await topic_client_async.subscribe(cache_name, topic) From 6a5cf66b18e4cb1b35e148e9ffba53c5b04587de Mon Sep 17 00:00:00 2001 From: Pete Gautier Date: Wed, 20 Nov 2024 08:54:20 -0800 Subject: [PATCH 05/12] chore: pull new params back out of public client --- src/momento/topic_client.py | 10 +--------- src/momento/topic_client_async.py | 10 +--------- 2 files changed, 2 insertions(+), 18 deletions(-) diff --git a/src/momento/topic_client.py b/src/momento/topic_client.py index 88d5c151..1035be3b 100644 --- a/src/momento/topic_client.py +++ b/src/momento/topic_client.py @@ -85,25 +85,17 @@ def subscribe( self, cache_name: str, topic_name: str, - resume_at_topic_sequence_number: int = 0, - resume_at_topic_sequence_page: int = 0, ) -> TopicSubscribeResponse: """Subscribes to a topic. Args: cache_name (str): The cache to subscribe to. topic_name (str): The topic to subscribe to. - resume_at_topic_sequence_number (int): The topic sequence number to resume at. Omit or set to 0 to start at - the latest messages. - resume_at_topic_sequence_page (int): The topic sequence page to resume at. Omit or set to 0 to start at the - latest page. Returns: TopicSubscribeResponse """ - return self._pubsub_client.subscribe( - cache_name, topic_name, resume_at_topic_sequence_number, resume_at_topic_sequence_page - ) + return self._pubsub_client.subscribe(cache_name, topic_name) def close(self) -> None: self._pubsub_client.close() diff --git a/src/momento/topic_client_async.py b/src/momento/topic_client_async.py index 725b2bb1..8320306a 100644 --- a/src/momento/topic_client_async.py +++ b/src/momento/topic_client_async.py @@ -85,25 +85,17 @@ async def subscribe( self, cache_name: str, topic_name: str, - resume_at_topic_sequence_number: int = 0, - resume_at_topic_sequence_page: int = 0, ) -> TopicSubscribeResponse: """Subscribes to a topic. Args: cache_name (str): The cache to subscribe to. topic_name (str): The topic to subscribe to. - resume_at_topic_sequence_number (int): The topic sequence number to resume at. Omit or set to 0 to start at - the latest messages. - resume_at_topic_sequence_page (int): The topic sequence page to resume at. Omit or set to 0 to start at - the latest page. Returns: TopicSubscribeResponse """ - return await self._pubsub_client.subscribe( - cache_name, topic_name, resume_at_topic_sequence_number, resume_at_topic_sequence_page - ) + return await self._pubsub_client.subscribe(cache_name, topic_name) async def close(self) -> None: await self._pubsub_client.close() From f41bf9c94425f0f17ce94f91c719dfb0ae2eb92d Mon Sep 17 00:00:00 2001 From: Pete Gautier Date: Wed, 20 Nov 2024 08:55:03 -0800 Subject: [PATCH 06/12] chore: restore cache client fixture and remove unneeded tests --- tests/momento/topic_client/test_topics.py | 42 ++-------------- .../momento/topic_client/test_topics_async.py | 50 ++++--------------- 2 files changed, 13 insertions(+), 79 deletions(-) diff --git a/tests/momento/topic_client/test_topics.py b/tests/momento/topic_client/test_topics.py index e92c2bbf..b0576098 100644 --- a/tests/momento/topic_client/test_topics.py +++ b/tests/momento/topic_client/test_topics.py @@ -1,6 +1,6 @@ from functools import partial -from momento import TopicClient +from momento import CacheClient, TopicClient from momento.responses import TopicPublish, TopicSubscribe, TopicSubscriptionItem from pytest import fixture from pytest_describe import behaves_like @@ -49,7 +49,7 @@ def topic_validator(topic_client: TopicClient) -> TTopicValidator: cache_name = uuid_str() return partial(topic_client.subscribe, cache_name=cache_name) - def subscribe_happy_path_string(topic_client: TopicClient, cache_name: str) -> None: + def subscribe_happy_path_string(client: CacheClient, topic_client: TopicClient, cache_name: str) -> None: topic = uuid_str() value = uuid_str() @@ -63,41 +63,7 @@ def subscribe_happy_path_string(topic_client: TopicClient, cache_name: str) -> N assert isinstance(item_response, TopicSubscriptionItem.Text) assert item_response.value == value - def subscribe_happy_path_string_with_nonzero_resume(topic_client: TopicClient, cache_name: str) -> None: - topic = uuid_str() - value = uuid_str() - - _ = topic_client.publish(cache_name, topic_name=topic, value="1") - _ = topic_client.publish(cache_name, topic_name=topic, value="2") - _ = topic_client.publish(cache_name, topic_name=topic, value=value) - - subscribe_response = topic_client.subscribe( - cache_name, topic_name=topic, resume_at_topic_sequence_number=3, resume_at_topic_sequence_page=0 - ) - assert isinstance(subscribe_response, TopicSubscribe.Subscription) - - item_response = next(subscribe_response) - - assert isinstance(item_response, TopicSubscriptionItem.Text) - assert item_response.value == value - - def subscribe_happy_path_string_with_discontinuity(topic_client: TopicClient, cache_name: str) -> None: - topic = uuid_str() - value = uuid_str() - - _ = topic_client.publish(cache_name, topic_name=topic, value=value) - - subscribe_response = topic_client.subscribe( - cache_name, topic_name=topic, resume_at_topic_sequence_number=5, resume_at_topic_sequence_page=5 - ) - assert isinstance(subscribe_response, TopicSubscribe.Subscription) - - item_response = next(subscribe_response) - - assert isinstance(item_response, TopicSubscriptionItem.Text) - assert item_response.value == value - - def subscribe_happy_path_binary(topic_client: TopicClient, cache_name: str) -> None: + def subscribe_happy_path_binary(client: CacheClient, topic_client: TopicClient, cache_name: str) -> None: topic = uuid_str() value = uuid_bytes() @@ -110,7 +76,7 @@ def subscribe_happy_path_binary(topic_client: TopicClient, cache_name: str) -> N assert isinstance(item_response, TopicSubscriptionItem.Binary) assert item_response.value == value - def succeeds_with_nonexistent_topic(topic_client: TopicClient, cache_name: str) -> None: + def succeeds_with_nonexistent_topic(client: CacheClient, topic_client: TopicClient, cache_name: str) -> None: topic = uuid_str() resp = topic_client.subscribe(cache_name, topic) diff --git a/tests/momento/topic_client/test_topics_async.py b/tests/momento/topic_client/test_topics_async.py index 682c3418..f2657cb6 100644 --- a/tests/momento/topic_client/test_topics_async.py +++ b/tests/momento/topic_client/test_topics_async.py @@ -1,6 +1,6 @@ from functools import partial -from momento import TopicClientAsync +from momento import CacheClientAsync, TopicClientAsync from momento.responses import TopicPublish, TopicSubscribe, TopicSubscriptionItem from pytest import fixture from pytest_describe import behaves_like @@ -49,33 +49,15 @@ def topic_validator(topic_client_async: TopicClientAsync) -> TTopicValidator: cache_name = uuid_str() return partial(topic_client_async.subscribe, cache_name=cache_name) - async def subscribe_happy_path_string(topic_client_async: TopicClientAsync, cache_name: str) -> None: - topic = uuid_str() - value = uuid_str() - - _ = await topic_client_async.publish(cache_name, topic_name=topic, value=value) - - subscribe_response = await topic_client_async.subscribe(cache_name, topic_name=topic) - assert isinstance(subscribe_response, TopicSubscribe.SubscriptionAsync) - - item_task = subscribe_response.__anext__() - item_response = await item_task - assert isinstance(item_response, TopicSubscriptionItem.Text) - assert item_response.value == value - - async def subscribe_happy_path_string_with_nonzero_resume( - topic_client_async: TopicClientAsync, cache_name: str + async def subscribe_happy_path_string( + client: CacheClientAsync, topic_client_async: TopicClientAsync, cache_name: str ) -> None: topic = uuid_str() value = uuid_str() - _ = await topic_client_async.publish(cache_name, topic_name=topic, value="1") - _ = await topic_client_async.publish(cache_name, topic_name=topic, value="2") _ = await topic_client_async.publish(cache_name, topic_name=topic, value=value) - subscribe_response = await topic_client_async.subscribe( - cache_name, topic_name=topic, resume_at_topic_sequence_number=3, resume_at_topic_sequence_page=0 - ) + subscribe_response = await topic_client_async.subscribe(cache_name, topic_name=topic) assert isinstance(subscribe_response, TopicSubscribe.SubscriptionAsync) item_task = subscribe_response.__anext__() @@ -83,25 +65,9 @@ async def subscribe_happy_path_string_with_nonzero_resume( assert isinstance(item_response, TopicSubscriptionItem.Text) assert item_response.value == value - async def subscribe_happy_path_string_with_discontinuity( - topic_client_async: TopicClientAsync, cache_name: str + async def subscribe_happy_path_binary( + client: CacheClientAsync, topic_client_async: TopicClientAsync, cache_name: str ) -> None: - topic = uuid_str() - value = uuid_str() - - _ = await topic_client_async.publish(cache_name, topic_name=topic, value=value) - - subscribe_response = await topic_client_async.subscribe( - cache_name, topic_name=topic, resume_at_topic_sequence_number=5, resume_at_topic_sequence_page=5 - ) - assert isinstance(subscribe_response, TopicSubscribe.SubscriptionAsync) - - item_task = subscribe_response.__anext__() - item_response = await item_task - assert isinstance(item_response, TopicSubscriptionItem.Text) - assert item_response.value == value - - async def subscribe_happy_path_binary(topic_client_async: TopicClientAsync, cache_name: str) -> None: topic = uuid_str() value = uuid_bytes() @@ -116,7 +82,9 @@ async def subscribe_happy_path_binary(topic_client_async: TopicClientAsync, cach assert isinstance(item_response, TopicSubscriptionItem.Binary) assert item_response.value == value - async def succeeds_with_nonexistent_topic(topic_client_async: TopicClientAsync, cache_name: str) -> None: + async def succeeds_with_nonexistent_topic( + client: CacheClientAsync, topic_client_async: TopicClientAsync, cache_name: str + ) -> None: topic = uuid_str() resp = await topic_client_async.subscribe(cache_name, topic) From a283d36813096543e2d9c6fb29be2c3ed818b8e4 Mon Sep 17 00:00:00 2001 From: Pete Gautier Date: Wed, 20 Nov 2024 09:09:48 -0800 Subject: [PATCH 07/12] chore: add comment for future develoment --- src/momento/internal/aio/_scs_pubsub_client.py | 3 +++ src/momento/internal/synchronous/_scs_pubsub_client.py | 3 +++ 2 files changed, 6 insertions(+) diff --git a/src/momento/internal/aio/_scs_pubsub_client.py b/src/momento/internal/aio/_scs_pubsub_client.py index b3f03980..30bb4437 100644 --- a/src/momento/internal/aio/_scs_pubsub_client.py +++ b/src/momento/internal/aio/_scs_pubsub_client.py @@ -76,6 +76,9 @@ async def publish(self, cache_name: str, topic_name: str, value: str | bytes) -> self._log_request_error("publish", e) return TopicPublish.Error(convert_error(e, Service.TOPICS)) + # TODO: when we support reconnecting to a subscription, we can make use of the resume_* parameters. + # Also, if we expose discontinuities and heartbeats in this SDK, we can add these parameters + # to the topic clients as well for the user to control. async def subscribe( self, cache_name: str, diff --git a/src/momento/internal/synchronous/_scs_pubsub_client.py b/src/momento/internal/synchronous/_scs_pubsub_client.py index b0347a09..7359066e 100644 --- a/src/momento/internal/synchronous/_scs_pubsub_client.py +++ b/src/momento/internal/synchronous/_scs_pubsub_client.py @@ -76,6 +76,9 @@ def publish(self, cache_name: str, topic_name: str, value: str | bytes) -> Topic self._log_request_error("publish", e) return TopicPublish.Error(convert_error(e, Service.TOPICS)) + # TODO: when we support reconnecting to a subscription, we can make use of the resume_* parameters. + # Also, if we expose discontinuities and heartbeats in this SDK, we can add these parameters + # to the topic clients as well for the user to control. def subscribe( self, cache_name: str, From 93b47215fee95a280f30450a0394fbe673a2ac6b Mon Sep 17 00:00:00 2001 From: Pete Gautier Date: Wed, 20 Nov 2024 13:13:11 -0800 Subject: [PATCH 08/12] chore: expose sequence nums on pubsub items and add sequence params to subscribe methods --- src/momento/responses/pubsub/subscribe.py | 8 +++- .../responses/pubsub/subscription_item.py | 4 ++ src/momento/topic_client.py | 8 +++- src/momento/topic_client_async.py | 8 +++- tests/momento/topic_client/test_topics.py | 33 ++++++++++++++++ .../momento/topic_client/test_topics_async.py | 38 +++++++++++++++++++ 6 files changed, 95 insertions(+), 4 deletions(-) diff --git a/src/momento/responses/pubsub/subscribe.py b/src/momento/responses/pubsub/subscribe.py index 5ce348e0..ae6e5c07 100644 --- a/src/momento/responses/pubsub/subscribe.py +++ b/src/momento/responses/pubsub/subscribe.py @@ -47,9 +47,13 @@ def _process_result(self, result: cachepubsub_pb2._SubscriptionItem) -> Optional value = result.item.value value_type: str = value.WhichOneof("kind") if value_type == "text": - return TopicSubscriptionItem.Text(value.text) + return TopicSubscriptionItem.Text( + value.text, self._last_known_sequence_number, self._last_known_sequence_page + ) elif value_type == "binary": - return TopicSubscriptionItem.Binary(value.binary) + return TopicSubscriptionItem.Binary( + value.binary, self._last_known_sequence_number, self._last_known_sequence_page + ) else: err = SdkException( f"Could not find matching TopicSubscriptionItem response for type: {value_type}", diff --git a/src/momento/responses/pubsub/subscription_item.py b/src/momento/responses/pubsub/subscription_item.py index b95f302a..76427a18 100644 --- a/src/momento/responses/pubsub/subscription_item.py +++ b/src/momento/responses/pubsub/subscription_item.py @@ -37,12 +37,16 @@ class Text(TopicSubscriptionItemResponse): """Indicates the request was successful and value will be returned as a string.""" value: str + sequence_number: int + sequence_page: int @dataclass class Binary(TopicSubscriptionItemResponse): """Indicates the request was successful and value will be returned as bytes.""" value: bytes + sequence_number: int + sequence_page: int class Error(TopicSubscriptionItemResponse, ErrorResponseMixin): """Contains information about an error returned from a request. diff --git a/src/momento/topic_client.py b/src/momento/topic_client.py index 1035be3b..f87caba1 100644 --- a/src/momento/topic_client.py +++ b/src/momento/topic_client.py @@ -85,17 +85,23 @@ def subscribe( self, cache_name: str, topic_name: str, + resume_at_topic_sequence_number: int = 0, + resume_at_topic_sequence_page: int = 0, ) -> TopicSubscribeResponse: """Subscribes to a topic. Args: cache_name (str): The cache to subscribe to. topic_name (str): The topic to subscribe to. + resume_at_topic_sequence_number (int): The sequence number to resume at. + resume_at_topic_sequence_page (int): The page number to resume at. Returns: TopicSubscribeResponse """ - return self._pubsub_client.subscribe(cache_name, topic_name) + return self._pubsub_client.subscribe( + cache_name, topic_name, resume_at_topic_sequence_number, resume_at_topic_sequence_page + ) def close(self) -> None: self._pubsub_client.close() diff --git a/src/momento/topic_client_async.py b/src/momento/topic_client_async.py index 8320306a..22dbc455 100644 --- a/src/momento/topic_client_async.py +++ b/src/momento/topic_client_async.py @@ -85,17 +85,23 @@ async def subscribe( self, cache_name: str, topic_name: str, + resume_at_topic_sequence_number: int = 0, + resume_at_topic_sequence_page: int = 0, ) -> TopicSubscribeResponse: """Subscribes to a topic. Args: cache_name (str): The cache to subscribe to. topic_name (str): The topic to subscribe to. + resume_at_topic_sequence_number (int): The sequence number to resume at. + resume_at_topic_sequence_page (int): The page number to resume at. Returns: TopicSubscribeResponse """ - return await self._pubsub_client.subscribe(cache_name, topic_name) + return await self._pubsub_client.subscribe( + cache_name, topic_name, resume_at_topic_sequence_number, resume_at_topic_sequence_page + ) async def close(self) -> None: await self._pubsub_client.close() diff --git a/tests/momento/topic_client/test_topics.py b/tests/momento/topic_client/test_topics.py index b0576098..7a2ff03a 100644 --- a/tests/momento/topic_client/test_topics.py +++ b/tests/momento/topic_client/test_topics.py @@ -59,7 +59,40 @@ def subscribe_happy_path_string(client: CacheClient, topic_client: TopicClient, assert isinstance(subscribe_response, TopicSubscribe.Subscription) item_response = next(subscribe_response) + assert isinstance(item_response, TopicSubscriptionItem.Text) + assert item_response.value == value + + def subscribe_happy_path_string_resume_at_sequence( + client: CacheClient, topic_client: TopicClient, cache_name: str + ) -> None: + topic = uuid_str() + value = uuid_str() + _ = topic_client.publish(cache_name, topic_name=topic, value="foo") + _ = topic_client.publish(cache_name, topic_name=topic, value="bar") + _ = topic_client.publish(cache_name, topic_name=topic, value=value) + + subscribe_response = topic_client.subscribe(cache_name, topic_name=topic, resume_at_topic_sequence_number=3) + assert isinstance(subscribe_response, TopicSubscribe.Subscription) + + item_response = next(subscribe_response) + assert isinstance(item_response, TopicSubscriptionItem.Text) + assert item_response.value == value + + def subscribe_happy_path_string_resume_at_invalid_sequence( + client: CacheClient, topic_client: TopicClient, cache_name: str + ) -> None: + topic = uuid_str() + value = uuid_str() + + _ = topic_client.publish(cache_name, topic_name=topic, value=value) + + subscribe_response = topic_client.subscribe( + cache_name, topic_name=topic, resume_at_topic_sequence_number=300, resume_at_topic_sequence_page=5435435 + ) + assert isinstance(subscribe_response, TopicSubscribe.Subscription) + + item_response = next(subscribe_response) assert isinstance(item_response, TopicSubscriptionItem.Text) assert item_response.value == value diff --git a/tests/momento/topic_client/test_topics_async.py b/tests/momento/topic_client/test_topics_async.py index f2657cb6..1b163d0e 100644 --- a/tests/momento/topic_client/test_topics_async.py +++ b/tests/momento/topic_client/test_topics_async.py @@ -65,6 +65,44 @@ async def subscribe_happy_path_string( assert isinstance(item_response, TopicSubscriptionItem.Text) assert item_response.value == value + async def subscribe_happy_path_string_resume_at_sequence( + client: CacheClientAsync, topic_client_async: TopicClientAsync, cache_name: str + ) -> None: + topic = uuid_str() + value = uuid_str() + + _ = await topic_client_async.publish(cache_name, topic_name=topic, value="foo") + _ = await topic_client_async.publish(cache_name, topic_name=topic, value="bar") + _ = await topic_client_async.publish(cache_name, topic_name=topic, value=value) + + subscribe_response = await topic_client_async.subscribe( + cache_name, topic_name=topic, resume_at_topic_sequence_number=3 + ) + assert isinstance(subscribe_response, TopicSubscribe.SubscriptionAsync) + + item_task = subscribe_response.__anext__() + item_response = await item_task + assert isinstance(item_response, TopicSubscriptionItem.Text) + assert item_response.value == value + + async def subscribe_happy_path_string_resume_at_invalid_sequence( + client: CacheClientAsync, topic_client_async: TopicClientAsync, cache_name: str + ) -> None: + topic = uuid_str() + value = uuid_str() + + _ = await topic_client_async.publish(cache_name, topic_name=topic, value=value) + + subscribe_response = await topic_client_async.subscribe( + cache_name, topic_name=topic, resume_at_topic_sequence_number=300, resume_at_topic_sequence_page=5435435 + ) + assert isinstance(subscribe_response, TopicSubscribe.SubscriptionAsync) + + item_task = subscribe_response.__anext__() + item_response = await item_task + assert isinstance(item_response, TopicSubscriptionItem.Text) + assert item_response.value == value + async def subscribe_happy_path_binary( client: CacheClientAsync, topic_client_async: TopicClientAsync, cache_name: str ) -> None: From e3a38e5fdcacb81c2a93bb37a08bc58ffeaf7956 Mon Sep 17 00:00:00 2001 From: Pete Gautier Date: Wed, 20 Nov 2024 13:17:55 -0800 Subject: [PATCH 09/12] chore: remove obsolete comments --- src/momento/internal/aio/_scs_pubsub_client.py | 3 --- src/momento/internal/synchronous/_scs_pubsub_client.py | 3 --- 2 files changed, 6 deletions(-) diff --git a/src/momento/internal/aio/_scs_pubsub_client.py b/src/momento/internal/aio/_scs_pubsub_client.py index 30bb4437..b3f03980 100644 --- a/src/momento/internal/aio/_scs_pubsub_client.py +++ b/src/momento/internal/aio/_scs_pubsub_client.py @@ -76,9 +76,6 @@ async def publish(self, cache_name: str, topic_name: str, value: str | bytes) -> self._log_request_error("publish", e) return TopicPublish.Error(convert_error(e, Service.TOPICS)) - # TODO: when we support reconnecting to a subscription, we can make use of the resume_* parameters. - # Also, if we expose discontinuities and heartbeats in this SDK, we can add these parameters - # to the topic clients as well for the user to control. async def subscribe( self, cache_name: str, diff --git a/src/momento/internal/synchronous/_scs_pubsub_client.py b/src/momento/internal/synchronous/_scs_pubsub_client.py index 7359066e..b0347a09 100644 --- a/src/momento/internal/synchronous/_scs_pubsub_client.py +++ b/src/momento/internal/synchronous/_scs_pubsub_client.py @@ -76,9 +76,6 @@ def publish(self, cache_name: str, topic_name: str, value: str | bytes) -> Topic self._log_request_error("publish", e) return TopicPublish.Error(convert_error(e, Service.TOPICS)) - # TODO: when we support reconnecting to a subscription, we can make use of the resume_* parameters. - # Also, if we expose discontinuities and heartbeats in this SDK, we can add these parameters - # to the topic clients as well for the user to control. def subscribe( self, cache_name: str, From 32c1d504b8f232f23f0c06fcf325a1cf3ca01c4f Mon Sep 17 00:00:00 2001 From: Pete Gautier Date: Wed, 20 Nov 2024 13:20:30 -0800 Subject: [PATCH 10/12] chore: improve docstrings --- src/momento/topic_client.py | 6 ++++-- src/momento/topic_client_async.py | 6 ++++-- 2 files changed, 8 insertions(+), 4 deletions(-) diff --git a/src/momento/topic_client.py b/src/momento/topic_client.py index f87caba1..5624f6f4 100644 --- a/src/momento/topic_client.py +++ b/src/momento/topic_client.py @@ -93,8 +93,10 @@ def subscribe( Args: cache_name (str): The cache to subscribe to. topic_name (str): The topic to subscribe to. - resume_at_topic_sequence_number (int): The sequence number to resume at. - resume_at_topic_sequence_page (int): The page number to resume at. + resume_at_topic_sequence_number (int): The sequence number to resume at. Omit or use 0 to start at the + latest messages. + resume_at_topic_sequence_page (int): The page number to resume at. Omit or use 0 to start at the + latest messages. Returns: TopicSubscribeResponse diff --git a/src/momento/topic_client_async.py b/src/momento/topic_client_async.py index 22dbc455..ea1814af 100644 --- a/src/momento/topic_client_async.py +++ b/src/momento/topic_client_async.py @@ -93,8 +93,10 @@ async def subscribe( Args: cache_name (str): The cache to subscribe to. topic_name (str): The topic to subscribe to. - resume_at_topic_sequence_number (int): The sequence number to resume at. - resume_at_topic_sequence_page (int): The page number to resume at. + resume_at_topic_sequence_number (int): The sequence number to resume at. Omit or use 0 to start at the + latest messages. + resume_at_topic_sequence_page (int): The page number to resume at. Omit or use 0 to start at the + latest messages. Returns: TopicSubscribeResponse From d4da5f1b1486dfe96434f31bba80807876019fe7 Mon Sep 17 00:00:00 2001 From: Pete Gautier Date: Wed, 20 Nov 2024 13:25:10 -0800 Subject: [PATCH 11/12] chore: minor test fixes --- tests/momento/topic_client/test_topics.py | 2 +- tests/momento/topic_client/test_topics_async.py | 4 +++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/tests/momento/topic_client/test_topics.py b/tests/momento/topic_client/test_topics.py index 7a2ff03a..cd2c44fe 100644 --- a/tests/momento/topic_client/test_topics.py +++ b/tests/momento/topic_client/test_topics.py @@ -29,7 +29,7 @@ def topic_validator(topic_client: TopicClient) -> TTopicValidator: value = uuid_str() return partial(topic_client.publish, cache_name=cache_name, value=value) - def publish_happy_path(topic_client: TopicClient, cache_name: str) -> None: + def publish_happy_path(client: CacheClient, topic_client: TopicClient, cache_name: str) -> None: topic = uuid_str() value = uuid_str() diff --git a/tests/momento/topic_client/test_topics_async.py b/tests/momento/topic_client/test_topics_async.py index 1b163d0e..52621831 100644 --- a/tests/momento/topic_client/test_topics_async.py +++ b/tests/momento/topic_client/test_topics_async.py @@ -29,7 +29,9 @@ def topic_validator(topic_client_async: TopicClientAsync) -> TTopicValidator: value = uuid_str() return partial(topic_client_async.publish, cache_name=cache_name, value=value) - async def publish_happy_path(topic_client_async: TopicClientAsync, cache_name: str) -> None: + async def publish_happy_path( + client: CacheClientAsync, topic_client_async: TopicClientAsync, cache_name: str + ) -> None: topic = uuid_str() value = uuid_str() From 385e139d524282b1c4ae67fcb476a854fcee7094 Mon Sep 17 00:00:00 2001 From: Pete Gautier Date: Wed, 20 Nov 2024 13:29:15 -0800 Subject: [PATCH 12/12] chore: improve sequence resume tests --- tests/momento/topic_client/test_topics.py | 4 ++-- tests/momento/topic_client/test_topics_async.py | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/tests/momento/topic_client/test_topics.py b/tests/momento/topic_client/test_topics.py index cd2c44fe..c318291f 100644 --- a/tests/momento/topic_client/test_topics.py +++ b/tests/momento/topic_client/test_topics.py @@ -69,10 +69,10 @@ def subscribe_happy_path_string_resume_at_sequence( value = uuid_str() _ = topic_client.publish(cache_name, topic_name=topic, value="foo") - _ = topic_client.publish(cache_name, topic_name=topic, value="bar") _ = topic_client.publish(cache_name, topic_name=topic, value=value) + _ = topic_client.publish(cache_name, topic_name=topic, value="bar") - subscribe_response = topic_client.subscribe(cache_name, topic_name=topic, resume_at_topic_sequence_number=3) + subscribe_response = topic_client.subscribe(cache_name, topic_name=topic, resume_at_topic_sequence_number=2) assert isinstance(subscribe_response, TopicSubscribe.Subscription) item_response = next(subscribe_response) diff --git a/tests/momento/topic_client/test_topics_async.py b/tests/momento/topic_client/test_topics_async.py index 52621831..f6f42cda 100644 --- a/tests/momento/topic_client/test_topics_async.py +++ b/tests/momento/topic_client/test_topics_async.py @@ -74,11 +74,11 @@ async def subscribe_happy_path_string_resume_at_sequence( value = uuid_str() _ = await topic_client_async.publish(cache_name, topic_name=topic, value="foo") - _ = await topic_client_async.publish(cache_name, topic_name=topic, value="bar") _ = await topic_client_async.publish(cache_name, topic_name=topic, value=value) + _ = await topic_client_async.publish(cache_name, topic_name=topic, value="bar") subscribe_response = await topic_client_async.subscribe( - cache_name, topic_name=topic, resume_at_topic_sequence_number=3 + cache_name, topic_name=topic, resume_at_topic_sequence_number=2 ) assert isinstance(subscribe_response, TopicSubscribe.SubscriptionAsync)