flume-amqp-plugin

The flume-amqp-plugin allows you to use an AMQP broker as a Flume Source.

Getting Started

  1. Copy the entire folder to ${flume_dir}/plugins

  2. cd ${flume_dir}/plugins/flume-amqp-plugin

  3. Build by running 'ant'. A flume-amqp-plugin.jar file should be created.

  4. Modify ${flume_dir}/conf/flume-site.xml

    <property>
      <name>flume.plugin.classes</name>
      <value>amqp.AmqpEventSource</value>
      <description>Comma separated list of plugin classes</description>
    </property>

  5. cd into the top-level flume directory (above plugins).

  6. Set FLUME_CLASSPATH in every terminal that will run a Flume master or node:

    export FLUME_CLASSPATH=`pwd`/plugins/flume-amqp-plugin/flume-amqp-plugin.jar:`pwd`/plugins/flume-amqp-plugin/lib/amqp-client-2.1.0.jar

Usage

The AMQP Event Source takes messages from an AMQP broker and creates a Flume event from each message's body.

The only required argument is the exchangeName parameter. Note that each parameter needs to be enclosed in quotes because the plugin uses named parameters for setting up the source.

amqp("exchangeName=exchangeName"
     [, "host=host"]
     [, "port=port"]
     [, "virtualHost=virtualHost"]
     [, "userName=user"]
     [, "password=password"]
     [, "exchangeType=direct"]
     [, "durableExchange=false"]
     [, "queueName=queueName"]
     [, "durableQueue=false"]
     [, "exclusiveQueue=false"]
     [, "autoDeleteQueue=false"]
     [, "bindings=binding1,binding2,bindingN"]
     [, "useMessageTimestamp=false"])

  • exchangeName - this is the name of the AMQP exchange we are getting messages from.
  • host - the host name or IP address of the broker. Defaults to localhost when not specified.
  • port - the port on which the broker is accepting connections. Defaults to 5672 when not specified.
  • virtualHost - the virtual host to use when connecting to the broker. Defaults to "/" when not specified.
  • userName - the AMQP user name to use when connecting to the broker. Defaults to "guest" when not specified.
  • password - the password to use when connecting to the broker. Defaults to "guest" when not specified.
  • exchangeType - the type of exchange. Valid types are direct, fanout, topic, and header. Defaults to direct when not specified.
  • durableExchange - true if we are declaring a durable exchange (the exchange will survive a server restart). Defaults to false.
  • queueName - if left unspecified, the server chooses a name and provides this to the client. Generally, when applications share a message queue they agree on a message queue name beforehand, and when an application needs a message queue for its own purposes, it lets the server provide a name.
  • durableQueue - if true, the message queue remains present and active when the server restarts, although transient messages may still be lost in the restart.
  • exclusiveQueue - if true, the queue belongs to the current connection only, and is deleted when the connection closes.
  • autoDeleteQueue - true if we are declaring an autodelete queue (server will delete it when no longer in use).
  • bindings - comma-separated list of binding keys used to bind the queue to the exchange. This is not required for certain exchange types.
  • useMessageTimestamp - if true, the timestamp for the Flume event will be based on the timestamp from the AMQP message.
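
To make the named-parameter convention and the documented defaults concrete, here is a minimal Python sketch of how a list of "name=value" strings could be resolved into a configuration. It is an illustration only, not the plugin's own parsing code: the function name parse_source_args is hypothetical, the false defaults for the queue flags and useMessageTimestamp are assumed from the usage synopsis, and the whitespace trimming is an assumption as well.

```python
# Illustrative sketch only; this is NOT the plugin's actual parsing code.
# Defaults marked "assumed" are read off the usage synopsis, not confirmed.
DEFAULTS = {
    "host": "localhost",
    "port": 5672,
    "virtualHost": "/",
    "userName": "guest",
    "password": "guest",
    "exchangeType": "direct",
    "durableExchange": False,
    "durableQueue": False,         # assumed
    "exclusiveQueue": False,       # assumed
    "autoDeleteQueue": False,      # assumed
    "useMessageTimestamp": False,  # assumed
}
BOOLEAN_KEYS = {k for k, v in DEFAULTS.items() if isinstance(v, bool)}


def parse_source_args(*args):
    """Resolve quoted 'name=value' arguments against the documented defaults."""
    params = dict(DEFAULTS)
    for arg in args:
        # Split on the first '=' only, so values may themselves contain '='.
        key, sep, value = arg.partition("=")
        if not sep or not key.strip():
            raise ValueError(f"expected 'name=value', got {arg!r}")
        params[key.strip()] = value.strip()
    if "exchangeName" not in params:
        raise ValueError("exchangeName is required")
    params["port"] = int(params["port"])
    for key in BOOLEAN_KEYS:
        params[key] = str(params[key]).lower() == "true"
    # Bindings arrive as one comma-separated string; split them into a list.
    raw_bindings = params.get("bindings", "")
    params["bindings"] = [b.strip() for b in raw_bindings.split(",") if b.strip()]
    return params


params = parse_source_args(
    "exchangeName=stockTrades", "host=10.23.224.2", "bindings=IBM,GOOG,MSFT"
)
```

In this sketch, queueName is simply absent from the result when it is not supplied, which mirrors the case where the server chooses the queue name.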

Examples

  1. The broker is on the same node as the Flume agent; direct exchange type with two bindings

    amqp("exchangeName=alertMessages", "bindings=system,application")

  2. The broker is on a different machine; direct exchange type, shared queue

    amqp("exchangeName=stockTrades", "host=10.23.224.2", "bindings=IBM,GOOG,MSFT")
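
To show what the bindings in these examples select, the following Python sketch simulates, in a simplified way, how an AMQP broker decides whether a message reaches the bound queue. It models only the general AMQP behavior of direct and fanout exchanges and does not talk to a broker or use the plugin; the function name is_delivered and the sample messages are made up for illustration.

```python
# Simplified model of AMQP routing; hypothetical names, no broker involved.
def is_delivered(exchange_type, bindings, routing_key):
    """Return True if a message with routing_key would reach the queue."""
    if exchange_type == "direct":
        # Direct exchanges require an exact match with one of the binding keys.
        return routing_key in bindings
    if exchange_type == "fanout":
        # Fanout exchanges ignore routing keys and deliver to every bound queue.
        return True
    raise NotImplementedError(f"{exchange_type} matching is not modelled here")


bindings = ["IBM", "GOOG", "MSFT"]
messages = [("IBM", "trade 1"), ("AAPL", "trade 2"), ("MSFT", "trade 3")]

# Only messages whose routing key matches a binding become Flume events.
delivered = [body for key, body in messages if is_delivered("direct", bindings, key)]
```

With the stockTrades bindings above, the message published with the routing key AAPL is never seen by the source, while a fanout exchange would deliver all three.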
