Skip to content

Commit

Permalink
ReplSetConnection updates
Browse files Browse the repository at this point in the history
  • Loading branch information
banker committed Dec 13, 2010
1 parent 08b7cdd commit 27b410f
Show file tree
Hide file tree
Showing 11 changed files with 212 additions and 285 deletions.
159 changes: 81 additions & 78 deletions lib/mongo/connection.rb
Expand Up @@ -92,47 +92,18 @@ class Connection
#
# @core connections
def initialize(host=nil, port=nil, options={})
@auths = options.fetch(:auths, [])

@host_to_try = format_pair(host, port)

# Host and port of current master.
@host = @port = nil

# Lock for request ids.
@id_lock = Mutex.new

# Pool size and timeout.
@pool_size = options[:pool_size] || 1
@timeout = options[:timeout] || 5.0

# Mutex for synchronizing pool access
@connection_mutex = Mutex.new

# Global safe option. This is false by default.
@safe = options[:safe] || false

# Create a mutex when a new key, in this case a socket,
# is added to the hash.
@safe_mutexes = Hash.new { |h, k| h[k] = Mutex.new }

# Condition variable for signal and wait
@queue = ConditionVariable.new

# slave_ok can be true only if one node is specified
@slave_ok = options[:slave_ok]

@primary = nil

# Connection pool for primay node
@primary_pool = nil

@logger = options[:logger] || nil

should_connect = options.fetch(:connect, true)
connect if should_connect
setup(options)
end


# DEPRECATED
#
# Initialize a connection to a MongoDB replica set using an array of seed nodes.
Expand Down Expand Up @@ -160,19 +131,10 @@ def initialize(host=nil, port=nil, options={})
#
# @deprecated
def self.multi(nodes, opts={})
unless nodes.length > 0 && nodes.all? {|n| n.is_a? Array}
raise MongoArgumentError, "Connection.multi requires at least one node to be specified."
end
warn "Connection.multi is now deprecated. Please use ReplSetConnection.new instead."

# Block returns an array, the first element being an array of nodes and the second an array
# of authorizations for the database.
new(nil, nil, opts) do |con|
nodes.map do |node|
con.instance_variable_set(:@replica_set, true)
con.instance_variable_set(:@read_secondary, true) if opts[:read_secondary]
con.pair_val_to_connection(node)
end
end
nodes << opts
ReplSetConnection.new(*nodes)
end

# Initialize a connection to MongoDB using the MongoDB URI spec:
Expand All @@ -191,6 +153,8 @@ def self.from_uri(uri, opts={})
elsif nodes.length > 1
nodes << opts
ReplSetConnection.new(*nodes)
else
raise MongoArgumentError, "No nodes specified. Please ensure that you've provided at least one node."
end
end

Expand Down Expand Up @@ -500,40 +464,8 @@ def close
@primary_pool = nil
end

## Configuration helper methods

# Returns a host-port pair.
#
# @return [Array]
#
# @private
def format_pair(host, port)
case host
when String
[host, port ? port.to_i : DEFAULT_PORT]
when nil
['localhost', DEFAULT_PORT]
end
end

# Convert an argument containing a host name string and a
# port number integer into a [host, port] pair array.
#
# @private
def pair_val_to_connection(a)
case a
when nil
['localhost', DEFAULT_PORT]
when String
[a, DEFAULT_PORT]
when Integer
['localhost', a]
when Array
a
end
end

# Checkout a socket for reading (i.e., a secondary node).

# Checkout a socket for reading (i.e., a secondary node).
def checkout_reader
connect unless connected?

Expand Down Expand Up @@ -567,6 +499,77 @@ def checkin_writer(socket)
end
end

protected

# Generic initialization code.
# @protected
def setup(options)
# Authentication objects
@auths = options.fetch(:auths, [])

# Lock for request ids.
@id_lock = Mutex.new

# Pool size and timeout.
@pool_size = options[:pool_size] || 1
@timeout = options[:timeout] || 5.0

# Mutex for synchronizing pool access
@connection_mutex = Mutex.new

# Global safe option. This is false by default.
@safe = options[:safe] || false

# Create a mutex when a new key, in this case a socket,
# is added to the hash.
@safe_mutexes = Hash.new { |h, k| h[k] = Mutex.new }

This comment has been minimized.

Copy link
@raggi

raggi Oct 6, 2011

This isn't thread safe yo.

It can return two different mutex instances for the same key.


# Condition variable for signal and wait
@queue = ConditionVariable.new

# Connection pool for primay node
@primary = nil
@primary_pool = nil

@logger = options[:logger] || nil

should_connect = options.fetch(:connect, true)
connect if should_connect
end

## Configuration helper methods

