Skip to content

Commit

Permalink
refactor(NODE-6057): implement CursorResponse for lazy document parsi…
Browse files Browse the repository at this point in the history
…ng (#4085)
  • Loading branch information
nbbeeken committed May 1, 2024
1 parent 6d8ad33 commit 9d73f45
Show file tree
Hide file tree
Showing 20 changed files with 584 additions and 138 deletions.
19 changes: 14 additions & 5 deletions src/cmap/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,11 @@ import type { ClientMetadata } from './handshake/client_metadata';
import { StreamDescription, type StreamDescriptionOptions } from './stream_description';
import { type CompressorName, decompressResponse } from './wire_protocol/compression';
import { onData } from './wire_protocol/on_data';
import { MongoDBResponse, type MongoDBResponseConstructor } from './wire_protocol/responses';
import {
isErrorResponse,
MongoDBResponse,
type MongoDBResponseConstructor
} from './wire_protocol/responses';
import { getReadPreference, isSharded } from './wire_protocol/shared';

/** @internal */
Expand Down Expand Up @@ -443,7 +447,12 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
this.socket.setTimeout(0);
const bson = response.parse();

const document = new (responseType ?? MongoDBResponse)(bson, 0, false);
const document =
responseType == null
? new MongoDBResponse(bson)
: isErrorResponse(bson)
? new MongoDBResponse(bson)
: new responseType(bson);

yield document;
this.throwIfAborted();
Expand Down Expand Up @@ -739,7 +748,7 @@ export class CryptoConnection extends Connection {
ns: MongoDBNamespace,
cmd: Document,
options?: CommandOptions,
responseType?: T | undefined
_responseType?: T | undefined
): Promise<Document> {
const { autoEncrypter } = this;
if (!autoEncrypter) {
Expand All @@ -753,7 +762,7 @@ export class CryptoConnection extends Connection {
const serverWireVersion = maxWireVersion(this);
if (serverWireVersion === 0) {
// This means the initial handshake hasn't happened yet
return await super.command<T>(ns, cmd, options, responseType);
return await super.command<T>(ns, cmd, options, undefined);
}

if (serverWireVersion < 8) {
Expand Down Expand Up @@ -787,7 +796,7 @@ export class CryptoConnection extends Connection {
}
}

const response = await super.command<T>(ns, encrypted, options, responseType);
const response = await super.command<T>(ns, encrypted, options, undefined);

return await autoEncrypter.decrypt(response, options);
}
Expand Down
57 changes: 36 additions & 21 deletions src/cmap/wire_protocol/on_demand/document.ts
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ export class OnDemandDocument {
private readonly indexFound: Record<number, boolean> = Object.create(null);

/** All bson elements in this document */
private readonly elements: BSONElement[];
private readonly elements: ReadonlyArray<BSONElement>;

constructor(
/** BSON bytes, this document begins at offset */
Expand Down Expand Up @@ -97,14 +97,30 @@ export class OnDemandDocument {
* @param name - a basic latin string name of a BSON element
* @returns
*/
private getElement(name: string): CachedBSONElement | null {
private getElement(name: string | number): CachedBSONElement | null {
const cachedElement = this.cache[name];
if (cachedElement === false) return null;

if (cachedElement != null) {
return cachedElement;
}

if (typeof name === 'number') {
if (this.isArray) {
if (name < this.elements.length) {
const element = this.elements[name];
const cachedElement = { element, value: undefined };
this.cache[name] = cachedElement;
this.indexFound[name] = true;
return cachedElement;
} else {
return null;
}
} else {
return null;
}
}

for (let index = 0; index < this.elements.length; index++) {
const element = this.elements[index];

Expand Down Expand Up @@ -197,6 +213,13 @@ export class OnDemandDocument {
}
}

/**
* Returns the number of elements in this BSON document
*/
public size() {
return this.elements.length;
}

/**
* Checks for the existence of an element by name.
*
Expand All @@ -222,16 +245,20 @@ export class OnDemandDocument {
* @param required - whether or not the element is expected to exist, if true this function will throw if it is not present
*/
public get<const T extends keyof JSTypeOf>(
name: string,
name: string | number,
as: T,
required?: false | undefined
): JSTypeOf[T] | null;

/** `required` will make `get` throw if name does not exist or is null/undefined */
public get<const T extends keyof JSTypeOf>(name: string, as: T, required: true): JSTypeOf[T];
public get<const T extends keyof JSTypeOf>(
name: string | number,
as: T,
required: true
): JSTypeOf[T];

public get<const T extends keyof JSTypeOf>(
name: string,
name: string | number,
as: T,
required?: boolean
): JSTypeOf[T] | null {
Expand Down Expand Up @@ -303,21 +330,9 @@ export class OnDemandDocument {
});
}

/**
* Iterates through the elements of a document reviving them using the `as` BSONType.
*
* @param as - The type to revive all elements as
*/
public *valuesAs<const T extends keyof JSTypeOf>(as: T): Generator<JSTypeOf[T]> {
if (!this.isArray) {
throw new BSONError('Unexpected conversion of non-array value to array');
}
let counter = 0;
for (const element of this.elements) {
const value = this.toJSValue<T>(element, as);
this.cache[counter] = { element, value };
yield value;
counter += 1;
}
/** Returns this document's bytes only */
toBytes() {
const size = getInt32LE(this.bson, this.offset);
return this.bson.subarray(this.offset, this.offset + size);
}
}
150 changes: 139 additions & 11 deletions src/cmap/wire_protocol/responses.ts
Original file line number Diff line number Diff line change
@@ -1,14 +1,73 @@
import { type BSONSerializeOptions, BSONType, type Document, type Timestamp } from '../../bson';
import {
type BSONSerializeOptions,
BSONType,
type Document,
Long,
parseToElementsToArray,
type Timestamp
} from '../../bson';
import { MongoUnexpectedServerResponseError } from '../../error';
import { type ClusterTime } from '../../sdam/common';
import { type MongoDBNamespace, ns } from '../../utils';
import { OnDemandDocument } from './on_demand/document';

// eslint-disable-next-line no-restricted-syntax
const enum BSONElementOffset {
type = 0,
nameOffset = 1,
nameLength = 2,
offset = 3,
length = 4
}
/**
* Accepts a BSON payload and checks for na "ok: 0" element.
* This utility is intended to prevent calling response class constructors
* that expect the result to be a success and demand certain properties to exist.
*
* For example, a cursor response always expects a cursor embedded document.
* In order to write the class such that the properties reflect that assertion (non-null)
* we cannot invoke the subclass constructor if the BSON represents an error.
*
* @param bytes - BSON document returned from the server
*/
export function isErrorResponse(bson: Uint8Array): boolean {
const elements = parseToElementsToArray(bson, 0);
for (let eIdx = 0; eIdx < elements.length; eIdx++) {
const element = elements[eIdx];

if (element[BSONElementOffset.nameLength] === 2) {
const nameOffset = element[BSONElementOffset.nameOffset];

// 111 == "o", 107 == "k"
if (bson[nameOffset] === 111 && bson[nameOffset + 1] === 107) {
const valueOffset = element[BSONElementOffset.offset];
const valueLength = element[BSONElementOffset.length];

// If any byte in the length of the ok number (works for any type) is non zero,
// then it is considered "ok: 1"
for (let i = valueOffset; i < valueOffset + valueLength; i++) {
if (bson[i] !== 0x00) return false;
}

return true;
}
}
}

return true;
}

/** @internal */
export type MongoDBResponseConstructor = {
new (bson: Uint8Array, offset?: number, isArray?: boolean): MongoDBResponse;
};

/** @internal */
export class MongoDBResponse extends OnDemandDocument {
static is(value: unknown): value is MongoDBResponse {
return value instanceof MongoDBResponse;
}

// {ok:1}
static empty = new MongoDBResponse(new Uint8Array([13, 0, 0, 0, 16, 111, 107, 0, 1, 0, 0, 0, 0]));

Expand Down Expand Up @@ -83,27 +142,96 @@ export class MongoDBResponse extends OnDemandDocument {
return this.clusterTime ?? null;
}

public override toObject(options: BSONSerializeOptions = {}): Record<string, any> {
public override toObject(options?: BSONSerializeOptions): Record<string, any> {
const exactBSONOptions = {
useBigInt64: options.useBigInt64,
promoteLongs: options.promoteLongs,
promoteValues: options.promoteValues,
promoteBuffers: options.promoteBuffers,
bsonRegExp: options.bsonRegExp,
raw: options.raw ?? false,
fieldsAsRaw: options.fieldsAsRaw ?? {},
useBigInt64: options?.useBigInt64,
promoteLongs: options?.promoteLongs,
promoteValues: options?.promoteValues,
promoteBuffers: options?.promoteBuffers,
bsonRegExp: options?.bsonRegExp,
raw: options?.raw ?? false,
fieldsAsRaw: options?.fieldsAsRaw ?? {},
validation: this.parseBsonSerializationOptions(options)
};
return super.toObject(exactBSONOptions);
}

private parseBsonSerializationOptions({ enableUtf8Validation }: BSONSerializeOptions): {
private parseBsonSerializationOptions(options?: { enableUtf8Validation?: boolean }): {
utf8: { writeErrors: false } | false;
} {
const enableUtf8Validation = options?.enableUtf8Validation;
if (enableUtf8Validation === false) {
return { utf8: false };
}

return { utf8: { writeErrors: false } };
}
}

/** @internal */
export class CursorResponse extends MongoDBResponse {
/**
* This supports a feature of the FindCursor.
* It is an optimization to avoid an extra getMore when the limit has been reached
*/
static emptyGetMore = { id: new Long(0), length: 0, shift: () => null };

static override is(value: unknown): value is CursorResponse {
return value instanceof CursorResponse || value === CursorResponse.emptyGetMore;
}

public id: Long;
public ns: MongoDBNamespace | null = null;
public batchSize = 0;

private batch: OnDemandDocument;
private iterated = 0;

constructor(bytes: Uint8Array, offset?: number, isArray?: boolean) {
super(bytes, offset, isArray);

const cursor = this.get('cursor', BSONType.object, true);

const id = cursor.get('id', BSONType.long, true);
this.id = new Long(Number(id & 0xffff_ffffn), Number((id >> 32n) & 0xffff_ffffn));

const namespace = cursor.get('ns', BSONType.string);
if (namespace != null) this.ns = ns(namespace);

if (cursor.has('firstBatch')) this.batch = cursor.get('firstBatch', BSONType.array, true);
else if (cursor.has('nextBatch')) this.batch = cursor.get('nextBatch', BSONType.array, true);
else throw new MongoUnexpectedServerResponseError('Cursor document did not contain a batch');

this.batchSize = this.batch.size();
}

get length() {
return Math.max(this.batchSize - this.iterated, 0);
}

shift(options?: BSONSerializeOptions): any {
if (this.iterated >= this.batchSize) {
return null;
}

const result = this.batch.get(this.iterated, BSONType.object, true) ?? null;
this.iterated += 1;

if (options?.raw) {
return result.toBytes();
} else {
return result.toObject(options);
}
}

clear() {
this.iterated = this.batchSize;
}

pushMany() {
throw new Error('pushMany Unsupported method');
}

push() {
throw new Error('push Unsupported method');
}
}
Loading

0 comments on commit 9d73f45

Please sign in to comment.