Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions lib/mongo.rb
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
require 'mongo/index'
require 'mongo/server'
require 'mongo/server_selector'
require 'mongo/session'
require 'mongo/socket'
require 'mongo/uri'
require 'mongo/version'
Expand Down
2 changes: 2 additions & 0 deletions lib/mongo/auth/cr.rb
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,9 @@ def initialize(user)
def login(connection)
conversation = Conversation.new(user)
reply = connection.dispatch([ conversation.start(connection) ])
connection.update_cluster_time(Operation::Result.new(reply))
reply = connection.dispatch([ conversation.continue(reply, connection) ])
connection.update_cluster_time(Operation::Result.new(reply))
conversation.finalize(reply, connection)
end
end
Expand Down
4 changes: 4 additions & 0 deletions lib/mongo/auth/cr/conversation.rb
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ def continue(reply, connection = nil)
if connection && connection.features.op_msg_enabled?
selector = LOGIN.merge(user: user.name, nonce: nonce, key: user.auth_key(nonce))
selector[Protocol::Msg::DATABASE_IDENTIFIER] = user.auth_source
cluster_time = connection.mongos? && connection.cluster_time
selector[Operation::CLUSTER_TIME] = cluster_time if cluster_time
Protocol::Msg.new([:none], {}, selector)
else
Protocol::Query.new(
Expand Down Expand Up @@ -98,6 +100,8 @@ def finalize(reply, connection = nil)
def start(connection = nil)
if connection && connection.features.op_msg_enabled?
selector = Auth::GET_NONCE.merge(Protocol::Msg::DATABASE_IDENTIFIER => user.auth_source)
cluster_time = connection.mongos? && connection.cluster_time
selector[Operation::CLUSTER_TIME] = cluster_time if cluster_time
Protocol::Msg.new([:none], {}, selector)
else
Protocol::Query.new(
Expand Down
4 changes: 3 additions & 1 deletion lib/mongo/auth/ldap.rb
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,9 @@ def initialize(user)
# @since 2.0.0
def login(connection)
conversation = Conversation.new(user)
conversation.finalize(connection.dispatch([ conversation.start(connection) ]))
reply = connection.dispatch([ conversation.start(connection) ])
connection.update_cluster_time(Operation::Result.new(reply))
conversation.finalize(reply)
end
end
end
Expand Down
2 changes: 2 additions & 0 deletions lib/mongo/auth/ldap/conversation.rb
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ def start(connection = nil)
if connection && connection.features.op_msg_enabled?
selector = LOGIN.merge(payload: payload, mechanism: LDAP::MECHANISM)
selector[Protocol::Msg::DATABASE_IDENTIFIER] = Auth::EXTERNAL
cluster_time = connection.mongos? && connection.cluster_time
selector[Operation::CLUSTER_TIME] = cluster_time if cluster_time
Protocol::Msg.new([:none], {}, selector)
else
Protocol::Query.new(
Expand Down
3 changes: 3 additions & 0 deletions lib/mongo/auth/scram.rb
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,12 @@ def initialize(user)
def login(connection)
conversation = Conversation.new(user)
reply = connection.dispatch([ conversation.start(connection) ])
connection.update_cluster_time(Operation::Result.new(reply))
reply = connection.dispatch([ conversation.continue(reply, connection) ])
connection.update_cluster_time(Operation::Result.new(reply))
until reply.documents[0][Conversation::DONE]
reply = connection.dispatch([ conversation.finalize(reply, connection) ])
connection.update_cluster_time(Operation::Result.new(reply))
end
reply
end
Expand Down
6 changes: 6 additions & 0 deletions lib/mongo/auth/scram/conversation.rb
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,8 @@ def continue(reply, connection = nil)
if connection && connection.features.op_msg_enabled?
selector = CLIENT_CONTINUE_MESSAGE.merge(payload: client_final_message, conversationId: id)
selector[Protocol::Msg::DATABASE_IDENTIFIER] = user.auth_source
cluster_time = connection.mongos? && connection.cluster_time
selector[Operation::CLUSTER_TIME] = cluster_time if cluster_time
Protocol::Msg.new([:none], {}, selector)
else
Protocol::Query.new(
Expand Down Expand Up @@ -150,6 +152,8 @@ def finalize(reply, connection = nil)
if connection && connection.features.op_msg_enabled?
selector = CLIENT_CONTINUE_MESSAGE.merge(payload: client_empty_message, conversationId: id)
selector[Protocol::Msg::DATABASE_IDENTIFIER] = user.auth_source
cluster_time = connection.mongos? && connection.cluster_time
selector[Operation::CLUSTER_TIME] = cluster_time if cluster_time
Protocol::Msg.new([:none], {}, selector)
else
Protocol::Query.new(
Expand All @@ -176,6 +180,8 @@ def start(connection = nil)
if connection && connection.features.op_msg_enabled?
selector = CLIENT_FIRST_MESSAGE.merge(payload: client_first_message, mechanism: SCRAM::MECHANISM)
selector[Protocol::Msg::DATABASE_IDENTIFIER] = user.auth_source
cluster_time = connection.mongos? && connection.cluster_time
selector[Operation::CLUSTER_TIME] = cluster_time if cluster_time
Protocol::Msg.new([:none], {}, selector)
else
Protocol::Query.new(
Expand Down
66 changes: 44 additions & 22 deletions lib/mongo/auth/user/view.rb
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ class View
# @return [ Database ] database The view's database.
attr_reader :database

def_delegators :database, :cluster, :read_preference
def_delegators :database, :cluster, :read_preference, :client
def_delegators :cluster, :next_primary

# Create a new user in the database.
Expand All @@ -36,15 +36,20 @@ class View
# @param [ Auth::User, String ] user_or_name The user object or user name.
# @param [ Hash ] options The user options.
#
# @option options [ Session ] :session The session to use for the operation.
#
# @return [ Result ] The command response.
#
# @since 2.0.0
def create(user_or_name, options = {})
user = generate(user_or_name, options)
Operation::Write::CreateUser.new(
user: user,
db_name: database.name
).execute(next_primary)
client.send(:with_session, options) do |session|
Operation::Write::CreateUser.new(
user: user,
db_name: database.name,
session: session
).execute(next_primary)
end
end

# Initialize the new user view.
Expand All @@ -65,15 +70,21 @@ def initialize(database)
# view.remove('user')
#
# @param [ String ] name The user name.
# @param [ Hash ] options The options for the remove operation.
#
# @option options [ Session ] :session The session to use for the operation.
#
# @return [ Result ] The command response.
#
# @since 2.0.0
def remove(name)
Operation::Write::RemoveUser.new(
user_name: name,
db_name: database.name
).execute(next_primary)
def remove(name, options = {})
client.send(:with_session, options) do |session|
Operation::Write::RemoveUser.new(
user_name: name,
db_name: database.name,
session: session
).execute(next_primary)
end
end

# Update a user in the database.
Expand All @@ -84,15 +95,20 @@ def remove(name)
# @param [ Auth::User, String ] user_or_name The user object or user name.
# @param [ Hash ] options The user options.
#
# @option options [ Session ] :session The session to use for the operation.
#
# @return [ Result ] The response.
#
# @since 2.0.0
def update(user_or_name, options = {})
user = generate(user_or_name, options)
Operation::Write::UpdateUser.new(
user: user,
db_name: database.name
).execute(next_primary)
client.send(:with_session, options) do |session|
user = generate(user_or_name, options)
Operation::Write::UpdateUser.new(
user: user,
db_name: database.name,
session: session
).execute(next_primary)
end
end

# Get info for a particular user in the database.
Expand All @@ -101,21 +117,27 @@ def update(user_or_name, options = {})
# view.info('emily')
#
# @param [ String ] name The user name.
# @param [ Hash ] options The options for the info operation.
#
# @option options [ Session ] :session The session to use for the operation.
#
# @return [ Hash ] A document containing information on a particular user.
#
# @since 2.1.0
def info(name)
user_query(name).documents
def info(name, options = {})
user_query(name, options).documents
end

private

def user_query(name)
Operation::Commands::UserQuery.new(
user_name: name,
db_name: database.name
).execute(next_primary)
def user_query(name, options = {})
client.send(:with_session, options) do |session|
Operation::Commands::UserQuery.new(
user_name: name,
db_name: database.name,
session: session
).execute(next_primary)
end
end

def generate(user, options)
Expand Down
4 changes: 3 additions & 1 deletion lib/mongo/auth/x509.rb
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,9 @@ def initialize(user)
# @since 2.0.0
def login(connection)
conversation = Conversation.new(user)
conversation.finalize(connection.dispatch([ conversation.start(connection) ]))
reply = connection.dispatch([ conversation.start(connection) ])
connection.update_cluster_time(Operation::Result.new(reply))
conversation.finalize(reply)
end
end
end
Expand Down
2 changes: 2 additions & 0 deletions lib/mongo/auth/x509/conversation.rb
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@ def start(connection = nil)
if connection && connection.features.op_msg_enabled?
selector = login
selector[Protocol::Msg::DATABASE_IDENTIFIER] = user.auth_source
cluster_time = connection.mongos? && connection.cluster_time
selector[Operation::CLUSTER_TIME] = cluster_time if cluster_time
Protocol::Msg.new([:none], {}, selector)
else
Protocol::Query.new(
Expand Down
62 changes: 33 additions & 29 deletions lib/mongo/bulk_write.rb
Original file line number Diff line number Diff line change
Expand Up @@ -53,20 +53,23 @@ class BulkWrite
def execute
operation_id = Monitoring.next_operation_id
result_combiner = ResultCombiner.new
write_with_retry do
operations = op_combiner.combine
server = next_primary
raise Error::UnsupportedCollation.new if op_combiner.has_collation && !server.features.collation_enabled?
raise Error::UnsupportedArrayFilters.new if op_combiner.has_array_filters && !server.features.array_filters_enabled?

operations.each do |operation|
execute_operation(
operation.keys.first,
operation.values.first,
server,
operation_id,
result_combiner
)

client.send(:with_session, @options) do |session|
write_with_retry(session, Proc.new { next_primary }) do |server|
operations = op_combiner.combine
raise Error::UnsupportedCollation.new if op_combiner.has_collation && !server.features.collation_enabled?
raise Error::UnsupportedArrayFilters.new if op_combiner.has_array_filters && !server.features.array_filters_enabled?

operations.each do |operation|
execute_operation(
operation.keys.first,
operation.values.first,
server,
operation_id,
result_combiner,
session
)
end
end
end
result_combiner.result
Expand Down Expand Up @@ -134,7 +137,7 @@ def write_concern

private

def base_spec(operation_id)
def base_spec(operation_id, session)
{
:db_name => database.name,
:coll_name => collection.name,
Expand All @@ -143,50 +146,51 @@ def base_spec(operation_id)
:operation_id => operation_id,
:bypass_document_validation => !!options[:bypass_document_validation],
:options => options,
:id_generator => client.options[:id_generator]
:id_generator => client.options[:id_generator],
:session => session
}
end

def execute_operation(name, values, server, operation_id, combiner)
def execute_operation(name, values, server, operation_id, combiner, session)
begin
if values.size > server.max_write_batch_size
split_execute(name, values, server, operation_id, combiner)
split_execute(name, values, server, operation_id, combiner, session)
else
combiner.combine!(send(name, values, server, operation_id), values.size)
combiner.combine!(send(name, values, server, operation_id, session), values.size)
end
rescue Error::MaxBSONSize, Error::MaxMessageSize => e
raise e if values.size <= 1
split_execute(name, values, server, operation_id, combiner)
split_execute(name, values, server, operation_id, combiner, session)
end
end

def op_combiner
@op_combiner ||= ordered? ? OrderedCombiner.new(requests) : UnorderedCombiner.new(requests)
end

def split_execute(name, values, server, operation_id, combiner)
execute_operation(name, values.shift(values.size / 2), server, operation_id, combiner)
execute_operation(name, values, server, operation_id, combiner)
def split_execute(name, values, server, operation_id, combiner, session)
execute_operation(name, values.shift(values.size / 2), server, operation_id, combiner, session)
execute_operation(name, values, server, operation_id, combiner, session)
end

def delete(documents, server, operation_id)
def delete(documents, server, operation_id, session)
Operation::Write::Bulk::Delete.new(
base_spec(operation_id).merge(:deletes => documents)
base_spec(operation_id, session).merge(:deletes => documents)
).execute(server)
end

alias :delete_one :delete
alias :delete_many :delete

def insert_one(documents, server, operation_id)
def insert_one(documents, server, operation_id, session)
Operation::Write::Bulk::Insert.new(
base_spec(operation_id).merge(:documents => documents)
base_spec(operation_id, session).merge(:documents => documents)
).execute(server)
end

def update(documents, server, operation_id)
def update(documents, server, operation_id, session)
Operation::Write::Bulk::Update.new(
base_spec(operation_id).merge(:updates => documents)
base_spec(operation_id, session).merge(:updates => documents)
).execute(server)
end

Expand Down
Loading