Skip to content
This repository has been archived by the owner on Jan 15, 2024. It is now read-only.

Commit

Permalink
Merge 470224c into 68923e0
Browse files Browse the repository at this point in the history
  • Loading branch information
wandenberg committed Jun 3, 2015
2 parents 68923e0 + 470224c commit 9c0c508
Show file tree
Hide file tree
Showing 10 changed files with 146 additions and 12 deletions.
21 changes: 16 additions & 5 deletions lib/moped/connection.rb
Original file line number Diff line number Diff line change
Expand Up @@ -191,12 +191,23 @@ def write(operations)
#
# @since 1.2.9
def read_data(socket, length)
data = socket.read(length)
unless data
raise Errors::ConnectionFailure.new(
"Attempted to read #{length} bytes from the socket but nothing was returned."
)
# Block on data to read for op_timeout seconds
# using the suggested implementation of http://www.ruby-doc.org/core-2.1.3/Kernel.html#method-i-select
# to work with SSL connections
time_left = op_timeout = @options[:op_timeout] || timeout
begin
raise Errors::OperationTimeout.new("Took more than #{op_timeout} seconds to receive data.") if (time_left -= 0.1) <= 0
data = socket.read_nonblock(length)
rescue IO::WaitReadable
Kernel::select([socket], nil, [socket], 0.1)
retry
rescue IO::WaitWritable
Kernel::select(nil, [socket], [socket], 0.1)
retry
rescue SystemCallError, IOError => e
raise Errors::ConnectionFailure.new("Attempted to read #{length} bytes from the socket but an error happend #{e.message}.")
end

if data.length < length
data << read_data(socket, length - data.length)
end
Expand Down
3 changes: 3 additions & 0 deletions lib/moped/errors.rb
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ class PoolTimeout < RuntimeError; end
# Generic error class for exceptions related to connection failures.
class ConnectionFailure < StandardError; end

# Generic error class for exceptions related to read timeout failures.
class OperationTimeout < StandardError; end

# Raised when a database name is invalid.
class InvalidDatabaseName < StandardError; end

Expand Down
3 changes: 2 additions & 1 deletion lib/moped/failover.rb
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ module Failover
Errors::ConnectionFailure => Retry,
Errors::CursorNotFound => Ignore,
Errors::OperationFailure => Reconfigure,
Errors::QueryFailure => Reconfigure
Errors::QueryFailure => Reconfigure,
Errors::PoolTimeout => Retry
}.freeze

# Get the appropriate failover handler given the provided exception.
Expand Down
6 changes: 4 additions & 2 deletions lib/moped/failover/retry.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ module Failover
module Retry
extend self

# Executes the failover strategy. In the case of retyr, we disconnect and
# Executes the failover strategy. In the case of retry, we disconnect and
# reconnect, then try the operation one more time.
#
# @example Execute the retry strategy.
Expand All @@ -24,11 +24,13 @@ module Retry
#
# @since 2.0.0
def execute(exception, node)
node.disconnect
node.disconnect unless exception.is_a?(Errors::PoolTimeout)
begin
node.connection do |conn|
yield(conn) if block_given?
end
rescue Errors::PoolTimeout => e
raise Errors::ConnectionFailure.new e
rescue Exception => e
node.down!
raise(e)
Expand Down
16 changes: 14 additions & 2 deletions lib/moped/node.rb
Original file line number Diff line number Diff line change
Expand Up @@ -111,8 +111,14 @@ def connected?
#
# @since 2.0.0
def connection
pool.with do |conn|
yield(conn)
connection_acquired = false
begin
pool.with do |conn|
connection_acquired = true
yield(conn)
end
rescue Timeout::Error => e
raise connection_acquired ? e : Errors::PoolTimeout.new(e)
end
end

Expand Down Expand Up @@ -156,6 +162,12 @@ def down!
Connection::Manager.shutdown(self)
end

def flush_connection_credentials
connection do |conn|
conn.credentials.clear
end
end

# Yields the block if a connection can be established, retrying when a
# connection error is raised.
#
Expand Down
5 changes: 3 additions & 2 deletions lib/moped/retryable.rb
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,13 @@ def with_retry(cluster, retries = cluster.max_retries, &block)
begin
block.call
rescue Errors::ConnectionFailure, Errors::PotentialReconfiguration => e
raise e if e.is_a?(Errors::PotentialReconfiguration) &&
! (e.message.include?("not master") || e.message.include?("Not primary"))
authentication_error = e.is_a?(Errors::PotentialReconfiguration) && e.message.match(/not (master|primary|authorized)/i)
raise e if e.is_a?(Errors::PotentialReconfiguration) && !authentication_error

