Skip to content

Commit

Permalink
SERVER-42772 Return transaction commit decision from onCompletion fut…
Browse files Browse the repository at this point in the history
…ure in TransactionCoordinator
  • Loading branch information
saltzm authored and evergreen committed Aug 21, 2019
1 parent 54d445e commit 05a9987
Show file tree
Hide file tree
Showing 8 changed files with 51 additions and 64 deletions.
29 changes: 5 additions & 24 deletions src/mongo/db/s/transaction_coordinator.cpp
Expand Up @@ -323,8 +323,7 @@ TransactionCoordinator::TransactionCoordinator(ServiceContext* serviceContext,
}

TransactionCoordinator::~TransactionCoordinator() {
invariant(_completionPromisesFired);
invariant(_completionPromises.empty());
invariant(_completionPromise.getFuture().isReady());
}

void TransactionCoordinator::runCommit(std::vector<ShardId> participants) {
Expand Down Expand Up @@ -352,15 +351,8 @@ SharedSemiFuture<CommitDecision> TransactionCoordinator::getDecision() const {
return _decisionPromise.getFuture();
}

Future<void> TransactionCoordinator::onCompletion() {
stdx::lock_guard<stdx::mutex> lg(_mutex);
if (_completionPromisesFired)
return Future<void>::makeReady();

auto completionPF = makePromiseFuture<void>();
_completionPromises.emplace_back(std::move(completionPF.promise));

return std::move(completionPF.future);
SharedSemiFuture<txn::CommitDecision> TransactionCoordinator::onCompletion() {
return _completionPromise.getFuture();
}

void TransactionCoordinator::cancelIfCommitNotYetStarted() {
Expand Down Expand Up @@ -412,22 +404,11 @@ void TransactionCoordinator::_done(Status status) {
_logSlowTwoPhaseCommit(*_decision);
}

_completionPromisesFired = true;

ul.unlock();
if (!_decisionDurable) {
ul.unlock();
_decisionPromise.setError(status);
ul.lock();
}

// Trigger the onCompletion promises outside of a lock, because the future handlers indicate to
// the potential lifetime controller that the object can be destroyed
auto promisesToTrigger = std::move(_completionPromises);
ul.unlock();

for (auto&& promise : promisesToTrigger) {
promise.emplaceValue();
}
_completionPromise.setFrom(_decisionPromise.getFuture().getNoThrow());
}

void TransactionCoordinator::_logSlowTwoPhaseCommit(
Expand Down
7 changes: 2 additions & 5 deletions src/mongo/db/s/transaction_coordinator.h
Expand Up @@ -105,7 +105,7 @@ class TransactionCoordinator {
* this coordinator has completed. It will always eventually be set and once set it is safe to
* dispose of the TransactionCoordinator object.
*/
Future<void> onCompletion();
SharedSemiFuture<txn::CommitDecision> onCompletion();

/**
* If runCommit has not yet been called, this will transition this coordinator object to
Expand Down Expand Up @@ -188,10 +188,7 @@ class TransactionCoordinator {

// A list of all promises corresponding to futures that were returned to callers of
// onCompletion.
//
// TODO (SERVER-38346): Remove this when SharedSemiFuture supports continuations.
bool _completionPromisesFired{false};
std::vector<Promise<void>> _completionPromises;
SharedPromise<txn::CommitDecision> _completionPromise;

// Store as unique_ptr to avoid a circular dependency between the TransactionCoordinator and the
// TransactionCoordinatorMetricsObserver.
Expand Down
7 changes: 5 additions & 2 deletions src/mongo/db/s/transaction_coordinator_catalog.cpp
Expand Up @@ -33,6 +33,7 @@

#include "mongo/db/s/transaction_coordinator_catalog.h"

#include "mongo/s/grid.h"
#include "mongo/util/log.h"

namespace mongo {
Expand Down Expand Up @@ -104,8 +105,10 @@ void TransactionCoordinatorCatalog::insert(OperationContext* opCtx,
// recursively acquire the mutex.
ul.unlock();

coordinator->onCompletion().getAsync(
[this, lsid, txnNumber](Status) { _remove(lsid, txnNumber); });
coordinator->onCompletion()
.thenRunOn(Grid::get(opCtx)->getExecutorPool()->getFixedExecutor())
.ignoreValue()
.getAsync([this, lsid, txnNumber](Status) { _remove(lsid, txnNumber); });
}

std::shared_ptr<TransactionCoordinator> TransactionCoordinatorCatalog::get(
Expand Down
4 changes: 4 additions & 0 deletions src/mongo/db/s/transaction_coordinator_catalog_test.cpp
Expand Up @@ -142,6 +142,10 @@ TEST_F(TransactionCoordinatorCatalogTest, CoordinatorsRemoveThemselvesFromCatalo
coordinator->cancelIfCommitNotYetStarted();
coordinator->onCompletion().wait();

// Wait for the coordinator to be removed before attempting to call getLatestOnSession() since
// the coordinator is removed from the catalog asynchronously.
_coordinatorCatalog->join();

auto latestTxnNumAndCoordinator =
_coordinatorCatalog->getLatestOnSession(operationContext(), lsid);
ASSERT_FALSE(latestTxnNumAndCoordinator);
Expand Down
18 changes: 8 additions & 10 deletions src/mongo/db/s/transaction_coordinator_service.cpp
Expand Up @@ -88,11 +88,11 @@ void TransactionCoordinatorService::createCoordinator(OperationContext* opCtx,
commitDeadline));
}

boost::optional<Future<txn::CommitDecision>> TransactionCoordinatorService::coordinateCommit(
OperationContext* opCtx,
LogicalSessionId lsid,
TxnNumber txnNumber,
const std::set<ShardId>& participantList) {
boost::optional<SharedSemiFuture<txn::CommitDecision>>
TransactionCoordinatorService::coordinateCommit(OperationContext* opCtx,
LogicalSessionId lsid,
TxnNumber txnNumber,
const std::set<ShardId>& participantList) {
auto cas = _getCatalogAndScheduler(opCtx);
auto& catalog = cas->catalog;

Expand All @@ -103,16 +103,15 @@ boost::optional<Future<txn::CommitDecision>> TransactionCoordinatorService::coor

coordinator->runCommit(std::vector<ShardId>{participantList.begin(), participantList.end()});

return coordinator->onCompletion().then(
[coordinator] { return coordinator->getDecision().get(); });
return coordinator->onCompletion();

// TODO (SERVER-37364): Re-enable the coordinator returning the decision as soon as the decision
// is made durable. Currently the coordinator waits to hear acks because participants in prepare
// reject requests with a higher transaction number, causing tests to fail.
// return coordinator->getDecision();
}

boost::optional<Future<txn::CommitDecision>> TransactionCoordinatorService::recoverCommit(
boost::optional<SharedSemiFuture<txn::CommitDecision>> TransactionCoordinatorService::recoverCommit(
OperationContext* opCtx, LogicalSessionId lsid, TxnNumber txnNumber) {
auto cas = _getCatalogAndScheduler(opCtx);
auto& catalog = cas->catalog;
Expand All @@ -126,8 +125,7 @@ boost::optional<Future<txn::CommitDecision>> TransactionCoordinatorService::reco
// the coordinator.
coordinator->cancelIfCommitNotYetStarted();

return coordinator->onCompletion().then(
[coordinator] { return coordinator->getDecision().get(); });
return coordinator->onCompletion();

// TODO (SERVER-37364): Re-enable the coordinator returning the decision as soon as the decision
// is made durable. Currently the coordinator waits to hear acks because participants in prepare
Expand Down
8 changes: 4 additions & 4 deletions src/mongo/db/s/transaction_coordinator_service.h
Expand Up @@ -65,7 +65,7 @@ class TransactionCoordinatorService {
*
* If no coordinator for the (lsid, txnNumber) exists, returns boost::none.
*/
boost::optional<Future<txn::CommitDecision>> coordinateCommit(
boost::optional<SharedSemiFuture<txn::CommitDecision>> coordinateCommit(
OperationContext* opCtx,
LogicalSessionId lsid,
TxnNumber txnNumber,
Expand All @@ -77,9 +77,9 @@ class TransactionCoordinatorService {
*
* If no coordinator for the (lsid, txnNumber) exists, returns boost::none.
*/
boost::optional<Future<txn::CommitDecision>> recoverCommit(OperationContext* opCtx,
LogicalSessionId lsid,
TxnNumber txnNumber);
boost::optional<SharedSemiFuture<txn::CommitDecision>> recoverCommit(OperationContext* opCtx,
LogicalSessionId lsid,
TxnNumber txnNumber);

/**
* Marks the coordinator catalog as stepping up, which blocks all incoming requests for
Expand Down
22 changes: 15 additions & 7 deletions src/mongo/db/s/transaction_coordinator_test.cpp
Expand Up @@ -956,8 +956,9 @@ TEST_F(TransactionCoordinatorTest,

ASSERT_THROWS_CODE(
commitDecisionFuture.get(), AssertionException, ErrorCodes::ReadConcernMajorityNotEnabled);

coordinator.onCompletion().get();
ASSERT_THROWS_CODE(coordinator.onCompletion().get(),
AssertionException,
ErrorCodes::ReadConcernMajorityNotEnabled);
}


Expand Down Expand Up @@ -1641,7 +1642,8 @@ TEST_F(TransactionCoordinatorMetricsTest, CoordinatorIsCanceledWhileInactive) {
expectedStats.totalDuration = *expectedStats.totalDuration + Microseconds(100);

coordinator.cancelIfCommitNotYetStarted();
coordinator.onCompletion().get();
ASSERT_THROWS_CODE(
coordinator.onCompletion().get(), DBException, ErrorCodes::NoSuchTransaction);

checkStats(stats, expectedStats);
checkMetrics(expectedMetrics);
Expand Down Expand Up @@ -1683,7 +1685,8 @@ TEST_F(TransactionCoordinatorMetricsTest, CoordinatorsAWSIsShutDownWhileCoordina
expectedStats.totalDuration = *expectedStats.totalDuration + Microseconds(100);

awsPtr->shutdown({ErrorCodes::TransactionCoordinatorSteppingDown, "dummy"});
coordinator.onCompletion().get();
ASSERT_THROWS_CODE(
coordinator.onCompletion().get(), DBException, ErrorCodes::InterruptedDueToReplStateChange);

checkStats(stats, expectedStats);
checkMetrics(expectedMetrics);
Expand Down Expand Up @@ -1743,7 +1746,8 @@ TEST_F(TransactionCoordinatorMetricsTest,
expectedMetrics.currentWritingParticipantList--;

killClientOpCtx(getServiceContext(), "hangBeforeWaitingForParticipantListWriteConcern");
coordinator.onCompletion().get();
ASSERT_THROWS_CODE(
coordinator.onCompletion().get(), DBException, ErrorCodes::InterruptedAtShutdown);

checkStats(stats, expectedStats);
checkMetrics(expectedMetrics);
Expand Down Expand Up @@ -1806,7 +1810,9 @@ TEST_F(TransactionCoordinatorMetricsTest,
network()->enterNetwork();
network()->runReadyNetworkOperations();
network()->exitNetwork();
coordinator.onCompletion().get();

ASSERT_THROWS_CODE(
coordinator.onCompletion().get(), DBException, ErrorCodes::InterruptedDueToReplStateChange);

checkStats(stats, expectedStats);
checkMetrics(expectedMetrics);
Expand Down Expand Up @@ -1871,7 +1877,9 @@ TEST_F(TransactionCoordinatorMetricsTest,
expectedMetrics.currentWritingDecision--;

killClientOpCtx(getServiceContext(), "hangBeforeWaitingForDecisionWriteConcern");
coordinator.onCompletion().get();
ASSERT_THROWS_CODE(
coordinator.onCompletion().get(), DBException, ErrorCodes::InterruptedAtShutdown);


checkStats(stats, expectedStats);
checkMetrics(expectedMetrics);
Expand Down
20 changes: 8 additions & 12 deletions src/mongo/db/s/txn_two_phase_commit_cmds.cpp
Expand Up @@ -224,18 +224,14 @@ class CoordinateCommitTransactionCmd : public TypedCommand<CoordinateCommitTrans
const auto& cmd = request();
const auto tcs = TransactionCoordinatorService::get(opCtx);

boost::optional<Future<txn::CommitDecision>> coordinatorDecisionFuture;

if (!cmd.getParticipants().empty()) {
coordinatorDecisionFuture =
tcs->coordinateCommit(opCtx,
*opCtx->getLogicalSessionId(),
*opCtx->getTxnNumber(),
validateParticipants(opCtx, cmd.getParticipants()));
} else {
coordinatorDecisionFuture = tcs->recoverCommit(
opCtx, *opCtx->getLogicalSessionId(), *opCtx->getTxnNumber());
}
// Coordinate the commit, or recover the commit decision from disk if this command was
// sent without a participant list.
auto coordinatorDecisionFuture = cmd.getParticipants().empty()
? tcs->recoverCommit(opCtx, *opCtx->getLogicalSessionId(), *opCtx->getTxnNumber())
: tcs->coordinateCommit(opCtx,
*opCtx->getLogicalSessionId(),
*opCtx->getTxnNumber(),
validateParticipants(opCtx, cmd.getParticipants()));

ON_BLOCK_EXIT([opCtx] {
// A decision will most likely have been written from a different OperationContext
Expand Down

0 comments on commit 05a9987

Please sign in to comment.