Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

amqp abstraction on inputs

  • Loading branch information...
commit e23e4a3f61a77d95c482b58d6f548caae83e98fc 1 parent 290f5f2
@lusis authored
Showing with 108 additions and 17 deletions.
  1. +1 −0  Gemfile
  2. +47 −17 lib/logstash/inputs/amqp.rb
  3. +60 −0 lib/logstash/util/amqp.rb
View
1  Gemfile
@@ -2,6 +2,7 @@ source :rubygems
gem "cabin", "0.3.8" # for logging. apache 2 license
gem "bunny" # for amqp support, MIT-style license
+gem "hot_bunnies", :git => "git://github.com/ruby-amqp/hot_bunnies.git", :platforms => :jruby #License: MIT
gem "uuidtools" # for naming amqp queues, License ???
gem "filewatch", "0.3.3" # for file tailing, BSD License
View
64 lib/logstash/inputs/amqp.rb
@@ -61,6 +61,8 @@ class LogStash::Inputs::Amqp < LogStash::Inputs::Base
config :prefetch_count, :validate => :number, :default => 1
# Enable message acknowledgement
+ # Disabling this can greatly increase speed
+ # at the expense of possible duplicate messages
config :ack, :validate => :boolean, :default => true
# Enable or disable debugging
@@ -72,6 +74,16 @@ class LogStash::Inputs::Amqp < LogStash::Inputs::Base
# Validate SSL certificate
config :verify_ssl, :validate => :boolean, :default => false
+ # Driver selection
+ # By default, logstash will use the `hot_bunnies` gem under JRuby
+ # and the `bunny` gem under MRI/YARV variants
+ # If you need to explcitly set this, do so here
+ # see [choosing a driver](choosing-a-driver) for more information
+ # Please note that currently, `hot_bunnies` does not yet
+ # support SSL. If you need SSL, please explicitly set this to
+ # `bunny`
+ config :driver, :validate => ["bunny", "hot_bunnies"]
+
public
def initialize(params)
super
@@ -82,8 +94,12 @@ def initialize(params)
public
def register
+ require "logstash/util/amqp"
@logger.info("Registering input #{@url}")
- require "bunny" # rubygem 'bunny'
+ self.class.send(:include, LogStash::Util::AMQP)
+ @driver ||= select_driver
+ @logger.info("Logstash driver selected", :driver => driver)
+ require "#{@driver}"
@vhost ||= "/"
@port ||= 5672
@key ||= "#"
@@ -111,28 +127,41 @@ def register
def run(queue)
begin
@logger.debug("Connecting with AMQP settings #{@amqpsettings.inspect} to set up queue #{@name.inspect}")
- @bunny = Bunny.new(@amqpsettings)
+ @connection = connect(@driver, @amqpsettings)
return if terminating?
- @bunny.start
- @bunny.qos({:prefetch_count => @prefetch_count})
+ @channel = start!(@driver, @connection, @prefetch_count)
- @queue = @bunny.queue(@name, {:durable => @durable, :auto_delete => @auto_delete, :exclusive => @exclusive})
- @queue.bind(@exchange, :key => @key)
+ @queue = @channel.queue(@name, {:durable => @durable, :auto_delete => @auto_delete, :exclusive => @exclusive})
+ do_bind(@driver, @queue, @exchange, @key)
timer = @metric_amqp_read.time
- @queue.subscribe({:ack => @ack}) do |data|
- timer.stop
- e = to_event(data[:payload], @amqpurl)
- if e
- @metric_queue_write.time do
- queue << e
+ if @driver == 'hot_bunnies'
+ subscription = @queue.subscribe(:ack => @ack, :blocking => true) do |headers,data|
+ timer.stop
+ e = to_event(data, @amqp_url)
+ if e
+ @metric_queue_write.time do
+ queue << e
+ headers.ack if @ack == true # ack after we know we're good
+ end
+ end
+ time = @metric_amqp_read.time
+ end # @queue.subscribe
+ else
+ @queue.subscribe({:ack => @ack}) do |data|
+ timer.stop
+ e = to_event(data[:payload], @amqpurl)
+ if e
+ @metric_queue_write.time do
+ queue << e
+ end
end
- end
- timer = @metric_amqp_read.time
- end # @queue.subscribe
+ timer = @metric_amqp_read.time
+ end # @queue.subscribe
+ end # @driver.subscribe
- rescue *[Bunny::ConnectionError, Bunny::ServerDownError] => e
- @logger.error("AMQP connection error, will reconnect: #{e}")
+ rescue Exception => e
+ @logger.error("AMQP connection error: #{e}")
# Sleep for a bit before retrying.
# TODO(sissel): Write 'backoff' method?
sleep(1)
@@ -141,6 +170,7 @@ def run(queue)
end # def run
def teardown
+ do_unbind(@driver, @queue, @exchange, @key) unless @durable == true
@queue.unsubscribe unless @durable == true
@queue.delete unless @durable == true
@bunny.close if @bunny
View
60 lib/logstash/util/amqp.rb
@@ -0,0 +1,60 @@
+require 'logstash/namespace'
+
+module LogStash::Util::AMQP
+
+ def select_driver
+ case RUBY_ENGINE
+ when 'jruby'
+ driver = 'hot_bunnies'
+ else
+ driver = 'bunny'
+ end
+ return driver
+ end
+
+ def connect(driver, options={})
+ case driver
+ when 'hot_bunnies'
+ options.delete_if do |k,v|
+ %w{ssl verify_ssl logging}.include?(k)
+ end
+ connection = HotBunnies.connect(options)
+ else
+ connection = Bunny.new(options)
+ end
+ return connection
+ end
+
+ def start!(driver, connection, prefetch_count)
+ case driver
+ when 'hot_bunnies'
+ # hot_bunnies operates on channel object
+ channel = connection.create_channel
+ channel.prefetch = prefetch_count
+ return channel
+ else
+ # bunny operations on connection object
+ connection.start
+ connection.qos({:prefetch_count => prefetch_count})
+ return connection
+ end
+ end
+
+ def do_bind(driver, queue, exchange, key)
+ case driver
+ when 'hot_bunnies'
+ queue.bind(exchange, :routing_key => key)
+ else
+ queue.bind(exchange, :key => key)
+ end
+ end
+
+ def do_unbind(driver, queue, exchange, key)
+ case driver
+ when 'hot_bunnies'
+ queue.unbind(exchange, :routing_key => key)
+ else
+ queue.unbind(exchange, :key => key)
+ end
+ end
+end
Please sign in to comment.
Something went wrong with that request. Please try again.