Permalink
Browse files

Merge branch 'master' into logback3

Conflicts:
	project.clj
  • Loading branch information...
Jason Jackson
Jason Jackson committed Aug 20, 2012
2 parents a04991c + a4af188 commit 7f73c148248854181691b0eb81de48760b0a6e69
View
@@ -10,6 +10,7 @@
* Bug fix: Fix NPE when no input fields given for regular Aggregator
* Bug fix: Fix IndexOutOfBoundsExceptions when a bolt for global aggregation had a parallelism greater than 1 (possible with splitting, stateQuerying, and multiReduce)
* Bug fix: Fix "fields size" error that would sometimes occur when splitting a stream with multiple eaches
+ * Bug fix: Fix bug where a committer spout (including opaque spouts) could cause Trident processing to halt
## 0.8.0
View
@@ -1,4 +1,4 @@
-(defproject storm/storm "0.8.1-wip4"
+(defproject storm/storm "0.8.1-wip5"
:url "http://storm-project.clj"
:description "Distributed and fault-tolerant realtime computation"
:license {:name "Eclipse Public License - Version 1.0" :url "https://github.com/nathanmarz/storm/blob/master/LICENSE.html"}
@@ -488,8 +488,8 @@
(defn collectify [obj]
(if (or (sequential? obj) (instance? Collection obj)) obj [obj]))
-(defn to-json [^Map m]
- (JSONValue/toJSONString m))
+(defn to-json [obj]
+ (JSONValue/toJSONString obj))
(defn from-json [^String str]
(if str
@@ -1,5 +1,5 @@
(ns storm.trident.testing
- (:import [storm.trident.testing FeederBatchSpout MemoryMapState MemoryMapState$Factory])
+ (:import [storm.trident.testing FeederBatchSpout FeederCommitterBatchSpout MemoryMapState MemoryMapState$Factory TuplifyArgs])
(:import [backtype.storm LocalDRPC])
(:import [backtype.storm.tuple Fields])
(:import [backtype.storm.generated KillOptions])
@@ -14,9 +14,15 @@
(let [res (.execute drpc function-name args)]
(from-json res)))
+(defn exec-drpc-tuples [^LocalDRPC drpc function-name tuples]
+ (exec-drpc drpc function-name (to-json tuples)))
+
(defn feeder-spout [fields]
(FeederBatchSpout. fields))
+(defn feeder-committer-spout [fields]
+ (FeederCommitterBatchSpout. fields))
+
(defn feed [feeder tuples]
(.feed feeder tuples))
@@ -46,3 +52,11 @@
(import 'storm.trident.TridentTopology)
(import '[storm.trident.operation.builtin Count Sum Equals MapGet Debug FilterNull FirstN])
)
+
+(defn drpc-tuples-input [topology function-name drpc outfields]
+ (-> topology
+ (.newDRPCStream function-name drpc)
+ (.each (fields "args") (TuplifyArgs.) outfields)
+ ))
+
+
@@ -49,7 +49,6 @@ public void execute(BatchInfo info, Tuple input) {
// there won't be a BatchInfo for the success stream
TransactionAttempt attempt = (TransactionAttempt) input.getValue(0);
if(input.getSourceStreamId().equals(MasterBatchCoordinator.COMMIT_STREAM_ID)) {
- _collector.setBatch(info.batchId);
if(attempt.equals(_activeBatches.get(attempt.getTransactionId()))) {
((ICommitterTridentSpout.Emitter) _emitter).commit(attempt);
_activeBatches.remove(attempt.getTransactionId());
@@ -15,11 +15,12 @@
import storm.trident.topology.TransactionAttempt;
import storm.trident.topology.TridentTopologyBuilder;
-public class FeederBatchSpout implements ITridentSpout {
+public class FeederBatchSpout implements ITridentSpout, IFeeder {
String _id;
String _semaphoreId;
Fields _outFields;
+ boolean _waitToEmit = true;
public FeederBatchSpout(List<String> fields) {
@@ -28,6 +29,10 @@ public FeederBatchSpout(List<String> fields) {
_semaphoreId = RegisteredGlobalState.registerState(new CopyOnWriteArrayList());
}
+ public void setWaitToEmit(boolean trueIfWait) {
+ _waitToEmit = trueIfWait;
+ }
+
public void feed(Object tuples) {
Semaphore sem = new Semaphore(0);
((List)RegisteredGlobalState.getState(_semaphoreId)).add(sem);
@@ -93,6 +98,7 @@ public void success(long txid) {
@Override
public boolean isReady(long txid) {
+ if(!_waitToEmit) return true;
List allBatches = (List) RegisteredGlobalState.getState(_id);
if(allBatches.size() > _masterEmitted) {
_masterEmitted++;
@@ -0,0 +1,79 @@
+package storm.trident.testing;
+
+import backtype.storm.task.TopologyContext;
+import backtype.storm.tuple.Fields;
+import java.util.List;
+import java.util.Map;
+import storm.trident.operation.TridentCollector;
+import storm.trident.spout.ICommitterTridentSpout;
+import storm.trident.spout.ITridentSpout;
+import storm.trident.topology.TransactionAttempt;
+
+
+public class FeederCommitterBatchSpout implements ICommitterTridentSpout, IFeeder {
+
+ FeederBatchSpout _spout;
+
+ public FeederCommitterBatchSpout(List<String> fields) {
+ _spout = new FeederBatchSpout(fields);
+ }
+
+ public void setWaitToEmit(boolean trueIfWait) {
+ _spout.setWaitToEmit(trueIfWait);
+ }
+
+ static class CommitterEmitter implements ICommitterTridentSpout.Emitter {
+ ITridentSpout.Emitter _emitter;
+
+
+ public CommitterEmitter(ITridentSpout.Emitter e) {
+ _emitter = e;
+ }
+
+ @Override
+ public void commit(TransactionAttempt attempt) {
+ }
+
+ @Override
+ public void emitBatch(TransactionAttempt tx, Object coordinatorMeta, TridentCollector collector) {
+ _emitter.emitBatch(tx, coordinatorMeta, collector);
+ }
+
+ @Override
+ public void success(TransactionAttempt tx) {
+ _emitter.success(tx);
+ }
+
+ @Override
+ public void close() {
+ _emitter.close();
+ }
+
+ }
+
+ @Override
+ public Emitter getEmitter(String txStateId, Map conf, TopologyContext context) {
+ return new CommitterEmitter(_spout.getEmitter(txStateId, conf, context));
+ }
+
+ @Override
+ public BatchCoordinator getCoordinator(String txStateId, Map conf, TopologyContext context) {
+ return _spout.getCoordinator(txStateId, conf, context);
+ }
+
+ @Override
+ public Fields getOutputFields() {
+ return _spout.getOutputFields();
+ }
+
+ @Override
+ public Map getComponentConfiguration() {
+ return _spout.getComponentConfiguration();
+ }
+
+ @Override
+ public void feed(Object tuples) {
+ _spout.feed(tuples);
+ }
+
+}
@@ -0,0 +1,6 @@
+package storm.trident.testing;
+
+
+public interface IFeeder {
+ void feed(Object tuples);
+}
@@ -0,0 +1,20 @@
+package storm.trident.testing;
+
+import java.util.List;
+import org.json.simple.JSONValue;
+import storm.trident.operation.BaseFunction;
+import storm.trident.operation.TridentCollector;
+import storm.trident.tuple.TridentTuple;
+
+public class TuplifyArgs extends BaseFunction {
+
+ @Override
+ public void execute(TridentTuple input, TridentCollector collector) {
+ String args = input.getString(0);
+ List<List<Object>> tuples = (List) JSONValue.parse(args);
+ for(List<Object> tuple: tuples) {
+ collector.emit(tuple);
+ }
+ }
+
+}
@@ -83,12 +83,14 @@ public static String spoutIdFromCoordinatorId(String coordId) {
return coordId.substring(SPOUT_COORD_PREFIX.length());
}
- Map<GlobalStreamId, String> fleshOutStreamBatchIds() {
+ Map<GlobalStreamId, String> fleshOutStreamBatchIds(boolean includeCommitStream) {
Map<GlobalStreamId, String> ret = new HashMap<GlobalStreamId, String>(_batchIds);
Set<String> allBatches = new HashSet(_batchIds.values());
for(String b: allBatches) {
ret.put(new GlobalStreamId(masterCoordinator(b), MasterBatchCoordinator.BATCH_STREAM_ID), b);
- ret.put(new GlobalStreamId(masterCoordinator(b), MasterBatchCoordinator.COMMIT_STREAM_ID), b);
+ if(includeCommitStream) {
+ ret.put(new GlobalStreamId(masterCoordinator(b), MasterBatchCoordinator.COMMIT_STREAM_ID), b);
+ }
// DO NOT include the success stream as part of the batch. it should not trigger coordination tuples,
// and is just a metadata tuple to assist in cleanup, should not trigger batch tracking
}
@@ -111,7 +113,8 @@ public static String spoutIdFromCoordinatorId(String coordId) {
public StormTopology buildTopology() {
TopologyBuilder builder = new TopologyBuilder();
- Map<GlobalStreamId, String> batchIds = fleshOutStreamBatchIds();
+ Map<GlobalStreamId, String> batchIdsForSpouts = fleshOutStreamBatchIds(false);
+ Map<GlobalStreamId, String> batchIdsForBolts = fleshOutStreamBatchIds(true);
Map<String, List<String>> batchesToCommitIds = new HashMap<String, List<String>>();
Map<String, List<ITridentSpout>> batchesToSpouts = new HashMap<String, List<ITridentSpout>>();
@@ -152,7 +155,7 @@ public StormTopology buildTopology() {
c.commitStateId,
c.streamName,
((ITridentSpout) c.spout)),
- batchIds,
+ batchIdsForSpouts,
specs),
c.parallelism);
bd.allGrouping(spoutCoordinator(id), MasterBatchCoordinator.BATCH_STREAM_ID);
@@ -186,7 +189,7 @@ public StormTopology buildTopology() {
Map<String, CoordSpec> specs = new HashMap();
for(GlobalStreamId s: getBoltSubscriptionStreams(id)) {
- String batch = batchIds.get(s);
+ String batch = batchIdsForBolts.get(s);
if(!specs.containsKey(batch)) specs.put(batch, new CoordSpec());
CoordSpec spec = specs.get(batch);
CoordType ct;
@@ -202,7 +205,7 @@ public StormTopology buildTopology() {
specs.get(b).commitStream = new GlobalStreamId(masterCoordinator(b), MasterBatchCoordinator.COMMIT_STREAM_ID);
}
- BoltDeclarer d = builder.setBolt(id, new TridentBoltExecutor(c.bolt, batchIds, specs), c.parallelism);
+ BoltDeclarer d = builder.setBolt(id, new TridentBoltExecutor(c.bolt, batchIdsForBolts, specs), c.parallelism);
for(Map conf: c.componentConfs) {
d.addConfigurations(conf);
}
@@ -38,6 +38,46 @@
(is (= [[8]] (exec-drpc drpc "words" "man where you the")))
)))))
+;; this test reproduces a bug where committer spouts freeze processing when
+;; there's at least one repartitioning after the spout
+(deftest test-word-count-committer-spout
+ (t/with-local-cluster [cluster]
+ (with-drpc [drpc]
+ (letlocals
+ (bind topo (TridentTopology.))
+ (bind feeder (feeder-committer-spout ["sentence"]))
+ (.setWaitToEmit feeder false) ;;this causes lots of empty batches
+ (bind word-counts
+ (-> topo
+ (.newStream "tester" feeder)
+ (.parallelismHint 2)
+ (.each (fields "sentence") (Split.) (fields "word"))
+ (.groupBy (fields "word"))
+ (.persistentAggregate (memory-map-state) (Count.) (fields "count"))
+ (.parallelismHint 6)
+ ))
+ (-> topo
+ (.newDRPCStream "words" drpc)
+ (.each (fields "args") (Split.) (fields "word"))
+ (.groupBy (fields "word"))
+ (.stateQuery word-counts (fields "word") (MapGet.) (fields "count"))
+ (.aggregate (fields "count") (Sum.) (fields "sum"))
+ (.project (fields "sum")))
+ (with-topology [cluster topo]
+ (feed feeder [["hello the man said"] ["the"]])
+ (is (= [[2]] (exec-drpc drpc "words" "the")))
+ (is (= [[1]] (exec-drpc drpc "words" "hello")))
+ (Thread/sleep 1000) ;; this is necessary to reproduce the bug where committer spouts freeze processing
+ (feed feeder [["the man on the moon"] ["where are you"]])
+ (is (= [[4]] (exec-drpc drpc "words" "the")))
+ (is (= [[2]] (exec-drpc drpc "words" "man")))
+ (is (= [[8]] (exec-drpc drpc "words" "man where you the")))
+ (feed feeder [["the the"]])
+ (is (= [[6]] (exec-drpc drpc "words" "the")))
+ (feed feeder [["the"]])
+ (is (= [[7]] (exec-drpc drpc "words" "the")))
+ )))))
+
(deftest test-count-agg
(t/with-local-cluster [cluster]
@@ -76,4 +116,26 @@
(with-topology [cluster topo]
(is (t/ms= [[7] ["the"] ["man"]] (exec-drpc drpc "splitter" "the man")))
(is (t/ms= [[5] ["hello"]] (exec-drpc drpc "splitter" "hello")))
- )))))
+ )))))
+
+
+;; (deftest test-split-merge
+;; (t/with-local-cluster [cluster]
+;; (with-drpc [drpc]
+;; (letlocals
+;; (bind topo (TridentTopology.))
+;; (bind drpc-stream (-> topo (.newDRPCStream "splitter" drpc)))
+;; (bind s1
+;; (-> drpc-stream
+;; (.each (fields "args") (Split.) (fields "word"))
+;; (.project (fields "word"))))
+;; (bind s2
+;; (-> drpc-stream
+;; (.each (fields "args") (StringLength.) (fields "len"))
+;; (.project (fields "len"))))
+;;
+;; (.merge topo [s1 s2])
+;; (with-topology [cluster topo]
+;; (is (t/ms= [[7] ["the"] ["man"]] (exec-drpc drpc "splitter" "the man")))
+;; (is (t/ms= [[5] ["hello"]] (exec-drpc drpc "splitter" "hello")))
+;; )))))

0 comments on commit 7f73c14

Please sign in to comment.