Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(NODE-4720): Add log messages to CMAP spec #3645

Merged
merged 37 commits into from
May 8, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
8e34408
feat(NODE-4720): Add cmap log messages
W-A-James Apr 14, 2023
bd4f3b9
refactor(NODE-4720): add client close operation
W-A-James Apr 17, 2023
185dc7a
fix(NODE-4720): use debug logger
W-A-James Apr 17, 2023
3a3755c
test(NODE-4720): pull in cmap loggingt tests
W-A-James Apr 17, 2023
a6ae55e
fix(NODE-4720): start cmap debugging
W-A-James Apr 18, 2023
f7460a4
refactor(NODE-5170): update unified spec test runner
W-A-James Apr 14, 2023
e229978
Merge branch 'main' into NODE-4720/add_log_messages_to_cmap_spec
W-A-James Apr 19, 2023
58b1f8d
refactor(NODE-4720): Fix log collector
W-A-James Apr 20, 2023
30e0a6d
WIP
W-A-James Apr 20, 2023
013c252
test(NODE-4720): Fix skip reason
W-A-James Apr 26, 2023
f5850c0
Merge branch 'main' into NODE-4720/add_log_messages_to_cmap_spec
W-A-James Apr 26, 2023
132e16a
test(NODE-4720): sync spec tests
W-A-James Apr 26, 2023
793831f
Merge branch 'NODE-4720/add_log_messages_to_cmap_spec' of github.com:…
W-A-James Apr 26, 2023
b8d4f22
test(NODE-4720): Add modified spec test
W-A-James Apr 26, 2023
a519db5
fix(NODE-4720): Update Topology constructor
W-A-James Apr 27, 2023
8cfd0cc
test(NODE-4720): remove console.log
W-A-James Apr 27, 2023
d9dff09
style(NODE-4720): remove commented code
W-A-James Apr 27, 2023
9496a0d
Merge branch 'main' into NODE-4720/add_log_messages_to_cmap_spec
W-A-James Apr 27, 2023
9b20b3e
Merge branch 'NODE-4720/add_log_messages_to_cmap_spec' of github.com:…
W-A-James Apr 27, 2023
c98d6a9
fix(NODE-4720): start addressing review comments
W-A-James May 2, 2023
f336b93
test(NODE-4720): test fix
W-A-James May 2, 2023
b5f3348
test(NODE-4720): undo janky fix for evg
W-A-James May 2, 2023
b62637e
fix(NODE-4720): Fix test failures
W-A-James May 2, 2023
4ace6fa
fix(NODE-4720): review comment fixes
W-A-James May 2, 2023
8c8a9f6
fix(NODE-4720): update typedEE
W-A-James May 3, 2023
62e5ea2
test(NODE-4720): fix test failure
W-A-James May 3, 2023
019913f
style(NODE-4720): eslint
W-A-James May 3, 2023
671dacc
test(NODE-4720): fix test failures
W-A-James May 3, 2023
3bcbe16
Merge branch 'main' into NODE-4720/add_log_messages_to_cmap_spec
W-A-James May 3, 2023
b86180a
test(NODE-4720): test update
W-A-James May 3, 2023
50cb515
test(NODE-4720): move to ts
W-A-James May 4, 2023
2075178
Merge branch 'main' into NODE-4720/add_log_messages_to_cmap_spec
W-A-James May 4, 2023
ea213bb
Merge branch 'NODE-4720/add_log_messages_to_cmap_spec' of github.com:…
W-A-James May 4, 2023
4fe466a
test(NODE-4720): test refactor
W-A-James May 4, 2023
0855d8e
style(NODE-4720): eslint
W-A-James May 4, 2023
f5f1b71
style(NODE-4720): move comment location
W-A-James May 8, 2023
5c2b31b
Merge branch 'main' into NODE-4720/add_log_messages_to_cmap_spec
nbbeeken May 8, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 26 additions & 18 deletions src/cmap/connection_pool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -251,8 +251,11 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
this[kMetrics] = new ConnectionPoolMetrics();
this[kProcessingWaitQueue] = false;

this.mongoLogger = this[kServer].topology.client.mongoLogger;
this.component = 'connection';

process.nextTick(() => {
this.emit(ConnectionPool.CONNECTION_POOL_CREATED, new ConnectionPoolCreatedEvent(this));
this.emitAndLog(ConnectionPool.CONNECTION_POOL_CREATED, new ConnectionPoolCreatedEvent(this));
});
}

