Skip to content

Commit

Permalink
add multischeme support.
Browse files Browse the repository at this point in the history
  • Loading branch information
Sam Ritchie committed Nov 30, 2012
1 parent ac20d14 commit 5a12db0
Showing 1 changed file with 42 additions and 28 deletions.
70 changes: 42 additions & 28 deletions src/jvm/backtype/storm/spout/KestrelThriftSpout.java
Original file line number Original file line Diff line number Diff line change
@@ -1,22 +1,25 @@
package backtype.storm.spout; package backtype.storm.spout;


import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Queue;

import net.lag.kestrel.thrift.Item;

import org.apache.log4j.Logger;
import org.apache.thrift7.TException;

import backtype.storm.Config; import backtype.storm.Config;
import backtype.storm.task.TopologyContext; import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichSpout; import backtype.storm.topology.base.BaseRichSpout;
import java.util.HashSet;
import java.util.Map;
import java.util.List;
import java.util.Iterator;
import java.util.Queue;
import java.util.LinkedList;
import backtype.storm.tuple.Fields; import backtype.storm.tuple.Fields;
import backtype.storm.utils.Utils; import backtype.storm.utils.Utils;
import java.util.ArrayList;
import java.util.Arrays;
import org.apache.log4j.Logger;
import net.lag.kestrel.thrift.Item;
import org.apache.thrift7.TException;




/** /**
Expand All @@ -36,7 +39,7 @@ public class KestrelThriftSpout extends BaseRichSpout {
private int _port = -1; private int _port = -1;
private String _queueName = null; private String _queueName = null;
private SpoutOutputCollector _collector; private SpoutOutputCollector _collector;
private Scheme _scheme; private MultiScheme _scheme;


private List<KestrelClientInfo> _kestrels; private List<KestrelClientInfo> _kestrels;
private int _emitIndex; private int _emitIndex;
Expand Down Expand Up @@ -93,26 +96,34 @@ public void closeClient() {
} }
} }


public KestrelThriftSpout(List<String> hosts, int port, String queueName, Scheme scheme) { public KestrelThriftSpout(List<String> hosts, int port, String queueName, Scheme scheme) {
if(hosts.isEmpty()) { this(hosts, port, queueName, new SchemeAsMultiScheme(scheme));
throw new IllegalArgumentException("Must configure at least one host"); }
}
_port = port;
_hosts = hosts;
_queueName = queueName;
_scheme = scheme;
}


public KestrelThriftSpout(String hostname, int port, String queueName, Scheme scheme) { public KestrelThriftSpout(List<String> hosts, int port, String queueName, MultiScheme scheme) {
this(Arrays.asList(hostname), port, queueName, scheme); if(hosts.isEmpty()) {
throw new IllegalArgumentException("Must configure at least one host");
} }
_port = port;
_hosts = hosts;
_queueName = queueName;
_scheme = scheme;
}

public KestrelThriftSpout(String hostname, int port, String queueName, Scheme scheme) {
this(hostname, port, queueName, new SchemeAsMultiScheme(scheme));
}

public KestrelThriftSpout(String hostname, int port, String queueName, MultiScheme scheme) {
this(Arrays.asList(hostname), port, queueName, scheme);
}


public KestrelThriftSpout(String hostname, int port, String queueName) { public KestrelThriftSpout(String hostname, int port, String queueName) {
this(hostname, port, queueName, new RawScheme()); this(hostname, port, queueName, new RawMultiScheme());
} }


public KestrelThriftSpout(List<String> hosts, int port, String queueName) { public KestrelThriftSpout(List<String> hosts, int port, String queueName) {
this(hosts, port, queueName, new RawScheme()); this(hosts, port, queueName, new RawMultiScheme());
} }


public Fields getOutputFields() { public Fields getOutputFields() {
Expand Down Expand Up @@ -173,14 +184,17 @@ public boolean bufferKestrelGet(int index) {
HashSet toAck = new HashSet(); HashSet toAck = new HashSet();


for(Item item : items) { for(Item item : items) {
List<Object> retItems = _scheme.deserialize(item.get_data()); Iterable<List<Object>> retItems = _scheme.deserialize(item.get_data());


if (retItems != null) { if (retItems != null) {
EmitItem emitItem = new EmitItem(retItems, new KestrelSourceId(index, item.get_id())); for(List<Object> retItem: retItems) {
EmitItem emitItem = new EmitItem(retItem, new KestrelSourceId(index, item.get_id()));


if(!_emitBuffer.offer(emitItem)) { if(!_emitBuffer.offer(emitItem)) {
throw new RuntimeException("KestrelThriftSpout's Internal Buffer Enqeueue Failed."); throw new RuntimeException("KestrelThriftSpout's Internal Buffer Enqeueue Failed.");
}
} }

} else { } else {
toAck.add(item.get_id()); toAck.add(item.get_id());
} }
Expand Down

0 comments on commit 5a12db0

Please sign in to comment.