
Commit

fix bug where committer spouts (including opaque spouts) could cause processing to freeze due to commit being interpreted as new batch of processing
Nathan Marz committed Aug 20, 2012
1 parent 09f86bd commit f66ad09
Showing 9 changed files with 201 additions and 12 deletions.
4 changes: 2 additions & 2 deletions src/clj/backtype/storm/util.clj
@@ -488,8 +488,8 @@
(defn collectify [obj]
(if (or (sequential? obj) (instance? Collection obj)) obj [obj]))

-(defn to-json [^Map m]
-  (JSONValue/toJSONString m))
+(defn to-json [obj]
+  (JSONValue/toJSONString obj))

(defn from-json [^String str]
(if str
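Widening to-json from ^Map to any object is what lets the new testing helpers below JSON-encode whole tuple lists, not just maps. A quick REPL sketch of the new behavior (hypothetical usage, not part of the commit):

(use 'backtype.storm.util)

;; maps serialized before this change; any JSON-encodable value works now,
;; including the nested tuple lists the new DRPC helpers pass in
(to-json {"word" "hello"})      ;; => "{\"word\":\"hello\"}"
(to-json [["hello"] ["world"]]) ;; => "[[\"hello\"],[\"world\"]]"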
16 changes: 15 additions & 1 deletion src/clj/storm/trident/testing.clj
@@ -1,5 +1,5 @@
(ns storm.trident.testing
-  (:import [storm.trident.testing FeederBatchSpout MemoryMapState MemoryMapState$Factory])
+  (:import [storm.trident.testing FeederBatchSpout FeederCommitterBatchSpout MemoryMapState MemoryMapState$Factory TuplifyArgs])
(:import [backtype.storm LocalDRPC])
(:import [backtype.storm.tuple Fields])
(:import [backtype.storm.generated KillOptions])
@@ -14,9 +14,15 @@
(let [res (.execute drpc function-name args)]
(from-json res)))

+(defn exec-drpc-tuples [^LocalDRPC drpc function-name tuples]
+  (exec-drpc drpc function-name (to-json tuples)))

(defn feeder-spout [fields]
(FeederBatchSpout. fields))

+(defn feeder-committer-spout [fields]
+  (FeederCommitterBatchSpout. fields))

(defn feed [feeder tuples]
(.feed feeder tuples))

@@ -46,3 +52,11 @@
(import 'storm.trident.TridentTopology)
(import '[storm.trident.operation.builtin Count Sum Equals MapGet Debug FilterNull FirstN])
)

+(defn drpc-tuples-input [topology function-name drpc outfields]
+  (-> topology
+      (.newDRPCStream function-name drpc)
+      (.each (fields "args") (TuplifyArgs.) outfields)
+      ))
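
A hypothetical end-to-end sketch (not part of this commit) of how the new helpers compose: exec-drpc-tuples uses the widened to-json to encode a tuple list into the single DRPC args string, and the TuplifyArgs function that drpc-tuples-input attaches decodes it back into individual stream tuples. The "identity" function name is an assumption for illustration:

(use 'storm.trident.testing)
(require '[backtype.storm.testing :as t])
(import 'storm.trident.TridentTopology)

(t/with-local-cluster [cluster]
  (with-drpc [drpc]
    (let [topo (TridentTopology.)]
      ;; tuples in, tuples out
      (-> (drpc-tuples-input topo "identity" drpc (fields "word"))
          (.project (fields "word")))
      (with-topology [cluster topo]
        (exec-drpc-tuples drpc "identity" [["hello"] ["world"]])))))
;; expected result: [["hello"] ["world"]]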


1 change: 0 additions & 1 deletion src/jvm/storm/trident/spout/TridentSpoutExecutor.java
@@ -48,7 +48,6 @@ public void execute(BatchInfo info, Tuple input) {
// there won't be a BatchInfo for the success stream
TransactionAttempt attempt = (TransactionAttempt) input.getValue(0);
if(input.getSourceStreamId().equals(MasterBatchCoordinator.COMMIT_STREAM_ID)) {
-            _collector.setBatch(info.batchId);
if(attempt.equals(_activeBatches.get(attempt.getTransactionId()))) {
((ICommitterTridentSpout.Emitter) _emitter).commit(attempt);
_activeBatches.remove(attempt.getTransactionId());
8 changes: 7 additions & 1 deletion src/jvm/storm/trident/testing/FeederBatchSpout.java
@@ -15,11 +15,12 @@
import storm.trident.topology.TransactionAttempt;
import storm.trident.topology.TridentTopologyBuilder;

-public class FeederBatchSpout implements ITridentSpout {
+public class FeederBatchSpout implements ITridentSpout, IFeeder {

String _id;
String _semaphoreId;
Fields _outFields;
+    boolean _waitToEmit = true;


public FeederBatchSpout(List<String> fields) {
@@ -28,6 +29,10 @@ public FeederBatchSpout(List<String> fields) {
_semaphoreId = RegisteredGlobalState.registerState(new CopyOnWriteArrayList());
}

+    public void setWaitToEmit(boolean trueIfWait) {
+        _waitToEmit = trueIfWait;
+    }

public void feed(Object tuples) {
Semaphore sem = new Semaphore(0);
((List)RegisteredGlobalState.getState(_semaphoreId)).add(sem);
@@ -93,6 +98,7 @@ public void success(long txid) {

@Override
public boolean isReady(long txid) {
+        if(!_waitToEmit) return true;
List allBatches = (List) RegisteredGlobalState.getState(_id);
if(allBatches.size() > _masterEmitted) {
_masterEmitted++;
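The new _waitToEmit flag is what the regression test below relies on: with it disabled, isReady always answers true, so the master batch coordinator keeps cycling batches (mostly empty ones) instead of blocking until feed is called, which is exactly the traffic pattern that exposed the freeze. A minimal sketch (hypothetical usage, not part of the commit):

;; force a continuous stream of mostly-empty batches
(def feeder (feeder-spout ["sentence"]))
(.setWaitToEmit feeder false)
(feed feeder [["hello world"]]) ;; fed data still flows through normally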
79 changes: 79 additions & 0 deletions src/jvm/storm/trident/testing/FeederCommitterBatchSpout.java
@@ -0,0 +1,79 @@
package storm.trident.testing;

import backtype.storm.task.TopologyContext;
import backtype.storm.tuple.Fields;
import java.util.List;
import java.util.Map;
import storm.trident.operation.TridentCollector;
import storm.trident.spout.ICommitterTridentSpout;
import storm.trident.spout.ITridentSpout;
import storm.trident.topology.TransactionAttempt;


public class FeederCommitterBatchSpout implements ICommitterTridentSpout, IFeeder {

FeederBatchSpout _spout;

public FeederCommitterBatchSpout(List<String> fields) {
_spout = new FeederBatchSpout(fields);
}

public void setWaitToEmit(boolean trueIfWait) {
_spout.setWaitToEmit(trueIfWait);
}

static class CommitterEmitter implements ICommitterTridentSpout.Emitter {
ITridentSpout.Emitter _emitter;


public CommitterEmitter(ITridentSpout.Emitter e) {
_emitter = e;
}

@Override
public void commit(TransactionAttempt attempt) {
}

@Override
public void emitBatch(TransactionAttempt tx, Object coordinatorMeta, TridentCollector collector) {
_emitter.emitBatch(tx, coordinatorMeta, collector);
}

@Override
public void success(TransactionAttempt tx) {
_emitter.success(tx);
}

@Override
public void close() {
_emitter.close();
}

}

@Override
public Emitter getEmitter(String txStateId, Map conf, TopologyContext context) {
return new CommitterEmitter(_spout.getEmitter(txStateId, conf, context));
}

@Override
public BatchCoordinator getCoordinator(String txStateId, Map conf, TopologyContext context) {
return _spout.getCoordinator(txStateId, conf, context);
}

@Override
public Fields getOutputFields() {
return _spout.getOutputFields();
}

@Override
public Map getComponentConfiguration() {
return _spout.getComponentConfiguration();
}

@Override
public void feed(Object tuples) {
_spout.feed(tuples);
}

}
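The class is deliberately a thin wrapper: it delegates everything to an inner FeederBatchSpout and only adds the ICommitterTridentSpout.Emitter indirection, with a no-op commit, which is enough to route its batches through the commit stream whose handling this commit fixes. A minimal sketch of using it through the new Clojure helper (hypothetical usage, not part of the commit):

;; a drop-in replacement for feeder-spout whose batches also traverse
;; the commit stream
(def feeder (feeder-committer-spout ["sentence"]))
(.setWaitToEmit feeder false) ;; optional: also surface the empty-batch case
(feed feeder [["hello the man said"] ["the"]])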
6 changes: 6 additions & 0 deletions src/jvm/storm/trident/testing/IFeeder.java
@@ -0,0 +1,6 @@
package storm.trident.testing;


public interface IFeeder {
void feed(Object tuples);
}
20 changes: 20 additions & 0 deletions src/jvm/storm/trident/testing/TuplifyArgs.java
@@ -0,0 +1,20 @@
package storm.trident.testing;

import java.util.List;
import org.json.simple.JSONValue;
import storm.trident.operation.BaseFunction;
import storm.trident.operation.TridentCollector;
import storm.trident.tuple.TridentTuple;

public class TuplifyArgs extends BaseFunction {

@Override
public void execute(TridentTuple input, TridentCollector collector) {
String args = input.getString(0);
List<List<Object>> tuples = (List) JSONValue.parse(args);
for(List<Object> tuple: tuples) {
collector.emit(tuple);
}
}

}
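TuplifyArgs is the decoding half of the round trip started by exec-drpc-tuples: the DRPC args field holds one JSON string encoding a list of tuples, and each inner list is emitted as its own tuple. A REPL sketch of the parse step (hypothetical, reusing the from-json helper from util.clj):

(use 'backtype.storm.util)

;; one args string in, two one-field tuples out
(from-json "[[\"hello\"],[\"world\"]]")
;; => [["hello"],["world"]] as nested java.util.Lists, which execute
;;    emits downstream as the tuples ["hello"] and ["world"]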
15 changes: 9 additions & 6 deletions src/jvm/storm/trident/topology/TridentTopologyBuilder.java
@@ -83,12 +83,14 @@ public static String spoutIdFromCoordinatorId(String coordId) {
return coordId.substring(SPOUT_COORD_PREFIX.length());
}

-    Map<GlobalStreamId, String> fleshOutStreamBatchIds() {
+    Map<GlobalStreamId, String> fleshOutStreamBatchIds(boolean includeCommitStream) {
Map<GlobalStreamId, String> ret = new HashMap<GlobalStreamId, String>(_batchIds);
Set<String> allBatches = new HashSet(_batchIds.values());
for(String b: allBatches) {
ret.put(new GlobalStreamId(masterCoordinator(b), MasterBatchCoordinator.BATCH_STREAM_ID), b);
-            ret.put(new GlobalStreamId(masterCoordinator(b), MasterBatchCoordinator.COMMIT_STREAM_ID), b);
+            if(includeCommitStream) {
+                ret.put(new GlobalStreamId(masterCoordinator(b), MasterBatchCoordinator.COMMIT_STREAM_ID), b);
+            }
// DO NOT include the success stream as part of the batch. it should not trigger coordination tuples,
// and is just a metadata tuple to assist in cleanup, should not trigger batch tracking
}
@@ -111,7 +113,8 @@ Map<GlobalStreamId, String> fleshOutStreamBatchIds() {

public StormTopology buildTopology() {
TopologyBuilder builder = new TopologyBuilder();
-        Map<GlobalStreamId, String> batchIds = fleshOutStreamBatchIds();
+        Map<GlobalStreamId, String> batchIdsForSpouts = fleshOutStreamBatchIds(false);
+        Map<GlobalStreamId, String> batchIdsForBolts = fleshOutStreamBatchIds(true);

Map<String, List<String>> batchesToCommitIds = new HashMap<String, List<String>>();
Map<String, List<ITridentSpout>> batchesToSpouts = new HashMap<String, List<ITridentSpout>>();
@@ -152,7 +155,7 @@ public StormTopology buildTopology() {
c.commitStateId,
c.streamName,
((ITridentSpout) c.spout)),
-                        batchIds,
+                        batchIdsForSpouts,
specs),
c.parallelism);
bd.allGrouping(spoutCoordinator(id), MasterBatchCoordinator.BATCH_STREAM_ID);
@@ -186,7 +189,7 @@
Map<String, CoordSpec> specs = new HashMap();

for(GlobalStreamId s: getBoltSubscriptionStreams(id)) {
-            String batch = batchIds.get(s);
+            String batch = batchIdsForBolts.get(s);
if(!specs.containsKey(batch)) specs.put(batch, new CoordSpec());
CoordSpec spec = specs.get(batch);
CoordType ct;
@@ -202,7 +205,7 @@
specs.get(b).commitStream = new GlobalStreamId(masterCoordinator(b), MasterBatchCoordinator.COMMIT_STREAM_ID);
}

-        BoltDeclarer d = builder.setBolt(id, new TridentBoltExecutor(c.bolt, batchIds, specs), c.parallelism);
+        BoltDeclarer d = builder.setBolt(id, new TridentBoltExecutor(c.bolt, batchIdsForBolts, specs), c.parallelism);
for(Map conf: c.componentConfs) {
d.addConfigurations(conf);
}
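This is the heart of the fix: spout executors now get a batch-id map that omits the commit stream, so a tuple arriving on the commit stream can no longer be mistaken for the start of a new batch, while bolts keep the commit stream mapped so commits still coordinate. A hedged Clojure restatement of fleshOutStreamBatchIds (the stream-id literals and $mastercoord naming below are assumptions for illustration; their values are not shown in this diff):

;; assumed literal values, not confirmed by this diff
(def batch-stream-id "$batch")
(def commit-stream-id "$commit")
(defn master-coordinator [b] (str "$mastercoord-" b))

(defn flesh-out-stream-batch-ids [include-commit-stream? batch-ids]
  (reduce (fn [ret b]
            (cond-> (assoc ret [(master-coordinator b) batch-stream-id] b)
              include-commit-stream?
              (assoc [(master-coordinator b) commit-stream-id] b)))
          batch-ids
          (set (vals batch-ids))))

;; spouts (false): commit tuples carry no batch id, so they cannot open a batch
(flesh-out-stream-batch-ids false {["spout0" "s1"] "bg0"})
;; => {["spout0" "s1"] "bg0", ["$mastercoord-bg0" "$batch"] "bg0"}

;; bolts (true): the commit stream stays mapped for commit coordination
(flesh-out-stream-batch-ids true {["spout0" "s1"] "bg0"})
;; => adds ["$mastercoord-bg0" "$commit"] "bg0" as well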
64 changes: 63 additions & 1 deletion test/clj/storm/trident/integration_test.clj
@@ -38,6 +38,46 @@
(is (= [[8]] (exec-drpc drpc "words" "man where you the")))
)))))

;; this test reproduces a bug where committer spouts freeze processing when
;; there's at least one repartitioning after the spout
(deftest test-word-count-committer-spout
(t/with-local-cluster [cluster]
(with-drpc [drpc]
(letlocals
(bind topo (TridentTopology.))
(bind feeder (feeder-committer-spout ["sentence"]))
(.setWaitToEmit feeder false) ;;this causes lots of empty batches
(bind word-counts
(-> topo
(.newStream "tester" feeder)
(.parallelismHint 2)
(.each (fields "sentence") (Split.) (fields "word"))
(.groupBy (fields "word"))
(.persistentAggregate (memory-map-state) (Count.) (fields "count"))
(.parallelismHint 6)
))
(-> topo
(.newDRPCStream "words" drpc)
(.each (fields "args") (Split.) (fields "word"))
(.groupBy (fields "word"))
(.stateQuery word-counts (fields "word") (MapGet.) (fields "count"))
(.aggregate (fields "count") (Sum.) (fields "sum"))
(.project (fields "sum")))
(with-topology [cluster topo]
(feed feeder [["hello the man said"] ["the"]])
(is (= [[2]] (exec-drpc drpc "words" "the")))
(is (= [[1]] (exec-drpc drpc "words" "hello")))
(Thread/sleep 1000) ;; this is necessary to reproduce the bug where committer spouts freeze processing
(feed feeder [["the man on the moon"] ["where are you"]])
(is (= [[4]] (exec-drpc drpc "words" "the")))
(is (= [[2]] (exec-drpc drpc "words" "man")))
(is (= [[8]] (exec-drpc drpc "words" "man where you the")))
(feed feeder [["the the"]])
(is (= [[6]] (exec-drpc drpc "words" "the")))
(feed feeder [["the"]])
(is (= [[7]] (exec-drpc drpc "words" "the")))
)))))


(deftest test-count-agg
(t/with-local-cluster [cluster]
@@ -76,4 +116,26 @@
(with-topology [cluster topo]
(is (t/ms= [[7] ["the"] ["man"]] (exec-drpc drpc "splitter" "the man")))
(is (t/ms= [[5] ["hello"]] (exec-drpc drpc "splitter" "hello")))
-)))))
+)))))


;; (deftest test-split-merge
;; (t/with-local-cluster [cluster]
;; (with-drpc [drpc]
;; (letlocals
;; (bind topo (TridentTopology.))
;; (bind drpc-stream (-> topo (.newDRPCStream "splitter" drpc)))
;; (bind s1
;; (-> drpc-stream
;; (.each (fields "args") (Split.) (fields "word"))
;; (.project (fields "word"))))
;; (bind s2
;; (-> drpc-stream
;; (.each (fields "args") (StringLength.) (fields "len"))
;; (.project (fields "len"))))
;;
;; (.merge topo [s1 s2])
;; (with-topology [cluster topo]
;; (is (t/ms= [[7] ["the"] ["man"]] (exec-drpc drpc "splitter" "the man")))
;; (is (t/ms= [[5] ["hello"]] (exec-drpc drpc "splitter" "hello")))
;; )))))
