Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

js: Add js.PullSubscribe and sub.Fetch APIs for pull consumers #670

Merged
merged 1 commit into from Mar 18, 2021

Conversation

wallyqs
Copy link
Member

@wallyqs wallyqs commented Mar 3, 2021

This changes the way how Pull consumers work by adding the two following APIs:

func (js *js) PullSubscribe(subj string, opts ...SubOpt) (*Subscription, error)
func (sub *Subscription) Fetch(batch int, opts ...PullOpt) ([]*Msg, error)

PullSubscribe replaces using SubscribeSync or QueueSubscribeSync along with nats.Pull option to create pull based consumers. When a subscription is called with PullSubscribe, it can use sub.Fetch() to get a synchronous response of a collection of messages.

Example usage

js, _ := nc.JetStream()

// Create pull based consumer with 128 max concurrent inflight pull requests.
sub, err := js.PullSubscribe("foo", nats.Durable("example"), nats.PullMaxWaiting(128))

// Simple loop to fetch one message is efficient:
// internally first makes a request to get a single message if
// there is one already, if none then makes longer request
// that waits up to 2 seconds
for {
        msgs, err := sub.Fetch(1, nats.MaxWait(2*time.Second))
        if err != nil {
                log.Println("Error during fetch:", err)
		continue
        }

	// If no errors, it is ensured that there would always be at
	// least a message at this point.
	msg := msgs[0]
	msg.Ack()
}

// When expecting for multiple messages, it will be awaited
// for the delivery of all the messages or yield the delivered
// results when the timeout expires.
for {
        msgs, err := sub.Fetch(10, nats.MaxWait(2*time.Second))
        if err != nil {
                log.Println("Error during fetch:", err)
		continue
        }
	for _, msg := range msgs {
		msg.Ack()
	}
}

Signed-off-by: Waldemar Quevedo wally@synadia.com

@coveralls
Copy link

coveralls commented Mar 3, 2021

Coverage Status

Coverage decreased (-0.4%) to 86.657% when pulling 99d20d3 on js-pull-subscribe into d70f82c on master.

@derekcollison
Copy link
Member

Let's squash down for review.

@wallyqs wallyqs force-pushed the js-pull-subscribe branch 2 times, most recently from c4d3b43 to 6493d5d Compare March 3, 2021 04:54
@wallyqs
Copy link
Member Author

wallyqs commented Mar 3, 2021

Squashed and updated server version in go.mod

test/js_test.go Show resolved Hide resolved
@scottf
Copy link
Contributor

scottf commented Mar 3, 2021

What is this?

// Deletes durable consumer on unsubscribe
defer sub.Unsubscribe()

@wallyqs
Copy link
Member Author

wallyqs commented Mar 3, 2021

@scottf that means that when Unsubscribe() is called then also makes the consumer go away:

https://github.com/nats-io/nats.go/blob/master/js.go#L427-L440
https://github.com/nats-io/nats.go/blob/master/nats.go#L3567-L3576'

Note: This comment is stale as the original example has changed.

@wallyqs wallyqs force-pushed the js-pull-subscribe branch 2 times, most recently from 2387a12 to df21c14 Compare March 4, 2021 04:21
@wallyqs
Copy link
Member Author

wallyqs commented Mar 4, 2021

Rebased and updated to test against latest version of the server.

js.go Outdated
// this limit can be modified by using PullMaxWaiting when creating the consumer.
//
nr := &nextRequest{Batch: batch, NoWait: o.noWait, Expires: o.expires}
req, _ := json.Marshal(nr)
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In case only Batch is set, could take advantage of the fast path and just the number as bytes.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also results in a lighter protocol.

js.go Outdated Show resolved Hide resolved
js.go Outdated
if len(msg.Data) == 0 {
switch msg.Header.Get(statusHdr) {
case noResponders:
return ErrNoResponders
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should there be another error type surfaced that might be more meaningful?

nats.go Outdated
switch msg.Header.Get(statusHdr) {
case noResponders:
return ErrNoResponders
case "400", "404", "408", "409":
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Convert this case for > 400 to future proof this? (either convert to int or test the first char to '4' or '5')... Alternatively, could add a default that results in an error. wdyt?

test/js_test.go Outdated Show resolved Hide resolved
Copy link
Member

@ColinSullivan1 ColinSullivan1 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM; the suggested changes aren't critical to this PR.

@wallyqs wallyqs force-pushed the js-pull-subscribe branch 7 times, most recently from aa625d9 to 552d360 Compare March 17, 2021 23:07
@wallyqs wallyqs force-pushed the js-pull-subscribe branch 3 times, most recently from 52815ba to 99d20d3 Compare March 17, 2021 23:47
@wallyqs
Copy link
Member Author

wallyqs commented Mar 18, 2021

This is rebased and updated against latest version of the server. The logic is now smarter using NoWait to be signaled in case there are no messages and fallback to a longer pull batch request in case there aren't any at the moment and wait for the delivery. In case of batches of 1, also the way pull works was simplified to be a mix of new style request plus an old style request for the second one in case the initial NoWait request fails with no messages.

Copy link
Member

@derekcollison derekcollison left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In general LGTM

Signed-off-by: Waldemar Quevedo <wally@synadia.com>
@wallyqs wallyqs merged commit 47f4a4c into master Mar 18, 2021
@wallyqs wallyqs deleted the js-pull-subscribe branch March 18, 2021 02:42
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

5 participants