diff --git a/.gitignore b/.gitignore
index 221750f..611f2f6 100644
--- a/.gitignore
+++ b/.gitignore
@@ -3,4 +3,5 @@ external/
 node_modules
 coverage
 .eslintcache
-.env
\ No newline at end of file
+.env
+.vscode
diff --git a/src/apis/consumer/fetch-v15.ts b/src/apis/consumer/fetch-v15.ts
index 69df226..90c8634 100644
--- a/src/apis/consumer/fetch-v15.ts
+++ b/src/apis/consumer/fetch-v15.ts
@@ -1,4 +1,4 @@
-import { ResponseError } from '../../errors.ts'
+import { type GenericError, OutOfBoundsError, ResponseError } from '../../errors.ts'
 import { Reader } from '../../protocol/reader.ts'
 import { readRecordsBatch, type RecordsBatch } from '../../protocol/records.ts'
 import { Writer } from '../../protocol/writer.ts'
@@ -174,19 +174,28 @@ export function parseResponse (
         preferredReadReplica: r.readInt32()
       }
 
-      let recordsSize = r.readUnsignedVarInt()
-
       if (partition.errorCode !== 0) {
         errors.push([`/responses/${i}/partitions/${j}`, partition.errorCode])
       }
 
-      if (recordsSize > 1) {
-        recordsSize--
+      // We need to reduce the size by one to follow the COMPACT_RECORDS specification.
+      const recordsSize = r.readUnsignedVarInt() - 1
 
+      if (recordsSize > 0) {
         const recordsBatchesReader = Reader.from(r.buffer.subarray(r.position, r.position + recordsSize))
 
         partition.records = []
         do {
-          partition.records.push(readRecordsBatch(recordsBatchesReader))
+          try {
+            partition.records.push(readRecordsBatch(recordsBatchesReader))
+          } catch (err) {
+            // Contrary to other places in the protocol, record batches CAN be truncated due to the maxBytes argument.
+            // In that case we just ignore the error.
+            if ((err as GenericError).code === OutOfBoundsError.code) {
+              break
+            }
+            /* c8 ignore next 3 - Hard to test */
+            throw err
+          }
         } while (recordsBatchesReader.position < recordsSize)
 
         r.skip(recordsSize)
diff --git a/src/apis/consumer/fetch-v16.ts b/src/apis/consumer/fetch-v16.ts
index 22bc617..9f505b7 100644
--- a/src/apis/consumer/fetch-v16.ts
+++ b/src/apis/consumer/fetch-v16.ts
@@ -1,4 +1,4 @@
-import { ResponseError } from '../../errors.ts'
+import { type GenericError, OutOfBoundsError, ResponseError } from '../../errors.ts'
 import { Reader } from '../../protocol/reader.ts'
 import { readRecordsBatch, type RecordsBatch } from '../../protocol/records.ts'
 import { Writer } from '../../protocol/writer.ts'
@@ -174,19 +174,28 @@ export function parseResponse (
         preferredReadReplica: r.readInt32()
       }
 
-      let recordsSize = r.readUnsignedVarInt()
-
       if (partition.errorCode !== 0) {
         errors.push([`/responses/${i}/partitions/${j}`, partition.errorCode])
       }
 
-      if (recordsSize > 1) {
-        recordsSize--
+      // We need to reduce the size by one to follow the COMPACT_RECORDS specification.
+      const recordsSize = r.readUnsignedVarInt() - 1
 
+      if (recordsSize > 0) {
         const recordsBatchesReader = Reader.from(r.buffer.subarray(r.position, r.position + recordsSize))
 
         partition.records = []
         do {
-          partition.records.push(readRecordsBatch(recordsBatchesReader))
+          try {
+            partition.records.push(readRecordsBatch(recordsBatchesReader))
+          } catch (err) {
+            // Contrary to other places in the protocol, record batches CAN be truncated due to the maxBytes argument.
+            // In that case we just ignore the error.
+            if ((err as GenericError).code === OutOfBoundsError.code) {
+              break
+            }
+            /* c8 ignore next 3 - Hard to test */
+            throw err
+          }
         } while (recordsBatchesReader.position < recordsSize)
 
         r.skip(recordsSize)
diff --git a/src/apis/consumer/fetch-v17.ts b/src/apis/consumer/fetch-v17.ts
index 08f8d7f..e4c9e8a 100644
--- a/src/apis/consumer/fetch-v17.ts
+++ b/src/apis/consumer/fetch-v17.ts
@@ -1,4 +1,4 @@
-import { ResponseError } from '../../errors.ts'
+import { type GenericError, OutOfBoundsError, ResponseError } from '../../errors.ts'
 import { Reader } from '../../protocol/reader.ts'
 import { readRecordsBatch, type RecordsBatch } from '../../protocol/records.ts'
 import { Writer } from '../../protocol/writer.ts'
@@ -178,14 +178,24 @@ export function parseResponse (
         errors.push([`/responses/${i}/partitions/${j}`, partition.errorCode])
       }
 
-      // We need to reduce the size by one to follow the COMPACT_RECORDS specification
+      // We need to reduce the size by one to follow the COMPACT_RECORDS specification.
       const recordsSize = r.readUnsignedVarInt() - 1
 
       if (recordsSize > 0) {
         const recordsBatchesReader = Reader.from(r.buffer.subarray(r.position, r.position + recordsSize))
 
         partition.records = []
         do {
-          partition.records.push(readRecordsBatch(recordsBatchesReader))
+          try {
+            partition.records.push(readRecordsBatch(recordsBatchesReader))
+          } catch (err) {
+            // Contrary to other places in the protocol, record batches CAN be truncated due to the maxBytes argument.
+            // In that case we just ignore the error.
+            if ((err as GenericError).code === OutOfBoundsError.code) {
+              break
+            }
+            /* c8 ignore next 3 - Hard to test */
+            throw err
+          }
         } while (recordsBatchesReader.position < recordsSize)
 
         r.skip(recordsSize)
diff --git a/src/errors.ts b/src/errors.ts
index ef861e2..7b732c3 100644
--- a/src/errors.ts
+++ b/src/errors.ts
@@ -11,6 +11,7 @@ export const errorCodes = [
   'PLT_KFK_AUTHENTICATION',
   'PLT_KFK_MULTIPLE',
   'PLT_KFK_NETWORK',
+  'PLT_KFK_OUT_OF_BOUNDS',
   'PLT_KFK_PROTOCOL',
   'PLT_KFK_RESPONSE',
   'PLT_KFK_TIMEOUT',
@@ -163,6 +164,14 @@ export class ProtocolError extends GenericError {
   }
 }
 
+export class OutOfBoundsError extends GenericError {
+  static code: ErrorCode = 'PLT_KFK_OUT_OF_BOUNDS'
+
+  constructor (message: string, properties: ErrorProperties = {}) {
+    super(OutOfBoundsError.code, message, { isOut: true, ...properties })
+  }
+}
+
 export class ResponseError extends MultipleErrors {
   static code: ErrorCode = 'PLT_KFK_RESPONSE'
diff --git a/src/protocol/dynamic-buffer.ts b/src/protocol/dynamic-buffer.ts
index cfaeda1..d9e1076 100644
--- a/src/protocol/dynamic-buffer.ts
+++ b/src/protocol/dynamic-buffer.ts
@@ -1,4 +1,4 @@
-import { UserError } from '../errors.ts'
+import { OutOfBoundsError } from '../errors.ts'
 import { EMPTY_BUFFER, INT16_SIZE, INT32_SIZE, INT64_SIZE, INT8_SIZE } from './definitions.ts'
 import {
   BITS_8PLUS_MASK,
@@ -91,7 +91,7 @@ export class DynamicBuffer {
     }
 
     if (start < 0 || start > this.length || end > this.length) {
-      throw new UserError('Out of bounds.')
+      throw new OutOfBoundsError('Out of bounds.')
     }
 
     if (this.buffers.length === 0) {
@@ -128,13 +128,13 @@ export class DynamicBuffer {
     }
 
     if (start < 0 || start > this.length || end > this.length) {
-      throw new UserError('Out of bounds.')
+      throw new OutOfBoundsError('Out of bounds.')
     }
 
     if (this.buffers.length === 0) {
       return EMPTY_BUFFER
     } else if (this.buffers.length === 1) {
-      return this.buffers[0].slice(start, end)
+      return this.buffers[0].subarray(start, end)
     }
 
     let position = 0
@@ -176,7 +176,7 @@ export class DynamicBuffer {
 
   consume (offset: number): this {
     if (offset < 0 || offset > this.length) {
-      throw new UserError('Out of bounds.')
+      throw new OutOfBoundsError('Out of bounds.')
     }
 
     if (offset === 0) {
@@ -210,7 +210,7 @@ export class DynamicBuffer {
 
   get (offset: number): number {
     if (offset < 0 || offset >= this.length) {
-      throw new UserError('Out of bounds.')
+      throw new OutOfBoundsError('Out of bounds.')
     }
 
     const [finalIndex, current] = this.#findInitialBuffer(offset)
@@ -219,7 +219,7 @@ export class DynamicBuffer {
 
   readUInt8 (offset: number = 0): number {
     if (offset < 0 || offset >= this.length) {
-      throw new UserError('Out of bounds.')
+      throw new OutOfBoundsError('Out of bounds.')
     }
 
     const [finalIndex, current] = this.#findInitialBuffer(offset)
@@ -265,7 +265,7 @@ export class DynamicBuffer {
     let read = 0
 
     if (offset < 0 || offset >= this.length) {
-      throw new UserError('Out of bounds.')
+      throw new OutOfBoundsError('Out of bounds.')
     }
 
     // Find the initial buffer
@@ -294,7 +294,7 @@ export class DynamicBuffer {
     let read = 0
 
     if (offset < 0 || offset >= this.length) {
-      throw new UserError('Out of bounds.')
+      throw new OutOfBoundsError('Out of bounds.')
     }
 
     // Find the initial buffer
@@ -318,7 +318,7 @@ export class DynamicBuffer {
 
   readInt8 (offset: number = 0): number {
     if (offset < 0 || offset >= this.length) {
-      throw new UserError('Out of bounds.')
+      throw new OutOfBoundsError('Out of bounds.')
    }
 
     const [finalIndex, current] = this.#findInitialBuffer(offset)
@@ -681,7 +681,7 @@ export class DynamicBuffer {
 
   #readMultiple (index: number, length: number) {
     if (index < 0 || index + length > this.length) {
-      throw new UserError('Out of bounds.')
+      throw new OutOfBoundsError('Out of bounds.')
     }
 
     let [startOffset, current] = this.#findInitialBuffer(index)
diff --git a/test/apis/consumer/fetch-v15.test.ts b/test/apis/consumer/fetch-v15.test.ts
index c5f873a..7fa3e13 100644
--- a/test/apis/consumer/fetch-v15.test.ts
+++ b/test/apis/consumer/fetch-v15.test.ts
@@ -825,3 +825,106 @@ test('parseResponse parses record data', () => {
   // Verify value is a Buffer
   ok(Buffer.isBuffer(record.value))
 })
+
+test('parseResponse handles truncated records', () => {
+  // Create a response with records data
+  // First create a record batch
+  const timestamp = BigInt(Date.now())
+  const recordsBatch = Writer.create()
+    // Record batch structure
+    .appendInt64(0n) // firstOffset
+    .appendInt32(60) // length - this would be dynamically computed in real usage
+    .appendInt32(0) // partitionLeaderEpoch
+    .appendInt8(2) // magic (record format version)
+    .appendUnsignedInt32(0) // crc - would be computed properly in real code
+    .appendInt16(0) // attributes
+    .appendInt32(0) // lastOffsetDelta
+    .appendInt64(timestamp) // firstTimestamp
+    .appendInt64(timestamp) // maxTimestamp
+    .appendInt64(-1n) // producerId - not specified
+    .appendInt16(0) // producerEpoch
+    .appendInt32(0) // firstSequence
+    .appendInt32(1) // number of records
+    // Single record
+    .appendVarInt(8) // length of the record
+    .appendInt8(0) // attributes
+    .appendVarInt64(0n) // timestampDelta
+    .appendVarInt(0) // offsetDelta
+    .appendVarIntBytes(null) // key
+    .appendVarIntBytes(Buffer.from('test-value')) // value
+    .appendVarIntArray([], () => {}) // No headers
+    // Truncated batch
+    .appendInt64(0n) // firstOffset
+    .appendInt32(60) // length
+
+  // Now create the full response
+  const writer = Writer.create()
+    .appendInt32(0) // throttleTimeMs
+    .appendInt16(0) // errorCode (success)
+    .appendInt32(123) // sessionId
+    // Responses array - using tagged fields format
+    .appendArray(
+      [
+        {
+          topicId: '12345678-1234-1234-1234-123456789abc',
+          partitions: [
+            {
+              partitionIndex: 0,
+              errorCode: 0,
+              highWatermark: 100n,
+              lastStableOffset: 100n,
+              logStartOffset: 0n,
+              abortedTransactions: [],
+              preferredReadReplica: -1,
+              recordsBatch
+            }
+          ]
+        }
+      ],
+      (w, topic) => {
+        w.appendUUID(topic.topicId)
+          // Partitions array
+          .appendArray(topic.partitions, (w, partition) => {
+            w.appendInt32(partition.partitionIndex)
+              .appendInt16(partition.errorCode)
+              .appendInt64(partition.highWatermark)
+              .appendInt64(partition.lastStableOffset)
+              .appendInt64(partition.logStartOffset)
+              // Aborted transactions array (empty)
+              .appendArray(partition.abortedTransactions, () => {})
+              .appendInt32(partition.preferredReadReplica)
+
+              // Add records batch
+              .appendUnsignedVarInt(partition.recordsBatch.length + 1)
+              .appendFrom(partition.recordsBatch)
+          })
+      }
+    )
+    .appendInt8(0) // Root tagged fields
+
+  const response = parseResponse(1, 1, 17, Reader.from(writer))
+
+  // Verify the records were parsed correctly
+  ok(response.responses[0].partitions[0].records, 'Records should be defined')
+
+  const batch = response.responses[0].partitions[0].records[0]!
+  const record = batch.records[0]
+
+  deepStrictEqual(
+    {
+      firstOffset: batch.firstOffset,
+      recordsLength: batch.records.length,
+      offsetDelta: record.offsetDelta,
+      valueString: record.value.toString()
+    },
+    {
+      firstOffset: 0n,
+      recordsLength: 1,
+      offsetDelta: 0,
+      valueString: 'test-value'
+    }
+  )
+
+  // Verify value is a Buffer
+  ok(Buffer.isBuffer(record.value))
+})
diff --git a/test/apis/consumer/fetch-v16.test.ts b/test/apis/consumer/fetch-v16.test.ts
index de5eeee..8ec993f 100644
--- a/test/apis/consumer/fetch-v16.test.ts
+++ b/test/apis/consumer/fetch-v16.test.ts
@@ -788,7 +788,110 @@ test('parseResponse parses record data', () => {
               .appendInt64(partition.lastStableOffset)
               .appendInt64(partition.logStartOffset)
               // Aborted transactions array (empty)
-              .appendArray(partition.abortedTransactions, () => { })
+              .appendArray(partition.abortedTransactions, () => {})
+              .appendInt32(partition.preferredReadReplica)
+
+              // Add records batch
+              .appendUnsignedVarInt(partition.recordsBatch.length + 1)
+              .appendFrom(partition.recordsBatch)
+          })
+      }
+    )
+    .appendInt8(0) // Root tagged fields
+
+  const response = parseResponse(1, 1, 17, Reader.from(writer))
+
+  // Verify the records were parsed correctly
+  ok(response.responses[0].partitions[0].records, 'Records should be defined')
+
+  const batch = response.responses[0].partitions[0].records[0]!
+  const record = batch.records[0]
+
+  deepStrictEqual(
+    {
+      firstOffset: batch.firstOffset,
+      recordsLength: batch.records.length,
+      offsetDelta: record.offsetDelta,
+      valueString: record.value.toString()
+    },
+    {
+      firstOffset: 0n,
+      recordsLength: 1,
+      offsetDelta: 0,
+      valueString: 'test-value'
+    }
+  )
+
+  // Verify value is a Buffer
+  ok(Buffer.isBuffer(record.value))
+})
+
+test('parseResponse handles truncated records', () => {
+  // Create a response with records data
+  // First create a record batch
+  const timestamp = BigInt(Date.now())
+  const recordsBatch = Writer.create()
+    // Record batch structure
+    .appendInt64(0n) // firstOffset
+    .appendInt32(60) // length - this would be dynamically computed in real usage
+    .appendInt32(0) // partitionLeaderEpoch
+    .appendInt8(2) // magic (record format version)
+    .appendUnsignedInt32(0) // crc - would be computed properly in real code
+    .appendInt16(0) // attributes
+    .appendInt32(0) // lastOffsetDelta
+    .appendInt64(timestamp) // firstTimestamp
+    .appendInt64(timestamp) // maxTimestamp
+    .appendInt64(-1n) // producerId - not specified
+    .appendInt16(0) // producerEpoch
+    .appendInt32(0) // firstSequence
+    .appendInt32(1) // number of records
+    // Single record
+    .appendVarInt(8) // length of the record
+    .appendInt8(0) // attributes
+    .appendVarInt64(0n) // timestampDelta
+    .appendVarInt(0) // offsetDelta
+    .appendVarIntBytes(null) // key
+    .appendVarIntBytes(Buffer.from('test-value')) // value
+    .appendVarIntArray([], () => {}) // No headers
+    // Truncated batch
+    .appendInt64(0n) // firstOffset
+    .appendInt32(60) // length
+
+  // Now create the full response
+  const writer = Writer.create()
+    .appendInt32(0) // throttleTimeMs
+    .appendInt16(0) // errorCode (success)
+    .appendInt32(123) // sessionId
+    // Responses array - using tagged fields format
+    .appendArray(
+      [
+        {
+          topicId: '12345678-1234-1234-1234-123456789abc',
+          partitions: [
+            {
+              partitionIndex: 0,
+              errorCode: 0,
+              highWatermark: 100n,
+              lastStableOffset: 100n,
+              logStartOffset: 0n,
+              abortedTransactions: [],
+              preferredReadReplica: -1,
+              recordsBatch
+            }
+          ]
+        }
+      ],
+      (w, topic) => {
+        w.appendUUID(topic.topicId)
+          // Partitions array
+          .appendArray(topic.partitions, (w, partition) => {
+            w.appendInt32(partition.partitionIndex)
+              .appendInt16(partition.errorCode)
+              .appendInt64(partition.highWatermark)
+              .appendInt64(partition.lastStableOffset)
+              .appendInt64(partition.logStartOffset)
+              // Aborted transactions array (empty)
+              .appendArray(partition.abortedTransactions, () => {})
               .appendInt32(partition.preferredReadReplica)
 
               // Add records batch
diff --git a/test/apis/consumer/fetch-v17.test.ts b/test/apis/consumer/fetch-v17.test.ts
index ee67a2a..525307b 100644
--- a/test/apis/consumer/fetch-v17.test.ts
+++ b/test/apis/consumer/fetch-v17.test.ts
@@ -825,3 +825,106 @@ test('parseResponse parses record data', () => {
   // Verify value is a Buffer
   ok(Buffer.isBuffer(record.value))
 })
+
+test('parseResponse handles truncated records', () => {
+  // Create a response with records data
+  // First create a record batch
+  const timestamp = BigInt(Date.now())
+  const recordsBatch = Writer.create()
+    // Record batch structure
+    .appendInt64(0n) // firstOffset
+    .appendInt32(60) // length - this would be dynamically computed in real usage
+    .appendInt32(0) // partitionLeaderEpoch
+    .appendInt8(2) // magic (record format version)
+    .appendUnsignedInt32(0) // crc - would be computed properly in real code
+    .appendInt16(0) // attributes
+    .appendInt32(0) // lastOffsetDelta
+    .appendInt64(timestamp) // firstTimestamp
+    .appendInt64(timestamp) // maxTimestamp
+    .appendInt64(-1n) // producerId - not specified
+    .appendInt16(0) // producerEpoch
+    .appendInt32(0) // firstSequence
+    .appendInt32(1) // number of records
+    // Single record
+    .appendVarInt(8) // length of the record
+    .appendInt8(0) // attributes
+    .appendVarInt64(0n) // timestampDelta
+    .appendVarInt(0) // offsetDelta
+    .appendVarIntBytes(null) // key
+    .appendVarIntBytes(Buffer.from('test-value')) // value
+    .appendVarIntArray([], () => {}) // No headers
+    // Truncated batch
+    .appendInt64(0n) // firstOffset
+    .appendInt32(60) // length
+
+  // Now create the full response
+  const writer = Writer.create()
+    .appendInt32(0) // throttleTimeMs
+    .appendInt16(0) // errorCode (success)
+    .appendInt32(123) // sessionId
+    // Responses array - using tagged fields format
+    .appendArray(
+      [
+        {
+          topicId: '12345678-1234-1234-1234-123456789abc',
+          partitions: [
+            {
+              partitionIndex: 0,
+              errorCode: 0,
+              highWatermark: 100n,
+              lastStableOffset: 100n,
+              logStartOffset: 0n,
+              abortedTransactions: [],
+              preferredReadReplica: -1,
+              recordsBatch
+            }
+          ]
+        }
+      ],
+      (w, topic) => {
+        w.appendUUID(topic.topicId)
+          // Partitions array
+          .appendArray(topic.partitions, (w, partition) => {
+            w.appendInt32(partition.partitionIndex)
+              .appendInt16(partition.errorCode)
+              .appendInt64(partition.highWatermark)
+              .appendInt64(partition.lastStableOffset)
+              .appendInt64(partition.logStartOffset)
+              // Aborted transactions array (empty)
+              .appendArray(partition.abortedTransactions, () => {})
+              .appendInt32(partition.preferredReadReplica)
+
+              // Add records batch
+              .appendUnsignedVarInt(partition.recordsBatch.length + 1)
+              .appendFrom(partition.recordsBatch)
+          })
+      }
+    )
+    .appendInt8(0) // Root tagged fields
+
+  const response = parseResponse(1, 1, 17, Reader.from(writer))
+
+  // Verify the records were parsed correctly
+  ok(response.responses[0].partitions[0].records, 'Records should be defined')
+
+  const batch = response.responses[0].partitions[0].records[0]!
+  const record = batch.records[0]
+
+  deepStrictEqual(
+    {
+      firstOffset: batch.firstOffset,
+      recordsLength: batch.records.length,
+      offsetDelta: record.offsetDelta,
+      valueString: record.value.toString()
+    },
+    {
+      firstOffset: 0n,
+      recordsLength: 1,
+      offsetDelta: 0,
+      valueString: 'test-value'
+    }
+  )
+
+  // Verify value is a Buffer
+  ok(Buffer.isBuffer(record.value))
+})
diff --git a/test/clients/consumer/consumer.test.ts b/test/clients/consumer/consumer.test.ts
index 716993a..d246870 100644
--- a/test/clients/consumer/consumer.test.ts
+++ b/test/clients/consumer/consumer.test.ts
@@ -798,6 +798,54 @@ test('consume should integrate with custom deserializers', async t => {
   await stream.close()
 })
 
+test('consumer should properly consume messages with maxBytes limit', async t => {
+  const topic = await createTopic(t, true)
+  const producer = await createProducer(t)
+
+  const publishMessages = 100
+  const batchSize = 10
+
+  const batch: MessageToProduce[] = []
+  for (let i = 0; i < publishMessages; i++) {
+    const message = JSON.stringify({ id: i })
+
+    batch.push({
+      key: Buffer.from('customer_id'),
+      value: Buffer.from(message),
+      topic
+    })
+
+    if (batch.length >= batchSize) {
+      await producer.send({ messages: batch })
+      batch.length = 0
+    }
+  }
+  if (batch.length > 0) {
+    await producer.send({ messages: batch })
+  }
+
+  const consumer = createConsumer(t, {})
+
+  const stream = await consumer.consume({
+    topics: [topic],
+    autocommit: true,
+    mode: 'earliest',
+    maxWaitTime: 1000,
+    maxBytes: 1024 // magic number that forces the stream to receive messages across multiple "onData" calls
+  })
+
+  let receivedMessages = 0
+
+  for await (const message of stream) {
+    strictEqual(JSON.parse(message.value.toString()).id, receivedMessages)
+    if (++receivedMessages === publishMessages) {
+      break
+    }
+  }
+
+  strictEqual(receivedMessages, publishMessages)
+})
+
 test('fetch should return data and support diagnostic channels', async t => {
   const consumer = createConsumer(t)
   const topic = await createTopic(t, true)
@@ -1230,7 +1278,11 @@ test('fetch should retrieve messages from multiple batches', async t => {
   strictEqual(fetchResult.responses[0].partitions.length, 1, 'Should return one partition')
 
   const fetchPartition = fetchResult.responses[0].partitions[0]!
   strictEqual(fetchPartition.errorCode, 0, 'Should succeed fetching partition')
-  strictEqual(fetchPartition.records?.length, 3, 'Should return all batches')
+  strictEqual(
+    fetchPartition.records?.length,
+    3,
+    'Should return all batches in a single fetch since they fit in min/max bytes'
+  )
 
   for (let batchNo = 0; batchNo < fetchPartition.records.length; ++batchNo) {
     const recordsBatch: RecordsBatch = fetchPartition.records[batchNo]
diff --git a/test/protocol/dynamic-buffer.test.ts b/test/protocol/dynamic-buffer.test.ts
index 60b2a7c..8726db6 100644
--- a/test/protocol/dynamic-buffer.test.ts
+++ b/test/protocol/dynamic-buffer.test.ts
@@ -1,11 +1,33 @@
-import { deepStrictEqual, notStrictEqual, ok, strictEqual, throws } from 'node:assert'
+import { deepStrictEqual, notStrictEqual, strictEqual, throws } from 'node:assert'
 import test from 'node:test'
-import { DynamicBuffer, UserError } from '../../src/index.ts'
+import { DynamicBuffer, OutOfBoundsError } from '../../src/index.ts'
 import { EMPTY_BUFFER } from '../../src/protocol/definitions.ts'
 
 test('static isDynamicBuffer', () => {
-  ok(DynamicBuffer.isDynamicBuffer(new DynamicBuffer()))
-  ok(!DynamicBuffer.isDynamicBuffer('STRING'))
+  // Test with a DynamicBuffer instance
+  const dynamicBuffer = new DynamicBuffer()
+  strictEqual(DynamicBuffer.isDynamicBuffer(dynamicBuffer), true)
+
+  // Test with a regular Buffer
+  const regularBuffer = Buffer.from([1, 2, 3])
+  strictEqual(DynamicBuffer.isDynamicBuffer(regularBuffer), false)
+
+  // Test with null
+  strictEqual(DynamicBuffer.isDynamicBuffer(null), false)
+
+  // Test with undefined
+  strictEqual(DynamicBuffer.isDynamicBuffer(undefined), false)
+
+  // Test with other types
+  strictEqual(DynamicBuffer.isDynamicBuffer({}), false)
+  strictEqual(DynamicBuffer.isDynamicBuffer([]), false)
+  strictEqual(DynamicBuffer.isDynamicBuffer('string'), false)
+  strictEqual(DynamicBuffer.isDynamicBuffer(123), false)
+
+  // The dynamicBuffer symbol is a private implementation detail and can't be faked from outside
+  // as it's not exported or accessible (it's not Symbol.for() but a local Symbol)
+  const fakeBuffer = {}
+  strictEqual(DynamicBuffer.isDynamicBuffer(fakeBuffer), false)
 })
 
 test('constructor', () => {
@@ -117,7 +139,7 @@ test('subarray', () => {
       emptyBuffer.subarray(0, 1)
     },
     err => {
-      return err instanceof UserError && err.message === 'Out of bounds.'
+      return err instanceof OutOfBoundsError
     }
   )
 
@@ -189,7 +211,7 @@ test('subarray', () => {
       multiBuffer.subarray(-1, 5)
     },
     err => {
-      return err instanceof UserError && err.message === 'Out of bounds.'
+      return err instanceof OutOfBoundsError
     }
   )
 
@@ -198,7 +220,7 @@ test('subarray', () => {
       multiBuffer.subarray(0, 7)
     },
     err => {
-      return err instanceof UserError && err.message === 'Out of bounds.'
+      return err instanceof OutOfBoundsError
     }
   )
 })
@@ -213,7 +235,7 @@ test('slice', () => {
       emptyBuffer.slice(0, 1)
     },
     err => {
-      return err instanceof UserError && err.message === 'Out of bounds.'
+      return err instanceof OutOfBoundsError
     }
   )
 
@@ -283,7 +305,7 @@ test('slice', () => {
       singleBuffer.slice(-1, 5)
     },
     err => {
-      return err instanceof UserError && err.message === 'Out of bounds.'
+      return err instanceof OutOfBoundsError
     }
   )
 
@@ -292,7 +314,7 @@ test('slice', () => {
       singleBuffer.slice(0, 7)
     },
     err => {
-      return err instanceof UserError && err.message === 'Out of bounds.'
+      return err instanceof OutOfBoundsError
     }
   )
 })
@@ -432,7 +454,7 @@ test('consume', () => {
       buffer.consume(-1)
     },
     err => {
-      return err instanceof UserError && err.message === 'Out of bounds.'
+      return err instanceof OutOfBoundsError
     }
   )
 
@@ -442,7 +464,7 @@ test('consume', () => {
       buffer.consume(buffer.length + 1)
     },
     err => {
-      return err instanceof UserError && err.message === 'Out of bounds.'
+      return err instanceof OutOfBoundsError
     }
   )
 }
@@ -536,7 +558,7 @@ test('get', () => {
       emptyBuffer.get(0)
     },
     err => {
-      return err instanceof UserError && err.message === 'Out of bounds.'
+      return err instanceof OutOfBoundsError
     }
   )
 
@@ -561,7 +583,7 @@ test('get', () => {
       multiBuffer.get(-1)
     },
     err => {
-      return err instanceof UserError && err.message === 'Out of bounds.'
+      return err instanceof OutOfBoundsError
     }
   )
 
@@ -570,7 +592,7 @@ test('get', () => {
       multiBuffer.get(5)
     },
     err => {
-      return err instanceof UserError && err.message === 'Out of bounds.'
+      return err instanceof OutOfBoundsError
     }
   )
 })
@@ -586,7 +608,7 @@ test('readUInt8', () => {
       buffer.readUInt8(-1)
     },
     err => {
-      return err instanceof UserError && err.message === 'Out of bounds.'
+      return err instanceof OutOfBoundsError
     }
   )
 
@@ -595,7 +617,7 @@ test('readUInt8', () => {
       buffer.readUInt8(3)
     },
     err => {
-      return err instanceof UserError && err.message === 'Out of bounds.'
+      return err instanceof OutOfBoundsError
     }
   )
 })
@@ -613,7 +635,7 @@ test('readInt8', () => {
       buffer.readInt8(-1)
     },
     err => {
-      return err instanceof UserError && err.message === 'Out of bounds.'
+      return err instanceof OutOfBoundsError
     }
   )
 
@@ -622,7 +644,7 @@ test('readInt8', () => {
       buffer.readInt8(3)
     },
     err => {
-      return err instanceof UserError && err.message === 'Out of bounds.'
+      return err instanceof OutOfBoundsError
     }
   )
 })
@@ -689,7 +711,7 @@ for (const [name, bytes] of fixedLengths) {
         reader.call(buffer, -1)
       },
       err => {
-        return err instanceof UserError && err.message === 'Out of bounds.'
+        return err instanceof OutOfBoundsError
       }
     )
 
@@ -698,7 +720,7 @@ for (const [name, bytes] of fixedLengths) {
         reader.call(buffer, bytes * 4)
      },
      err => {
-        return err instanceof UserError && err.message === 'Out of bounds.'
+        return err instanceof OutOfBoundsError
       }
     )
   })
@@ -742,7 +764,7 @@ test('readFloatBE', () => {
      singleBuffer.readFloatBE(-1)
     },
     err => {
-      return err instanceof UserError && err.message === 'Out of bounds.'
+      return err instanceof OutOfBoundsError
     }
   )
 
@@ -751,7 +773,7 @@ test('readFloatBE', () => {
       singleBuffer.readFloatBE(1) // Not enough bytes for a float
     },
     err => {
-      return err instanceof UserError && err.message === 'Out of bounds.'
+      return err instanceof OutOfBoundsError
     }
   )
 })
@@ -793,7 +815,7 @@ test('readFloatLE', () => {
       singleBuffer.readFloatLE(-1)
     },
     err => {
-      return err instanceof UserError && err.message === 'Out of bounds.'
+      return err instanceof OutOfBoundsError
     }
   )
 
@@ -802,7 +824,7 @@ test('readFloatLE', () => {
       singleBuffer.readFloatLE(1) // Not enough bytes for a float
     },
     err => {
-      return err instanceof UserError && err.message === 'Out of bounds.'
+      return err instanceof OutOfBoundsError
     }
   )
 })
@@ -845,7 +867,7 @@ test('readDoubleBE', () => {
       singleBuffer.readDoubleBE(-1)
     },
     err => {
-      return err instanceof UserError && err.message === 'Out of bounds.'
+      return err instanceof OutOfBoundsError
     }
   )
 
@@ -854,7 +876,7 @@ test('readDoubleBE', () => {
       singleBuffer.readDoubleBE(1) // Not enough bytes for a double
     },
     err => {
-      return err instanceof UserError && err.message === 'Out of bounds.'
+      return err instanceof OutOfBoundsError
     }
   )
 })
@@ -897,7 +919,7 @@ test('readDoubleLE', () => {
       singleBuffer.readDoubleLE(-1)
     },
     err => {
-      return err instanceof UserError && err.message === 'Out of bounds.'
+      return err instanceof OutOfBoundsError
     }
   )
 
@@ -906,7 +928,7 @@ test('readDoubleLE', () => {
       singleBuffer.readDoubleLE(1) // Not enough bytes for a double
     },
     err => {
-      return err instanceof UserError && err.message === 'Out of bounds.'
+      return err instanceof OutOfBoundsError
     }
   )
 })
@@ -955,7 +977,7 @@ test('readUnsignedVarInt', () => {
       smallBuffer.readUnsignedVarInt(1)
     },
     err => {
-      return err instanceof UserError && err.message === 'Out of bounds.'
+      return err instanceof OutOfBoundsError
     }
   )
 })
@@ -1014,7 +1036,7 @@ test('readUnsignedVarInt64', () => {
       smallBuffer.readUnsignedVarInt64(1)
     },
     err => {
-      return err instanceof UserError && err.message === 'Out of bounds.'
+      return err instanceof OutOfBoundsError
     }
   )
 })
@@ -1385,30 +1407,3 @@ test('writeVarInt64', () => {
 
   dynamicBuffer5.writeVarInt64(-42n, false) // Prepend -42
   strictEqual(dynamicBuffer5.length > buffer5Length, true) // Length should have increased
 })
-
-test('isDynamicBuffer - static method', () => {
-  // Test with a DynamicBuffer instance
-  const dynamicBuffer = new DynamicBuffer()
-  strictEqual(DynamicBuffer.isDynamicBuffer(dynamicBuffer), true)
-
-  // Test with a regular Buffer
-  const regularBuffer = Buffer.from([1, 2, 3])
-  strictEqual(DynamicBuffer.isDynamicBuffer(regularBuffer), false)
-
-  // Test with null
-  strictEqual(DynamicBuffer.isDynamicBuffer(null), false)
-
-  // Test with undefined
-  strictEqual(DynamicBuffer.isDynamicBuffer(undefined), false)
-
-  // Test with other types
-  strictEqual(DynamicBuffer.isDynamicBuffer({}), false)
-  strictEqual(DynamicBuffer.isDynamicBuffer([]), false)
-  strictEqual(DynamicBuffer.isDynamicBuffer('string'), false)
-  strictEqual(DynamicBuffer.isDynamicBuffer(123), false)
-
-  // The dynamicBuffer symbol is a private implementation detail and can't be faked from outside
-  // as it's not exported or accessible (it's not Symbol.for() but a local Symbol)
-  const fakeBuffer = {}
-  strictEqual(DynamicBuffer.isDynamicBuffer(fakeBuffer), false)
-})
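
Note on the COMPACT_RECORDS framing the parsers above rely on: every COMPACT_* field in the Kafka wire protocol is length-prefixed with an unsigned varint that stores the payload length plus one, so that zero can encode null. That is why parseResponse reads the varint and subtracts one before slicing the records buffer, and why the tests append `recordsBatch.length + 1`. The following is a minimal round-trip sketch using plain Node.js Buffers; the two varint helpers are illustrative stand-ins for the library's Writer/Reader methods, not its actual API.

import { strictEqual } from 'node:assert'

// Encode a non-negative integer as an unsigned LEB128-style varint.
function writeUnsignedVarInt (value: number): Buffer {
  const bytes: number[] = []
  while (value > 0x7f) {
    bytes.push((value & 0x7f) | 0x80)
    value >>>= 7
  }
  bytes.push(value)
  return Buffer.from(bytes)
}

// Decode a varint, returning the value and the offset of the next byte.
function readUnsignedVarInt (buffer: Buffer, offset: number): [number, number] {
  let value = 0
  let shift = 0
  let byte: number
  do {
    byte = buffer[offset++]
    value |= (byte & 0x7f) << shift
    shift += 7
  } while (byte & 0x80)
  return [value, offset]
}

const records = Buffer.from('raw record batches')

// Writer side: prefix the payload with length + 1,
// mirroring appendUnsignedVarInt(partition.recordsBatch.length + 1) in the tests.
const framed = Buffer.concat([writeUnsignedVarInt(records.length + 1), records])

// Reader side: read the prefix and subtract one,
// mirroring `const recordsSize = r.readUnsignedVarInt() - 1` in parseResponse.
const [stored, position] = readUnsignedVarInt(framed, 0)
const recordsSize = stored - 1
strictEqual(recordsSize, records.length)
strictEqual(framed.subarray(position, position + recordsSize).toString(), 'raw record batches')

The try/catch added around readRecordsBatch follows from the same framing: the broker caps a partition's records buffer at maxBytes, so the last batch inside that buffer may be cut off mid-batch. Breaking out of the loop on OutOfBoundsError lets parseResponse return the complete batches it did decode, and the truncated batch can simply be read again by a subsequent fetch.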