Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP

Loading…

Support read preference for a bunch of commands #139

Merged
merged 6 commits into from

2 participants

@nelhage

This branch enables read preference in a consistent manner for a number of commands where it makes sense, notably: count, distinct, group, and mapreduce.

The wrappers for those commands, as well as DB#command all accept a :read flag in their options hashes, which is treated consistently as with Collection#find. Calling DB#command with a non-:primary read preference and a primary-only command is an error.

All named commands will also respect Collection- level read preferences.

@nelhage nelhage Revert "RUBY-514 Specify a socket when making a count query"
This doesn't work properly, since without setting a read preference on
the cursor, we won't set OP_QUERY_SLAVE_OK, and the server will refuse
the command.

This reverts commit 12186bb05873148f8aead79d341834c4b28ffef7.
2efb87f
@TylerBrock

Hey Nelson. Thank you for this work. Unfortunately, as you probably know, this doesn't pass the test suite under any ruby version (see Travis for more details).

Please test before issuing these pull requests.

I'll take a look if you get it passing under all supported rubies.

nelhage added some commits
@nelhage nelhage Support read preference on commands in a replset.
Many commands (such as count) are safe on a secondary. Support read
preferences with the same semantics as normal queries for those.
6086264
@nelhage nelhage Support read preferences for Collection#count().
Also, make Cursor#count inherit the read preference of the cursor.

Add a test case for secondary counts.
090cd7e
@nelhage nelhage Support read preferences for Collection#map_reduce. 735a0a0
@nelhage nelhage Support read preferences for Collection#group and Collection#distinct. 0a7ac58
@nelhage nelhage Support read preferences for Collection#aggregate. a183d42
@nelhage

I'm sorry about that -- I (obviously) failed to run the full test suite. Fixed, and it looks green on Travis now.

@TylerBrock

Cool thanks.

@TylerBrock TylerBrock merged commit 72d071c into from
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Commits on Dec 5, 2012
  1. @nelhage

    Revert "RUBY-514 Specify a socket when making a count query"

    nelhage authored
    This doesn't work properly, since without setting a read preference on
    the cursor, we won't set OP_QUERY_SLAVE_OK, and the server will refuse
    the command.
    
    This reverts commit 12186bb05873148f8aead79d341834c4b28ffef7.
  2. @nelhage

    Support read preference on commands in a replset.

    nelhage authored
    Many commands (such as count) are safe on a secondary. Support read
    preferences with the same semantics as normal queries for those.
  3. @nelhage

    Support read preferences for Collection#count().

    nelhage authored
    Also, make Cursor#count inherit the read preference of the cursor.
    
    Add a test case for secondary counts.
  4. @nelhage
  5. @nelhage
  6. @nelhage
