Skip to content
This repository has been archived by the owner on Dec 5, 2023. It is now read-only.

Commit

Permalink
Use deserialisation scheme for messages
Browse files Browse the repository at this point in the history
  • Loading branch information
samstokes committed Sep 25, 2011
1 parent 43a0b9e commit 011713f
Show file tree
Hide file tree
Showing 4 changed files with 62 additions and 11 deletions.
5 changes: 5 additions & 0 deletions pom.xml
Expand Up @@ -20,6 +20,11 @@
<artifactId>amqp-client</artifactId>
<version>2.6.1</version>
</dependency>
<dependency>
<groupId>com.googlecode.json-simple</groupId>
<artifactId>json-simple</artifactId>
<version>1.1</version>
</dependency>
</dependencies>
<build>
<plugins>
Expand Down
4 changes: 3 additions & 1 deletion src/main/java/uk/co/samstokes/storm/TestTopology.java
Expand Up @@ -6,6 +6,7 @@

import backtype.storm.topology.TopologyBuilder;

import uk.co.samstokes.storm.scheme.JSONScheme;
import uk.co.samstokes.storm.spout.AMQPSpout;

public class TestTopology {
Expand All @@ -21,7 +22,8 @@ public static void main (String[] args) {
args[3],
args[4],
args[5],
args[6]
args[6],
new JSONScheme()
));

final Config config = new Config();
Expand Down
46 changes: 46 additions & 0 deletions src/main/java/uk/co/samstokes/storm/scheme/JSONScheme.java
@@ -0,0 +1,46 @@
package uk.co.samstokes.storm.scheme;

import java.io.UnsupportedEncodingException;

import java.util.Collections;
import java.util.List;

import org.json.simple.JSONValue;

import backtype.storm.spout.Scheme;

import backtype.storm.tuple.Fields;


public class JSONScheme implements Scheme {
private static final long serialVersionUID = -7734176307841199017L;

private final String encoding;


public JSONScheme(String encoding) {
this.encoding = encoding;
}
public JSONScheme() {
this("UTF-8");
}


@Override
public List<Object> deserialize(byte[] bytes) {
final String chars;
try {
chars = new String(bytes, encoding);
} catch (UnsupportedEncodingException e) {
throw new RuntimeException(e);
}
final Object json = JSONValue.parse(chars);
return Collections.singletonList(json);
}


@Override
public Fields getOutputFields() {
return new Fields("object");
}
}
18 changes: 8 additions & 10 deletions src/main/java/uk/co/samstokes/storm/spout/AMQPSpout.java
Expand Up @@ -7,15 +7,12 @@
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
import backtype.storm.spout.Scheme;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichSpout;
import backtype.storm.topology.OutputFieldsDeclarer;

import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;


public class AMQPSpout implements IRichSpout {
private static final long serialVersionUID = 11258942292629263L;

Expand All @@ -29,6 +26,8 @@ public class AMQPSpout implements IRichSpout {
private final String amqpExchange;
private final String amqpRoutingKey;

private final Scheme serialisationScheme;

private transient Connection amqpConnection;
private transient Channel amqpChannel;
private transient QueueingConsumer amqpConsumer;
Expand All @@ -37,14 +36,16 @@ public class AMQPSpout implements IRichSpout {
private SpoutOutputCollector collector;


public AMQPSpout(String host, int port, String username, String password, String vhost, String exchange, String routingKey) {
public AMQPSpout(String host, int port, String username, String password, String vhost, String exchange, String routingKey, Scheme scheme) {
this.amqpHost = host;
this.amqpPort = port;
this.amqpUsername = username;
this.amqpPassword = password;
this.amqpVhost = vhost;
this.amqpExchange = exchange;
this.amqpRoutingKey = routingKey;

this.serialisationScheme = scheme;
}


Expand Down Expand Up @@ -96,9 +97,7 @@ public void nextTuple() {
final QueueingConsumer.Delivery delivery = amqpConsumer.nextDelivery(WAIT_FOR_NEXT_MESSAGE);
if (delivery == null) break;
final byte[] message = delivery.getBody();
// TODO do something more intelligent
final String todoHack = new String(message);
collector.emit(new Values(todoHack));
collector.emit(serialisationScheme.deserialize(message));
} catch (InterruptedException e) {
break;
}
Expand Down Expand Up @@ -164,8 +163,7 @@ private void setupAMQP() throws IOException {

@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
// TODO do something more intelligent
declarer.declare(new Fields("todoHack"));
declarer.declare(serialisationScheme.getOutputFields());
}


Expand Down

0 comments on commit 011713f

Please sign in to comment.