Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ADDED] Drain for jetstream consume methods #1515

Merged
merged 2 commits into from Jan 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
3 changes: 3 additions & 0 deletions .golangci.yaml
Expand Up @@ -5,6 +5,9 @@ issues:
- linters:
- errcheck
text: "Unsubscribe"
- linters:
- errcheck
text: "Drain"
- linters:
- errcheck
text: "msg.Ack"
Expand Down
14 changes: 7 additions & 7 deletions go_test.mod
Expand Up @@ -4,19 +4,19 @@ go 1.19

require (
github.com/golang/protobuf v1.4.2
github.com/klauspost/compress v1.17.2
github.com/nats-io/nats-server/v2 v2.10.0
github.com/klauspost/compress v1.17.4
github.com/nats-io/nats-server/v2 v2.10.7
github.com/nats-io/nkeys v0.4.6
github.com/nats-io/nuid v1.0.1
go.uber.org/goleak v1.2.1
golang.org/x/text v0.13.0
golang.org/x/text v0.14.0
google.golang.org/protobuf v1.23.0
)

require (
github.com/minio/highwayhash v1.0.2 // indirect
github.com/nats-io/jwt/v2 v2.5.2 // indirect
golang.org/x/crypto v0.14.0 // indirect
golang.org/x/sys v0.13.0 // indirect
golang.org/x/time v0.3.0 // indirect
github.com/nats-io/jwt/v2 v2.5.3 // indirect
golang.org/x/crypto v0.16.0 // indirect
golang.org/x/sys v0.15.0 // indirect
golang.org/x/time v0.5.0 // indirect
)
29 changes: 14 additions & 15 deletions go_test.sum
Expand Up @@ -10,14 +10,14 @@ github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMyw
github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
github.com/google/go-cmp v0.4.0 h1:xsAVV57WRhGj6kEIi8ReJzQlHHqcBYCElAvkovg3B/4=
github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/klauspost/compress v1.17.2 h1:RlWWUY/Dr4fL8qk9YG7DTZ7PDgME2V4csBXA8L/ixi4=
github.com/klauspost/compress v1.17.2/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE=
github.com/klauspost/compress v1.17.4 h1:Ej5ixsIri7BrIjBkRZLTo6ghwrEtHFk7ijlczPW4fZ4=
github.com/klauspost/compress v1.17.4/go.mod h1:/dCuZOvVtNoHsyb+cuJD3itjs3NbnF6KH9zAO4BDxPM=
github.com/minio/highwayhash v1.0.2 h1:Aak5U0nElisjDCfPSG79Tgzkn2gl66NxOMspRrKnA/g=
github.com/minio/highwayhash v1.0.2/go.mod h1:BQskDq+xkJ12lmlUUi7U0M5Swg3EWR+dLTk+kldvVxY=
github.com/nats-io/jwt/v2 v2.5.2 h1:DhGH+nKt+wIkDxM6qnVSKjokq5t59AZV5HRcFW0zJwU=
github.com/nats-io/jwt/v2 v2.5.2/go.mod h1:24BeQtRwxRV8ruvC4CojXlx/WQ/VjuwlYiH+vu/+ibI=
github.com/nats-io/nats-server/v2 v2.10.0 h1:rcU++Hzo+wARxtJugrV3J5z5iGdHeVG8tT8Chb3bKDg=
github.com/nats-io/nats-server/v2 v2.10.0/go.mod h1:3PMvMSu2cuK0J9YInRLWdFpFsswKKGUS77zVSAudRto=
github.com/nats-io/jwt/v2 v2.5.3 h1:/9SWvzc6hTfamcgXJ3uYRpgj+QuY2aLNqRiqrKcrpEo=
github.com/nats-io/jwt/v2 v2.5.3/go.mod h1:iysuPemFcc7p4IoYots3IuELSI4EDe9Y0bQMe+I3Bf4=
github.com/nats-io/nats-server/v2 v2.10.7 h1:f5VDy+GMu7JyuFA0Fef+6TfulfCs5nBTgq7MMkFJx5Y=
github.com/nats-io/nats-server/v2 v2.10.7/go.mod h1:V2JHOvPiPdtfDXTuEUsthUnCvSDeFrK4Xn9hRo6du7c=
github.com/nats-io/nkeys v0.4.6 h1:IzVe95ru2CT6ta874rt9saQRkWfe2nFj1NtvYSLqMzY=
github.com/nats-io/nkeys v0.4.6/go.mod h1:4DxZNzenSVd1cYQoAa8948QY3QDjrHfcfVADymtkpts=
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
Expand All @@ -26,16 +26,15 @@ github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZb
github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PKk=
go.uber.org/goleak v1.2.1 h1:NBol2c7O1ZokfZ0LEU9K6Whx/KnwvepVetCUhtKja4A=
go.uber.org/goleak v1.2.1/go.mod h1:qlT2yGI9QafXHhZZLxlSuNsMw3FFLxBr+tBRlmO1xH4=
golang.org/x/crypto v0.14.0 h1:wBqGXzWJW6m1XrIKlAH0Hs1JJ7+9KBwnIO8v66Q9cHc=
golang.org/x/crypto v0.14.0/go.mod h1:MVFd36DqK4CsrnJYDkBA3VC4m2GkXAM0PvzMCn4JQf4=
golang.org/x/exp v0.0.0-20230905200255-921286631fa9 h1:GoHiUyI/Tp2nVkLI2mCxVkOjsbSXD66ic0XW0js0R9g=
golang.org/x/crypto v0.16.0 h1:mMMrFzRSCF0GvB7Ne27XVtVAaXLrPmgPC7/v0tkwHaY=
golang.org/x/crypto v0.16.0/go.mod h1:gCAAfMLgwOJRpTjQ2zCCt2OcSfYMTeZVSRtQlPC7Nq4=
golang.org/x/sys v0.0.0-20190130150945-aca44879d564/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.13.0 h1:Af8nKPmuFypiUBjVoU9V20FiaFXOcuZI21p0ycVYYGE=
golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/text v0.13.0 h1:ablQoSUd0tRdKxZewP80B+BaqeKJuVhuRxj/dkrun3k=
golang.org/x/text v0.13.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE=
golang.org/x/time v0.3.0 h1:rg5rLMjNzMS1RkNLzCG38eapWhnYLFYXDXj2gOlr8j4=
golang.org/x/time v0.3.0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/sys v0.15.0 h1:h48lPFYpsTvQJZF4EKyI4aLHaev3CxivZmv7yZig9pc=
golang.org/x/sys v0.15.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ=
golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU=
golang.org/x/time v0.5.0 h1:o7cqy6amK/52YcAKIPlM3a+Fpj35zvRj2TP+e1xFSfk=
golang.org/x/time v0.5.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8=
Expand Down
18 changes: 18 additions & 0 deletions jetstream/ordered.go
Expand Up @@ -49,6 +49,7 @@ type (
consumer *orderedConsumer
opts []PullMessagesOpt
done chan struct{}
closed uint32
}

