Skip to content
This repository was archived by the owner on Dec 22, 2021. It is now read-only.

Conversation

@phaylon
Copy link
Contributor

@phaylon phaylon commented Jun 20, 2018

Note: This includes code for the fallback of startAtOperationTime, but disables it because:

  • When active tests fail with Resume of change stream was not possible, as the resume point may no longer be in the oplog
  • The spec test command event expectation data doesn't include, causing spec test failures.

Otherwise this PR includes the following changes:

  • Add Op::_ChangeStream as adapted version of Op::_Aggregate
  • Make MongoDB::ChangeStream use the new Op for command construction logic
  • Introduce startAtOperationTime (with disabled fallback, see above)
  • ChangeStream specification tests in t/changestream_spec.t
  • Add watch methods to ::Database and ::MongoClient
  • Add session as recognized option to the watch methods
  • Make ChangeStream tests also run on database and client, and test the session option

Copy link
Contributor

@xdg xdg left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like a great start! As a general point, I'd like to have the ChangeStreams.pm "_op_args" only be the cached _op_args fields from Collection (and the manually assembled equivalents in Database and MongoClient). All the watch API specific arguments and options should be in private variables. You'll see some notes to that effect in my comments.

ArrayOfHashRef
);
use Types::Standard qw(
Bool
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please use "Boolish" from MongoDB::_Types instead here and everywhere else instead, as it handles overloaded objects in a way the built-ins don't.

use Types::Standard qw(
Bool
InstanceOf
Int
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ditto Intish


has start_at_operation_time => (
is => 'ro',
isa => Maybe[Int],
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is going to need to be a BSON::Timestamp, not Int, so that users can pass in the BSON::Timestamp they get back from session operation time.

It should also be private

C<startAtOperationTime>
* C<maxAwaitTimeMS> - The maximum number of milliseconds for the server
to wait before responding.
* C<startAtOperationTime> - A timestamp specifying at what point in time
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

clarify that this should be a BSON::Timestamp


my $session = $self->_get_session_from_hashref( $options );

# boolify some options
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't need these for change streams

isa => MongoDBCollection,
);

has pipeline => (
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

On reflection, pipeline and full_document should be private, too. The only public method (other than the constructor) should be next.

has _op_args => (
is => 'ro',
isa => HashRef,
default => sub { {} },
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This needs to be required without a default.

pipeline => $self->pipeline,
all_changes_for_cluster => $self->all_changes_for_cluster,
changes_received => $self->_changes_received,
defined($self->start_at_operation_time)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if this should use _has_start_at_operation_time for consistency with resume token. Think again about it, but I'm not set on it if there is a good reason to check for definedness.

my @pipeline = (
{'$changeStream' => {
(defined($start_op_time)
? (startAtOperationTime => BSON::Timestamp->new(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we make startAtOperationTime a BSON::Timestamp then this conversion isn't needed.


# For explain, we give the whole response as fields have changed in
# different server versions
if ( $options->{explain} ) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

dont' need explain

Copy link
Contributor

@xdg xdg left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One minor nit. Otherwise LGTM. Thanks!

init_arg => 'start_at_operation_time',
predicate => '_has_start_at_operation_time',
coerce => sub {
ref($_[0]) ? $_[0] : BSON::Timestamp->new($_[0])
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you're wanting to allow seconds, I'd prefer to have this be BSON::Timestamp->new( seconds => $_[0]) rather than relying on BUILDARGS to fix up the legacy constructor format.

@xdg
Copy link
Contributor

xdg commented Jun 22, 2018

Rebased and merged.

@xdg xdg closed this Jun 22, 2018
@TBSliver TBSliver deleted the phaylon/PERL-912 branch June 13, 2019 14:52
Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants