Skip to content

Commit

Permalink
feat(NODE-4720): Add log messages to CMAP spec (#3645)
Browse files Browse the repository at this point in the history
  • Loading branch information
W-A-James committed May 8, 2023
1 parent 2264fbb commit b27f385
Show file tree
Hide file tree
Showing 102 changed files with 1,959 additions and 202 deletions.
44 changes: 26 additions & 18 deletions src/cmap/connection_pool.ts
Expand Up @@ -251,8 +251,11 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
this[kMetrics] = new ConnectionPoolMetrics();
this[kProcessingWaitQueue] = false;

this.mongoLogger = this[kServer].topology.client.mongoLogger;
this.component = 'connection';

process.nextTick(() => {
this.emit(ConnectionPool.CONNECTION_POOL_CREATED, new ConnectionPoolCreatedEvent(this));
this.emitAndLog(ConnectionPool.CONNECTION_POOL_CREATED, new ConnectionPoolCreatedEvent(this));
});
}

Expand Down Expand Up @@ -337,7 +340,7 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
return;
}
this[kPoolState] = PoolState.ready;
this.emit(ConnectionPool.CONNECTION_POOL_READY, new ConnectionPoolReadyEvent(this));
this.emitAndLog(ConnectionPool.CONNECTION_POOL_READY, new ConnectionPoolReadyEvent(this));
clearTimeout(this[kMinPoolSizeTimer]);
this.ensureMinPoolSize();
}
Expand All @@ -348,7 +351,7 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
* explicitly destroyed by the new owner.
*/
checkOut(callback: Callback<Connection>): void {
this.emit(
this.emitAndLog(
ConnectionPool.CONNECTION_CHECK_OUT_STARTED,
new ConnectionCheckOutStartedEvent(this)
);
Expand All @@ -360,7 +363,7 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
waitQueueMember[kCancelled] = true;
waitQueueMember.timer = undefined;

this.emit(
this.emitAndLog(
ConnectionPool.CONNECTION_CHECK_OUT_FAILED,
new ConnectionCheckOutFailedEvent(this, 'timeout')
);
Expand Down Expand Up @@ -398,7 +401,10 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
}

this[kCheckedOut].delete(connection);
this.emit(ConnectionPool.CONNECTION_CHECKED_IN, new ConnectionCheckedInEvent(this, connection));
this.emitAndLog(
ConnectionPool.CONNECTION_CHECKED_IN,
new ConnectionCheckedInEvent(this, connection)
);

if (willDestroy) {
const reason = connection.closed ? 'error' : poolClosed ? 'poolClosed' : 'stale';
Expand Down Expand Up @@ -437,7 +443,7 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
// Increment the generation for the service id.
this.serviceGenerations.set(sid, generation + 1);
}
this.emit(
this.emitAndLog(
ConnectionPool.CONNECTION_POOL_CLEARED,
new ConnectionPoolClearedEvent(this, { serviceId })
);
Expand All @@ -452,9 +458,11 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {

this.clearMinPoolSizeTimer();
if (!alreadyPaused) {
this.emit(
this.emitAndLog(
ConnectionPool.CONNECTION_POOL_CLEARED,
new ConnectionPoolClearedEvent(this, { interruptInUseConnections })
new ConnectionPoolClearedEvent(this, {
interruptInUseConnections
})
);
}

Expand Down Expand Up @@ -509,15 +517,15 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
eachAsync<Connection>(
this[kConnections].toArray(),
(conn, cb) => {
this.emit(
this.emitAndLog(
ConnectionPool.CONNECTION_CLOSED,
new ConnectionClosedEvent(this, conn, 'poolClosed')
);
conn.destroy({ force: !!options.force }, cb);
},
err => {
this[kConnections].clear();
this.emit(ConnectionPool.CONNECTION_POOL_CLOSED, new ConnectionPoolClosedEvent(this));
this.emitAndLog(ConnectionPool.CONNECTION_POOL_CLOSED, new ConnectionPoolClosedEvent(this));
callback(err);
}
);
Expand Down Expand Up @@ -645,7 +653,7 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
connection: Connection,
reason: 'error' | 'idle' | 'stale' | 'poolClosed'
) {
this.emit(
this.emitAndLog(
ConnectionPool.CONNECTION_CLOSED,
new ConnectionClosedEvent(this, connection, reason)
);
Expand Down Expand Up @@ -694,15 +702,15 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {

this[kPending]++;
// This is our version of a "virtual" no-I/O connection as the spec requires
this.emit(
this.emitAndLog(
ConnectionPool.CONNECTION_CREATED,
new ConnectionCreatedEvent(this, { id: connectOptions.id })
);

connect(connectOptions, (err, connection) => {
if (err || !connection) {
this[kPending]--;
this.emit(
this.emitAndLog(
ConnectionPool.CONNECTION_CLOSED,
new ConnectionClosedEvent(
this,
Expand Down Expand Up @@ -750,7 +758,7 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
}

connection.markAvailable();
this.emit(ConnectionPool.CONNECTION_READY, new ConnectionReadyEvent(this, connection));
this.emitAndLog(ConnectionPool.CONNECTION_READY, new ConnectionReadyEvent(this, connection));

this[kPending]--;
callback(undefined, connection);
Expand Down Expand Up @@ -819,7 +827,7 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
if (this[kPoolState] !== PoolState.ready) {
const reason = this.closed ? 'poolClosed' : 'connectionError';
const error = this.closed ? new PoolClosedError(this) : new PoolClearedError(this);
this.emit(
this.emitAndLog(
ConnectionPool.CONNECTION_CHECK_OUT_FAILED,
new ConnectionCheckOutFailedEvent(this, reason, error)
);
Expand All @@ -842,7 +850,7 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {

if (!this.destroyConnectionIfPerished(connection)) {
this[kCheckedOut].add(connection);
this.emit(
this.emitAndLog(
ConnectionPool.CONNECTION_CHECKED_OUT,
new ConnectionCheckedOutEvent(this, connection)
);
Expand Down Expand Up @@ -872,14 +880,14 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
}
} else {
if (err) {
this.emit(
this.emitAndLog(
ConnectionPool.CONNECTION_CHECK_OUT_FAILED,
// TODO(NODE-5192): Remove this cast
new ConnectionCheckOutFailedEvent(this, 'connectionError', err as MongoError)
);
} else if (connection) {
this[kCheckedOut].add(connection);
this.emit(
this.emitAndLog(
ConnectionPool.CONNECTION_CHECKED_OUT,
new ConnectionCheckedOutEvent(this, connection)
);
Expand Down
14 changes: 6 additions & 8 deletions src/mongo_client.ts
Expand Up @@ -325,7 +325,7 @@ export class MongoClient extends TypedEventEmitter<MongoClientEvents> {
/** @internal */
topology?: Topology;
/** @internal */
readonly mongoLogger: MongoLogger;
override readonly mongoLogger: MongoLogger;
/** @internal */
private connectionLock?: Promise<this>;

Expand Down Expand Up @@ -471,23 +471,21 @@ export class MongoClient extends TypedEventEmitter<MongoClientEvents> {
}
}

const topology = new Topology(options.hosts, options);
this.topology = new Topology(this, options.hosts, options);
// Events can be emitted before initialization is complete so we have to
// save the reference to the topology on the client ASAP if the event handlers need to access it
this.topology = topology;
topology.client = this;

topology.once(Topology.OPEN, () => this.emit('open', this));
this.topology.once(Topology.OPEN, () => this.emit('open', this));

for (const event of MONGO_CLIENT_EVENTS) {
topology.on(event, (...args: any[]) => this.emit(event, ...(args as any)));
this.topology.on(event, (...args: any[]) => this.emit(event, ...(args as any)));
}

const topologyConnect = async () => {
try {
await promisify(callback => topology.connect(options, callback))();
await promisify(callback => this.topology?.connect(options, callback))();
} catch (error) {
topology.close({ force: true });
this.topology?.close({ force: true });
throw error;
}
};
Expand Down
18 changes: 16 additions & 2 deletions src/mongo_types.ts
Expand Up @@ -12,6 +12,7 @@ import type {
ObjectId,
Timestamp
} from './bson';
import type { MongoLoggableComponent, MongoLogger } from './mongo_logger';
import type { Sort } from './sort';

/** @internal */
Expand Down Expand Up @@ -397,8 +398,21 @@ export declare interface TypedEventEmitter<Events extends EventsDescription> ext
* Typescript type safe event emitter
* @public
*/
// eslint-disable-next-line @typescript-eslint/no-unused-vars
export class TypedEventEmitter<Events extends EventsDescription> extends EventEmitter {}

export class TypedEventEmitter<Events extends EventsDescription> extends EventEmitter {
/** @internal */
protected mongoLogger?: MongoLogger;
/** @internal */
protected component?: MongoLoggableComponent;
/** @internal */
protected emitAndLog<EventKey extends keyof Events>(
event: EventKey | symbol,
...args: Parameters<Events[EventKey]>
): void {
this.emit(event, ...args);
if (this.component) this.mongoLogger?.debug(this.component, args[0]);
}
}

/** @public */
export class CancellationToken extends TypedEventEmitter<{ cancel(): void }> {}
Expand Down
2 changes: 1 addition & 1 deletion src/sdam/monitor.ts
Expand Up @@ -111,7 +111,7 @@ export class Monitor extends TypedEventEmitter<MonitorEvents> {
const connectOptions = Object.assign(
{
id: '<monitor>' as const,
generation: server.s.pool.generation,
generation: server.pool.generation,
connectionType: Connection,
cancellationToken,
hostAddress: server.description.hostAddress
Expand Down
41 changes: 21 additions & 20 deletions src/sdam/server.ts
Expand Up @@ -87,10 +87,6 @@ export interface ServerPrivate {
options: ServerOptions;
/** The current state of the Server */
state: string;
/** The topology this server is a part of */
topology: Topology;
/** A connection pool for this server */
pool: ConnectionPool;
/** MongoDB server API version */
serverApi?: ServerApi;
/** A count of the operations currently running against the server. */
Expand All @@ -114,6 +110,10 @@ export type ServerEvents = {
export class Server extends TypedEventEmitter<ServerEvents> {
/** @internal */
s: ServerPrivate;
/** @internal */
topology: Topology;
/** @internal */
pool: ConnectionPool;
serverApi?: ServerApi;
hello?: Document;
[kMonitor]: Monitor | null;
Expand Down Expand Up @@ -143,20 +143,21 @@ export class Server extends TypedEventEmitter<ServerEvents> {

const poolOptions = { hostAddress: description.hostAddress, ...options };

this.topology = topology;
this.pool = new ConnectionPool(this, poolOptions);

this.s = {
description,
options,
state: STATE_CLOSED,
topology,
pool: new ConnectionPool(this, poolOptions),
operationCount: 0
};

for (const event of [...CMAP_EVENTS, ...APM_EVENTS]) {
this.s.pool.on(event, (e: any) => this.emit(event, e));
this.pool.on(event, (e: any) => this.emit(event, e));
}

this.s.pool.on(Connection.CLUSTER_TIME_RECEIVED, (clusterTime: ClusterTime) => {
this.pool.on(Connection.CLUSTER_TIME_RECEIVED, (clusterTime: ClusterTime) => {
this.clusterTime = clusterTime;
});

Expand Down Expand Up @@ -192,11 +193,11 @@ export class Server extends TypedEventEmitter<ServerEvents> {
}

get clusterTime(): ClusterTime | undefined {
return this.s.topology.clusterTime;
return this.topology.clusterTime;
}

set clusterTime(clusterTime: ClusterTime | undefined) {
this.s.topology.clusterTime = clusterTime;
this.topology.clusterTime = clusterTime;
}

get description(): ServerDescription {
Expand All @@ -215,7 +216,7 @@ export class Server extends TypedEventEmitter<ServerEvents> {
}

get loadBalanced(): boolean {
return this.s.topology.description.type === TopologyType.LoadBalanced;
return this.topology.description.type === TopologyType.LoadBalanced;
}

/**
Expand Down Expand Up @@ -261,7 +262,7 @@ export class Server extends TypedEventEmitter<ServerEvents> {
this[kMonitor]?.close();
}

this.s.pool.close(options, err => {
this.pool.close(options, err => {
stateTransition(this, STATE_CLOSED);
this.emit('closed');
if (typeof callback === 'function') {
Expand Down Expand Up @@ -330,7 +331,7 @@ export class Server extends TypedEventEmitter<ServerEvents> {
// balanced code makes a recursive call). Instead, we increment the count after this
// check.
if (this.loadBalanced && session && conn == null && isPinnableCommand(cmd, session)) {
this.s.pool.checkOut((err, checkedOut) => {
this.pool.checkOut((err, checkedOut) => {
if (err || checkedOut == null) {
if (callback) return callback(err);
return;
Expand All @@ -344,7 +345,7 @@ export class Server extends TypedEventEmitter<ServerEvents> {

this.s.operationCount += 1;

this.s.pool.withConnection(
this.pool.withConnection(
conn,
(err, conn, cb) => {
if (err || !conn) {
Expand Down Expand Up @@ -382,7 +383,7 @@ export class Server extends TypedEventEmitter<ServerEvents> {
}

const isStaleError =
error.connectionGeneration && error.connectionGeneration < this.s.pool.generation;
error.connectionGeneration && error.connectionGeneration < this.pool.generation;
if (isStaleError) {
return;
}
Expand All @@ -398,14 +399,14 @@ export class Server extends TypedEventEmitter<ServerEvents> {
error.addErrorLabel(MongoErrorLabel.ResetPool);
markServerUnknown(this, error);
} else if (connection) {
this.s.pool.clear({ serviceId: connection.serviceId });
this.pool.clear({ serviceId: connection.serviceId });
}
} else {
if (isSDAMUnrecoverableError(error)) {
if (shouldHandleStateChangeError(this, error)) {
const shouldClearPool = maxWireVersion(this) <= 7 || isNodeShuttingDownError(error);
if (this.loadBalanced && connection && shouldClearPool) {
this.s.pool.clear({ serviceId: connection.serviceId });
this.pool.clear({ serviceId: connection.serviceId });
}

if (!this.loadBalanced) {
Expand Down Expand Up @@ -514,7 +515,7 @@ function makeOperationHandler(
return callback(error);
}

if (connectionIsStale(server.s.pool, connection)) {
if (connectionIsStale(server.pool, connection)) {
return callback(error);
}

Expand All @@ -532,15 +533,15 @@ function makeOperationHandler(
}

if (
(isRetryableWritesEnabled(server.s.topology) || isTransactionCommand(cmd)) &&
(isRetryableWritesEnabled(server.topology) || isTransactionCommand(cmd)) &&
supportsRetryableWrites(server) &&
!inActiveTransaction(session, cmd)
) {
error.addErrorLabel(MongoErrorLabel.RetryableWriteError);
}
} else {
if (
(isRetryableWritesEnabled(server.s.topology) || isTransactionCommand(cmd)) &&
(isRetryableWritesEnabled(server.topology) || isTransactionCommand(cmd)) &&
needsRetryableWriteLabel(error, maxWireVersion(server)) &&
!inActiveTransaction(session, cmd)
) {
Expand Down

0 comments on commit b27f385

Please sign in to comment.