Skip to content
Browse files

first commit

  • Loading branch information...
0 parents commit 01c34fceaf38d4b7fb4036e33130df7d426421c8 @kraih committed Feb 6, 2013
Showing with 3,192 additions and 0 deletions.
  1. +12 −0 .gitignore
  2. +13 −0 .travis.yml
  3. +3 −0 Changes
  4. +201 −0 LICENSE
  5. +30 −0 Makefile.PL
  6. +47 −0 README.md
  7. +570 −0 lib/Mango.pm
  8. +513 −0 lib/Mango/BSON.pm
  9. +50 −0 lib/Mango/BSON/Binary.pm
  10. +49 −0 lib/Mango/BSON/Code.pm
  11. +78 −0 lib/Mango/BSON/Document.pm
  12. +71 −0 lib/Mango/BSON/ObjectID.pm
  13. +54 −0 lib/Mango/BSON/Time.pm
  14. +49 −0 lib/Mango/BSON/Timestamp.pm
  15. +201 −0 lib/Mango/Collection.pm
  16. +223 −0 lib/Mango/Cursor.pm
  17. +93 −0 lib/Mango/Database.pm
  18. +243 −0 lib/Mango/Protocol.pm
  19. +310 −0 t/bson.t
  20. +31 −0 t/collection.t
  21. +106 −0 t/connection.t
  22. +139 −0 t/cursor.t
  23. +30 −0 t/database.t
  24. +10 −0 t/pod.t
  25. +10 −0 t/pod_coverage.t
  26. +56 −0 t/protocol.t
