From 178e241b81882f85a58deda960d80607a77e1c3a Mon Sep 17 00:00:00 2001 From: Charlie Swanson Date: Fri, 25 Mar 2016 11:41:00 -0400 Subject: [PATCH] SERVER-22178 Always retry sorted findAndModify upon write conflict. Previously, if there was a WriteConflictException while actually doing the update or delete, we would retry the findAndModify, but if the update or delete stage detected that the document was already deleted or that it no longer matched the predicate, it would not retry. This patch ensures the findAndModify will be retried in either of those scenarios. --- .../fsm_all_sharded_replication.js | 10 +- ...m_all_sharded_replication_with_balancer.js | 10 +- .../findAndModify_mixed_queue_unindexed.js | 96 ++++++++++++++ .../findAndModify_remove_queue.js | 14 +- .../findAndModify_remove_queue_unindexed.js | 31 +++++ .../findAndModify_update_queue.js | 7 +- .../findAndModify_update_queue_unindexed.js | 31 +++++ src/mongo/db/exec/SConscript | 1 + src/mongo/db/exec/delete.cpp | 125 ++++++++++-------- src/mongo/db/exec/delete.h | 11 +- src/mongo/db/exec/update.cpp | 123 +++++++++-------- src/mongo/db/exec/update.h | 6 + src/mongo/db/exec/write_stage_common.cpp | 68 ++++++++++ src/mongo/db/exec/write_stage_common.h | 58 ++++++++ src/mongo/db/query/get_executor.cpp | 1 + 15 files changed, 462 insertions(+), 130 deletions(-) create mode 100644 jstests/concurrency/fsm_workloads/findAndModify_mixed_queue_unindexed.js create mode 100644 jstests/concurrency/fsm_workloads/findAndModify_remove_queue_unindexed.js create mode 100644 jstests/concurrency/fsm_workloads/findAndModify_update_queue_unindexed.js create mode 100644 src/mongo/db/exec/write_stage_common.cpp create mode 100644 src/mongo/db/exec/write_stage_common.h diff --git a/jstests/concurrency/fsm_all_sharded_replication.js b/jstests/concurrency/fsm_all_sharded_replication.js index e1b3741583eb4..5600ffaf5c23f 100644 --- a/jstests/concurrency/fsm_all_sharded_replication.js +++ 
b/jstests/concurrency/fsm_all_sharded_replication.js @@ -45,9 +45,13 @@ var blacklist = [ 'compact_simultaneous_padding_bytes.js', // compact can only be run against a mongod 'convert_to_capped_collection.js', // convertToCapped can't be run on mongos processes 'convert_to_capped_collection_index.js', // convertToCapped can't be run on mongos processes - 'findAndModify_remove_queue.js', // remove cannot be {} for findAndModify - 'findAndModify_update_collscan.js', // findAndModify requires a shard key - 'findAndModify_update_queue.js', // findAndModify requires a shard key + 'findAndModify_mixed_queue.js', // findAndModify requires a shard key + 'findAndModify_mixed_queue_unindexed.js', // findAndModify requires a shard key + 'findAndModify_remove_queue.js', // remove cannot be {} for findAndModify + 'findAndModify_remove_queue_unindexed.js', // findAndModify requires a shard key + 'findAndModify_update_collscan.js', // findAndModify requires a shard key + 'findAndModify_update_queue.js', // findAndModify requires a shard key + 'findAndModify_update_queue_unindexed.js', // findAndModify requires a shard key 'group.js', // the group command cannot be issued against a sharded cluster 'group_cond.js', // the group command cannot be issued against a sharded cluster 'indexed_insert_eval.js', // eval doesn't work with sharded collections diff --git a/jstests/concurrency/fsm_all_sharded_replication_with_balancer.js b/jstests/concurrency/fsm_all_sharded_replication_with_balancer.js index c4437fdd7dd26..70e6f5d272bf0 100644 --- a/jstests/concurrency/fsm_all_sharded_replication_with_balancer.js +++ b/jstests/concurrency/fsm_all_sharded_replication_with_balancer.js @@ -50,9 +50,13 @@ var blacklist = [ 'compact_simultaneous_padding_bytes.js', // compact can only be run against a mongod 'convert_to_capped_collection.js', // convertToCapped can't be run on mongos processes 'convert_to_capped_collection_index.js', // convertToCapped can't be run on mongos processes - 
'findAndModify_remove_queue.js', // remove cannot be {} for findAndModify - 'findAndModify_update_collscan.js', // findAndModify requires a shard key - 'findAndModify_update_queue.js', // findAndModify requires a shard key + 'findAndModify_mixed_queue.js', // findAndModify requires a shard key + 'findAndModify_mixed_queue_unindexed.js', // findAndModify requires a shard key + 'findAndModify_remove_queue.js', // remove cannot be {} for findAndModify + 'findAndModify_remove_queue_unindexed.js', // findAndModify requires a shard key + 'findAndModify_update_collscan.js', // findAndModify requires a shard key + 'findAndModify_update_queue.js', // findAndModify requires a shard key + 'findAndModify_update_queue_unindexed.js', // findAndModify requires a shard key 'group.js', // the group command cannot be issued against a sharded cluster 'group_cond.js', // the group command cannot be issued against a sharded cluster 'indexed_insert_eval.js', // eval doesn't work with sharded collections diff --git a/jstests/concurrency/fsm_workloads/findAndModify_mixed_queue_unindexed.js b/jstests/concurrency/fsm_workloads/findAndModify_mixed_queue_unindexed.js new file mode 100644 index 0000000000000..793f531c9b9f6 --- /dev/null +++ b/jstests/concurrency/fsm_workloads/findAndModify_mixed_queue_unindexed.js @@ -0,0 +1,96 @@ +'use strict'; + +/** + * findAndModify_mixed_queue_unindexed.js + * + * This workload is a combination of findAndModify_remove_queue_unindexed.js and + * findAndModify_update_queue_unindexed.js. + * + * Each thread contends on the same document as one another by randomly performing either a + * findAndModify update operation or a findAndModify remove operation. The lack of an index that + * could satisfy the sort forces the findAndModify operations to scan all the matching documents in + * order to find the relevant document. 
This increases the amount of work each findAndModify + * operation has to do before getting to the matching document, and thus increases the chance of a + * write conflict because each is trying to update or remove the same document. + * + * This workload was designed to reproduce SERVER-21434. + */ +load('jstests/concurrency/fsm_libs/extend_workload.js'); // for extendWorkload +load('jstests/concurrency/fsm_workloads/findAndModify_remove_queue.js'); // for $config +load('jstests/concurrency/fsm_workload_helpers/server_types.js'); // for isMMAPv1 + +var $config = extendWorkload( + $config, + function($config, $super) { + + // Use the workload name as the database name, since the workload name is assumed to be + // unique. + $config.data.uniqueDBName = 'findAndModify_mixed_queue_unindexed'; + + $config.data.newDocForInsert = function newDocForInsert(i) { + return { + _id: i, + rand: Random.rand(), + counter: 0 + }; + }; + + $config.data.getIndexSpecs = function getIndexSpecs() { + return []; + }; + + $config.data.opName = 'modified'; + + $config.data.validateResult = function validateResult(db, collName, res) { + assertAlways.commandWorked(res); + + var doc = res.value; + if (isMongod(db) && !isMMAPv1(db)) { + // MMAPv1 does not automatically retry if there was a conflict, so it is expected + // that it may return null in the case of a conflict. All other storage engines + // should automatically retry the operation, and thus should never return null. + assertWhenOwnColl.neq( + doc, null, 'findAndModify should have found a matching document'); + } + if (doc !== null) { + this.saveDocId(db, collName, doc._id); + } + }; + + $config.states = (function() { + // Avoid removing documents that were already updated. 
+ function remove(db, collName) { + var res = db.runCommand({ + findAndModify: db[collName].getName(), + query: {counter: 0}, + sort: {rand: -1}, + remove: true + }); + this.validateResult(db, collName, res); + } + + function update(db, collName) { + // Update the counter field to avoid matching the same document again. + var res = db.runCommand({ + findAndModify: db[collName].getName(), + query: {counter: 0}, + sort: {rand: -1}, + update: {$inc: {counter: 1}}, new: false + }); + this.validateResult(db, collName, res); + } + + return { + remove: remove, + update: update, + }; + + })(); + + $config.transitions = { + remove: {remove: 0.5, update: 0.5}, + update: {remove: 0.5, update: 0.5}, + }; + + return $config; + }); diff --git a/jstests/concurrency/fsm_workloads/findAndModify_remove_queue.js b/jstests/concurrency/fsm_workloads/findAndModify_remove_queue.js index c08fc5775aa6a..037eaf472509b 100644 --- a/jstests/concurrency/fsm_workloads/findAndModify_remove_queue.js +++ b/jstests/concurrency/fsm_workloads/findAndModify_remove_queue.js @@ -26,10 +26,8 @@ var $config = (function() { }; }, - getIndexSpec: function getIndexSpec() { - return { - rand: 1 - }; + getIndexSpecs: function getIndexSpecs() { + return [{rand: 1}]; }, opName: 'removed', @@ -114,7 +112,9 @@ var $config = (function() { assertAlways.writeOK(res); assertAlways.eq(this.numDocs, res.nInserted); - assertAlways.commandWorked(db[collName].ensureIndex(this.getIndexSpec())); + this.getIndexSpecs().forEach(function ensureIndex(indexSpec) { + assertAlways.commandWorked(db[collName].ensureIndex(indexSpec)); + }); } function teardown(db, collName, cluster) { @@ -133,7 +133,7 @@ var $config = (function() { } } - assertWhenOwnColl(function() { + assertWhenOwnColl(() => { var docs = ownedDB[collName].find().toArray(); var ids = []; @@ -142,7 +142,7 @@ var $config = (function() { } checkForDuplicateIds(ids, this.opName); - }.bind(this)); + }); var res = ownedDB.dropDatabase(); assertAlways.commandWorked(res); 
diff --git a/jstests/concurrency/fsm_workloads/findAndModify_remove_queue_unindexed.js b/jstests/concurrency/fsm_workloads/findAndModify_remove_queue_unindexed.js new file mode 100644 index 0000000000000..80ce7567a7d28 --- /dev/null +++ b/jstests/concurrency/fsm_workloads/findAndModify_remove_queue_unindexed.js @@ -0,0 +1,31 @@ +'use strict'; + +/** + * findAndModify_remove_queue_unindexed.js + * + * This is the same workload as findAndModify_remove_queue.js, but without the relevant index. + * + * The lack of an index that could satisfy the sort forces the findAndModify operations to scan all + * the matching documents in order to find the relevant document. This increases the amount of work + * each findAndModify operation has to do before getting to the matching document, and thus + * increases the chance of a write conflict because each concurrent findAndModify operation is + * trying to remove the same document from the queue. + * + * This workload was designed to reproduce SERVER-21434. + */ +load('jstests/concurrency/fsm_libs/extend_workload.js'); // for extendWorkload +load('jstests/concurrency/fsm_workloads/findAndModify_remove_queue.js'); // for $config + +var $config = extendWorkload($config, + function($config, $super) { + + // Use the workload name as the database name, since the workload + // name is assumed to be unique. 
+ $config.data.uniqueDBName = 'findAndModify_remove_queue_unindexed'; + + $config.data.getIndexSpecs = function getIndexSpecs() { + return []; + }; + + return $config; + }); diff --git a/jstests/concurrency/fsm_workloads/findAndModify_update_queue.js b/jstests/concurrency/fsm_workloads/findAndModify_update_queue.js index 9489f44708bf1..9637de739c5b4 100644 --- a/jstests/concurrency/fsm_workloads/findAndModify_update_queue.js +++ b/jstests/concurrency/fsm_workloads/findAndModify_update_queue.js @@ -31,11 +31,8 @@ var $config = extendWorkload( }; }; - $config.data.getIndexSpec = function getIndexSpec() { - return { - counter: 1, - rand: -1 - }; + $config.data.getIndexSpecs = function getIndexSpecs() { + return [{counter: 1, rand: -1}]; }; $config.data.opName = 'updated'; diff --git a/jstests/concurrency/fsm_workloads/findAndModify_update_queue_unindexed.js b/jstests/concurrency/fsm_workloads/findAndModify_update_queue_unindexed.js new file mode 100644 index 0000000000000..c6561829b2673 --- /dev/null +++ b/jstests/concurrency/fsm_workloads/findAndModify_update_queue_unindexed.js @@ -0,0 +1,31 @@ +'use strict'; + +/** + * findAndModify_update_queue_unindexed.js + * + * This is the same workload as findAndModify_update_queue.js, but without the relevant index. + * + * The lack of an index that could satisfy the sort forces the findAndModify operations to scan all + * the matching documents in order to find the relevant document. This increases the amount of work + * each findAndModify operation has to do before getting to the matching document, and thus + * increases the chance of a write conflict because each concurrent findAndModify operation is + * trying to update the same document from the queue. + * + * This workload was designed to reproduce SERVER-21434. 
+ */ +load('jstests/concurrency/fsm_libs/extend_workload.js'); // for extendWorkload +load('jstests/concurrency/fsm_workloads/findAndModify_update_queue.js'); // for $config + +var $config = extendWorkload($config, + function($config, $super) { + + // Use the workload name as the database name, since the workload + // name is assumed to be unique. + $config.data.uniqueDBName = 'findAndModify_update_queue_unindexed'; + + $config.data.getIndexSpecs = function getIndexSpecs() { + return []; + }; + + return $config; + }); diff --git a/src/mongo/db/exec/SConscript b/src/mongo/db/exec/SConscript index 55cc7ebcbe119..995d9aec8ecd4 100644 --- a/src/mongo/db/exec/SConscript +++ b/src/mongo/db/exec/SConscript @@ -77,6 +77,7 @@ env.Library( "text_or.cpp", "update.cpp", "working_set_common.cpp", + "write_stage_common.cpp", ], LIBDEPS = [ "scoped_timer", diff --git a/src/mongo/db/exec/delete.cpp b/src/mongo/db/exec/delete.cpp index d52ad5a942ddf..81e952e04fa2b 100644 --- a/src/mongo/db/exec/delete.cpp +++ b/src/mongo/db/exec/delete.cpp @@ -36,6 +36,7 @@ #include "mongo/db/concurrency/write_conflict_exception.h" #include "mongo/db/exec/scoped_timer.h" #include "mongo/db/exec/working_set_common.h" +#include "mongo/db/exec/write_stage_common.h" #include "mongo/db/service_context.h" #include "mongo/db/op_observer.h" #include "mongo/db/query/canonical_query.h" @@ -50,6 +51,22 @@ using std::unique_ptr; using std::vector; using stdx::make_unique; +namespace { + +/** + * Returns true if we should throw a WriteConflictException in order to retry the operation in + * the case of a conflict. Returns false if we should skip the document and keep going. + */ +bool shouldRestartDeleteIfNoLongerMatches(const DeleteStageParams& params) { + // When we're doing a findAndModify with a sort, the sort will have a limit of 1, so it will not + // produce any more results even if there is another matching document. 
Throw a WCE here so that + // these operations get another chance to find a matching document. The findAndModify command + // should automatically retry if it gets a WCE. + return params.returnDeleted && !params.sort.isEmpty(); +}; + +} // namespace + // static const char* DeleteStage::kStageType = "DELETE"; @@ -141,7 +158,7 @@ PlanStage::StageState DeleteStage::doWork(WorkingSetID* out) { // We advanced, or are retrying, and id is set to the WSM to work on. WorkingSetMember* member = _ws->get(id); - // We want to free this member when we return, unless we need to retry it. + // We want to free this member when we return, unless we need to retry deleting or returning it. ScopeGuard memberFreer = MakeGuard(&WorkingSet::free, _ws, id); if (!member->hasRecordId()) { @@ -154,66 +171,57 @@ PlanStage::StageState DeleteStage::doWork(WorkingSetID* out) { // a fetch. We should always get fetched data, and never just key data. invariant(member->hasObj()); + // Ensure the document still exists and matches the predicate. + bool docStillMatches; try { - // If the snapshot changed, then we have to make sure we have the latest copy of the - // doc and that it still matches. - std::unique_ptr<SeekableRecordCursor> cursor; - if (getOpCtx()->recoveryUnit()->getSnapshotId() != member->obj.snapshotId()) { - cursor = _collection->getCursor(getOpCtx()); - if (!WorkingSetCommon::fetch(getOpCtx(), _ws, id, cursor)) { - // Doc is already deleted. Nothing more to do. - return PlanStage::NEED_TIME; - } - - // Make sure the re-fetched doc still matches the predicate. - if (_params.canonicalQuery && - !_params.canonicalQuery->root()->matchesBSON(member->obj.value(), NULL)) { - // Doesn't match. - return PlanStage::NEED_TIME; - } - } + docStillMatches = write_stage_common::ensureStillMatches( + _collection, getOpCtx(), _ws, id, _params.canonicalQuery); + } catch (const WriteConflictException& wce) { + // There was a problem trying to detect if the document still exists, so retry. 
+ memberFreer.Dismiss(); + return prepareToRetryWSM(id, out); + } - // Ensure that the BSONObj underlying the WorkingSetMember is owned because saveState() - // is allowed to free the memory. - if (_params.returnDeleted) { - // Save a copy of the document that is about to get deleted, but keep it in the - // RID_AND_OBJ state in case we need to retry deleting it. - BSONObj deletedDoc = member->obj.value(); - member->obj.setValue(deletedDoc.getOwned()); + if (!docStillMatches) { + // Either the document has already been deleted, or it has been updated such that it no + // longer matches the predicate. + if (shouldRestartDeleteIfNoLongerMatches(_params)) { + throw WriteConflictException(); } + return PlanStage::NEED_TIME; + } - // TODO: Do we want to buffer docs and delete them in a group rather than - // saving/restoring state repeatedly? + // Ensure that the BSONObj underlying the WorkingSetMember is owned because saveState() is + // allowed to free the memory. + if (_params.returnDeleted) { + // Save a copy of the document that is about to get deleted, but keep it in the RID_AND_OBJ + // state in case we need to retry deleting it. + BSONObj deletedDoc = member->obj.value(); + member->obj.setValue(deletedDoc.getOwned()); + } - try { - WorkingSetCommon::prepareForSnapshotChange(_ws); - child()->saveState(); - } catch (const WriteConflictException& wce) { - std::terminate(); - } + // TODO: Do we want to buffer docs and delete them in a group rather than saving/restoring state + // repeatedly? + + try { + WorkingSetCommon::prepareForSnapshotChange(_ws); + child()->saveState(); + } catch (const WriteConflictException& wce) { + std::terminate(); + } - // Do the write, unless this is an explain. - if (!_params.isExplain) { + // Do the write, unless this is an explain. 
+ if (!_params.isExplain) { + try { WriteUnitOfWork wunit(getOpCtx()); _collection->deleteDocument(getOpCtx(), recordId, _params.fromMigrate); wunit.commit(); + } catch (const WriteConflictException& wce) { + memberFreer.Dismiss(); // Keep this member around so we can retry deleting it. + return prepareToRetryWSM(id, out); } - - ++_specificStats.docsDeleted; - } catch (const WriteConflictException& wce) { - // When we're doing a findAndModify with a sort, the sort will have a limit of 1, so will - // not produce any more results even if there is another matching document. Re-throw the WCE - // here so that these operations get another chance to find a matching document. The - // findAndModify command should automatically retry if it gets a WCE. - // TODO: this is not necessary if there was no sort specified. - if (_params.returnDeleted) { - throw; - } - _idRetrying = id; - memberFreer.Dismiss(); // Keep this member around so we can retry deleting it. - *out = WorkingSet::INVALID_ID; - return NEED_YIELD; } + ++_specificStats.docsDeleted; if (_params.returnDeleted) { // After deleting the document, the RecordId associated with this member is invalid. @@ -222,15 +230,14 @@ PlanStage::StageState DeleteStage::doWork(WorkingSetID* out) { member->transitionToOwnedObj(); } - // As restoreState may restore (recreate) cursors, cursors are tied to the - // transaction in which they are created, and a WriteUnitOfWork is a - // transaction, make sure to restore the state outside of the WritUnitOfWork. + // As restoreState may restore (recreate) cursors, cursors are tied to the transaction in which + // they are created, and a WriteUnitOfWork is a transaction, make sure to restore the state + // outside of the WriteUnitOfWork. try { child()->restoreState(); } catch (const WriteConflictException& wce) { - // Note we don't need to retry anything in this case since the delete already - // was committed. 
However, we still need to return the deleted document - // (if it was requested). + // Note we don't need to retry anything in this case since the delete already was committed. + // However, we still need to return the deleted document (if it was requested). if (_params.returnDeleted) { // member->obj should refer to the deleted document. invariant(member->getState() == WorkingSetMember::OWNED_OBJ); @@ -286,4 +293,10 @@ long long DeleteStage::getNumDeleted(const PlanExecutor& exec) { return deleteStats->docsDeleted; } +PlanStage::StageState DeleteStage::prepareToRetryWSM(WorkingSetID idToRetry, WorkingSetID* out) { + _idRetrying = idToRetry; + *out = WorkingSet::INVALID_ID; + return NEED_YIELD; +} + } // namespace mongo diff --git a/src/mongo/db/exec/delete.h b/src/mongo/db/exec/delete.h index 55879182adbfc..9a71c597e6365 100644 --- a/src/mongo/db/exec/delete.h +++ b/src/mongo/db/exec/delete.h @@ -44,7 +44,7 @@ struct DeleteStageParams { fromMigrate(false), isExplain(false), returnDeleted(false), - canonicalQuery(NULL) {} + canonicalQuery(nullptr) {} // Should we delete all documents returned from the child (a "multi delete"), or at most one // (a "single delete")? @@ -62,6 +62,9 @@ struct DeleteStageParams { // The parsed query predicate for this delete. Not owned here. CanonicalQuery* canonicalQuery; + + // The user-requested sort specification. Currently used just for findAndModify. + BSONObj sort; }; /** @@ -105,6 +108,12 @@ class DeleteStage final : public PlanStage { static long long getNumDeleted(const PlanExecutor& exec); private: + /** + * Stores 'idToRetry' in '_idRetrying' so the delete can be retried during the next call to + * work(). Always returns NEED_YIELD and sets 'out' to WorkingSet::INVALID_ID. + */ + StageState prepareToRetryWSM(WorkingSetID idToRetry, WorkingSetID* out); + DeleteStageParams _params; // Not owned by us. 
diff --git a/src/mongo/db/exec/update.cpp b/src/mongo/db/exec/update.cpp index 25ad8c344b0c2..15da8ecb0db78 100644 --- a/src/mongo/db/exec/update.cpp +++ b/src/mongo/db/exec/update.cpp @@ -36,6 +36,7 @@ #include "mongo/db/concurrency/write_conflict_exception.h" #include "mongo/db/exec/scoped_timer.h" #include "mongo/db/exec/working_set_common.h" +#include "mongo/db/exec/write_stage_common.h" #include "mongo/db/service_context.h" #include "mongo/db/op_observer.h" #include "mongo/db/ops/update_lifecycle.h" @@ -410,6 +411,18 @@ Status addObjectIDIdField(mb::Document* doc) { return Status::OK(); } +/** + * Returns true if we should throw a WriteConflictException in order to retry the operation in the + * case of a conflict. Returns false if we should skip the document and keep going. + */ +bool shouldRestartUpdateIfNoLongerMatches(const UpdateStageParams& params) { + // When we're doing a findAndModify with a sort, the sort will have a limit of 1, so it will not + // produce any more results even if there is another matching document. Throw a WCE here so that + // these operations get another chance to find a matching document. The findAndModify command + // should automatically retry if it gets a WCE. + return params.request->shouldReturnAnyDocs() && !params.request->getSort().isEmpty(); +}; + } // namespace const char* UpdateStage::kStageType = "UPDATE"; @@ -826,7 +839,8 @@ PlanStage::StageState UpdateStage::doWork(WorkingSetID* out) { WorkingSetMember* member = _ws->get(id); - // We want to free this member when we return, unless we need to retry it. + // We want to free this member when we return, unless we need to retry updating or returning + // it. 
ScopeGuard memberFreer = MakeGuard(&WorkingSet::free, _ws, id); if (!member->hasRecordId()) { @@ -848,70 +862,63 @@ PlanStage::StageState UpdateStage::doWork(WorkingSetID* out) { return PlanStage::NEED_TIME; } + bool docStillMatches; try { - std::unique_ptr<SeekableRecordCursor> cursor; - if (getOpCtx()->recoveryUnit()->getSnapshotId() != member->obj.snapshotId()) { - cursor = _collection->getCursor(getOpCtx()); - // our snapshot has changed, refetch - if (!WorkingSetCommon::fetch(getOpCtx(), _ws, id, cursor)) { - // document was deleted, we're done here - return PlanStage::NEED_TIME; - } + docStillMatches = write_stage_common::ensureStillMatches( + _collection, getOpCtx(), _ws, id, _params.canonicalQuery); + } catch (const WriteConflictException& wce) { + // There was a problem trying to detect if the document still exists, so retry. + memberFreer.Dismiss(); + return prepareToRetryWSM(id, out); + } - // we have to re-match the doc as it might not match anymore - CanonicalQuery* cq = _params.canonicalQuery; - if (cq && !cq->root()->matchesBSON(member->obj.value(), NULL)) { - // doesn't match predicates anymore! - return PlanStage::NEED_TIME; - } + if (!docStillMatches) { + // Either the document has been deleted, or it has been updated such that it no longer + // matches the predicate. + if (shouldRestartUpdateIfNoLongerMatches(_params)) { + throw WriteConflictException(); } + return PlanStage::NEED_TIME; + } - // Ensure that the BSONObj underlying the WorkingSetMember is owned because saveState() - // is allowed to free the memory. - member->makeObjOwnedIfNeeded(); + // Ensure that the BSONObj underlying the WorkingSetMember is owned because saveState() + // is allowed to free the memory. 
+ member->makeObjOwnedIfNeeded(); - // Save state before making changes - try { - WorkingSetCommon::prepareForSnapshotChange(_ws); - child()->saveState(); - } catch (const WriteConflictException& wce) { - std::terminate(); - } + // Save state before making changes + try { + WorkingSetCommon::prepareForSnapshotChange(_ws); + child()->saveState(); + } catch (const WriteConflictException& wce) { + std::terminate(); + } - // If we care about the pre-updated version of the doc, save it out here. - BSONObj oldObj; - if (_params.request->shouldReturnOldDocs()) { - oldObj = member->obj.value().getOwned(); - } + // If we care about the pre-updated version of the doc, save it out here. + BSONObj oldObj; + if (_params.request->shouldReturnOldDocs()) { + oldObj = member->obj.value().getOwned(); + } + BSONObj newObj; + try { // Do the update, get us the new version of the doc. - BSONObj newObj = transformAndUpdate(member->obj, recordId); - - // Set member's obj to be the doc we want to return. - if (_params.request->shouldReturnAnyDocs()) { - if (_params.request->shouldReturnNewDocs()) { - member->obj = Snapshotted<BSONObj>(getOpCtx()->recoveryUnit()->getSnapshotId(), - newObj.getOwned()); - } else { - invariant(_params.request->shouldReturnOldDocs()); - member->obj.setValue(oldObj); - } - member->recordId = RecordId(); - member->transitionToOwnedObj(); - } + newObj = transformAndUpdate(member->obj, recordId); } catch (const WriteConflictException& wce) { - // When we're doing a findAndModify with a sort, the sort will have a limit of 1, so - // will not produce any more results even if there is another matching document. - // Re-throw the WCE here so that these operations get another chance to find a matching - // document. The findAndModify command should automatically retry if it gets a WCE. - // TODO: this is not necessary if there was no sort specified. 
- if (_params.request->shouldReturnAnyDocs()) { - throw; - } - _idRetrying = id; memberFreer.Dismiss(); // Keep this member around so we can retry updating it. - *out = WorkingSet::INVALID_ID; - return NEED_YIELD; + return prepareToRetryWSM(id, out); + } + + // Set member's obj to be the doc we want to return. + if (_params.request->shouldReturnAnyDocs()) { + if (_params.request->shouldReturnNewDocs()) { + member->obj = Snapshotted<BSONObj>(getOpCtx()->recoveryUnit()->getSnapshotId(), + newObj.getOwned()); + } else { + invariant(_params.request->shouldReturnOldDocs()); + member->obj.setValue(oldObj); + } + member->recordId = RecordId(); + member->transitionToOwnedObj(); } // This should be after transformAndUpdate to make sure we actually updated this doc. @@ -1048,4 +1055,10 @@ UpdateResult UpdateStage::makeUpdateResult(const UpdateStats* updateStats) { updateStats->objInserted); }; +PlanStage::StageState UpdateStage::prepareToRetryWSM(WorkingSetID idToRetry, WorkingSetID* out) { + _idRetrying = idToRetry; + *out = WorkingSet::INVALID_ID; + return NEED_YIELD; +} + } // namespace mongo diff --git a/src/mongo/db/exec/update.h b/src/mongo/db/exec/update.h index 263c28ab8e520..0fa391c778f36 100644 --- a/src/mongo/db/exec/update.h +++ b/src/mongo/db/exec/update.h @@ -176,6 +176,12 @@ class UpdateStage final : public PlanStage { */ Status restoreUpdateState(); + /** + * Stores 'idToRetry' in '_idRetrying' so the update can be retried during the next call to + * work(). Always returns NEED_YIELD and sets 'out' to WorkingSet::INVALID_ID. + */ + StageState prepareToRetryWSM(WorkingSetID idToRetry, WorkingSetID* out); + UpdateStageParams _params; // Not owned by us. diff --git a/src/mongo/db/exec/write_stage_common.cpp b/src/mongo/db/exec/write_stage_common.cpp new file mode 100644 index 0000000000000..662373f1d04b6 --- /dev/null +++ b/src/mongo/db/exec/write_stage_common.cpp @@ -0,0 +1,68 @@ +/** + * Copyright (C) 2016 MongoDB Inc. 
+ * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. 
+ */ + +#include "mongo/db/exec/write_stage_common.h" + +#include "mongo/db/catalog/collection.h" +#include "mongo/db/concurrency/write_conflict_exception.h" +#include "mongo/db/exec/working_set.h" +#include "mongo/db/exec/working_set_common.h" +#include "mongo/db/operation_context.h" +#include "mongo/db/query/canonical_query.h" + +namespace mongo { +namespace write_stage_common { + +bool ensureStillMatches(const Collection* collection, + OperationContext* txn, + WorkingSet* ws, + WorkingSetID id, + const CanonicalQuery* cq) { + // If the snapshot changed, then we have to make sure we have the latest copy of the doc and + // that it still matches. + WorkingSetMember* member = ws->get(id); + if (txn->recoveryUnit()->getSnapshotId() != member->obj.snapshotId()) { + std::unique_ptr<SeekableRecordCursor> cursor(collection->getCursor(txn)); + + if (!WorkingSetCommon::fetch(txn, ws, id, cursor)) { + // Doc is already deleted. + return false; + } + + // Make sure the re-fetched doc still matches the predicate. + if (cq && !cq->root()->matchesBSON(member->obj.value(), nullptr)) { + // No longer matches. + return false; + } + } + return true; +} + +} // namespace write_stage_common +} // namespace mongo diff --git a/src/mongo/db/exec/write_stage_common.h b/src/mongo/db/exec/write_stage_common.h new file mode 100644 index 0000000000000..388c7d2c763cc --- /dev/null +++ b/src/mongo/db/exec/write_stage_common.h @@ -0,0 +1,58 @@ +/** + * Copyright (C) 2016 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. 
+ * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include "mongo/platform/basic.h" + +#include "mongo/db/exec/working_set.h" + +namespace mongo { + +class CanonicalQuery; +class Collection; +class OperationContext; + +namespace write_stage_common { + +/** + * Returns true if the document referred to by 'id' still exists and matches the query predicate + * given by 'cq'. Returns true if the document still exists and 'cq' is null. Returns false + * otherwise. + * + * May throw a WriteConflictException if there was a conflict while searching to see if the document + * still exists. 
+ */ +bool ensureStillMatches(const Collection* collection, + OperationContext* txn, + WorkingSet* ws, + WorkingSetID id, + const CanonicalQuery* cq); +} +} diff --git a/src/mongo/db/query/get_executor.cpp b/src/mongo/db/query/get_executor.cpp index ade024378e657..dbbd6e8176aaa 100644 --- a/src/mongo/db/query/get_executor.cpp +++ b/src/mongo/db/query/get_executor.cpp @@ -671,6 +671,7 @@ StatusWith<unique_ptr<PlanExecutor>> getExecutorDelete(OperationContext* txn, deleteStageParams.fromMigrate = request->isFromMigrate(); deleteStageParams.isExplain = request->isExplain(); deleteStageParams.returnDeleted = request->shouldReturnDeleted(); + deleteStageParams.sort = request->getSort(); unique_ptr<WorkingSet> ws = make_unique<WorkingSet>(); PlanExecutor::YieldPolicy policy =