Skip to content

Commit

Permalink
js: Add support for FlowControl and Heartbeats
Browse files Browse the repository at this point in the history
Signed-off-by: Waldemar Quevedo <wally@synadia.com>
  • Loading branch information
wallyqs committed Mar 30, 2021
1 parent c0e12e4 commit 7843601
Show file tree
Hide file tree
Showing 3 changed files with 681 additions and 36 deletions.
110 changes: 100 additions & 10 deletions js.go
Expand Up @@ -746,6 +746,8 @@ type ConsumerConfig struct {
SampleFrequency string `json:"sample_freq,omitempty"`
MaxWaiting int `json:"max_waiting,omitempty"`
MaxAckPending int `json:"max_ack_pending,omitempty"`
FlowControl bool `json:"flow_control,omitempty"`
Heartbeat time.Duration `json:"idle_heartbeat,omitempty"`
}

// ConsumerInfo is the info from a JetStream consumer.
Expand Down Expand Up @@ -785,6 +787,19 @@ type jsSub struct {
pull bool
durable bool
attached bool

// Heartbeats and Flow Control handling from push consumers.
hbs bool
fch chan *Msg

// cmeta is holds metadata from a push consumer when HBs are enabled.
cmeta atomic.Value
}

// controlMetadata is metadata used to be able to detect sequence mismatch
// errors in push based consumers that have heartbeats enabled.
type controlMetadata struct {
meta string
}

func (jsi *jsSub) unsubscribe(drainMode bool) error {
Expand Down Expand Up @@ -860,6 +875,8 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync

isPullMode := ch == nil && cb == nil
badPullAck := o.cfg.AckPolicy == AckNonePolicy || o.cfg.AckPolicy == AckAllPolicy
hasHeartbeats := o.cfg.Heartbeat > 0
hasFC := o.cfg.FlowControl
if isPullMode && badPullAck {
return nil, fmt.Errorf("nats: invalid ack mode for pull consumers: %s", o.cfg.AckPolicy)
}
Expand Down Expand Up @@ -930,9 +947,9 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync
}

if isPullMode {
sub = &Subscription{Subject: subj, conn: js.nc, typ: PullSubscription, jsi: &jsSub{js: js, pull: true}}
sub = &Subscription{Subject: subj, conn: js.nc, typ: PullSubscription, jsi: &jsSub{js: js, pull: isPullMode}}
} else {
sub, err = js.nc.subscribe(deliver, queue, cb, ch, isSync, &jsSub{js: js})
sub, err = js.nc.subscribe(deliver, queue, cb, ch, isSync, &jsSub{js: js, hbs: hasHeartbeats})
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -1005,11 +1022,61 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, isSync
// If we are pull based go ahead and fire off the first request to populate.
if isPullMode {
sub.jsi.pull = o.pull
} else if hasFC || hasHeartbeats {
// Start goroutine to handle flow control messages for a push consumer.
sub.jsi.fch = make(chan *Msg, 64)
go sub.processControlFlow()
}

return sub, nil
}

func (s *Subscription) processControlFlow() {
s.mu.Lock()
jsi := s.jsi
fch := jsi.fch
hbs := jsi.hbs
s.mu.Unlock()

// Channel will be closed on unsubscribe and stop the loop.
for msg := range fch {
// If it is a flow control message then have to ack.
if msg.Reply != "" {
msg.Respond(nil)
} else if hbs {
// Process heartbeat received, get latest control metadata if present.
var ctrl *controlMetadata
cmeta := jsi.cmeta.Load()
if cmeta == nil {
continue
}

ctrl = cmeta.(*controlMetadata)
tokens, err := getMetadataFields(ctrl.meta)
if err != nil {
continue
}
// Consumer sequence
dseq := tokens[6]
ldseq := msg.Header.Get(lastConsumerSeqHdr)

// Detect consumer sequence mismatch and whether
// should restart the consumer.
if ldseq != dseq {
// Dispatch async error including details such as
// from where the consumer could be restarted.
sseq := parseNum(tokens[5])
ecs := &ErrConsumerSequenceMismatch{
StreamResumeSequence: uint64(sseq),
ConsumerSequence: parseNum(dseq),
LastConsumerSequence: parseNum(ldseq),
}
handleConsumerSequenceMismatch(s, ecs)
}
}
}
}

