Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

Make connection.close handling behaviour configurable

Some apps (e.g. our test suite) may choose to recover from them
instead of raising an exception.
  • Loading branch information...
commit 1dff7db6f10c93acfab30f66b63053579a6addb1 1 parent d3bd5c1
Michael Klishin authored
Showing with 52 additions and 24 deletions.
  1. +52 −24 lib/bunny/session.rb
View
76 lib/bunny/session.rb
@@ -146,6 +146,7 @@ def initialize(connection_string_or_opts = Hash.new, optz = Hash.new)
opts[:automatically_recover] || opts[:automatic_recovery]
end
@network_recovery_interval = opts.fetch(:network_recovery_interval, DEFAULT_NETWORK_RECOVERY_INTERVAL)
+ @recover_from_connection_close = opts.fetch(:recover_from_connection_close, false)
# in ms
@continuation_timeout = opts.fetch(:continuation_timeout, DEFAULT_CONTINUATION_TIMEOUT)
@@ -219,6 +220,11 @@ def configure_socket(&block)
@transport.configure_socket(&block)
end
+ # @return [Integer] Client socket port
+ def local_port
+ @transport.local_address.ip_port
+ end
+
# Starts the connection process.
#
# @see http://rubybunny.info/articles/getting_started.html
@@ -522,21 +528,13 @@ def handle_frame(ch_number, method)
when AMQ::Protocol::Channel::CloseOk then
@continuations.push(method)
when AMQ::Protocol::Connection::Close then
- @last_connection_error = instantiate_connection_level_exception(method)
- @continuations.push(method)
-
- begin
- shut_down_all_consumer_work_pools!
- maybe_shutdown_reader_loop
- rescue ShutdownSignal => sse
- # no-op
- rescue Exception => e
- @logger.warn "Caught an exception when cleaning up after receiving connection.close: #{e.message}"
- ensure
- close_transport
+ if recover_from_connection_close?
+ @logger.warn "Recovering from connection.close (#{method.reply_text})"
+ clean_up_on_shutdown
+ handle_network_failure(instantiate_connection_level_exception(method))
+ else
+ clean_up_and_fail_on_connection_close!(method)
end
-
- @origin_thread.raise(@last_connection_error)
when AMQ::Protocol::Connection::CloseOk then
@last_connection_close_ok = method
begin
@@ -594,23 +592,32 @@ def handle_frameset(ch_number, frames)
end
# @private
+ def recover_from_connection_close?
+ @recover_from_connection_close
+ end
+
+ # @private
def handle_network_failure(exception)
raise NetworkErrorWrapper.new(exception) unless @threaded
@status = :disconnected
if !recovering_from_network_failure?
- @recovering_from_network_failure = true
- if recoverable_network_failure?(exception)
- @logger.warn "Recovering from a network failure..."
- @channels.each do |n, ch|
- ch.maybe_kill_consumer_work_pool!
+ begin
+ @recovering_from_network_failure = true
+ if recoverable_network_failure?(exception)
+ @logger.warn "Recovering from a network failure..."
+ @channels.each do |n, ch|
+ ch.maybe_kill_consumer_work_pool!
+ end
+ maybe_shutdown_heartbeat_sender
+
+ recover_from_network_failure
+ else
+ # TODO: investigate if we can be a bit smarter here. MK.
end
- maybe_shutdown_heartbeat_sender
-
- recover_from_network_failure
- else
- # TODO: investigate if we can be a bit smarter here. MK.
+ ensure
+ @recovering_from_network_failure = false
end
end
end
@@ -687,6 +694,27 @@ def instantiate_connection_level_exception(frame)
end
end
+ def clean_up_and_fail_on_connection_close!(method)
+ @last_connection_error = instantiate_connection_level_exception(method)
+ @continuations.push(method)
+
+ clean_up_on_shutdown
+ @origin_thread.raise(@last_connection_error)
+ end
+
+ def clean_up_on_shutdown
+ begin
+ shut_down_all_consumer_work_pools!
+ maybe_shutdown_reader_loop
+ rescue ShutdownSignal => sse
+ # no-op
+ rescue Exception => e
+ @logger.warn "Caught an exception when cleaning up after receiving connection.close: #{e.message}"
+ ensure
+ close_transport
+ end
+ end
+
# @private
def hostname_from(options)
options[:host] || options[:hostname] || DEFAULT_HOST
Please sign in to comment.
Something went wrong with that request. Please try again.