if retries > 0
Loggable.warn(" MOPED:", "Retrying connection attempt #{retries} more time(s).", "n/a")
sleep(cluster.retry_interval)
cluster.nodes.each { |node| node.flush_connection_credentials } if authentication_error
cluster.refresh
with_retry(cluster, retries - 1, &block)
else
Expand Down
5 changes: 5 additions & 0 deletions lib/moped/session.rb
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,11 @@ def logout
# @since 2.0.0
option(:timeout).allow(Optionable.any(Numeric))

# Setup validation of allowed timeout options. (Any numeric)
#
# @since 2.0.0
option(:op_timeout).allow(Optionable.any(Numeric))

# Pass an object that responds to instrument as an instrumenter.
#
# @since 2.0.0
Expand Down
43 changes: 43 additions & 0 deletions spec/moped/node_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -486,4 +486,47 @@
end
end
end

describe "#connection" do
let(:node) do
described_class.new("127.0.0.1:27017", pool_size: 1, pool_timeout: 0.1)
end

context "when take a long time to get a connection from pool" do
it "raise a Errors::PoolTimeout error" do
expect {

exception = nil
100.times.map do |i|
Thread.new do
begin
node.connection do |conn|
conn.apply_credentials({})
node.update("test", "test_collection", { name: "test_counter" }, {'$inc' => {'cnt' => 1}}, Moped::WriteConcern.get({ w: 1 }), flags: {safe: true, upsert: true})
end
rescue => e
exception = e if exception.nil?
end
end
end.each {|t| t.join }
raise exception unless exception.nil?

}.to raise_error(Moped::Errors::PoolTimeout)
end
end

context "when the timeout happens after get a connection from pool" do
it "raise a Timeout::Error" do
expect {
node.connection do |conn|
Timeout::timeout(0.01) do
conn.apply_credentials({})
node.update("test", "test_collection", { name: "test_counter" }, {'$inc' => {'cnt' => 1}}, Moped::WriteConcern.get({ w: 1 }), flags: {safe: true, upsert: true})
sleep(0.1) # just to simulate a long block which raise a timeout
end
end
}.to raise_error(Timeout::Error)
end
end
end
end
36 changes: 36 additions & 0 deletions spec/moped/query_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -1018,6 +1018,42 @@
end
end

context "with test commands enabled" do

let(:session) do
Moped::Session.new([ "127.0.0.1:#{port}" ], database: "moped_test")
end

let(:users) do
session.with(safe: true)[:users]
end

describe "when a query take too long" do
let(:port) { 31104 }

before do
start_mongo_server(port, "--setParameter enableTestCommands=1")
Process.detach(spawn("echo 'db.adminCommand({sleep: 1, w: true, secs: 10})' | mongo localhost:#{port} 2>&1 > /dev/null"))
sleep 1 # to sleep command on mongodb begins work
end

after do
stop_mongo_server(port)
end

it "raises a operation timeout exception" do
time = Benchmark.realtime do
expect {
Timeout::timeout(7) do
users.find("age" => { "$gte" => 65 }).first
end
}.to raise_exception("Took more than 5 seconds to receive data.")
end
expect(time).to be < 5.5
end
end
end

context "with a remote connection", mongohq: :auth do

before(:all) do
Expand Down
20 changes: 20 additions & 0 deletions spec/moped/session_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -347,4 +347,24 @@
nodes.last.should be_down
end
end

context "when connections on pool are busy" do
let(:session) do
Moped::Session.new([ "127.0.0.1:27017" ], database: "moped_test", pool_size: 1, pool_timeout: 0.2, max_retries: 30, retry_interval: 1)
end

it "should retry the operation" do
session[:test].find({ name: "test_counter" }).update({'$set' => {'cnt' => 1}}, {upsert: true})

results = []

300.times.map do |i|
Thread.new do
results.push session[:test].find({ name: "test_counter" }).first["cnt"]
end
end.each {|t| t.join }

expect(results.count).to eql(300)
end
end
end

0 comments on commit 9c0c508

Please sign in to comment.