Skip to content

Commit

Permalink
fix how host names are determining by using INimbus, use blacklisting…
Browse files Browse the repository at this point in the history
… instead of whitelisting
  • Loading branch information
Nathan Marz committed Dec 18, 2012
1 parent a8df52d commit 29997fd
Show file tree
Hide file tree
Showing 4 changed files with 31 additions and 10 deletions.
2 changes: 1 addition & 1 deletion src/clj/backtype/storm/daemon/nimbus.clj
Expand Up @@ -569,7 +569,7 @@
(apply merge-with set/union))

supervisors (read-all-supervisor-details nimbus all-scheduling-slots supervisor->dead-ports)
cluster (Cluster. supervisors topology->scheduler-assignment)
cluster (Cluster. (:inimbus nimbus) supervisors topology->scheduler-assignment)

;; call scheduler.schedule to schedule all the topologies
;; the new assignments for all the topologies are in the cluster object.
Expand Down
7 changes: 4 additions & 3 deletions src/clj/backtype/storm/scheduler/DefaultScheduler.clj
Expand Up @@ -28,9 +28,10 @@
(->> slots
(filter
(fn [[node port]]
(if-let [supervisor (.getSupervisorById cluster node)]
(.contains (.getAllPorts supervisor) (int port))
)))))
(if-not (.isBlackListed cluster node)
(if-let [supervisor (.getSupervisorById cluster node)]
(.contains (.getAllPorts supervisor) (int port))
))))))

(defn -prepare [this conf]
)
Expand Down
28 changes: 23 additions & 5 deletions src/jvm/backtype/storm/scheduler/Cluster.java
Expand Up @@ -24,9 +24,11 @@ public class Cluster {
*/
private Map<String, List<String>> hostToId;

private Set<String> whiteListedHosts = new HashSet<String>();
private Set<String> blackListedHosts = new HashSet<String>();
private INimbus inimbus;

public Cluster(Map<String, SupervisorDetails> supervisors, Map<String, SchedulerAssignmentImpl> assignments){
public Cluster(INimbus nimbus, Map<String, SupervisorDetails> supervisors, Map<String, SchedulerAssignmentImpl> assignments){
this.inimbus = nimbus;
this.supervisors = new HashMap<String, SupervisorDetails>(supervisors.size());
this.supervisors.putAll(supervisors);
this.assignments = new HashMap<String, SchedulerAssignmentImpl>(assignments.size());
Expand All @@ -42,8 +44,24 @@ public Cluster(Map<String, SupervisorDetails> supervisors, Map<String, Scheduler
}
}

public void setWhitelistedHosts(Set<String> hosts) {
whiteListedHosts = hosts;
public void setBlacklistedHosts(Set<String> hosts) {
blackListedHosts = hosts;
}

public void blacklistHost(String host) {
// this is so it plays well with setting blackListedHosts to an immutable list
if(blackListedHosts==null) blackListedHosts = new HashSet<String>();
if(!(blackListedHosts instanceof HashSet))
blackListedHosts = new HashSet<String>(blackListedHosts);
blackListedHosts.add(host);
}

public boolean isBlackListed(String supervisorId) {
return blackListedHosts != null && blackListedHosts.contains(getHost(supervisorId));
}

public String getHost(String supervisorId) {
return inimbus.getHostName(supervisors, supervisorId);
}

/**
Expand Down Expand Up @@ -167,7 +185,7 @@ public List<Integer> getAvailablePorts(SupervisorDetails supervisor) {
* @return
*/
public List<WorkerSlot> getAvailableSlots(SupervisorDetails supervisor) {
if(whiteListedHosts!=null && !whiteListedHosts.isEmpty() && !whiteListedHosts.contains(supervisor.host)) return new ArrayList();
if(isBlackListed(supervisor.id)) return new ArrayList();
List<Integer> ports = this.getAvailablePorts(supervisor);
List<WorkerSlot> slots = new ArrayList<WorkerSlot>(ports.size());

Expand Down
4 changes: 3 additions & 1 deletion test/clj/backtype/storm/scheduler_test.clj
@@ -1,6 +1,7 @@
(ns backtype.storm.scheduler-test
(:use [clojure test])
(:use [backtype.storm bootstrap config testing])
(:require [backtype.storm.daemon [nimbus :as nimbus]])
(:import [backtype.storm.generated StormTopology])
(:import [backtype.storm.scheduler Cluster SupervisorDetails WorkerSlot ExecutorDetails
SchedulerAssignmentImpl Topologies TopologyDetails]))
Expand Down Expand Up @@ -112,7 +113,8 @@
assignment1 (SchedulerAssignmentImpl. "topology1" executor->slot1)
assignment2 (SchedulerAssignmentImpl. "topology2" executor->slot2)
assignment3 (SchedulerAssignmentImpl. "topology3" executor->slot3)
cluster (Cluster. {"supervisor1" supervisor1 "supervisor2" supervisor2}
cluster (Cluster. (nimbus/standalone-nimbus)
{"supervisor1" supervisor1 "supervisor2" supervisor2}
{"topology1" assignment1 "topology2" assignment2 "topology3" assignment3})]
;; test Cluster constructor
(is (= #{"supervisor1" "supervisor2"}
Expand Down

0 comments on commit 29997fd

Please sign in to comment.