This page is out of date. Refresh to see the latest.
View
55 lib/mongo/collection.rb
@@ -649,20 +649,29 @@ def find_and_modify(opts={})
#
# '$sort' Sorts all input documents and returns them to the pipeline in sorted order.
#
+ # @option opts [:primary, :secondary] :read Read preference indicating which server to perform this query
+ # on. See Collection#find for more details.
+ #
# @return [Array] An Array with the aggregate command's results.
#
# @raise MongoArgumentError if operators either aren't supplied or aren't in the correct format.
# @raise MongoOperationFailure if the aggregate command fails.
#
- def aggregate(pipeline=nil)
+ def aggregate(pipeline=nil, opts={})
raise MongoArgumentError, "pipeline must be an array of operators" unless pipeline.class == Array
raise MongoArgumentError, "pipeline operators must be hashes" unless pipeline.all? { |op| op.class == Hash }
+ if read_pref = opts[:read]
+ Mongo::Support.validate_read_preference(read_pref)
+ else
+ read_pref = read_preference
+ end
+
hash = BSON::OrderedHash.new
hash['aggregate'] = self.name
hash['pipeline'] = pipeline
- result = @db.command(hash)
+ result = @db.command(hash, :read => read_pref)
unless Mongo::Support.ok?(result)
raise Mongo::OperationFailure, "aggregate failed: #{result['errmsg']}"
end
@@ -692,6 +701,8 @@ def aggregate(pipeline=nil)
# the instantiated collection that's returned by default. Note if a collection name isn't returned in the
# map-reduce output (as, for example, when using :out => { :inline => 1 }), then you must specify this option
# or an ArgumentError will be raised.
+ # @option opts [:primary, :secondary] :read Read preference indicating which server to run this map-reduce
+ # on. See Collection#find for more details.
#
# @return [Collection, Hash] a Mongo::Collection object or a Hash with the map-reduce command's results.
#
@@ -705,6 +716,12 @@ def map_reduce(map, reduce, opts={})
reduce = BSON::Code.new(reduce) unless reduce.is_a?(BSON::Code)
raw = opts.delete(:raw)
+ if read_pref = opts[:read]
+ Mongo::Support.validate_read_preference(read_pref)
+ else
+ read_pref = read_preference
+ end
+
hash = BSON::OrderedHash.new
hash['mapreduce'] = self.name
hash['map'] = map
@@ -714,7 +731,7 @@ def map_reduce(map, reduce, opts={})
hash[:sort] = Mongo::Support.format_order_clause(hash[:sort])
end
- result = @db.command(hash)
+ result = @db.command(hash, :read => read_pref)
unless Mongo::Support.ok?(result)
raise Mongo::OperationFailure, "map-reduce failed: #{result['errmsg']}"
end
@@ -749,6 +766,8 @@ def map_reduce(map, reduce, opts={})
# @option opts [String, BSON::Code] :finalize (nil) a JavaScript function that receives and modifies
# each of the resultant grouped objects. Available only when group is run with command
# set to true.
+ # @option opts [:primary, :secondary] :read Read preference indicating which server to perform this group
+ # on. See Collection#find for more details.
#
# @return [Array] the command response consisting of grouped items.
def group(opts, condition={}, initial={}, reduce=nil, finalize=nil)
@@ -814,6 +833,12 @@ def new_group(opts={})
raise MongoArgumentError, "Group requires at minimum values for initial and reduce."
end
+ if read_pref = opts[:read]
+ Mongo::Support.validate_read_preference(read_pref)
+ else
+ read_pref = read_preference
+ end
+
cmd = {
"group" => {
"ns" => @name,
@@ -838,7 +863,7 @@ def new_group(opts={})
cmd["group"]["$keyf"] = keyf.to_bson_code
end
- result = @db.command(cmd)
+ result = @db.command(cmd, :read => read_pref)
result["retval"]
end
@@ -850,6 +875,10 @@ def new_group(opts={})
#
# @param [String, Symbol, OrderedHash] key or hash to group by.
# @param [Hash] query a selector for limiting the result set over which to group.
+ # @param [Hash] opts the options for this distinct operation.
+ #
+ # @option opts [:primary, :secondary] :read Read preference indicating which server to perform this query
+ # on. See Collection#find for more details.
#
# @example Saving zip codes and ages and returning distinct results.
# @collection.save({:zip => 10010, :name => {:age => 27}})
@@ -869,13 +898,20 @@ def new_group(opts={})
# [27]
#
# @return [Array] an array of distinct values.
- def distinct(key, query=nil)
+ def distinct(key, query=nil, opts={})
raise MongoArgumentError unless [String, Symbol].include?(key.class)
command = BSON::OrderedHash.new
command[:distinct] = @name
command[:key] = key.to_s
command[:query] = query
- @db.command(command)["values"]
+
+ if read_pref = opts[:read]
+ Mongo::Support.validate_read_preference(read_pref)
+ else
+ read_pref = read_preference
+ end
+
+ @db.command(command, :read => read_pref)["values"]
end
# Rename this collection.
@@ -940,12 +976,15 @@ def stats
# @option opts [Hash] :query ({}) A query selector for filtering the documents counted.
# @option opts [Integer] :skip (nil) The number of documents to skip.
# @option opts [Integer] :limit (nil) The number of documents to limit.
+ # @option opts [:primary, :secondary] :read Read preference for this command. See Collection#find for
+ # more details.
#
# @return [Integer]
def count(opts={})
find(opts[:query],
- :skip => opts[:skip],
- :limit => opts[:limit]).count(true)
+ :skip => opts[:skip],
+ :limit => opts[:limit],
+ :read => opts[:read]).count(true)
end
alias :size :count
View
8 lib/mongo/cursor.rb
@@ -188,13 +188,7 @@ def count(skip_and_limit = false)
command.merge!(BSON::OrderedHash["fields", @fields])
- sock = @socket || checkout_socket_from_connection
- begin
- response = @db.command(command, :socket => sock)
- ensure
- checkin_socket(sock) unless @socket
- end
-
+ response = @db.command(command, :read => @read_preference)
return response['n'].to_i if Mongo::Support.ok?(response)
return 0 if response['errmsg'] == "ns missing"
raise OperationFailure.new("Count failed: #{response['errmsg']}", response['code'], response)
View
11 lib/mongo/db.rb
@@ -512,6 +512,8 @@ def ok?(doc)
# @option opts [Boolean] :check_response (true) If +true+, raises an exception if the
# command fails.
# @option opts [Socket] :socket a socket to use for sending the command. This is mainly for internal use.
+ # @option opts [:primary, :secondary] :read Read preference for this command. See Collection#find for
+ # more details.
#
# @return [Hash]
#
@@ -524,9 +526,16 @@ def command(selector, opts={})
raise MongoArgumentError, "DB#command requires an OrderedHash when hash contains multiple keys"
end
+ if read_pref = opts[:read]
+ Mongo::Support.validate_read_preference(read_pref)
+ if read_pref != :primary && !Mongo::Support::secondary_ok?(selector)
+ raise Mongo.ArgumentError, "Command is not supported on secondaries: #{selector.keys.first}"
+ end
+ end
+
begin
result = Cursor.new(system_command_collection,
- :limit => -1, :selector => selector, :socket => socket).next_document
+ :limit => -1, :selector => selector, :socket => socket, :read => read_pref).next_document
rescue OperationFailure => ex
raise OperationFailure, "Database command '#{selector.keys.first}' failed: #{ex.message}"
end
View
3  lib/mongo/util/support.rb
@@ -43,7 +43,8 @@ module Support
'distinct',
'geonear',
'geosearch',
- 'geowalk'
+ 'geowalk',
+ 'mapreduce'
]
# Generate an MD5 for authentication.
View
9 test/replica_set/count_test.rb
@@ -43,4 +43,13 @@ def test_count_command_sent_to_primary
count_after = @primary['admin'].command({:serverStatus => 1})['opcounters']['command']
assert_equal 2, count_after - count_before
end
+
+ def test_count_with_read_preference
+ @coll.insert({:a => 20}, :w => 2, :wtimeout => 10000)
+ count_before = @primary['admin'].command({:serverStatus => 1})['opcounters']['command']
+ assert_equal 1, @coll.count(:read => :secondary)
+ assert_equal 1, @coll.find({}, :read => :secondary).count()
+ count_after = @primary['admin'].command({:serverStatus => 1})['opcounters']['command']
+ assert_equal 1, count_after - count_before
+ end
end
View
4 test/unit/db_test.rb
@@ -44,7 +44,7 @@ class DBTest < Test::Unit::TestCase
should "create the proper cursor" do
@cursor = mock(:next_document => {"ok" => 1})
Cursor.expects(:new).with(@collection,
- :limit => -1, :selector => {:buildinfo => 1}, :socket => nil).returns(@cursor)
+ :limit => -1, :selector => {:buildinfo => 1}, :socket => nil, :read => nil).returns(@cursor)
command = {:buildinfo => 1}
@db.command(command, :check_response => true)
end
@@ -52,7 +52,7 @@ class DBTest < Test::Unit::TestCase
should "raise an error when the command fails" do
@cursor = mock(:next_document => {"ok" => 0})
Cursor.expects(:new).with(@collection,
- :limit => -1, :selector => {:buildinfo => 1}, :socket => nil).returns(@cursor)
+ :limit => -1, :selector => {:buildinfo => 1}, :socket => nil, :read => nil).returns(@cursor)
assert_raise OperationFailure do
command = {:buildinfo => 1}
@db.command(command, :check_response => true)
Something went wrong with that request. Please try again.