Skip to content

Commit

Permalink
js: Improvements for push consumers with flow control
Browse files Browse the repository at this point in the history
Signed-off-by: Waldemar Quevedo <wally@synadia.com>
  • Loading branch information
wallyqs authored and nsurfer committed May 3, 2021
1 parent aa61ea3 commit 7a1cbc3
Show file tree
Hide file tree
Showing 5 changed files with 283 additions and 133 deletions.
2 changes: 1 addition & 1 deletion go_test.mod
Expand Up @@ -4,7 +4,7 @@ go 1.15

require (
github.com/golang/protobuf v1.4.2
github.com/nats-io/nats-server/v2 v2.2.2-0.20210421232642-f2d3f5fb81d0
github.com/nats-io/nats-server/v2 v2.2.3-0.20210501163444-670f44f1e82e
github.com/nats-io/nkeys v0.3.0
github.com/nats-io/nuid v1.0.1
google.golang.org/protobuf v1.23.0
Expand Down
2 changes: 2 additions & 0 deletions go_test.sum
Expand Up @@ -49,6 +49,8 @@ github.com/nats-io/nats-server/v2 v2.2.2-0.20210421215445-a48a39251636 h1:iy6c/t
github.com/nats-io/nats-server/v2 v2.2.2-0.20210421215445-a48a39251636/go.mod h1:aF2IwMZdYktJswITm41c/k66uCHjTvpTxGQ7+d4cPeg=
github.com/nats-io/nats-server/v2 v2.2.2-0.20210421232642-f2d3f5fb81d0 h1:e2MoeAShQE/oOSjkkV6J6R+l5ugbfkXI5spxgQykgoM=
github.com/nats-io/nats-server/v2 v2.2.2-0.20210421232642-f2d3f5fb81d0/go.mod h1:aF2IwMZdYktJswITm41c/k66uCHjTvpTxGQ7+d4cPeg=
github.com/nats-io/nats-server/v2 v2.2.3-0.20210501163444-670f44f1e82e h1:Hvpz1/Epth4q7LnaU0U9SqMFd8grUMFTL8LMO5HFVok=
github.com/nats-io/nats-server/v2 v2.2.3-0.20210501163444-670f44f1e82e/go.mod h1:aF2IwMZdYktJswITm41c/k66uCHjTvpTxGQ7+d4cPeg=
github.com/nats-io/nats.go v1.10.0/go.mod h1:AjGArbfyR50+afOUotNX2Xs5SYHf+CoOa5HH1eEl2HE=
github.com/nats-io/nats.go v1.10.1-0.20200531124210-96f2130e4d55/go.mod h1:ARiFsjW9DVxk48WJbO3OSZ2DG8fjkMi7ecLmXoY/n9I=
github.com/nats-io/nats.go v1.10.1-0.20200606002146-fc6fed82929a/go.mod h1:8eAIv96Mo9QW6Or40jUHejS7e4VwZ3VRYD6Sf0BTDp4=
Expand Down
180 changes: 130 additions & 50 deletions js.go
Expand Up @@ -146,6 +146,7 @@ type js struct {
pafs map[string]*pubAckFuture
stc chan struct{}
dch chan struct{}
rr *rand.Rand
}

type jsOpts struct {
Expand Down Expand Up @@ -443,10 +444,11 @@ func (js *js) newAsyncReply() string {
return _EMPTY_
}
js.rsub = sub
js.rr = rand.New(rand.NewSource(time.Now().UnixNano()))
}
var sb strings.Builder
sb.WriteString(js.rpre)
rn := js.nc.respRand.Int63()
rn := js.rr.Int63()
var b [aReplyTokensize]byte
for i, l := 0, rn; i < len(b); i++ {
b[i] = rdigits[l%base]
Expand Down Expand Up @@ -850,11 +852,10 @@ type jsSub struct {
attached bool

// Heartbeats and Flow Control handling from push consumers.
hbs bool
fc bool

// cmeta is holds metadata from a push consumer when HBs are enabled.
cmeta atomic.Value
hbs bool
fc bool
cmeta string
fcs map[uint64]string
}

// newFetchReply generates a unique inbox used for a fetch request.
Expand Down Expand Up @@ -931,12 +932,6 @@ func (jsi *jsSub) fetchNoWait(ctx context.Context, subj string, payload []byte)
return msg, nil
}

// controlMetadata is metadata used to be able to detect sequence mismatch
// errors in push based consumers that have heartbeats enabled.
type controlMetadata struct {
meta string
}

func (jsi *jsSub) unsubscribe(drainMode bool) error {
if drainMode && (jsi.durable || jsi.attached) {
// Skip deleting consumer for durables/attached
Expand Down Expand Up @@ -1096,6 +1091,12 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync
return nil, err
}

// With flow control enabled async subscriptions we will disable msgs
// limits, and set a larger pending bytes limit by default.
if !isPullMode && cb != nil && hasFC {
sub.SetPendingLimits(DefaultSubPendingMsgsLimit*16, DefaultSubPendingBytesLimit*4)
}

// If we are creating or updating let's process that request.
if shouldCreate {
// If not set default to ack explicit.
Expand Down Expand Up @@ -1194,40 +1195,119 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync
return sub, nil
}

func (nc *Conn) processControlFlow(msg *Msg, s *Subscription, jsi *jsSub) {
// If it is a flow control message then have to ack.
if msg.Reply != "" {
nc.publish(msg.Reply, _EMPTY_, nil, nil)
} else if jsi.hbs {
// Process heartbeat received, get latest control metadata if present.
var ctrl *controlMetadata
cmeta := jsi.cmeta.Load()
if cmeta == nil {
return
}
// ErrConsumerSequenceMismatch represents an error from a consumer
// that received a Heartbeat including sequence different to the
// one expected from the view of the client.
type ErrConsumerSequenceMismatch struct {
// StreamResumeSequence is the stream sequence from where the consumer
// should resume consuming from the stream.
StreamResumeSequence uint64

ctrl = cmeta.(*controlMetadata)
tokens, err := getMetadataFields(ctrl.meta)
if err != nil {
return
}
// Consumer sequence
dseq := tokens[6]
ldseq := msg.Header.Get(lastConsumerSeqHdr)

// Detect consumer sequence mismatch and whether
// should restart the consumer.
if ldseq != dseq {
// Dispatch async error including details such as
// from where the consumer could be restarted.
sseq := parseNum(tokens[5])
ecs := &ErrConsumerSequenceMismatch{
StreamResumeSequence: uint64(sseq),
ConsumerSequence: parseNum(dseq),
LastConsumerSequence: parseNum(ldseq),
}
nc.handleConsumerSequenceMismatch(s, ecs)
// ConsumerSequence is the sequence of the consumer that is behind.
ConsumerSequence uint64

// LastConsumerSequence is the sequence of the consumer when the heartbeat
// was received.
LastConsumerSequence uint64
}

func (ecs *ErrConsumerSequenceMismatch) Error() string {
return fmt.Sprintf("nats: sequence mismatch for consumer at sequence %d (%d sequences behind), should restart consumer from stream sequence %d",
ecs.ConsumerSequence,
ecs.LastConsumerSequence-ecs.ConsumerSequence,
ecs.StreamResumeSequence,
)
}

// isControlMessage will return true if this is an empty control status message.
func isControlMessage(msg *Msg) bool {
return len(msg.Data) == 0 && msg.Header.Get(statusHdr) == controlMsg
}

func (jsi *jsSub) trackSequences(reply string) {
jsi.mu.Lock()
jsi.cmeta = reply
jsi.mu.Unlock()
}

// checkForFlowControlResponse will check to see if we should send a flow control response
// based on the delivered index.
// Lock should be held.
func (sub *Subscription) checkForFlowControlResponse(delivered uint64) {
jsi, nc := sub.jsi, sub.conn
if jsi == nil {
return
}

jsi.mu.Lock()
defer jsi.mu.Unlock()

if len(jsi.fcs) == 0 {
return
}

if reply := jsi.fcs[delivered]; reply != _EMPTY_ {
delete(jsi.fcs, delivered)
nc.Publish(reply, nil)
}
}

// Record an inbound flow control message.
func (jsi *jsSub) scheduleFlowControlResponse(dfuture uint64, reply string) {
jsi.mu.Lock()
if jsi.fcs == nil {
jsi.fcs = make(map[uint64]string)
}
jsi.fcs[dfuture] = reply
jsi.mu.Unlock()
}

// handleConsumerSequenceMismatch will send an async error that can be used to restart a push based consumer.
func (nc *Conn) handleConsumerSequenceMismatch(sub *Subscription, err error) {
nc.mu.Lock()
errCB := nc.Opts.AsyncErrorCB
if errCB != nil {
nc.ach.push(func() { errCB(nc, sub, err) })
}
nc.mu.Unlock()
}

// processControlFlow will automatically respond to control messages sent by the server.
func (nc *Conn) processSequenceMismatch(msg *Msg, s *Subscription, jsi *jsSub) {
// Process heartbeat received, get latest control metadata if present.
jsi.mu.RLock()
ctrl := jsi.cmeta
jsi.mu.RUnlock()

if ctrl == _EMPTY_ {
return
}

tokens, err := getMetadataFields(ctrl)
if err != nil {
return
}

// Consumer sequence.
var ldseq string
dseq := tokens[6]
hdr := msg.Header[lastConsumerSeqHdr]
if len(hdr) == 1 {
ldseq = hdr[0]
}

// Detect consumer sequence mismatch and whether
// should restart the consumer.
if ldseq != dseq {
// Dispatch async error including details such as
// from where the consumer could be restarted.
sseq := parseNum(tokens[5])
ecs := &ErrConsumerSequenceMismatch{
StreamResumeSequence: uint64(sseq),
ConsumerSequence: uint64(parseNum(dseq)),
LastConsumerSequence: uint64(parseNum(ldseq)),
}
nc.handleConsumerSequenceMismatch(s, ecs)
}
}

Expand Down Expand Up @@ -1762,26 +1842,26 @@ func (js *js) getConsumerInfoContext(ctx context.Context, stream, consumer strin
return info.ConsumerInfo, nil
}

func (m *Msg) checkReply() (*js, bool, error) {
func (m *Msg) checkReply() (*js, *jsSub, error) {
if m == nil || m.Sub == nil {
return nil, false, ErrMsgNotBound
return nil, nil, ErrMsgNotBound
}
if m.Reply == "" {
return nil, false, ErrMsgNoReply
return nil, nil, ErrMsgNoReply
}
sub := m.Sub
sub.mu.Lock()
if sub.jsi == nil {
sub.mu.Unlock()

// Not using a JS context.
return nil, false, nil
return nil, nil, nil
}
js := sub.jsi.js
isPullMode := sub.jsi.pull
jsi := sub.jsi
sub.mu.Unlock()

return js, isPullMode, nil
return js, jsi, nil
}

// ackReply handles all acks. Will do the right thing for pull and sync mode.
Expand Down

0 comments on commit 7a1cbc3

Please sign in to comment.