Skip to content
This repository has been archived by the owner on Jan 15, 2024. It is now read-only.

2.0.1 operation timeout #329

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
10 changes: 7 additions & 3 deletions lib/moped/collection.rb
@@ -1,5 +1,6 @@
# encoding: utf-8
require "moped/query"
require "moped/retryable"

module Moped

Expand All @@ -8,6 +9,7 @@ module Moped
# @since 1.0.0
class Collection
include Readable
include Retryable

# @!attribute database
# @return [ Database ] The database for the collection.
Expand Down Expand Up @@ -120,9 +122,11 @@ def initialize(database, name)
#
# @since 1.0.0
def insert(documents, flags = nil)
docs = documents.is_a?(Array) ? documents : [ documents ]
cluster.with_primary do |node|
node.insert(database.name, name, docs, write_concern, flags: flags || [])
with_retry(cluster) do
docs = documents.is_a?(Array) ? documents : [ documents ]
cluster.with_primary do |node|
node.insert(database.name, name, docs, write_concern, flags: flags || [])
end
end
end

Expand Down
21 changes: 16 additions & 5 deletions lib/moped/connection.rb
Expand Up @@ -191,12 +191,23 @@ def write(operations)
#
# @since 1.2.9
def read_data(socket, length)
data = socket.read(length)
unless data
raise Errors::ConnectionFailure.new(
"Attempted to read #{length} bytes from the socket but nothing was returned."
)
# Block on data to read for op_timeout seconds
# using the suggested implementation of http://www.ruby-doc.org/core-2.1.3/Kernel.html#method-i-select
# to work with SSL connections
time_left = op_timeout = @options[:op_timeout] || timeout
begin
raise Errors::OperationTimeout.new("Took more than #{op_timeout} seconds to receive data.") if (time_left -= 0.1) <= 0
data = socket.read_nonblock(length)
rescue IO::WaitReadable
Kernel::select([socket], nil, [socket], 0.1)
retry
rescue IO::WaitWritable
Kernel::select(nil, [socket], [socket], 0.1)
retry
rescue SystemCallError, IOError => e
raise Errors::ConnectionFailure.new("Attempted to read #{length} bytes from the socket but an error happend #{e.message}.")
end

if data.length < length
data << read_data(socket, length - data.length)
end
Expand Down
3 changes: 3 additions & 0 deletions lib/moped/errors.rb
Expand Up @@ -20,6 +20,9 @@ class PoolTimeout < RuntimeError; end
# Generic error class for exceptions related to connection failures.
class ConnectionFailure < StandardError; end

# Generic error class for exceptions related to read timeout failures.
class OperationTimeout < StandardError; end

# Raised when a database name is invalid.
class InvalidDatabaseName < StandardError; end

Expand Down
55 changes: 31 additions & 24 deletions lib/moped/query.rb
Expand Up @@ -21,6 +21,7 @@ module Moped
# people.find.count # => 1
class Query
include Enumerable
include Retryable

# @attribute [r] collection The collection to execute the query on.
# @attribute [r] operation The query operation.
Expand Down Expand Up @@ -321,14 +322,16 @@ def modify(change, options = {})
#
# @since 1.0.0
def remove
cluster.with_primary do |node|
node.remove(
operation.database,
operation.collection,
operation.basic_selector,
write_concern,
flags: [ :remove_first ]
)
with_retry(cluster) do
cluster.with_primary do |node|
node.remove(
operation.database,
operation.collection,
operation.basic_selector,
write_concern,
flags: [ :remove_first ]
)
end
end
end

Expand All @@ -341,13 +344,15 @@ def remove
#
# @since 1.0.0
def remove_all
cluster.with_primary do |node|
node.remove(
operation.database,
operation.collection,
operation.basic_selector,
write_concern
)
with_retry(cluster) do
cluster.with_primary do |node|
node.remove(
operation.database,
operation.collection,
operation.basic_selector,
write_concern
)
end
end
end

Expand Down Expand Up @@ -423,15 +428,17 @@ def tailable
#
# @since 1.0.0
def update(change, flags = nil)
cluster.with_primary do |node|
node.update(
operation.database,
operation.collection,
operation.selector["$query"] || operation.selector,
change,
write_concern,
flags: flags
)
with_retry(cluster) do
cluster.with_primary do |node|
node.update(
operation.database,
operation.collection,
operation.selector["$query"] || operation.selector,
change,
write_concern,
flags: flags
)
end
end
end

Expand Down
37 changes: 2 additions & 35 deletions lib/moped/read_preference/selectable.rb
@@ -1,4 +1,5 @@
# encoding: utf-8
require "moped/retryable"
module Moped
module ReadPreference

Expand All @@ -7,6 +8,7 @@ module ReadPreference
#
# @since 2.0.0
module Selectable
include Retryable

# @!attribute tags
# @return [ Array<Hash> ] The tag sets.
Expand Down Expand Up @@ -39,41 +41,6 @@ def query_options(options)
options[:flags] |= [ :slave_ok ]
options
end

private

# Execute the provided block on the cluster and retry if the execution
# fails.
#
# @api private
#
# @example Execute with retry.
# preference.with_retry(cluster) do
# cluster.with_primary do |node|
# node.refresh
# end
# end
#
# @param [ Cluster ] cluster The cluster.
# @param [ Integer ] retries The number of times to retry.
#
# @return [ Object ] The result of the block.
#
# @since 2.0.0
def with_retry(cluster, retries = cluster.max_retries, &block)
begin
block.call
rescue Errors::ConnectionFailure => e
if retries > 0
Loggable.warn(" MOPED:", "Retrying connection attempt #{retries} more time(s).", "n/a")
sleep(cluster.retry_interval)
cluster.refresh
with_retry(cluster, retries - 1, &block)
else
raise e
end
end
end
end
end
end
46 changes: 46 additions & 0 deletions lib/moped/retryable.rb
@@ -0,0 +1,46 @@
# encoding: utf-8
module Moped
# Provides the shared behaviour for retry failed operations.
#
# @since 2.0.0
module Retryable

private

# Execute the provided block on the cluster and retry if the execution
# fails.
#
# @api private
#
# @example Execute with retry.
# preference.with_retry(cluster) do
# cluster.with_primary do |node|
# node.refresh
# end
# end
#
# @param [ Cluster ] cluster The cluster.
# @param [ Integer ] retries The number of times to retry.
#
# @return [ Object ] The result of the block.
#
# @since 2.0.0
def with_retry(cluster, retries = cluster.max_retries, &block)
begin
block.call
rescue Errors::ConnectionFailure, Errors::PotentialReconfiguration => e
raise e if e.is_a?(Errors::PotentialReconfiguration) &&
! (e.message.include?("not master") || e.message.include?("Not primary"))

if retries > 0
Loggable.warn(" MOPED:", "Retrying connection attempt #{retries} more time(s).", "n/a")
sleep(cluster.retry_interval)
cluster.refresh
with_retry(cluster, retries - 1, &block)
else
raise e
end
end
end
end
end
5 changes: 5 additions & 0 deletions lib/moped/session.rb
Expand Up @@ -240,6 +240,11 @@ def logout
# @since 2.0.0
option(:timeout).allow(Optionable.any(Numeric))

# Setup validation of allowed timeout options. (Any numeric)
#
# @since 2.0.0
option(:op_timeout).allow(Optionable.any(Numeric))

# Pass an object that responds to instrument as an instrumenter.
#
# @since 2.0.0
Expand Down