Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions docs/reference/create-client.txt
Original file line number Diff line number Diff line change
Expand Up @@ -619,8 +619,9 @@ Ruby Options
- ``{ :mode => :primary }``

* - ``:read_concern``
- Specifies the read concern options. The only valid key is ``level``, for which the valid
values are ``:local``, ``:majority``, and ``:snapshot``.
- Specifies the read concern options. The only valid key is ``level``,
for which the valid values are ``:local``, ``:majority``, and
``:snapshot``.
- ``Hash``
- none

Expand Down
4 changes: 4 additions & 0 deletions lib/mongo/client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -887,6 +887,7 @@ def reconnect
#
# See https://docs.mongodb.com/manual/reference/command/listDatabases/
# for more information and usage.
# @option opts [ Session ] :session The session to use.
#
# @return [ Array<String> ] The names of the databases.
#
Expand All @@ -910,6 +911,7 @@ def database_names(filter = {}, opts = {})
#
# See https://docs.mongodb.com/manual/reference/command/listDatabases/
# for more information and usage.
# @option opts [ Session ] :session The session to use.
#
# @return [ Array<Hash> ] The info for each database.
#
Expand All @@ -930,6 +932,8 @@ def list_databases(filter = {}, name_only = false, opts = {})
# @param [ Hash ] filter The filter criteria for getting a list of databases.
# @param [ Hash ] opts The command options.
#
# @option opts [ Session ] :session The session to use.
#
# @return [ Array<Mongo::Database> ] The list of database objects.
#
# @since 2.5.0
Expand Down
12 changes: 6 additions & 6 deletions lib/mongo/database.rb
Original file line number Diff line number Diff line change
Expand Up @@ -253,12 +253,12 @@ def read_command(operation, opts = {})

client.send(:with_session, opts) do |session|
read_with_retry(session, preference) do |server|
Operation::Command.new({
:selector => operation.dup,
:db_name => name,
:read => preference,
:session => session
}).execute(server, context: Operation::Context.new(client: client, session: session))
Operation::Command.new(
selector: operation.dup,
db_name: name,
read: preference,
session: session,
).execute(server, context: Operation::Context.new(client: client, session: session))
end
end
end
Expand Down
4 changes: 3 additions & 1 deletion lib/mongo/database/view.rb
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ class View
#
# See https://docs.mongodb.com/manual/reference/command/listCollections/
# for more information and usage.
# @option options [ Session ] :session The session to use.
#
# @return [ Array<String> ] The names of all non-system collections.
#
Expand Down Expand Up @@ -100,12 +101,13 @@ def collection_names(options = {})
#
# See https://docs.mongodb.com/manual/reference/command/listCollections/
# for more information and usage.
# @option options [ Session ] :session The session to use.
#
# @return [ Array<Hash> ] Info for each collection in the database.
#
# @since 2.0.5
def list_collections(options = {})
session = client.send(:get_session)
session = client.send(:get_session, options)
collections_info(session, ServerSelector.primary, options)
end

Expand Down
2 changes: 2 additions & 0 deletions lib/mongo/error.rb
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,8 @@ def add_label(label)
require 'mongo/error/no_srv_records'
require 'mongo/error/session_ended'
require 'mongo/error/sessions_not_supported'
require 'mongo/error/snapshot_session_invalid_server_version'
require 'mongo/error/snapshot_session_transaction_prohibited'
require 'mongo/error/operation_failure'
require 'mongo/error/pool_closed_error'
require 'mongo/error/raise_original_error'
Expand Down
31 changes: 31 additions & 0 deletions lib/mongo/error/snapshot_session_invalid_server_version.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
# frozen_string_literal: true
# encoding: utf-8

# Copyright (C) 2021 MongoDB Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

module Mongo
class Error

# Exception raised if an operation using a snapshot session is
# directed to a pre-5.0 server.
class SnapshotSessionInvalidServerVersion < Error

