Skip to content

Commit

Permalink
SERVER-28923 Add stats for retryable writes
Browse files Browse the repository at this point in the history
  • Loading branch information
jsmulrow committed Jan 18, 2018
1 parent e0febf2 commit 9b6f404
Show file tree
Hide file tree
Showing 8 changed files with 334 additions and 0 deletions.
99 changes: 99 additions & 0 deletions jstests/sharding/retryable_writes.js
Expand Up @@ -18,12 +18,37 @@
assert.docEq(expected.lastErrorObject, toCheck.lastErrorObject);
}

/**
 * Asserts that a serverStatus response contains a 'transactions' section and that the section
 * exposes every retryable-writes counter checked by this test.
 */
function verifyServerStatusFields(serverStatusResponse) {
    const expectedCounters =
        ["retriedCommandsCount", "retriedStatementsCount", "transactionsCollectionWriteCount"];

    const hasTransactionsSection = serverStatusResponse.hasOwnProperty("transactions");
    assert(hasTransactionsSection,
           "Expected the serverStatus response to have a 'transactions' field");

    assert.hasFields(
        serverStatusResponse.transactions,
        expectedCounters,
        "The 'transactions' field in serverStatus did not have all of the expected fields");
}

/**
 * Asserts that each retryable-writes counter in 'newStats' equals the matching counter in
 * 'initialStats' plus the expected increase.
 *
 * @param {Object} initialStats - 'transactions' section captured before the operation under test.
 * @param {Object} newStats - 'transactions' section captured after the operation under test.
 * @param {number} newCommands - expected increase of retriedCommandsCount.
 * @param {number} newStatements - expected increase of retriedStatementsCount.
 * @param {number} newCollectionWrites - expected increase of transactionsCollectionWriteCount.
 */
function verifyServerStatusChanges(
    initialStats, newStats, newCommands, newStatements, newCollectionWrites) {
    assert.eq(initialStats.retriedCommandsCount + newCommands,
              newStats.retriedCommandsCount,
              "expected retriedCommandsCount to increase by " + newCommands);
    assert.eq(initialStats.retriedStatementsCount + newStatements,
              newStats.retriedStatementsCount,
              "expected retriedStatementsCount to increase by " + newStatements);
    // The failure message must name the counter being compared so that a failing run points at
    // transactionsCollectionWriteCount rather than at an unrelated counter.
    assert.eq(initialStats.transactionsCollectionWriteCount + newCollectionWrites,
              newStats.transactionsCollectionWriteCount,
              "expected transactionsCollectionWriteCount to increase by " + newCollectionWrites);
}

