Skip to content

Commit

Permalink
added drpc to storm deploy
Browse files Browse the repository at this point in the history
  • Loading branch information
Nathan Marz committed Oct 23, 2011
1 parent b14644d commit 6b0dc55
Show file tree
Hide file tree
Showing 4 changed files with 79 additions and 13 deletions.
2 changes: 1 addition & 1 deletion project.clj
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
} }


:dependencies [ :dependencies [
[storm "0.5.2"] [storm "0.5.4"]
[commons-codec "1.4"] [commons-codec "1.4"]
[org.cloudhoist/pallet "0.6.1"] [org.cloudhoist/pallet "0.6.1"]
[org.cloudhoist/java "0.5.0"] [org.cloudhoist/java "0.5.0"]
Expand Down
49 changes: 45 additions & 4 deletions src/clj/backtype/storm/crate/storm.clj
Original file line number Original file line Diff line number Diff line change
@@ -1,6 +1,6 @@
(ns backtype.storm.crate.storm (ns backtype.storm.crate.storm
(:use [clojure.contrib.def :only [defnk]] (:use [clojure.contrib.def :only [defnk]]
[pallet.compute :only [running? primary-ip]] [pallet.compute :only [running? primary-ip private-ip]]
[org.jclouds.compute :only [nodes-with-tag]] [org.jclouds.compute :only [nodes-with-tag]]
[pallet.configure :only [compute-service-properties pallet-config]]) [pallet.configure :only [compute-service-properties pallet-config]])
(:require (:require
Expand All @@ -23,6 +23,12 @@
(assert (= (count running-nodes) 1)) (assert (= (count running-nodes) 1))
(primary-ip (first running-nodes)))) (primary-ip (first running-nodes))))


(defn nimbus-private-ip [compute name]
(let [running-nodes (filter running? (nodes-with-tag (str "nimbus-" name) compute))]
(assert (= (count running-nodes) 1))
(private-ip (first running-nodes))))


(defn zookeeper-ips [compute name] (defn zookeeper-ips [compute name]
(let [running-nodes (filter running? (let [running-nodes (filter running?
(nodes-with-tag (str "zookeeper-" name) compute))] (nodes-with-tag (str "zookeeper-" name) compute))]
Expand Down Expand Up @@ -63,7 +69,7 @@
"Build storm" "Build storm"


(cd "$HOME") (cd "$HOME")
(mkdir "build") (mkdir -p "build")
(cd "$HOME/build") (cd "$HOME/build")
(if-not (directory? "storm") (if-not (directory? "storm")
(git clone "git://github.com/nathanmarz/storm.git")) (git clone "git://github.com/nathanmarz/storm.git"))
Expand Down Expand Up @@ -96,7 +102,7 @@
(rm "-f storm") (rm "-f storm")
(ln "-s $HOME/`ls | grep zip | sed s/.zip//` storm") (ln "-s $HOME/`ls | grep zip | sed s/.zip//` storm")


(mkdir "daemon") (mkdir -p "daemon")
(chmod "755" "$HOME/storm/log4j") (chmod "755" "$HOME/storm/log4j")
(touch "$HOME/storm/log4j/storm.log.properties") (touch "$HOME/storm/log4j/storm.log.properties")
(touch "$HOME/storm/log4j/log4j.properties") (touch "$HOME/storm/log4j/log4j.properties")
Expand Down Expand Up @@ -130,6 +136,20 @@
:mode 755) :mode 755)
)) ))


(defn write-drpc-exec [request path]
(-> request
(remote-file/remote-file
path
;;TODO: need to replace $HOME with the hardcoded absolute path
:content (str
"#!/bin/bash\n\n
cd $HOME/storm\n\n
python bin/storm drpc")
:overwrite-changes true
:literal true
:mode 755)
))

(defn install-ui [request] (defn install-ui [request]
(-> request (-> request
(directory/directory "$HOME/ui" :owner "storm" :mode "700") (directory/directory "$HOME/ui" :owner "storm" :mode "700")
Expand All @@ -138,6 +158,14 @@
(write-ui-exec "$HOME/ui/run") (write-ui-exec "$HOME/ui/run")
)) ))


(defn install-drpc [request]
(-> request
(directory/directory "$HOME/drpc" :owner "storm" :mode "700")
(directory/directory "$HOME/drpc/logs" :owner "storm" :mode "700")
(directory/directory "$HOME/drpc/supervise" :owner "storm" :mode "700")
(write-drpc-exec "$HOME/drpc/run")
))

(defn install-nimbus [request release local-dir-path] (defn install-nimbus [request release local-dir-path]
(-> (->
request request
Expand All @@ -152,8 +180,19 @@
(cd "$HOME/daemon") (cd "$HOME/daemon")
"ps ax | grep backtype.storm | grep -v grep | awk '{print $1}' | xargs kill -9\n\n" "ps ax | grep backtype.storm | grep -v grep | awk '{print $1}' | xargs kill -9\n\n"
"sudo -u storm -H nohup supervise . &" "sudo -u storm -H nohup supervise . &"
)))

(defn exec-ui [request]
(exec-script/exec-script request
(cd "$HOME/ui") (cd "$HOME/ui")
"sudo -u storm -H nohup supervise . &"))) "sudo -u storm -H nohup supervise . &"
))

