Skip to content

Commit

Permalink
SERVER-27725 Use batch insert when migrating chunks
Browse files Browse the repository at this point in the history
(cherry picked from commit b20b8c2)
  • Loading branch information
kaloianm committed Aug 29, 2018
1 parent 0a30697 commit f49d487
Show file tree
Hide file tree
Showing 5 changed files with 67 additions and 64 deletions.
19 changes: 12 additions & 7 deletions src/mongo/db/ops/write_ops_exec.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
#include "mongo/db/ops/parsed_update.h"
#include "mongo/db/ops/update_lifecycle_impl.h"
#include "mongo/db/ops/update_request.h"
#include "mongo/db/ops/write_ops.h"
#include "mongo/db/ops/write_ops_exec.h"
#include "mongo/db/query/get_executor.h"
#include "mongo/db/query/plan_summary_stats.h"
Expand Down Expand Up @@ -301,12 +302,13 @@ static WriteResult performCreateIndexes(OperationContext* txn, const InsertOp& w
static void insertDocuments(OperationContext* txn,
Collection* collection,
std::vector<BSONObj>::const_iterator begin,
std::vector<BSONObj>::const_iterator end) {
std::vector<BSONObj>::const_iterator end,
bool fromMigrate) {
// Intentionally not using a WRITE_CONFLICT_RETRY_LOOP. That is handled by the caller so it can
// react to oversized batches.
WriteUnitOfWork wuow(txn);
uassertStatusOK(collection->insertDocuments(
txn, begin, end, &CurOp::get(txn)->debug(), /*enforceQuota*/ true));
txn, begin, end, &CurOp::get(txn)->debug(), /*enforceQuota*/ true, fromMigrate));
wuow.commit();
}

