Skip to content
This repository has been archived by the owner on Jan 15, 2024. It is now read-only.

Commit

Permalink
Merge 1fddac2 into a44b4e1
Browse files Browse the repository at this point in the history
  • Loading branch information
reidmorrison committed Apr 18, 2014
2 parents a44b4e1 + 1fddac2 commit e18cb16
Show file tree
Hide file tree
Showing 12 changed files with 103 additions and 89 deletions.
11 changes: 10 additions & 1 deletion lib/moped/address.rb
Expand Up @@ -45,6 +45,7 @@ def initialize(address, timeout)
#
# @since 2.0.0
def resolve(node)
attempt = 1
begin
Timeout::timeout(@timeout) do
Resolv.each_address(host) do |ip|
Expand All @@ -56,7 +57,15 @@ def resolve(node)
raise Resolv::ResolvError unless @ip
end
@resolved ||= "#{ip}:#{port}"
rescue Timeout::Error, Resolv::ResolvError, SocketError
rescue Timeout::Error, Resolv::ResolvError
Loggable.warn(" MOPED:", "Could not resolve IP for: #{original}", "n/a")
node.down! and false
rescue SocketError
if attempt <= 3
Loggable.warn(" MOPED:", "Retrying DNS Resolv for: #{original}, Retry: #{attempt}", "n/a")
attempt += 1
retry
end
Loggable.warn(" MOPED:", "Could not resolve IP for: #{original}", "n/a")
node.down! and false
end
Expand Down
61 changes: 46 additions & 15 deletions lib/moped/cluster.rb
Expand Up @@ -220,45 +220,36 @@ def retry_interval
@retry_interval ||= options[:retry_interval] || RETRY_INTERVAL
end

# Yields the replica set's primary node to the provided block. This method
# will retry the block in case of connection errors or replica set
# reconfiguration.
# Yields the replica set's primary node to the provided block.
#
# @example Yield the primary to the block.
# cluster.with_primary do |node|
# # ...
# end
#
# @param [ Integer ] retries The number of times to retry.
#
# @raises [ ConnectionFailure ] When no primary node can be found
#
# @return [ Object ] The result of the yield.
#
# @since 1.0.0
def with_primary(&block)
if node = nodes.find(&:primary?)
begin
node.ensure_primary do
return yield(node)
end
rescue Errors::ConnectionFailure, Errors::ReplicaSetReconfigured
node.ensure_primary do
return yield(node)
end
end
raise Errors::ConnectionFailure, "Could not connect to a primary node for replica set #{inspect}"
end

# Yields a secondary node if available, otherwise the primary node. This
# method will retry the block in case of connection errors.
# Yields a secondary node
# When multiple secondary nodes are present, one is yielded at random
#
# @example Yield the secondary to the block.
# cluster.with_secondary do |node|
# # ...
# end
#
# @param [ Integer ] retries The number of times to retry.
#
# @raises [ ConnectionFailure ] When no primary node can be found
# @raises [ ConnectionFailure ] When no secondary node can be found
#
# @return [ Object ] The result of the yield.
#
Expand All @@ -275,6 +266,46 @@ def with_secondary(&block)
raise Errors::ConnectionFailure, "Could not connect to a secondary node for replica set #{inspect}"
end

# Execute the provided block on the cluster and retry if the execution
# fails.
#
# @example Execute with retry.
# cluster.with_retry(retries) do
# cluster.with_primary do |node|
# node.refresh
# end
# end
#
# @param [ Cluster ] cluster The cluster.
# @param [ Integer ] retries The number of times to retry.
#
# @return [ Object ] The result of the block.
#
# @since 2.0.0
def with_retry(retries = max_retries, &block)
begin
block.call
rescue Errors::OperationFailure, Moped::Errors::QueryFailure => e
if retries > 0 && e.reconfiguring_replica_set?
Loggable.warn(" MOPED:", "Operation Failure, Retrying connection attempt #{retries} more time(s).", "n/a")
sleep(retry_interval)
refresh
retries -= 1
retry
end
raise e
rescue Errors::ConnectionFailure => e
if retries > 0
Loggable.warn(" MOPED:", "ConnectionFailure, Retrying connection attempt #{retries} more time(s).", "n/a")
sleep(retry_interval)
refresh
retries -= 1
retry
end
raise e
end
end

private

# Apply the credentials on all nodes
Expand Down
6 changes: 4 additions & 2 deletions lib/moped/collection.rb
Expand Up @@ -121,8 +121,10 @@ def initialize(database, name)
# @since 1.0.0
def insert(documents, flags = nil)
docs = documents.is_a?(Array) ? documents : [ documents ]
cluster.with_primary do |node|
node.insert(database.name, name, docs, write_concern, flags: flags || [])
cluster.with_retry do
cluster.with_primary do |node|
node.insert(database.name, name, docs, write_concern, flags: flags || [])
end
end
end

Expand Down
2 changes: 1 addition & 1 deletion lib/moped/errors.rb
Expand Up @@ -112,7 +112,7 @@ class DoNotDisconnect < MongoError; end
class PotentialReconfiguration < MongoError

# Not master error codes.
NOT_MASTER = [ 13435, 13436, 10009 ]
NOT_MASTER = [ 13435, 13436, 10009, 10054, 10058, 10056 ]

# Error codes received around reconfiguration
CONNECTION_ERRORS_RECONFIGURATION = [ 15988, 10276, 11600, 9001 ]
Expand Down
6 changes: 3 additions & 3 deletions lib/moped/node.rb
Expand Up @@ -287,7 +287,7 @@ def kill_cursors(cursor_ids)
process(Protocol::KillCursors.new(cursor_ids))
end

