From 3d98928893df7030bcd2d66ac455d75c02193e18 Mon Sep 17 00:00:00 2001 From: Deepak Date: Wed, 6 Jul 2022 18:54:40 +0530 Subject: [PATCH 1/3] Add num_replicas field for consumer --- nats/js/api.py | 1 + tests/test_js.py | 19 +++++++++++++++++++ 2 files changed, 20 insertions(+) diff --git a/nats/js/api.py b/nats/js/api.py index 74d26c23..11329b39 100644 --- a/nats/js/api.py +++ b/nats/js/api.py @@ -365,6 +365,7 @@ class ConsumerConfig(Base): flow_control: Optional[bool] = None idle_heartbeat: Optional[float] = None headers_only: Optional[bool] = None + num_replicas: Optional[int] = None @classmethod def from_response(cls, resp: Dict[str, Any]): diff --git a/tests/test_js.py b/tests/test_js.py index f43fbc7e..7ea86025 100644 --- a/tests/test_js.py +++ b/tests/test_js.py @@ -1577,3 +1577,22 @@ async def error_handler(e): with pytest.raises(BadBucketError): await js.key_value(bucket="TEST3") + +class ConsumerReplicasTest(SingleJetStreamServerTestCase): + @async_test + async def test_number_of_consumer_replicas(self): + nc = await nats.connect() + + js = nc.jetstream() + await js.add_stream(name="TESTREPLICAS", subjects=["test.replicas"]) + for i in range(0, 10): + await js.publish("test", f'{i}'.encode()) + + + # Create consumer + config = nats.js.api.ConsumerConfig(num_replicas=1) + cons = await js.add_consumer(stream="TESTREPLICAS", config=config) + + assert cons.config.num_replicas == 1 + + await nc.close() \ No newline at end of file From de77557a4acccf10cb492cdabfc3ac45957b5022 Mon Sep 17 00:00:00 2001 From: Deepak Date: Wed, 6 Jul 2022 19:49:38 +0530 Subject: [PATCH 2/3] Add consumer name --- tests/test_js.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/test_js.py b/tests/test_js.py index 7ea86025..9da425f5 100644 --- a/tests/test_js.py +++ b/tests/test_js.py @@ -1590,9 +1590,9 @@ async def test_number_of_consumer_replicas(self): # Create consumer - config = nats.js.api.ConsumerConfig(num_replicas=1) + config = nats.js.api.ConsumerConfig(num_replicas=1, durable_name="mycons") cons = await js.add_consumer(stream="TESTREPLICAS", config=config) assert cons.config.num_replicas == 1 - await nc.close() \ No newline at end of file + await nc.close() From f0bbb6950efe601485114a0146e7b54f1b95520e Mon Sep 17 00:00:00 2001 From: Deepak Date: Thu, 7 Jul 2022 00:31:59 +0530 Subject: [PATCH 3/3] Fix subject name when publishing --- tests/test_js.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_js.py b/tests/test_js.py index 9da425f5..d2d4ea59 100644 --- a/tests/test_js.py +++ b/tests/test_js.py @@ -1586,7 +1586,7 @@ async def test_number_of_consumer_replicas(self): js = nc.jetstream() await js.add_stream(name="TESTREPLICAS", subjects=["test.replicas"]) for i in range(0, 10): - await js.publish("test", f'{i}'.encode()) + await js.publish("test.replicas", f'{i}'.encode()) # Create consumer