Permalink
Browse files

reimplemented timer to integrate with Time, progress on refactoring

  • Loading branch information...
1 parent 7371926 commit 44b6edec43275800a0c6eaba278d145c8f70d564 @nathanmarz committed Dec 13, 2011
@@ -17,7 +17,7 @@
(import (quote [backtype.storm.daemon Shutdownable]))
(require (quote [backtype.storm.messaging.loader :as msg-loader]))
(require (quote [backtype.storm.messaging.protocol :as msg]))
- (use (quote [backtype.storm config util log clojure]))
+ (use (quote [backtype.storm config util log clojure timer]))
(use (quote [clojure.contrib.seq :only [find-first]]))
(require (quote [backtype.storm [thrift :as thrift] [cluster :as cluster]
[event :as event] [process-simulator :as psim]]))
@@ -94,7 +94,6 @@
))
;; TODO:
-;; have another thread that every X seconds schedules rebalance for all active topologies
;; how does cleanup work?
;; -- schedule zookeeper cleanup to be task timeout seconds in future
;; -- cleanup code if its not active (check every minute)
@@ -120,7 +119,7 @@
(log-message "Killing topology: " storm-id)
(.remove-storm! (:storm-cluster-state nimbus)
storm-id)
- 'removed )
+ nil)
}
:rebalancing {:monitor (fn [] (delay-event nimbus
storm-id
@@ -159,10 +158,10 @@
(fn [] transition)
transition)
new-status (apply transition event-args)
- new-status (cond (nil? new-status) status
- (keyword? new-status) {:type new-status}
- true new-status)]
- (when-not (= 'removed new-status)
+ new-status (if (keyword? new-status)
+ {:type new-status}
+ new-status)]
+ (when new-status
(set-topology-status! nimbus storm-id new-status)))))
)))
@@ -322,15 +321,6 @@
(File. (master-stormcode-path stormroot))
))))
-
-(defn task-dead? [conf storm-cluster-state storm-id task-id]
- (let [info (.task-heartbeat storm-cluster-state storm-id task-id)]
- (or (not info)
- ;; TODO: this is not correct... times could be different
- (> (time-delta (:time-secs info))
- (conf NIMBUS-TASK-TIMEOUT-SECS)))
- ))
-
;; public so it can be mocked in tests
(defn mk-task-component-assignments [conf storm-id]
(let [storm-conf (read-storm-conf conf storm-id)
@@ -542,17 +532,9 @@
(defn cleanup-storm-ids [conf storm-cluster-state]
(let [heartbeat-ids (set (.heartbeat-storms storm-cluster-state))
error-ids (set (.task-error-storms storm-cluster-state))
- ;; TODO: should probably use storm-base to determine this?
- assigned-ids (set (.assignments storm-cluster-state nil))
- storm-ids (set/difference (set/union heartbeat-ids error-ids) assigned-ids)]
- (filter
- (fn [storm-id]
- (every?
- (partial task-dead? conf storm-cluster-state storm-id)
- (.heartbeat-tasks storm-cluster-state storm-id)
- ))
- storm-ids
- )))
+ assigned-ids (set (.active-storms storm-cluster-state nil))]
+ (set/difference (set/union heartbeat-ids error-ids) assigned-ids)
+ ))
(defn validate-topology! [topology]
(let [bolt-ids (keys (.get_bolts topology))
@@ -649,10 +631,10 @@
topology)
storm-cluster-state (:storm-cluster-state nimbus)]
(log-message "Received topology submission for " storm-name " with conf " storm-conf)
- (setup-storm-code conf storm-id uploadedJarLocation storm-conf topology)
- ;; protects against multiple storms being submitted at once and cleanup thread killing storm in b/w
- ;; assignment and starting the storm
+ ;; lock protects against multiple topologies being submitted at once and
+ ;; cleanup thread killing topology in b/w assignment and starting the topology
(locking (:submit-lock nimbus)
+ (setup-storm-code conf storm-id uploadedJarLocation storm-conf topology)
(.setup-heartbeats! storm-cluster-state storm-id)
(setup-storm-static conf storm-id storm-cluster-state)
(mk-assignments conf storm-id storm-cluster-state
@@ -735,10 +717,6 @@
(^StormTopology getTopology [this ^String id]
(system-topology (read-storm-conf conf id) (read-storm-topology conf id)))
-
- ;; TODO: finish refactoring to new structure
-
-
(^ClusterSummary getClusterInfo [this]
(let [storm-cluster-state (:storm-cluster-state nimbus)
assigned (assigned-slots storm-cluster-state)
@@ -0,0 +1,85 @@
+(ns backtype.storm.timer
+ (:import [backtype.storm.utils Time])
+ (:import [java.util PriorityQueue Comparator])
+ (:import [java.util.concurrent Semaphore])
+ (:use [backtype.storm util])
+ (:use [clojure.contrib.def :only [defnk]])
+ )
+
+;; The timer defined in this file is very similar to java.util.Timer, except that it integrates with
+;; Storm's time simulation capabilities. This lets us test code that does asynchronous work on the timer thread.
+
+;; Creates a timer: a daemon thread that runs scheduled functions in due-time order.
+;; All waiting goes through backtype.storm.utils.Time (Time/sleep, Time/sleepUntil).
+;;
+;; Keyword args:
+;;   :kill-fn - called with the Throwable when the timer loop catches anything other
+;;              than InterruptedException; defaults to a no-op. After calling it the
+;;              timer marks itself inactive and rethrows on the timer thread.
+;;
+;; Returns a map with :timer-thread, :queue, :active, :lock and :cancel-notifier.
+;; Queue entries are [due-time-secs fn id] vectors, ordered by due-time-secs.
+(defnk mk-timer [:kill-fn (fn [& _] )]
+  (let [queue (PriorityQueue. 10
+                              (reify Comparator
+                                (compare [this o1 o2]
+                                  (- (first o1) (first o2)))
+                                (equals [this obj]
+                                  true)))
+        active (atom true)
+        lock (Object.)
+        notifier (Semaphore. 0)
+        timer-thread (Thread.
+                      (fn []
+                        (try
+                          (while @active
+                            (try
+                              ;; PriorityQueue is not thread-safe and `schedule` mutates it
+                              ;; under `lock` from other threads, so the peek takes the lock too.
+                              (let [[time-secs _ _ :as elem] (locking lock (.peek queue))]
+                                (if elem
+                                  (if (>= (current-time-secs) time-secs)
+                                    ;; The task runs while holding the lock, so cancel-timer's
+                                    ;; interrupt (taken under the same lock) waits for it to finish.
+                                    (locking lock
+                                      ((second (.poll queue))))
+                                    (Time/sleepUntil (to-millis time-secs)))
+                                  ;; nothing scheduled: idle until interrupted or the sleep ends
+                                  (Time/sleep 10000)))
+                              (catch InterruptedException e
+                                ;; interrupts are wake-ups from schedule / cancel-timer;
+                                ;; loop around and re-check `active` and the queue head
+                                )
+                              (catch Throwable t
+                                (kill-fn t)
+                                (reset! active false)
+                                (throw t))))
+                          (finally
+                            ;; always signal termination, even when the loop exits by
+                            ;; rethrowing, so a concurrent cancel-timer cannot block forever
+                            (.release notifier)))))]
+    (.setDaemon timer-thread true)
+    (.start timer-thread)
+    {:timer-thread timer-thread
+     :queue queue
+     :active active
+     :lock lock
+     :cancel-notifier notifier}))
+
+;; Throws IllegalStateException unless the timer's :active atom holds a truthy value;
+;; returns nil otherwise.
+(defn- check-active! [timer]
+  (let [active? @(:active timer)]
+    (when (not active?)
+      (throw (IllegalStateException. "Timer is not active")))))
+
+;; Queues afn to run on the timer thread delay-secs seconds from now.
+;; Throws IllegalStateException (via check-active!) if the timer is not active.
+(defn schedule [timer delay-secs afn]
+  (check-active! timer)
+  (let [task-id (uuid)
+        ^PriorityQueue pending (:queue timer)]
+    (locking (:lock timer)
+      (let [due-secs (+ (current-time-secs) delay-secs)]
+        (.add pending [due-secs afn task-id]))
+      ;; if the new entry became the head of the queue, wake the timer thread so it
+      ;; re-reads the head instead of sleeping until the previous head's due time
+      (let [[_ _ head-id] (.peek pending)]
+        (when (= task-id head-id)
+          (.interrupt ^Thread (:timer-thread timer)))))))
+
+;; Runs afn after delay-secs, then re-queues it recur-secs after each run completes.
+;; Throws IllegalStateException (via check-active!) if the timer is not active.
+(defn schedule-recurring [timer delay-secs recur-secs afn]
+  (check-active! timer)
+  (letfn [(tick []
+            (afn)
+            ;; re-arm only after afn returned; an exception in afn stops the recurrence
+            (schedule timer recur-secs tick))]
+    (schedule timer delay-secs tick)))
+
+;; Stops the timer: marks it inactive, interrupts the timer thread, and blocks until
+;; the thread releases the :cancel-notifier semaphore.
+;; Throws IllegalStateException (via check-active!) if the timer is already inactive.
+(defn cancel-timer [timer]
+  (check-active! timer)
+  (let [{:keys [active lock timer-thread cancel-notifier]} timer]
+    (reset! active false)
+    ;; the interrupt is issued under the timer lock, the same lock the timer thread
+    ;; holds while it is running a scheduled function
+    (locking lock
+      (.interrupt timer-thread))
+    (.acquire cancel-notifier)))
+
+;; Returns the result of Time/isThreadWaiting for the timer's thread.
+(defn timer-waiting? [timer]
+  (let [worker (:timer-thread timer)]
+    (Time/isThreadWaiting worker)))
@@ -537,60 +537,5 @@
(defn to-millis
  "Converts a duration in seconds to milliseconds. secs is coerced to a long
  before the multiplication."
  [secs]
  (let [whole-secs (long secs)]
    (* whole-secs 1000)))
-(defn- run-on-timer-thread [^Timer timer afn]
- (let [wait-sem (Semaphore. 0)]
- (.schedule timer
- (ClojureTimerTask.
- (fn []
- (afn)
- (.release wait-sem)))
- (long 0))
- (.acquire wait-sem)))
-
-(defnk mk-timer [:kill-fn (fn [& _] )]
- (let [timer (Timer. true)
- timer-thread (atom nil)]
- (run-on-timer-thread timer
- #(reset! timer-thread (Thread/currentThread)))
- {:timer timer
- :kill-fn kill-fn
- :timer-thread @timer-thread
- :active-tasks (atom 0)}))
-
-(defn- wrap-timer-fn [timer afn task-decrease]
- (fn []
- (try
- (when (Time/isSimulating)
- ;; this is so we can see if the timer is waiting or not
- (Time/sleep 1))
- (afn)
- (swap! (:active-tasks timer) - task-decrease)
- (catch Throwable t
- ((:kill-fn timer) t)
- (throw t)
- ))))
-
-(defn schedule [timer delay-secs afn]
- (swap! (:active-tasks timer) inc)
- (.schedule ^Timer (:timer timer)
- (ClojureTimerTask. (wrap-timer-fn timer afn 1))
- (to-millis delay-secs)
- ))
-
-(defn schedule-recurring [timer delay-secs recur-secs afn]
- (swap! (:active-tasks timer) inc)
- (.schedule ^Timer (:timer timer)
- (ClojureTimerTask. (wrap-timer-fn timer afn 0))
- (to-millis delay-secs)
- (to-millis recur-secs)
- ))
-
-(defn cancel-timer [timer]
- (run-on-timer-thread (:timer timer) #(.cancel (:timer timer))))
-
-(defn timer-waiting? [timer]
- (or (= 0 @(:active-tasks timer))
- (Time/isThreadWaiting (:timer-thread timer))))
-
(defn throw-runtime
  "Throws a RuntimeException whose message is the str-concatenation of strs."
  [& strs]
  (let [message (apply str strs)]
    (throw (RuntimeException. message))))

0 comments on commit 44b6ede

Please sign in to comment.