Skip to content
This repository has been archived by the owner on Jan 15, 2024. It is now read-only.

Commit

Permalink
Merge bff8f65 into 2d92a6b
Browse files Browse the repository at this point in the history
  • Loading branch information
wandenberg committed Sep 18, 2014
2 parents 2d92a6b + bff8f65 commit b653083
Show file tree
Hide file tree
Showing 14 changed files with 398 additions and 67 deletions.
1 change: 1 addition & 0 deletions Gemfile
@@ -1,6 +1,7 @@
source "https://rubygems.org"

group :test do
gem "popen4"
gem "rspec", "~> 2.14.1"
if ENV["CI"]
gem "coveralls", :require => false
Expand Down
6 changes: 6 additions & 0 deletions lib/moped/authenticatable.rb
Expand Up @@ -94,5 +94,11 @@ def logout(database)
end
credentials.delete(database)
end

def refresh_authentication
credentials.each do |database, (username, password)|
login(database, username, password)
end
end
end
end
4 changes: 4 additions & 0 deletions lib/moped/cluster.rb
Expand Up @@ -195,6 +195,10 @@ def refresh(nodes_to_refresh = seeds)
refreshed_nodes
end

def refresh_authentication(nodes_to_refresh = seeds)
nodes_to_refresh.each(&:refresh_authentication)
end

# Get the interval in which the node list should be refreshed.
#
# @example Get the refresh interval, in seconds.
Expand Down
10 changes: 7 additions & 3 deletions lib/moped/collection.rb
@@ -1,5 +1,6 @@
# encoding: utf-8
require "moped/query"
require "moped/retryable"

module Moped

Expand All @@ -8,6 +9,7 @@ module Moped
# @since 1.0.0
class Collection
include Readable
include Retryable

# @!attribute database
# @return [ Database ] The database for the collection.
Expand Down Expand Up @@ -120,9 +122,11 @@ def initialize(database, name)
#
# @since 1.0.0
def insert(documents, flags = nil)
docs = documents.is_a?(Array) ? documents : [ documents ]
cluster.with_primary do |node|
node.insert(database.name, name, docs, write_concern, flags: flags || [])
with_retry(cluster) do
docs = documents.is_a?(Array) ? documents : [ documents ]
cluster.with_primary do |node|
node.insert(database.name, name, docs, write_concern, flags: flags || [])
end
end
end

Expand Down
22 changes: 17 additions & 5 deletions lib/moped/connection.rb
Expand Up @@ -191,12 +191,24 @@ def write(operations)
#
# @since 1.2.9
def read_data(socket, length)
data = socket.read(length)
unless data
raise Errors::ConnectionFailure.new(
"Attempted to read #{length} bytes from the socket but nothing was returned."
)
# Block on data to read for op_timeout seconds
begin
op_timeout = @options[:op_timeout] || timeout
ready = IO.select([socket], nil, [socket], op_timeout)
unless ready
raise Errors::OperationTimeout.new("Took more than #{op_timeout} seconds to receive data.")
end
rescue IOError => e
raise Errors::ConnectionFailure
end

# Read data from socket
begin
data = socket.read(length)
rescue SystemCallError, IOError => e
raise Errors::ConnectionFailure.new("Attempted to read #{length} bytes from the socket but an error happend #{e.message}.")
end

if data.length < length
data << read_data(socket, length - data.length)
end
Expand Down
3 changes: 3 additions & 0 deletions lib/moped/errors.rb
Expand Up @@ -20,6 +20,9 @@ class PoolTimeout < RuntimeError; end
# Generic error class for exceptions related to connection failures.
class ConnectionFailure < StandardError; end

# Generic error class for exceptions related to read timeout failures.
class OperationTimeout < StandardError; end

# Raised when a database name is invalid.
class InvalidDatabaseName < StandardError; end

Expand Down
6 changes: 6 additions & 0 deletions lib/moped/node.rb
Expand Up @@ -443,6 +443,12 @@ def refresh
end
end

def refresh_authentication
connection do |conn|
conn.refresh_authentication
end
end

# Execute a remove command for the provided selector.
#
# @example Remove documents.
Expand Down
55 changes: 31 additions & 24 deletions lib/moped/query.rb
Expand Up @@ -21,6 +21,7 @@ module Moped
# people.find.count # => 1
class Query
include Enumerable
include Retryable

