diff --git a/devel/config/mongod-3.6-compression-zlib.yml b/devel/config/mongod-3.6-compression-zlib.yml
new file mode 100644
index 00000000..bb1b89c3
--- /dev/null
+++ b/devel/config/mongod-3.6-compression-zlib.yml
@@ -0,0 +1,7 @@
+---
+type: single
+default_args: -v --bind_ip 0.0.0.0 --enableMajorityReadConcern --networkMessageCompressors zlib
+default_version: 3.6
+mongod:
+    - name: host1
+
diff --git a/devel/t-dynamic/OP_COMPRESSION.t b/devel/t-dynamic/OP_COMPRESSION.t
new file mode 100644
index 00000000..a7cfbf7e
--- /dev/null
+++ b/devel/t-dynamic/OP_COMPRESSION.t
@@ -0,0 +1,135 @@
+#
+#  Copyright 2015 MongoDB, Inc.
+#
+#  Licensed under the Apache License, Version 2.0 (the "License");
+#  you may not use this file except in compliance with the License.
+#  You may obtain a copy of the License at
+#
+#  http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+#
+
+use strict;
+use warnings;
+use Test::More 0.96;
+use Test::Fatal;
+use Test::Deep qw/!blessed/;
+use UUID::URandom qw/create_uuid/;
+
+use utf8;
+use Tie::IxHash;
+
+use MongoDB;
+use MongoDB::Error;
+
+use lib "t/lib";
+use lib "devel/lib";
+
+use if $ENV{MONGOVERBOSE}, qw/Log::Any::Adapter Stderr/;
+
+use MongoDBTest::Orchestrator;
+
+use MongoDBTest qw/
+    build_client
+    get_test_db
+    server_version
+    server_type
+    clear_testdbs
+    get_unique_collection
+    uuid_to_string
+/;
+
+my $orc =
+    MongoDBTest::Orchestrator->new(
+        config_file => "devel/config/mongod-3.6-compression-zlib.yml" );
+$orc->start;
+
+$ENV{MONGOD} = $orc->as_uri;
+
+note $ENV{MONGOD};
+
+my $conn = build_client(
+    compressors => ['zlib'],
+    zlib_compression_level => 9,
+);
+my $testdb = get_test_db($conn);
+my $server_version = server_version($conn);
+my $server_type = server_type($conn);
+my $coll = $testdb->get_collection('test_collection');
+
+plan skip_all => "Requires MongoDB 3.6"
+    if $server_version < v3.6.0;
+
+my $server = $orc->get_server('host1');
+my $logfile = $server->logfile;
+
+open my $logfile_fh, '<', $logfile
+    or die "Unable to read $logfile: $!";
+
+my @init_messages = collect_log_messages();
+ok scalar(grep { /zlib is supported/ } @init_messages),
+    'zlib is supported';
+
+$coll->insert_one({ value => 23 });
+subtest 'compression for insert one' => \&subtest_roundtrip;
+
+$coll->insert_many([{ value => 24 }, { value => 25 }]);
+subtest 'compression for insert many' => \&subtest_roundtrip;
+
+$testdb->run_command([getnonce => 1]);
+subtest 'no compression on getnonce' => \&subtest_no_compression;
+
+subtest 'connection string' => sub {
+    my $client = MongoDB->connect(
+        $orc->as_uri.'/?compressors=zlib&zlibCompressionLevel=9',
+    );
+    is_deeply $client->compressors, ['zlib'], 'compressors';
+    is $client->zlib_compression_level, 9, 'zlib compression level';
+};
+
+clear_testdbs;
+
+done_testing;
+
+# Asserts that the server log gained no zlib (de)compression messages since
+# the previous call to collect_log_messages().
+sub subtest_no_compression {
+    my @messages = collect_log_messages();
+    is scalar(grep { /\bdecompressing message with zlib/i } @messages), 0,
+        'no decompressed message';
+    is scalar(grep { /\bcompressing message with zlib/i } @messages), 0,
+        'no compressed message';
+}
+
+# Asserts that the server log gained exactly one decompression and exactly
+# one compression message since the previous call to collect_log_messages().
+sub subtest_roundtrip {
+    my @messages = collect_log_messages();
+    is scalar(grep { /\bdecompressing message with zlib/i } @messages), 1,
+        'decompressed message';
+    is scalar(grep { /\bcompressing message with zlib/i } @messages), 1,
+        'compressed message';
+}
+
+# Returns the lines containing "zlib" that were appended to the server log
+# since the previous call (or since the log was opened).
+sub collect_log_messages {
+    my @messages;
+
+    # The previous call read up to end-of-file.  A relative seek by zero bytes
+    # keeps the position but clears the EOF condition on the handle, so the
+    # lines the server has appended since then are read.
+    seek( $logfile_fh, 0, 1 );
+
+    while (defined(my $line = <$logfile_fh>)) {
+        chomp $line;
+        push @messages, $line
+            if $line =~ m{zlib};
+    }
+    return @messages;
+}
diff --git a/lib/MongoDB/MongoClient.pm b/lib/MongoDB/MongoClient.pm
index 267b39ce..5664fb95 100644
--- a/lib/MongoDB/MongoClient.pm
+++ b/lib/MongoDB/MongoClient.pm
@@ -53,12 +53,14 @@ use MongoDB::_Types qw(
     AuthMechanism
     Boolish
     BSONCodec
+    CompressionType
     Document
     HeartbeatFreq
     MaxStalenessNum
     NonNegNum
     ReadPrefMode
     ReadPreference
+    ZlibCompressionLevel
 );
 use Types::Standard qw(
     CodeRef
@@ -230,6 +232,50 @@ sub _build_bson_codec {
     return BSON->new();
 }
 
+=attr compressors
+
+An array reference of compression type names. Currently only C<zlib>
+is supported.
+
+=cut
+
+has compressors => (
+    is      => 'lazy',
+    isa     => ArrayRef[CompressionType],
+    builder => '_build_compressors',
+);
+
+sub _build_compressors {
+    my ($self) = @_;
+    return $self->__uri_or_else(
+        u => 'compressors',
+        e => 'compressors',
+        d => [],
+    );
+}
+
+=attr zlib_compression_level
+
+An integer from C<-1> to C<9> specifying the compression level to use
+when L</compressors> is set to C<zlib>.
+
+=cut
+
+has zlib_compression_level => (
+    is      => 'lazy',
+    isa     => ZlibCompressionLevel,
+    builder => '_build_zlib_compression_level',
+);
+
+sub _build_zlib_compression_level {
+    my ($self) = @_;
+    return $self->__uri_or_else(
+        u => 'zlibcompressionlevel',
+        e => 'zlib_compression_level',
+        d => -1,
+    );
+}
+
 =attr connect_timeout_ms
 
 This attribute specifies the amount of time in milliseconds to wait for a
@@ -1222,6 +1268,8 @@ sub _build__topology {
             ( ref( $self->ssl ) eq 'HASH' ? ( SSL_options => $self->ssl ) : () ),
         },
         monitoring_callback => $self->monitoring_callback,
+        compression => $self->compressors,
+        zlib_compression_level => $self->zlib_compression_level,
     );
 }
 
