Merge branch '2.6.x-stable'
Conflicts:
	ChangeLog.md
	spec/higher_level_api/integration/connection_recovery_spec.rb
	spec/higher_level_api/integration/connection_stop_spec.rb
michaelklishin committed Feb 24, 2017
2 parents cd22178 + d4c7bc7 commit 83ea0ad
Showing 5 changed files with 81 additions and 38 deletions.
17 changes: 17 additions & 0 deletions ChangeLog.md
@@ -1,5 +1,22 @@
## Changes between Bunny 2.6.0 and 2.7.0 (unreleased)

### Recovery Attempt Counting Strategy Changed

The previous behavior was not unreasonable, but it is not what many
users and even RabbitMQ team members have come to expect. It can
therefore be considered a bug.

Previously, the reconnection attempt counter was preserved across
successful recoveries. This is what made the integration test that
relies on a server-sent connection.close possible.

With this change, the counter is reset after a successful
reconnection, and there is an option to revert to the original
behavior. We also do a lot more logging.

GitHub issue: [#408](https://github.com/ruby-amqp/bunny/issues/408)
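
For illustration, a minimal configuration sketch that exercises the options touched by this commit. The host, interval, and attempt limit of 3 are arbitrary; this snippet is not part of the repository.

```ruby
require "bunny"

# Allow at most 3 consecutive recovery attempts per outage. By default
# (reset_recovery_attempts_after_reconnection: true) the attempt counter
# is reset after every successful reconnection.
conn = Bunny.new(host: "127.0.0.1",
                 automatically_recover: true,
                 network_recovery_interval: 5.0,
                 recovery_attempts: 3)
conn.start

# Opt back into the old behavior: the counter persists for the entire
# lifetime of the connection object.
legacy = Bunny.new(host: "127.0.0.1",
                   automatically_recover: true,
                   recovery_attempts: 3,
                   reset_recovery_attempts_after_reconnection: false)
legacy.start
```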


### Bunny::ConsumerWorkPool#shutdown Terminates Early When It's Safe to Do So

`Bunny::ConsumerWorkPool#shutdown(true)` waited for consumer shutdown
68 changes: 58 additions & 10 deletions lib/bunny/session.rb
@@ -117,8 +117,9 @@ class Session
# @option connection_string_or_opts [IO, String] :logfile DEPRECATED: use :log_file instead. The file or path to use when creating a logger. Defaults to STDOUT.
# @option connection_string_or_opts [Integer] :log_level The log level to use when creating a logger. Defaults to LOGGER::WARN
# @option connection_string_or_opts [Boolean] :automatically_recover (true) Should automatically recover from network failures?
# @option connection_string_or_opts [Integer] :recovery_attempts (nil) Max number of recovery attempts, nil means forever, 0 means never
# @option connection_string_or_opts [Boolean] :recover_from_connection_close (true) Recover from server-sent connection.close
# @option connection_string_or_opts [Integer] :recovery_attempts (nil) Max number of recovery attempts, nil means forever
# @option connection_string_or_opts [Boolean] :reset_recovery_attempts_after_reconnection (true) Should recovery attempt counter be reset after successful reconnection? When set to false, the attempt counter will last through the entire lifetime of the connection object.
# @option connection_string_or_opts [Boolean] :recover_from_connection_close (true) Should this connection recover after receiving a server-sent connection.close (e.g. connection was force closed)?
#
# @option optz [String] :auth_mechanism ("PLAIN") Authentication mechanism, PLAIN or EXTERNAL
# @option optz [String] :locale ("PLAIN") Locale RabbitMQ should use
@@ -164,7 +165,13 @@ def initialize(connection_string_or_opts = ENV['RABBITMQ_URL'], optz = Hash.new)
else
opts[:automatically_recover] || opts[:automatic_recovery]
end
@recovery_attempts = opts[:recovery_attempts]
@max_recovery_attempts = opts[:recovery_attempts]
@recovery_attempts = @max_recovery_attempts
# When this is set to false, the recovery attempt counter won't be
# reset after a successful reconnection and will last for the entire
# lifetime of the connection object. Some find that behavior more
# sensible than the default per-outage counter. MK.
@reset_recovery_attempt_counter_after_reconnection = opts.fetch(:reset_recovery_attempts_after_reconnection, true)

@network_recovery_interval = opts.fetch(:network_recovery_interval, DEFAULT_NETWORK_RECOVERY_INTERVAL)
@recover_from_connection_close = opts.fetch(:recover_from_connection_close, true)
# in ms
@@ -640,7 +647,7 @@ def handle_network_failure(exception)
begin
@recovering_from_network_failure = true
if recoverable_network_failure?(exception)
@logger.warn "Recovering from a network failure..."
announce_network_failure_recovery
@channel_mutex.synchronize do
@channels.each do |n, ch|
ch.maybe_kill_consumer_work_pool!
@@ -651,7 +658,7 @@ def handle_network_failure(exception)

recover_from_network_failure
else
# TODO: investigate if we can be a bit smarter here. MK.
@logger.error "Exception #{exception.message} is considered unrecoverable..."
end
ensure
@recovering_from_network_failure = false
@@ -661,7 +668,8 @@ def handle_network_failure(exception)

# @private
def recoverable_network_failure?(exception)
# TODO: investigate if we can be a bit smarter here. MK.
# No reasonably smart strategy was suggested in a few years.
# So just recover unconditionally. MK.
true
end

@@ -670,18 +678,36 @@ def recovering_from_network_failure?
@recovering_from_network_failure
end

# @private
def announce_network_failure_recovery
if recovery_attempts_limited?
@logger.warn "Will recover from a network failure (#{@recovery_attempts} out of #{@max_recovery_attempts} left)..."
else
@logger.warn "Will recover from a network failure (no retry limit)..."
end
end

# @private
def recover_from_network_failure
sleep @network_recovery_interval
@logger.debug "About to start connection recovery..."
@logger.debug "Will attempt connection recovery..."

self.initialize_transport

@logger.warn "Retrying connection on next host in line: #{@transport.host}:#{@transport.port}"
self.start

if open?

@recovering_from_network_failure = false
@logger.debug "Connection is now open"
if @reset_recovery_attempt_counter_after_reconnection
@logger.debug "Resetting recovery attempt counter after successful reconnection"
reset_recovery_attempt_counter!
else
@logger.debug "Not resetting recovery attempt counter after successful reconnection, as configured"
end
reset_recovery_attempt_counter!

recover_channels
end
@@ -692,14 +718,36 @@ def recover_from_network_failure
@logger.warn "TCP connection failed, reconnecting in #{@network_recovery_interval} seconds"
sleep @network_recovery_interval
if should_retry_recovery?
@recovery_attempts -= 1 if @recovery_attempts
retry if recoverable_network_failure?(e)
decrement_recovery_attemp_counter!
if recoverable_network_failure?(e)
announce_network_failure_recovery
retry
end
else
@logger.error "Ran out of recovery attempts (limit set to #{@max_recovery_attempts})"
end
end

# @private
def recovery_attempts_limited?
!!@max_recovery_attempts
end

# @private
def should_retry_recovery?
@recovery_attempts.nil? || @recovery_attempts > 1
!recovery_attempts_limited? || @recovery_attempts > 1
end

# @private
def decrement_recovery_attemp_counter!
@recovery_attempts -= 1 if @recovery_attempts
@logger.debug "#{@recovery_attempts} recovery attempts left"
@recovery_attempts
end

# @private
def reset_recovery_attempt_counter!
@recovery_attempts = @max_recovery_attempts
end

# @private
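
To make the new attempt accounting concrete, here is a small standalone sketch that mirrors the `recovery_attempts_limited?`, `should_retry_recovery?`, `decrement_recovery_attemp_counter!`, and `reset_recovery_attempt_counter!` helpers added above. The class and method names are illustrative only and are not part of Bunny's API.

```ruby
# A simplified model of the recovery attempt counting introduced in this
# commit (illustrative; this class does not exist in Bunny).
class RecoveryAttemptCounter
  def initialize(max_attempts, reset_after_reconnection: true)
    @max_attempts  = max_attempts   # nil means "retry forever"
    @attempts_left = max_attempts
    @reset_after_reconnection = reset_after_reconnection
  end

  # Mirrors recovery_attempts_limited?
  def limited?
    !!@max_attempts
  end

  # Mirrors should_retry_recovery?: keep retrying while unlimited or more
  # than one attempt remains.
  def should_retry?
    !limited? || @attempts_left > 1
  end

  # Mirrors decrement_recovery_attemp_counter!
  def register_failed_attempt!
    @attempts_left -= 1 if @attempts_left
  end

  # Mirrors reset_recovery_attempt_counter!, guarded by the new
  # :reset_recovery_attempts_after_reconnection option.
  def register_successful_reconnection!
    @attempts_left = @max_attempts if @reset_after_reconnection
  end
end

counter = RecoveryAttemptCounter.new(2)
# First attempt fails:
counter.should_retry?             # => true, so the counter is decremented and we retry
counter.register_failed_attempt!
# Second attempt fails:
counter.should_retry?             # => false: recovery gives up ("Ran out of recovery attempts")
```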
2 changes: 1 addition & 1 deletion lib/bunny/transport.rb
@@ -424,7 +424,7 @@ def initialize_tls_context(ctx, opts={})
if !@verify_peer
@logger.warn <<-MSG
Using TLS but peer hostname verification is disabled. This is convenient for local development
but prone man-in-the-middle attacks. Please set :verify_peer => true in production!
but prone to man-in-the-middle attacks. Please set :verify_peer => true in production!
MSG
end

25 changes: 1 addition & 24 deletions spec/higher_level_api/integration/connection_recovery_spec.rb
@@ -3,7 +3,7 @@

describe "Connection recovery" do
let(:http_client) { RabbitMQ::HTTP::Client.new("http://127.0.0.1:15672") }
let(:logger) { Logger.new($stderr).tap {|logger| logger.level = Logger::INFO} }
let(:logger) { Logger.new($stderr).tap {|logger| logger.level = ENV["BUNNY_LOG_LEVEL"] || Logger::WARN } }
let(:recovery_interval) { 0.2 }

it "reconnects after grace period" do
@@ -309,21 +309,6 @@
end
end

it "tries to recover for a given number of attempts" do
pending "Need a fix for https://github.com/ruby-amqp/bunny/issues/408"
with_recovery_attempts_limited_to(2) do |c|
close_all_connections!
wait_for_recovery_with { connections.any? }

close_all_connections!
wait_for_recovery_with { connections.any? }

close_all_connections!
sleep(recovery_interval + 0.5)
expect(connections).to be_empty
end
end

def exchange_names_in_vhost(vhost)
http_client.list_exchanges(vhost).map {|e| e["name"]}
end
@@ -403,14 +388,6 @@ def with_open_multi_broken_host(&block)
with_open(c, &block)
end

def with_recovery_attempts_limited_to(attempts = 3, &block)
c = Bunny.new(recover_from_connection_close: true,
network_recovery_interval: recovery_interval,
recovery_attempts: attempts,
logger: logger)
with_open(c, &block)
end

def ensure_queue_recovery(ch, q)
ch.confirm_select
q.purge
7 changes: 4 additions & 3 deletions spec/higher_level_api/integration/connection_stop_spec.rb
@@ -62,15 +62,16 @@ def wait_for_recovery

describe "that recovers from connection.close" do
it "can be closed" do
c = Bunny.new(automatically_recover: true, recover_from_connection_close: true, network_recovery_interval: 0.2)
c = Bunny.new(automatically_recover: true,
recover_from_connection_close: true,
network_recovery_interval: 0.2)
c.start
ch = c.create_channel

sleep 1.5
expect(c).to be_open
sleep 1.5
close_connection(c.local_port)
sleep 0.1
expect(c).not_to be_open

wait_for_recovery
expect(c).to be_open
