Skip to content

Commit

Permalink
SERVER-35173 Added autocommit value to currentOp's transaction sub-do…
Browse files Browse the repository at this point in the history
…cument
  • Loading branch information
jinichu committed Jul 5, 2018
1 parent c0fecea commit 1447252
Show file tree
Hide file tree
Showing 8 changed files with 159 additions and 25 deletions.
8 changes: 6 additions & 2 deletions jstests/noPassthrough/currentop_active_transaction.js
Expand Up @@ -36,8 +36,12 @@

// Keep running currentOp() until we see the transaction subdocument.
assert.soon(function() {
const transactionFilter =
{active: true, 'lsid': {$exists: true}, 'transaction.parameters.txnNumber': {$eq: 0}};
const transactionFilter = {
active: true,
'lsid': {$exists: true},
'transaction.parameters.txnNumber': {$eq: 0},
'transaction.parameters.autocommit': {$eq: false}
};
return 1 === adminDB.aggregate([{$currentOp: {}}, {$match: transactionFilter}]).itcount();
});

Expand Down
3 changes: 2 additions & 1 deletion jstests/sharding/aggregation_currentop.js
Expand Up @@ -625,7 +625,8 @@ TestData.skipAwaitingReplicationOnShardsBeforeCheckingUUIDs = true;
opid: {$exists: false},
desc: "inactive transaction",
"lsid.id": {$in: sessions.map((session) => session.getSessionId().id)},
"transaction.parameters.txnNumber": {$gte: 0, $lt: sessions.length}
"transaction.parameters.txnNumber": {$gte: 0, $lt: sessions.length},
"transaction.parameters.autocommit": false
};
}

Expand Down
5 changes: 0 additions & 5 deletions src/mongo/db/curop.cpp
Expand Up @@ -257,11 +257,6 @@ void CurOp::reportCurrentOpForClient(OperationContext* opCtx,
lsid->serialize(&lsidBuilder);
}

if (auto txnNumber = clientOpCtx->getTxnNumber()) {
infoBuilder->append("transaction",
BSON("parameters" << BSON("txnNumber" << *txnNumber)));
}

CurOp::get(clientOpCtx)->reportState(infoBuilder, truncateOps);
}
}
Expand Down
10 changes: 8 additions & 2 deletions src/mongo/db/pipeline/pipeline_d.cpp
Expand Up @@ -964,8 +964,14 @@ BSONObj PipelineD::MongoDInterface::_reportCurrentOpForClient(
CurOp::reportCurrentOpForClient(
opCtx, client, (truncateOps == CurrentOpTruncateMode::kTruncateOps), &builder);

// Append lock stats before returning.
if (auto clientOpCtx = client->getOperationContext()) {
OperationContext* clientOpCtx = client->getOperationContext();

if (clientOpCtx) {
if (auto opCtxSession = OperationContextSession::get(clientOpCtx)) {
opCtxSession->reportUnstashedState(&builder);
}

// Append lock stats before returning.
if (auto lockerInfo = clientOpCtx->lockState()->getLockerInfo()) {
fillLockerInfo(*lockerInfo, builder);
}
Expand Down
29 changes: 27 additions & 2 deletions src/mongo/db/session.cpp
Expand Up @@ -1104,15 +1104,40 @@ void Session::reportStashedState(BSONObjBuilder* builder) const {
BSONObjBuilder lsid(builder->subobjStart("lsid"));
getSessionId().serialize(&lsid);
}
builder->append("transaction",
BSON("parameters" << BSON("txnNumber" << _activeTxnNumber)));
BSONObjBuilder transactionBuilder;
_reportTransactionStats(ls, &transactionBuilder);
builder->append("transaction", transactionBuilder.obj());
builder->append("waitingForLock", false);
builder->append("active", false);
fillLockerInfo(*lockerInfo, *builder);
}
}
}

void Session::reportUnstashedState(BSONObjBuilder* builder) const {
stdx::lock_guard<stdx::mutex> ls(_mutex);

if (!_txnResourceStash) {
BSONObjBuilder transactionBuilder;
_reportTransactionStats(ls, &transactionBuilder);
builder->append("transaction", transactionBuilder.obj());
}
}

void Session::_reportTransactionStats(WithLock wl, BSONObjBuilder* builder) const {
BSONObjBuilder parametersBuilder(builder->subobjStart("parameters"));
parametersBuilder.append("txnNumber", _activeTxnNumber);

if (!_txnState.inMultiDocumentTransaction(wl)) {
// For retryable writes, we only include the txnNumber.
parametersBuilder.done();
return;
}
parametersBuilder.append("autocommit", _autocommit);

parametersBuilder.done();
}

void Session::_checkValid(WithLock) const {
uassert(ErrorCodes::ConflictingOperationInProgress,
str::stream() << "Session " << getSessionId()
Expand Down
11 changes: 11 additions & 0 deletions src/mongo/db/session.h
Expand Up @@ -361,6 +361,13 @@ class Session : public Decorable<Session> {
*/
void reportStashedState(BSONObjBuilder* builder) const;

/**
* If this session is not holding stashed locks in _txnResourceStash (transaction is active),
* reports the current state of the session using the provided builder. Locks the session
* object's mutex while running.
*/
void reportUnstashedState(BSONObjBuilder* builder) const;

/**
* Convenience method which creates and populates a BSONObj containing the stashed state.
* Returns an empty BSONObj if this session has no stashed resources.
Expand Down Expand Up @@ -541,6 +548,10 @@ class Session : public Decorable<Session> {
// truncated because it was too old.
bool _hasIncompleteHistory{false};

// Reports transaction stats for both active and inactive transactions using the provided
// builder.
void _reportTransactionStats(WithLock wl, BSONObjBuilder* builder) const;

// Caches what is known to be the last written transaction record for the session
boost::optional<SessionTxnRecord> _lastWrittenSessionRecord;

Expand Down
6 changes: 6 additions & 0 deletions src/mongo/db/session_catalog.cpp
Expand Up @@ -254,6 +254,9 @@ OperationContextSession::OperationContextSession(OperationContext* opCtx,
auto& checkedOutSession = operationSessionDecoration(opCtx);
if (!checkedOutSession) {
auto sessionTransactionTable = SessionCatalog::get(opCtx);
// We acquire a Client lock here to guard the construction of this session so that
// references to this session are safe to use while the lock is held.
stdx::lock_guard<Client> lk(*opCtx->getClient());
checkedOutSession.emplace(sessionTransactionTable->checkOutSession(opCtx));
} else {
// The only reason to be trying to check out a session when you already have a session
Expand Down Expand Up @@ -281,6 +284,9 @@ OperationContextSession::~OperationContextSession() {
}

auto& checkedOutSession = operationSessionDecoration(_opCtx);
// We acquire a Client lock here to guard the destruction of this session so that references to
// this session are safe to use while the lock is held.
stdx::lock_guard<Client> lk(*_opCtx->getClient());
checkedOutSession.reset();
}

Expand Down
112 changes: 99 additions & 13 deletions src/mongo/db/session_test.cpp
Expand Up @@ -728,6 +728,7 @@ TEST_F(SessionTest, StashAndUnstashResources) {
TEST_F(SessionTest, ReportStashedResources) {
const auto sessionId = makeLogicalSessionIdForTest();
const TxnNumber txnNum = 20;
const bool autocommit = false;
opCtx()->setLogicalSessionId(sessionId);
opCtx()->setTxnNumber(txnNum);

Expand All @@ -737,7 +738,7 @@ TEST_F(SessionTest, ReportStashedResources) {
Session session(sessionId);
session.refreshFromStorageIfNeeded(opCtx());

session.beginOrContinueTxn(opCtx(), txnNum, false, true, "testDB", "find");
session.beginOrContinueTxn(opCtx(), txnNum, autocommit, true, "testDB", "find");

repl::ReadConcernArgs readConcernArgs;
ASSERT_OK(readConcernArgs.initialize(BSON("find"
Expand All @@ -757,18 +758,20 @@ TEST_F(SessionTest, ReportStashedResources) {
const auto lockerInfo = opCtx()->lockState()->getLockerInfo();
ASSERT(lockerInfo);

auto txnDoc = BSON("parameters" << BSON("txnNumber" << txnNum));
auto reportBuilder =
std::move(BSONObjBuilder() << "host" << getHostNameCachedAndPort() << "desc"
<< "inactive transaction"
<< "lsid"
<< sessionId.toBSON()
<< "transaction"
<< txnDoc
<< "waitingForLock"
<< false
<< "active"
<< false);
BSONObjBuilder reportBuilder;
reportBuilder.append("host", getHostNameCachedAndPort());
reportBuilder.append("desc", "inactive transaction");
reportBuilder.append("lsid", sessionId.toBSON());
// Build the transaction parameters sub-document.
BSONObjBuilder transactionBuilder(reportBuilder.subobjStart("transaction"));
BSONObjBuilder parametersBuilder(transactionBuilder.subobjStart("parameters"));
parametersBuilder.append("txnNumber", txnNum);
parametersBuilder.append("autocommit", autocommit);
parametersBuilder.done();
transactionBuilder.done();

reportBuilder.append("waitingForLock", false);
reportBuilder.append("active", false);
fillLockerInfo(*lockerInfo, reportBuilder);

// Stash resources. The original Locker and RecoveryUnit now belong to the stash.
Expand All @@ -793,6 +796,89 @@ TEST_F(SessionTest, ReportStashedResources) {
session.commitTransaction(opCtx());
}

TEST_F(SessionTest, ReportUnstashedResources) {
const auto sessionId = makeLogicalSessionIdForTest();
const TxnNumber txnNum = 20;
const bool autocommit = false;
opCtx()->setLogicalSessionId(sessionId);
opCtx()->setTxnNumber(txnNum);

ASSERT(opCtx()->lockState());
ASSERT(opCtx()->recoveryUnit());

Session session(sessionId);
session.refreshFromStorageIfNeeded(opCtx());

session.beginOrContinueTxn(opCtx(), txnNum, autocommit, true, "testDB", "find");

repl::ReadConcernArgs readConcernArgs;
ASSERT_OK(readConcernArgs.initialize(BSON("find"
<< "test"
<< repl::ReadConcernArgs::kReadConcernFieldName
<< BSON(repl::ReadConcernArgs::kLevelFieldName
<< "snapshot"))));
repl::ReadConcernArgs::get(opCtx()) = readConcernArgs;

// Perform initial unstash which sets up a WriteUnitOfWork.
session.unstashTransactionResources(opCtx(), "find");
ASSERT(opCtx()->getWriteUnitOfWork());
ASSERT(opCtx()->lockState()->isLocked());

// Build a BSONObj containing the details which we expect to see reported when we call
// Session::reportUnstashedState.
BSONObjBuilder reportBuilder;
BSONObjBuilder transactionBuilder(reportBuilder.subobjStart("transaction"));
BSONObjBuilder parametersBuilder(transactionBuilder.subobjStart("parameters"));
parametersBuilder.append("txnNumber", txnNum);
parametersBuilder.append("autocommit", autocommit);
parametersBuilder.done();
transactionBuilder.done();

// Verify that the Session's report of its own unstashed state aligns with our expectations.
BSONObjBuilder unstashedStateBuilder;
session.reportUnstashedState(&unstashedStateBuilder);
ASSERT_BSONOBJ_EQ(unstashedStateBuilder.obj(), reportBuilder.obj());

// Stash resources. The original Locker and RecoveryUnit now belong to the stash.
session.stashTransactionResources(opCtx());
ASSERT(!opCtx()->getWriteUnitOfWork());

// With the resources stashed, verify that the Session reports an empty unstashed state.
BSONObjBuilder builder;
session.reportUnstashedState(&builder);
ASSERT(builder.obj().isEmpty());
}

TEST_F(SessionTest, ReportUnstashedResourcesForARetryableWrite) {
const auto sessionId = makeLogicalSessionIdForTest();
const TxnNumber txnNum = 20;
opCtx()->setLogicalSessionId(sessionId);
opCtx()->setTxnNumber(txnNum);

ASSERT(opCtx()->lockState());
ASSERT(opCtx()->recoveryUnit());

Session session(sessionId);
session.refreshFromStorageIfNeeded(opCtx());

session.beginOrContinueTxn(opCtx(), txnNum, boost::none, boost::none, "testDB", "find");
session.unstashTransactionResources(opCtx(), "find");

// Build a BSONObj containing the details which we expect to see reported when we call
// Session::reportUnstashedState. For a retryable write, we should only include the txnNumber.
BSONObjBuilder reportBuilder;
BSONObjBuilder transactionBuilder(reportBuilder.subobjStart("transaction"));
BSONObjBuilder parametersBuilder(transactionBuilder.subobjStart("parameters"));
parametersBuilder.append("txnNumber", txnNum);
parametersBuilder.done();
transactionBuilder.done();

// Verify that the Session's report of its own unstashed state aligns with our expectations.
BSONObjBuilder unstashedStateBuilder;
session.reportUnstashedState(&unstashedStateBuilder);
ASSERT_BSONOBJ_EQ(unstashedStateBuilder.obj(), reportBuilder.obj());
}

TEST_F(SessionTest, CannotSpecifyStartTransactionOnInProgressTxn) {
const auto sessionId = makeLogicalSessionIdForTest();
Session session(sessionId);
Expand Down

0 comments on commit 1447252

Please sign in to comment.