Skip to content

Commit

Permalink
Merge e9f7e59 into 0493cfd
Browse files Browse the repository at this point in the history
  • Loading branch information
wallyqs committed Mar 26, 2021
2 parents 0493cfd + e9f7e59 commit cbbd3db
Show file tree
Hide file tree
Showing 4 changed files with 573 additions and 40 deletions.
29 changes: 29 additions & 0 deletions context.go
Expand Up @@ -133,6 +133,7 @@ func (s *Subscription) NextMsgWithContext(ctx context.Context) (*Msg, error) {

// snapshot
mch := s.mch
jsi := s.jsi
s.mu.Unlock()

var ok bool
Expand All @@ -147,11 +148,39 @@ func (s *Subscription) NextMsgWithContext(ctx context.Context) (*Msg, error) {
if err := s.processNextMsgDelivered(msg); err != nil {
return nil, err
} else {
// JetStream Push consumers may get extra status messages
// that the client will process automatically.
if jsi != nil {
if isControlMessage(msg) {
err := jsi.handleControlMessage(s, msg)
if err != nil {
return nil, err
}
// Skip and wait for next message.
break
} else if jsi.hbs {
jsi.trackSequences(msg)
}
return msg, nil
}
return msg, nil
}
default:
}

if jsi != nil {
// Skip any control messages that may have been delivered
// until there is a valid message or a timeout error.
msg, err = s.processControlFlow(mch, jsi, nil, ctx.Done())
if err != nil {
if err == ErrTimeout {
return nil, ctx.Err()
}
return nil, err
}
return msg, nil
}

select {
case msg, ok = <-mch:
if !ok {
Expand Down
57 changes: 48 additions & 9 deletions js.go
Expand Up @@ -415,6 +415,8 @@ type ConsumerConfig struct {
SampleFrequency string `json:"sample_freq,omitempty"`
MaxWaiting int `json:"max_waiting,omitempty"`
MaxAckPending int `json:"max_ack_pending,omitempty"`
FlowControl bool `json:"flow_control,omitempty"`
Heartbeat time.Duration `json:"idle_heartbeat,omitempty"`
}

// ConsumerInfo is the info from a JetStream consumer.
Expand Down Expand Up @@ -454,6 +456,19 @@ type jsSub struct {
pull bool
durable bool
attached bool

// Heartbeats handling from push consumers.
hbs bool

// cmeta is holds metadata from a push consumer
// for when heartbeats are enabled.
cmeta atomic.Value
}

// controlMetadata is metadata used to be able to detect sequence mismatch
// errors in push based consumers that have heartbeats enabled.
type controlMetadata struct {
meta string
}

func (jsi *jsSub) unsubscribe(drainMode bool) error {
Expand Down Expand Up @@ -535,6 +550,7 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, opts []

isPullMode := ch == nil && cb == nil
badPullAck := o.cfg.AckPolicy == AckNonePolicy || o.cfg.AckPolicy == AckAllPolicy
hasHeartbeats := o.cfg.Heartbeat > 0
if isPullMode && badPullAck {
return nil, fmt.Errorf("invalid ack mode for pull consumers: %s", o.cfg.AckPolicy)
}
Expand Down Expand Up @@ -618,9 +634,9 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, opts []
}

if isPullMode {
sub = &Subscription{Subject: subj, conn: js.nc, typ: PullSubscription, jsi: &jsSub{js: js, pull: true}}
sub = &Subscription{Subject: subj, conn: js.nc, typ: PullSubscription, jsi: &jsSub{js: js, pull: isPullMode}}
} else {
sub, err = js.nc.subscribe(deliver, queue, cb, ch, cb == nil, &jsSub{js: js})
sub, err = js.nc.subscribe(deliver, queue, cb, ch, cb == nil, &jsSub{js: js, hbs: hasHeartbeats})
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -872,6 +888,22 @@ func BindStream(name string) SubOpt {
})
}

// EnableFlowControl enables flow control for a push based consumer.
func EnableFlowControl() SubOpt {
return subOptFn(func(opts *subOpts) error {
opts.cfg.FlowControl = true
return nil
})
}

// IdleHeartbeat enables push based consumers to have idle heartbeats delivered.
func IdleHeartbeat(duration time.Duration) SubOpt {
return subOptFn(func(opts *subOpts) error {
opts.cfg.Heartbeat = duration
return nil
})
}

func (sub *Subscription) ConsumerInfo() (*ConsumerInfo, error) {
sub.mu.Lock()
// TODO(dlc) - Better way to mark especially if we attach.
Expand Down Expand Up @@ -1322,18 +1354,12 @@ type MsgMetaData struct {
StreamName string
}

// MetaData retrieves the metadata from a JetStream message.
func (m *Msg) MetaData() (*MsgMetaData, error) {
if _, _, err := m.checkReply(); err != nil {
return nil, err
}

func getMetadataFields(subject string) ([]string, error) {
const expectedTokens = 9
const btsep = '.'

tsa := [expectedTokens]string{}
start, tokens := 0, tsa[:0]
subject := m.Reply
for i := 0; i < len(subject); i++ {
if subject[i] == btsep {
tokens = append(tokens, subject[start:i])
Expand All @@ -1344,6 +1370,19 @@ func (m *Msg) MetaData() (*MsgMetaData, error) {
if len(tokens) != expectedTokens || tokens[0] != "$JS" || tokens[1] != "ACK" {
return nil, ErrNotJSMessage
}
return tokens, nil
}

// MetaData retrieves the metadata from a JetStream message.
func (m *Msg) MetaData() (*MsgMetaData, error) {
if _, _, err := m.checkReply(); err != nil {
return nil, err
}

tokens, err := getMetadataFields(m.Reply)
if err != nil {
return nil, err
}

meta := &MsgMetaData{
Delivered: uint64(parseNum(tokens[4])),
Expand Down
190 changes: 181 additions & 9 deletions nats.go
Expand Up @@ -2378,6 +2378,7 @@ func (nc *Conn) waitForMsgs(s *Subscription) {
s.delivered++
delivered = s.delivered
}
jsi := s.jsi
s.mu.Unlock()

if closed {
Expand All @@ -2386,7 +2387,19 @@ func (nc *Conn) waitForMsgs(s *Subscription) {

// Deliver the message.
if m != nil && (max == 0 || delivered <= max) {
mcb(m)
if jsi != nil {
// Process and skip flow control messages automatically.
if isControlMessage(m) {
jsi.handleControlMessage(s, m)
} else {
if jsi.hbs {
jsi.trackSequences(m)
}
mcb(m)
}
} else {
mcb(m)
}
}
// If we have hit the max for delivered msgs, remove sub.
if max > 0 && delivered >= max {
Expand Down Expand Up @@ -2824,14 +2837,17 @@ func NewMsg(subject string) *Msg {
}

const (
hdrLine = "NATS/1.0\r\n"
crlf = "\r\n"
hdrPreEnd = len(hdrLine) - len(crlf)
statusHdr = "Status"
descrHdr = "Description"
noResponders = "503"
noMessages = "404"
statusLen = 3 // e.g. 20x, 40x, 50x
hdrLine = "NATS/1.0\r\n"
crlf = "\r\n"
hdrPreEnd = len(hdrLine) - len(crlf)
statusHdr = "Status"
descrHdr = "Description"
lastConsumerSeqHdr = "Nats-Last-Consumer"
lastStreamSeqHdr = "Nats-Last-Stream"
noResponders = "503"
noMessages = "404"
controlMsg = "100"
statusLen = 3 // e.g. 20x, 40x, 50x
)

// decodeHeadersMsg will decode and headers.
Expand Down Expand Up @@ -3620,6 +3636,137 @@ func (nc *Conn) unsubscribe(sub *Subscription, max int, drainMode bool) error {
return nil
}

// ErrConsumerSequenceMismatch represents an error from a consumer
// that received a Heartbeat including sequence different to the
// one expected from the view of the client.
type ErrConsumerSequenceMismatch struct {
// StreamResumeSequence is the stream sequence from where the consumer
// should resume consuming from the stream.
StreamResumeSequence uint64

// ConsumerSequence is the sequence of the consumer that is behind.
ConsumerSequence int64

// LastConsumerSequence is the sequence of the consumer when the heartbeat
// was received.
LastConsumerSequence int64
}

func (ecs *ErrConsumerSequenceMismatch) Error() string {
return fmt.Sprintf("nats: sequence mismatch for consumer at sequence %d (%d sequences behind), should restart consumer from stream sequence %d",
ecs.ConsumerSequence,
ecs.LastConsumerSequence-ecs.ConsumerSequence,
ecs.StreamResumeSequence,
)
}

// handleConsumerSequenceMismatch will send an async error that can be used to restart a push based consumer.
func handleConsumerSequenceMismatch(sub *Subscription, err error) {
sub.mu.Lock()
nc := sub.conn
sub.mu.Unlock()

nc.mu.Lock()
errCB := nc.Opts.AsyncErrorCB
if errCB != nil {
nc.ach.push(func() { errCB(nc, sub, err) })
}
nc.mu.Unlock()
}

func isControlMessage(msg *Msg) bool {
return len(msg.Data) == 0 && msg.Header.Get(statusHdr) == controlMsg
}

func (jsi *jsSub) trackSequences(msg *Msg) {
var ctrl *controlMetadata
if cmeta := jsi.cmeta.Load(); cmeta == nil {
ctrl = &controlMetadata{}
} else {
ctrl = cmeta.(*controlMetadata)
}
ctrl.meta = msg.Reply
jsi.cmeta.Store(ctrl)
}

func (jsi *jsSub) handleControlMessage(s *Subscription, msg *Msg) error {
// If it is a flow control message then have to ack.
if msg.Reply != "" {
err := msg.Respond(nil)
if err != nil {
return err
}
} else if jsi.hbs {
// Process heartbeat received, get latest control metadata if present.
var ctrl *controlMetadata
cmeta := jsi.cmeta.Load()
if cmeta == nil {
return nil
}

ctrl = cmeta.(*controlMetadata)
tokens, err := getMetadataFields(ctrl.meta)
if err != nil {
return err
}
// Consumer sequence
dseq := tokens[6]
ldseq := msg.Header.Get(lastConsumerSeqHdr)

// Detect consumer sequence mismatch and whether
// should restart the consumer.
if ldseq != dseq {
// Dispatch async error including details such as
// from where the consumer could be restarted.
sseq := parseNum(tokens[5])
ecs := &ErrConsumerSequenceMismatch{
StreamResumeSequence: uint64(sseq),
ConsumerSequence: parseNum(dseq),
LastConsumerSequence: parseNum(ldseq),
}
handleConsumerSequenceMismatch(s, ecs)
}
}
return nil
}

// processControlFlow checks whether it is a control message and
// handles it automatically.
func (s *Subscription) processControlFlow(mch <-chan *Msg, jsi *jsSub, tch <-chan time.Time, ctxDone <-chan struct{}) (*Msg, error) {
// We will peek at the channel and return the next message
// that is not a control message or a timeout error.
var msg *Msg
var ok bool
for {
select {
case msg, ok = <-mch:
if !ok {
return nil, s.getNextMsgErr()
}
if err := s.processNextMsgDelivered(msg); err != nil {
return nil, err
}
if isControlMessage(msg) {
err := jsi.handleControlMessage(s, msg)
if err != nil {
return nil, err
}
} else {
// In case of using heartbeats, then snapshot the raw metadata
// sequences from a consumer.
if jsi.hbs {
jsi.trackSequences(msg)
}
return msg, nil
}
case <-tch:
return nil, ErrTimeout
case <-ctxDone:
return nil, ErrTimeout
}
}
}

// NextMsg will return the next message available to a synchronous subscriber
// or block until one is available. An error is returned if the subscription is invalid (ErrBadSubscription),
// the connection is closed (ErrConnectionClosed), or the timeout is reached (ErrTimeout).
Expand All @@ -3637,6 +3784,7 @@ func (s *Subscription) NextMsg(timeout time.Duration) (*Msg, error) {

// snapshot
mch := s.mch
jsi := s.jsi
s.mu.Unlock()

var ok bool
Expand All @@ -3651,6 +3799,20 @@ func (s *Subscription) NextMsg(timeout time.Duration) (*Msg, error) {
if err := s.processNextMsgDelivered(msg); err != nil {
return nil, err
} else {
// JetStream Push consumers may get extra status messages
// that the client will process automatically.
if jsi != nil {
if isControlMessage(msg) {
err := jsi.handleControlMessage(s, msg)
if err != nil {
return nil, err
}
// Skip and wait for next message.
break
} else if jsi.hbs {
jsi.trackSequences(msg)
}
}
return msg, nil
}
default:
Expand All @@ -3662,6 +3824,16 @@ func (s *Subscription) NextMsg(timeout time.Duration) (*Msg, error) {
t := globalTimerPool.Get(timeout)
defer globalTimerPool.Put(t)

if jsi != nil {
// Skip any control messages that may have been delivered
// until there is a valid message or a timeout error.
msg, err = s.processControlFlow(mch, jsi, t.C, nil)
if err != nil {
return nil, err
}
return msg, nil
}

select {
case msg, ok = <-mch:
if !ok {
Expand Down

0 comments on commit cbbd3db

Please sign in to comment.