Skip to content

Commit

Permalink
SERVER-28509 DBClientCursor now uses read commands
Browse files Browse the repository at this point in the history
  • Loading branch information
RedBeard0531 committed Jul 26, 2017
1 parent 8b36e1d commit 84ef475
Show file tree
Hide file tree
Showing 19 changed files with 277 additions and 166 deletions.
161 changes: 110 additions & 51 deletions src/mongo/client/dbclientcursor.cpp
Expand Up @@ -37,6 +37,9 @@
#include "mongo/db/client.h"
#include "mongo/db/dbmessage.h"
#include "mongo/db/namespace_string.h"
#include "mongo/db/query/cursor_response.h"
#include "mongo/db/query/getmore_request.h"
#include "mongo/db/query/query_request.h"
#include "mongo/rpc/factory.h"
#include "mongo/rpc/get_status_from_command_result.h"
#include "mongo/rpc/metadata.h"
Expand Down Expand Up @@ -87,36 +90,70 @@ int DBClientCursor::nextBatchSize() {
return batchSize < nToReturn ? batchSize : nToReturn;
}

void DBClientCursor::_assembleInit(Message& toSend) {
Message DBClientCursor::_assembleInit() {
if (cursorId) {
return _assembleGetMore();
}

// If we haven't gotten a cursorId yet, we need to issue a new query or command.
if (!cursorId) {
if (_isCommand) {
// HACK:
// Unfortunately, this code is used by the shell to run commands,
// so we need to allow the shell to send invalid options so that we can
// test that the server rejects them. Thus, to allow generating commands with
// invalid options, we validate them here, and fall back to generating an OP_QUERY
// through assembleQueryRequest if the options are invalid.
bool hasValidNToReturnForCommand = (nToReturn == 1 || nToReturn == -1);
bool hasValidFlagsForCommand = !(opts & mongo::QueryOption_Exhaust);
bool hasInvalidMaxTimeMs = query.hasField("$maxTimeMS");

if (hasValidNToReturnForCommand && hasValidFlagsForCommand && !hasInvalidMaxTimeMs) {
toSend = assembleCommandRequest(_client, nsToDatabaseSubstring(ns), opts, query);
return;
if (_isCommand) {
// HACK:
// Unfortunately, this code is used by the shell to run commands,
// so we need to allow the shell to send invalid options so that we can
// test that the server rejects them. Thus, to allow generating commands with
// invalid options, we validate them here, and fall back to generating an OP_QUERY
// through assembleQueryRequest if the options are invalid.
bool hasValidNToReturnForCommand = (nToReturn == 1 || nToReturn == -1);
bool hasValidFlagsForCommand = !(opts & mongo::QueryOption_Exhaust);
bool hasInvalidMaxTimeMs = query.hasField("$maxTimeMS");

if (hasValidNToReturnForCommand && hasValidFlagsForCommand && !hasInvalidMaxTimeMs) {
return assembleCommandRequest(_client, ns.db(), opts, query);
}
} else if (_useFindCommand) {
auto qr = QueryRequest::fromLegacyQuery(ns,
query,
fieldsToReturn ? *fieldsToReturn : BSONObj(),
nToSkip,
nextBatchSize(),
opts);
if (qr.isOK() && !qr.getValue()->isExplain() && !qr.getValue()->isExhaust()) {
BSONObj cmd = qr.getValue()->asFindCommand();
if (auto readPref = query["$readPreference"]) {
// QueryRequest doesn't handle $readPreference.
cmd = BSONObjBuilder(std::move(cmd)).append(readPref).obj();
}
return assembleCommandRequest(_client, ns.db(), opts, std::move(cmd));
}
assembleQueryRequest(ns, query, nextBatchSize(), nToSkip, fieldsToReturn, opts, toSend);
return;
// else use legacy OP_QUERY request.
}

_useFindCommand = false; // Make sure we handle the reply correctly.
Message toSend;
assembleQueryRequest(ns.ns(), query, nextBatchSize(), nToSkip, fieldsToReturn, opts, toSend);
return toSend;
}

Message DBClientCursor::_assembleGetMore() {
invariant(cursorId);
if (_useFindCommand) {
long long batchSize = nextBatchSize();
auto gmr = GetMoreRequest(ns,
cursorId,
boost::make_optional(batchSize != 0, batchSize),
boost::none, // awaitDataTimeout
boost::none, // term
boost::none); // lastKnownCommittedOptime
return assembleCommandRequest(_client, ns.db(), opts, gmr.toBSON());
} else {
// Assemble a legacy getMore request.
return makeGetMoreMessage(ns.ns(), cursorId, nextBatchSize(), opts);
}
// Assemble a legacy getMore request.
toSend = makeGetMoreMessage(ns, cursorId, nToReturn, opts);
}

bool DBClientCursor::init() {
invariant(!_connectionHasPendingReplies);
Message toSend;
_assembleInit(toSend);
Message toSend = _assembleInit();
verify(_client);
Message reply;
if (!_client->call(toSend, reply, false, &_originalHost)) {
Expand All @@ -137,8 +174,7 @@ void DBClientCursor::initLazy(bool isRetry) {
massert(15875,
"DBClientCursor::initLazy called on a client that doesn't support lazy",
_client->lazySupported());
Message toSend;
_assembleInit(toSend);
Message toSend = _assembleInit();
_client->say(toSend, isRetry, &_originalHost);
_lastRequestId = toSend.header().getId();
_connectionHasPendingReplies = true;
Expand Down Expand Up @@ -176,21 +212,27 @@ void DBClientCursor::requestMore() {
verify(nToReturn > 0);
}

Message toSend = makeGetMoreMessage(ns, cursorId, nextBatchSize(), opts);
ON_BLOCK_EXIT([ this, origClient = _client ] { _client = origClient; });
boost::optional<ScopedDbConnection> connHolder;
if (!_client) {
invariant(_scopedHost.size());
connHolder.emplace(_scopedHost);
_client = connHolder->get();
}

Message toSend = _assembleGetMore();
Message response;
_client->call(toSend, response);

// If call() succeeds, the connection is clean so we can return it to the pool, even if
// dataReceived() throws because the command reported failure. However, we can't return it yet,
// because dataReceived() needs to get the metadata reader from the connection.
ON_BLOCK_EXIT([&] {
if (connHolder)
connHolder->done();
});

if (_client) {
_client->call(toSend, response);
dataReceived(response);
} else {
verify(_scopedHost.size());
ScopedDbConnection conn(_scopedHost);
conn->call(toSend, response);
_client = conn.get();
ON_BLOCK_EXIT([this] { _client = nullptr; });
dataReceived(response);
conn.done();
}
dataReceived(response);
}

/** with QueryOption_Exhaust, the server just blasts data at us (marked at end with cursorid==0). */
Expand All @@ -205,15 +247,11 @@ void DBClientCursor::exhaustReceiveMore() {
dataReceived(response);
}

void DBClientCursor::commandDataReceived(const Message& reply) {
BSONObj DBClientCursor::commandDataReceived(const Message& reply) {
int op = reply.operation();
invariant(op == opReply || op == dbCommandReply || op == dbMsg);

batch.objs.clear();
batch.pos = 0;

auto commandReply = rpc::makeReply(&reply);

auto commandStatus = getStatusFromCommandResult(commandReply->getCommandReply());

if (ErrorCodes::SendStaleConfig == commandStatus) {
Expand All @@ -229,13 +267,25 @@ void DBClientCursor::commandDataReceived(const Message& reply) {
opCtx, commandReply->getMetadata(), _client->getServerAddress()));
}

batch.objs.push_back(commandReply->getCommandReply().getOwned());
return commandReply->getCommandReply().getOwned();
}

void DBClientCursor::dataReceived(const Message& reply, bool& retry, string& host) {
batch.objs.clear();
batch.pos = 0;

// If this is a reply to our initial command request.
if (_isCommand && cursorId == 0) {
commandDataReceived(reply);
batch.objs.push_back(commandDataReceived(reply));
return;
}

if (_useFindCommand) {
cursorId = 0; // Don't try to kill cursor if we get back an error.
auto cr = uassertStatusOK(CursorResponse::parseFromBSON(commandDataReceived(reply)));
cursorId = cr.getCursorId();
ns = cr.getNSS(); // Unlike OP_REPLY, find command can change the ns to use for getMores.
batch.objs = cr.releaseBatch();
return;
}

Expand Down Expand Up @@ -272,8 +322,6 @@ void DBClientCursor::dataReceived(const Message& reply, bool& retry, string& hos
_lastRequestId = reply.header().getId();
}

batch.pos = 0;
batch.objs.clear();
batch.objs.reserve(qr.getNReturned());

BufReader data(qr.data(), qr.dataLen());
Expand Down Expand Up @@ -460,13 +508,16 @@ DBClientCursor::DBClientCursor(DBClientBase* client,
haveLimit(nToReturn > 0 && !(queryOptions & QueryOption_CursorTailable)),
nToSkip(nToSkip),
fieldsToReturn(fieldsToReturn),
opts(queryOptions),
opts(queryOptions & ~QueryOptionLocal_forceOpQuery),
batchSize(batchSize == 1 ? 2 : batchSize),
resultFlags(0),
cursorId(cursorId),
_ownCursor(true),
wasError(false),
_enabledBSONVersion(Validator<BSONObj>::enabledBSONVersion()) {}
_enabledBSONVersion(Validator<BSONObj>::enabledBSONVersion()) {
if (queryOptions & QueryOptionLocal_forceOpQuery)
_useFindCommand = false;
}

DBClientCursor::~DBClientCursor() {
kill();
Expand All @@ -475,14 +526,22 @@ DBClientCursor::~DBClientCursor() {
void DBClientCursor::kill() {
DESTRUCTOR_GUARD({
if (cursorId && _ownCursor && !globalInShutdownDeprecated()) {
auto toSend = makeKillCursorsMessage(cursorId);
auto killCursor = [&](auto& conn) {
if (_useFindCommand) {
conn->killCursor(ns, cursorId);
} else {
auto toSend = makeKillCursorsMessage(cursorId);
conn->say(toSend);
}
};

if (_client && !_connectionHasPendingReplies) {
_client->say(toSend);
killCursor(_client);
} else {
// Use a side connection to send the kill cursor request.
verify(_scopedHost.size() || (_client && _connectionHasPendingReplies));
ScopedDbConnection conn(_client ? _client->getServerAddress() : _scopedHost);
conn->say(toSend);
killCursor(conn);
conn.done();
}
}
Expand Down
43 changes: 18 additions & 25 deletions src/mongo/client/dbclientcursor.h
Expand Up @@ -41,28 +41,13 @@ namespace mongo {

class AScopedConnection;

/** for mock purposes only -- do not create variants of DBClientCursor, nor hang code here
@see DBClientMockCursor
*/
class DBClientCursorInterface {
MONGO_DISALLOW_COPYING(DBClientCursorInterface);

public:
virtual ~DBClientCursorInterface() {}
virtual bool more() = 0;
virtual BSONObj next() = 0;
// TODO bring more of the DBClientCursor interface to here
protected:
DBClientCursorInterface() {}
};

/** Queries return a cursor object */
class DBClientCursor : public DBClientCursorInterface {
class DBClientCursor {
MONGO_DISALLOW_COPYING(DBClientCursor);

public:
/** If true, safe to call next(). Requests more from server if necessary. */
bool more();
virtual bool more();

/** If true, there is more in our local buffers to be fetched via next(). Returns
false when a getMore request back to server would be required. You can use this
Expand All @@ -82,7 +67,7 @@ class DBClientCursor : public DBClientCursorInterface {
{ $err: <std::string> }
if you do not want to handle that yourself, call nextSafe().
*/
BSONObj next();
virtual BSONObj next();

/**
restore an object previously returned by next() to the cursor
Expand Down Expand Up @@ -147,6 +132,13 @@ class DBClientCursor : public DBClientCursorInterface {
batchSize = newBatchSize;
}


/**
* Fold this in with queryOptions to force the use of legacy query operations.
* This flag is never sent over the wire and is only used locally.
*/
enum { QueryOptionLocal_forceOpQuery = 1 << 30 };

DBClientCursor(DBClientBase* client,
const std::string& ns,
const BSONObj& query,
Expand Down Expand Up @@ -182,7 +174,7 @@ class DBClientCursor : public DBClientCursorInterface {
}

std::string getns() const {
return ns;
return ns.ns();
}

/**
Expand Down Expand Up @@ -244,7 +236,7 @@ class DBClientCursor : public DBClientCursorInterface {
Batch batch;
DBClientBase* _client;
std::string _originalHost;
const std::string ns;
NamespaceString ns;
const bool _isCommand;
BSONObj query;
int nToReturn;
Expand All @@ -261,6 +253,7 @@ class DBClientCursor : public DBClientCursorInterface {
std::string _lazyHost;
bool wasError;
BSONVersion _enabledBSONVersion;
bool _useFindCommand = true;
bool _connectionHasPendingReplies = false;
int _lastRequestId = 0;

Expand All @@ -272,16 +265,16 @@ class DBClientCursor : public DBClientCursorInterface {
void dataReceived(const Message& reply, bool& retry, std::string& lazyHost);

/**
* Called by dataReceived when the query was actually a command. Parses the command reply
* according to the RPC protocol used to send it, and then fills in the internal field
* of this cursor with the received data.
* Parses and returns command replies regardless of which command protocol was used.
* Does *not* parse replies from non-command OP_QUERY finds.
*/
void commandDataReceived(const Message& reply);
BSONObj commandDataReceived(const Message& reply);

void requestMore();

// init pieces
void _assembleInit(Message& toSend);
Message _assembleInit();
Message _assembleGetMore();
};

/** iterate over objects in current batch only - will not cause a network call
Expand Down
2 changes: 1 addition & 1 deletion src/mongo/client/dbclientmockcursor.h
Expand Up @@ -33,7 +33,7 @@

namespace mongo {

class DBClientMockCursor : public DBClientCursorInterface {
class DBClientMockCursor {
public:
DBClientMockCursor(const BSONArray& mockCollection) : _iter(mockCollection) {}
virtual ~DBClientMockCursor() {}
Expand Down
6 changes: 0 additions & 6 deletions src/mongo/db/commands/find_cmd.cpp
Expand Up @@ -227,12 +227,6 @@ class FindCmd : public BasicCommand {
// Although it is a command, a find command gets counted as a query.
globalOpCounters.gotQuery();

if (opCtx->getClient()->isInDirectClient()) {
return appendCommandStatus(
result,
Status(ErrorCodes::IllegalOperation, "Cannot run find command from eval()"));
}

// Parse the command BSON to a QueryRequest.
const bool isExplain = false;
auto qrStatus = QueryRequest::makeFromFindCommand(nss, cmdObj, isExplain);
Expand Down

0 comments on commit 84ef475

Please sign in to comment.