Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

js: FlowControl + Heartbeats for Push Consumers #692

Merged
merged 1 commit into from Mar 31, 2021
Merged

Conversation

wallyqs
Copy link
Member

@wallyqs wallyqs commented Mar 30, 2021

This adds the following options to push based consumers:

sub, err := js.Subscribe("foo", func(msg *nats.Msg) {}, nats.EnableFlowControl(), nats.IdleHeartbeat(2*time.Second))

sub, err := js.SubscribeSync("foo", nats.EnableFlowControl(), nats.IdleHeartbeat(2*time.Second))

mch := make(chan *Msg, 512)
sub, err := js.ChanSubscribe("foo", mch, nats.EnableFlowControl(), nats.IdleHeartbeat(2*time.Second))

When they are enabled, the client will automatically handle/skip the control messages sent by the server. In case of push based consumers with Heartbeats, an asynchronous typed error will be dispatched so that the subscription could deleted and restarted from the given sequence:

errHandler := nats.ErrorHandler(func(c *nats.Conn, sub *nats.Subscription, err error) {
	if info, ok := err.(*nats.ErrConsumerSequenceMismatch); ok {
		t.Logf("Subscription should be restarted from stream sequence at %+v", info.StreamResumeSequence)
	}
})

Signed-off-by: Waldemar Quevedo wally@synadia.com

js.go Outdated Show resolved Hide resolved
nats.go Outdated
select {
case sub.mch <- m:
case jsi.fch <- m:
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In order to support ChanSubscribe, SyncSubscribe and Async Subscribers, a goroutine that expects signals via a channel is started to process the status messages outside of the message delivery.

@wallyqs wallyqs force-pushed the js-hbs-chan branch 3 times, most recently from 11407b9 to 7843601 Compare March 30, 2021 18:11
js.go Outdated
} else if hasFC || hasHeartbeats {
// Start goroutine to handle flow control messages for a push consumer.
sub.jsi.fch = make(chan *Msg, 64)
go sub.processControlFlow()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need this?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I moved the control messages handling to be done before delivering the message at processMsg and added an extra goroutine that would be handling the flow control messages from a subscription instead: https://github.com/nats-io/nats.go/pull/692/files#diff-1d2fb9af677137c0182213770500dd5625a51ba7286301cb11d0197b21e0f948R2533

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Anyway to avoid having another Go routine?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I removed the goroutine and now responding a flow control message happens as soon as we receive the message, we have to do the processing of control messages there to be able to support ChanSubscribe:

@coveralls
Copy link

coveralls commented Mar 31, 2021

Coverage Status

Coverage decreased (-0.3%) to 86.339% when pulling 1b16e34 on js-hbs-chan into a0b1f60 on master.

@wallyqs wallyqs force-pushed the js-hbs-chan branch 2 times, most recently from 821f170 to 3b348f7 Compare March 31, 2021 06:27
Signed-off-by: Waldemar Quevedo <wally@synadia.com>
Copy link
Member

@derekcollison derekcollison left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@wallyqs wallyqs merged commit 1114ca8 into master Mar 31, 2021
@wallyqs wallyqs deleted the js-hbs-chan branch March 31, 2021 16:57
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

3 participants