Permalink
Browse files

merge

  • Loading branch information...
2 parents f6b56d2 + fcc3956 commit c7b965a0c795fe2b717d0c0afb5a20b95e846ca8 @nathanmarz nathanmarz committed Aug 27, 2012
Showing with 489 additions and 100 deletions.
  1. +1 −0 .gitignore
  2. +9 −0 CHANGELOG.md
  3. +1 −0 README.markdown
  4. +1 −1 bin/install_zmq.sh
  5. +1 −1 project.clj
  6. +2 −2 src/clj/backtype/storm/util.clj
  7. +15 −1 src/clj/storm/trident/testing.clj
  8. +23 −5 src/jvm/backtype/storm/StormSubmitter.java
  9. +6 −1 src/jvm/storm/trident/Stream.java
  10. +9 −4 src/jvm/storm/trident/fluent/ChainedAggregatorDeclarer.java
  11. +16 −0 src/jvm/storm/trident/operation/BaseMultiReducer.java
  12. +3 −1 src/jvm/storm/trident/partition/IndexHashGrouping.java
  13. +5 −1 src/jvm/storm/trident/planner/SubtopologyBolt.java
  14. +0 −1 src/jvm/storm/trident/spout/TridentSpoutExecutor.java
  15. +2 −2 src/jvm/storm/trident/state/OpaqueValue.java
  16. +1 −1 src/jvm/storm/trident/state/Serializer.java
  17. +5 −5 src/jvm/storm/trident/state/map/CachedBatchReadsMap.java
  18. +19 −19 src/jvm/storm/trident/state/map/CachedMap.java
  19. +7 −7 src/jvm/storm/trident/state/map/NonTransactionalMap.java
  20. +24 −23 src/jvm/storm/trident/state/map/OpaqueMap.java
  21. +15 −14 src/jvm/storm/trident/state/map/TransactionalMap.java
  22. +30 −0 src/jvm/storm/trident/testing/CountAsAggregator.java
  23. +7 −1 src/jvm/storm/trident/testing/FeederBatchSpout.java
  24. +79 −0 src/jvm/storm/trident/testing/FeederCommitterBatchSpout.java
  25. +6 −0 src/jvm/storm/trident/testing/IFeeder.java
  26. +1 −1 src/jvm/storm/trident/testing/LRUMemoryMapState.java
  27. +3 −1 src/jvm/storm/trident/testing/Split.java
  28. +15 −0 src/jvm/storm/trident/testing/StringLength.java
  29. +13 −0 src/jvm/storm/trident/testing/TrueFilter.java
  30. +20 −0 src/jvm/storm/trident/testing/TuplifyArgs.java
  31. +9 −6 src/jvm/storm/trident/topology/TridentTopologyBuilder.java
  32. +1 −1 src/jvm/storm/trident/tuple/TridentTupleView.java
  33. +140 −1 test/clj/storm/trident/integration_test.clj
