Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Nimbus HA #422

Closed
wants to merge 85 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
85 commits
Select commit Hold shift + click to select a range
382dfb6
INimbusStorage interface added
Frostman Oct 15, 2012
69f520e
custom storage implemented; local fs storage implemented.
Frostman Oct 15, 2012
683e248
storage instantiation added
Frostman Oct 15, 2012
eb788bd
nimbus-storage-stormdist-root added to config
Frostman Oct 15, 2012
1151035
use storage to download files from nimbus
Frostman Oct 15, 2012
cd50cf7
nimbus-storage-stormdist-root fixed
Frostman Oct 15, 2012
f29dffe
IOUtils added to bootstrap
Frostman Oct 15, 2012
5235675
Use FileInputStream instead of BufferFileInputStream
Frostman Oct 15, 2012
53f24f3
remove /nimbus prefix from storage path (nimbus-storage-stormdist-root)
Frostman Oct 15, 2012
8ef473c
read-storm-conf is using storage now
Frostman Oct 15, 2012
72d5a8d
read-storm-topology is now using storage
Frostman Oct 15, 2012
a34d703
BufferInputStream added
Frostman Oct 15, 2012
cac28ea
fileDownload is now using BufferInputStream
Frostman Oct 15, 2012
e79f716
dir for local storage can be configured directly now
Frostman Oct 16, 2012
8075736
master-storage-local-dir -> nimbus-storage-local-dir
Frostman Oct 16, 2012
25809bb
some helpers for working with nimbus storage has been added
Frostman Oct 16, 2012
7d094fd
setup-storm-code is now using storage
Frostman Oct 16, 2012
e94196c
code-ids is now using storage
Frostman Oct 17, 2012
0a66c6a
full-path option added to the storage interface
Frostman Oct 17, 2012
b5ef2f1
code-ids fixed (list is now returns only names)
Frostman Oct 17, 2012
7c83af6
deserialize-from-storage helper added
Frostman Oct 17, 2012
cd218f0
msg fixed
Frostman Oct 17, 2012
171f9ac
Don't call delete for empty list.
Frostman Oct 18, 2012
328ce66
clean INimbusStorage interface
Frostman Oct 18, 2012
97fa2fd
delete(List) and move has been removed from Nimbus storage (unused)
Frostman Oct 18, 2012
57160cf
Merge remote-tracking branch 'upstream/master' into nimbus-storage
Frostman Oct 19, 2012
08e1618
0.8.2-wip16
Oct 30, 2012
9a0243c
Merge branch 'master' into 0.8.2
Oct 30, 2012
aa50da1
Merge remote-tracking branch 'upstream/master' into nimbus-storage
Frostman Nov 1, 2012
7a66f2f
nimbus.elections.zookeeper.root parameter added to config
Frostman Nov 1, 2012
5219d54
NimbusLeaderElections implemented
Frostman Nov 1, 2012
f0de754
small refactoring
Frostman Nov 1, 2012
1e3820e
NimbusClient is now discovering Nimbus leader using Zookeeper
Frostman Nov 1, 2012
22db24a
elections clj helper added
Frostman Nov 1, 2012
b550254
leadership awaiting added to nimbus
Frostman Nov 1, 2012
05038c2
nimbus leader discovery added to ui
Frostman Nov 1, 2012
8e178ee
fix curator client zk root path.
Frostman Nov 1, 2012
192d85a
Merge branch 'master' into 0.8.2
Nov 7, 2012
61ba860
0.8.2-wip17
Nov 7, 2012
cb7e648
Merge branch 'master' into 0.8.2
Nov 8, 2012
4b22ed4
0.8.2-wip18
Nov 8, 2012
3d90c4d
Merge branch 'master' into 0.8.2
Nov 8, 2012
1488476
Merge branch 'master' into 0.8.2
Nov 8, 2012
98ba1f4
close zk connection after leader id discovery
Frostman Nov 8, 2012
15c75b2
Merge branch 'master' into 0.8.2
Nov 8, 2012
9595dad
Merge remote-tracking branch 'upstream/master' into nimbus-storage
Frostman Nov 13, 2012
08bb175
Merge branch 'nimbus-storage' into nimbus-ha
Frostman Nov 13, 2012
92d5991
log-message about awaiting/gaining leadership added
Frostman Nov 15, 2012
7bde88d
close leader-elections on nimbus shutdown (release leadership mutex)
Frostman Nov 15, 2012
dd02170
ensure mutex path to avoid NoNode exception
Frostman Nov 15, 2012
c967da2
NotALeaderException added to thrift api
Frostman Nov 15, 2012
846af39
ensure-leadership fixed
Frostman Nov 15, 2012
6fe8f88
ensure-leadership added to all methods of thrift service handler
Frostman Nov 15, 2012
e87fbbb
NotALeaderException handled in StormSubmitter
Frostman Nov 15, 2012
84d3f80
NotALeaderException handled in Utils
Frostman Nov 15, 2012
796f708
personal session and connection timeouts has been added for nimbus el…
Frostman Nov 15, 2012
9647543
with-configured-nimbus-connection fixed
Frostman Nov 19, 2012
e1351ba
unnecessary 'str' call removed
Frostman Nov 27, 2012
c529b7a
Merge branch 'nimbus-storage' into nimbus-ha
Frostman Nov 27, 2012
b365a69
exception will be thrown if secondary Nimbus will try to start with s…
Frostman Nov 27, 2012
3e38314
Merge branch 'master' into 0.8.2
Nov 29, 2012
7769afb
0.8.2-wip19
Nov 29, 2012
92deeb7
nimbus.local.dir option has been removed from config
Frostman Dec 3, 2012
c7fec94
nimbus.local.dir option has been removed from config
Frostman Dec 3, 2012
94098fb
join-paths function added and used instead of `str` in storage.clj
Frostman Dec 3, 2012
1a17260
javadoc added for nimbus.storage option
Frostman Dec 3, 2012
48443a6
Merge branch 'nimbus-storage' into nimbus-ha
Frostman Dec 3, 2012
138a2f9
Merge branch 'master' into 0.8.2
Dec 10, 2012
c405490
0.8.2-wip20
Dec 10, 2012
cfdb01b
Merge branch 'master' into 0.8.2
Dec 12, 2012
3fe6e8f
Merge branch 'master' into 0.8.2
Dec 20, 2012
5ab86ad
0.8.2-wip21
Dec 20, 2012
6f615b4
Merge branch 'master' into 0.8.2
Jan 5, 2013
9e30263
Merge branch 'master' into 0.8.2
Jan 7, 2013
8e0af8f
Merge branch 'master' into 0.8.2
Jan 8, 2013
88bd39a
0.8.2-wip22
Jan 8, 2013
3285f4e
Merge branch 'master' into 0.8.2
Jan 9, 2013
c59ea52
Merge branch 'master' into 0.8.2
Jan 9, 2013
843c86e
storm 0.8.2
Jan 11, 2013
0be62fe
Merge branch 'master' into 0.8.2
Jan 11, 2013
09e592d
now host+port used as Nimbus host id for elections
Frostman Jan 14, 2013
e8a61da
exception message fixed
Frostman Jan 16, 2013
b02f8d6
method name fixed
Frostman Jan 16, 2013
407667d
method names fixed, label changed
Frostman Jan 16, 2013
13d19b9
Merge remote-tracking branch 'upstream/0.8.2' into nimbus-ha
Frostman Jan 22, 2013
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions conf/defaults.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ nimbus.task.launch.secs: 120
nimbus.reassign: true
nimbus.file.copy.expiration.secs: 600
nimbus.topology.validator: "backtype.storm.nimbus.DefaultTopologyValidator"
nimbus.elections.zk.path: "/storm/nimbus-leader"
nimbus.elections.zk.session.timeout: 2000
nimbus.elections.zk.connection.timeout: 15000

