Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions python/natsrpy/_natsrpy_rs/js/kv.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -122,10 +122,10 @@ class KVConfig:
"""

bucket: str
description: str
description: str | None
max_value_size: int | None
history: int | None
max_age: float | None
max_age: timedelta | None
max_bytes: int | None
storage: StorageType | None
num_replicas: int | None
Expand All @@ -135,7 +135,7 @@ class KVConfig:
mirror_direct: bool | None
compression: bool | None
placement: Placement | None
limit_markers: float | None
limit_markers: timedelta | None

def __new__(
cls,
Expand Down
4 changes: 1 addition & 3 deletions python/natsrpy/_natsrpy_rs/js/stream.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -132,8 +132,6 @@ class SubjectTransform:
source: str
destination: str

def __new__(cls, source: str, destination: str) -> Self: ...

@final
class Source:
"""Configuration for a stream source or mirror origin.
Expand Down Expand Up @@ -454,7 +452,7 @@ class StreamInfo:
"""

config: StreamConfig
created: float
created: int
state: StreamState
cluster: ClusterInfo | None
mirror: SourceInfo | None
Expand Down
40 changes: 40 additions & 0 deletions python/tests/test_consumers.py
Original file line number Diff line number Diff line change
Expand Up @@ -304,3 +304,43 @@ async def test_push_consumer_config_properties() -> None:
assert config.description == "push test"
assert config.ack_policy == AckPolicy.EXPLICIT
assert config.deliver_policy == DeliverPolicy.NEW


async def test_pull_consumer_consume_context_manager(js: JetStream) -> None:
stream_name = f"test-pullctx-{uuid.uuid4().hex[:8]}"
subj = f"{stream_name}.data"
config = StreamConfig(name=stream_name, subjects=[f"{stream_name}.>"])
stream = await js.streams.create(config)
try:
await js.publish(subj, b"consume-msg", wait=True)
consumer = await stream.consumers.create(
PullConsumerConfig(name=f"consumer-{uuid.uuid4().hex[:8]}"),
)
async with consumer.consume() as fetcher:
msg = await anext(fetcher)
assert msg.payload == b"consume-msg"
await msg.ack()
finally:
await js.streams.delete(stream_name)


async def test_push_consumer_consume_context_manager(js: JetStream) -> None:
stream_name = f"test-pushctx-{uuid.uuid4().hex[:8]}"
subj = f"{stream_name}.data"
config = StreamConfig(name=stream_name, subjects=[f"{stream_name}.>"])
stream = await js.streams.create(config)
try:
await js.publish(subj, b"push-consume-msg", wait=True)
deliver_subj = uuid.uuid4().hex
consumer = await stream.consumers.create(
PushConsumerConfig(
deliver_subject=deliver_subj,
name=f"push-{uuid.uuid4().hex[:8]}",
),
)
async with consumer.consume() as msgs:
msg = await anext(msgs)
assert msg.payload == b"push-consume-msg"
await msg.ack()
finally:
await js.streams.delete(stream_name)
4 changes: 4 additions & 0 deletions python/tests/test_jetstream.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,3 +48,7 @@ async def test_jetstream_publish_with_headers(js: JetStream) -> None:
await js.publish(subj, b"with-headers", headers={"x-test": "value"}, wait=True)
finally:
await js.streams.delete(stream_name)


async def test_jetstream_has_counters_manager(js: JetStream) -> None:
assert js.counters is not None
105 changes: 105 additions & 0 deletions python/tests/test_js_message_acks.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import uuid
from datetime import datetime

from natsrpy.js import (
AckPolicy,
Expand Down Expand Up @@ -166,3 +167,107 @@ async def test_message_headers_empty(js: JetStream) -> None:
assert isinstance(messages[0].headers, dict)
finally:
await js.streams.delete(stream_name)


async def test_message_stream_sequence_and_consumer_sequence(js: JetStream) -> None:
stream_name = f"test-seqs-{uuid.uuid4().hex[:8]}"
subj = f"{stream_name}.data"
config = StreamConfig(name=stream_name, subjects=[f"{stream_name}.>"])
stream = await js.streams.create(config)
try:
await js.publish(subj, b"seq-msg", wait=True)
consumer = await stream.consumers.create(
PullConsumerConfig(name=f"consumer-{uuid.uuid4().hex[:8]}"),
)
messages = await consumer.fetch(max_messages=1, timeout=5.0)
assert len(messages) == 1
msg = messages[0]
assert isinstance(msg.stream_sequence, int)
assert msg.stream_sequence >= 1
assert isinstance(msg.consumer_sequence, int)
assert msg.consumer_sequence >= 1
finally:
await js.streams.delete(stream_name)


async def test_message_consumer_and_stream_names(js: JetStream) -> None:
stream_name = f"test-names-{uuid.uuid4().hex[:8]}"
subj = f"{stream_name}.data"
config = StreamConfig(name=stream_name, subjects=[f"{stream_name}.>"])
stream = await js.streams.create(config)
try:
await js.publish(subj, b"names-msg", wait=True)
consumer_name = f"consumer-{uuid.uuid4().hex[:8]}"
consumer = await stream.consumers.create(
PullConsumerConfig(name=consumer_name),
)
messages = await consumer.fetch(max_messages=1, timeout=5.0)
assert len(messages) == 1
msg = messages[0]
assert msg.stream == stream_name
assert msg.consumer == consumer_name
finally:
await js.streams.delete(stream_name)


async def test_message_delivered_and_pending(js: JetStream) -> None:
stream_name = f"test-delpend-{uuid.uuid4().hex[:8]}"
subj = f"{stream_name}.data"
config = StreamConfig(name=stream_name, subjects=[f"{stream_name}.>"])
stream = await js.streams.create(config)
try:
await js.publish(subj, b"msg-1", wait=True)
await js.publish(subj, b"msg-2", wait=True)
consumer = await stream.consumers.create(
PullConsumerConfig(name=f"consumer-{uuid.uuid4().hex[:8]}"),
)
messages = await consumer.fetch(max_messages=1, timeout=5.0)
assert len(messages) == 1
msg = messages[0]
assert isinstance(msg.delivered, int)
assert msg.delivered >= 1
assert isinstance(msg.pending, int)
assert msg.pending >= 0
await msg.ack()
finally:
await js.streams.delete(stream_name)


async def test_message_published_timestamp(js: JetStream) -> None:
stream_name = f"test-pub-ts-{uuid.uuid4().hex[:8]}"
subj = f"{stream_name}.data"
config = StreamConfig(name=stream_name, subjects=[f"{stream_name}.>"])
stream = await js.streams.create(config)
try:
await js.publish(subj, b"ts-msg", wait=True)
consumer = await stream.consumers.create(
PullConsumerConfig(name=f"consumer-{uuid.uuid4().hex[:8]}"),
)
messages = await consumer.fetch(max_messages=1, timeout=5.0)
assert len(messages) == 1
msg = messages[0]
assert isinstance(msg.published, datetime)
finally:
await js.streams.delete(stream_name)


async def test_message_length_and_dunder_len(js: JetStream) -> None:
stream_name = f"test-msglen-{uuid.uuid4().hex[:8]}"
subj = f"{stream_name}.data"
config = StreamConfig(name=stream_name, subjects=[f"{stream_name}.>"])
stream = await js.streams.create(config)
try:
payload = b"length-test-payload"
await js.publish(subj, payload, wait=True)
consumer = await stream.consumers.create(
PullConsumerConfig(name=f"consumer-{uuid.uuid4().hex[:8]}"),
)
messages = await consumer.fetch(max_messages=1, timeout=5.0)
assert len(messages) == 1
msg = messages[0]
assert isinstance(msg.length, int)
assert msg.length >= len(payload)
assert len(msg) == msg.length
await msg.ack()
finally:
await js.streams.delete(stream_name)
38 changes: 38 additions & 0 deletions python/tests/test_kv.py
Original file line number Diff line number Diff line change
Expand Up @@ -831,3 +831,41 @@ async def test_kv_operation_equality() -> None:
assert KVOperation.Put != KVOperation.Delete
assert KVOperation.Put != KVOperation.Purge
assert KVOperation.Delete != KVOperation.Purge


async def test_kv_entry_seen_current(js: JetStream) -> None:
bucket = f"test-kv-seen-{uuid.uuid4().hex[:8]}"
config = KVConfig(bucket=bucket)
kv = await js.kv.create(config)
try:
# Create watcher on empty bucket, then put — the received entry is a live update
watcher = await kv.watch_all()
await kv.put("mykey", b"value1")
entry = await asyncio.wait_for(anext(watcher), timeout=5.0)
assert isinstance(entry.seen_current, bool)
# First live update on an empty bucket is marked as seen_current
assert entry.seen_current is True
finally:
await js.kv.delete(bucket)


async def test_kv_config_max_age_timedelta(js: JetStream) -> None:
bucket = f"test-kv-maxage-{uuid.uuid4().hex[:8]}"
max_age = timedelta(hours=1)
config = KVConfig(bucket=bucket, max_age=max_age)
assert config.max_age == max_age
kv = await js.kv.create(config)
try:
assert kv is not None
finally:
await js.kv.delete(bucket)


async def test_kv_config_description_none() -> None:
config = KVConfig(bucket="test-desc-none")
assert config.description is None


async def test_kv_config_description_set() -> None:
config = KVConfig(bucket="test-desc-set", description="my description")
assert config.description == "my description"
75 changes: 75 additions & 0 deletions python/tests/test_nats_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,3 +92,78 @@ async def test_nats_connection_failure() -> None:
nats = Nats(addrs=["localhost:19999"])
with pytest.raises(Exception):
await nats.startup()


async def test_nats_addr_property(nats_url: str) -> None:
nats = Nats(addrs=[nats_url])
assert nats.addr == [nats_url]


async def test_nats_addr_default() -> None:
nats = Nats()
assert nats.addr == ["nats://localhost:4222"]


async def test_nats_token_property() -> None:
nats = Nats(token="secret-token") # noqa: S106
assert nats.token == "secret-token" # noqa: S105


async def test_nats_token_default() -> None:
nats = Nats()
assert nats.token is None


async def test_nats_nkey_property() -> None:
nats = Nats()
assert nats.nkey is None


async def test_nats_user_and_pass_property() -> None:
nats = Nats(user_and_pass=("user", "pass"))
assert nats.user_and_pass == ("user", "pass")


async def test_nats_user_and_pass_default() -> None:
nats = Nats()
assert nats.user_and_pass is None


async def test_nats_custom_inbox_prefix_property() -> None:
nats = Nats(custom_inbox_prefix="_custom")
assert nats.custom_inbox_prefix == "_custom"


async def test_nats_custom_inbox_prefix_default() -> None:
nats = Nats()
assert nats.custom_inbox_prefix is None


async def test_nats_read_buffer_capacity_property() -> None:
nats = Nats(read_buffer_capacity=1024)
assert nats.read_buffer_capacity == 1024


async def test_nats_read_buffer_capacity_default() -> None:
nats = Nats()
assert nats.read_buffer_capacity == 65535


async def test_nats_sender_capacity_property() -> None:
nats = Nats(sender_capacity=64)
assert nats.sender_capacity == 64


async def test_nats_sender_capacity_default() -> None:
nats = Nats()
assert nats.sender_capacity == 128


async def test_nats_max_reconnects_property() -> None:
nats = Nats(max_reconnects=5)
assert nats.max_reconnects == 5


async def test_nats_max_reconnects_default() -> None:
nats = Nats()
assert nats.max_reconnects is None
60 changes: 60 additions & 0 deletions python/tests/test_streams.py
Original file line number Diff line number Diff line change
Expand Up @@ -252,3 +252,63 @@ async def test_stream_state_after_publish(js: JetStream) -> None:
assert info.state.bytes > 0
finally:
await js.streams.delete(name)


async def test_stream_name_property(js: JetStream) -> None:
name = f"test-sname-{uuid.uuid4().hex[:8]}"
config = StreamConfig(name=name, subjects=[f"{name}.>"])
stream = await js.streams.create(config)
try:
assert stream.name == name
finally:
await js.streams.delete(name)


async def test_stream_info_created_field(js: JetStream) -> None:
name = f"test-screated-{uuid.uuid4().hex[:8]}"
config = StreamConfig(name=name, subjects=[f"{name}.>"])
stream = await js.streams.create(config)
try:
info = await stream.get_info()
assert isinstance(info.created, (int, float))
assert info.created > 0
finally:
await js.streams.delete(name)


async def test_stream_state_subjects_count(js: JetStream) -> None:
name = f"test-ssubj-{uuid.uuid4().hex[:8]}"
config = StreamConfig(name=name, subjects=[f"{name}.>"])
stream = await js.streams.create(config)
try:
await js.publish(f"{name}.a", b"msg-a", wait=True)
await js.publish(f"{name}.b", b"msg-b", wait=True)
info = await stream.get_info()
assert info.state.subjects_count == 2
finally:
await js.streams.delete(name)


async def test_stream_state_timestamps(js: JetStream) -> None:
name = f"test-sts-{uuid.uuid4().hex[:8]}"
subj = f"{name}.data"
config = StreamConfig(name=name, subjects=[f"{name}.>"])
stream = await js.streams.create(config)
try:
await js.publish(subj, b"ts-msg", wait=True)
info = await stream.get_info()
assert info.state.first_timestamp >= 0
assert info.state.last_timestamp >= 0
finally:
await js.streams.delete(name)


async def test_stream_state_consumer_count(js: JetStream) -> None:
name = f"test-scnt-{uuid.uuid4().hex[:8]}"
config = StreamConfig(name=name, subjects=[f"{name}.>"])
stream = await js.streams.create(config)
try:
info = await stream.get_info()
assert info.state.consumer_count == 0
finally:
await js.streams.delete(name)
Loading
Loading