Skip to content

Commit

Permalink
feat(execute-operation): allow execution with server selection
Browse files Browse the repository at this point in the history
This is the first step to moving all server selection into the
operation executor. An aspect on the operation called
`EXECUTE_WITH_SELECTION` indicates that the operation will use a
variant of the `execute` method which accepts a selected `Server`
instance. NOTE: the retry logic is current read specific.
  • Loading branch information
mbroadst authored and daprahamian committed Aug 13, 2019
1 parent 781fcfa commit 36bc1fd
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 2 deletions.
1 change: 1 addition & 0 deletions lib/operations/connect.js
Expand Up @@ -133,6 +133,7 @@ const validOptionNames = [
'minSize',
'monitorCommands',
'retryWrites',
'retryReads',
'useNewUrlParser',
'useUnifiedTopology',
'serverSelectionTimeoutMS',
Expand Down
54 changes: 52 additions & 2 deletions lib/operations/execute_operation.js
Expand Up @@ -3,6 +3,8 @@
const MongoError = require('../core').MongoError;
const Aspect = require('./operation').Aspect;
const OperationBase = require('./operation').OperationBase;
const ReadPreference = require('../core').ReadPreference;
const isRetryableError = require('../core/error').isRetryableError;

/**
* Executes the given operation with provided arguments.
Expand Down Expand Up @@ -65,7 +67,11 @@ function executeOperation(topology, operation, callback) {
);

try {
return operation.execute(handler);
if (operation.hasAspect(Aspect.EXECUTE_WITH_SELECTION)) {
return executeWithServerSelection(topology, operation, handler);
} else {
return operation.execute(handler);
}
} catch (e) {
handler(e);
throw e;
Expand All @@ -76,11 +82,55 @@ function executeOperation(topology, operation, callback) {
const handler = makeExecuteCallback(resolve, reject);

try {
return operation.execute(handler);
if (operation.hasAspect(Aspect.EXECUTE_WITH_SELECTION)) {
return executeWithServerSelection(topology, operation, handler);
} else {
return operation.execute(handler);
}
} catch (e) {
handler(e);
}
});
}

function executeWithServerSelection(topology, operation, callback) {
const readPreference = operation.readPreference || ReadPreference.primary;
const shouldRetryReads = topology.s.options.retryReads !== false;

function callbackWithRetry(err, result) {
if (err == null) {
return callback(null, result);
}

if (!isRetryableError(err)) {
return callback(err);
}

// select a new server, and attempt to retry the operation
topology.selectServer(readPreference, (err, server) => {
if (err) {
callback(err, null);
return;
}

operation.execute(server, callback);
});
}

// select a server, and execute the operation against it
topology.selectServer(readPreference, (err, server) => {
if (err) {
callback(err, null);
return;
}

if (operation.hasAspect(Aspect.RETRYABLE) && shouldRetryReads) {
operation.execute(server, callbackWithRetry);
return;
}

operation.execute(server, callback);
});
}

module.exports = executeOperation;

0 comments on commit 36bc1fd

Please sign in to comment.