Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions nats/src/nats/js/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -492,6 +492,11 @@ class ConsumerConfig(Base):
# Metadata are user defined string key/value pairs.
metadata: Optional[Dict[str, str]] = None

# Consumer pause until timestamp.
# Temporarily suspend message delivery until the specified time (RFC 3339 format).
# Introduced in nats-server 2.11.0.
pause_until: Optional[str] = None

@classmethod
def from_response(cls, resp: Dict[str, Any]):
cls._convert_nanoseconds(resp, "ack_wait")
Expand Down Expand Up @@ -538,6 +543,12 @@ class ConsumerInfo(Base):
num_pending: Optional[int] = None
cluster: Optional[ClusterInfo] = None
push_bound: Optional[bool] = None
# Indicates if the consumer is currently paused.
# Introduced in nats-server 2.11.0.
paused: Optional[bool] = None
# RFC 3339 timestamp until which the consumer is paused.
# Introduced in nats-server 2.11.0.
pause_remaining: Optional[str] = None

@classmethod
def from_response(cls, resp: Dict[str, Any]):
Expand All @@ -548,6 +559,18 @@ def from_response(cls, resp: Dict[str, Any]):
return super().from_response(resp)


@dataclass
class ConsumerPause(Base):
"""
ConsumerPause represents the pause state after a pause or resume operation.
Introduced in nats-server 2.11.0.
"""

paused: bool
pause_until: Optional[str] = None
pause_remaining: Optional[str] = None


@dataclass
class AccountLimits(Base):
"""Account limits
Expand Down
61 changes: 61 additions & 0 deletions nats/src/nats/js/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,67 @@ async def delete_consumer(self, stream: str, consumer: str) -> bool:
)
return resp["success"]

async def pause_consumer(
self,
stream: str,
consumer: str,
pause_until: str,
timeout: Optional[float] = None,
) -> api.ConsumerPause:
"""
Pause a consumer until the specified time.

Args:
stream: The stream name
consumer: The consumer name
pause_until: RFC 3339 timestamp string (e.g., "2025-10-22T12:00:00Z")
until which the consumer should be paused
timeout: Request timeout in seconds

Returns:
ConsumerPause with paused status

Note:
Requires nats-server 2.11.0 or later
"""
if timeout is None:
timeout = self._timeout

req = {"pause_until": pause_until}
req_data = json.dumps(req).encode()

resp = await self._api_request(
f"{self._prefix}.CONSUMER.PAUSE.{stream}.{consumer}",
req_data,
timeout=timeout,
)
return api.ConsumerPause.from_response(resp)

async def resume_consumer(
self,
stream: str,
consumer: str,
timeout: Optional[float] = None,
) -> api.ConsumerPause:
"""
Resume a paused consumer immediately.

This is equivalent to calling pause_consumer with a timestamp in the past.

Args:
stream: The stream name
consumer: The consumer name
timeout: Request timeout in seconds

Returns:
ConsumerPause with paused=False

Note:
Requires nats-server 2.11.0 or later
"""
# Resume by pausing until a time in the past (epoch)
return await self.pause_consumer(stream, consumer, "1970-01-01T00:00:00Z", timeout)

async def consumers_info(self, stream: str, offset: Optional[int] = None) -> List[api.ConsumerInfo]:
"""
consumers_info retrieves a list of consumers. Consumers list limit is 256 for more
Expand Down
131 changes: 131 additions & 0 deletions nats/tests/test_js.py
Original file line number Diff line number Diff line change
Expand Up @@ -1503,6 +1503,137 @@ async def test_jsm_stream_info_options(self):
assert si.state.subjects == None


class ConsumerPauseResumeTest(SingleJetStreamServerTestCase):
@async_test
async def test_consumer_pause_and_resume(self):
"""Test pausing and resuming a consumer"""
nc = NATS()
await nc.connect()

server_version = nc.connected_server_version
if server_version.major == 2 and server_version.minor < 11:
pytest.skip("consumer pause/resume requires nats-server v2.11.0 or later")

