SERVER-61760 Stop migrations in sharded collections before executing a collMod command
m4nti5 authored and Evergreen Agent committed Jan 17, 2022
1 parent 0be3003 commit 59d341f
Showing 7 changed files with 114 additions and 31 deletions.
21 changes: 21 additions & 0 deletions jstests/sharding/ddl_ops_reported_on_current_op_command.js
@@ -116,6 +116,27 @@ if (jsTestOptions().useRandomBinVersionsWithinReplicaSet || jsTestOptions().shar
assert(currOp[0].command.hasOwnProperty('allowMigrations'));
assert.eq(true, currOp[0].command.allowMigrations);
}

{
jsTestLog('Check collmod shows in current op');

let ddlOpThread = new Thread((mongosConnString, db, coll, nss) => {
let mongos = new Mongo(mongosConnString);
mongos.getCollection(nss).runCommand(
{createIndexes: coll, indexes: [{key: {c: 1}, name: "c_1"}]});
mongos.getDB(db).runCommand({collMod: coll, validator: {}});
}, st.s0.host, kDbName, kCollectionName, nss);

let currOp = getCurrentOpOfDDL(ddlOpThread, 'CollModCoordinator');

// There must be one operation running with the appropriate ns.
assert.eq(1, currOp.length);
assert.eq(nss, currOp[0].ns);
assert(currOp[0].hasOwnProperty('command'));
jsTestLog(tojson(currOp[0].command));
assert(currOp[0].command.hasOwnProperty('validator'));
assert.docEq({}, currOp[0].command.validator);
}
}

{
14 changes: 11 additions & 3 deletions jstests/sharding/libs/resharding_test_fixture.js
@@ -377,13 +377,18 @@ var ReshardingTest = class {
* @param postDecisionPersistedFn - a function for evaluating additional assertions after
* the decision has been persisted, but before the resharding operation finishes and returns
* to the client.
*
* @param afterReshardingFn - a function that will be called after the resharding operation
* finishes but before checking the post-resharding state. By the time afterReshardingFn
* is called, the temporary resharding collection will either have been dropped or renamed.
*/
withReshardingInBackground({newShardKeyPattern, newChunks},
duringReshardingFn = (tempNs) => {},
{
expectedErrorCode = ErrorCodes.OK,
postCheckConsistencyFn = (tempNs) => {},
-postDecisionPersistedFn = () => {}
+postDecisionPersistedFn = () => {},
+afterReshardingFn = () => {}
} = {}) {
this._startReshardingInBackgroundAndAllowCommandFailure({newShardKeyPattern, newChunks},
expectedErrorCode);
@@ -396,7 +401,8 @@ var ReshardingTest = class {
this._callFunctionSafely(() => duringReshardingFn(this._tempNs));
this._checkConsistencyAndPostState(expectedErrorCode,
() => postCheckConsistencyFn(this._tempNs),
-() => postDecisionPersistedFn());
+() => postDecisionPersistedFn(),
+() => afterReshardingFn());
}

/** @private */
@@ -500,7 +506,8 @@ var ReshardingTest = class {
/** @private */
_checkConsistencyAndPostState(expectedErrorCode,
postCheckConsistencyFn = () => {},
-postDecisionPersistedFn = () => {}) {
+postDecisionPersistedFn = () => {},
+afterReshardingFn = () => {}) {
let performCorrectnessChecks = true;
if (expectedErrorCode === ErrorCodes.OK) {
this._callFunctionSafely(() => {
@@ -566,6 +573,7 @@
expectedErrorCode: expectedErrorCode
});

afterReshardingFn();
this._checkPostState(expectedErrorCode);
}

5 changes: 5 additions & 0 deletions jstests/sharding/resharding_disallow_writes.js
@@ -71,6 +71,7 @@ reshardingTest.withReshardingInBackground(
assert(ErrorCodes.isExceededTimeLimitError(res.code));

jsTestLog("Attempting collMod");

assert.commandFailedWithCode(
// The collMod is serialized with the resharding command, so we explicitly add a
// timeout to the command so that it doesn't get blocked and time out the test.
@@ -83,6 +84,10 @@
ErrorCodes.ReshardCollectionInProgress);

jsTestLog("Completed operations");
},
afterReshardingFn: () => {
jsTestLog("Join any ongoing collMod command");
assert.commandWorked(sourceCollection.runCommand("collMod"));
}
});

17 changes: 16 additions & 1 deletion src/mongo/db/s/collmod_coordinator.cpp
@@ -167,10 +167,15 @@ ExecutorFuture<void> CollModCoordinator::_runImpl(
<< "Cannot update granularity of a sharded time-series collection.",
!hasTimeSeriesGranularityUpdate(_doc.getCollModRequest()));
}
+_doc.setCollUUID(
+sharding_ddl_util::getCollectionUUID(opCtx, nss(), true /* allowViews */));
+
-if (_recoveredFromDisk) {
+sharding_ddl_util::stopMigrations(opCtx, nss(), _doc.getCollUUID());
+
+if (!_firstExecution) {
_performNoopRetryableWriteOnParticipants(opCtx, **executor);
}

_doc = _updateSession(opCtx, _doc);
const OperationSessionInfo osi = getCurrentSession(_doc);

@@ -201,6 +206,7 @@ ExecutorFuture<void> CollModCoordinator::_runImpl(
CommandHelpers::appendSimpleCommandStatus(builder, ok, errmsg);
}
_result = builder.obj();
sharding_ddl_util::resumeMigrations(opCtx, nss(), _doc.getCollUUID());
} else {
CollMod cmd(nss());
cmd.setCollModRequest(_doc.getCollModRequest());
@@ -228,6 +234,15 @@
"Error running collMod",
"namespace"_attr = nss(),
"error"_attr = redact(status));
// If the collection UUID is set, the error happened on a sharded collection, so we
// should resume migrations.
if (_doc.getCollUUID()) {
auto opCtxHolder = cc().makeOperationContext();
auto* opCtx = opCtxHolder.get();
getForwardableOpMetadata().setOn(opCtx);

sharding_ddl_util::resumeMigrations(opCtx, nss(), _doc.getCollUUID());
}
}
return status;
});
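Taken together, the coordinator changes above bracket the sharded collMod between stopMigrations and resumeMigrations, resuming on both the success and the error path. The following is a condensed, hypothetical sketch of that control flow (not the coordinator's actual future-chained code), assuming only the sharding_ddl_util helpers visible in this diff:

    #include "mongo/db/s/sharding_ddl_util.h"

    // Condensed sketch (not the actual coordinator code): the migration-pausing
    // lifecycle this commit introduces around a sharded collMod.
    void runShardedCollModSketch(OperationContext* opCtx, const NamespaceString& nss) {
        // Capture the UUID up front so later stop/resume calls become no-ops if
        // the collection is dropped and recreated while the collMod runs.
        const auto collUUID =
            sharding_ddl_util::getCollectionUUID(opCtx, nss, true /* allowViews */);

        // Block balancer migrations before touching the collection metadata.
        sharding_ddl_util::stopMigrations(opCtx, nss, collUUID);

        try {
            // ... forward the collMod to the shards and aggregate the responses ...
            sharding_ddl_util::resumeMigrations(opCtx, nss, collUUID);  // success path
        } catch (const DBException&) {
            // Error path: re-enable migrations before surfacing the failure,
            // mirroring the error branch shown above.
            sharding_ddl_util::resumeMigrations(opCtx, nss, collUUID);
            throw;
        }
    }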
4 changes: 4 additions & 0 deletions src/mongo/db/s/collmod_coordinator_document.idl
@@ -61,3 +61,7 @@ structs:
collModRequest:
type: CollModRequest
description: "Initial collMod request."
collUUID:
type: uuid
description: "Collection uuid."
optional: true
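Because the field is declared optional, the generated CollModCoordinatorDocument class exposes it through boost::optional accessors; that is the _doc.setCollUUID(...) / _doc.getCollUUID() surface the coordinator uses above. A rough sketch of the assumed generated shape (the real IDL output also contains parsers, serializers, and the other fields):

    #include <boost/optional.hpp>
    #include "mongo/util/uuid.h"

    // Assumed shape of the IDL-generated accessors for the new optional field.
    class CollModCoordinatorDocument {
    public:
        const boost::optional<UUID>& getCollUUID() const {
            return _collUUID;
        }
        void setCollUUID(boost::optional<UUID> uuid) {
            _collUUID = std::move(uuid);
        }

    private:
        // optional: true means the field may be absent, e.g. on coordinator
        // documents persisted by binaries that predate this commit.
        boost::optional<UUID> _collUUID;
    };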
68 changes: 43 additions & 25 deletions src/mongo/db/s/sharding_ddl_util.cpp
@@ -146,6 +146,35 @@ write_ops::UpdateCommandRequest buildNoopWriteRequestCommand() {
return updateOp;
}

void setAllowMigrations(OperationContext* opCtx,
const NamespaceString& nss,
const boost::optional<UUID>& expectedCollectionUUID,
bool allowMigrations) {
ConfigsvrSetAllowMigrations configsvrSetAllowMigrationsCmd(nss, allowMigrations);
configsvrSetAllowMigrationsCmd.setCollectionUUID(expectedCollectionUUID);

const auto swSetAllowMigrationsResult =
Grid::get(opCtx)->shardRegistry()->getConfigShard()->runCommandWithFixedRetryAttempts(
opCtx,
ReadPreferenceSetting{ReadPreference::PrimaryOnly},
NamespaceString::kAdminDb.toString(),
CommandHelpers::appendMajorityWriteConcern(configsvrSetAllowMigrationsCmd.toBSON({})),
Shard::RetryPolicy::kIdempotent // Although ConfigsvrSetAllowMigrations is not really
// idempotent (because it will cause the collection
// version to be bumped), it is safe to retry.
);
try {
uassertStatusOKWithContext(
Shard::CommandResponse::getEffectiveStatus(std::move(swSetAllowMigrationsResult)),
str::stream() << "Error setting allowMigrations to " << allowMigrations
<< " for collection " << nss.toString());
} catch (const ExceptionFor<ErrorCodes::NamespaceNotSharded>&) {
// Collection no longer exists
} catch (const ExceptionFor<ErrorCodes::ConflictingOperationInProgress>&) {
// Collection metadata was concurrently dropped
}
}

} // namespace

void linearizeCSRSReads(OperationContext* opCtx) {
@@ -404,34 +433,23 @@ boost::optional<CreateCollectionResponse> checkIfCollectionAlreadySharded(
void stopMigrations(OperationContext* opCtx,
const NamespaceString& nss,
const boost::optional<UUID>& expectedCollectionUUID) {
-ConfigsvrSetAllowMigrations configsvrSetAllowMigrationsCmd(nss, false /* allowMigrations */);
-configsvrSetAllowMigrationsCmd.setCollectionUUID(expectedCollectionUUID);
-
-const auto swSetAllowMigrationsResult =
-Grid::get(opCtx)->shardRegistry()->getConfigShard()->runCommandWithFixedRetryAttempts(
-opCtx,
-ReadPreferenceSetting{ReadPreference::PrimaryOnly},
-NamespaceString::kAdminDb.toString(),
-CommandHelpers::appendMajorityWriteConcern(configsvrSetAllowMigrationsCmd.toBSON({})),
-Shard::RetryPolicy::kIdempotent // Although ConfigsvrSetAllowMigrations is not really
-// idempotent (because it will cause the collection
-// version to be bumped), it is safe to be retried.
-);
+setAllowMigrations(opCtx, nss, expectedCollectionUUID, false);
+}

-try {
-uassertStatusOKWithContext(
-Shard::CommandResponse::getEffectiveStatus(std::move(swSetAllowMigrationsResult)),
-str::stream() << "Error setting allowMigrations to false for collection "
-<< nss.toString());
-} catch (const ExceptionFor<ErrorCodes::NamespaceNotSharded>&) {
-// Collection no longer exists
-} catch (const ExceptionFor<ErrorCodes::ConflictingOperationInProgress>&) {
-// Collection metadata was concurrently dropped
-}
+void resumeMigrations(OperationContext* opCtx,
+const NamespaceString& nss,
+const boost::optional<UUID>& expectedCollectionUUID) {
+setAllowMigrations(opCtx, nss, expectedCollectionUUID, true);
}

-boost::optional<UUID> getCollectionUUID(OperationContext* opCtx, const NamespaceString& nss) {
-AutoGetCollection autoColl(opCtx, nss, MODE_IS, AutoGetCollectionViewMode::kViewsForbidden);
+boost::optional<UUID> getCollectionUUID(OperationContext* opCtx,
+const NamespaceString& nss,
+bool allowViews) {
+AutoGetCollection autoColl(opCtx,
+nss,
+MODE_IS,
+allowViews ? AutoGetCollectionViewMode::kViewsPermitted
+: AutoGetCollectionViewMode::kViewsForbidden);
return autoColl ? boost::make_optional(autoColl->uuid()) : boost::none;
}

16 changes: 14 additions & 2 deletions src/mongo/db/s/sharding_ddl_util.h
@@ -138,16 +138,28 @@ boost::optional<CreateCollectionResponse> checkIfCollectionAlreadySharded(
/**
* Stops ongoing migrations and prevents future ones from starting for the given nss.
* If expectedCollectionUUID is set and doesn't match that of the collection, then this is a no-op.
* If expectedCollectionUUID is not set, no UUID check will be performed before stopping migrations.
*/
void stopMigrations(OperationContext* opCtx,
const NamespaceString& nss,
const boost::optional<UUID>& expectedCollectionUUID);

/**
* Resumes migrations and balancing rounds for the given nss.
* If expectedCollectionUUID is set and doesn't match that of the collection, then this is a no-op.
* If expectedCollectionUUID is not set, no UUID check will be performed before resuming migrations.
*/
void resumeMigrations(OperationContext* opCtx,
const NamespaceString& nss,
const boost::optional<UUID>& expectedCollectionUUID);

/*
* Returns the UUID of the collection (if it exists) using the catalog. It does not provide any locking
-*guarantees.
+* guarantees after the call.
**/
-boost::optional<UUID> getCollectionUUID(OperationContext* opCtx, const NamespaceString& nss);
+boost::optional<UUID> getCollectionUUID(OperationContext* opCtx,
+const NamespaceString& nss,
+bool allowViews = false);

/*
* Performs a noop retryable write on the given shards using the session and txNumber specified in
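To close, a minimal caller-side sketch of the stop/resume pair declared in this header. The guard-based wrapper is hypothetical (it is not part of this commit) and assumes mongo's ScopeGuard utility; passing the collection UUID makes both calls no-ops if the collection is concurrently dropped, as the comments above describe:

    #include "mongo/db/s/sharding_ddl_util.h"
    #include "mongo/util/scopeguard.h"

    namespace mongo {

    // Hypothetical helper (not in this commit): run `ddlWork` with balancer
    // migrations paused for `nss`, resuming them on every exit path.
    template <typename Callable>
    void withMigrationsPaused(OperationContext* opCtx,
                              const NamespaceString& nss,
                              Callable&& ddlWork) {
        const auto collUUID = sharding_ddl_util::getCollectionUUID(opCtx, nss);
        sharding_ddl_util::stopMigrations(opCtx, nss, collUUID);
        ScopeGuard resumeGuard(
            [&] { sharding_ddl_util::resumeMigrations(opCtx, nss, collUUID); });
        ddlWork();  // migrations stay blocked while the DDL work runs
    }

    }  // namespace mongo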