View
@@ -20,4 +20,5 @@ NANNY
_release
*.zip
.lein-deps-sum
+*.iml
View
@@ -2,8 +2,17 @@
* Changed debug level of "Failed message" logging to DEBUG
* Deprecated LinearDRPCTopologyBuilder, TimeCacheMap, and transactional topologies
+ * During "storm jar", whether topology is already running or not is checked before submitting jar to save time (thanks jasonjckn)
+ * Added BaseMultiReducer class to Trident that provides empty implementations of prepare and cleanup
* Bug fix: When an item is consumed off an internal buffer, the entry on the buffer is nulled to allow GC to happen on that data
* Bug fix: Helper class for Trident MapStates now clear their read cache when a new commit happens, preventing updates from spilling over from a failed batch attempt to the next attempt
+ * Bug fix: Fix NonTransactionalMap to take in an IBackingMap for regular values rather than TransactionalValue (thanks sjoerdmulder)
+ * Bug fix: Fix NPE when no input fields given for regular Aggregator
+ * Bug fix: Fix IndexOutOfBoundsExceptions when a bolt for global aggregation had a parallelism greater than 1 (possible with splitting, stateQuerying, and multiReduce)
+ * Bug fix: Fix "fields size" error that would sometimes occur when splitting a stream with multiple eaches
+ * Bug fix: Fix bug where a committer spout (including opaque spouts) could cause Trident processing to halt
+ * Bug fix: Fix Trident bug where multiple groupings on same stream would cause tuples to be duplicate dto all consumers
+ * Bug fix: Fixed error when repartitioning stream twice in a row without any operations in between
## 0.8.0
View
@@ -56,6 +56,7 @@ You must not remove this notice, or any other, from this software.
* Vinod Chandru ([@vinodc](https://github.com/vinodc))
* Martin Kleppmann ([@ept](https://github.com/ept))
* Evan Chan ([@velvia](https://github.com/velvia))
+* Sjoerd Mulder ([@sjoerdmulder](https://github.com/sjoerdmulder))
## Acknowledgements
View
@@ -1,5 +1,5 @@
#!/bin/bash
-export JAVA_HOME=$(/usr/libexec/java_home)
+export JAVA_HOME=${JAVA_HOME:/usr/libexec/java_home}
if [ ! -d "$JAVA_HOME/include" ]; then
echo "
View
@@ -1,4 +1,4 @@
-(defproject storm "0.8.1-experimental-wip2"
+(defproject storm "0.8.1-experimental-wip7"
:source-path "src/clj"
:test-path "test/clj"
:java-source-path "src/jvm"
@@ -488,8 +488,8 @@
(defn collectify [obj]
(if (or (sequential? obj) (instance? Collection obj)) obj [obj]))
-(defn to-json [^Map m]
- (JSONValue/toJSONString m))
+(defn to-json [obj]
+ (JSONValue/toJSONString obj))
(defn from-json [^String str]
(if str
@@ -1,5 +1,5 @@
(ns storm.trident.testing
- (:import [storm.trident.testing FeederBatchSpout MemoryMapState MemoryMapState$Factory])
+ (:import [storm.trident.testing FeederBatchSpout FeederCommitterBatchSpout MemoryMapState MemoryMapState$Factory TuplifyArgs])
(:import [backtype.storm LocalDRPC])
(:import [backtype.storm.tuple Fields])
(:import [backtype.storm.generated KillOptions])
@@ -14,9 +14,15 @@
(let [res (.execute drpc function-name args)]
(from-json res)))
+(defn exec-drpc-tuples [^LocalDRPC drpc function-name tuples]
+ (exec-drpc drpc function-name (to-json tuples)))
+
(defn feeder-spout [fields]
(FeederBatchSpout. fields))
+(defn feeder-committer-spout [fields]
+ (FeederCommitterBatchSpout. fields))
+
(defn feed [feeder tuples]
(.feed feeder tuples))
@@ -46,3 +52,11 @@
(import 'storm.trident.TridentTopology)
(import '[storm.trident.operation.builtin Count Sum Equals MapGet Debug FilterNull FirstN])
)
+
+(defn drpc-tuples-input [topology function-name drpc outfields]
+ (-> topology
+ (.newDRPCStream function-name drpc)
+ (.each (fields "args") (TuplifyArgs.) outfields)
+ ))
+
+
@@ -1,9 +1,6 @@
package backtype.storm;
-import backtype.storm.generated.AlreadyAliveException;
-import backtype.storm.generated.InvalidTopologyException;
-import backtype.storm.generated.Nimbus;
-import backtype.storm.generated.StormTopology;
+import backtype.storm.generated.*;
import backtype.storm.utils.BufferFileInputStream;
import backtype.storm.utils.NimbusClient;
import backtype.storm.utils.Utils;
@@ -53,8 +50,11 @@ public static void submitTopology(String name, Map stormConf, StormTopology topo
LOG.info("Submitting topology " + name + " in local mode");
localNimbus.submitTopology(name, null, serConf, topology);
} else {
- submitJar(conf);
NimbusClient client = NimbusClient.getConfiguredClient(conf);
+ if(topologyNameExists(conf, name)) {
+ throw new RuntimeException("Topology with name `" + name + "` already exists on cluster");
+ }
+ submitJar(conf);
try {
LOG.info("Submitting topology " + name + " in distributed mode with conf " + serConf);
client.getClient().submitTopology(name, submittedJar, serConf, topology);
@@ -67,6 +67,24 @@ public static void submitTopology(String name, Map stormConf, StormTopology topo
throw new RuntimeException(e);
}
}
+
+ private static boolean topologyNameExists(Map conf, String name) {
+ NimbusClient client = NimbusClient.getConfiguredClient(conf);
+ try {
+ ClusterSummary summary = client.getClient().getClusterInfo();
+ for(TopologySummary s : summary.get_topologies()) {
+ if(s.get_name().equals(name)) {
+ return true;
+ }
+ }
+ return false;
+
+ } catch(Exception e) {
+ throw new RuntimeException(e);
+ } finally {
+ client.close();
+ }
+ }
private static String submittedJar = null;
@@ -21,6 +21,7 @@
import storm.trident.operation.impl.ReducerAggStateUpdater;
import storm.trident.operation.impl.IndexHashBatchToPartition;
import storm.trident.operation.impl.SingleEmitAggregator.BatchToPartition;
+import storm.trident.operation.impl.TrueFilter;
import storm.trident.partition.GlobalGrouping;
import storm.trident.partition.IdentityGrouping;
import storm.trident.partition.IndexHashGrouping;
@@ -94,7 +95,11 @@ public Stream identityPartition() {
}
public Stream partition(Grouping grouping) {
- return _topology.addSourcedNode(this, new PartitionNode(_node.streamId, getOutputFields(), grouping));
+ if(_node instanceof PartitionNode) {
+ return each(new Fields(), new TrueFilter()).partition(grouping);
+ } else {
+ return _topology.addSourcedNode(this, new PartitionNode(_node.streamId, getOutputFields(), grouping));
+ }
}
public Stream applyAssembly(Assembly assembly) {
@@ -60,11 +60,16 @@ public Stream chainEnd() {
Set<String> allInFields = new HashSet<String>();
for(int i=0; i<_aggs.size(); i++) {
AggSpec spec = _aggs.get(i);
- inputFields[i] = spec.inFields;
+ Fields infields = spec.inFields;
+ if(infields==null) infields = new Fields();
+ Fields outfields = spec.outFields;
+ if(outfields==null) outfields = new Fields();
+
+ inputFields[i] = infields;
aggs[i] = spec.agg;
- outSizes[i] = spec.outFields.size();
- allOutFields.addAll(spec.outFields.toList());
- allInFields.addAll(spec.inFields.toList());
+ outSizes[i] = outfields.size();
+ allOutFields.addAll(outfields.toList());
+ allInFields.addAll(infields.toList());
}
if(new HashSet(allOutFields).size() != allOutFields.size()) {
throw new IllegalArgumentException("Output fields for chained aggregators must be distinct: " + allOutFields.toString());
@@ -0,0 +1,16 @@
+package storm.trident.operation;
+
+import java.util.Map;
+
+public abstract class BaseMultiReducer<T> implements MultiReducer<T> {
+
+ @Override
+ public void prepare(Map conf, TridentMultiReducerContext context) {
+ }
+
+
+ @Override
+ public void cleanup() {
+ }
+
+}
@@ -9,7 +9,9 @@
public class IndexHashGrouping implements CustomStreamGrouping {
public static int objectToIndex(Object val, int numPartitions) {
if(val==null) return 0;
- else return val.hashCode() % numPartitions;
+ else {
+ return Math.abs(val.hashCode()) % numPartitions;
+ }
}
int _index;
@@ -80,13 +80,17 @@ public void prepare(Map conf, TopologyContext context, BatchOutputCollector batc
}
}
List<TupleReceiver> targets = new ArrayList();
+ boolean outgoingNode = false;
for(Node cn: TridentUtils.getChildren(_graph, n)) {
if(_nodes.contains(cn)) {
targets.add(((ProcessorNode) cn).processor);
} else {
- targets.add(new BridgeReceiver(batchCollector));
+ outgoingNode = true;
}
}
+ if(outgoingNode) {
+ targets.add(new BridgeReceiver(batchCollector));
+ }
TridentContext triContext = new TridentContext(
pn.selfOutFields,
@@ -48,7 +48,6 @@ public void execute(BatchInfo info, Tuple input) {
// there won't be a BatchInfo for the success stream
TransactionAttempt attempt = (TransactionAttempt) input.getValue(0);
if(input.getSourceStreamId().equals(MasterBatchCoordinator.COMMIT_STREAM_ID)) {
- _collector.setBatch(info.batchId);
if(attempt.equals(_activeBatches.get(attempt.getTransactionId()))) {
((ICommitterTridentSpout.Emitter) _emitter).commit(attempt);
_activeBatches.remove(attempt.getTransactionId());
@@ -17,14 +17,14 @@ public OpaqueValue(Long currTxid, T val) {
this(currTxid, val, null);
}
- public OpaqueValue update(Long batchTxid, T newVal) {
+ public OpaqueValue<T> update(Long batchTxid, T newVal) {
T prev;
if(batchTxid!=null && batchTxid.equals(this.currTxid)) {
prev = this.prev;
} else {
prev = this.curr;
}
- return new OpaqueValue(batchTxid, newVal, prev);
+ return new OpaqueValue<T>(batchTxid, newVal, prev);
}
public T get(Long batchTxid) {
@@ -5,5 +5,5 @@
public interface Serializer<T> extends Serializable {
byte[] serialize(T obj);
- Object deserialize(byte[] b);
+ T deserialize(byte[] b);
}
@@ -7,11 +7,11 @@
import storm.trident.state.ValueUpdater;
public class CachedBatchReadsMap<T> implements MapState<T> {
- Map<List<Object>, Object> _cached = new HashMap();
+ Map<List<Object>, T> _cached = new HashMap<List<Object>, T>();
- public MapState _delegate;
+ public MapState<T> _delegate;
- public CachedBatchReadsMap(MapState delegate) {
+ public CachedBatchReadsMap(MapState<T> delegate) {
_delegate = delegate;
}
@@ -23,15 +23,15 @@ public CachedBatchReadsMap(MapState delegate) {
for(int i=0; i<keys.size(); i++) {
List<Object> key = keys.get(i);
if(_cached.containsKey(key)) {
- ret.set(i, (T) _cached.get(key));
+ ret.set(i, _cached.get(key));
}
}
}
return ret;
}
@Override
- public List multiUpdate(List<List<Object>> keys, List<ValueUpdater> updaters) {
+ public List<T> multiUpdate(List<List<Object>> keys, List<ValueUpdater> updaters) {
List<T> vals = _delegate.multiUpdate(keys, updaters);
cache(keys, vals);
return vals;
@@ -9,24 +9,24 @@
/**
* Useful to layer over a map that communicates with a database. you generally layer opaque map over this over your database store
* @author nathan
- * @param <T>
+ * @param <T>
*/
public class CachedMap<T> implements IBackingMap<T> {
- LRUMap _cache;
- IBackingMap _delegate;
-
- public CachedMap(IBackingMap delegate, int cacheSize) {
- _cache = new LRUMap(cacheSize);
+ LRUMap<List<Object>, T> _cache;
+ IBackingMap<T> _delegate;
+
+ public CachedMap(IBackingMap<T> delegate, int cacheSize) {
+ _cache = new LRUMap<List<Object>, T>(cacheSize);
_delegate = delegate;
- }
-
+ }
+
@Override
public List<T> multiGet(List<List<Object>> keys) {
- Map<List<Object>, T> results = new HashMap();
- List<List<Object>> toGet = new ArrayList();
+ Map<List<Object>, T> results = new HashMap<List<Object>, T>();
+ List<List<Object>> toGet = new ArrayList<List<Object>>();
for(List<Object> key: keys) {
if(_cache.containsKey(key)) {
- results.put(key, (T) _cache.get(key));
+ results.put(key, _cache.get(key));
} else {
toGet.add(key);
}
@@ -39,23 +39,23 @@ public CachedMap(IBackingMap delegate, int cacheSize) {
_cache.put(key, val);
results.put(key, val);
}
-
- List<T> ret = new ArrayList(keys.size());
+
+ List<T> ret = new ArrayList<T>(keys.size());
for(List<Object> key: keys) {
ret.add(results.get(key));
}
return ret;
}
@Override
- public void multiPut(List<List<Object>> keys, List<T> vals) {
- cache(keys, vals);
- _delegate.multiPut(keys, vals);
+ public void multiPut(List<List<Object>> keys, List<T> values) {
+ cache(keys, values);
+ _delegate.multiPut(keys, values);
}
-
- private void cache(List<List<Object>> keys, List<T> vals) {
+
+ private void cache(List<List<Object>> keys, List<T> values) {
for(int i=0; i<keys.size(); i++) {
- _cache.put(keys.get(i), vals.get(i));
+ _cache.put(keys.get(i), values.get(i));
}
}
@@ -1,14 +1,14 @@
package storm.trident.state.map;
+import storm.trident.state.ValueUpdater;
+
import java.util.ArrayList;
import java.util.List;
-import storm.trident.state.TransactionalValue;
-import storm.trident.state.ValueUpdater;
public class NonTransactionalMap<T> implements MapState<T> {
- public static MapState build(IBackingMap<TransactionalValue> backing) {
- return new NonTransactionalMap(backing);
+ public static <T> MapState<T> build(IBackingMap<T> backing) {
+ return new NonTransactionalMap<T>(backing);
}
IBackingMap<T> _backing;
@@ -25,11 +25,11 @@ protected NonTransactionalMap(IBackingMap<T> backing) {
@Override
public List<T> multiUpdate(List<List<Object>> keys, List<ValueUpdater> updaters) {
List<T> curr = _backing.multiGet(keys);
- List<T> ret = new ArrayList(curr.size());
+ List<T> ret = new ArrayList<T>(curr.size());
for(int i=0; i<curr.size(); i++) {
T currVal = curr.get(i);
- ValueUpdater updater = updaters.get(i);
- ret.add((T) updater.update(currVal));
+ ValueUpdater<T> updater = updaters.get(i);
+ ret.add(updater.update(currVal));
}
_backing.multiPut(keys, ret);
return ret;
Oops, something went wrong.

0 comments on commit c7b965a

Please sign in to comment.