### ui.* configs are for the master
ui.port: 8080
Expand Down
3 changes: 2 additions & 1 deletion project.clj
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
(do (println (str "ERROR: requires Leiningen 1.x but you are using " lein-version))
(System/exit 1)))

(defproject storm "0.8.2-wip15"
(defproject storm "0.8.2"
:source-path "src/clj"
:test-path "test/clj"
:java-source-path "src/jvm"
Expand All @@ -18,6 +18,7 @@
[clj-time "0.4.1"]
[log4j/log4j "1.2.16"]
[com.netflix.curator/curator-framework "1.0.1"]
[com.netflix.curator/curator-recipes "1.0.1"]
[backtype/jzmq "2.1.0"]
[com.googlecode.json-simple/json-simple "1.1"]
[compojure "1.1.3"]
Expand Down
4 changes: 2 additions & 2 deletions src/clj/backtype/storm/bootstrap.clj
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
(import (quote [backtype.storm.utils Utils LocalState Time TimeCacheMap
TimeCacheMap$ExpiredCallback
RotatingMap RotatingMap$ExpiredCallback
BufferFileInputStream
BufferInputStream BufferFileInputStream
RegisteredGlobalState ThriftTopologyUtils DisruptorQueue
MutableObject MutableLong]))
(import (quote [backtype.storm.serialization KryoTupleSerializer KryoTupleDeserializer]))
Expand Down Expand Up @@ -44,5 +44,5 @@
(import (quote [backtype.storm.grouping CustomStreamGrouping]))
(import (quote [java.io File FileOutputStream FileInputStream]))
(import (quote [java.util Collection List Random Map HashMap Collections ArrayList LinkedList]))
(import (quote [org.apache.commons.io FileUtils]))
(import (quote [org.apache.commons.io FileUtils IOUtils]))
))
14 changes: 14 additions & 0 deletions src/clj/backtype/storm/config.clj
Original file line number Diff line number Diff line change
Expand Up @@ -74,12 +74,26 @@
ret
))