# @attribute [r] collection The collection to execute the query on.
# @attribute [r] operation The query operation.
Expand Down Expand Up @@ -321,14 +322,16 @@ def modify(change, options = {})
#
# @since 1.0.0
def remove
cluster.with_primary do |node|
node.remove(
operation.database,
operation.collection,
operation.basic_selector,
write_concern,
flags: [ :remove_first ]
)
with_retry(cluster) do
cluster.with_primary do |node|
node.remove(
operation.database,
operation.collection,
operation.basic_selector,
write_concern,
flags: [ :remove_first ]
)
end
end
end

Expand All @@ -341,13 +344,15 @@ def remove
#
# @since 1.0.0
def remove_all
cluster.with_primary do |node|
node.remove(
operation.database,
operation.collection,
operation.basic_selector,
write_concern
)
with_retry(cluster) do
cluster.with_primary do |node|
node.remove(
operation.database,
operation.collection,
operation.basic_selector,
write_concern
)
end
end
end

Expand Down Expand Up @@ -423,15 +428,17 @@ def tailable
#
# @since 1.0.0
def update(change, flags = nil)
cluster.with_primary do |node|
node.update(
operation.database,
operation.collection,
operation.selector["$query"] || operation.selector,
change,
write_concern,
flags: flags
)
with_retry(cluster) do
cluster.with_primary do |node|
node.update(
operation.database,
operation.collection,
operation.selector["$query"] || operation.selector,
change,
write_concern,
flags: flags
)
end
end
end

Expand Down
37 changes: 2 additions & 35 deletions lib/moped/read_preference/selectable.rb
@@ -1,4 +1,5 @@
# encoding: utf-8
require "moped/retryable"
module Moped
module ReadPreference

Expand All @@ -7,6 +8,7 @@ module ReadPreference
#
# @since 2.0.0
module Selectable
include Retryable

# @!attribute tags
# @return [ Array<Hash> ] The tag sets.
Expand Down Expand Up @@ -39,41 +41,6 @@ def query_options(options)
options[:flags] |= [ :slave_ok ]
options
end

private

# Execute the provided block on the cluster and retry if the execution
# fails.
#
# @api private
#
# @example Execute with retry.
# preference.with_retry(cluster) do
# cluster.with_primary do |node|
# node.refresh
# end
# end
#
# @param [ Cluster ] cluster The cluster.
# @param [ Integer ] retries The number of times to retry.
#
# @return [ Object ] The result of the block.
#
# @since 2.0.0
def with_retry(cluster, retries = cluster.max_retries, &block)
begin
block.call
rescue Errors::ConnectionFailure => e
if retries > 0
Loggable.warn(" MOPED:", "Retrying connection attempt #{retries} more time(s).", "n/a")
sleep(cluster.retry_interval)
cluster.refresh
with_retry(cluster, retries - 1, &block)
else
raise e
end
end
end
end
end
end
47 changes: 47 additions & 0 deletions lib/moped/retryable.rb
@@ -0,0 +1,47 @@
# encoding: utf-8
module Moped
# Provides the shared behaviour for retry failed operations.
#
# @since 2.0.0
module Retryable

private

# Execute the provided block on the cluster and retry if the execution
# fails.
#
# @api private
#
# @example Execute with retry.
# preference.with_retry(cluster) do
# cluster.with_primary do |node|
# node.refresh
# end
# end
#
# @param [ Cluster ] cluster The cluster.
# @param [ Integer ] retries The number of times to retry.
#
# @return [ Object ] The result of the block.
#
# @since 2.0.0
def with_retry(cluster, retries = cluster.max_retries, &block)
begin
block.call
rescue Errors::ConnectionFailure, Errors::PotentialReconfiguration => e
raise e if e.is_a?(Errors::PotentialReconfiguration) &&
!(e.message.include?("not master") || e.message.include?("not authorized") || e.message.include?("unauthorized"))

if retries > 0
Loggable.warn(" MOPED:", "Retrying connection attempt #{retries} more time(s).", "n/a")
sleep(cluster.retry_interval)
cluster.refresh
cluster.refresh_authentication if e.message.include?("not authorized") || e.message.include?("unauthorized")
with_retry(cluster, retries - 1, &block)
else
raise e
end
end
end
end
end
5 changes: 5 additions & 0 deletions lib/moped/session.rb
Expand Up @@ -235,6 +235,11 @@ def logout
# @since 2.0.0
option(:timeout).allow(Optionable.any(Numeric))

# Setup validation of allowed timeout options. (Any numeric)
#
# @since 2.0.0
option(:op_timeout).allow(Optionable.any(Numeric))

# Pass an object that responds to instrument as an instrumenter.
#
# @since 2.0.0
Expand Down

0 comments on commit b653083

Please sign in to comment.