Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support read preference for a bunch of commands #139

Merged
merged 6 commits into from Dec 5, 2012
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
55 changes: 47 additions & 8 deletions lib/mongo/collection.rb
Expand Up @@ -649,20 +649,29 @@ def find_and_modify(opts={})
#
# '$sort' Sorts all input documents and returns them to the pipeline in sorted order.
#
# @option opts [:primary, :secondary] :read Read preference indicating which server to perform this query
# on. See Collection#find for more details.
#
# @return [Array] An Array with the aggregate command's results.
#
# @raise MongoArgumentError if operators either aren't supplied or aren't in the correct format.
# @raise MongoOperationFailure if the aggregate command fails.
#
def aggregate(pipeline=nil)
def aggregate(pipeline=nil, opts={})
raise MongoArgumentError, "pipeline must be an array of operators" unless pipeline.class == Array
raise MongoArgumentError, "pipeline operators must be hashes" unless pipeline.all? { |op| op.class == Hash }

if read_pref = opts[:read]
Mongo::Support.validate_read_preference(read_pref)
else
read_pref = read_preference
end

hash = BSON::OrderedHash.new
hash['aggregate'] = self.name
hash['pipeline'] = pipeline

result = @db.command(hash)
result = @db.command(hash, :read => read_pref)
unless Mongo::Support.ok?(result)
raise Mongo::OperationFailure, "aggregate failed: #{result['errmsg']}"
end
Expand Down Expand Up @@ -692,6 +701,8 @@ def aggregate(pipeline=nil)
# the instantiated collection that's returned by default. Note if a collection name isn't returned in the
# map-reduce output (as, for example, when using :out => { :inline => 1 }), then you must specify this option
# or an ArgumentError will be raised.
# @option opts [:primary, :secondary] :read Read preference indicating which server to run this map-reduce
# on. See Collection#find for more details.
#
# @return [Collection, Hash] a Mongo::Collection object or a Hash with the map-reduce command's results.
#
Expand All @@ -705,6 +716,12 @@ def map_reduce(map, reduce, opts={})
reduce = BSON::Code.new(reduce) unless reduce.is_a?(BSON::Code)
raw = opts.delete(:raw)

if read_pref = opts[:read]
Mongo::Support.validate_read_preference(read_pref)
else
read_pref = read_preference
end

hash = BSON::OrderedHash.new
hash['mapreduce'] = self.name
hash['map'] = map
Expand All @@ -714,7 +731,7 @@ def map_reduce(map, reduce, opts={})
hash[:sort] = Mongo::Support.format_order_clause(hash[:sort])
end

result = @db.command(hash)
result = @db.command(hash, :read => read_pref)
unless Mongo::Support.ok?(result)
raise Mongo::OperationFailure, "map-reduce failed: #{result['errmsg']}"
end
Expand Down Expand Up @@ -749,6 +766,8 @@ def map_reduce(map, reduce, opts={})
# @option opts [String, BSON::Code] :finalize (nil) a JavaScript function that receives and modifies
# each of the resultant grouped objects. Available only when group is run with command
# set to true.
# @option opts [:primary, :secondary] :read Read preference indicating which server to perform this group
# on. See Collection#find for more details.
#
# @return [Array] the command response consisting of grouped items.
def group(opts, condition={}, initial={}, reduce=nil, finalize=nil)
Expand Down Expand Up @@ -814,6 +833,12 @@ def new_group(opts={})
raise MongoArgumentError, "Group requires at minimum values for initial and reduce."
end

if read_pref = opts[:read]
Mongo::Support.validate_read_preference(read_pref)
else
read_pref = read_preference
end

