Skip to content

Commit

Permalink
unify committer and batch bolt abstractions with the ICommitter inter…
Browse files Browse the repository at this point in the history
…face, this greatly simplifies the transactional topology abstraction and makes things more reusable
  • Loading branch information
Nathan Marz committed Jan 27, 2012
1 parent 634706f commit 297ee5a
Show file tree
Hide file tree
Showing 12 changed files with 37 additions and 124 deletions.
1 change: 0 additions & 1 deletion src/clj/backtype/storm/bootstrap.clj
Expand Up @@ -14,7 +14,6 @@
(import (quote [backtype.storm.task IBolt IOutputCollector
OutputCollector OutputCollectorImpl IInternalOutputCollector
TopologyContext ShellBolt]))
(import (quote [backtype.storm.transactional ICommitterBolt CommitterBoltExecutor]))
(import (quote [backtype.storm.coordination CoordinatedBolt CoordinatedBolt$SourceArgs
IBatchBolt BatchBoltExecutor]))
(import (quote [backtype.storm.drpc KeyedFairBolt]))
Expand Down
4 changes: 2 additions & 2 deletions src/jvm/backtype/storm/coordination/IBatchBolt.java
Expand Up @@ -6,8 +6,8 @@
import java.io.Serializable;
import java.util.Map;

public interface IBatchBolt extends Serializable, IComponent {
void prepare(Map conf, TopologyContext context, BatchOutputCollector collector, Object id);
public interface IBatchBolt<T> extends Serializable, IComponent {
void prepare(Map conf, TopologyContext context, BatchOutputCollector collector, T id);
void execute(Tuple tuple);
void finishBatch();
}
15 changes: 9 additions & 6 deletions src/jvm/backtype/storm/testing/CountingCommitBolt.java
Expand Up @@ -3,20 +3,23 @@
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.coordination.BatchOutputCollector;
import backtype.storm.topology.base.BaseCommitterBolt;
import backtype.storm.topology.base.BaseTransactionalBolt;
import backtype.storm.transactional.ICommitter;
import backtype.storm.transactional.TransactionAttempt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import java.util.Map;

public class CountingCommitBolt extends BaseCommitterBolt {
public class CountingCommitBolt extends BaseTransactionalBolt implements ICommitter {
BatchOutputCollector _collector;
TransactionAttempt _id;
int _count = 0;

@Override
public void prepare(Map conf, TopologyContext context, TransactionAttempt id) {
public void prepare(Map conf, TopologyContext context, BatchOutputCollector collector, TransactionAttempt id) {
_id = id;
_collector = collector;
}

@Override
Expand All @@ -25,13 +28,13 @@ public void execute(Tuple tuple) {
}

@Override
public void commit(BatchOutputCollector collector) {
collector.emit(new Values(_id, _count));
public void finishBatch() {
_collector.emit(new Values(_id, _count));
}

@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("tx", "count"));
}

}
40 changes: 2 additions & 38 deletions src/jvm/backtype/storm/testing/KeyedCountingCommitterBolt.java
@@ -1,43 +1,7 @@
package backtype.storm.testing;

import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.coordination.BatchOutputCollector;
import backtype.storm.topology.base.BaseCommitterBolt;
import backtype.storm.transactional.TransactionAttempt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import backtype.storm.utils.Utils;
import java.util.HashMap;
import java.util.Map;
import backtype.storm.transactional.ICommitter;

public class KeyedCountingCommitterBolt extends BaseCommitterBolt {
TransactionAttempt _id;
Map<Object, Integer> _counts = new HashMap<Object, Integer>();

@Override
public void prepare(Map conf, TopologyContext context, TransactionAttempt id) {
_id = id;
}
public class KeyedCountingCommitterBolt extends KeyedCountingBatchBolt implements ICommitter {

@Override
public void execute(Tuple tuple) {
Object key = tuple.getValue(1);
int curr = Utils.get(_counts, key, 0);
_counts.put(key, curr + 1);
}

@Override
public void commit(BatchOutputCollector collector) {
for(Object key: _counts.keySet()) {
collector.emit(new Values(_id, key, _counts.get(key)));
}
}

@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("tx", "key", "count"));
}

}
2 changes: 1 addition & 1 deletion src/jvm/backtype/storm/topology/base/BaseBatchBolt.java
Expand Up @@ -3,7 +3,7 @@
import backtype.storm.coordination.IBatchBolt;
import java.util.Map;

public abstract class BaseBatchBolt implements IBatchBolt {
public abstract class BaseBatchBolt<T> implements IBatchBolt<T> {
@Override
public Map<String, Object> getComponentConfiguration() {
return null;
Expand Down
11 changes: 0 additions & 11 deletions src/jvm/backtype/storm/topology/base/BaseCommitterBolt.java

This file was deleted.

@@ -0,0 +1,7 @@
package backtype.storm.topology.base;

import backtype.storm.transactional.TransactionAttempt;

public abstract class BaseTransactionalBolt extends BaseBatchBolt<TransactionAttempt> {

}
45 changes: 0 additions & 45 deletions src/jvm/backtype/storm/transactional/CommitterBoltExecutor.java

This file was deleted.

9 changes: 9 additions & 0 deletions src/jvm/backtype/storm/transactional/ICommitter.java
@@ -0,0 +1,9 @@
package backtype.storm.transactional;

/**
* This marks an IBatchBolt within a transactional topology as a committer. This causes the
* finishBatch method to be called in order of the transactions.
*/
public interface ICommitter {

}
13 changes: 0 additions & 13 deletions src/jvm/backtype/storm/transactional/ICommitterBolt.java

This file was deleted.

Expand Up @@ -66,15 +66,15 @@ public BoltDeclarer setBolt(String id, IBatchBolt bolt) {
}

public BoltDeclarer setBolt(String id, IBatchBolt bolt, Integer parallelism) {
return setBolt(id, new BatchBoltExecutor(bolt), parallelism, false);
}
return setBolt(id, new BatchBoltExecutor(bolt), parallelism, bolt instanceof ICommitter);
}

public BoltDeclarer setBolt(String id, ICommitterBolt bolt) {
return setBolt(id, bolt, null);
public BoltDeclarer setCommitterBolt(String id, IBatchBolt bolt) {
return setCommitterBolt(id, bolt, null);
}

public BoltDeclarer setBolt(String id, ICommitterBolt bolt, Integer parallelism) {
return setBolt(id, new BatchBoltExecutor(new CommitterBoltExecutor(bolt)), parallelism, true);
public BoltDeclarer setCommitterBolt(String id, IBatchBolt bolt, Integer parallelism) {
return setBolt(id, new BatchBoltExecutor(bolt), parallelism, true);
}

public BoltDeclarer setBolt(String id, IBasicBolt bolt) {
Expand Down
2 changes: 1 addition & 1 deletion test/clj/backtype/storm/transactional_test.clj
Expand Up @@ -346,7 +346,7 @@
(.fieldsGrouping "id1" (Fields. ["word"])))

(-> builder
(.setBolt "count" (KeyedCountingCommitterBolt.) 2)
(.setCommitterBolt "count" (KeyedCountingBatchBolt.) 2)
(.fieldsGrouping "id2" (Fields. ["word"])))

(-> builder
Expand Down

0 comments on commit 297ee5a

Please sign in to comment.