12 .gitignore
@@ -0,0 +1,12 @@
+.*
+*~
+!.gitignore
+!.perltidyrc
+!.travis.yml
+blib
+pm_to_blib
+Makefile*
+!Makefile.PL
+MANIFEST*
+!MANIFEST.SKIP
+*META.*
13 .travis.yml
@@ -0,0 +1,13 @@
+language: perl
+perl:
+ - "5.16"
+ - "5.14"
+ - "5.12"
+ - "5.10"
+env:
+ - "HARNESS_OPTIONS=j9 TEST_POD=1"
+install:
+ - "cpanm -n Test::Pod Test::Pod::Coverage"
+ - "cpanm -n --installdeps ."
+notifications:
+ email: false
3 Changes
@@ -0,0 +1,3 @@
+
+0.01 2013-02-06
+ - First release.
201 LICENSE
@@ -0,0 +1,201 @@
+ The Artistic License 2.0
+
+ Copyright (c) 2000-2006, The Perl Foundation.
+
+ Everyone is permitted to copy and distribute verbatim copies
+ of this license document, but changing it is not allowed.
+
+Preamble
+
+This license establishes the terms under which a given free software
+Package may be copied, modified, distributed, and/or redistributed.
+The intent is that the Copyright Holder maintains some artistic
+control over the development of that Package while still keeping the
+Package available as open source and free software.
+
+You are always permitted to make arrangements wholly outside of this
+license directly with the Copyright Holder of a given Package. If the
+terms of this license do not permit the full use that you propose to
+make of the Package, you should contact the Copyright Holder and seek
+a different licensing arrangement.
+
+Definitions
+
+ "Copyright Holder" means the individual(s) or organization(s)
+ named in the copyright notice for the entire Package.
+
+ "Contributor" means any party that has contributed code or other
+ material to the Package, in accordance with the Copyright Holder's
+ procedures.
+
+ "You" and "your" means any person who would like to copy,
+ distribute, or modify the Package.
+
+ "Package" means the collection of files distributed by the
+ Copyright Holder, and derivatives of that collection and/or of
+ those files. A given Package may consist of either the Standard
+ Version, or a Modified Version.
+
+ "Distribute" means providing a copy of the Package or making it
+ accessible to anyone else, or in the case of a company or
+ organization, to others outside of your company or organization.
+
+ "Distributor Fee" means any fee that you charge for Distributing
+ this Package or providing support for this Package to another
+ party. It does not mean licensing fees.
+
+ "Standard Version" refers to the Package if it has not been
+ modified, or has been modified only in ways explicitly requested
+ by the Copyright Holder.
+
+ "Modified Version" means the Package, if it has been changed, and
+ such changes were not explicitly requested by the Copyright
+ Holder.
+
+ "Original License" means this Artistic License as Distributed with
+ the Standard Version of the Package, in its current version or as
+ it may be modified by The Perl Foundation in the future.
+
+ "Source" form means the source code, documentation source, and
+ configuration files for the Package.
+
+ "Compiled" form means the compiled bytecode, object code, binary,
+ or any other form resulting from mechanical transformation or
+ translation of the Source form.
+
+
+Permission for Use and Modification Without Distribution
+
+(1) You are permitted to use the Standard Version and create and use
+Modified Versions for any purpose without restriction, provided that
+you do not Distribute the Modified Version.
+
+
+Permissions for Redistribution of the Standard Version
+
+(2) You may Distribute verbatim copies of the Source form of the
+Standard Version of this Package in any medium without restriction,
+either gratis or for a Distributor Fee, provided that you duplicate
+all of the original copyright notices and associated disclaimers. At
+your discretion, such verbatim copies may or may not include a
+Compiled form of the Package.
+
+(3) You may apply any bug fixes, portability changes, and other
+modifications made available from the Copyright Holder. The resulting
+Package will still be considered the Standard Version, and as such
+will be subject to the Original License.
+
+
+Distribution of Modified Versions of the Package as Source
+
+(4) You may Distribute your Modified Version as Source (either gratis
+or for a Distributor Fee, and with or without a Compiled form of the
+Modified Version) provided that you clearly document how it differs
+from the Standard Version, including, but not limited to, documenting
+any non-standard features, executables, or modules, and provided that
+you do at least ONE of the following:
+
+ (a) make the Modified Version available to the Copyright Holder
+ of the Standard Version, under the Original License, so that the
+ Copyright Holder may include your modifications in the Standard
+ Version.
+
+ (b) ensure that installation of your Modified Version does not
+ prevent the user installing or running the Standard Version. In
+ addition, the Modified Version must bear a name that is different
+ from the name of the Standard Version.
+
+ (c) allow anyone who receives a copy of the Modified Version to
+ make the Source form of the Modified Version available to others
+ under
+
+ (i) the Original License or
+
+ (ii) a license that permits the licensee to freely copy,
+ modify and redistribute the Modified Version using the same
+ licensing terms that apply to the copy that the licensee
+ received, and requires that the Source form of the Modified
+ Version, and of any works derived from it, be made freely
+ available in that license fees are prohibited but Distributor
+ Fees are allowed.
+
+
+Distribution of Compiled Forms of the Standard Version
+or Modified Versions without the Source
+
+(5) You may Distribute Compiled forms of the Standard Version without
+the Source, provided that you include complete instructions on how to
+get the Source of the Standard Version. Such instructions must be
+valid at the time of your distribution. If these instructions, at any
+time while you are carrying out such distribution, become invalid, you
+must provide new instructions on demand or cease further distribution.
+If you provide valid instructions or cease distribution within thirty
+days after you become aware that the instructions are invalid, then
+you do not forfeit any of your rights under this license.
+
+(6) You may Distribute a Modified Version in Compiled form without
+the Source, provided that you comply with Section 4 with respect to
+the Source of the Modified Version.
+
+
+Aggregating or Linking the Package
+
+(7) You may aggregate the Package (either the Standard Version or
+Modified Version) with other packages and Distribute the resulting
+aggregation provided that you do not charge a licensing fee for the
+Package. Distributor Fees are permitted, and licensing fees for other
+components in the aggregation are permitted. The terms of this license
+apply to the use and Distribution of the Standard or Modified Versions
+as included in the aggregation.
+
+(8) You are permitted to link Modified and Standard Versions with
+other works, to embed the Package in a larger work of your own, or to
+build stand-alone binary or bytecode versions of applications that
+include the Package, and Distribute the result without restriction,
+provided the result does not expose a direct interface to the Package.
+
+
+Items That are Not Considered Part of a Modified Version
+
+(9) Works (including, but not limited to, modules and scripts) that
+merely extend or make use of the Package, do not, by themselves, cause
+the Package to be a Modified Version. In addition, such works are not
+considered parts of the Package itself, and are not subject to the
+terms of this license.
+
+
+General Provisions
+
+(10) Any use, modification, and distribution of the Standard or
+Modified Versions is governed by this Artistic License. By using,
+modifying or distributing the Package, you accept this license. Do not
+use, modify, or distribute the Package, if you do not accept this
+license.
+
+(11) If your Modified Version has been derived from a Modified
+Version made by someone other than you, you are nevertheless required
+to ensure that your Modified Version complies with the requirements of
+this license.
+
+(12) This license does not grant you the right to use any trademark,
+service mark, tradename, or logo of the Copyright Holder.
+
+(13) This license includes the non-exclusive, worldwide,
+free-of-charge patent license to make, have made, use, offer to sell,
+sell, import and otherwise transfer the Package with respect to any
+patent claims licensable by the Copyright Holder that are necessarily
+infringed by the Package. If you institute patent litigation
+(including a cross-claim or counterclaim) against any party alleging
+that the Package constitutes direct or contributory patent
+infringement, then this Artistic License to you shall terminate on the
+date that such litigation is filed.
+
+(14) Disclaimer of Warranty:
+THE PACKAGE IS PROVIDED BY THE COPYRIGHT HOLDER AND CONTRIBUTORS "AS
+IS" AND WITHOUT ANY EXPRESS OR IMPLIED WARRANTIES. THE IMPLIED
+WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE, OR
+NON-INFRINGEMENT ARE DISCLAIMED TO THE EXTENT PERMITTED BY YOUR LOCAL
+LAW. UNLESS REQUIRED BY LAW, NO COPYRIGHT HOLDER OR CONTRIBUTOR WILL
+BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, OR CONSEQUENTIAL
+DAMAGES ARISING IN ANY WAY OUT OF THE USE OF THE PACKAGE, EVEN IF
+ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
30 Makefile.PL
@@ -0,0 +1,30 @@
+use 5.010001;
+
+use strict;
+use warnings;
+
+use Config;
+use ExtUtils::MakeMaker;
+
+die "64bit Perl is required!\n" unless $Config{ivsize} > 4;
+
+WriteMakefile(
+ NAME => 'Mango',
+ VERSION_FROM => 'lib/Mango.pm',
+ ABSTRACT => 'Pure-Perl non-blocking I/O MongoDB client',
+ AUTHOR => 'Sebastian Riedel <sri@cpan.org>',
+ LICENSE => 'artistic_2',
+ META_MERGE => {
+ requires => {perl => '5.010001'},
+ resources => {
+ homepage => 'http://mojolicio.us',
+ license => 'http://www.opensource.org/licenses/artistic-license-2.0',
+ MailingList => 'http://groups.google.com/group/mojolicious',
+ repository => 'http://github.com/kraih/mango',
+ bugtracker => 'http://github.com/kraih/mojo/issues'
+ },
+ no_index => {directory => ['t']}
+ },
+ PREREQ_PM => {Mojolicious => '2.99'},
+ test => {TESTS => 't/*.t t/*/*.t'}
+);
47 README.md
@@ -0,0 +1,47 @@
+
+# Mango [![Build Status](https://secure.travis-ci.org/kraih/mango.png)](http://travis-ci.org/kraih/mango)
+
+ Pure-Perl non-blocking I/O MongoDB client, optimized for use with the
+ [Mojolicious](http://mojolicio.us) real-time web framework.
+
+ use Mango;
+ my $mango = Mango->new('mongodb://localhost:27017');
+
+ # Insert document
+ my $oid = $mango->db('test')->collection('foo')->insert({bar => 'baz'});
+
+ # Find document
+ use Mango::BSON ':bson';
+ my $doc = $mango->db('test')->collection('foo')->find_one({bar => 'baz'});
+ say $doc->{bar};
+
+ # Update document with special BSON type
+ use Mango::BSON ':bson';
+ $mango->db('test')->collection('foo')
+ ->update({bar => 'baz'}, {bar => bson_true});
+
+ # Remove document with special BSON type
+ use Mango::BSON ':bson';
+ $mango->db('test')->collection('foo')->remove({bar => bson_true});
+
+ # Find documents non-blocking (does work inside a running event loop)
+ my $delay = Mojo::IOLoop->delay(sub {
+ my ($delay, @docs) = @_;
+ ...
+ });
+ for my $name (qw(foo bar)) {
+ $delay->begin;
+ $mango->db('test')->collection('users')->find({name => $name})->all(sub {
+ my ($cursor, $err, $docs) = @_;
+ $delay->end(@$docs);
+ });
+ }
+ $delay->wait unless Mojo::IOLoop->is_running;
+
+## Installation
+
+ All you need is a oneliner, it takes less than a minute.
+
+ $ curl -L cpanmin.us | perl - -n Mango
+
+ We recommend the use of a [Perlbrew](http://perlbrew.pl) environment.
570 lib/Mango.pm
@@ -0,0 +1,570 @@
+package Mango;
+use Mojo::Base 'Mojo::EventEmitter';
+
+use Carp 'croak';
+use Mango::BSON qw(bson_doc bson_false bson_true);
+use Mango::Database;
+use Mango::Protocol;
+use Mojo::IOLoop;
+use Mojo::URL;
+use Mojo::Util qw(md5_sum monkey_patch);
+use Scalar::Util 'weaken';
+
+use constant DEBUG => $ENV{MANGO_DEBUG} || 0;
+use constant DEFAULT_PORT => 27017;
+
+has credentials => sub { [] };
+has default_db => 'admin';
+has hosts => sub { [['localhost']] };
+has ioloop => sub { Mojo::IOLoop->new };
+has j => 0;
+has protocol => sub { Mango::Protocol->new };
+has w => 1;
+has wtimeout => 1000;
+
+our $VERSION = '0.01';
+
+# Operations with reply
+for my $name (qw(get_more query)) {
+ monkey_patch __PACKAGE__, $name, sub {
+ my $self = shift;
+ my $cb = ref $_[-1] eq 'CODE' ? pop : undef;
+ my ($id, $bytes) = $self->_build($name, @_);
+ warn "-- Operation $id ($name)\n" if DEBUG;
+ $self->_start($id, 1, $bytes, $cb);
+ };
+}
+
+# Operations followed by getLastError
+for my $name (qw(delete insert update)) {
+ monkey_patch __PACKAGE__, $name, sub {
+ my ($self, $ns) = (shift, shift);
+ my $cb = ref $_[-1] eq 'CODE' ? pop : undef;
+
+ # Make sure both operations can be written together
+ my ($id, $bytes) = $self->_build($name, $ns, @_);
+ $id = $self->_id;
+ $ns =~ s/\..+$/\.\$cmd/;
+ my $command = bson_doc
+ getLastError => 1,
+ j => $self->j ? bson_true : bson_false,
+ w => $self->w,
+ wtimeout => $self->wtimeout;
+ $bytes .= $self->protocol->build_query($id, $ns, {}, 0, -1, $command, {});
+
+ warn "-- Operation $id ($name)\n" if DEBUG;
+ $self->_start($id, 1, $bytes, $cb);
+ };
+}
+
+sub DESTROY { shift->_cleanup }
+
+sub new {
+ my $self = shift->SUPER::new;
+
+ # Protocol
+ return $self unless my $string = shift;
+ my $url = Mojo::URL->new($string);
+ croak qq{Invalid MongoDB connection string "$string"}
+ unless $url->protocol eq 'mongodb';
+
+ # Hosts
+ my @hosts;
+ /^([^,:]+)(?::(\d+))?/ and push @hosts, $2 ? [$1, $2] : [$1]
+ for split /,/, join(':', map { $_ // '' } $url->host, $url->port);
+ $self->hosts(\@hosts) if @hosts;
+
+ # Database
+ if (my $db = $url->path->parts->[0]) { $self->default_db($db) }
+
+ # User and password
+ push @{$self->credentials}, [$self->default_db, $1, $2]
+ if ($url->userinfo // '') =~ /^([^:]+):([^:]+)$/;
+
+ # Options
+ my $query = $url->query;
+ if (my $j = $query->param('journal')) { $self->j($j) }
+ if (my $w = $query->param('w')) { $self->w($w) }
+ if (my $timeout = $query->param('wtimeoutMS')) { $self->wtimeout($timeout) }
+
+ return $self;
+}
+
+sub db {
+ my ($self, $name) = @_;
+ $name //= $self->default_db;
+ return Mango::Database->new(mango => $self, name => $name);
+}
+
+sub is_active { !!(scalar @{$_[0]{queue} || []} || $_[0]{current}) }
+
+sub kill_cursors {
+ my $self = shift;
+ my $cb = ref $_[-1] eq 'CODE' ? pop : undef;
+ my ($id, $bytes) = $self->_build('kill_cursors', @_);
+ warn "-- Unsafe operation $id (kill_cursors)\n" if DEBUG;
+ $self->_start($id, 0, $bytes, $cb);
+}
+
+sub _auth {
+ my ($self, $credentials, $auth, $err, $reply) = @_;
+ my ($db, $user, $pass) = @$auth;
+
+ # No nonce value
+ return $self->_connected($credentials) if $err || !$reply->[5][0]{ok};
+ my $nonce = $reply->[5][0]{nonce};
+
+ # Authenticate
+ my $key = md5_sum $nonce . $user . md5_sum "${user}:mongo:${pass}";
+ my $command
+ = bson_doc(authenticate => 1, user => $user, nonce => $nonce, key => $key);
+ $self->_command($db, $command, sub { shift->_connected($credentials) });
+}
+
+sub _build {
+ my ($self, $name) = (shift, shift);
+ my $id = $self->_id;
+ my $method = "build_$name";
+ return ($id, $self->protocol->$method($id, @_));
+}
+
+sub _cleanup {
+ my $self = shift;
+ return unless my $loop = $self->_loop;
+
+ # Clean up connection
+ $loop->remove(delete $self->{connection}) if $self->{connection};
+
+ # Clean up all operations
+ my $queue = delete $self->{queue} || [];
+ unshift @$queue, $self->{current} if $self->{current};
+ $self->_finish(undef, $_->[2], 'Premature connection close') for @$queue;
+}
+
+sub _close {
+ my $self = shift;
+ $self->_error;
+ $self->_connect;
+}
+
+sub _command {
+ my ($self, $db, $command, $cb) = @_;
+
+ # Skip the queue and run command right away
+ my $id = $self->_id;
+ my $bytes
+ = $self->protocol->build_query($id, "$db.\$cmd", {}, 0, -1, $command, {});
+ unshift @{$self->{queue}}, [$id, 1, $cb, $bytes];
+ warn "-- Fast operation $id (query)\n" if DEBUG;
+ $self->_write;
+}
+
+sub _connect {
+ my $self = shift;
+
+ weaken $self;
+ my ($host, $port) = @{$self->hosts->[0]};
+ $self->{connection} = $self->_loop->client(
+ {address => $host, port => $port // DEFAULT_PORT} => sub {
+ my ($loop, $err, $stream) = @_;
+
+ # Connection error
+ return $self->_error($err) if $err;
+
+ # Connection established
+ $stream->timeout(0);
+ $stream->on(close => sub { $self->_close });
+ $stream->on(error => sub { $self && $self->_error(pop) });
+ $stream->on(read => sub { $self->_read(pop) });
+ $self->_connected([@{$self->credentials}]);
+ }
+ );
+}
+
+sub _connected {
+ my ($self, $credentials) = @_;
+
+ # No authentication
+ return $self->_write unless my $auth = shift @$credentials;
+
+ # Get nonce value and authenticate
+ my $cb = sub { shift->_auth($credentials, $auth, @_) };
+ $self->_command($auth->[0], {getnonce => 1}, $cb);
+}
+
+sub _error {
+ my ($self, $err) = @_;
+ my $current = delete $self->{current};
+ $current //= shift @{$self->{queue}} if $err;
+ return $err ? $self->emit(error => $err) : undef unless $current;
+ $self->_finish(undef, $current->[2], $err || 'Premature connection close');
+}
+
+sub _finish {
+ my ($self, $reply, $cb, $err) = @_;
+ $err ||= $reply->[5][0]{'$err'}
+ if $reply && $reply->[3] == 0 && @{$reply->[5]};
+ $self->$cb($err, $reply);
+}
+
+sub _id {
+ my $self = shift;
+ my $id = ++$self->{id};
+ $id = $self->{id} = 1 if $id > 2147483647;
+ return $id;
+}
+
+sub _loop { $_[0]{nb} ? Mojo::IOLoop->singleton : $_[0]->ioloop }
+
+sub _op {
+ my ($self, $id, $safe, $bytes, $cb) = @_;
+ push @{$self->{queue} ||= []}, [$id, $safe, $cb, $bytes];
+ if ($self->{connection}) { $self->_write }
+ else { $self->_connect }
+}
+
+sub _read {
+ my ($self, $chunk) = @_;
+
+ $self->{buffer} .= $chunk;
+ while (my $reply = $self->protocol->parse_reply(\$self->{buffer})) {
+ warn "-- Client <<< Server ($reply->[1])\n" if DEBUG;
+ next unless $reply->[1] == $self->{current}[0];
+ $self->_finish($reply, (delete $self->{current})->[2]);
+ }
+ $self->_write;
+}
+
+sub _start {
+ my ($self, $id, $safe, $bytes, $cb) = @_;
+
+ # Non-blocking
+ if ($cb) {
+
+ # Start non-blocking
+ unless ($self->{nb}) {
+ croak 'Blocking operation in progress' if $self->is_active;
+ $self->_cleanup;
+ $self->{nb}++;
+ }
+ return $self->_op($id, $safe, $bytes, $cb);
+ }
+
+ # Start blocking
+ if ($self->{nb}) {
+ croak 'Non-blocking operations in progress' if $self->is_active;
+ $self->_cleanup;
+ delete $self->{nb};
+ }
+ my ($err, $reply);
+ $self->_op(
+ ($id, $safe, $bytes) => sub {
+ (my $self, $err, $reply) = @_;
+ $self->ioloop->stop;
+ }
+ );
+
+ # Start event loop
+ $self->ioloop->start;
+
+ # Throw blocking errors
+ croak $err if $err;
+
+ return $reply;
+}
+
+sub _write {
+ my $self = shift;
+
+ return if $self->{current};
+ return unless my $stream = $self->_loop->stream($self->{connection});
+ return unless my $current = $self->{current} = shift @{$self->{queue}};
+
+ warn "-- Client >>> Server ($current->[0])\n" if DEBUG;
+ $stream->write(pop @$current);
+
+ # Unsafe operations are done when they are written
+ return if $current->[1];
+ weaken $self;
+ $stream->write(
+ '' => sub { $self->_finish(undef, delete($self->{current})->[2]) });
+}
+
+1;
+
+=head1 NAME
+
+Mango - Pure-Perl non-blocking I/O MongoDB client
+
+=head1 SYNOPSIS
+
+ use Mango;
+ my $mango = Mango->new('mongodb://localhost:27017');
+
+ # Insert document
+ my $oid = $mango->db('test')->collection('foo')->insert({bar => 'baz'});
+
+ # Find document
+ use Mango::BSON ':bson';
+ my $doc = $mango->db('test')->collection('foo')->find_one({bar => 'baz'});
+ say $doc->{bar};
+
+ # Update document with special BSON type
+ use Mango::BSON ':bson';
+ $mango->db('test')->collection('foo')
+ ->update({bar => 'baz'}, {bar => bson_true});
+
+ # Remove document with special BSON type
+ use Mango::BSON ':bson';
+ $mango->db('test')->collection('foo')->remove({bar => bson_true});
+
+ # Find documents non-blocking (does work inside a running event loop)
+ my $delay = Mojo::IOLoop->delay(sub {
+ my ($delay, @docs) = @_;
+ ...
+ });
+ for my $name (qw(foo bar)) {
+ $delay->begin;
+ $mango->db('test')->collection('users')->find({name => $name})->all(sub {
+ my ($cursor, $err, $docs) = @_;
+ $delay->end(@$docs);
+ });
+ }
+ $delay->wait unless Mojo::IOLoop->is_running;
+
+=head1 DESCRIPTION
+
+L<Mango> is a pure-Perl non-blocking I/O MongoDB client, optimized for use
+with the L<Mojolicious> real-time web framework, and with multiple event loop
+support.
+
+Note that this whole distribution is EXPERIMENTAL and will change without
+warning!
+
+Many features are still incomplete or missing, so you should wait for a stable
+1.0 release before using any of the modules in this distribution in a
+production environment. Unsafe operations are not supported, so far this is
+considered a feature.
+
+This is a L<Mojolicious> spin-off project, so we follow the
+L<same rules|Mojolicious::Guides::Contributing>.
+
+Optional modules L<EV> (4.0+), L<IO::Socket::IP> (0.16+) and
+L<IO::Socket::SSL> (1.75+) are supported transparently through
+L<Mojo::IOLoop>, and used if installed. Individual features can also be
+disabled with the C<MOJO_NO_IPV6> and C<MOJO_NO_TLS> environment variables.
+
+=head1 EVENTS
+
+L<Mango> inherits all events from L<Mojo::EventEmitter> and can emit the
+following new ones.
+
+=head2 error
+
+ $mango->on(error => sub {
+ my ($mango, $err) = @_;
+ ...
+ });
+
+Emitted if an error occurs that can't be associated with an operation.
+
+ $mango->on(error => sub {
+ my ($mango, $err) = @_;
+ say "This looks bad: $err";
+ });
+
+=head1 ATTRIBUTES
+
+L<Mango> implements the following attributes.
+
+=head2 credentials
+
+ my $credentials = $mango->credentials;
+ $mango = $mango->credentials([['test', 'sri', 's3cret']]);
+
+Authentication credentials that will be used on every reconnect.
+
+=head2 default_db
+
+ my $name = $mango->default_db;
+ $mango = $mango->default_db('test');
+
+Default database, defaults to C<admin>.
+
+=head2 hosts
+
+ my $hosts = $mango->hosts;
+ $mango = $mango->hosts([['localhost', 3000]]);
+
+Server to connect to, defaults to C<localhost> and port C<27017>.
+
+=head2 ioloop
+
+ my $loop = $mango->ioloop;
+ $mango = $mango->ioloop(Mojo::IOLoop->new);
+
+Event loop object to use for blocking I/O operations, defaults to a
+L<Mojo::IOLoop> object.
+
+=head2 j
+
+ my $j = $mango->j;
+ $mango = $mango->j(1);
+
+Wait for all operations to have reached the journal, defaults to C<0>.
+
+=head2 protocol
+
+ my $protocol = $mango->protocol;
+ $mango = $mango->protocol(Mango::Protocol->new);
+
+Protocol handler, defaults to a L<Mango::Protocol> object.
+
+=head2 w
+
+ my $w = $mango->w;
+ $mango = $mango->w(1);
+
+Wait for all operations to have reached at least this many servers, C<1>
+indicates just primary, C<2> indicates primary and at least one secondary,
+defaults to C<1>.
+
+=head2 wtimeout
+
+ my $timeout = $mango->wtimeout;
+ $mango = $mango->wtimeout(1);
+
+Timeout for write propagation in milliseconds, defaults to C<1000>.
+
+=head1 METHODS
+
+L<Mango> inherits all methods from L<Mojo::Base> and implements the following
+new ones.
+
+=head2 new
+
+ my $mango = Mango->new;
+ my $mango = Mango->new('mongodb://localhost:3000/mango_test?w=2');
+
+Construct a new L<Mango> object.
+
+=head2 db
+
+ my $db = $mango->db;
+ my $db = $mango->db('test');
+
+Get L<Mango::Database> object for database, uses C<default_db> if no name is
+provided.
+
+=head2 delete
+
+ my $reply = $mango->delete($name, $flags, $query);
+
+Perform low level C<delete> operation followed by C<getLastError> command. You
+can also append a callback to perform operation non-blocking.
+
+ $mango->delete(($name, $flags, $query) => sub {
+ my ($mango, $err, $reply) = @_;
+ ...
+ });
+ Mojo::IOLoop->start unless Mojo::IOLoop->is_running;
+
+=head2 get_more
+
+ my $reply = $mango->get_more($name, $limit, $cursor);
+
+Perform low level C<get_more> operation. You can also append a callback to
+perform operation non-blocking.
+
+ $mango->get_more(($name, $limit, $cursor) => sub {
+ my ($mango, $err, $reply) = @_;
+ ...
+ });
+ Mojo::IOLoop->start unless Mojo::IOLoop->is_running;
+
+=head2 insert
+
+ my $reply = $mango->insert($name, $flags, @docs);
+
+Perform low level C<insert> operation followed by C<getLastError> command. You
+can also append a callback to perform operation non-blocking.
+
+ $mango->delete(($name, $flags, @docs) => sub {
+ my ($mango, $err, $reply) = @_;
+ ...
+ });
+ Mojo::IOLoop->start unless Mojo::IOLoop->is_running;
+
+=head2 is_active
+
+ my $success = $mango->is_active;
+
+Check if there are still operations in progress.
+
+=head2 kill_cursors
+
+ $mango->kill_cursors(@ids);
+
+Perform low level C<kill_cursors> operation. You can also append a callback to
+perform operation non-blocking.
+
+ $mango->kill_cursors(@ids => sub {
+ my $mango = shift;
+ ...
+ });
+ Mojo::IOLoop->start unless Mojo::IOLoop->is_running;
+
+=head2 query
+
+ my $reply = $mango->query($name, $flags, $skip, $limit, $query, $fields);
+
+Perform low level C<query> operation. You can also append a callback to
+perform operation non-blocking.
+
+ $mango->query(($name, $flags, $skip, $limit, $query, $fields) => sub {
+ my ($mango, $err, $reply) = @_;
+ ...
+ });
+ Mojo::IOLoop->start unless Mojo::IOLoop->is_running;
+
+=head2 update
+
+ my $reply = $mango->update($name, $flags, $query, $update);
+
+Perform low level C<update> operation followed by C<getLastError> command. You
+can also append a callback to perform operation non-blocking.
+
+ $mango->delete(($name, $flags, $query, $update) => sub {
+ my ($mango, $err, $reply) = @_;
+ ...
+ });
+ Mojo::IOLoop->start unless Mojo::IOLoop->is_running;
+
+=head1 DEBUGGING
+
+You can set the C<MANGO_DEBUG> environment variable to get some advanced
+diagnostics information printed to C<STDERR>.
+
+ MANGO_DEBUG=1
+
+=head1 SPONSORS
+
+Some of the work on this distribution has been sponsored by an anonymous
+donor, thank you!
+
+=head1 AUTHOR
+
+Sebastian Riedel, C<sri@cpan.org>.
+
+=head1 COPYRIGHT AND LICENSE
+
+Copyright (C) 2013, Sebastian Riedel.
+
+This program is free software, you can redistribute it and/or modify it under
+the terms of the Artistic License version 2.0.
+
+=head1 SEE ALSO
+
+L<Mojolicious::Guides>, L<http://mojolicio.us>.
+
+=cut
513 lib/Mango/BSON.pm
@@ -0,0 +1,513 @@
+package Mango::BSON;
+use Mojo::Base -strict;
+
+use Carp 'croak';
+use re 'regexp_pattern';
+use B;
+use Exporter 'import';
+use Mango::BSON::Binary;
+use Mango::BSON::Code;
+use Mango::BSON::Document;
+use Mango::BSON::ObjectID;
+use Mango::BSON::Time;
+use Mango::BSON::Timestamp;
+use Mojo::JSON;
+use Mojo::Util qw(decode encode);
+use Scalar::Util 'blessed';
+
+my @BSON = (
+ qw(bson_bin bson_code bson_decode bson_doc bson_encode bson_false),
+ qw(bson_length bson_max bson_min bson_oid bson_time bson_true bson_ts)
+);
+our @EXPORT_OK = (
+ @BSON,
+ qw(decode_int32 decode_int64 encode_cstring encode_int32 encode_int64),
+);
+our %EXPORT_TAGS = (bson => \@BSON);
+
+# Types
+use constant {
+ DOUBLE => "\x01",
+ STRING => "\x02",
+ DOCUMENT => "\x03",
+ ARRAY => "\x04",
+ BINARY => "\x05",
+ OBJECT_ID => "\x07",
+ BOOL => "\x08",
+ DATETIME => "\x09",
+ NULL => "\x0a",
+ REGEX => "\x0b",
+ CODE => "\x0d",
+ CODE_SCOPE => "\x0f",
+ INT32 => "\x10",
+ TIMESTAMP => "\x11",
+ INT64 => "\x12",
+ MIN_KEY => "\x7f",
+ MAX_KEY => "\xff"
+};
+
+# Binary subtypes
+use constant {
+ BINARY_GENERIC => "\x00",
+ BINARY_FUNCTION => "\x01",
+ BINARY_UUID => "\x04",
+ BINARY_MD5 => "\x05",
+ BINARY_USER_DEFINED => "\x80"
+};
+
+# 32bit integer range
+use constant {INT32_MIN => -(1 << 31) + 1, INT32_MAX => (1 << 31) - 1};
+
+# Reuse boolean singletons
+my $FALSE = Mojo::JSON->false;
+my $TRUE = Mojo::JSON->true;
+
+my $MAXKEY = bless {}, 'Mango::BSON::_MaxKey';
+my $MINKEY = bless {}, 'Mango::BSON::_MinKey';
+
+sub bson_bin { Mango::BSON::Binary->new(data => shift) }
+
+sub bson_code { Mango::BSON::Code->new(code => shift) }
+
+sub bson_decode {
+ my $bson = shift;
+ return undef unless my $len = bson_length($bson);
+ return length $bson == $len ? _decode_doc(\$bson) : undef;
+}
+
+sub bson_doc {
+ tie my %hash, 'Mango::BSON::Document', @_;
+ return \%hash;
+}
+
+sub bson_encode {
+ my $doc = shift;
+
+ my $bson = '';
+ while (my ($key, $value) = each %$doc) {
+ $bson .= _encode_value(encode_cstring($key), $value);
+ }
+
+ # Document ends with null byte
+ return encode_int32(length($bson) + 5) . $bson . "\x00";
+}
+
+sub bson_false {$FALSE}
+
+sub bson_length {
+ my $bson = shift;
+ return length($bson) < 4 ? undef : decode_int32(substr $bson, 0, 4);
+}
+
+sub bson_max {$MAXKEY}
+
+sub bson_min {$MINKEY}
+
+sub bson_oid { Mango::BSON::ObjectID->new(@_) }
+
+sub bson_time { Mango::BSON::Time->new(@_) }
+
+sub bson_ts {
+ Mango::BSON::Timestamp->new(seconds => shift, increment => shift);
+}
+
+sub bson_true {$TRUE}
+
+sub decode_int32 { 0 + unpack 'l<', shift }
+sub decode_int64 { 0 + unpack 'q<', shift }
+
+sub encode_cstring { pack 'Z*', encode('UTF-8', shift) }
+
+sub encode_int32 { pack 'l<', shift }
+sub encode_int64 { pack 'q<', shift }
+
+sub _decode_binary {
+ my $bsonref = shift;
+
+ my $len = decode_int32(substr $$bsonref, 0, 4, '');
+ my $subtype = substr $$bsonref, 0, 1, '';
+ my $binary = substr $$bsonref, 0, $len, '';
+
+ return bson_bin($binary)->type('function') if $subtype eq BINARY_FUNCTION;
+ return bson_bin($binary)->type('md5') if $subtype eq BINARY_MD5;
+ return bson_bin($binary)->type('uuid') if $subtype eq BINARY_UUID;
+ return bson_bin($binary)->type('user_defined')
+ if $subtype eq BINARY_USER_DEFINED;
+ return bson_bin($binary)->type('generic');
+}
+
+sub _decode_cstring {
+ my $bsonref = shift;
+ (my $string, $$bsonref) = unpack 'Z*a*', $$bsonref;
+ return decode 'UTF-8', $string;
+}
+
+sub _decode_doc {
+ my $bsonref = shift;
+
+ # Every element starts with a type
+ my $doc = bson_doc();
+ substr $$bsonref, 0, 4, '';
+ while (my $type = substr $$bsonref, 0, 1, '') {
+
+ # Null byte (end of document)
+ last if $type eq "\x00";
+
+ # Value with valid name
+ my $name = _decode_cstring($bsonref);
+ my $value = _decode_value($type, $bsonref);
+ $doc->{$name} = $value if length $name;
+ }
+
+ return $doc;
+}
+
+sub _decode_string {
+ my $bsonref = shift;
+ my $len = decode_int32(substr $$bsonref, 0, 4, '');
+ substr $$bsonref, $len - 1, 1, '';
+ return decode 'UTF-8', substr($$bsonref, 0, $len - 1, '');
+}
+
+sub _decode_value {
+ my ($type, $bsonref) = @_;
+
+ # String
+ return _decode_string($bsonref) if $type eq STRING;
+
+ # Object ID
+ return bson_oid(unpack 'H*', substr $$bsonref, 0, 12, '')
+ if $type eq OBJECT_ID;
+
+ # Double/Int32/Int64
+ return 0 + unpack 'd<', substr $$bsonref, 0, 8, '' if $type eq DOUBLE;
+ return decode_int32(substr $$bsonref, 0, 4, '') if $type eq INT32;
+ return decode_int64(substr $$bsonref, 0, 8, '') if $type eq INT64;
+
+ # Document
+ return _decode_doc($bsonref) if $type eq DOCUMENT;
+
+ # Array
+ return [values %{_decode_doc($bsonref)}] if $type eq ARRAY;
+
+ # Booleans and Null
+ return substr($$bsonref, 0, 1, '') eq "\x00" ? bson_false() : bson_true()
+ if $type eq BOOL;
+ return undef if $type eq NULL;
+
+ # Time
+ return bson_time(decode_int64(substr $$bsonref, 0, 8, ''))
+ if $type eq DATETIME;
+
+ # Regex
+ return eval join '/', 'qr', _decode_cstring($bsonref),
+ _decode_cstring($bsonref)
+ if $type eq REGEX;
+
+ # Binary (with subtypes)
+ return _decode_binary($bsonref) if $type eq BINARY;
+
+ # Min/Max
+ return bson_min() if $type eq MIN_KEY;
+ return bson_max() if $type eq MAX_KEY;
+
+ # Code (with and without scope)
+ return bson_code(_decode_string($bsonref)) if $type eq CODE;
+ if ($type eq CODE_SCOPE) {
+ decode_int32(substr $$bsonref, 0, 4, '');
+ return bson_code(_decode_string($bsonref))->scope(_decode_doc($bsonref));
+ }
+
+ # Timestamp
+ return bson_ts(
+ reverse map({decode_int32(substr $$_, 0, 4, '')} $bsonref, $bsonref))
+ if $type eq TIMESTAMP;
+
+ # Unknown
+ return undef;
+}
+
+sub _encode_binary {
+ my ($e, $subtype, $value) = @_;
+ return BINARY . $e . encode_int32(length $value) . $subtype . $value;
+}
+
+sub _encode_object {
+ my ($e, $value, $class) = @_;
+
+ # ObjectID
+ return OBJECT_ID . $e . pack('H*', $value)
+ if $class eq 'Mango::BSON::ObjectID';
+
+ # Time
+ return DATETIME . $e . encode_int64($value) if $class eq 'Mango::BSON::Time';
+
+ # Regex
+ if ($class eq 'Regexp') {
+ my ($p, $m) = regexp_pattern($value);
+ return REGEX . $e . encode_cstring($p) . encode_cstring($m);
+ }
+
+ # Binary
+ if ($class eq 'Mango::BSON::Binary') {
+ my $type = $value->type // 'generic';
+ my $data = $value->data;
+ return _encode_binary($e, BINARY_FUNCTION, $data) if $type eq 'function';
+ return _encode_binary($e, BINARY_MD5, $data) if $type eq 'md5';
+ return _encode_binary($e, BINARY_USER_DEFINED, $data)
+ if $type eq 'user_defined';
+ return _encode_binary($e, BINARY_UUID, $data) if $type eq 'uuid';
+ return _encode_binary($e, BINARY_GENERIC, $data);
+ }
+
+ # Code
+ if ($class eq 'Mango::BSON::Code') {
+
+ # With scope
+ if (my $scope = $value->scope) {
+ my $code = _encode_string($value->code) . bson_encode($scope);
+ return CODE_SCOPE . $e . encode_int32(length $code) . $code;
+ }
+
+ # Without scope
+ return CODE . $e . _encode_string($value->code);
+ }
+
+ # Timestamp
+ return join '', TIMESTAMP, $e, map { encode_int32 $_} $value->increment,
+ $value->seconds
+ if $class eq 'Mango::BSON::Timestamp';
+
+ # Blessed reference with TO_JSON method
+ if (my $sub = $value->can('TO_JSON')) {
+ return _encode_value($e, $value->$sub);
+ }
+
+ # Stringify
+ return STRING . $e . _encode_string($value);
+}
+
+sub _encode_string {
+ my $string = encode('UTF-8', shift) . "\x00";
+ return encode_int32(length $string) . $string;
+}
+
+sub _encode_value {
+ my ($e, $value) = @_;
+
+ # Null
+ return NULL . $e unless defined $value;
+
+ # Blessed
+ if (my $class = blessed $value) {
+
+ # True
+ return BOOL . $e . "\x01" if $value eq $TRUE;
+
+ # False
+ return BOOL . $e . "\x00" if $value eq $FALSE;
+
+ # Max
+ return MAX_KEY . $e if $value eq $MAXKEY;
+
+ # Min
+ return MIN_KEY . $e if $value eq $MINKEY;
+
+ # Multiple classes
+ return _encode_object($e, $value, $class);
+ }
+
+ # Reference
+ elsif (my $ref = ref $value) {
+
+ # Hash (Document)
+ return DOCUMENT . $e . bson_encode($value) if $ref eq 'HASH';
+
+ # Array
+ if ($ref eq 'ARRAY') {
+ my $array = bson_doc();
+ my $i = 1;
+ $array->{$i++} = $_ for @$value;
+ return ARRAY . $e . bson_encode($array);
+ }
+
+ # Scalar (boolean shortcut)
+ return _encode_value($e, $$value ? $TRUE : $FALSE) if $ref eq 'SCALAR';
+ }
+
+ # Double
+ my $flags = B::svref_2object(\$value)->FLAGS;
+ if ($flags & B::SVp_NOK && !($flags & B::SVp_POK)) {
+ return DOUBLE . $e . pack('d<', $value);
+ }
+
+ elsif ($flags & B::SVp_IOK && !($flags & B::SVp_POK)) {
+
+ # Int32
+ return INT32 . $e . encode_int32($value)
+ if $value <= INT32_MAX && $value >= INT32_MIN;
+
+ # Int64
+ return INT64 . $e . encode_int64($value);
+ }
+
+ # String
+ return STRING . $e . _encode_string("$value");
+}
+
+# Constants
+package Mango::BSON::_MaxKey;
+
+package Mango::BSON::_MinKey;
+
+1;
+
+=head1 NAME
+
+Mango::BSON - BSON
+
+=head1 SYNOPSIS
+
+ use Mango::BSON ':bson';
+
+ my $bson = bson_encode now => bson_time, counter => 13;
+ my $doc = bson_decode $bson;
+
+=head1 DESCRIPTION
+
+L<Mango::BSON> is a minimalistic implementation of L<http://bsonspec.org>.
+
+=head1 FUNCTIONS
+
+L<Mango::BSON> implements the following functions.
+
+=head2 bson_bin
+
+ my $generic = bson_bin $bytes;
+
+Create new BSON element of the binary type.
+
+ # Function
+ bson_bin($bytes)->type('function');
+
+ # MD5
+ bson_bin($bytes)->type('md5');
+
+ # UUID
+ bson_bin($bytes)->type('uuid');
+
+ # User defined
+ bson_bin($bytes)->type('user_defined');
+
+=head2 bson_code
+
+ my $code = bson_code 'function () {}';
+
+Create new BSON element of the code type.
+
+ # With scope
+ bson_code('function () {}')->scope({foo => 'bar'});
+
+=head2 bson_decode
+
+ my $doc = bson_decode $bson;
+
+Decode BSON into Perl data structures.
+
+=head2 bson_doc
+
+ my $doc = bson_doc foo => 'bar', baz => 23;
+
+Create new BSON document.
+
+=head2 bson_encode
+
+ my $bson = bson_encode $doc;
+
+Encode Perl data structures into BSON.
+
+=head2 bson_false
+
+ my $false = bson_false;
+
+Create new BSON element of the boolean type false.
+
+=head2 bson_length
+
+ my $len = bson_length $bson;
+
+Check BSON length prefix.
+
+=head2 bson_max
+
+ my $max_key = bson_max;
+
+Create new BSON element of the max key type.
+
+=head2 bson_min
+
+ my $min_key = bson_min;
+
+Create new BSON element of the min key type.
+
+=head2 bson_oid
+
+ my $oid = bson_oid;
+ my $oid = bson_oid '1a2b3c4e5f60718293a4b5c6';
+
+Create new BSON element of the object id type.
+
+=head2 bson_time
+
+ my $now = bson_time;
+ my $time = bson_time time * 1000;
+
+Create new BSON element of the UTC datetime type.
+
+=head2 bson_true
+
+ my $true = bson_true;
+
+Create new BSON element of the boolean type true.
+
+=head2 bson_ts
+
+ my $timestamp = bson_ts 23, 24;
+
+Create new BSON element of the timestamp type.
+
+=head2 decode_int32
+
+ my $int32 = decode_int32 $bytes;
+
+Decode 32bit integer.
+
+=head2 decode_int64
+
+ my $int64 = decode_int64 $bytes;
+
+Decode 64bit integer.
+
+=head2 encode_cstring
+
+ my $bytes = encode_cstring $cstring;
+
+Encode cstring.
+
+=head2 encode_int32
+
+ my $bytes = encode_int32 $int32;
+
+Encode 32bit integer.
+
+=head2 encode_int64
+
+ my $bytes = encode_int64 $int64;
+
+Encode 64bit integer.
+
+=head1 SEE ALSO
+
+L<Mango>, L<Mojolicious::Guides>, L<http://mojolicio.us>.
+
+=cut
50 lib/Mango/BSON/Binary.pm
@@ -0,0 +1,50 @@
+package Mango::BSON::Binary;
+use Mojo::Base -base;
+use overload '""' => sub { shift->data }, fallback => 1;
+
+has [qw(data type)];
+
+1;
+
+=head1 NAME
+
+Mango::BSON::Binary - Binary type
+
+=head1 SYNOPSIS
+
+ use Mango::BSON::Binary;
+
+ my $bin = Mango::BSON::Binary->new(data => $bytes, type => 'generic');
+
+=head1 DESCRIPTION
+
+L<Mango::BSON::Binary> is a container for the BSON binary type used by
+L<Mango::BSON>.
+
+=head1 ATTRIBUTES
+
+L<Mango::BSON::Binary> implements the following attributes.
+
+=head2 data
+
+ my $bytes = $bin->data;
+ $bin = $bin->data($bytes);
+
+Binary data.
+
+=head2 type
+
+ my $type = $bin->type;
+ $bin = $bin->type('generic');
+
+Binary subtype.
+
+=head1 METHODS
+
+L<Mango::BSON::Binary> inherits all methods from L<Mojo::Base>.
+
+=head1 SEE ALSO
+
+L<Mango>, L<Mojolicious::Guides>, L<http://mojolicio.us>.
+
+=cut
49 lib/Mango/BSON/Code.pm
@@ -0,0 +1,49 @@
+package Mango::BSON::Code;
+use Mojo::Base -base;
+
+has [qw(code scope)];
+
+1;
+
+=head1 NAME
+
+Mango::BSON::Code - Code type
+
+=head1 SYNOPSIS
+
+ use Mango::BSON::Code;
+
+ my $code = Mango::BSON::Code->new(code => 'function () {}');
+
+=head1 DESCRIPTION
+
+L<Mango::BSON::Code> is a container for the BSON code type used by
+L<Mango::BSON>.
+
+=head1 ATTRIBUTES
+
+L<Mango::BSON::Code> implements the following attributes.
+
+=head2 code
+
+ my $js = $code->code;
+ $code = $code->code('function () {}');
+
+JavaScript code.
+
+=head2 scope
+
+ my $scode = $code->scope;
+ $code = $code->scope({foo => 'bar'});
+
+Scope.
+
+=head1 METHODS
+
+L<Mango::BSON::Code> inherits all methods from L<Mojo::Base>.
+
+=head1 SEE ALSO
+
+L<Mango>, L<Mojolicious::Guides>, L<http://mojolicio.us>.
+
+=cut
78 lib/Mango/BSON/Document.pm
@@ -0,0 +1,78 @@
+package Mango::BSON::Document;
+use Mojo::Base 'Tie::Hash';
+
+sub DELETE {
+ my ($self, $key) = @_;
+
+ if (exists $self->[0]{$key}) {
+ my $i = $self->[0]{$key};
+ $self->[0]{$self->[1][$_]}-- for $i + 1 .. $#{$self->[1]};
+ delete $self->[0]{$key};
+ splice @{$self->[1]}, $i, 1;
+ return (splice(@{$self->[2]}, $i, 1))[0];
+ }
+
+ return undef;
+}
+
+sub EXISTS { exists $_[0][0]{$_[1]} }
+
+sub FETCH {
+ my ($self, $key) = @_;
+ return exists $self->[0]{$key} ? $self->[2][$self->[0]{$key}] : undef;
+}
+
+sub FIRSTKEY {
+ $_[0][3] = 0;
+ &NEXTKEY;
+}
+
+sub NEXTKEY {
+ return $_[0][1][$_[0][3]++] if $_[0][3] <= $#{$_[0][1]};
+ return undef;
+}
+
+sub STORE {
+ my ($self, $key, $value) = @_;
+
+ if (exists $self->[0]{$key}) {
+ my $i = $self->[0]{$key};
+ $self->[0]{$key} = $i;
+ $self->[1][$i] = $key;
+ $self->[2][$i] = $value;
+ }
+ else {
+ push @{$self->[1]}, $key;
+ push @{$self->[2]}, $value;
+ $self->[0]{$key} = $#{$self->[1]};
+ }
+}
+
+sub TIEHASH {
+ my $self = bless [{}, [], [], 0], shift;
+ $self->STORE(shift, shift) while @_;
+ return $self;
+}
+
+1;
+
+=head1 NAME
+
+Mango::BSON::Document - Document type
+
+=head1 SYNOPSIS
+
+ use Mango::BSON::Document;
+
+ tie my %hash, 'Mango::BSON::Document';
+
+=head1 DESCRIPTION
+
+L<Mango::BSON::Document> is a container for the BSON document type used by
+L<Mango::BSON>.
+
+=head1 SEE ALSO
+
+L<Mango>, L<Mojolicious::Guides>, L<http://mojolicio.us>.
+
+=cut
71 lib/Mango/BSON/ObjectID.pm
@@ -0,0 +1,71 @@
+package Mango::BSON::ObjectID;
+use Mojo::Base -base;
+use overload '""' => sub { ${$_[0]} }, fallback => 1;
+
+use Mojo::Util 'md5_bytes';
+use Sys::Hostname 'hostname';
+
+# 3 byte machine identifier
+my $MACHINE = substr md5_bytes(hostname), 0, 3;
+
+# Global counter
+my $COUNTER = 0;
+
+sub new {
+ my ($class, $oid) = @_;
+ return bless \($oid //= _generate()), ref $class || $class;
+}
+
+sub to_epoch { unpack 'N', substr(pack('H*', ${$_[0]}), 0, 4) }
+
+sub _generate {
+
+ # 4 byte time, 3 byte machine identifier and 2 byte process id
+ my $oid = pack('N', time) . $MACHINE . pack('n', $$ % 0xFFFF);
+
+ # 3 byte counter
+ $COUNTER = ($COUNTER + 1) % 0xFFFFFF;
+ return unpack 'H*', $oid . substr(pack('V', $COUNTER), 0, 3);
+}
+
+1;
+
+=head1 NAME
+
+Mango::BSON::ObjectID - Object ID type
+
+=head1 SYNOPSIS
+
+ use Mango::BSON::ObjectID;
+
+ my $oid = Mango::BSON::ObjectID->new('1a2b3c4e5f60718293a4b5c6');
+ say $oid->to_epoch;
+
+=head1 DESCRIPTION
+
+L<Mango::BSON::ObjectID> is a container for the BSON object id type used by
+L<Mango::BSON>.
+
+=head1 METHODS
+
+L<Mango::BSON::ObjectID> inherits all methods from L<Mojo::Base> and
+implements the following new ones.
+
+=head2 new
+
+ my $oid = Mango::BSON::ObjectID->new;
+ my $oid = Mango::BSON::ObjectID->new('1a2b3c4e5f60718293a4b5c6');
+
+Construct a new scalar-based L<Mango::BSON::ObjectID> object.
+
+=head2 to_epoch
+
+ my $epoch = $oid->to_epoch;
+
+Extract epoch seconds from object id.
+
+=head1 SEE ALSO
+
+L<Mango>, L<Mojolicious::Guides>, L<http://mojolicio.us>.
+
+=cut
54 lib/Mango/BSON/Time.pm
@@ -0,0 +1,54 @@
+package Mango::BSON::Time;
+use Mojo::Base -base;
+use overload '""' => sub { ${$_[0]} }, fallback => 1;
+
+use Time::HiRes 'time';
+
+sub new {
+ my ($class, $time) = @_;
+ return bless \($time //= int(time * 1000)), ref $class || $class;
+}
+
+sub to_epoch { int(${$_[0]} / 1000) }
+
+1;
+
+=head1 NAME
+
+Mango::BSON::Time - Datetime type
+
+=head1 SYNOPSIS
+
+ use Mango::BSON::Time;
+
+ my $time = Mango::BSON::Time->new(time * 1000);
+ say $time->to_epoch;
+
+=head1 DESCRIPTION
+
+L<Mango::BSON::Time> is a container for the BSON datetime type used by
+L<Mango::BSON>.
+
+=head1 METHODS
+
+L<Mango::BSON::Time> inherits all methods from L<Mojo::Base> and implements
+the following new ones.
+
+=head2 new
+
+ my $time = Mango::BSON::Time->new;
+ my $time = Mango::BSON::Time->new(time * 1000);
+
+Construct a new scalar-based L<Mango::BSON::Time> object.
+
+=head2 to_epoch
+
+ my $epoch = $time->to_epoch;
+
+Convert time to epoch seconds.
+
+=head1 SEE ALSO
+
+L<Mango>, L<Mojolicious::Guides>, L<http://mojolicio.us>.
+
+=cut
49 lib/Mango/BSON/Timestamp.pm
@@ -0,0 +1,49 @@
+package Mango::BSON::Timestamp;
+use Mojo::Base -base;
+
+has [qw(seconds increment)];
+
+1;
+
+=head1 NAME
+
+Mango::BSON::Timestamp - Timestamp type
+
+=head1 SYNOPSIS
+
+ use Mango::BSON::Timestamp;
+
+ my $ts = Mango::BSON::Timestamp->new(seconds => 23, increment => 5);
+
+=head1 DESCRIPTION
+
+L<Mango::BSON::Timestamp> is a container for the BSON timestamp type used by
+L<Mango::BSON>.
+
+=head1 ATTRIBUTES
+
+L<Mango::BSON::Timestamp> implements the following attributes.
+
+=head2 seconds
+
+ my $seconds = $ts->seconds;
+ $ts = $ts->seconds(23);
+
+Seconds.
+
+=head2 increment
+
+ my $inc = $ts->increment;
+ $tz = $ts->increment(5);
+
+Increment.
+
+=head1 METHODS
+
+L<Mango::BSON::Timestamp> inherits all methods from L<Mojo::Base>.
+
+=head1 SEE ALSO
+
+L<Mango>, L<Mojolicious::Guides>, L<http://mojolicio.us>.
+
+=cut
201 lib/Mango/Collection.pm
@@ -0,0 +1,201 @@
+package Mango::Collection;
+use Mojo::Base -base;
+
+use Carp 'croak';
+use Mango::BSON 'bson_oid';
+use Mango::Cursor;
+
+has [qw(db name)];
+
+sub find {
+ my ($self, $query) = @_;
+ return Mango::Cursor->new(collection => $self, query => $query);
+}
+
+sub find_one {
+ my ($self, $query) = @_;
+ $query = {_id => $query} if ref $query eq 'Mango::BSON::ObjectID';
+
+ # Non-blocking
+ my $cb = ref $_[-1] eq 'CODE' ? pop : undef;
+ return $self->find($query)->limit(-1)->next(
+ sub {
+ my ($cursor, $err, $doc) = @_;
+ $self->$cb($err, $doc);
+ }
+ ) if $cb;
+
+ # Blocking
+ return $self->find($query)->limit(-1)->next;
+}
+
+sub full_name { join '.', $_[0]->db->name, $_[0]->name }
+
+sub insert {
+ my ($self, $docs) = @_;
+ $docs = [$docs] unless ref $docs eq 'ARRAY';
+
+ # Make sure all documents have ids
+ my @ids = map { $_->{_id} //= bson_oid } @$docs;
+
+ # Non-blocking
+ my $cb = ref $_[-1] eq 'CODE' ? pop : undef;
+ return $self->db->mango->insert(
+ ($self->full_name, {}, @$docs) => sub {
+ my ($mango, $err, $reply) = @_;
+ $err ||= _error($reply);
+ $self->$cb($err, @ids > 1 ? \@ids : $ids[0]);
+ }
+ ) if $cb;
+
+ # Blocking
+ my $reply = $self->db->mango->insert($self->full_name, {}, @$docs);
+ if (my $err = _error($reply)) { croak $err }
+ return @ids > 1 ? \@ids : $ids[0];
+}
+
+sub remove {
+ my $self = shift;
+ my $query = ref $_[0] eq 'CODE' ? {} : shift // {};
+ return $self->_handle('delete', {}, $query, @_);
+}
+
+sub update {
+ my ($self, $query, $update) = (shift, shift, shift);
+ return $self->_handle('update', {}, $query, $update, @_);
+}
+
+sub _error { $_[0]->[5][0]{ok} ? $_[0]->[5][0]{err} : $_[0]->[5][0]{errmsg} }
+
+sub _handle {
+ my ($self, $method) = (shift, shift);
+
+ # Non-blocking
+ my $cb = ref $_[-1] eq 'CODE' ? pop : undef;
+ return $self->db->mango->$method(
+ ($self->full_name, @_) => sub {
+ my ($mango, $err, $reply) = @_;
+ $err ||= _error($reply);
+ $self->$cb($err, $reply->[5][0]);
+ }
+ ) if $cb;
+
+ # Blocking
+ my $reply = $self->db->mango->$method($self->full_name, @_);
+ if (my $err = _error($reply)) { croak $err }
+ return $reply->[5][0];
+}
+
+1;
+
+=head1 NAME
+
+Mango::Collection - MongoDB collection
+
+=head1 SYNOPSIS
+
+ use Mango::Collection;
+
+ my $collection = Mango::Collection->new(db => $db);
+ my $cursor = $collection->find({foo => 'bar'});
+
+=head1 DESCRIPTION
+
+L<Mango::Collection> is a container for MongoDB collections used by
+L<Mango::Database>.
+
+=head1 ATTRIBUTES
+
+L<Mango::Collection> implements the following attributes.
+
+=head2 db
+
+ my $db = $collection->db;
+ $collection = $collection->db(Mango::Database->new);
+
+L<Mango::Database> object this collection belongs to.
+
+=head2 name
+
+ my $name = $collection->name;
+ $collection = $collection->name('bar');
+
+Name of this collection.
+
+=head1 METHODS
+
+L<Mango::Collection> inherits all methods from L<Mojo::Base> and implements
+the following new ones.
+
+=head2 find
+
+ my $cursor = $collection->find({foo => 'bar'});
+
+Get L<Mango::Cursor> object for query.
+
+=head2 find_one
+
+ my $doc = $collection->find_one({foo => 'bar'});
+ my $doc = $collection->find_one($oid);
+
+Find one document. You can also append a callback to perform operation
+non-blocking.
+
+ $collection->find_one({foo => 'bar'} => sub {
+ my ($collection, $err, $doc) = @_;
+ ...
+ });
+ Mojo::IOLoop->start unless Mojo::IOLoop->is_running;
+
+=head2 full_name
+
+ my $name = $collection->full_name;
+
+Full name of this collection.
+
+=head2 insert
+
+ my $oid = $collection->insert({foo => 'bar'});
+ my $oids = $collection->insert([{foo => 'bar'}, {baz => 'yada'}]);
+
+Insert one or more documents into collection. You can also append a callback
+to perform operation non-blocking.
+
+ $collection->insert({foo => 'bar'} => sub {
+ my ($collection, $err, $oid) = @_;
+ ...
+ });
+ Mojo::IOLoop->start unless Mojo::IOLoop->is_running;
+
+=head2 remove
+
+ my $doc = $collection->remove;
+ my $doc = $collection->remove({foo => 'bar'});
+
+Remove documents from collection. You can also append a callback to perform
+operation non-blocking.
+
+ $collection->remove({foo => 'bar'} => sub {
+ my ($collection, $err, $doc) = @_;
+ ...
+ });
+ Mojo::IOLoop->start unless Mojo::IOLoop->is_running;
+
+=head2 update
+
+ my $doc = $collection->update({foo => 'bar'}, {foo => 'baz'});
+
+Update document in collection. You can also append a callback to perform
+operation non-blocking.
+
+ $collection->update(({foo => 'bar'}, {foo => 'baz'}) => sub {
+ my ($collection, $err, $doc) = @_;
+ ...
+ });
+ Mojo::IOLoop->start unless Mojo::IOLoop->is_running;
+
+=head1 SEE ALSO
+
+L<Mango>, L<Mojolicious::Guides>, L<http://mojolicio.us>.
+
+=cut
223 lib/Mango/Cursor.pm
@@ -0,0 +1,223 @@
+package Mango::Cursor;
+use Mojo::Base -base;
+
+has [qw(collection id sort)];
+has limit => 10;
+has [qw(fields query)] => sub { {} };
+has skip => 0;
+
+sub all {
+ my ($self, $cb) = @_;
+
+ # Non-blocking
+ my @all;
+ return $self->next(sub { shift->_collect(\@all, $cb, @_) }) if $cb;
+
+ # Blocking
+ while (my $next = $self->next) { push @all, $next }
+ return \@all;
+}
+
+sub next {
+ my ($self, $cb) = @_;
+ return exists $self->{results} ? $self->_continue($cb) : $self->_start($cb);
+}
+
+sub rewind {
+ my ($self, $cb) = @_;
+
+ return unless my $id = $self->id;
+ delete $self->id(undef)->{results};
+
+ # Non-blocking
+ return $self->collection->db->mango->kill_cursors($id => sub { $self->$cb })
+ if $cb;
+
+ # Blocking
+ $self->collection->db->mango->kill_cursors($id);
+}
+
+sub _collect {
+ my ($self, $all, $cb, $err, $doc) = @_;
+ return $self->$cb($err, $all) if $err || !$doc;
+ push @$all, $doc;
+ $self->next(sub { shift->_collect($all, $cb, @_) });
+}
+
+sub _continue {
+ my ($self, $cb) = @_;
+
+ # Non-blocking
+ my $collection = $self->collection;
+ my $name = $collection->full_name;
+ if ($cb) {
+ return $self->$cb(undef, shift @{$self->{results}}) if @{$self->{results}};
+ return $collection->db->mango->get_more(
+ ($name, $self->limit, $self->id) => sub {
+ my ($mango, $err, $reply) = @_;
+ $self->$cb($err, $self->_queue($reply));
+ }
+ );
+ }
+
+ # Blocking
+ return shift @{$self->{results}} if @{$self->{results}};
+ return $self->_queue(
+ $collection->db->mango->get_more($name, $self->limit, $self->id));
+}
+
+sub _query {
+ my $self = shift;
+ my $query = $self->query;
+ return $query unless my $sort = $self->sort;
+ return {'$query' => $query, '$orderby' => $sort};
+}
+
+sub _queue {
+ my ($self, $reply) = @_;
+ push @{$self->{results} ||= []}, @{$reply->[5]};
+ return shift @{$self->{results}};
+}
+
+sub _start {
+ my ($self, $cb) = @_;
+
+ my $collection = $self->collection;
+ my $name = $collection->full_name;
+ my @args
+ = ($name, {}, $self->skip, $self->limit, $self->_query, $self->fields);
+
+ # Non-blocking
+ return $collection->db->mango->query(
+ @args => sub {
+ my ($mango, $err, $reply) = @_;
+ $self->id($reply->[3]);
+ $self->$cb($err, $self->_queue($reply));
+ }
+ ) if $cb;
+
+ # Blocking
+ my $reply = $collection->db->mango->query(@args);
+ $self->id($reply->[3]);
+ return $self->_queue($reply);
+}
+
+1;
+
+=head1 NAME
+
+Mango::Cursor - MongoDB cursor
+
+=head1 SYNOPSIS
+
+ use Mango::Cursor;
+
+ my $cursor = Mango::Cursor->new(collection => $collection);
+
+=head1 DESCRIPTION
+
+L<Mango::Cursor> is a container for MongoDB cursors used by
+L<Mango::Collection>.
+
+=head1 ATTRIBUTES
+
+L<Mango::Cursor> implements the following attributes.
+
+=head2 collection
+
+ my $collection = $cursor->collection;
+ $cursor = $cursor->collection(Mango::Collection->new);
+
+L<Mango::Collection> object this cursor belongs to.
+
+=head2 id
+
+ my $id = $cursor->id;
+ $cursor = $cursor->id(123456);
+
+Cursor id.
+
+=head2 limit
+
+ my $limit = $cursor->limit;
+ $cursor = $cursor->limit(1);
+
+Limit, defaults to C<10>.
+
+=head2 fields
+
+ my $fields = $cursor->fields;
+ $cursor = $cursor->fields({foo => 1});
+
+Fields.
+
+=head2 query
+
+ my $query = $cursor->query;
+ $cursor = $cursor->query({foo => 'bar'});
+
+Query.
+
+=head2 skip
+
+ my $skip = $cursor->skip;
+ $cursor = $cursor->skip(5);
+
+Documents to skip, defaults to C<0>.
+
+=head2 sort
+
+ my $sort = $cursor->sort;
+ $cursor = $cursor->sort({foo => 1});
+
+Sort.
+
+=head1 METHODS
+
+L<Mango::Cursor> inherits all methods from L<Mojo::Base> and implements the
+following new ones.
+
+=head2 all
+
+ my $docs = $cursor->all;
+
+Fetch all documents. You can also append a callback to perform operation
+non-blocking.
+
+ $cursor->all(sub {
+ my ($cursor, $err, $docs) = @_;
+ ...
+ });
+ Mojo::IOLoop->start unless Mojo::IOLoop->is_running;
+
+=head2 next
+
+ my $doc = $cursor->next;
+
+Fetch next document. You can also append a callback to perform operation
+non-blocking.
+
+ $cursor->next(sub {
+ my ($cursor, $err, $doc) = @_;
+ ...
+ });
+ Mojo::IOLoop->start unless Mojo::IOLoop->is_running;
+
+=head2 rewind
+
+ $cursor->rewind;
+
+Rewind cursor. You can also append a callback to perform operation
+non-blocking.
+
+ $cursor->rewind(sub {
+ my $cursor = shift;
+ ...
+ });
+ Mojo::IOLoop->start unless Mojo::IOLoop->is_running;
+
+=head1 SEE ALSO
+
+L<Mango>, L<Mojolicious::Guides>, L<http://mojolicio.us>.
+
+=cut
93 lib/Mango/Database.pm
@@ -0,0 +1,93 @@
+package Mango::Database;
+use Mojo::Base -base;
+
+use Mango::BSON 'bson_doc';
+use Mango::Collection;
+
+has [qw(mango name)];
+
+sub collection {
+ my ($self, $name) = @_;
+ return Mango::Collection->new(db => $self, name => $name);
+}
+
+sub command {
+ my ($self, $command) = (shift, shift);
+
+ # Non-blocking
+ my $cb = ref $_[-1] eq 'CODE' ? pop : undef;
+ return $self->collection('$cmd')->find_one(
+ bson_doc($command => 1, @_) => sub {
+ my ($collection, $err, $doc) = @_;
+ $self->$cb($err, $doc);
+ }
+ ) if $cb;
+
+ # Blocking
+ return $self->collection('$cmd')->find_one(bson_doc($command => 1, @_));
+}
+
+1;
+
+=head1 NAME
+
+Mango::Database - MongoDB database
+
+=head1 SYNOPSIS
+
+ use Mango::Database;
+
+ my $db = Mango::Database->new(mango => $mango);
+ my $collection = $db->collection('foo');
+
+=head1 DESCRIPTION
+
+L<Mango::Database> is a container for MongoDB databases used by L<Mango>.
+
+=head1 ATTRIBUTES
+
+L<Mango::Database> implements the following attributes.
+
+=head2 mango
+
+ my $mango = $db->mango;
+ $db = $db->mango(Mango->new);
+
+L<Mango> object this database belongs to.
+
+=head2 name
+
+ my $name = $db->name;
+ $db = $db->name('bar');
+
+Name of this database.
+
+=head1 METHODS
+
+L<Mango::Database> inherits all methods from L<Mojo::Base> and implements the
+following new ones.
+
+=head2 collection
+
+ my $collection = $db->collection('foo');
+
+Get L<Mango::Collection> object for collection.
+
+=head2 command
+
+ my $doc = $db->command('getLastError', {w => 2});
+
+Run command against database. You can also append a callback to run command
+non-blocking.
+
+ $db->command(('getLastError', {w => 2}) => sub {
+ my ($db, $err, $doc) = @_;
+ ...
+ });
+ Mojo::IOLoop->start unless Mojo::IOLoop->is_running;
+
+=head1 SEE ALSO
+
+L<Mango>, L<Mojolicious::Guides>, L<http://mojolicio.us>.
+
+=cut
243 lib/Mango/Protocol.pm
@@ -0,0 +1,243 @@
+package Mango::Protocol;
+use Mojo::Base -base;
+
+use Mango::BSON qw(bson_decode bson_encode bson_length decode_int32),
+ qw(decode_int64 encode_cstring encode_int32 encode_int64);
+
+# Opcodes
+use constant {
+ REPLY => 1,
+ UPDATE => 2001,
+ INSERT => 2002,
+ QUERY => 2004,
+ GET_MORE => 2005,
+ DELETE => 2006,
+ KILL_CURSORS => 2007
+};
+
+sub build_delete {
+ my ($self, $id, $name, $flags, $query) = @_;
+
+ # Zero and name
+ my $msg = encode_int32(0) . encode_cstring($name);
+
+ # Flags
+ my $vec = 0b00000000000000000000000000000000;
+ _set($vec, 0b10000000000000000000000000000000, $flags->{single_remove});
+ $msg .= encode_int32 $vec;
+
+ # Query
+ $msg .= bson_encode $query;
+
+ # Header
+ return _build_header($id, length($msg), DELETE) . $msg;
+}
+
+sub build_get_more {
+ my ($self, $id, $name, $limit, $cursor) = @_;
+
+ # Zero and name
+ my $msg = encode_int32(0) . encode_cstring($name);
+
+ # Limit and cursor id
+ $msg .= encode_int32($limit) . encode_int64($cursor);
+
+ # Header
+ return _build_header($id, length($msg), GET_MORE) . $msg;
+}
+
+sub build_insert {
+ my ($self, $id, $name, $flags) = (shift, shift, shift, shift);
+
+ # Flags
+ my $vec = 0b00000000000000000000000000000000;
+ _set($vec, 0b10000000000000000000000000000000, $flags->{continue_on_error});
+ my $msg = encode_int32 $vec;
+
+ # Name
+ $msg .= encode_cstring $name;
+
+ # Documents
+ $msg .= bson_encode $_ for @_;
+
+ # Header
+ return _build_header($id, length($msg), INSERT) . $msg;
+}
+
+sub build_kill_cursors {
+ my ($self, $id) = (shift, shift);
+
+ # Zero and number of cursor ids
+ my $msg = encode_int32(0) . encode_int32(scalar @_);
+
+ # Cursor ids
+ $msg .= encode_int64 $_ for @_;
+
+ # Header
+ return _build_header($id, length($msg), KILL_CURSORS) . $msg;
+}
+
+sub build_query {
+ my ($self, $id, $name, $flags, $skip, $limit, $query, $fields) = @_;
+
+ # Flags
+ my $vec = 0b00000000000000000000000000000000;
+ _set($vec, 0b01000000000000000000000000000000, $flags->{tailable_cursor});
+ _set($vec, 0b00100000000000000000000000000000, $flags->{slave_ok});
+ _set($vec, 0b00001000000000000000000000000000, $flags->{no_cursor_timeout});
+ _set($vec, 0b00000100000000000000000000000000, $flags->{await_data});
+ _set($vec, 0b00000010000000000000000000000000, $flags->{exhaust});
+ _set($vec, 0b00000001000000000000000000000000, $flags->{partial});
+ my $msg = encode_int32 $vec;
+
+ # Name
+ $msg .= encode_cstring $name;
+
+ # Skip and limit
+ $msg .= encode_int32($skip) . encode_int32($limit);
+
+ # Query
+ $msg .= bson_encode $query;
+
+ # Optional field selector
+ $msg .= bson_encode $fields if $fields;
+
+ # Header
+ return _build_header($id, length($msg), QUERY) . $msg;
+}
+
+sub build_update {
+ my ($self, $id, $name, $flags, $query, $update) = @_;
+
+ # Zero and name
+ my $msg = encode_int32(0) . encode_cstring($name);
+
+ # Flags
+ my $vec = 0b00000000000000000000000000000000;
+ _set($vec, 0b10000000000000000000000000000000, $flags->{upsert});
+ _set($vec, 0b01000000000000000000000000000000, $flags->{multi_update});
+ $msg .= encode_int32 $vec;
+
+ # Query and update sepecification
+ $msg .= bson_encode($query) . bson_encode($update);
+
+ # Header
+ return _build_header($id, length($msg), UPDATE) . $msg;
+}
+
+sub parse_reply {
+ my ($self, $bufref) = @_;
+
+ # Make sure we have the whole message
+ return undef unless my $len = bson_length $$bufref;
+ return undef if length $$bufref < $len;
+ my $msg = substr $$bufref, 0, $len, '';
+ substr $msg, 0, 4, '';
+
+ # Header
+ my $id = decode_int32(substr $msg, 0, 4, '');
+ my $to = decode_int32(substr $msg, 0, 4, '');
+ my $op = decode_int32(substr $msg, 0, 4, '');
+ return undef unless $op == REPLY;
+
+ # FLags
+ my $flags = {};
+ my $vec = decode_int32(substr $msg, 0, 4, '');
+ $flags->{cursor_not_found} = _get($vec, 0b10000000000000000000000000000000);
+ $flags->{query_failure} = _get($vec, 0b01000000000000000000000000000000);
+ $flags->{await_capable} = _get($vec, 0b00010000000000000000000000000000);
+
+ # Cursor id
+ my $cursor = decode_int64(substr $msg, 0, 8, '');
+
+ # Starting from
+ my $from = decode_int32(substr $msg, 0, 4, '');
+
+ # Documents (remove number of documents prefix)
+ substr $msg, 0, 4, '';
+ my @docs;
+ push @docs, bson_decode(substr $msg, 0, bson_length($msg), '') while $msg;
+
+ return [$id, $to, $flags, $cursor, $from, \@docs];
+}
+
+sub _build_header {
+ my ($id, $length, $op) = @_;