# Returns a host-port pair.
#
# @return [Array]
#
# @private
def format_pair(host, port)
case host
when String
[host, port ? port.to_i : DEFAULT_PORT]
when nil
['localhost', DEFAULT_PORT]
end
end

# Convert an argument containing a host name string and a
# port number integer into a [host, port] pair array.
#
# @private
def pair_val_to_connection(a)
case a
when nil
['localhost', DEFAULT_PORT]
when String
[a, DEFAULT_PORT]
when Integer
['localhost', a]
when Array
a
end
end

private

# If a ConnectionFailure is raised, this method will be called
Expand All @@ -584,7 +587,7 @@ def reset_connection
# apply any saved authentication.
# TODO: simplify
def is_primary?(config)
config && (config['ismaster'] == 1 || config['ismaster'] == true) || !@replica_set && @slave_ok
config && (config['ismaster'] == 1 || config['ismaster'] == true) || @slave_ok
end

def check_is_master(node)
Expand Down
37 changes: 26 additions & 11 deletions lib/mongo/repl_set_connection.rb
Expand Up @@ -19,19 +19,25 @@
module Mongo

# Instantiates and manages connections to MongoDB.
class ReplSetConnection
class ReplSetConnection < Connection
attr_reader :nodes, :secondaries, :arbiters, :read_pool, :secondary_pools

def initialize(*args)

if args.last.is_a?(Hash)
options = args.pop
opts = args.pop
else
opts = {}
end

unless args.length > 0
raise MongoArgumentError, "A ReplSetConnection requires at least one node."
end

# Get seed nodes
@nodes = args

# Replica set name
@replica_set_name = options[:rs_name]
@replica_set = opts[:rs_name]

# Cache the various node types when connecting to a replica set.
@secondaries = []
Expand All @@ -41,7 +47,10 @@ def initialize(*args)
@secondary_pools = []
@read_pool = nil

super
# Are we allowing reads from secondaries?
@read_secondary = opts.fetch(:read_secondary, false)

setup(opts)
end

# Create a new socket and attempt to connect to master.
Expand Down Expand Up @@ -86,13 +95,14 @@ def close

# If a ConnectionFailure is raised, this method will be called
# to close the connection and reset connection values.
# TODO: what's the point of this method?
def reset_connection
super
@secondaries = []
@secondary_pools = []
@arbiters = []
@nodes_tried = []
@nodes_to_try = []
@nodes_tried = []
@nodes_to_try = []
end

private
Expand Down Expand Up @@ -124,7 +134,12 @@ def check_is_master(node)
config
end


# Primary, when connecting to a replica can, can only be a true primary node.
# (And not a slave, which is possible when connecting with the standard
# Connection class.
def is_primary?(config)
config && (config['ismaster'] == 1 || config['ismaster'] == true)
end

# Pick a node randomly from the set of possible secondaries.
def pick_secondary_for_read
Expand All @@ -135,15 +150,15 @@ def pick_secondary_for_read

# Make sure that we're connected to the expected replica set.
def check_set_name(config, socket)
if @replica_set_name
if @replica_set
config = self['admin'].command({:replSetGetStatus => 1},
:sock => socket, :check_response => false)

if !Mongo::Support.ok?(config)
raise ReplicaSetConnectionError, config['errmsg']
elsif config['set'] != @replica_set_name
elsif config['set'] != @replica_set
raise ReplicaSetConnectionError,
"Attempting to connect to replica set '#{config['set']}' but expected '#{@replica_set_name}'"
"Attempting to connect to replica set '#{config['set']}' but expected '#{@replica_set}'"
end
end
end
Expand Down
19 changes: 11 additions & 8 deletions test/replica_sets/connect_test.rb
Expand Up @@ -9,39 +9,42 @@ class ConnectTest < Test::Unit::TestCase
include Mongo

def test_connect_bad_name
assert_raise_error(ReplicaSetConnectionError, "expected 'wrong-repl-set-name'") do
Mongo::Connection.multi([[TEST_HOST, TEST_PORT], [TEST_HOST, TEST_PORT + 1], [TEST_HOST, TEST_PORT + 2]],
:rs_name => "wrong-repl-set-name")
assert_raise_error(ReplicaSetReplSetConnectionError, "expected 'wrong-repl-set-name'") do
ReplSetConnection.multi([TEST_HOST, TEST_PORT], [TEST_HOST, TEST_PORT + 1],
[TEST_HOST, TEST_PORT + 2], :rs_name => "wrong-repl-set-name")
end
end

def test_connect
@conn = Mongo::Connection.multi([[TEST_HOST, TEST_PORT], [TEST_HOST, TEST_PORT + 1], [TEST_HOST, TEST_PORT + 2]],
:name => "foo")
@conn = ReplSetConnection.multi([TEST_HOST, TEST_PORT], [TEST_HOST, TEST_PORT + 1],
[TEST_HOST, TEST_PORT + 2], :name => "foo")
assert @conn.connected?
end

