Browse files

Merge branch 'master' into 0.9.0

  • Loading branch information...
2 parents ecafdcd + d53f3d5 commit 6c7c3cca93b21ddb8b678bd12d50bbda6952e143 @nathanmarz nathanmarz committed Oct 29, 2012
View
2 CHANGELOG.md
@@ -24,11 +24,13 @@
* Can now activate, deactive, rebalance, and kill topologies from the Storm UI (thanks Frostman)
* Can now use --config option to override which yaml file from ~/.storm to use for the config (thanks tjun)
* Redesigned the pluggable resource scheduler (INimbus, ISupervisor) interfaces to allow for much simpler integrations
+ * Added prepare method to IScheduler
* Added "throws Exception" to TestJob interface
* Added reportError to multilang protocol and updated Python and Ruby adapters to use it (thanks Lazyshot)
* Number tuples executed now tracked and shown in Storm UI
* Added ReportedFailedException which causes a batch to fail without killing worker and reports the error to the UI
* Execute latency now tracked and shown in Storm UI
+ * Adding testTuple methods for easily creating Tuple instances to Testing API (thanks xumingming)
* Bug fix: Fix race condition in supervisor that would lead to supervisor continuously crashing due to not finding "stormconf.ser" file for an already killed topology
* Bug fix: bin/storm script now displays a helpful error message when an invalid command is specified
View
31 src/clj/backtype/storm/daemon/nimbus.clj
@@ -24,6 +24,24 @@
))
))
+(defn mk-scheduler [conf inimbus]
+ (let [forced-scheduler (.getForcedScheduler inimbus)
+ scheduler (cond
+ forced-scheduler
+ (do (log-message "Using forced scheduler from INimbus " (class forced-scheduler))
+ forced-scheduler)
+
+ (conf STORM-SCHEDULER)
+ (do (log-message "Using custom scheduler: " (conf STORM-SCHEDULER))
+ (-> (conf STORM-SCHEDULER) new-instance))
+
+ :else
+ (do (log-message "Using default scheduler")
+ (DefaultScheduler.)))]
+ (.prepare scheduler conf)
+ scheduler
+ ))
+
(defn nimbus-data [conf inimbus]
(let [forced-scheduler (.getForcedScheduler inimbus)]
{:conf conf
@@ -40,18 +58,7 @@
(log-error t "Error when processing event")
(halt-process! 20 "Error when processing an event")
))
- :scheduler (cond
- forced-scheduler
- (do (log-message "Using forced scheduler from INimbus " (class forced-scheduler))
- forced-scheduler)
-
- (conf STORM-SCHEDULER)
- (do (log-message "Using custom scheduler: " (conf STORM-SCHEDULER))
- (-> (conf STORM-SCHEDULER) new-instance))
-
- :else
- (do (log-message "Using default scheduler")
- (DefaultScheduler.)))
+ :scheduler (mk-scheduler conf inimbus)
}))
(defn inbox [nimbus]
View
5 src/clj/backtype/storm/scheduler/DefaultScheduler.clj
@@ -31,7 +31,10 @@
(if-let [supervisor (.getSupervisorById cluster node)]
(.contains (.getAllPorts supervisor) (int port))
)))))
-
+
+(defn -prepare [this conf]
+ )
+
(defn -schedule [this ^Topologies topologies ^Cluster cluster]
(let [needs-scheduling-topologies (.needsSchedulingTopologies cluster topologies)]
(doseq [^TopologyDetails topology needs-scheduling-topologies
View
3 src/clj/backtype/storm/scheduler/EvenScheduler.clj
@@ -59,5 +59,8 @@
(ExecutorDetails. start-task end-task))]]
(.assign cluster slot topology-id executors)))))
+(defn -prepare [this conf]
+ )
+
(defn -schedule [this ^Topologies topologies ^Cluster cluster]
(schedule-topologies-evenly topologies cluster))
View
20 src/clj/backtype/storm/testing4j.clj
@@ -4,7 +4,7 @@
(:import [backtype.storm.generated StormTopology])
(:import [backtype.storm.daemon nimbus])
(:import [backtype.storm.testing TestJob MockedSources TrackedTopology
- MkClusterParam CompleteTopologyParam])
+ MkClusterParam CompleteTopologyParam MkTupleParam])
(:import [backtype.storm.utils Utils])
(:use [backtype.storm testing util log])
(:gen-class
@@ -31,7 +31,10 @@
^:static [advanceClusterTime [backtype.storm.ILocalCluster Integer Integer] void]
^:static [advanceClusterTime [backtype.storm.ILocalCluster Integer] void]
^:static [multiseteq [java.util.Collection java.util.Collection] boolean]
- ^:static [multiseteq [java.util.Map java.util.Map] boolean]]))
+ ^:static [multiseteq [java.util.Map java.util.Map] boolean]
+ ^:static [testTuple [java.util.List] backtype.storm.tuple.Tuple]
+ ^:static [testTuple [java.util.List backtype.storm.testing.MkTupleParam] backtype.storm.tuple.Tuple]]))
+
(defn -completeTopology
([^ILocalCluster cluster ^StormTopology topology ^CompleteTopologyParam completeTopologyParam]
@@ -60,7 +63,7 @@
:daemon-conf daemon-conf#]
(let [cluster# (LocalCluster. cluster#)]
(.run ~code cluster#)))))
-
+
(defn -withLocalCluster
([^MkClusterParam mkClusterParam ^TestJob code]
(with-cluster with-local-cluster mkClusterParam code))
@@ -122,3 +125,14 @@
(defn -multiseteq [^Map coll1 ^Map coll2]
(multiseteq coll1 coll2))
+
+(defn -testTuple
+ ([^List values]
+ (-testTuple values nil))
+ ([^List values ^MkTupleParam param]
+ (if (nil? param)
+ (test-tuple values)
+ (let [stream (or (.getStream param) Utils/DEFAULT_STREAM_ID)
+ component (or (.getComponent param) "component")
+ fields (.getFields param)]
+ (test-tuple values :stream stream :component component :fields fields)))))
View
7 src/jvm/backtype/storm/scheduler/IScheduler.java
@@ -1,7 +1,12 @@
package backtype.storm.scheduler;
+import java.util.Map;
+
public interface IScheduler {
+
+ void prepare(Map conf);
+
/**
* Set assignments for the topologies which needs scheduling. The new assignments is available
* through <code>cluster.getAssignments()</code>
@@ -14,5 +19,5 @@
* assignments for all the topologies etc. User can set the new assignment for topologies using
* <code>cluster.setAssignmentById</code>
*/
- public void schedule(Topologies topologies, Cluster cluster);
+ void schedule(Topologies topologies, Cluster cluster);
}
View
34 src/jvm/backtype/storm/testing/MkTupleParam.java
@@ -0,0 +1,34 @@
+package backtype.storm.testing;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class MkTupleParam {
+ private String stream;
+ private String component;
+ private List<String> fields;
+
+ public String getStream() {
+ return stream;
+ }
+ public void setStream(String stream) {
+ this.stream = stream;
+ }
+
+ public String getComponent() {
+ return component;
+ }
+ public void setComponent(String component) {
+ this.component = component;
+ }
+
+ public List<String> getFields() {
+ return fields;
+ }
+ public void setFields(String... fields) {
+ this.fields = new ArrayList<String>();
+ for (int i = 0; i < fields.length; i++) {
+ this.fields.add(fields[i]);
+ }
+ }
+}
View
31 test/clj/backtype/storm/testing4j_test.clj
@@ -1,14 +1,14 @@
(ns backtype.storm.testing4j-test
(:use [clojure.test])
- (:use [backtype.storm config clojure testing])
+ (:use [backtype.storm config clojure testing util])
(:require [backtype.storm.integration-test :as it])
(:require [backtype.storm.thrift :as thrift])
(:import [backtype.storm Testing Config ILocalCluster])
- (:import [backtype.storm.tuple Values])
+ (:import [backtype.storm.tuple Values Tuple])
(:import [backtype.storm.utils Time Utils])
(:import [backtype.storm.testing MkClusterParam TestJob MockedSources TestWordSpout
TestWordCounter TestGlobalCount TestAggregatesCounter CompleteTopologyParam
- AckFailMapTracker]))
+ AckFailMapTracker MkTupleParam]))
(deftest test-with-simulated-time
(is (= false (Time/isSimulating)))
@@ -64,7 +64,7 @@
(.addMockData "1" (into-array Values [(Values. (into-array ["nathan"]))
(Values. (into-array ["bob"]))
(Values. (into-array ["joey"]))
- (Values. (into-array ["nathan"]))])
+ (Values. (into-array ["nathan"]))])
))
storm-conf (doto (Config.)
(.setNumWorkers 2))
@@ -127,7 +127,7 @@
{"1" (thrift/mk-spout-spec feeder)}
{"2" (thrift/mk-bolt-spec {"1" :global} it/ack-every-other)})
storm-conf (doto (Config.)
- (.put TOPOLOGY-MESSAGE-TIMEOUT-SECS 10))]
+ (.put TOPOLOGY-MESSAGE-TIMEOUT-SECS 10))]
(.submitTopology cluster
"timeout-tester"
storm-conf
@@ -141,3 +141,24 @@
(Testing/advanceClusterTime cluster (int 12))
(it/assert-failed tracker 2)
))))))
+
+(deftest test-test-tuple
+ (letlocals
+ ;; test the one-param signature
+ (bind ^Tuple tuple (Testing/testTuple ["james" "bond"]))
+ (is (= ["james" "bond"] (.getValues tuple)))
+ (is (= Utils/DEFAULT_STREAM_ID (.getSourceStreamId tuple)))
+ (is (= ["field1" "field2"] (-> tuple .getFields .toList)))
+ (is (= "component" (.getSourceComponent tuple)))
+
+ ;; test the two-params signature
+ (bind mk-tuple-param (MkTupleParam.))
+ (doto mk-tuple-param
+ (.setStream "test-stream")
+ (.setComponent "test-component")
+ (.setFields (into-array String ["fname" "lname"])))
+ (bind ^Tuple tuple (Testing/testTuple ["james" "bond"] mk-tuple-param))
+ (is (= ["james" "bond"] (.getValues tuple)))
+ (is (= "test-stream" (.getSourceStreamId tuple)))
+ (is (= ["fname" "lname"] (-> tuple .getFields .toList)))
+ (is (= "test-component" (.getSourceComponent tuple)))))

0 comments on commit 6c7c3cc

Please sign in to comment.