From 66ba98e14f1e04b491d564c2098a5b5fc3343e48 Mon Sep 17 00:00:00 2001 From: Ivan Kozlovic Date: Mon, 4 Oct 2021 08:55:57 -0600 Subject: [PATCH 1/3] [FIXED] JetStream flow control may stall in some conditions Signed-off-by: Ivan Kozlovic --- js.go | 76 +++++++++++++++++++++++++++++++++++++++++++++++++++++---- nats.go | 22 +++++++++++++---- 2 files changed, 88 insertions(+), 10 deletions(-) diff --git a/js.go b/js.go index 4d4adbdac..9f02350e2 100644 --- a/js.go +++ b/js.go @@ -94,6 +94,14 @@ const ( // Scale for threshold of missed HBs or lack of activity. hbcThresh = 2 + + // For ChanSubscription, we can't update sub.delivered as we do for other + // type of subscriptions, since the channel is user provided. + // With flow control in play, we will check for flow control on incoming + // messages (as opposed to when they are delivered), but also from a go + // routine. Without this, the subscription would possibly stall until + // a new message or heartbeat/fc are received. + chanSubFCCheckInterval = 250 * time.Millisecond ) // Types of control messages, so far heartbeat and flow control @@ -897,6 +905,7 @@ type jsSub struct { cmeta string fcr string fcd uint64 + fciseq uint64 } // Deletes the JS Consumer. @@ -1409,15 +1418,27 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync, } if !isPullMode { // We can't reuse the channel, so if one was passed, we need to create a new one. - if ch != nil { + if isSync { ch = make(chan *Msg, cap(ch)) + } else if ch != nil { + // User provided (ChanSubscription), simply try to drain it. + for done := false; !done; { + select { + case <-ch: + default: + done = true + } + } } jsi.deliver = deliver + jsi.hbi = info.Config.Heartbeat // Recreate the subscription here. 
sub, err = nc.subscribe(jsi.deliver, queue, cb, ch, isSync, jsi) if err != nil { return nil, err } + hasFC = info.Config.FlowControl + hasHeartbeats = info.Config.Heartbeat > 0 } } else { if cinfo.Error.Code == 404 { @@ -1442,10 +1463,41 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync, if hasHeartbeats { sub.scheduleHeartbeatCheck() } + // For ChanSubscriptions, if we know that there is flow control, we will + // start a go routine that evaluates the number of delivered messages + // and process flow control. + if sub.Type() == ChanSubscription && hasFC { + go sub.chanSubcheckForFlowControlResponse() + } return sub, nil } +// This long-lived routine is used per ChanSubscription to check +// on the number of delivered messages and check for flow control response. +func (sub *Subscription) chanSubcheckForFlowControlResponse() { + sub.mu.Lock() + nc := sub.conn + sub.mu.Unlock() + if nc == nil { + return + } + t := time.NewTicker(chanSubFCCheckInterval) + for range t.C { + sub.mu.Lock() + if sub.closed { + sub.mu.Unlock() + t.Stop() + return + } + fcReply := sub.checkForFlowControlResponse() + sub.mu.Unlock() + if fcReply != _EMPTY_ { + nc.Publish(fcReply, nil) + } + } +} + // ErrConsumerSequenceMismatch represents an error from a consumer // that received a Heartbeat including sequence different to the // one expected from the view of the client. @@ -1488,8 +1540,11 @@ func isJSControlMessage(msg *Msg) (bool, int) { // Keeps track of the incoming message's reply subject so that the consumer's // state (deliver sequence, etc..) can be checked against heartbeats. +// We will also bump the incoming data message sequence that is used in FC cases. // Runs under the subscription lock func (sub *Subscription) trackSequences(reply string) { + // For flow control, keep track of incoming message sequence. 
+ sub.jsi.fciseq++ sub.jsi.cmeta = reply } @@ -1626,13 +1681,25 @@ func (sub *Subscription) resetOrderedConsumer(sseq uint64) { }() } +// For jetstream subscriptions, returns the number of delivered messages. +// For ChanSubscription, this value is computed based on the known number +// of messages added to the channel minus the current size of that channel. +// Lock held on entry +func (sub *Subscription) getJSDelivered() uint64 { + if sub.typ == ChanSubscription { + return sub.jsi.fciseq - uint64(len(sub.mch)) + } + return sub.delivered +} + // checkForFlowControlResponse will check to see if we should send a flow control response // based on the subscription current delivered index and the target. // Runs under subscription lock func (sub *Subscription) checkForFlowControlResponse() string { // Caller has verified that there is a sub.jsi and fc jsi := sub.jsi - if jsi.fcd == sub.delivered { + jsi.active = true + if sub.getJSDelivered() >= jsi.fcd { fcr := jsi.fcr jsi.fcr, jsi.fcd = _EMPTY_, 0 return fcr @@ -1642,9 +1709,8 @@ func (sub *Subscription) checkForFlowControlResponse() string { // Record an inbound flow control message. // Runs under subscription lock -func (sub *Subscription) scheduleFlowControlResponse(dfuture uint64, reply string) { - jsi := sub.jsi - jsi.fcr, jsi.fcd = reply, dfuture +func (sub *Subscription) scheduleFlowControlResponse(reply string) { + sub.jsi.fcr, sub.jsi.fcd = reply, sub.jsi.fciseq } // Checks for activity from our consumer. diff --git a/nats.go b/nats.go index ed8506eb2..f8d202f3b 100644 --- a/nats.go +++ b/nats.go @@ -2600,7 +2600,6 @@ func (nc *Conn) waitForMsgs(s *Subscription) { delivered = s.delivered if s.jsi != nil { fcReply = s.checkForFlowControlResponse() - s.jsi.active = true } } s.mu.Unlock() @@ -2768,6 +2767,7 @@ func (nc *Conn) processMsg(data []byte) { // Skip processing if this is a control message. 
if !ctrlMsg { + var chanSubCheckFC bool // Subscription internal stats (applicable only for non ChanSubscription's) if sub.typ != ChanSubscription { sub.pMsgs++ @@ -2784,6 +2784,8 @@ func (nc *Conn) processMsg(data []byte) { (sub.pBytesLimit > 0 && sub.pBytes > sub.pBytesLimit) { goto slowConsumer } + } else if jsi != nil { + chanSubCheckFC = true } // We have two modes of delivery. One is the channel, used by channel @@ -2811,15 +2813,26 @@ func (nc *Conn) processMsg(data []byte) { // Store the ACK metadata from the message to // compare later on with the received heartbeat. sub.trackSequences(m.Reply) + if chanSubCheckFC { + // For ChanSubscription, since we can't call this when a message + // is "delivered" (since user is pulling from their own channel), + // we have a go routine that does this check, however, we do it + // also here to make it much more responsive. The go routine is + // really to avoid stalling when there are no new messages coming. + fcReply = sub.checkForFlowControlResponse() + } } } else if ctrlType == jsCtrlFC && m.Reply != _EMPTY_ { // This is a flow control message. - // If we have no pending, go ahead and send in place. - if sub.pMsgs <= 0 { + // We will schedule the send of the FC reply once we have delivered the + // DATA message that was received before this flow control message, which + // has sequence `jsi.fciseq`. However, it is possible that this message + // has already been delivered, in that case, we need to send the FC reply now. + if sub.getJSDelivered() >= jsi.fciseq { fcReply = m.Reply } else { // Schedule a reply after the previous message is delivered. 
- sub.scheduleFlowControlResponse(sub.delivered+uint64(sub.pMsgs), m.Reply) + sub.scheduleFlowControlResponse(m.Reply) } } @@ -4192,7 +4205,6 @@ func (s *Subscription) processNextMsgDelivered(msg *Msg) error { delivered := s.delivered if s.jsi != nil { fcReply = s.checkForFlowControlResponse() - s.jsi.active = true } if s.typ == SyncSubscription { From aedc479e75268ba73a45377e84ebf0a620f3228b Mon Sep 17 00:00:00 2001 From: Ivan Kozlovic Date: Tue, 5 Oct 2021 09:21:29 -0600 Subject: [PATCH 2/3] Fixed ordered consumer test Several issues: - We send a "EOF" empty message which should not be counted as the number of chunks used to reconstitute the asset - Some "message filters" that are removed as part of the execution of the filter's callback would not be put back for the following "sync" test (we test async then sync). Signed-off-by: Ivan Kozlovic --- js_test.go | 18 ++++++++++++------ 1 file changed, 12 insertions(+), 6 deletions(-) diff --git a/js_test.go b/js_test.go index 90c03fe64..9685ce154 100644 --- a/js_test.go +++ b/js_test.go @@ -105,7 +105,10 @@ func TestJetStreamOrderedConsumer(t *testing.T) { } else { chunk = msg[i : i+chunkSize] } - js.PublishAsync("a", chunk) + msg := NewMsg("a") + msg.Data = chunk + msg.Header.Set("data", "true") + js.PublishMsgAsync(msg) } js.PublishAsync("a", nil) // eof @@ -171,7 +174,7 @@ func TestJetStreamOrderedConsumer(t *testing.T) { t.Fatalf("Objects do not match") } case <-time.After(5 * time.Second): - t.Fatalf("Did not receive all chunks, only %d of %d total", atomic.LoadUint32(&received), si.State.Msgs) + t.Fatalf("Did not receive all chunks, only %d of %d total", atomic.LoadUint32(&received), si.State.Msgs-1) } } @@ -202,7 +205,7 @@ func TestJetStreamOrderedConsumer(t *testing.T) { rmsg = append(rmsg, m.Data...) 
} if !done { - t.Fatalf("Did not receive all chunks, only %d of %d total", received, si.State.Msgs) + t.Fatalf("Did not receive all chunks, only %d of %d total", received, si.State.Msgs-1) } if rsum := sha256.Sum256(rmsg); rsum != sum { t.Fatalf("Objects do not match") @@ -215,7 +218,7 @@ func TestJetStreamOrderedConsumer(t *testing.T) { // Now introduce some loss. singleLoss := func(m *Msg) *Msg { - if rand.Intn(100) <= 10 { + if rand.Intn(100) <= 10 && m.Header.Get("data") != _EMPTY_ { nc.removeMsgFilter("a") return nil } @@ -223,10 +226,11 @@ func TestJetStreamOrderedConsumer(t *testing.T) { } nc.addMsgFilter("a", singleLoss) testConsumer() + nc.addMsgFilter("a", singleLoss) testSyncConsumer() multiLoss := func(m *Msg) *Msg { - if rand.Intn(100) <= 10 { + if rand.Intn(100) <= 10 && m.Header.Get("data") != _EMPTY_ { return nil } return m @@ -246,11 +250,12 @@ func TestJetStreamOrderedConsumer(t *testing.T) { } nc.addMsgFilter("a", firstOnly) testConsumer() + nc.addMsgFilter("a", firstOnly) testSyncConsumer() lastOnly := func(m *Msg) *Msg { if meta, err := m.Metadata(); err == nil { - if meta.Sequence.Stream >= si.State.LastSeq { + if meta.Sequence.Stream >= si.State.LastSeq-1 { nc.removeMsgFilter("a") return nil } @@ -259,6 +264,7 @@ func TestJetStreamOrderedConsumer(t *testing.T) { } nc.addMsgFilter("a", lastOnly) testConsumer() + nc.addMsgFilter("a", lastOnly) testSyncConsumer() } From c564a49724e3fc54f805cd69bb604333f83fa822 Mon Sep 17 00:00:00 2001 From: Ivan Kozlovic Date: Tue, 5 Oct 2021 15:14:47 -0600 Subject: [PATCH 3/3] Updates based on code review Signed-off-by: Ivan Kozlovic --- js.go | 38 +++++++++++++++++++++----------------- nats.go | 12 +++++++++--- 2 files changed, 30 insertions(+), 20 deletions(-) diff --git a/js.go b/js.go index 9f02350e2..626fcabfd 100644 --- a/js.go +++ b/js.go @@ -906,6 +906,7 @@ type jsSub struct { fcr string fcd uint64 fciseq uint64 + csfct *time.Timer } // Deletes the JS Consumer. 
@@ -1467,7 +1468,7 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync, // start a go routine that evaluates the number of delivered messages // and process flow control. if sub.Type() == ChanSubscription && hasFC { - go sub.chanSubcheckForFlowControlResponse() + sub.chanSubcheckForFlowControlResponse() } return sub, nil @@ -1477,25 +1478,28 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync, // on the number of delivered messages and check for flow control response. func (sub *Subscription) chanSubcheckForFlowControlResponse() { sub.mu.Lock() - nc := sub.conn - sub.mu.Unlock() - if nc == nil { + // We don't use defer since if we need to send an FC reply, we need + // to do it outside the sub's lock. So doing explicit unlock... + if sub.closed { + sub.mu.Unlock() return } - t := time.NewTicker(chanSubFCCheckInterval) - for range t.C { - sub.mu.Lock() - if sub.closed { - sub.mu.Unlock() - t.Stop() - return - } - fcReply := sub.checkForFlowControlResponse() - sub.mu.Unlock() - if fcReply != _EMPTY_ { - nc.Publish(fcReply, nil) - } + var fcReply string + var nc *Conn + + jsi := sub.jsi + if jsi.csfct == nil { + jsi.csfct = time.AfterFunc(chanSubFCCheckInterval, sub.chanSubcheckForFlowControlResponse) + } else { + fcReply = sub.checkForFlowControlResponse() + nc = sub.conn + // Do the reset here under the lock, it's ok... + jsi.csfct.Reset(chanSubFCCheckInterval) } + sub.mu.Unlock() + // This call will return an error (which we don't care about here) + // if nc is nil or fcReply is empty. + nc.Publish(fcReply, nil) } // ErrConsumerSequenceMismatch represents an error from a consumer diff --git a/nats.go b/nats.go index f8d202f3b..95a5577df 100644 --- a/nats.go +++ b/nats.go @@ -3864,9 +3864,15 @@ func (nc *Conn) removeSub(s *Subscription) { s.mch = nil // If JS subscription then stop HB timer. 
- if jsi := s.jsi; jsi != nil && jsi.hbc != nil { - jsi.hbc.Stop() - jsi.hbc = nil + if jsi := s.jsi; jsi != nil { + if jsi.hbc != nil { + jsi.hbc.Stop() + jsi.hbc = nil + } + if jsi.csfct != nil { + jsi.csfct.Stop() + jsi.csfct = nil + } } // Mark as invalid