Skip to content

Commit

Permalink
Avoid File Not Found error under heavy thrift load.
Browse files Browse the repository at this point in the history
  • Loading branch information
Robert (Bobby) Evans committed Feb 14, 2013
1 parent f10b698 commit 59217cd
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 4 deletions.
25 changes: 21 additions & 4 deletions src/clj/backtype/storm/daemon/nimbus.clj
Expand Up @@ -4,6 +4,7 @@
(:import [org.apache.thrift7 TException])
(:import [org.apache.thrift7.transport TNonblockingServerTransport TNonblockingServerSocket])
(:import [java.nio ByteBuffer])
(:import [java.io FileNotFoundException])
(:import [java.nio.channels Channels WritableByteChannel])
(:use [backtype.storm.scheduler.DefaultScheduler])
(:import [backtype.storm.scheduler INimbus SupervisorDetails WorkerSlot TopologyDetails
Expand Down Expand Up @@ -856,6 +857,22 @@
(throw (InvalidTopologyException.
(str "Topology name cannot contain any of the following: " (pr-str DISALLOWED-TOPOLOGY-NAME-STRS))))))

(defn- try-read-storm-conf [conf storm-id]
(try-cause
(read-storm-conf conf storm-id)
(catch FileNotFoundException e
(throw (NotAliveException. storm-id)))
)
)

(defn- try-read-storm-topology [conf storm-id]
(try-cause
(read-storm-topology conf storm-id)
(catch FileNotFoundException e
(throw (NotAliveException. storm-id)))
)
)

(defserverfn service-handler [conf inimbus]
(.prepare inimbus conf (master-inimbus-dir conf))
(log-message "Starting Nimbus with conf " conf)
Expand Down Expand Up @@ -1014,13 +1031,13 @@
(to-json (:conf nimbus)))

(^String getTopologyConf [this ^String id]
(to-json (read-storm-conf conf id)))
(to-json (try-read-storm-conf conf id)))

(^StormTopology getTopology [this ^String id]
(system-topology! (read-storm-conf conf id) (read-storm-topology conf id)))
(system-topology! (try-read-storm-conf conf id) (try-read-storm-topology conf id)))

(^StormTopology getUserTopology [this ^String id]
(read-storm-topology conf id))
(try-read-storm-topology conf id))

(^ClusterSummary getClusterInfo [this]
(let [storm-cluster-state (:storm-cluster-state nimbus)
Expand Down Expand Up @@ -1063,7 +1080,7 @@

(^TopologyInfo getTopologyInfo [this ^String storm-id]
(let [storm-cluster-state (:storm-cluster-state nimbus)
task->component (storm-task-info (read-storm-topology conf storm-id) (read-storm-conf conf storm-id))
task->component (storm-task-info (try-read-storm-topology conf storm-id) (try-read-storm-conf conf storm-id))
base (.storm-base storm-cluster-state storm-id nil)
assignment (.assignment-info storm-cluster-state storm-id nil)
beats (.executor-beats storm-cluster-state storm-id (:executor->node+port assignment))
Expand Down
10 changes: 10 additions & 0 deletions test/clj/backtype/storm/nimbus_test.clj
Expand Up @@ -144,6 +144,16 @@
(is (not-nil? ((:executor->start-time-secs assignment) e))))
))

(deftest test-bogusId
(with-local-cluster [cluster :supervisors 4 :ports-per-supervisor 3 :daemon-conf {SUPERVISOR-ENABLE false TOPOLOGY-ACKER-EXECUTORS 0}]
(let [state (:storm-cluster-state cluster)
nimbus (:nimbus cluster)]
(is (thrown? NotAliveException (.getTopologyConf nimbus "bogus-id")))
(is (thrown? NotAliveException (.getTopology nimbus "bogus-id")))
(is (thrown? NotAliveException (.getUserTopology nimbus "bogus-id")))
(is (thrown? NotAliveException (.getTopologyInfo nimbus "bogus-id")))
)))

(deftest test-assignment
(with-local-cluster [cluster :supervisors 4 :ports-per-supervisor 3 :daemon-conf {SUPERVISOR-ENABLE false TOPOLOGY-ACKER-EXECUTORS 0}]
(let [state (:storm-cluster-state cluster)
Expand Down

0 comments on commit 59217cd

Please sign in to comment.