Expand Down Expand Up @@ -337,7 +340,7 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
return;
}
this[kPoolState] = PoolState.ready;
this.emit(ConnectionPool.CONNECTION_POOL_READY, new ConnectionPoolReadyEvent(this));
this.emitAndLog(ConnectionPool.CONNECTION_POOL_READY, new ConnectionPoolReadyEvent(this));
clearTimeout(this[kMinPoolSizeTimer]);
this.ensureMinPoolSize();
}
Expand All @@ -348,7 +351,7 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
* explicitly destroyed by the new owner.
*/
checkOut(callback: Callback<Connection>): void {
this.emit(
this.emitAndLog(
ConnectionPool.CONNECTION_CHECK_OUT_STARTED,
new ConnectionCheckOutStartedEvent(this)
);
Expand All @@ -360,7 +363,7 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
waitQueueMember[kCancelled] = true;
waitQueueMember.timer = undefined;

this.emit(
this.emitAndLog(
ConnectionPool.CONNECTION_CHECK_OUT_FAILED,
new ConnectionCheckOutFailedEvent(this, 'timeout')
);
Expand Down Expand Up @@ -398,7 +401,10 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
}

this[kCheckedOut].delete(connection);
this.emit(ConnectionPool.CONNECTION_CHECKED_IN, new ConnectionCheckedInEvent(this, connection));
this.emitAndLog(
ConnectionPool.CONNECTION_CHECKED_IN,
new ConnectionCheckedInEvent(this, connection)
);

if (willDestroy) {
const reason = connection.closed ? 'error' : poolClosed ? 'poolClosed' : 'stale';
Expand Down Expand Up @@ -437,7 +443,7 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
// Increment the generation for the service id.
this.serviceGenerations.set(sid, generation + 1);
}
this.emit(
this.emitAndLog(
ConnectionPool.CONNECTION_POOL_CLEARED,
new ConnectionPoolClearedEvent(this, { serviceId })
);
Expand All @@ -452,9 +458,11 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {

this.clearMinPoolSizeTimer();
if (!alreadyPaused) {
this.emit(
this.emitAndLog(
ConnectionPool.CONNECTION_POOL_CLEARED,
new ConnectionPoolClearedEvent(this, { interruptInUseConnections })
new ConnectionPoolClearedEvent(this, {
interruptInUseConnections
})
);
}

Expand Down Expand Up @@ -509,15 +517,15 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
eachAsync<Connection>(
this[kConnections].toArray(),
(conn, cb) => {
this.emit(
this.emitAndLog(
ConnectionPool.CONNECTION_CLOSED,
new ConnectionClosedEvent(this, conn, 'poolClosed')
);
conn.destroy({ force: !!options.force }, cb);
},
err => {
this[kConnections].clear();
this.emit(ConnectionPool.CONNECTION_POOL_CLOSED, new ConnectionPoolClosedEvent(this));
this.emitAndLog(ConnectionPool.CONNECTION_POOL_CLOSED, new ConnectionPoolClosedEvent(this));
callback(err);
}
);
Expand Down Expand Up @@ -645,7 +653,7 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
connection: Connection,
reason: 'error' | 'idle' | 'stale' | 'poolClosed'
) {
this.emit(
this.emitAndLog(
ConnectionPool.CONNECTION_CLOSED,
new ConnectionClosedEvent(this, connection, reason)
);
Expand Down Expand Up @@ -694,15 +702,15 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {

this[kPending]++;
// This is our version of a "virtual" no-I/O connection as the spec requires
this.emit(
this.emitAndLog(
ConnectionPool.CONNECTION_CREATED,
new ConnectionCreatedEvent(this, { id: connectOptions.id })
);

connect(connectOptions, (err, connection) => {
if (err || !connection) {
this[kPending]--;
this.emit(
this.emitAndLog(
ConnectionPool.CONNECTION_CLOSED,
new ConnectionClosedEvent(
this,
Expand Down Expand Up @@ -750,7 +758,7 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
}

connection.markAvailable();
this.emit(ConnectionPool.CONNECTION_READY, new ConnectionReadyEvent(this, connection));
this.emitAndLog(ConnectionPool.CONNECTION_READY, new ConnectionReadyEvent(this, connection));

this[kPending]--;
callback(undefined, connection);
Expand Down Expand Up @@ -819,7 +827,7 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
if (this[kPoolState] !== PoolState.ready) {
const reason = this.closed ? 'poolClosed' : 'connectionError';
const error = this.closed ? new PoolClosedError(this) : new PoolClearedError(this);
this.emit(
this.emitAndLog(
ConnectionPool.CONNECTION_CHECK_OUT_FAILED,
new ConnectionCheckOutFailedEvent(this, reason, error)
);
Expand All @@ -842,7 +850,7 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {

if (!this.destroyConnectionIfPerished(connection)) {
this[kCheckedOut].add(connection);
this.emit(
this.emitAndLog(
ConnectionPool.CONNECTION_CHECKED_OUT,
new ConnectionCheckedOutEvent(this, connection)
);
Expand Down Expand Up @@ -872,14 +880,14 @@ export class ConnectionPool extends TypedEventEmitter<ConnectionPoolEvents> {
}
} else {
if (err) {
this.emit(
this.emitAndLog(
ConnectionPool.CONNECTION_CHECK_OUT_FAILED,
// TODO(NODE-5192): Remove this cast
new ConnectionCheckOutFailedEvent(this, 'connectionError', err as MongoError)
);
} else if (connection) {
this[kCheckedOut].add(connection);
this.emit(
this.emitAndLog(
ConnectionPool.CONNECTION_CHECKED_OUT,
new ConnectionCheckedOutEvent(this, connection)
);
Expand Down
14 changes: 6 additions & 8 deletions src/mongo_client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -325,7 +325,7 @@ export class MongoClient extends TypedEventEmitter<MongoClientEvents> {
/** @internal */
topology?: Topology;
/** @internal */
readonly mongoLogger: MongoLogger;
override readonly mongoLogger: MongoLogger;
/** @internal */
private connectionLock?: Promise<this>;

Expand Down Expand Up @@ -471,23 +471,21 @@ export class MongoClient extends TypedEventEmitter<MongoClientEvents> {
}
}

const topology = new Topology(options.hosts, options);
this.topology = new Topology(this, options.hosts, options);
// Events can be emitted before initialization is complete so we have to
// save the reference to the topology on the client ASAP if the event handlers need to access it
this.topology = topology;
topology.client = this;

topology.once(Topology.OPEN, () => this.emit('open', this));
this.topology.once(Topology.OPEN, () => this.emit('open', this));

for (const event of MONGO_CLIENT_EVENTS) {
topology.on(event, (...args: any[]) => this.emit(event, ...(args as any)));
this.topology.on(event, (...args: any[]) => this.emit(event, ...(args as any)));
}

const topologyConnect = async () => {
try {
await promisify(callback => topology.connect(options, callback))();
await promisify(callback => this.topology?.connect(options, callback))();
} catch (error) {
topology.close({ force: true });
this.topology?.close({ force: true });
throw error;
}
};
Expand Down
18 changes: 16 additions & 2 deletions src/mongo_types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import type {
ObjectId,
Timestamp
} from './bson';
import type { MongoLoggableComponent, MongoLogger } from './mongo_logger';
import type { Sort } from './sort';

/** @internal */
Expand Down Expand Up @@ -397,8 +398,21 @@ export declare interface TypedEventEmitter<Events extends EventsDescription> ext
* Typescript type safe event emitter
* @public
*/
// eslint-disable-next-line @typescript-eslint/no-unused-vars
export class TypedEventEmitter<Events extends EventsDescription> extends EventEmitter {}

export class TypedEventEmitter<Events extends EventsDescription> extends EventEmitter {
/** @internal */
protected mongoLogger?: MongoLogger;
/** @internal */
protected component?: MongoLoggableComponent;
/** @internal */
protected emitAndLog<EventKey extends keyof Events>(
event: EventKey | symbol,
...args: Parameters<Events[EventKey]>
): void {
this.emit(event, ...args);
if (this.component) this.mongoLogger?.debug(this.component, args[0]);
dariakp marked this conversation as resolved.
Show resolved Hide resolved
}
}

/** @public */
export class CancellationToken extends TypedEventEmitter<{ cancel(): void }> {}
Expand Down
2 changes: 1 addition & 1 deletion src/sdam/monitor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ export class Monitor extends TypedEventEmitter<MonitorEvents> {
const connectOptions = Object.assign(
{
id: '<monitor>' as const,
generation: server.s.pool.generation,
generation: server.pool.generation,
connectionType: Connection,
cancellationToken,
hostAddress: server.description.hostAddress
Expand Down
41 changes: 21 additions & 20 deletions src/sdam/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -87,10 +87,6 @@ export interface ServerPrivate {
options: ServerOptions;
/** The current state of the Server */
state: string;
/** The topology this server is a part of */
topology: Topology;
/** A connection pool for this server */
pool: ConnectionPool;
/** MongoDB server API version */
serverApi?: ServerApi;
/** A count of the operations currently running against the server. */
Expand All @@ -114,6 +110,10 @@ export type ServerEvents = {
export class Server extends TypedEventEmitter<ServerEvents> {
/** @internal */
s: ServerPrivate;
/** @internal */
topology: Topology;
/** @internal */
pool: ConnectionPool;
serverApi?: ServerApi;
hello?: Document;
[kMonitor]: Monitor | null;
Expand Down Expand Up @@ -143,20 +143,21 @@ export class Server extends TypedEventEmitter<ServerEvents> {

const poolOptions = { hostAddress: description.hostAddress, ...options };

this.topology = topology;
this.pool = new ConnectionPool(this, poolOptions);

this.s = {
description,
options,
state: STATE_CLOSED,
dariakp marked this conversation as resolved.
Show resolved Hide resolved
topology,
pool: new ConnectionPool(this, poolOptions),
operationCount: 0
};

for (const event of [...CMAP_EVENTS, ...APM_EVENTS]) {
this.s.pool.on(event, (e: any) => this.emit(event, e));
this.pool.on(event, (e: any) => this.emit(event, e));
}

this.s.pool.on(Connection.CLUSTER_TIME_RECEIVED, (clusterTime: ClusterTime) => {
this.pool.on(Connection.CLUSTER_TIME_RECEIVED, (clusterTime: ClusterTime) => {
this.clusterTime = clusterTime;
});

Expand Down Expand Up @@ -192,11 +193,11 @@ export class Server extends TypedEventEmitter<ServerEvents> {
}

get clusterTime(): ClusterTime | undefined {
return this.s.topology.clusterTime;
return this.topology.clusterTime;
}

set clusterTime(clusterTime: ClusterTime | undefined) {
this.s.topology.clusterTime = clusterTime;
this.topology.clusterTime = clusterTime;
}

get description(): ServerDescription {
Expand All @@ -215,7 +216,7 @@ export class Server extends TypedEventEmitter<ServerEvents> {
}

get loadBalanced(): boolean {
return this.s.topology.description.type === TopologyType.LoadBalanced;
return this.topology.description.type === TopologyType.LoadBalanced;
}

/**
Expand Down Expand Up @@ -261,7 +262,7 @@ export class Server extends TypedEventEmitter<ServerEvents> {
this[kMonitor]?.close();
}

this.s.pool.close(options, err => {
this.pool.close(options, err => {
stateTransition(this, STATE_CLOSED);
this.emit('closed');
if (typeof callback === 'function') {
Expand Down Expand Up @@ -330,7 +331,7 @@ export class Server extends TypedEventEmitter<ServerEvents> {
// balanced code makes a recursive call). Instead, we increment the count after this
// check.
if (this.loadBalanced && session && conn == null && isPinnableCommand(cmd, session)) {
this.s.pool.checkOut((err, checkedOut) => {
this.pool.checkOut((err, checkedOut) => {
if (err || checkedOut == null) {
if (callback) return callback(err);
return;
Expand All @@ -344,7 +345,7 @@ export class Server extends TypedEventEmitter<ServerEvents> {

this.s.operationCount += 1;

this.s.pool.withConnection(
this.pool.withConnection(
conn,
(err, conn, cb) => {
if (err || !conn) {
Expand Down Expand Up @@ -382,7 +383,7 @@ export class Server extends TypedEventEmitter<ServerEvents> {
}

const isStaleError =
error.connectionGeneration && error.connectionGeneration < this.s.pool.generation;
error.connectionGeneration && error.connectionGeneration < this.pool.generation;
if (isStaleError) {
return;
}
Expand All @@ -398,14 +399,14 @@ export class Server extends TypedEventEmitter<ServerEvents> {
error.addErrorLabel(MongoErrorLabel.ResetPool);
markServerUnknown(this, error);
} else if (connection) {
this.s.pool.clear({ serviceId: connection.serviceId });
this.pool.clear({ serviceId: connection.serviceId });
}
} else {
if (isSDAMUnrecoverableError(error)) {
if (shouldHandleStateChangeError(this, error)) {
const shouldClearPool = maxWireVersion(this) <= 7 || isNodeShuttingDownError(error);
if (this.loadBalanced && connection && shouldClearPool) {
this.s.pool.clear({ serviceId: connection.serviceId });
this.pool.clear({ serviceId: connection.serviceId });
}

if (!this.loadBalanced) {
Expand Down Expand Up @@ -514,7 +515,7 @@ function makeOperationHandler(
return callback(error);
}

if (connectionIsStale(server.s.pool, connection)) {
if (connectionIsStale(server.pool, connection)) {
return callback(error);
}

Expand All @@ -532,15 +533,15 @@ function makeOperationHandler(
}

if (
(isRetryableWritesEnabled(server.s.topology) || isTransactionCommand(cmd)) &&
(isRetryableWritesEnabled(server.topology) || isTransactionCommand(cmd)) &&
supportsRetryableWrites(server) &&
!inActiveTransaction(session, cmd)
) {
error.addErrorLabel(MongoErrorLabel.RetryableWriteError);
}
} else {
if (
(isRetryableWritesEnabled(server.s.topology) || isTransactionCommand(cmd)) &&
(isRetryableWritesEnabled(server.topology) || isTransactionCommand(cmd)) &&
needsRetryableWriteLabel(error, maxWireVersion(server)) &&
!inActiveTransaction(session, cmd)
) {
Expand Down
Loading