Expand All @@ -317,7 +319,8 @@ static bool insertBatchAndHandleErrors(OperationContext* txn,
const InsertOp& wholeOp,
const std::vector<BSONObj>& batch,
LastOpFixer* lastOpFixer,
WriteResult* out) {
WriteResult* out,
bool fromMigrate) {
if (batch.empty())
return true;

Expand Down Expand Up @@ -350,7 +353,8 @@ static bool insertBatchAndHandleErrors(OperationContext* txn,
// First try doing it all together. If all goes well, this is all we need to do.
// See Collection::_insertDocuments for why we do all capped inserts one-at-a-time.
lastOpFixer->startingOp();
insertDocuments(txn, collection->getCollection(), batch.begin(), batch.end());
insertDocuments(
txn, collection->getCollection(), batch.begin(), batch.end(), fromMigrate);
lastOpFixer->finishedOpSuccessfully();
globalOpCounters.gotInserts(batch.size());
std::fill_n(
Expand All @@ -374,7 +378,7 @@ static bool insertBatchAndHandleErrors(OperationContext* txn,
if (!collection)
acquireCollection();
lastOpFixer->startingOp();
insertDocuments(txn, collection->getCollection(), it, it + 1);
insertDocuments(txn, collection->getCollection(), it, it + 1, fromMigrate);
lastOpFixer->finishedOpSuccessfully();
out->results.emplace_back(WriteResult::SingleResult{1});
curOp.debug().ninserted++;
Expand All @@ -396,7 +400,7 @@ static bool insertBatchAndHandleErrors(OperationContext* txn,
return true;
}

WriteResult performInserts(OperationContext* txn, const InsertOp& wholeOp) {
WriteResult performInserts(OperationContext* txn, const InsertOp& wholeOp, bool fromMigrate) {
invariant(!txn->lockState()->inAWriteUnitOfWork()); // Does own retries.
auto& curOp = *CurOp::get(txn);
ON_BLOCK_EXIT([&] {
Expand Down Expand Up @@ -453,7 +457,8 @@ WriteResult performInserts(OperationContext* txn, const InsertOp& wholeOp) {
continue; // Add more to batch before inserting.
}

bool canContinue = insertBatchAndHandleErrors(txn, wholeOp, batch, &lastOpFixer, &out);
bool canContinue =
insertBatchAndHandleErrors(txn, wholeOp, batch, &lastOpFixer, &out, fromMigrate);
batch.clear(); // We won't need the current batch any more.
bytesInBatch = 0;

Expand Down
4 changes: 3 additions & 1 deletion src/mongo/db/ops/write_ops_exec.h
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,10 @@ struct WriteResult {
* LastError is updated for failures of individual writes, but not for batch errors reported by an
* exception being thrown from these functions. Callers are responsible for managing LastError in
* that case. This should generally be combined with LastError handling from parse failures.
*
 * 'fromMigrate' indicates whether the operation was induced by a chunk migration.
*/
WriteResult performInserts(OperationContext* txn, const InsertOp& op);
WriteResult performInserts(OperationContext* txn, const InsertOp& op, bool fromMigrate = false);
WriteResult performUpdates(OperationContext* txn, const UpdateOp& op);
WriteResult performDeletes(OperationContext* txn, const DeleteOp& op);

Expand Down
96 changes: 46 additions & 50 deletions src/mongo/db/s/migration_destination_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@
#include "mongo/db/namespace_string.h"
#include "mongo/db/operation_context.h"
#include "mongo/db/ops/delete.h"
#include "mongo/db/ops/write_ops.h"
#include "mongo/db/ops/write_ops_exec.h"
#include "mongo/db/range_deleter_service.h"
#include "mongo/db/repl/repl_client_info.h"
#include "mongo/db/repl/replication_coordinator_global.h"
Expand Down Expand Up @@ -349,7 +351,7 @@ Status MigrationDestinationManager::start(const NamespaceString& nss,

void MigrationDestinationManager::cloneDocumentsFromDonor(
OperationContext* txn,
stdx::function<void(OperationContext*, BSONObjIterator)> insertBatchFn,
stdx::function<void(OperationContext*, BSONObj)> insertBatchFn,
stdx::function<BSONObj(OperationContext*)> fetchBatchFn) {

ProducerConsumerQueue<BSONObj> batches(1);
Expand All @@ -364,7 +366,7 @@ void MigrationDestinationManager::cloneDocumentsFromDonor(
if (arr.isEmpty()) {
return;
}
insertBatchFn(inserterTxn.get(), BSONObjIterator(arr));
insertBatchFn(inserterTxn.get(), arr);
}
} catch (...) {
stdx::lock_guard<Client> lk(*txn->getClient());
Expand Down Expand Up @@ -710,56 +712,50 @@ void MigrationDestinationManager::_migrateDriver(OperationContext* txn,

const BSONObj migrateCloneRequest = createMigrateCloneRequest(_nss, *_sessionId);

auto insertBatchFn = [&](OperationContext* txn, BSONObjIterator docs) {
while (docs.more()) {
txn->checkForInterrupt();
auto assertNotAborted = [&](OperationContext* opCtx) {
opCtx->checkForInterrupt();
uassert(40655, "Migration aborted while copying documents", getState() != ABORT);
};

if (getState() == ABORT) {
auto message = "Migration aborted while copying documents";
log() << message << migrateLog;
uasserted(40655, message);
}
auto insertBatchFn = [&](OperationContext* opCtx, BSONObj arr) {
int batchNumCloned = 0;
int batchClonedBytes = 0;

BSONObj docToClone = docs.next().Obj();
{
OldClientWriteContext cx(txn, _nss.ns());
BSONObj localDoc;
if (willOverrideLocalId(txn,
_nss.ns(),
min,
max,
shardKeyPattern,
cx.db(),
docToClone,
&localDoc)) {
const std::string errMsg = str::stream()
<< "cannot migrate chunk, local document " << redact(localDoc)
<< " has same _id as cloned "
<< "remote document " << redact(docToClone);
warning() << errMsg;

// Exception will abort migration cleanly
uasserted(16976, errMsg);
}
Helpers::upsert(txn, _nss.ns(), docToClone, true);
}
{
stdx::lock_guard<stdx::mutex> statsLock(_mutex);
_numCloned++;
_clonedBytes += docToClone.objsize();
}
if (writeConcern.shouldWaitForOtherNodes()) {
repl::ReplicationCoordinator::StatusAndDuration replStatus =
repl::ReplicationCoordinator::get(txn)->awaitReplication(
txn,
repl::ReplClientInfo::forClient(txn->getClient()).getLastOp(),
writeConcern);
if (replStatus.status.code() == ErrorCodes::WriteConcernFailed) {
warning() << "secondaryThrottle on, but doc insert timed out; "
"continuing";
} else {
massertStatusOK(replStatus.status);
}
assertNotAborted(opCtx);

std::vector<BSONObj> toInsert;
for (const auto& doc : arr) {
BSONObj docToClone = doc.Obj();
toInsert.push_back(docToClone);
batchNumCloned++;
batchClonedBytes += docToClone.objsize();
}
InsertOp insertOp;
insertOp.ns = _nss;
insertOp.documents = toInsert;

const WriteResult reply = performInserts(opCtx, insertOp, true);

for (unsigned long i = 0; i < reply.results.size(); ++i) {
uassertStatusOK(reply.results[i]);
}

{
stdx::lock_guard<stdx::mutex> statsLock(_mutex);
_numCloned += batchNumCloned;
_clonedBytes += batchClonedBytes;
}
if (writeConcern.shouldWaitForOtherNodes()) {
repl::ReplicationCoordinator::StatusAndDuration replStatus =
repl::ReplicationCoordinator::get(opCtx)->awaitReplication(
opCtx,
repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp(),
writeConcern);
if (replStatus.status.code() == ErrorCodes::WriteConcernFailed) {
warning() << "secondaryThrottle on, but doc insert timed out; "
"continuing";
} else {
uassertStatusOK(replStatus.status);
}
}
};
Expand Down
2 changes: 1 addition & 1 deletion src/mongo/db/s/migration_destination_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ class MigrationDestinationManager {
*/
static void cloneDocumentsFromDonor(
OperationContext* txn,
stdx::function<void(OperationContext*, BSONObjIterator)> insertBatchFn,
stdx::function<void(OperationContext*, BSONObj)> insertBatchFn,
stdx::function<BSONObj(OperationContext*)> fetchBatchFn);

/**
Expand Down
10 changes: 5 additions & 5 deletions src/mongo/db/s/migration_destination_manager_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -84,9 +84,9 @@ TEST_F(MigrationDestinationManagerTest, CloneDocumentsFromDonorWorksCorrectly) {

std::vector<BSONObj> resultDocs;

auto insertBatchFn = [&](OperationContext* opCtx, BSONObjIterator docs) {
while (docs.more()) {
resultDocs.push_back(docs.next().Obj().getOwned());
auto insertBatchFn = [&](OperationContext* opCtx, BSONObj docs) {
for (auto&& docToClone : docs) {
resultDocs.push_back(docToClone.Obj().getOwned());
}
};

Expand Down Expand Up @@ -122,7 +122,7 @@ TEST_F(MigrationDestinationManagerTest, CloneDocumentsThrowsFetchErrors) {
return fetchBatchResultBuilder.obj();
};

auto insertBatchFn = [&](OperationContext* opCtx, BSONObjIterator docs) {};
auto insertBatchFn = [&](OperationContext* opCtx, BSONObj docs) {};

ASSERT_THROWS_CODE_AND_WHAT(MigrationDestinationManager::cloneDocumentsFromDonor(
operationContext(), insertBatchFn, fetchBatchFn),
Expand All @@ -140,7 +140,7 @@ TEST_F(MigrationDestinationManagerTest, CloneDocumentsCatchesInsertErrors) {
return fetchBatchResultBuilder.obj();
};

auto insertBatchFn = [&](OperationContext* opCtx, BSONObjIterator docs) {
auto insertBatchFn = [&](OperationContext* opCtx, BSONObj docs) {
uasserted(ErrorCodes::FailedToParse, "insertion error");
};

Expand Down

0 comments on commit f49d487

Please sign in to comment.