Skip to content

Commit

Permalink
remove args-fn from async-loop and replace with factory function to a…
Browse files Browse the repository at this point in the history
…void seq calls
  • Loading branch information
Nathan Marz committed May 26, 2012
1 parent f2d8f2f commit dfd7397
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 18 deletions.
32 changes: 17 additions & 15 deletions src/clj/backtype/storm/messaging/loader.clj
Original file line number Diff line number Diff line change
Expand Up @@ -22,21 +22,23 @@
:priority Thread/NORM_PRIORITY]
(let [max-buffer-size (int max-buffer-size)
vthread (async-loop
(fn [socket]
(let [batched (ArrayList.)
init (msg/recv socket)]
(loop [[task msg :as packet] init]
(if (= task -1)
(do (log-message "Receiving-thread:[" storm-id ", " port "] received shutdown notice")
(.close socket)
nil )
(do
(when packet (.add batched packet))
(if (and packet (< (.size batched) max-buffer-size))
(recur (msg/recv-with-flags socket 1))
(do (transfer-local-fn batched)
0 )))))))
:args-fn (fn [] [(msg/bind context storm-id port)])
(fn []
(let [socket (msg/bind context storm-id port)]
(fn []
(let [batched (ArrayList.)
init (msg/recv socket)]
(loop [[task msg :as packet] init]
(if (= task -1)
(do (log-message "Receiving-thread:[" storm-id ", " port "] received shutdown notice")
(.close socket)
nil )
(do
(when packet (.add batched packet))
(if (and packet (< (.size batched) max-buffer-size))
(recur (msg/recv-with-flags socket 1))
(do (transfer-local-fn batched)
0 )))))))))
:factory? true
:daemon daemon
:kill-fn kill-fn
:priority priority)]
Expand Down
6 changes: 3 additions & 3 deletions src/clj/backtype/storm/util.clj
Original file line number Diff line number Diff line change
Expand Up @@ -367,14 +367,14 @@
:daemon false
:kill-fn (fn [error] (halt-process! 1 "Async loop died!"))
:priority Thread/NORM_PRIORITY
:args-fn (fn [] [])
:factory? false
:start true]
(let [thread (Thread.
(fn []
(try-cause
(let [args (args-fn)]
(let [afn (if factory? (afn) afn)]
(loop []
(let [sleep-time (apply afn args)]
(let [sleep-time (afn)]
(when-not (nil? sleep-time)
(sleep-secs sleep-time)
(recur))
Expand Down

0 comments on commit dfd7397

Please sign in to comment.