Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP

Loading…

RUBY-475 Changing read preference should unpin pool #175

Closed
wants to merge 2 commits into from

3 participants

@TylerBrock

Please review, I've refactored since we spoke yesterday to make it much cleaner than it was.

TylerBrock added some commits
@TylerBrock TylerBrock RUBY-475 refactor read preference logic
Move read_pool into Mongo::ReadPreference
Add #read_preference to produce read preference hash
2066417
@TylerBrock TylerBrock RUBY-475 pin/unpin logic added to cursor 89eaf97
@brandonblack

looks great, ship it :mailbox:

@TylerBrock

Thanks for the remarks. I'll update accordingly.

Do you think the tests are sufficient?

@estolfo estolfo commented on the diff
lib/mongo/util/read_preference.rb
((57 lines not shown))
end
end
- def select_secondary_pool(candidates, tag_sets, latency)
- tag_sets = [tag_sets] unless tag_sets.is_a?(Array)
+ def select_secondary_pool(candidates, read_pref)
+ tag_sets = read_pref[:tags]
+ #tag_sets = [tag_sets] unless tag_sets.is_a?(Array)
@estolfo Collaborator
estolfo added a note

is there a reason for this commented-out line?

Nice catch I wasn't sure that was needed but after running through the tests it seems like it wasn't.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
@estolfo
Collaborator

Maybe you should include a second find in test_pinned_pool_is_local_to_thread to make sure that the pinned pool is used by the thread again, not just that it's pinned correctly.

@TylerBrock

@estolfo re: the tests, good idea!

I'm not sure just running the same query again would prove that we went through the correct logic. The pool could be unpinned and then pinned again and give the same result but for the wrong reason. We might need to use an expectation there to ensure the pinned_pool method is called and that the unpin method is not called.

@estolfo
Collaborator

You can probably confirm that the pool used is coming from thread_local[:pinned_pools][@manager.object_id]

or, as you said, you can make sure that unpin is not called, though testing the "negative" behavior is not as thorough.

@TylerBrock

Yeah, you have to test both pinned_pool gets called and that unpin is not called, as I said.

@TylerBrock TylerBrock closed this
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Commits on Mar 13, 2013
  1. @TylerBrock

    RUBY-475 refactor read preference logic

    TylerBrock authored
    Move read_pool into Mongo::ReadPreference
    Add #read_preference to produce read preference hash
  2. @TylerBrock
