Skip to content

Commit

Permalink
feat: support the streaming protocol for topology updates
Browse files Browse the repository at this point in the history
This ports work from 3.6 to support the following:
 - `topologyVersion` on all topology updates, including work to
   ignore stale errors for topology updates
 - support servers which stream topology updates
  • Loading branch information
mbroadst committed Jun 2, 2020
1 parent 1a443e7 commit 7e9c5bc
Show file tree
Hide file tree
Showing 184 changed files with 17,401 additions and 487 deletions.
13 changes: 8 additions & 5 deletions lib/cmap/connect.js
Expand Up @@ -2,11 +2,14 @@
const net = require('net');
const tls = require('tls');
const { Connection } = require('./connection');
const MongoError = require('../error').MongoError;
const MongoNetworkError = require('../error').MongoNetworkError;
const {
MongoError,
MongoNetworkError,
MongoNetworkTimeoutError
} = require('../error');
const { defaultAuthProviders } = require('./auth/defaultAuthProviders');
const AuthContext = require('./auth/auth_provider').AuthContext;
const makeClientMetadata = require('../utils').makeClientMetadata;
const { AuthContext } = require('./auth/auth_provider');
const { makeClientMetadata } = require('../utils');
const {
MAX_SUPPORTED_WIRE_VERSION,
MAX_SUPPORTED_SERVER_VERSION,
Expand Down Expand Up @@ -314,7 +317,7 @@ function connectionFailureError(type, err) {
case 'error':
return new MongoNetworkError(err);
case 'timeout':
return new MongoNetworkError(`connection timed out`);
return new MongoNetworkTimeoutError(`connection timed out`);
case 'close':
return new MongoNetworkError(`connection closed`);
case 'cancel':
Expand Down
25 changes: 16 additions & 9 deletions lib/cmap/connection.js
Expand Up @@ -8,7 +8,12 @@ const wp = require('./wire_protocol');
const { CommandStartedEvent, CommandFailedEvent, CommandSucceededEvent } = require('./events');
const { updateSessionFromResponse } = require('../sessions');
const { uuidV4 } = require('../utils');
const { MongoError, MongoNetworkError, MongoWriteConcernError } = require('../error');
const {
MongoError,
MongoNetworkError,
MongoNetworkTimeoutError,
MongoWriteConcernError
} = require('../error');

const kStream = Symbol('stream');
const kQueue = Symbol('queue');
Expand Down Expand Up @@ -72,10 +77,14 @@ class Connection extends EventEmitter {
stream.destroy();
this.closed = true;
this[kQueue].forEach(op =>
op.cb(new MongoNetworkError(`connection ${this.id} to ${this.address} timed out`))
op.cb(
new MongoNetworkTimeoutError(`connection ${this.id} to ${this.address} timed out`, {
beforeHandshake: this[kIsMaster] == null
})
)
);
this[kQueue].clear();

this[kQueue].clear();
this.emit('close');
});

Expand Down Expand Up @@ -212,6 +221,7 @@ function messageHandler(conn) {
}

const operationDescription = conn[kQueue].get(message.responseTo);
const callback = operationDescription.cb;

// SERVER-45775: For exhaust responses we should be able to use the same requestId to
// track response, however the server currently synthetically produces remote requests
Expand All @@ -220,10 +230,7 @@ function messageHandler(conn) {
if (message.moreToCome) {
// requeue the callback for next synthetic request
conn[kQueue].set(message.requestId, operationDescription);
}

const callback = operationDescription.cb;
if (operationDescription.socketTimeoutOverride) {
} else if (operationDescription.socketTimeoutOverride) {
conn[kStream].setTimeout(conn.socketTimeout);
}

Expand Down Expand Up @@ -320,8 +327,8 @@ function write(command, options, callback) {
}

// if command monitoring is enabled we need to modify the callback here
if (this.monitorCommands) {
connection.emit('commandStarted', new CommandStartedEvent(this, command));
if (connection.monitorCommands) {
connection.emit('commandStarted', new CommandStartedEvent(connection, command));

operationDescription.started = process.hrtime();
operationDescription.cb = (err, reply) => {
Expand Down
30 changes: 28 additions & 2 deletions lib/error.js
Expand Up @@ -105,6 +105,11 @@ class MongoError extends Error {
}
}

const kBeforeHandshake = Symbol('beforeHandshake');
function isNetworkErrorBeforeHandshake(err) {
return err[kBeforeHandshake] === true;
}

/**
* An error indicating an issue with the network, including TCP
* errors and timeouts.
Expand All @@ -115,9 +120,28 @@ class MongoError extends Error {
* @extends MongoError
*/
class MongoNetworkError extends MongoError {
constructor(message) {
constructor(message, options) {
super(message);
this.name = 'MongoNetworkError';

if (options && options.beforeHandshake === true) {
this[kBeforeHandshake] = true;
}
}
}

/**
* An error indicating a network timeout occurred
*
* @param {Error|string|object} message The error message
* @property {string} message The error message
* @property {object} [options.beforeHandshake] Indicates the timeout happened before a connection handshake completed
* @extends MongoError
*/
class MongoNetworkTimeoutError extends MongoNetworkError {
constructor(message, options) {
super(message, options);
this.name = 'MongoNetworkTimeoutError';
}
}

Expand Down Expand Up @@ -363,6 +387,7 @@ module.exports = {
GET_MORE_RESUMABLE_CODES,
MongoError,
MongoNetworkError,
MongoNetworkTimeoutError,
MongoParseError,
MongoTimeoutError,
MongoServerSelectionError,
Expand All @@ -372,5 +397,6 @@ module.exports = {
isNodeShuttingDownError,
isNetworkTimeoutError,
isRetryableWriteError,
isResumableError
isResumableError,
isNetworkErrorBeforeHandshake
};
3 changes: 2 additions & 1 deletion lib/operations/connect.js
Expand Up @@ -113,7 +113,8 @@ const validOptionNames = [
'minHeartbeatFrequencyMS',
'heartbeatFrequencyMS',
'waitQueueTimeoutMS',
'directConnection'
'directConnection',
'appName'
];

const ignoreOptionNames = ['native_parser'];
Expand Down

0 comments on commit 7e9c5bc

Please sign in to comment.