diff --git a/pom.xml b/pom.xml index 6b09d09..2b501fb 100644 --- a/pom.xml +++ b/pom.xml @@ -20,6 +20,11 @@ amqp-client 2.6.1 + + com.googlecode.json-simple + json-simple + 1.1 + diff --git a/src/main/java/uk/co/samstokes/storm/TestTopology.java b/src/main/java/uk/co/samstokes/storm/TestTopology.java index 9eb959f..511833e 100644 --- a/src/main/java/uk/co/samstokes/storm/TestTopology.java +++ b/src/main/java/uk/co/samstokes/storm/TestTopology.java @@ -6,6 +6,7 @@ import backtype.storm.topology.TopologyBuilder; +import uk.co.samstokes.storm.scheme.JSONScheme; import uk.co.samstokes.storm.spout.AMQPSpout; public class TestTopology { @@ -21,7 +22,8 @@ public static void main (String[] args) { args[3], args[4], args[5], - args[6] + args[6], + new JSONScheme() )); final Config config = new Config(); diff --git a/src/main/java/uk/co/samstokes/storm/scheme/JSONScheme.java b/src/main/java/uk/co/samstokes/storm/scheme/JSONScheme.java new file mode 100644 index 0000000..6049a6c --- /dev/null +++ b/src/main/java/uk/co/samstokes/storm/scheme/JSONScheme.java @@ -0,0 +1,46 @@ +package uk.co.samstokes.storm.scheme; + +import java.io.UnsupportedEncodingException; + +import java.util.Collections; +import java.util.List; + +import org.json.simple.JSONValue; + +import backtype.storm.spout.Scheme; + +import backtype.storm.tuple.Fields; + + +public class JSONScheme implements Scheme { + private static final long serialVersionUID = -7734176307841199017L; + + private final String encoding; + + + public JSONScheme(String encoding) { + this.encoding = encoding; + } + public JSONScheme() { + this("UTF-8"); + } + + + @Override + public List deserialize(byte[] bytes) { + final String chars; + try { + chars = new String(bytes, encoding); + } catch (UnsupportedEncodingException e) { + throw new RuntimeException(e); + } + final Object json = JSONValue.parse(chars); + return Collections.singletonList(json); + } + + + @Override + public Fields getOutputFields() { + return new Fields("object"); + } +} diff --git a/src/main/java/uk/co/samstokes/storm/spout/AMQPSpout.java b/src/main/java/uk/co/samstokes/storm/spout/AMQPSpout.java index c5d994e..d158b5d 100644 --- a/src/main/java/uk/co/samstokes/storm/spout/AMQPSpout.java +++ b/src/main/java/uk/co/samstokes/storm/spout/AMQPSpout.java @@ -7,15 +7,12 @@ import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.QueueingConsumer; +import backtype.storm.spout.Scheme; import backtype.storm.spout.SpoutOutputCollector; import backtype.storm.task.TopologyContext; import backtype.storm.topology.IRichSpout; import backtype.storm.topology.OutputFieldsDeclarer; -import backtype.storm.tuple.Fields; -import backtype.storm.tuple.Values; - - public class AMQPSpout implements IRichSpout { private static final long serialVersionUID = 11258942292629263L; @@ -29,6 +26,8 @@ public class AMQPSpout implements IRichSpout { private final String amqpExchange; private final String amqpRoutingKey; + private final Scheme serialisationScheme; + private transient Connection amqpConnection; private transient Channel amqpChannel; private transient QueueingConsumer amqpConsumer; @@ -37,7 +36,7 @@ public class AMQPSpout implements IRichSpout { private SpoutOutputCollector collector; - public AMQPSpout(String host, int port, String username, String password, String vhost, String exchange, String routingKey) { + public AMQPSpout(String host, int port, String username, String password, String vhost, String exchange, String routingKey, Scheme scheme) { this.amqpHost = host; this.amqpPort = port; this.amqpUsername = username; @@ -45,6 +44,8 @@ public AMQPSpout(String host, int port, String username, String password, String this.amqpVhost = vhost; this.amqpExchange = exchange; this.amqpRoutingKey = routingKey; + + this.serialisationScheme = scheme; } @@ -96,9 +97,7 @@ public void nextTuple() { final QueueingConsumer.Delivery delivery = amqpConsumer.nextDelivery(WAIT_FOR_NEXT_MESSAGE); if (delivery == null) break; final byte[] message = delivery.getBody(); - // TODO do something more intelligent - final String todoHack = new String(message); - collector.emit(new Values(todoHack)); + collector.emit(serialisationScheme.deserialize(message)); } catch (InterruptedException e) { break; } @@ -164,8 +163,7 @@ private void setupAMQP() throws IOException { @Override public void declareOutputFields(OutputFieldsDeclarer declarer) { - // TODO do something more intelligent - declarer.declare(new Fields("todoHack")); + declarer.declare(serialisationScheme.getOutputFields()); }