Skip to content

Commit

Permalink
progress on transactional tests, working so far
Browse files Browse the repository at this point in the history
  • Loading branch information
Nathan Marz committed Jan 26, 2012
1 parent 51d59ec commit ea8b4b9
Showing 1 changed file with 45 additions and 17 deletions.
62 changes: 45 additions & 17 deletions test/clj/backtype/storm/transactional_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@
))))

(defn normalize-tx-tuple [values]
(-> values vec (update 0 (memfn getTransactionId))))
(-> values vec (update 0 #(-> % .getTransactionId .intValue))))

(defn verify-and-reset! [expected-map emitted-map-atom]
(let [results @emitted-map-atom]
Expand Down Expand Up @@ -391,20 +391,19 @@
(.createTopology builder)))
(submit-local-topology (:nimbus cluster)
"transactional-test"
{TOPOLOGY-MAX-SPOUT-PENDING 2
TOPOLOGY-DEBUG true}
{TOPOLOGY-MAX-SPOUT-PENDING 2}
(:topology topo-info))

(bind ack-tx (fn [txid]
(let [[to-ack not-to-ack] (separate
#(-> %
(.getValue 0)
.getTransactionId
(= txid))
@tuples)]
(reset! tuples not-to-ack)
(doseq [t to-ack]
(.ack @collector t)))))
(bind ack-tx! (fn [txid]
(let [[to-ack not-to-ack] (separate
#(-> %
(.getValue 0)
.getTransactionId
(= txid))
@tuples)]
(reset! tuples not-to-ack)
(doseq [t to-ack]
(.ack @collector t)))))

;; only check default streams
(bind verify! (fn [expected]
Expand All @@ -421,10 +420,28 @@
(tracked-wait topo-info 2)
(println "Controlled: " @tuples)
(println "Captured:" (-> topo-info :capturer .getResults))
;; check that batch tuples have been emitted for tx 1 and 2
;; check the outputs of all the bolts
;; ack the ack the first batch
;; check that first batch commits
(verify! {"sum" [[1 "dog" 3]
[1 "cat" 5]
[1 "mango" 6]
[1 "happy" 11]
[2 "apple" 1]
[2 "dog" 3]
[2 "zebra" 1]]
"count" []
"count2" []})
(ack-tx! 1)
(tracked-wait topo-info 1)
(verify! {"sum" []
"count" [[1 "dog" 1]
[1 "cat" 2]
[1 "mango" 2]
[1 "happy" 1]]
"count2" [[1 "dog" 2]
[1 "cat" 2]
[1 "mango" 2]
[1 "happy" 2]]})
(ack-tx! 1)

;; ack the commit
;; check that third batch is emitted
;; ack the third batch
Expand All @@ -440,3 +457,14 @@
;; (println (read-tuples results "count"))
(-> topo-info :capturer .getAndClearResults)
))))

{0 [["dog" 3]
["cat" 4]
["apple" 1]
["dog" 3]
["banana" 0]]
1 [["cat" 1]
["mango" 4]]
2 [["happy" 11]
["mango" 2]
["zebra" 1]]}

0 comments on commit ea8b4b9

Please sign in to comment.