Skip to content

Commit

Permalink
Add addresses to connection options
Browse files Browse the repository at this point in the history
Before this the connection options only allowed multiple hosts, an
address is a combination of a host and a port. This makes it possible to
specify different hosts with different ports.

Allow addresses when initializing the session

An address is a host combined with a port.

Adds some option validations and syntax cleanup

Connection options are now validated to not have both a single and
multiple hosts. Or hosts and addresses.

Removes whitespace from address arrays.

Cleans up addresses_from method

Adds extra spec for incorrect connection options

Adds an additional spec where a single host with a set of addresses is
tested to raise an error.

Refactors addresses_from for readability

The double "host || hostname" wasn't very readable.

Update connection param validations

A single host combined with multiple hosts should log a warning instead
of raising an error. This way it will not break apps currently doing
this.

Combine hosts and addressess in connection options

This way hosts can be in the addresses parameter and vice versa.

Refactors addresses_from method
  • Loading branch information
bartj3 committed Aug 13, 2015
1 parent 79dd8ef commit ba23674
Show file tree
Hide file tree
Showing 2 changed files with 113 additions and 18 deletions.
77 changes: 60 additions & 17 deletions lib/bunny/session.rb
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ class Session
#
# @option connection_string_or_opts [String] :host ("127.0.0.1") Hostname or IP address to connect to
# @option connection_string_or_opts [Array<String>] :hosts (["127.0.0.1"]) list of hostname or IP addresses to select hostname from when connecting
# @option connection_string_or_opts [Array<String>] :addresses (["127.0.0.1:5672"]) list of addresses to select hostname and port from when connecting
# @option connection_string_or_opts [Integer] :port (5672) Port RabbitMQ listens on
# @option connection_string_or_opts [String] :username ("guest") Username
# @option connection_string_or_opts [String] :password ("guest") Password
Expand Down Expand Up @@ -135,10 +136,9 @@ def initialize(connection_string_or_opts = Hash.new, optz = Hash.new)
@default_hosts_shuffle_strategy = Proc.new { |hosts| hosts.shuffle }

@opts = opts
@hosts = self.hostnames_from(opts)
@host_index = 0
@addresses = self.addresses_from(opts)
@address_index = 0

@port = self.port_from(opts)
@user = self.username_from(opts)
@pass = self.password_from(opts)
@vhost = self.vhost_from(opts)
Expand All @@ -148,6 +148,8 @@ def initialize(connection_string_or_opts = Hash.new, optz = Hash.new)
log_level = opts[:log_level] || ENV["BUNNY_LOG_LEVEL"] || Logger::WARN
@logger = opts.fetch(:logger, init_default_logger(log_file, log_level))

validate_connection_options(opts)

# should automatic recovery from network failures be used?
@automatically_recover = if opts[:automatically_recover].nil? && opts[:automatic_recovery].nil?
true
Expand Down Expand Up @@ -183,7 +185,7 @@ def initialize(connection_string_or_opts = Hash.new, optz = Hash.new)
# the non-reentrant Ruby mutexes. MK.
@transport_mutex = @mutex_impl.new
@status_mutex = @mutex_impl.new
@host_index_mutex = @mutex_impl.new
@address_index_mutex = @mutex_impl.new

@channels = Hash.new

Expand All @@ -193,6 +195,16 @@ def initialize(connection_string_or_opts = Hash.new, optz = Hash.new)
self.initialize_transport
end

def validate_connection_options(options)
if options[:hosts] && options[:addresses]
raise ArgumentError, "Connection options can't contain hosts and addresses at the same time"
end

if (options[:host] || options[:hostname]) && (options[:hosts] || options[:addresses])
@logger.warn "The connection options contain both a host and an array of hosts, the single host is ignored."
end
end

# @return [String] RabbitMQ hostname (or IP address) used
def hostname; self.host; end
# @return [String] Username used
Expand Down Expand Up @@ -223,11 +235,15 @@ def threaded?
end

def host
@transport ? @transport.host : @hosts[@host_index]
@transport ? @transport.host : host_from_address(@addresses[@address_index])
end

def reset_host_index
@host_index_mutex.synchronize { @host_index = 0 }
def port
@transport ? @transport.port : port_from_address(@addresses[@address_index])
end

def reset_address_index
@address_index_mutex.synchronize { @address_index = 0 }
end

# @private
Expand Down Expand Up @@ -293,7 +309,7 @@ def start
raise
end
rescue HostListDepleted
self.reset_host_index
self.reset_address_index
@status_mutex.synchronize { @status = :not_connected }
raise TCPConnectionFailedForAllHosts
end
Expand Down Expand Up @@ -659,7 +675,7 @@ def recover_from_network_failure
recover_channels
end
rescue HostListDepleted
reset_host_index
reset_address_index
retry
rescue TCPConnectionFailedForAllHosts, TCPConnectionFailed, AMQ::Protocol::EmptyResponseError => e
@logger.warn "TCP connection failed, reconnecting in #{@network_recovery_interval} seconds"
Expand Down Expand Up @@ -744,10 +760,23 @@ def clean_up_on_shutdown
end

