Skip to content

Commit

Permalink
js: Improvements for push consumers with flow control
Browse files Browse the repository at this point in the history
- Move responding to flow control messages to be done after latest Ack

- Set larger pending limits for async subscribers with flow control

Signed-off-by: Waldemar Quevedo <wally@synadia.com>
  • Loading branch information
wallyqs committed Apr 14, 2021
1 parent 1a653a7 commit d1342e5
Show file tree
Hide file tree
Showing 3 changed files with 254 additions and 116 deletions.
195 changes: 137 additions & 58 deletions js.go
Expand Up @@ -20,6 +20,7 @@ import (
"encoding/json"
"errors"
"fmt"
"math/rand"
"net/http"
"strconv"
"strings"
Expand Down Expand Up @@ -140,12 +141,13 @@ type js struct {
opts *jsOpts

// For async publish context.
mu sync.RWMutex
rpre string
rsub *Subscription
pafs map[string]*pubAckFuture
stc chan struct{}
dch chan struct{}
mu sync.RWMutex
rpre string
rsub *Subscription
pafs map[string]*pubAckFuture
stc chan struct{}
dch chan struct{}
respRand *rand.Rand
}

type jsOpts struct {
Expand Down Expand Up @@ -422,10 +424,11 @@ func (js *js) newAsyncReply() string {
return _EMPTY_
}
js.rsub = sub
js.respRand = rand.New(rand.NewSource(time.Now().UnixNano()))
}
var sb strings.Builder
sb.WriteString(js.rpre)
rn := js.nc.respRand.Int63()
rn := js.respRand.Int63()
var b [aReplyTokensize]byte
for i, l := 0, rn; i < len(b); i++ {
b[i] = rdigits[l%base]
Expand Down Expand Up @@ -820,17 +823,11 @@ type jsSub struct {
attached bool

// Heartbeats and Flow Control handling from push consumers.
hbs bool
fc bool

// cmeta is holds metadata from a push consumer when HBs are enabled.
cmeta atomic.Value
}

// controlMetadata is metadata used to be able to detect sequence mismatch
// errors in push based consumers that have heartbeats enabled.
type controlMetadata struct {
meta string
mu sync.RWMutex
hbs bool
fc bool
cmeta string
fcInbox string
}

func (jsi *jsSub) unsubscribe(drainMode bool) error {
Expand Down Expand Up @@ -984,6 +981,12 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync
if err != nil {
return nil, err
}

// With flow control enabled async subscriptions we will disable msgs limits,
// and set a larger pending bytes limit by default.
if cb != nil && hasFC {
sub.SetPendingLimits(-1, DefaultSubPendingBytesLimit*4)
}
}

// If we are creating or updating let's process that request.
Expand All @@ -997,7 +1000,9 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync
// TODO(dlc) - We should be able to update this if client updates PendingLimits.
if cfg.MaxAckPending == 0 && cfg.AckPolicy != AckNonePolicy {
maxMsgs, _, _ := sub.PendingLimits()
cfg.MaxAckPending = maxMsgs
if maxMsgs > 0 {
cfg.MaxAckPending = maxMsgs
}
}

req := &createConsumerRequest{
Expand Down Expand Up @@ -1053,40 +1058,99 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync
return sub, nil
}

func (nc *Conn) processControlFlow(msg *Msg, s *Subscription, jsi *jsSub) {
// If it is a flow control message then have to ack.
if msg.Reply != "" {
nc.publish(msg.Reply, _EMPTY_, nil, nil)
} else if jsi.hbs {
// Process heartbeat received, get latest control metadata if present.
var ctrl *controlMetadata
cmeta := jsi.cmeta.Load()
if cmeta == nil {
return
}
// ErrConsumerSequenceMismatch represents an error from a consumer
// that received a Heartbeat including sequence different to the
// one expected from the view of the client.
type ErrConsumerSequenceMismatch struct {
// StreamResumeSequence is the stream sequence from where the consumer
// should resume consuming from the stream.
StreamResumeSequence uint64

ctrl = cmeta.(*controlMetadata)
tokens, err := getMetadataFields(ctrl.meta)
if err != nil {
return
}
// Consumer sequence
dseq := tokens[6]
ldseq := msg.Header.Get(lastConsumerSeqHdr)

// Detect consumer sequence mismatch and whether
// should restart the consumer.
if ldseq != dseq {
// Dispatch async error including details such as
// from where the consumer could be restarted.
sseq := parseNum(tokens[5])
ecs := &ErrConsumerSequenceMismatch{
StreamResumeSequence: uint64(sseq),
ConsumerSequence: parseNum(dseq),
LastConsumerSequence: parseNum(ldseq),
}
nc.handleConsumerSequenceMismatch(s, ecs)
// ConsumerSequence is the sequence of the consumer that is behind.
ConsumerSequence uint64

// LastConsumerSequence is the sequence of the consumer when the heartbeat
// was received.
LastConsumerSequence uint64
}

func (ecs *ErrConsumerSequenceMismatch) Error() string {
return fmt.Sprintf("nats: sequence mismatch for consumer at sequence %d (%d sequences behind), should restart consumer from stream sequence %d",
ecs.ConsumerSequence,
ecs.LastConsumerSequence-ecs.ConsumerSequence,
ecs.StreamResumeSequence,
)
}

// isControlMessage will return true if this is an empty control status message.
func isControlMessage(msg *Msg) bool {
if len(msg.Data) > 0 {
return false
}
// We know that the server will always send messages
// using canonical names so can check from map directly.
hdr := msg.Header[statusHdr]
return len(hdr) == 1 && hdr[0] == controlMsg
}

func (jsi *jsSub) trackSequences(reply string) {
jsi.mu.Lock()
jsi.cmeta = reply
jsi.mu.Unlock()
}

func (jsi *jsSub) scheduleFlowControlResponse(reply string) {
jsi.mu.Lock()
jsi.fcInbox = reply
jsi.mu.Unlock()
}

// handleConsumerSequenceMismatch will send an async error that can be used to restart a push based consumer.
func (nc *Conn) handleConsumerSequenceMismatch(sub *Subscription, err error) {
nc.mu.Lock()
errCB := nc.Opts.AsyncErrorCB
if errCB != nil {
nc.ach.push(func() { errCB(nc, sub, err) })
}
nc.mu.Unlock()
}

// processControlFlow will automatically respond to control messages sent by the server.
func (nc *Conn) processSequenceMismatch(msg *Msg, s *Subscription, jsi *jsSub) {
// Process heartbeat received, get latest control metadata if present.
jsi.mu.RLock()
ctrl := jsi.cmeta
jsi.mu.RUnlock()

if ctrl == "" {
return
}

tokens, err := getMetadataFields(ctrl)
if err != nil {
return
}

// Consumer sequence.
var ldseq string
dseq := tokens[6]
hdr := msg.Header[lastConsumerSeqHdr]
if len(hdr) == 1 {
ldseq = hdr[0]
}

// Detect consumer sequence mismatch and whether
// should restart the consumer.
if ldseq != dseq {
// Dispatch async error including details such as
// from where the consumer could be restarted.
sseq := parseNum(tokens[5])
ecs := &ErrConsumerSequenceMismatch{
StreamResumeSequence: uint64(sseq),
ConsumerSequence: uint64(parseNum(dseq)),
LastConsumerSequence: uint64(parseNum(ldseq)),
}
nc.handleConsumerSequenceMismatch(s, ecs)
}
}

Expand Down Expand Up @@ -1619,26 +1683,26 @@ func (js *js) getConsumerInfoContext(ctx context.Context, stream, consumer strin
return info.ConsumerInfo, nil
}

func (m *Msg) checkReply() (*js, bool, error) {
func (m *Msg) checkReply() (*js, *jsSub, error) {
if m == nil || m.Sub == nil {
return nil, false, ErrMsgNotBound
return nil, nil, ErrMsgNotBound
}
if m.Reply == "" {
return nil, false, ErrMsgNoReply
return nil, nil, ErrMsgNoReply
}
sub := m.Sub
sub.mu.Lock()
if sub.jsi == nil {
sub.mu.Unlock()

// Not using a JS context.
return nil, false, nil
return nil, nil, nil
}
js := sub.jsi.js
isPullMode := sub.jsi.pull
jsi := sub.jsi
sub.mu.Unlock()

return js, isPullMode, nil
return js, jsi, nil
}

// ackReply handles all acks. Will do the right thing for pull and sync mode.
Expand All @@ -1652,7 +1716,7 @@ func (m *Msg) ackReply(ackType []byte, sync bool, opts ...AckOpt) error {
}
}

js, _, err := m.checkReply()
js, jsi, err := m.checkReply()
if err != nil {
return err
}
Expand Down Expand Up @@ -1693,6 +1757,21 @@ func (m *Msg) ackReply(ackType []byte, sync bool, opts ...AckOpt) error {
atomic.StoreUint32(&m.ackd, 1)
}

// Check if has to reply to flow control.
if jsi != nil && jsi.fc {
jsi.mu.Lock()
fcInbox := jsi.fcInbox
doFC := fcInbox != ""
if doFC {
jsi.fcInbox = ""
}
jsi.mu.Unlock()

if doFC {
nc.Publish(fcInbox, nil)
}
}

return err
}

Expand Down
68 changes: 13 additions & 55 deletions nats.go
Expand Up @@ -2470,6 +2470,7 @@ func (nc *Conn) processMsg(data []byte) {
var err error
var ctrl bool
var hasFC bool
var hasHBs bool

if nc.ps.ma.hdr > 0 {
hbuf := msgPayload[:nc.ps.ma.hdr]
Expand All @@ -2495,6 +2496,7 @@ func (nc *Conn) processMsg(data []byte) {
jsi := sub.jsi
if jsi != nil {
ctrl = isControlMessage(m)
hasHBs = jsi.hbs
hasFC = jsi.fc
}

Expand Down Expand Up @@ -2544,20 +2546,25 @@ func (nc *Conn) processMsg(data []byte) {
sub.pTail = m
}
}
if hasFC {
jsi.trackSequences(m)
if hasHBs {
// Store the ACK metadata from the message to
// compare later on with the received heartbeat.
jsi.trackSequences(m.Reply)
}
} else if hasFC && m.Reply != "" {
// This is a flow control message, we will make
// a reply after the next ACK from client.
jsi.scheduleFlowControlResponse(m.Reply)
}

// Clear SlowConsumer status.
sub.sc = false

sub.mu.Unlock()

// Handle flow control and heartbeat messages automatically
// for JetStream Push consumers.
if ctrl {
nc.processControlFlow(m, sub, jsi)
// Handle control heartbeat messages.
if ctrl && hasHBs && m.Reply == "" {
nc.processSequenceMismatch(m, sub, jsi)
}

return
Expand Down Expand Up @@ -3720,55 +3727,6 @@ func (nc *Conn) unsubscribe(sub *Subscription, max int, drainMode bool) error {
return nil
}

// ErrConsumerSequenceMismatch represents an error from a consumer
// that received a Heartbeat including sequence different to the
// one expected from the view of the client.
type ErrConsumerSequenceMismatch struct {
// StreamResumeSequence is the stream sequence from where the consumer
// should resume consuming from the stream.
StreamResumeSequence uint64

// ConsumerSequence is the sequence of the consumer that is behind.
ConsumerSequence int64

// LastConsumerSequence is the sequence of the consumer when the heartbeat
// was received.
LastConsumerSequence int64
}

func (ecs *ErrConsumerSequenceMismatch) Error() string {
return fmt.Sprintf("nats: sequence mismatch for consumer at sequence %d (%d sequences behind), should restart consumer from stream sequence %d",
ecs.ConsumerSequence,
ecs.LastConsumerSequence-ecs.ConsumerSequence,
ecs.StreamResumeSequence,
)
}

// handleConsumerSequenceMismatch will send an async error that can be used to restart a push based consumer.
func (nc *Conn) handleConsumerSequenceMismatch(sub *Subscription, err error) {
nc.mu.Lock()
errCB := nc.Opts.AsyncErrorCB
if errCB != nil {
nc.ach.push(func() { errCB(nc, sub, err) })
}
nc.mu.Unlock()
}

func isControlMessage(msg *Msg) bool {
return len(msg.Data) == 0 && msg.Header.Get(statusHdr) == controlMsg
}

func (jsi *jsSub) trackSequences(msg *Msg) {
var ctrl *controlMetadata
if cmeta := jsi.cmeta.Load(); cmeta == nil {
ctrl = &controlMetadata{}
} else {
ctrl = cmeta.(*controlMetadata)
}
ctrl.meta = msg.Reply
jsi.cmeta.Store(ctrl)
}

// NextMsg will return the next message available to a synchronous subscriber
// or block until one is available. An error is returned if the subscription is invalid (ErrBadSubscription),
// the connection is closed (ErrConnectionClosed), or the timeout is reached (ErrTimeout).
Expand Down

0 comments on commit d1342e5

Please sign in to comment.