(defn exec-drpc [request]
(exec-script/exec-script request
(cd "$HOME/drpc")
"sudo -u storm -H nohup supervise . &"
))


(defn write-storm-exec [request name] (defn write-storm-exec [request name]
(-> request (-> request
Expand All @@ -178,6 +217,8 @@
(concat (map #(str " - \"" % "\"") (zookeeper-ips compute name))) (concat (map #(str " - \"" % "\"") (zookeeper-ips compute name)))
[] []
[(str "nimbus.host: \"" (nimbus-ip compute name) "\"")] [(str "nimbus.host: \"" (nimbus-ip compute name) "\"")]
["drpc.servers:"]
[(str " - \"" (nimbus-private-ip compute name) "\"")]
[(str "storm.local.dir: \"/mnt/storm\"")])))) [(str "storm.local.dir: \"/mnt/storm\"")]))))


(defn mk-supervisor-yaml [compute name] (defn mk-supervisor-yaml [compute name]
Expand Down
32 changes: 30 additions & 2 deletions src/clj/backtype/storm/node.clj
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -28,6 +28,18 @@
[org.jclouds.compute :only [nodes-with-tag]] [org.jclouds.compute :only [nodes-with-tag]]
[clojure.walk])) [clojure.walk]))


(defn parse-release [release]
(map #(Integer/parseInt %) (.split release "\\.")))

(defn release> [release1 release2]
(let [r1 (parse-release release1)
r2 (parse-release release2)
diff (map - r1 r2)
left (take-while #(>= % 0) diff)]
(and (not (empty? left))
(pos? (last left)))
))

;; CONSTANTS ;; CONSTANTS


(def clusters-conf (def clusters-conf
Expand Down Expand Up @@ -98,6 +110,18 @@
(storm/write-storm-exec (storm/write-storm-exec
"supervisor"))})) "supervisor"))}))


(defn maybe-install-drpc [req release]
(if (or (not release) (release> release "0.5.3"))
(storm/install-drpc req)
req
))

(defn maybe-exec-drpc [req release]
(if (or (not release) (release> release "0.5.3"))
(storm/exec-drpc req)
req
))

(defn nimbus-server-spec [name release] (defn nimbus-server-spec [name release]
(server-spec (server-spec
:extends (storm-base-server-spec name) :extends (storm-base-server-spec name)
Expand All @@ -106,12 +130,16 @@
(storm/install-nimbus (storm/install-nimbus
release release
"/mnt/storm") "/mnt/storm")
(storm/install-ui)) (storm/install-ui)
(maybe-install-drpc release))
:post-configure (phase :post-configure (phase
(ganglia/ganglia-finish) (ganglia/ganglia-finish)
(storm/write-storm-exec (storm/write-storm-exec
"nimbus") "nimbus")
)})) )
:exec (phase
(storm/exec-ui)
(maybe-exec-drpc release))}))


(defn node-spec-from-config [group-name inbound-ports] (defn node-spec-from-config [group-name inbound-ports]
(letfn [(assoc-with-conf-key [image image-key conf-key & {:keys [f] :or {f identity}}] (letfn [(assoc-with-conf-key [image image-key conf-key & {:keys [f] :or {f identity}}]
Expand Down
9 changes: 3 additions & 6 deletions src/clj/backtype/storm/provision.clj
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -54,11 +54,8 @@
(sync-storm-conf-dir aws name) (sync-storm-conf-dir aws name)
(authorizeme aws (jclouds-group "nimbus-" name) 80) (authorizeme aws (jclouds-group "nimbus-" name) 80)
(authorizeme aws (jclouds-group "nimbus-" name) (node/storm-conf "nimbus.thrift.port")) (authorizeme aws (jclouds-group "nimbus-" name) (node/storm-conf "nimbus.thrift.port"))
(authorizeme aws (jclouds-group "nimbus-" name) 8080) (authorizeme aws (jclouds-group "nimbus-" name) (node/storm-conf "ui.port"))

(authorizeme aws (jclouds-group "nimbus-" name) (node/storm-conf "drpc.port"))

;; TODO: should probably move this out of this deploy
(authorizeme aws (jclouds-group "nimbus-" name) 3772) ;; drpc
(println "Attaching Complete.")) (println "Attaching Complete."))


(defn start-with-nodes! [aws name nimbus supervisor zookeeper] (defn start-with-nodes! [aws name nimbus supervisor zookeeper]
Expand Down Expand Up @@ -128,7 +125,7 @@
(use 'backtype.storm.provision) (use 'backtype.storm.provision)
(ns backtype.storm.provision) (ns backtype.storm.provision)
(def aws (mk-aws)) (def aws (mk-aws))
(lift (node/zookeeper "test") :compute aws :phase [:configure] ) (lift (node/supervisor "test" nil) :compute aws :phase [:post-configure] )
(sync-storm-conf-dir aws) (sync-storm-conf-dir aws)
(print-all-ips! aws) (print-all-ips! aws)
) )

0 comments on commit 6b0dc55

Please sign in to comment.