This page is out of date. Refresh to see the latest.
View
11 lib/mongo/cursor.rb
@@ -6,12 +6,13 @@ class Cursor
include Mongo::Constants
include Mongo::Conversions
include Mongo::Logging
+ include Mongo::ReadPreference
attr_reader :collection, :selector, :fields,
:order, :hint, :snapshot, :timeout,
:full_collection_name, :transformer,
:options, :cursor_id, :show_disk_loc,
- :comment, :read, :tag_sets
+ :comment, :read, :tag_sets, :acceptable_latency
# Create a new cursor.
#
@@ -471,6 +472,7 @@ def send_initial_query
nil, @options & OP_QUERY_EXHAUST != 0)
rescue ConnectionFailure => ex
socket.close if socket
+ @connection.unpin_pool
@connection.refresh
if tries < 3 && !@socket && (!@command || Mongo::Support::secondary_ok?(@selector))
tries += 1
@@ -483,6 +485,9 @@ def send_initial_query
ensure
socket.checkin unless @socket || socket.nil?
end
+ if !@socket && !@command
+ @connection.pin_pool(socket.pool, read_preference)
+ end
@returned += @n_received
@cache += results
@query_run = true
@@ -530,9 +535,9 @@ def checkout_socket_from_connection
if @pool
socket = @pool.checkout
elsif @command && !Mongo::Support::secondary_ok?(@selector)
- socket = @connection.checkout_reader(:primary)
+ socket = @connection.checkout_reader({:mode => :primary})
else
- socket = @connection.checkout_reader(@read, @tag_sets, @acceptable_latency)
+ socket = @connection.checkout_reader(read_preference)
end
rescue SystemStackError, NoMemoryError, SystemCallError => ex
@connection.close
View
10 lib/mongo/mongo_client.rb
@@ -351,10 +351,14 @@ def [](db_name)
def refresh
end
- def pin_pool(pool)
+ def pinned_pool
+ @primary_pool
+ end
+
+ def pin_pool(pool, read_prefs)
end
- def unpin_pool(pool)
+ def unpin_pool
end
# Drop a database.
@@ -521,7 +525,7 @@ def max_message_size
# Checkout a socket for reading (i.e., a secondary node).
# Note: this is overridden in MongoReplicaSetClient.
- def checkout_reader(mode=:primary, tag_sets={}, acceptable_latency=15)
+ def checkout_reader(read_preference)
connect unless connected?
@primary_pool.checkout
end
View
27 lib/mongo/mongo_replica_set_client.rb
@@ -2,6 +2,7 @@ module Mongo
# Instantiates and manages connections to a MongoDB replica set.
class MongoReplicaSetClient < MongoClient
+ include ReadPreference
include ThreadLocalVariableManager
REPL_SET_OPTS = [
@@ -268,7 +269,7 @@ def nodes
#
# @return [Boolean]
def read_primary?
- @manager.read_pool == @manager.primary_pool
+ read_pool == primary_pool
end
alias :primary? :read_primary?
@@ -282,6 +283,7 @@ def close(opts={})
# Clear the reference to this object.
thread_local[:managers].delete(self)
+ unpin_pool
@connected = false
end
@@ -335,9 +337,9 @@ def checkout
end
end
- def checkout_reader(mode=@read, tag_sets=@tag_sets, acceptable_latency=@acceptable_latency)
+ def checkout_reader(read_pref=nil)
checkout do
- pool = read_pool(mode, tag_sets, acceptable_latency)
+ pool = read_pool(read_pref)
get_socket_from_pool(pool)
end
end
@@ -361,11 +363,20 @@ def ensure_manager
thread_local[:managers][self] = @manager
end
- def pin_pool(pool)
- thread_local[:pinned_pools][@manager.object_id] = pool if @manager
+ def pinned_pool
+ thread_local[:pinned_pools][@manager.object_id] if @manager
end
- def unpin_pool(pool)
+ def pin_pool(pool, read_preference)
+ if @manager
+ thread_local[:pinned_pools][@manager.object_id] = {
+ :pool => pool,
+ :read_preference => read_preference
+ }
+ end
+ end
+
+ def unpin_pool
thread_local[:pinned_pools].delete @manager.object_id if @manager
end
@@ -402,10 +413,6 @@ def primary_pool
local_manager ? local_manager.primary_pool : nil
end
- def read_pool(mode=@read, tags=@tag_sets, acceptable_latency=@acceptable_latency)
- local_manager ? local_manager.read_pool(mode, tags, acceptable_latency) : nil
- end
-
def secondary_pool
local_manager ? local_manager.secondary_pool : nil
end
View
19 lib/mongo/util/pool.rb
@@ -85,25 +85,6 @@ def up?
!@closed
end
- def matches_mode(mode)
- if mode == :primary && @node.secondary? ||
- mode == :secondary && @node.primary?
- false
- else
- true
- end
- end
-
- def matches_tag_set(tag_set)
- tag_set.all? do |tag, value|
- tags.has_key?(tag) && tags[tag] == value
- end
- end
-
- def matches_tag_sets(tag_sets)
- tag_sets.all? {|set| matches_tag_set(set)}
- end
-
def inspect
"#<Mongo::Pool:0x#{self.object_id.to_s(16)} @host=#{@host} @port=#{port} " +
"@ping_time=#{@ping_time} #{@checked_out.size}/#{@size} sockets available " +
View
21 lib/mongo/util/pool_manager.rb
@@ -1,6 +1,5 @@
module Mongo
class PoolManager
- include Mongo::ReadPreference
include ThreadLocalVariableManager
attr_reader :client,
@@ -114,26 +113,6 @@ def read
read_pool.host_port
end
- def read_pool(mode=@client.read,
- tags=@client.tag_sets,
- acceptable_latency=@client.acceptable_latency)
-
- pinned = thread_local[:pinned_pools][self.object_id]
-
- if pinned && pinned.matches_mode(mode) && pinned.matches_tag_sets(tags) && pinned.up?
- pool = pinned
- else
- pool = select_pool(mode, tags, acceptable_latency)
- end
-
- unless pool
- raise ConnectionFailure, "No replica set member available for query " +
- "with read preference matching mode #{mode} and tags matching #{tags}."
- end
-
- pool
- end
-
def max_bson_size
@max_bson_size ||= config_min('maxBsonObjectSize', DEFAULT_MAX_BSON_SIZE)
end
View
53 lib/mongo/util/read_preference.rb
@@ -33,29 +33,57 @@ def self.validate(value)
end
end
- def select_pool(mode, tags, latency)
- return primary_pool if @client.mongos?
+ def read_preference
+ {
+ :mode => @read,
+ :tags => @tag_sets,
+ :latency => @acceptable_latency
+ }
+ end
+
+ def read_pool(read_preference_override={})
+ return primary_pool if mongos?
+
+ read_pref = read_preference.merge read_preference_override
+
+ if pinned_pool && pinned_pool[:read_preference] == read_pref
+ pool = pinned_pool[:pool]
+ else
+ unpin_pool
+ pool = select_pool(read_pref)
+ end
+
+ unless pool
+ raise ConnectionFailure, "No replica set member available for query " +
+ "with read preference matching mode #{read_pref[:mode]} and tags " +
+ "matching #{read_pref[:tags]}."
+ end
+
+ pool
+ end
- if mode == :primary && !tags.empty?
+ def select_pool(read_pref)
+ if read_pref[:mode] == :primary && !read_pref[:tags].empty?
raise MongoArgumentError, "Read preference :primary cannot be combined with tags"
end
- case mode
+ case read_pref[:mode]
when :primary
primary_pool
when :primary_preferred
- primary_pool || select_secondary_pool(secondary_pools, tags, latency)
+ primary_pool || select_secondary_pool(secondary_pools, read_pref)
when :secondary
- select_secondary_pool(secondary_pools, tags, latency)
+ select_secondary_pool(secondary_pools, read_pref)
when :secondary_preferred
- select_secondary_pool(secondary_pools, tags, latency) || primary_pool
+ select_secondary_pool(secondary_pools, read_pref) || primary_pool
when :nearest
- select_secondary_pool(pools, tags, latency)
+ select_secondary_pool(pools, read_pref)
end
end
- def select_secondary_pool(candidates, tag_sets, latency)
- tag_sets = [tag_sets] unless tag_sets.is_a?(Array)
+ def select_secondary_pool(candidates, read_pref)
+ tag_sets = read_pref[:tags]
+ #tag_sets = [tag_sets] unless tag_sets.is_a?(Array)
@estolfo Collaborator
estolfo added a note

is there a reason for this commented-out line?

Nice catch I wasn't sure that was needed but after running through the tests it seems like it wasn't.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
if !tag_sets.empty?
matches = []
@@ -70,10 +98,11 @@ def select_secondary_pool(candidates, tag_sets, latency)
matches = candidates
end
- matches.empty? ? nil : select_near_pool(matches, latency)
+ matches.empty? ? nil : select_near_pool(matches, read_pref)
end
- def select_near_pool(candidates, latency)
+ def select_near_pool(candidates, read_pref)
+ latency = read_pref[:latency]
nearest_pool = candidates.min_by { |candidate| candidate.ping_time }
near_pools = candidates.select do |candidate|
(candidate.ping_time - nearest_pool.ping_time) <= latency
View
2  test/replica_set/basic_test.rb
@@ -68,7 +68,7 @@ def test_accessors
assert_equal 2, client.secondaries.length
assert_equal 2, client.secondary_pools.length
assert_equal @rs.repl_set_name, client.replica_set_name
- assert client.secondary_pools.include?(client.read_pool(:secondary))
+ assert client.secondary_pools.include?(client.read_pool({:mode => :secondary}))
assert_equal 90, client.refresh_interval
assert_equal client.refresh_mode, false
client.close
View
41 test/replica_set/pinning_test.rb
@@ -0,0 +1,41 @@
+require 'test_helper'
+
+class ReplicaSetPinningTest < Test::Unit::TestCase
+ def setup
+ ensure_cluster(:rs)
+ @client = MongoReplicaSetClient.new(@rs.repl_set_seeds, :name => @rs.repl_set_name)
+ @db = @client.db(MONGO_TEST_DB)
+ @coll = @db.collection("test-sets")
+ @coll.insert({:a => 1})
+ end
+
+ def test_unpinning
+ # pin primary
+ @coll.find_one
+ assert_equal @client.pinned_pool[:pool], @client.primary_pool
+
+ # pin secondary
+ @coll.find_one({}, :read => :secondary_preferred)
+ assert @client.secondary_pools.include? @client.pinned_pool[:pool]
+
+ # repin primary
+ @coll.find_one({}, :read => :primary_preferred)
+ assert_equal @client.pinned_pool[:pool], @client.primary_pool
+ end
+
+ def test_pinned_pool_is_local_to_thread
+ threads = []
+ 30.times do |i|
+ threads << Thread.new do
+ if i % 2 == 0
+ @coll.find_one({}, :read => :secondary_preferred)
+ assert @client.secondary_pools.include? @client.pinned_pool[:pool]
+ else
+ @coll.find_one({}, :read => :primary_preferred)
+ assert_equal @client.pinned_pool[:pool], @client.primary_pool
+ end
+ end
+ end
+ threads.each(&:join)
+ end
+end
Something went wrong with that request. Please try again.