From 7a1cbc3aa98d5aa05115069428ba1126f9ce4d1d Mon Sep 17 00:00:00 2001 From: Waldemar Quevedo Date: Tue, 13 Apr 2021 17:54:01 -0700 Subject: [PATCH] js: Improvements for push consumers with flow control Signed-off-by: Waldemar Quevedo --- go_test.mod | 2 +- go_test.sum | 2 + js.go | 180 ++++++++++++++++++++++++++++++++++-------------- nats.go | 131 ++++++++++++++--------------------- test/js_test.go | 101 ++++++++++++++++++++++++++- 5 files changed, 283 insertions(+), 133 deletions(-) diff --git a/go_test.mod b/go_test.mod index 72bfb4bd8..72e30d629 100644 --- a/go_test.mod +++ b/go_test.mod @@ -4,7 +4,7 @@ go 1.15 require ( github.com/golang/protobuf v1.4.2 - github.com/nats-io/nats-server/v2 v2.2.2-0.20210421232642-f2d3f5fb81d0 + github.com/nats-io/nats-server/v2 v2.2.3-0.20210501163444-670f44f1e82e github.com/nats-io/nkeys v0.3.0 github.com/nats-io/nuid v1.0.1 google.golang.org/protobuf v1.23.0 diff --git a/go_test.sum b/go_test.sum index 9d240804c..7567402c9 100644 --- a/go_test.sum +++ b/go_test.sum @@ -49,6 +49,8 @@ github.com/nats-io/nats-server/v2 v2.2.2-0.20210421215445-a48a39251636 h1:iy6c/t github.com/nats-io/nats-server/v2 v2.2.2-0.20210421215445-a48a39251636/go.mod h1:aF2IwMZdYktJswITm41c/k66uCHjTvpTxGQ7+d4cPeg= github.com/nats-io/nats-server/v2 v2.2.2-0.20210421232642-f2d3f5fb81d0 h1:e2MoeAShQE/oOSjkkV6J6R+l5ugbfkXI5spxgQykgoM= github.com/nats-io/nats-server/v2 v2.2.2-0.20210421232642-f2d3f5fb81d0/go.mod h1:aF2IwMZdYktJswITm41c/k66uCHjTvpTxGQ7+d4cPeg= +github.com/nats-io/nats-server/v2 v2.2.3-0.20210501163444-670f44f1e82e h1:Hvpz1/Epth4q7LnaU0U9SqMFd8grUMFTL8LMO5HFVok= +github.com/nats-io/nats-server/v2 v2.2.3-0.20210501163444-670f44f1e82e/go.mod h1:aF2IwMZdYktJswITm41c/k66uCHjTvpTxGQ7+d4cPeg= github.com/nats-io/nats.go v1.10.0/go.mod h1:AjGArbfyR50+afOUotNX2Xs5SYHf+CoOa5HH1eEl2HE= github.com/nats-io/nats.go v1.10.1-0.20200531124210-96f2130e4d55/go.mod h1:ARiFsjW9DVxk48WJbO3OSZ2DG8fjkMi7ecLmXoY/n9I= github.com/nats-io/nats.go v1.10.1-0.20200606002146-fc6fed82929a/go.mod h1:8eAIv96Mo9QW6Or40jUHejS7e4VwZ3VRYD6Sf0BTDp4= diff --git a/js.go b/js.go index 59f933096..2269a0e77 100644 --- a/js.go +++ b/js.go @@ -146,6 +146,7 @@ type js struct { pafs map[string]*pubAckFuture stc chan struct{} dch chan struct{} + rr *rand.Rand } type jsOpts struct { @@ -443,10 +444,11 @@ func (js *js) newAsyncReply() string { return _EMPTY_ } js.rsub = sub + js.rr = rand.New(rand.NewSource(time.Now().UnixNano())) } var sb strings.Builder sb.WriteString(js.rpre) - rn := js.nc.respRand.Int63() + rn := js.rr.Int63() var b [aReplyTokensize]byte for i, l := 0, rn; i < len(b); i++ { b[i] = rdigits[l%base] @@ -850,11 +852,10 @@ type jsSub struct { attached bool // Heartbeats and Flow Control handling from push consumers. - hbs bool - fc bool - - // cmeta is holds metadata from a push consumer when HBs are enabled. - cmeta atomic.Value + hbs bool + fc bool + cmeta string + fcs map[uint64]string } // newFetchReply generates a unique inbox used for a fetch request. @@ -931,12 +932,6 @@ func (jsi *jsSub) fetchNoWait(ctx context.Context, subj string, payload []byte) return msg, nil } -// controlMetadata is metadata used to be able to detect sequence mismatch -// errors in push based consumers that have heartbeats enabled. -type controlMetadata struct { - meta string -} - func (jsi *jsSub) unsubscribe(drainMode bool) error { if drainMode && (jsi.durable || jsi.attached) { // Skip deleting consumer for durables/attached @@ -1096,6 +1091,12 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync return nil, err } + // With flow control enabled async subscriptions we will disable msgs + // limits, and set a larger pending bytes limit by default. + if !isPullMode && cb != nil && hasFC { + sub.SetPendingLimits(DefaultSubPendingMsgsLimit*16, DefaultSubPendingBytesLimit*4) + } + // If we are creating or updating let's process that request. if shouldCreate { // If not set default to ack explicit. @@ -1194,40 +1195,119 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync return sub, nil } -func (nc *Conn) processControlFlow(msg *Msg, s *Subscription, jsi *jsSub) { - // If it is a flow control message then have to ack. - if msg.Reply != "" { - nc.publish(msg.Reply, _EMPTY_, nil, nil) - } else if jsi.hbs { - // Process heartbeat received, get latest control metadata if present. - var ctrl *controlMetadata - cmeta := jsi.cmeta.Load() - if cmeta == nil { - return - } +// ErrConsumerSequenceMismatch represents an error from a consumer +// that received a Heartbeat including sequence different to the +// one expected from the view of the client. +type ErrConsumerSequenceMismatch struct { + // StreamResumeSequence is the stream sequence from where the consumer + // should resume consuming from the stream. + StreamResumeSequence uint64 - ctrl = cmeta.(*controlMetadata) - tokens, err := getMetadataFields(ctrl.meta) - if err != nil { - return - } - // Consumer sequence - dseq := tokens[6] - ldseq := msg.Header.Get(lastConsumerSeqHdr) - - // Detect consumer sequence mismatch and whether - // should restart the consumer. - if ldseq != dseq { - // Dispatch async error including details such as - // from where the consumer could be restarted. - sseq := parseNum(tokens[5]) - ecs := &ErrConsumerSequenceMismatch{ - StreamResumeSequence: uint64(sseq), - ConsumerSequence: parseNum(dseq), - LastConsumerSequence: parseNum(ldseq), - } - nc.handleConsumerSequenceMismatch(s, ecs) + // ConsumerSequence is the sequence of the consumer that is behind. + ConsumerSequence uint64 + + // LastConsumerSequence is the sequence of the consumer when the heartbeat + // was received. + LastConsumerSequence uint64 +} + +func (ecs *ErrConsumerSequenceMismatch) Error() string { + return fmt.Sprintf("nats: sequence mismatch for consumer at sequence %d (%d sequences behind), should restart consumer from stream sequence %d", + ecs.ConsumerSequence, + ecs.LastConsumerSequence-ecs.ConsumerSequence, + ecs.StreamResumeSequence, + ) +} + +// isControlMessage will return true if this is an empty control status message. +func isControlMessage(msg *Msg) bool { + return len(msg.Data) == 0 && msg.Header.Get(statusHdr) == controlMsg +} + +func (jsi *jsSub) trackSequences(reply string) { + jsi.mu.Lock() + jsi.cmeta = reply + jsi.mu.Unlock() +} + +// checkForFlowControlResponse will check to see if we should send a flow control response +// based on the delivered index. +// Lock should be held. +func (sub *Subscription) checkForFlowControlResponse(delivered uint64) { + jsi, nc := sub.jsi, sub.conn + if jsi == nil { + return + } + + jsi.mu.Lock() + defer jsi.mu.Unlock() + + if len(jsi.fcs) == 0 { + return + } + + if reply := jsi.fcs[delivered]; reply != _EMPTY_ { + delete(jsi.fcs, delivered) + nc.Publish(reply, nil) + } +} + +// Record an inbound flow control message. +func (jsi *jsSub) scheduleFlowControlResponse(dfuture uint64, reply string) { + jsi.mu.Lock() + if jsi.fcs == nil { + jsi.fcs = make(map[uint64]string) + } + jsi.fcs[dfuture] = reply + jsi.mu.Unlock() +} + +// handleConsumerSequenceMismatch will send an async error that can be used to restart a push based consumer. +func (nc *Conn) handleConsumerSequenceMismatch(sub *Subscription, err error) { + nc.mu.Lock() + errCB := nc.Opts.AsyncErrorCB + if errCB != nil { + nc.ach.push(func() { errCB(nc, sub, err) }) + } + nc.mu.Unlock() +} + +// processControlFlow will automatically respond to control messages sent by the server. +func (nc *Conn) processSequenceMismatch(msg *Msg, s *Subscription, jsi *jsSub) { + // Process heartbeat received, get latest control metadata if present. + jsi.mu.RLock() + ctrl := jsi.cmeta + jsi.mu.RUnlock() + + if ctrl == _EMPTY_ { + return + } + + tokens, err := getMetadataFields(ctrl) + if err != nil { + return + } + + // Consumer sequence. + var ldseq string + dseq := tokens[6] + hdr := msg.Header[lastConsumerSeqHdr] + if len(hdr) == 1 { + ldseq = hdr[0] + } + + // Detect consumer sequence mismatch and whether + // should restart the consumer. + if ldseq != dseq { + // Dispatch async error including details such as + // from where the consumer could be restarted. + sseq := parseNum(tokens[5]) + ecs := &ErrConsumerSequenceMismatch{ + StreamResumeSequence: uint64(sseq), + ConsumerSequence: uint64(parseNum(dseq)), + LastConsumerSequence: uint64(parseNum(ldseq)), } + nc.handleConsumerSequenceMismatch(s, ecs) } } @@ -1762,12 +1842,12 @@ func (js *js) getConsumerInfoContext(ctx context.Context, stream, consumer strin return info.ConsumerInfo, nil } -func (m *Msg) checkReply() (*js, bool, error) { +func (m *Msg) checkReply() (*js, *jsSub, error) { if m == nil || m.Sub == nil { - return nil, false, ErrMsgNotBound + return nil, nil, ErrMsgNotBound } if m.Reply == "" { - return nil, false, ErrMsgNoReply + return nil, nil, ErrMsgNoReply } sub := m.Sub sub.mu.Lock() @@ -1775,13 +1855,13 @@ func (m *Msg) checkReply() (*js, bool, error) { sub.mu.Unlock() // Not using a JS context. - return nil, false, nil + return nil, nil, nil } js := sub.jsi.js - isPullMode := sub.jsi.pull + jsi := sub.jsi sub.mu.Unlock() - return js, isPullMode, nil + return js, jsi, nil } // ackReply handles all acks. Will do the right thing for pull and sync mode. diff --git a/nats.go b/nats.go index 10ad16967..3a10b296f 100644 --- a/nats.go +++ b/nats.go @@ -2539,6 +2539,9 @@ func (nc *Conn) waitForMsgs(s *Subscription) { if !s.closed { s.delivered++ delivered = s.delivered + if s.jsi != nil && s.jsi.fc && len(s.jsi.fcs) > 0 { + s.checkForFlowControlResponse(delivered) + } } s.mu.Unlock() @@ -2607,8 +2610,9 @@ func (nc *Conn) processMsg(data []byte) { // Check if we have headers encoded here. var h Header var err error - var ctrl bool + var ctrlMsg bool var hasFC bool + var hasHBs bool if nc.ps.ma.hdr > 0 { hbuf := msgPayload[:nc.ps.ma.hdr] @@ -2630,40 +2634,40 @@ func (nc *Conn) processMsg(data []byte) { sub.mu.Lock() - // Skip flow control messages in case of using a JetStream context. - jsi := sub.jsi - if jsi != nil { - ctrl = isControlMessage(m) - hasFC = jsi.fc - } - // Check if closed. if sub.closed { sub.mu.Unlock() return } - // Subscription internal stats (applicable only for non ChanSubscription's) - if sub.typ != ChanSubscription { - sub.pMsgs++ - if sub.pMsgs > sub.pMsgsMax { - sub.pMsgsMax = sub.pMsgs - } - sub.pBytes += len(m.Data) - if sub.pBytes > sub.pBytesMax { - sub.pBytesMax = sub.pBytes - } + // Skip flow control messages in case of using a JetStream context. + jsi := sub.jsi + if jsi != nil { + ctrlMsg, hasHBs, hasFC = isControlMessage(m), jsi.hbs, jsi.fc + } + + // Skip processing if this is a control message. + if !ctrlMsg { + // Subscription internal stats (applicable only for non ChanSubscription's) + if sub.typ != ChanSubscription { + sub.pMsgs++ + if sub.pMsgs > sub.pMsgsMax { + sub.pMsgsMax = sub.pMsgs + } + sub.pBytes += len(m.Data) + if sub.pBytes > sub.pBytesMax { + sub.pBytesMax = sub.pBytes + } - // Check for a Slow Consumer - if (sub.pMsgsLimit > 0 && sub.pMsgs > sub.pMsgsLimit) || - (sub.pBytesLimit > 0 && sub.pBytes > sub.pBytesLimit) { - goto slowConsumer + // Check for a Slow Consumer + if (sub.pMsgsLimit > 0 && sub.pMsgs > sub.pMsgsLimit) || + (sub.pBytesLimit > 0 && sub.pBytes > sub.pBytesLimit) { + goto slowConsumer + } } - } - // We have two modes of delivery. One is the channel, used by channel - // subscribers and syncSubscribers, the other is a linked list for async. - if !ctrl { + // We have two modes of delivery. One is the channel, used by channel + // subscribers and syncSubscribers, the other is a linked list for async. if sub.mch != nil { select { case sub.mch <- m: @@ -2683,8 +2687,19 @@ func (nc *Conn) processMsg(data []byte) { sub.pTail = m } } - if hasFC { - jsi.trackSequences(m) + if jsi != nil && hasHBs { + // Store the ACK metadata from the message to + // compare later on with the received heartbeat. + jsi.trackSequences(m.Reply) + } + } else if hasFC && m.Reply != _EMPTY_ { + // This is a flow control message. + // If we have no pending, go ahead and send in place. + if sub.pMsgs == 0 { + nc.Publish(m.Reply, nil) + } else { + // Schedule a reply after the previous message is delivered. + jsi.scheduleFlowControlResponse(sub.delivered+uint64(sub.pMsgs), m.Reply) } } @@ -2693,10 +2708,9 @@ func (nc *Conn) processMsg(data []byte) { sub.mu.Unlock() - // Handle flow control and heartbeat messages automatically - // for JetStream Push consumers. - if ctrl { - nc.processControlFlow(m, sub, jsi) + // Handle control heartbeat messages. + if ctrlMsg && hasHBs && m.Reply == _EMPTY_ { + nc.processSequenceMismatch(m, sub, jsi) } return @@ -3881,55 +3895,6 @@ func (nc *Conn) unsubscribe(sub *Subscription, max int, drainMode bool) error { return nil } -// ErrConsumerSequenceMismatch represents an error from a consumer -// that received a Heartbeat including sequence different to the -// one expected from the view of the client. -type ErrConsumerSequenceMismatch struct { - // StreamResumeSequence is the stream sequence from where the consumer - // should resume consuming from the stream. - StreamResumeSequence uint64 - - // ConsumerSequence is the sequence of the consumer that is behind. - ConsumerSequence int64 - - // LastConsumerSequence is the sequence of the consumer when the heartbeat - // was received. - LastConsumerSequence int64 -} - -func (ecs *ErrConsumerSequenceMismatch) Error() string { - return fmt.Sprintf("nats: sequence mismatch for consumer at sequence %d (%d sequences behind), should restart consumer from stream sequence %d", - ecs.ConsumerSequence, - ecs.LastConsumerSequence-ecs.ConsumerSequence, - ecs.StreamResumeSequence, - ) -} - -// handleConsumerSequenceMismatch will send an async error that can be used to restart a push based consumer. -func (nc *Conn) handleConsumerSequenceMismatch(sub *Subscription, err error) { - nc.mu.Lock() - errCB := nc.Opts.AsyncErrorCB - if errCB != nil { - nc.ach.push(func() { errCB(nc, sub, err) }) - } - nc.mu.Unlock() -} - -func isControlMessage(msg *Msg) bool { - return len(msg.Data) == 0 && msg.Header.Get(statusHdr) == controlMsg -} - -func (jsi *jsSub) trackSequences(msg *Msg) { - var ctrl *controlMetadata - if cmeta := jsi.cmeta.Load(); cmeta == nil { - ctrl = &controlMetadata{} - } else { - ctrl = cmeta.(*controlMetadata) - } - ctrl.meta = msg.Reply - jsi.cmeta.Store(ctrl) -} - // NextMsg will return the next message available to a synchronous subscriber // or block until one is available. An error is returned if the subscription is invalid (ErrBadSubscription), // the connection is closed (ErrConnectionClosed), the timeout is reached (ErrTimeout), @@ -4037,6 +4002,10 @@ func (s *Subscription) processNextMsgDelivered(msg *Msg) error { // Update some stats. s.delivered++ delivered := s.delivered + if s.jsi != nil && s.jsi.fc && len(s.jsi.fcs) > 0 { + s.checkForFlowControlResponse(delivered) + } + if s.typ == SyncSubscription { s.pMsgs-- s.pBytes -= len(msg.Data) diff --git a/test/js_test.go b/test/js_test.go index aa6a3572c..206ab115e 100644 --- a/test/js_test.go +++ b/test/js_test.go @@ -1057,7 +1057,7 @@ func TestJetStreamPushFlowControlHeartbeats_SubscribeSync(t *testing.T) { t.Fatalf("Unexpected empty message: %+v", m) } - if err := m.Ack(); err != nil { + if err := m.AckSync(); err != nil { t.Fatalf("Error on ack message: %v", err) } recvd++ @@ -1421,6 +1421,105 @@ Loop: }) } +func TestJetStreamPushFlowControl_SubscribeAsyncAndChannel(t *testing.T) { + s := RunBasicJetStreamServer() + defer s.Shutdown() + + if config := s.JetStreamConfig(); config != nil { + defer os.RemoveAll(config.StoreDir) + } + + errCh := make(chan error) + errHandler := nats.ErrorHandler(func(c *nats.Conn, sub *nats.Subscription, err error) { + errCh <- err + }) + nc, err := nats.Connect(s.ClientURL(), errHandler) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + defer nc.Close() + + const totalMsgs = 16536 * 2 + + js, err := nc.JetStream(nats.PublishAsyncMaxPending(totalMsgs)) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + _, err = js.AddStream(&nats.StreamConfig{ + Name: "TEST", + Subjects: []string{"foo"}, + }) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + go func() { + payload := strings.Repeat("O", 4096) + for i := 0; i < totalMsgs; i++ { + js.PublishAsync("foo", []byte(payload)) + } + }() + + // Small channel that blocks and then buffered channel that can deliver all + // messages without blocking. + recvd := make(chan *nats.Msg, 64) + delivered := make(chan *nats.Msg, totalMsgs) + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + // Dispatch channel consumer + go func() { + for m := range recvd { + select { + case <-ctx.Done(): + return + default: + } + + // Ack of Ack to cause head of line blocking. + m.AckSync() + delivered <- m + + if len(delivered) == totalMsgs { + cancel() + } + } + }() + + sub, err := js.Subscribe("foo", func(msg *nats.Msg) { + // Cause bottleneck by having channel block when full + // because of work taking long. + recvd <- msg + }, nats.ManualAck(), nats.EnableFlowControl()) + + if err != nil { + t.Fatal(err) + } + defer sub.Unsubscribe() + + info, err := sub.ConsumerInfo() + if err != nil { + t.Fatal(err) + } + if !info.Config.FlowControl { + t.Fatal("Expected Flow Control to be enabled") + } + <-ctx.Done() + + got := len(delivered) + expected := totalMsgs + if got != expected { + t.Errorf("Expected %d messages, got: %d", expected, got) + } + + // Wait for a couple of heartbeats to arrive and confirm there is no error. + select { + case <-time.After(1 * time.Second): + case err := <-errCh: + t.Errorf("error handler: %v", err) + } +} + func TestJetStream_Drain(t *testing.T) { s := RunBasicJetStreamServer() defer s.Shutdown()