
Add timestamp field to the kafka and kafka_franz outputs. #2665

Merged
1 commit merged into main from mihaitodor-add-timestamp-field-kafka on Jun 19, 2024

Conversation

mihaitodor (Collaborator)

Fixes #398.

Note: In #398, the broker CreateTime mode is mentioned and I believe this is the mode being used by both the kafka and kafka_franz outputs. See the comment from Sarama here: https://github.com/IBM/sarama/blob/d2246cc9b9df113f3ffed7c442fd81e8b9cf736e/async_producer.go#L181-L192. Also, see this issue in franz-go: twmb/franz-go#166.
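For illustration, a minimal sketch of how the new field could be used to preserve source timestamps when bridging between clusters. This assumes the field is named timestamp and accepts an interpolated unix-seconds value, and reuses the kafka_timestamp_unix metadata key exercised by the integration test further down; the broker addresses and topic are placeholders:

input:
  kafka_franz:
    seed_brokers: [ "source-broker:9092" ]   # placeholder address
    topics: [ "foo" ]                        # placeholder topic
    consumer_group: "bridge"
output:
  kafka_franz:
    seed_brokers: [ "dest-broker:9092" ]     # placeholder address
    topic: "foo"
    # Hypothetical usage of the field added by this PR: forward the source
    # record's timestamp instead of letting the producer stamp send time.
    timestamp: ${! metadata("kafka_timestamp_unix") }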

	Tag:          "latest",
	Hostname:     "redpanda",
	ExposedPorts: []string{"9092"},
	PortBindings: map[docker.Port][]docker.PortBinding{
		"9092/tcp": {{HostIP: "", HostPort: kafkaPortStr}},
	},
	Cmd: []string{
		"redpanda", "start", "--smp 1", "--overprovisioned", "",
@mihaitodor (Collaborator, Author) commented on Jun 18, 2024:

This doesn't seem to work anymore, at least not via Colima on an M3. I also ran all the Kafka integration tests after these changes on amd64 with Linux and standard Docker and they passed successfully.

"start",
"--node-id 0",
"--mode dev-container",
"--set rpk.additional_start_flags=[--reactor-backend=epoll]",
@mihaitodor (Collaborator, Author) commented on Jun 18, 2024:

This was super-annoying: without this option, Redpanda sometimes starts successfully, but not always. Details here: redpanda-data/redpanda#17104

"redpanda",
"start",
"--node-id 0",
"--mode dev-container",
@mihaitodor (Collaborator, Author) commented:

Docs here: https://docs.redpanda.com/current/reference/rpk/rpk-redpanda/rpk-redpanda-mode/#development-mode. Among other things, it sets --overprovisioned to true.

> redpanda start --mode help
...
Mode uses well-known configuration properties for development or tests
environments:
--mode dev-container
    Bundled flags:
        * --overprovisioned
        * --reserve-memory 0M
        * --check=false
        * --unsafe-bypass-fsync
    Bundled cluster properties:
        * auto_create_topics_enabled: true
        * group_topic_partitions: 3
        * storage_min_free_bytes: 10485760 (10MiB)
        * topic_partitions_per_shard: 1000
        * fetch_reads_debounce_timeout: 10
        * group_initial_rebalance_delay: 0
        * log_segment_size_min: 1
        * write_caching_default: true

Fixes #398.

Note: In #398, the broker `CreateTime` mode is mentioned and I
believe this is the mode that's being used by both the `kafka`
and `kafka_franz` output. See the comment from Sarama here: https://github.com/IBM/sarama/blob/d2246cc9b9df113f3ffed7c442fd81e8b9cf736e/async_producer.go#L181-L192.
Also, see this issue in franz-go: twmb/franz-go#166.

Signed-off-by: Mihai Todor <todormihai@gmail.com>
@mihaitodor force-pushed the mihaitodor-add-timestamp-field-kafka branch from ee59587 to 637f64c on June 18, 2024 18:47
@@ -356,3 +364,76 @@ input:
		integration.StreamTestOptVarSet("VAR1", ""),
	)
}

func TestIntegrationKafkaOutputFixedTimestamp(t *testing.T) {
@mihaitodor (Collaborator, Author) commented on Jun 18, 2024:

It would be nice to avoid adding extra tests. I was thinking of introducing a new StreamTestReadsMetadata() which writes something to the output, then launches the input (similar to StreamTestOpenCloseIsolated()), and then checks whether the metadata of the message read from the input contains the metadata keys and values passed in via a (variadic) parameter. A rough sketch of the idea follows.
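A sketch of what such a helper could look like, written against the public service.StreamBuilder API purely to illustrate the shape; the real helper would live in the internal integration package alongside StreamTestOpenCloseIsolated(), and the package name, function name, signature, and import path here are assumptions, not part of this PR:

package kafka_test // hypothetical package, for illustration only

import (
	"context"
	"testing"
	"time"

	"github.com/redpanda-data/benthos/v4/public/service"
)

// streamTestReadsMetadata writes a single message through outputYAML, then
// reads it back through inputYAML and asserts that every expected metadata
// key/value pair is present on the received message.
func streamTestReadsMetadata(t *testing.T, outputYAML, inputYAML string, expected map[string]string) {
	t.Helper()
	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()

	// Producer side: a stream containing only the output under test, fed by
	// an in-process producer function.
	outBuilder := service.NewStreamBuilder()
	if err := outBuilder.SetYAML(outputYAML); err != nil {
		t.Fatal(err)
	}
	produce, err := outBuilder.AddProducerFunc()
	if err != nil {
		t.Fatal(err)
	}
	outStream, err := outBuilder.Build()
	if err != nil {
		t.Fatal(err)
	}
	go func() { _ = outStream.Run(ctx) }()
	if err := produce(ctx, service.NewMessage([]byte("hello world"))); err != nil {
		t.Fatal(err)
	}

	// Consumer side: a stream containing only the input under test, drained
	// by an in-process consumer function.
	inBuilder := service.NewStreamBuilder()
	if err := inBuilder.SetYAML(inputYAML); err != nil {
		t.Fatal(err)
	}
	received := make(chan *service.Message, 1)
	if err := inBuilder.AddConsumerFunc(func(_ context.Context, m *service.Message) error {
		select {
		case received <- m:
		default:
		}
		return nil
	}); err != nil {
		t.Fatal(err)
	}
	inStream, err := inBuilder.Build()
	if err != nil {
		t.Fatal(err)
	}
	go func() { _ = inStream.Run(ctx) }()

	// Assert the expected metadata keys and values are all present.
	select {
	case m := <-received:
		for k, want := range expected {
			if got, ok := m.MetaGet(k); !ok || got != want {
				t.Errorf("metadata %q: got %q, want %q", k, got, want)
			}
		}
	case <-ctx.Done():
		t.Fatal("timed out waiting for message")
	}
}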

consumer_group: "blobfish"
processors:
  - mapping: |
      root = if metadata("kafka_timestamp_unix") != 666 { "error: invalid timestamp" }
@mihaitodor (Collaborator, Author) commented on Jun 18, 2024:

This is a bit messy... Would you be open to updating messageMatch() here to also check that ErrorGet() returns a nil error for the received message.Part? I'd like to use throw("invalid timestamp") instead, as sketched below.
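A sketch of the mapping that change would allow, using the same metadata key as the snippet above (hypothetical, not part of this PR as merged):

processors:
  - mapping: |
      root = if metadata("kafka_timestamp_unix") != 666 { throw("invalid timestamp") }

With this form the message is flagged as errored via throw() rather than having its content replaced, so the content assertion still passes while the proposed ErrorGet() check catches the failure.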

Collaborator reply:

Yeah, I think that'd be fine; it's unlikely that any existing tests would break as a result, and it's not a big deal to have minor behavioural changes in the testing suite.

@Jeffail (Collaborator) left a comment:

Looks great, thanks @mihaitodor, and extra ❤️ for fixing up the integration tests.

@Jeffail merged commit eeafe01 into main on Jun 19, 2024 (3 checks passed).
@Jeffail deleted the mihaitodor-add-timestamp-field-kafka branch on June 19, 2024 at 08:01.
@Play-AV commented on Jul 13, 2024:

Thanks for this <3

For a redis streams > kafka pipeline, the following mapping seemed to work:

meta timestamp = (this.redis_stream.split("-").index(0).int64()/1000).ts_unix()
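Presumably that metadata key is then referenced from the new output field, along the lines of the sketch below; the broker address and topic are placeholders, and the division by 1000 above converts the Redis stream ID's millisecond prefix to unix seconds:

output:
  kafka:
    addresses: [ "localhost:9092" ]   # placeholder address
    topic: "events"                   # placeholder topic
    timestamp: ${! metadata("timestamp") }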

Linked issue: Kafka Output Timestamp (#398) · 3 participants