Skip to content
This repository was archived by the owner on Dec 22, 2021. It is now read-only.
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
6373104
PERL-791 Change stream support
phaylon Mar 22, 2018
e32fd12
adjusted to use Safe::Isa
phaylon Mar 22, 2018
5b769fc
Added to MongoDB::ChangeStream
phaylon Mar 23, 2018
9a323f0
Don't return undef as per Perl::Critic
phaylon Mar 23, 2018
681be3d
Enable raising of MongoDB::CursorNotFoundError instead of just MongoD…
phaylon Mar 23, 2018
a7d1434
Added test for MongoDB::ChangeStream reconnections on MongoDB::Cursor…
phaylon Mar 23, 2018
294585d
Adjust MongoDB::Error docs to include ::InvalidOperationError and mov…
phaylon Mar 23, 2018
3f28048
Add ->_is_resumable() flag to MongoDB::Errors indicating if an error …
phaylon Mar 24, 2018
9b034d8
Change MongoDB::ChangeStream to use MongoDB::Error->is_resumable() fo…
phaylon Mar 24, 2018
1e9d72c
Add types to attributes in MongoDB::ChangeStream
phaylon Mar 28, 2018
cfdbd30
Rename ChangeStream->_cursor to ->_result
phaylon Mar 29, 2018
3ba03db
Remove non-existant resume_token builder in ChangeStream
phaylon Mar 29, 2018
d97dffc
Rename ChangeStream->options to ->aggregation_options
phaylon Mar 29, 2018
9b2a75f
Removed leftover unused client argument to ChangeStream construction …
phaylon Mar 29, 2018
c147c70
Removed unused variable
phaylon Mar 29, 2018
8aa871e
Removed default empty array/hash ref constructs for pipeline/options …
phaylon Mar 29, 2018
4622361
Removed cursorType from Op::_Aggregate
phaylon Mar 29, 2018
0a581e6
Fixed ChangeStream->next documentation
phaylon Mar 29, 2018
5fa5265
Removed temporary for change _id
phaylon Mar 29, 2018
da8f577
explicitly return undef instead of nothing when ChangeStream->next ha…
phaylon Mar 29, 2018
c8dd4bb
Fixed synopsis for ChangeStream
phaylon Mar 29, 2018
27289bf
Fixed usage example in ChangeStream->watch
phaylon Mar 29, 2018
fa7dde8
Add note about where to obtain resumeAfter token to Collection->watch…
phaylon Mar 29, 2018
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
199 changes: 199 additions & 0 deletions lib/MongoDB/ChangeStream.pm
Original file line number Diff line number Diff line change
@@ -0,0 +1,199 @@
#
# Copyright 2009-2018 MongoDB, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

use strict;
use warnings;
package MongoDB::ChangeStream;

# ABSTRACT: A stream providing update information for collections.

use version;
our $VERSION = 'v1.999.0';

use Moo;
use Try::Tiny;
use MongoDB::Cursor;
use MongoDB::Op::_Aggregate;
use MongoDB::Error;
use Safe::Isa;
use MongoDB::_Types qw(
MongoDBCollection
ArrayOfHashRef
);
use Types::Standard qw(
InstanceOf
HashRef
Str
);

use namespace::clean -except => 'meta';

has _result => (
is => 'rw',
isa => InstanceOf['MongoDB::QueryResult'],
lazy => 1,
builder => '_build_result',
clearer => '_clear_result',
);

has collection => (
is => 'ro',
isa => MongoDBCollection,
required => 1,
);

has aggregation_options => (
is => 'ro',
isa => HashRef,
);

has pipeline => (
is => 'ro',
isa => ArrayOfHashRef,
required => 1,
);

has full_document => (
is => 'ro',
isa => Str,
predicate => '_has_full_document',
);

has _resume_token => (
is => 'rw',
init_arg => 'resume_after',
predicate => '_has_resume_token',
lazy => 1,
);

sub BUILD {
my ($self) = @_;

# starting point is construction time instead of first next call
$self->_result;
}

sub _build_result {
my ($self) = @_;

my @pipeline = @{ $self->pipeline };
@pipeline = (
{'$changeStream' => {
($self->_has_full_document
? (fullDocument => $self->full_document)
: ()
),
($self->_has_resume_token
? (resumeAfter => $self->_resume_token)
: ()
),
}},
@pipeline,
);

return $self->collection->aggregate(
\@pipeline,
$self->aggregation_options,
);
}

