Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

Set thread name for spout and bolt threads

  • Loading branch information...
commit befe50cc496b017576332173a7190c3b3c51588a 1 parent 4859f0d
Andrew Olson authored
View
6 src/clj/backtype/storm/daemon/executor.clj
@@ -554,7 +554,8 @@
))
0))
:kill-fn (:report-error-and-die executor-data)
- :factory? true)]))
+ :factory? true
+ :thread-name component-id)]))
(defn- tuple-time-delta! [^TupleImpl tuple]
(let [ms (.getProcessSampleStartTime tuple)]
@@ -715,7 +716,8 @@
(disruptor/consume-batch-when-available receive-queue event-handler)
0)))
:kill-fn (:report-error-and-die executor-data)
- :factory? true)]))
+ :factory? true
+ :thread-name component-id)]))
(defmethod close-component :spout [executor-data spout]
(.close spout))
View
4 src/clj/backtype/storm/disruptor.clj
@@ -67,12 +67,14 @@
(defn halt-with-interrupt! [^DisruptorQueue queue]
(.haltWithInterrupt queue))
-(defnk consume-loop* [^DisruptorQueue queue handler :kill-fn (fn [error] (halt-process! 1 "Async loop died!"))]
+(defnk consume-loop* [^DisruptorQueue queue handler :kill-fn (fn [error] (halt-process! 1 "Async loop died!"))
+ :thread-name nil]
(let [ret (async-loop
(fn []
(consume-batch-when-available queue handler)
0 )
:kill-fn kill-fn
+ :thread-name thread-name
)]
(consumer-started! queue)
ret
View
5 src/clj/backtype/storm/util.clj
@@ -368,7 +368,8 @@
:kill-fn (fn [error] (halt-process! 1 "Async loop died!"))
:priority Thread/NORM_PRIORITY
:factory? false
- :start true]
+ :start true
+ :thread-name nil]
(let [thread (Thread.
(fn []
(try-cause
@@ -389,6 +390,8 @@
))]
(.setDaemon thread daemon)
(.setPriority thread priority)
+ (when-not (nil? thread-name)
+ (.setName thread (str (.getName thread) "-" thread-name)))
(when start
(.start thread))
;; should return object that supports stop, interrupt, join, and waiting?
View
24 test/clj/backtype/storm/util_test.clj
@@ -0,0 +1,24 @@
+(ns backtype.storm.util-test
+ (:import [java.util.regex Pattern])
+ (:use [clojure test])
+ (:use [backtype.storm util]))
+
+(deftest async-loop-test
+ (testing "thread name provided"
+ (let [thread (async-loop
+ (fn []
+ (is (= true (.startsWith (.getName (Thread/currentThread)) "Thread-")))
+ (is (= true (.endsWith (.getName (Thread/currentThread)) "-mythreadname")))
+ 1)
+ :thread-name "mythreadname")]
+ (sleep-secs 2)
+ (.interrupt thread)
+ (.join thread)))
+ (testing "thread name not provided"
+ (let [thread (async-loop
+ (fn []
+ (is (= true (Pattern/matches "Thread-\\d+" (.getName (Thread/currentThread)))))
+ 1))]
+ (sleep-secs 2)
+ (.interrupt thread)
+ (.join thread))))
Please sign in to comment.
Something went wrong with that request. Please try again.