(defn nimbus-storage-local-dir [conf]
(let [storm-local-dir (conf STORM-LOCAL-DIR)
nimbus-local-dir (get conf "nimbus.local.dir")
local-dir (if-not (clojure.string/blank? nimbus-local-dir) nimbus-local-dir storm-local-dir)
ret (str local-dir "/nimbus")]
(FileUtils/forceMkdir (File. ret))
ret))

(defn master-stormdist-root
([conf]
(str (master-local-dir conf) "/stormdist"))
([conf storm-id]
(str (master-stormdist-root conf) "/" storm-id)))

(defn nimbus-storage-stormdist-root
([]
"/stormdist")
([storm-id]
(str (nimbus-storage-stormdist-root) "/" storm-id)))

(defn master-stormjar-path [stormroot]
(str stormroot "/stormjar.jar"))

Expand Down
117 changes: 58 additions & 59 deletions src/clj/backtype/storm/daemon/nimbus.clj
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,13 @@
(:import [org.apache.thrift7.transport TNonblockingServerTransport TNonblockingServerSocket])
(:import [java.nio ByteBuffer])
(:import [java.nio.channels Channels WritableByteChannel])
(:import [java.io InputStream])
(:use [backtype.storm.scheduler.DefaultScheduler])
(:import [backtype.storm.scheduler INimbus SupervisorDetails WorkerSlot TopologyDetails
Cluster Topologies SchedulerAssignment SchedulerAssignmentImpl DefaultScheduler ExecutorDetails])
(:use [backtype.storm bootstrap util])
(:use [backtype.storm.daemon common])
(:use [backtype.storm.nimbus elections storage])
(:gen-class
:methods [^{:static true} [launch [backtype.storm.scheduler.INimbus] void]]))

