Skip to content

Commit

Permalink
CDRIVER-2109 Add support for writing OP_COMPRESSED
Browse files Browse the repository at this point in the history
  • Loading branch information
bjori committed Apr 20, 2017
1 parent 546284e commit 3f53e36
Show file tree
Hide file tree
Showing 9 changed files with 319 additions and 6 deletions.
1 change: 1 addition & 0 deletions src/mongoc/mongoc-async-cmd.c
Expand Up @@ -132,6 +132,7 @@ _mongoc_async_cmd_init_send (mongoc_async_cmd_t *acmd, const char *dbname)
acmd->rpc.query.query = bson_get_data (&acmd->cmd);
acmd->rpc.query.fields = NULL;

/* This will always be isMaster, which are not allowed to be compressed */
_mongoc_rpc_gather (&acmd->rpc, &acmd->array);
acmd->iovec = (mongoc_iovec_t *) acmd->array.data;
acmd->niovec = acmd->array.len;
Expand Down
6 changes: 6 additions & 0 deletions src/mongoc/mongoc-cluster-private.h
Expand Up @@ -90,6 +90,12 @@ int32_t
mongoc_cluster_node_max_wire_version (mongoc_cluster_t *cluster,
uint32_t server_id);

size_t
_mongoc_cluster_buffer_iovec (mongoc_iovec_t *iov,
size_t iovcnt,
int skip,
char *buffer);

bool
mongoc_cluster_sendv_to_server (mongoc_cluster_t *cluster,
mongoc_rpc_t *rpcs,
Expand Down
159 changes: 153 additions & 6 deletions src/mongoc/mongoc-cluster.c
Expand Up @@ -55,6 +55,14 @@
#include "mongoc-uri-private.h"
#include "mongoc-rpc-private.h"

#ifdef MONGOC_ENABLE_COMPRESSION
#ifdef MONGOC_ENABLE_COMPRESSION_ZLIB
#endif
#ifdef MONGOC_ENABLE_COMPRESSION_SNAPPY
#include <snappy-c.h>
#endif
#endif


#undef MONGOC_LOG_DOMAIN
#define MONGOC_LOG_DOMAIN "cluster"
Expand Down Expand Up @@ -201,6 +209,43 @@ _mongoc_cluster_inc_ingress_rpc (const mongoc_rpc_t *rpc)
}
}


size_t
_mongoc_cluster_buffer_iovec (mongoc_iovec_t *iov,
size_t iovcnt,
int skip,
char *buffer)
{
int n;
size_t buffer_offset = 0;
int total_iov_len = 0;
int difference = 0;

for (n = 0; n < iovcnt; n++) {
total_iov_len += iov[n].iov_len;

if (total_iov_len <= skip) {
continue;
}

/* If this iovec starts before the skip, and takes the total count
* beyond the skip, we need to figure out the portion of the iovec
* we should skip passed */
if (total_iov_len - iov[n].iov_len < skip) {
difference = skip - (total_iov_len - iov[n].iov_len);
} else {
difference = 0;
}

memcpy (buffer + buffer_offset,
iov[n].iov_base + difference,
iov[n].iov_len - difference);
buffer_offset += iov[n].iov_len - difference;
}

return buffer_offset;
}

