Skip to content
Browse files

Fix race condition between supervisor and nimbus that could lead to an infinite crash loop with "stormconf.ser not found".

The supervisor now carefully snapshots the state in ZooKeeper before making decisions instead of grabbing it piecemeal. The race condition was due to one of the pieces of data grabbed being null.
  • Loading branch information...
1 parent aa118f6 commit 414af600ab08c4cdd7cefc0205ac95036af64c1e @nathanmarz nathanmarz committed Feb 18, 2013
Showing with 20 additions and 25 deletions.
  1. +20 −25 src/clj/backtype/storm/daemon/supervisor.clj
View
45 src/clj/backtype/storm/daemon/supervisor.clj
@@ -20,11 +20,17 @@
(shutdown-all-workers [this])
)
+(defn- assignments-snapshot [storm-cluster-state callback]
+ (let [storm-ids (.assignments storm-cluster-state callback)]
+ (->> (dofor [sid storm-ids] {sid (.assignment-info storm-cluster-state sid callback)})
+ (apply merge)
+ (filter-val not-nil?)
+ )))
-(defn- read-my-executors [storm-cluster-state storm-id assignment-id callback]
- (let [assignment (.assignment-info storm-cluster-state storm-id callback)
+(defn- read-my-executors [assignments-snapshot storm-id assignment-id]
+ (let [assignment (get assignments-snapshot storm-id)
my-executors (filter (fn [[_ [node _]]] (= node assignment-id))
- (:executor->node+port assignment))
+ (:executor->node+port assignment))
port-executors (apply merge-with
concat
(for [[executor [_ port]] my-executors]
@@ -34,29 +40,18 @@
;; need to cast to int b/c it might be a long (due to how yaml parses things)
;; doall is to avoid serialization/deserialization problems with lazy seqs
[(Integer. port) (LocalAssignment. storm-id (doall executors))]
- ))
- ))
+ ))))
+
(defn- read-assignments
"Returns map from port to struct containing :storm-id and :executors"
- [storm-cluster-state assignment-id callback]
- (let [storm-ids (.assignments storm-cluster-state callback)]
- (apply merge-with
- (fn [& ignored]
- (throw (RuntimeException.
- "Should not have multiple topologies assigned to one port")))
- (dofor [sid storm-ids] (read-my-executors storm-cluster-state sid assignment-id callback))
- )))
+ [assignments-snapshot assignment-id]
+ (->> (dofor [sid (keys assignments-snapshot)] (read-my-executors assignments-snapshot sid assignment-id))
+ (apply merge-with (fn [& ignored] (throw-runtime "Should not have multiple topologies assigned to one port")))))
(defn- read-storm-code-locations
- [storm-cluster-state callback]
- (let [storm-ids (.assignments storm-cluster-state callback)]
- (into {}
- (dofor [sid storm-ids]
- [sid (:master-code-dir (.assignment-info storm-cluster-state sid callback))]
- ))
- ))
-
+ [assignments-snapshot]
+ (map-val :master-code-dir assignments-snapshot))
(defn- read-downloaded-storm-ids [conf]
(map #(java.net.URLDecoder/decode %) (read-dir-contents (supervisor-stormdist-root conf)))
@@ -265,12 +260,12 @@
^ISupervisor isupervisor (:isupervisor supervisor)
^LocalState local-state (:local-state supervisor)
sync-callback (fn [& ignored] (.add event-manager this))
- storm-code-map (read-storm-code-locations storm-cluster-state sync-callback)
+ assignments-snapshot (assignments-snapshot storm-cluster-state sync-callback)
+ storm-code-map (read-storm-code-locations assignments-snapshot)
downloaded-storm-ids (set (read-downloaded-storm-ids conf))
all-assignment (read-assignments
- storm-cluster-state
- (:assignment-id supervisor)
- sync-callback)
+ assignments-snapshot
+ (:assignment-id supervisor))
new-assignment (->> all-assignment
(filter-key #(.confirmAssigned isupervisor %)))
assigned-storm-ids (assigned-storm-ids-from-port-assignments new-assignment)

0 comments on commit 414af60

Please sign in to comment.
Something went wrong with that request. Please try again.