Skip to content

Conversation

ShogunPanda
Copy link
Contributor

Based of the work of @simone-sanfratello in #124.
Fixes #120.

Signed-off-by: Paolo Insogna <paolo@cowtech.it>
partition.records.push(readRecordsBatch(recordsBatchesReader))
try {
partition.records.push(readRecordsBatch(recordsBatchesReader))
} catch (err) {
Copy link
Contributor

@simone-sanfratello simone-sanfratello Oct 2, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure it works. When the error happens, if the reader position is not moved backwards, the already read records will be lost

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nope, that's not how it works.
The already read records have been added. Only the truncated part is discarded.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So I think the record in between of the truncation will be lost

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Which is fine because on the next iteration Kafka will not send the rest of the message but rather start from the beginning.

ok(Buffer.isBuffer(record.value))
})

test('parseResponse handles truncated records', () => {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should test when the truncation happens in the nth recordsBatch

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It already does. It fails in the 2nd batch.

Copy link
Contributor

@simone-sanfratello simone-sanfratello Oct 2, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think this test the error. When the truncation happens in partition.recordsBatch we should properly read all the records, so for example:

  • record 1, truncation on record 2, record 3
    we should get all the 3 records

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See above, if the record is truncated, we discard and in the next run we get it from the beginning, including the already transmitted truncated part.

Copy link
Member

@mcollina mcollina left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lgtm

@simone-sanfratello simone-sanfratello self-requested a review October 2, 2025 12:24
@ShogunPanda ShogunPanda merged commit e15b2ed into main Oct 2, 2025
62 of 72 checks passed
@ShogunPanda ShogunPanda deleted the handle-truncation branch October 2, 2025 12:27
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

UserError: Out of bounds when consuming messages
3 participants