From 34be18087517ba515604288c5b5e400e0104d970 Mon Sep 17 00:00:00 2001 From: Tomasz Pietrek Date: Thu, 26 Jan 2023 22:10:16 +0100 Subject: [PATCH] Add batch completed status to Pull Consumers Signed-off-by: Tomasz Pietrek --- server/consumer.go | 9 +++++ server/jetstream_test.go | 71 ++++++++++++++++++++++++++++++++++++++-- 2 files changed, 78 insertions(+), 2 deletions(-) diff --git a/server/consumer.go b/server/consumer.go index 66963c1f9e..dd3ee7adc1 100644 --- a/server/consumer.go +++ b/server/consumer.go @@ -38,6 +38,9 @@ const ( JSPullRequestPendingBytes = "Nats-Pending-Bytes" ) +// Headers sent when batch size was completed, but there were remaining bytes. +const JsPullRequestRemainingBytesT = "NATS/1.0 409 Batch Completed\r\n%s: %d\r\n%s: %d\r\n\r\n" + type ConsumerInfo struct { Stream string `json:"stream_name"` Name string `json:"name"` @@ -3224,6 +3227,7 @@ func (o *consumer) loopAndGatherMsgs(qch chan struct{}) { ackReply string delay time.Duration sz int + wrn, wrb int ) o.mu.Lock() // consumer is closed when mset is set to nil. @@ -3277,6 +3281,7 @@ func (o *consumer) loopAndGatherMsgs(qch chan struct{}) { if o.isPushMode() { dsubj = o.dsubj } else if wr := o.nextWaiting(sz); wr != nil { + wrn, wrb = wr.n, wr.b dsubj = wr.reply if done := wr.recycleIfDone(); done && o.node != nil { o.removeClusterPendingRequest(dsubj) @@ -3330,6 +3335,10 @@ func (o *consumer) loopAndGatherMsgs(qch chan struct{}) { // Do actual delivery. o.deliverMsg(dsubj, ackReply, pmsg, dc, rp) + // If given request fulfilled batch size, but there are still pending bytes, send information about it. + if wrn <= 0 && wrb > 0 { + o.outq.send(newJSPubMsg(dsubj, _EMPTY_, _EMPTY_, []byte(fmt.Sprintf(JsPullRequestRemainingBytesT, JSPullRequestPendingMsgs, wrn, JSPullRequestPendingBytes, wrb)), nil, nil, 0)) + } // Reset our idle heartbeat timer if set. if hb != nil { hb.Reset(hbd) diff --git a/server/jetstream_test.go b/server/jetstream_test.go index 844a944f2c..838e45be36 100644 --- a/server/jetstream_test.go +++ b/server/jetstream_test.go @@ -16808,25 +16808,39 @@ func TestJetStreamPullMaxBytes(t *testing.T) { req = &JSApiConsumerGetNextRequest{Batch: 1, MaxBytes: 10_000_000, NoWait: true} jreq, _ = json.Marshal(req) nc.PublishRequest(subj, reply, jreq) - checkSubsPending(t, sub, 1) + // we expect two messages, as the second one should be `Batch Completed` status. + checkSubsPending(t, sub, 2) + // first one is message from the stream. m, err = sub.NextMsg(time.Second) require_NoError(t, err) require_True(t, len(m.Data) == dsz) require_True(t, len(m.Header) == 0) + // second one is the status. + m, err = sub.NextMsg(time.Second) + require_NoError(t, err) + if v := m.Header.Get("Description"); v != "Batch Completed" { + t.Fatalf("Expected Batch Completed, got: %s", v) + } checkSubsPending(t, sub, 0) // Same but with batch > 1 req = &JSApiConsumerGetNextRequest{Batch: 5, MaxBytes: 10_000_000, NoWait: true} jreq, _ = json.Marshal(req) nc.PublishRequest(subj, reply, jreq) - checkSubsPending(t, sub, 5) + // 6, not 5, as 6th is the status. + checkSubsPending(t, sub, 6) for i := 0; i < 5; i++ { m, err = sub.NextMsg(time.Second) require_NoError(t, err) require_True(t, len(m.Data) == dsz) require_True(t, len(m.Header) == 0) } + m, err = sub.NextMsg(time.Second) + require_NoError(t, err) + if v := m.Header.Get("Description"); v != "Batch Completed" { + t.Fatalf("Expected Batch Completed, got: %s", v) + } checkSubsPending(t, sub, 0) // Now ask for large batch but make sure we are limited by batch size. @@ -19333,3 +19347,56 @@ func TestJetStreamMsgBlkFailOnKernelFault(t *testing.T) { friendlyBytes(si.Config.MaxBytes), friendlyBytes(int64(si.State.Bytes))) } } + +func TestJetStreamPullConsumerBatchCompleted(t *testing.T) { + + s := RunBasicJetStreamServer(t) + defer s.Shutdown() + + nc, js := jsClientConnect(t, s) + defer nc.Close() + + _, err := js.AddStream(&nats.StreamConfig{ + Name: "TEST", + Subjects: []string{"foo"}, + }) + require_NoError(t, err) + + msgSize := 128 + msg := make([]byte, msgSize) + rand.Read(msg) + + for i := 0; i < 10; i++ { + _, err := js.Publish("foo", msg) + require_NoError(t, err) + } + + _, err = js.AddConsumer("TEST", &nats.ConsumerConfig{ + Durable: "dur", + AckPolicy: nats.AckExplicitPolicy, + }) + require_NoError(t, err) + + req := JSApiConsumerGetNextRequest{Batch: 0, MaxBytes: 1024, Expires: 250 * time.Millisecond} + + reqb, _ := json.Marshal(req) + sub := natsSubSync(t, nc, nats.NewInbox()) + err = nc.PublishRequest("$JS.API.CONSUMER.MSG.NEXT.TEST.dur", sub.Subject, reqb) + require_NoError(t, err) + + // Expect first message to arrive normally. + _, err = sub.NextMsg(time.Second * 1) + require_NoError(t, err) + + // Second message should be info that batch is complete, but there were pending bytes. + pullMsg, err := sub.NextMsg(time.Second * 1) + require_NoError(t, err) + + if v := pullMsg.Header.Get("Status"); v != "409" { + t.Fatalf("Expected 409, got: %s", v) + } + if v := pullMsg.Header.Get("Description"); v != "Batch Completed" { + t.Fatalf("Expected Batch Completed, got: %s", v) + } + +}