Skip to content

Commit

Permalink
Merge 5f637cf into aa61ea3
Browse files Browse the repository at this point in the history
  • Loading branch information
wallyqs committed May 3, 2021
2 parents aa61ea3 + 5f637cf commit dbfd204
Show file tree
Hide file tree
Showing 3 changed files with 240 additions and 109 deletions.
174 changes: 123 additions & 51 deletions js.go
Expand Up @@ -146,6 +146,7 @@ type js struct {
pafs map[string]*pubAckFuture
stc chan struct{}
dch chan struct{}
rr *rand.Rand
}

type jsOpts struct {
Expand Down Expand Up @@ -443,10 +444,11 @@ func (js *js) newAsyncReply() string {
return _EMPTY_
}
js.rsub = sub
js.rr = rand.New(rand.NewSource(time.Now().UnixNano()))
}
var sb strings.Builder
sb.WriteString(js.rpre)
rn := js.nc.respRand.Int63()
rn := js.rr.Int63()
var b [aReplyTokensize]byte
for i, l := 0, rn; i < len(b); i++ {
b[i] = rdigits[l%base]
Expand Down Expand Up @@ -850,11 +852,10 @@ type jsSub struct {
attached bool

// Heartbeats and Flow Control handling from push consumers.
hbs bool
fc bool

// cmeta is holds metadata from a push consumer when HBs are enabled.
cmeta atomic.Value
hbs bool
fc bool
cmeta string
fcInbox string
}

// newFetchReply generates a unique inbox used for a fetch request.
Expand Down Expand Up @@ -931,12 +932,6 @@ func (jsi *jsSub) fetchNoWait(ctx context.Context, subj string, payload []byte)
return msg, nil
}

// controlMetadata is metadata used to be able to detect sequence mismatch
// errors in push based consumers that have heartbeats enabled.
type controlMetadata struct {
meta string
}

func (jsi *jsSub) unsubscribe(drainMode bool) error {
if drainMode && (jsi.durable || jsi.attached) {
// Skip deleting consumer for durables/attached
Expand Down Expand Up @@ -1096,6 +1091,12 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync
return nil, err
}

// With flow control enabled async subscriptions we will set a larger
// pending bytes and msgs limit than the default.
if !isPullMode && cb != nil && hasFC {
sub.SetPendingLimits(DefaultSubPendingMsgsLimit*4, DefaultSubPendingBytesLimit*4)
}

// If we are creating or updating let's process that request.
if shouldCreate {
// If not set default to ack explicit.
Expand Down Expand Up @@ -1194,40 +1195,96 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync
return sub, nil
}

func (nc *Conn) processControlFlow(msg *Msg, s *Subscription, jsi *jsSub) {
// If it is a flow control message then have to ack.
if msg.Reply != "" {
nc.publish(msg.Reply, _EMPTY_, nil, nil)
} else if jsi.hbs {
// Process heartbeat received, get latest control metadata if present.
var ctrl *controlMetadata
cmeta := jsi.cmeta.Load()
if cmeta == nil {
return
}
// ErrConsumerSequenceMismatch represents an error from a consumer
// that received a Heartbeat including sequence different to the
// one expected from the view of the client.
type ErrConsumerSequenceMismatch struct {
// StreamResumeSequence is the stream sequence from where the consumer
// should resume consuming from the stream.
StreamResumeSequence uint64

ctrl = cmeta.(*controlMetadata)
tokens, err := getMetadataFields(ctrl.meta)
if err != nil {
return
}
// Consumer sequence
dseq := tokens[6]
ldseq := msg.Header.Get(lastConsumerSeqHdr)

// Detect consumer sequence mismatch and whether
// should restart the consumer.
if ldseq != dseq {
// Dispatch async error including details such as
// from where the consumer could be restarted.
sseq := parseNum(tokens[5])
ecs := &ErrConsumerSequenceMismatch{
StreamResumeSequence: uint64(sseq),
ConsumerSequence: parseNum(dseq),
LastConsumerSequence: parseNum(ldseq),
}
nc.handleConsumerSequenceMismatch(s, ecs)
// ConsumerSequence is the sequence of the consumer that is behind.
ConsumerSequence uint64

// LastConsumerSequence is the sequence of the consumer when the heartbeat
// was received.
LastConsumerSequence uint64
}

func (ecs *ErrConsumerSequenceMismatch) Error() string {
return fmt.Sprintf("nats: sequence mismatch for consumer at sequence %d (%d sequences behind), should restart consumer from stream sequence %d",
ecs.ConsumerSequence,
ecs.LastConsumerSequence-ecs.ConsumerSequence,
ecs.StreamResumeSequence,
)
}

// isControlMessage will return true if this is an empty control status message.
func isControlMessage(msg *Msg) bool {
if len(msg.Data) > 0 {
return false
}
return msg.Header.Get(statusHdr) == controlMsg
}

func (jsi *jsSub) trackSequences(reply string) {
jsi.mu.Lock()
jsi.cmeta = reply
jsi.mu.Unlock()
}

func (jsi *jsSub) scheduleFlowControlResponse(reply string) {
jsi.mu.Lock()
jsi.fcInbox = reply
jsi.mu.Unlock()
}

// handleConsumerSequenceMismatch will send an async error that can be used to restart a push based consumer.
func (nc *Conn) handleConsumerSequenceMismatch(sub *Subscription, err error) {
nc.mu.Lock()
errCB := nc.Opts.AsyncErrorCB
if errCB != nil {
nc.ach.push(func() { errCB(nc, sub, err) })
}
nc.mu.Unlock()
}

// processControlFlow will automatically respond to control messages sent by the server.
func (nc *Conn) processSequenceMismatch(msg *Msg, s *Subscription, jsi *jsSub) {
// Process heartbeat received, get latest control metadata if present.
jsi.mu.RLock()
ctrl := jsi.cmeta
jsi.mu.RUnlock()

if ctrl == "" {
return
}

tokens, err := getMetadataFields(ctrl)
if err != nil {
return
}

// Consumer sequence.
var ldseq string
dseq := tokens[6]
hdr := msg.Header[lastConsumerSeqHdr]
if len(hdr) == 1 {
ldseq = hdr[0]
}

// Detect consumer sequence mismatch and whether
// should restart the consumer.
if ldseq != dseq {
// Dispatch async error including details such as
// from where the consumer could be restarted.
sseq := parseNum(tokens[5])
ecs := &ErrConsumerSequenceMismatch{
StreamResumeSequence: uint64(sseq),
ConsumerSequence: uint64(parseNum(dseq)),
LastConsumerSequence: uint64(parseNum(ldseq)),
}
nc.handleConsumerSequenceMismatch(s, ecs)
}
}

Expand Down Expand Up @@ -1762,26 +1819,26 @@ func (js *js) getConsumerInfoContext(ctx context.Context, stream, consumer strin
return info.ConsumerInfo, nil
}

func (m *Msg) checkReply() (*js, bool, error) {
func (m *Msg) checkReply() (*js, *jsSub, error) {
if m == nil || m.Sub == nil {
return nil, false, ErrMsgNotBound
return nil, nil, ErrMsgNotBound
}
if m.Reply == "" {
return nil, false, ErrMsgNoReply
return nil, nil, ErrMsgNoReply
}
sub := m.Sub
sub.mu.Lock()
if sub.jsi == nil {
sub.mu.Unlock()

// Not using a JS context.
return nil, false, nil
return nil, nil, nil
}
js := sub.jsi.js
isPullMode := sub.jsi.pull
jsi := sub.jsi
sub.mu.Unlock()

return js, isPullMode, nil
return js, jsi, nil
}

// ackReply handles all acks. Will do the right thing for pull and sync mode.
Expand All @@ -1795,7 +1852,7 @@ func (m *Msg) ackReply(ackType []byte, sync bool, opts ...AckOpt) error {
}
}

js, _, err := m.checkReply()
js, jsi, err := m.checkReply()
if err != nil {
return err
}
Expand Down Expand Up @@ -1836,6 +1893,21 @@ func (m *Msg) ackReply(ackType []byte, sync bool, opts ...AckOpt) error {
atomic.StoreUint32(&m.ackd, 1)
}

// Check if has to reply to flow control.
if jsi != nil && jsi.fc {
jsi.mu.Lock()
fcInbox := jsi.fcInbox
doFC := fcInbox != ""
if doFC {
jsi.fcInbox = ""
}
jsi.mu.Unlock()

if doFC {
nc.Publish(fcInbox, nil)
}
}

return err
}

Expand Down
73 changes: 16 additions & 57 deletions nats.go
Expand Up @@ -2609,6 +2609,7 @@ func (nc *Conn) processMsg(data []byte) {
var err error
var ctrl bool
var hasFC bool
var hasHBs bool

if nc.ps.ma.hdr > 0 {
hbuf := msgPayload[:nc.ps.ma.hdr]
Expand All @@ -2634,6 +2635,7 @@ func (nc *Conn) processMsg(data []byte) {
jsi := sub.jsi
if jsi != nil {
ctrl = isControlMessage(m)
hasHBs = jsi.hbs
hasFC = jsi.fc
}

Expand Down Expand Up @@ -2661,9 +2663,10 @@ func (nc *Conn) processMsg(data []byte) {
}
}

// We have two modes of delivery. One is the channel, used by channel
// subscribers and syncSubscribers, the other is a linked list for async.
// Skip processing if this is a control message.
if !ctrl {
// We have two modes of delivery. One is the channel, used by channel
// subscribers and syncSubscribers, the other is a linked list for async.
if sub.mch != nil {
select {
case sub.mch <- m:
Expand All @@ -2683,20 +2686,25 @@ func (nc *Conn) processMsg(data []byte) {
sub.pTail = m
}
}
if hasFC {
jsi.trackSequences(m)
if hasHBs {
// Store the ACK metadata from the message to
// compare later on with the received heartbeat.
jsi.trackSequences(m.Reply)
}
} else if hasFC && m.Reply != "" {
// This is a flow control message, we will make
// a reply after the next ACK from client.
jsi.scheduleFlowControlResponse(m.Reply)
}

// Clear SlowConsumer status.
sub.sc = false

sub.mu.Unlock()

// Handle flow control and heartbeat messages automatically
// for JetStream Push consumers.
if ctrl {
nc.processControlFlow(m, sub, jsi)
// Handle control heartbeat messages.
if ctrl && hasHBs && m.Reply == "" {
nc.processSequenceMismatch(m, sub, jsi)
}

return
Expand Down Expand Up @@ -3881,55 +3889,6 @@ func (nc *Conn) unsubscribe(sub *Subscription, max int, drainMode bool) error {
return nil
}

// ErrConsumerSequenceMismatch represents an error from a consumer
// that received a Heartbeat including sequence different to the
// one expected from the view of the client.
type ErrConsumerSequenceMismatch struct {
// StreamResumeSequence is the stream sequence from where the consumer
// should resume consuming from the stream.
StreamResumeSequence uint64

// ConsumerSequence is the sequence of the consumer that is behind.
ConsumerSequence int64

// LastConsumerSequence is the sequence of the consumer when the heartbeat
// was received.
LastConsumerSequence int64
}

func (ecs *ErrConsumerSequenceMismatch) Error() string {
return fmt.Sprintf("nats: sequence mismatch for consumer at sequence %d (%d sequences behind), should restart consumer from stream sequence %d",
ecs.ConsumerSequence,
ecs.LastConsumerSequence-ecs.ConsumerSequence,
ecs.StreamResumeSequence,
)
}

// handleConsumerSequenceMismatch will send an async error that can be used to restart a push based consumer.
func (nc *Conn) handleConsumerSequenceMismatch(sub *Subscription, err error) {
nc.mu.Lock()
errCB := nc.Opts.AsyncErrorCB
if errCB != nil {
nc.ach.push(func() { errCB(nc, sub, err) })
}
nc.mu.Unlock()
}

func isControlMessage(msg *Msg) bool {
return len(msg.Data) == 0 && msg.Header.Get(statusHdr) == controlMsg
}

func (jsi *jsSub) trackSequences(msg *Msg) {
var ctrl *controlMetadata
if cmeta := jsi.cmeta.Load(); cmeta == nil {
ctrl = &controlMetadata{}
} else {
ctrl = cmeta.(*controlMetadata)
}
ctrl.meta = msg.Reply
jsi.cmeta.Store(ctrl)
}

// NextMsg will return the next message available to a synchronous subscriber
// or block until one is available. An error is returned if the subscription is invalid (ErrBadSubscription),
// the connection is closed (ErrConnectionClosed), the timeout is reached (ErrTimeout),
Expand Down

0 comments on commit dbfd204

Please sign in to comment.