js = nc.jetstream()
jsm = nc.jsm()

# Create a stream
await jsm.add_stream(name="PAUSETEST", subjects=["pause.test"])

# Publish some messages
for i in range(5):
await js.publish("pause.test", f"msg-{i}".encode())

# Create a pull consumer
consumer_name = "pause-consumer"
await jsm.add_consumer(
"PAUSETEST",
name=consumer_name,
durable_name=consumer_name,
ack_policy="explicit",
)

# Get initial consumer info - may or may not be paused initially
# (we'll test pausing anyway)
initial_cinfo = await jsm.consumer_info("PAUSETEST", consumer_name)

# Pause the consumer until a future time (1 hour from now)
from datetime import datetime, timedelta, timezone

pause_until = (datetime.now(timezone.utc) + timedelta(hours=1)).strftime("%Y-%m-%dT%H:%M:%SZ")

pause_resp = await jsm.pause_consumer("PAUSETEST", consumer_name, pause_until)
assert pause_resp.paused is True
assert pause_resp.pause_remaining is not None

# Verify consumer is still paused when we check info
cinfo = await jsm.consumer_info("PAUSETEST", consumer_name)
assert cinfo.paused is True

# Resume the consumer
resume_resp = await jsm.resume_consumer("PAUSETEST", consumer_name)
assert resume_resp.paused is False

# Verify consumer can now receive messages
sub = await js.pull_subscribe_bind(consumer_name, "PAUSETEST")
msgs = await sub.fetch(1, timeout=2)
assert len(msgs) == 1
# Message should be one of our published messages
assert msgs[0].data in [b"msg-0", b"msg-1", b"msg-2", b"msg-3", b"msg-4"]
await msgs[0].ack()

await nc.close()

@async_test
async def test_consumer_pause_until_in_config(self):
"""Test creating a consumer with pause_until in config"""
nc = NATS()
await nc.connect()

server_version = nc.connected_server_version
if server_version.major == 2 and server_version.minor < 11:
pytest.skip("consumer pause/resume requires nats-server v2.11.0 or later")

js = nc.jetstream()
jsm = nc.jsm()

# Create a stream
await jsm.add_stream(name="PAUSECONFIG", subjects=["pause.config"])

# Publish a message
await js.publish("pause.config", b"test message")

# Create a consumer with pause_until in the config
from datetime import datetime, timedelta, timezone

pause_until = (datetime.now(timezone.utc) + timedelta(hours=1)).strftime("%Y-%m-%dT%H:%M:%SZ")

consumer_config = nats.js.api.ConsumerConfig(
name="paused-consumer",
durable_name="paused-consumer",
ack_policy="explicit",
pause_until=pause_until,
)

cinfo = await jsm.add_consumer("PAUSECONFIG", config=consumer_config)
assert cinfo.paused is True
# The server may round or adjust the pause_until time slightly
assert cinfo.config.pause_until is not None

await nc.close()

@async_test
async def test_consumer_pause_with_immediate_expiry(self):
"""Test pausing a consumer with an immediate expiry (effectively resume)"""
nc = NATS()
await nc.connect()

server_version = nc.connected_server_version
if server_version.major == 2 and server_version.minor < 11:
pytest.skip("consumer pause/resume requires nats-server v2.11.0 or later")

js = nc.jetstream()
jsm = nc.jsm()

# Create a stream
await jsm.add_stream(name="PAUSEIMMEDIATE", subjects=["pause.immediate"])

# Create a consumer
consumer_name = "immediate-consumer"
await jsm.add_consumer(
"PAUSEIMMEDIATE",
name=consumer_name,
durable_name=consumer_name,
ack_policy="explicit",
)

# Pause with a time in the past (epoch) - should effectively resume
resume_resp = await jsm.pause_consumer("PAUSEIMMEDIATE", consumer_name, "1970-01-01T00:00:00Z")
assert resume_resp.paused is False

await nc.close()


class SubscribeTest(SingleJetStreamServerTestCase):
@async_test
async def test_queue_subscribe_deliver_group(self):
Expand Down
Loading