Skip to content
This repository was archived by the owner on Dec 22, 2021. It is now read-only.
Closed
61 changes: 58 additions & 3 deletions lib/MongoDB/ClientSession.pm
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use MongoDB::Error;
use Moo;
use MongoDB::_Types qw(
Document
MongoDBTimestamp
);
use Types::Standard qw(
Bool
Expand Down Expand Up @@ -63,7 +64,15 @@ has cluster_time => (

=attr options

Options provided for this particular session.
Options provided for this particular session. Available options include:

=for :list
* C<causalConsistency> - If true, will enable causalConsistency for
this session. For more information, see L<MongoDB documentation on Causal
Consistency|https://docs.mongodb.com/manual/core/read-isolation-consistency-recency/#causal-consistency>.
Note that causalConsistency does not apply for unacknowledged writes.
Defaults to true.


=cut

Expand All @@ -74,7 +83,10 @@ has options => (
# Shallow copy to prevent action at a distance.
# Upgrade to use Storable::dclone if a more complex option is required
coerce => sub {
$_[0] = { %{ $_[0] } };
$_[0] = {
causalConsistency => 1,
%{ $_[0] }
};
},
);

Expand All @@ -86,6 +98,21 @@ has _server_session => (
clearer => '__clear_server_session',
);

=attr operation_time

The last operation time. This is updated when an operation is performed during
this session, or when L</advance_operation_time> is called. Used for causal
consistency.

=cut

has operation_time => (
is => 'rwp',
isa => Maybe[MongoDBTimestamp],
init_arg => undef,
default => undef,
);

=method session_id

The session id for this particular session. This should be considered
Expand Down Expand Up @@ -119,7 +146,7 @@ sub get_latest_cluster_time {
if ( defined $self->client->_cluster_time ) {
# Both must be defined here so can just compare
if ( $self->cluster_time->{'clusterTime'}
> $self->client->_cluster_time->{'clusterTime'} ) {
> $self->client->_cluster_time->{'clusterTime'} ) {
return $self->cluster_time;
} else {
return $self->client->_cluster_time;
Expand Down Expand Up @@ -160,6 +187,34 @@ sub advance_cluster_time {
return;
}

=method advance_operation_time

$session->advance_operation_time( $operation_time );

Update the L</operation_time> for this session. If the value provided is more
recent than the sessions current operation time, then the session will be
updated to this provided value.

Setting C<operation_time> with a manually crafted value may cause a server
error. It is recommended to only use an C<operation_time> retreived from
another session or directly from a database call.

=cut

sub advance_operation_time {
my ( $self, $operation_time ) = @_;

# Just dont update operation_time if they've denied this, as it'l stop
# everywhere else that updates based on this value from the session
return unless $self->options->{causalConsistency};

if ( !defined( $self->operation_time )
|| ( $operation_time > $self->operation_time ) ) {
$self->_set_operation_time( $operation_time );
}
return;
}

=method end_session

$session->end_session;
Expand Down
1 change: 1 addition & 0 deletions lib/MongoDB/Cursor.pm
Original file line number Diff line number Diff line change
Expand Up @@ -443,6 +443,7 @@ sub explain {
query => $self->_query,
read_preference => $self->_query->read_preference,
read_concern => $self->_query->read_concern,
session => $self->_query->session,
monitoring_callback => $self->client->monitoring_callback,
);

Expand Down
8 changes: 8 additions & 0 deletions lib/MongoDB/MongoClient.pm
Original file line number Diff line number Diff line change
Expand Up @@ -1485,6 +1485,14 @@ Returns a new L<MongoDB::ClientSession> with the supplied options.
will throw a C<MongoDB::ConfigurationError> if sessions are not supported by
the connected MongoDB deployment.

the options hash is an optional hash which can have the following keys:

=for :list
* C<causalConsistency> - Enable Causally Consistent reads for this session.
Defaults to true.

for more information see L<MongoDB::ClientSession/options>.

=cut

sub start_session {
Expand Down
2 changes: 1 addition & 1 deletion lib/MongoDB/Op/_Aggregate.pm
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ sub execute {
pipeline => $self->pipeline,
%$options,
(
!$has_out && $link->accepts_wire_version(4) ? @{ $self->read_concern->as_args } : ()
!$has_out && $link->accepts_wire_version(4) ? @{ $self->read_concern->as_args( $self->session ) } : ()
),
(
$has_out && $link->accepts_wire_version(5) ? @{ $self->write_concern->as_args } : ()
Expand Down
3 changes: 3 additions & 0 deletions lib/MongoDB/Op/_Command.pm
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,9 @@ sub execute {
address => $link->address,
);

# Must happen even on an error (ie. the command fails)
$self->_update_operation_time( $res );

$res->assert;

$self->_update_session_and_cluster_time($res);
Expand Down
2 changes: 1 addition & 1 deletion lib/MongoDB/Op/_Count.pm
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ sub execute {
query => $self->filter,

($link->accepts_wire_version(4) ?
@{ $self->read_concern->as_args } : () ),
@{ $self->read_concern->as_args( $self->session ) } : () ),

%{ $self->options },
];
Expand Down
2 changes: 1 addition & 1 deletion lib/MongoDB/Op/_Distinct.pm
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ sub execute {
key => $self->fieldname,
query => $filter,
($link->accepts_wire_version(4) ?
@{ $self->read_concern->as_args } : ()),
@{ $self->read_concern->as_args( $self->session ) } : ()),
%$options
);

Expand Down
3 changes: 2 additions & 1 deletion lib/MongoDB/Op/_Explain.pm
Original file line number Diff line number Diff line change
Expand Up @@ -74,11 +74,12 @@ sub _command_explain {
db_name => $self->db_name,
query => [
explain => $cmd,
@{ $self->read_concern->as_args }
@{ $self->read_concern->as_args( $self->session ) }
],
query_flags => {},
read_preference => $self->read_preference,
bson_codec => $self->bson_codec,
session => $self->session,
monitoring_callback => $self->monitoring_callback,
);
my $res = $op->execute( $link, $topology );
Expand Down
2 changes: 1 addition & 1 deletion lib/MongoDB/Op/_ParallelScan.pm
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ sub execute {
parallelCollectionScan => $self->coll_name,
numCursors => $self->num_cursors,
($link->accepts_wire_version(4) ?
@{ $self->read_concern->as_args } : () ),
@{ $self->read_concern->as_args( $self->session ) } : () ),
%{$self->options},
];

Expand Down
2 changes: 1 addition & 1 deletion lib/MongoDB/Op/_Query.pm
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,7 @@ sub _as_command {
tailable => $tailable,
awaitData => $await_data,
singleBatch => ( $single_batch ? $TRUE : $FALSE ),
@{ $self->{read_concern}->as_args },
@{ $self->{read_concern}->as_args( $self->session ) },

( $limit ? ( limit => $limit ) : () ),
( $batch_size ? ( batchSize => $batch_size ) : () ),
Expand Down
29 changes: 18 additions & 11 deletions lib/MongoDB/ReadConcern.pm
Original file line number Diff line number Diff line change
Expand Up @@ -58,30 +58,37 @@ has level => (
predicate => 'has_level',
);

has _as_args => (
is => 'lazy',
isa => ArrayRef,
reader => 'as_args',
builder => '_build_as_args',
);

sub BUILD {
my $self = shift;
if ( defined $self->{level} ) {
$self->{level} = lc $self->{level};
}
}

sub _build_as_args {
my ($self) = @_;
# public interface for compatibility, but undocumented
sub as_args {
my ( $self, $session ) = @_;

# if session is defined and operation_time is not, then either the
# operation_time was not sent on the response from the server for this
# session or the session has causal consistency disabled.
if ( $self->{level} ) {
return [
readConcern => { level => $self->{level} }
readConcern => {
level => $self->{level},
( defined $session && defined $session->operation_time
? ( afterClusterTime => $session->operation_time )
: () ),
}
];
}
else {
return [];
return [
( defined $session && defined $session->operation_time
? ( readConcern => { afterClusterTime => $session->operation_time } )
: ()
)
];
}
}

Expand Down
44 changes: 30 additions & 14 deletions lib/MongoDB/Role/_SessionSupport.pm
Original file line number Diff line number Diff line change
Expand Up @@ -47,12 +47,12 @@ sub _apply_session_and_cluster_time {
my $cluster_time = $self->session->get_latest_cluster_time;

# No cluster time in either session or client
return unless defined $cluster_time;

if ( $link->server->is_master->{maxWireVersion} >= 6 ) {
# Gossip the clusterTime
$$query_ref = to_IxHash( $$query_ref );
($$query_ref)->Push( '$clusterTime' => $cluster_time );
if ( defined $cluster_time ) {
if ( $link->server->is_master->{maxWireVersion} >= 6 ) {
# Gossip the clusterTime
$$query_ref = to_IxHash( $$query_ref );
($$query_ref)->Push( '$clusterTime' => $cluster_time );
}
}

return;
Expand All @@ -69,19 +69,35 @@ sub _update_session_and_cluster_time {
# No point continuing as theres nothing to do even if clusterTime is returned
return unless defined $self->session;

my $cluster_time;
if ( $response->$_isa( 'MongoDB::CommandResult' ) ) {
$cluster_time = $response->output->{'$clusterTime'};
} else {
$cluster_time = $response->{'$clusterTime'};
my $cluster_time = $self->__extract_from( $response, '$clusterTime' );

if ( defined $cluster_time ) {
$self->session->client->_update_cluster_time( $cluster_time );
$self->session->advance_cluster_time( $cluster_time );
}

return unless defined $cluster_time;
return;
}

sub _update_operation_time {
my ( $self, $response ) = @_;

return unless defined $self->session;

$self->session->client->_update_cluster_time( $cluster_time );
$self->session->advance_cluster_time( $cluster_time );
my $operation_time = $self->__extract_from( $response, 'operationTime' );
$self->session->advance_operation_time( $operation_time ) if defined $operation_time;

return;
}

sub __extract_from {
my ( $self, $response, $key ) = @_;

if ( $response->$_isa( 'MongoDB::CommandResult' ) ) {
return $response->output->{ $key };
} else {
return $response->{ $key };
}
}

1;
2 changes: 2 additions & 0 deletions lib/MongoDB/Role/_SingleBatchDocWrite.pm
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,8 @@ sub _send_write_command {

my $res = $self->bson_codec->decode_one( $result->{docs} );

$self->_update_operation_time( $res );

$self->_update_session_and_cluster_time($res);

# Error checking depends on write concern
Expand Down
3 changes: 3 additions & 0 deletions lib/MongoDB/_Types.pm
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ use Type::Library
MaybeHashRef
MongoDBCollection
MongoDBDatabase
MongoDBTimestamp
NonEmptyStr
NonNegNum
OID
Expand Down Expand Up @@ -128,6 +129,8 @@ class_type MongoDBCollection, { class => 'MongoDB::Collection' };

class_type MongoDBDatabase, { class => 'MongoDB::Database' };

class_type MongoDBTimestamp, { class => 'MongoDB::Timestamp' };

declare NonEmptyStr, as Str, where { defined $_ && length $_ };

declare NonNegNum, as Num,
Expand Down
Loading