type streamRequest struct {
Subject string `json:"subject,omitempty"`
}
Expand Down Expand Up @@ -1190,6 +1257,22 @@ func BindStream(name string) SubOpt {
})
}

// EnableFlowControl enables flow control for a push based consumer.
func EnableFlowControl() SubOpt {
return subOptFn(func(opts *subOpts) error {
opts.cfg.FlowControl = true
return nil
})
}

// IdleHeartbeat enables push based consumers to have idle heartbeats delivered.
func IdleHeartbeat(duration time.Duration) SubOpt {
return subOptFn(func(opts *subOpts) error {
opts.cfg.Heartbeat = duration
return nil
})
}

func (sub *Subscription) ConsumerInfo() (*ConsumerInfo, error) {
sub.mu.Lock()
// TODO(dlc) - Better way to mark especially if we attach.
Expand Down Expand Up @@ -1638,19 +1721,12 @@ type MsgMetaData struct {
StreamName string
}

// MetaData retrieves the metadata from a JetStream message. This method will
// return an error for non-JetStream Msgs.
func (m *Msg) MetaData() (*MsgMetaData, error) {
if _, _, err := m.checkReply(); err != nil {
return nil, err
}

func getMetadataFields(subject string) ([]string, error) {
const expectedTokens = 9
const btsep = '.'

tsa := [expectedTokens]string{}
start, tokens := 0, tsa[:0]
subject := m.Reply
for i := 0; i < len(subject); i++ {
if subject[i] == btsep {
tokens = append(tokens, subject[start:i])
Expand All @@ -1661,6 +1737,20 @@ func (m *Msg) MetaData() (*MsgMetaData, error) {
if len(tokens) != expectedTokens || tokens[0] != "$JS" || tokens[1] != "ACK" {
return nil, ErrNotJSMessage
}
return tokens, nil
}

// MetaData retrieves the metadata from a JetStream message. This method will
// return an error for non-JetStream Msgs.
func (m *Msg) MetaData() (*MsgMetaData, error) {
if _, _, err := m.checkReply(); err != nil {
return nil, err
}

tokens, err := getMetadataFields(m.Reply)
if err != nil {
return nil, err
}

meta := &MsgMetaData{
Delivered: uint64(parseNum(tokens[4])),
Expand Down
133 changes: 110 additions & 23 deletions nats.go
Expand Up @@ -2447,6 +2447,8 @@ func (nc *Conn) processMsg(data []byte) {
// Check if we have headers encoded here.
var h http.Header
var err error
var ctrl bool
var hasFC bool

if nc.ps.ma.hdr > 0 {
hbuf := msgPayload[:nc.ps.ma.hdr]
Expand All @@ -2468,6 +2470,13 @@ func (nc *Conn) processMsg(data []byte) {

sub.mu.Lock()

// Skip flow control messages in case of using a JetStream context.
jsi := sub.jsi
if jsi != nil {
hasFC = jsi.fch != nil
ctrl = isControlMessage(m)
}

// Check if closed.
if sub.closed {
sub.mu.Unlock()
Expand All @@ -2494,30 +2503,44 @@ func (nc *Conn) processMsg(data []byte) {

// We have two modes of delivery. One is the channel, used by channel
// subscribers and syncSubscribers, the other is a linked list for async.
if sub.mch != nil {
if !ctrl {
if sub.mch != nil {
select {
case sub.mch <- m:
default:
goto slowConsumer
}
} else {
// Push onto the async pList
if sub.pHead == nil {
sub.pHead = m
sub.pTail = m
if sub.pCond != nil {
sub.pCond.Signal()
}
} else {
sub.pTail.next = m
sub.pTail = m
}
}
if hasFC {
jsi.trackSequences(m)
}
} else if hasFC {
// Process flow control and heartbeat messages
// automatically for JetStream Push consumers.
select {
case sub.mch <- m:
case jsi.fch <- m:
default:
goto slowConsumer
}
} else {
// Push onto the async pList
if sub.pHead == nil {
sub.pHead = m
sub.pTail = m
if sub.pCond != nil {
sub.pCond.Signal()
}
} else {
sub.pTail.next = m
sub.pTail = m
}
}

// Clear SlowConsumer status.
sub.sc = false

sub.mu.Unlock()

return

slowConsumer:
Expand Down Expand Up @@ -2826,14 +2849,17 @@ func NewMsg(subject string) *Msg {
}

const (
hdrLine = "NATS/1.0\r\n"
crlf = "\r\n"
hdrPreEnd = len(hdrLine) - len(crlf)
statusHdr = "Status"
descrHdr = "Description"
noResponders = "503"
noMessages = "404"
statusLen = 3 // e.g. 20x, 40x, 50x
hdrLine = "NATS/1.0\r\n"
crlf = "\r\n"
hdrPreEnd = len(hdrLine) - len(crlf)
statusHdr = "Status"
descrHdr = "Description"
lastConsumerSeqHdr = "Nats-Last-Consumer"
lastStreamSeqHdr = "Nats-Last-Stream"
noResponders = "503"
noMessages = "404"
controlMsg = "100"
statusLen = 3 // e.g. 20x, 40x, 50x
)

// decodeHeadersMsg will decode and headers.
Expand Down Expand Up @@ -3574,9 +3600,17 @@ func (s *Subscription) AutoUnsubscribe(max int) error {
// unsubscribe performs the low level unsubscribe to the server.
// Use Subscription.Unsubscribe()
func (nc *Conn) unsubscribe(sub *Subscription, max int, drainMode bool) error {
// Check whether it is a JetStream sub and should clean up consumers.
// For JetStream consumers, need to clean up ephemeral consumers
// or delete durable ones if called with Unsubscribe. In case they
// are push based with flow control, need to also close the channel
// for handling control messages.
sub.mu.Lock()
jsi := sub.jsi
if jsi != nil && jsi.fch != nil {
// Close channel to signal flow control goroutine to stop.
close(jsi.fch)
jsi.fch = nil
}
sub.mu.Unlock()
if jsi != nil {
err := jsi.unsubscribe(drainMode)
Expand Down Expand Up @@ -3622,6 +3656,59 @@ func (nc *Conn) unsubscribe(sub *Subscription, max int, drainMode bool) error {
return nil
}

// ErrConsumerSequenceMismatch represents an error from a consumer
// that received a Heartbeat including sequence different to the
// one expected from the view of the client.
type ErrConsumerSequenceMismatch struct {
// StreamResumeSequence is the stream sequence from where the consumer
// should resume consuming from the stream.
StreamResumeSequence uint64

// ConsumerSequence is the sequence of the consumer that is behind.
ConsumerSequence int64

// LastConsumerSequence is the sequence of the consumer when the heartbeat
// was received.
LastConsumerSequence int64
}

func (ecs *ErrConsumerSequenceMismatch) Error() string {
return fmt.Sprintf("nats: sequence mismatch for consumer at sequence %d (%d sequences behind), should restart consumer from stream sequence %d",
ecs.ConsumerSequence,
ecs.LastConsumerSequence-ecs.ConsumerSequence,
ecs.StreamResumeSequence,
)
}

// handleConsumerSequenceMismatch will send an async error that can be used to restart a push based consumer.
func handleConsumerSequenceMismatch(sub *Subscription, err error) {
sub.mu.Lock()
nc := sub.conn
sub.mu.Unlock()

nc.mu.Lock()
errCB := nc.Opts.AsyncErrorCB
if errCB != nil {
nc.ach.push(func() { errCB(nc, sub, err) })
}
nc.mu.Unlock()
}

func isControlMessage(msg *Msg) bool {
return len(msg.Data) == 0 && msg.Header.Get(statusHdr) == controlMsg
}

func (jsi *jsSub) trackSequences(msg *Msg) {
var ctrl *controlMetadata
if cmeta := jsi.cmeta.Load(); cmeta == nil {
ctrl = &controlMetadata{}
} else {
ctrl = cmeta.(*controlMetadata)
}
ctrl.meta = msg.Reply
jsi.cmeta.Store(ctrl)
}

// NextMsg will return the next message available to a synchronous subscriber
// or block until one is available. An error is returned if the subscription is invalid (ErrBadSubscription),
// the connection is closed (ErrConnectionClosed), or the timeout is reached (ErrTimeout).
Expand Down

0 comments on commit 7843601

Please sign in to comment.