@@ -2040,6 +2088,7 @@ The currently supported connection string options are:
 *appName
 *authMechanism
 *authMechanism.SERVICE_NAME
+*compressors
 *connectTimeoutMS
 *journal
 *readPreference
@@ -2048,6 +2097,7 @@ The currently supported connection string options are:
 *ssl
 *w
 *wtimeoutMS
+*zlibCompressionLevel
 
 See the official MongoDB documentation on connection strings for more on the URI
 format and connection string options:
diff --git a/lib/MongoDB/Op/_Command.pm b/lib/MongoDB/Op/_Command.pm
index ab5beb5d..5c345fe6 100644
--- a/lib/MongoDB/Op/_Command.pm
+++ b/lib/MongoDB/Op/_Command.pm
@@ -89,9 +89,16 @@ sub execute {
     $self->publish_command_started( $link, $self->{query}, $request_id )
       if $self->monitoring_callback;
 
+    my %write_opt;
+    my $command_name = do {
+        my $command = _to_tied_ixhash($self->{query});
+        lc tied(%$command)->Keys(0);
+    };
+    $write_opt{disable_compression} = !is_compressible($command_name);
+
     my $result;
     eval {
-        $link->write( $op_bson ),
+        $link->write( $op_bson, %write_opt ),
         ( $result = MongoDB::_Protocol::parse_reply( $link->read, $request_id ) );
     };
     if ( my $err = $@ ) {
@@ -117,4 +124,22 @@ sub execute {
     return $res;
 }
 
+# Returns true unless the (case-insensitive) command name is one of the
+# commands listed below, which are always sent uncompressed.
+sub is_compressible {
+    my $command_name = lc shift;
+    return not grep { $_ eq $command_name } qw(
+        ismaster
+        saslstart
+        saslcontinue
+        getnonce
+        authenticate
+        createuser
+        updateuser
+        copydbsaslstart
+        copydbgetnonce
+        copydb
+    );
+}
+
 1;
diff --git a/lib/MongoDB/_Link.pm b/lib/MongoDB/_Link.pm
index 8a104b85..e73898e1 100644
--- a/lib/MongoDB/_Link.pm
+++ b/lib/MongoDB/_Link.pm
@@ -35,6 +35,7 @@ use Socket qw/SOL_SOCKET SO_KEEPALIVE SO_RCVBUF IPPROTO_TCP TCP_NODELAY AF_INET/
 use Time::HiRes qw/time/;
 use MongoDB::Error;
 use MongoDB::_Constants;
+use MongoDB::_Protocol;
 use MongoDB::_Types qw(
     Boolish
     HostAddress
@@ -422,7 +423,20 @@ sub is_connected {
 }
 
 sub write {
-    my ( $self, $buf ) = @_;
+    my ( $self, $buf, %write_opt ) = @_;
+
+    if (
+        !$write_opt{disable_compression}
+        && $self->server
+        && $self->server->compressor
+        && MongoDB::_Protocol::is_compressible($buf)
+    ) {
+        $buf = MongoDB::_Protocol::compress(
+            $buf,
+            $self->server->compressor,
+            zlib_compression_level => $self->server->zlib_compression_level,
+        );
+    }
 
     my ( $len, $off, $pending, $nfound, $r ) = ( length($buf), 0 );
 
diff --git a/lib/MongoDB/_Protocol.pm b/lib/MongoDB/_Protocol.pm
index 877a125b..9316323a 100644
--- a/lib/MongoDB/_Protocol.pm
+++ b/lib/MongoDB/_Protocol.pm
@@ -36,6 +36,7 @@ use constant {
     OP_GET_MORE     => 2005, # Get more data from a query. See Cursors
     OP_DELETE       => 2006, # Delete documents
     OP_KILL_CURSORS => 2007, # Tell database client is done with a cursor
+    OP_COMPRESSED   => 2012, # wire compression
 };
 
 use constant {
@@ -62,6 +63,7 @@ use constant {
     P_DELETE       => PERL58 ? "l5Z*l" : "l<5Z*l<",
     P_KILL_CURSORS => PERL58 ? "l6(a8)*" : "l<6(a8)*",
     P_REPLY_HEADER => PERL58 ? "l5a8l2" : "l<5a8l<2",
+    P_COMPRESSED   => PERL58 ? "l6C" : "l<6C",
 };
 
 # struct MsgHeader {
@@ -77,6 +79,155 @@ use constant {
 # my $msg = pack( P_INSERT, 0, int(rand(2**32-1)), 0, OP_INSERT, 0, $ns ) . $bson_docs;
 # substr( $msg, 0, 4, pack( P_INT32, length($msg) ) );
 
+use constant {
+    # length for MsgHeader
+    P_HEADER_LENGTH =>
+        length(pack P_HEADER, 0, 0, 0, 0),
+    # length for OP_COMPRESSED
+    P_COMPRESSED_PREFIX_LENGTH =>
+        length(pack P_COMPRESSED, 0, 0, 0, 0, 0, 0, 0),
+};
+
+# struct OP_COMPRESSED {
+#     MsgHeader header;            // standard message header
+#     int32_t   originalOpcode;    // wrapped op code
+#     int32_t   uncompressedSize;  // size before compression, excluding header
+#     uint8_t   compressorId;      // compressor
+#     char*     compressedMessage; // compressed contents
+# };
+
+# compressor dispatchers
+# $name => [
+#     $compressor_id,
+#     $compress,   # $compress->($buf, $opts), undef on failure
+#     $decompress, # $decompress->($buf), undef on failure
+# ],
+my %COMPRESSOR = (
+    none => [
+        0,
+        sub { shift },
+        sub { shift },
+    ],
+    zlib => [
+        2,
+        sub {
+            my ($buf, $opts) = @_;
+
+            # a negative level means "use the zlib default"
+            my $level = $opts->{zlib_compression_level};
+            $level = undef
+                if defined $level and $level < 0;
+
+            require Compress::Zlib;
+            return Compress::Zlib::compress(
+                $buf,
+                defined($level) ? $level : (),
+            );
+        },
+        sub {
+            require Compress::Zlib;
+            return Compress::Zlib::uncompress(shift);
+        },
+    ],
+);
+
+# Returns true if the op code in the message's header may be wrapped in
+# OP_COMPRESSED.
+sub is_compressible {
+    my ($msg) = @_;
+
+    my ($len, $request_id, $response_to, $op_code)
+        = unpack(P_HEADER, $msg);
+
+    return grep { $_ == $op_code }
+        OP_QUERY,
+        OP_MSG,
+        OP_REPLY,
+        OP_INSERT,
+        OP_UPDATE,
+        OP_DELETE,
+        OP_GET_MORE,
+        OP_KILL_CURSORS;
+}
+
+# Wraps a complete wire message (MsgHeader included) in OP_COMPRESSED.
+# $compressor is a key of %COMPRESSOR; %opts is handed to the compressor.
+# The request ID and responseTo are carried over, the length is recomputed.
+# Throws MongoDB::ProtocolError for an unknown compressor or if compression
+# fails.
+sub compress {
+    my ($msg, $compressor, %opts) = @_;
+
+    my ($comp_id, $comp_cb) = @{ $COMPRESSOR{$compressor} || [] }
+        or MongoDB::ProtocolError->throw("Unknown compressor '$compressor'");
+
+    my ($len, $request_id, $response_to, $op_code)
+        = unpack(P_HEADER, $msg);
+
+    $msg = substr $msg, P_HEADER_LENGTH;
+
+    # fail loudly instead of sending a header without a payload
+    my $payload = $comp_cb->($msg, \%opts);
+    MongoDB::ProtocolError->throw("Unable to compress message with '$compressor'")
+        if !defined $payload;
+
+    my $msg_comp = pack(
+        P_COMPRESSED,
+        0, $request_id, $response_to, OP_COMPRESSED,
+        $op_code,
+        length($msg),
+        $comp_id,
+    ).$payload;
+
+    substr($msg_comp, 0, 4, pack(P_INT32, length($msg_comp)));
+    return $msg_comp;
+}
+
+# Unwraps an OP_COMPRESSED message; any other message is returned as-is.
+# The result is the original message rebuilt from the wrapped op code and
+# the decompressed payload, with a recomputed length.  Throws
+# MongoDB::ProtocolError if the envelope is truncated, the compressor ID is
+# unknown, or the payload does not decompress to the advertised size.
+sub try_uncompress {
+    my ($msg) = @_;
+
+    # too short to carry a MsgHeader, so it cannot be OP_COMPRESSED
+    return $msg
+        if length($msg) < P_HEADER_LENGTH;
+
+    my ($len, $request_id, $response_to, $op_code)
+        = unpack(P_HEADER, $msg);
+
+    return $msg
+        if $op_code != OP_COMPRESSED;
+
+    MongoDB::ProtocolError->throw("compressed response was truncated")
+        if length($msg) < P_COMPRESSED_PREFIX_LENGTH;
+
+    my ($orig_op_code, $orig_size, $comp_id)
+        = ( unpack(P_COMPRESSED, $msg) )[ 4 .. 6 ];
+
+    $msg = substr $msg, P_COMPRESSED_PREFIX_LENGTH;
+
+    for my $comp_name (keys %COMPRESSOR) {
+        next if $COMPRESSOR{$comp_name}[0] != $comp_id;
+
+        my $decomp_msg = $COMPRESSOR{$comp_name}[2]->($msg);
+        MongoDB::ProtocolError->throw("Unable to decompress message with '$comp_name'")
+            if !defined $decomp_msg;
+        MongoDB::ProtocolError->throw("decompressed size does not match header")
+            if length($decomp_msg) != $orig_size;
+
+        my $done =
+            pack(P_HEADER, 0, $request_id, $response_to, $orig_op_code)
+            .$decomp_msg;
+        substr($done, 0, 4, pack(P_INT32, length($done)));
+        return $done;
+    }
+
+    MongoDB::ProtocolError->throw("Unknown compressor ID '$comp_id'");
+}
+
 # struct OP_UPDATE {
 #     MsgHeader header;             // standard message header
 #     int32     ZERO;               // 0 - reserved for future use
@@ -290,6 +441,8 @@ sub parse_reply {
     MongoDB::ProtocolError->throw("response was truncated")
       if length($msg) < MIN_REPLY_LENGTH;
 
+    $msg = try_uncompress($msg);
+
     my (
         $len, $msg_id, $response_to, $opcode, $bitflags, $cursor_id, $starting_from,
         $number_returned
diff --git a/lib/MongoDB/_Server.pm b/lib/MongoDB/_Server.pm
index 4a61498b..a14a88c6 100644
--- a/lib/MongoDB/_Server.pm
+++ b/lib/MongoDB/_Server.pm
@@ -28,6 +28,7 @@ use MongoDB::_Types qw(
     HostAddress
     ServerType
     HostAddressList
+    ZlibCompressionLevel
 );
 use Types::Standard qw(
     InstanceOf
@@ -82,6 +83,26 @@ has is_master => (
     default => sub { {} },
 );
 
+has compressor => (
+    is      => 'lazy',
+    isa     => Maybe[Str],
+    builder => '_build_compressor',
+);
+
+has zlib_compression_level => (
+    is      => 'ro',
+    isa     => ZlibCompressionLevel,
+    default => sub { -1 },
+);
+
+# The first entry of the "compression" list in the ismaster reply, or undef
+# if the reply carries no such list.
+sub _build_compressor {
+    my ($self) = @_;
+    my ($comp) = @{ ($self->is_master || {})->{compression} || [] };
+    return $comp;
+}
+
 # type: a ServerType enum value. Default Unknown. Definitions from the Server
 # Discovery and Monitoring Spec:
 # - Unknown Initial, or after a network error or failed ismaster call, or "ok: 1"
diff --git a/lib/MongoDB/_Topology.pm b/lib/MongoDB/_Topology.pm
index 5e28b544..501354f3 100644
--- a/lib/MongoDB/_Topology.pm
+++ b/lib/MongoDB/_Topology.pm
@@ -32,9 +32,11 @@ use MongoDB::_Link;
 use MongoDB::_Types qw(
     Boolish
     BSONCodec
+    CompressionType
     Document
     NonNegNum
     TopologyType
+    ZlibCompressionLevel
     to_IxHash
 );
 use Types::Standard qw(
@@ -97,6 +99,18 @@ has monitoring_callback => (
     isa => Maybe[CodeRef],
 );
 
+has compression => (
+    is      => 'ro',
+    isa     => ArrayRef[CompressionType],
+    default => sub { [] },
+);
+
+has zlib_compression_level => (
+    is      => 'ro',
+    isa     => ZlibCompressionLevel,
+    default => sub { -1 },
+);
+
 has type => (
     is => 'ro',
     writer => '_set_type',
@@ -1001,6 +1015,9 @@ sub _generate_ismaster_request {
             push @opts, saslSupportedMechs => $db_user;
         }
     }
+    if (@{ $self->compression }) {
+        push @opts, compression => $self->compression;
+    }
 
     return [ ismaster => 1, @opts ];
 }
@@ -1053,6 +1070,7 @@ sub _update_topology_from_link {
         last_update_time => $end_time,
         rtt_sec => $rtt_sec,
         is_master => $is_master,
+        zlib_compression_level => $self->zlib_compression_level,
     );
 
     $self->_update_topology_from_server_desc( $link->address, $new_server );
diff --git a/lib/MongoDB/_Types.pm b/lib/MongoDB/_Types.pm
index 9e3ddded..97a8cfb9 100644
--- a/lib/MongoDB/_Types.pm
+++ b/lib/MongoDB/_Types.pm
@@ -32,6 +32,8 @@ use Type::Library
   Booleanpm
   BSONCodec
   ClientSession
+  CompressionType
+  ZlibCompressionLevel
   ConnectType
   CursorType
   DBRefColl
@@ -76,6 +78,7 @@ use Types::Standard qw(
     ArrayRef
     Dict
     HashRef
+    Int
     Maybe
     Num
     Optional
@@ -112,6 +115,12 @@ duck_type BSONCodec, [ qw/encode_one decode_one/ ];
 
 class_type ClientSession, { class => 'MongoDB::ClientSession' };
 
+enum CompressionType, [qw/zlib/];
+
+declare ZlibCompressionLevel, as Int,
+    where { $_ >= -1 && $_ <= 9 },
+    message { "zlib compression value must be value from -1 to 9" };
+
 enum ConnectType, [qw/replicaSet direct none/];
 
 enum CursorType, [qw/non_tailable tailable tailable_await/];
diff --git a/lib/MongoDB/_URI.pm b/lib/MongoDB/_URI.pm
index 63ff0af6..27d03a62 100644
--- a/lib/MongoDB/_URI.pm
+++ b/lib/MongoDB/_URI.pm
@@ -99,6 +99,7 @@ sub _build_valid_options {
             authMechanism
             authMechanismProperties
             authSource
+            compressors
             connectTimeoutMS
             connect
             heartbeatFrequencyMS
@@ -118,6 +119,7 @@ sub _build_valid_options {
             w
             wTimeoutMS
             readConcernLevel
+            zlibCompressionLevel
         )
     };
 }
@@ -185,6 +187,9 @@ sub _parse_options {
         if ( $lc_k eq 'authmechanismproperties' ) {
             $parsed{$lc_k} = _parse_doc( $k, $v );
         }
+        elsif ( $lc_k eq 'compressors' ) {
+            $parsed{$lc_k} = [split m{,}, $v, -1];
+        }
         elsif ( $lc_k eq 'authsource' ) {
             $result->{db_name} = $v;
             $parsed{$lc_k} = $v;
diff --git a/t/lib/MongoDBTest.pm b/t/lib/MongoDBTest.pm
index 6c9f991f..9f7b1f0c 100644
--- a/t/lib/MongoDBTest.pm
+++ b/t/lib/MongoDBTest.pm
@@ -78,6 +78,11 @@ sub build_client {
         );
     }
 
+    # allow whole test suite to be run with compression enabled
+    if (my $comp = $ENV{PERL_MONGO_TEST_COMPRESSION}) {
+        $args{compressors} ||= [$comp];
+    }
+
     # long query timeout may help spurious failures on heavily loaded CI machines
     return MongoDB->connect(
         $host,