Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP

Loading…

add support for null return vals from spout #5

Merged
merged 4 commits into from

1 participant

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Commits on Jul 31, 2012
  1. @sritchie
  2. @sritchie

    Batch ack failed items.

    sritchie authored
  3. @sritchie

    Confirm, not abort.

    sritchie authored
Commits on Aug 1, 2012
  1. @sritchie

    version bump.

    sritchie authored
This page is out of date. Refresh to see the latest.
Showing with 46 additions and 32 deletions.
  1. +2 −3 project.clj
  2. +44 −29 src/jvm/backtype/storm/spout/KestrelThriftSpout.java
View
5 project.clj
@@ -1,4 +1,4 @@
-(defproject storm/storm-kestrel "0.7.2-snap2"
+(defproject storm/storm-kestrel "0.7.2-snap3"
:source-path "src/clj"
:java-source-path "src/jvm"
:javac-options {:debug "true" :fork "true"}
@@ -6,5 +6,4 @@
:dev-dependencies [[storm "0.7.0"]
[org.clojure/clojure "1.2.0"]
[org.clojure/clojure-contrib "1.2.0"]]
- :jvm-opts ["-Djava.library.path=/usr/local/lib:/opt/local/lib:/usr/lib"]
-)
+ :jvm-opts ["-Djava.library.path=/usr/local/lib:/opt/local/lib:/usr/lib"])
View
73 src/jvm/backtype/storm/spout/KestrelThriftSpout.java
@@ -20,7 +20,7 @@
/**
- * This spout can be used to consume messages in a reliable way from a cluster
+ * This spout can be used to consume messages in a reliable way from a cluster
* of Kestrel servers. It is recommended that you set the parallelism hint to a
* multiple of the number of Kestrel servers, otherwise the read load will be
* higher on some Kestrel servers than others.
@@ -30,8 +30,8 @@
public static final long BLACKLIST_TIME_MS = 1000 * 60;
public static final int BATCH_SIZE = 4000;
-
-
+
+
private List<String> _hosts = null;
private int _port = -1;
private String _queueName = null;
@@ -52,24 +52,24 @@ public EmitItem(List<Object> tuple, KestrelSourceId sourceId) {
this.sourceId = sourceId;
}
}
-
+
private static class KestrelSourceId {
public KestrelSourceId(int index, long id) {
this.index = index;
this.id = id;
}
-
+
int index;
long id;
}
-
+
private static class KestrelClientInfo {
public Long blacklistTillTimeMs;
public String host;
public int port;
private KestrelThriftClient client;
-
+
public KestrelClientInfo(String host, int port) {
this.host = host;
this.port = port;
@@ -92,7 +92,7 @@ public void closeClient() {
}
}
}
-
+
public KestrelThriftSpout(List<String> hosts, int port, String queueName, Scheme scheme) {
if(hosts.isEmpty()) {
throw new IllegalArgumentException("Must configure at least one host");
@@ -100,9 +100,9 @@ public KestrelThriftSpout(List<String> hosts, int port, String queueName, Scheme
_port = port;
_hosts = hosts;
_queueName = queueName;
- _scheme = scheme;
+ _scheme = scheme;
}
-
+
public KestrelThriftSpout(String hostname, int port, String queueName, Scheme scheme) {
this(Arrays.asList(hostname), port, queueName, scheme);
}
@@ -110,7 +110,7 @@ public KestrelThriftSpout(String hostname, int port, String queueName, Scheme sc
public KestrelThriftSpout(String hostname, int port, String queueName) {
this(hostname, port, queueName, new RawScheme());
}
-
+
public KestrelThriftSpout(List<String> hosts, int port, String queueName) {
this(hosts, port, queueName, new RawScheme());
}
@@ -118,9 +118,9 @@ public KestrelThriftSpout(List<String> hosts, int port, String queueName) {
public Fields getOutputFields() {
return _scheme.getOutputFields();
}
-
+
int _messageTimeoutMillis;
-
+
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
//TODO: should switch this to maxTopologyMessageTimeout
Number timeout = (Number) conf.get(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS);
@@ -138,13 +138,13 @@ public void open(Map conf, TopologyContext context, SpoutOutputCollector collect
} else {
String host = _hosts.get(myIndex % numHosts);
_kestrels.add(new KestrelClientInfo(host, _port));
- }
+ }
}
public void close() {
for(KestrelClientInfo info: _kestrels) info.closeClient();
- // Closing the client connection causes all the open reliable reads to be aborted.
+ // Closing the client connection causes all the open reliable reads to be aborted.
// Thus, clear our local buffer of these reliable reads.
_emitBuffer.clear();
@@ -167,14 +167,30 @@ public boolean bufferKestrelGet(int index) {
}
assert items.size() <= BATCH_SIZE;
-// LOG.info("Kestrel batch get fetched " + items.size() + " items. (batchSize= " + BATCH_SIZE +
+// LOG.info("Kestrel batch get fetched " + items.size() + " items. (batchSize= " + BATCH_SIZE +
// " queueName=" + _queueName + ", index=" + index + ", host=" + info.host + ")");
+ HashSet toAck = new HashSet();
+
for(Item item : items) {
- EmitItem emitItem = new EmitItem(_scheme.deserialize(item.get_data()),
- new KestrelSourceId(index, item.get_id()));
- if(!_emitBuffer.offer(emitItem)) {
- throw new RuntimeException("KestrelThriftSpout's Internal Buffer Enqeueue Failed.");
+ List<Object> retItems = _scheme.deserialize(item.get_data());
+
+ if (retItems != null) {
+ EmitItem emitItem = new EmitItem(retItems, new KestrelSourceId(index, item.get_id()));
+
+ if(!_emitBuffer.offer(emitItem)) {
+ throw new RuntimeException("KestrelThriftSpout's Internal Buffer Enqeueue Failed.");
+ }
+ } else {
+ toAck.add(item.get_id());
+ }
+ }
+
+ if(toAck.size() > 0) {
+ try {
+ info.client.confirm(_queueName, toAck);
+ } catch(TException e) {
+ blacklist(info, e);
}
}
@@ -198,10 +214,10 @@ public void nextTuple() {
if(_emitBuffer.isEmpty()) tryEachKestrelUntilBufferFilled();
EmitItem item = _emitBuffer.poll();
- if(item != null) {
+ if(item != null) {
_collector.emit(item.tuple, item.sourceId);
} else { // If buffer is still empty here, then every kestrel Q is also empty.
- Utils.sleep(10);
+ Utils.sleep(10);
}
}
@@ -224,9 +240,9 @@ private void blacklist(KestrelClientInfo info, Throwable t) {
public void ack(Object msgId) {
KestrelSourceId sourceId = (KestrelSourceId) msgId;
KestrelClientInfo info = _kestrels.get(sourceId.index);
-
+
//if the transaction didn't exist, it just returns false. so this code works
- //even if client gets blacklisted, disconnects, and kestrel puts the item
+ //even if client gets blacklisted, disconnects, and kestrel puts the item
//back on the queue
try {
if(info.client!=null) {
@@ -235,14 +251,14 @@ public void ack(Object msgId) {
info.client.confirm(_queueName, xids);
}
} catch(TException e) {
- blacklist(info, e);
+ blacklist(info, e);
}
}
-
+
public void fail(Object msgId) {
KestrelSourceId sourceId = (KestrelSourceId) msgId;
KestrelClientInfo info = _kestrels.get(sourceId.index);
-
+
// see not above about why this works with blacklisting strategy
try {
if(info.client!=null) {
@@ -251,7 +267,7 @@ public void fail(Object msgId) {
info.client.abort(_queueName, xids);
}
} catch(TException e) {
- blacklist(info, e);
+ blacklist(info, e);
}
}
@@ -259,4 +275,3 @@ public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(getOutputFields());
}
}
-
Something went wrong with that request. Please try again.