Skip to content

Commit

Permalink
js: Add support for FlowControl and Heartbeats
Browse files Browse the repository at this point in the history
Signed-off-by: Waldemar Quevedo <wally@synadia.com>
  • Loading branch information
wallyqs committed Mar 28, 2021
1 parent a56624b commit 215b1ee
Show file tree
Hide file tree
Showing 6 changed files with 569 additions and 23 deletions.
28 changes: 28 additions & 0 deletions context.go
Expand Up @@ -133,6 +133,7 @@ func (s *Subscription) NextMsgWithContext(ctx context.Context) (*Msg, error) {

// snapshot
mch := s.mch
jsi := s.jsi
s.mu.Unlock()

var ok bool
Expand All @@ -147,11 +148,38 @@ func (s *Subscription) NextMsgWithContext(ctx context.Context) (*Msg, error) {
if err := s.processNextMsgDelivered(msg); err != nil {
return nil, err
} else {
// JetStream Push consumers may get extra status messages
// that the client will process automatically.
if jsi != nil {
if isControlMessage(msg) {
if err := jsi.handleControlMessage(s, msg); err != nil {
return nil, err
}
// Skip and wait for next message.
break
} else if jsi.hbs {
jsi.trackSequences(msg)
}
return msg, nil
}
return msg, nil
}
default:
}

if jsi != nil {
// Skip any control messages that may have been delivered
// until there is a valid message or a timeout error.
msg, err = s.processControlFlow(mch, jsi, nil, ctx.Done())
if err != nil {
if err == ErrTimeout {
err = ctx.Err()
}
return nil, err
}
return msg, nil
}

select {
case msg, ok = <-mch:
if !ok {
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Expand Up @@ -4,7 +4,7 @@ go 1.15

require (
github.com/golang/protobuf v1.4.2
github.com/nats-io/nats-server/v2 v2.2.1-0.20210327180151-03aee09847d0
github.com/nats-io/nats-server/v2 v2.2.1-0.20210326232401-9f753a247545
github.com/nats-io/nkeys v0.3.0
github.com/nats-io/nuid v1.0.1
google.golang.org/protobuf v1.23.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Expand Up @@ -32,8 +32,8 @@ github.com/nats-io/nats-server/v2 v2.1.8-0.20200929001935-7f44d075f7ad/go.mod h1
github.com/nats-io/nats-server/v2 v2.1.8-0.20201129161730-ebe63db3e3ed/go.mod h1:XD0zHR/jTXdZvWaQfS5mQgsXj6x12kMjKLyAk/cOGgY=
github.com/nats-io/nats-server/v2 v2.1.8-0.20210205154825-f7ab27f7dad4/go.mod h1:kauGd7hB5517KeSqspW2U1Mz/jhPbTrE8eOXzUPk1m0=
github.com/nats-io/nats-server/v2 v2.1.8-0.20210227190344-51550e242af8/go.mod h1:/QQ/dpqFavkNhVnjvMILSQ3cj5hlmhB66adlgNbjuoA=
github.com/nats-io/nats-server/v2 v2.2.1-0.20210327180151-03aee09847d0 h1:ybeT5VFA73CVQb4rCL+48+up91xWheriSBbJ3M2Pzps=
github.com/nats-io/nats-server/v2 v2.2.1-0.20210327180151-03aee09847d0/go.mod h1:eKlAaGmSQHZMFQA6x56AaP5/Bl9N3mWF4awyT2TTpzc=
github.com/nats-io/nats-server/v2 v2.2.1-0.20210326232401-9f753a247545 h1:8xNPhr7nW0+4W+bwHziYzQLqoN+Z7Rko19Doe+8XW3w=
github.com/nats-io/nats-server/v2 v2.2.1-0.20210326232401-9f753a247545/go.mod h1:eKlAaGmSQHZMFQA6x56AaP5/Bl9N3mWF4awyT2TTpzc=
github.com/nats-io/nats.go v1.10.0/go.mod h1:AjGArbfyR50+afOUotNX2Xs5SYHf+CoOa5HH1eEl2HE=
github.com/nats-io/nats.go v1.10.1-0.20200531124210-96f2130e4d55/go.mod h1:ARiFsjW9DVxk48WJbO3OSZ2DG8fjkMi7ecLmXoY/n9I=
github.com/nats-io/nats.go v1.10.1-0.20200606002146-fc6fed82929a/go.mod h1:8eAIv96Mo9QW6Or40jUHejS7e4VwZ3VRYD6Sf0BTDp4=
Expand Down
57 changes: 48 additions & 9 deletions js.go
Expand Up @@ -415,6 +415,8 @@ type ConsumerConfig struct {
SampleFrequency string `json:"sample_freq,omitempty"`
MaxWaiting int `json:"max_waiting,omitempty"`
MaxAckPending int `json:"max_ack_pending,omitempty"`
FlowControl bool `json:"flow_control,omitempty"`
Heartbeat time.Duration `json:"idle_heartbeat,omitempty"`
}

// ConsumerInfo is the info from a JetStream consumer.
Expand Down Expand Up @@ -454,6 +456,19 @@ type jsSub struct {
pull bool
durable bool
attached bool

// Heartbeats handling from push consumers.
hbs bool

// cmeta is holds metadata from a push consumer
// for when heartbeats are enabled.
cmeta atomic.Value
}

// controlMetadata is metadata used to be able to detect sequence mismatch
// errors in push based consumers that have heartbeats enabled.
type controlMetadata struct {
meta string
}

func (jsi *jsSub) unsubscribe(drainMode bool) error {
Expand Down Expand Up @@ -535,6 +550,7 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, opts []

isPullMode := ch == nil && cb == nil
badPullAck := o.cfg.AckPolicy == AckNonePolicy || o.cfg.AckPolicy == AckAllPolicy
hasHeartbeats := o.cfg.Heartbeat > 0
if isPullMode && badPullAck {
return nil, fmt.Errorf("invalid ack mode for pull consumers: %s", o.cfg.AckPolicy)
}
Expand Down Expand Up @@ -618,9 +634,9 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, opts []
}

if isPullMode {
sub = &Subscription{Subject: subj, conn: js.nc, typ: PullSubscription, jsi: &jsSub{js: js, pull: true}}
sub = &Subscription{Subject: subj, conn: js.nc, typ: PullSubscription, jsi: &jsSub{js: js, pull: isPullMode}}
} else {
sub, err = js.nc.subscribe(deliver, queue, cb, ch, cb == nil, &jsSub{js: js})
sub, err = js.nc.subscribe(deliver, queue, cb, ch, cb == nil, &jsSub{js: js, hbs: hasHeartbeats})
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -872,6 +888,22 @@ func BindStream(name string) SubOpt {
})
}

// EnableFlowControl enables flow control for a push based consumer.
func EnableFlowControl() SubOpt {
return subOptFn(func(opts *subOpts) error {
opts.cfg.FlowControl = true
return nil
})
}

// IdleHeartbeat enables push based consumers to have idle heartbeats delivered.
func IdleHeartbeat(duration time.Duration) SubOpt {
return subOptFn(func(opts *subOpts) error {
opts.cfg.Heartbeat = duration
return nil
})
}

func (sub *Subscription) ConsumerInfo() (*ConsumerInfo, error) {
sub.mu.Lock()
// TODO(dlc) - Better way to mark especially if we attach.
Expand Down Expand Up @@ -1322,18 +1354,12 @@ type MsgMetaData struct {
StreamName string
}

// MetaData retrieves the metadata from a JetStream message.
func (m *Msg) MetaData() (*MsgMetaData, error) {
if _, _, err := m.checkReply(); err != nil {
return nil, err
}

func getMetadataFields(subject string) ([]string, error) {
const expectedTokens = 9
const btsep = '.'

tsa := [expectedTokens]string{}
start, tokens := 0, tsa[:0]
subject := m.Reply
for i := 0; i < len(subject); i++ {
if subject[i] == btsep {
tokens = append(tokens, subject[start:i])
Expand All @@ -1344,6 +1370,19 @@ func (m *Msg) MetaData() (*MsgMetaData, error) {
if len(tokens) != expectedTokens || tokens[0] != "$JS" || tokens[1] != "ACK" {
return nil, ErrNotJSMessage
}
return tokens, nil
}

// MetaData retrieves the metadata from a JetStream message.
func (m *Msg) MetaData() (*MsgMetaData, error) {
if _, _, err := m.checkReply(); err != nil {
return nil, err
}

tokens, err := getMetadataFields(m.Reply)
if err != nil {
return nil, err
}

meta := &MsgMetaData{
Delivered: uint64(parseNum(tokens[4])),
Expand Down

0 comments on commit 215b1ee

Please sign in to comment.