diff --git a/conf/defaults.yaml b/conf/defaults.yaml index 6da607b79..a8ebbb459 100644 --- a/conf/defaults.yaml +++ b/conf/defaults.yaml @@ -87,6 +87,7 @@ topology.max.task.parallelism: null topology.max.spout.pending: null topology.state.synchronization.timeout.secs: 60 topology.stats.sample.rate: 0.05 +topology.builtin.metrics.bucket.size.secs: 60 topology.fall.back.on.java.serialization: true topology.worker.childopts: null topology.executor.receive.buffer.size: 1024 #batched @@ -103,5 +104,4 @@ topology.max.error.report.per.interval: 5 topology.kryo.factory: "backtype.storm.serialization.DefaultKryoFactory" topology.trident.batch.emit.interval.millis: 500 - dev.zookeeper.path: "/tmp/dev-storm-zookeeper" diff --git a/conf/storm.yaml.example b/conf/storm.yaml.example index 6e9cefc89..043f6ff31 100644 --- a/conf/storm.yaml.example +++ b/conf/storm.yaml.example @@ -21,3 +21,10 @@ # drpc.servers: # - "server1" # - "server2" + +## Metrics Consumers +# topology.metrics.consumers.register: +# - class: "org.mycompany.MyMetricsConsumer" +# argument: +# - endpoint: "metrics-collector.mycompany.org" +# parallelism.hint: 1 diff --git a/project.clj b/project.clj index 555930795..1778180bf 100644 --- a/project.clj +++ b/project.clj @@ -3,7 +3,7 @@ (do (println (str "ERROR: requires Leiningen 1.x but you are using " lein-version)) (System/exit 1))) -(defproject storm "0.8.2-wip15" +(defproject storm "0.8.2-wip16" :source-path "src/clj" :test-path "test/clj" :java-source-path "src/jvm" diff --git a/src/clj/backtype/storm/daemon/builtin_metrics.clj b/src/clj/backtype/storm/daemon/builtin_metrics.clj new file mode 100644 index 000000000..057fd513f --- /dev/null +++ b/src/clj/backtype/storm/daemon/builtin_metrics.clj @@ -0,0 +1,64 @@ +(ns backtype.storm.daemon.builtin-metrics + (:import [backtype.storm.metric.api MultiCountMetric MultiReducedMetric MeanReducer]) + (:import [backtype.storm Config]) + (:use [backtype.storm.stats :only [stats-rate]])) + +(defrecord BuiltinSpoutMetrics [^MultiCountMetric ack-count + ^MultiReducedMetric complete-latency + ^MultiCountMetric fail-count + ^MultiCountMetric emit-count + ^MultiCountMetric transfer-count]) +(defrecord BuiltinBoltMetrics [^MultiCountMetric ack-count + ^MultiReducedMetric process-latency + ^MultiCountMetric fail-count + ^MultiCountMetric execute-count + ^MultiReducedMetric execute-latency + ^MultiCountMetric emit-count + ^MultiCountMetric transfer-count]) + +(defn make-data [executor-type] + (condp = executor-type + :spout (BuiltinSpoutMetrics. (MultiCountMetric.) + (MultiReducedMetric. (MeanReducer.)) + (MultiCountMetric.) + (MultiCountMetric.) + (MultiCountMetric.)) + :bolt (BuiltinBoltMetrics. (MultiCountMetric.) + (MultiReducedMetric. (MeanReducer.)) + (MultiCountMetric.) + (MultiCountMetric.) + (MultiReducedMetric. (MeanReducer.)) + (MultiCountMetric.) + (MultiCountMetric.)))) + +(defn register-all [builtin-metrics storm-conf topology-context] + (doseq [[kw imetric] builtin-metrics] + (.registerMetric topology-context (str "__" (name kw)) imetric + (int (get storm-conf Config/TOPOLOGY_BUILTIN_METRICS_BUCKET_SIZE_SECS))))) + +(defn spout-acked-tuple! [^BuiltinSpoutMetrics m stats stream latency-ms] + (-> m .ack-count (.scope stream) (.incrBy (stats-rate stats))) + (-> m .complete-latency (.scope stream) (.update latency-ms))) + +(defn spout-failed-tuple! [^BuiltinSpoutMetrics m stats stream] + (-> m .fail-count (.scope stream) (.incrBy (stats-rate stats)))) + +(defn bolt-execute-tuple! [^BuiltinBoltMetrics m stats comp-id stream latency-ms] + (let [scope (str comp-id ":" stream)] + (-> m .execute-count (.scope scope) (.incrBy (stats-rate stats))) + (-> m .execute-latency (.scope scope) (.update latency-ms)))) + +(defn bolt-acked-tuple! [^BuiltinBoltMetrics m stats comp-id stream latency-ms] + (let [scope (str comp-id ":" stream)] + (-> m .ack-count (.scope scope) (.incrBy (stats-rate stats))) + (-> m .process-latency (.scope scope) (.update latency-ms)))) + +(defn bolt-failed-tuple! [^BuiltinBoltMetrics m stats comp-id stream] + (let [scope (str comp-id ":" stream)] + (-> m .fail-count (.scope scope) (.incrBy (stats-rate stats))))) + +(defn emitted-tuple! [m stats stream] + (-> m :emit-count (.scope stream) (.incrBy (stats-rate stats)))) + +(defn transferred-tuple! [m stats stream num-out-tasks] + (-> m :transfer-count (.scope stream) (.incrBy (* num-out-tasks (stats-rate stats))))) diff --git a/src/clj/backtype/storm/daemon/common.clj b/src/clj/backtype/storm/daemon/common.clj index b4f271d0e..da93fdd4c 100644 --- a/src/clj/backtype/storm/daemon/common.clj +++ b/src/clj/backtype/storm/daemon/common.clj @@ -23,6 +23,8 @@ (def SYSTEM-COMPONENT-ID Constants/SYSTEM_COMPONENT_ID) (def SYSTEM-TICK-STREAM-ID Constants/SYSTEM_TICK_STREAM_ID) +(def METRICS-STREAM-ID Constants/METRICS_STREAM_ID) +(def METRICS-TICK-STREAM-ID Constants/METRICS_TICK_STREAM_ID) ;; the task id is the virtual port ;; node->host is here so that tasks know who to talk to just from assignment @@ -206,27 +208,78 @@ (.put_to_bolts ret "__acker" acker-bolt) )) +(defn add-metric-streams! [^StormTopology topology] + (doseq [[_ component] (all-components topology) + :let [common (.get_common component)]] + (.put_to_streams common METRICS-STREAM-ID + (thrift/output-fields ["task-info" "data-points"])))) + (defn add-system-streams! [^StormTopology topology] (doseq [[_ component] (all-components topology) :let [common (.get_common component)]] - (.put_to_streams common SYSTEM-STREAM-ID (thrift/output-fields ["event"])) - ;; TODO: consider adding a stats stream for stats aggregation - )) + (.put_to_streams common SYSTEM-STREAM-ID (thrift/output-fields ["event"])))) + + +(defn map-occurrences [afn coll] + (->> coll + (reduce (fn [[counts new-coll] x] + (let [occurs (inc (get counts x 0))] + [(assoc counts x occurs) (cons (afn x occurs) new-coll)])) + [{} []]) + (second) + (reverse))) + +(defn number-duplicates [coll] + "(number-duplicates [\"a\", \"b\", \"a\"]) => [\"a\", \"b\", \"a#2\"]" + (map-occurrences (fn [x occurences] (if (>= occurences 2) (str x "#" occurences) x)) coll)) + +(defn metrics-consumer-register-ids [storm-conf] + "Generates a list of component ids for each metrics consumer + e.g. [\"__metrics_org.mycompany.MyMetricsConsumer\", ..] " + (->> (get storm-conf TOPOLOGY-METRICS-CONSUMER-REGISTER) + (map #(get % "class")) + (number-duplicates) + (map #(str Constants/METRICS_COMPONENT_ID_PREFIX %)))) + +(defn metrics-consumer-bolt-specs [components-ids-that-emit-metrics storm-conf] + (let [inputs (->> (for [comp-id components-ids-that-emit-metrics] + {[comp-id METRICS-STREAM-ID] :shuffle}) + (into {})) + + mk-bolt-spec (fn [class arg p] + (thrift/mk-bolt-spec* + inputs + (backtype.storm.metric.MetricsConsumerBolt. class arg) + {} :p p :conf {TOPOLOGY-TASKS p}))] + + (map + (fn [component-id register] + [component-id (mk-bolt-spec (get register "class") + (get register "argument") + (or (get register "parallelism.hint") 1))]) + + (metrics-consumer-register-ids storm-conf) + (get storm-conf TOPOLOGY-METRICS-CONSUMER-REGISTER)))) + +(defn add-metric-components! [storm-conf ^StormTopology topology] + (doseq [[comp-id bolt-spec] (metrics-consumer-bolt-specs (keys (all-components topology)) storm-conf)] + (.put_to_bolts topology comp-id bolt-spec))) (defn add-system-components! [^StormTopology topology] (let [system-spout (thrift/mk-spout-spec* - (NoOpSpout.) - {SYSTEM-TICK-STREAM-ID (thrift/output-fields ["rate_secs"]) - } - :p 0 - :conf {TOPOLOGY-TASKS 0})] - (.put_to_spouts topology SYSTEM-COMPONENT-ID system-spout) - )) + (NoOpSpout.) + {SYSTEM-TICK-STREAM-ID (thrift/output-fields ["rate_secs"]) + METRICS-TICK-STREAM-ID (thrift/output-fields ["interval"])} + :p 0 + :conf {TOPOLOGY-TASKS 0})] + (.put_to_spouts topology SYSTEM-COMPONENT-ID system-spout))) (defn system-topology! [storm-conf ^StormTopology topology] (validate-basic! topology) (let [ret (.deepCopy topology)] (add-acker! storm-conf ret) + (add-metric-components! storm-conf ret) + (add-metric-streams! ret) (add-system-streams! ret) (add-system-components! ret) (validate-structure! ret) diff --git a/src/clj/backtype/storm/daemon/executor.clj b/src/clj/backtype/storm/daemon/executor.clj index 5b972c270..2e176bcd2 100644 --- a/src/clj/backtype/storm/daemon/executor.clj +++ b/src/clj/backtype/storm/daemon/executor.clj @@ -5,10 +5,11 @@ (:import [backtype.storm.tuple Tuple]) (:import [backtype.storm.spout ISpoutWaitStrategy]) (:import [backtype.storm.hooks.info SpoutAckInfo SpoutFailInfo - EmitInfo BoltFailInfo BoltAckInfo BoltExecuteInfo]) + EmitInfo BoltFailInfo BoltAckInfo BoltExecuteInfo]) + (:import [backtype.storm.metric.api IMetric IMetricsConsumer$TaskInfo IMetricsConsumer$DataPoint]) (:require [backtype.storm [tuple :as tuple]]) (:require [backtype.storm.daemon [task :as task]]) - ) + (:require [backtype.storm.daemon.builtin-metrics :as builtin-metrics])) (bootstrap) @@ -212,6 +213,7 @@ :type executor-type ;; TODO: should refactor this to be part of the executor specific map (spout or bolt with :common field) :stats (mk-executor-stats <> (sampling-rate storm-conf)) + :interval->task->metric-registry (HashMap.) :task->component (:task->component worker) :stream->component->grouper (outbound-components worker-context component-id) :report-error (throttled-report-error-fn <>) @@ -238,7 +240,42 @@ (worker-transfer-fn serializer alist) (.setObject cached-emit (ArrayList.)) ))) - :kill-fn (:report-error-and-die executor-data)))) + :kill-fn (:report-error-and-die executor-data)))) + +(defn setup-metrics! [executor-data] + (let [{:keys [storm-conf receive-queue worker-context interval->task->metric-registry]} executor-data + distinct-time-bucket-intervals (keys interval->task->metric-registry)] + (doseq [interval distinct-time-bucket-intervals] + (schedule-recurring + (:user-timer (:worker executor-data)) + interval + interval + (fn [] + (disruptor/publish + receive-queue + [[nil (TupleImpl. worker-context [interval] -1 Constants/METRICS_TICK_STREAM_ID)]])))))) + +(defn metrics-tick [executor-data task-datas ^TupleImpl tuple] + (let [{:keys [interval->task->metric-registry ^WorkerTopologyContext worker-context]} executor-data + interval (.getInteger tuple 0)] + (doseq [[task-id task-data] task-datas + :let [name->imetric (-> interval->task->metric-registry (get interval) (get task-id)) + task-info (IMetricsConsumer$TaskInfo. + (. (java.net.InetAddress/getLocalHost) getCanonicalHostName) + (.getThisWorkerPort worker-context) + (:component-id executor-data) + task-id + (long (/ (System/currentTimeMillis) 1000)) + interval) + data-points (->> name->imetric + (map (fn [[name imetric]] + (let [value (.getValueAndReset ^IMetric imetric)] + (if value + (IMetricsConsumer$DataPoint. name value))))) + (filter identity) + (into []))]] + (if (seq data-points) + (task/send-unanchored task-data Constants/METRICS_STREAM_ID [task-info data-points]))))) (defn setup-ticks! [worker executor-data] (let [storm-conf (:storm-conf executor-data) @@ -279,7 +316,7 @@ (mk-threads executor-data task-datas)) threads (concat handlers system-threads)] (setup-ticks! worker executor-data) - + (log-message "Finished loading executor " component-id ":" (pr-str executor-id)) ;; TODO: add method here to get rendered stats... have worker call that when heartbeating (reify @@ -316,8 +353,8 @@ (.fail spout msg-id) (task/apply-hooks (:user-context task-data) .spoutFail (SpoutFailInfo. msg-id task-id time-delta)) (when time-delta - (stats/spout-failed-tuple! (:stats executor-data) (:stream tuple-info) time-delta) - ))) + (builtin-metrics/spout-failed-tuple! (:builtin-metrics task-data) (:stats executor-data) (:stream tuple-info)) + (stats/spout-failed-tuple! (:stats executor-data) (:stream tuple-info) time-delta)))) (defn- ack-spout-msg [executor-data task-data msg-id tuple-info time-delta] (let [storm-conf (:storm-conf executor-data) @@ -328,8 +365,8 @@ (.ack spout msg-id) (task/apply-hooks (:user-context task-data) .spoutAck (SpoutAckInfo. msg-id task-id time-delta)) (when time-delta - (stats/spout-acked-tuple! (:stats executor-data) (:stream tuple-info) time-delta) - ))) + (builtin-metrics/spout-acked-tuple! (:builtin-metrics task-data) (:stats executor-data) (:stream tuple-info) time-delta) + (stats/spout-acked-tuple! (:stats executor-data) (:stream tuple-info) time-delta)))) (defn mk-task-receiver [executor-data tuple-action-fn] (let [^KryoTupleDeserializer deserializer (:deserializer executor-data) @@ -377,8 +414,9 @@ )))) tuple-action-fn (fn [task-id ^TupleImpl tuple] (let [stream-id (.getSourceStreamId tuple)] - (if (= stream-id Constants/SYSTEM_TICK_STREAM_ID) - (.rotate pending) + (condp = stream-id + Constants/SYSTEM_TICK_STREAM_ID (.rotate pending) + Constants/METRICS_TICK_STREAM_ID (metrics-tick executor-data task-datas tuple) (let [id (.getValue tuple 0) [stored-task-id spout-id tuple-finished-info start-time-ms] (.remove pending id)] (when spout-id @@ -389,7 +427,7 @@ ACKER-ACK-STREAM-ID (ack-spout-msg executor-data (get task-datas task-id) spout-id tuple-finished-info time-delta) ACKER-FAIL-STREAM-ID (fail-spout-msg executor-data (get task-datas task-id) - spout-id tuple-finished-info time-delta) + spout-id tuple-finished-info time-delta) ))) ;; TODO: on failure, emit tuple to failure stream )))) @@ -455,6 +493,7 @@ (if (sampler) 0)))) (or out-tasks []) ))]] + (builtin-metrics/register-all (:builtin-metrics task-data) storm-conf (:user-context task-data)) (.open spout-obj storm-conf (:user-context task-data) @@ -472,6 +511,7 @@ ))))) (reset! open-or-prepare-was-called? true) (log-message "Opened spout " component-id ":" (keys task-datas)) + (setup-metrics! executor-data) (disruptor/consumer-started! (:receive-queue executor-data)) (fn [] @@ -550,28 +590,36 @@ ;; TODO: for state sync, need to check if tuple comes from state spout. if so, update state ;; TODO: how to handle incremental updates as well as synchronizations at same time ;; TODO: need to version tuples somehow + ;;(log-debug "Received tuple " tuple " at task " task-id) ;; need to do it this way to avoid reflection - (let [task-data (get task-datas task-id) - ^IBolt bolt-obj (:object task-data) - user-context (:user-context task-data) - sampler? (sampler) - execute-sampler? (execute-sampler) - now (if (or sampler? execute-sampler?) (System/currentTimeMillis))] - (when sampler? - (.setProcessSampleStartTime tuple now)) - (when execute-sampler? - (.setExecuteSampleStartTime tuple now)) - (.execute bolt-obj tuple) - (let [delta (tuple-execute-time-delta! tuple)] - (task/apply-hooks user-context .boltExecute (BoltExecuteInfo. tuple task-id delta)) - (when delta - (stats/bolt-execute-tuple! executor-stats - (.getSourceComponent tuple) - (.getSourceStreamId tuple) - delta) - ))))] + (let [stream-id (.getSourceStreamId tuple)] + (condp = stream-id + Constants/METRICS_TICK_STREAM_ID (metrics-tick executor-data task-datas tuple) + (let [task-data (get task-datas task-id) + ^IBolt bolt-obj (:object task-data) + user-context (:user-context task-data) + sampler? (sampler) + execute-sampler? (execute-sampler) + now (if (or sampler? execute-sampler?) (System/currentTimeMillis))] + (when sampler? + (.setProcessSampleStartTime tuple now)) + (when execute-sampler? + (.setExecuteSampleStartTime tuple now)) + (.execute bolt-obj tuple) + (let [delta (tuple-execute-time-delta! tuple)] + (task/apply-hooks user-context .boltExecute (BoltExecuteInfo. tuple task-id delta)) + (when delta + (builtin-metrics/bolt-execute-tuple! (:builtin-metrics task-data) + executor-stats + (.getSourceComponent tuple) + (.getSourceStreamId tuple) + delta) + (stats/bolt-execute-tuple! executor-stats + (.getSourceComponent tuple) + (.getSourceStreamId tuple) + delta)))))))] ;; TODO: can get any SubscribedState objects out of the context now @@ -607,6 +655,7 @@ stream (MessageId/makeId anchors-to-ids))))) (or out-tasks [])))]] + (builtin-metrics/register-all (:builtin-metrics task-data) storm-conf user-context) (.prepare bolt-obj storm-conf user-context @@ -627,11 +676,15 @@ (let [delta (tuple-time-delta! tuple)] (task/apply-hooks user-context .boltAck (BoltAckInfo. tuple task-id delta)) (when delta + (builtin-metrics/bolt-acked-tuple! (:builtin-metrics task-data) + executor-stats + (.getSourceComponent tuple) + (.getSourceStreamId tuple) + delta) (stats/bolt-acked-tuple! executor-stats (.getSourceComponent tuple) (.getSourceStreamId tuple) - delta) - ))) + delta)))) (^void fail [this ^Tuple tuple] (fast-list-iter [root (.. tuple getMessageId getAnchors)] (task/send-unanchored task-data @@ -640,16 +693,20 @@ (let [delta (tuple-time-delta! tuple)] (task/apply-hooks user-context .boltFail (BoltFailInfo. tuple task-id delta)) (when delta + (builtin-metrics/bolt-failed-tuple! (:builtin-metrics task-data) + executor-stats + (.getSourceComponent tuple) + (.getSourceStreamId tuple)) (stats/bolt-failed-tuple! executor-stats (.getSourceComponent tuple) (.getSourceStreamId tuple) - delta) - ))) + delta)))) (reportError [this error] (report-error error) ))))) (reset! open-or-prepare-was-called? true) - (log-message "Prepared bolt " component-id ":" (keys task-datas)) + (log-message "Prepared bolt " component-id ":" (keys task-datas)) + (setup-metrics! executor-data) (let [receive-queue (:receive-queue executor-data) event-handler (mk-task-receiver executor-data tuple-action-fn)] diff --git a/src/clj/backtype/storm/daemon/task.clj b/src/clj/backtype/storm/daemon/task.clj index 7a314ef69..d39d2c81d 100644 --- a/src/clj/backtype/storm/daemon/task.clj +++ b/src/clj/backtype/storm/daemon/task.clj @@ -5,8 +5,9 @@ (:import [backtype.storm.tuple Tuple]) (:import [backtype.storm.generated SpoutSpec Bolt StateSpoutSpec]) (:import [backtype.storm.hooks.info SpoutAckInfo SpoutFailInfo - EmitInfo BoltFailInfo BoltAckInfo]) - (:require [backtype.storm [tuple :as tuple]])) + EmitInfo BoltFailInfo BoltAckInfo]) + (:require [backtype.storm [tuple :as tuple]]) + (:require [backtype.storm.daemon.builtin-metrics :as builtin-metrics])) (bootstrap) @@ -28,7 +29,8 @@ (:default-shared-resources worker) (:user-shared-resources worker) (:shared-executor-data executor-data) - ))) + (:interval->task->metric-registry executor-data) + (:open-or-prepare-was-called? executor-data)))) (defn system-topology-context [worker executor-data tid] ((mk-topology-context-builder @@ -123,9 +125,11 @@ (throw (IllegalArgumentException. "Cannot emitDirect to a task expecting a regular grouping"))) (apply-hooks user-context .emit (EmitInfo. values stream task-id [out-task-id])) (when (emit-sampler) + (builtin-metrics/emitted-tuple! (:builtin-metrics task-data) executor-stats stream) (stats/emitted-tuple! executor-stats stream) (if out-task-id - (stats/transferred-tuples! executor-stats stream 1))) + (stats/transferred-tuples! executor-stats stream 1) + (builtin-metrics/transferred-tuple! (:builtin-metrics task-data) executor-stats stream 1))) (if out-task-id [out-task-id]) )) ([^String stream ^List values] @@ -144,7 +148,9 @@ (apply-hooks user-context .emit (EmitInfo. values stream task-id out-tasks)) (when (emit-sampler) (stats/emitted-tuple! executor-stats stream) - (stats/transferred-tuples! executor-stats stream (count out-tasks))) + (builtin-metrics/emitted-tuple! (:builtin-metrics task-data) executor-stats stream) + (stats/transferred-tuples! executor-stats stream (count out-tasks)) + (builtin-metrics/transferred-tuple! (:builtin-metrics task-data) executor-stats stream (count out-tasks))) out-tasks))) )) @@ -154,9 +160,9 @@ :task-id task-id :system-context (system-topology-context (:worker executor-data) executor-data task-id) :user-context (user-topology-context (:worker executor-data) executor-data task-id) + :builtin-metrics (builtin-metrics/make-data (:type executor-data)) :tasks-fn (mk-tasks-fn <>) - :object (get-task-object (.getRawTopology ^TopologyContext (:system-context <>)) (:component-id executor-data)) - )) + :object (get-task-object (.getRawTopology ^TopologyContext (:system-context <>)) (:component-id executor-data)))) (defn mk-task [executor-data task-id] diff --git a/src/clj/backtype/storm/metric/testing.clj b/src/clj/backtype/storm/metric/testing.clj new file mode 100644 index 000000000..36aa954e5 --- /dev/null +++ b/src/clj/backtype/storm/metric/testing.clj @@ -0,0 +1,53 @@ +(ns backtype.storm.metric.testing + "This namespace is for AOT dependent metrics testing code." + (:gen-class)) + +(letfn [(for- [threader arg seq-exprs body] + `(reduce #(%2 %1) + ~arg + (for ~seq-exprs + (fn [arg#] (~threader arg# ~@body)))))] + (defmacro for-> + "Apply a thread expression to a sequence. + eg. + (-> 1 + (for-> [x [1 2 3]] + (+ x))) + => 7" + {:indent 1} + [arg seq-exprs & body] + (for- 'clojure.core/-> arg seq-exprs body))) + +(gen-class + :name clojure.storm.metric.testing.FakeMetricConsumer + :implements [backtype.storm.metric.api.IMetricsConsumer] + :prefix "impl-") + +(def buffer (atom nil)) + +(defn impl-prepare [this conf argument ctx error-reporter] + (reset! buffer {})) + +(defn impl-cleanup [this] + (reset! buffer {})) + +(defn vec-conj [coll x] (if coll + (conj coll x) + [x])) + +(defn expand-complex-datapoint [dp] + (if (or (map? (.value dp)) + (instance? java.util.AbstractMap (.value dp))) + (into [] (for [[k v] (.value dp)] + [(str (.name dp) "/" k) v])) + [[(.name dp) (.value dp)]])) + +(defn impl-handleDataPoints [this task-info data-points] + (swap! buffer + (fn [old] + (-> old + (for-> [dp data-points + [name val] (expand-complex-datapoint dp)] + (update-in [(.srcComponentId task-info) name (.srcTaskId task-info)] vec-conj val)))))) + + diff --git a/src/clj/backtype/storm/testing.clj b/src/clj/backtype/storm/testing.clj index cfa296511..fe264d08c 100644 --- a/src/clj/backtype/storm/testing.clj +++ b/src/clj/backtype/storm/testing.clj @@ -8,7 +8,7 @@ (:require [backtype.storm [process-simulator :as psim]]) (:import [org.apache.commons.io FileUtils]) (:import [java.io File]) - (:import [java.util HashMap]) + (:import [java.util HashMap ArrayList]) (:import [java.util.concurrent.atomic AtomicInteger]) (:import [java.util.concurrent ConcurrentHashMap]) (:import [backtype.storm.utils Time Utils RegisteredGlobalState]) @@ -587,6 +587,8 @@ [(int 1)] {} {} - (HashMap.))] + (HashMap.) + (HashMap.) + (atom false))] (TupleImpl. context values 1 stream) )) diff --git a/src/jvm/backtype/storm/Config.java b/src/jvm/backtype/storm/Config.java index e8393c255..77b49b420 100644 --- a/src/jvm/backtype/storm/Config.java +++ b/src/jvm/backtype/storm/Config.java @@ -410,6 +410,13 @@ public class Config extends HashMap { */ public static String TOPOLOGY_SKIP_MISSING_KRYO_REGISTRATIONS= "topology.skip.missing.kryo.registrations"; + /* + * A list of classes implementing IMetricsConsumer (See storm.yaml.example for exact config format). + * Each listed class will be routed all the metrics data generated by the storm metrics API. + * Each listed class maps 1:1 to a system bolt named __metrics_ClassName#N, and it's parallelism is configurable. + */ + public static String TOPOLOGY_METRICS_CONSUMER_REGISTER = "topology.metrics.consumer.register"; + /** * The maximum parallelism allowed for a component in this topology. This configuration is @@ -453,6 +460,11 @@ public class Config extends HashMap { */ public static String TOPOLOGY_STATS_SAMPLE_RATE="topology.stats.sample.rate"; + /** + * The time period that builtin metrics data in bucketed into. + */ + public static String TOPOLOGY_BUILTIN_METRICS_BUCKET_SIZE_SECS="topology.builtin.metrics.bucket.size.secs"; + /** * Whether or not to use Java serialization in a topology. */ @@ -646,6 +658,26 @@ public void registerSerialization(Class klass, Class seria registerSerialization(this, klass, serializerClass); } + public void registerMetricsConsumer(Class klass, Object argument, long parallelismHint) { + HashMap m = new HashMap(); + m.put("class", klass.getCanonicalName()); + m.put("parallelism.hint", parallelismHint); + m.put("argument", argument); + + List l = (List)this.get(TOPOLOGY_METRICS_CONSUMER_REGISTER); + if(l == null) { l = new ArrayList(); } + l.add(m); + this.put(TOPOLOGY_METRICS_CONSUMER_REGISTER, l); + } + + public void registerMetricsConsumer(Class klass, long parallelismHint) { + registerMetricsConsumer(klass, null, parallelismHint); + } + + public void registerMetricsConsumer(Class klass) { + registerMetricsConsumer(klass, null, 1L); + } + public static void registerDecorator(Map conf, Class klass) { getRegisteredDecorators(conf).add(klass.getName()); } diff --git a/src/jvm/backtype/storm/Constants.java b/src/jvm/backtype/storm/Constants.java index 950c533ad..705278986 100644 --- a/src/jvm/backtype/storm/Constants.java +++ b/src/jvm/backtype/storm/Constants.java @@ -8,4 +8,8 @@ public class Constants { public static final String SYSTEM_COMPONENT_ID = "__system"; public static final String SYSTEM_TICK_STREAM_ID = "__tick"; + public static final String METRICS_COMPONENT_ID_PREFIX = "__metrics"; + public static final String METRICS_STREAM_ID = "__metrics"; + public static final String METRICS_TICK_STREAM_ID = "__metrics_tick"; } + \ No newline at end of file diff --git a/src/jvm/backtype/storm/metric/MetricsConsumerBolt.java b/src/jvm/backtype/storm/metric/MetricsConsumerBolt.java new file mode 100644 index 000000000..a8a697506 --- /dev/null +++ b/src/jvm/backtype/storm/metric/MetricsConsumerBolt.java @@ -0,0 +1,47 @@ +package backtype.storm.metric; + +import backtype.storm.Config; +import backtype.storm.metric.api.IMetricsConsumer; +import backtype.storm.task.IBolt; +import backtype.storm.task.IErrorReporter; +import backtype.storm.task.OutputCollector; +import backtype.storm.task.TopologyContext; +import backtype.storm.tuple.Tuple; +import java.util.Collection; +import java.util.Map; + +public class MetricsConsumerBolt implements IBolt { + IMetricsConsumer _metricsConsumer; + String _consumerClassName; + OutputCollector _collector; + Object _registrationArgument; + + public MetricsConsumerBolt(String consumerClassName, Object registrationArgument) { + _consumerClassName = consumerClassName; + _registrationArgument = registrationArgument; + } + + @Override + public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) { + try { + _metricsConsumer = (IMetricsConsumer)Class.forName(_consumerClassName).newInstance(); + } catch (Exception e) { + throw new RuntimeException("Could not instantiate a class listed in config under section " + + Config.TOPOLOGY_METRICS_CONSUMER_REGISTER + " with fully qualified name " + _consumerClassName, e); + } + _metricsConsumer.prepare(stormConf, _registrationArgument, context, (IErrorReporter)collector); + _collector = collector; + } + + @Override + public void execute(Tuple input) { + _metricsConsumer.handleDataPoints((IMetricsConsumer.TaskInfo)input.getValue(0), (Collection)input.getValue(1)); + _collector.ack(input); + } + + @Override + public void cleanup() { + _metricsConsumer.cleanup(); + } + +} diff --git a/src/jvm/backtype/storm/metric/api/AssignableMetric.java b/src/jvm/backtype/storm/metric/api/AssignableMetric.java new file mode 100644 index 000000000..b38a57e91 --- /dev/null +++ b/src/jvm/backtype/storm/metric/api/AssignableMetric.java @@ -0,0 +1,17 @@ +package backtype.storm.metric.api; + +public class AssignableMetric implements IMetric { + Object _value; + + public AssignableMetric(Object value) { + _value = value; + } + + public void setValue(Object value) { + _value = value; + } + + public Object getValueAndReset() { + return _value; + } +} diff --git a/src/jvm/backtype/storm/metric/api/CombinedMetric.java b/src/jvm/backtype/storm/metric/api/CombinedMetric.java new file mode 100644 index 000000000..cd7b08bc6 --- /dev/null +++ b/src/jvm/backtype/storm/metric/api/CombinedMetric.java @@ -0,0 +1,21 @@ +package backtype.storm.metric.api; + +public class CombinedMetric implements IMetric { + private final ICombiner _combiner; + private Object _value; + + public CombinedMetric(ICombiner combiner) { + _combiner = combiner; + _value = _combiner.identity(); + } + + public void update(Object value) { + _value = _combiner.combine(_value, value); + } + + public Object getValueAndReset() { + Object ret = _value; + _value = _combiner.identity(); + return ret; + } +} diff --git a/src/jvm/backtype/storm/metric/api/CountMetric.java b/src/jvm/backtype/storm/metric/api/CountMetric.java new file mode 100644 index 000000000..7a8f829cc --- /dev/null +++ b/src/jvm/backtype/storm/metric/api/CountMetric.java @@ -0,0 +1,24 @@ +package backtype.storm.metric.api; + +import backtype.storm.metric.api.IMetric; + +public class CountMetric implements IMetric { + long _value = 0; + + public CountMetric() { + } + + public void incr() { + _value++; + } + + public void incrBy(long incrementBy) { + _value += incrementBy; + } + + public Object getValueAndReset() { + long ret = _value; + _value = 0; + return ret; + } +} diff --git a/src/jvm/backtype/storm/metric/api/ICombiner.java b/src/jvm/backtype/storm/metric/api/ICombiner.java new file mode 100644 index 000000000..7eb468e4e --- /dev/null +++ b/src/jvm/backtype/storm/metric/api/ICombiner.java @@ -0,0 +1,6 @@ +package backtype.storm.metric.api; + +public interface ICombiner { + public T identity(); + public T combine(T a, T b); +} diff --git a/src/jvm/backtype/storm/metric/api/IMetric.java b/src/jvm/backtype/storm/metric/api/IMetric.java new file mode 100644 index 000000000..400994d4c --- /dev/null +++ b/src/jvm/backtype/storm/metric/api/IMetric.java @@ -0,0 +1,5 @@ +package backtype.storm.metric.api; + +public interface IMetric { + public Object getValueAndReset(); +} diff --git a/src/jvm/backtype/storm/metric/api/IMetricsConsumer.java b/src/jvm/backtype/storm/metric/api/IMetricsConsumer.java new file mode 100644 index 000000000..5bfece354 --- /dev/null +++ b/src/jvm/backtype/storm/metric/api/IMetricsConsumer.java @@ -0,0 +1,43 @@ +package backtype.storm.metric.api; + +import backtype.storm.task.IErrorReporter; +import backtype.storm.task.TopologyContext; +import java.util.Collection; +import java.util.Map; + +public interface IMetricsConsumer { + public static class TaskInfo { + public TaskInfo() {} + public TaskInfo(String srcWorkerHost, int srcWorkerPort, String srcComponentId, int srcTaskId, long timestamp, int updateIntervalSecs) { + this.srcWorkerHost = srcWorkerHost; + this.srcWorkerPort = srcWorkerPort; + this.srcComponentId = srcComponentId; + this.srcTaskId = srcTaskId; + this.timestamp = timestamp; + this.updateIntervalSecs = updateIntervalSecs; + } + public String srcWorkerHost; + public int srcWorkerPort; + public String srcComponentId; + public int srcTaskId; + public long timestamp; + public int updateIntervalSecs; + } + public static class DataPoint { + public DataPoint() {} + public DataPoint(String name, Object value) { + this.name = name; + this.value = value; + } + @Override + public String toString() { + return "[" + name + " = " + value + "]"; + } + public String name; + public Object value; + } + + void prepare(Map stormConf, Object registrationArgument, TopologyContext context, IErrorReporter errorReporter); + void handleDataPoints(TaskInfo taskInfo, Collection dataPoints); + void cleanup(); +} \ No newline at end of file diff --git a/src/jvm/backtype/storm/metric/api/IReducer.java b/src/jvm/backtype/storm/metric/api/IReducer.java new file mode 100644 index 000000000..929c31781 --- /dev/null +++ b/src/jvm/backtype/storm/metric/api/IReducer.java @@ -0,0 +1,7 @@ +package backtype.storm.metric.api; + +public interface IReducer { + T init(); + T reduce(T accumulator, Object input); + Object extractResult(T accumulator); +} diff --git a/src/jvm/backtype/storm/metric/api/MeanReducer.java b/src/jvm/backtype/storm/metric/api/MeanReducer.java new file mode 100644 index 000000000..38f627507 --- /dev/null +++ b/src/jvm/backtype/storm/metric/api/MeanReducer.java @@ -0,0 +1,38 @@ +package backtype.storm.metric.api; + +import backtype.storm.metric.api.IReducer; + +class MeanReducerState { + public int count = 0; + public double sum = 0.0; +} + +public class MeanReducer implements IReducer { + public MeanReducerState init() { + return new MeanReducerState(); + } + + public MeanReducerState reduce(MeanReducerState acc, Object input) { + acc.count++; + if(input instanceof Double) { + acc.sum += (Double)input; + } else if(input instanceof Long) { + acc.sum += ((Long)input).doubleValue(); + } else if(input instanceof Integer) { + acc.sum += ((Integer)input).doubleValue(); + } else { + throw new RuntimeException( + "MeanReducer::reduce called with unsupported input type `" + input.getClass() + + "`. Supported types are Double, Long, Integer."); + } + return acc; + } + + public Object extractResult(MeanReducerState acc) { + if(acc.count > 0) { + return new Double(acc.sum / (double)acc.count); + } else { + return null; + } + } +} diff --git a/src/jvm/backtype/storm/metric/api/MultiCountMetric.java b/src/jvm/backtype/storm/metric/api/MultiCountMetric.java new file mode 100644 index 000000000..02473ca6a --- /dev/null +++ b/src/jvm/backtype/storm/metric/api/MultiCountMetric.java @@ -0,0 +1,28 @@ +package backtype.storm.metric.api; + +import backtype.storm.metric.api.IMetric; +import java.util.HashMap; +import java.util.Map; + +public class MultiCountMetric implements IMetric { + Map _value = new HashMap(); + + public MultiCountMetric() { + } + + public CountMetric scope(String key) { + CountMetric val = _value.get(key); + if(val == null) { + _value.put(key, val = new CountMetric()); + } + return val; + } + + public Object getValueAndReset() { + Map ret = new HashMap(); + for(Map.Entry e : _value.entrySet()) { + ret.put(e.getKey(), e.getValue().getValueAndReset()); + } + return ret; + } +} diff --git a/src/jvm/backtype/storm/metric/api/MultiReducedMetric.java b/src/jvm/backtype/storm/metric/api/MultiReducedMetric.java new file mode 100644 index 000000000..cfa39ec8b --- /dev/null +++ b/src/jvm/backtype/storm/metric/api/MultiReducedMetric.java @@ -0,0 +1,30 @@ +package backtype.storm.metric.api; + +import backtype.storm.metric.api.IMetric; +import java.util.HashMap; +import java.util.Map; + +public class MultiReducedMetric implements IMetric { + Map _value = new HashMap(); + IReducer _reducer; + + public MultiReducedMetric(IReducer reducer) { + _reducer = reducer; + } + + public ReducedMetric scope(String key) { + ReducedMetric val = _value.get(key); + if(val == null) { + _value.put(key, val = new ReducedMetric(_reducer)); + } + return val; + } + + public Object getValueAndReset() { + Map ret = new HashMap(); + for(Map.Entry e : _value.entrySet()) { + ret.put(e.getKey(), e.getValue().getValueAndReset()); + } + return ret; + } +} diff --git a/src/jvm/backtype/storm/metric/api/ReducedMetric.java b/src/jvm/backtype/storm/metric/api/ReducedMetric.java new file mode 100644 index 000000000..cfeef3b70 --- /dev/null +++ b/src/jvm/backtype/storm/metric/api/ReducedMetric.java @@ -0,0 +1,21 @@ +package backtype.storm.metric.api; + +public class ReducedMetric implements IMetric { + private final IReducer _reducer; + private Object _accumulator; + + public ReducedMetric(IReducer reducer) { + _reducer = reducer; + _accumulator = _reducer.init(); + } + + public void update(Object value) { + _accumulator = _reducer.reduce(_accumulator, value); + } + + public Object getValueAndReset() { + Object ret = _reducer.extractResult(_accumulator); + _accumulator = _reducer.init(); + return ret; + } +} diff --git a/src/jvm/backtype/storm/serialization/SerializationFactory.java b/src/jvm/backtype/storm/serialization/SerializationFactory.java index 43aebacc0..d9bd89206 100644 --- a/src/jvm/backtype/storm/serialization/SerializationFactory.java +++ b/src/jvm/backtype/storm/serialization/SerializationFactory.java @@ -38,6 +38,8 @@ public static Kryo getKryo(Map conf) { k.register(BigInteger.class, new BigIntegerSerializer()); k.register(TransactionAttempt.class); k.register(Values.class); + k.register(backtype.storm.metric.api.IMetricsConsumer.DataPoint.class); + k.register(backtype.storm.metric.api.IMetricsConsumer.TaskInfo.class); try { JavaBridge.registerPrimitives(k); JavaBridge.registerCollections(k); diff --git a/src/jvm/backtype/storm/task/IErrorReporter.java b/src/jvm/backtype/storm/task/IErrorReporter.java new file mode 100644 index 000000000..d2e7e5d33 --- /dev/null +++ b/src/jvm/backtype/storm/task/IErrorReporter.java @@ -0,0 +1,5 @@ +package backtype.storm.task; + +public interface IErrorReporter { + void reportError(Throwable error); +} diff --git a/src/jvm/backtype/storm/task/IOutputCollector.java b/src/jvm/backtype/storm/task/IOutputCollector.java index 8381895e6..8e56bfa60 100644 --- a/src/jvm/backtype/storm/task/IOutputCollector.java +++ b/src/jvm/backtype/storm/task/IOutputCollector.java @@ -4,7 +4,7 @@ import java.util.Collection; import java.util.List; -public interface IOutputCollector { +public interface IOutputCollector extends IErrorReporter { /** * Returns the task ids that received the tuples. */ @@ -12,5 +12,4 @@ public interface IOutputCollector { void emitDirect(int taskId, String streamId, Collection anchors, List tuple); void ack(Tuple input); void fail(Tuple input); - void reportError(Throwable error); } diff --git a/src/jvm/backtype/storm/task/TopologyContext.java b/src/jvm/backtype/storm/task/TopologyContext.java index e9d499a00..872f8a95d 100644 --- a/src/jvm/backtype/storm/task/TopologyContext.java +++ b/src/jvm/backtype/storm/task/TopologyContext.java @@ -4,6 +4,11 @@ import backtype.storm.generated.Grouping; import backtype.storm.generated.StormTopology; import backtype.storm.hooks.ITaskHook; +import backtype.storm.metric.api.IMetric; +import backtype.storm.metric.api.IReducer; +import backtype.storm.metric.api.ICombiner; +import backtype.storm.metric.api.ReducedMetric; +import backtype.storm.metric.api.CombinedMetric; import backtype.storm.state.ISubscribedState; import backtype.storm.tuple.Fields; import backtype.storm.utils.Utils; @@ -29,6 +34,8 @@ public class TopologyContext extends WorkerTopologyContext { private Map _taskData = new HashMap(); private List _hooks = new ArrayList(); private Map _executorData; + private Map>> _registeredMetrics; + private clojure.lang.Atom _openOrPrepareWasCalled; public TopologyContext(StormTopology topology, Map stormConf, @@ -36,12 +43,15 @@ public TopologyContext(StormTopology topology, Map stormConf, Map> componentToStreamToFields, String stormId, String codeDir, String pidDir, Integer taskId, Integer workerPort, List workerTasks, Map defaultResources, - Map userResources, Map executorData) { + Map userResources, Map executorData, Map registeredMetrics, + clojure.lang.Atom openOrPrepareWasCalled) { super(topology, stormConf, taskToComponent, componentToSortedTasks, componentToStreamToFields, stormId, codeDir, pidDir, workerPort, workerTasks, defaultResources, userResources); _taskId = taskId; _executorData = executorData; + _registeredMetrics = registeredMetrics; + _openOrPrepareWasCalled = openOrPrepareWasCalled; } /** @@ -190,4 +200,50 @@ public void addTaskHook(ITaskHook hook) { public Collection getHooks() { return _hooks; } + + /* + * Register a IMetric instance. + * Storm will then call getValueAndReset on the metric every timeBucketSizeInSecs + * and the returned value is sent to all metrics consumers. + * You must call this during IBolt::prepare or ISpout::open. + * @return The IMetric argument unchanged. + */ + public IMetric registerMetric(String name, IMetric metric, int timeBucketSizeInSecs) { + if((Boolean)_openOrPrepareWasCalled.deref() == true) { + throw new RuntimeException("TopologyContext.registerMetric can only be called from within overridden " + + "IBolt::prepare() or ISpout::open() method."); + } + + Map m1 = _registeredMetrics; + if(!m1.containsKey(timeBucketSizeInSecs)) { + m1.put(timeBucketSizeInSecs, new HashMap()); + } + + Map m2 = (Map)m1.get(timeBucketSizeInSecs); + if(!m2.containsKey(_taskId)) { + m2.put(_taskId, new HashMap()); + } + + Map m3 = (Map)m2.get(_taskId); + if(m3.containsKey(name)) { + throw new RuntimeException("The same metric name `" + name + "` was registered twice." ); + } else { + m3.put(name, metric); + } + + return metric; + } + + /* + * Convinience method for registering ReducedMetric. + */ + public IMetric registerMetric(String name, IReducer reducer, int timeBucketSizeInSecs) { + return registerMetric(name, new ReducedMetric(reducer), timeBucketSizeInSecs); + } + /* + * Convinience method for registering CombinedMetric. + */ + public IMetric registerMetric(String name, ICombiner combiner, int timeBucketSizeInSecs) { + return registerMetric(name, new CombinedMetric(combiner), timeBucketSizeInSecs); + } } \ No newline at end of file diff --git a/src/jvm/storm/trident/operation/TridentOperationContext.java b/src/jvm/storm/trident/operation/TridentOperationContext.java index 0aad4c652..75251a501 100644 --- a/src/jvm/storm/trident/operation/TridentOperationContext.java +++ b/src/jvm/storm/trident/operation/TridentOperationContext.java @@ -1,5 +1,6 @@ package storm.trident.operation; +import backtype.storm.metric.api.*; import backtype.storm.task.TopologyContext; import backtype.storm.tuple.Fields; import storm.trident.tuple.TridentTuple; @@ -29,4 +30,14 @@ public int numPartitions() { public int getPartitionIndex() { return _topoContext.getThisTaskIndex(); } + + public IMetric registerMetric(String name, IMetric metric, int timeBucketSizeInSecs) { + return _topoContext.registerMetric(name, metric, timeBucketSizeInSecs); + } + public IMetric registerMetric(String name, IReducer reducer, int timeBucketSizeInSecs) { + return _topoContext.registerMetric(name, new ReducedMetric(reducer), timeBucketSizeInSecs); + } + public IMetric registerMetric(String name, ICombiner combiner, int timeBucketSizeInSecs) { + return _topoContext.registerMetric(name, new CombinedMetric(combiner), timeBucketSizeInSecs); + } } diff --git a/test/clj/backtype/storm/metrics_test.clj b/test/clj/backtype/storm/metrics_test.clj new file mode 100644 index 000000000..bd3f6c9b1 --- /dev/null +++ b/test/clj/backtype/storm/metrics_test.clj @@ -0,0 +1,189 @@ +(ns backtype.storm.metrics-test + (:use [clojure test]) + (:import [backtype.storm.topology TopologyBuilder]) + (:import [backtype.storm.generated InvalidTopologyException SubmitOptions TopologyInitialStatus]) + (:import [backtype.storm.testing TestWordCounter TestWordSpout TestGlobalCount + TestAggregatesCounter TestConfBolt AckFailMapTracker]) + (:import [backtype.storm.metric.api CountMetric IMetricsConsumer$DataPoint IMetricsConsumer$TaskInfo]) + + (:use [backtype.storm bootstrap testing]) + (:use [backtype.storm.daemon common]) + (:use [backtype.storm.metric testing])) + + +(bootstrap) + +(defbolt acking-bolt {} {:prepare true} + [conf context collector] + (bolt + (execute [tuple] + (ack! collector tuple)))) + +(defbolt ack-every-other {} {:prepare true} + [conf context collector] + (let [state (atom -1)] + (bolt + (execute [tuple] + (let [val (swap! state -)] + (when (pos? val) + (ack! collector tuple) + )))))) + +(defn assert-loop [afn ids] + (while (not (every? afn ids)) + (Thread/sleep 1))) + +(defn assert-acked [tracker & ids] + (assert-loop #(.isAcked tracker %) ids)) + +(defn assert-failed [tracker & ids] + (assert-loop #(.isFailed tracker %) ids)) + +(defbolt count-acks {} {:prepare true} + [conf context collector] + + (let [mycustommetric (CountMetric.)] + (.registerMetric context "my-custom-metric" mycustommetric 5) + (bolt + (execute [tuple] + (.incr mycustommetric) + (ack! collector tuple))))) + +(def metrics-data backtype.storm.metric.testing/buffer) + +(defn wait-for-atleast-N-buckets! [N comp-id metric-name] + (while + (let [taskid->buckets (-> @metrics-data (get comp-id) (get metric-name))] + (or + (and (not= N 0) (nil? taskid->buckets)) + (not-every? #(<= N %) (map (comp count second) taskid->buckets)))) + (println "Waiting for at least" N "timebuckets to appear in FakeMetricsConsumer for component id" comp-id + "and metric name" metric-name) + (Thread/sleep 10))) + +(defn lookup-bucket-by-comp-id-&-metric-name! [comp-id metric-name] + (-> @metrics-data + (get comp-id) + (get metric-name) + (first) ;; pick first task in the list, ignore other tasks' metric data. + (second) + (or []))) + +(defmacro assert-buckets! [comp-id metric-name expected] + `(do + (let [N# (count ~expected)] + (wait-for-atleast-N-buckets! N# ~comp-id ~metric-name) + (is (= ~expected (subvec (lookup-bucket-by-comp-id-&-metric-name! ~comp-id ~metric-name) 0 N#)))))) + +(deftest test-custom-metric + (with-simulated-time-local-cluster + [cluster :daemon-conf {TOPOLOGY-METRICS-CONSUMER-REGISTER + [{"class" "clojure.storm.metric.testing.FakeMetricConsumer"}]}] + (let [feeder (feeder-spout ["field1"]) + tracker (AckFailMapTracker.) + _ (.setAckFailDelegate feeder tracker) + topology (thrift/mk-topology + {"1" (thrift/mk-spout-spec feeder)} + {"2" (thrift/mk-bolt-spec {"1" :global} count-acks)})] + (submit-local-topology (:nimbus cluster) "metrics-tester" {} topology) + + (.feed feeder ["a"] 1) + (advance-cluster-time cluster 6) + (assert-buckets! "2" "my-custom-metric" [1]) + + (advance-cluster-time cluster 5) + (assert-buckets! "2" "my-custom-metric" [1 0]) + + (advance-cluster-time cluster 20) + (assert-buckets! "2" "my-custom-metric" [1 0 0 0 0 0]) + + (.feed feeder ["b"] 2) + (.feed feeder ["c"] 3) + (advance-cluster-time cluster 5) + (assert-buckets! "2" "my-custom-metric" [1 0 0 0 0 0 2])))) + + +(deftest test-builtin-metrics-1 + (with-simulated-time-local-cluster + [cluster :daemon-conf {TOPOLOGY-METRICS-CONSUMER-REGISTER + [{"class" "clojure.storm.metric.testing.FakeMetricConsumer"}] + TOPOLOGY-STATS-SAMPLE-RATE 1.0 + TOPOLOGY-BUILTIN-METRICS-BUCKET-SIZE-SECS 60}] + (let [feeder (feeder-spout ["field1"]) + tracker (AckFailMapTracker.) + _ (.setAckFailDelegate feeder tracker) + topology (thrift/mk-topology + {"myspout" (thrift/mk-spout-spec feeder)} + {"mybolt" (thrift/mk-bolt-spec {"myspout" :shuffle} acking-bolt)})] + (submit-local-topology (:nimbus cluster) "metrics-tester" {} topology) + + (.feed feeder ["a"] 1) + (advance-cluster-time cluster 61) + (assert-buckets! "myspout" "__ack-count/default" [1]) + (assert-buckets! "myspout" "__emit-count/default" [1]) + (assert-buckets! "myspout" "__transfer-count/default" [1]) + (assert-buckets! "mybolt" "__ack-count/myspout:default" [1]) + (assert-buckets! "mybolt" "__execute-count/myspout:default" [1]) + + (advance-cluster-time cluster 120) + (assert-buckets! "myspout" "__ack-count/default" [1 0 0]) + (assert-buckets! "myspout" "__emit-count/default" [1 0 0]) + (assert-buckets! "myspout" "__transfer-count/default" [1 0 0]) + (assert-buckets! "mybolt" "__ack-count/myspout:default" [1 0 0]) + (assert-buckets! "mybolt" "__execute-count/myspout:default" [1 0 0]) + + (.feed feeder ["b"] 1) + (.feed feeder ["c"] 1) + (advance-cluster-time cluster 60) + (assert-buckets! "myspout" "__ack-count/default" [1 0 0 2]) + (assert-buckets! "myspout" "__emit-count/default" [1 0 0 2]) + (assert-buckets! "myspout" "__transfer-count/default" [1 0 0 2]) + (assert-buckets! "mybolt" "__ack-count/myspout:default" [1 0 0 2]) + (assert-buckets! "mybolt" "__execute-count/myspout:default" [1 0 0 2])))) + + +(deftest test-builtin-metrics-2 + (with-simulated-time-local-cluster + [cluster :daemon-conf {TOPOLOGY-METRICS-CONSUMER-REGISTER + [{"class" "clojure.storm.metric.testing.FakeMetricConsumer"}] + TOPOLOGY-ENABLE-MESSAGE-TIMEOUTS true + TOPOLOGY-STATS-SAMPLE-RATE 1.0 + TOPOLOGY-BUILTIN-METRICS-BUCKET-SIZE-SECS 5}] + (let [feeder (feeder-spout ["field1"]) + tracker (AckFailMapTracker.) + _ (.setAckFailDelegate feeder tracker) + topology (thrift/mk-topology + {"myspout" (thrift/mk-spout-spec feeder)} + {"mybolt" (thrift/mk-bolt-spec {"myspout" :shuffle} ack-every-other)})] + (submit-local-topology (:nimbus cluster) + "metrics-tester" + {TOPOLOGY-MESSAGE-TIMEOUT-SECS 20} + topology) + + (.feed feeder ["a"] 1) + (advance-cluster-time cluster 6) + (assert-acked tracker 1) + (assert-buckets! "myspout" "__fail-count/default" []) + (assert-buckets! "myspout" "__ack-count/default" [1]) + (assert-buckets! "myspout" "__emit-count/default" [1]) + (assert-buckets! "myspout" "__transfer-count/default" [1]) + (assert-buckets! "mybolt" "__ack-count/myspout:default" [1]) + (assert-buckets! "mybolt" "__execute-count/myspout:default" [1]) + + (.feed feeder ["b"] 2) + (advance-cluster-time cluster 5) + (assert-buckets! "myspout" "__fail-count/default" []) + (assert-buckets! "myspout" "__ack-count/default" [1 0]) + (assert-buckets! "myspout" "__emit-count/default" [1 1]) + (assert-buckets! "myspout" "__transfer-count/default" [1 1]) + (assert-buckets! "mybolt" "__ack-count/myspout:default" [1 0]) + (assert-buckets! "mybolt" "__execute-count/myspout:default" [1 1]) + + (advance-cluster-time cluster 30) + (assert-failed tracker 2) + (assert-buckets! "myspout" "__fail-count/default" [1]) + (assert-buckets! "myspout" "__ack-count/default" [1 0 0 0 0]) + (assert-buckets! "myspout" "__emit-count/default" [1 1 0 0 0]) + (assert-buckets! "myspout" "__transfer-count/default" [1 1 0 0 0]) + (assert-buckets! "mybolt" "__ack-count/myspout:default" [1 0 0 0 0]) + (assert-buckets! "mybolt" "__execute-count/myspout:default" [1 1 0 0 0]))))