Skip to content
Browse files

uncomments tests

  • Loading branch information...
1 parent 0730a6e commit f7e789d01cdbe5d2eaa92986598773ff008e7f17 @nathanmarz nathanmarz committed
Showing with 513 additions and 513 deletions.
  1. +513 −513 test/clj/backtype/storm/integration_test.clj
View
1,026 test/clj/backtype/storm/integration_test.clj
@@ -77,520 +77,520 @@
)))
-;; (deftest test-basic-topology
-;; (doseq [zmq-on? [true false]]
-;; (with-simulated-time-local-cluster [cluster :supervisors 4
-;; :daemon-conf {STORM-LOCAL-MODE-ZMQ zmq-on?}]
-;; (let [topology (thrift/mk-topology
-;; {"1" (thrift/mk-spout-spec (TestWordSpout. true) :parallelism-hint 3)}
-;; {"2" (thrift/mk-bolt-spec {"1" ["word"]} (TestWordCounter.) :parallelism-hint 4)
-;; "3" (thrift/mk-bolt-spec {"1" :global} (TestGlobalCount.))
-;; "4" (thrift/mk-bolt-spec {"2" :global} (TestAggregatesCounter.))
-;; })
-;; results (complete-topology cluster
-;; topology
-;; :mock-sources {"1" [["nathan"] ["bob"] ["joey"] ["nathan"]]}
-;; :storm-conf {TOPOLOGY-DEBUG true
-;; TOPOLOGY-WORKERS 2})]
-;; (is (ms= [["nathan"] ["bob"] ["joey"] ["nathan"]]
-;; (read-tuples results "1")))
-;; (is (ms= [["nathan" 1] ["nathan" 2] ["bob" 1] ["joey" 1]]
-;; (read-tuples results "2")))
-;; (is (= [[1] [2] [3] [4]]
-;; (read-tuples results "3")))
-;; (is (= [[1] [2] [3] [4]]
-;; (read-tuples results "4")))
-;; ))))
-;;
-;; (defbolt identity-bolt ["num"]
-;; [tuple collector]
-;; (emit-bolt! collector (.getValues tuple) :anchor tuple)
-;; (ack! collector tuple))
-;;
-;; (deftest test-system-stream
-;; ;; this test works because mocking a spout splits up the tuples evenly among the tasks
+(deftest test-basic-topology
+ (doseq [zmq-on? [true false]]
+ (with-simulated-time-local-cluster [cluster :supervisors 4
+ :daemon-conf {STORM-LOCAL-MODE-ZMQ zmq-on?}]
+ (let [topology (thrift/mk-topology
+ {"1" (thrift/mk-spout-spec (TestWordSpout. true) :parallelism-hint 3)}
+ {"2" (thrift/mk-bolt-spec {"1" ["word"]} (TestWordCounter.) :parallelism-hint 4)
+ "3" (thrift/mk-bolt-spec {"1" :global} (TestGlobalCount.))
+ "4" (thrift/mk-bolt-spec {"2" :global} (TestAggregatesCounter.))
+ })
+ results (complete-topology cluster
+ topology
+ :mock-sources {"1" [["nathan"] ["bob"] ["joey"] ["nathan"]]}
+ :storm-conf {TOPOLOGY-DEBUG true
+ TOPOLOGY-WORKERS 2})]
+ (is (ms= [["nathan"] ["bob"] ["joey"] ["nathan"]]
+ (read-tuples results "1")))
+ (is (ms= [["nathan" 1] ["nathan" 2] ["bob" 1] ["joey" 1]]
+ (read-tuples results "2")))
+ (is (= [[1] [2] [3] [4]]
+ (read-tuples results "3")))
+ (is (= [[1] [2] [3] [4]]
+ (read-tuples results "4")))
+ ))))
+
+(defbolt identity-bolt ["num"]
+ [tuple collector]
+ (emit-bolt! collector (.getValues tuple) :anchor tuple)
+ (ack! collector tuple))
+
+(deftest test-system-stream
+ ;; this test works because mocking a spout splits up the tuples evenly among the tasks
+ (with-simulated-time-local-cluster [cluster]
+ (let [topology (thrift/mk-topology
+ {"1" (thrift/mk-spout-spec (TestWordSpout. true) :p 3)}
+ {"2" (thrift/mk-bolt-spec {"1" ["word"] ["1" "__system"] :global} identity-bolt :p 1)
+ })
+ results (complete-topology cluster
+ topology
+ :mock-sources {"1" [["a"] ["b"] ["c"]]}
+ :storm-conf {TOPOLOGY-WORKERS 2})]
+ (is (ms= [["a"] ["b"] ["c"] ["startup"] ["startup"] ["startup"]]
+ (read-tuples results "2")))
+ )))
+
+(deftest test-shuffle
+ (with-simulated-time-local-cluster [cluster :supervisors 4]
+ (let [topology (thrift/mk-topology
+ {"1" (thrift/mk-spout-spec (TestWordSpout. true) :parallelism-hint 4)}
+ {"2" (thrift/mk-bolt-spec {"1" :shuffle} (TestGlobalCount.)
+ :parallelism-hint 6)
+ })
+ results (complete-topology cluster
+ topology
+ ;; important for test that
+ ;; #tuples = multiple of 4 and 6
+ :mock-sources {"1" [["a"] ["b"]
+ ["a"] ["b"]
+ ["a"] ["b"]
+ ["a"] ["b"]
+ ["a"] ["b"]
+ ["a"] ["b"]
+ ["a"] ["b"]
+ ["a"] ["b"]
+ ["a"] ["b"]
+ ["a"] ["b"]
+ ["a"] ["b"]
+ ["a"] ["b"]
+ ]}
+ )]
+ (is (ms= (apply concat (repeat 6 [[1] [2] [3] [4]]))
+ (read-tuples results "2")))
+ )))
+
+(defbolt lalala-bolt1 ["word"] [[val :as tuple] collector]
+ (let [ret (str val "lalala")]
+ (emit-bolt! collector [ret] :anchor tuple)
+ (ack! collector tuple)
+ ))
+
+(defbolt lalala-bolt2 ["word"] {:prepare true}
+ [conf context collector]
+ (let [state (atom nil)]
+ (reset! state "lalala")
+ (bolt
+ (execute [tuple]
+ (let [ret (-> (.getValue tuple 0) (str @state))]
+ (emit-bolt! collector [ret] :anchor tuple)
+ (ack! collector tuple)
+ ))
+ )))
+
+(defbolt lalala-bolt3 ["word"] {:prepare true :params [prefix]}
+ [conf context collector]
+ (let [state (atom nil)]
+ (bolt
+ (prepare [_ _ _]
+ (reset! state (str prefix "lalala")))
+ (execute [{val "word" :as tuple}]
+ (let [ret (-> (.getValue tuple 0) (str @state))]
+ (emit-bolt! collector [ret] :anchor tuple)
+ (ack! collector tuple)
+ )))
+ ))
+
+(deftest test-clojure-bolt
+ (with-simulated-time-local-cluster [cluster :supervisors 4]
+ (let [nimbus (:nimbus cluster)
+ topology (thrift/mk-topology
+ {"1" (thrift/mk-spout-spec (TestWordSpout. false))}
+ {"2" (thrift/mk-bolt-spec {"1" :shuffle}
+ lalala-bolt1)
+ "3" (thrift/mk-bolt-spec {"1" :shuffle}
+ lalala-bolt2)
+ "4" (thrift/mk-bolt-spec {"1" :shuffle}
+ (lalala-bolt3 "_nathan_"))}
+ )
+ results (complete-topology cluster
+ topology
+ :mock-sources {"1" [["david"]
+ ["adam"]
+ ]}
+ )]
+ (is (ms= [["davidlalala"] ["adamlalala"]] (read-tuples results "2")))
+ (is (ms= [["davidlalala"] ["adamlalala"]] (read-tuples results "3")))
+ (is (ms= [["david_nathan_lalala"] ["adam_nathan_lalala"]] (read-tuples results "4")))
+ )))
+
+(defbolt punctuator-bolt ["word" "period" "question" "exclamation"]
+ [tuple collector]
+ (if (= (:word tuple) "bar")
+ (do
+ (emit-bolt! collector {:word "bar" :period "bar" :question "bar"
+ "exclamation" "bar"})
+ (ack! collector tuple))
+ (let [ res (assoc tuple :period (str (:word tuple) "."))
+ res (assoc res :exclamation (str (:word tuple) "!"))
+ res (assoc res :question (str (:word tuple) "?")) ]
+ (emit-bolt! collector res)
+ (ack! collector tuple))))
+
+(deftest test-map-emit
+ (with-simulated-time-local-cluster [cluster :supervisors 4]
+ (let [topology (thrift/mk-topology
+ {"words" (thrift/mk-spout-spec (TestWordSpout. false))}
+ {"out" (thrift/mk-bolt-spec {"words" :shuffle}
+ punctuator-bolt)}
+ )
+ results (complete-topology cluster
+ topology
+ :mock-sources {"words" [["foo"] ["bar"]]}
+ )]
+ (is (ms= [["foo" "foo." "foo?" "foo!"]
+ ["bar" "bar" "bar" "bar"] ] (read-tuples results "out"))))))
+
+
+(defn ack-tracking-feeder [fields]
+ (let [tracker (AckTracker.)]
+ [(doto (feeder-spout fields)
+ (.setAckFailDelegate tracker))
+ (fn [val]
+ (is (= (.getNumAcks tracker) val))
+ (.resetNumAcks tracker)
+ )]
+ ))
+
+(defbolt branching-bolt ["num"]
+ {:params [amt]}
+ [tuple collector]
+ (doseq [i (range amt)]
+ (emit-bolt! collector [i] :anchor tuple))
+ (ack! collector tuple))
+
+(defbolt agg-bolt ["num"] {:prepare true :params [amt]}
+ [conf context collector]
+ (let [seen (atom [])]
+ (bolt
+ (execute [tuple]
+ (swap! seen conj tuple)
+ (when (= (count @seen) amt)
+ (emit-bolt! collector [1] :anchor @seen)
+ (doseq [s @seen]
+ (ack! collector s))
+ (reset! seen [])
+ )))
+ ))
+
+(defbolt ack-bolt {}
+ [tuple collector]
+ (ack! collector tuple))
+
+(deftest test-acking
+ (with-tracked-cluster [cluster]
+ (let [[feeder1 checker1] (ack-tracking-feeder ["num"])
+ [feeder2 checker2] (ack-tracking-feeder ["num"])
+ [feeder3 checker3] (ack-tracking-feeder ["num"])
+ tracked (mk-tracked-topology
+ cluster
+ (topology
+ {"1" (spout-spec feeder1)
+ "2" (spout-spec feeder2)
+ "3" (spout-spec feeder3)}
+ {"4" (bolt-spec {"1" :shuffle} (branching-bolt 2))
+ "5" (bolt-spec {"2" :shuffle} (branching-bolt 4))
+ "6" (bolt-spec {"3" :shuffle} (branching-bolt 1))
+ "7" (bolt-spec
+ {"4" :shuffle
+ "5" :shuffle
+ "6" :shuffle}
+ (agg-bolt 3))
+ "8" (bolt-spec {"7" :shuffle} (branching-bolt 2))
+ "9" (bolt-spec {"8" :shuffle} ack-bolt)}
+ ))]
+ (submit-local-topology (:nimbus cluster)
+ "acking-test1"
+ {TOPOLOGY-DEBUG true}
+ (:topology tracked))
+ (.feed feeder1 [1])
+ (tracked-wait tracked 1)
+ (checker1 0)
+ (.feed feeder2 [1])
+ (tracked-wait tracked 1)
+ (checker1 1)
+ (checker2 1)
+ (.feed feeder1 [1])
+ (tracked-wait tracked 1)
+ (checker1 0)
+ (.feed feeder1 [1])
+ (tracked-wait tracked 1)
+ (checker1 1)
+ (.feed feeder3 [1])
+ (tracked-wait tracked 1)
+ (checker1 0)
+ (checker3 0)
+ (.feed feeder2 [1])
+ (tracked-wait tracked 1)
+ (checker1 1)
+ (checker2 1)
+ (checker3 1)
+
+ )))
+
+(deftest test-ack-branching
+ (with-tracked-cluster [cluster]
+ (let [[feeder checker] (ack-tracking-feeder ["num"])
+ tracked (mk-tracked-topology
+ cluster
+ (topology
+ {"1" (spout-spec feeder)}
+ {"2" (bolt-spec {"1" :shuffle} identity-bolt)
+ "3" (bolt-spec {"1" :shuffle} identity-bolt)
+ "4" (bolt-spec
+ {"2" :shuffle
+ "3" :shuffle}
+ (agg-bolt 4))}))]
+ (submit-local-topology (:nimbus cluster)
+ "test-acking2"
+ {}
+ (:topology tracked))
+ (.feed feeder [1])
+ (tracked-wait tracked 1)
+ (checker 0)
+ (.feed feeder [1])
+ (tracked-wait tracked 1)
+ (checker 2)
+ )))
+
+(defbolt dup-anchor ["num"]
+ [tuple collector]
+ (emit-bolt! collector [1] :anchor [tuple tuple])
+ (ack! collector tuple))
+
+(deftest test-acking-self-anchor
+ (with-tracked-cluster [cluster]
+ (let [[feeder checker] (ack-tracking-feeder ["num"])
+ tracked (mk-tracked-topology
+ cluster
+ (topology
+ {"1" (spout-spec feeder)}
+ {"2" (bolt-spec {"1" :shuffle} dup-anchor)
+ "3" (bolt-spec {"2" :shuffle} ack-bolt)}))]
+ (submit-local-topology (:nimbus cluster)
+ "test"
+ {}
+ (:topology tracked))
+ (.feed feeder [1])
+ (tracked-wait tracked 1)
+ (checker 1)
+ (.feed feeder [1])
+ (.feed feeder [1])
+ (.feed feeder [1])
+ (tracked-wait tracked 3)
+ (checker 3)
+ )))
+
+;; (defspout ConstantSpout ["val"] {:prepare false}
+;; [collector]
+;; (Time/sleep 100)
+;; (emit-spout! collector [1]))
+
+;; (def errored (atom false))
+;; (def restarted (atom false))
+
+;; (defbolt local-error-checker {} [tuple collector]
+;; (when-not @errored
+;; (reset! errored true)
+;; (println "erroring")
+;; (throw (RuntimeException.)))
+;; (when-not @restarted (println "restarted"))
+;; (reset! restarted true))
+
+;; (deftest test-no-halt-local-mode
;; (with-simulated-time-local-cluster [cluster]
-;; (let [topology (thrift/mk-topology
-;; {"1" (thrift/mk-spout-spec (TestWordSpout. true) :p 3)}
-;; {"2" (thrift/mk-bolt-spec {"1" ["word"] ["1" "__system"] :global} identity-bolt :p 1)
-;; })
-;; results (complete-topology cluster
-;; topology
-;; :mock-sources {"1" [["a"] ["b"] ["c"]]}
-;; :storm-conf {TOPOLOGY-WORKERS 2})]
-;; (is (ms= [["a"] ["b"] ["c"] ["startup"] ["startup"] ["startup"]]
-;; (read-tuples results "2")))
+;; (let [topology (topology
+;; {1 (spout-spec ConstantSpout)}
+;; {2 (bolt-spec {1 :shuffle} local-error-checker)
+;; })]
+;; (submit-local-topology (:nimbus cluster)
+;; "test"
+;; {}
+;; topology)
+;; (while (not @restarted)
+;; (advance-time-ms! 100))
;; )))
-;;
-;; (deftest test-shuffle
-;; (with-simulated-time-local-cluster [cluster :supervisors 4]
-;; (let [topology (thrift/mk-topology
-;; {"1" (thrift/mk-spout-spec (TestWordSpout. true) :parallelism-hint 4)}
-;; {"2" (thrift/mk-bolt-spec {"1" :shuffle} (TestGlobalCount.)
-;; :parallelism-hint 6)
-;; })
-;; results (complete-topology cluster
-;; topology
-;; ;; important for test that
-;; ;; #tuples = multiple of 4 and 6
-;; :mock-sources {"1" [["a"] ["b"]
-;; ["a"] ["b"]
-;; ["a"] ["b"]
-;; ["a"] ["b"]
-;; ["a"] ["b"]
-;; ["a"] ["b"]
-;; ["a"] ["b"]
-;; ["a"] ["b"]
-;; ["a"] ["b"]
-;; ["a"] ["b"]
-;; ["a"] ["b"]
-;; ["a"] ["b"]
-;; ]}
-;; )]
-;; (is (ms= (apply concat (repeat 6 [[1] [2] [3] [4]]))
-;; (read-tuples results "2")))
-;; )))
-;;
-;; (defbolt lalala-bolt1 ["word"] [[val :as tuple] collector]
-;; (let [ret (str val "lalala")]
-;; (emit-bolt! collector [ret] :anchor tuple)
-;; (ack! collector tuple)
-;; ))
-;;
-;; (defbolt lalala-bolt2 ["word"] {:prepare true}
-;; [conf context collector]
-;; (let [state (atom nil)]
-;; (reset! state "lalala")
-;; (bolt
-;; (execute [tuple]
-;; (let [ret (-> (.getValue tuple 0) (str @state))]
-;; (emit-bolt! collector [ret] :anchor tuple)
-;; (ack! collector tuple)
-;; ))
-;; )))
-;;
-;; (defbolt lalala-bolt3 ["word"] {:prepare true :params [prefix]}
-;; [conf context collector]
-;; (let [state (atom nil)]
-;; (bolt
-;; (prepare [_ _ _]
-;; (reset! state (str prefix "lalala")))
-;; (execute [{val "word" :as tuple}]
-;; (let [ret (-> (.getValue tuple 0) (str @state))]
-;; (emit-bolt! collector [ret] :anchor tuple)
-;; (ack! collector tuple)
-;; )))
-;; ))
-;;
-;; (deftest test-clojure-bolt
-;; (with-simulated-time-local-cluster [cluster :supervisors 4]
+
+(defspout IncSpout ["word"]
+ [conf context collector]
+ (let [state (atom 0)]
+ (spout
+ (nextTuple []
+ (Thread/sleep 100)
+ (emit-spout! collector [@state] :id 1)
+ )
+ (ack [id]
+ (swap! state inc))
+ )))
+
+
+(defspout IncSpout2 ["word"] {:params [prefix]}
+ [conf context collector]
+ (let [state (atom 0)]
+ (spout
+ (nextTuple []
+ (Thread/sleep 100)
+ (swap! state inc)
+ (emit-spout! collector [(str prefix "-" @state)])
+ )
+ )))
+
+;; (deftest test-clojure-spout
+;; (with-local-cluster [cluster]
;; (let [nimbus (:nimbus cluster)
-;; topology (thrift/mk-topology
-;; {"1" (thrift/mk-spout-spec (TestWordSpout. false))}
-;; {"2" (thrift/mk-bolt-spec {"1" :shuffle}
-;; lalala-bolt1)
-;; "3" (thrift/mk-bolt-spec {"1" :shuffle}
-;; lalala-bolt2)
-;; "4" (thrift/mk-bolt-spec {"1" :shuffle}
-;; (lalala-bolt3 "_nathan_"))}
-;; )
-;; results (complete-topology cluster
-;; topology
-;; :mock-sources {"1" [["david"]
-;; ["adam"]
-;; ]}
-;; )]
-;; (is (ms= [["davidlalala"] ["adamlalala"]] (read-tuples results "2")))
-;; (is (ms= [["davidlalala"] ["adamlalala"]] (read-tuples results "3")))
-;; (is (ms= [["david_nathan_lalala"] ["adam_nathan_lalala"]] (read-tuples results "4")))
-;; )))
-;;
-;; (defbolt punctuator-bolt ["word" "period" "question" "exclamation"]
-;; [tuple collector]
-;; (if (= (:word tuple) "bar")
-;; (do
-;; (emit-bolt! collector {:word "bar" :period "bar" :question "bar"
-;; "exclamation" "bar"})
-;; (ack! collector tuple))
-;; (let [ res (assoc tuple :period (str (:word tuple) "."))
-;; res (assoc res :exclamation (str (:word tuple) "!"))
-;; res (assoc res :question (str (:word tuple) "?")) ]
-;; (emit-bolt! collector res)
-;; (ack! collector tuple))))
-;;
-;; (deftest test-map-emit
-;; (with-simulated-time-local-cluster [cluster :supervisors 4]
-;; (let [topology (thrift/mk-topology
-;; {"words" (thrift/mk-spout-spec (TestWordSpout. false))}
-;; {"out" (thrift/mk-bolt-spec {"words" :shuffle}
-;; punctuator-bolt)}
-;; )
-;; results (complete-topology cluster
-;; topology
-;; :mock-sources {"words" [["foo"] ["bar"]]}
-;; )]
-;; (is (ms= [["foo" "foo." "foo?" "foo!"]
-;; ["bar" "bar" "bar" "bar"] ] (read-tuples results "out"))))))
-;;
-;;
-;; (defn ack-tracking-feeder [fields]
-;; (let [tracker (AckTracker.)]
-;; [(doto (feeder-spout fields)
-;; (.setAckFailDelegate tracker))
-;; (fn [val]
-;; (is (= (.getNumAcks tracker) val))
-;; (.resetNumAcks tracker)
-;; )]
-;; ))
-;;
-;; (defbolt branching-bolt ["num"]
-;; {:params [amt]}
-;; [tuple collector]
-;; (doseq [i (range amt)]
-;; (emit-bolt! collector [i] :anchor tuple))
-;; (ack! collector tuple))
-;;
-;; (defbolt agg-bolt ["num"] {:prepare true :params [amt]}
-;; [conf context collector]
-;; (let [seen (atom [])]
-;; (bolt
-;; (execute [tuple]
-;; (swap! seen conj tuple)
-;; (when (= (count @seen) amt)
-;; (emit-bolt! collector [1] :anchor @seen)
-;; (doseq [s @seen]
-;; (ack! collector s))
-;; (reset! seen [])
-;; )))
-;; ))
-;;
-;; (defbolt ack-bolt {}
-;; [tuple collector]
-;; (ack! collector tuple))
-;;
-;; (deftest test-acking
-;; (with-tracked-cluster [cluster]
-;; (let [[feeder1 checker1] (ack-tracking-feeder ["num"])
-;; [feeder2 checker2] (ack-tracking-feeder ["num"])
-;; [feeder3 checker3] (ack-tracking-feeder ["num"])
-;; tracked (mk-tracked-topology
-;; cluster
-;; (topology
-;; {"1" (spout-spec feeder1)
-;; "2" (spout-spec feeder2)
-;; "3" (spout-spec feeder3)}
-;; {"4" (bolt-spec {"1" :shuffle} (branching-bolt 2))
-;; "5" (bolt-spec {"2" :shuffle} (branching-bolt 4))
-;; "6" (bolt-spec {"3" :shuffle} (branching-bolt 1))
-;; "7" (bolt-spec
-;; {"4" :shuffle
-;; "5" :shuffle
-;; "6" :shuffle}
-;; (agg-bolt 3))
-;; "8" (bolt-spec {"7" :shuffle} (branching-bolt 2))
-;; "9" (bolt-spec {"8" :shuffle} ack-bolt)}
-;; ))]
-;; (submit-local-topology (:nimbus cluster)
-;; "acking-test1"
-;; {TOPOLOGY-DEBUG true}
-;; (:topology tracked))
-;; (.feed feeder1 [1])
-;; (tracked-wait tracked 1)
-;; (checker1 0)
-;; (.feed feeder2 [1])
-;; (tracked-wait tracked 1)
-;; (checker1 1)
-;; (checker2 1)
-;; (.feed feeder1 [1])
-;; (tracked-wait tracked 1)
-;; (checker1 0)
-;; (.feed feeder1 [1])
-;; (tracked-wait tracked 1)
-;; (checker1 1)
-;; (.feed feeder3 [1])
-;; (tracked-wait tracked 1)
-;; (checker1 0)
-;; (checker3 0)
-;; (.feed feeder2 [1])
-;; (tracked-wait tracked 1)
-;; (checker1 1)
-;; (checker2 1)
-;; (checker3 1)
-;;
-;; )))
-;;
-;; (deftest test-ack-branching
-;; (with-tracked-cluster [cluster]
-;; (let [[feeder checker] (ack-tracking-feeder ["num"])
-;; tracked (mk-tracked-topology
-;; cluster
-;; (topology
-;; {"1" (spout-spec feeder)}
-;; {"2" (bolt-spec {"1" :shuffle} identity-bolt)
-;; "3" (bolt-spec {"1" :shuffle} identity-bolt)
-;; "4" (bolt-spec
-;; {"2" :shuffle
-;; "3" :shuffle}
-;; (agg-bolt 4))}))]
-;; (submit-local-topology (:nimbus cluster)
-;; "test-acking2"
-;; {}
-;; (:topology tracked))
-;; (.feed feeder [1])
-;; (tracked-wait tracked 1)
-;; (checker 0)
-;; (.feed feeder [1])
-;; (tracked-wait tracked 1)
-;; (checker 2)
-;; )))
-;;
-;; (defbolt dup-anchor ["num"]
-;; [tuple collector]
-;; (emit-bolt! collector [1] :anchor [tuple tuple])
-;; (ack! collector tuple))
-;;
-;; (deftest test-acking-self-anchor
-;; (with-tracked-cluster [cluster]
-;; (let [[feeder checker] (ack-tracking-feeder ["num"])
-;; tracked (mk-tracked-topology
-;; cluster
-;; (topology
-;; {"1" (spout-spec feeder)}
-;; {"2" (bolt-spec {"1" :shuffle} dup-anchor)
-;; "3" (bolt-spec {"2" :shuffle} ack-bolt)}))]
-;; (submit-local-topology (:nimbus cluster)
-;; "test"
-;; {}
-;; (:topology tracked))
-;; (.feed feeder [1])
-;; (tracked-wait tracked 1)
-;; (checker 1)
-;; (.feed feeder [1])
-;; (.feed feeder [1])
-;; (.feed feeder [1])
-;; (tracked-wait tracked 3)
-;; (checker 3)
+;; top (topology
+;; {1 (spout-spec IncSpout)}
+;; {}
+;; )]
+;; (submit-local-topology nimbus
+;; "spout-test"
+;; {TOPOLOGY-DEBUG true
+;; TOPOLOGY-MESSAGE-TIMEOUT-SECS 3}
+;; top)
+;; (Thread/sleep 10000)
+;; (.killTopology nimbus "spout-test")
+;; (Thread/sleep 10000)
;; )))
-;;
-;; ;; (defspout ConstantSpout ["val"] {:prepare false}
-;; ;; [collector]
-;; ;; (Time/sleep 100)
-;; ;; (emit-spout! collector [1]))
-;;
-;; ;; (def errored (atom false))
-;; ;; (def restarted (atom false))
-;;
-;; ;; (defbolt local-error-checker {} [tuple collector]
-;; ;; (when-not @errored
-;; ;; (reset! errored true)
-;; ;; (println "erroring")
-;; ;; (throw (RuntimeException.)))
-;; ;; (when-not @restarted (println "restarted"))
-;; ;; (reset! restarted true))
-;;
-;; ;; (deftest test-no-halt-local-mode
-;; ;; (with-simulated-time-local-cluster [cluster]
-;; ;; (let [topology (topology
-;; ;; {1 (spout-spec ConstantSpout)}
-;; ;; {2 (bolt-spec {1 :shuffle} local-error-checker)
-;; ;; })]
-;; ;; (submit-local-topology (:nimbus cluster)
-;; ;; "test"
-;; ;; {}
-;; ;; topology)
-;; ;; (while (not @restarted)
-;; ;; (advance-time-ms! 100))
-;; ;; )))
-;;
-;; (defspout IncSpout ["word"]
-;; [conf context collector]
-;; (let [state (atom 0)]
-;; (spout
-;; (nextTuple []
-;; (Thread/sleep 100)
-;; (emit-spout! collector [@state] :id 1)
-;; )
-;; (ack [id]
-;; (swap! state inc))
-;; )))
-;;
-;;
-;; (defspout IncSpout2 ["word"] {:params [prefix]}
-;; [conf context collector]
-;; (let [state (atom 0)]
-;; (spout
-;; (nextTuple []
-;; (Thread/sleep 100)
-;; (swap! state inc)
-;; (emit-spout! collector [(str prefix "-" @state)])
-;; )
-;; )))
-;;
-;; ;; (deftest test-clojure-spout
-;; ;; (with-local-cluster [cluster]
-;; ;; (let [nimbus (:nimbus cluster)
-;; ;; top (topology
-;; ;; {1 (spout-spec IncSpout)}
-;; ;; {}
-;; ;; )]
-;; ;; (submit-local-topology nimbus
-;; ;; "spout-test"
-;; ;; {TOPOLOGY-DEBUG true
-;; ;; TOPOLOGY-MESSAGE-TIMEOUT-SECS 3}
-;; ;; top)
-;; ;; (Thread/sleep 10000)
-;; ;; (.killTopology nimbus "spout-test")
-;; ;; (Thread/sleep 10000)
-;; ;; )))
-;;
-;;
-;; (deftest test-component-specific-config
-;; (with-simulated-time-local-cluster [cluster
-;; :daemon-conf {TOPOLOGY-OPTIMIZE false
-;; TOPOLOGY-SKIP-MISSING-KRYO-REGISTRATIONS true}]
-;; (letlocals
-;; (bind builder (TopologyBuilder.))
-;; (.setSpout builder "1" (TestPlannerSpout. (Fields. ["conf"])))
-;; (-> builder
-;; (.setBolt "2"
-;; (TestConfBolt.
-;; {"fake.config" 123
-;; TOPOLOGY-MAX-TASK-PARALLELISM 20
-;; TOPOLOGY-MAX-SPOUT-PENDING 30
-;; TOPOLOGY-OPTIMIZE true
-;; TOPOLOGY-KRYO-REGISTER [{"fake.type" "bad.serializer"}
-;; {"fake.type2" "a.serializer"}]
-;; }))
-;; (.shuffleGrouping "1")
-;; (.setMaxTaskParallelism 2)
-;; (.addConfiguration "fake.config2" 987)
-;; )
-;;
-;;
-;; (bind results
-;; (complete-topology cluster
-;; (.createTopology builder)
-;; :storm-conf {TOPOLOGY-KRYO-REGISTER [{"fake.type" "good.serializer" "fake.type3" "a.serializer3"}]}
-;; :mock-sources {"1" [["fake.config"]
-;; [TOPOLOGY-MAX-TASK-PARALLELISM]
-;; [TOPOLOGY-MAX-SPOUT-PENDING]
-;; [TOPOLOGY-OPTIMIZE]
-;; ["fake.config2"]
-;; [TOPOLOGY-KRYO-REGISTER]
-;; ]}))
-;; (is (= {"fake.config" 123
-;; "fake.config2" 987
-;; TOPOLOGY-MAX-TASK-PARALLELISM 2
-;; TOPOLOGY-MAX-SPOUT-PENDING 30
-;; TOPOLOGY-OPTIMIZE false
-;; TOPOLOGY-KRYO-REGISTER {"fake.type" "good.serializer"
-;; "fake.type2" "a.serializer"
-;; "fake.type3" "a.serializer3"}}
-;; (->> (read-tuples results "2")
-;; (apply concat)
-;; (apply hash-map))
-;; ))
-;; )))
-;;
-;; (defbolt conf-query-bolt ["conf" "val"] {:prepare true :params [conf] :conf conf}
-;; [conf context collector]
-;; (bolt
-;; (execute [tuple]
-;; (let [name (.getValue tuple 0)
-;; val (if (= name "!MAX_MSG_TIMEOUT") (.maxTopologyMessageTimeout context) (get conf name))]
-;; (emit-bolt! collector [name val] :anchor tuple)
-;; (ack! collector tuple))
-;; )))
-;;
-;; (deftest test-component-specific-config-clojure
-;; (with-simulated-time-local-cluster [cluster]
-;; (let [topology (topology {"1" (spout-spec (TestPlannerSpout. (Fields. ["conf"])) :conf {TOPOLOGY-MESSAGE-TIMEOUT-SECS 40})
-;; }
-;; {"2" (bolt-spec {"1" :shuffle}
-;; (conf-query-bolt {"fake.config" 1
-;; TOPOLOGY-MAX-TASK-PARALLELISM 2
-;; TOPOLOGY-MAX-SPOUT-PENDING 10})
-;; :conf {TOPOLOGY-MAX-SPOUT-PENDING 3})
-;; })
-;; results (complete-topology cluster
-;; topology
-;; :topology-name "test123"
-;; :storm-conf {TOPOLOGY-MAX-TASK-PARALLELISM 10
-;; TOPOLOGY-MESSAGE-TIMEOUT-SECS 30}
-;; :mock-sources {"1" [["fake.config"]
-;; [TOPOLOGY-MAX-TASK-PARALLELISM]
-;; [TOPOLOGY-MAX-SPOUT-PENDING]
-;; ["!MAX_MSG_TIMEOUT"]
-;; [TOPOLOGY-NAME]
-;; ]})]
-;; (is (= {"fake.config" 1
-;; TOPOLOGY-MAX-TASK-PARALLELISM 2
-;; TOPOLOGY-MAX-SPOUT-PENDING 3
-;; "!MAX_MSG_TIMEOUT" 40
-;; TOPOLOGY-NAME "test123"}
-;; (->> (read-tuples results "2")
-;; (apply concat)
-;; (apply hash-map))
-;; )))))
-;;
-;; (defbolt hooks-bolt ["emit" "ack" "fail"] {:prepare true}
-;; [conf context collector]
-;; (let [acked (atom 0)
-;; failed (atom 0)
-;; emitted (atom 0)]
-;; (.addTaskHook context
-;; (reify backtype.storm.hooks.ITaskHook
-;; (prepare [this conf context]
-;; )
-;; (cleanup [this]
-;; )
-;; (emit [this info]
-;; (swap! emitted inc))
-;; (boltAck [this info]
-;; (swap! acked inc))
-;; (boltFail [this info]
-;; (swap! failed inc))))
-;; (bolt
-;; (execute [tuple]
-;; (emit-bolt! collector [@emitted @acked @failed])
-;; (if (= 0 (- @acked @failed))
-;; (ack! collector tuple)
-;; (fail! collector tuple))
-;; ))))
-;;
-;; (deftest test-hooks
-;; (with-simulated-time-local-cluster [cluster]
-;; (let [topology (topology {"1" (spout-spec (TestPlannerSpout. (Fields. ["conf"])))
-;; }
-;; {"2" (bolt-spec {"1" :shuffle}
-;; hooks-bolt)
-;; })
-;; results (complete-topology cluster
-;; topology
-;; :mock-sources {"1" [[1]
-;; [1]
-;; [1]
-;; [1]
-;; ]})]
-;; (is (= [[0 0 0]
-;; [2 1 0]
-;; [4 1 1]
-;; [6 2 1]]
-;; (read-tuples results "2")
-;; )))))
-;;
-;; (deftest test-acking-branching-complex
-;; ;; test acking with branching in the topology
-;; )
-;;
-;;
-;; (deftest test-fields-grouping
-;; ;; 1. put a shitload of random tuples through it and test that counts are right
-;; ;; 2. test that different spouts with different phints group the same way
-;; )
-;;
-;; (deftest test-all-grouping
-;; )
-;;
-;; (deftest test-direct-grouping
-;; )
+
+
+(deftest test-component-specific-config
+ (with-simulated-time-local-cluster [cluster
+ :daemon-conf {TOPOLOGY-OPTIMIZE false
+ TOPOLOGY-SKIP-MISSING-KRYO-REGISTRATIONS true}]
+ (letlocals
+ (bind builder (TopologyBuilder.))
+ (.setSpout builder "1" (TestPlannerSpout. (Fields. ["conf"])))
+ (-> builder
+ (.setBolt "2"
+ (TestConfBolt.
+ {"fake.config" 123
+ TOPOLOGY-MAX-TASK-PARALLELISM 20
+ TOPOLOGY-MAX-SPOUT-PENDING 30
+ TOPOLOGY-OPTIMIZE true
+ TOPOLOGY-KRYO-REGISTER [{"fake.type" "bad.serializer"}
+ {"fake.type2" "a.serializer"}]
+ }))
+ (.shuffleGrouping "1")
+ (.setMaxTaskParallelism 2)
+ (.addConfiguration "fake.config2" 987)
+ )
+
+
+ (bind results
+ (complete-topology cluster
+ (.createTopology builder)
+ :storm-conf {TOPOLOGY-KRYO-REGISTER [{"fake.type" "good.serializer" "fake.type3" "a.serializer3"}]}
+ :mock-sources {"1" [["fake.config"]
+ [TOPOLOGY-MAX-TASK-PARALLELISM]
+ [TOPOLOGY-MAX-SPOUT-PENDING]
+ [TOPOLOGY-OPTIMIZE]
+ ["fake.config2"]
+ [TOPOLOGY-KRYO-REGISTER]
+ ]}))
+ (is (= {"fake.config" 123
+ "fake.config2" 987
+ TOPOLOGY-MAX-TASK-PARALLELISM 2
+ TOPOLOGY-MAX-SPOUT-PENDING 30
+ TOPOLOGY-OPTIMIZE false
+ TOPOLOGY-KRYO-REGISTER {"fake.type" "good.serializer"
+ "fake.type2" "a.serializer"
+ "fake.type3" "a.serializer3"}}
+ (->> (read-tuples results "2")
+ (apply concat)
+ (apply hash-map))
+ ))
+ )))
+
+(defbolt conf-query-bolt ["conf" "val"] {:prepare true :params [conf] :conf conf}
+ [conf context collector]
+ (bolt
+ (execute [tuple]
+ (let [name (.getValue tuple 0)
+ val (if (= name "!MAX_MSG_TIMEOUT") (.maxTopologyMessageTimeout context) (get conf name))]
+ (emit-bolt! collector [name val] :anchor tuple)
+ (ack! collector tuple))
+ )))
+
+(deftest test-component-specific-config-clojure
+ (with-simulated-time-local-cluster [cluster]
+ (let [topology (topology {"1" (spout-spec (TestPlannerSpout. (Fields. ["conf"])) :conf {TOPOLOGY-MESSAGE-TIMEOUT-SECS 40})
+ }
+ {"2" (bolt-spec {"1" :shuffle}
+ (conf-query-bolt {"fake.config" 1
+ TOPOLOGY-MAX-TASK-PARALLELISM 2
+ TOPOLOGY-MAX-SPOUT-PENDING 10})
+ :conf {TOPOLOGY-MAX-SPOUT-PENDING 3})
+ })
+ results (complete-topology cluster
+ topology
+ :topology-name "test123"
+ :storm-conf {TOPOLOGY-MAX-TASK-PARALLELISM 10
+ TOPOLOGY-MESSAGE-TIMEOUT-SECS 30}
+ :mock-sources {"1" [["fake.config"]
+ [TOPOLOGY-MAX-TASK-PARALLELISM]
+ [TOPOLOGY-MAX-SPOUT-PENDING]
+ ["!MAX_MSG_TIMEOUT"]
+ [TOPOLOGY-NAME]
+ ]})]
+ (is (= {"fake.config" 1
+ TOPOLOGY-MAX-TASK-PARALLELISM 2
+ TOPOLOGY-MAX-SPOUT-PENDING 3
+ "!MAX_MSG_TIMEOUT" 40
+ TOPOLOGY-NAME "test123"}
+ (->> (read-tuples results "2")
+ (apply concat)
+ (apply hash-map))
+ )))))
+
+(defbolt hooks-bolt ["emit" "ack" "fail"] {:prepare true}
+ [conf context collector]
+ (let [acked (atom 0)
+ failed (atom 0)
+ emitted (atom 0)]
+ (.addTaskHook context
+ (reify backtype.storm.hooks.ITaskHook
+ (prepare [this conf context]
+ )
+ (cleanup [this]
+ )
+ (emit [this info]
+ (swap! emitted inc))
+ (boltAck [this info]
+ (swap! acked inc))
+ (boltFail [this info]
+ (swap! failed inc))))
+ (bolt
+ (execute [tuple]
+ (emit-bolt! collector [@emitted @acked @failed])
+ (if (= 0 (- @acked @failed))
+ (ack! collector tuple)
+ (fail! collector tuple))
+ ))))
+
+(deftest test-hooks
+ (with-simulated-time-local-cluster [cluster]
+ (let [topology (topology {"1" (spout-spec (TestPlannerSpout. (Fields. ["conf"])))
+ }
+ {"2" (bolt-spec {"1" :shuffle}
+ hooks-bolt)
+ })
+ results (complete-topology cluster
+ topology
+ :mock-sources {"1" [[1]
+ [1]
+ [1]
+ [1]
+ ]})]
+ (is (= [[0 0 0]
+ [2 1 0]
+ [4 1 1]
+ [6 2 1]]
+ (read-tuples results "2")
+ )))))
+
+(deftest test-acking-branching-complex
+ ;; test acking with branching in the topology
+ )
+
+
+(deftest test-fields-grouping
+ ;; 1. put a shitload of random tuples through it and test that counts are right
+ ;; 2. test that different spouts with different phints group the same way
+ )
+
+(deftest test-all-grouping
+ )
+
+(deftest test-direct-grouping
+ )

0 comments on commit f7e789d

Please sign in to comment.
Something went wrong with that request. Please try again.