Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

more improvements to amqp node

  • Loading branch information...
commit 1c27d4528eb49ac156563b7ae37cda8004a954e8 1 parent b0a5958
@movitto authored committed
Showing with 8 additions and 3 deletions.
  1. +7 −3 lib/rjr/amqp_node.rb
  2. +1 −0  specs/ws_node_spec.rb
View
10 lib/rjr/amqp_node.rb
@@ -93,6 +93,7 @@ def initialize(args = {})
# Initialize the amqp subsystem
def init_node
+ return unless @conn.nil? || !@conn.connected?
@conn = AMQP.connect(:host => @broker)
@conn.on_tcp_connection_failure { puts "OTCF #{@node_id}" }
@@ -104,13 +105,14 @@ def init_node
@queue = @channel.queue(@queue_name, :auto_delete => true)
@exchange = @channel.default_exchange
+ @listening = false
@disconnected = false
@exchange.on_return do |basic_return, metadata, payload|
puts "#{payload} was returned! reply_code = #{basic_return.reply_code}, reply_text = #{basic_return.reply_text}"
- @disconnected = true
- @node.connection_event(:error)
- @node.connection_event(:closed)
+ @disconnected = true # FIXME member will be set on wrong class
+ connection_event(:error)
+ connection_event(:closed)
end
end
@@ -122,6 +124,8 @@ def publish(*args)
# subscribe to messages using the amqp queue
def subscribe(*args, &bl)
+ return if @listening
+ @listening = true
@queue.subscribe do |metadata, msg|
bl.call metadata, msg
end
View
1  specs/ws_node_spec.rb
@@ -20,6 +20,7 @@
}
server.listen
+ sleep 1
res = client.invoke_request 'ws://localhost:9876', 'foobar', 'myparam'
res.should == 'retval'
server.halt
Please sign in to comment.
Something went wrong with that request. Please try again.