# Instantiate the new exception.
def initialize
super("Snapshot reads require MongoDB 5.0 or later")
end
end
end
end
30 changes: 30 additions & 0 deletions lib/mongo/error/snapshot_session_transaction_prohibited.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
# frozen_string_literal: true
# encoding: utf-8

# Copyright (C) 2021 MongoDB Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

module Mongo
class Error

# Exception raised if a transaction is attempted on a snapshot session.
class SnapshotSessionTransactionProhibited < Error

# Instantiate the new exception.
def initialize
super("Transactions are not supported in snapshot sessions")
end
end
end
end
6 changes: 6 additions & 0 deletions lib/mongo/operation/result.rb
Original file line number Diff line number Diff line change
Expand Up @@ -429,6 +429,12 @@ def write_concern_error?
!!(first_document && first_document['writeConcernError'])
end

def snapshot_timestamp
if doc = reply.documents.first
doc['cursor']&.[]('atClusterTime') || doc['atClusterTime']
end
end

private

def aggregate_returned_count
Expand Down
4 changes: 4 additions & 0 deletions lib/mongo/operation/shared/executable.rb
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,10 @@ def do_execute(connection, context, options = {})
session.pin_to_service(connection.service_id)
end
end

if session.snapshot? && !session.snapshot_timestamp
session.snapshot_timestamp = result.snapshot_timestamp
end
end
process_result(result, connection)
end
Expand Down
11 changes: 11 additions & 0 deletions lib/mongo/operation/shared/sessions_supported.rb
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,17 @@ def apply_session_options(sel, connection)
then
sel[:recoveryToken] = session.recovery_token
end

if session.snapshot?
unless connection.description.server_version_gte?('5.0')
raise Error::SnapshotSessionInvalidServerVersion
end

sel[:readConcern] = {level: 'snapshot'}
if session.snapshot_timestamp
sel[:readConcern][:atClusterTime] = session.snapshot_timestamp
end
end
end

def build_message(connection, context)
Expand Down
19 changes: 19 additions & 0 deletions lib/mongo/session.rb
Original file line number Diff line number Diff line change
Expand Up @@ -56,10 +56,16 @@ class Session
# - *:mode* -- the read preference as a string or symbol; valid values are
# *:primary*, *:primary_preferred*, *:secondary*, *:secondary_preferred*
# and *:nearest*.
# @option options [ true | false ] :snapshot Set up the session for
# snapshot reads.
#
# @since 2.5.0
# @api private
def initialize(server_session, client, options = {})
if options[:causal_consistency] && options[:snapshot]
raise ArgumentError, ':causal_consistency and :snapshot options cannot be both set on a session'
end

@server_session = server_session
options = options.dup

Expand All @@ -83,6 +89,12 @@ def cluster
@client.cluster
end

# @return [ true | false ] Whether the session is configured for snapshot
# reads.
def snapshot?
!!options[:snapshot]
end

# @return [ BSON::Timestamp ] The latest seen operation time for this session.
#
# @since 2.5.0
Expand Down Expand Up @@ -506,6 +518,10 @@ def start_transaction(options = nil)
=end
end

if snapshot?
raise Mongo::Error::SnapshotSessionTransactionProhibited
end

check_if_ended!

if within_states?(STARTING_TRANSACTION_STATE, TRANSACTION_IN_PROGRESS_STATE)
Expand Down Expand Up @@ -1024,6 +1040,9 @@ def txn_num
@server_session.txn_num
end

# @api private
attr_accessor :snapshot_timestamp

private

# Get the read concern the session will use when starting a transaction.
Expand Down
72 changes: 53 additions & 19 deletions spec/runners/unified/crud_operations.rb
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,11 @@ def find(op)
def count_documents(op)
collection = entities.get(:collection, op.use!('object'))
use_arguments(op) do |args|
collection.find(args.use!('filter')).count_documents
opts = {}
if session = args.use('session')
opts[:session] = entities.get(:session, session)
end
collection.find(args.use!('filter')).count_documents(**opts)
end
end