function runTests(mainConn, priConn) {
var lsid = UUID();

////////////////////////////////////////////////////////////////////////
// Test insert command

let initialStatus = priConn.adminCommand({serverStatus: 1});
verifyServerStatusFields(initialStatus);

var cmd = {
insert: 'user',
documents: [{_id: 10}, {_id: 30}],
Expand All @@ -50,9 +75,20 @@
assert.eq(2, testDBPri.user.find().itcount());
assert.eq(insertOplogEntries, oplog.find({ns: 'test.user', op: 'i'}).itcount());

let newStatus = priConn.adminCommand({serverStatus: 1});
verifyServerStatusFields(newStatus);
verifyServerStatusChanges(initialStatus.transactions,
newStatus.transactions,
1 /* newCommands */,
2 /* newStatements */,
1 /* newCollectionWrites */);

////////////////////////////////////////////////////////////////////////
// Test update command

initialStatus = priConn.adminCommand({serverStatus: 1});
verifyServerStatusFields(initialStatus);

cmd = {
update: 'user',
updates: [
Expand Down Expand Up @@ -91,9 +127,20 @@
assert.eq(updateOplogEntries, oplog.find({ns: 'test.user', op: 'u'}).itcount());
assert.eq(insertOplogEntries, oplog.find({ns: 'test.user', op: 'i'}).itcount());

newStatus = priConn.adminCommand({serverStatus: 1});
verifyServerStatusFields(newStatus);
verifyServerStatusChanges(initialStatus.transactions,
newStatus.transactions,
1 /* newCommands */,
3 /* newStatements */,
3 /* newCollectionWrites */);

////////////////////////////////////////////////////////////////////////
// Test delete command

initialStatus = priConn.adminCommand({serverStatus: 1});
verifyServerStatusFields(initialStatus);

assert.writeOK(testDBMain.user.insert({_id: 40, x: 1}));
assert.writeOK(testDBMain.user.insert({_id: 50, y: 1}));

Expand Down Expand Up @@ -126,9 +173,20 @@

assert.eq(deleteOplogEntries, oplog.find({ns: 'test.user', op: 'd'}).itcount());

newStatus = priConn.adminCommand({serverStatus: 1});
verifyServerStatusFields(newStatus);
verifyServerStatusChanges(initialStatus.transactions,
newStatus.transactions,
1 /* newCommands */,
2 /* newStatements */,
2 /* newCollectionWrites */);

////////////////////////////////////////////////////////////////////////
// Test findAndModify command (upsert)

initialStatus = priConn.adminCommand({serverStatus: 1});
verifyServerStatusFields(initialStatus);

cmd = {
findAndModify: 'user',
query: {_id: 60},
Expand All @@ -152,9 +210,20 @@

checkFindAndModifyResult(result, retryResult);

newStatus = priConn.adminCommand({serverStatus: 1});
verifyServerStatusFields(newStatus);
verifyServerStatusChanges(initialStatus.transactions,
newStatus.transactions,
1 /* newCommands */,
1 /* newStatements */,
1 /* newCollectionWrites */);

////////////////////////////////////////////////////////////////////////
// Test findAndModify command (update, return pre-image)

initialStatus = priConn.adminCommand({serverStatus: 1});
verifyServerStatusFields(initialStatus);

cmd = {
findAndModify: 'user',
query: {_id: 60},
Expand All @@ -176,9 +245,20 @@

checkFindAndModifyResult(result, retryResult);

newStatus = priConn.adminCommand({serverStatus: 1});
verifyServerStatusFields(newStatus);
verifyServerStatusChanges(initialStatus.transactions,
newStatus.transactions,
1 /* newCommands */,
1 /* newStatements */,
1 /* newCollectionWrites */);

////////////////////////////////////////////////////////////////////////
// Test findAndModify command (update, return post-image)

initialStatus = priConn.adminCommand({serverStatus: 1});
verifyServerStatusFields(initialStatus);

cmd = {
findAndModify: 'user',
query: {_id: 60},
Expand All @@ -200,9 +280,20 @@

checkFindAndModifyResult(result, retryResult);

newStatus = priConn.adminCommand({serverStatus: 1});
verifyServerStatusFields(newStatus);
verifyServerStatusChanges(initialStatus.transactions,
newStatus.transactions,
1 /* newCommands */,
1 /* newStatements */,
1 /* newCollectionWrites */);

////////////////////////////////////////////////////////////////////////
// Test findAndModify command (remove, return pre-image)

initialStatus = priConn.adminCommand({serverStatus: 1});
verifyServerStatusFields(initialStatus);

assert.writeOK(testDBMain.user.insert({_id: 70, f: 1}));
assert.writeOK(testDBMain.user.insert({_id: 80, f: 1}));

Expand All @@ -224,6 +315,14 @@
assert.eq(docCount, testDBPri.user.find().itcount());

checkFindAndModifyResult(result, retryResult);

newStatus = priConn.adminCommand({serverStatus: 1});
verifyServerStatusFields(newStatus);
verifyServerStatusChanges(initialStatus.transactions,
newStatus.transactions,
1 /* newCommands */,
1 /* newStatements */,
1 /* newCollectionWrites */);
}

function runFailpointTests(mainConn, priConn) {
Expand Down
2 changes: 2 additions & 0 deletions src/mongo/db/SConscript
Expand Up @@ -1623,11 +1623,13 @@ env.Library(
'ops/insert.cpp',
'ops/update.cpp',
'ops/write_ops_retryability.cpp',
'retryable_writes_stats.cpp',
'session.cpp',
'session_catalog.cpp',
'transaction_history_iterator.cpp',
env.Idlc('ops/single_write_result.idl')[0],
env.Idlc('session_txn_record.idl')[0],
env.Idlc('transactions_stats.idl')[0],
],
LIBDEPS=[
'$BUILD_DIR/mongo/base',
Expand Down
3 changes: 3 additions & 0 deletions src/mongo/db/commands/find_and_modify.cpp
Expand Up @@ -62,6 +62,7 @@
#include "mongo/db/query/plan_summary_stats.h"
#include "mongo/db/repl/repl_client_info.h"
#include "mongo/db/repl/replication_coordinator_global.h"
#include "mongo/db/retryable_writes_stats.h"
#include "mongo/db/s/collection_sharding_state.h"
#include "mongo/db/session_catalog.h"
#include "mongo/db/stats/top.h"
Expand Down Expand Up @@ -354,6 +355,8 @@ class CmdFindAndModify : public BasicCommand {
auto session = OperationContextSession::get(opCtx);
if (auto entry =
session->checkStatementExecuted(opCtx, *opCtx->getTxnNumber(), stmtId)) {
RetryableWritesStats::get(opCtx)->incrementRetriedCommandsCount();
RetryableWritesStats::get(opCtx)->incrementRetriedStatementsCount();
parseOplogEntryForFindAndModify(opCtx, args, *entry, &result);
return true;
}
Expand Down
22 changes: 22 additions & 0 deletions src/mongo/db/ops/write_ops_exec.cpp
Expand Up @@ -60,6 +60,7 @@
#include "mongo/db/query/query_knobs.h"
#include "mongo/db/repl/repl_client_info.h"
#include "mongo/db/repl/replication_coordinator.h"
#include "mongo/db/retryable_writes_stats.h"
#include "mongo/db/s/collection_sharding_state.h"
#include "mongo/db/s/sharding_state.h"
#include "mongo/db/session_catalog.h"
Expand All @@ -82,6 +83,12 @@ MONGO_FP_DECLARE(failAllInserts);
MONGO_FP_DECLARE(failAllUpdates);
MONGO_FP_DECLARE(failAllRemoves);

/**
 * Increments the retried-commands counter once when 'containsRetry' is true; does nothing
 * otherwise.
 */
void updateRetryStats(OperationContext* opCtx, bool containsRetry) {
    if (!containsRetry) {
        return;
    }

    RetryableWritesStats::get(opCtx)->incrementRetriedCommandsCount();
}

void finishCurOp(OperationContext* opCtx, CurOp* curOp) {
try {
curOp->done();
Expand Down Expand Up @@ -459,6 +466,9 @@ WriteResult performInserts(OperationContext* opCtx, const write_ops::Insert& who
WriteResult out;
out.results.reserve(wholeOp.getDocuments().size());

bool containsRetry = false;
ON_BLOCK_EXIT([&] { updateRetryStats(opCtx, containsRetry); });

size_t stmtIdIndex = 0;
size_t bytesInBatch = 0;
std::vector<InsertStatement> batch;
Expand All @@ -478,6 +488,8 @@ WriteResult performInserts(OperationContext* opCtx, const write_ops::Insert& who
auto session = OperationContextSession::get(opCtx);
if (session->checkStatementExecutedNoOplogEntryFetch(*opCtx->getTxnNumber(),
stmtId)) {
containsRetry = true;
RetryableWritesStats::get(opCtx)->incrementRetriedStatementsCount();
out.results.emplace_back(makeWriteResultForInsertOrDeleteRetry());
continue;
}
Expand Down Expand Up @@ -617,6 +629,9 @@ WriteResult performUpdates(OperationContext* opCtx, const write_ops::Update& who
opCtx, wholeOp.getWriteCommandBase().getBypassDocumentValidation());
LastOpFixer lastOpFixer(opCtx, wholeOp.getNamespace());

bool containsRetry = false;
ON_BLOCK_EXIT([&] { updateRetryStats(opCtx, containsRetry); });

size_t stmtIdIndex = 0;
WriteResult out;
out.results.reserve(wholeOp.getUpdates().size());
Expand All @@ -627,6 +642,8 @@ WriteResult performUpdates(OperationContext* opCtx, const write_ops::Update& who
auto session = OperationContextSession::get(opCtx);
if (auto entry =
session->checkStatementExecuted(opCtx, *opCtx->getTxnNumber(), stmtId)) {
containsRetry = true;
RetryableWritesStats::get(opCtx)->incrementRetriedStatementsCount();
out.results.emplace_back(parseOplogEntryForUpdate(*entry));
continue;
}
Expand Down Expand Up @@ -745,6 +762,9 @@ WriteResult performDeletes(OperationContext* opCtx, const write_ops::Delete& who
opCtx, wholeOp.getWriteCommandBase().getBypassDocumentValidation());
LastOpFixer lastOpFixer(opCtx, wholeOp.getNamespace());

bool containsRetry = false;
ON_BLOCK_EXIT([&] { updateRetryStats(opCtx, containsRetry); });

size_t stmtIdIndex = 0;
WriteResult out;
out.results.reserve(wholeOp.getDeletes().size());
Expand All @@ -754,6 +774,8 @@ WriteResult performDeletes(OperationContext* opCtx, const write_ops::Delete& who
if (opCtx->getTxnNumber()) {
auto session = OperationContextSession::get(opCtx);
if (session->checkStatementExecutedNoOplogEntryFetch(*opCtx->getTxnNumber(), stmtId)) {
containsRetry = true;
RetryableWritesStats::get(opCtx)->incrementRetriedStatementsCount();
out.results.emplace_back(makeWriteResultForInsertOrDeleteRetry());
continue;
}
Expand Down
90 changes: 90 additions & 0 deletions src/mongo/db/retryable_writes_stats.cpp
@@ -0,0 +1,90 @@
/**
* Copyright (C) 2018 MongoDB, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* As a special exception, the copyright holders give permission to link the
* code of portions of this program with the OpenSSL library under certain
* conditions as described in each individual source file and distribute
* linked combinations including the program with the OpenSSL library. You
* must comply with the GNU Affero General Public License in all respects for
* all of the code used other than as permitted herein. If you modify file(s)
* with this exception, you may extend this exception to your version of the
* file(s), but you are not obligated to do so. If you do not wish to do so,
* delete this exception statement from your version. If you delete this
* exception statement from all source files in the program, then also delete
* it in the license file.
*/

#include "mongo/platform/basic.h"

#include "mongo/db/retryable_writes_stats.h"

#include "mongo/db/commands/server_status.h"
#include "mongo/db/jsobj.h"
#include "mongo/db/operation_context.h"
#include "mongo/db/service_context.h"
#include "mongo/db/transactions_stats_gen.h"

namespace mongo {
namespace {
// Handle returned by ServiceContext::declareDecoration for RetryableWritesStats.
// RetryableWritesStats::get(ServiceContext*) applies it to a ServiceContext to obtain the stats
// object associated with that service.
const auto retryableWritesStatsDecoration =
ServiceContext::declareDecoration<RetryableWritesStats>();
} // namespace

/**
 * Returns a pointer to the RetryableWritesStats obtained by applying the file-local decoration
 * to 'service'.
 */
RetryableWritesStats* RetryableWritesStats::get(ServiceContext* service) {
    auto& statsForService = retryableWritesStatsDecoration(service);
    return &statsForService;
}

/**
 * Convenience overload: looks up the stats through the service context of 'opCtx'.
 */
RetryableWritesStats* RetryableWritesStats::get(OperationContext* opCtx) {
    auto owningService = opCtx->getServiceContext();
    return get(owningService);
}

/**
 * Adds one to the retried-commands counter.
 */
void RetryableWritesStats::incrementRetriedCommandsCount() {
    // Only the increment matters here; whatever fetchAndAdd returns is deliberately discarded.
    static_cast<void>(_retriedCommandsCount.fetchAndAdd(1));
}

/**
 * Adds one to the retried-statements counter.
 */
void RetryableWritesStats::incrementRetriedStatementsCount() {
    // Only the increment matters here; whatever fetchAndAdd returns is deliberately discarded.
    static_cast<void>(_retriedStatementsCount.fetchAndAdd(1));
}

/**
 * Adds one to the transactions-collection-write counter.
 */
void RetryableWritesStats::incrementTransactionsCollectionWriteCount() {
    // Only the increment matters here; whatever fetchAndAdd returns is deliberately discarded.
    static_cast<void>(_transactionsCollectionWriteCount.fetchAndAdd(1));
}

/**
 * Copies the current value of each of the three counters into the matching field of 'stats'.
 * Each counter is loaded separately, in the order: retried commands, retried statements,
 * transactions collection writes.
 */
void RetryableWritesStats::updateStats(TransactionsStats* stats) {
    const auto retriedCommands = _retriedCommandsCount.load();
    stats->setRetriedCommandsCount(retriedCommands);

    const auto retriedStatements = _retriedStatementsCount.load();
    stats->setRetriedStatementsCount(retriedStatements);

    const auto collectionWrites = _transactionsCollectionWriteCount.load();
    stats->setTransactionsCollectionWriteCount(collectionWrites);
}

/**
 * Server status section named "transactions". Its content is the BSON form of a
 * TransactionsStats object filled in from RetryableWritesStats.
 */
class TransactionsSSS : public ServerStatusSection {
public:
    TransactionsSSS() : ServerStatusSection("transactions") {}

    virtual ~TransactionsSSS() {}

    /**
     * Always returns true.
     */
    virtual bool includeByDefault() const {
        return true;
    }

    /**
     * Builds the section by copying the current retryable-writes counters into a fresh
     * TransactionsStats and serializing it. 'configElement' is not consulted.
     */
    virtual BSONObj generateSection(OperationContext* opCtx,
                                    const BSONElement& configElement) const {
        TransactionsStats snapshot;
        auto retryableWritesStats = RetryableWritesStats::get(opCtx);
        retryableWritesStats->updateStats(&snapshot);
        return snapshot.toBSON();
    }

} transactionsSSS;

} // namespace mongo

0 comments on commit 9b6f404

Please sign in to comment.