diff --git a/src/apis/consumer/fetch-v15.ts b/src/apis/consumer/fetch-v15.ts index 69df226..7fe9437 100644 --- a/src/apis/consumer/fetch-v15.ts +++ b/src/apis/consumer/fetch-v15.ts @@ -186,7 +186,12 @@ export function parseResponse ( const recordsBatchesReader = Reader.from(r.buffer.subarray(r.position, r.position + recordsSize)) partition.records = [] do { - partition.records.push(readRecordsBatch(recordsBatchesReader)) + const batch = readRecordsBatch(recordsBatchesReader) + if (batch) { + partition.records.push(batch) + } else { + break + } } while (recordsBatchesReader.position < recordsSize) r.skip(recordsSize) diff --git a/src/apis/consumer/fetch-v16.ts b/src/apis/consumer/fetch-v16.ts index 22bc617..4de2a17 100644 --- a/src/apis/consumer/fetch-v16.ts +++ b/src/apis/consumer/fetch-v16.ts @@ -186,7 +186,12 @@ export function parseResponse ( const recordsBatchesReader = Reader.from(r.buffer.subarray(r.position, r.position + recordsSize)) partition.records = [] do { - partition.records.push(readRecordsBatch(recordsBatchesReader)) + const batch = readRecordsBatch(recordsBatchesReader) + if (batch) { + partition.records.push(batch) + } else { + break + } } while (recordsBatchesReader.position < recordsSize) r.skip(recordsSize) diff --git a/src/apis/consumer/fetch-v17.ts b/src/apis/consumer/fetch-v17.ts index 2d7f7bc..d8c8e42 100644 --- a/src/apis/consumer/fetch-v17.ts +++ b/src/apis/consumer/fetch-v17.ts @@ -186,7 +186,12 @@ export function parseResponse ( const recordsBatchesReader = Reader.from(r.buffer.subarray(r.position, r.position + recordsSize)) partition.records = [] do { - partition.records.push(readRecordsBatch(recordsBatchesReader)) + const batch = readRecordsBatch(recordsBatchesReader) + if (batch) { + partition.records.push(batch) + } else { + break + } } while (recordsBatchesReader.position < recordsSize) r.skip(recordsSize) diff --git a/src/protocol/reader.ts b/src/protocol/reader.ts index 22e9e27..a2ec6c5 100644 --- a/src/protocol/reader.ts +++ b/src/protocol/reader.ts @@ -409,4 +409,8 @@ export class Reader { this.skip(length) } } + + canRead (length: number): boolean { + return this.buffer.length - this.position >= length + } } diff --git a/src/protocol/records.ts b/src/protocol/records.ts index 1515a0e..73b2417 100644 --- a/src/protocol/records.ts +++ b/src/protocol/records.ts @@ -12,6 +12,20 @@ import { DynamicBuffer } from './dynamic-buffer.ts' import { Reader } from './reader.ts' import { Writer } from './writer.ts' +const BATCH_HEADER_SIZE = + /* FirstOffset */ 8 + + /* Length */ 4 + + /* PartitionLeaderEpoch */ 4 + + /* Magic */ 1 + + /* CRC */ 4 + + /* Attributes */ 2 + + /* LastOffsetDelta */ 4 + + /* FirstTimestamp */ 8 + + /* MaxTimestamp */ 8 + + /* ProducerId */ 8 + + /* ProducerEpoch */ 2 + + /* FirstSequence */ 4 + + /* RecordsLength */ 4 const CURRENT_RECORD_VERSION = 2 const IS_TRANSACTIONAL = 0b10000 // Bit 4 set const IS_COMPRESSED = 0b111 // Bits 0, 1 and/or 2 set @@ -247,10 +261,23 @@ export function createRecordsBatch ( ) } -export function readRecordsBatch (reader: Reader): RecordsBatch { +export function readRecordsBatch (reader: Reader): RecordsBatch | undefined { + // Check if we have enough data to read the batch header + if (!reader.canRead(BATCH_HEADER_SIZE)) { + return + } + + const firstOffset = reader.readInt64() + const length = reader.readInt32() + + // Check if we have enough data to read the complete batch, including the rest of the header + if (!reader.canRead(length)) { + return + } + const batch = { - firstOffset: reader.readInt64(), - length: reader.readInt32(), + firstOffset, + length, partitionLeaderEpoch: reader.readInt32(), magic: reader.readInt8(), crc: reader.readUnsignedInt32(), diff --git a/test/protocol/records.test.ts b/test/protocol/records.test.ts index 84a9318..6a1e3fe 100644 --- a/test/protocol/records.test.ts +++ b/test/protocol/records.test.ts @@ -389,7 +389,7 @@ test('readRecordsBatch should throw on unsupported compression bitmask', () => { // Create a mock batch with an invalid compression bitmask const writer = Writer.create() .appendInt64(0n) // firstOffset - .appendInt32(100) // length + .appendInt32(49) // length .appendInt32(0) // partitionLeaderEpoch .appendInt8(2) // magic .appendUnsignedInt32(0) // crc