Skip to content

Commit

Permalink
Merge remote-tracking branch 'noslowerdna/thread-names'
Browse files Browse the repository at this point in the history
  • Loading branch information
nathanmarz committed Mar 5, 2013
2 parents 6ee428d + befe50c commit 726be6c
Show file tree
Hide file tree
Showing 4 changed files with 35 additions and 4 deletions.
6 changes: 4 additions & 2 deletions src/clj/backtype/storm/daemon/executor.clj
Original file line number Diff line number Diff line change
Expand Up @@ -554,7 +554,8 @@
))
0))
:kill-fn (:report-error-and-die executor-data)
:factory? true)]))
:factory? true
:thread-name component-id)]))

(defn- tuple-time-delta! [^TupleImpl tuple]
(let [ms (.getProcessSampleStartTime tuple)]
Expand Down Expand Up @@ -715,7 +716,8 @@
(disruptor/consume-batch-when-available receive-queue event-handler)
0)))
:kill-fn (:report-error-and-die executor-data)
:factory? true)]))
:factory? true
:thread-name component-id)]))

(defmethod close-component :spout [executor-data spout]
(.close spout))
Expand Down
4 changes: 3 additions & 1 deletion src/clj/backtype/storm/disruptor.clj
Original file line number Diff line number Diff line change
Expand Up @@ -67,12 +67,14 @@
(defn halt-with-interrupt! [^DisruptorQueue queue]
(.haltWithInterrupt queue))

(defnk consume-loop* [^DisruptorQueue queue handler :kill-fn (fn [error] (halt-process! 1 "Async loop died!"))]
(defnk consume-loop* [^DisruptorQueue queue handler :kill-fn (fn [error] (halt-process! 1 "Async loop died!"))
:thread-name nil]
(let [ret (async-loop
(fn []
(consume-batch-when-available queue handler)
0 )
:kill-fn kill-fn
:thread-name thread-name
)]
(consumer-started! queue)
ret
Expand Down
5 changes: 4 additions & 1 deletion src/clj/backtype/storm/util.clj
Original file line number Diff line number Diff line change
Expand Up @@ -368,7 +368,8 @@
:kill-fn (fn [error] (halt-process! 1 "Async loop died!"))
:priority Thread/NORM_PRIORITY
:factory? false
:start true]
:start true
:thread-name nil]
(let [thread (Thread.
(fn []
(try-cause
Expand All @@ -389,6 +390,8 @@
))]
(.setDaemon thread daemon)
(.setPriority thread priority)
(when-not (nil? thread-name)
(.setName thread (str (.getName thread) "-" thread-name)))
(when start
(.start thread))
;; should return object that supports stop, interrupt, join, and waiting?
Expand Down
24 changes: 24 additions & 0 deletions test/clj/backtype/storm/util_test.clj
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
(ns backtype.storm.util-test
(:import [java.util.regex Pattern])
(:use [clojure test])
(:use [backtype.storm util]))

(deftest async-loop-test
(testing "thread name provided"
(let [thread (async-loop
(fn []
(is (= true (.startsWith (.getName (Thread/currentThread)) "Thread-")))
(is (= true (.endsWith (.getName (Thread/currentThread)) "-mythreadname")))
1)
:thread-name "mythreadname")]
(sleep-secs 2)
(.interrupt thread)
(.join thread)))
(testing "thread name not provided"
(let [thread (async-loop
(fn []
(is (= true (Pattern/matches "Thread-\\d+" (.getName (Thread/currentThread)))))
1))]
(sleep-secs 2)
(.interrupt thread)
(.join thread))))

0 comments on commit 726be6c

Please sign in to comment.