Expand Down Expand Up @@ -54,6 +56,7 @@
:uploaders (file-cache-map conf)
:uptime (uptime-computer)
:validator (new-instance (conf NIMBUS-TOPOLOGY-VALIDATOR))
:storage (create-nimbus-storage conf)
:timer (mk-timer :kill-fn (fn [t]
(log-error t "Error when processing event")
(halt-process! 20 "Error when processing an event")
Expand All @@ -64,13 +67,9 @@
(defn inbox [nimbus]
(master-inbox (:conf nimbus)))

(defn- read-storm-conf [conf storm-id]
(let [stormroot (master-stormdist-root conf storm-id)]
(merge conf
(Utils/deserialize
(FileUtils/readFileToByteArray
(File. (master-stormconf-path stormroot))
)))))
(defn- read-storm-conf [storage conf storm-id]
(let [stormroot (nimbus-storage-stormdist-root storm-id)]
(merge conf (deserialize-from-storage storage (master-stormconf-path stormroot)))))

(defn set-topology-status! [nimbus storm-id status]
(let [storm-cluster-state (:storm-cluster-state nimbus)]
Expand All @@ -87,7 +86,7 @@
(fn [kill-time]
(let [delay (if kill-time
kill-time
(get (read-storm-conf (:conf nimbus) storm-id)
(get (read-storm-conf (:storage nimbus) (:conf nimbus) storm-id)
TOPOLOGY-MESSAGE-TIMEOUT-SECS))]
(delay-event nimbus
storm-id
Expand All @@ -101,7 +100,7 @@
(fn [time num-workers executor-overrides]
(let [delay (if time
time
(get (read-storm-conf (:conf nimbus) storm-id)
(get (read-storm-conf (:storage nimbus) (:conf nimbus) storm-id)
TOPOLOGY-MESSAGE-TIMEOUT-SECS))]
(delay-event nimbus
storm-id
Expand Down Expand Up @@ -289,29 +288,25 @@
;; need to somehow maintain stream/component ids inside tuples
topology)

(defn- setup-storm-code [conf storm-id tmp-jar-location storm-conf topology]
(let [stormroot (master-stormdist-root conf storm-id)]
(FileUtils/forceMkdir (File. stormroot))
(FileUtils/cleanDirectory (File. stormroot))
(setup-jar conf tmp-jar-location stormroot)
(FileUtils/writeByteArrayToFile (File. (master-stormcode-path stormroot)) (Utils/serialize topology))
(FileUtils/writeByteArrayToFile (File. (master-stormconf-path stormroot)) (Utils/serialize storm-conf))
))
(defn- setup-storm-code [conf storage storm-id tmp-jar-location storm-conf topology]
(let [stormroot (nimbus-storage-stormdist-root storm-id)]
(ensure-clean-dir-in-storage storage stormroot)
(setup-jar conf storage tmp-jar-location stormroot)
(serialize-to-storage topology storage (master-stormcode-path stormroot))
(serialize-to-storage storm-conf storage (master-stormconf-path stormroot))))

(defn- read-storm-topology [conf storm-id]
(let [stormroot (master-stormdist-root conf storm-id)]
(Utils/deserialize
(FileUtils/readFileToByteArray
(File. (master-stormcode-path stormroot))
))))
(defn- read-storm-topology [storage storm-id]
(let [stormroot (nimbus-storage-stormdist-root storm-id)]
(deserialize-from-storage storage (master-stormcode-path stormroot))))

(declare compute-executor->component)

(defn read-topology-details [nimbus storm-id]
(let [conf (:conf nimbus)
storage (:storage nimbus)
storm-base (.storm-base (:storm-cluster-state nimbus) storm-id nil)
topology-conf (read-storm-conf conf storm-id)
topology (read-storm-topology conf storm-id)
topology-conf (read-storm-conf storage conf storm-id)
topology (read-storm-topology storage storm-id)
executor->component (->> (compute-executor->component nimbus storm-id)
(map-key (fn [[start-task end-task]]
(ExecutorDetails. (int start-task) (int end-task)))))]
Expand Down Expand Up @@ -401,10 +396,11 @@

(defn- compute-executors [nimbus storm-id]
(let [conf (:conf nimbus)
storage (:storage nimbus)
storm-base (.storm-base (:storm-cluster-state nimbus) storm-id nil)
component->executors (:component->executors storm-base)
storm-conf (read-storm-conf conf storm-id)
topology (read-storm-topology conf storm-id)
storm-conf (read-storm-conf storage conf storm-id)
topology (read-storm-topology storage storm-id)
task->component (storm-task-info topology storm-conf)]
(->> (storm-task-info topology storm-conf)
reverse-map
Expand All @@ -417,9 +413,10 @@

(defn- compute-executor->component [nimbus storm-id]
(let [conf (:conf nimbus)
storage (:storage nimbus)
executors (compute-executors nimbus storm-id)
topology (read-storm-topology conf storm-id)
storm-conf (read-storm-conf conf storm-id)
topology (read-storm-topology storage storm-id)
storm-conf (read-storm-conf storage conf storm-id)
task->component (storm-task-info topology storm-conf)
executor->component (into {} (for [executor executors
:let [start-task (first executor)
Expand Down Expand Up @@ -673,7 +670,7 @@
[id now-secs]
)))]]
{topology-id (Assignment.
(master-stormdist-root conf topology-id)
(nimbus-storage-stormdist-root topology-id)
(select-keys all-node->host all-nodes)
executor->node+port
start-times)}))]
Expand Down Expand Up @@ -702,8 +699,9 @@
{:pre [(#{:active :inactive} topology-initial-status)]}
(let [storm-cluster-state (:storm-cluster-state nimbus)
conf (:conf nimbus)
storm-conf (read-storm-conf conf storm-id)
topology (system-topology! storm-conf (read-storm-topology conf storm-id))
storage (:storage nimbus)
storm-conf (read-storm-conf storage conf storm-id)
topology (system-topology! storm-conf (read-storm-topology storage storm-id))
num-executors (->> (all-components topology) (map-val num-start-executors))]
(log-message "Activating " storm-name ": " storm-id)
(.activate-storm! storm-cluster-state
Expand Down Expand Up @@ -732,17 +730,13 @@
(throw (AlreadyAliveException. (str storm-name " is already active"))))
))

(defn code-ids [conf]
(-> conf
master-stormdist-root
read-dir-contents
set
))
(defn code-ids [storage]
(set (.list storage (nimbus-storage-stormdist-root))))

(defn cleanup-storm-ids [conf storm-cluster-state]
(defn cleanup-storm-ids [storage storm-cluster-state]
(let [heartbeat-ids (set (.heartbeat-storms storm-cluster-state))
error-ids (set (.error-topologies storm-cluster-state))
code-ids (code-ids conf)
code-ids (code-ids storage)
assigned-ids (set (.active-storms storm-cluster-state))]
(set/difference (set/union heartbeat-ids error-ids code-ids) assigned-ids)
))
Expand Down Expand Up @@ -805,15 +799,16 @@
(defn do-cleanup [nimbus]
(let [storm-cluster-state (:storm-cluster-state nimbus)
conf (:conf nimbus)
storage (:storage nimbus)
submit-lock (:submit-lock nimbus)]
(let [to-cleanup-ids (locking submit-lock
(cleanup-storm-ids conf storm-cluster-state))]
(cleanup-storm-ids storage storm-cluster-state))]
(when-not (empty? to-cleanup-ids)
(doseq [id to-cleanup-ids]
(log-message "Cleaning up " id)
(.teardown-heartbeats! storm-cluster-state id)
(.teardown-topology-errors! storm-cluster-state id)
(rmr (master-stormdist-root conf id))
(.delete storage (nimbus-storage-stormdist-root id))
(swap! (:heartbeats-cache nimbus) dissoc id))
))))

Expand All @@ -834,11 +829,11 @@

(defn cleanup-corrupt-topologies! [nimbus]
(let [storm-cluster-state (:storm-cluster-state nimbus)
code-ids (set (code-ids (:conf nimbus)))
code-ids (code-ids (:storage nimbus))
active-topologies (set (.active-storms storm-cluster-state))
corrupt-topologies (set/difference active-topologies code-ids)]
(doseq [corrupt corrupt-topologies]
(log-message "Corrupt topology " corrupt " has state on zookeeper but doesn't have a local dir on Nimbus. Cleaning up...")
(log-message "Corrupt topology " corrupt " has state on zookeeper but doesn't have a dir in Nimbus storage. Cleaning up...")
(.remove-storm! storm-cluster-state corrupt)
)))

Expand All @@ -859,7 +854,9 @@
(defserverfn service-handler [conf inimbus]
(.prepare inimbus conf (master-inimbus-dir conf))
(log-message "Starting Nimbus with conf " conf)
(let [nimbus (nimbus-data conf inimbus)]
(let [nimbus (nimbus-data conf inimbus)
storage (:storage nimbus)
leader-elections (await-leadership conf storage)]
(cleanup-corrupt-topologies! nimbus)
(doseq [storm-id (.active-storms (:storm-cluster-state nimbus))]
(transition! nimbus storm-id :startup))
Expand All @@ -879,7 +876,7 @@
(fn []
(clean-inbox (inbox nimbus) (conf NIMBUS-INBOX-JAR-EXPIRATION-SECS))
))
(reify Nimbus$Iface
(reify-with-validation #(ensure-leadership leader-elections) backtype.storm.generated.Nimbus$Iface Shutdownable DaemonCommon
(^void submitTopologyWithOpts
[this ^String storm-name ^String uploadedJarLocation ^String serializedConf ^StormTopology topology
^SubmitOptions submitOptions]
Expand Down Expand Up @@ -911,7 +908,7 @@
;; lock protects against multiple topologies being submitted at once and
;; cleanup thread killing topology in b/w assignment and starting the topology
(locking (:submit-lock nimbus)
(setup-storm-code conf storm-id uploadedJarLocation storm-conf topology)
(setup-storm-code conf storage storm-id uploadedJarLocation storm-conf topology)
(.setup-heartbeats! storm-cluster-state storm-id)
(let [thrift-status->kw-status {TopologyInitialStatus/INACTIVE :inactive
TopologyInitialStatus/ACTIVE :active}]
Expand Down Expand Up @@ -991,15 +988,15 @@
))

(^String beginFileDownload [this ^String file]
(let [is (BufferFileInputStream. file)
(let [is (BufferInputStream. (.open storage file))
id (uuid)]
(.put (:downloaders nimbus) id is)
id
))

(^ByteBuffer downloadChunk [this ^String id]
(let [downloaders (:downloaders nimbus)
^BufferFileInputStream is (.get downloaders id)]
^BufferInputStream is (.get downloaders id)]
(when-not is
(throw (RuntimeException.
"Could not find input stream for that id")))
Expand All @@ -1014,13 +1011,13 @@
(to-json (:conf nimbus)))

(^String getTopologyConf [this ^String id]
(to-json (read-storm-conf conf id)))
(to-json (read-storm-conf storage conf id)))

(^StormTopology getTopology [this ^String id]
(system-topology! (read-storm-conf conf id) (read-storm-topology conf id)))
(system-topology! (read-storm-conf storage conf id) (read-storm-topology storage id)))

(^StormTopology getUserTopology [this ^String id]
(read-storm-topology conf id))
(read-storm-topology storage id))

(^ClusterSummary getClusterInfo [this]
(let [storm-cluster-state (:storm-cluster-state nimbus)
Expand Down Expand Up @@ -1063,7 +1060,9 @@

(^TopologyInfo getTopologyInfo [this ^String storm-id]
(let [storm-cluster-state (:storm-cluster-state nimbus)
task->component (storm-task-info (read-storm-topology conf storm-id) (read-storm-conf conf storm-id))
storm-topology (read-storm-topology storage storm-id)
storm-conf (read-storm-conf storage conf storm-id)
task->component (storm-task-info storm-topology storm-conf)
base (.storm-base storm-cluster-state storm-id nil)
assignment (.assignment-info storm-cluster-state storm-id nil)
beats (.executor-beats storm-cluster-state storm-id (:executor->node+port assignment))
Expand Down Expand Up @@ -1095,16 +1094,17 @@
)
))

Shutdownable

(shutdown [this]
(log-message "Shutting down master")
(cancel-timer (:timer nimbus))
(.disconnect (:storm-cluster-state nimbus))
(.cleanup (:downloaders nimbus))
(.cleanup (:uploaders nimbus))
(.close leader-elections)
(log-message "Shut down master")
)
DaemonCommon

(waiting? [this]
(timer-waiting? (:timer nimbus))))))

Expand All @@ -1125,14 +1125,13 @@

;; distributed implementation

(defmethod setup-jar :distributed [conf tmp-jar-location stormroot]
(defmethod setup-jar :distributed [conf storage tmp-jar-location stormroot]
(let [src-file (File. tmp-jar-location)]
(if-not (.exists src-file)
(throw
(IllegalArgumentException.
(str tmp-jar-location " to copy to " stormroot " does not exist!"))))
(FileUtils/copyFile src-file (File. (master-stormjar-path stormroot)))
))
(str tmp-jar-location " to copy to Nimbus storage does not exist!"))))
(upload-file-to-storage src-file storage (master-stormjar-path stormroot))))

;; local implementation

Expand Down
5 changes: 3 additions & 2 deletions src/clj/backtype/storm/daemon/supervisor.clj
Original file line number Diff line number Diff line change
Expand Up @@ -439,8 +439,9 @@

(defmethod download-storm-code
:local [conf storm-id master-code-dir]
(let [stormroot (supervisor-stormdist-root conf storm-id)]
(FileUtils/copyDirectory (File. master-code-dir) (File. stormroot))
(let [stormroot (supervisor-stormdist-root conf storm-id)
master-root (nimbus-storage-local-dir conf)]
(FileUtils/copyDirectory (File. (str master-root master-code-dir)) (File. stormroot))
(let [classloader (.getContextClassLoader (Thread/currentThread))
resources-jar (resources-jar)
url (.getResource classloader RESOURCES-SUBDIR)
Expand Down
Loading