Skip to content

Commit

Permalink
added a config line for amqp attributes. this was so i could get rabb…
Browse files Browse the repository at this point in the history
…itmq mirrored queues working
  • Loading branch information
Kevin Nuckolls committed Jan 24, 2012
1 parent 73312ae commit b25422a
Showing 1 changed file with 7 additions and 2 deletions.
9 changes: 7 additions & 2 deletions lib/logstash/inputs/amqp.rb
Expand Up @@ -14,6 +14,9 @@ class LogStash::Inputs::Amqp < LogStash::Inputs::Base
config_name "amqp"
plugin_status "unstable"

# Your amqp broker's custom arguments. For mirrored queues in RabbitMQ: [ "x-ha-policy", "all" ]
config :arguments, :validate => :array, :default => []

# Your amqp server address
config :host, :validate => :string, :required => true

Expand All @@ -26,7 +29,7 @@ class LogStash::Inputs::Amqp < LogStash::Inputs::Base
# Your amqp password
config :password, :validate => :password, :default => "guest"

# The name of the queue.
# The name of the queue.
config :name, :validate => :string, :default => ''

# The name of the exchange to bind the queue.
Expand Down Expand Up @@ -108,7 +111,9 @@ def run(queue)
@bunny.start
@bunny.qos({:prefetch_count => @prefetch_count})

@queue = @bunny.queue(@name, {:durable => @durable, :auto_delete => @auto_delete, :exclusive => @exclusive})
@arguments_hash = Hash[*@arguments]

@queue = @bunny.queue(@name, {:durable => @durable, :auto_delete => @auto_delete, :exclusive => @exclusive, :arguments => @arguments_hash })
@queue.bind(@exchange, :key => @key)

@queue.subscribe({:ack => @ack}) do |data|
Expand Down

2 comments on commit b25422a

@lusis
Copy link

@lusis lusis commented on b25422a Jan 24, 2012

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@knuckolls this is awesome!

Does it make sense to expose some of these args as individual configs or are they so variant that it's best just to splat them? Also, do we need to consider adding this to the output as well?

@knuckolls
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@lusis thanks!

My experience is only limited to RabbitMQ but it appears to me that the arguments are meant to be variables to tune specifics of an individual amqp broker's implementation. In that case, I think the most common use cases should be documented and that the arguments config should stay the way it is. As far as I've seen just digging through things the x-ha-policy argument is used on queues to denote that the broker should treat them as mirrored. I also saw x-expires and x-message-ttl as other arguments when declaring a queue.

I didn't consider that exchanges might have an arguments table in the 0.9.1 spec but I looked it up via the amq-protocol rubygems docs [1] and it does show that some brokers might have arguments as well. Docs for the exchanges [2] and the queues [3] are at these links respectively. The bindings themselves may have arguments but I'm not sure how those are intended to be configured via the amqp input and output plugins anyway. Looking specifically at the RabbitMQ docs it appears that only the queues and not the exchanges actually have any arguments to configure. [4]

I did a little safari through the config parser code to see if I could make arguments actually accept a Hash instead of just an array but that portion of the logstash code is rather impenetrable and I was just trying to get something out the door that others could use. Someone more familiar with that part of the codebase should probably attack that problem if it's deemed worth it.

Hope that helps. Let me know if you have any more questions while this is all fresh on my mind. :)

[1] http://rubygems.org/gems/amq-protocol
[2] http://rubydoc.info/gems/amq-protocol/0.9.0/AMQ/Protocol/Exchange/Declare
[3] http://rubydoc.info/gems/amq-protocol/0.9.0/AMQ/Protocol/Queue/Declare
[4] http://www.rabbitmq.com/amqp-0-9-1-quickref.html

Please sign in to comment.