Message accounting does not include size of headers. #1050

Closed
kung-foo opened this issue Dec 9, 2022 · 1 comment · Fixed by #1132

Comments


kung-foo commented Dec 9, 2022

Describe the bug

Message accounting does not include size of headers. See:

https://github.com/segmentio/kafka-go/blob/v0.4.38/message.go#L43-L45

When messages get close to the byte limit, they may exceed it once their headers are included. This results in a TCP RST from Azure Event Hubs.
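To make the gap concrete, here is a minimal sketch of the two ways of counting. `Header`, `Message`, `payloadSize`, and `headerBytes` are hypothetical stand-ins written for this illustration, not kafka-go's actual types or functions, and wire-format framing overhead is ignored.

```go
package main

import "fmt"

// Header and Message are hypothetical stand-ins, not kafka-go's types.
type Header struct {
	Key   string
	Value []byte
}

type Message struct {
	Key     []byte
	Value   []byte
	Headers []Header
}

// payloadSize counts only key and value bytes, which is the kind of
// accounting the issue describes.
func payloadSize(m Message) int {
	return len(m.Key) + len(m.Value)
}

// headerBytes counts the raw header key and value bytes that the
// payload-only accounting leaves out (framing overhead is ignored).
func headerBytes(m Message) int {
	n := 0
	for _, h := range m.Headers {
		n += len(h.Key) + len(h.Value)
	}
	return n
}

func main() {
	m := Message{
		Key:     []byte("k"),
		Value:   make([]byte, 1000),
		Headers: []Header{{Key: "trace-id", Value: make([]byte, 64)}},
	}
	fmt.Println("payload only:", payloadSize(m))
	fmt.Println("with headers:", payloadSize(m)+headerBytes(m))
}
```

Any limit check based on the first number can pass while the bytes actually sent follow the second.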

Kafka Version

Azure Event Hubs

To Reproduce

Send a batch of messages with about 1 MB of payload. Add headers. Boom.

Expected Behavior

Success.

Observed Behavior

Additional Context

@dominicbarnes
Contributor

This is interesting! At first, I simply thought we didn't factor in the header size at all, but that seemed unlikely, as I'm sure we would have seen network errors related to this kind of mismatch. Once I dove into the code, I found that the header size is calculated when the message is being written, which happens after the batch has been determined:

kafka-go/write.go

Lines 558 to 562 in b737754

wb.writeVarArray(len(msg.Headers), func(i int) {
	h := &msg.Headers[i]
	wb.writeVarString(h.Key)
	wb.writeVarBytes(h.Value)
})

kafka-go/recordbatch.go

Lines 104 to 107 in b737754

varArrayLen(len(msg.Headers), func(i int) int {
	h := &msg.Headers[i]
	return varStringLen(h.Key) + varBytesLen(h.Value)
})

As you suggested, the solution seems to be to include headers when computing batch sizes upstream of this code.
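As a rough illustration of what header-aware batching could look like, here is a sketch that greedily splits messages into batches under a byte limit. `Message`, `sizeWithHeaders`, and `splitBatches` are hypothetical names for this example; this is not the change that was merged, and framing overhead is ignored for brevity.

```go
package main

import "fmt"

// Header and Message are hypothetical stand-ins, not kafka-go's types.
type Header struct {
	Key   string
	Value []byte
}

type Message struct {
	Key     []byte
	Value   []byte
	Headers []Header
}

// sizeWithHeaders counts key, value, and header bytes.
func sizeWithHeaders(m Message) int {
	n := len(m.Key) + len(m.Value)
	for _, h := range m.Headers {
		n += len(h.Key) + len(h.Value)
	}
	return n
}

// splitBatches greedily groups messages so that each batch stays at or
// under limit bytes. A single message larger than limit gets its own batch.
func splitBatches(msgs []Message, limit int) [][]Message {
	var batches [][]Message
	var cur []Message
	curSize := 0
	for _, m := range msgs {
		s := sizeWithHeaders(m)
		if len(cur) > 0 && curSize+s > limit {
			batches = append(batches, cur)
			cur, curSize = nil, 0
		}
		cur = append(cur, m)
		curSize += s
	}
	if len(cur) > 0 {
		batches = append(batches, cur)
	}
	return batches
}

func main() {
	msg := Message{
		Value:   make([]byte, 400),
		Headers: []Header{{Key: "h", Value: make([]byte, 200)}},
	}
	batches := splitBatches([]Message{msg, msg, msg}, 1000)
	fmt.Println("batches:", len(batches))
}
```

With payload-only accounting, two of these 400-byte messages would appear to fit under a 1000-byte limit; once the 201 header bytes per message are counted, each message ends up in its own batch.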

I'll do my best to open a PR for this, but we will also gladly review one if you decide to submit. :)

mostafa added a commit to mostafa/xk6-kafka that referenced this issue Sep 20, 2023
* Update Go, k6 and other dependencies
* Update actions
* Increase golangci-lint timeout
* Clean up after the test
* Update tests to compensate for the size of the header
Issue: segmentio/kafka-go#1050
PR: segmentio/kafka-go#1132
Release: https://github.com/segmentio/kafka-go/releases/tag/v0.4.41