From 16b20a357ade0c0bae9edb0461ee01340025eb7f Mon Sep 17 00:00:00 2001 From: ashwinidulams <61725106+ashwinidulams@users.noreply.github.com> Date: Sat, 11 Mar 2023 09:42:41 +0530 Subject: [PATCH] fix: ack the dropped messages as well (#603) Signed-off-by: ashwinidulams --- pkg/reduce/readloop/readloop.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/pkg/reduce/readloop/readloop.go b/pkg/reduce/readloop/readloop.go index 893c6eef9..b1c1d40b3 100644 --- a/pkg/reduce/readloop/readloop.go +++ b/pkg/reduce/readloop/readloop.go @@ -181,6 +181,9 @@ messagesLoop: metrics.LabelPipeline: rl.pipelineName, metrics.LabelVertexReplicaIndex: strconv.Itoa(int(rl.vertexReplica)), LabelReason: "late"}).Inc() + + // mark it as a successfully written message as the message will be acked to avoid subsequent retries + writtenMessages = append(writtenMessages, message) continue }