FEAT: Add max_records to kafka task#67
Conversation
Stop the kafka reader after a fixed number of records have been forwarded downstream. Independent from end_after (wall-clock) and retry_limit (idle-based). In group mode, offsets up to the last forwarded record are committed on shutdown via deferred Close(). Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
|
Copilot resolve the merge conflicts in this pull request |
There was a problem hiding this comment.
Pull request overview
Adds a max_records read-mode limit to the Kafka pipeline task so the consumer can stop cleanly after forwarding a fixed number of messages, independent of existing end_after (wall-clock) and retry_limit (idle/error-based) stop conditions.
Changes:
- Introduces
max_recordsconfiguration and enforces the stop condition in the Kafka consumer read loop. - Updates Kafka task README with the new field, examples, and behavior notes.
- Adds a new test pipeline YAML demonstrating
max_records+end_after.
Reviewed changes
Copilot reviewed 3 out of 3 changed files in this pull request and generated 1 comment.
| File | Description |
|---|---|
| test/pipelines/kafka_read_max_records.yaml | Adds an example pipeline that stops after forwarding 10 Kafka records (with end_after safety net). |
| internal/pkg/pipeline/task/kafka/README.md | Documents max_records, including usage example and interaction with end_after/retry_limit. |
| internal/pkg/pipeline/task/kafka/kafka.go | Implements max_records counter and early exit after N forwarded records. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
Adds `validate:"omitempty,gte=0"` so a negative max_records is rejected at config-load time rather than silently behaving as unlimited (which contradicts the documented "0 = unlimited" semantics). Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
There was a problem hiding this comment.
This is problematic, the kafka consumer would fetch records in batch, but would not forward all of them downstream. This will lead to dropped records.
Add flush of all records here as well as in end_after part. So that we ensure at_least_once delivery.
Yash Shrivastava (alephys26)
left a comment
There was a problem hiding this comment.
Correction: This is valid for the kafka task. The ctx cancellation happens before a new read and the commit happens before the count increment. Both cases are valid.
The only pain point now is the failure of record processing in any downstream task. If it fails there we will drop the messages, since the commits are already there on kafka.
It's a known issue, same as the SQS. |
Summary
max_recordsfield to the kafka read task that stops the reader after N records have been forwarded downstream.end_after(wall-clock) andretry_limit(idle-based); the three can be combined.c.Close().test/pipelines/kafka_read_max_records.yamladded.Test plan
test/pipelines/kafka_read_max_records.yamlagainst a populated topic and verify exactly 10 records are emitted before the reader stops.end_after/retry_limitwhenmax_recordscannot be reached.go build ./...is clean.🤖 Generated with Claude Code