Permalink
Browse files

Added useMessageTimestamp to enable using the timestamp of the messag…

…e to be used on the flume event.
  • Loading branch information...
stampy88 committed Feb 22, 2011
1 parent c7d2f9f commit 0ce62c672c7eb17a77b0546adfd870cc6829f866
Showing with 49 additions and 8 deletions.
  1. +3 −1 README.mkd
  2. +39 −3 src/main/java/amqp/AmqpConsumer.java
  3. +7 −4 src/main/java/amqp/AmqpEventSource.java
View
@@ -48,7 +48,8 @@ because the plugin uses named parameters for setting up the source.
"[,durableQueue=false] "
"[,exclusiveQueue=false] "
"[,autoDeleteQueue=false] "
- "[,bindings=binding1,binding2,bindingN])"
+ "[,bindings=binding1,binding2,bindingN]
+ "[,useMessageTimestamp=false])"
* exchangeName - this is the name of the AMQP exchange we are getting messages from.
* host - the host name or IP address of the broker. Default to localhost when not specified.
@@ -67,6 +68,7 @@ messages if the server restarts.
* autoDeleteQueue - true if we are declaring an autodelete queue (server will delete it when no longer in use).
* bindings - comma separated list of strings that will be used to bind the queue to the exchange. This is not required for
certain exchange types.
+* useMessageTimestamp - if true, the timestamp for the Flume event will be based on the timestamp from the AMQP message.
Examples
--------
@@ -20,6 +20,9 @@
import com.cloudera.flume.conf.FlumeConfiguration;
import com.cloudera.flume.core.Event;
import com.cloudera.flume.core.EventImpl;
+import com.cloudera.util.Clock;
+import com.cloudera.util.NetUtils;
+import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
@@ -87,6 +90,11 @@
*/
private boolean autoDelete;
private final String[] bindings;
+ /**
+ * true if we use the timestamp from {@link com.rabbitmq.client.AMQP.BasicProperties#getTimestamp()} for the
+ * flume event's timestamp
+ */
+ private boolean useMessageTimestamp;
private Channel channel;
@@ -107,7 +115,8 @@
public AmqpConsumer(String host, int port, String virutalHost, String userName, String password,
String exchangeName, String exchangeType, boolean durableExchange,
- String queueName, boolean durable, boolean exclusive, boolean autoDelete, String... bindings) {
+ String queueName, boolean durable, boolean exclusive, boolean autoDelete,
+ boolean useMessageTimestamp, String... bindings) {
super(host, port, virutalHost, userName, password);
this.exchangeName = exchangeName;
@@ -117,6 +126,7 @@ public AmqpConsumer(String host, int port, String virutalHost, String userName,
this.durable = durable;
this.exclusive = exclusive;
this.autoDelete = autoDelete;
+ this.useMessageTimestamp = useMessageTimestamp;
this.bindings = bindings;
}
@@ -263,8 +273,7 @@ private void runConsumeLoop() {
LOG.warn("Received message with body size of {} which is above the {} of {}, ignoring message",
new Object[]{body.length, FlumeConfiguration.EVENT_MAX_SIZE, MAX_BODY_SIZE});
} else {
- // create a new flume event based on the message body
- Event event = new EventImpl(delivery.getBody());
+ Event event = createEventFromDelivery(delivery);
// add to queue
events.add(event);
@@ -297,6 +306,33 @@ private void runConsumeLoop() {
LOG.info("Exited runConsumeLoop with running={} and interrupt status={}", isRunning(), currentThread.isInterrupted());
}
+ /**
+ * Creates a new flume {@link Event} from the message delivery. Note that if the {@link #useMessageTimestamp} is true
+ * and there is a timestamp on the message, the newly created flume event will have the message's timestamp.
+ * @param delivery message that is being processed
+ * @return new flume event
+ */
+ private Event createEventFromDelivery(QueueingConsumer.Delivery delivery) {
+ long timeInMS = -1, timeInNanos = -1;
+
+ if(useMessageTimestamp) {
+ AMQP.BasicProperties msgProperties = delivery.getProperties();
+
+ if(msgProperties != null && msgProperties.getTimestamp() != null) {
+ timeInMS = msgProperties.getTimestamp().getTime();
+ // there is no time in nanoseconds from the message, so we use the same timestamp for nanos
+ timeInNanos = timeInMS;
+ }
+ }
+
+ if(timeInMS == -1) {
+ timeInMS = Clock.unixTime();
+ timeInNanos = Clock.nanos();
+ }
+
+ return new EventImpl(delivery.getBody(), timeInMS, Event.Priority.INFO, timeInNanos, NetUtils.localhost());
+ }
+
/**
* Will log the specified exception and will set the {@link #exception} field if the
* {@link com.rabbitmq.client.ShutdownSignalException#isInitiatedByApplication()} is false meaning
@@ -53,9 +53,10 @@
public AmqpEventSource(String host, int port, String virtualHost, String userName, String password,
String exchangeName, String exchangeType, boolean durableExchange,
- String queueName, boolean durable, boolean exclusive, boolean autoDelete, String... bindings) {
+ String queueName, boolean durable, boolean exclusive, boolean autoDelete,
+ boolean useMessageTimestamp, String... bindings) {
consumer = new AmqpConsumer(host, port, virtualHost, userName, password,
- exchangeName, exchangeType, durableExchange, queueName, durable, exclusive, autoDelete, bindings);
+ exchangeName, exchangeType, durableExchange, queueName, durable, exclusive, autoDelete, useMessageTimestamp, bindings);
}
public AmqpEventSource(ConnectionFactory connectionFactory, String exchangeName, String queueName, String... bindings) {
@@ -130,7 +131,8 @@ public EventSource build(Context ctx, String... args) {
"[,durableQueue=false] " +
"[,exclusiveQueue=false] " +
"[,autoDeleteQueue=false] " +
- "[,bindings=\"binding1,binding2,bindingN\"])");
+ "[,bindings=\"binding1,binding2,bindingN\"] " +
+ "[,useMessageTimestamp=false])");
}
CommandLineParser parser = new CommandLineParser(args);
@@ -148,6 +150,7 @@ public EventSource build(Context ctx, String... args) {
boolean exclusiveQueue = parser.getOptionValue("exclusiveQueue", false);
boolean autoDeleteQueue = parser.getOptionValue("autoDeleteQueue", false);
String[] bindings = parser.getOptionValues("bindings");
+ boolean useMessageTimestamp = parser.getOptionValue("useMessageTimestamp", false);
// exchange name is the only required parameter
if (exchangeName == null) {
@@ -156,7 +159,7 @@ public EventSource build(Context ctx, String... args) {
return new AmqpEventSource(host, port, virtualHost, userName, password,
exchangeName, exchangeType, durableExchange, queueName, durableQueue,
- exclusiveQueue, autoDeleteQueue, bindings);
+ exclusiveQueue, autoDeleteQueue, useMessageTimestamp, bindings);
}
};
}

0 comments on commit 0ce62c6

Please sign in to comment.