=head1 STREAM METHODS

=cut

=head2 next

$change_stream = $collection->watch(...);
$change = $change_stream->next;

Waits for the next change in the collection and returns it.

B<Note>: This method will wait for the amount of milliseconds passed
as C<maxAwaitTimeMS> to L<MongoDB::Collection/watch> or the server's
default wait-time. It will not wait indefinitely.

=cut

sub next {
my ($self) = @_;

my $change;
my $retried;
while (1) {
last if try {
$change = $self->_result->next;
1 # successfully fetched result
}
catch {
my $error = $_;
if (
not($retried)
and $error->$_isa('MongoDB::Error')
and $error->_is_resumable
) {
$retried = 1;
$self->_result($self->_build_result);
}
else {
die $error;
}
0 # failed, cursor was rebuilt
};
}

# this differs from drivers that block indefinitely. we have to
# deal with the situation where no results are available.
if (not defined $change) {
return undef;
}

if (exists $change->{_id}) {
$self->_resume_token($change->{_id});
return $change;
}
else {
MongoDB::InvalidOperationError->throw(
"Cannot provide resume functionality when the ".
"resume token is missing");
}
}

1;

=head1 SYNOPSIS

$stream = $collection->watch( $pipeline, $options );
while(1) {

# This inner loop will only iterate until there are no more
# changes available.
while (my $change = $stream->next) {
...
}
}

=head1 DESCRIPTION

This class models change stream results as returned by the
L<MongoDB::Collection/watch> method.

=head1 SEE ALSO

The L<Change Streams manual section|https://docs.mongodb.com/manual/changeStreams/>.

The L<Change Streams specification|https://github.com/mongodb/specifications/blob/master/source/change-streams.rst>.

=cut
86 changes: 86 additions & 0 deletions lib/MongoDB/Collection.pm
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ package MongoDB::Collection;
use version;
our $VERSION = 'v1.999.0';

use MongoDB::ChangeStream;
use MongoDB::Error;
use MongoDB::IndexView;
use MongoDB::InsertManyResult;
Expand Down Expand Up @@ -1000,6 +1001,85 @@ sub find_one_and_update {
return $self->_find_one_and_update_or_replace($filter, $update, $options);
}

=method watch

Watches for changes on this collection-

Perform an aggregation with an implicit initial C<$changeStream> stage
and returns a L<MongoDB::ChangeStream> result which can be used to
iterate over the changes in the collection. This functionality is
available since MongoDB 3.6.

my $stream = $collection->watch();
my $stream = $collection->watch( \@pipeline );
my $stream = $collection->watch( \@pipeline, \%options );

while (1) {

# This inner loop will only run until no more changes are
# available.
while (my $change = $stream->next) {
# process $change
}
}

The returned stream will not block forever waiting for changes. If you
want to respond to changes over a longer time use C<maxAwaitTimeMS> and
regularly call C<next> in a loop.

B<Note>: Using this helper method is preferred to manually aggregating
with a C<$changeStream> stage, since it will automatically resume when
the connection was terminated.

The optional first argument must be an array-ref of
L<aggregation pipeline|http://docs.mongodb.org/manual/core/aggregation-pipeline/>
documents. Each pipeline document must be a hash reference. Not all
pipeline stages are supported after C<$changeStream>.

The optional second argument is a hash reference with options:

=for :list
* C<fullDocument> - The fullDocument to pass as an option to the
C<$changeStream> stage. Allowed values: C<default>, C<updateLookup>.
Defaults to C<default>. When set to C<updateLookup>, the change
notification for partial updates will include both a delta describing the
changes to the document, as well as a copy of the entire document that
was changed from some time after the change occurred.
* C<resumeAfter> - The logical starting point for this change stream.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add to this that it must be the _id from a document returned from calling next on the ChangeStream object.

This value can be obtained from the C<_id> field of a document returned
by L<MongoDB::ChangeStream/next>.
* C<maxAwaitTimeMS> - The maximum number of milliseconds for the server
to wait before responding.

See L</aggregate> for more available options.

