Skip to content

Commit

Permalink
implemented activate and deactivate commands
Browse files Browse the repository at this point in the history
  • Loading branch information
Nathan Marz committed Dec 2, 2011
1 parent 6cd9c96 commit b6eac26
Show file tree
Hide file tree
Showing 10 changed files with 1,811 additions and 13 deletions.
11 changes: 10 additions & 1 deletion bin/storm
Expand Up @@ -71,6 +71,12 @@ def jar(jarfile, klass, *args):
def kill(*args):
exec_storm_class("backtype.storm.command.kill_topology", args=args, jvmtype="-client", extrajars=[CONF_DIR, STORM_DIR + "/bin"])

def activate(*args):
exec_storm_class("backtype.storm.command.activate", args=args, jvmtype="-client", extrajars=[CONF_DIR, STORM_DIR + "/bin"])

def deactivate(*args):
exec_storm_class("backtype.storm.command.deactivate", args=args, jvmtype="-client", extrajars=[CONF_DIR, STORM_DIR + "/bin"])

def shell(resourcesdir, command, *args):
tmpjarpath = "stormshell" + str(random.randint(0, 10000000)) + ".jar"
os.system("jar cf %s %s" % (tmpjarpath, resourcesdir))
Expand Down Expand Up @@ -104,7 +110,10 @@ def drpc():
def print_classpath():
print get_classpath([])

COMMANDS = {"jar": jar, "kill": kill, "shell": shell, "nimbus": nimbus, "ui": ui, "drpc": drpc, "supervisor": supervisor, "localconfvalue": print_localconfvalue, "remoteconfvalue": print_remoteconfvalue, "repl": repl, "classpath": print_classpath}
COMMANDS = {"jar": jar, "kill": kill, "shell": shell, "nimbus": nimbus, "ui": ui,
"drpc": drpc, "supervisor": supervisor, "localconfvalue": print_localconfvalue,
"remoteconfvalue": print_remoteconfvalue, "repl": repl, "classpath": print_classpath,
"activate": activate, "deactivate": deactivate}

def print_commands():
global COMMANDS
Expand Down
9 changes: 9 additions & 0 deletions src/clj/backtype/storm/command/activate.clj
@@ -0,0 +1,9 @@
(ns backtype.storm.command.activate
(:use [backtype.storm thrift log])
(:gen-class))

(defn -main [name]
(with-configured-nimbus-connection nimbus
(.activate nimbus name)
(log-message "Activated topology: " name)
))
9 changes: 9 additions & 0 deletions src/clj/backtype/storm/command/deactivate.clj
@@ -0,0 +1,9 @@
(ns backtype.storm.command.deactivate
(:use [backtype.storm thrift log])
(:gen-class))

(defn -main [name]
(with-configured-nimbus-connection nimbus
(.deactivate nimbus name)
(log-message "Deactivated topology: " name)
))
7 changes: 2 additions & 5 deletions src/clj/backtype/storm/command/kill_topology.clj
Expand Up @@ -10,13 +10,10 @@
"Kill a topology"
[[wait w "Override the amount of time to wait after deactivating before killing" nil]
posargs]
(let [conf (read-storm-config)
host (conf NIMBUS-HOST)
port (conf NIMBUS-THRIFT-PORT)
name (first posargs)
(let [name (first posargs)
opts (KillOptions.)]
(if wait (.set_wait_secs opts (Integer/parseInt wait)))
(with-nimbus-connection [nimbus host port]
(with-configured-nimbus-connection nimbus
(.killTopologyWithOpts nimbus name opts)
(log-message "Killed topology: " name)
))))
52 changes: 46 additions & 6 deletions src/clj/backtype/storm/daemon/nimbus.clj
Expand Up @@ -14,6 +14,22 @@
(defmulti setup-jar cluster-mode)


;; status types
;; -- killed (:kill-time-secs)
;; -- active
;; -- inactive
;; -- swapping (:name, :launch-wait-time [defaults to launch timeout] :inactive-wait-time[ message timeout for active topology]) --> steps: wait launch timeout, inactivate other topology, wait message timeout, kill other topology (with timeout of 0), activate swapped topology
;; State transitions:
;; -- swapped + active other = wait + inactivate other
;; -- inactive other + swapped = wait message timeout + kill(0)
;; -- swapped + killed other = activate

;; swapping design
;; -- need 2 ports per worker (swap port and regular port)
;; -- topology that swaps in can use all the existing topologies swap ports, + unused worker slots
;; -- how to define worker resources? port range + number of workers?


;; Master:
;; job submit:
;; 1. read which nodes are available
Expand Down Expand Up @@ -400,6 +416,17 @@
(assoc storm-conf TOPOLOGY-KRYO-REGISTER sers)
))

(defn set-topology-status! [storm-cluster-state lock storm-name status]
(let [storm-id (get-storm-id storm-cluster-state storm-name)]
(when-not storm-id
(throw (NotAliveException. storm-name)))
(locking lock
(.update-storm! storm-cluster-state
storm-id
{:status status}))
(log-message "Updated " storm-name " with status " status)
))

(defserverfn service-handler [conf]
(let [submitted-count (atom 0)
active (atom true)
Expand Down Expand Up @@ -505,15 +532,28 @@
(.get_wait_secs options)
((read-storm-conf conf storm-id) TOPOLOGY-MESSAGE-TIMEOUT-SECS)
))
(locking storm-submit-lock
(.update-storm! storm-cluster-state
storm-id
{:status {:type :killed
:kill-time-secs (+ (current-time-secs) wait-amt)}}))
(set-topology-status! storm-cluster-state
storm-submit-lock
storm-name
{:type :killed
:kill-time-secs (+ (current-time-secs) wait-amt)})
(.add cleanup-manager cleanup-fn)
(log-message "Deactivated " storm-name " and scheduled to be killed")
))

(activate [this storm-name]
(set-topology-status! storm-cluster-state
storm-submit-lock
storm-name
{:type :active})
)

(deactivate [this storm-name]
(set-topology-status! storm-cluster-state
storm-submit-lock
storm-name
{:type :inactive})
)

(beginFileUpload [this]
(let [fileloc (str inbox "/stormjar-" (uuid) ".jar")]
(.put uploaders fileloc (Channels/newChannel (FileOutputStream. fileloc)))
Expand Down
9 changes: 8 additions & 1 deletion src/clj/backtype/storm/thrift.clj
Expand Up @@ -10,7 +10,7 @@
(:import [backtype.storm.topology OutputFieldsGetter IBasicBolt BasicBoltExecutor])
(:import [org.apache.thrift7.protocol TBinaryProtocol TProtocol])
(:import [org.apache.thrift7.transport TTransport TFramedTransport TSocket])
(:use [backtype.storm util])
(:use [backtype.storm util config])
(:use [clojure.contrib.def :only [defnk]])
)

Expand Down Expand Up @@ -62,6 +62,13 @@
(finally (.close conn#)))
))

(defmacro with-configured-nimbus-connection [client-sym & body]
`(let [conf# (read-storm-config)
host# (conf# NIMBUS-HOST)
port# (conf# NIMBUS-THRIFT-PORT)]
(with-nimbus-connection [~client-sym host# port#]
~@body )))

(defn mk-component-common [component parallelism-hint]
(let [getter (OutputFieldsGetter.)
_ (.declareOutputFields component getter)
Expand Down

0 comments on commit b6eac26

Please sign in to comment.