Permalink
Browse files

Added built-in storm metrics and MultiReducedMetric. Refactored MultiCountMetric. Added unit tests.
  • Loading branch information...
1 parent 0f59a40 commit c177d096666496a06cf0e6282e2d6849f6641e61 Jason Jackson committed Nov 13, 2012
View
@@ -87,6 +87,7 @@ topology.max.task.parallelism: null
topology.max.spout.pending: null
topology.state.synchronization.timeout.secs: 60
topology.stats.sample.rate: 0.05
+topology.builtin.metrics.bucket.size.secs: 60
topology.fall.back.on.java.serialization: true
topology.worker.childopts: null
topology.executor.receive.buffer.size: 1024 #batched
@@ -0,0 +1,64 @@
+(ns backtype.storm.daemon.builtin-metrics
+ (:import [backtype.storm.metric.api MultiCountMetric MultiReducedMetric MeanReducer])
+ (:import [backtype.storm Config])
+ (:use [backtype.storm.stats :only [stats-rate]]))
+
+(defrecord BuiltinSpoutMetrics [^MultiCountMetric ack-count
+ ^MultiReducedMetric complete-latency
+ ^MultiCountMetric fail-count
+ ^MultiCountMetric emit-count
+ ^MultiCountMetric transfer-count])
+(defrecord BuiltinBoltMetrics [^MultiCountMetric ack-count
+ ^MultiReducedMetric process-latency
+ ^MultiCountMetric fail-count
+ ^MultiCountMetric execute-count
+ ^MultiReducedMetric execute-latency
+ ^MultiCountMetric emit-count
+ ^MultiCountMetric transfer-count])
+
+(defn make-data [executor-type]
+ (condp = executor-type
+ :spout (BuiltinSpoutMetrics. (MultiCountMetric.)
+ (MultiReducedMetric. (MeanReducer.))
+ (MultiCountMetric.)
+ (MultiCountMetric.)
+ (MultiCountMetric.))
+ :bolt (BuiltinBoltMetrics. (MultiCountMetric.)
+ (MultiReducedMetric. (MeanReducer.))
+ (MultiCountMetric.)
+ (MultiCountMetric.)
+ (MultiReducedMetric. (MeanReducer.))
+ (MultiCountMetric.)
+ (MultiCountMetric.))))
+
+(defn register-all [builtin-metrics storm-conf topology-context]
+ (doseq [[kw imetric] builtin-metrics]
+ (.registerMetric topology-context (str "__" (name kw)) imetric
+ (int (get storm-conf Config/TOPOLOGY_BUILTIN_METRICS_BUCKET_SIZE_SECS)))))
+
+(defn spout-acked-tuple! [^BuiltinSpoutMetrics m stats stream latency-ms]
+ (-> m .ack-count (.scope stream) (.incrBy (stats-rate stats)))
+ (-> m .complete-latency (.scope stream) (.update latency-ms)))
+
+(defn spout-failed-tuple! [^BuiltinSpoutMetrics m stats stream]
+ (-> m .fail-count (.scope stream) (.incrBy (stats-rate stats))))
+
+(defn bolt-execute-tuple! [^BuiltinBoltMetrics m stats comp-id stream latency-ms]
+ (let [scope (str comp-id ":" stream)]
+ (-> m .execute-count (.scope scope) (.incrBy (stats-rate stats)))
+ (-> m .execute-latency (.scope scope) (.update latency-ms))))
+
+(defn bolt-acked-tuple! [^BuiltinBoltMetrics m stats comp-id stream latency-ms]
+ (let [scope (str comp-id ":" stream)]
+ (-> m .ack-count (.scope scope) (.incrBy (stats-rate stats)))
+ (-> m .process-latency (.scope scope) (.update latency-ms))))
+
+(defn bolt-failed-tuple! [^BuiltinBoltMetrics m stats comp-id stream]
+ (let [scope (str comp-id ":" stream)]
+ (-> m .fail-count (.scope scope) (.incrBy (stats-rate stats)))))
+
+(defn emitted-tuple! [m stats stream]
+ (-> m :emit-count (.scope stream) (.incrBy (stats-rate stats))))
+
+(defn transferred-tuple! [m stats stream num-out-tasks]
+ (-> m :transfer-count (.scope stream) (.incrBy (* num-out-tasks (stats-rate stats)))))
@@ -278,8 +278,8 @@
(validate-basic! topology)
(let [ret (.deepCopy topology)]
(add-acker! storm-conf ret)
- (add-metric-streams! ret)
(add-metric-components! storm-conf ret)
+ (add-metric-streams! ret)
(add-system-streams! ret)
(add-system-components! ret)
(validate-structure! ret)
@@ -6,11 +6,10 @@
(:import [backtype.storm.spout ISpoutWaitStrategy])
(:import [backtype.storm.hooks.info SpoutAckInfo SpoutFailInfo
EmitInfo BoltFailInfo BoltAckInfo BoltExecuteInfo])
- (:import [backtype.storm.metric MetricHolder])
- (:import [backtype.storm.metric.api IMetric IMetricsConsumer$TaskInfo IMetricsConsumer$DataPoint])
+ (:import [backtype.storm.metric.api IMetric IMetricsConsumer$TaskInfo IMetricsConsumer$DataPoint])
(:require [backtype.storm [tuple :as tuple]])
(:require [backtype.storm.daemon [task :as task]])
- )
+ (:require [backtype.storm.daemon.builtin-metrics :as builtin-metrics]))
(bootstrap)
@@ -214,7 +213,7 @@
:type executor-type
;; TODO: should refactor this to be part of the executor specific map (spout or bolt with :common field)
:stats (mk-executor-stats <> (sampling-rate storm-conf))
- :interval->task->registered-metrics (HashMap.)
+ :interval->task->metric-registry (HashMap.)
:task->component (:task->component worker)
:stream->component->grouper (outbound-components worker-context component-id)
:report-error (throttled-report-error-fn <>)
@@ -244,8 +243,8 @@
:kill-fn (:report-error-and-die executor-data))))
(defn setup-metrics! [executor-data]
- (let [{:keys [storm-conf receive-queue worker-context interval->task->registered-metrics]} executor-data
- distinct-time-bucket-intervals (keys interval->task->registered-metrics)]
+ (let [{:keys [storm-conf receive-queue worker-context interval->task->metric-registry]} executor-data
+ distinct-time-bucket-intervals (keys interval->task->metric-registry)]
(doseq [interval distinct-time-bucket-intervals]
(schedule-recurring
(:user-timer (:worker executor-data))
@@ -257,21 +256,20 @@
[[nil (TupleImpl. worker-context [interval] -1 Constants/METRICS_TICK_STREAM_ID)]]))))))
(defn metrics-tick [executor-data task-datas ^TupleImpl tuple]
- (let [{:keys [interval->task->registered-metrics ^WorkerTopologyContext worker-context]} executor-data
+ (let [{:keys [interval->task->metric-registry ^WorkerTopologyContext worker-context]} executor-data
interval (.getInteger tuple 0)]
(doseq [[task-id task-data] task-datas
- :let [metric-holders (-> interval->task->registered-metrics (get interval) (get task-id))
+ :let [name->imetric (-> interval->task->metric-registry (get interval) (get task-id))
task-info (IMetricsConsumer$TaskInfo.
(. (java.net.InetAddress/getLocalHost) getCanonicalHostName)
(.getThisWorkerPort worker-context)
(:component-id executor-data)
task-id
(long (/ (System/currentTimeMillis) 1000))
interval)
- data-points (->> metric-holders
- (map (fn [^MetricHolder mh]
- (IMetricsConsumer$DataPoint. (.name mh)
- (.getValueAndReset ^IMetric (.metric mh)))))
+ data-points (->> name->imetric
+ (map (fn [[name imetric]]
+ (IMetricsConsumer$DataPoint. name (.getValueAndReset ^IMetric imetric))))
(into []))]]
(if (seq data-points)
(task/send-unanchored task-data Constants/METRICS_STREAM_ID [task-info data-points])))))
@@ -352,8 +350,8 @@
(.fail spout msg-id)
(task/apply-hooks (:user-context task-data) .spoutFail (SpoutFailInfo. msg-id task-id time-delta))
(when time-delta
- (stats/spout-failed-tuple! (:stats executor-data) (:stream tuple-info) time-delta)
- )))
+ (builtin-metrics/spout-failed-tuple! (:builtin-metrics task-data) (:stats executor-data) (:stream tuple-info))
+ (stats/spout-failed-tuple! (:stats executor-data) (:stream tuple-info) time-delta))))
(defn- ack-spout-msg [executor-data task-data msg-id tuple-info time-delta]
(let [storm-conf (:storm-conf executor-data)
@@ -364,8 +362,8 @@
(.ack spout msg-id)
(task/apply-hooks (:user-context task-data) .spoutAck (SpoutAckInfo. msg-id task-id time-delta))
(when time-delta
- (stats/spout-acked-tuple! (:stats executor-data) (:stream tuple-info) time-delta)
- )))
+ (builtin-metrics/spout-acked-tuple! (:builtin-metrics task-data) (:stats executor-data) (:stream tuple-info) time-delta)
+ (stats/spout-acked-tuple! (:stats executor-data) (:stream tuple-info) time-delta))))
(defn mk-task-receiver [executor-data tuple-action-fn]
(let [^KryoTupleDeserializer deserializer (:deserializer executor-data)
@@ -492,6 +490,7 @@
(if (sampler) 0))))
(or out-tasks [])
))]]
+ (builtin-metrics/register-all (:builtin-metrics task-data) storm-conf (:user-context task-data))
(.open spout-obj
storm-conf
(:user-context task-data)
@@ -609,11 +608,15 @@
(let [delta (tuple-execute-time-delta! tuple)]
(task/apply-hooks user-context .boltExecute (BoltExecuteInfo. tuple task-id delta))
(when delta
+ (builtin-metrics/bolt-execute-tuple! (:builtin-metrics task-data)
+ executor-stats
+ (.getSourceComponent tuple)
+ (.getSourceStreamId tuple)
+ delta)
(stats/bolt-execute-tuple! executor-stats
(.getSourceComponent tuple)
(.getSourceStreamId tuple)
- delta)
- ))))))]
+ delta)))))))]
;; TODO: can get any SubscribedState objects out of the context now
@@ -649,6 +652,7 @@
stream
(MessageId/makeId anchors-to-ids)))))
(or out-tasks [])))]]
+ (builtin-metrics/register-all (:builtin-metrics task-data) storm-conf user-context)
(.prepare bolt-obj
storm-conf
user-context
@@ -669,11 +673,15 @@
(let [delta (tuple-time-delta! tuple)]
(task/apply-hooks user-context .boltAck (BoltAckInfo. tuple task-id delta))
(when delta
+ (builtin-metrics/bolt-acked-tuple! (:builtin-metrics task-data)
+ executor-stats
+ (.getSourceComponent tuple)
+ (.getSourceStreamId tuple)
+ delta)
(stats/bolt-acked-tuple! executor-stats
(.getSourceComponent tuple)
(.getSourceStreamId tuple)
- delta)
- )))
+ delta))))
(^void fail [this ^Tuple tuple]
(fast-list-iter [root (.. tuple getMessageId getAnchors)]
(task/send-unanchored task-data
@@ -682,11 +690,14 @@
(let [delta (tuple-time-delta! tuple)]
(task/apply-hooks user-context .boltFail (BoltFailInfo. tuple task-id delta))
(when delta
+ (builtin-metrics/bolt-failed-tuple! (:builtin-metrics task-data)
+ executor-stats
+ (.getSourceComponent tuple)
+ (.getSourceStreamId tuple))
(stats/bolt-failed-tuple! executor-stats
(.getSourceComponent tuple)
(.getSourceStreamId tuple)
- delta)
- )))
+ delta))))
(reportError [this error]
(report-error error)
)))))
@@ -5,8 +5,9 @@
(:import [backtype.storm.tuple Tuple])
(:import [backtype.storm.generated SpoutSpec Bolt StateSpoutSpec])
(:import [backtype.storm.hooks.info SpoutAckInfo SpoutFailInfo
- EmitInfo BoltFailInfo BoltAckInfo])
- (:require [backtype.storm [tuple :as tuple]]))
+ EmitInfo BoltFailInfo BoltAckInfo])
+ (:require [backtype.storm [tuple :as tuple]])
+ (:require [backtype.storm.daemon.builtin-metrics :as builtin-metrics]))
(bootstrap)
@@ -28,7 +29,7 @@
(:default-shared-resources worker)
(:user-shared-resources worker)
(:shared-executor-data executor-data)
- (:interval->task->registered-metrics executor-data)
+ (:interval->task->metric-registry executor-data)
(:open-or-prepare-was-called? executor-data))))
(defn system-topology-context [worker executor-data tid]
@@ -124,9 +125,11 @@
(throw (IllegalArgumentException. "Cannot emitDirect to a task expecting a regular grouping")))
(apply-hooks user-context .emit (EmitInfo. values stream task-id [out-task-id]))
(when (emit-sampler)
+ (builtin-metrics/emitted-tuple! (:builtin-metrics task-data) executor-stats stream)
(stats/emitted-tuple! executor-stats stream)
(if out-task-id
- (stats/transferred-tuples! executor-stats stream 1)))
+ (stats/transferred-tuples! executor-stats stream 1)
+ (builtin-metrics/transferred-tuple! (:builtin-metrics task-data) executor-stats stream 1)))
(if out-task-id [out-task-id])
))
([^String stream ^List values]
@@ -145,7 +148,9 @@
(apply-hooks user-context .emit (EmitInfo. values stream task-id out-tasks))
(when (emit-sampler)
(stats/emitted-tuple! executor-stats stream)
- (stats/transferred-tuples! executor-stats stream (count out-tasks)))
+ (builtin-metrics/emitted-tuple! (:builtin-metrics task-data) executor-stats stream)
+ (stats/transferred-tuples! executor-stats stream (count out-tasks))
+ (builtin-metrics/transferred-tuple! (:builtin-metrics task-data) executor-stats stream (count out-tasks)))
out-tasks)))
))
@@ -155,9 +160,9 @@
:task-id task-id
:system-context (system-topology-context (:worker executor-data) executor-data task-id)
:user-context (user-topology-context (:worker executor-data) executor-data task-id)
+ :builtin-metrics (builtin-metrics/make-data (:type executor-data))
:tasks-fn (mk-tasks-fn <>)
- :object (get-task-object (.getRawTopology ^TopologyContext (:system-context <>)) (:component-id executor-data))
- ))
+ :object (get-task-object (.getRawTopology ^TopologyContext (:system-context <>)) (:component-id executor-data))))
(defn mk-task [executor-data task-id]
@@ -2,23 +2,52 @@
"This namespace is for AOT dependent metrics testing code."
(:gen-class))
+(letfn [(for- [threader arg seq-exprs body]
+ `(reduce #(%2 %1)
+ ~arg
+ (for ~seq-exprs
+ (fn [arg#] (~threader arg# ~@body)))))]
+ (defmacro for->
+ "Apply a thread expression to a sequence.
+ eg.
+ (-> 1
+ (for-> [x [1 2 3]]
+ (+ x)))
+ => 7"
+ {:indent 1}
+ [arg seq-exprs & body]
+ (for- 'clojure.core/-> arg seq-exprs body)))
+
(gen-class
:name clojure.storm.metric.testing.FakeMetricConsumer
:implements [backtype.storm.metric.api.IMetricsConsumer]
- :prefix "impl-"
- :state state
- :init init)
+ :prefix "impl-")
-(defn impl-init [] [[] (atom [])])
+(def buffer (atom nil))
-(defn impl-prepare [this conf {:keys [ns var-name]} ctx error-reporter]
- (reset! (.state this) @(intern ns var-name))
- (reset! @(.state this) []))
+(defn impl-prepare [this conf argument ctx error-reporter]
+ (reset! buffer {}))
(defn impl-cleanup [this]
- (reset! @(.state this) []))
+ (reset! buffer {}))
+
+(defn vec-conj [coll x] (if coll
+ (conj coll x)
+ [x]))
+
+(defn expand-complex-datapoint [dp]
+ (if (or (map? (.value dp))
+ (instance? java.util.AbstractMap (.value dp)))
+ (into [] (for [[k v] (.value dp)]
+ [(str (.name dp) "/" k) v]))
+ [[(.name dp) (.value dp)]]))
-(defn impl-handleDataPoints [this task-info data-points]
- (swap! @(.state this) conj [task-info data-points]))
+(defn impl-handleDataPoints [this task-info data-points]
+ (swap! buffer
+ (fn [old]
+ (-> old
+ (for-> [dp data-points
+ [name val] (expand-complex-datapoint dp)]
+ (update-in [(.srcComponentId task-info) name (.srcTaskId task-info)] vec-conj val))))))
@@ -461,6 +461,11 @@
public static String TOPOLOGY_STATS_SAMPLE_RATE="topology.stats.sample.rate";
/**
+ * The time period into which built-in metrics data is bucketed.
+ */
+ public static String TOPOLOGY_BUILTIN_METRICS_BUCKET_SIZE_SECS="topology.builtin.metrics.bucket.size.secs";
+
+ /**
* Whether or not to use Java serialization in a topology.
*/
public static String TOPOLOGY_FALL_BACK_ON_JAVA_SERIALIZATION="topology.fall.back.on.java.serialization";
@@ -653,6 +658,26 @@ public void registerSerialization(Class klass, Class<? extends Serializer> seria
registerSerialization(this, klass, serializerClass);
}
+ public void registerMetricsConsumer(Class klass, Object argument, long parallelismHint) {
+ HashMap m = new HashMap();
+ m.put("class", klass.getCanonicalName());
+ m.put("parallelism.hint", parallelismHint);
+ m.put("argument", argument);
+
+ List l = (List)this.get(TOPOLOGY_METRICS_CONSUMER_REGISTER);
+ if(l == null) { l = new ArrayList(); }
+ l.add(m);
+ this.put(TOPOLOGY_METRICS_CONSUMER_REGISTER, l);
+ }
+
+ public void registerMetricsConsumer(Class klass, long parallelismHint) {
+ registerMetricsConsumer(klass, null, parallelismHint);
+ }
+
+ public void registerMetricsConsumer(Class klass) {
+ registerMetricsConsumer(klass, null, 1L);
+ }
+
public static void registerDecorator(Map conf, Class<? extends IKryoDecorator> klass) {
getRegisteredDecorators(conf).add(klass.getName());
}
Oops, something went wrong.

0 comments on commit c177d09

Please sign in to comment.