# @private
def hostnames_from(options)
options.fetch(:hosts_shuffle_strategy, @default_hosts_shuffle_strategy).call(
[ options[:hosts] || options[:host] || options[:hostname] || DEFAULT_HOST ].flatten
)
def addresses_from(options)
shuffle_strategy = options.fetch(:hosts_shuffle_strategy, @default_hosts_shuffle_strategy)

addresses = options[:host] || options[:hostname] || options[:addresses] ||
options[:hosts] || ["#{DEFAULT_HOST}:#{port_from(options)}"]
addresses = [addresses] unless addresses.is_a? Array

addresses.map! do |address|
host_with_port?(address) ? address : "#{address}:#{port_from(@opts)}"
end

shuffle_strategy.call addresses
end

# @private
def host_with_port?(address)
address.include? ':'
end

# @private
Expand All @@ -761,6 +790,16 @@ def port_from(options)
options.fetch(:port, fallback)
end

# @private
def host_from_address(address)
address.split(":")[0]
end

# @private
def port_from_address(address)
address.split(":")[1].to_i
end

# @private
def vhost_from(options)
options[:virtual_host] || options[:vhost] || DEFAULT_VHOST
Expand Down Expand Up @@ -946,7 +985,7 @@ def send_raw_without_timeout(data, channel)
# @api public
def to_s
oid = ("0x%x" % (self.object_id << 1))
"#<#{self.class.name}:#{oid} #{@user}@#{host}:#{@port}, vhost=#{@vhost}, hosts=[#{@hosts.join(',')}]>"
"#<#{self.class.name}:#{oid} #{@user}@#{host}:#{port}, vhost=#{@vhost}, addresses=[#{@addresses.join(',')}]>"
end

def inspect
Expand Down Expand Up @@ -1109,10 +1148,14 @@ def maybe_shutdown_heartbeat_sender

# @private
def initialize_transport
if host = @hosts[ @host_index ]
@host_index_mutex.synchronize { @host_index += 1 }
if address = @addresses[ @address_index ]
@address_index_mutex.synchronize { @address_index += 1 }
@transport.close rescue nil # Let's make sure the previous transport socket is closed
@transport = Transport.new(self, host, @port, @opts.merge(:session_thread => @origin_thread))
@transport = Transport.new(self,
host_from_address(address),
port_from_address(address),
@opts.merge(:session_thread => @origin_thread)
)

# Reset the cached progname for the logger only when no logger was provided
@default_logger.progname = self.to_s
Expand Down
54 changes: 53 additions & 1 deletion spec/higher_level_api/integration/connection_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@
let(:hosts) { [host] }
let(:subject) { described_class.new(:hosts => hosts) }

it "uses hostname = localhost" do
it "uses hostname = 192.168.1.10" do
expect(subject.host).to eq host
expect(subject.hostname).to eq host
end
Expand All @@ -166,6 +166,58 @@
end
end

context "initialized with :addresses => [...]" do
after :each do
subject.close if subject.open?
end

let(:host) { "192.168.1.10" }
let(:port) { 5673 }
let(:address) { "#{host}:#{port}" }
let(:addresses) { [address] }
let(:subject) { described_class.new(:addresses => addresses) }

it "uses hostname = 192.168.1.10" do
expect(subject.host).to eq host
expect(subject.hostname).to eq host
end

it "uses port 5673" do
expect(subject.port).to eq port
end

it "uses username = guest" do
expect(subject.username).to eq username
expect(subject.user).to eq username
end
end

context "initialized with conflicting hosts and addresses" do
let(:host) { "192.168.1.10" }
let(:port) { 5673 }
let(:address) { "#{host}:#{port}" }
let(:io) { StringIO.new }
let(:logger) { ::Logger.new(io) }

it "raises an argument error when there is are hosts and an address" do
expect { described_class.new(addresses: [address], hosts: [host]) }.to raise_error(ArgumentError)
end

it "logs a warning when there is a single host and an array" do
described_class.new(addresses: [address], host: host, logger: logger)
expect(io.string).to include 'WARN -- : The connection options contain '\
'both a host and an array of hosts, the single host is ignored.'
end

it "converts hosts in addresses to addresses" do
strategy = Proc.new { |addresses| addresses }
session = described_class.new(addresses: [address,host ], hosts_shuffle_strategy: strategy)
strategy = Proc.new { |addresses| addresses }

expect(session.to_s).to include 'addresses=[192.168.1.10:5673,192.168.1.10:5672]'
end
end

context "initialized with :channel_max => 4096" do
after :each do
subject.close if subject.open?
Expand Down

0 comments on commit ba23674

Please sign in to comment.