Skip to content

Commit

Permalink
finish INimbus/ISupervisor integration
Browse files Browse the repository at this point in the history
  • Loading branch information
Nathan Marz committed Apr 5, 2012
1 parent d5376e5 commit d74ae43
Show file tree
Hide file tree
Showing 5 changed files with 11 additions and 11 deletions.
4 changes: 2 additions & 2 deletions src/clj/backtype/storm/daemon/nimbus.clj
Expand Up @@ -427,10 +427,10 @@
(defn compute-new-task->node+port [nimbus ^TopologyDetails topology-details existing-assignment callback scratch?]
(let [conf (:conf nimbus)
storm-cluster-state (:storm-cluster-state nimbus)
task-heartbeat-cache (:task-heartbeats-cache nimbus)
task-heartbeats-cache (:task-heartbeats-cache nimbus)
storm-id (.getId topology-details)

available-slots (available-slots nimbus callback)
available-slots (available-slots nimbus callback topology-details)
storm-conf (read-storm-conf conf storm-id)
all-task-ids (set (.task-ids storm-cluster-state storm-id))

Expand Down
2 changes: 1 addition & 1 deletion src/clj/backtype/storm/daemon/supervisor.clj
Expand Up @@ -281,7 +281,7 @@
))
(log-debug "Writing new assignment "
(pr-str new-assignment))
(doseq [p (set/difference (set (keys existing-assigned))
(doseq [p (set/difference (set (keys existing-assignment))
(set (keys new-assignment)))]
(.killedWorker isupervisor (int p)))
(.put local-state
Expand Down
12 changes: 6 additions & 6 deletions src/jvm/backtype/storm/scheduler/TopologyDetails.java
@@ -1,18 +1,18 @@
package backtype.storm.scheduler;

import backtype.storm.task.GeneralTopologyContext;
import backtype.storm.generated.StormTopology;
import java.util.Map;


public class TopologyDetails {
String topologyId;
Map topologyConf;
GeneralTopologyContext context;
StormTopology topology;

public TopologyDetails(String topologyId, Map topologyConf, GeneralTopologyContext context) {
public TopologyDetails(String topologyId, Map topologyConf, StormTopology topology) {
this.topologyId = topologyId;
this.topologyConf = topologyConf;
this.context = context;
this.topology = topology;
}

public String getId() {
Expand All @@ -23,7 +23,7 @@ public Map getConf() {
return topologyConf;
}

public GeneralTopologyContext getContext() {
return context;
public StormTopology getTopology() {
return topology;
}
}
2 changes: 1 addition & 1 deletion test/clj/backtype/storm/nimbus_test.clj
Expand Up @@ -417,7 +417,7 @@
(bind storm-id2 (get-storm-id cluster-state "t2"))
(.shutdown nimbus)
(rmr (master-stormdist-root conf storm-id1))
(bind nimbus (nimbus/service-handler conf) (nimbus/standalone-nimbus))
(bind nimbus (nimbus/service-handler conf (nimbus/standalone-nimbus)))
(is ( = #{storm-id2} (set (.active-storms cluster-state))))
(.shutdown nimbus)
(.disconnect cluster-state)
Expand Down
2 changes: 1 addition & 1 deletion test/clj/backtype/storm/supervisor_test.clj
Expand Up @@ -197,7 +197,7 @@
(bind sup1 (add-supervisor cluster :id "sup" :ports [5 6 7]))
(advance-cluster-time cluster 4)
(bind hb (get-heartbeat cluster "sup"))
(is (= #{5 6 7} (set (:worker-ports hb))))
(is (= #{5 6 7} (set (:meta hb))))
(check-heartbeat cluster "sup" 3)
(advance-cluster-time cluster 3)
(check-heartbeat cluster "sup" 3)
Expand Down

0 comments on commit d74ae43

Please sign in to comment.