diff --git a/context.go b/context.go index 666a483ad..03d3b0ea4 100644 --- a/context.go +++ b/context.go @@ -133,6 +133,7 @@ func (s *Subscription) NextMsgWithContext(ctx context.Context) (*Msg, error) { // snapshot mch := s.mch + jsi := s.jsi s.mu.Unlock() var ok bool @@ -147,11 +148,38 @@ func (s *Subscription) NextMsgWithContext(ctx context.Context) (*Msg, error) { if err := s.processNextMsgDelivered(msg); err != nil { return nil, err } else { + // JetStream Push consumers may get extra status messages + // that the client will process automatically. + if jsi != nil { + if isControlMessage(msg) { + if err := jsi.handleControlMessage(s, msg); err != nil { + return nil, err + } + // Skip and wait for next message. + break + } else if jsi.hbs { + jsi.trackSequences(msg) + } + return msg, nil + } return msg, nil } default: } + if jsi != nil { + // Skip any control messages that may have been delivered + // until there is a valid message or a timeout error. + msg, err = s.processControlFlow(mch, jsi, nil, ctx.Done()) + if err != nil { + if err == ErrTimeout { + err = ctx.Err() + } + return nil, err + } + return msg, nil + } + select { case msg, ok = <-mch: if !ok { diff --git a/go.mod b/go.mod index 91b5659df..4a9c33923 100644 --- a/go.mod +++ b/go.mod @@ -4,7 +4,7 @@ go 1.15 require ( github.com/golang/protobuf v1.4.2 - github.com/nats-io/nats-server/v2 v2.2.1-0.20210327180151-03aee09847d0 + github.com/nats-io/nats-server/v2 v2.2.1-0.20210326232401-9f753a247545 github.com/nats-io/nkeys v0.3.0 github.com/nats-io/nuid v1.0.1 google.golang.org/protobuf v1.23.0 diff --git a/go.sum b/go.sum index 109fb4817..72aeb8607 100644 --- a/go.sum +++ b/go.sum @@ -32,8 +32,8 @@ github.com/nats-io/nats-server/v2 v2.1.8-0.20200929001935-7f44d075f7ad/go.mod h1 github.com/nats-io/nats-server/v2 v2.1.8-0.20201129161730-ebe63db3e3ed/go.mod h1:XD0zHR/jTXdZvWaQfS5mQgsXj6x12kMjKLyAk/cOGgY= github.com/nats-io/nats-server/v2 v2.1.8-0.20210205154825-f7ab27f7dad4/go.mod h1:kauGd7hB5517KeSqspW2U1Mz/jhPbTrE8eOXzUPk1m0= github.com/nats-io/nats-server/v2 v2.1.8-0.20210227190344-51550e242af8/go.mod h1:/QQ/dpqFavkNhVnjvMILSQ3cj5hlmhB66adlgNbjuoA= -github.com/nats-io/nats-server/v2 v2.2.1-0.20210327180151-03aee09847d0 h1:ybeT5VFA73CVQb4rCL+48+up91xWheriSBbJ3M2Pzps= -github.com/nats-io/nats-server/v2 v2.2.1-0.20210327180151-03aee09847d0/go.mod h1:eKlAaGmSQHZMFQA6x56AaP5/Bl9N3mWF4awyT2TTpzc= +github.com/nats-io/nats-server/v2 v2.2.1-0.20210326232401-9f753a247545 h1:8xNPhr7nW0+4W+bwHziYzQLqoN+Z7Rko19Doe+8XW3w= +github.com/nats-io/nats-server/v2 v2.2.1-0.20210326232401-9f753a247545/go.mod h1:eKlAaGmSQHZMFQA6x56AaP5/Bl9N3mWF4awyT2TTpzc= github.com/nats-io/nats.go v1.10.0/go.mod h1:AjGArbfyR50+afOUotNX2Xs5SYHf+CoOa5HH1eEl2HE= github.com/nats-io/nats.go v1.10.1-0.20200531124210-96f2130e4d55/go.mod h1:ARiFsjW9DVxk48WJbO3OSZ2DG8fjkMi7ecLmXoY/n9I= github.com/nats-io/nats.go v1.10.1-0.20200606002146-fc6fed82929a/go.mod h1:8eAIv96Mo9QW6Or40jUHejS7e4VwZ3VRYD6Sf0BTDp4= diff --git a/js.go b/js.go index 27359c99f..c5b36c8a2 100644 --- a/js.go +++ b/js.go @@ -415,6 +415,8 @@ type ConsumerConfig struct { SampleFrequency string `json:"sample_freq,omitempty"` MaxWaiting int `json:"max_waiting,omitempty"` MaxAckPending int `json:"max_ack_pending,omitempty"` + FlowControl bool `json:"flow_control,omitempty"` + Heartbeat time.Duration `json:"idle_heartbeat,omitempty"` } // ConsumerInfo is the info from a JetStream consumer. @@ -454,6 +456,19 @@ type jsSub struct { pull bool durable bool attached bool + + // Heartbeats handling from push consumers. + hbs bool + + // cmeta is holds metadata from a push consumer + // for when heartbeats are enabled. + cmeta atomic.Value +} + +// controlMetadata is metadata used to be able to detect sequence mismatch +// errors in push based consumers that have heartbeats enabled. +type controlMetadata struct { + meta string } func (jsi *jsSub) unsubscribe(drainMode bool) error { @@ -535,6 +550,7 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, opts [] isPullMode := ch == nil && cb == nil badPullAck := o.cfg.AckPolicy == AckNonePolicy || o.cfg.AckPolicy == AckAllPolicy + hasHeartbeats := o.cfg.Heartbeat > 0 if isPullMode && badPullAck { return nil, fmt.Errorf("invalid ack mode for pull consumers: %s", o.cfg.AckPolicy) } @@ -618,9 +634,9 @@ func (js *js) subscribe(subj, queue string, cb MsgHandler, ch chan *Msg, opts [] } if isPullMode { - sub = &Subscription{Subject: subj, conn: js.nc, typ: PullSubscription, jsi: &jsSub{js: js, pull: true}} + sub = &Subscription{Subject: subj, conn: js.nc, typ: PullSubscription, jsi: &jsSub{js: js, pull: isPullMode}} } else { - sub, err = js.nc.subscribe(deliver, queue, cb, ch, cb == nil, &jsSub{js: js}) + sub, err = js.nc.subscribe(deliver, queue, cb, ch, cb == nil, &jsSub{js: js, hbs: hasHeartbeats}) if err != nil { return nil, err } @@ -872,6 +888,22 @@ func BindStream(name string) SubOpt { }) } +// EnableFlowControl enables flow control for a push based consumer. +func EnableFlowControl() SubOpt { + return subOptFn(func(opts *subOpts) error { + opts.cfg.FlowControl = true + return nil + }) +} + +// IdleHeartbeat enables push based consumers to have idle heartbeats delivered. +func IdleHeartbeat(duration time.Duration) SubOpt { + return subOptFn(func(opts *subOpts) error { + opts.cfg.Heartbeat = duration + return nil + }) +} + func (sub *Subscription) ConsumerInfo() (*ConsumerInfo, error) { sub.mu.Lock() // TODO(dlc) - Better way to mark especially if we attach. @@ -1322,18 +1354,12 @@ type MsgMetaData struct { StreamName string } -// MetaData retrieves the metadata from a JetStream message. -func (m *Msg) MetaData() (*MsgMetaData, error) { - if _, _, err := m.checkReply(); err != nil { - return nil, err - } - +func getMetadataFields(subject string) ([]string, error) { const expectedTokens = 9 const btsep = '.' tsa := [expectedTokens]string{} start, tokens := 0, tsa[:0] - subject := m.Reply for i := 0; i < len(subject); i++ { if subject[i] == btsep { tokens = append(tokens, subject[start:i]) @@ -1344,6 +1370,19 @@ func (m *Msg) MetaData() (*MsgMetaData, error) { if len(tokens) != expectedTokens || tokens[0] != "$JS" || tokens[1] != "ACK" { return nil, ErrNotJSMessage } + return tokens, nil +} + +// MetaData retrieves the metadata from a JetStream message. +func (m *Msg) MetaData() (*MsgMetaData, error) { + if _, _, err := m.checkReply(); err != nil { + return nil, err + } + + tokens, err := getMetadataFields(m.Reply) + if err != nil { + return nil, err + } meta := &MsgMetaData{ Delivered: uint64(parseNum(tokens[4])), diff --git a/nats.go b/nats.go index 3ee5b6f3f..962dd643a 100644 --- a/nats.go +++ b/nats.go @@ -2378,6 +2378,7 @@ func (nc *Conn) waitForMsgs(s *Subscription) { s.delivered++ delivered = s.delivered } + jsi := s.jsi s.mu.Unlock() if closed { @@ -2386,7 +2387,19 @@ func (nc *Conn) waitForMsgs(s *Subscription) { // Deliver the message. if m != nil && (max == 0 || delivered <= max) { - mcb(m) + if jsi != nil { + // Process and skip flow control messages automatically. + if isControlMessage(m) { + jsi.handleControlMessage(s, m) + } else { + if jsi.hbs { + jsi.trackSequences(m) + } + mcb(m) + } + } else { + mcb(m) + } } // If we have hit the max for delivered msgs, remove sub. if max > 0 && delivered >= max { @@ -2824,14 +2837,17 @@ func NewMsg(subject string) *Msg { } const ( - hdrLine = "NATS/1.0\r\n" - crlf = "\r\n" - hdrPreEnd = len(hdrLine) - len(crlf) - statusHdr = "Status" - descrHdr = "Description" - noResponders = "503" - noMessages = "404" - statusLen = 3 // e.g. 20x, 40x, 50x + hdrLine = "NATS/1.0\r\n" + crlf = "\r\n" + hdrPreEnd = len(hdrLine) - len(crlf) + statusHdr = "Status" + descrHdr = "Description" + lastConsumerSeqHdr = "Nats-Last-Consumer" + lastStreamSeqHdr = "Nats-Last-Stream" + noResponders = "503" + noMessages = "404" + controlMsg = "100" + statusLen = 3 // e.g. 20x, 40x, 50x ) // decodeHeadersMsg will decode and headers. @@ -3620,6 +3636,137 @@ func (nc *Conn) unsubscribe(sub *Subscription, max int, drainMode bool) error { return nil } +// ErrConsumerSequenceMismatch represents an error from a consumer +// that received a Heartbeat including sequence different to the +// one expected from the view of the client. +type ErrConsumerSequenceMismatch struct { + // StreamResumeSequence is the stream sequence from where the consumer + // should resume consuming from the stream. + StreamResumeSequence uint64 + + // ConsumerSequence is the sequence of the consumer that is behind. + ConsumerSequence int64 + + // LastConsumerSequence is the sequence of the consumer when the heartbeat + // was received. + LastConsumerSequence int64 +} + +func (ecs *ErrConsumerSequenceMismatch) Error() string { + return fmt.Sprintf("nats: sequence mismatch for consumer at sequence %d (%d sequences behind), should restart consumer from stream sequence %d", + ecs.ConsumerSequence, + ecs.LastConsumerSequence-ecs.ConsumerSequence, + ecs.StreamResumeSequence, + ) +} + +// handleConsumerSequenceMismatch will send an async error that can be used to restart a push based consumer. +func handleConsumerSequenceMismatch(sub *Subscription, err error) { + sub.mu.Lock() + nc := sub.conn + sub.mu.Unlock() + + nc.mu.Lock() + errCB := nc.Opts.AsyncErrorCB + if errCB != nil { + nc.ach.push(func() { errCB(nc, sub, err) }) + } + nc.mu.Unlock() +} + +func isControlMessage(msg *Msg) bool { + return len(msg.Data) == 0 && msg.Header.Get(statusHdr) == controlMsg +} + +func (jsi *jsSub) trackSequences(msg *Msg) { + var ctrl *controlMetadata + if cmeta := jsi.cmeta.Load(); cmeta == nil { + ctrl = &controlMetadata{} + } else { + ctrl = cmeta.(*controlMetadata) + } + ctrl.meta = msg.Reply + jsi.cmeta.Store(ctrl) +} + +func (jsi *jsSub) handleControlMessage(s *Subscription, msg *Msg) error { + // If it is a flow control message then have to ack. + if msg.Reply != "" { + err := msg.Respond(nil) + if err != nil { + return err + } + } else if jsi.hbs { + // Process heartbeat received, get latest control metadata if present. + var ctrl *controlMetadata + cmeta := jsi.cmeta.Load() + if cmeta == nil { + return nil + } + + ctrl = cmeta.(*controlMetadata) + tokens, err := getMetadataFields(ctrl.meta) + if err != nil { + return err + } + // Consumer sequence + dseq := tokens[6] + ldseq := msg.Header.Get(lastConsumerSeqHdr) + + // Detect consumer sequence mismatch and whether + // should restart the consumer. + if ldseq != dseq { + // Dispatch async error including details such as + // from where the consumer could be restarted. + sseq := parseNum(tokens[5]) + ecs := &ErrConsumerSequenceMismatch{ + StreamResumeSequence: uint64(sseq), + ConsumerSequence: parseNum(dseq), + LastConsumerSequence: parseNum(ldseq), + } + handleConsumerSequenceMismatch(s, ecs) + } + } + return nil +} + +// processControlFlow checks whether it is a control message and +// handles it automatically. +func (s *Subscription) processControlFlow(mch <-chan *Msg, jsi *jsSub, tch <-chan time.Time, ctxDone <-chan struct{}) (*Msg, error) { + // We will peek at the channel and return the next message + // that is not a control message or a timeout error. + var msg *Msg + var ok bool + for { + select { + case msg, ok = <-mch: + if !ok { + return nil, s.getNextMsgErr() + } + if err := s.processNextMsgDelivered(msg); err != nil { + return nil, err + } + if isControlMessage(msg) { + err := jsi.handleControlMessage(s, msg) + if err != nil { + return nil, err + } + } else { + // In case of using heartbeats, then snapshot the raw metadata + // sequences from a consumer. + if jsi.hbs { + jsi.trackSequences(msg) + } + return msg, nil + } + case <-tch: + return nil, ErrTimeout + case <-ctxDone: + return nil, ErrTimeout + } + } +} + // NextMsg will return the next message available to a synchronous subscriber // or block until one is available. An error is returned if the subscription is invalid (ErrBadSubscription), // the connection is closed (ErrConnectionClosed), or the timeout is reached (ErrTimeout). @@ -3637,6 +3784,7 @@ func (s *Subscription) NextMsg(timeout time.Duration) (*Msg, error) { // snapshot mch := s.mch + jsi := s.jsi s.mu.Unlock() var ok bool @@ -3651,6 +3799,19 @@ func (s *Subscription) NextMsg(timeout time.Duration) (*Msg, error) { if err := s.processNextMsgDelivered(msg); err != nil { return nil, err } else { + // JetStream Push consumers may get extra status messages + // that the client will process automatically. + if jsi != nil { + if isControlMessage(msg) { + if err := jsi.handleControlMessage(s, msg); err != nil { + return nil, err + } + // Skip and wait for next message. + break + } else if jsi.hbs { + jsi.trackSequences(msg) + } + } return msg, nil } default: @@ -3662,6 +3823,16 @@ func (s *Subscription) NextMsg(timeout time.Duration) (*Msg, error) { t := globalTimerPool.Get(timeout) defer globalTimerPool.Put(t) + if jsi != nil { + // Skip any control messages that may have been delivered + // until there is a valid message or a timeout error. + msg, err = s.processControlFlow(mch, jsi, t.C, nil) + if err != nil { + return nil, err + } + return msg, nil + } + select { case msg, ok = <-mch: if !ok { diff --git a/test/js_test.go b/test/js_test.go index a9aebc81d..0454a2ec8 100644 --- a/test/js_test.go +++ b/test/js_test.go @@ -905,6 +905,303 @@ func TestJetStreamAckPending_Push(t *testing.T) { } } +func TestJetStreamPushFlowControlHeartbeats_SubscribeSync(t *testing.T) { + s := RunBasicJetStreamServer() + defer s.Shutdown() + + if config := s.JetStreamConfig(); config != nil { + defer os.RemoveAll(config.StoreDir) + } + + errHandler := nats.ErrorHandler(func(c *nats.Conn, sub *nats.Subscription, err error) { + t.Logf("WARN: %s", err) + }) + + nc, err := nats.Connect(s.ClientURL(), errHandler) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + defer nc.Close() + + js, err := nc.JetStream() + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + _, err = js.AddStream(&nats.StreamConfig{ + Name: "TEST", + Subjects: []string{"foo"}, + }) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + // Burst and try to hit the flow control limit of the server. + const totalMsgs = 16536 + payload := strings.Repeat("A", 1024) + for i := 0; i < totalMsgs; i++ { + if _, err := js.Publish("foo", []byte(fmt.Sprintf("i:%d/", i)+payload)); err != nil { + t.Fatal(err) + } + } + + hbTimer := 500 * time.Millisecond + sub, err := js.SubscribeSync("foo", + nats.AckWait(30*time.Second), + nats.MaxDeliver(1), + nats.EnableFlowControl(), + nats.IdleHeartbeat(hbTimer), + ) + if err != nil { + t.Fatal(err) + } + defer sub.Unsubscribe() + + info, err := sub.ConsumerInfo() + if err != nil { + t.Fatal(err) + } + if !info.Config.FlowControl { + t.Fatal("Expected Flow Control to be enabled") + } + if info.Config.Heartbeat != hbTimer { + t.Errorf("Expected %v, got: %v", hbTimer, info.Config.Heartbeat) + } + + m, err := sub.NextMsg(1 * time.Second) + if err != nil { + t.Fatalf("Error getting next message: %v", err) + } + meta, err := m.MetaData() + if err != nil { + t.Fatal(err) + } + if meta.Pending > totalMsgs { + t.Logf("WARN: More pending messages than expected (%v), got: %v", totalMsgs, meta.Pending) + } + err = m.Ack() + if err != nil { + t.Fatal(err) + } + + recvd := 1 + timeout := time.Now().Add(10 * time.Second) + for time.Now().Before(timeout) { + m, err := sub.NextMsg(1 * time.Second) + if err != nil { + t.Fatalf("Error getting next message: %v", err) + } + if len(m.Data) == 0 { + t.Fatalf("Unexpected empty message: %+v", m) + } + + if err := m.Ack(); err != nil { + t.Fatalf("Error on ack message: %v", err) + } + recvd++ + + if recvd == totalMsgs { + break + } + } + + t.Run("idle heartbeats", func(t *testing.T) { + // Delay to get a few heartbeats. + time.Sleep(2 * time.Second) + + timeout = time.Now().Add(5 * time.Second) + for time.Now().Before(timeout) { + msg, err := sub.NextMsg(200 * time.Millisecond) + if err != nil { + if err == nats.ErrTimeout { + // If timeout, ok to stop checking for the test. + break + } + t.Fatal(err) + } + if len(msg.Data) == 0 { + t.Fatalf("Unexpected empty message: %+v", m) + } + + recvd++ + meta, err := msg.MetaData() + if err != nil { + t.Fatal(err) + } + if meta.Pending == 0 { + break + } + } + if recvd > totalMsgs { + t.Logf("WARN: Received more messages than expected (%v), got: %v", totalMsgs, recvd) + } + }) + + t.Run("with context", func(t *testing.T) { + sub, err := js.SubscribeSync("foo", + nats.AckWait(100*time.Millisecond), + nats.Durable("bar"), + nats.EnableFlowControl(), + nats.IdleHeartbeat(hbTimer), + ) + if err != nil { + t.Fatal(err) + } + defer sub.Unsubscribe() + + info, err = sub.ConsumerInfo() + if err != nil { + t.Fatal(err) + } + if !info.Config.FlowControl { + t.Fatal("Expected Flow Control to be enabled") + } + + recvd = 0 + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + for { + select { + case <-ctx.Done(): + t.Fatal(ctx.Err()) + default: + } + + m, err := sub.NextMsgWithContext(ctx) + if err != nil { + t.Fatalf("Error getting next message: %v", err) + } + if len(m.Data) == 0 { + t.Fatalf("Unexpected empty message: %+v", m) + } + + if err := m.Ack(); err != nil { + t.Fatalf("Error on ack message: %v", err) + } + recvd++ + + if recvd >= totalMsgs { + break + } + } + + // Delay to get a few heartbeats. + time.Sleep(2 * time.Second) + for { + select { + case <-ctx.Done(): + if ctx.Err() == context.DeadlineExceeded { + return + } + default: + } + + msg, err := sub.NextMsgWithContext(ctx) + if err != nil { + if err == context.DeadlineExceeded { + break + } + t.Fatal(err) + } + if len(msg.Data) == 0 { + t.Fatalf("Unexpected empty message: %+v", m) + } + + meta, err := msg.MetaData() + if err != nil { + t.Fatal(err) + } + if meta.Pending == 0 { + break + } + } + }) +} + +func TestJetStreamPushFlowControlHeartbeats_SubscribeAsync(t *testing.T) { + s := RunBasicJetStreamServer() + defer s.Shutdown() + + if config := s.JetStreamConfig(); config != nil { + defer os.RemoveAll(config.StoreDir) + } + + nc, err := nats.Connect(s.ClientURL()) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + defer nc.Close() + + js, err := nc.JetStream() + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + _, err = js.AddStream(&nats.StreamConfig{ + Name: "TEST", + Subjects: []string{"foo"}, + }) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + // Burst and try to hit the flow control limit of the server. + const totalMsgs = 16536 + payload := strings.Repeat("A", 1024) + for i := 0; i < totalMsgs; i++ { + if _, err := js.Publish("foo", []byte(payload)); err != nil { + t.Fatal(err) + } + } + + recvd := make(chan *nats.Msg, totalMsgs) + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + errCh := make(chan error) + hbTimer := 200 * time.Millisecond + sub, err := js.Subscribe("foo", func(msg *nats.Msg) { + if len(msg.Data) == 0 { + errCh <- fmt.Errorf("Unexpected empty message: %+v", msg) + } + recvd <- msg + + if len(recvd) == totalMsgs { + cancel() + } + }, nats.EnableFlowControl(), nats.IdleHeartbeat(hbTimer)) + if err != nil { + t.Fatal(err) + } + defer sub.Unsubscribe() + + info, err := sub.ConsumerInfo() + if err != nil { + t.Fatal(err) + } + if !info.Config.FlowControl { + t.Fatal("Expected Flow Control to be enabled") + } + if info.Config.Heartbeat != hbTimer { + t.Errorf("Expected %v, got: %v", hbTimer, info.Config.Heartbeat) + } + + <-ctx.Done() + + got := len(recvd) + expected := totalMsgs + if got != expected { + t.Errorf("Expected %v, got: %v", expected, got) + } + + // Wait for a couple of heartbeats to arrive and confirm there is no error. + select { + case <-time.After(1 * time.Second): + case err := <-errCh: + t.Fatal(err) + } +} + func TestJetStream_Drain(t *testing.T) { s := RunBasicJetStreamServer() defer s.Shutdown() @@ -4039,6 +4336,14 @@ func testJetStream_ClusterReconnectDurableQueueSubscriber(t *testing.T, subject srvA.Restart() } }), + nats.ErrorHandler(func(_ *nats.Conn, sub *nats.Subscription, err error) { + t.Logf("WARN: Got error %v", err) + if info, ok := err.(*nats.ErrConsumerSequenceMismatch); ok { + t.Logf("WARN: %+v", info) + } + // Take out this QueueSubscriber from the group. + sub.Drain() + }), ) if err != nil { t.Error(err) @@ -4094,7 +4399,7 @@ func testJetStream_ClusterReconnectDurableQueueSubscriber(t *testing.T, subject } } } - }, nats.Durable(dname), nats.AckWait(5*time.Second), nats.ManualAck()) + }, nats.Durable(dname), nats.AckWait(5*time.Second), nats.ManualAck(), nats.IdleHeartbeat(100*time.Millisecond)) if err != nil && (err != nats.ErrTimeout && err != context.DeadlineExceeded) { t.Error(err) @@ -4146,12 +4451,15 @@ func testJetStream_ClusterReconnectDurableQueueSubscriber(t *testing.T, subject <-ctx.Done() + // Wait a bit to get heartbeats. + time.Sleep(2 * time.Second) + // Drain to allow AckSync response to be received. nc.Drain() got := len(msgs) if got != totalMsgs { - t.Logf("WARN: Expected %v, got: %v", totalMsgs, got) + t.Logf("WARN: Expected %v, got: %v (failed publishes: %v)", totalMsgs, got, failedPubs) } if got < totalMsgs-failedPubs { t.Errorf("Expected %v, got: %v", totalMsgs-failedPubs, got)