From 91203ff0ed16c4a45c2fd5fa3d3592722ec2f17c Mon Sep 17 00:00:00 2001 From: Thomas Bloor Date: Wed, 4 Jul 2018 17:23:18 +0100 Subject: [PATCH 1/9] PERL-789 Section encode/decode in _Protocol with tests --- lib/MongoDB/_Protocol.pm | 218 ++++++++++++++++++++++++++++++++++++++- t/protocol_op_msg.t | 218 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 435 insertions(+), 1 deletion(-) create mode 100644 t/protocol_op_msg.t diff --git a/lib/MongoDB/_Protocol.pm b/lib/MongoDB/_Protocol.pm index fd38ff7c..aac51e1c 100644 --- a/lib/MongoDB/_Protocol.pm +++ b/lib/MongoDB/_Protocol.pm @@ -28,7 +28,6 @@ use Compress::Zlib (); use constant { OP_REPLY => 1, # Reply to a client request. responseTo is set - OP_MSG => 1000, # generic msg command followed by a string OP_UPDATE => 2001, # update document OP_INSERT => 2002, # insert new document RESERVED => 2003, # formerly used for OP_GET_BY_OID @@ -37,6 +36,7 @@ use constant { OP_DELETE => 2006, # Delete documents OP_KILL_CURSORS => 2007, # Tell database client is done with a cursor OP_COMPRESSED => 2012, # wire compression + OP_MSG => 2013, # generic bi-directional op code }; use constant { @@ -64,6 +64,8 @@ use constant { P_KILL_CURSORS => PERL58 ? "l6(a8)*" : "l<6(a8)*", P_REPLY_HEADER => PERL58 ? "l5a8l2" : "l<5a8l<2", P_COMPRESSED => PERL58 ? "l6C" : "l<6C", + P_MSG => PERL58 ? "l5" : "l<5", + P_MSG_PL_1 => PERL58 ? "lZ*" : "l length(pack P_COMPRESSED, 0, 0, 0, 0, 0, 0, 0), + P_MSG_PREFIX_LENGTH => + length(pack P_MSG, 0, 0, 0, 0, 0), }; +# struct OP_MSG { +# MsgHeader header; // standard message header, with opCode 2013 +# uint32 flagBits; +# Section+ sections; +# [uint32 checksum;] +# }; +# +# struct Section { +# uint8 payloadType; +# union payload { +# document document; // payloadType == 0 +# struct sequence { // payloadType == 1 +# int32 size; +# cstring identifier; +# document* documents; +# }; +# }; +# }; + +use constant { + P_SECTION_PAYLOAD_TYPE => "C", + P_SECTION_SEQUENCE_SIZE => PERL58 ? "l" : "l<", +}; + +use constant { + P_SECTION_HEADER => P_SECTION_PAYLOAD_TYPE . P_SECTION_SEQUENCE_SIZE, + P_SECTION_PAYLOAD_TYPE_LENGTH => length( pack P_SECTION_PAYLOAD_TYPE, 0 ), + P_SECTION_SEQUENCE_SIZE_LENGTH => length( pack P_SECTION_SEQUENCE_SIZE, 0 ), +}; + +# TODO this really isnt the place for this... +sub maybe_split_payload { + my ( $codec, $cmd ) = @_; + + # TODO This is probably going to explode + my %th_cmd; + tie %th_cmd, "Tie::IxHash", @$cmd; + + my $split_commands = { + insert => 'documents', + update => 'updates', + delete => 'deletes', + }; + + my $split_docs = $split_commands->{$cmd->[0]}; + if ( defined $split_docs ) { + # can remap documents + my $docs = delete $th_cmd{ $split_docs }; + + my $packed_pl_1 = pack ( P_MSG_PL_1, 0, $split_docs ) + . join( '', ( map { $codec->encode_one( $_ ) } @$docs ) ); + + substr( $packed_pl_1, 0, 4, pack( P_INT32, length ($packed_pl_1) ) ); + + $packed_pl_1 = pack( 'C', 1 ) . $packed_pl_1; + + my $packed_pl_0 = $codec->encode_one( \%th_cmd ); + + $packed_pl_0 = pack( 'C', 0 ) . $packed_pl_0; + + return [ $packed_pl_0, $packed_pl_1 ]; + } + + # Drop through if nothings happened yet + return [ $codec->encode_one( $cmd ) ]; +}; + +=method encode_section( $payload_type, $identifier, @documents ) + + MongoDB::_Protocol::encode_section( 0, undef, $doc ); + MongoDB::_Protocol::encode_section( 1, $identifier, $doc, $doc2, $doc3 ); + +Encodes a section for C as needed for each type of payload. Note that +payload type 0 only accepts one document, an exception will be thrown if more +than one is passed. + +In a payload type 0, the identifier is not required (and is ignored). In a +payload type 1, this is used as the identifier in the sequence for the struct. + +=cut + +sub encode_section { + my ( $type, $ident, @docs ) = @_; + + my $pl; + + if ( $type == 0 ) { + MongoDB::ProtocolError->throw( + "Creating an OP_MSG Section Payload 0 with multiple documents is not supported") + if scalar( @docs ) > 1; + $pl = $docs[0]; + } elsif ( $type == 1 ) { + # Add size and ident placeholders + $pl = pack( P_MSG_PL_1, 0, $ident ) + . join( '', @docs ); + # calculate size + substr( $pl, 0, 4, pack( P_SECTION_SEQUENCE_SIZE, length( $pl ) ) ); + } else { + MongoDB::ProtocolError->throw("Encode: Unsupported section payload type"); + } + + # Add payload type prefix + $pl = pack( P_SECTION_PAYLOAD_TYPE, $type ) . $pl; + + return $pl; +} + +=method decode_section( $encoded ) + +Peforms the exact oposite of encode_section - takes an encoded section and +returns type, identifier (if applicable) and the documents contained. + +=cut + +sub decode_section { + my $enc = shift; + my ( $type, $ident, @docs ); + + # first, extract the type + ( $type ) = unpack( 'C', $enc ); + my $payload = substr( $enc, P_SECTION_PAYLOAD_TYPE_LENGTH ); + + if ( $type == 0 ) { + # payload is actually the document + push @docs, $payload; + } elsif ( $type == 1 ) { + # Pull size off and double check + my ( $pl_size ) = unpack( P_SECTION_SEQUENCE_SIZE, $payload ); + unless ( $pl_size == length( $payload ) ) { + MongoDB::ProtocolError->throw("Decode: Section size incorrect"); + } + $payload = substr( $payload, P_SECTION_SEQUENCE_SIZE_LENGTH ); + # Pull out then remove + ( $ident ) = unpack( 'Z*', $payload ); + $payload = substr( $payload, length ( pack 'Z*', $ident ) ); + + while ( length $payload ) { + my $doc_size = unpack( P_SECTION_SEQUENCE_SIZE, $payload ); + my $doc = substr( $payload, 0, $doc_size ); + $payload = substr( $payload, $doc_size ); + push @docs, $doc; + } + } else { + MongoDB::ProtocolError->throw("Decode: Unsupported section payload type"); + } + + return ( $type, $ident, @docs ); +} + +=method join_sections + + MongoDB::_Protocol::join_sections( + [ 0, undef, $doc ], [ 1, 'documents', $doc, $doc2 ] ); + +Joins an array of sections ready for passing to encode_sections. + +=cut + +sub join_sections { + my ( @sections ) = @_; + + my $msg = join ('', ( map { encode_section( @$_ ) } @sections ) ); + + return $msg; +} + +=method split_sections( $msg ) + +Does the exact opposite of join_sections. + +=cut + +sub split_sections { + my $msg = shift; + my @sections; + while ( length $msg ) { + # get first section length + my ( undef, $section_length ) = unpack( P_SECTION_HEADER, $msg ); + + # Add the payload type length as we reached over it for the length + my $section = substr( $msg, 0, $section_length + P_SECTION_PAYLOAD_TYPE_LENGTH ); + push @sections, [ decode_section( $section ) ]; + + $msg = substr( $msg, $section_length + P_SECTION_PAYLOAD_TYPE_LENGTH ); + } + + return @sections; +} + +use constant { + MSG_FB_CHECKSUM => 0, + MSG_FB_MORE_TO_COME => 1, +}; + +sub write_msg { + my ( $sections, $flags ) = @_; + my $flagbits = 0; + # checksum is reserved for future use + if ( $flags ) { + $flagbits = + ( $flags->{checksum_present} ? 1 << MSG_FB_CHECKSUM : 0 ) + | ( $flags->{more_to_come} ? 1 << MSG_FB_MORE_TO_COME : 0 ); + } + + my $request_id = int( rand( MAX_REQUEST_ID ) ); + + my $msg = pack( P_MSG, 0, $request_id, 0, OP_MSG, 0 ) + . join ( '', @$sections ); + substr( $msg, 0, 4, pack( P_INT32, length($msg) ) ); + return ( $msg, $request_id ); +} + # struct OP_COMPRESSED { # MsgHeader header; // standard message header # int32_t originalOpcode; // wrapped op code diff --git a/t/protocol_op_msg.t b/t/protocol_op_msg.t new file mode 100644 index 00000000..ef62197e --- /dev/null +++ b/t/protocol_op_msg.t @@ -0,0 +1,218 @@ +# Copyright 2018 - present MongoDB, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +use strict; +use warnings; +use utf8; +use Test::More; +use Test::Fatal; + +use MongoDB::_Protocol; +use BSON; + +my $codec = BSON->new(); + +# struct Section { +# uint8 payloadType; +# union payload { +# document document; // payloadType == 0 +# struct sequence { // payloadType == 1 +# int32 size; +# cstring identifier; +# document* documents; +# }; +# }; +# }; + +subtest 'encode section' => sub { + my $doc = $codec->encode_one([ test => 'document' ]); + subtest 'payload 0' => sub { + my $got_section = MongoDB::_Protocol::encode_section( 0, undef, $doc ); + my $expected_section = "\0" . $doc; + + is $got_section, $expected_section, 'encode payload 0 correctly'; + + ok exception{ MongoDB::_Protocol::encode_section( 0, undef, $doc, $doc ) }, 'multiple docs in payload 0 causes error'; + }; + + subtest 'payload 1 single doc' => sub { + my $got_section = MongoDB::_Protocol::encode_section( 1, 'documents', $doc ); + my $expected_section = "\1&\0\0\0" . "documents\0" . $doc; + + is $got_section, $expected_section, 'encode payload 1 correctly'; + }; + + subtest 'payload 1 multiple doc' => sub { + my $got_section = MongoDB::_Protocol::encode_section( 1, 'documents', $doc, $doc ); + my $expected_section = "\1>\0\0\0" . "documents\0" . $doc . $doc; + + is $got_section, $expected_section, 'encode payload 1 correctly'; + }; +}; + +subtest 'decode section' => sub { + my $doc = $codec->encode_one([ test => 'document' ]); + subtest 'payload 0' => sub { + my $encoded = "\0" . $doc; + my @got_decoded = MongoDB::_Protocol::decode_section( $encoded ); + my @expected_decoded = ( 0, undef, $doc ); + + is_deeply \@got_decoded, \@expected_decoded, 'decoded payload 0 correctly'; + }; + + subtest 'payload 1' => sub { + my $encoded = "\1&\0\0\0" ."documents\0" . $doc; + my @got_decoded = MongoDB::_Protocol::decode_section( $encoded ); + my @expected_decoded = ( 1, 'documents', $doc ); + + + is_deeply \@got_decoded, \@expected_decoded, 'decoded payload 1 correctly'; + }; + + subtest 'payload 1 multiple docs' => sub { + my $encoded = "\1>\0\0\0" ."documents\0" . $doc . $doc; + my @got_decoded = MongoDB::_Protocol::decode_section( $encoded ); + my @expected_decoded = ( 1, 'documents', $doc, $doc ); + + is_deeply \@got_decoded, \@expected_decoded, 'decoded payload 1 correctly'; + }; +}; + +subtest 'join sections' => sub { + my $doc = $codec->encode_one([ test => 'document' ]); + subtest 'payload 0' => sub { + my @sections = ( + [ 0, undef, $doc ], + ); + my $got_sections = MongoDB::_Protocol::join_sections( @sections ); + my $expected_sections = "\0" . $doc; + + is_deeply $got_sections, $expected_sections, 'joined correctly'; + }; + + subtest 'payload 0 + 1' => sub { + subtest 'single document' => sub { + my @sections = ( + [ 0, undef, $doc ], + [ 1, 'documents', $doc ], + ); + my $got_sections = MongoDB::_Protocol::join_sections( @sections ); + my $expected_sections = "\0" . $doc . "\1&\0\0\0" ."documents\0" . $doc; + + is_deeply $got_sections, $expected_sections, 'joined correctly'; + }; + + subtest 'multiple documents' => sub { + my @sections = ( + [ 0, undef, $doc ], + [ 1, 'documents', $doc, $doc ], + ); + my $got_sections = MongoDB::_Protocol::join_sections( @sections ); + my $expected_sections = "\0" . $doc . "\1>\0\0\0" ."documents\0" . $doc . $doc; + + is_deeply $got_sections, $expected_sections, 'joined correctly'; + }; + }; + + subtest 'payload 0 + multiple 1' => sub { + subtest 'single document' => sub { + my @sections = ( + [ 0, undef, $doc ], + [ 1, 'documents', $doc ], + [ 1, 'documents', $doc ], + ); + my $got_sections = MongoDB::_Protocol::join_sections( @sections ); + my $expected_sections = "\0" . $doc . "\1&\0\0\0" ."documents\0" . $doc . "\1&\0\0\0" ."documents\0" . $doc; + + is_deeply $got_sections, $expected_sections, 'joined correctly'; + }; + + subtest 'multiple documents' => sub { + my @sections = ( + [ 0, undef, $doc ], + [ 1, 'documents', $doc, $doc ], + [ 1, 'documents', $doc, $doc ], + ); + my $got_sections = MongoDB::_Protocol::join_sections( @sections ); + my $expected_sections = "\0" . $doc . "\1>\0\0\0" ."documents\0" . $doc . $doc . "\1>\0\0\0" ."documents\0" . $doc . $doc; + + is_deeply $got_sections, $expected_sections, 'joined correctly'; + }; + }; +}; + +subtest 'split sections' => sub { + my $doc = $codec->encode_one([ test => 'document' ]); + subtest 'payload 0' => sub { + my $encoded = "\0" . $doc; + my @got_sections = MongoDB::_Protocol::split_sections( $encoded ); + my @expected_sections = ( + [ 0, undef, $doc ], + ); + + is_deeply \@got_sections, \@expected_sections, 'split correctly'; + }; + + subtest 'payload 0 + 1' => sub { + subtest 'single document' => sub { + my $encoded = "\0" . $doc . "\1&\0\0\0" ."documents\0" . $doc; + my @got_sections = MongoDB::_Protocol::split_sections( $encoded ); + my @expected_sections = ( + [ 0, undef, $doc ], + [ 1, 'documents', $doc ], + ); + + is_deeply \@got_sections, \@expected_sections, 'split correctly'; + }; + + subtest 'multiple documents' => sub { + my $encoded = "\0" . $doc . "\1>\0\0\0" ."documents\0" . $doc . $doc; + my @got_sections = MongoDB::_Protocol::split_sections( $encoded ); + my @expected_sections = ( + [ 0, undef, $doc ], + [ 1, 'documents', $doc, $doc ], + ); + + is_deeply \@got_sections, \@expected_sections, 'split correctly'; + }; + }; + + subtest 'payload 0 + multiple 1' => sub { + subtest 'single document' => sub { + my $encoded = "\0" . $doc . "\1&\0\0\0" ."documents\0" . $doc . "\1&\0\0\0" ."documents\0" . $doc; + my @got_sections = MongoDB::_Protocol::split_sections( $encoded ); + my @expected_sections = ( + [ 0, undef, $doc ], + [ 1, 'documents', $doc ], + [ 1, 'documents', $doc ], + ); + + is_deeply \@got_sections, \@expected_sections, 'split correctly'; + }; + + subtest 'multiple documents' => sub { + my $encoded = "\0" . $doc . "\1>\0\0\0" ."documents\0" . $doc . $doc . "\1>\0\0\0" ."documents\0" . $doc . $doc; + my @got_sections = MongoDB::_Protocol::split_sections( $encoded ); + my @expected_sections = ( + [ 0, undef, $doc ], + [ 1, 'documents', $doc, $doc ], + [ 1, 'documents', $doc, $doc ], + ); + + is_deeply \@got_sections, \@expected_sections, 'split correctly'; + }; + }; +}; + +done_testing; From 523fe3b7f41806f76472cdf6ba0b1cf8d7657983 Mon Sep 17 00:00:00 2001 From: Thomas Bloor Date: Thu, 5 Jul 2018 15:54:23 +0100 Subject: [PATCH 2/9] PERL-789 Refactor to external Section class to make command monitoring and decoding easier --- lib/MongoDB/Protocol/_Section.pm | 202 +++++++++++++++++++++++++ lib/MongoDB/_Protocol.pm | 152 +++++-------------- t/protocol_op_msg.t | 249 ++++++++++++++++++++++++------- 3 files changed, 438 insertions(+), 165 deletions(-) create mode 100644 lib/MongoDB/Protocol/_Section.pm diff --git a/lib/MongoDB/Protocol/_Section.pm b/lib/MongoDB/Protocol/_Section.pm new file mode 100644 index 00000000..529d2bc2 --- /dev/null +++ b/lib/MongoDB/Protocol/_Section.pm @@ -0,0 +1,202 @@ +package MongoDB::Protocol::_Section; + +use Moo; +use Types::Standard qw( + ArrayRef + Str + Maybe + Enum +); +use MongoDB::_Types qw( + BSONCodec +); +use MongoDB::Error; + +use constant { + PERL58 => $] lt '5.010', +}; +use constant { + P_MSG_PL_1 => PERL58 ? "lZ*" : "l "C", + P_SECTION_SEQUENCE_SIZE => PERL58 ? "l" : "l<", +}; +use constant { + P_SECTION_HEADER => P_SECTION_PAYLOAD_TYPE . P_SECTION_SEQUENCE_SIZE, + P_SECTION_PAYLOAD_TYPE_LENGTH => length( pack P_SECTION_PAYLOAD_TYPE, 0 ), + P_SECTION_SEQUENCE_SIZE_LENGTH => length( pack P_SECTION_SEQUENCE_SIZE, 0 ), +}; + +use namespace::clean; + +has bson_codec => ( + is => 'ro', + required => 1, + isa => BSONCodec, +); + +# either passed in on creation or pulled from binary docs +has type => ( + is => 'lazy', + isa => Enum[ 0, 1 ], +); + +sub _build_type { + my $self = shift; + return $self->_decoded->[0] if $self->has_binary; + MongoDB::ProtocolError->throw('No type passed to Section'); +} + +# If not passed in, assume is undef (just makes this one easier) +has identifier => ( + is => 'lazy', + isa => Maybe[Str], +); + +sub _build_identifier { + my $self = shift; + return $self->_decoded->[1] if $self->has_binary; + return; +} + +# Either passed in, or created from the binary documents. +has documents => ( + is => 'lazy', + isa => ArrayRef, +); + +sub _build_documents { + my $self = shift; + my @docs; + if ( $self->has_binary ) { + @docs = ( + map { $self->bson_codec->decode_one( $_ ) } @{ $self->encoded_documents } + ); + } else { + MongoDB::ProtocolError->throw('No documents passed to Section'); + } + return \@docs; +} + +has encoded_documents => ( + is => 'lazy', + isa => ArrayRef, +); + +sub _build_encoded_documents { + my $self = shift; + my @docs; + if ( $self->has_binary ) { + ( undef, undef, @docs ) = @{ $self->_decoded }; + } else { + @docs = ( + map { $self->bson_codec->encode_one( $_ ) } @{ $self->documents } + ); + } + return \@docs; +} + +has binary => ( + is => 'lazy', + isa => Str, + predicate => 1, +); + +sub _build_binary { return shift->_encoded } + +has _encoded => ( + is => 'lazy', + isa => Str, +); + +sub _build__encoded { + my $self = shift; + + my $type = $self->type; + my $ident = $self->identifier; + my @docs = @{ $self->encoded_documents }; + + my $pl; + + if ( $type == 0 ) { + MongoDB::ProtocolError->throw( + "Creating an OP_MSG Section Payload 0 with multiple documents is not supported") + if scalar( @docs ) > 1; + $pl = $docs[0]; + } elsif ( $type == 1 ) { + # Add size and ident placeholders + $pl = pack( P_MSG_PL_1, 0, $ident ) + . join( '', @docs ); + # calculate size + substr( $pl, 0, 4, pack( P_SECTION_SEQUENCE_SIZE, length( $pl ) ) ); + } else { + MongoDB::ProtocolError->throw("Encode: Unsupported section payload type"); + } + + # Add payload type prefix + $pl = pack( P_SECTION_PAYLOAD_TYPE, $type ) . $pl; + + return $pl; +} + +has _decoded => ( + is => 'lazy', + isa => ArrayRef, +); + +sub _build__decoded { + my $self = shift; + MongoDB::ProtocolError->throw('Section requires binary to decode') + unless $self->has_binary; + my $enc = $self->binary; + my ( $type, $ident, @docs ); + + # first, extract the type + ( $type ) = unpack( 'C', $enc ); + my $payload = substr( $enc, P_SECTION_PAYLOAD_TYPE_LENGTH ); + + if ( $type == 0 ) { + # payload is actually the document + push @docs, $payload; + } elsif ( $type == 1 ) { + # Pull size off and double check + my ( $pl_size ) = unpack( P_SECTION_SEQUENCE_SIZE, $payload ); + unless ( $pl_size == length( $payload ) ) { + MongoDB::ProtocolError->throw("Decode: Section size incorrect"); + } + $payload = substr( $payload, P_SECTION_SEQUENCE_SIZE_LENGTH ); + # Pull out then remove + ( $ident ) = unpack( 'Z*', $payload ); + $payload = substr( $payload, length ( pack 'Z*', $ident ) ); + + while ( length $payload ) { + my $doc_size = unpack( P_SECTION_SEQUENCE_SIZE, $payload ); + my $doc = substr( $payload, 0, $doc_size ); + $payload = substr( $payload, $doc_size ); + push @docs, $doc; + } + } else { + MongoDB::ProtocolError->throw("Decode: Unsupported section payload type"); + } + + return [ $type, $ident, @docs ]; +} + +=head1 SYNOPSIS + + # From a known document set + MongoDB::Protocol::_Section->new( + bson_codec => BSON->new, + type => 0, + identifier => undef, + documents => [ $doc ], + ); + + # From a binary section + MongoDB::Protocol::_Section->new( + bson_codec => BSON->new, + binary => $bin, + ); + +=cut + +1; diff --git a/lib/MongoDB/_Protocol.pm b/lib/MongoDB/_Protocol.pm index aac51e1c..bbe871ae 100644 --- a/lib/MongoDB/_Protocol.pm +++ b/lib/MongoDB/_Protocol.pm @@ -122,123 +122,50 @@ use constant { P_SECTION_SEQUENCE_SIZE_LENGTH => length( pack P_SECTION_SEQUENCE_SIZE, 0 ), }; -# TODO this really isnt the place for this... -sub maybe_split_payload { - my ( $codec, $cmd ) = @_; - - # TODO This is probably going to explode - my %th_cmd; - tie %th_cmd, "Tie::IxHash", @$cmd; - - my $split_commands = { - insert => 'documents', - update => 'updates', - delete => 'deletes', - }; - - my $split_docs = $split_commands->{$cmd->[0]}; - if ( defined $split_docs ) { - # can remap documents - my $docs = delete $th_cmd{ $split_docs }; - - my $packed_pl_1 = pack ( P_MSG_PL_1, 0, $split_docs ) - . join( '', ( map { $codec->encode_one( $_ ) } @$docs ) ); - - substr( $packed_pl_1, 0, 4, pack( P_INT32, length ($packed_pl_1) ) ); - - $packed_pl_1 = pack( 'C', 1 ) . $packed_pl_1; - - my $packed_pl_0 = $codec->encode_one( \%th_cmd ); - - $packed_pl_0 = pack( 'C', 0 ) . $packed_pl_0; - - return [ $packed_pl_0, $packed_pl_1 ]; - } - - # Drop through if nothings happened yet - return [ $codec->encode_one( $cmd ) ]; -}; - -=method encode_section( $payload_type, $identifier, @documents ) +use MongoDB::Protocol::_Section; - MongoDB::_Protocol::encode_section( 0, undef, $doc ); - MongoDB::_Protocol::encode_section( 1, $identifier, $doc, $doc2, $doc3 ); +=method prepare_sections( $cmd ) -Encodes a section for C as needed for each type of payload. Note that -payload type 0 only accepts one document, an exception will be thrown if more -than one is passed. - -In a payload type 0, the identifier is not required (and is ignored). In a -payload type 1, this is used as the identifier in the sequence for the struct. +Takes a command, returns sections ready for joining =cut -sub encode_section { - my ( $type, $ident, @docs ) = @_; - - my $pl; - - if ( $type == 0 ) { - MongoDB::ProtocolError->throw( - "Creating an OP_MSG Section Payload 0 with multiple documents is not supported") - if scalar( @docs ) > 1; - $pl = $docs[0]; - } elsif ( $type == 1 ) { - # Add size and ident placeholders - $pl = pack( P_MSG_PL_1, 0, $ident ) - . join( '', @docs ); - # calculate size - substr( $pl, 0, 4, pack( P_SECTION_SEQUENCE_SIZE, length( $pl ) ) ); - } else { - MongoDB::ProtocolError->throw("Encode: Unsupported section payload type"); - } - - # Add payload type prefix - $pl = pack( P_SECTION_PAYLOAD_TYPE, $type ) . $pl; - - return $pl; -} - -=method decode_section( $encoded ) - -Peforms the exact oposite of encode_section - takes an encoded section and -returns type, identifier (if applicable) and the documents contained. - -=cut +sub prepare_sections { + my ( $codec, $cmd ) = @_; -sub decode_section { - my $enc = shift; - my ( $type, $ident, @docs ); - - # first, extract the type - ( $type ) = unpack( 'C', $enc ); - my $payload = substr( $enc, P_SECTION_PAYLOAD_TYPE_LENGTH ); - - if ( $type == 0 ) { - # payload is actually the document - push @docs, $payload; - } elsif ( $type == 1 ) { - # Pull size off and double check - my ( $pl_size ) = unpack( P_SECTION_SEQUENCE_SIZE, $payload ); - unless ( $pl_size == length( $payload ) ) { - MongoDB::ProtocolError->throw("Decode: Section size incorrect"); - } - $payload = substr( $payload, P_SECTION_SEQUENCE_SIZE_LENGTH ); - # Pull out then remove - ( $ident ) = unpack( 'Z*', $payload ); - $payload = substr( $payload, length ( pack 'Z*', $ident ) ); - - while ( length $payload ) { - my $doc_size = unpack( P_SECTION_SEQUENCE_SIZE, $payload ); - my $doc = substr( $payload, 0, $doc_size ); - $payload = substr( $payload, $doc_size ); - push @docs, $doc; - } + my %split_commands = ( + insert => 'documents', + update => 'updates', + delete => 'deletes', + ); + + my ( $command, $collection, $ident, $docs, @other ) = @$cmd; + + if ( $split_commands{ $command } eq $ident ) { + # Assumes only a single split on the commands + return ( + MongoDB::Protocol::_Section->new( + bson_codec => $codec, + type => 0, + documents => [ [ $command, $collection, @other ] ] + ), + MongoDB::Protocol::_Section->new( + bson_codec => $codec, + type => 1, + identifier => $ident, + documents => $docs + ), + ); } else { - MongoDB::ProtocolError->throw("Decode: Unsupported section payload type"); + # Not a recognised command to split, just set up ready for later + return ( + MongoDB::Protocol::_Section->new( + bson_codec => $codec, + type => 0, + documents => [ $cmd ] + ), + ); } - - return ( $type, $ident, @docs ); } =method join_sections @@ -253,7 +180,7 @@ Joins an array of sections ready for passing to encode_sections. sub join_sections { my ( @sections ) = @_; - my $msg = join ('', ( map { encode_section( @$_ ) } @sections ) ); + my $msg = join ('', ( map { $_->binary } @sections ) ); return $msg; } @@ -265,6 +192,7 @@ Does the exact opposite of join_sections. =cut sub split_sections { + my $codec = shift; my $msg = shift; my @sections; while ( length $msg ) { @@ -273,7 +201,7 @@ sub split_sections { # Add the payload type length as we reached over it for the length my $section = substr( $msg, 0, $section_length + P_SECTION_PAYLOAD_TYPE_LENGTH ); - push @sections, [ decode_section( $section ) ]; + push @sections, MongoDB::Protocol::_Section->new( bson_codec => $codec, binary => $section ); $msg = substr( $msg, $section_length + P_SECTION_PAYLOAD_TYPE_LENGTH ); } @@ -299,7 +227,7 @@ sub write_msg { my $request_id = int( rand( MAX_REQUEST_ID ) ); my $msg = pack( P_MSG, 0, $request_id, 0, OP_MSG, 0 ) - . join ( '', @$sections ); + . $sections; substr( $msg, 0, 4, pack( P_INT32, length($msg) ) ); return ( $msg, $request_id ); } diff --git a/t/protocol_op_msg.t b/t/protocol_op_msg.t index ef62197e..1f8bfc9a 100644 --- a/t/protocol_op_msg.t +++ b/t/protocol_op_msg.t @@ -19,10 +19,47 @@ use Test::More; use Test::Fatal; use MongoDB::_Protocol; +use MongoDB::Protocol::_Section; use BSON; my $codec = BSON->new(); +subtest 'insert_doc' => sub { + my $insert_doc = [ + insert => 'collectionName', + documents => [ + [ id => 'Document#1', example => 1 ], + [ id => 'Document#2', example => 2 ], + [ id => 'Document#3', example => 3 ] + ], + writeConcern => [ w => 'majority' ] + ]; + + push @{$insert_doc}, ( '$db', 'someDatabase' ); + my @packed_payloads = MongoDB::_Protocol::prepare_sections( $codec, $insert_doc ); + my @expected_payloads = ( + MongoDB::Protocol::_Section->new( + bson_codec => $codec, + type => 0, + identifier => undef, + encoded_documents => [ "b\0\0\0\2insert\0\17\0\0\0collectionName\0\4writeConcern\0\36\0\0\0\0020\0\2\0\0\0w\0\0021\0\t\0\0\0majority\0\0\2\$db\0\r\0\0\0someDatabase\0\0" ] + ), + MongoDB::Protocol::_Section->new( + bson_codec => $codec, + type => 1, + identifier => "documents", + encoded_documents => [ + "%\0\0\0\2id\0\13\0\0\0Document#1\0\20example\0\1\0\0\0\0", + "%\0\0\0\2id\0\13\0\0\0Document#2\0\20example\0\2\0\0\0\0", + "%\0\0\0\2id\0\13\0\0\0Document#3\0\20example\0\3\0\0\0\0" + ], + ), + ); + + for my $i ( 0 .. $#expected_payloads ) { + is $packed_payloads[$i]->binary, $expected_payloads[$i]->binary, "section $i prepared correctly"; + } +}; # struct Section { # uint8 payloadType; # union payload { @@ -35,65 +72,102 @@ my $codec = BSON->new(); # }; # }; +my $raw_doc = [ test => 'document' ]; +my $doc = $codec->encode_one( $raw_doc ); +my $decoded_doc = $codec->decode_one( $doc ); + subtest 'encode section' => sub { - my $doc = $codec->encode_one([ test => 'document' ]); subtest 'payload 0' => sub { - my $got_section = MongoDB::_Protocol::encode_section( 0, undef, $doc ); + my $got_section = MongoDB::Protocol::_Section->new( + bson_codec => $codec, + type => 0, + identifier => undef, + documents => [$raw_doc], + ); my $expected_section = "\0" . $doc; - is $got_section, $expected_section, 'encode payload 0 correctly'; + is $got_section->binary, $expected_section, 'encode payload 0 correctly'; - ok exception{ MongoDB::_Protocol::encode_section( 0, undef, $doc, $doc ) }, 'multiple docs in payload 0 causes error'; + ok exception { MongoDB::Protocol::_Section->new( + bson_codec => $codec, + type => 0, + identifier => undef, + documents => [$raw_doc, $raw_doc], + )->binary; }, 'multiple docs in payload 0 causes error'; }; subtest 'payload 1 single doc' => sub { - my $got_section = MongoDB::_Protocol::encode_section( 1, 'documents', $doc ); + my $got_section = MongoDB::Protocol::_Section->new( + bson_codec => $codec, + type => 1, + identifier => 'documents', + documents => [$raw_doc], + ); my $expected_section = "\1&\0\0\0" . "documents\0" . $doc; - is $got_section, $expected_section, 'encode payload 1 correctly'; + is $got_section->binary, $expected_section, 'encode payload 1 correctly'; }; subtest 'payload 1 multiple doc' => sub { - my $got_section = MongoDB::_Protocol::encode_section( 1, 'documents', $doc, $doc ); + my $got_section = MongoDB::Protocol::_Section->new( + bson_codec => $codec, + type => 1, + identifier => 'documents', + documents => [$raw_doc, $raw_doc], + ); my $expected_section = "\1>\0\0\0" . "documents\0" . $doc . $doc; - is $got_section, $expected_section, 'encode payload 1 correctly'; + is $got_section->binary, $expected_section, 'encode payload 1 correctly'; }; }; subtest 'decode section' => sub { - my $doc = $codec->encode_one([ test => 'document' ]); subtest 'payload 0' => sub { my $encoded = "\0" . $doc; - my @got_decoded = MongoDB::_Protocol::decode_section( $encoded ); - my @expected_decoded = ( 0, undef, $doc ); + my $got_section = MongoDB::Protocol::_Section->new( + bson_codec => $codec, + binary => $encoded, + ); - is_deeply \@got_decoded, \@expected_decoded, 'decoded payload 0 correctly'; + is $got_section->type, 0, 'section type correct'; + is $got_section->identifier, undef, 'section identifier correct'; + is_deeply $got_section->documents, [ { test => 'document' } ], 'decoded document correctly'; }; subtest 'payload 1' => sub { my $encoded = "\1&\0\0\0" ."documents\0" . $doc; - my @got_decoded = MongoDB::_Protocol::decode_section( $encoded ); - my @expected_decoded = ( 1, 'documents', $doc ); - + my $got_section = MongoDB::Protocol::_Section->new( + bson_codec => $codec, + binary => $encoded, + ); - is_deeply \@got_decoded, \@expected_decoded, 'decoded payload 1 correctly'; + is $got_section->type, 1, 'section type correct'; + is $got_section->identifier, 'documents', 'section identifier correct'; + is_deeply $got_section->documents, [ { test => 'document' } ], 'decoded document correctly'; }; subtest 'payload 1 multiple docs' => sub { my $encoded = "\1>\0\0\0" ."documents\0" . $doc . $doc; - my @got_decoded = MongoDB::_Protocol::decode_section( $encoded ); - my @expected_decoded = ( 1, 'documents', $doc, $doc ); + my $got_section = MongoDB::Protocol::_Section->new( + bson_codec => $codec, + binary => $encoded, + ); - is_deeply \@got_decoded, \@expected_decoded, 'decoded payload 1 correctly'; + is $got_section->type, 1, 'section type correct'; + is $got_section->identifier, 'documents', 'section identifier correct'; + is_deeply $got_section->documents, [ { test => 'document' }, { test => 'document' } ], 'decoded document correctly'; }; }; subtest 'join sections' => sub { - my $doc = $codec->encode_one([ test => 'document' ]); subtest 'payload 0' => sub { my @sections = ( - [ 0, undef, $doc ], + MongoDB::Protocol::_Section->new( + bson_codec => $codec, + type => 0, + identifier => undef, + documents => [$raw_doc], + ) ); my $got_sections = MongoDB::_Protocol::join_sections( @sections ); my $expected_sections = "\0" . $doc; @@ -104,8 +178,18 @@ subtest 'join sections' => sub { subtest 'payload 0 + 1' => sub { subtest 'single document' => sub { my @sections = ( - [ 0, undef, $doc ], - [ 1, 'documents', $doc ], + MongoDB::Protocol::_Section->new( + bson_codec => $codec, + type => 0, + identifier => undef, + documents => [$raw_doc], + ), + MongoDB::Protocol::_Section->new( + bson_codec => $codec, + type => 1, + identifier => 'documents', + documents => [$raw_doc], + ), ); my $got_sections = MongoDB::_Protocol::join_sections( @sections ); my $expected_sections = "\0" . $doc . "\1&\0\0\0" ."documents\0" . $doc; @@ -115,8 +199,18 @@ subtest 'join sections' => sub { subtest 'multiple documents' => sub { my @sections = ( - [ 0, undef, $doc ], - [ 1, 'documents', $doc, $doc ], + MongoDB::Protocol::_Section->new( + bson_codec => $codec, + type => 0, + identifier => undef, + documents => [$raw_doc], + ), + MongoDB::Protocol::_Section->new( + bson_codec => $codec, + type => 1, + identifier => 'documents', + documents => [$raw_doc, $raw_doc], + ), ); my $got_sections = MongoDB::_Protocol::join_sections( @sections ); my $expected_sections = "\0" . $doc . "\1>\0\0\0" ."documents\0" . $doc . $doc; @@ -128,9 +222,24 @@ subtest 'join sections' => sub { subtest 'payload 0 + multiple 1' => sub { subtest 'single document' => sub { my @sections = ( - [ 0, undef, $doc ], - [ 1, 'documents', $doc ], - [ 1, 'documents', $doc ], + MongoDB::Protocol::_Section->new( + bson_codec => $codec, + type => 0, + identifier => undef, + documents => [$raw_doc], + ), + MongoDB::Protocol::_Section->new( + bson_codec => $codec, + type => 1, + identifier => 'documents', + documents => [$raw_doc], + ), + MongoDB::Protocol::_Section->new( + bson_codec => $codec, + type => 1, + identifier => 'documents', + documents => [$raw_doc], + ), ); my $got_sections = MongoDB::_Protocol::join_sections( @sections ); my $expected_sections = "\0" . $doc . "\1&\0\0\0" ."documents\0" . $doc . "\1&\0\0\0" ."documents\0" . $doc; @@ -140,9 +249,24 @@ subtest 'join sections' => sub { subtest 'multiple documents' => sub { my @sections = ( - [ 0, undef, $doc ], - [ 1, 'documents', $doc, $doc ], - [ 1, 'documents', $doc, $doc ], + MongoDB::Protocol::_Section->new( + bson_codec => $codec, + type => 0, + identifier => undef, + documents => [$raw_doc], + ), + MongoDB::Protocol::_Section->new( + bson_codec => $codec, + type => 1, + identifier => 'documents', + documents => [$raw_doc, $raw_doc], + ), + MongoDB::Protocol::_Section->new( + bson_codec => $codec, + type => 1, + identifier => 'documents', + documents => [$raw_doc, $raw_doc], + ), ); my $got_sections = MongoDB::_Protocol::join_sections( @sections ); my $expected_sections = "\0" . $doc . "\1>\0\0\0" ."documents\0" . $doc . $doc . "\1>\0\0\0" ."documents\0" . $doc . $doc; @@ -153,64 +277,83 @@ subtest 'join sections' => sub { }; subtest 'split sections' => sub { - my $doc = $codec->encode_one([ test => 'document' ]); subtest 'payload 0' => sub { my $encoded = "\0" . $doc; - my @got_sections = MongoDB::_Protocol::split_sections( $encoded ); + my @got_sections = MongoDB::_Protocol::split_sections( $codec, $encoded ); my @expected_sections = ( - [ 0, undef, $doc ], + [ 0, undef, [ $decoded_doc ] ], ); - is_deeply \@got_sections, \@expected_sections, 'split correctly'; + for my $i ( 0 .. $#expected_sections ) { + is $got_sections[$i]->type, $expected_sections[$i][0], 'type correct'; + is $got_sections[$i]->identifier, $expected_sections[$i][1], 'identifier correct'; + is_deeply $got_sections[$i]->documents, $expected_sections[$i][2], 'documents correct'; + } }; subtest 'payload 0 + 1' => sub { subtest 'single document' => sub { my $encoded = "\0" . $doc . "\1&\0\0\0" ."documents\0" . $doc; - my @got_sections = MongoDB::_Protocol::split_sections( $encoded ); + my @got_sections = MongoDB::_Protocol::split_sections( $codec, $encoded ); my @expected_sections = ( - [ 0, undef, $doc ], - [ 1, 'documents', $doc ], + [ 0, undef, [ $decoded_doc ] ], + [ 1, 'documents', [ $decoded_doc ] ], ); - is_deeply \@got_sections, \@expected_sections, 'split correctly'; + for my $i ( 0 .. $#expected_sections ) { + is $got_sections[$i]->type, $expected_sections[$i][0], 'type correct'; + is $got_sections[$i]->identifier, $expected_sections[$i][1], 'identifier correct'; + is_deeply $got_sections[$i]->documents, $expected_sections[$i][2], 'documents correct'; + } }; subtest 'multiple documents' => sub { my $encoded = "\0" . $doc . "\1>\0\0\0" ."documents\0" . $doc . $doc; - my @got_sections = MongoDB::_Protocol::split_sections( $encoded ); + my @got_sections = MongoDB::_Protocol::split_sections( $codec, $encoded ); my @expected_sections = ( - [ 0, undef, $doc ], - [ 1, 'documents', $doc, $doc ], + [ 0, undef, [ $decoded_doc ] ], + [ 1, 'documents', [ $decoded_doc, $decoded_doc ] ], ); - is_deeply \@got_sections, \@expected_sections, 'split correctly'; + for my $i ( 0 .. $#expected_sections ) { + is $got_sections[$i]->type, $expected_sections[$i][0], 'type correct'; + is $got_sections[$i]->identifier, $expected_sections[$i][1], 'identifier correct'; + is_deeply $got_sections[$i]->documents, $expected_sections[$i][2], 'documents correct'; + } }; }; subtest 'payload 0 + multiple 1' => sub { subtest 'single document' => sub { my $encoded = "\0" . $doc . "\1&\0\0\0" ."documents\0" . $doc . "\1&\0\0\0" ."documents\0" . $doc; - my @got_sections = MongoDB::_Protocol::split_sections( $encoded ); + my @got_sections = MongoDB::_Protocol::split_sections( $codec, $encoded ); my @expected_sections = ( - [ 0, undef, $doc ], - [ 1, 'documents', $doc ], - [ 1, 'documents', $doc ], + [ 0, undef, [ $decoded_doc ] ], + [ 1, 'documents', [ $decoded_doc ] ], + [ 1, 'documents', [ $decoded_doc ] ], ); - is_deeply \@got_sections, \@expected_sections, 'split correctly'; + for my $i ( 0 .. $#expected_sections ) { + is $got_sections[$i]->type, $expected_sections[$i][0], 'type correct'; + is $got_sections[$i]->identifier, $expected_sections[$i][1], 'identifier correct'; + is_deeply $got_sections[$i]->documents, $expected_sections[$i][2], 'documents correct'; + } }; subtest 'multiple documents' => sub { my $encoded = "\0" . $doc . "\1>\0\0\0" ."documents\0" . $doc . $doc . "\1>\0\0\0" ."documents\0" . $doc . $doc; - my @got_sections = MongoDB::_Protocol::split_sections( $encoded ); + my @got_sections = MongoDB::_Protocol::split_sections( $codec, $encoded ); my @expected_sections = ( - [ 0, undef, $doc ], - [ 1, 'documents', $doc, $doc ], - [ 1, 'documents', $doc, $doc ], + [ 0, undef, [ $decoded_doc ] ], + [ 1, 'documents', [ $decoded_doc, $decoded_doc ] ], + [ 1, 'documents', [ $decoded_doc, $decoded_doc ] ], ); - is_deeply \@got_sections, \@expected_sections, 'split correctly'; + for my $i ( 0 .. $#expected_sections ) { + is $got_sections[$i]->type, $expected_sections[$i][0], 'type correct'; + is $got_sections[$i]->identifier, $expected_sections[$i][1], 'identifier correct'; + is_deeply $got_sections[$i]->documents, $expected_sections[$i][2], 'documents correct'; + } }; }; }; From 77d5e7e6490ba58e373974f954a37eb5243388b6 Mon Sep 17 00:00:00 2001 From: Thomas Bloor Date: Thu, 5 Jul 2018 16:09:34 +0100 Subject: [PATCH 3/9] PERL-789 Initial test for op_msg using env var for triggering --- lib/MongoDB/Role/_CommandMonitoring.pm | 14 ++++++ lib/MongoDB/Role/_SingleBatchDocWrite.pm | 22 ++++++--- lib/MongoDB/_Link.pm | 8 ++++ lib/MongoDB/_Protocol.pm | 35 +++++++++++--- t/op_msg.t | 61 ++++++++++++++++++++++++ 5 files changed, 126 insertions(+), 14 deletions(-) create mode 100644 t/op_msg.t diff --git a/lib/MongoDB/Role/_CommandMonitoring.pm b/lib/MongoDB/Role/_CommandMonitoring.pm index a08d4e8c..b61b089e 100644 --- a/lib/MongoDB/Role/_CommandMonitoring.pm +++ b/lib/MongoDB/Role/_CommandMonitoring.pm @@ -256,6 +256,7 @@ sub _to_tied_ixhash { my $type = ref($in); my %out; if ( $type eq 'ARRAY' ) { + $in = _maybe_expand_sections( $in ); # earlier type checks should ensure even elements tie %out, "Tie::IxHash", map { _decode_preencoded($_) } @$in; } @@ -269,4 +270,17 @@ sub _to_tied_ixhash { return \%out; } +sub _maybe_expand_sections { + my $in = shift; + my @out; + for my $section ( @$in ) { + # If everything isnt a Section, drop out the original + return $in unless ref($section) eq 'MongoDB::Protocol::_Section'; + # Type 0 only ever has one document + push @out, @{ $section->documents->[0] } if $section->type == 0; + push @out, ( $section->identifier, $section->documents ) if $section->type == 1; + } + return \@out; +} + 1; diff --git a/lib/MongoDB/Role/_SingleBatchDocWrite.pm b/lib/MongoDB/Role/_SingleBatchDocWrite.pm index 167aa234..f15e19fd 100644 --- a/lib/MongoDB/Role/_SingleBatchDocWrite.pm +++ b/lib/MongoDB/Role/_SingleBatchDocWrite.pm @@ -167,12 +167,20 @@ sub _send_write_command { $self->_apply_session_and_cluster_time( $link, \$cmd ); - # send command and get response document - my $command = $self->bson_codec->encode_one( $cmd ); - - my ( $op_bson, $request_id ) = - MongoDB::_Protocol::write_query( $self->db_name . '.$cmd', - $command, undef, 0, -1, undef ); + my ( $op_bson, $request_id ); + if ( $ENV{DO_OP_MSG} ) {#$link->supports_op_msg ) { + push @$cmd, ( '$db', $self->db_name ); + my @sections = MongoDB::_Protocol::prepare_sections( $self->bson_codec, $cmd ); + $cmd = \@sections; + my $encoded_sections = MongoDB::_Protocol::join_sections( @sections ); + ( $op_bson, $request_id ) = MongoDB::_Protocol::write_msg( $encoded_sections, undef ); + } else { + # send command and get response document + my $command = $self->bson_codec->encode_one( $cmd ); + ( $op_bson, $request_id ) = + MongoDB::_Protocol::write_query( $self->db_name . '.$cmd', + $command, undef, 0, -1, undef ); + } if ( length($op_bson) > MAX_BSON_WIRE_SIZE ) { # XXX should this become public? @@ -188,7 +196,7 @@ sub _send_write_command { my $result; eval { $link->write( $op_bson ), - ( $result = MongoDB::_Protocol::parse_reply( $link->read, $request_id ) ); + ( $result = MongoDB::_Protocol::parse_reply( $link->read, $request_id, $self->bson_codec ) ); }; if ( my $err = $@ ) { $self->_update_session_connection_error( $err ); diff --git a/lib/MongoDB/_Link.pm b/lib/MongoDB/_Link.pm index 5ed96eb8..d0cd9036 100644 --- a/lib/MongoDB/_Link.pm +++ b/lib/MongoDB/_Link.pm @@ -215,6 +215,13 @@ has supports_retryWrites => ( isa => Boolish, ); +has supports_op_msg => ( + is => 'rwp', + init_arg => undef, + isa => Boolish, +); + +# for wire version >= 7 has supports_4_0_changestreams => ( is => 'rwp', init_arg => undef, @@ -355,6 +362,7 @@ sub set_metadata { ? 1 : 0 ); + $self->_set_supports_op_msg(1); } if ( $self->accepts_wire_version(7) ) { $self->_set_supports_4_0_changestreams(1); diff --git a/lib/MongoDB/_Protocol.pm b/lib/MongoDB/_Protocol.pm index bbe871ae..93247d6e 100644 --- a/lib/MongoDB/_Protocol.pm +++ b/lib/MongoDB/_Protocol.pm @@ -533,18 +533,18 @@ use constant { }; sub parse_reply { - my ( $msg, $request_id ) = @_; + my ( $msg, $request_id, $codec ) = @_; MongoDB::ProtocolError->throw("response was truncated") if length($msg) < MIN_REPLY_LENGTH; $msg = try_uncompress($msg); - + my ( $len, $msg_id, $response_to, $opcode, $bitflags, $cursor_id, $starting_from, $number_returned - ) = unpack( P_REPLY_HEADER, $msg ); - + ) = unpack( P_MSG, $msg ); + # pre-check all conditions using a modifier in one statement for speed; # disambiguate afterwards only if an error exists @@ -554,8 +554,8 @@ sub parse_reply { MongoDB::ProtocolError->throw("response was truncated"); } - if ( $opcode != OP_REPLY ) { - MongoDB::ProtocolError->throw("response was not OP_REPLY"); + if ( $opcode != OP_REPLY && $opcode != OP_MSG ) { + MongoDB::ProtocolError->throw("response was not OP_REPLY or OP_MSG"); } if ( $response_to != $request_id ) { @@ -564,9 +564,30 @@ sub parse_reply { } } if ( length($msg) < $len ) - || ( $opcode != OP_REPLY ) + || ( ( $opcode != OP_REPLY ) && ( $opcode != OP_MSG ) ) || ( $response_to != $request_id ); + + if ( $opcode == OP_MSG ) { + MongoDB::InternalError->throw('OP_MSG requires codec for parse_reply') unless defined $codec; + # TODO Extract and check checksum + my @sections = split_sections( $codec, substr $msg, P_MSG_PREFIX_LENGTH ); + # We have none of the other stuff? maybe flags... and an array of docs? erm + return { + flags => { + checksum_present => vec( $bitflags, MSG_FB_CHECKSUM, 1 ), + query_failure => vec( $bitflags, MSG_FB_MORE_TO_COME, 1 ), + }, + docs => $sections[0]->encoded_documents->[0] + }; + } else { + # Yes its two unpacks but its just easier than mapping through to the right size + ( + $len, $msg_id, $response_to, $opcode, $bitflags, $cursor_id, $starting_from, + $number_returned + ) = unpack( P_REPLY_HEADER, $msg ); + } + # returns non-zero cursor_id as blessed object to identify it as an # 8-byte opaque ID rather than an ambiguous Perl scalar. N.B. cursors # from commands are handled differently: they are perl integers or diff --git a/t/op_msg.t b/t/op_msg.t new file mode 100644 index 00000000..ebe064e2 --- /dev/null +++ b/t/op_msg.t @@ -0,0 +1,61 @@ +# Copyright 2018 - present MongoDB, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +use strict; +use warnings; +use utf8; +use Test::More; + +use MongoDB; + +use lib "t/lib"; +use MongoDBTest qw/ + skip_unless_mongod + build_client + get_test_db + server_version + clear_testdbs +/; +use MongoDBTest::Callback; + +skip_unless_mongod(); + +my $cb = MongoDBTest::Callback->new; +my $conn = build_client(monitoring_callback => $cb->callback); +my $testdb = get_test_db($conn); +my $server_version = server_version($conn); +my $coll = $testdb->get_collection('test_collection'); + +plan skip_all => 'MongoDB version 3.6 or higher required for OP_MSG support' + unless $server_version >= version->parse('v3.6.0'); + +subtest 'insert document' => sub { + $cb->clear_events; + $ENV{DO_OP_MSG} = 1; + my $ret = $coll->insert_one([ _id => 1 ]); + $ENV{DO_OP_MSG} = 0; + + # OP_MSG enforces $db to be in the command itself + is $cb->events->[-2]{command}{'$db'}, $testdb->name, 'Sent to correct database'; + is $ret->inserted_id, 1, 'Correct inserted id'; + + my @collection = $coll->find()->all; + + is_deeply \@collection, [ { _id => 1 } ], 'Collection info correct'; + $coll->drop; +}; + +clear_testdbs; + +done_testing; From 1e004f3c70f5780b6035e85a24487d2284ab4d25 Mon Sep 17 00:00:00 2001 From: Thomas Bloor Date: Thu, 5 Jul 2018 16:38:21 +0100 Subject: [PATCH 4/9] PERL-789 Added testing for OP_MSG in delete and update functions --- lib/MongoDB/Op/_Command.pm | 18 +++++++-- t/op_msg.t | 78 +++++++++++++++++++++++++++++++++++++- 2 files changed, 90 insertions(+), 6 deletions(-) diff --git a/lib/MongoDB/Op/_Command.pm b/lib/MongoDB/Op/_Command.pm index d2725c56..ed223903 100644 --- a/lib/MongoDB/Op/_Command.pm +++ b/lib/MongoDB/Op/_Command.pm @@ -87,9 +87,19 @@ sub execute { # $query is passed as a reference because it *may* be replaced $self->_apply_read_prefs( $link, $topology_type, $self->{query_flags}, \$self->{query}); - my ( $op_bson, $request_id ) = - MongoDB::_Protocol::write_query( $self->{db_name} . '.$cmd', - $self->{bson_codec}->encode_one( $self->{query} ), undef, 0, -1, $self->{query_flags}); + my ( $op_bson, $request_id ); + + if ( $ENV{DO_OP_MSG} ) {#$link->supports_op_msg ) { + push @{$self->{query}}, ( '$db', $self->db_name ); + my @sections = MongoDB::_Protocol::prepare_sections( $self->{bson_codec}, $self->{query} ); + $self->{query} = \@sections; + my $encoded_sections = MongoDB::_Protocol::join_sections( @sections ); + ( $op_bson, $request_id ) = MongoDB::_Protocol::write_msg( $encoded_sections, undef ); + } else { + ( $op_bson, $request_id ) = + MongoDB::_Protocol::write_query( $self->{db_name} . '.$cmd', + $self->{bson_codec}->encode_one( $self->{query} ), undef, 0, -1, $self->{query_flags}); + } if ( length($op_bson) > MAX_BSON_WIRE_SIZE ) { # XXX should this become public? @@ -109,7 +119,7 @@ sub execute { my $result; eval { $link->write( $op_bson, \%write_opt ), - ( $result = MongoDB::_Protocol::parse_reply( $link->read, $request_id ) ); + ( $result = MongoDB::_Protocol::parse_reply( $link->read, $request_id, $self->{bson_codec} ) ); }; if ( my $err = $@ ) { $self->_update_session_connection_error( $err ); diff --git a/t/op_msg.t b/t/op_msg.t index ebe064e2..64717825 100644 --- a/t/op_msg.t +++ b/t/op_msg.t @@ -40,7 +40,7 @@ my $coll = $testdb->get_collection('test_collection'); plan skip_all => 'MongoDB version 3.6 or higher required for OP_MSG support' unless $server_version >= version->parse('v3.6.0'); -subtest 'insert document' => sub { +subtest 'insert single document' => sub { $cb->clear_events; $ENV{DO_OP_MSG} = 1; my $ret = $coll->insert_one([ _id => 1 ]); @@ -53,7 +53,81 @@ subtest 'insert document' => sub { my @collection = $coll->find()->all; is_deeply \@collection, [ { _id => 1 } ], 'Collection info correct'; - $coll->drop; +}; + +subtest 'insert multiple document' => sub { + $cb->clear_events; + $ENV{DO_OP_MSG} = 1; + my $ret = $coll->insert_many([[ _id => 2 ], [ _id => 3 ]]); + $ENV{DO_OP_MSG} = 0; + + # OP_MSG enforces $db to be in the command itself + is $cb->events->[-2]{command}{'$db'}, $testdb->name, 'Sent to correct database'; + is_deeply $ret->inserted_ids, { 0 => 2, 1 => 3 }, 'Correct inserted id'; + + my @collection = $coll->find()->all; + + is_deeply \@collection, [ { _id => 1 }, { _id => 2 }, { _id => 3 } ], 'Collection info correct'; +}; + +subtest 'update single document' => sub { + $cb->clear_events; + $ENV{DO_OP_MSG} = 1; + my $ret = $coll->update_one({ _id => 1 }, { '$set' => { eg => 2 } }); + $ENV{DO_OP_MSG} = 0; + + # OP_MSG enforces $db to be in the command itself + is $cb->events->[-2]{command}{'$db'}, $testdb->name, 'Sent to correct database'; + is $ret->modified_count, 1, 'Correct modified count'; + + my @collection = $coll->find()->all; + + is_deeply \@collection, [ { _id => 1, eg => 2 }, { _id => 2 }, { _id => 3 } ], 'Collection info correct'; +}; + +subtest 'update multiple document' => sub { + $cb->clear_events; + $ENV{DO_OP_MSG} = 1; + my $ret = $coll->update_many({ _id => { '$gte' => 2 } }, { '$set' => { eg => 3 } }); + $ENV{DO_OP_MSG} = 0; + + # OP_MSG enforces $db to be in the command itself + is $cb->events->[-2]{command}{'$db'}, $testdb->name, 'Sent to correct database'; + is $ret->modified_count, 2, 'Correct modified count'; + + my @collection = $coll->find()->all; + + is_deeply \@collection, [ { _id => 1, eg => 2 }, { _id => 2, eg => 3 }, { _id => 3, eg => 3 } ], 'Collection info correct'; +}; + +subtest 'delete single document' => sub { + $cb->clear_events; + $ENV{DO_OP_MSG} = 1; + my $ret = $coll->delete_one([ _id => 1 ]); + $ENV{DO_OP_MSG} = 0; + + # OP_MSG enforces $db to be in the command itself + is $cb->events->[-2]{command}{'$db'}, $testdb->name, 'Sent to correct database'; + is $ret->deleted_count, 1, 'Correct deleted count'; + + my @collection = $coll->find()->all; + + is_deeply \@collection, [ { _id => 2, eg => 3 }, { _id => 3, eg => 3 } ], 'Collection info correct'; +}; + +subtest 'delete multiple document' => sub { + $cb->clear_events; + $ENV{DO_OP_MSG} = 1; + my $ret = $coll->delete_many([ _id => { '$gte' => 2 } ]); + $ENV{DO_OP_MSG} = 0; + + # OP_MSG enforces $db to be in the command itself + is $cb->events->[-2]{command}{'$db'}, $testdb->name, 'Sent to correct database'; + is $ret->deleted_count, 2, 'Correct deleted count'; + + my @collection = $coll->find()->all; + + is_deeply \@collection, [ ], 'Collection info correct'; }; clear_testdbs; From d62615c591e25bbdc6f1fabb1e1fb7a654a478c9 Mon Sep 17 00:00:00 2001 From: Thomas Bloor Date: Wed, 25 Jul 2018 18:37:58 +0100 Subject: [PATCH 5/9] PERL-789 Remove _Section OO object creation --- lib/MongoDB/Op/_Command.pm | 7 +- lib/MongoDB/Protocol/_Section.pm | 202 ------------------- lib/MongoDB/Role/_CommandMonitoring.pm | 6 +- lib/MongoDB/Role/_SingleBatchDocWrite.pm | 6 +- lib/MongoDB/_Protocol.pm | 181 +++++++++++++---- t/protocol_op_msg.t | 238 ++++++++++------------- 6 files changed, 261 insertions(+), 379 deletions(-) delete mode 100644 lib/MongoDB/Protocol/_Section.pm diff --git a/lib/MongoDB/Op/_Command.pm b/lib/MongoDB/Op/_Command.pm index ed223903..fc545b07 100644 --- a/lib/MongoDB/Op/_Command.pm +++ b/lib/MongoDB/Op/_Command.pm @@ -90,11 +90,14 @@ sub execute { my ( $op_bson, $request_id ); if ( $ENV{DO_OP_MSG} ) {#$link->supports_op_msg ) { + # TODO Cover other document object types push @{$self->{query}}, ( '$db', $self->db_name ); my @sections = MongoDB::_Protocol::prepare_sections( $self->{bson_codec}, $self->{query} ); $self->{query} = \@sections; - my $encoded_sections = MongoDB::_Protocol::join_sections( @sections ); - ( $op_bson, $request_id ) = MongoDB::_Protocol::write_msg( $encoded_sections, undef ); + ( $op_bson, $request_id ) = MongoDB::_Protocol::write_msg( + $self->{bson_codec}, + undef, + @sections ); } else { ( $op_bson, $request_id ) = MongoDB::_Protocol::write_query( $self->{db_name} . '.$cmd', diff --git a/lib/MongoDB/Protocol/_Section.pm b/lib/MongoDB/Protocol/_Section.pm deleted file mode 100644 index 529d2bc2..00000000 --- a/lib/MongoDB/Protocol/_Section.pm +++ /dev/null @@ -1,202 +0,0 @@ -package MongoDB::Protocol::_Section; - -use Moo; -use Types::Standard qw( - ArrayRef - Str - Maybe - Enum -); -use MongoDB::_Types qw( - BSONCodec -); -use MongoDB::Error; - -use constant { - PERL58 => $] lt '5.010', -}; -use constant { - P_MSG_PL_1 => PERL58 ? "lZ*" : "l "C", - P_SECTION_SEQUENCE_SIZE => PERL58 ? "l" : "l<", -}; -use constant { - P_SECTION_HEADER => P_SECTION_PAYLOAD_TYPE . P_SECTION_SEQUENCE_SIZE, - P_SECTION_PAYLOAD_TYPE_LENGTH => length( pack P_SECTION_PAYLOAD_TYPE, 0 ), - P_SECTION_SEQUENCE_SIZE_LENGTH => length( pack P_SECTION_SEQUENCE_SIZE, 0 ), -}; - -use namespace::clean; - -has bson_codec => ( - is => 'ro', - required => 1, - isa => BSONCodec, -); - -# either passed in on creation or pulled from binary docs -has type => ( - is => 'lazy', - isa => Enum[ 0, 1 ], -); - -sub _build_type { - my $self = shift; - return $self->_decoded->[0] if $self->has_binary; - MongoDB::ProtocolError->throw('No type passed to Section'); -} - -# If not passed in, assume is undef (just makes this one easier) -has identifier => ( - is => 'lazy', - isa => Maybe[Str], -); - -sub _build_identifier { - my $self = shift; - return $self->_decoded->[1] if $self->has_binary; - return; -} - -# Either passed in, or created from the binary documents. -has documents => ( - is => 'lazy', - isa => ArrayRef, -); - -sub _build_documents { - my $self = shift; - my @docs; - if ( $self->has_binary ) { - @docs = ( - map { $self->bson_codec->decode_one( $_ ) } @{ $self->encoded_documents } - ); - } else { - MongoDB::ProtocolError->throw('No documents passed to Section'); - } - return \@docs; -} - -has encoded_documents => ( - is => 'lazy', - isa => ArrayRef, -); - -sub _build_encoded_documents { - my $self = shift; - my @docs; - if ( $self->has_binary ) { - ( undef, undef, @docs ) = @{ $self->_decoded }; - } else { - @docs = ( - map { $self->bson_codec->encode_one( $_ ) } @{ $self->documents } - ); - } - return \@docs; -} - -has binary => ( - is => 'lazy', - isa => Str, - predicate => 1, -); - -sub _build_binary { return shift->_encoded } - -has _encoded => ( - is => 'lazy', - isa => Str, -); - -sub _build__encoded { - my $self = shift; - - my $type = $self->type; - my $ident = $self->identifier; - my @docs = @{ $self->encoded_documents }; - - my $pl; - - if ( $type == 0 ) { - MongoDB::ProtocolError->throw( - "Creating an OP_MSG Section Payload 0 with multiple documents is not supported") - if scalar( @docs ) > 1; - $pl = $docs[0]; - } elsif ( $type == 1 ) { - # Add size and ident placeholders - $pl = pack( P_MSG_PL_1, 0, $ident ) - . join( '', @docs ); - # calculate size - substr( $pl, 0, 4, pack( P_SECTION_SEQUENCE_SIZE, length( $pl ) ) ); - } else { - MongoDB::ProtocolError->throw("Encode: Unsupported section payload type"); - } - - # Add payload type prefix - $pl = pack( P_SECTION_PAYLOAD_TYPE, $type ) . $pl; - - return $pl; -} - -has _decoded => ( - is => 'lazy', - isa => ArrayRef, -); - -sub _build__decoded { - my $self = shift; - MongoDB::ProtocolError->throw('Section requires binary to decode') - unless $self->has_binary; - my $enc = $self->binary; - my ( $type, $ident, @docs ); - - # first, extract the type - ( $type ) = unpack( 'C', $enc ); - my $payload = substr( $enc, P_SECTION_PAYLOAD_TYPE_LENGTH ); - - if ( $type == 0 ) { - # payload is actually the document - push @docs, $payload; - } elsif ( $type == 1 ) { - # Pull size off and double check - my ( $pl_size ) = unpack( P_SECTION_SEQUENCE_SIZE, $payload ); - unless ( $pl_size == length( $payload ) ) { - MongoDB::ProtocolError->throw("Decode: Section size incorrect"); - } - $payload = substr( $payload, P_SECTION_SEQUENCE_SIZE_LENGTH ); - # Pull out then remove - ( $ident ) = unpack( 'Z*', $payload ); - $payload = substr( $payload, length ( pack 'Z*', $ident ) ); - - while ( length $payload ) { - my $doc_size = unpack( P_SECTION_SEQUENCE_SIZE, $payload ); - my $doc = substr( $payload, 0, $doc_size ); - $payload = substr( $payload, $doc_size ); - push @docs, $doc; - } - } else { - MongoDB::ProtocolError->throw("Decode: Unsupported section payload type"); - } - - return [ $type, $ident, @docs ]; -} - -=head1 SYNOPSIS - - # From a known document set - MongoDB::Protocol::_Section->new( - bson_codec => BSON->new, - type => 0, - identifier => undef, - documents => [ $doc ], - ); - - # From a binary section - MongoDB::Protocol::_Section->new( - bson_codec => BSON->new, - binary => $bin, - ); - -=cut - -1; diff --git a/lib/MongoDB/Role/_CommandMonitoring.pm b/lib/MongoDB/Role/_CommandMonitoring.pm index b61b089e..143c3b84 100644 --- a/lib/MongoDB/Role/_CommandMonitoring.pm +++ b/lib/MongoDB/Role/_CommandMonitoring.pm @@ -275,10 +275,10 @@ sub _maybe_expand_sections { my @out; for my $section ( @$in ) { # If everything isnt a Section, drop out the original - return $in unless ref($section) eq 'MongoDB::Protocol::_Section'; + return $in unless ref($section) eq 'HASH' && defined $section->{type} && defined $section->{documents}; # Type 0 only ever has one document - push @out, @{ $section->documents->[0] } if $section->type == 0; - push @out, ( $section->identifier, $section->documents ) if $section->type == 1; + push @out, @{ $section->{documents}->[0] } if $section->{type} == 0; + push @out, ( $section->{identifier}, $section->{documents} ) if $section->{type} == 1; } return \@out; } diff --git a/lib/MongoDB/Role/_SingleBatchDocWrite.pm b/lib/MongoDB/Role/_SingleBatchDocWrite.pm index f15e19fd..617b1f84 100644 --- a/lib/MongoDB/Role/_SingleBatchDocWrite.pm +++ b/lib/MongoDB/Role/_SingleBatchDocWrite.pm @@ -172,8 +172,10 @@ sub _send_write_command { push @$cmd, ( '$db', $self->db_name ); my @sections = MongoDB::_Protocol::prepare_sections( $self->bson_codec, $cmd ); $cmd = \@sections; - my $encoded_sections = MongoDB::_Protocol::join_sections( @sections ); - ( $op_bson, $request_id ) = MongoDB::_Protocol::write_msg( $encoded_sections, undef ); + ( $op_bson, $request_id ) = MongoDB::_Protocol::write_msg( + $self->bson_codec, + undef, + @sections ); } else { # send command and get response document my $command = $self->bson_codec->encode_one( $cmd ); diff --git a/lib/MongoDB/_Protocol.pm b/lib/MongoDB/_Protocol.pm index 93247d6e..d1882efe 100644 --- a/lib/MongoDB/_Protocol.pm +++ b/lib/MongoDB/_Protocol.pm @@ -122,8 +122,6 @@ use constant { P_SECTION_SEQUENCE_SIZE_LENGTH => length( pack P_SECTION_SEQUENCE_SIZE, 0 ), }; -use MongoDB::Protocol::_Section; - =method prepare_sections( $cmd ) Takes a command, returns sections ready for joining @@ -139,48 +137,139 @@ sub prepare_sections { delete => 'deletes', ); + # TODO can $cmd be any other type? my ( $command, $collection, $ident, $docs, @other ) = @$cmd; if ( $split_commands{ $command } eq $ident ) { # Assumes only a single split on the commands return ( - MongoDB::Protocol::_Section->new( - bson_codec => $codec, - type => 0, - documents => [ [ $command, $collection, @other ] ] - ), - MongoDB::Protocol::_Section->new( - bson_codec => $codec, - type => 1, - identifier => $ident, - documents => $docs - ), + { + type => 0, + documents => [ [ $command, $collection, @other ] ], + }, + { + type => 1, + identifier => $ident, + documents => $docs, + } ); } else { # Not a recognised command to split, just set up ready for later return ( - MongoDB::Protocol::_Section->new( - bson_codec => $codec, - type => 0, - documents => [ $cmd ] - ), + { + type => 0, + documents => [ $cmd ], + } ); } } +=method encode_section + + MongoDB::_Protocol::encode_section( $codec, { + type => 0, # 0 or 1 + identifier => undef, # optional in type 0 + documents => [ $cmd ] # must be an array of documents + }); + +Takes a section hashref and encodes it for joining + +=cut + +sub encode_section { + my ( $codec, $section ) = @_; + + my $type = $section->{type}; + my $ident = $section->{identifier}; + my @raw_docs = @{ $section->{documents} }; + + my @docs = map { $codec->encode_one( $_ ) } @raw_docs; + + my $pl; + if ( $type == 0 ) { + MongoDB::ProtocolError->throw( + "Creating an OP_MSG Section Payload 0 with multiple documents is not supported") + if scalar( @docs ) > 1; + $pl = $docs[0]; + } elsif ( $type == 1 ) { + $pl = pack( P_MSG_PL_1, 0, $ident ) + . join( '', @docs ); + # calculate size + substr( $pl, 0, 4, pack( P_SECTION_SEQUENCE_SIZE, length( $pl ) ) ); + } else { + MongoDB::ProtocolError->throw("Encode: Unsupported section payload type"); + } + + # Prepend the section type + $pl = pack( P_SECTION_PAYLOAD_TYPE, $type ) . $pl; + + return $pl; +} + +=method decode_section + + MongoDB::_Protocol::decode_section( $codec, $section ) + +Takes an encoded section and decodes it, exactly the opposite of encode_section. + +=cut + +sub decode_section { + my ( $codec, $doc, $flag ) = @_; + my ( $type, $ident, @enc_docs ); + my $section = {}; + + ( $type ) = unpack( 'C', $doc ); + my $payload = substr( $doc, P_SECTION_PAYLOAD_TYPE_LENGTH ); + + $section->{ type } = $type; + + if ( $type == 0 ) { + # payload is a raw document + push @enc_docs, $payload; + } elsif ( $type == 1 ) { + # Pull size off and double check + my ( $pl_size ) = unpack( P_SECTION_SEQUENCE_SIZE, $payload ); + unless ( $pl_size == length( $payload ) ) { + MongoDB::ProtocolError->throw("Decode: Section size incorrect"); + } + $payload = substr( $payload, P_SECTION_SEQUENCE_SIZE_LENGTH ); + # Pull out then remove + ( $ident ) = unpack( 'Z*', $payload ); + $section->{ identifier } = $ident; + $payload = substr( $payload, length ( pack 'Z*', $ident ) ); + + while ( length $payload ) { + my $doc_size = unpack( P_SECTION_SEQUENCE_SIZE, $payload ); + my $doc = substr( $payload, 0, $doc_size ); + $payload = substr( $payload, $doc_size ); + push @enc_docs, $doc; + } + } else { + MongoDB::ProtocolError->throw("Decode: Unsupported section payload type"); + } + ## XXX We dont seem to need this decoded till later? flag here for testing + if ( $flag ) { + @enc_docs = map { $codec->decode_one( $_ ) } @enc_docs; + } + $section->{ documents } = \@enc_docs; + + return $section; +} + =method join_sections MongoDB::_Protocol::join_sections( [ 0, undef, $doc ], [ 1, 'documents', $doc, $doc2 ] ); -Joins an array of sections ready for passing to encode_sections. +Encodes and joins an array of sections. =cut sub join_sections { - my ( @sections ) = @_; + my ( $codec, @sections ) = @_; - my $msg = join ('', ( map { $_->binary } @sections ) ); + my $msg = join ('', ( map { encode_section( $codec, $_ ) } @sections ) ); return $msg; } @@ -194,6 +283,7 @@ Does the exact opposite of join_sections. sub split_sections { my $codec = shift; my $msg = shift; + my $decode_flag = shift; my @sections; while ( length $msg ) { # get first section length @@ -201,7 +291,8 @@ sub split_sections { # Add the payload type length as we reached over it for the length my $section = substr( $msg, 0, $section_length + P_SECTION_PAYLOAD_TYPE_LENGTH ); - push @sections, MongoDB::Protocol::_Section->new( bson_codec => $codec, binary => $section ); + + push @sections, decode_section( $codec, $section, $decode_flag ); $msg = substr( $msg, $section_length + P_SECTION_PAYLOAD_TYPE_LENGTH ); } @@ -215,19 +306,24 @@ use constant { }; sub write_msg { - my ( $sections, $flags ) = @_; + my ( $codec, $flags, @sections ) = @_; my $flagbits = 0; # checksum is reserved for future use if ( $flags ) { $flagbits = - ( $flags->{checksum_present} ? 1 << MSG_FB_CHECKSUM : 0 ) - | ( $flags->{more_to_come} ? 1 << MSG_FB_MORE_TO_COME : 0 ); + ( $flags->{checksum_present} ? 1 << + MSG_FB_CHECKSUM : 0 ) # Newline to stop highlighter crapping itself.... + | ( $flags->{more_to_come} ? 1 << + MSG_FB_MORE_TO_COME : 0 ); # Newline to stop highlighter crapping itself.... + } my $request_id = int( rand( MAX_REQUEST_ID ) ); + my $encoded_sections = join_sections( $codec, @sections ); + my $msg = pack( P_MSG, 0, $request_id, 0, OP_MSG, 0 ) - . $sections; + . $encoded_sections; substr( $msg, 0, 4, pack( P_INT32, length($msg) ) ); return ( $msg, $request_id ); } @@ -348,8 +444,10 @@ sub write_update { my $bitflags = 0; if ($flags) { $bitflags = - ( $flags->{upsert} ? 1 << U_UPSERT : 0 ) - | ( $flags->{multi} ? 1 << U_MULTI_UPDATE : 0 ); + ( $flags->{upsert} ? 1 << + U_UPSERT : 0 ) + | ( $flags->{multi} ? 1 << + U_MULTI_UPDATE : 0 ); } my $msg = @@ -377,7 +475,8 @@ sub write_insert { my $bitflags = 0; if ($flags) { - $bitflags = ( $flags->{continue_on_error} ? 1 << I_CONTINUE_ON_ERROR : 0 ); + $bitflags = ( $flags->{continue_on_error} ? 1 << + I_CONTINUE_ON_ERROR : 0 ); } my $msg = @@ -416,11 +515,16 @@ sub write_query { my $bitflags = 0; if ($flags) { $bitflags = - ( $flags->{tailable} ? 1 << Q_TAILABLE : 0 ) - | ( $flags->{slave_ok} ? 1 << Q_SLAVE_OK : 0 ) - | ( $flags->{await_data} ? 1 << Q_AWAIT_DATA : 0 ) - | ( $flags->{immortal} ? 1 << Q_NO_CURSOR_TIMEOUT : 0 ) - | ( $flags->{partial} ? 1 << Q_PARTIAL : 0 ); + ( $flags->{tailable} ? 1 << + Q_TAILABLE : 0 ) + | ( $flags->{slave_ok} ? 1 << + Q_SLAVE_OK : 0 ) + | ( $flags->{await_data} ? 1 << + Q_AWAIT_DATA : 0 ) + | ( $flags->{immortal} ? 1 << + Q_NO_CURSOR_TIMEOUT : 0 ) + | ( $flags->{partial} ? 1 << + Q_PARTIAL : 0 ); } my $request_id = int( rand( MAX_REQUEST_ID ) ); @@ -473,7 +577,8 @@ sub write_delete { my $bitflags = 0; if ($flags) { - $bitflags = ( $flags->{just_one} ? 1 << D_SINGLE_REMOVE : 0 ); + $bitflags = ( $flags->{just_one} ? 1 << + D_SINGLE_REMOVE : 0 ); } my $msg = @@ -539,12 +644,12 @@ sub parse_reply { if length($msg) < MIN_REPLY_LENGTH; $msg = try_uncompress($msg); - + my ( $len, $msg_id, $response_to, $opcode, $bitflags, $cursor_id, $starting_from, $number_returned ) = unpack( P_MSG, $msg ); - + # pre-check all conditions using a modifier in one statement for speed; # disambiguate afterwards only if an error exists @@ -578,7 +683,7 @@ sub parse_reply { checksum_present => vec( $bitflags, MSG_FB_CHECKSUM, 1 ), query_failure => vec( $bitflags, MSG_FB_MORE_TO_COME, 1 ), }, - docs => $sections[0]->encoded_documents->[0] + docs => $sections[0]->{documents}->[0] }; } else { # Yes its two unpacks but its just easier than mapping through to the right size diff --git a/t/protocol_op_msg.t b/t/protocol_op_msg.t index 1f8bfc9a..5437ab5c 100644 --- a/t/protocol_op_msg.t +++ b/t/protocol_op_msg.t @@ -19,7 +19,6 @@ use Test::More; use Test::Fatal; use MongoDB::_Protocol; -use MongoDB::Protocol::_Section; use BSON; my $codec = BSON->new(); @@ -38,26 +37,29 @@ subtest 'insert_doc' => sub { push @{$insert_doc}, ( '$db', 'someDatabase' ); my @packed_payloads = MongoDB::_Protocol::prepare_sections( $codec, $insert_doc ); my @expected_payloads = ( - MongoDB::Protocol::_Section->new( - bson_codec => $codec, + { type => 0, - identifier => undef, - encoded_documents => [ "b\0\0\0\2insert\0\17\0\0\0collectionName\0\4writeConcern\0\36\0\0\0\0020\0\2\0\0\0w\0\0021\0\t\0\0\0majority\0\0\2\$db\0\r\0\0\0someDatabase\0\0" ] - ), - MongoDB::Protocol::_Section->new( - bson_codec => $codec, + documents => [ + [ + insert => 'collectionName', + writeConcern => [ w => 'majority' ], + '$db' => 'someDatabase', + ] + ] + }, + { type => 1, identifier => "documents", - encoded_documents => [ - "%\0\0\0\2id\0\13\0\0\0Document#1\0\20example\0\1\0\0\0\0", - "%\0\0\0\2id\0\13\0\0\0Document#2\0\20example\0\2\0\0\0\0", - "%\0\0\0\2id\0\13\0\0\0Document#3\0\20example\0\3\0\0\0\0" + documents => [ + [ id => 'Document#1', example => 1 ], + [ id => 'Document#2', example => 2 ], + [ id => 'Document#3', example => 3 ] ], - ), + }, ); for my $i ( 0 .. $#expected_payloads ) { - is $packed_payloads[$i]->binary, $expected_payloads[$i]->binary, "section $i prepared correctly"; + is_deeply $packed_payloads[$i], $expected_payloads[$i], "section $i prepared correctly"; } }; # struct Section { @@ -72,104 +74,90 @@ subtest 'insert_doc' => sub { # }; # }; -my $raw_doc = [ test => 'document' ]; -my $doc = $codec->encode_one( $raw_doc ); +my $raw_doc = [ test => 'document' ]; +my $doc = $codec->encode_one( $raw_doc ); my $decoded_doc = $codec->decode_one( $doc ); subtest 'encode section' => sub { subtest 'payload 0' => sub { - my $got_section = MongoDB::Protocol::_Section->new( - bson_codec => $codec, + my $raw_section = { type => 0, - identifier => undef, - documents => [$raw_doc], - ); + documents => [ $raw_doc ], + }; + my $got_section = MongoDB::_Protocol::encode_section( $codec, $raw_section ); my $expected_section = "\0" . $doc; - is $got_section->binary, $expected_section, 'encode payload 0 correctly'; + is $got_section, $expected_section, 'encode payload 0 correctly'; - ok exception { MongoDB::Protocol::_Section->new( - bson_codec => $codec, + ok exception { MongoDB::_Protocol::encode_section( $codec, { type => 0, - identifier => undef, - documents => [$raw_doc, $raw_doc], - )->binary; }, 'multiple docs in payload 0 causes error'; + documents => [ $raw_doc, $raw_doc ] + }); }, 'multiple docs in payload 0 causes error'; }; subtest 'payload 1 single doc' => sub { - my $got_section = MongoDB::Protocol::_Section->new( - bson_codec => $codec, + my $raw_section = { type => 1, identifier => 'documents', - documents => [$raw_doc], - ); + documents => [ $raw_doc ], + }; + my $got_section = MongoDB::_Protocol::encode_section( $codec, $raw_section ); my $expected_section = "\1&\0\0\0" . "documents\0" . $doc; - is $got_section->binary, $expected_section, 'encode payload 1 correctly'; + is $got_section, $expected_section, 'encode payload 1 correctly'; }; subtest 'payload 1 multiple doc' => sub { - my $got_section = MongoDB::Protocol::_Section->new( - bson_codec => $codec, + my $raw_section = { type => 1, identifier => 'documents', - documents => [$raw_doc, $raw_doc], - ); + documents => [ $raw_doc, $raw_doc ], + }; + my $got_section = MongoDB::_Protocol::encode_section( $codec, $raw_section ); my $expected_section = "\1>\0\0\0" . "documents\0" . $doc . $doc; - is $got_section->binary, $expected_section, 'encode payload 1 correctly'; + is $got_section, $expected_section, 'encode payload 1 correctly'; }; }; subtest 'decode section' => sub { subtest 'payload 0' => sub { my $encoded = "\0" . $doc; - my $got_section = MongoDB::Protocol::_Section->new( - bson_codec => $codec, - binary => $encoded, - ); + my $got_section = MongoDB::_Protocol::decode_section( $codec, $encoded, 1 ); - is $got_section->type, 0, 'section type correct'; - is $got_section->identifier, undef, 'section identifier correct'; - is_deeply $got_section->documents, [ { test => 'document' } ], 'decoded document correctly'; + is $got_section->{type}, 0, 'section type correct'; + is $got_section->{identifier}, undef, 'section identifier correct'; + is_deeply $got_section->{documents}, [ { test => 'document' } ], 'decoded document correctly'; }; subtest 'payload 1' => sub { my $encoded = "\1&\0\0\0" ."documents\0" . $doc; - my $got_section = MongoDB::Protocol::_Section->new( - bson_codec => $codec, - binary => $encoded, - ); + my $got_section = MongoDB::_Protocol::decode_section( $codec, $encoded, 1 ); - is $got_section->type, 1, 'section type correct'; - is $got_section->identifier, 'documents', 'section identifier correct'; - is_deeply $got_section->documents, [ { test => 'document' } ], 'decoded document correctly'; + is $got_section->{type}, 1, 'section type correct'; + is $got_section->{identifier}, 'documents', 'section identifier correct'; + is_deeply $got_section->{documents}, [ { test => 'document' } ], 'decoded document correctly'; }; subtest 'payload 1 multiple docs' => sub { my $encoded = "\1>\0\0\0" ."documents\0" . $doc . $doc; - my $got_section = MongoDB::Protocol::_Section->new( - bson_codec => $codec, - binary => $encoded, - ); + my $got_section = MongoDB::_Protocol::decode_section( $codec, $encoded, 1 ); - is $got_section->type, 1, 'section type correct'; - is $got_section->identifier, 'documents', 'section identifier correct'; - is_deeply $got_section->documents, [ { test => 'document' }, { test => 'document' } ], 'decoded document correctly'; + is $got_section->{type}, 1, 'section type correct'; + is $got_section->{identifier}, 'documents', 'section identifier correct'; + is_deeply $got_section->{documents}, [ { test => 'document' }, { test => 'document' } ], 'decoded document correctly'; }; }; subtest 'join sections' => sub { subtest 'payload 0' => sub { my @sections = ( - MongoDB::Protocol::_Section->new( - bson_codec => $codec, + { type => 0, - identifier => undef, - documents => [$raw_doc], - ) + documents => [ $raw_doc ], + } ); - my $got_sections = MongoDB::_Protocol::join_sections( @sections ); + my $got_sections = MongoDB::_Protocol::join_sections( $codec, @sections ); my $expected_sections = "\0" . $doc; is_deeply $got_sections, $expected_sections, 'joined correctly'; @@ -178,20 +166,17 @@ subtest 'join sections' => sub { subtest 'payload 0 + 1' => sub { subtest 'single document' => sub { my @sections = ( - MongoDB::Protocol::_Section->new( - bson_codec => $codec, + { type => 0, - identifier => undef, - documents => [$raw_doc], - ), - MongoDB::Protocol::_Section->new( - bson_codec => $codec, + documents => [ $raw_doc ], + }, + { type => 1, identifier => 'documents', - documents => [$raw_doc], - ), + documents => [ $raw_doc ], + }, ); - my $got_sections = MongoDB::_Protocol::join_sections( @sections ); + my $got_sections = MongoDB::_Protocol::join_sections( $codec, @sections ); my $expected_sections = "\0" . $doc . "\1&\0\0\0" ."documents\0" . $doc; is_deeply $got_sections, $expected_sections, 'joined correctly'; @@ -199,20 +184,17 @@ subtest 'join sections' => sub { subtest 'multiple documents' => sub { my @sections = ( - MongoDB::Protocol::_Section->new( - bson_codec => $codec, + { type => 0, - identifier => undef, - documents => [$raw_doc], - ), - MongoDB::Protocol::_Section->new( - bson_codec => $codec, + documents => [ $raw_doc ], + }, + { type => 1, identifier => 'documents', - documents => [$raw_doc, $raw_doc], - ), + documents => [ $raw_doc, $raw_doc ], + }, ); - my $got_sections = MongoDB::_Protocol::join_sections( @sections ); + my $got_sections = MongoDB::_Protocol::join_sections( $codec, @sections ); my $expected_sections = "\0" . $doc . "\1>\0\0\0" ."documents\0" . $doc . $doc; is_deeply $got_sections, $expected_sections, 'joined correctly'; @@ -222,26 +204,22 @@ subtest 'join sections' => sub { subtest 'payload 0 + multiple 1' => sub { subtest 'single document' => sub { my @sections = ( - MongoDB::Protocol::_Section->new( - bson_codec => $codec, + { type => 0, - identifier => undef, - documents => [$raw_doc], - ), - MongoDB::Protocol::_Section->new( - bson_codec => $codec, + documents => [ $raw_doc ], + }, + { type => 1, identifier => 'documents', - documents => [$raw_doc], - ), - MongoDB::Protocol::_Section->new( - bson_codec => $codec, + documents => [ $raw_doc ], + }, + { type => 1, identifier => 'documents', - documents => [$raw_doc], - ), + documents => [ $raw_doc ], + }, ); - my $got_sections = MongoDB::_Protocol::join_sections( @sections ); + my $got_sections = MongoDB::_Protocol::join_sections( $codec, @sections ); my $expected_sections = "\0" . $doc . "\1&\0\0\0" ."documents\0" . $doc . "\1&\0\0\0" ."documents\0" . $doc; is_deeply $got_sections, $expected_sections, 'joined correctly'; @@ -249,26 +227,22 @@ subtest 'join sections' => sub { subtest 'multiple documents' => sub { my @sections = ( - MongoDB::Protocol::_Section->new( - bson_codec => $codec, + { type => 0, - identifier => undef, - documents => [$raw_doc], - ), - MongoDB::Protocol::_Section->new( - bson_codec => $codec, + documents => [ $raw_doc ], + }, + { type => 1, identifier => 'documents', - documents => [$raw_doc, $raw_doc], - ), - MongoDB::Protocol::_Section->new( - bson_codec => $codec, + documents => [ $raw_doc, $raw_doc ], + }, + { type => 1, identifier => 'documents', - documents => [$raw_doc, $raw_doc], - ), + documents => [ $raw_doc, $raw_doc ], + }, ); - my $got_sections = MongoDB::_Protocol::join_sections( @sections ); + my $got_sections = MongoDB::_Protocol::join_sections( $codec, @sections ); my $expected_sections = "\0" . $doc . "\1>\0\0\0" ."documents\0" . $doc . $doc . "\1>\0\0\0" ."documents\0" . $doc . $doc; is_deeply $got_sections, $expected_sections, 'joined correctly'; @@ -279,46 +253,46 @@ subtest 'join sections' => sub { subtest 'split sections' => sub { subtest 'payload 0' => sub { my $encoded = "\0" . $doc; - my @got_sections = MongoDB::_Protocol::split_sections( $codec, $encoded ); + my @got_sections = MongoDB::_Protocol::split_sections( $codec, $encoded, 1 ); my @expected_sections = ( [ 0, undef, [ $decoded_doc ] ], ); for my $i ( 0 .. $#expected_sections ) { - is $got_sections[$i]->type, $expected_sections[$i][0], 'type correct'; - is $got_sections[$i]->identifier, $expected_sections[$i][1], 'identifier correct'; - is_deeply $got_sections[$i]->documents, $expected_sections[$i][2], 'documents correct'; + is $got_sections[$i]->{type}, $expected_sections[$i][0], 'type correct'; + is $got_sections[$i]->{identifier}, $expected_sections[$i][1], 'identifier correct'; + is_deeply $got_sections[$i]->{documents}, $expected_sections[$i][2], 'documents correct'; } }; subtest 'payload 0 + 1' => sub { subtest 'single document' => sub { my $encoded = "\0" . $doc . "\1&\0\0\0" ."documents\0" . $doc; - my @got_sections = MongoDB::_Protocol::split_sections( $codec, $encoded ); + my @got_sections = MongoDB::_Protocol::split_sections( $codec, $encoded, 1 ); my @expected_sections = ( [ 0, undef, [ $decoded_doc ] ], [ 1, 'documents', [ $decoded_doc ] ], ); for my $i ( 0 .. $#expected_sections ) { - is $got_sections[$i]->type, $expected_sections[$i][0], 'type correct'; - is $got_sections[$i]->identifier, $expected_sections[$i][1], 'identifier correct'; - is_deeply $got_sections[$i]->documents, $expected_sections[$i][2], 'documents correct'; + is $got_sections[$i]->{type}, $expected_sections[$i][0], 'type correct'; + is $got_sections[$i]->{identifier}, $expected_sections[$i][1], 'identifier correct'; + is_deeply $got_sections[$i]->{documents}, $expected_sections[$i][2], 'documents correct'; } }; subtest 'multiple documents' => sub { my $encoded = "\0" . $doc . "\1>\0\0\0" ."documents\0" . $doc . $doc; - my @got_sections = MongoDB::_Protocol::split_sections( $codec, $encoded ); + my @got_sections = MongoDB::_Protocol::split_sections( $codec, $encoded, 1 ); my @expected_sections = ( [ 0, undef, [ $decoded_doc ] ], [ 1, 'documents', [ $decoded_doc, $decoded_doc ] ], ); for my $i ( 0 .. $#expected_sections ) { - is $got_sections[$i]->type, $expected_sections[$i][0], 'type correct'; - is $got_sections[$i]->identifier, $expected_sections[$i][1], 'identifier correct'; - is_deeply $got_sections[$i]->documents, $expected_sections[$i][2], 'documents correct'; + is $got_sections[$i]->{type}, $expected_sections[$i][0], 'type correct'; + is $got_sections[$i]->{identifier}, $expected_sections[$i][1], 'identifier correct'; + is_deeply $got_sections[$i]->{documents}, $expected_sections[$i][2], 'documents correct'; } }; }; @@ -326,7 +300,7 @@ subtest 'split sections' => sub { subtest 'payload 0 + multiple 1' => sub { subtest 'single document' => sub { my $encoded = "\0" . $doc . "\1&\0\0\0" ."documents\0" . $doc . "\1&\0\0\0" ."documents\0" . $doc; - my @got_sections = MongoDB::_Protocol::split_sections( $codec, $encoded ); + my @got_sections = MongoDB::_Protocol::split_sections( $codec, $encoded, 1 ); my @expected_sections = ( [ 0, undef, [ $decoded_doc ] ], [ 1, 'documents', [ $decoded_doc ] ], @@ -334,15 +308,15 @@ subtest 'split sections' => sub { ); for my $i ( 0 .. $#expected_sections ) { - is $got_sections[$i]->type, $expected_sections[$i][0], 'type correct'; - is $got_sections[$i]->identifier, $expected_sections[$i][1], 'identifier correct'; - is_deeply $got_sections[$i]->documents, $expected_sections[$i][2], 'documents correct'; + is $got_sections[$i]->{type}, $expected_sections[$i][0], 'type correct'; + is $got_sections[$i]->{identifier}, $expected_sections[$i][1], 'identifier correct'; + is_deeply $got_sections[$i]->{documents}, $expected_sections[$i][2], 'documents correct'; } }; subtest 'multiple documents' => sub { my $encoded = "\0" . $doc . "\1>\0\0\0" ."documents\0" . $doc . $doc . "\1>\0\0\0" ."documents\0" . $doc . $doc; - my @got_sections = MongoDB::_Protocol::split_sections( $codec, $encoded ); + my @got_sections = MongoDB::_Protocol::split_sections( $codec, $encoded, 1 ); my @expected_sections = ( [ 0, undef, [ $decoded_doc ] ], [ 1, 'documents', [ $decoded_doc, $decoded_doc ] ], @@ -350,9 +324,9 @@ subtest 'split sections' => sub { ); for my $i ( 0 .. $#expected_sections ) { - is $got_sections[$i]->type, $expected_sections[$i][0], 'type correct'; - is $got_sections[$i]->identifier, $expected_sections[$i][1], 'identifier correct'; - is_deeply $got_sections[$i]->documents, $expected_sections[$i][2], 'documents correct'; + is $got_sections[$i]->{type}, $expected_sections[$i][0], 'type correct'; + is $got_sections[$i]->{identifier}, $expected_sections[$i][1], 'identifier correct'; + is_deeply $got_sections[$i]->{documents}, $expected_sections[$i][2], 'documents correct'; } }; }; From add02a38b4a4f2c96dac0c623597f5ff31d95c29 Mon Sep 17 00:00:00 2001 From: Thomas Bloor Date: Tue, 31 Jul 2018 12:12:29 +0100 Subject: [PATCH 6/9] PERL-789 Second pass of OP_MSG support --- devel/config/mongod-4.0.yml | 6 ++++ lib/MongoDB/Op/_Command.pm | 15 ++++---- lib/MongoDB/Role/_CommandMonitoring.pm | 14 -------- lib/MongoDB/Role/_SingleBatchDocWrite.pm | 14 ++++---- lib/MongoDB/_Protocol.pm | 44 ++++++++++++++++++++---- t/lib/MongoDBTest.pm | 2 +- 6 files changed, 56 insertions(+), 39 deletions(-) create mode 100644 devel/config/mongod-4.0.yml diff --git a/devel/config/mongod-4.0.yml b/devel/config/mongod-4.0.yml new file mode 100644 index 00000000..d11f7c6e --- /dev/null +++ b/devel/config/mongod-4.0.yml @@ -0,0 +1,6 @@ +--- +type: single +default_args: -v --bind_ip 0.0.0.0 --enableMajorityReadConcern +default_version: 4.0 +mongod: + - name: host1 diff --git a/lib/MongoDB/Op/_Command.pm b/lib/MongoDB/Op/_Command.pm index fc545b07..da4dadbb 100644 --- a/lib/MongoDB/Op/_Command.pm +++ b/lib/MongoDB/Op/_Command.pm @@ -27,6 +27,7 @@ use MongoDB::_Constants; use MongoDB::_Types qw( Document ReadPreference + to_IxHash ); use List::Util qw/first/; use Types::Standard qw( @@ -89,15 +90,11 @@ sub execute { my ( $op_bson, $request_id ); - if ( $ENV{DO_OP_MSG} ) {#$link->supports_op_msg ) { - # TODO Cover other document object types - push @{$self->{query}}, ( '$db', $self->db_name ); - my @sections = MongoDB::_Protocol::prepare_sections( $self->{bson_codec}, $self->{query} ); - $self->{query} = \@sections; - ( $op_bson, $request_id ) = MongoDB::_Protocol::write_msg( - $self->{bson_codec}, - undef, - @sections ); + if ( $link->supports_op_msg ) { + $self->{query} = to_IxHash( $self->{query} ); + $self->{query}->Push( '$db', $self->db_name ); + ( $op_bson, $request_id ) = + MongoDB::_Protocol::write_msg( $self->{bson_codec}, undef, $self->{query} ); } else { ( $op_bson, $request_id ) = MongoDB::_Protocol::write_query( $self->{db_name} . '.$cmd', diff --git a/lib/MongoDB/Role/_CommandMonitoring.pm b/lib/MongoDB/Role/_CommandMonitoring.pm index 143c3b84..a08d4e8c 100644 --- a/lib/MongoDB/Role/_CommandMonitoring.pm +++ b/lib/MongoDB/Role/_CommandMonitoring.pm @@ -256,7 +256,6 @@ sub _to_tied_ixhash { my $type = ref($in); my %out; if ( $type eq 'ARRAY' ) { - $in = _maybe_expand_sections( $in ); # earlier type checks should ensure even elements tie %out, "Tie::IxHash", map { _decode_preencoded($_) } @$in; } @@ -270,17 +269,4 @@ sub _to_tied_ixhash { return \%out; } -sub _maybe_expand_sections { - my $in = shift; - my @out; - for my $section ( @$in ) { - # If everything isnt a Section, drop out the original - return $in unless ref($section) eq 'HASH' && defined $section->{type} && defined $section->{documents}; - # Type 0 only ever has one document - push @out, @{ $section->{documents}->[0] } if $section->{type} == 0; - push @out, ( $section->{identifier}, $section->{documents} ) if $section->{type} == 1; - } - return \@out; -} - 1; diff --git a/lib/MongoDB/Role/_SingleBatchDocWrite.pm b/lib/MongoDB/Role/_SingleBatchDocWrite.pm index 617b1f84..655f50cc 100644 --- a/lib/MongoDB/Role/_SingleBatchDocWrite.pm +++ b/lib/MongoDB/Role/_SingleBatchDocWrite.pm @@ -30,6 +30,7 @@ use MongoDB::_Constants; use MongoDB::_Protocol; use MongoDB::_Types qw( WriteConcern + to_IxHash ); use namespace::clean; @@ -168,14 +169,11 @@ sub _send_write_command { $self->_apply_session_and_cluster_time( $link, \$cmd ); my ( $op_bson, $request_id ); - if ( $ENV{DO_OP_MSG} ) {#$link->supports_op_msg ) { - push @$cmd, ( '$db', $self->db_name ); - my @sections = MongoDB::_Protocol::prepare_sections( $self->bson_codec, $cmd ); - $cmd = \@sections; - ( $op_bson, $request_id ) = MongoDB::_Protocol::write_msg( - $self->bson_codec, - undef, - @sections ); + if ( $link->supports_op_msg ) { + $cmd = to_IxHash( $cmd ); + $cmd->Push( '$db', $self->db_name ); + ( $op_bson, $request_id ) = + MongoDB::_Protocol::write_msg( $self->bson_codec, undef, $cmd ); } else { # send command and get response document my $command = $self->bson_codec->encode_one( $cmd ); diff --git a/lib/MongoDB/_Protocol.pm b/lib/MongoDB/_Protocol.pm index d1882efe..a3fd7a96 100644 --- a/lib/MongoDB/_Protocol.pm +++ b/lib/MongoDB/_Protocol.pm @@ -23,6 +23,7 @@ our $VERSION = 'v2.0.1'; use MongoDB::_Constants; use MongoDB::Error; +use MongoDB::_Types qw/ to_IxHash /; use Compress::Zlib (); @@ -137,15 +138,37 @@ sub prepare_sections { delete => 'deletes', ); - # TODO can $cmd be any other type? - my ( $command, $collection, $ident, $docs, @other ) = @$cmd; + $cmd = to_IxHash( $cmd ); + my ( $command, $ident ); + for my $cmd_key ( keys %split_commands ) { + if ( defined $cmd->FETCH( $cmd_key ) ) { + $command = $cmd_key; + if ( defined $cmd->FETCH( $command ) ) { + $ident = $split_commands{ $command }; + } + last; + } + } - if ( $split_commands{ $command } eq $ident ) { - # Assumes only a single split on the commands + if ( defined $ident + && $split_commands{ $command } eq $ident ) { + my $collection = $cmd->FETCH( $command ); + my $docs = $cmd->FETCH( $ident ); + # my ( undef, $collection ) = $cmd->Shift; + # my ( undef, $docs ) = $cmd->Shift; + # # Assumes only a single split on the commands return ( { type => 0, - documents => [ [ $command, $collection, @other ] ], + documents => [ [ + $command, + $collection, + # Done specifically to not alter $cmd + map { $_ eq $command || $_ eq $ident + ? () + : ( $_, $cmd->FETCH( $_ ) ) + } $cmd->Keys() + ] ], }, { type => 1, @@ -306,7 +329,7 @@ use constant { }; sub write_msg { - my ( $codec, $flags, @sections ) = @_; + my ( $codec, $flags, $cmd ) = @_; my $flagbits = 0; # checksum is reserved for future use if ( $flags ) { @@ -320,7 +343,14 @@ sub write_msg { my $request_id = int( rand( MAX_REQUEST_ID ) ); - my $encoded_sections = join_sections( $codec, @sections ); + my $encoded_sections; + eval { my @sections = prepare_sections( $codec, $cmd ); + + $encoded_sections = join_sections( $codec, @sections ); }; + if ($@) { + ::Dwarn $cmd; + die $@; + } my $msg = pack( P_MSG, 0, $request_id, 0, OP_MSG, 0 ) . $encoded_sections; diff --git a/t/lib/MongoDBTest.pm b/t/lib/MongoDBTest.pm index 7e9d03c2..58891c2e 100644 --- a/t/lib/MongoDBTest.pm +++ b/t/lib/MongoDBTest.pm @@ -124,7 +124,7 @@ sub get_capped { sub skip_unless_mongod { eval { - my $conn = build_client( server_selection_timeout_ms => 1000 ); + my $conn = build_client( server_selection_timeout_ms => 10000 ); my $topo = $conn->_topology; $topo->scan_all_servers; my $link; From de35248afe3913373e4500276521e9d0f422428a Mon Sep 17 00:00:00 2001 From: Thomas Bloor Date: Tue, 31 Jul 2018 12:36:23 +0100 Subject: [PATCH 7/9] PERL-789 fix protocol for OP_MSG to pull command out properly --- lib/MongoDB/_Protocol.pm | 24 +++++++----------------- 1 file changed, 7 insertions(+), 17 deletions(-) diff --git a/lib/MongoDB/_Protocol.pm b/lib/MongoDB/_Protocol.pm index a3fd7a96..e4fe6cb4 100644 --- a/lib/MongoDB/_Protocol.pm +++ b/lib/MongoDB/_Protocol.pm @@ -139,16 +139,10 @@ sub prepare_sections { ); $cmd = to_IxHash( $cmd ); - my ( $command, $ident ); - for my $cmd_key ( keys %split_commands ) { - if ( defined $cmd->FETCH( $cmd_key ) ) { - $command = $cmd_key; - if ( defined $cmd->FETCH( $command ) ) { - $ident = $split_commands{ $command }; - } - last; - } - } + + # Command is always first key in cmd + my $command = do { my @keys = $cmd->Keys; $keys[0] }; + my $ident = $split_commands{ $command }; if ( defined $ident && $split_commands{ $command } eq $ident ) { @@ -343,14 +337,10 @@ sub write_msg { my $request_id = int( rand( MAX_REQUEST_ID ) ); - my $encoded_sections; - eval { my @sections = prepare_sections( $codec, $cmd ); + my @sections = prepare_sections( $codec, $cmd ); + + my $encoded_sections = join_sections( $codec, @sections ); - $encoded_sections = join_sections( $codec, @sections ); }; - if ($@) { - ::Dwarn $cmd; - die $@; - } my $msg = pack( P_MSG, 0, $request_id, 0, OP_MSG, 0 ) . $encoded_sections; From 5f02d74638c6f606d81191d8523586bc49e75a25 Mon Sep 17 00:00:00 2001 From: Thomas Bloor Date: Thu, 23 Aug 2018 15:13:35 +0100 Subject: [PATCH 8/9] PERL-789 Work from code review --- lib/MongoDB/_Protocol.pm | 113 +++++++++++++----------------------- t/protocol_op_msg.t | 122 +++------------------------------------ 2 files changed, 47 insertions(+), 188 deletions(-) diff --git a/lib/MongoDB/_Protocol.pm b/lib/MongoDB/_Protocol.pm index e4fe6cb4..3fc5bb61 100644 --- a/lib/MongoDB/_Protocol.pm +++ b/lib/MongoDB/_Protocol.pm @@ -144,21 +144,18 @@ sub prepare_sections { my $command = do { my @keys = $cmd->Keys; $keys[0] }; my $ident = $split_commands{ $command }; - if ( defined $ident - && $split_commands{ $command } eq $ident ) { + if ( defined $ident ) { my $collection = $cmd->FETCH( $command ); my $docs = $cmd->FETCH( $ident ); - # my ( undef, $collection ) = $cmd->Shift; - # my ( undef, $docs ) = $cmd->Shift; - # # Assumes only a single split on the commands + # Assumes only a single split on the commands return ( { type => 0, documents => [ [ - $command, - $collection, - # Done specifically to not alter $cmd - map { $_ eq $command || $_ eq $ident + # Done specifically to not alter $cmd. + # The command ($command from earlier) is assumed to be + # first in the Keys set + map { $_ eq $ident ? () : ( $_, $cmd->FETCH( $_ ) ) } $cmd->Keys() @@ -198,15 +195,11 @@ sub encode_section { my $type = $section->{type}; my $ident = $section->{identifier}; - my @raw_docs = @{ $section->{documents} }; - - my @docs = map { $codec->encode_one( $_ ) } @raw_docs; + my @docs = map { $codec->encode_one( $_ ) } @{ $section->{documents} }; my $pl; if ( $type == 0 ) { - MongoDB::ProtocolError->throw( - "Creating an OP_MSG Section Payload 0 with multiple documents is not supported") - if scalar( @docs ) > 1; + # Assume a single doc if payload type is 0 $pl = $docs[0]; } elsif ( $type == 1 ) { $pl = pack( P_MSG_PL_1, 0, $ident ) @@ -232,7 +225,7 @@ Takes an encoded section and decodes it, exactly the opposite of encode_section. =cut sub decode_section { - my ( $codec, $doc, $flag ) = @_; + my ( $doc, $codec ) = @_; my ( $type, $ident, @enc_docs ); my $section = {}; @@ -241,20 +234,22 @@ sub decode_section { $section->{ type } = $type; + # Pull size off and double check. Size is in the same place regardless of + # payload type, as its a similar struct to a raw document + my ( $pl_size ) = unpack( P_SECTION_SEQUENCE_SIZE, $payload ); + unless ( $pl_size == length( $payload ) ) { + MongoDB::ProtocolError->throw("Decode: Section size incorrect"); + } + if ( $type == 0 ) { # payload is a raw document push @enc_docs, $payload; } elsif ( $type == 1 ) { - # Pull size off and double check - my ( $pl_size ) = unpack( P_SECTION_SEQUENCE_SIZE, $payload ); - unless ( $pl_size == length( $payload ) ) { - MongoDB::ProtocolError->throw("Decode: Section size incorrect"); - } $payload = substr( $payload, P_SECTION_SEQUENCE_SIZE_LENGTH ); # Pull out then remove ( $ident ) = unpack( 'Z*', $payload ); $section->{ identifier } = $ident; - $payload = substr( $payload, length ( pack 'Z*', $ident ) ); + $payload = substr( $payload, length ( $ident ) + 1 ); # add one for null termination while ( length $payload ) { my $doc_size = unpack( P_SECTION_SEQUENCE_SIZE, $payload ); @@ -265,8 +260,8 @@ sub decode_section { } else { MongoDB::ProtocolError->throw("Decode: Unsupported section payload type"); } - ## XXX We dont seem to need this decoded till later? flag here for testing - if ( $flag ) { + ## XXX MongoDB::Role::_OpReplyParser normally does decoding - this is used for testing mainly + if ( $codec ) { @enc_docs = map { $codec->decode_one( $_ ) } @enc_docs; } $section->{ documents } = \@enc_docs; @@ -274,33 +269,16 @@ sub decode_section { return $section; } -=method join_sections - - MongoDB::_Protocol::join_sections( - [ 0, undef, $doc ], [ 1, 'documents', $doc, $doc2 ] ); - -Encodes and joins an array of sections. - -=cut - -sub join_sections { - my ( $codec, @sections ) = @_; - - my $msg = join ('', ( map { encode_section( $codec, $_ ) } @sections ) ); - - return $msg; -} - =method split_sections( $msg ) -Does the exact opposite of join_sections. +Splits sections based on their payload length header. Returns an array of +sections in packed form =cut sub split_sections { - my $codec = shift; my $msg = shift; - my $decode_flag = shift; + my $codec = shift; my @sections; while ( length $msg ) { # get first section length @@ -309,7 +287,7 @@ sub split_sections { # Add the payload type length as we reached over it for the length my $section = substr( $msg, 0, $section_length + P_SECTION_PAYLOAD_TYPE_LENGTH ); - push @sections, decode_section( $codec, $section, $decode_flag ); + push @sections, decode_section( $section, $codec ); $msg = substr( $msg, $section_length + P_SECTION_PAYLOAD_TYPE_LENGTH ); } @@ -328,19 +306,15 @@ sub write_msg { # checksum is reserved for future use if ( $flags ) { $flagbits = - ( $flags->{checksum_present} ? 1 << - MSG_FB_CHECKSUM : 0 ) # Newline to stop highlighter crapping itself.... - | ( $flags->{more_to_come} ? 1 << - MSG_FB_MORE_TO_COME : 0 ); # Newline to stop highlighter crapping itself.... - + ( $flags->{checksum_present} ? 1 << MSG_FB_CHECKSUM : 0 ) + | ( $flags->{more_to_come} ? 1 << MSG_FB_MORE_TO_COME : 0 ); } my $request_id = int( rand( MAX_REQUEST_ID ) ); my @sections = prepare_sections( $codec, $cmd ); - my $encoded_sections = join_sections( $codec, @sections ); - + my $encoded_sections = join ('', ( map { encode_section( $codec, $_ ) } @sections ) ); my $msg = pack( P_MSG, 0, $request_id, 0, OP_MSG, 0 ) . $encoded_sections; @@ -464,10 +438,8 @@ sub write_update { my $bitflags = 0; if ($flags) { $bitflags = - ( $flags->{upsert} ? 1 << - U_UPSERT : 0 ) - | ( $flags->{multi} ? 1 << - U_MULTI_UPDATE : 0 ); + ( $flags->{upsert} ? 1 << U_UPSERT : 0 ) + | ( $flags->{multi} ? 1 << U_MULTI_UPDATE : 0 ); } my $msg = @@ -495,8 +467,7 @@ sub write_insert { my $bitflags = 0; if ($flags) { - $bitflags = ( $flags->{continue_on_error} ? 1 << - I_CONTINUE_ON_ERROR : 0 ); + $bitflags = ( $flags->{continue_on_error} ? 1 << I_CONTINUE_ON_ERROR : 0 ); } my $msg = @@ -535,16 +506,11 @@ sub write_query { my $bitflags = 0; if ($flags) { $bitflags = - ( $flags->{tailable} ? 1 << - Q_TAILABLE : 0 ) - | ( $flags->{slave_ok} ? 1 << - Q_SLAVE_OK : 0 ) - | ( $flags->{await_data} ? 1 << - Q_AWAIT_DATA : 0 ) - | ( $flags->{immortal} ? 1 << - Q_NO_CURSOR_TIMEOUT : 0 ) - | ( $flags->{partial} ? 1 << - Q_PARTIAL : 0 ); + ( $flags->{tailable} ? 1 << Q_TAILABLE : 0 ) + | ( $flags->{slave_ok} ? 1 << Q_SLAVE_OK : 0 ) + | ( $flags->{await_data} ? 1 << Q_AWAIT_DATA : 0 ) + | ( $flags->{immortal} ? 1 << Q_NO_CURSOR_TIMEOUT : 0 ) + | ( $flags->{partial} ? 1 << Q_PARTIAL : 0 ); } my $request_id = int( rand( MAX_REQUEST_ID ) ); @@ -597,8 +563,7 @@ sub write_delete { my $bitflags = 0; if ($flags) { - $bitflags = ( $flags->{just_one} ? 1 << - D_SINGLE_REMOVE : 0 ); + $bitflags = ( $flags->{just_one} ? 1 << D_SINGLE_REMOVE : 0 ); } my $msg = @@ -694,15 +659,15 @@ sub parse_reply { if ( $opcode == OP_MSG ) { - MongoDB::InternalError->throw('OP_MSG requires codec for parse_reply') unless defined $codec; - # TODO Extract and check checksum - my @sections = split_sections( $codec, substr $msg, P_MSG_PREFIX_LENGTH ); + # XXX Extract and check checksum - future support of crc32c + my @sections = split_sections( substr( $msg, P_MSG_PREFIX_LENGTH ) ); # We have none of the other stuff? maybe flags... and an array of docs? erm return { flags => { checksum_present => vec( $bitflags, MSG_FB_CHECKSUM, 1 ), - query_failure => vec( $bitflags, MSG_FB_MORE_TO_COME, 1 ), + more_to_come => vec( $bitflags, MSG_FB_MORE_TO_COME, 1 ), }, + # XXX Assumes the server never sends a type 1 payload. May change in future docs => $sections[0]->{documents}->[0] }; } else { diff --git a/t/protocol_op_msg.t b/t/protocol_op_msg.t index 5437ab5c..a66b3e69 100644 --- a/t/protocol_op_msg.t +++ b/t/protocol_op_msg.t @@ -88,11 +88,6 @@ subtest 'encode section' => sub { my $expected_section = "\0" . $doc; is $got_section, $expected_section, 'encode payload 0 correctly'; - - ok exception { MongoDB::_Protocol::encode_section( $codec, { - type => 0, - documents => [ $raw_doc, $raw_doc ] - }); }, 'multiple docs in payload 0 causes error'; }; subtest 'payload 1 single doc' => sub { @@ -123,7 +118,7 @@ subtest 'encode section' => sub { subtest 'decode section' => sub { subtest 'payload 0' => sub { my $encoded = "\0" . $doc; - my $got_section = MongoDB::_Protocol::decode_section( $codec, $encoded, 1 ); + my $got_section = MongoDB::_Protocol::decode_section( $encoded, $codec ); is $got_section->{type}, 0, 'section type correct'; is $got_section->{identifier}, undef, 'section identifier correct'; @@ -132,7 +127,7 @@ subtest 'decode section' => sub { subtest 'payload 1' => sub { my $encoded = "\1&\0\0\0" ."documents\0" . $doc; - my $got_section = MongoDB::_Protocol::decode_section( $codec, $encoded, 1 ); + my $got_section = MongoDB::_Protocol::decode_section( $encoded, $codec ); is $got_section->{type}, 1, 'section type correct'; is $got_section->{identifier}, 'documents', 'section identifier correct'; @@ -141,7 +136,7 @@ subtest 'decode section' => sub { subtest 'payload 1 multiple docs' => sub { my $encoded = "\1>\0\0\0" ."documents\0" . $doc . $doc; - my $got_section = MongoDB::_Protocol::decode_section( $codec, $encoded, 1 ); + my $got_section = MongoDB::_Protocol::decode_section( $encoded, $codec ); is $got_section->{type}, 1, 'section type correct'; is $got_section->{identifier}, 'documents', 'section identifier correct'; @@ -149,111 +144,10 @@ subtest 'decode section' => sub { }; }; -subtest 'join sections' => sub { - subtest 'payload 0' => sub { - my @sections = ( - { - type => 0, - documents => [ $raw_doc ], - } - ); - my $got_sections = MongoDB::_Protocol::join_sections( $codec, @sections ); - my $expected_sections = "\0" . $doc; - - is_deeply $got_sections, $expected_sections, 'joined correctly'; - }; - - subtest 'payload 0 + 1' => sub { - subtest 'single document' => sub { - my @sections = ( - { - type => 0, - documents => [ $raw_doc ], - }, - { - type => 1, - identifier => 'documents', - documents => [ $raw_doc ], - }, - ); - my $got_sections = MongoDB::_Protocol::join_sections( $codec, @sections ); - my $expected_sections = "\0" . $doc . "\1&\0\0\0" ."documents\0" . $doc; - - is_deeply $got_sections, $expected_sections, 'joined correctly'; - }; - - subtest 'multiple documents' => sub { - my @sections = ( - { - type => 0, - documents => [ $raw_doc ], - }, - { - type => 1, - identifier => 'documents', - documents => [ $raw_doc, $raw_doc ], - }, - ); - my $got_sections = MongoDB::_Protocol::join_sections( $codec, @sections ); - my $expected_sections = "\0" . $doc . "\1>\0\0\0" ."documents\0" . $doc . $doc; - - is_deeply $got_sections, $expected_sections, 'joined correctly'; - }; - }; - - subtest 'payload 0 + multiple 1' => sub { - subtest 'single document' => sub { - my @sections = ( - { - type => 0, - documents => [ $raw_doc ], - }, - { - type => 1, - identifier => 'documents', - documents => [ $raw_doc ], - }, - { - type => 1, - identifier => 'documents', - documents => [ $raw_doc ], - }, - ); - my $got_sections = MongoDB::_Protocol::join_sections( $codec, @sections ); - my $expected_sections = "\0" . $doc . "\1&\0\0\0" ."documents\0" . $doc . "\1&\0\0\0" ."documents\0" . $doc; - - is_deeply $got_sections, $expected_sections, 'joined correctly'; - }; - - subtest 'multiple documents' => sub { - my @sections = ( - { - type => 0, - documents => [ $raw_doc ], - }, - { - type => 1, - identifier => 'documents', - documents => [ $raw_doc, $raw_doc ], - }, - { - type => 1, - identifier => 'documents', - documents => [ $raw_doc, $raw_doc ], - }, - ); - my $got_sections = MongoDB::_Protocol::join_sections( $codec, @sections ); - my $expected_sections = "\0" . $doc . "\1>\0\0\0" ."documents\0" . $doc . $doc . "\1>\0\0\0" ."documents\0" . $doc . $doc; - - is_deeply $got_sections, $expected_sections, 'joined correctly'; - }; - }; -}; - subtest 'split sections' => sub { subtest 'payload 0' => sub { my $encoded = "\0" . $doc; - my @got_sections = MongoDB::_Protocol::split_sections( $codec, $encoded, 1 ); + my @got_sections = MongoDB::_Protocol::split_sections( $encoded, $codec ); my @expected_sections = ( [ 0, undef, [ $decoded_doc ] ], ); @@ -268,7 +162,7 @@ subtest 'split sections' => sub { subtest 'payload 0 + 1' => sub { subtest 'single document' => sub { my $encoded = "\0" . $doc . "\1&\0\0\0" ."documents\0" . $doc; - my @got_sections = MongoDB::_Protocol::split_sections( $codec, $encoded, 1 ); + my @got_sections = MongoDB::_Protocol::split_sections( $encoded, $codec ); my @expected_sections = ( [ 0, undef, [ $decoded_doc ] ], [ 1, 'documents', [ $decoded_doc ] ], @@ -283,7 +177,7 @@ subtest 'split sections' => sub { subtest 'multiple documents' => sub { my $encoded = "\0" . $doc . "\1>\0\0\0" ."documents\0" . $doc . $doc; - my @got_sections = MongoDB::_Protocol::split_sections( $codec, $encoded, 1 ); + my @got_sections = MongoDB::_Protocol::split_sections( $encoded, $codec ); my @expected_sections = ( [ 0, undef, [ $decoded_doc ] ], [ 1, 'documents', [ $decoded_doc, $decoded_doc ] ], @@ -300,7 +194,7 @@ subtest 'split sections' => sub { subtest 'payload 0 + multiple 1' => sub { subtest 'single document' => sub { my $encoded = "\0" . $doc . "\1&\0\0\0" ."documents\0" . $doc . "\1&\0\0\0" ."documents\0" . $doc; - my @got_sections = MongoDB::_Protocol::split_sections( $codec, $encoded, 1 ); + my @got_sections = MongoDB::_Protocol::split_sections( $encoded, $codec ); my @expected_sections = ( [ 0, undef, [ $decoded_doc ] ], [ 1, 'documents', [ $decoded_doc ] ], @@ -316,7 +210,7 @@ subtest 'split sections' => sub { subtest 'multiple documents' => sub { my $encoded = "\0" . $doc . "\1>\0\0\0" ."documents\0" . $doc . $doc . "\1>\0\0\0" ."documents\0" . $doc . $doc; - my @got_sections = MongoDB::_Protocol::split_sections( $codec, $encoded, 1 ); + my @got_sections = MongoDB::_Protocol::split_sections( $encoded, $codec ); my @expected_sections = ( [ 0, undef, [ $decoded_doc ] ], [ 1, 'documents', [ $decoded_doc, $decoded_doc ] ], From b250098c146d9498cf54b17a2526789195ae8c48 Mon Sep 17 00:00:00 2001 From: Thomas Bloor Date: Mon, 3 Sep 2018 17:48:22 +0100 Subject: [PATCH 9/9] PERL-789 Remove codec from reply parsing --- lib/MongoDB/Op/_Command.pm | 2 +- lib/MongoDB/Role/_SingleBatchDocWrite.pm | 2 +- lib/MongoDB/_Protocol.pm | 14 +++------ t/protocol_op_msg.t | 36 ++++++++++++++++++------ 4 files changed, 34 insertions(+), 20 deletions(-) diff --git a/lib/MongoDB/Op/_Command.pm b/lib/MongoDB/Op/_Command.pm index da4dadbb..bb76cfc5 100644 --- a/lib/MongoDB/Op/_Command.pm +++ b/lib/MongoDB/Op/_Command.pm @@ -119,7 +119,7 @@ sub execute { my $result; eval { $link->write( $op_bson, \%write_opt ), - ( $result = MongoDB::_Protocol::parse_reply( $link->read, $request_id, $self->{bson_codec} ) ); + ( $result = MongoDB::_Protocol::parse_reply( $link->read, $request_id ) ); }; if ( my $err = $@ ) { $self->_update_session_connection_error( $err ); diff --git a/lib/MongoDB/Role/_SingleBatchDocWrite.pm b/lib/MongoDB/Role/_SingleBatchDocWrite.pm index 655f50cc..3082fedd 100644 --- a/lib/MongoDB/Role/_SingleBatchDocWrite.pm +++ b/lib/MongoDB/Role/_SingleBatchDocWrite.pm @@ -196,7 +196,7 @@ sub _send_write_command { my $result; eval { $link->write( $op_bson ), - ( $result = MongoDB::_Protocol::parse_reply( $link->read, $request_id, $self->bson_codec ) ); + ( $result = MongoDB::_Protocol::parse_reply( $link->read, $request_id ) ); }; if ( my $err = $@ ) { $self->_update_session_connection_error( $err ); diff --git a/lib/MongoDB/_Protocol.pm b/lib/MongoDB/_Protocol.pm index 3fc5bb61..67abc687 100644 --- a/lib/MongoDB/_Protocol.pm +++ b/lib/MongoDB/_Protocol.pm @@ -218,14 +218,14 @@ sub encode_section { =method decode_section - MongoDB::_Protocol::decode_section( $codec, $section ) + MongoDB::_Protocol::decode_section( $section ) Takes an encoded section and decodes it, exactly the opposite of encode_section. =cut sub decode_section { - my ( $doc, $codec ) = @_; + my ( $doc ) = @_; my ( $type, $ident, @enc_docs ); my $section = {}; @@ -260,10 +260,6 @@ sub decode_section { } else { MongoDB::ProtocolError->throw("Decode: Unsupported section payload type"); } - ## XXX MongoDB::Role::_OpReplyParser normally does decoding - this is used for testing mainly - if ( $codec ) { - @enc_docs = map { $codec->decode_one( $_ ) } @enc_docs; - } $section->{ documents } = \@enc_docs; return $section; @@ -278,7 +274,6 @@ sections in packed form sub split_sections { my $msg = shift; - my $codec = shift; my @sections; while ( length $msg ) { # get first section length @@ -287,7 +282,7 @@ sub split_sections { # Add the payload type length as we reached over it for the length my $section = substr( $msg, 0, $section_length + P_SECTION_PAYLOAD_TYPE_LENGTH ); - push @sections, decode_section( $section, $codec ); + push @sections, decode_section( $section ); $msg = substr( $msg, $section_length + P_SECTION_PAYLOAD_TYPE_LENGTH ); } @@ -623,8 +618,7 @@ use constant { }; sub parse_reply { - my ( $msg, $request_id, $codec ) = @_; - + my ( $msg, $request_id ) = @_; MongoDB::ProtocolError->throw("response was truncated") if length($msg) < MIN_REPLY_LENGTH; diff --git a/t/protocol_op_msg.t b/t/protocol_op_msg.t index a66b3e69..53f668d7 100644 --- a/t/protocol_op_msg.t +++ b/t/protocol_op_msg.t @@ -23,6 +23,26 @@ use BSON; my $codec = BSON->new(); +sub decode_with_codec { + my $section = MongoDB::_Protocol::decode_section( @_ ); + my @docs = map { $codec->decode_one( $_ ) } @{ $section->{documents} }; + $section->{documents} = \@docs; + return $section; +} + +sub split_with_codec { + my @sections = MongoDB::_Protocol::split_sections( @_ ); + @sections = map { + my $cur = $_; + my @docs = map { + $codec->decode_one( $_ ) + } @{ $cur->{documents} }; + $cur->{documents} = \@docs; + $cur + } @sections; + return @sections; +} + subtest 'insert_doc' => sub { my $insert_doc = [ insert => 'collectionName', @@ -118,7 +138,7 @@ subtest 'encode section' => sub { subtest 'decode section' => sub { subtest 'payload 0' => sub { my $encoded = "\0" . $doc; - my $got_section = MongoDB::_Protocol::decode_section( $encoded, $codec ); + my $got_section = decode_with_codec( $encoded ); is $got_section->{type}, 0, 'section type correct'; is $got_section->{identifier}, undef, 'section identifier correct'; @@ -127,7 +147,7 @@ subtest 'decode section' => sub { subtest 'payload 1' => sub { my $encoded = "\1&\0\0\0" ."documents\0" . $doc; - my $got_section = MongoDB::_Protocol::decode_section( $encoded, $codec ); + my $got_section = decode_with_codec( $encoded ); is $got_section->{type}, 1, 'section type correct'; is $got_section->{identifier}, 'documents', 'section identifier correct'; @@ -136,7 +156,7 @@ subtest 'decode section' => sub { subtest 'payload 1 multiple docs' => sub { my $encoded = "\1>\0\0\0" ."documents\0" . $doc . $doc; - my $got_section = MongoDB::_Protocol::decode_section( $encoded, $codec ); + my $got_section = decode_with_codec( $encoded ); is $got_section->{type}, 1, 'section type correct'; is $got_section->{identifier}, 'documents', 'section identifier correct'; @@ -147,7 +167,7 @@ subtest 'decode section' => sub { subtest 'split sections' => sub { subtest 'payload 0' => sub { my $encoded = "\0" . $doc; - my @got_sections = MongoDB::_Protocol::split_sections( $encoded, $codec ); + my @got_sections = split_with_codec( $encoded ); my @expected_sections = ( [ 0, undef, [ $decoded_doc ] ], ); @@ -162,7 +182,7 @@ subtest 'split sections' => sub { subtest 'payload 0 + 1' => sub { subtest 'single document' => sub { my $encoded = "\0" . $doc . "\1&\0\0\0" ."documents\0" . $doc; - my @got_sections = MongoDB::_Protocol::split_sections( $encoded, $codec ); + my @got_sections = split_with_codec( $encoded ); my @expected_sections = ( [ 0, undef, [ $decoded_doc ] ], [ 1, 'documents', [ $decoded_doc ] ], @@ -177,7 +197,7 @@ subtest 'split sections' => sub { subtest 'multiple documents' => sub { my $encoded = "\0" . $doc . "\1>\0\0\0" ."documents\0" . $doc . $doc; - my @got_sections = MongoDB::_Protocol::split_sections( $encoded, $codec ); + my @got_sections = split_with_codec( $encoded ); my @expected_sections = ( [ 0, undef, [ $decoded_doc ] ], [ 1, 'documents', [ $decoded_doc, $decoded_doc ] ], @@ -194,7 +214,7 @@ subtest 'split sections' => sub { subtest 'payload 0 + multiple 1' => sub { subtest 'single document' => sub { my $encoded = "\0" . $doc . "\1&\0\0\0" ."documents\0" . $doc . "\1&\0\0\0" ."documents\0" . $doc; - my @got_sections = MongoDB::_Protocol::split_sections( $encoded, $codec ); + my @got_sections = split_with_codec( $encoded ); my @expected_sections = ( [ 0, undef, [ $decoded_doc ] ], [ 1, 'documents', [ $decoded_doc ] ], @@ -210,7 +230,7 @@ subtest 'split sections' => sub { subtest 'multiple documents' => sub { my $encoded = "\0" . $doc . "\1>\0\0\0" ."documents\0" . $doc . $doc . "\1>\0\0\0" ."documents\0" . $doc . $doc; - my @got_sections = MongoDB::_Protocol::split_sections( $encoded, $codec ); + my @got_sections = split_with_codec( $encoded ); my @expected_sections = ( [ 0, undef, [ $decoded_doc ] ], [ 1, 'documents', [ $decoded_doc, $decoded_doc ] ],