# Can we send messages to this node in normal cirucmstances? This is true
# Can we send messages to this node in normal circumstances? This is true
# only if the node is a primary or secondary node - arbiters or passives
# cannot be sent anything.
#
Expand Down Expand Up @@ -434,7 +434,7 @@ def refresh
raise Errors::ReplicaSetReconfigured.new("#{inspect} is no longer the primary node.", {})
elsif !messagable?
# not primary or secondary so mark it as down, since it's probably
# a recovering node withing the replica set
# a recovering node within the replica set
down!
end
rescue Timeout::Error
Expand Down Expand Up @@ -511,7 +511,7 @@ def update(database, collection, selector, change, concern, options = {})
#
# @since 1.0.0
def inspect
"<#{self.class.name} resolved_address=#{address.resolved.inspect}>"
"<#{self.class.name} resolved_address=#{address.resolved.inspect}, address=#{address.original.inspect}, arbiter=#{arbiter?}, passive=#{passive?}, primary=#{primary?}, secondary=#{secondary?}, down_at=#{down_at.inspect}, latency=#{latency.inspect}>"
end

private
Expand Down
54 changes: 30 additions & 24 deletions lib/moped/query.rb
Expand Up @@ -321,14 +321,16 @@ def modify(change, options = {})
#
# @since 1.0.0
def remove
cluster.with_primary do |node|
node.remove(
operation.database,
operation.collection,
operation.basic_selector,
write_concern,
flags: [ :remove_first ]
)
cluster.with_retry do
cluster.with_primary do |node|
node.remove(
operation.database,
operation.collection,
operation.basic_selector,
write_concern,
flags: [ :remove_first ]
)
end
end
end

Expand All @@ -341,13 +343,15 @@ def remove
#
# @since 1.0.0
def remove_all
cluster.with_primary do |node|
node.remove(
operation.database,
operation.collection,
operation.basic_selector,
write_concern
)
cluster.with_retry do
cluster.with_primary do |node|
node.remove(
operation.database,
operation.collection,
operation.basic_selector,
write_concern
)
end
end
end

Expand Down Expand Up @@ -423,15 +427,17 @@ def tailable
#
# @since 1.0.0
def update(change, flags = nil)
cluster.with_primary do |node|
node.update(
operation.database,
operation.collection,
operation.selector["$query"] || operation.selector,
change,
write_concern,
flags: flags
)
cluster.with_retry do
cluster.with_primary do |node|
node.update(
operation.database,
operation.collection,
operation.selector["$query"] || operation.selector,
change,
write_concern,
flags: flags
)
end
end
end

Expand Down
8 changes: 4 additions & 4 deletions lib/moped/read_preference/nearest.rb
Expand Up @@ -41,12 +41,12 @@ def name
#
# @since 2.0.0
def with_node(cluster, &block)
with_retry(cluster) do
nearest = cluster.nodes.sort_by(&:latency).first
if nearest
cluster.with_retry do
# Find node with lowest latency, if not calculated yet use :primary
if nearest = cluster.nodes.select(&:latency).sort_by(&:latency).first
block.call(nearest)
else
raise Errors::ConnectionFailure, "No nodes available to select in the cluster"
cluster.with_primary(&block)
end
end
end
Expand Down
2 changes: 1 addition & 1 deletion lib/moped/read_preference/primary.rb
Expand Up @@ -51,7 +51,7 @@ def query_options(options)
#
# @since 2.0.0
def with_node(cluster, &block)
with_retry(cluster) do
cluster.with_retry do
cluster.with_primary(&block)
end
end
Expand Down
2 changes: 1 addition & 1 deletion lib/moped/read_preference/primary_preferred.rb
Expand Up @@ -42,7 +42,7 @@ def name
#
# @since 2.0.0
def with_node(cluster, &block)
with_retry(cluster) do
cluster.with_retry do
begin
cluster.with_primary(&block)
rescue Errors::ConnectionFailure
Expand Down
2 changes: 1 addition & 1 deletion lib/moped/read_preference/secondary.rb
Expand Up @@ -41,7 +41,7 @@ def name
#
# @since 2.0.0
def with_node(cluster, &block)
with_retry(cluster) do
cluster.with_retry do
cluster.with_secondary(&block)
end
end
Expand Down
2 changes: 1 addition & 1 deletion lib/moped/read_preference/secondary_preferred.rb
Expand Up @@ -40,7 +40,7 @@ def name
#
# @since 2.0.0
def with_node(cluster, &block)
with_retry(cluster) do
cluster.with_retry do
begin
cluster.with_secondary(&block)
rescue Errors::ConnectionFailure
Expand Down
36 changes: 1 addition & 35 deletions lib/moped/read_preference/selectable.rb
Expand Up @@ -39,41 +39,7 @@ def query_options(options)
options[:flags] |= [ :slave_ok ]
options
end

private

# Execute the provided block on the cluster and retry if the execution
# fails.
#
# @api private
#
# @example Execute with retry.
# preference.with_retry(cluster) do
# cluster.with_primary do |node|
# node.refresh
# end
# end
#
# @param [ Cluster ] cluster The cluster.
# @param [ Integer ] retries The number of times to retry.
#
# @return [ Object ] The result of the block.
#
# @since 2.0.0
def with_retry(cluster, retries = cluster.max_retries, &block)
begin
block.call
rescue Errors::ConnectionFailure => e
if retries > 0
Loggable.warn(" MOPED:", "Retrying connection attempt #{retries} more time(s).", "n/a")
sleep(cluster.retry_interval)
cluster.refresh
with_retry(cluster, retries - 1, &block)
else
raise e
end
end
end

end
end
end

0 comments on commit e18cb16

Please sign in to comment.