diff --git a/devel/config/mongod-4.0.yml b/devel/config/mongod-4.0.yml new file mode 100644 index 00000000..d11f7c6e --- /dev/null +++ b/devel/config/mongod-4.0.yml @@ -0,0 +1,6 @@ +--- +type: single +default_args: -v --bind_ip 0.0.0.0 --enableMajorityReadConcern +default_version: 4.0 +mongod: + - name: host1 diff --git a/lib/MongoDB/Op/_Command.pm b/lib/MongoDB/Op/_Command.pm index d2725c56..bb76cfc5 100644 --- a/lib/MongoDB/Op/_Command.pm +++ b/lib/MongoDB/Op/_Command.pm @@ -27,6 +27,7 @@ use MongoDB::_Constants; use MongoDB::_Types qw( Document ReadPreference + to_IxHash ); use List::Util qw/first/; use Types::Standard qw( @@ -87,9 +88,18 @@ sub execute { # $query is passed as a reference because it *may* be replaced $self->_apply_read_prefs( $link, $topology_type, $self->{query_flags}, \$self->{query}); - my ( $op_bson, $request_id ) = - MongoDB::_Protocol::write_query( $self->{db_name} . '.$cmd', - $self->{bson_codec}->encode_one( $self->{query} ), undef, 0, -1, $self->{query_flags}); + my ( $op_bson, $request_id ); + + if ( $link->supports_op_msg ) { + $self->{query} = to_IxHash( $self->{query} ); + $self->{query}->Push( '$db', $self->db_name ); + ( $op_bson, $request_id ) = + MongoDB::_Protocol::write_msg( $self->{bson_codec}, undef, $self->{query} ); + } else { + ( $op_bson, $request_id ) = + MongoDB::_Protocol::write_query( $self->{db_name} . '.$cmd', + $self->{bson_codec}->encode_one( $self->{query} ), undef, 0, -1, $self->{query_flags}); + } if ( length($op_bson) > MAX_BSON_WIRE_SIZE ) { # XXX should this become public? 
diff --git a/lib/MongoDB/Role/_SingleBatchDocWrite.pm b/lib/MongoDB/Role/_SingleBatchDocWrite.pm index 167aa234..3082fedd 100644 --- a/lib/MongoDB/Role/_SingleBatchDocWrite.pm +++ b/lib/MongoDB/Role/_SingleBatchDocWrite.pm @@ -30,6 +30,7 @@ use MongoDB::_Constants; use MongoDB::_Protocol; use MongoDB::_Types qw( WriteConcern + to_IxHash ); use namespace::clean; @@ -167,12 +168,19 @@ sub _send_write_command { $self->_apply_session_and_cluster_time( $link, \$cmd ); - # send command and get response document - my $command = $self->bson_codec->encode_one( $cmd ); - - my ( $op_bson, $request_id ) = - MongoDB::_Protocol::write_query( $self->db_name . '.$cmd', - $command, undef, 0, -1, undef ); + my ( $op_bson, $request_id ); + if ( $link->supports_op_msg ) { + $cmd = to_IxHash( $cmd ); + $cmd->Push( '$db', $self->db_name ); + ( $op_bson, $request_id ) = + MongoDB::_Protocol::write_msg( $self->bson_codec, undef, $cmd ); + } else { + # send command and get response document + my $command = $self->bson_codec->encode_one( $cmd ); + ( $op_bson, $request_id ) = + MongoDB::_Protocol::write_query( $self->db_name . '.$cmd', + $command, undef, 0, -1, undef ); + } if ( length($op_bson) > MAX_BSON_WIRE_SIZE ) { # XXX should this become public? diff --git a/lib/MongoDB/_Link.pm b/lib/MongoDB/_Link.pm index 5ed96eb8..d0cd9036 100644 --- a/lib/MongoDB/_Link.pm +++ b/lib/MongoDB/_Link.pm @@ -215,6 +215,13 @@ has supports_retryWrites => ( isa => Boolish, ); +has supports_op_msg => ( + is => 'rwp', + init_arg => undef, + isa => Boolish, +); + +# for wire version >= 7 has supports_4_0_changestreams => ( is => 'rwp', init_arg => undef, @@ -355,6 +362,7 @@ sub set_metadata { ? 
1 : 0 ); + $self->_set_supports_op_msg(1); } if ( $self->accepts_wire_version(7) ) { $self->_set_supports_4_0_changestreams(1); diff --git a/lib/MongoDB/_Protocol.pm b/lib/MongoDB/_Protocol.pm index fd38ff7c..67abc687 100644 --- a/lib/MongoDB/_Protocol.pm +++ b/lib/MongoDB/_Protocol.pm @@ -23,12 +23,12 @@ our $VERSION = 'v2.0.1'; use MongoDB::_Constants; use MongoDB::Error; +use MongoDB::_Types qw/ to_IxHash /; use Compress::Zlib (); use constant { OP_REPLY => 1, # Reply to a client request. responseTo is set - OP_MSG => 1000, # generic msg command followed by a string OP_UPDATE => 2001, # update document OP_INSERT => 2002, # insert new document RESERVED => 2003, # formerly used for OP_GET_BY_OID @@ -37,6 +37,7 @@ use constant { OP_DELETE => 2006, # Delete documents OP_KILL_CURSORS => 2007, # Tell database client is done with a cursor OP_COMPRESSED => 2012, # wire compression + OP_MSG => 2013, # generic bi-directional op code }; use constant { @@ -64,6 +65,8 @@ use constant { P_KILL_CURSORS => PERL58 ? "l6(a8)*" : "l<6(a8)*", P_REPLY_HEADER => PERL58 ? "l5a8l2" : "l<5a8l<2", P_COMPRESSED => PERL58 ? "l6C" : "l<6C", + P_MSG => PERL58 ? "l5" : "l<5", + P_MSG_PL_1 => PERL58 ? "lZ*" : "l<Z*", }; use constant { P_COMPRESSED_PREFIX_LENGTH => length(pack P_COMPRESSED, 0, 0, 0, 0, 0, 0, 0), + P_MSG_PREFIX_LENGTH => + length(pack P_MSG, 0, 0, 0, 0, 0), }; +# struct OP_MSG { +# MsgHeader header; // standard message header, with opCode 2013 +# uint32 flagBits; +# Section+ sections; +# [uint32 checksum;] +# }; +# +# struct Section { +# uint8 payloadType; +# union payload { +# document document; // payloadType == 0 +# struct sequence { // payloadType == 1 +# int32 size; +# cstring identifier; +# document* documents; +# }; +# }; +# }; + +use constant { + P_SECTION_PAYLOAD_TYPE => "C", + P_SECTION_SEQUENCE_SIZE => PERL58 ? "l" : "l<", +}; + +use constant { + P_SECTION_HEADER => P_SECTION_PAYLOAD_TYPE . 
P_SECTION_SEQUENCE_SIZE, + P_SECTION_PAYLOAD_TYPE_LENGTH => length( pack P_SECTION_PAYLOAD_TYPE, 0 ), + P_SECTION_SEQUENCE_SIZE_LENGTH => length( pack P_SECTION_SEQUENCE_SIZE, 0 ), +}; + +=method prepare_sections( $cmd ) + +Takes a command, returns sections ready for joining + +=cut + +sub prepare_sections { + my ( $codec, $cmd ) = @_; + + my %split_commands = ( + insert => 'documents', + update => 'updates', + delete => 'deletes', + ); + + $cmd = to_IxHash( $cmd ); + + # Command is always first key in cmd + my $command = do { my @keys = $cmd->Keys; $keys[0] }; + my $ident = $split_commands{ $command }; + + if ( defined $ident ) { + my $collection = $cmd->FETCH( $command ); + my $docs = $cmd->FETCH( $ident ); + # Assumes only a single split on the commands + return ( + { + type => 0, + documents => [ [ + # Done specifically to not alter $cmd. + # The command ($command from earlier) is assumed to be + # first in the Keys set + map { $_ eq $ident + ? () + : ( $_, $cmd->FETCH( $_ ) ) + } $cmd->Keys() + ] ], + }, + { + type => 1, + identifier => $ident, + documents => $docs, + } + ); + } else { + # Not a recognised command to split, just set up ready for later + return ( + { + type => 0, + documents => [ $cmd ], + } + ); + } +} + +=method encode_section + + MongoDB::_Protocol::encode_section( $codec, { + type => 0, # 0 or 1 + identifier => undef, # optional in type 0 + documents => [ $cmd ] # must be an array of documents + }); + +Takes a section hashref and encodes it for joining + +=cut + +sub encode_section { + my ( $codec, $section ) = @_; + + my $type = $section->{type}; + my $ident = $section->{identifier}; + my @docs = map { $codec->encode_one( $_ ) } @{ $section->{documents} }; + + my $pl; + if ( $type == 0 ) { + # Assume a single doc if payload type is 0 + $pl = $docs[0]; + } elsif ( $type == 1 ) { + $pl = pack( P_MSG_PL_1, 0, $ident ) + . 
join( '', @docs ); + # calculate size + substr( $pl, 0, 4, pack( P_SECTION_SEQUENCE_SIZE, length( $pl ) ) ); + } else { + MongoDB::ProtocolError->throw("Encode: Unsupported section payload type"); + } + + # Prepend the section type + $pl = pack( P_SECTION_PAYLOAD_TYPE, $type ) . $pl; + + return $pl; +} + +=method decode_section + + MongoDB::_Protocol::decode_section( $section ) + +Takes an encoded section and decodes it, exactly the opposite of encode_section. + +=cut + +sub decode_section { + my ( $doc ) = @_; + my ( $type, $ident, @enc_docs ); + my $section = {}; + + ( $type ) = unpack( 'C', $doc ); + my $payload = substr( $doc, P_SECTION_PAYLOAD_TYPE_LENGTH ); + + $section->{ type } = $type; + + # Pull size off and double check. Size is in the same place regardless of + # payload type, as its a similar struct to a raw document + my ( $pl_size ) = unpack( P_SECTION_SEQUENCE_SIZE, $payload ); + unless ( $pl_size == length( $payload ) ) { + MongoDB::ProtocolError->throw("Decode: Section size incorrect"); + } + + if ( $type == 0 ) { + # payload is a raw document + push @enc_docs, $payload; + } elsif ( $type == 1 ) { + $payload = substr( $payload, P_SECTION_SEQUENCE_SIZE_LENGTH ); + # Pull out then remove + ( $ident ) = unpack( 'Z*', $payload ); + $section->{ identifier } = $ident; + $payload = substr( $payload, length ( $ident ) + 1 ); # add one for null termination + + while ( length $payload ) { + my $doc_size = unpack( P_SECTION_SEQUENCE_SIZE, $payload ); + my $doc = substr( $payload, 0, $doc_size ); + $payload = substr( $payload, $doc_size ); + push @enc_docs, $doc; + } + } else { + MongoDB::ProtocolError->throw("Decode: Unsupported section payload type"); + } + $section->{ documents } = \@enc_docs; + + return $section; +} + +=method split_sections( $msg ) + +Splits sections based on their payload length header. 
Returns an array of +sections in packed form + +=cut + +sub split_sections { + my $msg = shift; + my @sections; + while ( length $msg ) { + # get first section length + my ( undef, $section_length ) = unpack( P_SECTION_HEADER, $msg ); + + # Add the payload type length as we reached over it for the length + my $section = substr( $msg, 0, $section_length + P_SECTION_PAYLOAD_TYPE_LENGTH ); + + push @sections, decode_section( $section ); + + $msg = substr( $msg, $section_length + P_SECTION_PAYLOAD_TYPE_LENGTH ); + } + + return @sections; +} + +use constant { + MSG_FB_CHECKSUM => 0, + MSG_FB_MORE_TO_COME => 1, +}; + +sub write_msg { + my ( $codec, $flags, $cmd ) = @_; + my $flagbits = 0; + # checksum is reserved for future use + if ( $flags ) { + $flagbits = + ( $flags->{checksum_present} ? 1 << MSG_FB_CHECKSUM : 0 ) + | ( $flags->{more_to_come} ? 1 << MSG_FB_MORE_TO_COME : 0 ); + } + + my $request_id = int( rand( MAX_REQUEST_ID ) ); + + my @sections = prepare_sections( $codec, $cmd ); + + my $encoded_sections = join ('', ( map { encode_section( $codec, $_ ) } @sections ) ); + + my $msg = pack( P_MSG, 0, $request_id, 0, OP_MSG, $flagbits ) + . $encoded_sections; + substr( $msg, 0, 4, pack( P_INT32, length($msg) ) ); + return ( $msg, $request_id ); +} + # struct OP_COMPRESSED { # MsgHeader header; // standard message header # int32_t originalOpcode; // wrapped op code @@ -204,8 +433,8 @@ sub write_update { my $bitflags = 0; if ($flags) { $bitflags = - ( $flags->{upsert} ? 1 << U_UPSERT : 0 ) - | ( $flags->{multi} ? 1 << U_MULTI_UPDATE : 0 ); + ( $flags->{upsert} ? 1 << U_UPSERT : 0 ) + | ( $flags->{multi} ? 
1 << U_MULTI_UPDATE : 0 ); } my $msg = @@ -390,7 +619,6 @@ use constant { sub parse_reply { my ( $msg, $request_id ) = @_; - MongoDB::ProtocolError->throw("response was truncated") if length($msg) < MIN_REPLY_LENGTH; @@ -399,7 +627,7 @@ sub parse_reply { my ( $len, $msg_id, $response_to, $opcode, $bitflags, $cursor_id, $starting_from, $number_returned - ) = unpack( P_REPLY_HEADER, $msg ); + ) = unpack( P_MSG, $msg ); # pre-check all conditions using a modifier in one statement for speed; # disambiguate afterwards only if an error exists @@ -410,8 +638,8 @@ sub parse_reply { MongoDB::ProtocolError->throw("response was truncated"); } - if ( $opcode != OP_REPLY ) { - MongoDB::ProtocolError->throw("response was not OP_REPLY"); + if ( $opcode != OP_REPLY && $opcode != OP_MSG ) { + MongoDB::ProtocolError->throw("response was not OP_REPLY or OP_MSG"); } if ( $response_to != $request_id ) { @@ -420,9 +648,30 @@ sub parse_reply { } } if ( length($msg) < $len ) - || ( $opcode != OP_REPLY ) + || ( ( $opcode != OP_REPLY ) && ( $opcode != OP_MSG ) ) || ( $response_to != $request_id ); + + if ( $opcode == OP_MSG ) { + # XXX Extract and check checksum - future support of crc32c + my @sections = split_sections( substr( $msg, P_MSG_PREFIX_LENGTH ) ); + # We have none of the other stuff? maybe flags... and an array of docs? erm + return { + flags => { + checksum_present => vec( $bitflags, MSG_FB_CHECKSUM, 1 ), + more_to_come => vec( $bitflags, MSG_FB_MORE_TO_COME, 1 ), + }, + # XXX Assumes the server never sends a type 1 payload. May change in future + docs => $sections[0]->{documents}->[0] + }; + } else { + # Yes its two unpacks but its just easier than mapping through to the right size + ( + $len, $msg_id, $response_to, $opcode, $bitflags, $cursor_id, $starting_from, + $number_returned + ) = unpack( P_REPLY_HEADER, $msg ); + } + # returns non-zero cursor_id as blessed object to identify it as an # 8-byte opaque ID rather than an ambiguous Perl scalar. N.B. 
cursors # from commands are handled differently: they are perl integers or diff --git a/t/lib/MongoDBTest.pm b/t/lib/MongoDBTest.pm index 7e9d03c2..58891c2e 100644 --- a/t/lib/MongoDBTest.pm +++ b/t/lib/MongoDBTest.pm @@ -124,7 +124,7 @@ sub get_capped { sub skip_unless_mongod { eval { - my $conn = build_client( server_selection_timeout_ms => 1000 ); + my $conn = build_client( server_selection_timeout_ms => 10000 ); my $topo = $conn->_topology; $topo->scan_all_servers; my $link; diff --git a/t/op_msg.t b/t/op_msg.t new file mode 100644 index 00000000..64717825 --- /dev/null +++ b/t/op_msg.t @@ -0,0 +1,135 @@ +# Copyright 2018 - present MongoDB, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +use strict; +use warnings; +use utf8; +use Test::More; + +use MongoDB; + +use lib "t/lib"; +use MongoDBTest qw/ + skip_unless_mongod + build_client + get_test_db + server_version + clear_testdbs +/; +use MongoDBTest::Callback; + +skip_unless_mongod(); + +my $cb = MongoDBTest::Callback->new; +my $conn = build_client(monitoring_callback => $cb->callback); +my $testdb = get_test_db($conn); +my $server_version = server_version($conn); +my $coll = $testdb->get_collection('test_collection'); + +plan skip_all => 'MongoDB version 3.6 or higher required for OP_MSG support' + unless $server_version >= version->parse('v3.6.0'); + +subtest 'insert single document' => sub { + $cb->clear_events; + $ENV{DO_OP_MSG} = 1; + my $ret = $coll->insert_one([ _id => 1 ]); + $ENV{DO_OP_MSG} = 0; + + # OP_MSG enforces $db to be in the command itself + is $cb->events->[-2]{command}{'$db'}, $testdb->name, 'Sent to correct database'; + is $ret->inserted_id, 1, 'Correct inserted id'; + + my @collection = $coll->find()->all; + + is_deeply \@collection, [ { _id => 1 } ], 'Collection info correct'; +}; + +subtest 'insert multiple document' => sub { + $cb->clear_events; + $ENV{DO_OP_MSG} = 1; + my $ret = $coll->insert_many([[ _id => 2 ], [ _id => 3 ]]); + $ENV{DO_OP_MSG} = 0; + + # OP_MSG enforces $db to be in the command itself + is $cb->events->[-2]{command}{'$db'}, $testdb->name, 'Sent to correct database'; + is_deeply $ret->inserted_ids, { 0 => 2, 1 => 3 }, 'Correct inserted id'; + + my @collection = $coll->find()->all; + + is_deeply \@collection, [ { _id => 1 }, { _id => 2 }, { _id => 3 } ], 'Collection info correct'; +}; + +subtest 'update single document' => sub { + $cb->clear_events; + $ENV{DO_OP_MSG} = 1; + my $ret = $coll->update_one({ _id => 1 }, { '$set' => { eg => 2 } }); + $ENV{DO_OP_MSG} = 0; + + # OP_MSG enforces $db to be in the command itself + is $cb->events->[-2]{command}{'$db'}, $testdb->name, 'Sent to correct database'; + is $ret->modified_count, 1, 'Correct modified 
count'; + + my @collection = $coll->find()->all; + + is_deeply \@collection, [ { _id => 1, eg => 2 }, { _id => 2 }, { _id => 3 } ], 'Collection info correct'; +}; + +subtest 'update multiple document' => sub { + $cb->clear_events; + $ENV{DO_OP_MSG} = 1; + my $ret = $coll->update_many({ _id => { '$gte' => 2 } }, { '$set' => { eg => 3 } }); + $ENV{DO_OP_MSG} = 0; + + # OP_MSG enforces $db to be in the command itself + is $cb->events->[-2]{command}{'$db'}, $testdb->name, 'Sent to correct database'; + is $ret->modified_count, 2, 'Correct modified count'; + + my @collection = $coll->find()->all; + + is_deeply \@collection, [ { _id => 1, eg => 2 }, { _id => 2, eg => 3 }, { _id => 3, eg => 3 } ], 'Collection info correct'; +}; + +subtest 'delete single document' => sub { + $cb->clear_events; + $ENV{DO_OP_MSG} = 1; + my $ret = $coll->delete_one([ _id => 1 ]); + $ENV{DO_OP_MSG} = 0; + + # OP_MSG enforces $db to be in the command itself + is $cb->events->[-2]{command}{'$db'}, $testdb->name, 'Sent to correct database'; + is $ret->deleted_count, 1, 'Correct deleted count'; + + my @collection = $coll->find()->all; + + is_deeply \@collection, [ { _id => 2, eg => 3 }, { _id => 3, eg => 3 } ], 'Collection info correct'; +}; + +subtest 'delete multiple document' => sub { + $cb->clear_events; + $ENV{DO_OP_MSG} = 1; + my $ret = $coll->delete_many([ _id => { '$gte' => 2 } ]); + $ENV{DO_OP_MSG} = 0; + + # OP_MSG enforces $db to be in the command itself + is $cb->events->[-2]{command}{'$db'}, $testdb->name, 'Sent to correct database'; + is $ret->deleted_count, 2, 'Correct deleted count'; + + my @collection = $coll->find()->all; + + is_deeply \@collection, [ ], 'Collection info correct'; +}; + +clear_testdbs; + +done_testing; diff --git a/t/protocol_op_msg.t b/t/protocol_op_msg.t new file mode 100644 index 00000000..53f668d7 --- /dev/null +++ b/t/protocol_op_msg.t @@ -0,0 +1,249 @@ +# Copyright 2018 - present MongoDB, Inc. 
+# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +use strict; +use warnings; +use utf8; +use Test::More; +use Test::Fatal; + +use MongoDB::_Protocol; +use BSON; + +my $codec = BSON->new(); + +sub decode_with_codec { + my $section = MongoDB::_Protocol::decode_section( @_ ); + my @docs = map { $codec->decode_one( $_ ) } @{ $section->{documents} }; + $section->{documents} = \@docs; + return $section; +} + +sub split_with_codec { + my @sections = MongoDB::_Protocol::split_sections( @_ ); + @sections = map { + my $cur = $_; + my @docs = map { + $codec->decode_one( $_ ) + } @{ $cur->{documents} }; + $cur->{documents} = \@docs; + $cur + } @sections; + return @sections; +} + +subtest 'insert_doc' => sub { + my $insert_doc = [ + insert => 'collectionName', + documents => [ + [ id => 'Document#1', example => 1 ], + [ id => 'Document#2', example => 2 ], + [ id => 'Document#3', example => 3 ] + ], + writeConcern => [ w => 'majority' ] + ]; + + push @{$insert_doc}, ( '$db', 'someDatabase' ); + my @packed_payloads = MongoDB::_Protocol::prepare_sections( $codec, $insert_doc ); + my @expected_payloads = ( + { + type => 0, + documents => [ + [ + insert => 'collectionName', + writeConcern => [ w => 'majority' ], + '$db' => 'someDatabase', + ] + ] + }, + { + type => 1, + identifier => "documents", + documents => [ + [ id => 'Document#1', example => 1 ], + [ id => 'Document#2', example => 2 ], + [ id => 'Document#3', example => 3 ] + ], + }, + ); + + for my $i ( 0 .. 
$#expected_payloads ) { + is_deeply $packed_payloads[$i], $expected_payloads[$i], "section $i prepared correctly"; + } +}; +# struct Section { +# uint8 payloadType; +# union payload { +# document document; // payloadType == 0 +# struct sequence { // payloadType == 1 +# int32 size; +# cstring identifier; +# document* documents; +# }; +# }; +# }; + +my $raw_doc = [ test => 'document' ]; +my $doc = $codec->encode_one( $raw_doc ); +my $decoded_doc = $codec->decode_one( $doc ); + +subtest 'encode section' => sub { + subtest 'payload 0' => sub { + my $raw_section = { + type => 0, + documents => [ $raw_doc ], + }; + my $got_section = MongoDB::_Protocol::encode_section( $codec, $raw_section ); + my $expected_section = "\0" . $doc; + + is $got_section, $expected_section, 'encode payload 0 correctly'; + }; + + subtest 'payload 1 single doc' => sub { + my $raw_section = { + type => 1, + identifier => 'documents', + documents => [ $raw_doc ], + }; + my $got_section = MongoDB::_Protocol::encode_section( $codec, $raw_section ); + my $expected_section = "\1&\0\0\0" . "documents\0" . $doc; + + is $got_section, $expected_section, 'encode payload 1 correctly'; + }; + + subtest 'payload 1 multiple doc' => sub { + my $raw_section = { + type => 1, + identifier => 'documents', + documents => [ $raw_doc, $raw_doc ], + }; + my $got_section = MongoDB::_Protocol::encode_section( $codec, $raw_section ); + my $expected_section = "\1>\0\0\0" . "documents\0" . $doc . $doc; + + is $got_section, $expected_section, 'encode payload 1 correctly'; + }; +}; + +subtest 'decode section' => sub { + subtest 'payload 0' => sub { + my $encoded = "\0" . 
$doc; + my $got_section = decode_with_codec( $encoded ); + + is $got_section->{type}, 0, 'section type correct'; + is $got_section->{identifier}, undef, 'section identifier correct'; + is_deeply $got_section->{documents}, [ { test => 'document' } ], 'decoded document correctly'; + }; + + subtest 'payload 1' => sub { + my $encoded = "\1&\0\0\0" ."documents\0" . $doc; + my $got_section = decode_with_codec( $encoded ); + + is $got_section->{type}, 1, 'section type correct'; + is $got_section->{identifier}, 'documents', 'section identifier correct'; + is_deeply $got_section->{documents}, [ { test => 'document' } ], 'decoded document correctly'; + }; + + subtest 'payload 1 multiple docs' => sub { + my $encoded = "\1>\0\0\0" ."documents\0" . $doc . $doc; + my $got_section = decode_with_codec( $encoded ); + + is $got_section->{type}, 1, 'section type correct'; + is $got_section->{identifier}, 'documents', 'section identifier correct'; + is_deeply $got_section->{documents}, [ { test => 'document' }, { test => 'document' } ], 'decoded document correctly'; + }; +}; + +subtest 'split sections' => sub { + subtest 'payload 0' => sub { + my $encoded = "\0" . $doc; + my @got_sections = split_with_codec( $encoded ); + my @expected_sections = ( + [ 0, undef, [ $decoded_doc ] ], + ); + + for my $i ( 0 .. $#expected_sections ) { + is $got_sections[$i]->{type}, $expected_sections[$i][0], 'type correct'; + is $got_sections[$i]->{identifier}, $expected_sections[$i][1], 'identifier correct'; + is_deeply $got_sections[$i]->{documents}, $expected_sections[$i][2], 'documents correct'; + } + }; + + subtest 'payload 0 + 1' => sub { + subtest 'single document' => sub { + my $encoded = "\0" . $doc . "\1&\0\0\0" ."documents\0" . $doc; + my @got_sections = split_with_codec( $encoded ); + my @expected_sections = ( + [ 0, undef, [ $decoded_doc ] ], + [ 1, 'documents', [ $decoded_doc ] ], + ); + + for my $i ( 0 .. 
$#expected_sections ) { + is $got_sections[$i]->{type}, $expected_sections[$i][0], 'type correct'; + is $got_sections[$i]->{identifier}, $expected_sections[$i][1], 'identifier correct'; + is_deeply $got_sections[$i]->{documents}, $expected_sections[$i][2], 'documents correct'; + } + }; + + subtest 'multiple documents' => sub { + my $encoded = "\0" . $doc . "\1>\0\0\0" ."documents\0" . $doc . $doc; + my @got_sections = split_with_codec( $encoded ); + my @expected_sections = ( + [ 0, undef, [ $decoded_doc ] ], + [ 1, 'documents', [ $decoded_doc, $decoded_doc ] ], + ); + + for my $i ( 0 .. $#expected_sections ) { + is $got_sections[$i]->{type}, $expected_sections[$i][0], 'type correct'; + is $got_sections[$i]->{identifier}, $expected_sections[$i][1], 'identifier correct'; + is_deeply $got_sections[$i]->{documents}, $expected_sections[$i][2], 'documents correct'; + } + }; + }; + + subtest 'payload 0 + multiple 1' => sub { + subtest 'single document' => sub { + my $encoded = "\0" . $doc . "\1&\0\0\0" ."documents\0" . $doc . "\1&\0\0\0" ."documents\0" . $doc; + my @got_sections = split_with_codec( $encoded ); + my @expected_sections = ( + [ 0, undef, [ $decoded_doc ] ], + [ 1, 'documents', [ $decoded_doc ] ], + [ 1, 'documents', [ $decoded_doc ] ], + ); + + for my $i ( 0 .. $#expected_sections ) { + is $got_sections[$i]->{type}, $expected_sections[$i][0], 'type correct'; + is $got_sections[$i]->{identifier}, $expected_sections[$i][1], 'identifier correct'; + is_deeply $got_sections[$i]->{documents}, $expected_sections[$i][2], 'documents correct'; + } + }; + + subtest 'multiple documents' => sub { + my $encoded = "\0" . $doc . "\1>\0\0\0" ."documents\0" . $doc . $doc . "\1>\0\0\0" ."documents\0" . $doc . $doc; + my @got_sections = split_with_codec( $encoded ); + my @expected_sections = ( + [ 0, undef, [ $decoded_doc ] ], + [ 1, 'documents', [ $decoded_doc, $decoded_doc ] ], + [ 1, 'documents', [ $decoded_doc, $decoded_doc ] ], + ); + + for my $i ( 0 .. 
$#expected_sections ) { + is $got_sections[$i]->{type}, $expected_sections[$i][0], 'type correct'; + is $got_sections[$i]->{identifier}, $expected_sections[$i][1], 'identifier correct'; + is_deeply $got_sections[$i]->{documents}, $expected_sections[$i][2], 'documents correct'; + } + }; + }; +}; + +done_testing;