Skip to content

Commit

Permalink
SERVER-61985 SERVER-67193 Make reshardingPauseCoordinatorBeforeComple…
Browse files Browse the repository at this point in the history
…tion failpoint pause conditionally
  • Loading branch information
zorro786 authored and Evergreen Agent committed Jul 18, 2022
1 parent 6a8cc3a commit 0d5fd57
Show file tree
Hide file tree
Showing 6 changed files with 79 additions and 25 deletions.
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
/**
* Tests that setFeatureCompatibilityVersion command aborts an ongoing reshardCollection command
*/
(function() {
"use strict";

load("jstests/libs/parallel_shell_helpers.js");
load("jstests/sharding/libs/resharding_test_fixture.js");
load('jstests/libs/discover_topology.js');
load('jstests/libs/fail_point_util.js');
Expand All @@ -21,6 +25,8 @@ function runTest(forcePooledConnectionsDropped) {
],
});

const sourceNamespace = inputCollection.getFullName();

let mongos = inputCollection.getMongo();

for (let x = 0; x < 1000; x++) {
Expand All @@ -37,7 +43,17 @@ function runTest(forcePooledConnectionsDropped) {
pauseBeforeCloseCxns = configureFailPoint(config, "pauseBeforeCloseCxns");
}

/**
 * Polls config.reshardingOperations through mongos until the coordinator document for
 * sourceNamespace is either absent or reports the "aborting" state.
 */
function checkCoordinatorDoc() {
    // True once the document has been removed or its state field reads "aborting".
    const isRemovedOrAborting = () => {
        const doc =
            mongos.getCollection("config.reshardingOperations").findOne({ns: sourceNamespace});

        if (doc === null) {
            return true;
        }
        return doc.state === "aborting";
    };

    assert.soon(isRemovedOrAborting);
}

const recipientShardNames = reshardingTest.recipientShardNames;
let awaitShell;
reshardingTest.withReshardingInBackground(
{
newShardKeyPattern: {newKey: 1},
Expand All @@ -63,7 +79,7 @@ function runTest(forcePooledConnectionsDropped) {
assert.commandWorked(db.adminCommand({setFeatureCompatibilityVersion: lastLTSFCV}));
}`;

let awaitShell = startParallelShell(codeToRunInParallelShell, mongos.port);
awaitShell = startParallelShell(codeToRunInParallelShell, mongos.port);

if (forcePooledConnectionsDropped) {
pauseBeforeCloseCxns.wait();
Expand All @@ -88,8 +104,7 @@ function runTest(forcePooledConnectionsDropped) {
jsTestLog("Turn off pause before pauseBeforeMarkKeepOpen failpoint");
pauseBeforeMarkKeepOpen.off();
}

awaitShell();
checkCoordinatorDoc();
},
{
expectedErrorCode: [
Expand All @@ -98,6 +113,8 @@ function runTest(forcePooledConnectionsDropped) {
]
});

awaitShell();

reshardingTest.withReshardingInBackground(
{
newShardKeyPattern: {newKey: 1},
Expand All @@ -107,7 +124,14 @@ function runTest(forcePooledConnectionsDropped) {
],
},
() => {
assert.commandWorked(mongos.adminCommand({setFeatureCompatibilityVersion: latestFCV}));
assert.soon(() => {
return mongos.getDB('config').reshardingOperations.findOne() != null;
}, "timed out waiting for coordinator doc to be written", 30 * 1000);
awaitShell = startParallelShell(funWithArgs(function(latestFCV) {
assert.commandWorked(db.adminCommand(
{setFeatureCompatibilityVersion: latestFCV}));
}, latestFCV), mongos.port);
checkCoordinatorDoc();
},
{
expectedErrorCode: [
Expand All @@ -117,6 +141,7 @@ function runTest(forcePooledConnectionsDropped) {
]
});

awaitShell();
reshardingTest.teardown();
}

Expand Down
6 changes: 4 additions & 2 deletions jstests/sharding/libs/resharding_test_fixture.js
Original file line number Diff line number Diff line change
Expand Up @@ -321,8 +321,10 @@ var ReshardingTest = class {
configureFailPoint(configPrimary, "reshardingPauseCoordinatorBeforeBlockingWrites");
this._pauseCoordinatorBeforeDecisionPersistedFailpoint =
configureFailPoint(configPrimary, "reshardingPauseCoordinatorBeforeDecisionPersisted");
this._pauseCoordinatorBeforeCompletionFailpoint = configureFailPoint(
configPrimary, "reshardingPauseCoordinatorBeforeCompletion", {}, {times: 1});
this._pauseCoordinatorBeforeCompletionFailpoint =
configureFailPoint(configPrimary,
"reshardingPauseCoordinatorBeforeCompletion",
{"sourceNamespace": this._ns});

this._commandDoneSignal = new CountDownLatch(1);

Expand Down
21 changes: 20 additions & 1 deletion jstests/sharding/resharding_abort_in_preparing_to_donate.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
"use strict";
load("jstests/libs/discover_topology.js");
load("jstests/sharding/libs/resharding_test_fixture.js");
load('jstests/libs/parallel_shell_helpers.js');

const originalCollectionNs = "reshardingDb.coll";

Expand All @@ -36,6 +37,7 @@ const configsvr = new Mongo(topology.configsvr.nodes[0]);
const pauseAfterPreparingToDonateFP =
configureFailPoint(configsvr, "reshardingPauseCoordinatorAfterPreparingToDonate");

let awaitAbort;
reshardingTest.withReshardingInBackground(
{

Expand All @@ -47,13 +49,30 @@ reshardingTest.withReshardingInBackground(
},
() => {
pauseAfterPreparingToDonateFP.wait();
assert.commandWorked(mongos.adminCommand({abortReshardCollection: originalCollectionNs}));
assert.neq(null, mongos.getCollection("config.reshardingOperations").findOne({
ns: originalCollectionNs
}));
// Signaling abort will cause the
// pauseAfterPreparingToDonateFP to throw, implicitly
// allowing the coordinator to make progress without
// explicitly turning off the failpoint.
awaitAbort =
startParallelShell(funWithArgs(function(sourceNamespace) {
db.adminCommand({abortReshardCollection: sourceNamespace});
}, originalCollectionNs), mongos.port);
// Wait for the coordinator to either enter the "aborting" state or remove the coordinator
// document from config.reshardingOperations as a result of the recipients and donors
// transitioning to done due to the abort.
assert.soon(() => {
const coordinatorDoc = mongos.getCollection("config.reshardingOperations").findOne({
ns: originalCollectionNs
});
return coordinatorDoc === null || coordinatorDoc.state === "aborting";
});
},
{expectedErrorCode: ErrorCodes.ReshardCollectionAborted});

awaitAbort();
pauseAfterPreparingToDonateFP.off();

reshardingTest.teardown();
})();
14 changes: 1 addition & 13 deletions jstests/sharding/resharding_nonblocking_coordinator_rebuild.js
Original file line number Diff line number Diff line change
Expand Up @@ -109,19 +109,7 @@ reshardingTest.withReshardingInBackground(
}
},
{
// As a result of the elections intentionally triggered on the config server replica sets,
// the primary shard of the database may retry the _configsvrReshardCollection command. It
// is possible for the resharding operation from the first _configsvrReshardCollection
// command to have entirely finished executing to the point of removing the coordinator
// state document. A retry of the _configsvrReshardCollection command in this situation will
// lead to a second resharding operation to run. The second resharding operation will have
// the duplicate documents cloned by the ReshardingCollectionCloner rather than applied by
// the ReshardingOplogApplier as intended. This results in the reshardCollection command
// failing with a DuplicateKey error rather than the error code for the stash collections
// being non-empty. The recipient must have been able to successfully update its state to
// "applying" in the first resharding operation even when the ReshardingCoordinatorService
// had yet to be rebuilt so we accept DuplicateKey as an error too.
expectedErrorCode: [5356800, ErrorCodes.DuplicateKey],
expectedErrorCode: 5356800,
});

reshardingTest.teardown();
Expand Down
19 changes: 16 additions & 3 deletions jstests/sharding/resharding_prohibited_commands.js
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ const waitUntilReshardingInitializedOnDonor = () => {
* @param {Function} config.setup
* @param {AfterReshardingCallback} afterReshardingFn
*/

const withReshardingInBackground =
(duringReshardingFn,
{setup = () => {}, expectedErrorCode, afterReshardingFn = () => {}} = {}) => {
Expand All @@ -132,22 +133,34 @@ const withReshardingInBackground =
},
duringReshardingFn,
{expectedErrorCode: expectedErrorCode, afterReshardingFn: afterReshardingFn});

assertCommandsSucceedAfterReshardingOpFinishes(mongos.getDB(databaseName));
assert.commandWorked(sourceCollection.dropIndex(indexCreatedByTest));
};

// Tests that the prohibited commands work if the resharding operation is aborted.
let awaitAbort;
withReshardingInBackground(() => {
waitUntilReshardingInitializedOnDonor();
assert.neq(null,
mongos.getCollection("config.reshardingOperations").findOne({ns: sourceNamespace}));
awaitAbort = startParallelShell(funWithArgs(function(sourceNamespace) {
db.adminCommand({abortReshardCollection: sourceNamespace});
}, sourceNamespace), mongos.port);
// Wait for the coordinator to either enter the "aborting" state or remove the coordinator
// document from config.reshardingOperations as a result of the recipients and donors
// transitioning to done due to the abort.
assert.soon(() => {
const coordinatorDoc =
mongos.getCollection("config.reshardingOperations").findOne({ns: sourceNamespace});

assert.commandWorked(mongos.adminCommand({abortReshardCollection: sourceNamespace}));
return coordinatorDoc === null || coordinatorDoc.state === "aborting";
});
}, {
expectedErrorCode: ErrorCodes.ReshardCollectionAborted,
});
awaitAbort();

// Tests that the prohibited commands succeed if the resharding operation succeeds. During the
// operation it makes sures that the prohibited commands are rejected during the resharding
// operation it makes sure that the prohibited commands are rejected during the resharding
// operation.
withReshardingInBackground(() => {
waitUntilReshardingInitializedOnDonor();
Expand Down
11 changes: 9 additions & 2 deletions src/mongo/db/s/resharding/resharding_coordinator_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1364,8 +1364,15 @@ SemiFuture<void> ReshardingCoordinatorService::ReshardingCoordinator::run(
})
.onCompletion([this, executor](Status status) {
auto opCtx = _cancelableOpCtxFactory->makeOperationContext(&cc());
reshardingPauseCoordinatorBeforeCompletion.pauseWhileSetAndNotCanceled(
opCtx.get(), _ctHolder->getStepdownToken());
reshardingPauseCoordinatorBeforeCompletion.executeIf(
[&](const BSONObj&) {
reshardingPauseCoordinatorBeforeCompletion.pauseWhileSetAndNotCanceled(
opCtx.get(), _ctHolder->getStepdownToken());
},
[&](const BSONObj& data) {
auto ns = data.getStringField("sourceNamespace");
return ns.empty() ? true : ns.toString() == _coordinatorDoc.getSourceNss().ns();
});

{
auto lg = stdx::lock_guard(_fulfillmentMutex);
Expand Down

0 comments on commit 0d5fd57

Please sign in to comment.