Permalink
Browse files

Merge pull request #6 from SeanTAllen/master

Allow access to rabbitmq message information.
  • Loading branch information...
ppat committed Feb 10, 2014
2 parents 972bedc + 27201ca commit 8242de9ce85bb87794f336ebdffa3c7cc55f1d46
@@ -1,6 +1,8 @@
package io.latent.storm.rabbitmq;
import com.rabbitmq.client.QueueingConsumer;
import java.util.Date;
import java.util.Map;
public class Message {
public static final Message NONE = new None();
@@ -23,28 +25,66 @@ public static Message forSending(byte[] body) {
return body;
}
public static class DeliveredMessage extends Message {
private final boolean redelivery;
private final long deliveryTag;
private final String receivedRoutingKey;
private final String receivedExchange;
private final String routingKey;
private final String exchange;
private final String className;
private final String clusterId;
private final String contentEncoding;
private final String contentType;
private final String correlationId;
private final Integer deliveryMode;
private final String expiration;
private final Map<String, Object> headers;
private final String messageId;
private final Integer priority;
private final String replyTo;
private final Date timestamp;
private final String type;
private final String userId;
private DeliveredMessage(QueueingConsumer.Delivery delivery) {
super(delivery.getBody());
redelivery = delivery.getEnvelope().isRedeliver();
deliveryTag = delivery.getEnvelope().getDeliveryTag();
receivedRoutingKey = delivery.getEnvelope().getRoutingKey();
receivedExchange = delivery.getEnvelope().getExchange();
routingKey = delivery.getEnvelope().getRoutingKey();
exchange = delivery.getEnvelope().getExchange();
className = delivery.getProperties().getClassName();
clusterId = delivery.getProperties().getClusterId();
contentEncoding = delivery.getProperties().getContentEncoding();
contentType = delivery.getProperties().getContentType();
correlationId = delivery.getProperties().getCorrelationId();
deliveryMode = delivery.getProperties().getDeliveryMode();
expiration = delivery.getProperties().getExpiration();
headers = delivery.getProperties().getHeaders();
messageId = delivery.getProperties().getMessageId();
priority = delivery.getProperties().getPriority();
replyTo = delivery.getProperties().getReplyTo();
timestamp = delivery.getProperties().getTimestamp();
type = delivery.getProperties().getType();
userId = delivery.getProperties().getUserId();
}
public boolean isRedelivery() { return redelivery; }
public long getDeliveryTag() { return deliveryTag; }
public String getReceivedRoutingKey() { return receivedRoutingKey; }
public String getReceivedExchange() { return receivedExchange; }
public String getRoutingKey() { return routingKey; }
public String getExchange() { return exchange; }
public String getClassName() { return className;}
public String getClusterId(){ return clusterId; }
public String getContentEncoding() { return contentEncoding; }
public String getContentType() { return contentType; }
public String getCorrelationId() { return correlationId; }
public Integer getDeliveryMode() { return deliveryMode; }
public String getExpiration() { return expiration; }
public Map<String, Object> getHeaders() { return headers; }
public String getMessageId() { return messageId; }
public Integer getPriority() { return priority; }
public String getReplyTo() { return replyTo; }
public Date getTimestamp() { return timestamp; }
public String getType() { return type; }
public String getUserId() { return userId; }
}
public static class None extends Message {
@@ -0,0 +1,164 @@
package io.latent.storm.rabbitmq;
import backtype.storm.spout.Scheme;
import backtype.storm.tuple.Fields;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Map;
import backtype.storm.task.TopologyContext;
public class RabbitMQMessageScheme implements MessageScheme {
private final Scheme payloadScheme;
private final List<String> fieldNames;
public RabbitMQMessageScheme(Scheme payloadScheme, String envelopeFieldName, String propertiesFieldName) {
this.payloadScheme = payloadScheme;
List<String> payloadFieldNames = payloadScheme.getOutputFields().toList();
this.fieldNames = new ArrayList<String>();
fieldNames.addAll(payloadFieldNames);
fieldNames.add(envelopeFieldName);
fieldNames.add(propertiesFieldName);
}
@Override
public void open(Map config, TopologyContext context) {
}
@Override
public void close() {
}
@Override
public List<Object> deserialize(Message message) {
Message.DeliveredMessage dm = (Message.DeliveredMessage)message;
Envelope envelope = createEnvelope(dm);
Properties properties = createProperties(dm);
List<Object> payloadValues = deserialize(dm.getBody());
List<Object> values = new ArrayList<Object>();
values.addAll(payloadValues);
values.add(envelope);
values.add(properties);
return values;
}
@Override
public List<Object> deserialize(byte[] payload) {
return payloadScheme.deserialize(payload);
}
@Override
public Fields getOutputFields() {
return new Fields(fieldNames);
}
private Envelope createEnvelope(Message.DeliveredMessage dm) {
return new Envelope(dm.isRedelivery(), dm.getDeliveryTag(), dm.getExchange(), dm.getRoutingKey());
}
private Properties createProperties(Message.DeliveredMessage dm) {
return new Properties(dm.getClassName(),
dm.getClusterId(),
dm.getContentEncoding(),
dm.getContentType(),
dm.getCorrelationId(),
dm.getDeliveryMode(),
dm.getExpiration(),
dm.getHeaders(),
dm.getMessageId(),
dm.getPriority(),
dm.getReplyTo(),
dm.getTimestamp(),
dm.getType(),
dm.getUserId());
}
public static class Envelope {
private final boolean isRedelivery;
private final long deliveryTag;
private final String exchange;
private final String routingKey;
Envelope(boolean isRedelivery, long deliveryTag, String exchange, String routingKey) {
this.isRedelivery = isRedelivery;
this.deliveryTag = deliveryTag;
this.exchange = exchange;
this.routingKey = routingKey;
}
public boolean isRedelivery() { return isRedelivery; }
public long getDeliveryTag() { return deliveryTag; }
public String getExchange() { return exchange; }
public String getRoutingKey() { return routingKey; }
}
public static class Properties {
private final String className;
private final String clusterId;
private final String contentEncoding;
private final String contentType;
private final String correlationId;
private final Integer deliveryMode;
private final String expiration;
private final Map<String, Object> headers;
private final String messageId;
private final Integer priority;
private final String replyTo;
private final Date timestamp;
private final String type;
private final String userId;
public Properties(String className,
String clusterId,
String contentEncoding,
String contentType,
String correlationId,
Integer deliveryMode,
String expiration,
Map<String, Object> headers,
String messageId,
Integer priority,
String replyTo,
Date timestamp,
String type,
String userId) {
this.className = className;
this.clusterId = clusterId;
this.contentEncoding = contentEncoding;
this.contentType = contentType;
this.correlationId = correlationId;
this.deliveryMode = deliveryMode;
this.expiration = expiration;
this.headers = headers;
this.messageId = messageId;
this.priority = priority;
this.replyTo = replyTo;
this.timestamp = timestamp;
this.type = type;
this.userId = userId;
}
public String getClassName() { return className; }
public String getClusterId() { return clusterId; }
public String getContentEncoding() { return contentEncoding; }
public String getContentType() { return contentType; }
public String getCorrelationId() { return correlationId; }
public Integer getDeliveryMode() { return deliveryMode; }
public String getExpiration() { return expiration; }
public Map<String, Object> getHeaders() { return headers; }
public String getMessageId() { return messageId; }
public Integer getPriority() { return priority; }
public String getReplyTo() { return replyTo; }
public Date getTimestamp() { return timestamp; }
public String getType() { return type; }
public String getUserId() { return userId; }
}
}

0 comments on commit 8242de9

Please sign in to comment.