cursor struct {
Expand Down Expand Up @@ -298,6 +299,9 @@ func (s *orderedSubscription) Next() (Msg, error) {
}

func (s *orderedSubscription) Stop() {
if !atomic.CompareAndSwapUint32(&s.closed, 0, 1) {
return
}
sub, ok := s.consumer.currentConsumer.getSubscription("")
if !ok {
return
Expand All @@ -308,6 +312,20 @@ func (s *orderedSubscription) Stop() {
close(s.done)
}

func (s *orderedSubscription) Drain() {
if !atomic.CompareAndSwapUint32(&s.closed, 0, 1) {
return
}
sub, ok := s.consumer.currentConsumer.getSubscription("")
if !ok {
return
}
s.consumer.currentConsumer.Lock()
defer s.consumer.currentConsumer.Unlock()
sub.Drain()
close(s.done)
}

// Fetch is used to retrieve up to a provided number of messages from a stream.
// This method will always send a single request and wait until either all messages are retrieved
// or context reaches its deadline.
Expand Down
121 changes: 97 additions & 24 deletions jetstream/pull.go
@@ -1,4 +1,4 @@
// Copyright 2022-2023 The NATS Authors
// Copyright 2022-2024 The NATS Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
Expand Down Expand Up @@ -30,14 +30,31 @@ import (
type (
// MessagesContext supports iterating over a messages on a stream.
MessagesContext interface {
// Next retreives next message on a stream. It will block until the next message is available.
// Next retreives next message on a stream. It will block until the next
// message is available.
Next() (Msg, error)
// Stop closes the iterator and cancels subscription.

// Stop unsubscribes from the stream and cancels subscription. Calling
Jarema marked this conversation as resolved.
Show resolved Hide resolved
// Next after calling Stop will return ErrMsgIteratorClosed error.
// All messages that are already in the buffer are discarded.
Stop()

// Drain unsubscribes from the stream and cancels subscription. All
// messages that are already in the buffer will be available on
// subsequent calls to Next. After the buffer is drained, Next will
// return ErrMsgIteratorClosed error.
Drain()
}

ConsumeContext interface {
// Stop unsubscribes from the stream and cancels subscription.
Jarema marked this conversation as resolved.
Show resolved Hide resolved
// No more messages will be received after calling this method.
// All messages that are already in the buffer are discarded.
Stop()

// Drain unsubscribes from the stream and cancels subscription.
// All messages that are already in the buffer will be processed in callback function.
Drain()
}

// MessageHandler is a handler function used as callback in [Consume]
Expand Down Expand Up @@ -97,7 +114,9 @@ type (
hbMonitor *hbMonitor
fetchInProgress uint32
closed uint32
draining uint32
done chan struct{}
drained chan struct{}
connStatusChanged chan nats.Status
fetchNext chan *pullRequest
consumeOpts *consumeOpts
Expand Down Expand Up @@ -240,6 +259,14 @@ func (p *pullConsumer) Consume(handler MessageHandler, opts ...PullConsumeOpt) (
if err != nil {
return nil, err
}
sub.subscription.SetClosedHandler(func(sid string) func(string) {
return func(subject string) {
p.Lock()
defer p.Unlock()
delete(p.subscriptions, sid)
atomic.CompareAndSwapUint32(&sub.draining, 1, 0)
}
}(sub.id))

sub.Lock()
// initial pull
Expand Down Expand Up @@ -352,6 +379,8 @@ func (p *pullConsumer) Consume(handler MessageHandler, opts ...PullConsumeOpt) (
sub.resetPendingMsgs()
}
sub.Unlock()
case <-sub.done:
return
}
}
}()
Expand Down Expand Up @@ -438,6 +467,7 @@ func (p *pullConsumer) Messages(opts ...PullMessagesOpt) (MessagesContext, error
id: consumeID,
consumer: p,
done: make(chan struct{}, 1),
drained: make(chan struct{}, 1),
msgs: msgs,
errs: make(chan error, 1),
fetchNext: make(chan *pullRequest, 1),
Expand All @@ -450,28 +480,42 @@ func (p *pullConsumer) Messages(opts ...PullMessagesOpt) (MessagesContext, error
p.Unlock()
return nil, err
}
sub.subscription.SetClosedHandler(func(sid string) func(string) {
return func(subject string) {
p.Lock()
defer p.Unlock()
if atomic.LoadUint32(&sub.draining) != 1 {
// if we're not draining, subscription can be closed as soon
// as closed handler is called
// otherwise, we need to wait until all messages are drained
// in Next
delete(p.subscriptions, sid)
}
close(msgs)
}
}(sub.id))

go func() {
<-sub.done
sub.cleanup()
}()
p.subscriptions[sub.id] = sub
p.Unlock()

go sub.pullMessages(subject)

go func() {
for {
status, ok := <-sub.connStatusChanged
if !ok {
select {
case status, ok := <-sub.connStatusChanged:
if !ok {
return
Jarema marked this conversation as resolved.
Show resolved Hide resolved
}
if status == nats.CONNECTED {
sub.errs <- errConnected
}
if status == nats.RECONNECTING {
sub.errs <- errDisconnected
}
case <-sub.done:
return
}
if status == nats.CONNECTED {
sub.errs <- errConnected
}
if status == nats.RECONNECTING {
sub.errs <- errDisconnected
}
}
}()

Expand All @@ -486,7 +530,9 @@ var (
func (s *pullSubscription) Next() (Msg, error) {
s.Lock()
defer s.Unlock()
if atomic.LoadUint32(&s.closed) == 1 {
drainMode := atomic.LoadUint32(&s.draining) == 1
closed := atomic.LoadUint32(&s.closed) == 1
if closed && !drainMode {
return nil, ErrMsgIteratorClosed
}
hbMonitor := s.scheduleHeartbeatCheck(2 * s.consumeOpts.Heartbeat)
Expand All @@ -506,8 +552,18 @@ func (s *pullSubscription) Next() (Msg, error) {
s.checkPending()
select {
case <-s.done:
drainMode := atomic.LoadUint32(&s.draining) == 1
if drainMode {
continue
}
return nil, ErrMsgIteratorClosed
case msg := <-s.msgs:
case msg, ok := <-s.msgs:
if !ok {
// if msgs channel is closed, it means that subscription was either drained or stopped
delete(s.consumer.subscriptions, s.id)
atomic.CompareAndSwapUint32(&s.draining, 1, 0)
return nil, ErrMsgIteratorClosed
}
if hbMonitor != nil {
hbMonitor.Reset(2 * s.consumeOpts.Heartbeat)
}
Expand Down Expand Up @@ -650,6 +706,21 @@ func (s *pullSubscription) Stop() {
}
}

func (s *pullSubscription) Drain() {
if !atomic.CompareAndSwapUint32(&s.closed, 0, 1) {
return
}
atomic.StoreUint32(&s.draining, 1)
close(s.done)
if s.consumeOpts.stopAfterMsgsLeft != nil {
if s.delivered >= s.consumeOpts.StopAfter {
close(s.consumeOpts.stopAfterMsgsLeft)
} else {
s.consumeOpts.stopAfterMsgsLeft <- s.consumeOpts.StopAfter - s.delivered
}
}
}

// Fetch sends a single request to retrieve given number of messages.
// It will wait up to provided expiry time if not all messages are available.
func (p *pullConsumer) Fetch(batch int, opts ...FetchOpt) (MessageBatch, error) {
Expand Down Expand Up @@ -834,18 +905,20 @@ func (s *pullSubscription) scheduleHeartbeatCheck(dur time.Duration) *hbMonitor
}

func (s *pullSubscription) cleanup() {
s.consumer.Lock()
defer s.consumer.Unlock()
if s.subscription == nil {
s.Lock()
defer s.Unlock()
if s.subscription == nil || !s.subscription.IsValid() {
return
}
if s.hbMonitor != nil {
s.hbMonitor.Stop()
}
s.subscription.Unsubscribe()
close(s.connStatusChanged)
s.subscription = nil
delete(s.consumer.subscriptions, s.id)
drainMode := atomic.LoadUint32(&s.draining) == 1
if drainMode {
s.subscription.Drain()
} else {
s.subscription.Unsubscribe()
}
atomic.StoreUint32(&s.closed, 1)
}

Expand Down
14 changes: 10 additions & 4 deletions jetstream/test/jetstream_test.go
Expand Up @@ -426,10 +426,16 @@ func TestCreateStreamMirrorCrossDomains(t *testing.T) {
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}

if lStream.CachedInfo().State.Msgs != 3 {
t.Fatalf("Expected 3 msgs in stream; got: %d", lStream.CachedInfo().State.Msgs)
}
checkFor(t, 2*time.Second, 15*time.Millisecond, func() error {
info, err := lStream.Info(ctx)
if err != nil {
return fmt.Errorf("Unexpected error when getting stream info: %v", err)
}
if info.State.Msgs != 3 {
return fmt.Errorf("Expected 3 msgs in stream; got: %d", lStream.CachedInfo().State.Msgs)
}
return nil
})

rjs, err := jetstream.NewWithDomain(lnc, "HUB")
if err != nil {
Expand Down