cmd = {
"group" => {
"ns" => @name,
Expand All @@ -838,7 +863,7 @@ def new_group(opts={})
cmd["group"]["$keyf"] = keyf.to_bson_code
end

result = @db.command(cmd)
result = @db.command(cmd, :read => read_pref)
result["retval"]
end

Expand All @@ -850,6 +875,10 @@ def new_group(opts={})
#
# @param [String, Symbol, OrderedHash] key or hash to group by.
# @param [Hash] query a selector for limiting the result set over which to group.
# @param [Hash] opts the options for this distinct operation.
#
# @option opts [:primary, :secondary] :read Read preference indicating which server to perform this query
# on. See Collection#find for more details.
#
# @example Saving zip codes and ages and returning distinct results.
# @collection.save({:zip => 10010, :name => {:age => 27}})
Expand All @@ -869,13 +898,20 @@ def new_group(opts={})
# [27]
#
# @return [Array] an array of distinct values.
def distinct(key, query=nil)
def distinct(key, query=nil, opts={})
raise MongoArgumentError unless [String, Symbol].include?(key.class)
command = BSON::OrderedHash.new
command[:distinct] = @name
command[:key] = key.to_s
command[:query] = query
@db.command(command)["values"]

if read_pref = opts[:read]
Mongo::Support.validate_read_preference(read_pref)
else
read_pref = read_preference
end

@db.command(command, :read => read_pref)["values"]
end

# Rename this collection.
Expand Down Expand Up @@ -940,12 +976,15 @@ def stats
# @option opts [Hash] :query ({}) A query selector for filtering the documents counted.
# @option opts [Integer] :skip (nil) The number of documents to skip.
# @option opts [Integer] :limit (nil) The number of documents to limit.
# @option opts [:primary, :secondary] :read Read preference for this command. See Collection#find for
# more details.
#
# @return [Integer]
def count(opts={})
find(opts[:query],
:skip => opts[:skip],
:limit => opts[:limit]).count(true)
:skip => opts[:skip],
:limit => opts[:limit],
:read => opts[:read]).count(true)
end

alias :size :count
Expand Down
8 changes: 1 addition & 7 deletions lib/mongo/cursor.rb
Expand Up @@ -188,13 +188,7 @@ def count(skip_and_limit = false)

command.merge!(BSON::OrderedHash["fields", @fields])

sock = @socket || checkout_socket_from_connection
begin
response = @db.command(command, :socket => sock)
ensure
checkin_socket(sock) unless @socket
end

response = @db.command(command, :read => @read_preference)
return response['n'].to_i if Mongo::Support.ok?(response)
return 0 if response['errmsg'] == "ns missing"
raise OperationFailure.new("Count failed: #{response['errmsg']}", response['code'], response)
Expand Down
11 changes: 10 additions & 1 deletion lib/mongo/db.rb
Expand Up @@ -512,6 +512,8 @@ def ok?(doc)
# @option opts [Boolean] :check_response (true) If +true+, raises an exception if the
# command fails.
# @option opts [Socket] :socket a socket to use for sending the command. This is mainly for internal use.
# @option opts [:primary, :secondary] :read Read preference for this command. See Collection#find for
# more details.
#
# @return [Hash]
#
Expand All @@ -524,9 +526,16 @@ def command(selector, opts={})
raise MongoArgumentError, "DB#command requires an OrderedHash when hash contains multiple keys"
end

if read_pref = opts[:read]
Mongo::Support.validate_read_preference(read_pref)
if read_pref != :primary && !Mongo::Support::secondary_ok?(selector)
raise Mongo.ArgumentError, "Command is not supported on secondaries: #{selector.keys.first}"
end
end

begin
result = Cursor.new(system_command_collection,
:limit => -1, :selector => selector, :socket => socket).next_document
:limit => -1, :selector => selector, :socket => socket, :read => read_pref).next_document
rescue OperationFailure => ex
raise OperationFailure, "Database command '#{selector.keys.first}' failed: #{ex.message}"
end
Expand Down
3 changes: 2 additions & 1 deletion lib/mongo/util/support.rb
Expand Up @@ -43,7 +43,8 @@ module Support
'distinct',
'geonear',
'geosearch',
'geowalk'
'geowalk',
'mapreduce'
]

# Generate an MD5 for authentication.
Expand Down
9 changes: 9 additions & 0 deletions test/replica_set/count_test.rb
Expand Up @@ -43,4 +43,13 @@ def test_count_command_sent_to_primary
count_after = @primary['admin'].command({:serverStatus => 1})['opcounters']['command']
assert_equal 2, count_after - count_before
end

def test_count_with_read_preference
@coll.insert({:a => 20}, :w => 2, :wtimeout => 10000)
count_before = @primary['admin'].command({:serverStatus => 1})['opcounters']['command']
assert_equal 1, @coll.count(:read => :secondary)
assert_equal 1, @coll.find({}, :read => :secondary).count()
count_after = @primary['admin'].command({:serverStatus => 1})['opcounters']['command']
assert_equal 1, count_after - count_before
end
end
4 changes: 2 additions & 2 deletions test/unit/db_test.rb
Expand Up @@ -44,15 +44,15 @@ class DBTest < Test::Unit::TestCase
should "create the proper cursor" do
@cursor = mock(:next_document => {"ok" => 1})
Cursor.expects(:new).with(@collection,
:limit => -1, :selector => {:buildinfo => 1}, :socket => nil).returns(@cursor)
:limit => -1, :selector => {:buildinfo => 1}, :socket => nil, :read => nil).returns(@cursor)
command = {:buildinfo => 1}
@db.command(command, :check_response => true)
end

should "raise an error when the command fails" do
@cursor = mock(:next_document => {"ok" => 0})
Cursor.expects(:new).with(@collection,
:limit => -1, :selector => {:buildinfo => 1}, :socket => nil).returns(@cursor)
:limit => -1, :selector => {:buildinfo => 1}, :socket => nil, :read => nil).returns(@cursor)
assert_raise OperationFailure do
command = {:buildinfo => 1}
@db.command(command, :check_response => true)
Expand Down