Skip to content

Commit

Permalink
Merge branch 'master' into 0.9.0
Browse files Browse the repository at this point in the history
  • Loading branch information
Nathan Marz committed Nov 8, 2012
2 parents d1fc48c + e5ca8b8 commit 4e6e668
Show file tree
Hide file tree
Showing 4 changed files with 37 additions and 8 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
* Execute latency now tracked and shown in Storm UI
* Adding testTuple methods for easily creating Tuple instances to Testing API (thanks xumingming)
* Trident now throws an error during construction of a topology when try to select fields that don't exist in a stream (thanks xumingming)
* Compute the capacity of a bolt based on execute latency and #executed over last 10 minutes and display in UI
* Bug fix: Fix deadlock bug due to variant of dining philosophers problem. Spouts now use an overflow buffer to prevent blocking and guarantee that it can consume the incoming queue of acks/fails.
* Bug fix: Fix race condition in supervisor that would lead to supervisor continuously crashing due to not finding "stormconf.ser" file for an already killed topology
* Bug fix: bin/storm script now displays a helpful error message when an invalid command is specified
Expand Down
2 changes: 1 addition & 1 deletion src/clj/backtype/storm/scheduler/DefaultScheduler.clj
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@
alive-assigned (EvenScheduler/get-alive-assigned-node+port->executors cluster topology-id)
can-reassign-slots (slots-can-reassign cluster (keys alive-assigned))
total-slots-to-use (min (.getNumWorkers topology)
(+ (count can-reassign-slots) (count alive-assigned)))
(+ (count can-reassign-slots) (count available-slots)))
bad-slots (if (> total-slots-to-use (count alive-assigned))
(bad-slots alive-assigned (count all-executors) total-slots-to-use)
[])]]
Expand Down
38 changes: 34 additions & 4 deletions src/clj/backtype/storm/ui/core.clj
Original file line number Diff line number Diff line change
Expand Up @@ -250,8 +250,8 @@
(aggregate-averages (map #(.. ^ExecutorStats % get_specific get_bolt get_execute_ms_avg)
stats-seq)
(map #(.. ^ExecutorStats % get_specific get_bolt get_executed)
stats-seq))}
)))
stats-seq))
})))

(defn aggregate-spout-stats [stats-seq include-sys?]
(let [stats-seq (collectify stats-seq)]
Expand Down Expand Up @@ -378,6 +378,34 @@
(defn component-link [storm-id id]
(link-to (url-format "/topology/%s/component/%s" storm-id id) id))

(defn render-capacity [capacity]
[:span (if (> capacity 0.9)
{:class "red"}
{})
(float-str capacity)])

(defn compute-executor-capacity [^ExecutorSummary e]
(let [stats (.get_stats e)
stats (if stats
(-> stats
(aggregate-bolt-stats true)
(aggregate-bolt-streams)
swap-map-order
(get "600")))
uptime (nil-to-zero (.get_uptime_secs e))
window (if (< uptime 600) uptime 600)
executed (-> stats :executed nil-to-zero)
latency (-> stats :execute-latencies nil-to-zero)
]
(if (> window 0)
(div (* executed latency) (* 1000 window))
)))

(defn compute-bolt-capacity [executors]
(->> executors
(map compute-executor-capacity)
(apply max)))

(defn spout-comp-table [top-id summ-map errors window include-sys?]
(sorted-table
["Id" "Executors" "Tasks" "Emitted" "Transferred" "Complete latency (ms)"
Expand All @@ -401,7 +429,7 @@

(defn bolt-comp-table [top-id summ-map errors window include-sys?]
(sorted-table
["Id" "Executors" "Tasks" "Emitted" "Transferred" "Execute latency (ms)" "Executed" "Process latency (ms)"
["Id" "Executors" "Tasks" "Emitted" "Transferred" "Capacity (last 10m)" "Execute latency (ms)" "Executed" "Process latency (ms)"
"Acked" "Failed" "Last error"]
(for [[id summs] summ-map
:let [stats-seq (get-filled-stats summs)
Expand All @@ -414,6 +442,7 @@
(sum-tasks summs)
(get-in stats [:emitted window])
(get-in stats [:transferred window])
(render-capacity (compute-bolt-capacity summs))
(float-str (get-in stats [:execute-latencies window]))
(get-in stats [:executed window])
(float-str (get-in stats [:process-latencies window]))
Expand Down Expand Up @@ -590,7 +619,7 @@

(defn bolt-executor-table [topology-id executors window include-sys?]
(sorted-table
["Id" "Uptime" "Host" "Port" "Emitted" "Transferred"
["Id" "Uptime" "Host" "Port" "Emitted" "Transferred" "Capacity (last 10m)"
"Execute latency (ms)" "Executed" "Process latency (ms)" "Acked" "Failed"]
(for [^ExecutorSummary e executors
:let [stats (.get_stats e)
Expand All @@ -606,6 +635,7 @@
(.get_port e)
(nil-to-zero (:emitted stats))
(nil-to-zero (:transferred stats))
(render-capacity (compute-executor-capacity e))
(float-str (:execute-latencies stats))
(nil-to-zero (:executed stats))
(float-str (:process-latencies stats))
Expand Down
4 changes: 1 addition & 3 deletions src/jvm/backtype/storm/topology/BasicBoltExecutor.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,7 @@ public void execute(Tuple input) {
_bolt.execute(input, _collector);
_collector.getOutputter().ack(input);
} catch(FailedException e) {
if(e instanceof ReportedFailedException) {
_collector.reportError(e);
}
LOG.warn("Failed to process tuple", e);
_collector.getOutputter().fail(input);
}
}
Expand Down

0 comments on commit 4e6e668

Please sign in to comment.