Permalink
Browse files

fix conflicts

  • Loading branch information...
2 parents 810e676 + e8d8864 commit d527be5946fa9ea086d33b41147acc172a644fed @nathanmarz nathanmarz committed Oct 25, 2012
View
@@ -23,6 +23,12 @@
* Can now submit a topology in inactive state. Storm will wait to call open/prepare on the spouts/bolts until it is first activated.
* Can now activate, deactivate, rebalance, and kill topologies from the Storm UI (thanks Frostman)
* Can now use --config option to override which yaml file from ~/.storm to use for the config (thanks tjun)
+ * Redesigned the pluggable resource scheduler (INimbus, ISupervisor) interfaces to allow for much simpler integrations
+ * Added "throws Exception" to TestJob interface
+ * Added reportError to multilang protocol and updated Python and Ruby adapters to use it (thanks Lazyshot)
+ * Number of tuples executed is now tracked and shown in Storm UI
+ * Added ReportedFailedException which causes a batch to fail without killing the worker and reports the error to the UI
+ * Execute latency now tracked and shown in Storm UI
* Bug fix: Fix race condition in supervisor that would lead to supervisor continuously crashing due to not finding "stormconf.ser" file for an already killed topology
* Bug fix: bin/storm script now displays a helpful error message when an invalid command is specified
View
@@ -62,6 +62,7 @@ You must not remove this notice, or any other, from this software.
* Sergey Lukjanov ([@Frostman](https://github.com/Frostman))
* Ross Feinstein ([@rnfein](https://github.com/rnfein))
* Junichiro Takagi ([@tjun](https://github.com/tjun))
+* Bryan Peterson ([@Lazyshot](https://github.com/Lazyshot))
## Acknowledgements
View
@@ -25,6 +25,3 @@ scp target/storm*jar pom.xml clojars@clojars.org:
rm -Rf target *.xml
git checkout conf/logback.xml
-
-
-
@@ -5,7 +5,7 @@
(:import [backtype.storm.tuple Tuple])
(:import [backtype.storm.spout ISpoutWaitStrategy])
(:import [backtype.storm.hooks.info SpoutAckInfo SpoutFailInfo
- EmitInfo BoltFailInfo BoltAckInfo])
+ EmitInfo BoltFailInfo BoltAckInfo BoltExecuteInfo])
(:require [backtype.storm [tuple :as tuple]])
(:require [backtype.storm.daemon [task :as task]])
)
@@ -479,7 +479,12 @@
:factory? true)]))
(defn- tuple-time-delta! [^TupleImpl tuple]
- (let [ms (.getSampleStartTime tuple)]
+ (let [ms (.getProcessSampleStartTime tuple)]
+ (if ms
+ (time-delta-ms ms))))
+
+(defn- tuple-execute-time-delta! [^TupleImpl tuple]
+ (let [ms (.getExecuteSampleStartTime tuple)]
(if ms
(time-delta-ms ms))))
@@ -488,7 +493,8 @@
(.put pending key (bit-xor curr id))))
(defmethod mk-threads :bolt [executor-data task-datas]
- (let [executor-stats (:stats executor-data)
+ (let [execute-sampler (mk-stats-sampler (:storm-conf executor-data))
+ executor-stats (:stats executor-data)
{:keys [storm-conf component-id worker-context transfer-fn report-error sampler
open-or-prepare-was-called?]} executor-data
rand (Random. (Utils/secureRandomLong))
@@ -509,10 +515,25 @@
;;(log-debug "Received tuple " tuple " at task " task-id)
;; need to do it this way to avoid reflection
- (let [^IBolt bolt-obj (->> task-id (get task-datas) :object)]
- (when (sampler)
- (.setSampleStartTime tuple (System/currentTimeMillis)))
- (.execute bolt-obj tuple)))]
+ (let [task-data (get task-datas task-id)
+ ^IBolt bolt-obj (:object task-data)
+ user-context (:user-context task-data)
+ sampler? (sampler)
+ execute-sampler? (execute-sampler)
+ now (if (or sampler? execute-sampler?) (System/currentTimeMillis))]
+ (when sampler?
+ (.setProcessSampleStartTime tuple now))
+ (when execute-sampler?
+ (.setExecuteSampleStartTime tuple now))
+ (.execute bolt-obj tuple)
+ (let [delta (tuple-execute-time-delta! tuple)]
+ (task/apply-hooks user-context .boltExecute (BoltExecuteInfo. tuple task-id delta))
+ (when delta
+ (stats/bolt-execute-tuple! executor-stats
+ (.getSourceComponent tuple)
+ (.getSourceStreamId tuple)
+ delta)
+ ))))]
;; TODO: can get any SubscribedState objects out of the context now
@@ -142,9 +142,9 @@
(def COMMON-FIELDS [:emitted :transferred])
(defrecord CommonStats [emitted transferred rate])
-(def BOLT-FIELDS [:acked :failed :process-latencies])
+(def BOLT-FIELDS [:acked :failed :process-latencies :executed :execute-latencies])
;;acked and failed count individual tuples
-(defrecord BoltExecutorStats [common acked failed process-latencies])
+(defrecord BoltExecutorStats [common acked failed process-latencies executed execute-latencies])
(def SPOUT-FIELDS [:acked :failed :complete-latencies])
;;acked and failed count tuple completion
@@ -165,6 +165,8 @@
(atom (apply keyed-counter-rolling-window-set NUM-STAT-BUCKETS STAT-BUCKETS))
(atom (apply keyed-counter-rolling-window-set NUM-STAT-BUCKETS STAT-BUCKETS))
(atom (apply keyed-avg-rolling-window-set NUM-STAT-BUCKETS STAT-BUCKETS))
+ (atom (apply keyed-counter-rolling-window-set NUM-STAT-BUCKETS STAT-BUCKETS))
+ (atom (apply keyed-avg-rolling-window-set NUM-STAT-BUCKETS STAT-BUCKETS))
))
(defn mk-spout-stats [rate]
@@ -188,6 +190,12 @@
(defn transferred-tuples! [stats stream amt]
(update-executor-stat! stats [:common :transferred] stream (* (stats-rate stats) amt)))
+(defn bolt-execute-tuple! [^BoltExecutorStats stats component stream latency-ms]
+ (let [key [component stream]]
+ (update-executor-stat! stats :executed key (stats-rate stats))
+ (update-executor-stat! stats :execute-latencies key latency-ms)
+ ))
+
(defn bolt-acked-tuple! [^BoltExecutorStats stats component stream latency-ms]
(let [key [component stream]]
(update-executor-stat! stats :acked key (stats-rate stats))
@@ -286,8 +294,10 @@
(ExecutorSpecificStats/bolt
(BoltStats. (window-set-converter (:acked stats) to-global-stream-id)
(window-set-converter (:failed stats) to-global-stream-id)
- (window-set-converter (:process-latencies stats) to-global-stream-id)))
- )
+ (window-set-converter (:process-latencies stats) to-global-stream-id)
+ (window-set-converter (:executed stats) to-global-stream-id)
+ (window-set-converter (:execute-latencies stats) to-global-stream-id)
+ )))
(defmethod thriftify-specific-stats :spout
[stats]
@@ -238,10 +238,18 @@
:failed
(aggregate-counts (map #(.. ^ExecutorStats % get_specific get_bolt get_failed)
stats-seq))
+ :executed
+ (aggregate-counts (map #(.. ^ExecutorStats % get_specific get_bolt get_executed)
+ stats-seq))
:process-latencies
(aggregate-averages (map #(.. ^ExecutorStats % get_specific get_bolt get_process_ms_avg)
stats-seq)
(map #(.. ^ExecutorStats % get_specific get_bolt get_acked)
+ stats-seq))
+ :execute-latencies
+ (aggregate-averages (map #(.. ^ExecutorStats % get_specific get_bolt get_execute_ms_avg)
+ stats-seq)
+ (map #(.. ^ExecutorStats % get_specific get_bolt get_executed)
stats-seq))}
)))
@@ -269,6 +277,9 @@
:transferred (aggregate-count-streams (:transferred stats))
:process-latencies (aggregate-avg-streams (:process-latencies stats)
(:acked stats))
+ :executed (aggregate-count-streams (:executed stats))
+ :execute-latencies (aggregate-avg-streams (:execute-latencies stats)
+ (:executed stats))
})
(defn aggregate-spout-streams [stats]
@@ -390,7 +401,7 @@
(defn bolt-comp-table [top-id summ-map errors window include-sys?]
(sorted-table
- ["Id" "Executors" "Tasks" "Emitted" "Transferred" "Process latency (ms)"
+ ["Id" "Executors" "Tasks" "Emitted" "Transferred" "Execute latency (ms)" "Executed" "Process latency (ms)"
"Acked" "Failed" "Last error"]
(for [[id summs] summ-map
:let [stats-seq (get-filled-stats summs)
@@ -403,6 +414,8 @@
(sum-tasks summs)
(get-in stats [:emitted window])
(get-in stats [:transferred window])
+ (float-str (get-in stats [:execute-latencies window]))
+ (get-in stats [:executed window])
(float-str (get-in stats [:process-latencies window]))
(get-in stats [:acked window])
(get-in stats [:failed window])
@@ -560,13 +573,15 @@
(let [stream-summary (-> stream-summary
swap-map-order
(get window)
- (select-keys [:acked :failed :process-latencies])
+ (select-keys [:acked :failed :process-latencies :executed :execute-latencies])
swap-map-order)]
(sorted-table
- ["Component" "Stream" "Process latency (ms)" "Acked" "Failed"]
+ ["Component" "Stream" "Execute latency (ms)" "Executed" "Process latency (ms)" "Acked" "Failed"]
(for [[^GlobalStreamId s stats] stream-summary]
[(.get_componentId s)
(.get_streamId s)
+ (float-str (:execute-latencies stats))
+ (nil-to-zero (:executed stats))
(float-str (:process-latencies stats))
(nil-to-zero (:acked stats))
(nil-to-zero (:failed stats))
@@ -576,7 +591,7 @@
(defn bolt-executor-table [topology-id executors window include-sys?]
(sorted-table
["Id" "Uptime" "Host" "Port" "Emitted" "Transferred"
- "Process latency (ms)" "Acked" "Failed"]
+ "Execute latency (ms)" "Executed" "Process latency (ms)" "Acked" "Failed"]
(for [^ExecutorSummary e executors
:let [stats (.get_stats e)
stats (if stats
@@ -591,6 +606,8 @@
(.get_port e)
(nil-to-zero (:emitted stats))
(nil-to-zero (:transferred stats))
+ (float-str (:execute-latencies stats))
+ (nil-to-zero (:executed stats))
(float-str (:process-latencies stats))
(nil-to-zero (:acked stats))
(nil-to-zero (:failed stats))
@@ -604,14 +621,16 @@
display-map (into {} (for [t times] [t pretty-uptime-sec]))
display-map (assoc display-map ":all-time" (fn [_] "All time"))]
(sorted-table
- ["Window" "Emitted" "Transferred" "Process latency (ms)" "Acked" "Failed"]
+ ["Window" "Emitted" "Transferred" "Execute latency (ms)" "Executed" "Process latency (ms)" "Acked" "Failed"]
(for [k (concat times [":all-time"])
:let [disp ((display-map k) k)]]
[(link-to (if (= k window) {:class "red"} {})
(url-format "/topology/%s/component/%s?window=%s" topology-id id k)
disp)
(get-in stats [:emitted k])
(get-in stats [:transferred k])
+ (float-str (get-in stats [:execute-latencies k]))
+ (get-in stats [:executed k])
(float-str (get-in stats [:process-latencies k]))
(get-in stats [:acked k])
(get-in stats [:failed k])
@@ -1,10 +0,0 @@
-require: "storm"
-
-class TesterBolt : Storm Bolt {
- def process: tuple {
- emit: [tuple values first + "lalala"]
- ack: tuple
- }
-}
-
-TesterBolt new run
Oops, something went wrong.

0 comments on commit d527be5

Please sign in to comment.