Skip to content

Commit

Permalink
Merge branch 'main' into NODE-4719/SDAM-logging
Browse files Browse the repository at this point in the history
  • Loading branch information
nbbeeken committed Dec 13, 2023
2 parents 1d6e837 + 70a2ef9 commit a9c8d77
Show file tree
Hide file tree
Showing 11 changed files with 1,086 additions and 837 deletions.
1,322 changes: 674 additions & 648 deletions package-lock.json

Large diffs are not rendered by default.

38 changes: 19 additions & 19 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -62,43 +62,43 @@
}
},
"devDependencies": {
"@aws-sdk/credential-providers": "^3.462.0",
"@aws-sdk/credential-providers": "^3.465.0",
"@iarna/toml": "^2.2.5",
"@istanbuljs/nyc-config-typescript": "^1.0.2",
"@microsoft/api-extractor": "^7.36.4",
"@microsoft/api-extractor": "^7.38.4",
"@microsoft/tsdoc-config": "^0.16.2",
"@mongodb-js/zstd": "^1.1.0",
"@mongodb-js/zstd": "^1.2.0",
"@octokit/core": "^4.2.4",
"@types/chai": "^4.3.5",
"@types/chai-subset": "^1.3.3",
"@types/express": "^4.17.17",
"@types/kerberos": "^1.1.2",
"@types/mocha": "^10.0.1",
"@types/node": "^20.5.9",
"@types/saslprep": "^1.0.1",
"@types/semver": "^7.5.0",
"@types/sinon": "^10.0.16",
"@types/sinon-chai": "^3.2.9",
"@types/whatwg-url": "^11.0.0",
"@types/chai": "^4.3.11",
"@types/chai-subset": "^1.3.5",
"@types/express": "^4.17.21",
"@types/kerberos": "^1.1.5",
"@types/mocha": "^10.0.6",
"@types/node": "^20.10.3",
"@types/saslprep": "^1.0.3",
"@types/semver": "^7.5.6",
"@types/sinon": "^10.0.20",
"@types/sinon-chai": "^3.2.12",
"@types/whatwg-url": "^11.0.3",
"@typescript-eslint/eslint-plugin": "^5.62.0",
"@typescript-eslint/parser": "^5.62.0",
"chai": "^4.3.7",
"chai": "^4.3.10",
"chai-subset": "^1.6.0",
"chalk": "^4.1.2",
"eslint": "^8.48.0",
"eslint": "^8.55.0",
"eslint-config-prettier": "^8.10.0",
"eslint-plugin-import": "^2.28.1",
"eslint-plugin-import": "^2.29.0",
"eslint-plugin-prettier": "^4.2.1",
"eslint-plugin-simple-import-sort": "^10.0.0",
"eslint-plugin-tsdoc": "^0.2.17",
"eslint-plugin-unused-imports": "^2.0.0",
"express": "^4.18.2",
"gcp-metadata": "^5.2.0",
"gcp-metadata": "^5.3.0",
"js-yaml": "^4.1.0",
"mocha": "^10.2.0",
"mocha-sinon": "^2.1.2",
"mongodb-client-encryption": "^6.0.0",
"mongodb-legacy": "^6.0.0",
"mongodb-legacy": "^6.0.1",
"nyc": "^15.1.0",
"prettier": "^2.8.8",
"semver": "^7.5.4",
Expand Down
221 changes: 114 additions & 107 deletions src/cmap/connection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -170,11 +170,6 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
lastHelloMS?: number;
serverApi?: ServerApi;
helloOk?: boolean;
commandAsync: (
ns: MongoDBNamespace,
cmd: Document,
options: CommandOptions | undefined
) => Promise<Document>;
/** @internal */
authContext?: AuthContext;

Expand Down Expand Up @@ -217,15 +212,6 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
constructor(stream: Stream, options: ConnectionOptions) {
super();

this.commandAsync = promisify(
(
ns: MongoDBNamespace,
cmd: Document,
options: CommandOptions | undefined,
callback: Callback
) => this.command(ns, cmd, options, callback as any)
);

this.id = options.id;
this.address = streamIdentifier(stream, options);
this.socketTimeoutMS = options.socketTimeoutMS ?? 0;
Expand Down Expand Up @@ -262,6 +248,12 @@ export class Connection extends TypedEventEmitter<ConnectionEvents> {
this[kMessageStream].pipe(this[kStream]);
}

// This whole class is temporary,
// Need to move this to be defined on the prototype for spying.
async commandAsync(ns: MongoDBNamespace, cmd: Document, opt?: CommandOptions) {
return promisify(this.command.bind(this))(ns, cmd, opt);
}

get description(): StreamDescription {
return this[kDescription];
}
Expand Down Expand Up @@ -791,7 +783,6 @@ export class ModernConnection extends TypedEventEmitter<ConnectionEvents> {
lastHelloMS?: number;
serverApi?: ServerApi;
helloOk?: boolean;
commandAsync: ModernConnection['command'];
/** @internal */
authContext?: AuthContext;

Expand Down Expand Up @@ -831,8 +822,6 @@ export class ModernConnection extends TypedEventEmitter<ConnectionEvents> {
constructor(stream: Stream, options: ConnectionOptions) {
super();

this.commandAsync = this.command.bind(this);

this.id = options.id;
this.address = streamIdentifier(stream, options);
this.socketTimeoutMS = options.socketTimeoutMS ?? 0;
Expand All @@ -852,6 +841,10 @@ export class ModernConnection extends TypedEventEmitter<ConnectionEvents> {
this.socket.on('timeout', this.onTimeout.bind(this));
}

async commandAsync(...args: Parameters<typeof this.command>) {
return this.command(...args);
}

/** Indicates that the connection (including underlying TCP socket) has been closed. */
get closed(): boolean {
return this.controller.signal.aborted;
Expand Down Expand Up @@ -1036,62 +1029,68 @@ export class ModernConnection extends TypedEventEmitter<ConnectionEvents> {
return message;
}

private async sendCommand(
message: WriteProtocolMessageType,
options: CommandOptions
): Promise<Document> {
const { signal } = this.controller;

signal.throwIfAborted();
private async *sendWire(message: WriteProtocolMessageType, options: CommandOptions) {
this.controller.signal.throwIfAborted();

if (typeof options.socketTimeoutMS === 'number') {
this.socket.setTimeout(options.socketTimeoutMS);
} else if (this.socketTimeoutMS !== 0) {
this.socket.setTimeout(this.socketTimeoutMS);
}

let response;
try {
await writeCommand(this, message, {
agreedCompressor: this.description.compressor ?? 'none',
zlibCompressionLevel: this.description.zlibCompressionLevel,
signal
signal: this.controller.signal
});

if (options.noResponse) return { ok: 1 };
// TODO(NODE-5770): Replace controller to avoid boundless 'abort' listeners
this.controller = new AbortController();

signal.throwIfAborted();
if (options.noResponse) {
yield { ok: 1 };
return;
}

response = await read(this, { signal });
} finally {
// TODO(NODE-5770): Replace controller to avoid boundless 'abort' listeners
if (!signal.aborted) this.controller = new AbortController();
}
this.controller.signal.throwIfAborted();

response.parse(options);
for await (const response of readMany(this, { signal: this.controller.signal })) {
this.socket.setTimeout(0);
response.parse(options);

const [document] = response.documents;
const [document] = response.documents;

if (!Buffer.isBuffer(document)) {
const { session } = options;
if (session) {
updateSessionFromResponse(session, document);
}
if (!Buffer.isBuffer(document)) {
const { session } = options;
if (session) {
updateSessionFromResponse(session, document);
}

if (document.$clusterTime) {
this[kClusterTime] = document.$clusterTime;
this.emit(Connection.CLUSTER_TIME_RECEIVED, document.$clusterTime);
if (document.$clusterTime) {
this[kClusterTime] = document.$clusterTime;
this.emit(Connection.CLUSTER_TIME_RECEIVED, document.$clusterTime);
}
}

// TODO(NODE-5770): Replace controller to avoid boundless 'abort' listeners
this.controller = new AbortController();

yield document;
this.controller.signal.throwIfAborted();

if (typeof options.socketTimeoutMS === 'number') {
this.socket.setTimeout(options.socketTimeoutMS);
} else if (this.socketTimeoutMS !== 0) {
this.socket.setTimeout(this.socketTimeoutMS);
}
}
} finally {
this.socket.setTimeout(0);
}

return document;
}

async command(
ns: MongoDBNamespace,
command: Document,
options: CommandOptions = {}
): Promise<Document> {
async *sendCommand(ns: MongoDBNamespace, command: Document, options: CommandOptions = {}) {
const message = this.prepareCommand(ns.db, command, options);

let started = 0;
Expand All @@ -1103,76 +1102,84 @@ export class ModernConnection extends TypedEventEmitter<ConnectionEvents> {
);
}

let document = null;
let document;
try {
document = await this.sendCommand(message, options);
} catch (ioError) {
if (this.monitorCommands) {
this.emit(
ModernConnection.COMMAND_FAILED,
new CommandFailedEvent(this as unknown as Connection, message, ioError, started)
);
}
throw ioError;
}
this.controller.signal.throwIfAborted();
for await (document of this.sendWire(message, options)) {
if (!Buffer.isBuffer(document) && document.writeConcernError) {
throw new MongoWriteConcernError(document.writeConcernError, document);
}

if (document == null) {
const unexpected = new MongoUnexpectedServerResponseError(
'sendCommand did not throw and did not return a document'
);
if (this.monitorCommands) {
this.emit(
ModernConnection.COMMAND_FAILED,
new CommandFailedEvent(this as unknown as Connection, message, unexpected, started)
);
}
throw unexpected;
}
if (
!Buffer.isBuffer(document) &&
(document.ok === 0 || document.$err || document.errmsg || document.code)
) {
throw new MongoServerError(document);
}

if (document.writeConcernError) {
const writeConcernError = new MongoWriteConcernError(document.writeConcernError, document);
if (this.monitorCommands) {
this.emit(
ModernConnection.COMMAND_SUCCEEDED,
new CommandSucceededEvent(this as unknown as Connection, message, document, started)
);
}
throw writeConcernError;
}
if (this.monitorCommands) {
this.emit(
ModernConnection.COMMAND_SUCCEEDED,
new CommandSucceededEvent(
this as unknown as Connection,
message,
options.noResponse ? undefined : document,
started
)
);
}

if (document.ok === 0 || document.$err || document.errmsg || document.code) {
const serverError = new MongoServerError(document);
yield document;
this.controller.signal.throwIfAborted();
}
} catch (error) {
if (this.monitorCommands) {
this.emit(
ModernConnection.COMMAND_FAILED,
new CommandFailedEvent(this as unknown as Connection, message, serverError, started)
);
error.name === 'MongoWriteConcernError'
? this.emit(
ModernConnection.COMMAND_SUCCEEDED,
new CommandSucceededEvent(
this as unknown as Connection,
message,
options.noResponse ? undefined : document,
started
)
)
: this.emit(
ModernConnection.COMMAND_FAILED,
new CommandFailedEvent(this as unknown as Connection, message, error, started)
);
}
throw serverError;
throw error;
}
}

if (this.monitorCommands) {
this.emit(
ModernConnection.COMMAND_SUCCEEDED,
new CommandSucceededEvent(
this as unknown as Connection,
message,
options.noResponse ? undefined : document,
started
)
);
async command(
ns: MongoDBNamespace,
command: Document,
options: CommandOptions = {}
): Promise<Document> {
this.controller.signal.throwIfAborted();
for await (const document of this.sendCommand(ns, command, options)) {
return document;
}

return document;
throw new MongoUnexpectedServerResponseError('Unable to get response from server');
}

exhaustCommand(
_ns: MongoDBNamespace,
_command: Document,
_options: CommandOptions,
_replyListener: Callback
ns: MongoDBNamespace,
command: Document,
options: CommandOptions,
replyListener: Callback
) {
throw new Error('NODE-5742: not implemented.');
const exhaustLoop = async () => {
this.controller.signal.throwIfAborted();
for await (const reply of this.sendCommand(ns, command, options)) {
replyListener(undefined, reply);
this.controller.signal.throwIfAborted();
}
throw new MongoUnexpectedServerResponseError('Server ended moreToCome unexpectedly');
};
exhaustLoop().catch(replyListener);
}
}

Expand Down
8 changes: 3 additions & 5 deletions src/error.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1145,14 +1145,12 @@ const RETRYABLE_READ_ERROR_CODES = new Set<number>([
MONGODB_ERROR_CODES.InterruptedAtShutdown,
MONGODB_ERROR_CODES.InterruptedDueToReplStateChange,
MONGODB_ERROR_CODES.NotPrimaryNoSecondaryOk,
MONGODB_ERROR_CODES.NotPrimaryOrSecondary
MONGODB_ERROR_CODES.NotPrimaryOrSecondary,
MONGODB_ERROR_CODES.ExceededTimeLimit
]);

// see: https://github.com/mongodb/specifications/blob/master/source/retryable-writes/retryable-writes.rst#terms
const RETRYABLE_WRITE_ERROR_CODES = new Set<number>([
...RETRYABLE_READ_ERROR_CODES,
MONGODB_ERROR_CODES.ExceededTimeLimit
]);
const RETRYABLE_WRITE_ERROR_CODES = RETRYABLE_READ_ERROR_CODES;

export function needsRetryableWriteLabel(error: Error, maxWireVersion: number): boolean {
// pre-4.4 server, then the driver adds an error label for every valid case
Expand Down
Loading

0 comments on commit a9c8d77

Please sign in to comment.