Expand All @@ -47,7 +51,11 @@ def estimated_document_count(op)
def distinct(op)
collection = entities.get(:collection, op.use!('object'))
use_arguments(op) do |args|
req = collection.find(args.use!('filter')).distinct(args.use!('fieldName'))
opts = {}
if session = args.use('session')
opts[:session] = entities.get(:session, session)
end
req = collection.find(args.use!('filter'), **opts).distinct(args.use!('fieldName'), **opts)
result = req.to_a
end
end
Expand All @@ -61,6 +69,9 @@ def find_one_and_update(op)
if return_document = args.use('returnDocument')
opts[:return_document] = return_document.downcase.to_sym
end
if session = args.use('session')
opts[:session] = entities.get(:session, session)
end
collection.find_one_and_update(filter, update, **opts)
end
end
Expand All @@ -70,15 +81,23 @@ def find_one_and_replace(op)
use_arguments(op) do |args|
filter = args.use!('filter')
update = args.use!('replacement')
collection.find_one_and_replace(filter, update)
opts = {}
if session = args.use('session')
opts[:session] = entities.get(:session, session)
end
collection.find_one_and_replace(filter, update, **opts)
end
end

def find_one_and_delete(op)
collection = entities.get(:collection, op.use!('object'))
use_arguments(op) do |args|
filter = args.use!('filter')
collection.find_one_and_delete(filter)
opts = {}
if session = args.use('session')
opts[:session] = entities.get(:session, session)
end
collection.find_one_and_delete(filter, **opts)
end
end

Expand All @@ -96,18 +115,25 @@ def insert_one(op)
def insert_many(op)
collection = entities.get(:collection, op.use!('object'))
use_arguments(op) do |args|
options = {}
opts = {}
unless (ordered = args.use('ordered')).nil?
options[:ordered] = ordered
opts[:ordered] = ordered
end
if session = args.use('session')
opts[:session] = entities.get(:session, session)
end
collection.insert_many(args.use!('documents'), **options)
collection.insert_many(args.use!('documents'), **opts)
end
end

def update_one(op)
collection = entities.get(:collection, op.use!('object'))
use_arguments(op) do |args|
collection.update_one(args.use!('filter'), args.use!('update'))
opts = {}
if session = args.use('session')
opts[:session] = entities.get(:session, session)
end
collection.update_one(args.use!('filter'), args.use!('update'), **opts)
end
end

Expand All @@ -132,7 +158,11 @@ def replace_one(op)
def delete_one(op)
collection = entities.get(:collection, op.use!('object'))
use_arguments(op) do |args|
collection.delete_one(args.use!('filter'))
opts = {}
if session = args.use('session')
opts[:session] = entities.get(:session, session)
end
collection.delete_one(args.use!('filter'), **opts)
end
end

Expand All @@ -157,6 +187,20 @@ def bulk_write(op)
end
end

def aggregate(op)
obj = entities.get_any(op.use!('object'))
args = op.use!('arguments')
pipeline = args.use!('pipeline')
opts = {}
if session = args.use('session')
opts[:session] = entities.get(:session, session)
end
unless args.empty?
raise NotImplementedError, "Unhandled spec keys: #{args} in #{test_spec}"
end
obj.aggregate(pipeline, **opts).to_a
end

private

def convert_bulk_write_spec(spec)
Expand Down Expand Up @@ -192,15 +236,5 @@ def convert_bulk_write_spec(spec)
end
{Utils.underscore(op) =>out}
end

def aggregate(op)
obj = entities.get_any(op.use!('object'))
args = op.use!('arguments')
pipeline = args.use!('pipeline')
unless args.empty?
raise NotImplementedError, "Unhandled spec keys: #{test_spec}"
end
obj.aggregate(pipeline).to_a
end
end
end
Loading