Skip to content

Commit

Permalink
Bunny::Timer => Bunny::Timeout, with an alias
Browse files Browse the repository at this point in the history
  • Loading branch information
Michael Klishin committed Aug 24, 2013
1 parent 81ca13b commit 3b3aaac
Show file tree
Hide file tree
Showing 5 changed files with 24 additions and 37 deletions.
15 changes: 1 addition & 14 deletions lib/bunny.rb
Expand Up @@ -9,6 +9,7 @@
require "bunny/framing"
require "bunny/exceptions"
require "bunny/socket"
require "bunny/timeout"

begin
require "openssl"
Expand All @@ -33,20 +34,6 @@ module Bunny
# AMQP protocol version Bunny implements
PROTOCOL_VERSION = AMQ::Protocol::PROTOCOL_VERSION

# unifies Ruby standard library's Timeout (which is not accurate on
# Ruby 1.8 and has other issues) and SystemTimer (the gem)
Timer = if RUBY_VERSION < "1.9"
begin
require "bunny/system_timer"
Bunny::SystemTimer
rescue LoadError
Timeout
end
else
Timeout
end


#
# API
#
Expand Down
36 changes: 18 additions & 18 deletions lib/bunny/channel.rb
Expand Up @@ -602,7 +602,7 @@ def basic_qos(prefetch_count, global = false)

@connection.send_frame(AMQ::Protocol::Basic::Qos.encode(@id, 0, prefetch_count, global))

Bunny::Timer.timeout(read_write_timeout, ClientTimeout) do
Bunny::Timeout.timeout(read_write_timeout, ClientTimeout) do
@last_basic_qos_ok = wait_on_continuations
end
raise_if_continuation_resulted_in_a_channel_error!
Expand All @@ -621,7 +621,7 @@ def basic_recover(requeue)
raise_if_no_longer_open!

@connection.send_frame(AMQ::Protocol::Basic::Recover.encode(@id, requeue))
Bunny::Timer.timeout(read_write_timeout, ClientTimeout) do
Bunny::Timeout.timeout(read_write_timeout, ClientTimeout) do
@last_basic_recover_ok = wait_on_continuations
end
raise_if_continuation_resulted_in_a_channel_error!
Expand Down Expand Up @@ -823,7 +823,7 @@ def basic_consume(queue, consumer_tag = generate_consumer_tag, no_ack = false, e
arguments))

begin
Bunny::Timer.timeout(read_write_timeout, ClientTimeout) do
Bunny::Timeout.timeout(read_write_timeout, ClientTimeout) do
@last_basic_consume_ok = wait_on_continuations
end
rescue Exception => e
Expand Down Expand Up @@ -873,7 +873,7 @@ def basic_consume_with(consumer)
consumer.arguments))

begin
Bunny::Timer.timeout(read_write_timeout, ClientTimeout) do
Bunny::Timeout.timeout(read_write_timeout, ClientTimeout) do
@last_basic_consume_ok = wait_on_continuations
end
rescue Exception => e
Expand Down Expand Up @@ -908,7 +908,7 @@ def basic_consume_with(consumer)
def basic_cancel(consumer_tag)
@connection.send_frame(AMQ::Protocol::Basic::Cancel.encode(@id, consumer_tag, false))

Bunny::Timer.timeout(read_write_timeout, ClientTimeout) do
Bunny::Timeout.timeout(read_write_timeout, ClientTimeout) do
@last_basic_cancel_ok = wait_on_continuations
end

Expand Down Expand Up @@ -982,7 +982,7 @@ def queue_delete(name, opts = {})
opts[:if_unused],
opts[:if_empty],
false))
Bunny::Timer.timeout(read_write_timeout, ClientTimeout) do
Bunny::Timeout.timeout(read_write_timeout, ClientTimeout) do
@last_queue_delete_ok = wait_on_continuations
end
raise_if_continuation_resulted_in_a_channel_error!
Expand All @@ -1002,7 +1002,7 @@ def queue_purge(name, opts = {})

@connection.send_frame(AMQ::Protocol::Queue::Purge.encode(@id, name, false))

Bunny::Timer.timeout(read_write_timeout, ClientTimeout) do
Bunny::Timeout.timeout(read_write_timeout, ClientTimeout) do
@last_queue_purge_ok = wait_on_continuations
end
raise_if_continuation_resulted_in_a_channel_error!
Expand Down Expand Up @@ -1038,7 +1038,7 @@ def queue_bind(name, exchange, opts = {})
opts[:routing_key],
false,
opts[:arguments]))
Bunny::Timer.timeout(read_write_timeout, ClientTimeout) do
Bunny::Timeout.timeout(read_write_timeout, ClientTimeout) do
@last_queue_bind_ok = wait_on_continuations
end

Expand Down Expand Up @@ -1073,7 +1073,7 @@ def queue_unbind(name, exchange, opts = {})
exchange_name,
opts[:routing_key],
opts[:arguments]))
Bunny::Timer.timeout(read_write_timeout, ClientTimeout) do
Bunny::Timeout.timeout(read_write_timeout, ClientTimeout) do
@last_queue_unbind_ok = wait_on_continuations
end

Expand Down Expand Up @@ -1112,7 +1112,7 @@ def exchange_declare(name, type, opts = {})
false,
false,
opts[:arguments]))
Bunny::Timer.timeout(read_write_timeout, ClientTimeout) do
Bunny::Timeout.timeout(read_write_timeout, ClientTimeout) do
@last_exchange_declare_ok = wait_on_continuations
end

