Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

fleshed out nimbus cleanup code

  • Loading branch information...
commit 934240a0c10845297c1fdbe297626829f37310d9 1 parent ab5935c
@nathanmarz authored
View
14 src/clj/backtype/storm/cluster.clj
@@ -1,9 +1,11 @@
(ns backtype.storm.cluster
(:import [org.apache.zookeeper.data Stat])
+ (:import [org.apache.zookeeper KeeperException])
(:import [backtype.storm.utils Utils])
(:use [backtype.storm util log config])
(:use [clojure.contrib.core :only [dissoc-in]])
(:require [backtype.storm [zookeeper :as zk]])
+
)
(defprotocol ClusterState
@@ -281,10 +283,18 @@
(mkdirs cluster-state (taskbeat-storm-root storm-id)))
(teardown-heartbeats! [this storm-id]
- (delete-node cluster-state (taskbeat-storm-root storm-id)))
+ (try-cause
+ (delete-node cluster-state (taskbeat-storm-root storm-id))
+ (catch KeeperException e
+ (log-warn-error e "Could not teardown heartbeats for " storm-id)
+ )))
(teardown-task-errors! [this storm-id]
- (delete-node cluster-state (taskerror-storm-root storm-id)))
+ (try-cause
+ (delete-node cluster-state (taskerror-storm-root storm-id))
+ (catch KeeperException e
+ (log-warn-error e "Could not teardown errors for " storm-id)
+ )))
(supervisor-heartbeat! [this supervisor-id info]
(set-ephemeral-node cluster-state (supervisor-path supervisor-id) (Utils/serialize info))
View
7 src/clj/backtype/storm/config.clj
@@ -77,8 +77,11 @@
ret
))
-(defn master-stormdist-root [conf storm-id]
- (str (master-local-dir conf) "/stormdist/" storm-id))
+(defn master-stormdist-root
+ ([conf]
+ (str (master-local-dir conf) "/stormdist"))
+ ([conf storm-id]
+ (str (master-stormdist-root conf) "/" storm-id)))
(defn master-stormjar-path [stormroot]
(str stormroot "/stormjar.jar"))
View
31 src/clj/backtype/storm/daemon/nimbus.clj
@@ -93,10 +93,6 @@
nil
))
-;; TODO:
-;; how does cleanup work?
-;; -- schedule zookeeper cleanup to be task timeout seconds in future
-;; -- cleanup code if its not active (check every minute)
(defn state-transitions [nimbus storm-id status]
{:active {:monitor (reassign-transition nimbus storm-id)
:inactivate :inactive
@@ -529,11 +525,19 @@
(throw (AlreadyAliveException. (str storm-name " is already active"))))
))
+(defn code-ids [conf]
+ (-> conf
+ master-stormdist-root
+ read-dir-contents
+ set
+ ))
+
(defn cleanup-storm-ids [conf storm-cluster-state]
(let [heartbeat-ids (set (.heartbeat-storms storm-cluster-state))
error-ids (set (.task-error-storms storm-cluster-state))
+ code-ids (code-ids conf)
assigned-ids (set (.active-storms storm-cluster-state))]
- (set/difference (set/union heartbeat-ids error-ids) assigned-ids)
+ (set/difference (set/union heartbeat-ids error-ids code-ids) assigned-ids)
))
(defn validate-topology! [topology]
@@ -573,17 +577,6 @@
(assoc storm-conf TOPOLOGY-KRYO-REGISTER sers)
))
-;; TODO: need to put this somewhere else
-;; TODO: removing code locally should be done separately (since topology that doesn't start will still have code)
-;;
-;; technically a supervisor could still think there's an assignment and try to d/l
-;; this will cause supervisor to go down and come back up... eventually it should sync
-;; (rmr (master-stormdist-root conf id))
-;;
-;; (.remove-storm! storm-cluster-state id)
-
-
-;; TODO: need to rethink this whole thing
(defn do-cleanup [nimbus]
(let [storm-cluster-state (:storm-cluster-state nimbus)
conf (:conf nimbus)
@@ -592,11 +585,11 @@
(cleanup-storm-ids conf storm-cluster-state))]
(when-not (empty? to-cleanup-ids)
(doseq [id to-cleanup-ids]
+ (log-message "Cleaning up " id)
(.teardown-heartbeats! storm-cluster-state id)
(.teardown-task-errors! storm-cluster-state id)
- (swap! (:task-heartbeats-cache nimbus) dissoc id)
- )
- (log-message "Cleaned up topology task heartbeats: " (pr-str to-cleanup-ids))
+ (rmr (master-stormdist-root conf id))
+ (swap! (:task-heartbeats-cache nimbus) dissoc id))
))))
View
3  src/clj/backtype/storm/log.clj
@@ -9,3 +9,6 @@
(defmacro log-debug [& args]
`(log/debug (str ~@args)))
+
+(defmacro log-warn-error [e & args]
+ `(log/warn (str ~@args) ~e))
View
29 src/clj/backtype/storm/util.clj
@@ -539,3 +539,32 @@
(defn throw-runtime [& strs]
(throw (RuntimeException. (apply str strs))))
+
+(defn exception-cause? [klass ^Throwable t]
+ (->> (iterate #(.getCause ^Throwable %) t)
+ (take-while identity)
+ (some (partial instance? klass))
+ boolean))
+
+(defmacro forcat [[args aseq] & body]
+ `(mapcat (fn [~args]
+ ~@body)
+ ~aseq))
+
+(defmacro try-cause [& body]
+ (let [checker (fn [form]
+ (or (not (sequential? form))
+ (not= 'catch (first form))))
+ [code guards] (split-with checker body)
+ error-local (gensym "t")
+ guards (forcat [[_ klass local & guard-body] guards]
+ `((exception-cause? ~klass ~error-local)
+ (let [~local ~error-local]
+ ~@guard-body
+ )))
+ ]
+ `(try ~@code
+ (catch Throwable ~error-local
+ (cond ~@guards
+ true (throw ~error-local)
+ )))))
View
10 src/clj/backtype/storm/zookeeper.clj
@@ -96,10 +96,16 @@
(defn delete-recursive [^ZooKeeper zk ^String path]
(let [path (normalize-path path)]
(when (exists-node? zk path false)
- (let [children (get-children zk path false)]
+ (let [children (try-cause (get-children zk path false)
+ (catch KeeperException$NoNodeException e
+ []
+ ))]
(doseq [c children]
(delete-recursive zk (full-path path c)))
- (delete-node zk path)
+ (try-cause (delete-node zk path)
+ (catch KeeperException$NoNodeException e
+ nil
+ ))
))))
(defn mk-inprocess-zookeeper [localdir port]
Please sign in to comment.
Something went wrong with that request. Please try again.