Merge branch 'master' into 0.9.0

commit 6b1796fd77299a688dd45a8dcbdabec579092241 (parents: 6c7c3cc + 97b5bfa)
Authored by @nathanmarz
CHANGELOG.md (1 line changed)
@@ -31,6 +31,7 @@
* Added ReportedFailedException which causes a batch to fail without killing worker and reports the error to the UI
* Execute latency now tracked and shown in Storm UI
* Adding testTuple methods for easily creating Tuple instances to Testing API (thanks xumingming)
+ * Trident now throws an error during construction of a topology when trying to select fields that don't exist in a stream (thanks xumingming)
* Bug fix: Fix race condition in supervisor that would lead to supervisor continuously crashing due to not finding "stormconf.ser" file for an already killed topology
* Bug fix: bin/storm script now displays a helpful error message when an invalid command is specified
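
The changelog entry above corresponds to the projectionValidation calls threaded through Stream.java below. A minimal sketch of the user-facing behavior (the FeederBatchSpout setup is an assumption for illustration; the stream and field names mirror the Clojure test at the end of this commit):

    import java.util.Arrays;
    import backtype.storm.tuple.Fields;
    import storm.trident.Stream;
    import storm.trident.TridentTopology;
    import storm.trident.testing.FeederBatchSpout; // assumed: test spout that declares its output fields
    import storm.trident.testing.Split;

    TridentTopology topo = new TridentTopology();
    // Spout whose tuples carry a single field, "sentence".
    FeederBatchSpout spout = new FeederBatchSpout(Arrays.asList("sentence"));
    Stream stream = topo.newStream("tester", spout);

    // A typo like "sentence1" previously surfaced only at runtime;
    // after this commit, topology construction fails immediately.
    try {
        stream.each(new Fields("sentence1"), new Split(), new Fields("word"));
    } catch (IllegalArgumentException e) {
        // e.g. Trying to select non-existent field: 'sentence1' from stream containing fields: <[sentence]>
        System.err.println(e.getMessage());
    }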
src/jvm/storm/trident/Stream.java (27 lines changed)
@@ -62,14 +62,17 @@ public Stream parallelismHint(int hint) {
}
public Stream project(Fields keepFields) {
+ projectionValidation(keepFields);
return _topology.addSourcedNode(this, new ProcessorNode(_topology.getUniqueStreamId(), _name, keepFields, new Fields(), new ProjectedProcessor(keepFields)));
}
public GroupedStream groupBy(Fields fields) {
+ projectionValidation(fields);
return new GroupedStream(this, fields);
}
public Stream partitionBy(Fields fields) {
+ projectionValidation(fields);
return partition(Grouping.fields(fields.toList()));
}
@@ -114,6 +117,7 @@ public Stream applyAssembly(Assembly assembly) {
@Override
public Stream each(Fields inputFields, Function function, Fields functionFields) {
+ projectionValidation(inputFields);
return _topology.addSourcedNode(this,
new ProcessorNode(_topology.getUniqueStreamId(),
_name,
@@ -125,6 +129,7 @@ public Stream each(Fields inputFields, Function function, Fields functionFields)
//creates brand new tuples with brand new fields
@Override
public Stream partitionAggregate(Fields inputFields, Aggregator agg, Fields functionFields) {
+ projectionValidation(inputFields);
return _topology.addSourcedNode(this,
new ProcessorNode(_topology.getUniqueStreamId(),
_name,
@@ -134,6 +139,7 @@ public Stream partitionAggregate(Fields inputFields, Aggregator agg, Fields func
}
public Stream stateQuery(TridentState state, Fields inputFields, QueryFunction function, Fields functionFields) {
+ projectionValidation(inputFields);
String stateId = state._node.stateInfo.id;
Node n = new ProcessorNode(_topology.getUniqueStreamId(),
_name,
@@ -149,6 +155,7 @@ public TridentState partitionPersist(StateFactory stateFactory, Fields inputFiel
}
public TridentState partitionPersist(StateSpec stateSpec, Fields inputFields, StateUpdater updater, Fields functionFields) {
+ projectionValidation(inputFields);
String id = _topology.getUniqueStateId();
ProcessorNode n = new ProcessorNode(_topology.getUniqueStreamId(),
_name,
@@ -189,6 +196,7 @@ public Stream partitionAggregate(CombinerAggregator agg, Fields functionFields)
}
public Stream partitionAggregate(Fields inputFields, CombinerAggregator agg, Fields functionFields) {
+ projectionValidation(inputFields);
return chainedAgg()
.partitionAggregate(inputFields, agg, functionFields)
.chainEnd();
@@ -199,6 +207,7 @@ public Stream partitionAggregate(ReducerAggregator agg, Fields functionFields) {
}
public Stream partitionAggregate(Fields inputFields, ReducerAggregator agg, Fields functionFields) {
+ projectionValidation(inputFields);
return chainedAgg()
.partitionAggregate(inputFields, agg, functionFields)
.chainEnd();
@@ -209,6 +218,7 @@ public Stream aggregate(Aggregator agg, Fields functionFields) {
}
public Stream aggregate(Fields inputFields, Aggregator agg, Fields functionFields) {
+ projectionValidation(inputFields);
return chainedAgg()
.aggregate(inputFields, agg, functionFields)
.chainEnd();
@@ -219,6 +229,7 @@ public Stream aggregate(CombinerAggregator agg, Fields functionFields) {
}
public Stream aggregate(Fields inputFields, CombinerAggregator agg, Fields functionFields) {
+ projectionValidation(inputFields);
return chainedAgg()
.aggregate(inputFields, agg, functionFields)
.chainEnd();
@@ -229,6 +240,7 @@ public Stream aggregate(ReducerAggregator agg, Fields functionFields) {
}
public Stream aggregate(Fields inputFields, ReducerAggregator agg, Fields functionFields) {
+ projectionValidation(inputFields);
return chainedAgg()
.aggregate(inputFields, agg, functionFields)
.chainEnd();
@@ -263,6 +275,7 @@ public TridentState persistentAggregate(StateFactory stateFactory, Fields inputF
}
public TridentState persistentAggregate(StateSpec spec, Fields inputFields, CombinerAggregator agg, Fields functionFields) {
+ projectionValidation(inputFields);
// replaces normal aggregation here with a global grouping because it needs to be consistent across batches
return new ChainedAggregatorDeclarer(this, new GlobalAggScheme())
.aggregate(inputFields, agg, functionFields)
@@ -283,6 +296,7 @@ public TridentState persistentAggregate(StateFactory stateFactory, Fields inputF
}
public TridentState persistentAggregate(StateSpec spec, Fields inputFields, ReducerAggregator agg, Fields functionFields) {
+ projectionValidation(inputFields);
return global().partitionPersist(spec, inputFields, new ReducerAggStateUpdater(agg), functionFields);
}
@@ -327,4 +341,17 @@ public BatchToPartition singleEmitPartitioner() {
}
}
+
+ private void projectionValidation(Fields projFields) {
+ if (projFields == null) {
+ return;
+ }
+
+ Fields allFields = this.getOutputFields();
+ for (String field : projFields) {
+ if (!allFields.contains(field)) {
+ throw new IllegalArgumentException("Trying to select non-existent field: '" + field + "' from stream containing fields: <" + allFields + ">");
+ }
+ }
+ }
}
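
Note the null check at the top of projectionValidation: overloads that take no input fields (and would presumably pass null through) are deliberately left unvalidated. Since the requested fields are checked against getOutputFields(), validation applies per stream stage. A short sketch of the effect on project, with hypothetical field names:

    // Given a stream whose output fields are ["sentence"]:
    stream.project(new Fields("sentence"));   // ok: field exists in the stream
    stream.project(new Fields("sentence1"));  // throws IllegalArgumentException at construction time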
test/clj/storm/trident/integration_test.clj (74 lines changed)
@@ -1,7 +1,10 @@
(ns storm.trident.integration-test
(:use [clojure test])
(:require [backtype.storm [testing :as t]])
- (:import [storm.trident.testing Split CountAsAggregator StringLength TrueFilter])
+ (:import [storm.trident.testing Split CountAsAggregator StringLength TrueFilter
+ MemoryMapState$Factory])
+ (:import [storm.trident.state StateSpec])
+ (:import [storm.trident.operation.impl CombinerAggStateUpdater])
(:use [storm.trident testing])
(:use [backtype.storm util]))
@@ -182,6 +185,75 @@
(is (t/ms= [[1]] (exec-drpc drpc "tester" "aaa")))
)))))
+(deftest test-stream-projection-validation
+ (t/with-local-cluster [cluster]
+ (letlocals
+ (bind feeder (feeder-committer-spout ["sentence"]))
+ (bind topo (TridentTopology.))
+ ;; valid projection fields will not throw exceptions
+ (bind word-counts
+ (-> topo
+ (.newStream "tester" feeder)
+ (.each (fields "sentence") (Split.) (fields "word"))
+ (.groupBy (fields "word"))
+ (.persistentAggregate (memory-map-state) (Count.) (fields "count"))
+ (.parallelismHint 6)
+ ))
+ (bind stream (-> topo
+ (.newStream "tester" feeder)))
+ ;; test .each
+ (is (thrown? IllegalArgumentException
+ (-> stream
+ (.each (fields "sentence1") (Split.) (fields "word")))))
+ ;; test .groupBy
+ (is (thrown? IllegalArgumentException
+ (-> stream
+ (.each (fields "sentence") (Split.) (fields "word"))
+ (.groupBy (fields "word1")))))
+ ;; test .aggregate
+ (is (thrown? IllegalArgumentException
+ (-> stream
+ (.each (fields "sentence") (Split.) (fields "word"))
+ (.groupBy (fields "word"))
+ (.aggregate (fields "word1") (Count.) (fields "count")))))
+ ;; test .project
+ (is (thrown? IllegalArgumentException
+ (-> stream
+ (.project (fields "sentence1")))))
+ ;; test .partitionBy
+ (is (thrown? IllegalArgumentException
+ (-> stream
+ (.partitionBy (fields "sentence1")))))
+ ;; test .partitionAggregate
+ (is (thrown? IllegalArgumentException
+ (-> stream
+ (.each (fields "sentence") (Split.) (fields "word"))
+ (.partitionAggregate (fields "word1") (Count.) (fields "count")))))
+ ;; test .persistentAggregate
+ (is (thrown? IllegalArgumentException
+ (-> stream
+ (.each (fields "sentence") (Split.) (fields "word"))
+ (.groupBy (fields "word"))
+ (.persistentAggregate (StateSpec. (MemoryMapState$Factory.)) (fields "non-existent") (Count.) (fields "count")))))
+ ;; test .partitionPersist
+ (is (thrown? IllegalArgumentException
+ (-> stream
+ (.each (fields "sentence") (Split.) (fields "word"))
+ (.groupBy (fields "word"))
+ (.partitionPersist (StateSpec. (MemoryMapState$Factory.))
+ (fields "non-existent")
+ (CombinerAggStateUpdater. (Count.))
+ (fields "count")))))
+ ;; test .stateQuery
+ (with-drpc [drpc]
+ (is (thrown? IllegalArgumentException
+ (-> topo
+ (.newDRPCStream "words" drpc)
+ (.each (fields "args") (Split.) (fields "word"))
+ (.groupBy (fields "word"))
+ (.stateQuery word-counts (fields "word1") (MapGet.) (fields "count"))))))
+ )))
+
;; (deftest test-split-merge
;; (t/with-local-cluster [cluster]
;; (with-drpc [drpc]