Skip to content
Browse files

added refreshPartitions method for managing connections

  • Loading branch information...
1 parent d6c2736 commit 14289132abfb8749012faf663d281f336451b703 @nathanmarz nathanmarz committed
View
6 src/jvm/storm/trident/spout/IOpaquePartitionedTridentSpout.java
@@ -28,6 +28,12 @@
* for defining the parameters of the next batch.
*/
M emitPartitionBatch(TransactionAttempt tx, TridentCollector collector, Partition partition, M lastPartitionMeta);
+
+ /**
+ * This method is called when this task is responsible for a new set of partitions. Should be used
+ * to manage things like connections to brokers.
+ */
+ void refreshPartitions(List<Partition> partitionResponsibilities);
List<Partition> getOrderedPartitions(Partitions allPartitionInfo);
void close();
}
View
6 src/jvm/storm/trident/spout/IPartitionedTridentSpout.java
@@ -39,6 +39,12 @@
X emitPartitionBatchNew(TransactionAttempt tx, TridentCollector collector, Partition partition, X lastPartitionMeta);
/**
+ * This method is called when this task is responsible for a new set of partitions. Should be used
+ * to manage things like connections to brokers.
+ */
+ void refreshPartitions(List<Partition> partitionResponsibilities);
+
+ /**
* Emit a batch of tuples for a partition/transaction that has been emitted before, using
* the metadata created when it was first emitted.
*/
View
4 src/jvm/storm/trident/spout/OpaquePartitionedTridentSpoutExecutor.java
@@ -3,6 +3,7 @@
import backtype.storm.task.TopologyContext;
import backtype.storm.tuple.Fields;
+import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -79,11 +80,14 @@ public void emitBatch(TransactionAttempt tx, Object coordinatorMeta, TridentColl
if(_savedCoordinatorMeta==null || !_savedCoordinatorMeta.equals(coordinatorMeta)) {
List<ISpoutPartition> partitions = _emitter.getOrderedPartitions(coordinatorMeta);
_partitionStates.clear();
+ List<ISpoutPartition> myPartitions = new ArrayList();
for(int i=_index; i < partitions.size(); i+=_numTasks) {
ISpoutPartition p = partitions.get(i);
String id = p.getId();
+ myPartitions.add(p);
_partitionStates.put(id, new EmitterPartitionState(new RotatingTransactionalState(_state, id), p));
}
+ _emitter.refreshPartitions(myPartitions);
_savedCoordinatorMeta = coordinatorMeta;
_changedMeta = true;
}
View
5 src/jvm/storm/trident/spout/PartitionedTridentSpoutExecutor.java
@@ -2,11 +2,11 @@
import backtype.storm.task.TopologyContext;
import backtype.storm.tuple.Fields;
+import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import storm.trident.operation.TridentCollector;
-import storm.trident.spout.OpaquePartitionedTridentSpoutExecutor.EmitterPartitionState;
import storm.trident.topology.TransactionAttempt;
import storm.trident.topology.state.RotatingTransactionalState;
import storm.trident.topology.state.TransactionalState;
@@ -88,11 +88,14 @@ public void emitBatch(final TransactionAttempt tx, final Object coordinatorMeta,
if(_savedCoordinatorMeta == null || !_savedCoordinatorMeta.equals(coordinatorMeta)) {
List<ISpoutPartition> partitions = _emitter.getOrderedPartitions(coordinatorMeta);
_partitionStates.clear();
+ List<ISpoutPartition> myPartitions = new ArrayList();
for(int i=_index; i < partitions.size(); i+=_numTasks) {
ISpoutPartition p = partitions.get(i);
String id = p.getId();
+ myPartitions.add(p);
_partitionStates.put(id, new EmitterPartitionState(new RotatingTransactionalState(_state, id), p));
}
+ _emitter.refreshPartitions(myPartitions);
_savedCoordinatorMeta = coordinatorMeta;
}
for(EmitterPartitionState s: _partitionStates.values()) {

0 comments on commit 1428913

Please sign in to comment.
Something went wrong with that request. Please try again.