Permalink
Browse files

Merge pull request #10 from nathanmarz/multischeme

Add MultiScheme Support
  • Loading branch information...
2 parents ac20d14 + 5a12db0 commit 68e9a24d7d2b6b2ff657dda2dd852a3aba07de20 @nathanmarz committed Nov 30, 2012
Showing with 42 additions and 28 deletions.
  1. +42 −28 src/jvm/backtype/storm/spout/KestrelThriftSpout.java
@@ -1,22 +1,25 @@
package backtype.storm.spout;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+
+import net.lag.kestrel.thrift.Item;
+
+import org.apache.log4j.Logger;
+import org.apache.thrift7.TException;
+
import backtype.storm.Config;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.List;
-import java.util.Iterator;
-import java.util.Queue;
-import java.util.LinkedList;
import backtype.storm.tuple.Fields;
import backtype.storm.utils.Utils;
-import java.util.ArrayList;
-import java.util.Arrays;
-import org.apache.log4j.Logger;
-import net.lag.kestrel.thrift.Item;
-import org.apache.thrift7.TException;
/**
@@ -36,7 +39,7 @@
private int _port = -1;
private String _queueName = null;
private SpoutOutputCollector _collector;
- private Scheme _scheme;
+ private MultiScheme _scheme;
private List<KestrelClientInfo> _kestrels;
private int _emitIndex;
@@ -93,26 +96,34 @@ public void closeClient() {
}
}
- public KestrelThriftSpout(List<String> hosts, int port, String queueName, Scheme scheme) {
- if(hosts.isEmpty()) {
- throw new IllegalArgumentException("Must configure at least one host");
- }
- _port = port;
- _hosts = hosts;
- _queueName = queueName;
- _scheme = scheme;
- }
+ public KestrelThriftSpout(List<String> hosts, int port, String queueName, Scheme scheme) {
+ this(hosts, port, queueName, new SchemeAsMultiScheme(scheme));
+ }
- public KestrelThriftSpout(String hostname, int port, String queueName, Scheme scheme) {
- this(Arrays.asList(hostname), port, queueName, scheme);
+ public KestrelThriftSpout(List<String> hosts, int port, String queueName, MultiScheme scheme) {
+ if(hosts.isEmpty()) {
+ throw new IllegalArgumentException("Must configure at least one host");
}
+ _port = port;
+ _hosts = hosts;
+ _queueName = queueName;
+ _scheme = scheme;
+ }
+
+ public KestrelThriftSpout(String hostname, int port, String queueName, Scheme scheme) {
+ this(hostname, port, queueName, new SchemeAsMultiScheme(scheme));
+ }
+
+ public KestrelThriftSpout(String hostname, int port, String queueName, MultiScheme scheme) {
+ this(Arrays.asList(hostname), port, queueName, scheme);
+ }
public KestrelThriftSpout(String hostname, int port, String queueName) {
- this(hostname, port, queueName, new RawScheme());
+ this(hostname, port, queueName, new RawMultiScheme());
}
public KestrelThriftSpout(List<String> hosts, int port, String queueName) {
- this(hosts, port, queueName, new RawScheme());
+ this(hosts, port, queueName, new RawMultiScheme());
}
public Fields getOutputFields() {
@@ -173,14 +184,17 @@ public boolean bufferKestrelGet(int index) {
HashSet toAck = new HashSet();
for(Item item : items) {
- List<Object> retItems = _scheme.deserialize(item.get_data());
+ Iterable<List<Object>> retItems = _scheme.deserialize(item.get_data());
if (retItems != null) {
- EmitItem emitItem = new EmitItem(retItems, new KestrelSourceId(index, item.get_id()));
+ for(List<Object> retItem: retItems) {
+ EmitItem emitItem = new EmitItem(retItem, new KestrelSourceId(index, item.get_id()));
- if(!_emitBuffer.offer(emitItem)) {
+ if(!_emitBuffer.offer(emitItem)) {
throw new RuntimeException("KestrelThriftSpout's Internal Buffer Enqeueue Failed.");
+ }
}
+
} else {
toAck.add(item.get_id());
}

0 comments on commit 68e9a24

Please sign in to comment.