Skip to content

Commit

Permalink
Merge branch 'master' into 0.9.0
Browse files Browse the repository at this point in the history
  • Loading branch information
Nathan Marz committed Dec 20, 2012
2 parents 7f2b38a + 2ec670b commit 0182cc3
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 2 deletions.
3 changes: 2 additions & 1 deletion src/clj/backtype/storm/scheduler/IsolationScheduler.clj
Expand Up @@ -93,6 +93,7 @@
(group-by #(.getHost cluster (.getNodeId ^WorkerSlot %)) <>)
(dissoc <> nil)
(sort-by #(-> % second count -) <>)
shuffle
(LinkedList. <>)
))

Expand Down Expand Up @@ -160,7 +161,7 @@
(let [top-id (-> assignments first second)
distribution (get topology-machine-distribution top-id)
^Set worker-specs (get topology-worker-specs top-id)
num-workers (count host-assignments)
num-workers (count assignments)
]
(if (and (contains? iso-ids-set top-id)
(every? #(= (second %) top-id) assignments)
Expand Down
24 changes: 23 additions & 1 deletion test/clj/backtype/storm/nimbus_test.clj
Expand Up @@ -48,6 +48,15 @@
set
)))

(defn topology-slots [state storm-name]
(let [storm-id (get-storm-id state storm-name)
assignment (.assignment-info state storm-id nil)]
(->> assignment
:executor->node+port
vals
set
)))

(defn topology-node-distribution [state storm-name]
(let [storm-id (get-storm-id state storm-name)
assignment (.assignment-info state storm-id nil)]
Expand Down Expand Up @@ -189,13 +198,14 @@
))))

(deftest test-isolated-assignment
(with-local-cluster [cluster :supervisors 6
(with-simulated-time-local-cluster [cluster :supervisors 6
:ports-per-supervisor 3
:inimbus (isolation-nimbus)
:daemon-conf {SUPERVISOR-ENABLE false
TOPOLOGY-ACKER-EXECUTORS 0
STORM-SCHEDULER "backtype.storm.scheduler.IsolationScheduler"
ISOLATION-SCHEDULER-MACHINES {"tester1" 3 "tester2" 2}
NIMBUS-MONITOR-FREQ-SECS 10
}]
(letlocals
(bind state (:storm-cluster-state cluster))
Expand All @@ -206,11 +216,13 @@
"3" (thrift/mk-bolt-spec {"2" :none} (TestPlannerBolt.))}))

(submit-local-topology nimbus "noniso" {TOPOLOGY-OPTIMIZE false TOPOLOGY-WORKERS 4} topology)
(advance-cluster-time cluster 1)
(is (= 4 (topology-num-nodes state "noniso")))
(is (= 4 (storm-num-workers state "noniso")))

(submit-local-topology nimbus "tester1" {TOPOLOGY-OPTIMIZE false TOPOLOGY-WORKERS 6} topology)
(submit-local-topology nimbus "tester2" {TOPOLOGY-OPTIMIZE false TOPOLOGY-WORKERS 6} topology)
(advance-cluster-time cluster 1)

(bind task-info-tester1 (storm-component->task-info cluster "tester1"))
(bind task-info-tester2 (storm-component->task-info cluster "tester2"))
Expand All @@ -227,6 +239,16 @@
(check-consistency cluster "tester1")
(check-consistency cluster "tester2")
(check-consistency cluster "noniso")

;;check that nothing gets reassigned
(bind tester1-slots (topology-slots state "tester1"))
(bind tester2-slots (topology-slots state "tester2"))
(bind noniso-slots (topology-slots state "noniso"))
(advance-cluster-time cluster 20)
(is (= tester1-slots (topology-slots state "tester1")))
(is (= tester2-slots (topology-slots state "tester2")))
(is (= noniso-slots (topology-slots state "noniso")))

)))

(deftest test-zero-executor-or-tasks
Expand Down

0 comments on commit 0182cc3

Please sign in to comment.