Browse files

revamp trident spout and partitioned trident spouts to support spouts…

… where the source can change
  • Loading branch information...
1 parent 3a146c6 commit d6c2736aaa6980cde2f530b544508e7ce5f457af @nathanmarz nathanmarz committed Oct 12, 2012
View
2 src/jvm/storm/trident/spout/BatchSpoutExecutor.java
@@ -9,7 +9,7 @@
public class BatchSpoutExecutor implements ITridentSpout {
public static class EmptyCoordinator implements BatchCoordinator {
@Override
- public Object initializeTransaction(long txid, Object prevMetadata) {
+ public Object initializeTransaction(long txid, Object prevMetadata, Object currMetadata) {
return null;
}
View
15 src/jvm/storm/trident/spout/IOpaquePartitionedTridentSpout.java
@@ -3,33 +3,36 @@
import backtype.storm.task.TopologyContext;
import backtype.storm.tuple.Fields;
import java.io.Serializable;
+import java.util.List;
import java.util.Map;
import storm.trident.operation.TridentCollector;
import storm.trident.topology.TransactionAttempt;
/**
* This defines a transactional spout which does *not* necessarily
* replay the same batch every time it emits a batch for a transaction id.
+ *
*/
-public interface IOpaquePartitionedTridentSpout<T> extends Serializable {
- public interface Coordinator {
+public interface IOpaquePartitionedTridentSpout<Partitions, Partition extends ISpoutPartition, M> extends Serializable {
+ public interface Coordinator<Partitions> {
boolean isReady(long txid);
+ Partitions getPartitionsForBatch();
void close();
}
- public interface Emitter<X> {
+ public interface Emitter<Partitions, Partition extends ISpoutPartition, M> {
/**
* Emit a batch of tuples for a partition/transaction.
*
* Return the metadata describing this batch that will be used as lastPartitionMeta
* for defining the parameters of the next batch.
*/
- X emitPartitionBatch(TransactionAttempt tx, TridentCollector collector, int partition, X lastPartitionMeta);
- long numPartitions();
+ M emitPartitionBatch(TransactionAttempt tx, TridentCollector collector, Partition partition, M lastPartitionMeta);
+ List<Partition> getOrderedPartitions(Partitions allPartitionInfo);
void close();
}
- Emitter<T> getEmitter(Map conf, TopologyContext context);
+ Emitter<Partitions, Partition, M> getEmitter(Map conf, TopologyContext context);
Coordinator getCoordinator(Map conf, TopologyContext context);
Map getComponentConfiguration();
Fields getOutputFields();
View
24 src/jvm/storm/trident/spout/IPartitionedTridentSpout.java
@@ -3,6 +3,7 @@
import backtype.storm.task.TopologyContext;
import backtype.storm.tuple.Fields;
import java.io.Serializable;
+import java.util.List;
import java.util.Map;
import storm.trident.operation.TridentCollector;
import storm.trident.topology.TransactionAttempt;
@@ -12,38 +13,41 @@
* brokers. It automates the storing of metadata for each partition to ensure that the same batch
* is always emitted for the same transaction id. The partition metadata is stored in Zookeeper.
*/
-public interface IPartitionedTridentSpout<T> extends Serializable {
- public interface Coordinator {
+public interface IPartitionedTridentSpout<Partitions, Partition extends ISpoutPartition, T> extends Serializable {
+ public interface Coordinator<Partitions> {
/**
- * Return the number of partitions currently in the source of data. The idea is
+ * Return the partitions currently in the source of data. The idea is
* that if a new partition is added and a prior transaction is replayed, it doesn't
- * emit tuples for the new partition because it knows how many partitions were in
+ * emit tuples for the new partition because it knows what partitions were in
* that transaction.
*/
- long numPartitions();
+ Partitions getPartitionsForBatch();
boolean isReady(long txid);
void close();
}
- public interface Emitter<X> {
+ public interface Emitter<Partitions, Partition extends ISpoutPartition, X> {
+
+ List<Partition> getOrderedPartitions(Partitions allPartitionInfo);
+
/**
* Emit a batch of tuples for a partition/transaction that's never been emitted before.
* Return the metadata that can be used to reconstruct this partition/batch in the future.
*/
- X emitPartitionBatchNew(TransactionAttempt tx, TridentCollector collector, int partition, X lastPartitionMeta);
+ X emitPartitionBatchNew(TransactionAttempt tx, TridentCollector collector, Partition partition, X lastPartitionMeta);
/**
* Emit a batch of tuples for a partition/transaction that has been emitted before, using
* the metadata created when it was first emitted.
*/
- void emitPartitionBatch(TransactionAttempt tx, TridentCollector collector, int partition, X partitionMeta);
+ void emitPartitionBatch(TransactionAttempt tx, TridentCollector collector, Partition partition, X partitionMeta);
void close();
}
- Coordinator getCoordinator(Map conf, TopologyContext context);
- Emitter<T> getEmitter(Map conf, TopologyContext context);
+ Coordinator<Partitions> getCoordinator(Map conf, TopologyContext context);
+ Emitter<Partitions, Partition, T> getEmitter(Map conf, TopologyContext context);
Map getComponentConfiguration();
Fields getOutputFields();
View
8 src/jvm/storm/trident/spout/ISpoutPartition.java
@@ -0,0 +1,8 @@
+package storm.trident.spout;
+
+public interface ISpoutPartition {
+ /**
+ * Returns the identifier of this partition.
+ * This is used as a Zookeeper node path for storing metadata.
+ */
+ String getId();
+}
View
4 src/jvm/storm/trident/spout/ITridentSpout.java
@@ -22,9 +22,11 @@
*
* @param txid The id of the transaction.
* @param prevMetadata The metadata of the previous transaction
+ * @param currMetadata The metadata for this transaction the last time it was initialized.
+ * null if this is the first attempt
* @return the metadata for this new transaction
*/
- X initializeTransaction(long txid, X prevMetadata);
+ X initializeTransaction(long txid, X prevMetadata, X currMetadata);
void success(long txid);
View
107 src/jvm/storm/trident/spout/OpaquePartitionedTridentSpoutExecutor.java
@@ -4,9 +4,11 @@
import backtype.storm.task.TopologyContext;
import backtype.storm.tuple.Fields;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.Set;
import java.util.TreeMap;
import storm.trident.operation.TridentCollector;
import storm.trident.topology.state.RotatingTransactionalState;
@@ -25,8 +27,8 @@ public Coordinator(Map conf, TopologyContext context) {
}
@Override
- public Object initializeTransaction(long txid, Object prevMetadata) {
- return null;
+ public Object initializeTransaction(long txid, Object prevMetadata, Object currMetadata) {
+ return _coordinator.getPartitionsForBatch();
}
@Override
@@ -44,69 +46,104 @@ public boolean isReady(long txid) {
}
}
- public class Emitter implements ICommitterTridentSpout.Emitter {
+ static class EmitterPartitionState { // plain holder pairing one spout partition with its rotating transactional state
+ public RotatingTransactionalState rotatingState; // state object tracked for this partition
+ public ISpoutPartition partition; // the partition the state belongs to
+
+ public EmitterPartitionState(RotatingTransactionalState s, ISpoutPartition p) { // stores both references as given; no copying or validation
+ rotatingState = s;
+ partition = p;
+ }
+ }
+
+ public class Emitter implements ICommitterTridentSpout.Emitter {
IOpaquePartitionedTridentSpout.Emitter _emitter;
TransactionalState _state;
- TreeMap<Long, Map<Integer, Object>> _cachedMetas = new TreeMap<Long, Map<Integer, Object>>();
- Map<Integer, RotatingTransactionalState> _partitionStates = new HashMap<Integer, RotatingTransactionalState>();
+ TreeMap<Long, Map<String, Object>> _cachedMetas = new TreeMap<Long, Map<String, Object>>();
+ Map<String, EmitterPartitionState> _partitionStates = new HashMap<String, EmitterPartitionState>();
int _index;
int _numTasks;
public Emitter(String txStateId, Map conf, TopologyContext context) {
_emitter = _spout.getEmitter(conf, context);
_index = context.getThisTaskIndex();
_numTasks = context.getComponentTasks(context.getThisComponentId()).size();
- _state = TransactionalState.newUserState(conf, txStateId);
- List<String> existingPartitions = _state.list("");
- for(String p: existingPartitions) {
- int partition = Integer.parseInt(p);
- if((partition - _index) % _numTasks == 0) {
- _partitionStates.put(partition, new RotatingTransactionalState(_state, p));
- }
- }
+ _state = TransactionalState.newUserState(conf, txStateId);
}
+ Object _savedCoordinatorMeta = null;
+ boolean _changedMeta = false;
+
@Override
public void emitBatch(TransactionAttempt tx, Object coordinatorMeta, TridentCollector collector) {
- Map<Integer, Object> metas = new HashMap<Integer, Object>();
+ if(_savedCoordinatorMeta==null || !_savedCoordinatorMeta.equals(coordinatorMeta)) {
+ List<ISpoutPartition> partitions = _emitter.getOrderedPartitions(coordinatorMeta);
+ _partitionStates.clear();
+ for(int i=_index; i < partitions.size(); i+=_numTasks) {
+ ISpoutPartition p = partitions.get(i);
+ String id = p.getId();
+ _partitionStates.put(id, new EmitterPartitionState(new RotatingTransactionalState(_state, id), p));
+ }
+ _savedCoordinatorMeta = coordinatorMeta;
+ _changedMeta = true;
+ }
+ Map<String, Object> metas = new HashMap<String, Object>();
_cachedMetas.put(tx.getTransactionId(), metas);
- long partitions = _emitter.numPartitions();
- Entry<Long, Map<Integer, Object>> entry = _cachedMetas.lowerEntry(tx.getTransactionId());
- Map<Integer, Object> prevCached;
+
+ Entry<Long, Map<String, Object>> entry = _cachedMetas.lowerEntry(tx.getTransactionId());
+ Map<String, Object> prevCached;
if(entry!=null) {
prevCached = entry.getValue();
} else {
- prevCached = new HashMap<Integer, Object>();
+ prevCached = new HashMap<String, Object>();
}
- for(int i=_index; i < partitions; i+=_numTasks) {
- RotatingTransactionalState state = _partitionStates.get(i);
- if(state==null) {
- state = new RotatingTransactionalState(_state, "" + i);
- _partitionStates.put(i, state);
- }
- state.removeState(tx.getTransactionId());
- Object lastMeta = prevCached.get(i);
- if(lastMeta==null) lastMeta = state.getLastState();
- Object meta = _emitter.emitPartitionBatch(tx, collector, i, lastMeta);
- metas.put(i, meta);
+ for(String id: _partitionStates.keySet()) {
+ EmitterPartitionState s = _partitionStates.get(id);
+ s.rotatingState.removeState(tx.getTransactionId());
+ Object lastMeta = prevCached.get(id);
+ if(lastMeta==null) lastMeta = s.rotatingState.getLastState();
+ Object meta = _emitter.emitPartitionBatch(tx, collector, s.partition, lastMeta);
+ metas.put(id, meta);
}
}
@Override
public void success(TransactionAttempt tx) {
- for(RotatingTransactionalState state: _partitionStates.values()) {
- state.cleanupBefore(tx.getTransactionId());
+ for(EmitterPartitionState state: _partitionStates.values()) {
+ state.rotatingState.cleanupBefore(tx.getTransactionId());
}
}
@Override
public void commit(TransactionAttempt attempt) {
+ // this code here handles a case where a previous commit failed, and the partitions
+ // changed since the last commit. This clears out any state for the removed partitions
+ // for this txid.
+ // we make sure only a single task ever does this. we're also guaranteed that
+ // it's impossible for there to be another writer to the directory for that partition
+ // because only a single commit can be happening at once. this is because in order for
+ // another attempt of the batch to commit, the batch phase must have succeeded in between.
+ // hence, all tasks for the prior commit must have finished committing (whether successfully or not)
+ if(_changedMeta && _index==0) {
+ Set<String> validIds = new HashSet<String>();
+ for(ISpoutPartition p: (List<ISpoutPartition>) _emitter.getOrderedPartitions(_savedCoordinatorMeta)) {
+ validIds.add(p.getId());
+ }
+ for(String existingPartition: _state.list("")) {
+ if(!validIds.contains(existingPartition)) {
+ RotatingTransactionalState s = new RotatingTransactionalState(_state, existingPartition);
+ s.removeState(attempt.getTransactionId());
+ }
+ }
+ _changedMeta = false;
+ }
+
Long txid = attempt.getTransactionId();
- Map<Integer, Object> metas = _cachedMetas.remove(txid);
- for(Integer partition: metas.keySet()) {
- Object meta = metas.get(partition);
- _partitionStates.get(partition).overrideState(txid, meta);
+ Map<String, Object> metas = _cachedMetas.remove(txid);
+ for(String partitionId: metas.keySet()) {
+ Object meta = metas.get(partitionId);
+ _partitionStates.get(partitionId).rotatingState.overrideState(txid, meta);
}
}
View
55 src/jvm/storm/trident/spout/PartitionedTridentSpoutExecutor.java
@@ -3,8 +3,10 @@
import backtype.storm.task.TopologyContext;
import backtype.storm.tuple.Fields;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import storm.trident.operation.TridentCollector;
+import storm.trident.spout.OpaquePartitionedTridentSpoutExecutor.EmitterPartitionState;
import storm.trident.topology.TransactionAttempt;
import storm.trident.topology.state.RotatingTransactionalState;
import storm.trident.topology.state.TransactionalState;
@@ -21,16 +23,20 @@ public IPartitionedTridentSpout getPartitionedSpout() {
return _spout;
}
- class Coordinator implements ITridentSpout.BatchCoordinator<Long> {
+ class Coordinator implements ITridentSpout.BatchCoordinator<Object> {
private IPartitionedTridentSpout.Coordinator _coordinator;
public Coordinator(Map conf, TopologyContext context) {
_coordinator = _spout.getCoordinator(conf, context);
}
@Override
- public Long initializeTransaction(long txid, Long prevMetadata) {
- return _coordinator.numPartitions();
+ public Object initializeTransaction(long txid, Object prevMetadata, Object currMetadata) {
+ if(currMetadata!=null) {
+ return currMetadata;
+ } else {
+ return _coordinator.getPartitionsForBatch();
+ }
}
@@ -49,10 +55,20 @@ public boolean isReady(long txid) {
}
}
- class Emitter implements ITridentSpout.Emitter<Long> {
+ static class EmitterPartitionState { // plain holder pairing one spout partition with its rotating transactional state
+ // NOTE(review): this file also imports OpaquePartitionedTridentSpoutExecutor.EmitterPartitionState; confirm whether that import or this nested class is the intended one
+ public RotatingTransactionalState rotatingState; // state object tracked for this partition
+ public ISpoutPartition partition; // the partition the state belongs to
+
+ public EmitterPartitionState(RotatingTransactionalState s, ISpoutPartition p) { // stores both references as given; no copying or validation
+ rotatingState = s;
+ partition = p;
+ }
+ }
+
+ class Emitter implements ITridentSpout.Emitter<Object> {
private IPartitionedTridentSpout.Emitter _emitter;
private TransactionalState _state;
- private Map<Integer, RotatingTransactionalState> _partitionStates = new HashMap<Integer, RotatingTransactionalState>();
+ private Map<String, EmitterPartitionState> _partitionStates = new HashMap<String, EmitterPartitionState>();
private int _index;
private int _numTasks;
@@ -63,15 +79,25 @@ public Emitter(String txStateId, Map conf, TopologyContext context) {
_numTasks = context.getComponentTasks(context.getThisComponentId()).size();
}
+ Object _savedCoordinatorMeta = null;
+
+
@Override
- public void emitBatch(final TransactionAttempt tx, final Long partitions,
+ public void emitBatch(final TransactionAttempt tx, final Object coordinatorMeta,
final TridentCollector collector) {
- for(int i=_index; i < partitions; i+=_numTasks) {
- if(!_partitionStates.containsKey(i)) {
- _partitionStates.put(i, new RotatingTransactionalState(_state, "" + i));
+ if(_savedCoordinatorMeta == null || !_savedCoordinatorMeta.equals(coordinatorMeta)) {
+ List<ISpoutPartition> partitions = _emitter.getOrderedPartitions(coordinatorMeta);
+ _partitionStates.clear();
+ for(int i=_index; i < partitions.size(); i+=_numTasks) {
+ ISpoutPartition p = partitions.get(i);
+ String id = p.getId();
+ _partitionStates.put(id, new EmitterPartitionState(new RotatingTransactionalState(_state, id), p));
}
- RotatingTransactionalState state = _partitionStates.get(i);
- final int partition = i;
+ _savedCoordinatorMeta = coordinatorMeta;
+ }
+ for(EmitterPartitionState s: _partitionStates.values()) {
+ RotatingTransactionalState state = s.rotatingState;
+ final ISpoutPartition partition = s.partition;
Object meta = state.getStateOrCreate(tx.getTransactionId(),
new RotatingTransactionalState.StateInitializer() {
@Override
@@ -86,14 +112,13 @@ public Object init(long txid, Object lastState) {
if(meta!=null) {
_emitter.emitPartitionBatch(tx, collector, partition, meta);
}
- }
-
+ }
}
@Override
public void success(TransactionAttempt tx) {
- for(RotatingTransactionalState state: _partitionStates.values()) {
- state.cleanupBefore(tx.getTransactionId());
+ for(EmitterPartitionState state: _partitionStates.values()) {
+ state.rotatingState.cleanupBefore(tx.getTransactionId());
}
}
View
2 src/jvm/storm/trident/spout/RichSpoutBatchExecutor.java
@@ -129,7 +129,7 @@ public void close() {
class RichSpoutCoordinator implements ITridentSpout.BatchCoordinator {
@Override
- public Object initializeTransaction(long txid, Object prevMetadata) {
+ public Object initializeTransaction(long txid, Object prevMetadata, Object currMetadata) {
return null;
}
View
17 src/jvm/storm/trident/spout/TridentSpoutCoordinator.java
@@ -25,7 +25,6 @@
RotatingTransactionalState _state;
TransactionalState _underlyingState;
String _id;
- StateInitializer _initializer;
public TridentSpoutCoordinator(String id, ITridentSpout spout) {
@@ -38,7 +37,6 @@ public void prepare(Map conf, TopologyContext context) {
_coord = _spout.getCoordinator(_id, conf, context);
_underlyingState = TransactionalState.newCoordinatorState(conf, _id);
_state = new RotatingTransactionalState(_underlyingState, META_DIR);
- _initializer = new StateInitializer();
}
@Override
@@ -49,7 +47,10 @@ public void execute(Tuple tuple, BasicOutputCollector collector) {
_state.cleanupBefore(attempt.getTransactionId());
_coord.success(attempt.getTransactionId());
} else {
- Object meta = _state.getState(attempt.getTransactionId(), _initializer);
+ long txid = attempt.getTransactionId();
+ Object prevMeta = _state.getPreviousState(txid);
+ Object meta = _coord.initializeTransaction(txid, prevMeta, _state.getState(txid));
+ _state.overrideState(txid, meta);
collector.emit(MasterBatchCoordinator.BATCH_STREAM_ID, new Values(attempt, meta));
}
@@ -71,13 +72,5 @@ public void declareOutputFields(OutputFieldsDeclarer declarer) {
Config ret = new Config();
ret.setMaxTaskParallelism(1);
return ret;
- }
-
-
- private class StateInitializer implements RotatingTransactionalState.StateInitializer {
- @Override
- public Object init(long txid, Object lastState) {
- return _coord.initializeTransaction(txid, lastState);
- }
- }
+ }
}
View
3 src/jvm/storm/trident/testing/FeederBatchSpout.java
@@ -56,7 +56,8 @@ public FeederCoordinator(int numPartitions) {
}
@Override
- public Map<Integer, List<List<Object>>> initializeTransaction(long txid, Map<Integer, List<List<Object>>> prevMetadata) {
+ public Map<Integer, List<List<Object>>> initializeTransaction(long txid, Map<Integer, List<List<Object>>> prevMetadata, Map<Integer, List<List<Object>>> currMetadata) {
+ if(currMetadata!=null) return currMetadata;
List allBatches = (List) RegisteredGlobalState.getState(_id);
if(allBatches.size()>_emittedIndex) {
Object batchInfo = allBatches.get(_emittedIndex);
View
11 src/jvm/storm/trident/topology/state/RotatingTransactionalState.java
@@ -4,7 +4,6 @@
import java.util.List;
import java.util.SortedMap;
import java.util.TreeMap;
-import storm.trident.topology.MasterBatchCoordinator;
public class RotatingTransactionalState {
public static interface StateInitializer {
@@ -41,6 +40,10 @@ public void removeState(long txid) {
}
}
+ public Object getState(long txid) { // plain lookup of the entry held in _curr for txid; unlike the two-argument overload, no StateInitializer is involved
+ return _curr.get(txid); // presumably null when no entry exists for txid (java.util.Map semantics) -- confirm _curr's type
+ }
+
public Object getState(long txid, StateInitializer init) {
if(!_curr.containsKey(txid)) {
SortedMap<Long, Object> prevMap = _curr.headMap(txid);
@@ -67,6 +70,12 @@ public Object getState(long txid, StateInitializer init) {
return _curr.get(txid);
}
+ public Object getPreviousState(long txid) { // returns the state stored under the greatest txid below the given one, or null if there is none
+ SortedMap<Long, Object> prevMap = _curr.headMap(txid); // view of entries before txid; assumes _curr is a java.util sorted map, whose headMap excludes txid itself
+ if(prevMap.isEmpty()) return null; // nothing recorded before this txid
+ else return prevMap.get(prevMap.lastKey()); // lastKey() is the highest key in the head view
+ }
+
public boolean hasCache(long txid) {
return _curr.containsKey(txid);
}

0 comments on commit d6c2736

Please sign in to comment.