Skip to content

Commit

Permalink
SERVER-39172 Shut down and validate nodes in parallel in ReplSetTest.…
Browse files Browse the repository at this point in the history
…stopSet
  • Loading branch information
will62794 authored and evergreen committed Nov 4, 2019
1 parent 56866f4 commit a417e97
Show file tree
Hide file tree
Showing 4 changed files with 117 additions and 13 deletions.
Expand Up @@ -9,6 +9,12 @@
load("jstests/libs/command_sequence_with_retries.js"); // for CommandSequenceWithRetries

MongoRunner.validateCollectionsCallback = function(port) {
// This function may be executed in a new Thread context, so ensure the proper definitions
// are loaded.
if (typeof CommandSequenceWithRetries === "undefined") {
load("jstests/libs/command_sequence_with_retries.js");
}

if (jsTest.options().skipCollectionAndIndexValidation) {
print("Skipping collection validation during mongod shutdown");
return;
Expand Down
65 changes: 57 additions & 8 deletions src/mongo/shell/replsettest.js
Expand Up @@ -75,6 +75,8 @@
var ReplSetTest = function(opts) {
'use strict';

load("jstests/libs/parallelTester.js"); // For Thread.

if (!(this instanceof ReplSetTest)) {
return new ReplSetTest(opts);
}
Expand Down Expand Up @@ -2665,9 +2667,11 @@ var ReplSetTest = function(opts) {
* @param {Object} [extraOptions={}]
* @param {boolean} [extraOptions.forRestart=false] indicates whether stop() is being called
* with the intent to call start() with restart=true for the same node(s) n.
* @param {boolean} [extraOptions.waitpid=true] if true, we will wait for the process to
* terminate after stopping it.
*/
this.stop = _nodeParamToSingleNode(_nodeParamToConn(function(
n, signal, opts, {forRestart: forRestart = false} = {}) {
n, signal, opts, {forRestart: forRestart = false, waitpid: waitPid = true} = {}) {
// Can specify wait as second parameter, if using default signal
if (signal == true || signal == false) {
signal = undefined;
Expand All @@ -2676,11 +2680,15 @@ var ReplSetTest = function(opts) {
n = this.getNodeId(n);

var conn = _useBridge ? _unbridgedNodes[n] : this.nodes[n];
print('ReplSetTest stop *** Shutting down mongod in port ' + conn.port + ' ***');
var ret = MongoRunner.stopMongod(conn, signal, opts);
print('ReplSetTest stop *** Shutting down mongod in port ' + conn.port +
', wait for process termination: ' + waitPid + ' ***');
var ret = MongoRunner.stopMongod(conn, signal, opts, waitPid);

print('ReplSetTest stop *** Mongod in port ' + conn.port + ' shutdown with code (' + ret +
') ***');
// We only expect the process to have terminated if we actually called 'waitpid'.
if (waitPid) {
print('ReplSetTest stop *** Mongod in port ' + conn.port + ' shutdown with code (' +
ret + ') ***');
}

if (_useBridge && !forRestart) {
// We leave the mongobridge process running when the mongod process is being restarted.
Expand All @@ -2694,6 +2702,25 @@ var ReplSetTest = function(opts) {
return ret;
}));

/**
 * Performs collection validation on all nodes in the given 'ports' array in parallel.
 *
 * One Thread is started per entry of 'ports', each running
 * MongoRunner.validateCollectionsCallback against that port. This function returns only after
 * every started thread has been joined.
 *
 * @param {int[]} ports the array of mongo ports to run validation on
 */
this.validateNodes = function(ports) {
    // Start one validator per requested port. The port comes from the 'ports' argument rather
    // than from this.ports, so callers may validate any subset of nodes.
    const validators = ports.map(function(port) {
        const validator = new Thread(MongoRunner.validateCollectionsCallback, port);
        validator.start();
        return validator;
    });

    // Wait for all validators to finish.
    validators.forEach(function(validator) {
        validator.join();
    });
};

/**
* Kill all members of this replica set.
*
Expand Down Expand Up @@ -2750,11 +2777,33 @@ var ReplSetTest = function(opts) {
});
}

print("ReplSetTest stopSet stopping all replica set nodes.");
let startTime = new Date(); // Measure the execution time of shutting down nodes.
for (var i = 0; i < this.ports.length; i++) {
this.stop(i, signal, opts);

// Optionally validate collections on all nodes.
if (opts && opts.skipValidation) {
print("ReplSetTest stopSet skipping validation before stopping nodes.");
} else {
print("ReplSetTest stopSet validating all replica set nodes before stopping them.");
this.validateNodes(this.ports);
}

// Stop all nodes without waiting for them to terminate. We also skip validation since we
// have already done it above.
opts = Object.merge(opts, {skipValidation: true});
for (let i = 0; i < this.ports.length; i++) {
this.stop(i, signal, opts, {waitpid: false});
}

// Wait for all processes to terminate.
for (let i = 0; i < this.ports.length; i++) {
let conn = _useBridge ? _unbridgedNodes[i] : this.nodes[i];
let port = parseInt(conn.port);
print("ReplSetTest stopSet waiting for mongo program on port " + port + " to stop.");
let exitCode = waitMongoProgram(port);
print("ReplSetTest stopSet mongo program on port " + port + " shut down with code " +
exitCode);
}

print("ReplSetTest stopSet stopped all replica set nodes, took " +
(new Date() - startTime) + "ms for " + this.ports.length + " nodes.");

Expand Down
11 changes: 9 additions & 2 deletions src/mongo/shell/servers.js
Expand Up @@ -918,11 +918,12 @@ MongoRunner.validateCollectionsCallback = function(port) {};
* skipValidation: <bool>,
* allowedExitCode: <int>
* }
* @param {boolean} waitpid should we wait for the process to terminate after stopping it.
*
* Note: The auth option is required in a authenticated mongod running in Windows since
* it uses the shutdown command, which requires admin credentials.
*/
MongoRunner.stopMongod = function(conn, signal, opts) {
MongoRunner.stopMongod = function(conn, signal, opts, waitpid) {
if (!conn.pid) {
throw new Error("first arg must have a `pid` property; " +
"it is usually the object returned from MongoRunner.runMongod/s");
Expand All @@ -935,6 +936,7 @@ MongoRunner.stopMongod = function(conn, signal, opts) {

signal = parseInt(signal) || 15;
opts = opts || {};
waitpid = (waitpid === undefined) ? true : waitpid;

var allowedExitCode = MongoRunner.EXIT_CLEAN;

Expand Down Expand Up @@ -965,7 +967,12 @@ MongoRunner.stopMongod = function(conn, signal, opts) {
MongoRunner.validateCollectionsCallback(port);
}

returnCode = _stopMongoProgram(port, signal, opts);
returnCode = _stopMongoProgram(port, signal, opts, waitpid);
}

// If we are not waiting for shutdown, then there is no exit code to check.
if (!waitpid) {
return 0;
}
if (allowedExitCode !== returnCode) {
throw new MongoRunner.StopError(returnCode);
Expand Down
48 changes: 45 additions & 3 deletions src/mongo/shell/shell_utils_launcher.cpp
Expand Up @@ -781,6 +781,23 @@ BSONObj WaitProgram(const BSONObj& a, void* data) {
return BSON(string("") << exit_code);
}

// Waits for the mongo process that was registered on a given port and reports its exit code.
//
// The single argument is the port number. When a pid is registered for that port, this blocks in
// wait_for_pid() on it and returns the exit code that call reports. When no pid is registered for
// the port, a message is logged and an exit code of 0 is returned without waiting on anything.
BSONObj WaitMongoProgram(const BSONObj& a, void* data) {
    int port = singleArg(a).numberInt();
    invariant(port >= 0);

    if (registry.isPortRegistered(port)) {
        // Sentinel value, returned unchanged if wait_for_pid() does not write an exit code.
        int exitCode = -123456;
        ProcessId pid = registry.pidForPort(port);
        wait_for_pid(pid, true, &exitCode);
        return BSON(string("") << exitCode);
    }

    log() << "No db started on port: " << port;
    return BSON(string("") << 0);
}

// This function starts a program. In its input array it accepts either all commandline tokens
// which will be executed, or a single Object which must have a field named "args" which contains
// an array with all commandline tokens. The Object may have a field named "env" which contains an
Expand Down Expand Up @@ -978,7 +995,7 @@ inline void kill_wrapper(ProcessId pid, int sig, int port, const BSONObj& opt) {
#endif
}

int killDb(int port, ProcessId _pid, int signal, const BSONObj& opt) {
int killDb(int port, ProcessId _pid, int signal, const BSONObj& opt, bool waitPid = true) {
ProcessId pid;
if (port > 0) {
if (!registry.isPortRegistered(port)) {
Expand All @@ -992,8 +1009,15 @@ int killDb(int port, ProcessId _pid, int signal, const BSONObj& opt) {

kill_wrapper(pid, signal, port, opt);

// If we are not waiting for the process to end, then return immediately.
if (!waitPid) {
log() << "skip waiting for pid " << pid << " to terminate";
return 0;
}

int exitCode = EXIT_FAILURE;
try {
log() << "waiting for process " << pid << " to terminate.";
wait_for_pid(pid, true, &exitCode);
} catch (...) {
warning() << "process " << pid << " failed to terminate.";
Expand Down Expand Up @@ -1039,13 +1063,30 @@ BSONObj getStopMongodOpts(const BSONObj& a) {
return BSONObj();
}

bool getWaitPid(const BSONObj& a) {
if (a.nFields() == 4) {
BSONObjIterator i(a);
i.next();
i.next();
i.next();
BSONElement e = i.next();
if (e.isBoolean()) {
return e.boolean();
}
}
// Default to wait for pid.
return true;
}

/** stopMongoProgram(port[, signal[, opts[, waitpid]]]) */
BSONObj StopMongoProgram(const BSONObj& a, void* data) {
int nFields = a.nFields();
uassert(ErrorCodes::FailedToParse, "wrong number of arguments", nFields >= 1 && nFields <= 3);
uassert(ErrorCodes::FailedToParse, "wrong number of arguments", nFields >= 1 && nFields <= 4);
uassert(ErrorCodes::BadValue, "stopMongoProgram needs a number", a.firstElement().isNumber());
int port = int(a.firstElement().number());
int code = killDb(port, ProcessId::fromNative(0), getSignal(a), getStopMongodOpts(a));
log() << "shell: stopping mongo program, waitpid=" << getWaitPid(a);
int code =
killDb(port, ProcessId::fromNative(0), getSignal(a), getStopMongodOpts(a), getWaitPid(a));
log() << "shell: stopped mongo program on port " << port;
return BSON("" << (double)code);
}
Expand Down Expand Up @@ -1128,6 +1169,7 @@ void installShellUtilsLauncher(Scope& scope) {
scope.injectNative("rawMongoProgramOutput", RawMongoProgramOutput);
scope.injectNative("clearRawMongoProgramOutput", ClearRawMongoProgramOutput);
scope.injectNative("waitProgram", WaitProgram);
scope.injectNative("waitMongoProgram", WaitMongoProgram);
scope.injectNative("checkProgram", CheckProgram);
scope.injectNative("resetDbpath", ResetDbpath);
scope.injectNative("pathExists", PathExists);
Expand Down

0 comments on commit a417e97

Please sign in to comment.