SERVER-66854 Prevent step-up ops from being killed by killSessions commands
vessy-mongodb authored and Evergreen Agent committed Aug 25, 2022
1 parent d3b68ac commit 5930149
Showing 7 changed files with 292 additions and 10 deletions.
8 changes: 8 additions & 0 deletions etc/backports_required_for_multiversion_tests.yml
@@ -278,6 +278,10 @@ last-continuous:
    test_file: jstests/sharding/drop_index_fails_if_multikey_index_is_last_compatible.js
  - ticket: SERVER-68628
    test_file: jstests/sharding/resharding_temp_ns_routing_info_unsharded.js
  - ticket: SERVER-66854
    test_file: jstests/replsets/prepared_transaction_kill_during_step_up_refresh.js
  - ticket: SERVER-66854
    test_file: jstests/replsets/step_up_kill_abort_transactions.js
  - ticket: SERVER-68728
    test_file: jstests/sharding/move_chunk_interrupt_postimage.js
  - ticket: SERVER-64741
@@ -693,6 +697,10 @@ last-lts:
    test_file: jstests/sharding/drop_index_fails_if_multikey_index_is_last_compatible.js
  - ticket: SERVER-68628
    test_file: jstests/sharding/resharding_temp_ns_routing_info_unsharded.js
  - ticket: SERVER-66854
    test_file: jstests/replsets/prepared_transaction_kill_during_step_up_refresh.js
  - ticket: SERVER-66854
    test_file: jstests/replsets/step_up_kill_abort_transactions.js
  - ticket: SERVER-68728
    test_file: jstests/sharding/move_chunk_interrupt_postimage.js
  - ticket: SERVER-64741
70 changes: 70 additions & 0 deletions jstests/replsets/prepared_transaction_kill_during_step_up_refresh.js
@@ -0,0 +1,70 @@
/**
* Tests that the work to restore locks for prepared transactions on step up is not killable via
* killSessions commands.
*
* @tags: [uses_transactions, uses_prepare_transaction]
*/
(function() {
"use strict";
load("jstests/core/txns/libs/prepare_helpers.js");
load("jstests/libs/fail_point_util.js");
load("jstests/replsets/rslib.js"); // For reconnect()

const rst = new ReplSetTest({nodes: 2, name: jsTestName()});
rst.startSet();
rst.initiateWithHighElectionTimeout();

const dbName = "primaryDB";
const collName = "testcoll";

const primary = rst.getPrimary();
const newPrimary = rst.getSecondary();

const primaryDB = primary.getDB(dbName);
const primaryColl = primaryDB.getCollection(collName);
assert.commandWorked(primaryDB.runCommand({create: collName, writeConcern: {w: "majority"}}));

jsTestLog("Starting a transaction");
const session = primary.startSession({causalConsistency: false});
session.startTransaction({writeConcern: {w: "majority"}});
const lsid = session.getSessionId().id;

jsTestLog("LSID for our session is " + tojson(lsid));

jsTestLog("Inserting a doc in a transaction.");
const doc = {
    _id: "txnDoc"
};
assert.commandWorked(session.getDatabase(dbName).getCollection(collName).insert(doc));

jsTestLog("Putting transaction into prepare");
const prepareTimestamp = PrepareHelpers.prepareTransaction(session);

jsTestLog("Setting failpoint on new primary");
const stepUpFP = configureFailPoint(newPrimary, "hangDuringStepUpPrepareRestoreLocks");

jsTestLog("Stepping up new primary");
rst.stepUp(newPrimary, {awaitWritablePrimary: false});
reconnect(primary);

jsTestLog("Waiting on new primary to hit step up failpoint");
stepUpFP.wait();

jsTestLog("Killing the session");
const newPrimaryDB = newPrimary.getDB(dbName);
assert.commandWorked(newPrimaryDB.runCommand({killSessions: [{id: lsid}]}));

jsTestLog("Allowing step up to continue");
stepUpFP.off();
assert.eq(newPrimary, rst.getPrimary());

jsTestLog("Committing transaction on the new primary");
// Create a proxy session to reuse the session state of the old primary.
const newSession = new _DelegatingDriverSession(newPrimary, session);

assert.commandWorked(PrepareHelpers.commitTransaction(newSession, prepareTimestamp));

assert.eq(doc, primaryColl.findOne({}), primaryColl.find({}).toArray());

rst.stopSet();
})();
149 changes: 149 additions & 0 deletions jstests/replsets/step_up_kill_abort_transactions.js
@@ -0,0 +1,149 @@
/**
* Tests that the work for aborting in-progress transactions on step up is not killable via
* killSessions commands.
*
* @tags: [
* exclude_from_large_txns,
* uses_transactions,
* ]
*/

(function() {
"use strict";
load("jstests/replsets/rslib.js"); // For reconnect()
load("jstests/libs/fail_point_util.js");

function getTxnTableEntry(db) {
    let txnTableEntries = db.getSiblingDB("config")["transactions"].find().toArray();
    assert.eq(txnTableEntries.length, 1);
    return txnTableEntries[0];
}

const rst = new ReplSetTest({
    name: jsTestName(),
    nodes: 3,
    nodeOptions: {
        setParameter:
            // Make it easier to hold a transaction before it completes.
            {maxNumberOfTransactionOperationsInSingleOplogEntry: 1, bgSyncOplogFetcherBatchSize: 1}
    },
});

rst.startSet();
let config = rst.getReplSetConfig();
config.members[2].priority = 0;
// Disable primary catchup and chaining.
config.settings = {
    catchUpTimeoutMillis: 0,
    chainingAllowed: false
};
rst.initiate(config);

setLogVerbosity(rst.nodes, {"replication": {"verbosity": 3}});

const dbName = "testdb";
const collName = "testcoll";

const primary = rst.nodes[0];
const primaryDB = primary.getDB(dbName);
const newPrimary = rst.nodes[1];
const newPrimaryDB = newPrimary.getDB(dbName);

assert.commandWorked(primaryDB.runCommand({create: collName, writeConcern: {w: "majority"}}));

// Prevent the priority: 0 node from fetching new ops so that it can vote for the new primary.
const stopReplProducerFailPoint = configureFailPoint(rst.nodes[2], 'stopReplProducer');

jsTest.log("Stop secondary oplog replication before the last operation in the transaction.");
// The stopReplProducerOnDocument failpoint ensures that secondary stops replicating before
// applying the last operation in the transaction. This depends on the oplog fetcher batch size
// being 1.
const stopReplProducerOnDocumentFailPoint = configureFailPoint(
    newPrimary, "stopReplProducerOnDocument", {document: {"applyOps.o._id": "last in txn"}});

jsTestLog("Start a transaction.");
const session = primary.startSession({causalConsistency: false});
const sessionDB = session.getDatabase(dbName);
const sessionColl = sessionDB.getCollection(collName);
session.startTransaction({writeConcern: {w: "majority", wtimeout: 500}});

const lsid = session.getSessionId().id;
jsTestLog("LSID for our session is " + tojson(lsid));

jsTestLog("Add inserts to transaction.");
assert.commandWorked(sessionColl.insert({_id: "first in txn on primary " + primary}));
assert.commandWorked(sessionColl.insert({_id: "last in txn"}));

jsTestLog("Confirm we cannot commit the transaction due to insufficient replication.");
let res = session.commitTransaction_forTesting();
assert.commandFailedWithCode(res, ErrorCodes.WriteConcernFailed);

jsTestLog("Find the start and commit optimes on the primary.");
let txnTableEntry = getTxnTableEntry(primaryDB);
assert.eq(txnTableEntry.state, "committed");
const commitOpTime = txnTableEntry.lastWriteOpTime;
const startOpTime =
    primaryDB.getSiblingDB("local").oplog.rs.findOne({ts: commitOpTime.ts}).prevOpTime;

jsTestLog("Wait for the new primary to block on fail point.");
stopReplProducerOnDocumentFailPoint.wait();

jsTestLog("Wait for the new primary to apply the first op of transaction at timestamp: " +
tojson(startOpTime));
assert.soon(() => {
const lastOpTime = getLastOpTime(newPrimary);
jsTestLog("Current lastOpTime on the new primary: " + tojson(lastOpTime));
return rs.compareOpTimes(lastOpTime, startOpTime) >= 0;
});

// Now the transaction should be in-progress on the new primary.
txnTableEntry = getTxnTableEntry(newPrimaryDB);
assert.eq(txnTableEntry.state, "inProgress");
// The startOpTime should be less than the commit optime.
assert.eq(rs.compareOpTimes(txnTableEntry.startOpTime, commitOpTime), -1);

jsTestLog("Set step up failpoint on new primary");
const stepUpFP = configureFailPoint(newPrimary, "hangDuringStepUpAbortInProgressTransactions");

jsTestLog("Step down primary via heartbeat.");
assert.commandWorked(newPrimary.adminCommand({replSetStepUp: 1}));
rst.awaitNodesAgreeOnPrimary();
reconnect(primary);

jsTestLog("Wait for the new primary to stop replication after primary catch-up.");
checkLog.contains(newPrimary, "Stopping replication producer");

jsTestLog("Enable replication on the new primary so that it can continue the state transition");
stopReplProducerOnDocumentFailPoint.off();

jsTestLog("Wait on new primary to hit step up failpoint");
stepUpFP.wait();

jsTestLog("Attempt to kill the session");
assert.commandWorked(newPrimaryDB.runCommand({killSessions: [{id: lsid}]}));

jsTestLog("Allow step up to continue");
stepUpFP.off();
assert.eq(rst.getPrimary(), newPrimary);
stopReplProducerFailPoint.off();
rst.awaitReplication();

jsTestLog("Verifying that the transaction has been aborted on the new primary.");
// Create a proxy session to reuse the session state of the old primary.
const newSession = new _DelegatingDriverSession(newPrimary, session);
const newSessionDB = newSession.getDatabase(dbName);
// The transaction should have been aborted.
assert.commandFailedWithCode(newSessionDB.adminCommand({
    commitTransaction: 1,
    txnNumber: NumberLong(newSession.getTxnNumber_forTesting()),
    autocommit: false,
    writeConcern: {w: "majority"}
}),
                             ErrorCodes.NoSuchTransaction);

jsTestLog("Verifying that the collection was not changed by the transaction.");
assert.eq(primaryDB.getCollection(collName).find().itcount(), 0);
assert.eq(newPrimaryDB.getCollection(collName).find().itcount(), 0);

rst.stopSet();
})();
26 changes: 26 additions & 0 deletions src/mongo/db/operation_context.h
@@ -532,6 +532,7 @@ class OperationContext : public Interruptible, public Decorable<OperationContext>
_lsid = boost::none;
_txnNumber = boost::none;
_txnRetryCounter = boost::none;
_killOpsExempt = false;
}

/**
@@ -621,6 +622,23 @@ class OperationContext : public Interruptible, public Decorable<OperationContext>
return _opCompressed;
}

/**
* Returns whether or not a local killOps may kill this opCtx.
*/
bool isKillOpsExempt() const {
return _killOpsExempt;
}

/**
* Set to prevent killOps from killing this opCtx even when an LSID is set.
* You may only call this method prior to setting an LSID on this opCtx.
* Calls to resetMultiDocumentTransactionState will reset _killOpsExempt to false.
*/
void setKillOpsExempt() {
invariant(!_lsid);
_killOpsExempt = true;
}

private:
StatusWith<stdx::cv_status> waitForConditionOrInterruptNoAssertUntil(
stdx::condition_variable& cv, BasicLockableAdapter m, Date_t deadline) noexcept override;
@@ -807,6 +825,14 @@ class OperationContext : public Interruptible, public Decorable<OperationContext>

// Whether this operation was started by a compressed command.
bool _opCompressed = false;

// Prevent this opCtx from being killed by killSessionsLocalKillOps if an LSID is attached.
// Normally, the presence of an LSID implies kill-eligibility as it uniquely identifies a
// session and can thus be passed into a killSessions command to target that session and its
// operations. However, there are some cases where we want the opCtx to have both an LSID and
// kill-immunity. Current examples include checking out sessions on replica set step up in order
// to refresh locks for prepared transactions or abort in-progress transactions.
bool _killOpsExempt = false;
};

// Gets a TimeZoneDatabase pointer from the ServiceContext.
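To make the intended call order concrete, here is a minimal sketch (illustrative only, not part of the commit) of how a caller is expected to use the new accessor: the exemption must be set while holding the Client lock and before any logical session id is attached, since setKillOpsExempt() invariants that no LSID is present yet. The sessionId variable is a placeholder for whatever LogicalSessionId the caller owns; the session_catalog_mongod.cpp hunks later in this commit use exactly this pattern.

// Sketch: mark the opCtx kill-exempt *before* attaching an LSID, under the Client lock
// so killSessionsLocalKillOps (which inspects opCtxs under the same lock) cannot race with us.
{
    stdx::unique_lock<Client> lk(*opCtx->getClient());
    opCtx->setKillOpsExempt();              // must precede setLogicalSessionId(); invariants !_lsid
    opCtx->setLogicalSessionId(sessionId);  // sessionId: hypothetical LogicalSessionId
}
// From this point on, killSessions commands that match sessionId will skip this opCtx.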
3 changes: 2 additions & 1 deletion src/mongo/db/session/kill_sessions_common.cpp
@@ -54,8 +54,9 @@ SessionKiller::Result killSessionsLocalKillOps(OperationContext* opCtx,
OperationContext* opCtxToKill = client->getOperationContext();
if (opCtxToKill) {
const auto& lsid = opCtxToKill->getLogicalSessionId();
const auto exempt = opCtxToKill->isKillOpsExempt();

if (lsid) {
if (lsid && !exempt) {
if (const KillAllSessionsByPattern* pattern = matcher.match(*lsid)) {
ScopedKillAllSessionsByPatternImpersonator impersonator(opCtx, *pattern);

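Because the hunk above shows only the changed lines, the sketch below (an assumption about the surrounding code, not a verbatim excerpt of the file) restates the per-client decision killSessionsLocalKillOps now makes: an operation is killable through this path only if it carries an LSID, that LSID matches the kill pattern, and the operation has not been marked kill-exempt.

// Assumed shape of the per-client check; the mechanics of the actual kill are elided.
if (OperationContext* opCtxToKill = client->getOperationContext()) {
    const auto& lsid = opCtxToKill->getLogicalSessionId();
    const auto exempt = opCtxToKill->isKillOpsExempt();

    if (lsid && !exempt) {
        if (const KillAllSessionsByPattern* pattern = matcher.match(*lsid)) {
            ScopedKillAllSessionsByPatternImpersonator impersonator(opCtx, *pattern);
            // ... interrupt (kill) opCtxToKill and record it in the killer's result ...
        }
    }
}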
24 changes: 22 additions & 2 deletions src/mongo/db/session/session_catalog_mongod.cpp
@@ -52,6 +52,7 @@
#include "mongo/rpc/get_status_from_command_result.h"
#include "mongo/s/transaction_router.h"
#include "mongo/util/concurrency/thread_pool.h"
#include "mongo/util/fail_point.h"
#include "mongo/util/scopeguard.h"

#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kTransaction
@@ -60,6 +61,9 @@
namespace mongo {
namespace {

MONGO_FAIL_POINT_DEFINE(hangDuringStepUpPrepareRestoreLocks);
MONGO_FAIL_POINT_DEFINE(hangDuringStepUpAbortInProgressTransactions);

const auto getMongoDSessionCatalog =
ServiceContext::declareDecoration<std::unique_ptr<MongoDSessionCatalog>>();

@@ -462,9 +466,17 @@ void abortInProgressTransactions(OperationContext* opCtx,
while (cursor->more()) {
auto txnRecord = SessionTxnRecord::parse(IDLParserContext("abort-in-progress-transactions"),
cursor->next());
opCtx->setLogicalSessionId(txnRecord.getSessionId());

// Synchronize with killOps to make this unkillable.
{
stdx::unique_lock<Client> lk(*opCtx->getClient());
opCtx->setKillOpsExempt();
opCtx->setLogicalSessionId(txnRecord.getSessionId());
}
opCtx->setTxnNumber(txnRecord.getTxnNum());
opCtx->setInMultiDocumentTransaction();

hangDuringStepUpAbortInProgressTransactions.pauseWhileSet();
auto ocs = mongoDSessionCatalog->checkOutSessionWithoutRefresh(opCtx);
auto txnParticipant = TransactionParticipant::get(opCtx);
LOGV2_DEBUG(21978,
@@ -565,11 +577,19 @@ void MongoDSessionCatalog::onStepUp(OperationContext* opCtx) {
AlternativeClientRegion acr(newClient);
for (const auto& sessionInfo : sessionsToReacquireLocks) {
auto newOpCtx = cc().makeOperationContext();
newOpCtx->setLogicalSessionId(*sessionInfo.getSessionId());

// Synchronize with killOps to make this unkillable.
{
stdx::unique_lock<Client> lk(*newOpCtx->getClient());
newOpCtx->setKillOpsExempt();
newOpCtx->setLogicalSessionId(*sessionInfo.getSessionId());
}
newOpCtx->setTxnNumber(*sessionInfo.getTxnNumber());
newOpCtx->setTxnRetryCounter(*sessionInfo.getTxnRetryCounter());
newOpCtx->setInMultiDocumentTransaction();

hangDuringStepUpPrepareRestoreLocks.pauseWhileSet();

// Use MongoDOperationContextSessionWithoutRefresh to check out the session because:
// - The in-memory state for this session has been kept in sync with the on-disk state
// by secondary oplog application for prepared transactions so no refresh will be
22 changes: 15 additions & 7 deletions src/mongo/db/transaction/transaction_participant_test.cpp
@@ -1087,14 +1087,22 @@ TEST_F(TxnParticipantTest, CleanOperationContextOnStepUp) {
// onStepUp() relies on the storage interface to create the config.transactions table.
repl::StorageInterface::set(service, std::make_unique<repl::StorageInterfaceImpl>());

// onStepUp() must not leave aborted transactions' metadata attached to the operation context.
auto mongoDSessionCatalog = MongoDSessionCatalog::get(opCtx());
mongoDSessionCatalog->onStepUp(opCtx());
// The test fixture's setup attaches an LSID to this opCtx, which we do not want here.
auto onStepUpFunc = [&](OperationContext* opCtx) {
// onStepUp() must not leave aborted transactions' metadata attached to the operation
// context.
auto mongoDSessionCatalog = MongoDSessionCatalog::get(opCtx);
mongoDSessionCatalog->onStepUp(opCtx);

// onStepUp() must not leave aborted transactions' metadata attached to the operation
// context.
ASSERT_FALSE(opCtx->inMultiDocumentTransaction());
ASSERT_FALSE(opCtx->isStartingMultiDocumentTransaction());
ASSERT_FALSE(opCtx->getLogicalSessionId());
ASSERT_FALSE(opCtx->getTxnNumber());
};

ASSERT_FALSE(opCtx()->inMultiDocumentTransaction());
ASSERT_FALSE(opCtx()->isStartingMultiDocumentTransaction());
ASSERT_FALSE(opCtx()->getLogicalSessionId());
ASSERT_FALSE(opCtx()->getTxnNumber());
runFunctionFromDifferentOpCtx(onStepUpFunc);
}

TEST_F(TxnParticipantTest, StepDownDuringPreparedAbortFails) {
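The call to runFunctionFromDifferentOpCtx above refers to a test-fixture helper whose body is not shown in this diff. A plausible shape for such a helper, assuming the usual ServiceContext/Client utilities, is sketched below purely for orientation; the real helper in transaction_participant_test.cpp may differ in signature and details.

// Hypothetical sketch: run `func` on a fresh Client and OperationContext that carry no LSID.
void runFunctionFromDifferentOpCtx(ServiceContext* service,
                                   std::function<void(OperationContext*)> func) {
    auto client = service->makeClient("alternative-client");  // fresh Client, no session state
    AlternativeClientRegion acr(client);
    auto opCtx = cc().makeOperationContext();                  // fresh opCtx without an LSID
    func(opCtx.get());
}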
