Skip to content

Commit

Permalink
SERVER-58060 Add new internal FindAndModifyImageLookup aggregation stage
Browse files Browse the repository at this point in the history
  • Loading branch information
jasonjhchan authored and Evergreen Agent committed Jul 23, 2021
1 parent a83aed6 commit e7efcb3
Show file tree
Hide file tree
Showing 4 changed files with 606 additions and 0 deletions.
3 changes: 3 additions & 0 deletions src/mongo/db/pipeline/SConscript
Expand Up @@ -243,6 +243,7 @@ pipelineEnv.Library(
'document_source_densify.cpp',
'document_source_exchange.cpp',
'document_source_facet.cpp',
'document_source_find_and_modify_image_lookup.cpp',
'document_source_geo_near.cpp',
'document_source_graph_lookup.cpp',
'document_source_group.cpp',
Expand Down Expand Up @@ -312,6 +313,7 @@ pipelineEnv.Library(
'$BUILD_DIR/mongo/db/query/datetime/date_time_support',
'$BUILD_DIR/mongo/db/query/query_knobs',
'$BUILD_DIR/mongo/db/query/sort_pattern',
'$BUILD_DIR/mongo/db/repl/image_collection_entry',
'$BUILD_DIR/mongo/db/repl/oplog_entry',
'$BUILD_DIR/mongo/db/repl/read_concern_args',
'$BUILD_DIR/mongo/db/repl/repl_coordinator_interface',
Expand Down Expand Up @@ -424,6 +426,7 @@ env.CppUnitTest(
'document_source_densify_test.cpp',
'document_source_exchange_test.cpp',
'document_source_facet_test.cpp',
'document_source_find_and_modify_image_lookup_test.cpp',
'document_source_geo_near_test.cpp',
'document_source_graph_lookup_test.cpp',
'document_source_group_test.cpp',
Expand Down
219 changes: 219 additions & 0 deletions src/mongo/db/pipeline/document_source_find_and_modify_image_lookup.cpp
@@ -0,0 +1,219 @@
/**
* Copyright (C) 2021-present MongoDB, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the Server Side Public License, version 1,
* as published by MongoDB, Inc.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* Server Side Public License for more details.
*
* You should have received a copy of the Server Side Public License
* along with this program. If not, see
* <http://www.mongodb.com/licensing/server-side-public-license>.
*
* As a special exception, the copyright holders give permission to link the
* code of portions of this program with the OpenSSL library under certain
* conditions as described in each individual source file and distribute
* linked combinations including the program with the OpenSSL library. You
* must comply with the Server Side Public License in all respects for
* all of the code used other than as permitted herein. If you modify file(s)
* with this exception, you may extend this exception to your version of the
* file(s), but you are not obligated to do so. If you do not wish to do so,
* delete this exception statement from your version. If you delete this
* exception statement from all source files in the program, then also delete
* it in the license file.
*/

#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kCommand

#include "mongo/platform/basic.h"

#include "mongo/db/exec/document_value/document.h"
#include "mongo/db/pipeline/document_source_find_and_modify_image_lookup.h"
#include "mongo/db/repl/image_collection_entry_gen.h"
#include "mongo/logv2/log.h"

namespace mongo {
namespace {
// Builds the downconverted form of a 'findAndModify' oplog entry document: the result carries
// either a 'preImageOpTime' or a 'postImageOpTime' sub-document (chosen by 'imageType') holding
// the timestamp and term of 'imageOpTime', and no longer carries the 'needsRetryImage' field.
Document downConvertFindAndModifyEntry(Document inputDoc,
                                       repl::OpTime imageOpTime,
                                       repl::RetryImageEnum imageType) {
    const bool isPreImage = (imageType == repl::RetryImageEnum::kPreImage);
    const auto opTimeFieldName = isPreImage ? repl::OplogEntry::kPreImageOpTimeFieldName
                                            : repl::OplogEntry::kPostImageOpTimeFieldName;

    // Sub-document describing the optime assigned to the image entry.
    Document opTimeDoc{{repl::OpTime::kTimestampFieldName.toString(), imageOpTime.getTimestamp()},
                       {repl::OpTime::kTermFieldName.toString(), imageOpTime.getTerm()}};

    MutableDocument downConverted{inputDoc};
    downConverted.setField(opTimeFieldName, Value{std::move(opTimeDoc)});
    downConverted.remove(repl::OplogEntryBase::kNeedsRetryImageFieldName);
    return downConverted.freeze();
}
} // namespace

// Shorthand for the field-name constants declared on repl::OplogEntryBase. Within this file the
// unqualified name 'OplogEntry' therefore refers to repl::OplogEntryBase, while the fully
// qualified 'repl::OplogEntry' is what is used further down for parsing.
using OplogEntry = repl::OplogEntryBase;

// Registers the stage under the name '_internalFindAndModifyImageLookup', using the default lite
// parser and createFromBson() as the full parser.
// NOTE(review): the trailing 'true' is presumably the registration condition (i.e. always
// register) - confirm against the macro definition.
REGISTER_INTERNAL_DOCUMENT_SOURCE(_internalFindAndModifyImageLookup,
LiteParsedDocumentSourceDefault::parse,
DocumentSourceFindAndModifyImageLookup::createFromBson,
true);

// Factory for the stage. The constructor is private, so this is how instances are obtained.
boost::intrusive_ptr<DocumentSourceFindAndModifyImageLookup>
DocumentSourceFindAndModifyImageLookup::create(
    const boost::intrusive_ptr<ExpressionContext>& expCtx) {
    boost::intrusive_ptr<DocumentSourceFindAndModifyImageLookup> stage(
        new DocumentSourceFindAndModifyImageLookup(expCtx));
    return stage;
}

// Parses the stage from its BSON specification. The stage accepts no arguments, so anything
// other than an empty object fails with error code 5806003.
boost::intrusive_ptr<DocumentSourceFindAndModifyImageLookup>
DocumentSourceFindAndModifyImageLookup::createFromBson(
    const BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& expCtx) {
    const bool specIsEmptyObject = elem.type() == BSONType::Object && elem.Obj().isEmpty();
    uassert(5806003,
            str::stream() << "the '" << kStageName << "' spec must be an empty object",
            specIsEmptyObject);
    return create(expCtx);
}

// Constructs the stage by forwarding the stage name and the expression context to the
// DocumentSource base class. The constructor is declared private in the class definition;
// instances are created through create().
DocumentSourceFindAndModifyImageLookup::DocumentSourceFindAndModifyImageLookup(
const boost::intrusive_ptr<ExpressionContext>& expCtx)
: DocumentSource(kStageName, expCtx) {}

// Describes where and how this stage may run. The same constraints are returned regardless of
// 'pipeState', which is not consulted.
StageConstraints DocumentSourceFindAndModifyImageLookup::constraints(
    Pipeline::SplitState pipeState) const {
    StageConstraints result(StreamType::kStreaming,
                            PositionRequirement::kNone,
                            HostTypeRequirement::kAnyShard,
                            DiskUseRequirement::kNoDiskUse,
                            FacetRequirement::kNotAllowed,
                            TransactionRequirement::kNotAllowed,
                            LookupRequirement::kNotAllowed,
                            UnionRequirement::kNotAllowed,
                            ChangeStreamRequirement::kDenylist);
    return result;
}

// Serializes the stage as {<kStageName>: {}}. The stage has no options, so the spec is always an
// empty object; 'explain' does not affect the output.
Value DocumentSourceFindAndModifyImageLookup::serialize(
    boost::optional<ExplainOptions::Verbosity> explain) const {
    const Value emptySpec(Document{});
    const Document stageDoc{{kStageName, emptySpec}};
    return Value(stageDoc);
}

// Records in 'deps' the oplog entry fields this stage declares a dependency on, then reports
// SEE_NEXT so that dependency analysis continues with the following stage.
DepsTracker::State DocumentSourceFindAndModifyImageLookup::getDependencies(
    DepsTracker* deps) const {
    const StringData requiredFields[] = {
        OplogEntry::kSessionIdFieldName,
        OplogEntry::kTxnNumberFieldName,
        OplogEntry::kNeedsRetryImageFieldName,
        OplogEntry::kWallClockTimeFieldName,
        OplogEntry::kNssFieldName,
        OplogEntry::kTimestampFieldName,
        OplogEntry::kTermFieldName,
        OplogEntry::kUuidFieldName,
    };
    for (const auto& fieldName : requiredFields) {
        deps->fields.insert(fieldName.toString());
    }
    return DepsTracker::State::SEE_NEXT;
}

// Reports that this stage may modify any path: the type is kAllPaths and both the path set and
// the renames are left empty.
DocumentSource::GetModPathsReturn DocumentSourceFindAndModifyImageLookup::getModifiedPaths() const {
    std::set<std::string> noSpecificPaths;
    return {DocumentSource::GetModPathsReturn::Type::kAllPaths, std::move(noSpecificPaths), {}};
}

// Produces the next result of the stage.
//
// When a forged no-op image document was returned by the previous call, the downconverted
// 'findAndModify' document that was stashed alongside it is returned now. Otherwise the next
// document is pulled from the source: if an image can be forged for it, the forged image is
// returned first (and the entry itself on the following call); if not, the document is passed
// through unchanged. Non-advanced results from the source are forwarded as they are.
//
// Fails with error code 5806001 when executed on mongos.
DocumentSource::GetNextResult DocumentSourceFindAndModifyImageLookup::doGetNext() {
    uassert(5806001,
            str::stream() << kStageName << " cannot be executed from mongos",
            !pExpCtx->inMongos);

    if (_stashedFindAndModifyDoc) {
        // The previous result was a forged no-op image; emit the entry it belongs to.
        auto stashedDoc = *_stashedFindAndModifyDoc;
        _stashedFindAndModifyDoc = boost::none;
        return stashedDoc;
    }

    auto nextInput = pSource->getNext();
    if (nextInput.isAdvanced()) {
        auto oplogDoc = nextInput.releaseDocument();
        auto forgedImage = _forgeNoopImageDoc(oplogDoc, pExpCtx->opCtx);
        if (forgedImage) {
            return std::move(*forgedImage);
        }
        return oplogDoc;
    }
    return nextInput;
}

// Attempts to forge the no-op pre- or post-image oplog entry that belongs to 'inputDoc'.
//
// Returns boost::none, leaving the stash untouched, when:
//   - 'inputDoc' has no 'needsRetryImage' field,
//   - no document is found in the image collection for the entry's session id, or
//   - the stored image has a different txnNumber than the entry.
// Otherwise the downconverted 'findAndModify' entry is stored in '_stashedFindAndModifyDoc' and
// the forged no-op document is returned.
//
// 'opCtx' is the operation context used for the collection-options lookup and for reading the
// current read concern. Fails with a user assertion (through uassertStatusOK) if 'inputDoc' cannot
// be parsed as an oplog entry.
boost::optional<Document> DocumentSourceFindAndModifyImageLookup::_forgeNoopImageDoc(
    Document inputDoc, OperationContext* opCtx) {
    const auto needsRetryImageVal =
        inputDoc.getField(repl::OplogEntryBase::kNeedsRetryImageFieldName);
    if (needsRetryImageVal.missing()) {
        return boost::none;
    }

    const auto inputDocBson = inputDoc.toBson();
    const auto sessionIdBson = inputDocBson.getObjectField(OplogEntry::kSessionIdFieldName);
    auto localImageCollInfo = pExpCtx->mongoProcessInterface->getCollectionOptions(
        opCtx, NamespaceString::kConfigImagesNamespace);

    // Extract the UUID from the collection information. We should always have a valid uuid
    // here.
    auto imageCollUUID = invariantStatusOK(UUID::parse(localImageCollInfo["uuid"]));

    // Held by value (not by const reference) so that the std::move below actually moves instead
    // of silently degrading to a copy.
    auto readConcernBson = repl::ReadConcernArgs::get(opCtx).toBSON();
    auto imageDoc = pExpCtx->mongoProcessInterface->lookupSingleDocument(
        pExpCtx,
        NamespaceString::kConfigImagesNamespace,
        imageCollUUID,
        Document{BSON("_id" << sessionIdBson)},
        std::move(readConcernBson));

    if (!imageDoc) {
        // If no image document with the corresponding 'sessionId' is found, we skip forging the
        // no-op and rely on the retryable write mechanism to catch that no pre- or post- image
        // exists.
        LOGV2_DEBUG(
            580602,
            2,
            "Not forging no-op image oplog entry because no image document found with sessionId",
            "sessionId"_attr = sessionIdBson);
        return boost::none;
    }

    auto image = repl::ImageEntry::parse(IDLParserErrorContext("image entry"), imageDoc->toBson());
    const auto inputOplog = uassertStatusOK(repl::OplogEntry::parse(inputDocBson));
    if (image.getTxnNumber() != inputOplog.getTxnNumber()) {
        // The image stored for this session belongs to a different transaction number than the
        // oplog entry being processed, which implies that the image is not the correct version
        // for this oplog entry. We will not forge a noop from it.
        LOGV2_DEBUG(
            580603,
            2,
            "Not forging no-op image oplog entry because image document has a different txnNum",
            "sessionId"_attr = sessionIdBson,
            "expectedTxnNum"_attr = inputOplog.getTxnNumber(),
            "actualTxnNum"_attr = image.getTxnNumber());
        return boost::none;
    }

    // Stash the 'findAndModify' document to return after downconverting it.
    // NOTE(review): 'getTerm()' is dereferenced without a check; this assumes every entry that
    // carries 'needsRetryImage' also carries a term - confirm against the callers.
    repl::OpTime imageOpTime(inputOplog.getTimestamp(), *inputOplog.getTerm());
    _stashedFindAndModifyDoc =
        downConvertFindAndModifyEntry(inputDoc,
                                      imageOpTime,
                                      repl::RetryImage_parse(IDLParserErrorContext("retry image"),
                                                             needsRetryImageVal.getStringData()));

    // Forge a no-op image document to be returned.
    repl::MutableOplogEntry forgedNoop;
    forgedNoop.setSessionId(image.get_id());
    forgedNoop.setTxnNumber(image.getTxnNumber());
    forgedNoop.setObject(image.getImage());
    forgedNoop.setOpType(repl::OpTypeEnum::kNoop);
    forgedNoop.setWallClockTime(inputOplog.getWallClockTime());
    forgedNoop.setNss(inputOplog.getNss());
    // NOTE(review): 'getUuid()' is dereferenced without a check; this assumes the input entry
    // always has a collection UUID - confirm.
    forgedNoop.setUuid(*inputOplog.getUuid());

    forgedNoop.setOpTime(imageOpTime);
    forgedNoop.setStatementIds({0});
    return Document{forgedNoop.toBSON()};
}
} // namespace mongo
@@ -0,0 +1,83 @@
/**
* Copyright (C) 2021-present MongoDB, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the Server Side Public License, version 1,
* as published by MongoDB, Inc.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* Server Side Public License for more details.
*
* You should have received a copy of the Server Side Public License
* along with this program. If not, see
* <http://www.mongodb.com/licensing/server-side-public-license>.
*
* As a special exception, the copyright holders give permission to link the
* code of portions of this program with the OpenSSL library under certain
* conditions as described in each individual source file and distribute
* linked combinations including the program with the OpenSSL library. You
* must comply with the Server Side Public License in all respects for
* all of the code used other than as permitted herein. If you modify file(s)
* with this exception, you may extend this exception to your version of the
* file(s), but you are not obligated to do so. If you do not wish to do so,
* delete this exception statement from your version. If you delete this
* exception statement from all source files in the program, then also delete
* it in the license file.
*/

#pragma once

#include "mongo/db/pipeline/document_source.h"

namespace mongo {

/**
* This stage will take a list of oplog entries as input and forge a no-op pre- or post-image to be
* returned before each 'findAndModify' oplog entry that has the 'needsRetryImage' field. This stage
* also downconverts 'findAndModify' entries by stripping the 'needsRetryImage' field and appending
* the appropriate 'preImageOpTime' or 'postImageOpTime' field.
*/
class DocumentSourceFindAndModifyImageLookup : public DocumentSource {
public:
static constexpr StringData kStageName = "$_internalFindAndModifyImageLookup"_sd;

static boost::intrusive_ptr<DocumentSourceFindAndModifyImageLookup> create(
const boost::intrusive_ptr<ExpressionContext>&);

static boost::intrusive_ptr<DocumentSourceFindAndModifyImageLookup> createFromBson(
const BSONElement elem, const boost::intrusive_ptr<ExpressionContext>& expCtx);

DepsTracker::State getDependencies(DepsTracker* deps) const final;

DocumentSource::GetModPathsReturn getModifiedPaths() const final;

Value serialize(boost::optional<ExplainOptions::Verbosity> explain) const;

StageConstraints constraints(Pipeline::SplitState pipeState) const final;

boost::optional<DistributedPlanLogic> distributedPlanLogic() final {
return boost::none;
}

const char* getSourceName() const {
return DocumentSourceFindAndModifyImageLookup::kStageName.rawData();
}

protected:
DocumentSource::GetNextResult doGetNext() override;

private:
DocumentSourceFindAndModifyImageLookup(const boost::intrusive_ptr<ExpressionContext>& expCtx);

// Forges the no-op pre- or post-image document to be returned. Also downconverts the original
// 'findAndModify' oplog entry and stashes it.
boost::optional<Document> _forgeNoopImageDoc(Document inputDoc, OperationContext* opCtx);

// Represents the stashed 'findAndModify' document. This indicates that the previous document
// emitted was a forged pre- or post-image.
boost::optional<Document> _stashedFindAndModifyDoc;
};

} // namespace mongo

0 comments on commit e7efcb3

Please sign in to comment.