Expand All @@ -1137,7 +1137,7 @@ def exchange_delete(name, opts = {})
name,
opts[:if_unused],
false))
Bunny::Timer.timeout(read_write_timeout, ClientTimeout) do
Bunny::Timeout.timeout(read_write_timeout, ClientTimeout) do
@last_exchange_delete_ok = wait_on_continuations
end

Expand Down Expand Up @@ -1181,7 +1181,7 @@ def exchange_bind(source, destination, opts = {})
opts[:routing_key],
false,
opts[:arguments]))
Bunny::Timer.timeout(read_write_timeout, ClientTimeout) do
Bunny::Timeout.timeout(read_write_timeout, ClientTimeout) do
@last_exchange_bind_ok = wait_on_continuations
end

Expand Down Expand Up @@ -1225,7 +1225,7 @@ def exchange_unbind(source, destination, opts = {})
opts[:routing_key],
false,
opts[:arguments]))
Bunny::Timer.timeout(read_write_timeout, ClientTimeout) do
Bunny::Timeout.timeout(read_write_timeout, ClientTimeout) do
@last_exchange_unbind_ok = wait_on_continuations
end

Expand Down Expand Up @@ -1253,7 +1253,7 @@ def channel_flow(active)
raise_if_no_longer_open!

@connection.send_frame(AMQ::Protocol::Channel::Flow.encode(@id, active))
Bunny::Timer.timeout(read_write_timeout, ClientTimeout) do
Bunny::Timeout.timeout(read_write_timeout, ClientTimeout) do
@last_channel_flow_ok = wait_on_continuations
end
raise_if_continuation_resulted_in_a_channel_error!
Expand All @@ -1274,7 +1274,7 @@ def tx_select
raise_if_no_longer_open!

@connection.send_frame(AMQ::Protocol::Tx::Select.encode(@id))
Bunny::Timer.timeout(read_write_timeout, ClientTimeout) do
Bunny::Timeout.timeout(read_write_timeout, ClientTimeout) do
@last_tx_select_ok = wait_on_continuations
end
raise_if_continuation_resulted_in_a_channel_error!
Expand All @@ -1289,7 +1289,7 @@ def tx_commit
raise_if_no_longer_open!

@connection.send_frame(AMQ::Protocol::Tx::Commit.encode(@id))
Bunny::Timer.timeout(read_write_timeout, ClientTimeout) do
Bunny::Timeout.timeout(read_write_timeout, ClientTimeout) do
@last_tx_commit_ok = wait_on_continuations
end
raise_if_continuation_resulted_in_a_channel_error!
Expand All @@ -1304,7 +1304,7 @@ def tx_rollback
raise_if_no_longer_open!

@connection.send_frame(AMQ::Protocol::Tx::Rollback.encode(@id))
Bunny::Timer.timeout(read_write_timeout, ClientTimeout) do
Bunny::Timeout.timeout(read_write_timeout, ClientTimeout) do
@last_tx_rollback_ok = wait_on_continuations
end
raise_if_continuation_resulted_in_a_channel_error!
Expand Down Expand Up @@ -1344,7 +1344,7 @@ def confirm_select(callback=nil)
@confirms_callback = callback

@connection.send_frame(AMQ::Protocol::Confirm::Select.encode(@id, false))
Bunny::Timer.timeout(read_write_timeout, ClientTimeout) do
Bunny::Timeout.timeout(read_write_timeout, ClientTimeout) do
@last_confirm_select_ok = wait_on_continuations
end
raise_if_continuation_resulted_in_a_channel_error!
Expand Down
2 changes: 1 addition & 1 deletion lib/bunny/concurrent/continuation_queue.rb
Expand Up @@ -21,7 +21,7 @@ def pop

def poll(timeout_in_ms = nil)
if timeout_in_ms
Bunny::Timer.timeout(timeout_in_ms / 1000, Timeout::Error) do
Bunny::Timeout.timeout(timeout_in_ms / 1000, Timeout::Error) do
@q.pop
end
else
Expand Down
4 changes: 2 additions & 2 deletions lib/bunny/session.rb
Expand Up @@ -265,7 +265,7 @@ def close
if @transport.open?
close_all_channels

Bunny::Timer.timeout(@transport.disconnect_timeout, ClientTimeout) do
Bunny::Timeout.timeout(@transport.disconnect_timeout, ClientTimeout) do
self.close_connection(true)
end

Expand Down Expand Up @@ -376,7 +376,7 @@ def close_channel(ch)
# @private
def close_all_channels
@channels.reject {|n, ch| n == 0 || !ch.open? }.each do |_, ch|
Bunny::Timer.timeout(@transport.disconnect_timeout, ClientTimeout) { ch.close }
Bunny::Timeout.timeout(@transport.disconnect_timeout, ClientTimeout) { ch.close }
end
end

Expand Down
4 changes: 2 additions & 2 deletions lib/bunny/transport.rb
Expand Up @@ -112,7 +112,7 @@ def configure_tls_context(&block)
def write(data)
begin
if @read_write_timeout
Bunny::Timer.timeout(@read_write_timeout, Bunny::ClientTimeout) do
Bunny::Timeout.timeout(@read_write_timeout, Bunny::ClientTimeout) do
if open?
@writes_mutex.synchronize { @socket.write(data) }
@socket.flush
Expand Down Expand Up @@ -250,7 +250,7 @@ def self.ping!(host, port, timeout)

def initialize_socket
begin
@socket = Bunny::Timer.timeout(@connect_timeout, ConnectionTimeout) do
@socket = Bunny::Timeout.timeout(@connect_timeout, ConnectionTimeout) do
Bunny::Socket.open(@host, @port,
:keepalive => @opts[:keepalive],
:socket_timeout => @connect_timeout)
Expand Down

0 comments on commit 3b3aaac

Please sign in to comment.