Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
charbonnierg committed Mar 23, 2024
1 parent 469e8a8 commit d7997c0
Show file tree
Hide file tree
Showing 4 changed files with 496 additions and 1 deletion.
14 changes: 14 additions & 0 deletions PULL_REQUEST.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
## Problem

## Reference

- [jsm.go #530 - Support direct batch and multi get](https://github.com/nats-io/jsm.go/pull/530/files)
- [jsm.go implementation](https://github.com/nats-io/jsm.go/blob/1c7f0e42497859a4594efbc657ca25237cab7359/streams.go#L749)
- [StreamMsgGetRequest JSON Schema](https://github.com/nats-io/jsm.go/blob/main/schemas/jetstream/api/v1/stream_msg_get_request.json)

### Edge cases


- When requests are made against servers that do not support `batch` the first response will be received and nothing will follow. There is no way to detect this scenario and the batch request will time out. Language documentation must clearly call out what server version supports this.

- To help inform proper use of this feature vs just using a consumer, any multi-subject request may only allow matching up to 1024 subjects. Any more will result in a 413 status reply.
110 changes: 110 additions & 0 deletions examples/direct_get.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
import asyncio
import nats
from nats.js import api
from nats.js import errors


async def main():
nc = await nats.connect("localhost")

# Create JetStream context.
js = nc.jetstream()

# Clean-up KV in case it already exists
try:
await js.delete_key_value("demo")
except errors.NotFoundError:
pass

# Create a KV store
kv = await js.create_key_value(
api.KeyValueConfig(bucket="demo", history=64, direct=True)
)

# Set a key
await kv.put("device.a", b"bar")

# Set another key
await kv.put("device.b", b"foo")

# Update a key
await kv.put("device.a", b"baz")

# Set a key
await kv.put("device.c", b"qux")

# Use case: Read latest value for all devices with a batch size

# Usage 1: Async iterator which can be reentered
async with js.direct_get(
"KV_demo",
batch_size=10,
multi_last=["$KV.demo.device.*"],
) as response:

# Because a batch size is used,
# we need to loop until response
# is no longer pending.
while response.pending():

# Iterate over a batch of messages
async for msg in response:
print(f"Pending: {response.pending()}")
print(f"Data: {msg.data}")

# Print response result. It's only possible
# to access result when a batch has been fully
# processed.
result = response.result()
print(f"Result: {result}")

# Usage 2: Async iterator which does not need to be reentered
async with js.direct_get(
"KV_demo",
batch_size=1,
multi_last=["$KV.demo.device.*"],
continue_on_eob=True,
) as response:

async for msg in response:
print(f"Pending: {response.pending()}")
print(f"Data: {msg.data}")

result = response.result()
print(f"Result: {result}")

# Usage 3: Using a callback to process messages until batch is ended

async def cb(msg: api.RawStreamMsg) -> None:
print(f"Msg sequence: {msg.seq}")
print(f"Data: {msg.data}")

# Read latest value using a callback
result = await js.direct_get(
"KV_demo",
batch_size=1,
cb=cb,
multi_last=["$KV.demo.device.*"],
)
print(result)
# Continue reading from last sequence
while result.num_pending:
result = await js.direct_get(
"KV_demo",
batch_size=1,
cb=cb,
multi_last=["$KV.demo.device.*"],
continue_from=result,
)

# Usage 4: Using a callback to process messages until whole response is received
result = await js.direct_get(
"KV_demo",
batch_size=10,
cb=cb,
multi_last=["$KV.demo.device.*"],
continue_on_eob=True,
)

if __name__ == "__main__":
asyncio.run(main())
7 changes: 7 additions & 0 deletions nats/js/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -695,6 +695,13 @@ def header(self) -> Optional[Dict]:
return self.headers


@dataclass
class DirectGetResult(Base):
num_pending: int
last_seq: int
up_to_seq: int


@dataclass
class KeyValueConfig(Base):
"""
Expand Down

0 comments on commit d7997c0

Please sign in to comment.