Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

store number of workers/executors in the storm base, make it possible…

… to make it dynamic
  • Loading branch information...
commit d1f42d237f989b480de80334630d6b6dfefa1594 1 parent d035c1e
@nathanmarz nathanmarz authored
View
18 src/clj/backtype/storm/daemon/common.clj
@@ -23,7 +23,7 @@
;; this avoid situation where node goes down and task doesn't know what to do information-wise
(defrecord Assignment [master-code-dir node->host task->node+port task->start-time-secs])
-(defrecord StormBase [storm-name launch-time-secs status])
+(defrecord StormBase [storm-name launch-time-secs status num-workers component->executors])
(defrecord SupervisorInfo [time-secs hostname meta uptime-secs])
@@ -190,11 +190,19 @@
(and (or (nil? tasks) (> tasks 0))
(> (storm-conf TOPOLOGY-ACKER-EXECUTORS) 0))))
+(defn component-conf [storm-conf component]
+ (->> component
+ .get_common
+ .get_json_conf
+ from-json
+ (merge storm-conf)))
+
+(defn num-start-executors [component]
+ (thrift/parallelism-hint (.get_common component)))
+
(defn- component-parallelism [storm-conf component]
- (let [common (.get_common component)
- num-executors (thrift/parallelism-hint common)
- storm-conf (merge storm-conf (from-json (.get_json_conf common)))
- num-tasks (or (storm-conf TOPOLOGY-TASKS) num-executors)
+ (let [storm-conf (component-conf storm-conf component)
+ num-tasks (or (storm-conf TOPOLOGY-TASKS) (num-start-executors component))
max-parallelism (storm-conf TOPOLOGY-MAX-TASK-PARALLELISM)
]
(if max-parallelism
View
30 src/clj/backtype/storm/daemon/nimbus.clj
@@ -392,6 +392,9 @@
task-heartbeats-cache (:task-heartbeats-cache nimbus)
storm-id (.getId topology-details)
+ storm-base (.storm-base storm-cluster-state storm-id nil)
+
+
available-slots (available-slots nimbus callback topology-details)
storm-conf (read-storm-conf conf storm-id)
all-task-ids (-> (read-storm-topology conf storm-id) (storm-task-info storm-conf) keys set)
@@ -404,7 +407,7 @@
alive-assigned (filter-val (partial every? alive-ids) existing-assigned)
- total-slots-to-use (min (storm-conf TOPOLOGY-WORKERS)
+ total-slots-to-use (min (:num-workers storm-base)
(+ (count available-slots) (count alive-assigned)))
keep-assigned (if scratch?
{}
@@ -504,13 +507,20 @@
(mk-assignments nimbus
storm-id))))
-(defn- start-storm [storm-name storm-cluster-state storm-id]
- (log-message "Activating " storm-name ": " storm-id)
- (.activate-storm! storm-cluster-state
- storm-id
- (StormBase. storm-name
- (current-time-secs)
- {:type :active})))
+(defn- start-storm [nimbus storm-name storm-id]
+ (let [storm-cluster-state (:storm-cluster-state nimbus)
+ conf (:conf nimbus)
+ storm-conf (read-storm-conf conf storm-id)
+ topology (system-topology! storm-conf (read-storm-topology conf storm-id))
+ executors (->> (all-components topology) (map-val num-start-executors))]
+ (log-message "Activating " storm-name ": " storm-id)
+ (.activate-storm! storm-cluster-state
+ storm-id
+ (StormBase. storm-name
+ (current-time-secs)
+ {:type :active}
+ (storm-conf TOPOLOGY-WORKERS)
+ executors))))
;; Master:
;; job submit:
@@ -667,8 +677,8 @@
(locking (:submit-lock nimbus)
(setup-storm-code conf storm-id uploadedJarLocation storm-conf topology)
(.setup-heartbeats! storm-cluster-state storm-id)
- (mk-assignments nimbus storm-id)
- (start-storm storm-name storm-cluster-state storm-id))
+ (start-storm nimbus storm-name storm-id)
+ (mk-assignments nimbus storm-id))
))
(^void killTopology [this ^String name]
View
4 src/clj/backtype/storm/daemon/task.clj
@@ -136,7 +136,7 @@
(defn- get-readable-name [topology-context]
(.getThisComponentId topology-context))
-(defn- component-conf [storm-conf topology-context component-id]
+(defn- normalized-component-conf [storm-conf topology-context component-id]
(let [to-remove (disj (set ALL-CONFIGS)
TOPOLOGY-DEBUG
TOPOLOGY-MAX-SPOUT-PENDING
@@ -177,7 +177,7 @@
worker-port (.getThisWorkerPort topology-context)
component-id (.getThisComponentId topology-context)
- storm-conf (component-conf storm-conf topology-context component-id)
+ storm-conf (normalized-component-conf storm-conf topology-context component-id)
_ (log-message "Loading task " component-id ":" task-id)
task-info (.getTaskToComponent topology-context)
active (atom true)
View
4 test/clj/backtype/storm/cluster_test.clj
@@ -146,8 +146,8 @@
(let [state (mk-storm-state zk-port)
assignment1 (Assignment. "/aaa" {} {1 [2 2002 1]} {})
assignment2 (Assignment. "/aaa" {} {1 [2 2002]} {})
- base1 (StormBase. "/tmp/storm1" 1 {:type :active})
- base2 (StormBase. "/tmp/storm2" 2 {:type :active})]
+ base1 (StormBase. "/tmp/storm1" 1 {:type :active} 2 {})
+ base2 (StormBase. "/tmp/storm2" 2 {:type :active} 2 {})]
(is (= [] (.assignments state nil)))
(.set-assignment! state "storm1" assignment1)
(is (= assignment1 (.assignment-info state "storm1" nil)))
Please sign in to comment.
Something went wrong with that request. Please try again.