/* Allows caller to safely overwrite error->message with a formatted string,
* even if the formatted string includes original error->message. */
static void
Expand Down Expand Up @@ -280,6 +325,11 @@ mongoc_cluster_run_command_internal (mongoc_cluster_t *cluster,
mongoc_apm_command_succeeded_t succeeded_event;
mongoc_apm_command_failed_t failed_event;
bool ret = false;
#ifdef MONGOC_ENABLE_COMPRESSION
int32_t compressor_id = 0;
size_t output_length = 0;
char *output;
#endif

ENTRY;

Expand Down Expand Up @@ -326,6 +376,46 @@ mongoc_cluster_run_command_internal (mongoc_cluster_t *cluster,
_mongoc_rpc_gather (&rpc, &ar);
_mongoc_rpc_swab_to_le (&rpc);

#ifdef MONGOC_ENABLE_COMPRESSION
if (server_id) {
mongoc_server_stream_t *server_stream = mongoc_cluster_stream_for_server (
cluster, server_id, false /* don't reconnect */, error);
compressor_id =
mongoc_server_description_compressor_id (server_stream->sd);
mongoc_server_stream_cleanup (server_stream);

if (compressor_id) {
size_t allocate = rpc.header.msg_len - 16;
char *data;
int size;

BSON_ASSERT (allocate > 0);
data = bson_malloc0 (allocate);
size = _mongoc_cluster_buffer_iovec (
(mongoc_iovec_t *) ar.data, ar.len, 16, data);
BSON_ASSERT (size);

#ifdef MONGOC_ENABLE_COMPRESSION
output_length = snappy_max_compressed_length (size);
#else
#error \
"FIXME for other compressors.. _mongoc_rpc_compressed_length(compressor, size)?"
#endif
output = (char *) bson_malloc0 (output_length);
if (_mongoc_rpc_compress (
&rpc, compressor_id, data, size, output, output_length)) {
_mongoc_array_destroy (&ar);
_mongoc_array_init (&ar, sizeof (mongoc_iovec_t));
_mongoc_cluster_inc_egress_rpc (&rpc);
_mongoc_rpc_gather (&rpc, &ar);
} else {
MONGOC_WARNING ("Could not compress data");
}
bson_free (data);
}
}
#endif

if (monitored && callbacks->started) {
mongoc_apm_command_started_init (&started_event,
command,
Expand Down Expand Up @@ -505,6 +595,12 @@ mongoc_cluster_run_command_internal (mongoc_cluster_t *cluster,
bson_destroy (reply_ptr);
}

#ifdef MONGOC_ENABLE_COMPRESSION_SNAPPY
if (output_length) {
bson_free (output);
}
#endif

RETURN (ret);
}

Expand Down Expand Up @@ -2195,6 +2291,12 @@ mongoc_cluster_sendv_to_server (mongoc_cluster_t *cluster,
bool need_gle;
char cmdname[140];
int32_t max_msg_size;
bool ret = false;
#ifdef MONGOC_ENABLE_COMPRESSION
int32_t compressor_id = 0;
size_t output_length;
char *output;
#endif

ENTRY;

Expand All @@ -2210,7 +2312,7 @@ mongoc_cluster_sendv_to_server (mongoc_cluster_t *cluster,
MONGOC_ERROR_CLIENT,
MONGOC_ERROR_CLIENT_IN_EXHAUST,
"A cursor derived from this client is in exhaust.");
RETURN (false);
GOTO (done);
}

if (!write_concern) {
Expand All @@ -2219,10 +2321,13 @@ mongoc_cluster_sendv_to_server (mongoc_cluster_t *cluster,

if (!_mongoc_cluster_check_interval (
cluster, server_stream->sd->id, error)) {
RETURN (false);
GOTO (done);
}

_mongoc_array_clear (&cluster->iov);
#ifdef MONGOC_ENABLE_COMPRESSION
compressor_id = mongoc_server_description_compressor_id (server_stream->sd);
#endif

/*
* TODO: We can probably remove the need for sendv and just do send since
Expand All @@ -2232,10 +2337,42 @@ mongoc_cluster_sendv_to_server (mongoc_cluster_t *cluster,
*/

for (i = 0; i < rpcs_len; i++) {
_mongoc_cluster_inc_egress_rpc (&rpcs[i]);
need_gle = _mongoc_rpc_needs_gle (&rpcs[i], write_concern);
_mongoc_cluster_inc_egress_rpc (&rpcs[i]);
_mongoc_rpc_gather (&rpcs[i], &cluster->iov);

#ifdef MONGOC_ENABLE_COMPRESSION
if (compressor_id) {
size_t allocate = rpcs[i].header.msg_len - 16;
char *data;
int size;

BSON_ASSERT (allocate > 0);
data = bson_malloc0 (allocate);
size = _mongoc_cluster_buffer_iovec (
(mongoc_iovec_t *) cluster->iov.data, cluster->iov.len, 16, data);
BSON_ASSERT (size);

#ifdef MONGOC_ENABLE_COMPRESSION_SNAPPY
output_length = snappy_max_compressed_length (size);
#else
#error \
"FIXME for other compressors.. _mongoc_rpc_compressed_length(compressor, size)?"
#endif
output = (char *) bson_malloc0 (output_length);
if (_mongoc_rpc_compress (
&rpcs[i], compressor_id, data, size, output, output_length)) {
_mongoc_array_destroy (&cluster->iov);
_mongoc_array_init (&cluster->iov, sizeof (mongoc_iovec_t));
_mongoc_cluster_inc_egress_rpc (&rpcs[i]);
_mongoc_rpc_gather (&rpcs[i], &cluster->iov);
} else {
MONGOC_WARNING ("Could not compress data");
}
bson_free (data);
}
#endif

max_msg_size = mongoc_server_stream_max_msg_size (server_stream);

if (rpcs[i].header.msg_len > max_msg_size) {
Expand All @@ -2246,7 +2383,7 @@ mongoc_cluster_sendv_to_server (mongoc_cluster_t *cluster,
"max allowed message size. Was %u, allowed %u.",
rpcs[i].header.msg_len,
max_msg_size);
RETURN (false);
GOTO (done);
}

if (need_gle) {
Expand Down Expand Up @@ -2296,7 +2433,7 @@ mongoc_cluster_sendv_to_server (mongoc_cluster_t *cluster,
iovcnt,
cluster->sockettimeoutms,
error)) {
RETURN (false);
GOTO (done);
}

if (cluster->client->topology->single_threaded) {
Expand All @@ -2308,7 +2445,17 @@ mongoc_cluster_sendv_to_server (mongoc_cluster_t *cluster,
}
}

RETURN (true);
ret = true;

done:

#ifdef MONGOC_ENABLE_COMPRESSION
if (compressor_id) {
bson_free (output);
}
#endif

RETURN (ret);
}


Expand Down
9 changes: 9 additions & 0 deletions src/mongoc/mongoc-rpc-private.h
Expand Up @@ -151,6 +151,15 @@ _mongoc_populate_cmd_error (const bson_t *doc,

bool
_mongoc_rpc_decompress (mongoc_rpc_t *rpc, uint8_t *buf, size_t buflen);

bool
_mongoc_rpc_compress (mongoc_rpc_t *rpc,
int compressor_id,
char *data,
size_t size,
char *output,
size_t output_length);

BSON_END_DECLS


Expand Down
34 changes: 34 additions & 0 deletions src/mongoc/mongoc-rpc.c
Expand Up @@ -676,6 +676,40 @@ _mongoc_rpc_decompress (mongoc_rpc_t *rpc, uint8_t *buf, size_t buflen)
}
return false;
}

bool
_mongoc_rpc_compress (mongoc_rpc_t *rpc,
int compressor_id,
char *data,
size_t size,
char *output,
size_t output_length)
{
switch (compressor_id) {
case MONGOC_COMPRESSOR_SNAPPY_ID:
#ifdef MONGOC_ENABLE_COMPRESSION_SNAPPY
if (snappy_compress (data, size, output, &output_length) == SNAPPY_OK) {
rpc->header.msg_len = 0;
rpc->compressed.original_opcode = rpc->header.opcode;
rpc->header.opcode = MONGOC_OPCODE_COMPRESSED;
rpc->compressed.uncompressed_size = size;
rpc->compressed.compressor_id = compressor_id;
rpc->compressed.compressed_message = (const uint8_t *) output;
rpc->compressed.compressed_message_len = output_length;
return true;
}
break;
#else
MONGOC_ERROR ("Client attempting to use compress with snappy, but snappy "
"compression is not compiled in");
return false;
#endif
default:
return false;
}

return false;
}
bool
_mongoc_rpc_scatter (mongoc_rpc_t *rpc, const uint8_t *buf, size_t buflen)
{
Expand Down
4 changes: 4 additions & 0 deletions src/mongoc/mongoc-server-description-private.h
Expand Up @@ -76,6 +76,10 @@ struct _mongoc_server_description_t {
int64_t set_version;
bson_oid_t election_id;
int64_t last_write_date_ms;

#ifdef MONGOC_ENABLE_COMPRESSION
bson_t compressors;
#endif
};

void
Expand Down

0 comments on commit 3f53e36

Please sign in to comment.