Skip to content

Commit

Permalink
CDRIVER-3893: Conditionally support $merge and $out execution on seco…
Browse files Browse the repository at this point in the history
…ndaries (#904)

* Allow readPreference:secondary with $merge and $out. [Finish CDRIVER-3893, CDRIVER-4224, and CDRIVER-4195]
* Pass around the effectively chosen read mode used in server selection back to the command builder
* Update doc func name
* Fix duplicate doc comments
* Consolidate duplicate state in suitable_servers()
* Less magic numbers
* Clarify wire version checks
* Force primary read preference if talking to a single server older than 5.0
  • Loading branch information
vector-of-bool committed Jan 25, 2022
1 parent ee64af1 commit 9eba451
Show file tree
Hide file tree
Showing 38 changed files with 1,291 additions and 233 deletions.
2 changes: 2 additions & 0 deletions CMakeLists.txt
Expand Up @@ -79,6 +79,8 @@ set (CMAKE_MODULE_PATH
include (InstallRequiredSystemLibraries)
include (GNUInstallDirs)

include(MongoC-Warnings)

# Enable CCache, if possible
include (CCache)
# Link with LLD, if possible
Expand Down
9 changes: 3 additions & 6 deletions src/libmongoc/src/mongoc/mongoc-aggregate.c
Expand Up @@ -276,12 +276,9 @@ _mongoc_aggregate (mongoc_client_t *client,
has_write_key = _has_write_key (&iter);
}

if (has_write_key && cursor->read_prefs->mode != MONGOC_READ_PRIMARY) {
mongoc_read_prefs_destroy (cursor->read_prefs);
cursor->read_prefs = mongoc_read_prefs_new (MONGOC_READ_PRIMARY);
MONGOC_WARNING ("$out or $merge stage specified. Overriding read "
"preference to primary.");
}
/* This has an important effect on server selection when
* readPreferences=secondary. Keep track of this fact for later use. */
cursor->is_aggr_with_write_stage = has_write_key;

/* server id isn't enough. ensure we're connected & know wire version */
server_stream = _mongoc_cursor_fetch_stream (cursor);
Expand Down
9 changes: 7 additions & 2 deletions src/libmongoc/src/mongoc/mongoc-change-stream.c
Expand Up @@ -279,8 +279,13 @@ _make_cursor (mongoc_change_stream_t *stream)
goto cleanup;
}

server_stream = mongoc_cluster_stream_for_reads (
&stream->client->cluster, stream->read_prefs, cs, &reply, &stream->err);
server_stream =
mongoc_cluster_stream_for_reads (&stream->client->cluster,
stream->read_prefs,
cs,
&reply,
/* Not aggregate-with-write */ false,
&stream->err);
if (!server_stream) {
bson_destroy (&stream->err_doc);
bson_copy_to (&reply, &stream->err_doc);
Expand Down
33 changes: 25 additions & 8 deletions src/libmongoc/src/mongoc/mongoc-client.c
Expand Up @@ -1791,6 +1791,7 @@ _mongoc_client_retryable_read_command_with_stream (
parts->read_prefs,
parts->assembled.session,
NULL,
/* Not aggregate-with-write */ false,
&ignored_error);

if (retry_server_stream && retry_server_stream->sd->max_wire_version >=
Expand Down Expand Up @@ -1879,7 +1880,12 @@ mongoc_client_command_simple (mongoc_client_t *client,
* preference argument."
*/
server_stream =
mongoc_cluster_stream_for_reads (cluster, read_prefs, NULL, reply, error);
mongoc_cluster_stream_for_reads (cluster,
read_prefs,
NULL,
reply,
/* Not aggregate-with-write */ false,
error);

if (server_stream) {
ret = _mongoc_client_command_with_stream (
Expand Down Expand Up @@ -2039,7 +2045,12 @@ _mongoc_client_command_with_opts (mongoc_client_t *client,
mongoc_cluster_stream_for_writes (cluster, cs, reply_ptr, error);
} else {
server_stream =
mongoc_cluster_stream_for_reads (cluster, prefs, cs, reply_ptr, error);
mongoc_cluster_stream_for_reads (cluster,
prefs,
cs,
reply_ptr,
/* Not aggregate-with-write */ false,
error);
}

if (!server_stream) {
Expand Down Expand Up @@ -2586,8 +2597,12 @@ mongoc_client_kill_cursor (mongoc_client_t *client, int64_t cursor_id)
}

/* see if there's a known writable server - do no I/O or retries */
selected_server = mongoc_topology_description_select (
td.ptr, MONGOC_SS_WRITE, read_prefs, topology->local_threshold_msec);
selected_server =
mongoc_topology_description_select (td.ptr,
MONGOC_SS_WRITE,
read_prefs,
NULL /* chosen read mode */,
topology->local_threshold_msec);

if (selected_server) {
server_id = selected_server->id;
Expand Down Expand Up @@ -2850,7 +2865,8 @@ mongoc_client_select_server (mongoc_client_t *client,
return NULL;
}

sd = mongoc_topology_select (client->topology, optype, prefs, error);
sd = mongoc_topology_select (
client->topology, optype, prefs, NULL /* chosen read mode */, error);
if (!sd) {
return NULL;
}
Expand All @@ -2862,7 +2878,8 @@ mongoc_client_select_server (mongoc_client_t *client,

/* check failed, retry once */
mongoc_server_description_destroy (sd);
sd = mongoc_topology_select (client->topology, optype, prefs, error);
sd = mongoc_topology_select (
client->topology, optype, prefs, NULL /* chosen read mode */, error);
if (sd) {
return sd;
}
Expand Down Expand Up @@ -2997,8 +3014,8 @@ _mongoc_client_end_sessions (mongoc_client_t *client)

while (!mongoc_server_session_pool_is_empty (t->session_pool)) {
prefs = mongoc_read_prefs_new (MONGOC_READ_PRIMARY_PREFERRED);
server_id =
mongoc_topology_select_server_id (t, MONGOC_SS_READ, prefs, &error);
server_id = mongoc_topology_select_server_id (
t, MONGOC_SS_READ, prefs, NULL /* chosen read mode */, &error);

mongoc_read_prefs_destroy (prefs);
if (!server_id) {
Expand Down
4 changes: 3 additions & 1 deletion src/libmongoc/src/mongoc/mongoc-cluster-private.h
Expand Up @@ -44,7 +44,8 @@ BSON_BEGIN_DECLS
typedef struct _mongoc_cluster_node_t {
mongoc_stream_t *stream;
char *connection_address;
/* handshake_sd is a server description created from the handshake on the stream. */
/* handshake_sd is a server description created from the handshake on the
* stream. */
mongoc_server_description_t *handshake_sd;
} mongoc_cluster_node_t;

Expand Down Expand Up @@ -122,6 +123,7 @@ mongoc_cluster_stream_for_reads (mongoc_cluster_t *cluster,
const mongoc_read_prefs_t *read_prefs,
mongoc_client_session_t *cs,
bson_t *reply,
bool is_aggr_with_write,
bson_error_t *error);

/**
Expand Down
24 changes: 18 additions & 6 deletions src/libmongoc/src/mongoc/mongoc-cluster.c
Expand Up @@ -2707,6 +2707,7 @@ _mongoc_cluster_select_server_id (mongoc_client_session_t *cs,
mongoc_topology_t *topology,
mongoc_ss_optype_t optype,
const mongoc_read_prefs_t *read_prefs,
bool *must_use_primary,
bson_error_t *error)
{
uint32_t server_id;
Expand All @@ -2715,14 +2716,14 @@ _mongoc_cluster_select_server_id (mongoc_client_session_t *cs,
server_id = cs->server_id;
if (!server_id) {
server_id = mongoc_topology_select_server_id (
topology, optype, read_prefs, error);
topology, optype, read_prefs, must_use_primary, error);
if (server_id) {
_mongoc_client_session_pin (cs, server_id);
}
}
} else {
server_id =
mongoc_topology_select_server_id (topology, optype, read_prefs, error);
server_id = mongoc_topology_select_server_id (
topology, optype, read_prefs, must_use_primary, error);
/* Transactions Spec: Additionally, any non-transaction operation using a
* pinned ClientSession MUST unpin the session and the operation MUST
* perform normal server selection. */
Expand Down Expand Up @@ -2763,13 +2764,14 @@ _mongoc_cluster_stream_for_optype (mongoc_cluster_t *cluster,
mongoc_server_stream_t *server_stream;
uint32_t server_id;
mongoc_topology_t *topology = cluster->client->topology;
bool must_use_primary = false;

ENTRY;

BSON_ASSERT (cluster);

server_id = _mongoc_cluster_select_server_id (
cs, topology, optype, read_prefs, error);
cs, topology, optype, read_prefs, &must_use_primary, error);

if (!server_id) {
_mongoc_bson_init_with_transient_txn_error (cs, reply);
Expand All @@ -2779,7 +2781,7 @@ _mongoc_cluster_stream_for_optype (mongoc_cluster_t *cluster,
if (!mongoc_cluster_check_interval (cluster, server_id)) {
/* Server Selection Spec: try once more */
server_id = _mongoc_cluster_select_server_id (
cs, topology, optype, read_prefs, error);
cs, topology, optype, read_prefs, &must_use_primary, error);

if (!server_id) {
_mongoc_bson_init_with_transient_txn_error (cs, reply);
Expand All @@ -2790,6 +2792,9 @@ _mongoc_cluster_stream_for_optype (mongoc_cluster_t *cluster,
/* connect or reconnect to server if necessary */
server_stream = _mongoc_cluster_stream_for_server (
cluster, server_id, true /* reconnect_ok */, cs, reply, error);
if (server_stream) {
server_stream->must_use_primary = must_use_primary;
}

RETURN (server_stream);
}
Expand All @@ -2799,6 +2804,7 @@ mongoc_cluster_stream_for_reads (mongoc_cluster_t *cluster,
const mongoc_read_prefs_t *read_prefs,
mongoc_client_session_t *cs,
bson_t *reply,
bool has_write_stage,
bson_error_t *error)
{
const mongoc_read_prefs_t *prefs_override = read_prefs;
Expand All @@ -2808,7 +2814,13 @@ mongoc_cluster_stream_for_reads (mongoc_cluster_t *cluster,
}

return _mongoc_cluster_stream_for_optype (
cluster, MONGOC_SS_READ, prefs_override, cs, reply, error);
cluster,
/* Narrow down the optype if this is an aggregate op with a write stage */
has_write_stage ? MONGOC_SS_AGGREGATE_WITH_WRITE : MONGOC_SS_READ,
prefs_override,
cs,
reply,
error);
}

mongoc_server_stream_t *
Expand Down
18 changes: 17 additions & 1 deletion src/libmongoc/src/mongoc/mongoc-cmd.c
Expand Up @@ -474,6 +474,13 @@ _mongoc_cmd_parts_assemble_mongos (mongoc_cmd_parts_t *parts,
hedge = mongoc_read_prefs_get_hedge (parts->read_prefs);
}

if (server_stream->must_use_primary) {
/* Server selection has overriden the read mode used to generate this
* server stream. This has effects on the body of the message that we send
* to the server */
mode = MONGOC_READ_PRIMARY;
}

/* Server Selection Spec says:
*
* For mode 'primary', drivers MUST NOT set the secondaryOk wire protocol
Expand Down Expand Up @@ -820,6 +827,7 @@ mongoc_cmd_parts_assemble (mongoc_cmd_parts_t *parts,
const char *cmd_name;
bool is_get_more;
const mongoc_read_prefs_t *prefs_ptr;
mongoc_read_mode_t mode = mongoc_read_prefs_get_mode (parts->read_prefs);
bool ret = false;

ENTRY;
Expand Down Expand Up @@ -878,6 +886,14 @@ mongoc_cmd_parts_assemble (mongoc_cmd_parts_t *parts,
prefs_ptr = parts->read_prefs;
}

mode = mongoc_read_prefs_get_mode (prefs_ptr);
if (server_stream->must_use_primary) {
/* Server selection may have overriden the read mode used to generate this
* server stream. This has effects on the body of the message that we send
* to the server */
mode = MONGOC_READ_PRIMARY;
}

if (server_stream->sd->max_wire_version >= WIRE_VERSION_OP_MSG) {
if (!bson_has_field (parts->body, "$db")) {
BSON_APPEND_UTF8 (&parts->extra, "$db", parts->assembled.db_name);
Expand All @@ -892,7 +908,7 @@ mongoc_cmd_parts_assemble (mongoc_cmd_parts_t *parts,
"Read preference in a transaction must be primary");
GOTO (done);
}
} else if (!IS_PREF_PRIMARY (prefs_ptr) &&
} else if (mode != MONGOC_READ_PRIMARY &&
server_type != MONGOC_SERVER_STANDALONE) {
/* "Type Standalone: clients MUST NOT send the read preference to the
* server" */
Expand Down
9 changes: 7 additions & 2 deletions src/libmongoc/src/mongoc/mongoc-collection.c
Expand Up @@ -861,8 +861,13 @@ mongoc_collection_estimated_document_count (

BSON_ASSERT_PARAM (coll);

server_stream = mongoc_cluster_stream_for_reads (
&coll->client->cluster, read_prefs, NULL, reply, error);
server_stream =
mongoc_cluster_stream_for_reads (&coll->client->cluster,
read_prefs,
NULL,
reply,
/* Not aggregate-with-write */ false,
error);

if (opts && bson_has_field (opts, "sessionId")) {
bson_set_error (error,
Expand Down
11 changes: 11 additions & 0 deletions src/libmongoc/src/mongoc/mongoc-cursor-private.h
Expand Up @@ -25,6 +25,7 @@
#include "mongoc-buffer-private.h"
#include "mongoc-rpc-private.h"
#include "mongoc-server-stream-private.h"
#include "mongoc-cluster-private.h"


BSON_BEGIN_DECLS
Expand Down Expand Up @@ -131,6 +132,16 @@ struct _mongoc_cursor_t {
mongoc_read_prefs_t *read_prefs;
mongoc_write_concern_t *write_concern;

/** If the cursor was created for an operation that might have overridden the
* user's read preferences' read mode, then server selection forced the
* cursor to use a read preference mode of 'primary' server. Whether this
* force occurred is stored here: */
bool must_use_primary;

/** Whether this cursor corresponds to an aggregate command that contains a
* writing-stage */
bool is_aggr_with_write_stage;

bool explicit_session;
mongoc_client_session_t *client_session;

Expand Down
37 changes: 27 additions & 10 deletions src/libmongoc/src/mongoc/mongoc-cursor.c
Expand Up @@ -251,6 +251,7 @@ _mongoc_cursor_new_with_opts (mongoc_client_t *client,
cursor->client = client;
cursor->state = UNPRIMED;
cursor->client_generation = client->generation;
cursor->is_aggr_with_write_stage = false;

bson_init (&cursor->opts);
bson_init (&cursor->error_doc);
Expand Down Expand Up @@ -654,22 +655,33 @@ _mongoc_cursor_fetch_stream (mongoc_cursor_t *cursor)
ENTRY;

if (cursor->server_id) {
/* We already did server selection once before. Reuse the prior
* selection to create a new stream on the same server. */
server_stream =
mongoc_cluster_stream_for_server (&cursor->client->cluster,
cursor->server_id,
true /* reconnect_ok */,
cursor->client_session,
&reply,
&cursor->error);
/* Also restore whether primary read preference was forced by server
* selection */
server_stream->must_use_primary = cursor->must_use_primary;
} else {
server_stream = mongoc_cluster_stream_for_reads (&cursor->client->cluster,
cursor->read_prefs,
cursor->client_session,
&reply,
&cursor->error);
server_stream =
mongoc_cluster_stream_for_reads (&cursor->client->cluster,
cursor->read_prefs,
cursor->client_session,
&reply,
cursor->is_aggr_with_write_stage,
&cursor->error);

if (server_stream) {
/* Remember the selected server_id and whether primary read mode was
* forced so that we can re-create an equivalent server_stream at a
* later time */
cursor->server_id = server_stream->sd->id;
cursor->must_use_primary = server_stream->must_use_primary;
}
}

Expand Down Expand Up @@ -1083,11 +1095,16 @@ _mongoc_cursor_run_command (mongoc_cursor_t *cursor,

mongoc_server_stream_cleanup (server_stream);

server_stream = mongoc_cluster_stream_for_reads (&cursor->client->cluster,
cursor->read_prefs,
cursor->client_session,
reply,
&cursor->error);
BSON_ASSERT (!cursor->is_aggr_with_write_stage &&
"Cannot attempt a retry on an aggregate operation that "
"contains write stages");
server_stream =
mongoc_cluster_stream_for_reads (&cursor->client->cluster,
cursor->read_prefs,
cursor->client_session,
reply,
/* Not aggregate-with-write */ false,
&cursor->error);

if (server_stream &&
server_stream->sd->max_wire_version >= WIRE_VERSION_RETRY_READS) {
Expand Down
1 change: 1 addition & 0 deletions src/libmongoc/src/mongoc/mongoc-read-prefs.c
Expand Up @@ -164,6 +164,7 @@ mongoc_read_prefs_is_valid (const mongoc_read_prefs_t *read_prefs)
return false;
}


return true;
}

Expand Down

0 comments on commit 9eba451

Please sign in to comment.