diff --git a/pkg/reduce/readloop/readloop.go b/pkg/reduce/readloop/readloop.go index 893c6eef9..b1c1d40b3 100644 --- a/pkg/reduce/readloop/readloop.go +++ b/pkg/reduce/readloop/readloop.go @@ -181,6 +181,9 @@ messagesLoop: metrics.LabelPipeline: rl.pipelineName, metrics.LabelVertexReplicaIndex: strconv.Itoa(int(rl.vertexReplica)), LabelReason: "late"}).Inc() + + // mark it as a successfully written message as the message will be acked to avoid subsequent retries + writtenMessages = append(writtenMessages, message) continue }