Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion src/apis/consumer/fetch-v15.ts
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,12 @@ export function parseResponse (
const recordsBatchesReader = Reader.from(r.buffer.subarray(r.position, r.position + recordsSize))
partition.records = []
do {
partition.records.push(readRecordsBatch(recordsBatchesReader))
const batch = readRecordsBatch(recordsBatchesReader)
if (batch) {
partition.records.push(batch)
} else {
break
}
} while (recordsBatchesReader.position < recordsSize)

r.skip(recordsSize)
Expand Down
7 changes: 6 additions & 1 deletion src/apis/consumer/fetch-v16.ts
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,12 @@ export function parseResponse (
const recordsBatchesReader = Reader.from(r.buffer.subarray(r.position, r.position + recordsSize))
partition.records = []
do {
partition.records.push(readRecordsBatch(recordsBatchesReader))
const batch = readRecordsBatch(recordsBatchesReader)
if (batch) {
partition.records.push(batch)
} else {
break
}
} while (recordsBatchesReader.position < recordsSize)

r.skip(recordsSize)
Expand Down
7 changes: 6 additions & 1 deletion src/apis/consumer/fetch-v17.ts
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,12 @@ export function parseResponse (
const recordsBatchesReader = Reader.from(r.buffer.subarray(r.position, r.position + recordsSize))
partition.records = []
do {
partition.records.push(readRecordsBatch(recordsBatchesReader))
const batch = readRecordsBatch(recordsBatchesReader)
if (batch) {
partition.records.push(batch)
} else {
break
}
} while (recordsBatchesReader.position < recordsSize)

r.skip(recordsSize)
Expand Down
4 changes: 4 additions & 0 deletions src/protocol/reader.ts
Original file line number Diff line number Diff line change
Expand Up @@ -409,4 +409,8 @@ export class Reader {
this.skip(length)
}
}

canRead (length: number): boolean {
return this.buffer.length - this.position >= length
}
}
33 changes: 30 additions & 3 deletions src/protocol/records.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,20 @@ import { DynamicBuffer } from './dynamic-buffer.ts'
import { Reader } from './reader.ts'
import { Writer } from './writer.ts'

const BATCH_HEADER_SIZE =
/* FirstOffset */ 8 +
/* Length */ 4 +
/* PartitionLeaderEpoch */ 4 +
/* Magic */ 1 +
/* CRC */ 4 +
/* Attributes */ 2 +
/* LastOffsetDelta */ 4 +
/* FirstTimestamp */ 8 +
/* MaxTimestamp */ 8 +
/* ProducerId */ 8 +
/* ProducerEpoch */ 2 +
/* FirstSequence */ 4 +
/* RecordsLength */ 4
const CURRENT_RECORD_VERSION = 2
const IS_TRANSACTIONAL = 0b10000 // Bit 4 set
const IS_COMPRESSED = 0b111 // Bits 0, 1 and/or 2 set
Expand Down Expand Up @@ -247,10 +261,23 @@ export function createRecordsBatch (
)
}

