Skip to content

Commit

Permalink
feat: introduce BufferPool to replace BufferList (#2669)
Browse files Browse the repository at this point in the history
BufferList really helped simplify a lot of code in our message
stream processing, but ultimately is more powerful than we need it
to be. Additionally, depending on this package makes maintenance
of the driver more difficult over time. This introduces a new type
called BufferList which is tailored to our particular use case.

NODE-2930
  • Loading branch information
mbroadst committed Dec 10, 2020
1 parent 07fd317 commit 3c56efc
Show file tree
Hide file tree
Showing 6 changed files with 210 additions and 31 deletions.
1 change: 0 additions & 1 deletion package.json
Expand Up @@ -26,7 +26,6 @@
"bson-ext": "^2.0.0"
},
"dependencies": {
"bl": "^2.2.1",
"bson": "^4.0.4",
"denque": "^1.4.1",
"lodash": "^4.17.20"
Expand Down
21 changes: 8 additions & 13 deletions src/cmap/message_stream.ts
@@ -1,4 +1,3 @@
import BufferList = require('bl');
import { Duplex, DuplexOptions } from 'stream';
import { Response, Msg, BinMsg, Query, WriteProtocolMessageType, MessageHeader } from './commands';
import { MongoError, MongoParseError } from '../error';
Expand All @@ -11,7 +10,7 @@ import {
CompressorName
} from './wire_protocol/compression';
import type { Document, BSONSerializeOptions } from '../bson';
import type { Callback } from '../utils';
import { BufferPool, Callback } from '../utils';
import type { ClientSession } from '../sessions';

const MESSAGE_HEADER_SIZE = 16;
Expand Down Expand Up @@ -48,21 +47,19 @@ export interface OperationDescription extends BSONSerializeOptions {
* @internal
*/
export class MessageStream extends Duplex {
/** @internal */
maxBsonMessageSize: number;
[kBuffer]: BufferList;
/** @internal */
[kBuffer]: BufferPool;

constructor(options: MessageStreamOptions = {}) {
super(options);

this.maxBsonMessageSize = options.maxBsonMessageSize || kDefaultMaxBsonMessageSize;

this[kBuffer] = new BufferList();
this[kBuffer] = new BufferPool();
}

_write(chunk: Buffer, _: unknown, callback: Callback<Buffer>): void {
const buffer = this[kBuffer];
buffer.append(chunk);

this[kBuffer].append(chunk);
processIncomingData(this, callback);
}

Expand Down Expand Up @@ -135,7 +132,7 @@ function processIncomingData(stream: MessageStream, callback: Callback<Buffer>)
return;
}

const sizeOfMessage = buffer.readInt32LE(0);
const sizeOfMessage = buffer.peek(4).readInt32LE();
if (sizeOfMessage < 0) {
callback(new MongoParseError(`Invalid message size: ${sizeOfMessage}`));
return;
Expand All @@ -155,9 +152,7 @@ function processIncomingData(stream: MessageStream, callback: Callback<Buffer>)
return;
}

const message = buffer.slice(0, sizeOfMessage);
buffer.consume(sizeOfMessage);

const message = buffer.read(sizeOfMessage);
const messageHeader: MessageHeader = {
length: message.readInt32LE(0),
requestId: message.readInt32LE(4),
Expand Down
3 changes: 2 additions & 1 deletion src/index.ts
Expand Up @@ -288,7 +288,8 @@ export type {
ClientMetadata,
ClientMetadataOptions,
MongoDBNamespace,
InterruptableAsyncInterval
InterruptibleAsyncInterval,
BufferPool
} from './utils';
export type { WriteConcern, W, WriteConcernOptions } from './write_concern';
export type { ExecutionResult } from './operations/execute_operation';
Expand Down
10 changes: 5 additions & 5 deletions src/sdam/monitor.ts
Expand Up @@ -3,7 +3,7 @@ import {
now,
makeStateMachine,
calculateDurationInMs,
makeInterruptableAsyncInterval
makeInterruptibleAsyncInterval
} from '../utils';
import { EventEmitter } from 'events';
import { connect } from '../cmap/connect';
Expand All @@ -17,7 +17,7 @@ import {
} from './events';

import { Server } from './server';
import type { InterruptableAsyncInterval, Callback } from '../utils';
import type { InterruptibleAsyncInterval, Callback } from '../utils';
import type { TopologyVersion } from './server_description';
import type { ConnectionOptions } from '../cmap/connection';

Expand Down Expand Up @@ -65,7 +65,7 @@ export class Monitor extends EventEmitter {
[kConnection]?: Connection;
[kCancellationToken]: EventEmitter;
/** @internal */
[kMonitorId]?: InterruptableAsyncInterval;
[kMonitorId]?: InterruptibleAsyncInterval;
[kRTTPinger]?: RTTPinger;

constructor(server: Server, options?: Partial<MonitorOptions>) {
Expand Down Expand Up @@ -123,7 +123,7 @@ export class Monitor extends EventEmitter {
// start
const heartbeatFrequencyMS = this.options.heartbeatFrequencyMS;
const minHeartbeatFrequencyMS = this.options.minHeartbeatFrequencyMS;
this[kMonitorId] = makeInterruptableAsyncInterval(monitorServer(this), {
this[kMonitorId] = makeInterruptibleAsyncInterval(monitorServer(this), {
interval: heartbeatFrequencyMS,
minInterval: minHeartbeatFrequencyMS,
immediate: true
Expand Down Expand Up @@ -153,7 +153,7 @@ export class Monitor extends EventEmitter {
// restart monitoring
const heartbeatFrequencyMS = this.options.heartbeatFrequencyMS;
const minHeartbeatFrequencyMS = this.options.minHeartbeatFrequencyMS;
this[kMonitorId] = makeInterruptableAsyncInterval(monitorServer(this), {
this[kMonitorId] = makeInterruptibleAsyncInterval(monitorServer(this), {
interval: heartbeatFrequencyMS,
minInterval: minHeartbeatFrequencyMS
});
Expand Down
107 changes: 102 additions & 5 deletions src/utils.ts
Expand Up @@ -961,7 +961,7 @@ export function calculateDurationInMs(started: number): number {
return elapsed < 0 ? 0 : elapsed;
}

export interface InterruptableAsyncIntervalOptions {
export interface InterruptibleAsyncIntervalOptions {
/** The interval to execute a method on */
interval: number;
/** A minimum interval that must elapse before the method is called */
Expand All @@ -977,7 +977,7 @@ export interface InterruptableAsyncIntervalOptions {
}

/** @internal */
export interface InterruptableAsyncInterval {
export interface InterruptibleAsyncInterval {
wake(): void;
stop(): void;
}
Expand All @@ -991,10 +991,10 @@ export interface InterruptableAsyncInterval {
*
* @param fn - An async function to run on an interval, must accept a `callback` as its only parameter
*/
export function makeInterruptableAsyncInterval(
export function makeInterruptibleAsyncInterval(
fn: (callback: Callback) => void,
options?: Partial<InterruptableAsyncIntervalOptions>
): InterruptableAsyncInterval {
options?: Partial<InterruptibleAsyncIntervalOptions>
): InterruptibleAsyncInterval {
let timerId: NodeJS.Timeout | undefined;
let lastCallTime: number;
let lastWakeTime: number;
Expand Down Expand Up @@ -1155,3 +1155,100 @@ export function isRecord(

return isRecord;
}

const kBuffers = Symbol('buffers');
const kLength = Symbol('length');

/**
* A pool of Buffers which allow you to read them as if they were one
* @internal
*/
export class BufferPool {
[kBuffers]: Buffer[];
[kLength]: number;

constructor() {
this[kBuffers] = [];
this[kLength] = 0;
}

get length(): number {
return this[kLength];
}

/** Adds a buffer to the internal buffer pool list */
append(buffer: Buffer): void {
this[kBuffers].push(buffer);
this[kLength] += buffer.length;
}

/** Returns the requested number of bytes without consuming them */
peek(size: number): Buffer {
return this.read(size, false);
}

/** Reads the requested number of bytes, optionally consuming them */
read(size: number, consume = true): Buffer {
if (typeof size !== 'number' || size < 0) {
throw new TypeError('Parameter size must be a non-negative number');
}

if (size > this[kLength]) {
return Buffer.alloc(0);
}

let result: Buffer;

// read the whole buffer
if (size === this.length) {
result = Buffer.concat(this[kBuffers]);

if (consume) {
this[kBuffers] = [];
this[kLength] = 0;
}
}

// size is within first buffer, no need to concat
else if (size <= this[kBuffers][0].length) {
result = this[kBuffers][0].slice(0, size);
if (consume) {
this[kBuffers][0] = this[kBuffers][0].slice(size);
this[kLength] -= size;
}
}

// size is beyond first buffer, need to track and copy
else {
result = Buffer.allocUnsafe(size);

let idx;
let offset = 0;
let bytesToCopy = size;
for (idx = 0; idx < this[kBuffers].length; ++idx) {
let bytesCopied;
if (bytesToCopy > this[kBuffers][idx].length) {
bytesCopied = this[kBuffers][idx].copy(result, offset, 0);
offset += bytesCopied;
} else {
bytesCopied = this[kBuffers][idx].copy(result, offset, 0, bytesToCopy);
if (consume) {
this[kBuffers][idx] = this[kBuffers][idx].slice(bytesCopied);
}
offset += bytesCopied;
break;
}

bytesToCopy -= bytesCopied;
}

// compact the internal buffer array
if (consume) {
this[kBuffers] = this[kBuffers].slice(idx);
this[kLength] -= size;
}
}

return result;
}
}

0 comments on commit 3c56efc

Please sign in to comment.