Skip to content
This repository

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse code

Merge branch 'better-trident-spouts' into 0.9.0

  • Loading branch information...
commit 1003690cee89cd6600c9ffc13fa3fdda3d823a3d 2 parents 8ccb15a + 1428913
Nathan Marz nathanmarz authored
9 src/jvm/backtype/storm/StormSubmitter.java
@@ -38,7 +38,7 @@ public static void setLocalNimbus(Nimbus.Iface localNimbusHandler) {
38 38 * @throws InvalidTopologyException if an invalid topology was submitted
39 39 */
40 40 public static void submitTopology(String name, Map stormConf, StormTopology topology) throws AlreadyAliveException, InvalidTopologyException {
41   - submitTopology(name, stormConf, topology, new SubmitOptions(TopologyInitialStatus.ACTIVE));
  41 + submitTopology(name, stormConf, topology, null);
42 42 }
43 43
44 44 /**
@@ -74,7 +74,12 @@ public static void submitTopology(String name, Map stormConf, StormTopology topo
74 74 submitJar(conf);
75 75 try {
76 76 LOG.info("Submitting topology " + name + " in distributed mode with conf " + serConf);
77   - client.getClient().submitTopologyWithOpts(name, submittedJar, serConf, topology, opts);
  77 + if(opts!=null) {
  78 + client.getClient().submitTopologyWithOpts(name, submittedJar, serConf, topology, opts);
  79 + } else {
  80 + // this is for backwards compatibility
  81 + client.getClient().submitTopology(name, submittedJar, serConf, topology);
  82 + }
78 83 } catch(InvalidTopologyException e) {
79 84 LOG.warn("Topology submission exception", e);
80 85 throw e;
2  src/jvm/storm/trident/spout/BatchSpoutExecutor.java
@@ -9,7 +9,7 @@
9 9 public class BatchSpoutExecutor implements ITridentSpout {
10 10 public static class EmptyCoordinator implements BatchCoordinator {
11 11 @Override
12   - public Object initializeTransaction(long txid, Object prevMetadata) {
  12 + public Object initializeTransaction(long txid, Object prevMetadata, Object currMetadata) {
13 13 return null;
14 14 }
15 15
21 src/jvm/storm/trident/spout/IOpaquePartitionedTridentSpout.java
@@ -3,6 +3,7 @@
3 3 import backtype.storm.task.TopologyContext;
4 4 import backtype.storm.tuple.Fields;
5 5 import java.io.Serializable;
  6 +import java.util.List;
6 7 import java.util.Map;
7 8 import storm.trident.operation.TridentCollector;
8 9 import storm.trident.topology.TransactionAttempt;
@@ -10,26 +11,34 @@
10 11 /**
11 12 * This defines a transactional spout which does *not* necessarily
12 13 * replay the same batch every time it emits a batch for a transaction id.
  14 + *
13 15 */
14   -public interface IOpaquePartitionedTridentSpout<T> extends Serializable {
15   - public interface Coordinator {
  16 +public interface IOpaquePartitionedTridentSpout<Partitions, Partition extends ISpoutPartition, M> extends Serializable {
  17 + public interface Coordinator<Partitions> {
16 18 boolean isReady(long txid);
  19 + Partitions getPartitionsForBatch();
17 20 void close();
18 21 }
19 22
20   - public interface Emitter<X> {
  23 + public interface Emitter<Partitions, Partition extends ISpoutPartition, M> {
21 24 /**
22 25 * Emit a batch of tuples for a partition/transaction.
23 26 *
24 27 * Return the metadata describing this batch that will be used as lastPartitionMeta
25 28 * for defining the parameters of the next batch.
26 29 */
27   - X emitPartitionBatch(TransactionAttempt tx, TridentCollector collector, int partition, X lastPartitionMeta);
28   - long numPartitions();
  30 + M emitPartitionBatch(TransactionAttempt tx, TridentCollector collector, Partition partition, M lastPartitionMeta);
  31 +
  32 + /**
  33 + * This method is called when this task is responsible for a new set of partitions. Should be used
  34 + * to manage things like connections to brokers.
  35 + */
  36 + void refreshPartitions(List<Partition> partitionResponsibilities);
  37 + List<Partition> getOrderedPartitions(Partitions allPartitionInfo);
29 38 void close();
30 39 }
31 40
32   - Emitter<T> getEmitter(Map conf, TopologyContext context);
  41 + Emitter<Partitions, Partition, M> getEmitter(Map conf, TopologyContext context);
33 42 Coordinator getCoordinator(Map conf, TopologyContext context);
34 43 Map getComponentConfiguration();
35 44 Fields getOutputFields();
30 src/jvm/storm/trident/spout/IPartitionedTridentSpout.java
@@ -3,6 +3,7 @@
3 3 import backtype.storm.task.TopologyContext;
4 4 import backtype.storm.tuple.Fields;
5 5 import java.io.Serializable;
  6 +import java.util.List;
6 7 import java.util.Map;
7 8 import storm.trident.operation.TridentCollector;
8 9 import storm.trident.topology.TransactionAttempt;
@@ -12,38 +13,47 @@
12 13 * brokers. It automates the storing of metadata for each partition to ensure that the same batch
13 14 * is always emitted for the same transaction id. The partition metadata is stored in Zookeeper.
14 15 */
15   -public interface IPartitionedTridentSpout<T> extends Serializable {
16   - public interface Coordinator {
  16 +public interface IPartitionedTridentSpout<Partitions, Partition extends ISpoutPartition, T> extends Serializable {
  17 + public interface Coordinator<Partitions> {
17 18 /**
18   - * Return the number of partitions currently in the source of data. The idea is
  19 + * Return the partitions currently in the source of data. The idea is
19 20 that if a new partition is added and a prior transaction is replayed, it doesn't
20   - * emit tuples for the new partition because it knows how many partitions were in
  21 + * emit tuples for the new partition because it knows what partitions were in
21 22 * that transaction.
22 23 */
23   - long numPartitions();
  24 + Partitions getPartitionsForBatch();
24 25
25 26 boolean isReady(long txid);
26 27
27 28 void close();
28 29 }
29 30
30   - public interface Emitter<X> {
  31 + public interface Emitter<Partitions, Partition extends ISpoutPartition, X> {
  32 +
  33 + List<Partition> getOrderedPartitions(Partitions allPartitionInfo);
  34 +
31 35 /**
32 36 * Emit a batch of tuples for a partition/transaction that's never been emitted before.
33 37 * Return the metadata that can be used to reconstruct this partition/batch in the future.
34 38 */
35   - X emitPartitionBatchNew(TransactionAttempt tx, TridentCollector collector, int partition, X lastPartitionMeta);
  39 + X emitPartitionBatchNew(TransactionAttempt tx, TridentCollector collector, Partition partition, X lastPartitionMeta);
36 40
37 41 /**
  42 + * This method is called when this task is responsible for a new set of partitions. Should be used
  43 + * to manage things like connections to brokers.
  44 + */
  45 + void refreshPartitions(List<Partition> partitionResponsibilities);
  46 +
  47 + /**
38 48 * Emit a batch of tuples for a partition/transaction that has been emitted before, using
39 49 * the metadata created when it was first emitted.
40 50 */
41   - void emitPartitionBatch(TransactionAttempt tx, TridentCollector collector, int partition, X partitionMeta);
  51 + void emitPartitionBatch(TransactionAttempt tx, TridentCollector collector, Partition partition, X partitionMeta);
42 52 void close();
43 53 }
44 54
45   - Coordinator getCoordinator(Map conf, TopologyContext context);
46   - Emitter<T> getEmitter(Map conf, TopologyContext context);
  55 + Coordinator<Partitions> getCoordinator(Map conf, TopologyContext context);
  56 + Emitter<Partitions, Partition, T> getEmitter(Map conf, TopologyContext context);
47 57
48 58 Map getComponentConfiguration();
49 59 Fields getOutputFields();
8 src/jvm/storm/trident/spout/ISpoutPartition.java
... ... @@ -0,0 +1,8 @@
  1 +package storm.trident.spout;
  2 +
  3 +public interface ISpoutPartition {
  4 + /**
  5 + * This is used as a Zookeeper node path for storing metadata.
  6 + */
  7 + String getId();
  8 +}
4 src/jvm/storm/trident/spout/ITridentSpout.java
@@ -22,9 +22,11 @@
22 22 *
23 23 * @param txid The id of the transaction.
24 24 * @param prevMetadata The metadata of the previous transaction
  25 + * @param currMetadata The metadata for this transaction from the last time it was initialized.
  26 + * null if this is the first attempt
25 27 * @return the metadata for this new transaction
26 28 */
27   - X initializeTransaction(long txid, X prevMetadata);
  29 + X initializeTransaction(long txid, X prevMetadata, X currMetadata);
28 30
29 31 void success(long txid);
30 32
111 src/jvm/storm/trident/spout/OpaquePartitionedTridentSpoutExecutor.java
@@ -3,10 +3,13 @@
3 3
4 4 import backtype.storm.task.TopologyContext;
5 5 import backtype.storm.tuple.Fields;
  6 +import java.util.ArrayList;
6 7 import java.util.HashMap;
  8 +import java.util.HashSet;
7 9 import java.util.List;
8 10 import java.util.Map;
9 11 import java.util.Map.Entry;
  12 +import java.util.Set;
10 13 import java.util.TreeMap;
11 14 import storm.trident.operation.TridentCollector;
12 15 import storm.trident.topology.state.RotatingTransactionalState;
@@ -25,8 +28,8 @@ public Coordinator(Map conf, TopologyContext context) {
25 28 }
26 29
27 30 @Override
28   - public Object initializeTransaction(long txid, Object prevMetadata) {
29   - return null;
  31 + public Object initializeTransaction(long txid, Object prevMetadata, Object currMetadata) {
  32 + return _coordinator.getPartitionsForBatch();
30 33 }
31 34
32 35 @Override
@@ -44,11 +47,21 @@ public boolean isReady(long txid) {
44 47 }
45 48 }
46 49
47   - public class Emitter implements ICommitterTridentSpout.Emitter {
  50 + static class EmitterPartitionState {
  51 + public RotatingTransactionalState rotatingState;
  52 + public ISpoutPartition partition;
  53 +
  54 + public EmitterPartitionState(RotatingTransactionalState s, ISpoutPartition p) {
  55 + rotatingState = s;
  56 + partition = p;
  57 + }
  58 + }
  59 +
  60 + public class Emitter implements ICommitterTridentSpout.Emitter {
48 61 IOpaquePartitionedTridentSpout.Emitter _emitter;
49 62 TransactionalState _state;
50   - TreeMap<Long, Map<Integer, Object>> _cachedMetas = new TreeMap<Long, Map<Integer, Object>>();
51   - Map<Integer, RotatingTransactionalState> _partitionStates = new HashMap<Integer, RotatingTransactionalState>();
  63 + TreeMap<Long, Map<String, Object>> _cachedMetas = new TreeMap<Long, Map<String, Object>>();
  64 + Map<String, EmitterPartitionState> _partitionStates = new HashMap<String, EmitterPartitionState>();
52 65 int _index;
53 66 int _numTasks;
54 67
@@ -56,57 +69,85 @@ public Emitter(String txStateId, Map conf, TopologyContext context) {
56 69 _emitter = _spout.getEmitter(conf, context);
57 70 _index = context.getThisTaskIndex();
58 71 _numTasks = context.getComponentTasks(context.getThisComponentId()).size();
59   - _state = TransactionalState.newUserState(conf, txStateId);
60   - List<String> existingPartitions = _state.list("");
61   - for(String p: existingPartitions) {
62   - int partition = Integer.parseInt(p);
63   - if((partition - _index) % _numTasks == 0) {
64   - _partitionStates.put(partition, new RotatingTransactionalState(_state, p));
65   - }
66   - }
  72 + _state = TransactionalState.newUserState(conf, txStateId);
67 73 }
68 74
  75 + Object _savedCoordinatorMeta = null;
  76 + boolean _changedMeta = false;
  77 +
69 78 @Override
70 79 public void emitBatch(TransactionAttempt tx, Object coordinatorMeta, TridentCollector collector) {
71   - Map<Integer, Object> metas = new HashMap<Integer, Object>();
  80 + if(_savedCoordinatorMeta==null || !_savedCoordinatorMeta.equals(coordinatorMeta)) {
  81 + List<ISpoutPartition> partitions = _emitter.getOrderedPartitions(coordinatorMeta);
  82 + _partitionStates.clear();
  83 + List<ISpoutPartition> myPartitions = new ArrayList();
  84 + for(int i=_index; i < partitions.size(); i+=_numTasks) {
  85 + ISpoutPartition p = partitions.get(i);
  86 + String id = p.getId();
  87 + myPartitions.add(p);
  88 + _partitionStates.put(id, new EmitterPartitionState(new RotatingTransactionalState(_state, id), p));
  89 + }
  90 + _emitter.refreshPartitions(myPartitions);
  91 + _savedCoordinatorMeta = coordinatorMeta;
  92 + _changedMeta = true;
  93 + }
  94 + Map<String, Object> metas = new HashMap<String, Object>();
72 95 _cachedMetas.put(tx.getTransactionId(), metas);
73   - long partitions = _emitter.numPartitions();
74   - Entry<Long, Map<Integer, Object>> entry = _cachedMetas.lowerEntry(tx.getTransactionId());
75   - Map<Integer, Object> prevCached;
  96 +
  97 + Entry<Long, Map<String, Object>> entry = _cachedMetas.lowerEntry(tx.getTransactionId());
  98 + Map<String, Object> prevCached;
76 99 if(entry!=null) {
77 100 prevCached = entry.getValue();
78 101 } else {
79   - prevCached = new HashMap<Integer, Object>();
  102 + prevCached = new HashMap<String, Object>();
80 103 }
81 104
82   - for(int i=_index; i < partitions; i+=_numTasks) {
83   - RotatingTransactionalState state = _partitionStates.get(i);
84   - if(state==null) {
85   - state = new RotatingTransactionalState(_state, "" + i);
86   - _partitionStates.put(i, state);
87   - }
88   - state.removeState(tx.getTransactionId());
89   - Object lastMeta = prevCached.get(i);
90   - if(lastMeta==null) lastMeta = state.getLastState();
91   - Object meta = _emitter.emitPartitionBatch(tx, collector, i, lastMeta);
92   - metas.put(i, meta);
  105 + for(String id: _partitionStates.keySet()) {
  106 + EmitterPartitionState s = _partitionStates.get(id);
  107 + s.rotatingState.removeState(tx.getTransactionId());
  108 + Object lastMeta = prevCached.get(id);
  109 + if(lastMeta==null) lastMeta = s.rotatingState.getLastState();
  110 + Object meta = _emitter.emitPartitionBatch(tx, collector, s.partition, lastMeta);
  111 + metas.put(id, meta);
93 112 }
94 113 }
95 114
96 115 @Override
97 116 public void success(TransactionAttempt tx) {
98   - for(RotatingTransactionalState state: _partitionStates.values()) {
99   - state.cleanupBefore(tx.getTransactionId());
  117 + for(EmitterPartitionState state: _partitionStates.values()) {
  118 + state.rotatingState.cleanupBefore(tx.getTransactionId());
100 119 }
101 120 }
102 121
103 122 @Override
104 123 public void commit(TransactionAttempt attempt) {
  124 + // this code here handles a case where a previous commit failed, and the partitions
  125 + // changed since the last commit. This clears out any state for the removed partitions
  126 + // for this txid.
  127 + // we make sure only a single task ever does this. we're also guaranteed that
  128 + // it's impossible for there to be another writer to the directory for that partition
  129 + // because only a single commit can be happening at once. this is because in order for
  130 + // another attempt of the batch to commit, the batch phase must have succeeded in between.
  131 + // hence, all tasks for the prior commit must have finished committing (whether successfully or not)
  132 + if(_changedMeta && _index==0) {
  133 + Set<String> validIds = new HashSet<String>();
  134 + for(ISpoutPartition p: (List<ISpoutPartition>) _emitter.getOrderedPartitions(_savedCoordinatorMeta)) {
  135 + validIds.add(p.getId());
  136 + }
  137 + for(String existingPartition: _state.list("")) {
  138 + if(!validIds.contains(existingPartition)) {
  139 + RotatingTransactionalState s = new RotatingTransactionalState(_state, existingPartition);
  140 + s.removeState(attempt.getTransactionId());
  141 + }
  142 + }
  143 + _changedMeta = false;
  144 + }
  145 +
105 146 Long txid = attempt.getTransactionId();
106   - Map<Integer, Object> metas = _cachedMetas.remove(txid);
107   - for(Integer partition: metas.keySet()) {
108   - Object meta = metas.get(partition);
109   - _partitionStates.get(partition).overrideState(txid, meta);
  147 + Map<String, Object> metas = _cachedMetas.remove(txid);
  148 + for(String partitionId: metas.keySet()) {
  149 + Object meta = metas.get(partitionId);
  150 + _partitionStates.get(partitionId).rotatingState.overrideState(txid, meta);
110 151 }
111 152 }
112 153
58 src/jvm/storm/trident/spout/PartitionedTridentSpoutExecutor.java
@@ -2,7 +2,9 @@
2 2
3 3 import backtype.storm.task.TopologyContext;
4 4 import backtype.storm.tuple.Fields;
  5 +import java.util.ArrayList;
5 6 import java.util.HashMap;
  7 +import java.util.List;
6 8 import java.util.Map;
7 9 import storm.trident.operation.TridentCollector;
8 10 import storm.trident.topology.TransactionAttempt;
@@ -21,7 +23,7 @@ public IPartitionedTridentSpout getPartitionedSpout() {
21 23 return _spout;
22 24 }
23 25
24   - class Coordinator implements ITridentSpout.BatchCoordinator<Long> {
  26 + class Coordinator implements ITridentSpout.BatchCoordinator<Object> {
25 27 private IPartitionedTridentSpout.Coordinator _coordinator;
26 28
27 29 public Coordinator(Map conf, TopologyContext context) {
@@ -29,8 +31,12 @@ public Coordinator(Map conf, TopologyContext context) {
29 31 }
30 32
31 33 @Override
32   - public Long initializeTransaction(long txid, Long prevMetadata) {
33   - return _coordinator.numPartitions();
  34 + public Object initializeTransaction(long txid, Object prevMetadata, Object currMetadata) {
  35 + if(currMetadata!=null) {
  36 + return currMetadata;
  37 + } else {
  38 + return _coordinator.getPartitionsForBatch();
  39 + }
34 40 }
35 41
36 42
@@ -49,10 +55,20 @@ public boolean isReady(long txid) {
49 55 }
50 56 }
51 57
52   - class Emitter implements ITridentSpout.Emitter<Long> {
  58 + static class EmitterPartitionState {
  59 + public RotatingTransactionalState rotatingState;
  60 + public ISpoutPartition partition;
  61 +
  62 + public EmitterPartitionState(RotatingTransactionalState s, ISpoutPartition p) {
  63 + rotatingState = s;
  64 + partition = p;
  65 + }
  66 + }
  67 +
  68 + class Emitter implements ITridentSpout.Emitter<Object> {
53 69 private IPartitionedTridentSpout.Emitter _emitter;
54 70 private TransactionalState _state;
55   - private Map<Integer, RotatingTransactionalState> _partitionStates = new HashMap<Integer, RotatingTransactionalState>();
  71 + private Map<String, EmitterPartitionState> _partitionStates = new HashMap<String, EmitterPartitionState>();
56 72 private int _index;
57 73 private int _numTasks;
58 74
@@ -63,15 +79,28 @@ public Emitter(String txStateId, Map conf, TopologyContext context) {
63 79 _numTasks = context.getComponentTasks(context.getThisComponentId()).size();
64 80 }
65 81
  82 + Object _savedCoordinatorMeta = null;
  83 +
  84 +
66 85 @Override
67   - public void emitBatch(final TransactionAttempt tx, final Long partitions,
  86 + public void emitBatch(final TransactionAttempt tx, final Object coordinatorMeta,
68 87 final TridentCollector collector) {
69   - for(int i=_index; i < partitions; i+=_numTasks) {
70   - if(!_partitionStates.containsKey(i)) {
71   - _partitionStates.put(i, new RotatingTransactionalState(_state, "" + i));
  88 + if(_savedCoordinatorMeta == null || !_savedCoordinatorMeta.equals(coordinatorMeta)) {
  89 + List<ISpoutPartition> partitions = _emitter.getOrderedPartitions(coordinatorMeta);
  90 + _partitionStates.clear();
  91 + List<ISpoutPartition> myPartitions = new ArrayList();
  92 + for(int i=_index; i < partitions.size(); i+=_numTasks) {
  93 + ISpoutPartition p = partitions.get(i);
  94 + String id = p.getId();
  95 + myPartitions.add(p);
  96 + _partitionStates.put(id, new EmitterPartitionState(new RotatingTransactionalState(_state, id), p));
72 97 }
73   - RotatingTransactionalState state = _partitionStates.get(i);
74   - final int partition = i;
  98 + _emitter.refreshPartitions(myPartitions);
  99 + _savedCoordinatorMeta = coordinatorMeta;
  100 + }
  101 + for(EmitterPartitionState s: _partitionStates.values()) {
  102 + RotatingTransactionalState state = s.rotatingState;
  103 + final ISpoutPartition partition = s.partition;
75 104 Object meta = state.getStateOrCreate(tx.getTransactionId(),
76 105 new RotatingTransactionalState.StateInitializer() {
77 106 @Override
@@ -86,14 +115,13 @@ public Object init(long txid, Object lastState) {
86 115 if(meta!=null) {
87 116 _emitter.emitPartitionBatch(tx, collector, partition, meta);
88 117 }
89   - }
90   -
  118 + }
91 119 }
92 120
93 121 @Override
94 122 public void success(TransactionAttempt tx) {
95   - for(RotatingTransactionalState state: _partitionStates.values()) {
96   - state.cleanupBefore(tx.getTransactionId());
  123 + for(EmitterPartitionState state: _partitionStates.values()) {
  124 + state.rotatingState.cleanupBefore(tx.getTransactionId());
97 125 }
98 126 }
99 127
2  src/jvm/storm/trident/spout/RichSpoutBatchExecutor.java
@@ -129,7 +129,7 @@ public void close() {
129 129
130 130 class RichSpoutCoordinator implements ITridentSpout.BatchCoordinator {
131 131 @Override
132   - public Object initializeTransaction(long txid, Object prevMetadata) {
  132 + public Object initializeTransaction(long txid, Object prevMetadata, Object currMetadata) {
133 133 return null;
134 134 }
135 135
17 src/jvm/storm/trident/spout/TridentSpoutCoordinator.java
@@ -26,7 +26,6 @@
26 26 RotatingTransactionalState _state;
27 27 TransactionalState _underlyingState;
28 28 String _id;
29   - StateInitializer _initializer;
30 29
31 30
32 31 public TridentSpoutCoordinator(String id, ITridentSpout spout) {
@@ -39,7 +38,6 @@ public void prepare(Map conf, TopologyContext context) {
39 38 _coord = _spout.getCoordinator(_id, conf, context);
40 39 _underlyingState = TransactionalState.newCoordinatorState(conf, _id);
41 40 _state = new RotatingTransactionalState(_underlyingState, META_DIR);
42   - _initializer = new StateInitializer();
43 41 }
44 42
45 43 @Override
@@ -50,7 +48,10 @@ public void execute(Tuple tuple, BasicOutputCollector collector) {
50 48 _state.cleanupBefore(attempt.getTransactionId());
51 49 _coord.success(attempt.getTransactionId());
52 50 } else {
53   - Object meta = _state.getState(attempt.getTransactionId(), _initializer);
  51 + long txid = attempt.getTransactionId();
  52 + Object prevMeta = _state.getPreviousState(txid);
  53 + Object meta = _coord.initializeTransaction(txid, prevMeta, _state.getState(txid));
  54 + _state.overrideState(txid, meta);
54 55 collector.emit(MasterBatchCoordinator.BATCH_STREAM_ID, new Values(attempt, meta));
55 56 }
56 57
@@ -72,13 +73,5 @@ public void declareOutputFields(OutputFieldsDeclarer declarer) {
72 73 Config ret = new Config();
73 74 ret.setMaxTaskParallelism(1);
74 75 return ret;
75   - }
76   -
77   -
78   - private class StateInitializer implements RotatingTransactionalState.StateInitializer {
79   - @Override
80   - public Object init(long txid, Object lastState) {
81   - return _coord.initializeTransaction(txid, lastState);
82   - }
83   - }
  76 + }
84 77 }
3  src/jvm/storm/trident/testing/FeederBatchSpout.java
@@ -56,7 +56,8 @@ public FeederCoordinator(int numPartitions) {
56 56 }
57 57
58 58 @Override
59   - public Map<Integer, List<List<Object>>> initializeTransaction(long txid, Map<Integer, List<List<Object>>> prevMetadata) {
  59 + public Map<Integer, List<List<Object>>> initializeTransaction(long txid, Map<Integer, List<List<Object>>> prevMetadata, Map<Integer, List<List<Object>>> currMetadata) {
  60 + if(currMetadata!=null) return currMetadata;
60 61 List allBatches = (List) RegisteredGlobalState.getState(_id);
61 62 if(allBatches.size()>_emittedIndex) {
62 63 Object batchInfo = allBatches.get(_emittedIndex);
11 src/jvm/storm/trident/topology/state/RotatingTransactionalState.java
@@ -4,7 +4,6 @@
4 4 import java.util.List;
5 5 import java.util.SortedMap;
6 6 import java.util.TreeMap;
7   -import storm.trident.topology.MasterBatchCoordinator;
8 7
9 8 public class RotatingTransactionalState {
10 9 public static interface StateInitializer {
@@ -41,6 +40,10 @@ public void removeState(long txid) {
41 40 }
42 41 }
43 42
  43 + public Object getState(long txid) {
  44 + return _curr.get(txid);
  45 + }
  46 +
44 47 public Object getState(long txid, StateInitializer init) {
45 48 if(!_curr.containsKey(txid)) {
46 49 SortedMap<Long, Object> prevMap = _curr.headMap(txid);
@@ -67,6 +70,12 @@ public Object getState(long txid, StateInitializer init) {
67 70 return _curr.get(txid);
68 71 }
69 72
  73 + public Object getPreviousState(long txid) {
  74 + SortedMap<Long, Object> prevMap = _curr.headMap(txid);
  75 + if(prevMap.isEmpty()) return null;
  76 + else return prevMap.get(prevMap.lastKey());
  77 + }
  78 +
70 79 public boolean hasCache(long txid) {
71 80 return _curr.containsKey(txid);
72 81 }

0 comments on commit 1003690

Please sign in to comment.
Something went wrong with that request. Please try again.