export function readRecordsBatch (reader: Reader): RecordsBatch {
export function readRecordsBatch (reader: Reader): RecordsBatch | undefined {
// Check if we have enough data to read the batch header
if (!reader.canRead(BATCH_HEADER_SIZE)) {
return
}

const firstOffset = reader.readInt64()
const length = reader.readInt32()

// Check if we have enough data to read the complete batch, including the rest of the header
if (!reader.canRead(length)) {
return
}

const batch = {
firstOffset: reader.readInt64(),
length: reader.readInt32(),
firstOffset,
length,
partitionLeaderEpoch: reader.readInt32(),
magic: reader.readInt8(),
crc: reader.readUnsignedInt32(),
Expand Down
2 changes: 1 addition & 1 deletion test/protocol/records.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -229,8 +229,8 @@
const reader = Reader.from(batchBuffer)
const batch = readRecordsBatch(reader)

strictEqual(batch.records.length, 1, 'Should have one record')

Check failure on line 232 in test/protocol/records.test.ts

View workflow job for this annotation

GitHub Actions / ci (20, ubuntu-latest, 3.8.0)

'batch' is possibly 'undefined'.

Check failure on line 232 in test/protocol/records.test.ts

View workflow job for this annotation

GitHub Actions / ci (20, ubuntu-latest, 3.5.0)

'batch' is possibly 'undefined'.

Check failure on line 232 in test/protocol/records.test.ts

View workflow job for this annotation

GitHub Actions / ci (20, ubuntu-latest, 3.7.0)

'batch' is possibly 'undefined'.

Check failure on line 232 in test/protocol/records.test.ts

View workflow job for this annotation

GitHub Actions / ci (20, ubuntu-latest, 3.9.0)

'batch' is possibly 'undefined'.
strictEqual(batch.records[0].value.toString(), 'value', 'Record value should match')

Check failure on line 233 in test/protocol/records.test.ts

View workflow job for this annotation

GitHub Actions / ci (20, ubuntu-latest, 3.8.0)

'batch' is possibly 'undefined'.

Check failure on line 233 in test/protocol/records.test.ts

View workflow job for this annotation

GitHub Actions / ci (20, ubuntu-latest, 3.5.0)

'batch' is possibly 'undefined'.

Check failure on line 233 in test/protocol/records.test.ts

View workflow job for this annotation

GitHub Actions / ci (20, ubuntu-latest, 3.7.0)

'batch' is possibly 'undefined'.

Check failure on line 233 in test/protocol/records.test.ts

View workflow job for this annotation

GitHub Actions / ci (20, ubuntu-latest, 3.9.0)

'batch' is possibly 'undefined'.
})

test('createRecordsBatch with compression should compress the records', () => {
Expand All @@ -257,10 +257,10 @@
const reader = Reader.from(batchBuffer)
const batch = readRecordsBatch(reader)

strictEqual(batch.attributes & 0b111, 1, 'Attributes should have gzip compression bit set')

Check failure on line 260 in test/protocol/records.test.ts

View workflow job for this annotation

GitHub Actions / ci (20, ubuntu-latest, 3.8.0)

'batch' is possibly 'undefined'.

Check failure on line 260 in test/protocol/records.test.ts

View workflow job for this annotation

GitHub Actions / ci (20, ubuntu-latest, 3.5.0)

'batch' is possibly 'undefined'.

Check failure on line 260 in test/protocol/records.test.ts

View workflow job for this annotation

GitHub Actions / ci (20, ubuntu-latest, 3.7.0)

'batch' is possibly 'undefined'.

Check failure on line 260 in test/protocol/records.test.ts

View workflow job for this annotation

GitHub Actions / ci (20, ubuntu-latest, 3.9.0)

'batch' is possibly 'undefined'.
strictEqual(batch.records.length, 2, 'Should have two records after decompression')

Check failure on line 261 in test/protocol/records.test.ts

View workflow job for this annotation

GitHub Actions / ci (20, ubuntu-latest, 3.8.0)

'batch' is possibly 'undefined'.

Check failure on line 261 in test/protocol/records.test.ts

View workflow job for this annotation

GitHub Actions / ci (20, ubuntu-latest, 3.5.0)

'batch' is possibly 'undefined'.

Check failure on line 261 in test/protocol/records.test.ts

View workflow job for this annotation

GitHub Actions / ci (20, ubuntu-latest, 3.7.0)

'batch' is possibly 'undefined'.

Check failure on line 261 in test/protocol/records.test.ts

View workflow job for this annotation

GitHub Actions / ci (20, ubuntu-latest, 3.9.0)

'batch' is possibly 'undefined'.
strictEqual(batch.records[0].value.toString(), 'value1'.repeat(100), 'First record value should match')

Check failure on line 262 in test/protocol/records.test.ts

View workflow job for this annotation

GitHub Actions / ci (20, ubuntu-latest, 3.8.0)

'batch' is possibly 'undefined'.

Check failure on line 262 in test/protocol/records.test.ts

View workflow job for this annotation

GitHub Actions / ci (20, ubuntu-latest, 3.5.0)

'batch' is possibly 'undefined'.

Check failure on line 262 in test/protocol/records.test.ts

View workflow job for this annotation

GitHub Actions / ci (20, ubuntu-latest, 3.7.0)

'batch' is possibly 'undefined'.

Check failure on line 262 in test/protocol/records.test.ts

View workflow job for this annotation

GitHub Actions / ci (20, ubuntu-latest, 3.9.0)

'batch' is possibly 'undefined'.
strictEqual(batch.records[1].value.toString(), 'value2'.repeat(100), 'Second record value should match')

Check failure on line 263 in test/protocol/records.test.ts

View workflow job for this annotation

GitHub Actions / ci (20, ubuntu-latest, 3.8.0)

'batch' is possibly 'undefined'.

Check failure on line 263 in test/protocol/records.test.ts

View workflow job for this annotation

GitHub Actions / ci (20, ubuntu-latest, 3.5.0)

'batch' is possibly 'undefined'.

Check failure on line 263 in test/protocol/records.test.ts

View workflow job for this annotation

GitHub Actions / ci (20, ubuntu-latest, 3.7.0)

'batch' is possibly 'undefined'.

Check failure on line 263 in test/protocol/records.test.ts

View workflow job for this annotation

GitHub Actions / ci (20, ubuntu-latest, 3.9.0)

'batch' is possibly 'undefined'.
})

test('createRecordsBatch should throw on unsupported compression', () => {
Expand Down Expand Up @@ -379,17 +379,17 @@
const reader = Reader.from(batchBuffer)
const batch = readRecordsBatch(reader)

strictEqual(batch.attributes & 0b111, 1, 'Attributes should have gzip compression bit set')

Check failure on line 382 in test/protocol/records.test.ts

View workflow job for this annotation

GitHub Actions / ci (20, ubuntu-latest, 3.8.0)

'batch' is possibly 'undefined'.

Check failure on line 382 in test/protocol/records.test.ts

View workflow job for this annotation

GitHub Actions / ci (20, ubuntu-latest, 3.5.0)

'batch' is possibly 'undefined'.

Check failure on line 382 in test/protocol/records.test.ts

View workflow job for this annotation

GitHub Actions / ci (20, ubuntu-latest, 3.7.0)

'batch' is possibly 'undefined'.

Check failure on line 382 in test/protocol/records.test.ts

View workflow job for this annotation

GitHub Actions / ci (20, ubuntu-latest, 3.9.0)

'batch' is possibly 'undefined'.
strictEqual(batch.records.length, 2, 'Should have two records')

Check failure on line 383 in test/protocol/records.test.ts

View workflow job for this annotation

GitHub Actions / ci (20, ubuntu-latest, 3.8.0)

'batch' is possibly 'undefined'.

Check failure on line 383 in test/protocol/records.test.ts

View workflow job for this annotation

GitHub Actions / ci (20, ubuntu-latest, 3.5.0)

'batch' is possibly 'undefined'.

Check failure on line 383 in test/protocol/records.test.ts

View workflow job for this annotation

GitHub Actions / ci (20, ubuntu-latest, 3.7.0)

'batch' is possibly 'undefined'.

Check failure on line 383 in test/protocol/records.test.ts

View workflow job for this annotation

GitHub Actions / ci (20, ubuntu-latest, 3.9.0)

'batch' is possibly 'undefined'.
strictEqual(batch.records[0].value.toString(), 'compressed1', 'First record value should match')

Check failure on line 384 in test/protocol/records.test.ts

View workflow job for this annotation

GitHub Actions / ci (20, ubuntu-latest, 3.8.0)

'batch' is possibly 'undefined'.

Check failure on line 384 in test/protocol/records.test.ts

View workflow job for this annotation

GitHub Actions / ci (20, ubuntu-latest, 3.5.0)

'batch' is possibly 'undefined'.

Check failure on line 384 in test/protocol/records.test.ts

View workflow job for this annotation

GitHub Actions / ci (20, ubuntu-latest, 3.7.0)

'batch' is possibly 'undefined'.

Check failure on line 384 in test/protocol/records.test.ts

View workflow job for this annotation

GitHub Actions / ci (20, ubuntu-latest, 3.9.0)

'batch' is possibly 'undefined'.
strictEqual(batch.records[1].value.toString(), 'compressed2', 'Second record value should match')

Check failure on line 385 in test/protocol/records.test.ts

View workflow job for this annotation

GitHub Actions / ci (20, ubuntu-latest, 3.8.0)

'batch' is possibly 'undefined'.

Check failure on line 385 in test/protocol/records.test.ts

View workflow job for this annotation

GitHub Actions / ci (20, ubuntu-latest, 3.5.0)

'batch' is possibly 'undefined'.

Check failure on line 385 in test/protocol/records.test.ts

View workflow job for this annotation

GitHub Actions / ci (20, ubuntu-latest, 3.7.0)

'batch' is possibly 'undefined'.

Check failure on line 385 in test/protocol/records.test.ts

View workflow job for this annotation

GitHub Actions / ci (20, ubuntu-latest, 3.9.0)

'batch' is possibly 'undefined'.
})

test('readRecordsBatch should throw on unsupported compression bitmask', () => {
// Create a mock batch with an invalid compression bitmask
const writer = Writer.create()
.appendInt64(0n) // firstOffset
.appendInt32(100) // length
.appendInt32(49) // length
.appendInt32(0) // partitionLeaderEpoch
.appendInt8(2) // magic
.appendUnsignedInt32(0) // crc
Expand Down
Loading