Skip to content

Commit

Permalink
feat(NODE-4687): Add logging to server selection (#3946)
Browse files Browse the repository at this point in the history
Co-authored-by: Durran Jordan <durran@gmail.com>
  • Loading branch information
aditi-khare-mongoDB and durran authored Jan 8, 2024
1 parent b93d405 commit 7f3ce0b
Show file tree
Hide file tree
Showing 24 changed files with 3,033 additions and 54 deletions.
16 changes: 11 additions & 5 deletions src/change_stream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -934,10 +934,14 @@ export class ChangeStream<
this.cursor.close().catch(() => null);

const topology = getTopology(this.parent);
topology.selectServer(this.cursor.readPreference, {}, serverSelectionError => {
if (serverSelectionError) return this._closeEmitterModeWithError(changeStreamError);
this.cursor = this._createChangeStreamCursor(this.cursor.resumeOptions);
});
topology.selectServer(
this.cursor.readPreference,
{ operationName: 'reconnect topology in change stream' },
serverSelectionError => {
if (serverSelectionError) return this._closeEmitterModeWithError(changeStreamError);
this.cursor = this._createChangeStreamCursor(this.cursor.resumeOptions);
}
);
} else {
this._closeEmitterModeWithError(changeStreamError);
}
Expand All @@ -962,7 +966,9 @@ export class ChangeStream<
await this.cursor.close().catch(() => null);
const topology = getTopology(this.parent);
try {
await topology.selectServerAsync(this.cursor.readPreference, {});
await topology.selectServerAsync(this.cursor.readPreference, {
operationName: 'reconnect topology in change stream'
});
this.cursor = this._createChangeStreamCursor(this.cursor.resumeOptions);
} catch {
// if the topology can't reconnect, close the stream
Expand Down
8 changes: 8 additions & 0 deletions src/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,14 @@ export const TOPOLOGY_CLOSED = 'topologyClosed' as const;
/** @internal */
export const TOPOLOGY_DESCRIPTION_CHANGED = 'topologyDescriptionChanged' as const;
/** @internal */
export const SERVER_SELECTION_STARTED = 'serverSelectionStarted' as const;
/** @internal */
export const SERVER_SELECTION_FAILED = 'serverSelectionFailed' as const;
/** @internal */
export const SERVER_SELECTION_SUCCEEDED = 'serverSelectionSucceeded' as const;
/** @internal */
export const WAITING_FOR_SUITABLE_SERVER = 'waitingForSuitableServer' as const;
/** @internal */
export const CONNECTION_POOL_CREATED = 'connectionPoolCreated' as const;
/** @internal */
export const CONNECTION_POOL_CLOSED = 'connectionPoolClosed' as const;
Expand Down
13 changes: 12 additions & 1 deletion src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,13 @@ export {
TopologyDescriptionChangedEvent,
TopologyOpeningEvent
} from './sdam/events';
export {
ServerSelectionEvent,
ServerSelectionFailedEvent,
ServerSelectionStartedEvent,
ServerSelectionSucceededEvent,
WaitingForSuitableServerEvent
} from './sdam/server_selection_events';
export { SrvPollingEvent } from './sdam/srv_polling';

// type only exports below, these are removed from emitted JS
Expand Down Expand Up @@ -303,9 +310,13 @@ export type {
SERVER_HEARTBEAT_STARTED,
SERVER_HEARTBEAT_SUCCEEDED,
SERVER_OPENING,
SERVER_SELECTION_FAILED,
SERVER_SELECTION_STARTED,
SERVER_SELECTION_SUCCEEDED,
TOPOLOGY_CLOSED,
TOPOLOGY_DESCRIPTION_CHANGED,
TOPOLOGY_OPENING
TOPOLOGY_OPENING,
WAITING_FOR_SUITABLE_SERVER
} from './constants';
export type {
AbstractCursorEvents,
Expand Down
66 changes: 56 additions & 10 deletions src/mongo_logger.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import type {
ConnectionPoolClearedEvent,
ConnectionPoolClosedEvent,
ConnectionPoolCreatedEvent,
ConnectionPoolMonitoringEvent,
ConnectionPoolReadyEvent,
ConnectionReadyEvent
} from './cmap/connection_pool_events';
Expand All @@ -41,9 +40,13 @@ import {
SERVER_HEARTBEAT_STARTED,
SERVER_HEARTBEAT_SUCCEEDED,
SERVER_OPENING,
SERVER_SELECTION_FAILED,
SERVER_SELECTION_STARTED,
SERVER_SELECTION_SUCCEEDED,
TOPOLOGY_CLOSED,
TOPOLOGY_DESCRIPTION_CHANGED,
TOPOLOGY_OPENING
TOPOLOGY_OPENING,
WAITING_FOR_SUITABLE_SERVER
} from './constants';
import type {
ServerClosedEvent,
Expand All @@ -52,6 +55,13 @@ import type {
TopologyDescriptionChangedEvent,
TopologyOpeningEvent
} from './sdam/events';
import type {
ServerSelectionEvent,
ServerSelectionFailedEvent,
ServerSelectionStartedEvent,
ServerSelectionSucceededEvent,
WaitingForSuitableServerEvent
} from './sdam/server_selection_events';
import { HostAddress, parseUnsignedInteger } from './utils';

/** @internal */
Expand Down Expand Up @@ -335,6 +345,10 @@ type SDAMLoggableEvent =

/** @internal */
export type LoggableEvent =
| ServerSelectionStartedEvent
| ServerSelectionFailedEvent
| ServerSelectionSucceededEvent
| WaitingForSuitableServerEvent
| CommandStartedEvent
| CommandSucceededEvent
| CommandFailedEvent
Expand Down Expand Up @@ -369,11 +383,16 @@ export function stringifyWithMaxLen(
maxDocumentLength: number,
options: EJSONOptions = {}
): string {
const ejson = EJSON.stringify(value, options);
let strToTruncate: string;
if (typeof value === 'function') {
strToTruncate = value.toString();
} else {
strToTruncate = EJSON.stringify(value, options);
}

return maxDocumentLength !== 0 && ejson.length > maxDocumentLength
? `${ejson.slice(0, maxDocumentLength)}...`
: ejson;
return maxDocumentLength !== 0 && strToTruncate.length > maxDocumentLength
? `${strToTruncate.slice(0, maxDocumentLength)}...`
: strToTruncate;
}

/** @internal */
Expand All @@ -385,6 +404,20 @@ function isLogConvertible(obj: Loggable): obj is LogConvertible {
return objAsLogConvertible.toLog !== undefined && typeof objAsLogConvertible.toLog === 'function';
}

function attachServerSelectionFields(
log: Record<string, any>,
serverSelectionEvent: ServerSelectionEvent,
maxDocumentLength: number = DEFAULT_MAX_DOCUMENT_LENGTH
) {
const { selector, operation, topologyDescription, message } = serverSelectionEvent;
log.selector = stringifyWithMaxLen(selector, maxDocumentLength);
log.operation = operation;
log.topologyDescription = stringifyWithMaxLen(topologyDescription, maxDocumentLength);
log.message = message;

return log;
}

function attachCommandFields(
log: Record<string, any>,
commandEvent: CommandStartedEvent | CommandSucceededEvent | CommandFailedEvent
Expand All @@ -402,10 +435,7 @@ function attachCommandFields(
return log;
}

function attachConnectionFields(
log: Record<string, any>,
event: ConnectionPoolMonitoringEvent | ServerOpeningEvent | ServerClosedEvent
) {
function attachConnectionFields(log: Record<string, any>, event: any) {
const { host, port } = HostAddress.fromString(event.address).toHostPort();
log.serverHost = host;
log.serverPort = port;
Expand Down Expand Up @@ -441,6 +471,22 @@ function defaultLogTransform(
let log: Omit<Log, 's' | 't' | 'c'> = Object.create(null);

switch (logObject.name) {
case SERVER_SELECTION_STARTED:
log = attachServerSelectionFields(log, logObject, maxDocumentLength);
return log;
case SERVER_SELECTION_FAILED:
log = attachServerSelectionFields(log, logObject, maxDocumentLength);
log.failure = logObject.failure.message;
return log;
case SERVER_SELECTION_SUCCEEDED:
log = attachServerSelectionFields(log, logObject, maxDocumentLength);
log.serverHost = logObject.serverHost;
log.serverPort = logObject.serverPort;
return log;
case WAITING_FOR_SUITABLE_SERVER:
log = attachServerSelectionFields(log, logObject, maxDocumentLength);
log.remainingTimeMS = logObject.remainingTimeMS;
return log;
case COMMAND_STARTED:
log = attachCommandFields(log, logObject);
log.message = 'Command started';
Expand Down
10 changes: 8 additions & 2 deletions src/operations/execute_operation.ts
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,10 @@ async function executeOperationAsync<
selector = readPreference;
}

const server = await topology.selectServerAsync(selector, { session });
const server = await topology.selectServerAsync(selector, {
session,
operationName: operation.commandName
});

if (session == null) {
// No session also means it is not retryable, early exit
Expand Down Expand Up @@ -251,7 +254,10 @@ async function retryOperation<
}

// select a new server, and attempt to retry the operation
const server = await topology.selectServerAsync(selector, { session });
const server = await topology.selectServerAsync(selector, {
session,
operationName: operation.commandName
});

if (isWriteOperation && !supportsRetryableWrites(server)) {
throw new MongoUnexpectedServerResponseError(
Expand Down
142 changes: 142 additions & 0 deletions src/sdam/server_selection_events.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
import { HostAddress } from '.././utils';
import {
SERVER_SELECTION_FAILED,
SERVER_SELECTION_STARTED,
SERVER_SELECTION_SUCCEEDED,
WAITING_FOR_SUITABLE_SERVER
} from '../constants';
import { type ReadPreference } from '../read_preference';
import { type ServerSelector } from './server_selection';
import type { TopologyDescription } from './topology_description';

/**
* The base export class for all logs published from server selection
* @internal
* @category Log Type
*/
export abstract class ServerSelectionEvent {
/** String representation of the selector being used to select the server.
* Defaults to 'custom selector' for application-provided custom selector case.
*/
selector: string | ReadPreference | ServerSelector;
/** The name of the operation for which a server is being selected. */
operation: string;
/** The current topology description. */
topologyDescription: TopologyDescription;

/** @internal */
abstract name:
| typeof SERVER_SELECTION_STARTED
| typeof SERVER_SELECTION_SUCCEEDED
| typeof SERVER_SELECTION_FAILED
| typeof WAITING_FOR_SUITABLE_SERVER;

abstract message: string;

/** @internal */
constructor(
selector: string | ReadPreference | ServerSelector,
topologyDescription: TopologyDescription,
operation: string
) {
this.selector = selector;
this.operation = operation;
this.topologyDescription = topologyDescription;
}
}

/**
* An event published when server selection starts
* @internal
* @category Event
*/
export class ServerSelectionStartedEvent extends ServerSelectionEvent {
/** @internal */
name = SERVER_SELECTION_STARTED;
message = 'Server selection started';

/** @internal */
constructor(
selector: string | ReadPreference | ServerSelector,
topologyDescription: TopologyDescription,
operation: string
) {
super(selector, topologyDescription, operation);
}
}

/**
* An event published when a server selection fails
* @internal
* @category Event
*/
export class ServerSelectionFailedEvent extends ServerSelectionEvent {
/** @internal */
name = SERVER_SELECTION_FAILED;
message = 'Server selection failed';
/** Representation of the error the driver will throw regarding server selection failing. */
failure: Error;

/** @internal */
constructor(
selector: string | ReadPreference | ServerSelector,
topologyDescription: TopologyDescription,
error: Error,
operation: string
) {
super(selector, topologyDescription, operation);
this.failure = error;
}
}

/**
* An event published when server selection succeeds
* @internal
* @category Event
*/
export class ServerSelectionSucceededEvent extends ServerSelectionEvent {
/** @internal */
name = SERVER_SELECTION_SUCCEEDED;
message = 'Server selection succeeded';
/** The hostname, IP address, or Unix domain socket path for the selected server. */
serverHost: string;
/** The port for the selected server. Optional; not present for Unix domain sockets. When the user does not specify a port and the default (27017) is used, the driver SHOULD include it here. */
serverPort: number | undefined;

/** @internal */
constructor(
selector: string | ReadPreference | ServerSelector,
topologyDescription: TopologyDescription,
address: string,
operation: string
) {
super(selector, topologyDescription, operation);
const { host, port } = HostAddress.fromString(address).toHostPort();
this.serverHost = host;
this.serverPort = port;
}
}

/**
* An event published when server selection is waiting for a suitable server to become available
* @internal
* @category Event
*/
export class WaitingForSuitableServerEvent extends ServerSelectionEvent {
/** @internal */
name = WAITING_FOR_SUITABLE_SERVER;
message = 'Waiting for suitable server to become available';
/** The remaining time left until server selection will time out. */
remainingTimeMS: number;

/** @internal */
constructor(
selector: string | ReadPreference | ServerSelector,
topologyDescription: TopologyDescription,
remainingTimeMS: number,
operation: string
) {
super(selector, topologyDescription, operation);
this.remainingTimeMS = remainingTimeMS;
}
}
Loading

0 comments on commit 7f3ce0b

Please sign in to comment.