Skip to content

Commit

Permalink
CDRIVER-2106 Minor refactoring to make accommodating new opcode easier
Browse files Browse the repository at this point in the history
  • Loading branch information
bjori committed Apr 7, 2017
1 parent ed0f73a commit 236d3df
Show file tree
Hide file tree
Showing 4 changed files with 191 additions and 159 deletions.
12 changes: 2 additions & 10 deletions src/mongoc/mongoc-async-cmd.c
Original file line number Diff line number Diff line change
Expand Up @@ -357,19 +357,11 @@ _mongoc_async_cmd_phase_recv_rpc (mongoc_async_cmd_t *acmd)

_mongoc_rpc_swab_from_le (&acmd->rpc);

if (acmd->rpc.header.opcode != MONGOC_OPCODE_REPLY) {
if (!_mongoc_rpc_get_first_document (&acmd->rpc, &acmd->reply)) {
bson_set_error (&acmd->error,
MONGOC_ERROR_PROTOCOL,
MONGOC_ERROR_PROTOCOL_INVALID_REPLY,
"Invalid reply from server.");
return MONGOC_ASYNC_CMD_ERROR;
}

if (!_mongoc_rpc_reply_get_first (&acmd->rpc.reply, &acmd->reply)) {
bson_set_error (&acmd->error,
MONGOC_ERROR_BSON,
MONGOC_ERROR_BSON_INVALID,
"Failed to decode reply BSON document.");
"Invalid reply from server");
return MONGOC_ASYNC_CMD_ERROR;
}

Expand Down
256 changes: 128 additions & 128 deletions src/mongoc/mongoc-cluster.c
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,118 @@ static void
_bson_error_message_printf (bson_error_t *error, const char *format, ...)
BSON_GNUC_PRINTF (2, 3);

/*
*--------------------------------------------------------------------------
*
* _mongoc_cluster_inc_egress_rpc --
*
* Helper to increment the counter for a particular RPC based on
* it's opcode.
*
* Returns:
* None.
*
* Side effects:
* None.
*
*--------------------------------------------------------------------------
*/

static void
_mongoc_cluster_inc_egress_rpc (const mongoc_rpc_t *rpc)
{
mongoc_counter_op_egress_total_inc ();

switch (rpc->header.opcode) {
case MONGOC_OPCODE_DELETE:
mongoc_counter_op_egress_delete_inc ();
break;
case MONGOC_OPCODE_UPDATE:
mongoc_counter_op_egress_update_inc ();
break;
case MONGOC_OPCODE_INSERT:
mongoc_counter_op_egress_insert_inc ();
break;
case MONGOC_OPCODE_KILL_CURSORS:
mongoc_counter_op_egress_killcursors_inc ();
break;
case MONGOC_OPCODE_GET_MORE:
mongoc_counter_op_egress_getmore_inc ();
break;
case MONGOC_OPCODE_REPLY:
mongoc_counter_op_egress_reply_inc ();
break;
case MONGOC_OPCODE_MSG:
mongoc_counter_op_egress_msg_inc ();
break;
case MONGOC_OPCODE_QUERY:
mongoc_counter_op_egress_query_inc ();
break;
case MONGOC_OPCODE_COMPRESSED:
mongoc_counter_op_egress_compressed_inc ();
break;
default:
BSON_ASSERT (false);
break;
}
}

/*
*--------------------------------------------------------------------------
*
* _mongoc_cluster_inc_ingress_rpc --
*
* Helper to increment the counter for a particular RPC based on
* it's opcode.
*
* Returns:
* None.
*
* Side effects:
* None.
*
*--------------------------------------------------------------------------
*/

static void
_mongoc_cluster_inc_ingress_rpc (const mongoc_rpc_t *rpc)
{
mongoc_counter_op_ingress_total_inc ();

switch (rpc->header.opcode) {
case MONGOC_OPCODE_DELETE:
mongoc_counter_op_ingress_delete_inc ();
break;
case MONGOC_OPCODE_UPDATE:
mongoc_counter_op_ingress_update_inc ();
break;
case MONGOC_OPCODE_INSERT:
mongoc_counter_op_ingress_insert_inc ();
break;
case MONGOC_OPCODE_KILL_CURSORS:
mongoc_counter_op_ingress_killcursors_inc ();
break;
case MONGOC_OPCODE_GET_MORE:
mongoc_counter_op_ingress_getmore_inc ();
break;
case MONGOC_OPCODE_REPLY:
mongoc_counter_op_ingress_reply_inc ();
break;
case MONGOC_OPCODE_MSG:
mongoc_counter_op_ingress_msg_inc ();
break;
case MONGOC_OPCODE_QUERY:
mongoc_counter_op_ingress_query_inc ();
break;
case MONGOC_OPCODE_COMPRESSED:
mongoc_counter_op_ingress_compressed_inc ();
break;
default:
BSON_ASSERT (false);
break;
}
}

/* Allows caller to safely overwrite error->message with a formatted string,
* even if the formatted string includes original error->message. */
static void
Expand Down Expand Up @@ -281,27 +393,27 @@ mongoc_cluster_run_command_internal (mongoc_cluster_t *cluster,
&rpc, reply_header_buf, reply_header_size)) {
GOTO (done);
}
doc_len = (size_t) msg_len - reply_header_size;

