Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

js: Implement js.subscribe #222

Merged
merged 4 commits into from
Nov 18, 2021
Merged

js: Implement js.subscribe #222

merged 4 commits into from
Nov 18, 2021

Conversation

wallyqs
Copy link
Member

@wallyqs wallyqs commented Nov 18, 2021

Adds support to:

  • Ephemeral Subscribe:
    js.subscribe("foo")

  • Durable 'Push Bound' Subscribe
    js.subscribe("foo", durable="singleton")

  • Queue Subscribe with Deliver Group
    js.subscribe("foo", "workers")

  • Ordered Consumer (ephemeral with max inflight=1 and auto resume)):
    js.subscribe("foo", ordered_consumer=True)

            import asyncio
            import nats

            async def main():
                nc = await nats.connect()
                js = nc.jetstream()

                await js.add_stream(name='hello', subjects=['hello'])
                await js.publish('hello', b'Hello JS!')

                async def cb(msg):
                  print('Received:', msg)

                # Ephemeral Async Subscribe
                await js.subscribe('hello', cb=cb)

                # Durable Async Subscribe
                # NOTE: Only one subscription can be bound to a durable name. It also auto acks by default.
                await js.subscribe('hello', cb=cb, durable='foo')

                # Durable Sync Subscribe
                # NOTE: Sync subscribers do not auto ack.
                await js.subscribe('hello', durable='bar')

                # Queue Async Subscribe
                # NOTE: Here 'workers' becomes deliver_group, durable name and queue name.
                await js.subscribe('hello', 'workers', cb=cb)

Other changes:

  • nats.aio.client.Subscription is moved to nats.aio.subscription.Subscription to avoid import cycles
  • Fixes to headers parsing when there are inline headers + key value headers in a heartbeat
  • Added nc.new_inbox() to get a new inbox for requests
  • Implements flow control + idle_heartbeats protocols for JS push consumers

Signed-off-by: Waldemar Quevedo <wally@synadia.com>
Signed-off-by: Waldemar Quevedo <wally@synadia.com>
Signed-off-by: Waldemar Quevedo <wally@synadia.com>
Signed-off-by: Waldemar Quevedo <wally@synadia.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

1 participant