Merged
3 changes: 2 additions & 1 deletion .gitignore

@@ -3,4 +3,5 @@ external/
 node_modules
 coverage
 .eslintcache
-.env
+.env
+.vscode
21 changes: 15 additions & 6 deletions src/apis/consumer/fetch-v15.ts

@@ -1,4 +1,4 @@
-import { ResponseError } from '../../errors.ts'
+import { type GenericError, OutOfBoundsError, ResponseError } from '../../errors.ts'
 import { Reader } from '../../protocol/reader.ts'
 import { readRecordsBatch, type RecordsBatch } from '../../protocol/records.ts'
 import { Writer } from '../../protocol/writer.ts'
@@ -174,19 +174,28 @@ export function parseResponse (
         preferredReadReplica: r.readInt32()
       }
 
-      let recordsSize = r.readUnsignedVarInt()
-
       if (partition.errorCode !== 0) {
         errors.push([`/responses/${i}/partitions/${j}`, partition.errorCode])
       }
 
-      if (recordsSize > 1) {
-        recordsSize--
+      // We need to reduce the size by one to follow the COMPACT_RECORDS specification.
+      const recordsSize = r.readUnsignedVarInt() - 1
 
+      if (recordsSize > 0) {
         const recordsBatchesReader = Reader.from(r.buffer.subarray(r.position, r.position + recordsSize))
         partition.records = []
         do {
-          partition.records.push(readRecordsBatch(recordsBatchesReader))
+          try {
+            partition.records.push(readRecordsBatch(recordsBatchesReader))
+          } catch (err) {
+            // Contrary to other places in the protocol, records batches CAN BE truncated due to maxBytes argument.
+            // In that case we just ignore the error.
+            if ((err as GenericError).code === OutOfBoundsError.code) {
+              break
+            }
+            /* c8 ignore next 3 - Hard to test */
+            throw err
+          }
         } while (recordsBatchesReader.position < recordsSize)
 
         r.skip(recordsSize)
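For context on the size-minus-one adjustment: Kafka's flexible protocol versions encode COMPACT_RECORDS (like other compact byte sequences) as an unsigned varint holding the byte length plus one, with 0 reserved for null. The old guard (comparing the raw value against 1) and the new form (subtract first, then compare against 0) accept the same inputs; the rewrite just states the convention directly. A minimal sketch of the rule, using a hypothetical readUnsignedVarInt callback rather than the library's Reader API:

// Hypothetical helper illustrating the COMPACT_RECORDS length rule:
// the wire value is the byte length plus one, and 0 encodes null.
function decodeCompactLength (readUnsignedVarInt: () => number): number | null {
  const encoded = readUnsignedVarInt()

  if (encoded === 0) {
    return null // a null record set
  }

  return encoded - 1 // actual number of bytes occupied by the record batches
}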
21 changes: 15 additions & 6 deletions src/apis/consumer/fetch-v16.ts

@@ -1,4 +1,4 @@
-import { ResponseError } from '../../errors.ts'
+import { type GenericError, OutOfBoundsError, ResponseError } from '../../errors.ts'
 import { Reader } from '../../protocol/reader.ts'
 import { readRecordsBatch, type RecordsBatch } from '../../protocol/records.ts'
 import { Writer } from '../../protocol/writer.ts'
@@ -174,19 +174,28 @@ export function parseResponse (
         preferredReadReplica: r.readInt32()
       }
 
-      let recordsSize = r.readUnsignedVarInt()
-
       if (partition.errorCode !== 0) {
         errors.push([`/responses/${i}/partitions/${j}`, partition.errorCode])
       }
 
-      if (recordsSize > 1) {
-        recordsSize--
+      // We need to reduce the size by one to follow the COMPACT_RECORDS specification.
+      const recordsSize = r.readUnsignedVarInt() - 1
 
+      if (recordsSize > 0) {
         const recordsBatchesReader = Reader.from(r.buffer.subarray(r.position, r.position + recordsSize))
         partition.records = []
         do {
-          partition.records.push(readRecordsBatch(recordsBatchesReader))
+          try {
+            partition.records.push(readRecordsBatch(recordsBatchesReader))
+          } catch (err) {

Review thread:

simone-sanfratello (Contributor), Oct 2, 2025:
I'm not sure it works. When the error happens, if the reader position is not moved backwards, the already read records will be lost.

Author:
Nope, that's not how it works. The already read records have been added; only the truncated part is discarded.

simone-sanfratello (Contributor):
So I think the record in the middle of the truncation will be lost.

Author:
Which is fine, because on the next iteration Kafka will not send the rest of the message but rather start from the beginning.

+            // Contrary to other places in the protocol, records batches CAN BE truncated due to maxBytes argument.
+            // In that case we just ignore the error.
+            if ((err as GenericError).code === OutOfBoundsError.code) {
+              break
+            }
+            /* c8 ignore next 3 - Hard to test */
+            throw err
+          }
         } while (recordsBatchesReader.position < recordsSize)
 
         r.skip(recordsSize)
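The thread above captures the crux of the change: a fetch response may cut a records batch short when the maxBytes budget runs out, so the parser keeps every batch that decoded completely and silently drops the partial tail; the broker resends the dropped batch from its beginning on the next fetch, so no data is permanently lost. A standalone sketch of the pattern, with generic names (readAvailableBatches, isOutOfBounds) standing in for the library internals:

// Sketch: parse whole batches out of a possibly truncated buffer,
// keeping every batch that decoded completely.
function readAvailableBatches<T> (
  reader: { position: number },
  size: number,
  readBatch: (r: { position: number }) => T,
  isOutOfBounds: (err: unknown) => boolean
): T[] {
  const batches: T[] = []

  do {
    try {
      batches.push(readBatch(reader))
    } catch (err) {
      if (isOutOfBounds(err)) {
        // The last batch was cut off by maxBytes: drop it. The broker will
        // resend the whole batch on the next fetch, so nothing is lost.
        break
      }

      throw err
    }
  } while (reader.position < size)

  return batches
}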
16 changes: 13 additions & 3 deletions src/apis/consumer/fetch-v17.ts

@@ -1,4 +1,4 @@
-import { ResponseError } from '../../errors.ts'
+import { type GenericError, OutOfBoundsError, ResponseError } from '../../errors.ts'
 import { Reader } from '../../protocol/reader.ts'
 import { readRecordsBatch, type RecordsBatch } from '../../protocol/records.ts'
 import { Writer } from '../../protocol/writer.ts'
@@ -178,14 +178,24 @@ export function parseResponse (
         errors.push([`/responses/${i}/partitions/${j}`, partition.errorCode])
       }
 
-      // We need to reduce the size by one to follow the COMPACT_RECORDS specification
+      // We need to reduce the size by one to follow the COMPACT_RECORDS specification.
       const recordsSize = r.readUnsignedVarInt() - 1
 
       if (recordsSize > 0) {
         const recordsBatchesReader = Reader.from(r.buffer.subarray(r.position, r.position + recordsSize))
         partition.records = []
         do {
-          partition.records.push(readRecordsBatch(recordsBatchesReader))
+          try {
+            partition.records.push(readRecordsBatch(recordsBatchesReader))
+          } catch (err) {
+            // Contrary to other places in the protocol, records batches CAN BE truncated due to maxBytes argument.
+            // In that case we just ignore the error.
+            if ((err as GenericError).code === OutOfBoundsError.code) {
+              break
+            }
+            /* c8 ignore next 3 - Hard to test */
+            throw err
+          }
         } while (recordsBatchesReader.position < recordsSize)
 
         r.skip(recordsSize)
9 changes: 9 additions & 0 deletions src/errors.ts

@@ -11,6 +11,7 @@ export const errorCodes = [
   'PLT_KFK_AUTHENTICATION',
   'PLT_KFK_MULTIPLE',
   'PLT_KFK_NETWORK',
+  'PLT_KFK_OUT_OF_BOUNDS',
   'PLT_KFK_PROTOCOL',
   'PLT_KFK_RESPONSE',
   'PLT_KFK_TIMEOUT',
@@ -163,6 +164,14 @@ export class ProtocolError extends GenericError {
   }
 }
 
+export class OutOfBoundsError extends GenericError {
+  static code: ErrorCode = 'PLT_KFK_OUT_OF_BOUNDS'
+
+  constructor (message: string, properties: ErrorProperties = {}) {
+    super(OutOfBoundsError.code, message, { isOut: true, ...properties })
+  }
+}
+
 export class ResponseError extends MultipleErrors {
   static code: ErrorCode = 'PLT_KFK_RESPONSE'
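A hedged sketch of how a caller might react to the new error specifically, catching it by its static code as the fetch parsers above do (the import paths and the tryReadByte helper are assumptions for illustration):

import { OutOfBoundsError, type GenericError } from '../errors.ts'
import { DynamicBuffer } from '../protocol/dynamic-buffer.ts'

// Probe a byte without letting an out-of-bounds read escape.
function tryReadByte (buffer: DynamicBuffer, offset: number): number | undefined {
  try {
    return buffer.readUInt8(offset)
  } catch (err) {
    if ((err as GenericError).code === OutOfBoundsError.code) {
      return undefined // the offset was past the end of the buffer
    }

    throw err // anything else is unexpected and should propagate
  }
}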
22 changes: 11 additions & 11 deletions src/protocol/dynamic-buffer.ts

@@ -1,4 +1,4 @@
-import { UserError } from '../errors.ts'
+import { OutOfBoundsError } from '../errors.ts'
 import { EMPTY_BUFFER, INT16_SIZE, INT32_SIZE, INT64_SIZE, INT8_SIZE } from './definitions.ts'
 import {
   BITS_8PLUS_MASK,
@@ -91,7 +91,7 @@ export class DynamicBuffer {
     }
 
     if (start < 0 || start > this.length || end > this.length) {
-      throw new UserError('Out of bounds.')
+      throw new OutOfBoundsError('Out of bounds.')
     }
 
     if (this.buffers.length === 0) {
@@ -128,13 +128,13 @@ export class DynamicBuffer {
     }
 
     if (start < 0 || start > this.length || end > this.length) {
-      throw new UserError('Out of bounds.')
+      throw new OutOfBoundsError('Out of bounds.')
     }
 
     if (this.buffers.length === 0) {
       return EMPTY_BUFFER
     } else if (this.buffers.length === 1) {
-      return this.buffers[0].slice(start, end)
+      return this.buffers[0].subarray(start, end)
     }
 
     let position = 0
@@ -176,7 +176,7 @@
 
   consume (offset: number): this {
     if (offset < 0 || offset > this.length) {
-      throw new UserError('Out of bounds.')
+      throw new OutOfBoundsError('Out of bounds.')
     }
 
     if (offset === 0) {
@@ -210,7 +210,7 @@
 
   get (offset: number): number {
     if (offset < 0 || offset >= this.length) {
-      throw new UserError('Out of bounds.')
+      throw new OutOfBoundsError('Out of bounds.')
    }
 
     const [finalIndex, current] = this.#findInitialBuffer(offset)
@@ -219,7 +219,7 @@
 
   readUInt8 (offset: number = 0): number {
     if (offset < 0 || offset >= this.length) {
-      throw new UserError('Out of bounds.')
+      throw new OutOfBoundsError('Out of bounds.')
     }
 
     const [finalIndex, current] = this.#findInitialBuffer(offset)
@@ -265,7 +265,7 @@
     let read = 0
 
     if (offset < 0 || offset >= this.length) {
-      throw new UserError('Out of bounds.')
+      throw new OutOfBoundsError('Out of bounds.')
     }
 
     // Find the initial buffer
@@ -294,7 +294,7 @@
     let read = 0
 
     if (offset < 0 || offset >= this.length) {
-      throw new UserError('Out of bounds.')
+      throw new OutOfBoundsError('Out of bounds.')
     }
 
     // Find the initial buffer
@@ -318,7 +318,7 @@
 
   readInt8 (offset: number = 0): number {
     if (offset < 0 || offset >= this.length) {
-      throw new UserError('Out of bounds.')
+      throw new OutOfBoundsError('Out of bounds.')
     }
 
     const [finalIndex, current] = this.#findInitialBuffer(offset)
@@ -681,7 +681,7 @@
 
   #readMultiple (index: number, length: number) {
     if (index < 0 || index + length > this.length) {
-      throw new UserError('Out of bounds.')
+      throw new OutOfBoundsError('Out of bounds.')
     }
 
     let [startOffset, current] = this.#findInitialBuffer(index)
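One change here goes beyond the error type: the single-buffer fast path now uses subarray instead of slice. On a Node.js Buffer both return a view over the same memory, but slice is deprecated and shadows Uint8Array.prototype.slice, which copies, so subarray states the zero-copy intent unambiguously. A quick illustration:

import { Buffer } from 'node:buffer'

const parent = Buffer.from([1, 2, 3, 4])

// subarray returns a view over the same memory: writes are shared.
const view = parent.subarray(1, 3)
view[0] = 9
console.log(parent[1]) // 9

// Uint8Array#slice returns a detached copy instead.
const copy = Uint8Array.prototype.slice.call(parent, 1, 3)
copy[0] = 7
console.log(parent[1]) // still 9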
103 changes: 103 additions & 0 deletions test/apis/consumer/fetch-v15.test.ts

@@ -825,3 +825,106 @@ test('parseResponse parses record data', () => {
  // Verify value is a Buffer
  ok(Buffer.isBuffer(record.value))
})

Review thread:

simone-sanfratello (Contributor):
We should test when the truncation happens in the nth recordsBatch.

Author:
It already does. It fails in the 2nd batch.

simone-sanfratello (Contributor), Oct 2, 2025:
I don't think this tests the error. When the truncation happens in partition.recordsBatch we should properly read all the records. For example: record 1, truncation on record 2, record 3 - we should get all 3 records.

Author:
See above: if the record is truncated, we discard it, and on the next run we get it from the beginning, including the already transmitted truncated part.

test('parseResponse handles truncated records', () => {

  // Create a response with records data
  // First create a record batch
  const timestamp = BigInt(Date.now())
  const recordsBatch = Writer.create()
    // Record batch structure
    .appendInt64(0n) // firstOffset
    .appendInt32(60) // length - this would be dynamically computed in real usage
    .appendInt32(0) // partitionLeaderEpoch
    .appendInt8(2) // magic (record format version)
    .appendUnsignedInt32(0) // crc - would be computed properly in real code
    .appendInt16(0) // attributes
    .appendInt32(0) // lastOffsetDelta
    .appendInt64(timestamp) // firstTimestamp
    .appendInt64(timestamp) // maxTimestamp
    .appendInt64(-1n) // producerId - not specified
    .appendInt16(0) // producerEpoch
    .appendInt32(0) // firstSequence
    .appendInt32(1) // number of records
    // Single record
    .appendVarInt(8) // length of the record
    .appendInt8(0) // attributes
    .appendVarInt64(0n) // timestampDelta
    .appendVarInt(0) // offsetDelta
    .appendVarIntBytes(null) // key
    .appendVarIntBytes(Buffer.from('test-value')) // value
    .appendVarIntArray([], () => {}) // No headers
    // Truncated batch
    .appendInt64(0n) // firstOffset
    .appendInt32(60) // length

  // Now create the full response
  const writer = Writer.create()
    .appendInt32(0) // throttleTimeMs
    .appendInt16(0) // errorCode (success)
    .appendInt32(123) // sessionId
    // Responses array - using tagged fields format
    .appendArray(
      [
        {
          topicId: '12345678-1234-1234-1234-123456789abc',
          partitions: [
            {
              partitionIndex: 0,
              errorCode: 0,
              highWatermark: 100n,
              lastStableOffset: 100n,
              logStartOffset: 0n,
              abortedTransactions: [],
              preferredReadReplica: -1,
              recordsBatch
            }
          ]
        }
      ],
      (w, topic) => {
        w.appendUUID(topic.topicId)
          // Partitions array
          .appendArray(topic.partitions, (w, partition) => {
            w.appendInt32(partition.partitionIndex)
              .appendInt16(partition.errorCode)
              .appendInt64(partition.highWatermark)
              .appendInt64(partition.lastStableOffset)
              .appendInt64(partition.logStartOffset)
              // Aborted transactions array (empty)
              .appendArray(partition.abortedTransactions, () => {})
              .appendInt32(partition.preferredReadReplica)

              // Add records batch
              .appendUnsignedVarInt(partition.recordsBatch.length + 1)
              .appendFrom(partition.recordsBatch)
          })
      }
    )
    .appendInt8(0) // Root tagged fields

  const response = parseResponse(1, 1, 17, Reader.from(writer))

  // Verify the records were parsed correctly
  ok(response.responses[0].partitions[0].records, 'Records should be defined')

  const batch = response.responses[0].partitions[0].records[0]!
  const record = batch.records[0]

  deepStrictEqual(
    {
      firstOffset: batch.firstOffset,
      recordsLength: batch.records.length,
      offsetDelta: record.offsetDelta,
      valueString: record.value.toString()
    },
    {
      firstOffset: 0n,
      recordsLength: 1,
      offsetDelta: 0,
      valueString: 'test-value'
    }
  )

  // Verify value is a Buffer
  ok(Buffer.isBuffer(record.value))
})
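Why this input exercises the truncation path: the appended "truncated batch" is only a 12-byte stub (firstOffset plus length), while readRecordsBatch needs the full batch header before it can yield anything, so the reader runs past the end of the records buffer, the OutOfBoundsError break path fires, and exactly one fully parsed batch survives. A quick check of the sizes, with field widths taken from the append calls in this test:

// Field widths mirror the append* calls above.
const stubBytes = 8 + 4 // firstOffset (int64) + length (int32)
const headerBytes = 8 + 4 + 4 + 1 + 4 + 2 + 4 + 8 + 8 + 8 + 2 + 4 + 4 // through "number of records"

console.log(stubBytes, headerBytes) // 12 61: the stub ends long before a header can complete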