_mongoc_rpc_swab_from_le (&rpc);
if (rpc.header.opcode != MONGOC_OPCODE_REPLY ||
rpc.reply_header.n_returned != 1) {
if (rpc.header.opcode == MONGOC_OPCODE_REPLY &&
rpc.reply_header.n_returned == 1) {
reply_buf = bson_reserve_buffer (reply_ptr, (uint32_t) doc_len);
BSON_ASSERT (reply_buf);

if (doc_len != mongoc_stream_read (stream,
(void *) reply_buf,
doc_len,
doc_len,
cluster->sockettimeoutms)) {
RUN_CMD_ERR (MONGOC_ERROR_STREAM,
MONGOC_ERROR_STREAM_SOCKET,
"socket error or timeout");
}
} else {
GOTO (done);
}

doc_len = (size_t) msg_len - reply_header_size;
reply_buf = bson_reserve_buffer (reply_ptr, (uint32_t) doc_len);
BSON_ASSERT (reply_buf);

if (doc_len != mongoc_stream_read (stream,
(void *) reply_buf,
doc_len,
doc_len,
cluster->sockettimeoutms)) {
RUN_CMD_ERR (MONGOC_ERROR_STREAM,
MONGOC_ERROR_STREAM_SOCKET,
"socket error or timeout");
}

if (_mongoc_populate_cmd_error (
reply_ptr, cluster->client->error_api_version, error)) {
GOTO (done);
Expand Down Expand Up @@ -1802,118 +1914,6 @@ mongoc_cluster_stream_for_writes (mongoc_cluster_t *cluster,
cluster, MONGOC_SS_WRITE, NULL, error);
}

/*
*--------------------------------------------------------------------------
*
* _mongoc_cluster_inc_egress_rpc --
*
* Helper to increment the counter for a particular RPC based on
* it's opcode.
*
* Returns:
* None.
*
* Side effects:
* None.
*
*--------------------------------------------------------------------------
*/

static BSON_INLINE void
_mongoc_cluster_inc_egress_rpc (const mongoc_rpc_t *rpc)
{
mongoc_counter_op_egress_total_inc ();

switch (rpc->header.opcode) {
case MONGOC_OPCODE_DELETE:
mongoc_counter_op_egress_delete_inc ();
break;
case MONGOC_OPCODE_UPDATE:
mongoc_counter_op_egress_update_inc ();
break;
case MONGOC_OPCODE_INSERT:
mongoc_counter_op_egress_insert_inc ();
break;
case MONGOC_OPCODE_KILL_CURSORS:
mongoc_counter_op_egress_killcursors_inc ();
break;
case MONGOC_OPCODE_GET_MORE:
mongoc_counter_op_egress_getmore_inc ();
break;
case MONGOC_OPCODE_REPLY:
mongoc_counter_op_egress_reply_inc ();
break;
case MONGOC_OPCODE_MSG:
mongoc_counter_op_egress_msg_inc ();
break;
case MONGOC_OPCODE_QUERY:
mongoc_counter_op_egress_query_inc ();
break;
case MONGOC_OPCODE_COMPRESSED:
mongoc_counter_op_egress_compressed_inc ();
break;
default:
BSON_ASSERT (false);
break;
}
}

/*
*--------------------------------------------------------------------------
*
* _mongoc_cluster_inc_ingress_rpc --
*
* Helper to increment the counter for a particular RPC based on
* it's opcode.
*
* Returns:
* None.
*
* Side effects:
* None.
*
*--------------------------------------------------------------------------
*/

static BSON_INLINE void
_mongoc_cluster_inc_ingress_rpc (const mongoc_rpc_t *rpc)
{
mongoc_counter_op_ingress_total_inc ();

switch (rpc->header.opcode) {
case MONGOC_OPCODE_DELETE:
mongoc_counter_op_ingress_delete_inc ();
break;
case MONGOC_OPCODE_UPDATE:
mongoc_counter_op_ingress_update_inc ();
break;
case MONGOC_OPCODE_INSERT:
mongoc_counter_op_ingress_insert_inc ();
break;
case MONGOC_OPCODE_KILL_CURSORS:
mongoc_counter_op_ingress_killcursors_inc ();
break;
case MONGOC_OPCODE_GET_MORE:
mongoc_counter_op_ingress_getmore_inc ();
break;
case MONGOC_OPCODE_REPLY:
mongoc_counter_op_ingress_reply_inc ();
break;
case MONGOC_OPCODE_MSG:
mongoc_counter_op_ingress_msg_inc ();
break;
case MONGOC_OPCODE_QUERY:
mongoc_counter_op_ingress_query_inc ();
break;
case MONGOC_OPCODE_COMPRESSED:
mongoc_counter_op_ingress_compressed_inc ();
break;
default:
BSON_ASSERT (false);
break;
}
}

static bool
_mongoc_cluster_min_of_max_obj_size_sds (void *item, void *ctx)
{
Expand Down
2 changes: 2 additions & 0 deletions src/mongoc/mongoc-rpc-private.h
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,8 @@ _mongoc_rpc_scatter_reply_header_only (mongoc_rpc_t *rpc,
const uint8_t *buf,
size_t buflen);
bool
_mongoc_rpc_get_first_document (mongoc_rpc_t *rpc, bson_t *reply);
bool
_mongoc_rpc_reply_get_first (mongoc_rpc_reply_t *reply, bson_t *bson);
void
_mongoc_rpc_prep_command (mongoc_rpc_t *rpc,
Expand Down
Loading

0 comments on commit 236d3df

Please sign in to comment.