See the L<manual section on Change Streams|https://docs.mongodb.com/manual/changeStreams/>
for general usage information on change streams.

See the L<Change Streams specification|https://github.com/mongodb/specifications/blob/master/source/change-streams.rst>
for details on change streams.

=cut

sub watch {
my ( $self, $pipeline, $options ) = @_;

$pipeline ||= [];
$options ||= {};

return MongoDB::ChangeStream->new(
exists($options->{fullDocument})
? (full_document => delete $options->{fullDocument})
: (full_document => 'default'),
exists($options->{resumeAfter})
? (resume_after => delete $options->{resumeAfter})
: (),
collection => $self,
pipeline => $pipeline,
aggregation_options => $options,
);
}

=method aggregate

@pipeline = (
Expand Down Expand Up @@ -1082,6 +1162,12 @@ sub aggregate {
options => $options,
read_concern => $self->read_concern,
has_out => $last_op eq '$out',
exists($options->{cursorType})
? (cursorType => delete $options->{cursorType})
: (cursorType => 'non_tailable'),
exists($options->{maxAwaitTimeMS})
? (maxAwaitTimeMS => delete $options->{maxAwaitTimeMS})
: (),
%{ $self->_op_args },
);

Expand Down
25 changes: 22 additions & 3 deletions lib/MongoDB/Error.pm
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ BEGIN {
UNKNOWN_ERROR => 8,
NAMESPACE_NOT_FOUND => 26,
INDEX_NOT_FOUND => 27,
CURSOR_NOT_FOUND => 43,
EXCEEDED_TIME_LIMIT => 50,
COMMAND_NOT_FOUND => 59,
WRITE_CONCERN_ERROR => 64,
Expand Down Expand Up @@ -109,6 +110,10 @@ sub throw {
die $throwable;
}

# internal flag indicating if an operation should be retried when
# an error occurs.
sub _is_resumable { 1 }

#--------------------------------------------------------------------------#
# Subclasses with attributes included inline below
#--------------------------------------------------------------------------#
Expand All @@ -134,6 +139,8 @@ has code => (

sub _build_code { return MongoDB::Error::UNKNOWN_ERROR() }

sub _is_resumable { 0 }

package MongoDB::DocumentError;

use Moo;
Expand Down Expand Up @@ -191,6 +198,8 @@ use Moo;
use namespace::clean;
extends 'MongoDB::Error';

sub _is_resumable { 1 }

package MongoDB::HandshakeError;
use Moo;
use namespace::clean;
Expand Down Expand Up @@ -229,6 +238,7 @@ use Moo;
use namespace::clean;
extends 'MongoDB::DatabaseError';
sub _build_code { return MongoDB::Error::NOT_MASTER() }
sub _is_resumable { 1 }

package MongoDB::WriteError;
use Moo;
Expand All @@ -250,7 +260,9 @@ extends 'MongoDB::Error';
package MongoDB::CursorNotFoundError;
use Moo;
use namespace::clean;
extends 'MongoDB::Error';
extends 'MongoDB::DatabaseError';
sub _build_code { return MongoDB::Error::CURSOR_NOT_FOUND() }
sub _is_resumable { 1 }

package MongoDB::DecodingError;
use Moo;
Expand All @@ -277,6 +289,11 @@ use Moo;
use namespace::clean;
extends 'MongoDB::Error';

package MongoDB::InvalidOperationError;
use Moo;
use namespace::clean;
extends 'MongoDB::Error';

#--------------------------------------------------------------------------#
# Private error classes
#--------------------------------------------------------------------------#
Expand Down Expand Up @@ -347,10 +364,10 @@ To retry failures automatically, consider using L<Try::Tiny::Retry>.
| |
| |->MongoDB::NetworkError
|
|->MongoDB::CursorNotFoundError
|
|->MongoDB::DatabaseError
| |
| |->MongoDB::CursorNotFoundError
| |
| |->MongoDB::DuplicateKeyError
| |
| |->MongoDB::NotMasterError
Expand All @@ -367,6 +384,8 @@ To retry failures automatically, consider using L<Try::Tiny::Retry>.
|
|->MongoDB::InternalError
|
|->MongoDB::InvalidOperationError
|
|->MongoDB::ProtocolError
|
|->MongoDB::SelectionError
Expand Down
Loading