
New Storm metrics system #390

Merged
merged 14 commits into from

3 participants

Jason Jackson Nathan Marz Stanislav Sedov
Jason Jackson
Collaborator

No description provided.

Jason Jackson jasonjckn closed this
Jason Jackson jasonjckn reopened this
Jason Jackson jasonjckn closed this
Jason Jackson jasonjckn reopened this
Jason Jackson
Collaborator

just did a git rebase

src/jvm/backtype/storm/task/TopologyContext.java
@@ -190,4 +194,8 @@ public void addTaskHook(ITaskHook hook) {
public Collection<ITaskHook> getHooks() {
return _hooks;
}
+
+ public void registerMetric(String name, IMetric metric, int timeBucketSizeInSecs) {
Nathan Marz Owner

You should add a convenience method that takes in an IReducer and wraps it in ReducedMetric

Nathan Marz Owner

Let's also have this method return the IMetric that was passed in, so that you can do things like:

_metric = context.registerMetric("foo", new CountMetric(), 5);

instead of:

_metric = new CountMetric();
context.registerMetric("foo", _metric, 5);

When you pass in a reducer or combiner, it will return a ReducedMetric or CombinedMetric appropriately.
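
A sketch of how the two conveniences might read together in a bolt's prepare, using the CountMetric and MeanReducer classes added later in this diff (field names are illustrative):

_countMetric = (CountMetric) context.registerMetric("my-count", new CountMetric(), 5); // IMetric overload returns its argument
_meanMetric = context.registerMetric("my-mean", new MeanReducer(), 5);                 // IReducer overload wraps and returns a ReducedMetric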

Jason Jackson Collaborator

NOTE: you can only call registerMetric during prepare or open; should we add a check for this? The flag lives in the Clojure atom open-or-prepare-was-called, which is a little cumbersome to access from this function.

I could use clojure.RT.var to do it.

Jason Jackson Collaborator

Do we want to add a convenience method for composite metrics? _metric = context.registerMetric("foo", new CompositeMetric(), 5);

The interface would be ICompositeMetric { void doRegistration(TopologyContext context); }

Nathan Marz Owner

Yea, it would be good to check for this in the TopologyContext. Part of setup-metrics! could be to tell the TopologyContext to stop accepting new registerMetric invocations.

Jason Jackson Collaborator

By 'CompositeMetric' I mean, for example, a case where you have two or more reducers aggregating the same values passed to .update, exporting e.g. myfoo_count and myfoo_avg.

Nathan Marz Owner

Not for now. Let's come back to composite metrics once we see how people are using this.

src/jvm/backtype/storm/metric/IMetricsConsumer.java
((4 lines not shown))
+import java.util.Map;
+
+public interface IMetricsConsumer {
+ public static class DataPoint {
+ public String srcWorkerHost;
+ public int srcWorkerPort;
+ public String srcComponentId;
+ public int srcTaskId;
+ public long timestamp;
+ public int updateIntervalSecs;
+ public String name;
+ public Object value;
+ }
+
+ void prepare(Map stormConf, Object registrationOptions, TopologyContext context);
+ void handleDataPoint(DataPoint dataPoint);
Nathan Marz Owner

I think we should change this interface actually. Rather than handle one data point at a time, it should handle all data points emitted in the same time interval from a task. So the interface would be like:

handleDataPoint(TaskInfo info, Collection dataPoints);

TaskInfo would contain the generic information like hostname, task id, port, timestamp, and updateIntervalSecs. Each data point would just contain the name and the value.

This also requires the metrics tuple to pack together all the data points for the same-sized interval into one tuple.
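
In sketch form, the batched interface then looks roughly like this (it is the shape that lands in IMetricsConsumer.java further down):

public interface IMetricsConsumer {
    public static class TaskInfo {
        public String srcWorkerHost;
        public int srcWorkerPort;
        public String srcComponentId;
        public int srcTaskId;
        public long timestamp;
        public int updateIntervalSecs;
    }
    public static class DataPoint {
        public String name;
        public Object value;
    }
    void handleDataPoints(TaskInfo taskInfo, Collection<DataPoint> dataPoints);
}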

src/jvm/backtype/storm/metric/IncrementedMetric.java
@@ -0,0 +1,22 @@
+package backtype.storm.metric;
+
+public class IncrementedMetric implements IMetric {
Nathan Marz Owner

Can we rename this to CountMetric?

Nathan Marz
Owner

Overall, solid implementation. Left a couple of comments.

I also think we should add an ICombiner interface (a monoid interface with zero() and combine(X, X)).
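
In sketch form (the merged ICombiner.java below names the zero element identity() rather than zero()):

public interface ICombiner<T> {
    T identity();        // the monoid's zero element
    T combine(T a, T b); // an associative combine
}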

Nathan Marz
Owner

Oh, and also we need some unit tests for this. Time simulation should be useful here.

Jason Jackson
Collaborator

Do we want to split the backtype.storm.metric namespace into API and impl sections?

Nathan Marz
Owner

Yea, that's a good idea.

Jason Jackson added some commits
test/clj/backtype/storm/metrics_test.clj
((21 lines not shown))
+ (bolt
+ (execute [tuple]
+ (.inc ack-count)
+ (ack! collector tuple)))))
+
+(def datapoints-buffer (atom nil))
+
+(defn metric-name->vals! [name]
+ (->> @datapoints-buffer
+ (mapcat (fn [[task-info data-points]] data-points))
+ (filter #(= name (.name %)))
+ (map #(.value %))
+ (into [])))
+
+(deftest test-time-buckets
+ (with-simulated-time-local-cluster
Nathan Marz Owner

The problem with this test is the lack of synchronization, so it can fail even when things are working properly. After each advance-cluster-time call, there's no guarantee that the tick tuple will be emitted, processed, and the resulting messages caught by your consumer before you do your assertion.

One way to solve this would be to use a global semaphore and use that to synchronize your assertions with the consumer actually receiving metrics tuples. The backtype.storm.utils.RegisteredGlobalState class can be useful for this.
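
One hypothetical shape for that synchronization; the static fields below are a stand-in for RegisteredGlobalState, and only work because a local cluster runs the consumer in the same JVM as the test:

import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Semaphore;
import backtype.storm.metric.api.IMetricsConsumer;
import backtype.storm.task.IErrorReporter;
import backtype.storm.task.TopologyContext;

public class SynchronizedFakeConsumer implements IMetricsConsumer {
    // Stand-ins for RegisteredGlobalState.
    public static final Semaphore TUPLES_RECEIVED = new Semaphore(0);
    public static final List<DataPoint> BUFFER = new CopyOnWriteArrayList<DataPoint>();

    public void prepare(Map stormConf, Object registrationArgument,
                        TopologyContext context, IErrorReporter errorReporter) { }

    public void handleDataPoints(TaskInfo taskInfo, Collection<DataPoint> dataPoints) {
        BUFFER.addAll(dataPoints); // stash for the test's assertions
        TUPLES_RECEIVED.release(); // wake the asserting test thread
    }

    public void cleanup() { }
}

// In the test, after each advance-cluster-time:
//   SynchronizedFakeConsumer.TUPLES_RECEIVED.acquire(); // blocks (or hangs on a bug) until the metrics tuple arrives
//   ... then assert on BUFFER ...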

Jason Jackson Collaborator

Why is it that 30-second tuple timeouts can be tested with advance-cluster-time without synchronization?

With the global semaphore implementation, what happens if a metrics tuple is not delivered: do the unit tests hang? My logic: if you decrement a semaphore when it's at 0, you block until someone increments it (it's incremented when the tuple is delivered).

Here's another solution: advance-cluster-time blocks until all the time is simulated AND all queues are empty, all tuples have been processed and acked, and IBolt::execute functions have returned. In other words, a more generic synchronization.

Nathan Marz Owner

The timeout tests wait until the spout receives the appropriate number of acks or fails, so they're not actually time-dependent.

Yes, with the semaphore implementation, if the test is wrong or there's a bug it should hang, which is fine.

Your second proposal is a lot trickier than it seems. It's doable in theory, and in fact tracked topologies implement a bunch of this. We would have to add inter-task queue checking to accomplish it. I think the first approach will be much easier for you than trying to iron out every last detail of the second proposal.

Nathan Marz
Owner

I commented on what's wrong with the unit test.

Nathan Marz
Owner

OK so the main things left on this issue are:

  1. Getting the standard Storm metrics into this system: execute latency, #executed, #acked, #failed, process latency, etc.
  2. Fixing the unit test
Nathan Marz
Owner

Was just looking at the diff: you should register the metrics classes for serialization in SerializationFactory, not in defaults.yaml. Those configs can be overridden.

Jason Jackson jasonjckn closed this
Jason Jackson jasonjckn reopened this
Jason Jackson jasonjckn closed this
Jason Jackson jasonjckn reopened this
Jason Jackson jasonjckn closed this
Jason Jackson jasonjckn reopened this
Jason Jackson jasonjckn closed this
Jason Jackson
Collaborator

0.9.0 merge undone.

Jason Jackson jasonjckn reopened this
Jason Jackson
Collaborator

all unit tests pass.

Nathan Marz
Owner

Merged into 0.9.0 with a few minor changes

Jason Jackson
Collaborator

can I merge metrics into 0.8.2 branch?

Jason Jackson
Collaborator

and deploy to prod?

Stanislav Sedov stass commented on the diff
src/jvm/backtype/storm/Config.java
@@ -454,6 +461,11 @@
public static String TOPOLOGY_STATS_SAMPLE_RATE="topology.stats.sample.rate";
/**
+ * The time period that builtin metrics data in bucketed into.
Stanislav Sedov
stass added a note

"in bucketed into" -- should it be "is bucketed into" instead?

Nathan Marz nathanmarz merged commit a9fd4ab into from
Commits on Oct 31, 2012
  1. storm metrics API initial implementation ready for review. no unit testing currently.

    Jason Jackson authored
Commits on Nov 1, 2012
  1. refactorings, metrics API has its own namespace.

    Jason Jackson authored
  2. IMetricsConsumer handles data points in batches.

    Jason Jackson authored
  3. 1) added CombinedMetric

    Jason Jackson authored
    2) error if registerMetrics called after component was prepared/opened.
  4. fixed comment

    Jason Jackson authored
  5. added one metrics unit test.

    Jason Jackson authored
Commits on Nov 2, 2012
  1. added parameter OutputCollector to IMetricsConsumer::prepare, so that MetricsConsumer can reportError.

    Jason Jackson authored
  2. tiny change for best practice.

    Jason Jackson authored
Commits on Nov 20, 2012
  1. fixed serialization exception due to metrics classes not being registered properly

    Jason Jackson authored
  2. Added built-in storm metrics, Added MultiReducedMetric. Refactored MultiCountMetric. Added Unit tests.

    Jason Jackson authored
Commits on Dec 6, 2012
  1. MeanReducer no longer divides by zero.

    Jason Jackson authored
Showing with 886 additions and 59 deletions.
  1. +1 −1  conf/defaults.yaml
  2. +7 −0 conf/storm.yaml.example
  3. +1 −1  project.clj
  4. +64 −0 src/clj/backtype/storm/daemon/builtin_metrics.clj
  5. +63 −10 src/clj/backtype/storm/daemon/common.clj
  6. +92 −35 src/clj/backtype/storm/daemon/executor.clj
  7. +13 −7 src/clj/backtype/storm/daemon/task.clj
  8. +53 −0 src/clj/backtype/storm/metric/testing.clj
  9. +4 −2 src/clj/backtype/storm/testing.clj
  10. +32 −0 src/jvm/backtype/storm/Config.java
  11. +4 −0 src/jvm/backtype/storm/Constants.java
  12. +47 −0 src/jvm/backtype/storm/metric/MetricsConsumerBolt.java
  13. +17 −0 src/jvm/backtype/storm/metric/api/AssignableMetric.java
  14. +21 −0 src/jvm/backtype/storm/metric/api/CombinedMetric.java
  15. +24 −0 src/jvm/backtype/storm/metric/api/CountMetric.java
  16. +6 −0 src/jvm/backtype/storm/metric/api/ICombiner.java
  17. +5 −0 src/jvm/backtype/storm/metric/api/IMetric.java
  18. +43 −0 src/jvm/backtype/storm/metric/api/IMetricsConsumer.java
  19. +7 −0 src/jvm/backtype/storm/metric/api/IReducer.java
  20. +38 −0 src/jvm/backtype/storm/metric/api/MeanReducer.java
  21. +28 −0 src/jvm/backtype/storm/metric/api/MultiCountMetric.java
  22. +30 −0 src/jvm/backtype/storm/metric/api/MultiReducedMetric.java
  23. +21 −0 src/jvm/backtype/storm/metric/api/ReducedMetric.java
  24. +2 −0  src/jvm/backtype/storm/serialization/SerializationFactory.java
  25. +5 −0 src/jvm/backtype/storm/task/IErrorReporter.java
  26. +1 −2  src/jvm/backtype/storm/task/IOutputCollector.java
  27. +57 −1 src/jvm/backtype/storm/task/TopologyContext.java
  28. +11 −0 src/jvm/storm/trident/operation/TridentOperationContext.java
  29. +189 −0 test/clj/backtype/storm/metrics_test.clj
2  conf/defaults.yaml
@@ -87,6 +87,7 @@ topology.max.task.parallelism: null
topology.max.spout.pending: null
topology.state.synchronization.timeout.secs: 60
topology.stats.sample.rate: 0.05
+topology.builtin.metrics.bucket.size.secs: 60
topology.fall.back.on.java.serialization: true
topology.worker.childopts: null
topology.executor.receive.buffer.size: 1024 #batched
@@ -103,5 +104,4 @@ topology.max.error.report.per.interval: 5
topology.kryo.factory: "backtype.storm.serialization.DefaultKryoFactory"
topology.trident.batch.emit.interval.millis: 500
-
dev.zookeeper.path: "/tmp/dev-storm-zookeeper"
7 conf/storm.yaml.example
@@ -21,3 +21,10 @@
# drpc.servers:
# - "server1"
# - "server2"
+
+## Metrics Consumers
+# topology.metrics.consumers.register:
+# - class: "org.mycompany.MyMetricsConsumer"
+# argument:
+# - endpoint: "metrics-collector.mycompany.org"
+# parallelism.hint: 1
2  project.clj
@@ -3,7 +3,7 @@
(do (println (str "ERROR: requires Leiningen 1.x but you are using " lein-version))
(System/exit 1)))
-(defproject storm "0.8.2-wip15"
+(defproject storm "0.8.2-wip16"
:source-path "src/clj"
:test-path "test/clj"
:java-source-path "src/jvm"
64 src/clj/backtype/storm/daemon/builtin_metrics.clj
@@ -0,0 +1,64 @@
+(ns backtype.storm.daemon.builtin-metrics
+ (:import [backtype.storm.metric.api MultiCountMetric MultiReducedMetric MeanReducer])
+ (:import [backtype.storm Config])
+ (:use [backtype.storm.stats :only [stats-rate]]))
+
+(defrecord BuiltinSpoutMetrics [^MultiCountMetric ack-count
+ ^MultiReducedMetric complete-latency
+ ^MultiCountMetric fail-count
+ ^MultiCountMetric emit-count
+ ^MultiCountMetric transfer-count])
+(defrecord BuiltinBoltMetrics [^MultiCountMetric ack-count
+ ^MultiReducedMetric process-latency
+ ^MultiCountMetric fail-count
+ ^MultiCountMetric execute-count
+ ^MultiReducedMetric execute-latency
+ ^MultiCountMetric emit-count
+ ^MultiCountMetric transfer-count])
+
+(defn make-data [executor-type]
+ (condp = executor-type
+ :spout (BuiltinSpoutMetrics. (MultiCountMetric.)
+ (MultiReducedMetric. (MeanReducer.))
+ (MultiCountMetric.)
+ (MultiCountMetric.)
+ (MultiCountMetric.))
+ :bolt (BuiltinBoltMetrics. (MultiCountMetric.)
+ (MultiReducedMetric. (MeanReducer.))
+ (MultiCountMetric.)
+ (MultiCountMetric.)
+ (MultiReducedMetric. (MeanReducer.))
+ (MultiCountMetric.)
+ (MultiCountMetric.))))
+
+(defn register-all [builtin-metrics storm-conf topology-context]
+ (doseq [[kw imetric] builtin-metrics]
+ (.registerMetric topology-context (str "__" (name kw)) imetric
+ (int (get storm-conf Config/TOPOLOGY_BUILTIN_METRICS_BUCKET_SIZE_SECS)))))
+
+(defn spout-acked-tuple! [^BuiltinSpoutMetrics m stats stream latency-ms]
+ (-> m .ack-count (.scope stream) (.incrBy (stats-rate stats)))
+ (-> m .complete-latency (.scope stream) (.update latency-ms)))
+
+(defn spout-failed-tuple! [^BuiltinSpoutMetrics m stats stream]
+ (-> m .fail-count (.scope stream) (.incrBy (stats-rate stats))))
+
+(defn bolt-execute-tuple! [^BuiltinBoltMetrics m stats comp-id stream latency-ms]
+ (let [scope (str comp-id ":" stream)]
+ (-> m .execute-count (.scope scope) (.incrBy (stats-rate stats)))
+ (-> m .execute-latency (.scope scope) (.update latency-ms))))
+
+(defn bolt-acked-tuple! [^BuiltinBoltMetrics m stats comp-id stream latency-ms]
+ (let [scope (str comp-id ":" stream)]
+ (-> m .ack-count (.scope scope) (.incrBy (stats-rate stats)))
+ (-> m .process-latency (.scope scope) (.update latency-ms))))
+
+(defn bolt-failed-tuple! [^BuiltinBoltMetrics m stats comp-id stream]
+ (let [scope (str comp-id ":" stream)]
+ (-> m .fail-count (.scope scope) (.incrBy (stats-rate stats)))))
+
+(defn emitted-tuple! [m stats stream]
+ (-> m :emit-count (.scope stream) (.incrBy (stats-rate stats))))
+
+(defn transferred-tuple! [m stats stream num-out-tasks]
+ (-> m :transfer-count (.scope stream) (.incrBy (* num-out-tasks (stats-rate stats)))))
73 src/clj/backtype/storm/daemon/common.clj
@@ -23,6 +23,8 @@
(def SYSTEM-COMPONENT-ID Constants/SYSTEM_COMPONENT_ID)
(def SYSTEM-TICK-STREAM-ID Constants/SYSTEM_TICK_STREAM_ID)
+(def METRICS-STREAM-ID Constants/METRICS_STREAM_ID)
+(def METRICS-TICK-STREAM-ID Constants/METRICS_TICK_STREAM_ID)
;; the task id is the virtual port
;; node->host is here so that tasks know who to talk to just from assignment
@@ -206,27 +208,78 @@
(.put_to_bolts ret "__acker" acker-bolt)
))
+(defn add-metric-streams! [^StormTopology topology]
+ (doseq [[_ component] (all-components topology)
+ :let [common (.get_common component)]]
+ (.put_to_streams common METRICS-STREAM-ID
+ (thrift/output-fields ["task-info" "data-points"]))))
+
(defn add-system-streams! [^StormTopology topology]
(doseq [[_ component] (all-components topology)
:let [common (.get_common component)]]
- (.put_to_streams common SYSTEM-STREAM-ID (thrift/output-fields ["event"]))
- ;; TODO: consider adding a stats stream for stats aggregation
- ))
+ (.put_to_streams common SYSTEM-STREAM-ID (thrift/output-fields ["event"]))))
+
+
+(defn map-occurrences [afn coll]
+ (->> coll
+ (reduce (fn [[counts new-coll] x]
+ (let [occurs (inc (get counts x 0))]
+ [(assoc counts x occurs) (cons (afn x occurs) new-coll)]))
+ [{} []])
+ (second)
+ (reverse)))
+
+(defn number-duplicates [coll]
+ "(number-duplicates [\"a\", \"b\", \"a\"]) => [\"a\", \"b\", \"a#2\"]"
+ (map-occurrences (fn [x occurences] (if (>= occurences 2) (str x "#" occurences) x)) coll))
+
+(defn metrics-consumer-register-ids [storm-conf]
+ "Generates a list of component ids for each metrics consumer
+ e.g. [\"__metrics_org.mycompany.MyMetricsConsumer\", ..] "
+ (->> (get storm-conf TOPOLOGY-METRICS-CONSUMER-REGISTER)
+ (map #(get % "class"))
+ (number-duplicates)
+ (map #(str Constants/METRICS_COMPONENT_ID_PREFIX %))))
+
+(defn metrics-consumer-bolt-specs [components-ids-that-emit-metrics storm-conf]
+ (let [inputs (->> (for [comp-id components-ids-that-emit-metrics]
+ {[comp-id METRICS-STREAM-ID] :shuffle})
+ (into {}))
+
+ mk-bolt-spec (fn [class arg p]
+ (thrift/mk-bolt-spec*
+ inputs
+ (backtype.storm.metric.MetricsConsumerBolt. class arg)
+ {} :p p :conf {TOPOLOGY-TASKS p}))]
+
+ (map
+ (fn [component-id register]
+ [component-id (mk-bolt-spec (get register "class")
+ (get register "argument")
+ (or (get register "parallelism.hint") 1))])
+
+ (metrics-consumer-register-ids storm-conf)
+ (get storm-conf TOPOLOGY-METRICS-CONSUMER-REGISTER))))
+
+(defn add-metric-components! [storm-conf ^StormTopology topology]
+ (doseq [[comp-id bolt-spec] (metrics-consumer-bolt-specs (keys (all-components topology)) storm-conf)]
+ (.put_to_bolts topology comp-id bolt-spec)))
(defn add-system-components! [^StormTopology topology]
(let [system-spout (thrift/mk-spout-spec*
- (NoOpSpout.)
- {SYSTEM-TICK-STREAM-ID (thrift/output-fields ["rate_secs"])
- }
- :p 0
- :conf {TOPOLOGY-TASKS 0})]
- (.put_to_spouts topology SYSTEM-COMPONENT-ID system-spout)
- ))
+ (NoOpSpout.)
+ {SYSTEM-TICK-STREAM-ID (thrift/output-fields ["rate_secs"])
+ METRICS-TICK-STREAM-ID (thrift/output-fields ["interval"])}
+ :p 0
+ :conf {TOPOLOGY-TASKS 0})]
+ (.put_to_spouts topology SYSTEM-COMPONENT-ID system-spout)))
(defn system-topology! [storm-conf ^StormTopology topology]
(validate-basic! topology)
(let [ret (.deepCopy topology)]
(add-acker! storm-conf ret)
+ (add-metric-components! storm-conf ret)
+ (add-metric-streams! ret)
(add-system-streams! ret)
(add-system-components! ret)
(validate-structure! ret)
127 src/clj/backtype/storm/daemon/executor.clj
@@ -5,10 +5,11 @@
(:import [backtype.storm.tuple Tuple])
(:import [backtype.storm.spout ISpoutWaitStrategy])
(:import [backtype.storm.hooks.info SpoutAckInfo SpoutFailInfo
- EmitInfo BoltFailInfo BoltAckInfo BoltExecuteInfo])
+ EmitInfo BoltFailInfo BoltAckInfo BoltExecuteInfo])
+ (:import [backtype.storm.metric.api IMetric IMetricsConsumer$TaskInfo IMetricsConsumer$DataPoint])
(:require [backtype.storm [tuple :as tuple]])
(:require [backtype.storm.daemon [task :as task]])
- )
+ (:require [backtype.storm.daemon.builtin-metrics :as builtin-metrics]))
(bootstrap)
@@ -212,6 +213,7 @@
:type executor-type
;; TODO: should refactor this to be part of the executor specific map (spout or bolt with :common field)
:stats (mk-executor-stats <> (sampling-rate storm-conf))
+ :interval->task->metric-registry (HashMap.)
:task->component (:task->component worker)
:stream->component->grouper (outbound-components worker-context component-id)
:report-error (throttled-report-error-fn <>)
@@ -238,7 +240,42 @@
(worker-transfer-fn serializer alist)
(.setObject cached-emit (ArrayList.))
)))
- :kill-fn (:report-error-and-die executor-data))))
+ :kill-fn (:report-error-and-die executor-data))))
+
+(defn setup-metrics! [executor-data]
+ (let [{:keys [storm-conf receive-queue worker-context interval->task->metric-registry]} executor-data
+ distinct-time-bucket-intervals (keys interval->task->metric-registry)]
+ (doseq [interval distinct-time-bucket-intervals]
+ (schedule-recurring
+ (:user-timer (:worker executor-data))
+ interval
+ interval
+ (fn []
+ (disruptor/publish
+ receive-queue
+ [[nil (TupleImpl. worker-context [interval] -1 Constants/METRICS_TICK_STREAM_ID)]]))))))
+
+(defn metrics-tick [executor-data task-datas ^TupleImpl tuple]
+ (let [{:keys [interval->task->metric-registry ^WorkerTopologyContext worker-context]} executor-data
+ interval (.getInteger tuple 0)]
+ (doseq [[task-id task-data] task-datas
+ :let [name->imetric (-> interval->task->metric-registry (get interval) (get task-id))
+ task-info (IMetricsConsumer$TaskInfo.
+ (. (java.net.InetAddress/getLocalHost) getCanonicalHostName)
+ (.getThisWorkerPort worker-context)
+ (:component-id executor-data)
+ task-id
+ (long (/ (System/currentTimeMillis) 1000))
+ interval)
+ data-points (->> name->imetric
+ (map (fn [[name imetric]]
+ (let [value (.getValueAndReset ^IMetric imetric)]
+ (if value
+ (IMetricsConsumer$DataPoint. name value)))))
+ (filter identity)
+ (into []))]]
+ (if (seq data-points)
+ (task/send-unanchored task-data Constants/METRICS_STREAM_ID [task-info data-points])))))
(defn setup-ticks! [worker executor-data]
(let [storm-conf (:storm-conf executor-data)
@@ -279,7 +316,7 @@
(mk-threads executor-data task-datas))
threads (concat handlers system-threads)]
(setup-ticks! worker executor-data)
-
+
(log-message "Finished loading executor " component-id ":" (pr-str executor-id))
;; TODO: add method here to get rendered stats... have worker call that when heartbeating
(reify
@@ -316,8 +353,8 @@
(.fail spout msg-id)
(task/apply-hooks (:user-context task-data) .spoutFail (SpoutFailInfo. msg-id task-id time-delta))
(when time-delta
- (stats/spout-failed-tuple! (:stats executor-data) (:stream tuple-info) time-delta)
- )))
+ (builtin-metrics/spout-failed-tuple! (:builtin-metrics task-data) (:stats executor-data) (:stream tuple-info))
+ (stats/spout-failed-tuple! (:stats executor-data) (:stream tuple-info) time-delta))))
(defn- ack-spout-msg [executor-data task-data msg-id tuple-info time-delta]
(let [storm-conf (:storm-conf executor-data)
@@ -328,8 +365,8 @@
(.ack spout msg-id)
(task/apply-hooks (:user-context task-data) .spoutAck (SpoutAckInfo. msg-id task-id time-delta))
(when time-delta
- (stats/spout-acked-tuple! (:stats executor-data) (:stream tuple-info) time-delta)
- )))
+ (builtin-metrics/spout-acked-tuple! (:builtin-metrics task-data) (:stats executor-data) (:stream tuple-info) time-delta)
+ (stats/spout-acked-tuple! (:stats executor-data) (:stream tuple-info) time-delta))))
(defn mk-task-receiver [executor-data tuple-action-fn]
(let [^KryoTupleDeserializer deserializer (:deserializer executor-data)
@@ -377,8 +414,9 @@
))))
tuple-action-fn (fn [task-id ^TupleImpl tuple]
(let [stream-id (.getSourceStreamId tuple)]
- (if (= stream-id Constants/SYSTEM_TICK_STREAM_ID)
- (.rotate pending)
+ (condp = stream-id
+ Constants/SYSTEM_TICK_STREAM_ID (.rotate pending)
+ Constants/METRICS_TICK_STREAM_ID (metrics-tick executor-data task-datas tuple)
(let [id (.getValue tuple 0)
[stored-task-id spout-id tuple-finished-info start-time-ms] (.remove pending id)]
(when spout-id
@@ -389,7 +427,7 @@
ACKER-ACK-STREAM-ID (ack-spout-msg executor-data (get task-datas task-id)
spout-id tuple-finished-info time-delta)
ACKER-FAIL-STREAM-ID (fail-spout-msg executor-data (get task-datas task-id)
- spout-id tuple-finished-info time-delta)
+ spout-id tuple-finished-info time-delta)
)))
;; TODO: on failure, emit tuple to failure stream
))))
@@ -455,6 +493,7 @@
(if (sampler) 0))))
(or out-tasks [])
))]]
+ (builtin-metrics/register-all (:builtin-metrics task-data) storm-conf (:user-context task-data))
(.open spout-obj
storm-conf
(:user-context task-data)
@@ -472,6 +511,7 @@
)))))
(reset! open-or-prepare-was-called? true)
(log-message "Opened spout " component-id ":" (keys task-datas))
+ (setup-metrics! executor-data)
(disruptor/consumer-started! (:receive-queue executor-data))
(fn []
@@ -550,28 +590,36 @@
;; TODO: for state sync, need to check if tuple comes from state spout. if so, update state
;; TODO: how to handle incremental updates as well as synchronizations at same time
;; TODO: need to version tuples somehow
+
;;(log-debug "Received tuple " tuple " at task " task-id)
;; need to do it this way to avoid reflection
- (let [task-data (get task-datas task-id)
- ^IBolt bolt-obj (:object task-data)
- user-context (:user-context task-data)
- sampler? (sampler)
- execute-sampler? (execute-sampler)
- now (if (or sampler? execute-sampler?) (System/currentTimeMillis))]
- (when sampler?
- (.setProcessSampleStartTime tuple now))
- (when execute-sampler?
- (.setExecuteSampleStartTime tuple now))
- (.execute bolt-obj tuple)
- (let [delta (tuple-execute-time-delta! tuple)]
- (task/apply-hooks user-context .boltExecute (BoltExecuteInfo. tuple task-id delta))
- (when delta
- (stats/bolt-execute-tuple! executor-stats
- (.getSourceComponent tuple)
- (.getSourceStreamId tuple)
- delta)
- ))))]
+ (let [stream-id (.getSourceStreamId tuple)]
+ (condp = stream-id
+ Constants/METRICS_TICK_STREAM_ID (metrics-tick executor-data task-datas tuple)
+ (let [task-data (get task-datas task-id)
+ ^IBolt bolt-obj (:object task-data)
+ user-context (:user-context task-data)
+ sampler? (sampler)
+ execute-sampler? (execute-sampler)
+ now (if (or sampler? execute-sampler?) (System/currentTimeMillis))]
+ (when sampler?
+ (.setProcessSampleStartTime tuple now))
+ (when execute-sampler?
+ (.setExecuteSampleStartTime tuple now))
+ (.execute bolt-obj tuple)
+ (let [delta (tuple-execute-time-delta! tuple)]
+ (task/apply-hooks user-context .boltExecute (BoltExecuteInfo. tuple task-id delta))
+ (when delta
+ (builtin-metrics/bolt-execute-tuple! (:builtin-metrics task-data)
+ executor-stats
+ (.getSourceComponent tuple)
+ (.getSourceStreamId tuple)
+ delta)
+ (stats/bolt-execute-tuple! executor-stats
+ (.getSourceComponent tuple)
+ (.getSourceStreamId tuple)
+ delta)))))))]
;; TODO: can get any SubscribedState objects out of the context now
@@ -607,6 +655,7 @@
stream
(MessageId/makeId anchors-to-ids)))))
(or out-tasks [])))]]
+ (builtin-metrics/register-all (:builtin-metrics task-data) storm-conf user-context)
(.prepare bolt-obj
storm-conf
user-context
@@ -627,11 +676,15 @@
(let [delta (tuple-time-delta! tuple)]
(task/apply-hooks user-context .boltAck (BoltAckInfo. tuple task-id delta))
(when delta
+ (builtin-metrics/bolt-acked-tuple! (:builtin-metrics task-data)
+ executor-stats
+ (.getSourceComponent tuple)
+ (.getSourceStreamId tuple)
+ delta)
(stats/bolt-acked-tuple! executor-stats
(.getSourceComponent tuple)
(.getSourceStreamId tuple)
- delta)
- )))
+ delta))))
(^void fail [this ^Tuple tuple]
(fast-list-iter [root (.. tuple getMessageId getAnchors)]
(task/send-unanchored task-data
@@ -640,16 +693,20 @@
(let [delta (tuple-time-delta! tuple)]
(task/apply-hooks user-context .boltFail (BoltFailInfo. tuple task-id delta))
(when delta
+ (builtin-metrics/bolt-failed-tuple! (:builtin-metrics task-data)
+ executor-stats
+ (.getSourceComponent tuple)
+ (.getSourceStreamId tuple))
(stats/bolt-failed-tuple! executor-stats
(.getSourceComponent tuple)
(.getSourceStreamId tuple)
- delta)
- )))
+ delta))))
(reportError [this error]
(report-error error)
)))))
(reset! open-or-prepare-was-called? true)
- (log-message "Prepared bolt " component-id ":" (keys task-datas))
+ (log-message "Prepared bolt " component-id ":" (keys task-datas))
+ (setup-metrics! executor-data)
(let [receive-queue (:receive-queue executor-data)
event-handler (mk-task-receiver executor-data tuple-action-fn)]
20 src/clj/backtype/storm/daemon/task.clj
@@ -5,8 +5,9 @@
(:import [backtype.storm.tuple Tuple])
(:import [backtype.storm.generated SpoutSpec Bolt StateSpoutSpec])
(:import [backtype.storm.hooks.info SpoutAckInfo SpoutFailInfo
- EmitInfo BoltFailInfo BoltAckInfo])
- (:require [backtype.storm [tuple :as tuple]]))
+ EmitInfo BoltFailInfo BoltAckInfo])
+ (:require [backtype.storm [tuple :as tuple]])
+ (:require [backtype.storm.daemon.builtin-metrics :as builtin-metrics]))
(bootstrap)
@@ -28,7 +29,8 @@
(:default-shared-resources worker)
(:user-shared-resources worker)
(:shared-executor-data executor-data)
- )))
+ (:interval->task->metric-registry executor-data)
+ (:open-or-prepare-was-called? executor-data))))
(defn system-topology-context [worker executor-data tid]
((mk-topology-context-builder
@@ -123,9 +125,11 @@
(throw (IllegalArgumentException. "Cannot emitDirect to a task expecting a regular grouping")))
(apply-hooks user-context .emit (EmitInfo. values stream task-id [out-task-id]))
(when (emit-sampler)
+ (builtin-metrics/emitted-tuple! (:builtin-metrics task-data) executor-stats stream)
(stats/emitted-tuple! executor-stats stream)
(if out-task-id
- (stats/transferred-tuples! executor-stats stream 1)))
+ (stats/transferred-tuples! executor-stats stream 1)
+ (builtin-metrics/transferred-tuple! (:builtin-metrics task-data) executor-stats stream 1)))
(if out-task-id [out-task-id])
))
([^String stream ^List values]
@@ -144,7 +148,9 @@
(apply-hooks user-context .emit (EmitInfo. values stream task-id out-tasks))
(when (emit-sampler)
(stats/emitted-tuple! executor-stats stream)
- (stats/transferred-tuples! executor-stats stream (count out-tasks)))
+ (builtin-metrics/emitted-tuple! (:builtin-metrics task-data) executor-stats stream)
+ (stats/transferred-tuples! executor-stats stream (count out-tasks))
+ (builtin-metrics/transferred-tuple! (:builtin-metrics task-data) executor-stats stream (count out-tasks)))
out-tasks)))
))
@@ -154,9 +160,9 @@
:task-id task-id
:system-context (system-topology-context (:worker executor-data) executor-data task-id)
:user-context (user-topology-context (:worker executor-data) executor-data task-id)
+ :builtin-metrics (builtin-metrics/make-data (:type executor-data))
:tasks-fn (mk-tasks-fn <>)
- :object (get-task-object (.getRawTopology ^TopologyContext (:system-context <>)) (:component-id executor-data))
- ))
+ :object (get-task-object (.getRawTopology ^TopologyContext (:system-context <>)) (:component-id executor-data))))
(defn mk-task [executor-data task-id]
53 src/clj/backtype/storm/metric/testing.clj
@@ -0,0 +1,53 @@
+(ns backtype.storm.metric.testing
+ "This namespace is for AOT dependent metrics testing code."
+ (:gen-class))
+
+(letfn [(for- [threader arg seq-exprs body]
+ `(reduce #(%2 %1)
+ ~arg
+ (for ~seq-exprs
+ (fn [arg#] (~threader arg# ~@body)))))]
+ (defmacro for->
+ "Apply a thread expression to a sequence.
+ eg.
+ (-> 1
+ (for-> [x [1 2 3]]
+ (+ x)))
+ => 7"
+ {:indent 1}
+ [arg seq-exprs & body]
+ (for- 'clojure.core/-> arg seq-exprs body)))
+
+(gen-class
+ :name clojure.storm.metric.testing.FakeMetricConsumer
+ :implements [backtype.storm.metric.api.IMetricsConsumer]
+ :prefix "impl-")
+
+(def buffer (atom nil))
+
+(defn impl-prepare [this conf argument ctx error-reporter]
+ (reset! buffer {}))
+
+(defn impl-cleanup [this]
+ (reset! buffer {}))
+
+(defn vec-conj [coll x] (if coll
+ (conj coll x)
+ [x]))
+
+(defn expand-complex-datapoint [dp]
+ (if (or (map? (.value dp))
+ (instance? java.util.AbstractMap (.value dp)))
+ (into [] (for [[k v] (.value dp)]
+ [(str (.name dp) "/" k) v]))
+ [[(.name dp) (.value dp)]]))
+
+(defn impl-handleDataPoints [this task-info data-points]
+ (swap! buffer
+ (fn [old]
+ (-> old
+ (for-> [dp data-points
+ [name val] (expand-complex-datapoint dp)]
+ (update-in [(.srcComponentId task-info) name (.srcTaskId task-info)] vec-conj val))))))
+
+
6 src/clj/backtype/storm/testing.clj
@@ -8,7 +8,7 @@
(:require [backtype.storm [process-simulator :as psim]])
(:import [org.apache.commons.io FileUtils])
(:import [java.io File])
- (:import [java.util HashMap])
+ (:import [java.util HashMap ArrayList])
(:import [java.util.concurrent.atomic AtomicInteger])
(:import [java.util.concurrent ConcurrentHashMap])
(:import [backtype.storm.utils Time Utils RegisteredGlobalState])
@@ -587,6 +587,8 @@
[(int 1)]
{}
{}
- (HashMap.))]
+ (HashMap.)
+ (HashMap.)
+ (atom false))]
(TupleImpl. context values 1 stream)
))
32 src/jvm/backtype/storm/Config.java
@@ -410,6 +410,13 @@
*/
public static String TOPOLOGY_SKIP_MISSING_KRYO_REGISTRATIONS= "topology.skip.missing.kryo.registrations";
+ /*
+ * A list of classes implementing IMetricsConsumer (See storm.yaml.example for exact config format).
+ * Each listed class will be routed all the metrics data generated by the storm metrics API.
+ * Each listed class maps 1:1 to a system bolt named __metrics_ClassName#N, and it's parallelism is configurable.
+ */
+ public static String TOPOLOGY_METRICS_CONSUMER_REGISTER = "topology.metrics.consumer.register";
+
/**
* The maximum parallelism allowed for a component in this topology. This configuration is
@@ -454,6 +461,11 @@
public static String TOPOLOGY_STATS_SAMPLE_RATE="topology.stats.sample.rate";
/**
+ * The time period that builtin metrics data in bucketed into.
+ */
+ public static String TOPOLOGY_BUILTIN_METRICS_BUCKET_SIZE_SECS="topology.builtin.metrics.bucket.size.secs";
+
+ /**
* Whether or not to use Java serialization in a topology.
*/
public static String TOPOLOGY_FALL_BACK_ON_JAVA_SERIALIZATION="topology.fall.back.on.java.serialization";
@@ -646,6 +658,26 @@ public void registerSerialization(Class klass, Class<? extends Serializer> seria
registerSerialization(this, klass, serializerClass);
}
+ public void registerMetricsConsumer(Class klass, Object argument, long parallelismHint) {
+ HashMap m = new HashMap();
+ m.put("class", klass.getCanonicalName());
+ m.put("parallelism.hint", parallelismHint);
+ m.put("argument", argument);
+
+ List l = (List)this.get(TOPOLOGY_METRICS_CONSUMER_REGISTER);
+ if(l == null) { l = new ArrayList(); }
+ l.add(m);
+ this.put(TOPOLOGY_METRICS_CONSUMER_REGISTER, l);
+ }
+
+ public void registerMetricsConsumer(Class klass, long parallelismHint) {
+ registerMetricsConsumer(klass, null, parallelismHint);
+ }
+
+ public void registerMetricsConsumer(Class klass) {
+ registerMetricsConsumer(klass, null, 1L);
+ }
+
public static void registerDecorator(Map conf, Class<? extends IKryoDecorator> klass) {
getRegisteredDecorators(conf).add(klass.getName());
}
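
The registerMetricsConsumer overloads above are the programmatic equivalent of the storm.yaml.example stanza shown earlier. In topology-submission code that might read as follows (MyMetricsConsumer and the endpoint argument are hypothetical):

Config conf = new Config();
// same effect as a topology.metrics.consumers.register entry in storm.yaml
conf.registerMetricsConsumer(MyMetricsConsumer.class, "metrics-collector.mycompany.org", 1L);
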
4 src/jvm/backtype/storm/Constants.java
@@ -8,4 +8,8 @@
public static final String SYSTEM_COMPONENT_ID = "__system";
public static final String SYSTEM_TICK_STREAM_ID = "__tick";
+ public static final String METRICS_COMPONENT_ID_PREFIX = "__metrics";
+ public static final String METRICS_STREAM_ID = "__metrics";
+ public static final String METRICS_TICK_STREAM_ID = "__metrics_tick";
}
+
47 src/jvm/backtype/storm/metric/MetricsConsumerBolt.java
@@ -0,0 +1,47 @@
+package backtype.storm.metric;
+
+import backtype.storm.Config;
+import backtype.storm.metric.api.IMetricsConsumer;
+import backtype.storm.task.IBolt;
+import backtype.storm.task.IErrorReporter;
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.tuple.Tuple;
+import java.util.Collection;
+import java.util.Map;
+
+public class MetricsConsumerBolt implements IBolt {
+ IMetricsConsumer _metricsConsumer;
+ String _consumerClassName;
+ OutputCollector _collector;
+ Object _registrationArgument;
+
+ public MetricsConsumerBolt(String consumerClassName, Object registrationArgument) {
+ _consumerClassName = consumerClassName;
+ _registrationArgument = registrationArgument;
+ }
+
+ @Override
+ public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
+ try {
+ _metricsConsumer = (IMetricsConsumer)Class.forName(_consumerClassName).newInstance();
+ } catch (Exception e) {
+ throw new RuntimeException("Could not instantiate a class listed in config under section " +
+ Config.TOPOLOGY_METRICS_CONSUMER_REGISTER + " with fully qualified name " + _consumerClassName, e);
+ }
+ _metricsConsumer.prepare(stormConf, _registrationArgument, context, (IErrorReporter)collector);
+ _collector = collector;
+ }
+
+ @Override
+ public void execute(Tuple input) {
+ _metricsConsumer.handleDataPoints((IMetricsConsumer.TaskInfo)input.getValue(0), (Collection)input.getValue(1));
+ _collector.ack(input);
+ }
+
+ @Override
+ public void cleanup() {
+ _metricsConsumer.cleanup();
+ }
+
+}
17 src/jvm/backtype/storm/metric/api/AssignableMetric.java
@@ -0,0 +1,17 @@
+package backtype.storm.metric.api;
+
+public class AssignableMetric implements IMetric {
+ Object _value;
+
+ public AssignableMetric(Object value) {
+ _value = value;
+ }
+
+ public void setValue(Object value) {
+ _value = value;
+ }
+
+ public Object getValueAndReset() {
+ return _value;
+ }
+}
21 src/jvm/backtype/storm/metric/api/CombinedMetric.java
@@ -0,0 +1,21 @@
+package backtype.storm.metric.api;
+
+public class CombinedMetric implements IMetric {
+ private final ICombiner _combiner;
+ private Object _value;
+
+ public CombinedMetric(ICombiner combiner) {
+ _combiner = combiner;
+ _value = _combiner.identity();
+ }
+
+ public void update(Object value) {
+ _value = _combiner.combine(_value, value);
+ }
+
+ public Object getValueAndReset() {
+ Object ret = _value;
+ _value = _combiner.identity();
+ return ret;
+ }
+}
24 src/jvm/backtype/storm/metric/api/CountMetric.java
@@ -0,0 +1,24 @@
+package backtype.storm.metric.api;
+
+import backtype.storm.metric.api.IMetric;
+
+public class CountMetric implements IMetric {
+ long _value = 0;
+
+ public CountMetric() {
+ }
+
+ public void incr() {
+ _value++;
+ }
+
+ public void incrBy(long incrementBy) {
+ _value += incrementBy;
+ }
+
+ public Object getValueAndReset() {
+ long ret = _value;
+ _value = 0;
+ return ret;
+ }
+}
6 src/jvm/backtype/storm/metric/api/ICombiner.java
@@ -0,0 +1,6 @@
+package backtype.storm.metric.api;
+
+public interface ICombiner<T> {
+ public T identity();
+ public T combine(T a, T b);
+}
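
To illustrate the monoid contract, a hypothetical combiner tracking a running maximum could look like this; registering it via the ICombiner overload of registerMetric yields a CombinedMetric reporting the per-bucket maximum:

import backtype.storm.metric.api.ICombiner;

public class MaxCombiner implements ICombiner<Long> {
    public Long identity() {
        return Long.MIN_VALUE; // neutral element for max
    }
    public Long combine(Long a, Long b) {
        return Math.max(a, b);
    }
}
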
5 src/jvm/backtype/storm/metric/api/IMetric.java
@@ -0,0 +1,5 @@
+package backtype.storm.metric.api;
+
+public interface IMetric {
+ public Object getValueAndReset();
+}
43 src/jvm/backtype/storm/metric/api/IMetricsConsumer.java
@@ -0,0 +1,43 @@
+package backtype.storm.metric.api;
+
+import backtype.storm.task.IErrorReporter;
+import backtype.storm.task.TopologyContext;
+import java.util.Collection;
+import java.util.Map;
+
+public interface IMetricsConsumer {
+ public static class TaskInfo {
+ public TaskInfo() {}
+ public TaskInfo(String srcWorkerHost, int srcWorkerPort, String srcComponentId, int srcTaskId, long timestamp, int updateIntervalSecs) {
+ this.srcWorkerHost = srcWorkerHost;
+ this.srcWorkerPort = srcWorkerPort;
+ this.srcComponentId = srcComponentId;
+ this.srcTaskId = srcTaskId;
+ this.timestamp = timestamp;
+ this.updateIntervalSecs = updateIntervalSecs;
+ }
+ public String srcWorkerHost;
+ public int srcWorkerPort;
+ public String srcComponentId;
+ public int srcTaskId;
+ public long timestamp;
+ public int updateIntervalSecs;
+ }
+ public static class DataPoint {
+ public DataPoint() {}
+ public DataPoint(String name, Object value) {
+ this.name = name;
+ this.value = value;
+ }
+ @Override
+ public String toString() {
+ return "[" + name + " = " + value + "]";
+ }
+ public String name;
+ public Object value;
+ }
+
+ void prepare(Map stormConf, Object registrationArgument, TopologyContext context, IErrorReporter errorReporter);
+ void handleDataPoints(TaskInfo taskInfo, Collection<DataPoint> dataPoints);
+ void cleanup();
+}
7 src/jvm/backtype/storm/metric/api/IReducer.java
@@ -0,0 +1,7 @@
+package backtype.storm.metric.api;
+
+public interface IReducer<T> {
+ T init();
+ T reduce(T accumulator, Object input);
+ Object extractResult(T accumulator);
+}
38 src/jvm/backtype/storm/metric/api/MeanReducer.java
@@ -0,0 +1,38 @@
+package backtype.storm.metric.api;
+
+import backtype.storm.metric.api.IReducer;
+
+class MeanReducerState {
+ public int count = 0;
+ public double sum = 0.0;
+}
+
+public class MeanReducer implements IReducer<MeanReducerState> {
+ public MeanReducerState init() {
+ return new MeanReducerState();
+ }
+
+ public MeanReducerState reduce(MeanReducerState acc, Object input) {
+ acc.count++;
+ if(input instanceof Double) {
+ acc.sum += (Double)input;
+ } else if(input instanceof Long) {
+ acc.sum += ((Long)input).doubleValue();
+ } else if(input instanceof Integer) {
+ acc.sum += ((Integer)input).doubleValue();
+ } else {
+ throw new RuntimeException(
+ "MeanReducer::reduce called with unsupported input type `" + input.getClass()
+ + "`. Supported types are Double, Long, Integer.");
+ }
+ return acc;
+ }
+
+ public Object extractResult(MeanReducerState acc) {
+ if(acc.count > 0) {
+ return new Double(acc.sum / (double)acc.count);
+ } else {
+ return null;
+ }
+ }
+}
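
Concretely, here is the reducer lifecycle as ReducedMetric drives it: every update calls reduce, and each bucket flush calls extractResult followed by a fresh init. A short trace (this only compiles inside the backtype.storm.metric.api package, since MeanReducerState is package-private):

MeanReducer r = new MeanReducer();
MeanReducerState s = r.init();    // count = 0, sum = 0.0
s = r.reduce(s, 10L);             // count = 1, sum = 10.0
s = r.reduce(s, 20L);             // count = 2, sum = 30.0
Object mean = r.extractResult(s); // 15.0; an empty bucket would yield null
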
28 src/jvm/backtype/storm/metric/api/MultiCountMetric.java
@@ -0,0 +1,28 @@
+package backtype.storm.metric.api;
+
+import backtype.storm.metric.api.IMetric;
+import java.util.HashMap;
+import java.util.Map;
+
+public class MultiCountMetric implements IMetric {
+ Map<String, CountMetric> _value = new HashMap();
+
+ public MultiCountMetric() {
+ }
+
+ public CountMetric scope(String key) {
+ CountMetric val = _value.get(key);
+ if(val == null) {
+ _value.put(key, val = new CountMetric());
+ }
+ return val;
+ }
+
+ public Object getValueAndReset() {
+ Map ret = new HashMap();
+ for(Map.Entry<String, CountMetric> e : _value.entrySet()) {
+ ret.put(e.getKey(), e.getValue().getValueAndReset());
+ }
+ return ret;
+ }
+}
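
This is the scope() pattern that builtin_metrics.clj relies on; in Java form:

MultiCountMetric ackCount = new MultiCountMetric();
ackCount.scope("default").incr();   // lazily creates the "default" CountMetric
ackCount.scope("errors").incrBy(3);
// getValueAndReset() now returns {"default": 1, "errors": 3} and resets each counter to zero
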
30 src/jvm/backtype/storm/metric/api/MultiReducedMetric.java
@@ -0,0 +1,30 @@
+package backtype.storm.metric.api;
+
+import backtype.storm.metric.api.IMetric;
+import java.util.HashMap;
+import java.util.Map;
+
+public class MultiReducedMetric implements IMetric {
+ Map<String, ReducedMetric> _value = new HashMap();
+ IReducer _reducer;
+
+ public MultiReducedMetric(IReducer reducer) {
+ _reducer = reducer;
+ }
+
+ public ReducedMetric scope(String key) {
+ ReducedMetric val = _value.get(key);
+ if(val == null) {
+ _value.put(key, val = new ReducedMetric(_reducer));
+ }
+ return val;
+ }
+
+ public Object getValueAndReset() {
+ Map ret = new HashMap();
+ for(Map.Entry<String, ReducedMetric> e : _value.entrySet()) {
+ ret.put(e.getKey(), e.getValue().getValueAndReset());
+ }
+ return ret;
+ }
+}
21 src/jvm/backtype/storm/metric/api/ReducedMetric.java
@@ -0,0 +1,21 @@
+package backtype.storm.metric.api;
+
+public class ReducedMetric implements IMetric {
+ private final IReducer _reducer;
+ private Object _accumulator;
+
+ public ReducedMetric(IReducer reducer) {
+ _reducer = reducer;
+ _accumulator = _reducer.init();
+ }
+
+ public void update(Object value) {
+ _accumulator = _reducer.reduce(_accumulator, value);
+ }
+
+ public Object getValueAndReset() {
+ Object ret = _reducer.extractResult(_accumulator);
+ _accumulator = _reducer.init();
+ return ret;
+ }
+}
2  src/jvm/backtype/storm/serialization/SerializationFactory.java
@@ -38,6 +38,8 @@ public static Kryo getKryo(Map conf) {
k.register(BigInteger.class, new BigIntegerSerializer());
k.register(TransactionAttempt.class);
k.register(Values.class);
+ k.register(backtype.storm.metric.api.IMetricsConsumer.DataPoint.class);
+ k.register(backtype.storm.metric.api.IMetricsConsumer.TaskInfo.class);
try {
JavaBridge.registerPrimitives(k);
JavaBridge.registerCollections(k);
5 src/jvm/backtype/storm/task/IErrorReporter.java
@@ -0,0 +1,5 @@
+package backtype.storm.task;
+
+public interface IErrorReporter {
+ void reportError(Throwable error);
+}
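
This interface is what lets a metrics consumer surface failures: the IErrorReporter passed to IMetricsConsumer.prepare can be stashed in a field and used like this (pushToExternalStore and _errorReporter are hypothetical names):

try {
    pushToExternalStore(dataPoints); // hypothetical I/O to a metrics backend
} catch (Exception e) {
    _errorReporter.reportError(e);   // surfaces in the Storm UI like any bolt error
}
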
3  src/jvm/backtype/storm/task/IOutputCollector.java
@@ -4,7 +4,7 @@
import java.util.Collection;
import java.util.List;
-public interface IOutputCollector {
+public interface IOutputCollector extends IErrorReporter {
/**
* Returns the task ids that received the tuples.
*/
@@ -12,5 +12,4 @@
void emitDirect(int taskId, String streamId, Collection<Tuple> anchors, List<Object> tuple);
void ack(Tuple input);
void fail(Tuple input);
- void reportError(Throwable error);
}
58 src/jvm/backtype/storm/task/TopologyContext.java
@@ -4,6 +4,11 @@
import backtype.storm.generated.Grouping;
import backtype.storm.generated.StormTopology;
import backtype.storm.hooks.ITaskHook;
+import backtype.storm.metric.api.IMetric;
+import backtype.storm.metric.api.IReducer;
+import backtype.storm.metric.api.ICombiner;
+import backtype.storm.metric.api.ReducedMetric;
+import backtype.storm.metric.api.CombinedMetric;
import backtype.storm.state.ISubscribedState;
import backtype.storm.tuple.Fields;
import backtype.storm.utils.Utils;
@@ -29,6 +34,8 @@
private Map<String, Object> _taskData = new HashMap<String, Object>();
private List<ITaskHook> _hooks = new ArrayList<ITaskHook>();
private Map<String, Object> _executorData;
+ private Map<Integer,Map<Integer, Map<String, IMetric>>> _registeredMetrics;
+ private clojure.lang.Atom _openOrPrepareWasCalled;
public TopologyContext(StormTopology topology, Map stormConf,
@@ -36,12 +43,15 @@ public TopologyContext(StormTopology topology, Map stormConf,
Map<String, Map<String, Fields>> componentToStreamToFields,
String stormId, String codeDir, String pidDir, Integer taskId,
Integer workerPort, List<Integer> workerTasks, Map<String, Object> defaultResources,
- Map<String, Object> userResources, Map<String, Object> executorData) {
+ Map<String, Object> userResources, Map<String, Object> executorData, Map registeredMetrics,
+ clojure.lang.Atom openOrPrepareWasCalled) {
super(topology, stormConf, taskToComponent, componentToSortedTasks,
componentToStreamToFields, stormId, codeDir, pidDir,
workerPort, workerTasks, defaultResources, userResources);
_taskId = taskId;
_executorData = executorData;
+ _registeredMetrics = registeredMetrics;
+ _openOrPrepareWasCalled = openOrPrepareWasCalled;
}
/**
@@ -190,4 +200,50 @@ public void addTaskHook(ITaskHook hook) {
public Collection<ITaskHook> getHooks() {
return _hooks;
}
+
+ /*
+ * Register a IMetric instance.
+ * Storm will then call getValueAndReset on the metric every timeBucketSizeInSecs
+ * and the returned value is sent to all metrics consumers.
+ * You must call this during IBolt::prepare or ISpout::open.
+ * @return The IMetric argument unchanged.
+ */
+ public IMetric registerMetric(String name, IMetric metric, int timeBucketSizeInSecs) {
+ if((Boolean)_openOrPrepareWasCalled.deref() == true) {
+ throw new RuntimeException("TopologyContext.registerMetric can only be called from within overridden " +
+ "IBolt::prepare() or ISpout::open() method.");
+ }
+
+ Map m1 = _registeredMetrics;
+ if(!m1.containsKey(timeBucketSizeInSecs)) {
+ m1.put(timeBucketSizeInSecs, new HashMap());
+ }
+
+ Map m2 = (Map)m1.get(timeBucketSizeInSecs);
+ if(!m2.containsKey(_taskId)) {
+ m2.put(_taskId, new HashMap());
+ }
+
+ Map m3 = (Map)m2.get(_taskId);
+ if(m3.containsKey(name)) {
+ throw new RuntimeException("The same metric name `" + name + "` was registered twice." );
+ } else {
+ m3.put(name, metric);
+ }
+
+ return metric;
+ }
+
+ /*
+ * Convinience method for registering ReducedMetric.
+ */
+ public IMetric registerMetric(String name, IReducer reducer, int timeBucketSizeInSecs) {
+ return registerMetric(name, new ReducedMetric(reducer), timeBucketSizeInSecs);
+ }
+ /*
+ * Convinience method for registering CombinedMetric.
+ */
+ public IMetric registerMetric(String name, ICombiner combiner, int timeBucketSizeInSecs) {
+ return registerMetric(name, new CombinedMetric(combiner), timeBucketSizeInSecs);
+ }
}
11 src/jvm/storm/trident/operation/TridentOperationContext.java
@@ -1,5 +1,6 @@
package storm.trident.operation;
+import backtype.storm.metric.api.*;
import backtype.storm.task.TopologyContext;
import backtype.storm.tuple.Fields;
import storm.trident.tuple.TridentTuple;
@@ -29,4 +30,14 @@ public int numPartitions() {
public int getPartitionIndex() {
return _topoContext.getThisTaskIndex();
}
+
+ public IMetric registerMetric(String name, IMetric metric, int timeBucketSizeInSecs) {
+ return _topoContext.registerMetric(name, metric, timeBucketSizeInSecs);
+ }
+ public IMetric registerMetric(String name, IReducer reducer, int timeBucketSizeInSecs) {
+ return _topoContext.registerMetric(name, new ReducedMetric(reducer), timeBucketSizeInSecs);
+ }
+ public IMetric registerMetric(String name, ICombiner combiner, int timeBucketSizeInSecs) {
+ return _topoContext.registerMetric(name, new CombinedMetric(combiner), timeBucketSizeInSecs);
+ }
}
189 test/clj/backtype/storm/metrics_test.clj
@@ -0,0 +1,189 @@
+(ns backtype.storm.metrics-test
+ (:use [clojure test])
+ (:import [backtype.storm.topology TopologyBuilder])
+ (:import [backtype.storm.generated InvalidTopologyException SubmitOptions TopologyInitialStatus])
+ (:import [backtype.storm.testing TestWordCounter TestWordSpout TestGlobalCount
+ TestAggregatesCounter TestConfBolt AckFailMapTracker])
+ (:import [backtype.storm.metric.api CountMetric IMetricsConsumer$DataPoint IMetricsConsumer$TaskInfo])
+
+ (:use [backtype.storm bootstrap testing])
+ (:use [backtype.storm.daemon common])
+ (:use [backtype.storm.metric testing]))
+
+
+(bootstrap)
+
+(defbolt acking-bolt {} {:prepare true}
+ [conf context collector]
+ (bolt
+ (execute [tuple]
+ (ack! collector tuple))))
+
+(defbolt ack-every-other {} {:prepare true}
+ [conf context collector]
+ (let [state (atom -1)]
+ (bolt
+ (execute [tuple]
+ (let [val (swap! state -)]
+ (when (pos? val)
+ (ack! collector tuple)
+ ))))))
+
+(defn assert-loop [afn ids]
+ (while (not (every? afn ids))
+ (Thread/sleep 1)))
+
+(defn assert-acked [tracker & ids]
+ (assert-loop #(.isAcked tracker %) ids))
+
+(defn assert-failed [tracker & ids]
+ (assert-loop #(.isFailed tracker %) ids))
+
+(defbolt count-acks {} {:prepare true}
+ [conf context collector]
+
+ (let [mycustommetric (CountMetric.)]
+ (.registerMetric context "my-custom-metric" mycustommetric 5)
+ (bolt
+ (execute [tuple]
+ (.incr mycustommetric)
+ (ack! collector tuple)))))
+
+(def metrics-data backtype.storm.metric.testing/buffer)
+
+(defn wait-for-atleast-N-buckets! [N comp-id metric-name]
+ (while
+ (let [taskid->buckets (-> @metrics-data (get comp-id) (get metric-name))]
+ (or
+ (and (not= N 0) (nil? taskid->buckets))
+ (not-every? #(<= N %) (map (comp count second) taskid->buckets))))
+ (println "Waiting for at least" N "timebuckets to appear in FakeMetricsConsumer for component id" comp-id
+ "and metric name" metric-name)
+ (Thread/sleep 10)))
+
+(defn lookup-bucket-by-comp-id-&-metric-name! [comp-id metric-name]
+ (-> @metrics-data
+ (get comp-id)
+ (get metric-name)
+ (first) ;; pick first task in the list, ignore other tasks' metric data.
+ (second)
+ (or [])))
+
+(defmacro assert-buckets! [comp-id metric-name expected]
+ `(do
+ (let [N# (count ~expected)]
+ (wait-for-atleast-N-buckets! N# ~comp-id ~metric-name)
+ (is (= ~expected (subvec (lookup-bucket-by-comp-id-&-metric-name! ~comp-id ~metric-name) 0 N#))))))
+
+(deftest test-custom-metric
+ (with-simulated-time-local-cluster
+ [cluster :daemon-conf {TOPOLOGY-METRICS-CONSUMER-REGISTER
+ [{"class" "clojure.storm.metric.testing.FakeMetricConsumer"}]}]
+ (let [feeder (feeder-spout ["field1"])
+ tracker (AckFailMapTracker.)
+ _ (.setAckFailDelegate feeder tracker)
+ topology (thrift/mk-topology
+ {"1" (thrift/mk-spout-spec feeder)}
+ {"2" (thrift/mk-bolt-spec {"1" :global} count-acks)})]
+ (submit-local-topology (:nimbus cluster) "metrics-tester" {} topology)
+
+ (.feed feeder ["a"] 1)
+ (advance-cluster-time cluster 6)
+ (assert-buckets! "2" "my-custom-metric" [1])
+
+ (advance-cluster-time cluster 5)
+ (assert-buckets! "2" "my-custom-metric" [1 0])
+
+ (advance-cluster-time cluster 20)
+ (assert-buckets! "2" "my-custom-metric" [1 0 0 0 0 0])
+
+ (.feed feeder ["b"] 2)
+ (.feed feeder ["c"] 3)
+ (advance-cluster-time cluster 5)
+ (assert-buckets! "2" "my-custom-metric" [1 0 0 0 0 0 2]))))
+
+
+(deftest test-builtin-metrics-1
+ (with-simulated-time-local-cluster
+ [cluster :daemon-conf {TOPOLOGY-METRICS-CONSUMER-REGISTER
+ [{"class" "clojure.storm.metric.testing.FakeMetricConsumer"}]
+ TOPOLOGY-STATS-SAMPLE-RATE 1.0
+ TOPOLOGY-BUILTIN-METRICS-BUCKET-SIZE-SECS 60}]
+ (let [feeder (feeder-spout ["field1"])
+ tracker (AckFailMapTracker.)
+ _ (.setAckFailDelegate feeder tracker)
+ topology (thrift/mk-topology
+ {"myspout" (thrift/mk-spout-spec feeder)}
+ {"mybolt" (thrift/mk-bolt-spec {"myspout" :shuffle} acking-bolt)})]
+ (submit-local-topology (:nimbus cluster) "metrics-tester" {} topology)
+
+ (.feed feeder ["a"] 1)
+ (advance-cluster-time cluster 61)
+ (assert-buckets! "myspout" "__ack-count/default" [1])
+ (assert-buckets! "myspout" "__emit-count/default" [1])
+ (assert-buckets! "myspout" "__transfer-count/default" [1])
+ (assert-buckets! "mybolt" "__ack-count/myspout:default" [1])
+ (assert-buckets! "mybolt" "__execute-count/myspout:default" [1])
+
+ (advance-cluster-time cluster 120)
+ (assert-buckets! "myspout" "__ack-count/default" [1 0 0])
+ (assert-buckets! "myspout" "__emit-count/default" [1 0 0])
+ (assert-buckets! "myspout" "__transfer-count/default" [1 0 0])
+ (assert-buckets! "mybolt" "__ack-count/myspout:default" [1 0 0])
+ (assert-buckets! "mybolt" "__execute-count/myspout:default" [1 0 0])
+
+ (.feed feeder ["b"] 1)
+ (.feed feeder ["c"] 1)
+ (advance-cluster-time cluster 60)
+ (assert-buckets! "myspout" "__ack-count/default" [1 0 0 2])
+ (assert-buckets! "myspout" "__emit-count/default" [1 0 0 2])
+ (assert-buckets! "myspout" "__transfer-count/default" [1 0 0 2])
+ (assert-buckets! "mybolt" "__ack-count/myspout:default" [1 0 0 2])
+ (assert-buckets! "mybolt" "__execute-count/myspout:default" [1 0 0 2]))))
+
+
+(deftest test-builtin-metrics-2
+ (with-simulated-time-local-cluster
+ [cluster :daemon-conf {TOPOLOGY-METRICS-CONSUMER-REGISTER
+ [{"class" "clojure.storm.metric.testing.FakeMetricConsumer"}]
+ TOPOLOGY-ENABLE-MESSAGE-TIMEOUTS true
+ TOPOLOGY-STATS-SAMPLE-RATE 1.0
+ TOPOLOGY-BUILTIN-METRICS-BUCKET-SIZE-SECS 5}]
+ (let [feeder (feeder-spout ["field1"])
+ tracker (AckFailMapTracker.)
+ _ (.setAckFailDelegate feeder tracker)
+ topology (thrift/mk-topology
+ {"myspout" (thrift/mk-spout-spec feeder)}
+ {"mybolt" (thrift/mk-bolt-spec {"myspout" :shuffle} ack-every-other)})]
+ (submit-local-topology (:nimbus cluster)
+ "metrics-tester"
+ {TOPOLOGY-MESSAGE-TIMEOUT-SECS 20}
+ topology)
+
+ (.feed feeder ["a"] 1)
+ (advance-cluster-time cluster 6)
+ (assert-acked tracker 1)
+ (assert-buckets! "myspout" "__fail-count/default" [])
+ (assert-buckets! "myspout" "__ack-count/default" [1])
+ (assert-buckets! "myspout" "__emit-count/default" [1])
+ (assert-buckets! "myspout" "__transfer-count/default" [1])
+ (assert-buckets! "mybolt" "__ack-count/myspout:default" [1])
+ (assert-buckets! "mybolt" "__execute-count/myspout:default" [1])
+
+ (.feed feeder ["b"] 2)
+ (advance-cluster-time cluster 5)
+ (assert-buckets! "myspout" "__fail-count/default" [])
+ (assert-buckets! "myspout" "__ack-count/default" [1 0])
+ (assert-buckets! "myspout" "__emit-count/default" [1 1])
+ (assert-buckets! "myspout" "__transfer-count/default" [1 1])
+ (assert-buckets! "mybolt" "__ack-count/myspout:default" [1 0])
+ (assert-buckets! "mybolt" "__execute-count/myspout:default" [1 1])
+
+ (advance-cluster-time cluster 30)
+ (assert-failed tracker 2)
+ (assert-buckets! "myspout" "__fail-count/default" [1])
+ (assert-buckets! "myspout" "__ack-count/default" [1 0 0 0 0])
+ (assert-buckets! "myspout" "__emit-count/default" [1 1 0 0 0])
+ (assert-buckets! "myspout" "__transfer-count/default" [1 1 0 0 0])
+ (assert-buckets! "mybolt" "__ack-count/myspout:default" [1 0 0 0 0])
+ (assert-buckets! "mybolt" "__execute-count/myspout:default" [1 1 0 0 0]))))