Skip to content

Commit

Permalink
Merge pull request #837 from nats-io/js_fix_fc
Browse files Browse the repository at this point in the history
[FIXED] JetStream flow control may stall in some conditions
  • Loading branch information
kozlovic committed Oct 5, 2021
2 parents 8c2b0bf + c564a49 commit 6305227
Show file tree
Hide file tree
Showing 3 changed files with 113 additions and 19 deletions.
80 changes: 75 additions & 5 deletions js.go
Expand Up @@ -94,6 +94,14 @@ const (

// Scale for threshold of missed HBs or lack of activity.
hbcThresh = 2

// For ChanSubscription, we can't update sub.delivered as we do for other
// type of subscriptions, since the channel is user provided.
// With flow control in play, we will check for flow control on incoming
// messages (as opposed to when they are delivered), but also from a go
// routine. Without this, the subscription would possibly stall until
// a new message or heartbeat/fc are received.
chanSubFCCheckInterval = 250 * time.Millisecond
)

// Types of control messages, so far heartbeat and flow control
Expand Down Expand Up @@ -897,6 +905,8 @@ type jsSub struct {
cmeta string
fcr string
fcd uint64
fciseq uint64
csfct *time.Timer
}

// Deletes the JS Consumer.
Expand Down Expand Up @@ -1409,15 +1419,27 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync,
}
if !isPullMode {
// We can't reuse the channel, so if one was passed, we need to create a new one.
if ch != nil {
if isSync {
ch = make(chan *Msg, cap(ch))
} else if ch != nil {
// User provided (ChanSubscription), simply try to drain it.
for done := false; !done; {
select {
case <-ch:
default:
done = true
}
}
}
jsi.deliver = deliver
jsi.hbi = info.Config.Heartbeat
// Recreate the subscription here.
sub, err = nc.subscribe(jsi.deliver, queue, cb, ch, isSync, jsi)
if err != nil {
return nil, err
}
hasFC = info.Config.FlowControl
hasHeartbeats = info.Config.Heartbeat > 0
}
} else {
if cinfo.Error.Code == 404 {
Expand All @@ -1442,10 +1464,44 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync,
if hasHeartbeats {
sub.scheduleHeartbeatCheck()
}
// For ChanSubscriptions, if we know that there is flow control, we will
// start a go routine that evaluates the number of delivered messages
// and process flow control.
if sub.Type() == ChanSubscription && hasFC {
sub.chanSubcheckForFlowControlResponse()
}

return sub, nil
}

// This long-lived routine is used per ChanSubscription to check
// on the number of delivered messages and check for flow control response.
func (sub *Subscription) chanSubcheckForFlowControlResponse() {
sub.mu.Lock()
// We don't use defer since if we need to send an RC reply, we need
// to do it outside the sub's lock. So doing explicit unlock...
if sub.closed {
sub.mu.Unlock()
return
}
var fcReply string
var nc *Conn

jsi := sub.jsi
if jsi.csfct == nil {
jsi.csfct = time.AfterFunc(chanSubFCCheckInterval, sub.chanSubcheckForFlowControlResponse)
} else {
fcReply = sub.checkForFlowControlResponse()
nc = sub.conn
// Do the reset here under the lock, it's ok...
jsi.csfct.Reset(chanSubFCCheckInterval)
}
sub.mu.Unlock()
// This call will return an error (which we don't care here)
// if nc is nil or fcReply is empty.
nc.Publish(fcReply, nil)
}

// ErrConsumerSequenceMismatch represents an error from a consumer
// that received a Heartbeat including sequence different to the
// one expected from the view of the client.
Expand Down Expand Up @@ -1488,8 +1544,11 @@ func isJSControlMessage(msg *Msg) (bool, int) {

// Keeps track of the incoming message's reply subject so that the consumer's
// state (deliver sequence, etc..) can be checked against heartbeats.
// We will also bump the incoming data message sequence that is used in FC cases.
// Runs under the subscription lock
func (sub *Subscription) trackSequences(reply string) {
// For flow control, keep track of incoming message sequence.
sub.jsi.fciseq++
sub.jsi.cmeta = reply
}

Expand Down Expand Up @@ -1626,13 +1685,25 @@ func (sub *Subscription) resetOrderedConsumer(sseq uint64) {
}()
}

// For jetstream subscriptions, returns the number of delivered messages.
// For ChanSubscription, this value is computed based on the known number
// of messages added to the channel minus the current size of that channel.
// Lock held on entry
func (sub *Subscription) getJSDelivered() uint64 {
if sub.typ == ChanSubscription {
return sub.jsi.fciseq - uint64(len(sub.mch))
}
return sub.delivered
}

// checkForFlowControlResponse will check to see if we should send a flow control response
// based on the subscription current delivered index and the target.
// Runs under subscription lock
func (sub *Subscription) checkForFlowControlResponse() string {
// Caller has verified that there is a sub.jsi and fc
jsi := sub.jsi
if jsi.fcd == sub.delivered {
jsi.active = true
if sub.getJSDelivered() >= jsi.fcd {
fcr := jsi.fcr
jsi.fcr, jsi.fcd = _EMPTY_, 0
return fcr
Expand All @@ -1642,9 +1713,8 @@ func (sub *Subscription) checkForFlowControlResponse() string {

// Record an inbound flow control message.
// Runs under subscription lock
func (sub *Subscription) scheduleFlowControlResponse(dfuture uint64, reply string) {
jsi := sub.jsi
jsi.fcr, jsi.fcd = reply, dfuture
func (sub *Subscription) scheduleFlowControlResponse(reply string) {
sub.jsi.fcr, sub.jsi.fcd = reply, sub.jsi.fciseq
}

// Checks for activity from our consumer.
Expand Down
18 changes: 12 additions & 6 deletions js_test.go
Expand Up @@ -105,7 +105,10 @@ func TestJetStreamOrderedConsumer(t *testing.T) {
} else {
chunk = msg[i : i+chunkSize]
}
js.PublishAsync("a", chunk)
msg := NewMsg("a")
msg.Data = chunk
msg.Header.Set("data", "true")
js.PublishMsgAsync(msg)
}
js.PublishAsync("a", nil) // eof

Expand Down Expand Up @@ -171,7 +174,7 @@ func TestJetStreamOrderedConsumer(t *testing.T) {
t.Fatalf("Objects do not match")
}
case <-time.After(5 * time.Second):
t.Fatalf("Did not receive all chunks, only %d of %d total", atomic.LoadUint32(&received), si.State.Msgs)
t.Fatalf("Did not receive all chunks, only %d of %d total", atomic.LoadUint32(&received), si.State.Msgs-1)
}
}

Expand Down Expand Up @@ -202,7 +205,7 @@ func TestJetStreamOrderedConsumer(t *testing.T) {
rmsg = append(rmsg, m.Data...)
}
if !done {
t.Fatalf("Did not receive all chunks, only %d of %d total", received, si.State.Msgs)
t.Fatalf("Did not receive all chunks, only %d of %d total", received, si.State.Msgs-1)
}
if rsum := sha256.Sum256(rmsg); rsum != sum {
t.Fatalf("Objects do not match")
Expand All @@ -215,18 +218,19 @@ func TestJetStreamOrderedConsumer(t *testing.T) {

// Now introduce some loss.
singleLoss := func(m *Msg) *Msg {
if rand.Intn(100) <= 10 {
if rand.Intn(100) <= 10 && m.Header.Get("data") != _EMPTY_ {
nc.removeMsgFilter("a")
return nil
}
return m
}
nc.addMsgFilter("a", singleLoss)
testConsumer()
nc.addMsgFilter("a", singleLoss)
testSyncConsumer()

multiLoss := func(m *Msg) *Msg {
if rand.Intn(100) <= 10 {
if rand.Intn(100) <= 10 && m.Header.Get("data") != _EMPTY_ {
return nil
}
return m
Expand All @@ -246,11 +250,12 @@ func TestJetStreamOrderedConsumer(t *testing.T) {
}
nc.addMsgFilter("a", firstOnly)
testConsumer()
nc.addMsgFilter("a", firstOnly)
testSyncConsumer()

lastOnly := func(m *Msg) *Msg {
if meta, err := m.Metadata(); err == nil {
if meta.Sequence.Stream >= si.State.LastSeq {
if meta.Sequence.Stream >= si.State.LastSeq-1 {
nc.removeMsgFilter("a")
return nil
}
Expand All @@ -259,6 +264,7 @@ func TestJetStreamOrderedConsumer(t *testing.T) {
}
nc.addMsgFilter("a", lastOnly)
testConsumer()
nc.addMsgFilter("a", lastOnly)
testSyncConsumer()
}

Expand Down
34 changes: 26 additions & 8 deletions nats.go
Expand Up @@ -2600,7 +2600,6 @@ func (nc *Conn) waitForMsgs(s *Subscription) {
delivered = s.delivered
if s.jsi != nil {
fcReply = s.checkForFlowControlResponse()
s.jsi.active = true
}
}
s.mu.Unlock()
Expand Down Expand Up @@ -2768,6 +2767,7 @@ func (nc *Conn) processMsg(data []byte) {

// Skip processing if this is a control message.
if !ctrlMsg {
var chanSubCheckFC bool
// Subscription internal stats (applicable only for non ChanSubscription's)
if sub.typ != ChanSubscription {
sub.pMsgs++
Expand All @@ -2784,6 +2784,8 @@ func (nc *Conn) processMsg(data []byte) {
(sub.pBytesLimit > 0 && sub.pBytes > sub.pBytesLimit) {
goto slowConsumer
}
} else if jsi != nil {
chanSubCheckFC = true
}

// We have two modes of delivery. One is the channel, used by channel
Expand Down Expand Up @@ -2811,15 +2813,26 @@ func (nc *Conn) processMsg(data []byte) {
// Store the ACK metadata from the message to
// compare later on with the received heartbeat.
sub.trackSequences(m.Reply)
if chanSubCheckFC {
// For ChanSubscription, since we can't call this when a message
// is "delivered" (since user is pull from their own channel),
// we have a go routine that does this check, however, we do it
// also here to make it much more responsive. The go routine is
// really to avoid stalling when there is no new messages coming.
fcReply = sub.checkForFlowControlResponse()
}
}
} else if ctrlType == jsCtrlFC && m.Reply != _EMPTY_ {
// This is a flow control message.
// If we have no pending, go ahead and send in place.
if sub.pMsgs <= 0 {
// We will schedule the send of the FC reply once we have delivered the
// DATA message that was received before this flow control message, which
// has sequence `jsi.fciseq`. However, it is possible that this message
// has already been delivered, in that case, we need to send the FC reply now.
if sub.getJSDelivered() >= jsi.fciseq {
fcReply = m.Reply
} else {
// Schedule a reply after the previous message is delivered.
sub.scheduleFlowControlResponse(sub.delivered+uint64(sub.pMsgs), m.Reply)
sub.scheduleFlowControlResponse(m.Reply)
}
}

Expand Down Expand Up @@ -3851,9 +3864,15 @@ func (nc *Conn) removeSub(s *Subscription) {
s.mch = nil

// If JS subscription then stop HB timer.
if jsi := s.jsi; jsi != nil && jsi.hbc != nil {
jsi.hbc.Stop()
jsi.hbc = nil
if jsi := s.jsi; jsi != nil {
if jsi.hbc != nil {
jsi.hbc.Stop()
jsi.hbc = nil
}
if jsi.csfct != nil {
jsi.csfct.Stop()
jsi.csfct = nil
}
}

// Mark as invalid
Expand Down Expand Up @@ -4192,7 +4211,6 @@ func (s *Subscription) processNextMsgDelivered(msg *Msg) error {
delivered := s.delivered
if s.jsi != nil {
fcReply = s.checkForFlowControlResponse()
s.jsi.active = true
}

if s.typ == SyncSubscription {
Expand Down

0 comments on commit 6305227

Please sign in to comment.