def test_connect_with_first_node_down
puts "Please kill the node at #{TEST_PORT}."
gets

@conn = Mongo::Connection.multi([[TEST_HOST, TEST_PORT], [TEST_HOST, TEST_PORT + 1], [TEST_HOST, TEST_PORT + 2]])
@conn = ReplSetConnection.multi([[TEST_HOST, TEST_PORT], [TEST_HOST, TEST_PORT + 1],
[TEST_HOST, TEST_PORT + 2]])
assert @conn.connected?
end

def test_connect_with_second_node_down
puts "Please kill the node at #{TEST_PORT + 1}."
gets

@conn = Mongo::Connection.multi([[TEST_HOST, TEST_PORT], [TEST_HOST, TEST_PORT + 1], [TEST_HOST, TEST_PORT + 2]])
@conn = ReplSetConnection.multi([[TEST_HOST, TEST_PORT], [TEST_HOST, TEST_PORT + 1],
[TEST_HOST, TEST_PORT + 2]])
assert @conn.connected?
end

def test_connect_with_third_node_down
puts "Please kill the node at #{TEST_PORT + 2}."
gets

@conn = Mongo::Connection.multi([[TEST_HOST, TEST_PORT], [TEST_HOST, TEST_PORT + 1], [TEST_HOST, TEST_PORT + 2]])
@conn = ReplSetConnection.multi([[TEST_HOST, TEST_PORT], [TEST_HOST, TEST_PORT + 1],
[TEST_HOST, TEST_PORT + 2]])
assert @conn.connected?
end
end
3 changes: 2 additions & 1 deletion test/replica_sets/count_test.rb
Expand Up @@ -9,7 +9,8 @@ class ReplicaSetCountTest < Test::Unit::TestCase
include Mongo

def setup
@conn = Mongo::Connection.multi([[TEST_HOST, TEST_PORT], [TEST_HOST, TEST_PORT + 1], [TEST_HOST, TEST_PORT + 2]])
@conn = ReplSetConnection.multi([TEST_HOST, TEST_PORT], [TEST_HOST, TEST_PORT + 1],
[TEST_HOST, TEST_PORT + 2])
@db = @conn.db(MONGO_TEST_DB)
@db.drop_collection("test-sets")
@coll = @db.collection("test-sets")
Expand Down
3 changes: 2 additions & 1 deletion test/replica_sets/insert_test.rb
Expand Up @@ -9,7 +9,8 @@ class ReplicaSetInsertTest < Test::Unit::TestCase
include Mongo

def setup
@conn = Mongo::Connection.multi([[TEST_HOST, TEST_PORT], [TEST_HOST, TEST_PORT + 1], [TEST_HOST, TEST_PORT + 2]])
@conn = ReplSetConnection.multi([[TEST_HOST, TEST_PORT], [TEST_HOST, TEST_PORT + 1],
[TEST_HOST, TEST_PORT + 2]])
@db = @conn.db(MONGO_TEST_DB)
@db.drop_collection("test-sets")
@coll = @db.collection("test-sets")
Expand Down
3 changes: 2 additions & 1 deletion test/replica_sets/node_type_test.rb
Expand Up @@ -9,7 +9,8 @@ class ReplicaSetNodeTypeTest < Test::Unit::TestCase
include Mongo

def setup
@conn = Mongo::Connection.multi([[TEST_HOST, TEST_PORT], [TEST_HOST, TEST_PORT + 1], [TEST_HOST, TEST_PORT + 2]])
@conn = ReplSetConnection.multi([TEST_HOST, TEST_PORT], [TEST_HOST, TEST_PORT + 1],
[TEST_HOST, TEST_PORT + 2])
@db = @conn.db(MONGO_TEST_DB)
@db.drop_collection("test-sets")
@coll = @db.collection("test-sets")
Expand Down
4 changes: 2 additions & 2 deletions test/replica_sets/pooled_insert_test.rb
Expand Up @@ -9,8 +9,8 @@ class ReplicaSetPooledInsertTest < Test::Unit::TestCase
include Mongo

def setup
@conn = Mongo::Connection.multi([[TEST_HOST, TEST_PORT], [TEST_HOST, TEST_PORT + 1], [TEST_HOST, TEST_PORT + 2]],
:pool_size => 10, :timeout => 5)
@conn = ReplSetConnection.multi([TEST_HOST, TEST_PORT], [TEST_HOST, TEST_PORT + 1],
[TEST_HOST, TEST_PORT + 2], :pool_size => 10, :timeout => 5)
@db = @conn.db(MONGO_TEST_DB)
@db.drop_collection("test-sets")
@coll = @db.collection("test-sets")
Expand Down

0 comments on commit 27b410f

Please sign in to comment.