From 4a01197115ea84e8be627fa102ae18c40f94c47b Mon Sep 17 00:00:00 2001 From: Jason Jackson Date: Tue, 16 Oct 2012 17:33:52 -0700 Subject: [PATCH 01/14] storm metrics API initial implementation ready for review. no unit testing currently. --- conf/storm.yaml.example | 7 ++ src/clj/backtype/storm/daemon/common.clj | 73 ++++++++++++--- src/clj/backtype/storm/daemon/executor.clj | 88 +++++++++++++------ src/clj/backtype/storm/daemon/task.clj | 2 +- src/clj/backtype/storm/testing.clj | 5 +- src/jvm/backtype/storm/Config.java | 7 ++ src/jvm/backtype/storm/Constants.java | 4 + .../storm/metric/FixedValueMetric.java | 17 ++++ src/jvm/backtype/storm/metric/IMetric.java | 5 ++ .../storm/metric/IMetricsConsumer.java | 21 +++++ src/jvm/backtype/storm/metric/IReducer.java | 7 ++ .../storm/metric/IncrementedMetric.java | 22 +++++ .../backtype/storm/metric/MeanReducer.java | 22 +++++ .../backtype/storm/metric/MetricHolder.java | 17 ++++ .../storm/metric/MetricsConsumerBolt.java | 54 ++++++++++++ .../backtype/storm/metric/ReducedMetric.java | 21 +++++ .../backtype/storm/task/TopologyContext.java | 10 ++- 17 files changed, 342 insertions(+), 40 deletions(-) create mode 100644 src/jvm/backtype/storm/metric/FixedValueMetric.java create mode 100644 src/jvm/backtype/storm/metric/IMetric.java create mode 100644 src/jvm/backtype/storm/metric/IMetricsConsumer.java create mode 100644 src/jvm/backtype/storm/metric/IReducer.java create mode 100644 src/jvm/backtype/storm/metric/IncrementedMetric.java create mode 100644 src/jvm/backtype/storm/metric/MeanReducer.java create mode 100644 src/jvm/backtype/storm/metric/MetricHolder.java create mode 100644 src/jvm/backtype/storm/metric/MetricsConsumerBolt.java create mode 100644 src/jvm/backtype/storm/metric/ReducedMetric.java diff --git a/conf/storm.yaml.example b/conf/storm.yaml.example index 6e9cefc89..043f6ff31 100644 --- a/conf/storm.yaml.example +++ b/conf/storm.yaml.example @@ -21,3 +21,10 @@ # drpc.servers: # - "server1" # - "server2" + +## 
Metrics Consumers +# topology.metrics.consumers.register: +# - class: "org.mycompany.MyMetricsConsumer" +# argument: +# - endpoint: "metrics-collector.mycompany.org" +# parallelism.hint: 1 diff --git a/src/clj/backtype/storm/daemon/common.clj b/src/clj/backtype/storm/daemon/common.clj index b4f271d0e..780c40726 100644 --- a/src/clj/backtype/storm/daemon/common.clj +++ b/src/clj/backtype/storm/daemon/common.clj @@ -23,6 +23,8 @@ (def SYSTEM-COMPONENT-ID Constants/SYSTEM_COMPONENT_ID) (def SYSTEM-TICK-STREAM-ID Constants/SYSTEM_TICK_STREAM_ID) +(def METRICS-STREAM-ID Constants/METRICS_STREAM_ID) +(def METRICS-TICK-STREAM-ID Constants/METRICS_TICK_STREAM_ID) ;; the task id is the virtual port ;; node->host is here so that tasks know who to talk to just from assignment @@ -206,27 +208,78 @@ (.put_to_bolts ret "__acker" acker-bolt) )) +(defn add-metric-streams! [^StormTopology topology] + (doseq [[_ component] (all-components topology) + :let [common (.get_common component)]] + (.put_to_streams common METRICS-STREAM-ID + (thrift/output-fields ["worker-host" "worker-port" "interval" "timestamp" "name" "value"])))) + (defn add-system-streams! 
[^StormTopology topology] (doseq [[_ component] (all-components topology) :let [common (.get_common component)]] - (.put_to_streams common SYSTEM-STREAM-ID (thrift/output-fields ["event"])) - ;; TODO: consider adding a stats stream for stats aggregation - )) + (.put_to_streams common SYSTEM-STREAM-ID (thrift/output-fields ["event"])))) + + +(defn map-occurrences [afn coll] + (->> coll + (reduce (fn [[counts new-coll] x] + (let [occurs (inc (get counts x 0))] + [(assoc counts x occurs) (cons (afn x occurs) new-coll)])) + [{} []]) + (second) + (reverse))) + +(defn number-duplicates [coll] + "(number-duplicates [\"a\", \"b\", \"a\"]) => [\"a\", \"b\", \"a2\"]" + (map-occurrences (fn [x occurences] (if (>= occurences 2) (str x "#" occurences) x)) coll)) + +(defn metrics-consumer-register-ids [storm-conf] + "Generates a list of component ids for each metrics consumer + e.g. [\"__metrics_org.mycompany.MyMetricsConsumer\", ..] " + (->> (get storm-conf TOPOLOGY-METRICS-CONSUMER-REGISTER) + (map #(get % "class")) + (number-duplicates) + (map #(str Constants/METRICS_COMPONENT_ID_PREFIX %)))) + +(defn metrics-consumer-bolt-specs [components-ids-that-emit-metrics storm-conf] + (let [inputs (->> (for [comp-id components-ids-that-emit-metrics] + {[comp-id METRICS-STREAM-ID] :shuffle}) + (into {})) + + mk-bolt-spec (fn [class arg p] + (thrift/mk-bolt-spec* + inputs + (backtype.storm.metric.MetricsConsumerBolt. class arg) + {} :p p :conf {TOPOLOGY-TASKS p}))] + + (map + (fn [component-id register] + [component-id (mk-bolt-spec (get register "class") + (get register "argument") + (or (get register "parallelism.hint") 1))]) + + (metrics-consumer-register-ids storm-conf) + (get storm-conf TOPOLOGY-METRICS-CONSUMER-REGISTER)))) + +(defn add-metric-components! [storm-conf ^StormTopology topology] + (doseq [[comp-id bolt-spec] (metrics-consumer-bolt-specs (keys (all-components topology)) storm-conf)] + (.put_to_bolts topology comp-id bolt-spec))) (defn add-system-components! 
[^StormTopology topology] (let [system-spout (thrift/mk-spout-spec* - (NoOpSpout.) - {SYSTEM-TICK-STREAM-ID (thrift/output-fields ["rate_secs"]) - } - :p 0 - :conf {TOPOLOGY-TASKS 0})] - (.put_to_spouts topology SYSTEM-COMPONENT-ID system-spout) - )) + (NoOpSpout.) + {SYSTEM-TICK-STREAM-ID (thrift/output-fields ["rate_secs"]) + METRICS-TICK-STREAM-ID (thrift/output-fields ["interval"])} + :p 0 + :conf {TOPOLOGY-TASKS 0})] + (.put_to_spouts topology SYSTEM-COMPONENT-ID system-spout))) (defn system-topology! [storm-conf ^StormTopology topology] (validate-basic! topology) (let [ret (.deepCopy topology)] (add-acker! storm-conf ret) + (add-metric-streams! ret) + (add-metric-components! storm-conf ret) (add-system-streams! ret) (add-system-components! ret) (validate-structure! ret) diff --git a/src/clj/backtype/storm/daemon/executor.clj b/src/clj/backtype/storm/daemon/executor.clj index 5b972c270..1c10b2a20 100644 --- a/src/clj/backtype/storm/daemon/executor.clj +++ b/src/clj/backtype/storm/daemon/executor.clj @@ -5,7 +5,8 @@ (:import [backtype.storm.tuple Tuple]) (:import [backtype.storm.spout ISpoutWaitStrategy]) (:import [backtype.storm.hooks.info SpoutAckInfo SpoutFailInfo - EmitInfo BoltFailInfo BoltAckInfo BoltExecuteInfo]) + EmitInfo BoltFailInfo BoltAckInfo BoltExecuteInfo]) + (:import [backtype.storm.metric MetricHolder IMetric]) (:require [backtype.storm [tuple :as tuple]]) (:require [backtype.storm.daemon [task :as task]]) ) @@ -212,6 +213,7 @@ :type executor-type ;; TODO: should refactor this to be part of the executor specific map (spout or bolt with :common field) :stats (mk-executor-stats <> (sampling-rate storm-conf)) + :registered-metrics (ArrayList.) 
:task->component (:task->component worker) :stream->component->grouper (outbound-components worker-context component-id) :report-error (throttled-report-error-fn <>) @@ -238,7 +240,34 @@ (worker-transfer-fn serializer alist) (.setObject cached-emit (ArrayList.)) ))) - :kill-fn (:report-error-and-die executor-data)))) + :kill-fn (:report-error-and-die executor-data)))) + +(defn setup-metrics! [executor-data] + (let [{:keys [storm-conf receive-queue worker-context registered-metrics]} executor-data + distinct-time-bucket-intervals (->> registered-metrics (map #(.getTimeBucketIntervalInSecs %)) distinct)] + (doseq [interval distinct-time-bucket-intervals] + (schedule-recurring + (:user-timer (:worker executor-data)) + interval + interval + (fn [] + (disruptor/publish + receive-queue + [[nil (TupleImpl. worker-context [interval] -1 Constants/METRICS_TICK_STREAM_ID)]])))))) + +(defn metrics-tick [executor-data task-datas ^TupleImpl tuple] + (let [{:keys [registered-metrics ^WorkerTopologyContext worker-context]} executor-data + interval (.getInteger tuple 0)] + (doseq [^MetricHolder mh registered-metrics] + (when (= interval (.getTimeBucketIntervalInSecs mh)) + (let [^IMetric metric (.getMetric mh) + name (.getName mh) + value (.getValueAndReset metric) + timestamp (System/currentTimeMillis) + worker-host (. (java.net.InetAddress/getLocalHost) getCanonicalHostName) + worker-port (.getThisWorkerPort worker-context)] + (doseq [[task-id task-data] task-datas] + (task/send-unanchored task-data Constants/METRICS_STREAM_ID [worker-host worker-port interval timestamp name value]))))))) (defn setup-ticks! [worker executor-data] (let [storm-conf (:storm-conf executor-data) @@ -279,7 +308,7 @@ (mk-threads executor-data task-datas)) threads (concat handlers system-threads)] (setup-ticks! worker executor-data) - + (log-message "Finished loading executor " component-id ":" (pr-str executor-id)) ;; TODO: add method here to get rendered stats... 
have worker call that when heartbeating (reify @@ -377,8 +406,9 @@ )))) tuple-action-fn (fn [task-id ^TupleImpl tuple] (let [stream-id (.getSourceStreamId tuple)] - (if (= stream-id Constants/SYSTEM_TICK_STREAM_ID) - (.rotate pending) + (condp = stream-id + Constants/SYSTEM_TICK_STREAM_ID (.rotate pending) + Constants/METRICS_TICK_STREAM_ID (metrics-tick executor-data task-datas tuple) (let [id (.getValue tuple 0) [stored-task-id spout-id tuple-finished-info start-time-ms] (.remove pending id)] (when spout-id @@ -389,7 +419,7 @@ ACKER-ACK-STREAM-ID (ack-spout-msg executor-data (get task-datas task-id) spout-id tuple-finished-info time-delta) ACKER-FAIL-STREAM-ID (fail-spout-msg executor-data (get task-datas task-id) - spout-id tuple-finished-info time-delta) + spout-id tuple-finished-info time-delta) ))) ;; TODO: on failure, emit tuple to failure stream )))) @@ -472,6 +502,7 @@ ))))) (reset! open-or-prepare-was-called? true) (log-message "Opened spout " component-id ":" (keys task-datas)) + (setup-metrics! executor-data) (disruptor/consumer-started! (:receive-queue executor-data)) (fn [] @@ -550,28 +581,32 @@ ;; TODO: for state sync, need to check if tuple comes from state spout. if so, update state ;; TODO: how to handle incremental updates as well as synchronizations at same time ;; TODO: need to version tuples somehow + ;;(log-debug "Received tuple " tuple " at task " task-id) ;; need to do it this way to avoid reflection - (let [task-data (get task-datas task-id) - ^IBolt bolt-obj (:object task-data) - user-context (:user-context task-data) - sampler? (sampler) - execute-sampler? (execute-sampler) - now (if (or sampler? execute-sampler?) (System/currentTimeMillis))] - (when sampler? - (.setProcessSampleStartTime tuple now)) - (when execute-sampler? - (.setExecuteSampleStartTime tuple now)) - (.execute bolt-obj tuple) - (let [delta (tuple-execute-time-delta! tuple)] - (task/apply-hooks user-context .boltExecute (BoltExecuteInfo. 
tuple task-id delta)) - (when delta - (stats/bolt-execute-tuple! executor-stats - (.getSourceComponent tuple) - (.getSourceStreamId tuple) - delta) - ))))] + (let [stream-id (.getSourceStreamId tuple)] + (condp = stream-id + Constants/METRICS_TICK_STREAM_ID (metrics-tick executor-data task-datas tuple) + (let [task-data (get task-datas task-id) + ^IBolt bolt-obj (:object task-data) + user-context (:user-context task-data) + sampler? (sampler) + execute-sampler? (execute-sampler) + now (if (or sampler? execute-sampler?) (System/currentTimeMillis))] + (when sampler? + (.setProcessSampleStartTime tuple now)) + (when execute-sampler? + (.setExecuteSampleStartTime tuple now)) + (.execute bolt-obj tuple) + (let [delta (tuple-execute-time-delta! tuple)] + (task/apply-hooks user-context .boltExecute (BoltExecuteInfo. tuple task-id delta)) + (when delta + (stats/bolt-execute-tuple! executor-stats + (.getSourceComponent tuple) + (.getSourceStreamId tuple) + delta) + ))))))] ;; TODO: can get any SubscribedState objects out of the context now @@ -649,7 +684,8 @@ (report-error error) ))))) (reset! open-or-prepare-was-called? true) - (log-message "Prepared bolt " component-id ":" (keys task-datas)) + (log-message "Prepared bolt " component-id ":" (keys task-datas)) + (setup-metrics! 
executor-data) (let [receive-queue (:receive-queue executor-data) event-handler (mk-task-receiver executor-data tuple-action-fn)] diff --git a/src/clj/backtype/storm/daemon/task.clj b/src/clj/backtype/storm/daemon/task.clj index 7a314ef69..818e4fd88 100644 --- a/src/clj/backtype/storm/daemon/task.clj +++ b/src/clj/backtype/storm/daemon/task.clj @@ -28,7 +28,7 @@ (:default-shared-resources worker) (:user-shared-resources worker) (:shared-executor-data executor-data) - ))) + (:registered-metrics executor-data)))) (defn system-topology-context [worker executor-data tid] ((mk-topology-context-builder diff --git a/src/clj/backtype/storm/testing.clj b/src/clj/backtype/storm/testing.clj index cfa296511..10de04ddc 100644 --- a/src/clj/backtype/storm/testing.clj +++ b/src/clj/backtype/storm/testing.clj @@ -8,7 +8,7 @@ (:require [backtype.storm [process-simulator :as psim]]) (:import [org.apache.commons.io FileUtils]) (:import [java.io File]) - (:import [java.util HashMap]) + (:import [java.util HashMap ArrayList]) (:import [java.util.concurrent.atomic AtomicInteger]) (:import [java.util.concurrent ConcurrentHashMap]) (:import [backtype.storm.utils Time Utils RegisteredGlobalState]) @@ -587,6 +587,7 @@ [(int 1)] {} {} - (HashMap.))] + (HashMap.) + (ArrayList.))] (TupleImpl. context values 1 stream) )) diff --git a/src/jvm/backtype/storm/Config.java b/src/jvm/backtype/storm/Config.java index e8393c255..879fb32e2 100644 --- a/src/jvm/backtype/storm/Config.java +++ b/src/jvm/backtype/storm/Config.java @@ -410,6 +410,13 @@ public class Config extends HashMap { */ public static String TOPOLOGY_SKIP_MISSING_KRYO_REGISTRATIONS= "topology.skip.missing.kryo.registrations"; + /* + * A list of classes implementing IMetricsConsumer (See storm.yaml.example for exact config format). + * Each listed class will be routed all the metrics data generated by the storm metrics API. + * Each listed class maps 1:1 to a system bolt named __metrics_ClassName#N, and it's parallelism is configurable. 
+ */ + public static String TOPOLOGY_METRICS_CONSUMER_REGISTER = "topology.metrics.consumer.register"; + /** * The maximum parallelism allowed for a component in this topology. This configuration is diff --git a/src/jvm/backtype/storm/Constants.java b/src/jvm/backtype/storm/Constants.java index 950c533ad..705278986 100644 --- a/src/jvm/backtype/storm/Constants.java +++ b/src/jvm/backtype/storm/Constants.java @@ -8,4 +8,8 @@ public class Constants { public static final String SYSTEM_COMPONENT_ID = "__system"; public static final String SYSTEM_TICK_STREAM_ID = "__tick"; + public static final String METRICS_COMPONENT_ID_PREFIX = "__metrics"; + public static final String METRICS_STREAM_ID = "__metrics"; + public static final String METRICS_TICK_STREAM_ID = "__metrics_tick"; } + \ No newline at end of file diff --git a/src/jvm/backtype/storm/metric/FixedValueMetric.java b/src/jvm/backtype/storm/metric/FixedValueMetric.java new file mode 100644 index 000000000..3e262ad4c --- /dev/null +++ b/src/jvm/backtype/storm/metric/FixedValueMetric.java @@ -0,0 +1,17 @@ +package backtype.storm.metric; + +public class FixedValueMetric implements IMetric { + Object _value; + + public FixedValueMetric(Object value) { + _value = value; + } + + public void setValue(Object value) { + _value = value; + } + + public Object getValueAndReset() { + return _value; + } +} diff --git a/src/jvm/backtype/storm/metric/IMetric.java b/src/jvm/backtype/storm/metric/IMetric.java new file mode 100644 index 000000000..ae57ff6c7 --- /dev/null +++ b/src/jvm/backtype/storm/metric/IMetric.java @@ -0,0 +1,5 @@ +package backtype.storm.metric; + +public interface IMetric { + public Object getValueAndReset(); +} diff --git a/src/jvm/backtype/storm/metric/IMetricsConsumer.java b/src/jvm/backtype/storm/metric/IMetricsConsumer.java new file mode 100644 index 000000000..f0e5e2444 --- /dev/null +++ b/src/jvm/backtype/storm/metric/IMetricsConsumer.java @@ -0,0 +1,21 @@ +package backtype.storm.metric; + +import 
backtype.storm.task.TopologyContext; +import java.util.Map; + +public interface IMetricsConsumer { + public static class DataPoint { + public String srcWorkerHost; + public int srcWorkerPort; + public String srcComponentId; + public int srcTaskId; + public long timestamp; + public int updateIntervalSecs; + public String name; + public Object value; + } + + void prepare(Map stormConf, Object registrationOptions, TopologyContext context); + void handleDataPoint(DataPoint dataPoint); + void cleanup(); +} \ No newline at end of file diff --git a/src/jvm/backtype/storm/metric/IReducer.java b/src/jvm/backtype/storm/metric/IReducer.java new file mode 100644 index 000000000..a9e00d33d --- /dev/null +++ b/src/jvm/backtype/storm/metric/IReducer.java @@ -0,0 +1,7 @@ +package backtype.storm.metric; + +public interface IReducer { + T init(); + T reduce(T accumulator, Object input); + Object extractResult(T accumulator); +} diff --git a/src/jvm/backtype/storm/metric/IncrementedMetric.java b/src/jvm/backtype/storm/metric/IncrementedMetric.java new file mode 100644 index 000000000..1f54feaad --- /dev/null +++ b/src/jvm/backtype/storm/metric/IncrementedMetric.java @@ -0,0 +1,22 @@ +package backtype.storm.metric; + +public class IncrementedMetric implements IMetric { + long _value = 0; + + public IncrementedMetric() { + } + + public void inc() { + _value++; + } + + public void inc(long incrementBy) { + _value += incrementBy; + } + + public Object getValueAndReset() { + long ret = _value; + _value = 0; + return ret; + } +} diff --git a/src/jvm/backtype/storm/metric/MeanReducer.java b/src/jvm/backtype/storm/metric/MeanReducer.java new file mode 100644 index 000000000..9ad3029cb --- /dev/null +++ b/src/jvm/backtype/storm/metric/MeanReducer.java @@ -0,0 +1,22 @@ +package backtype.storm.metric; + +class MeanReducerState { + public int count = 0; + public double sum = 0.0; +} + +public class MeanReducer implements IReducer { + public MeanReducerState init() { + return new 
MeanReducerState(); + } + + public MeanReducerState reduce(MeanReducerState acc, Object input) { + acc.count++; + acc.sum += (Double)input; + return acc; + } + + public Object extractResult(MeanReducerState acc) { + return new Double(acc.sum / (double)acc.count); + } +} diff --git a/src/jvm/backtype/storm/metric/MetricHolder.java b/src/jvm/backtype/storm/metric/MetricHolder.java new file mode 100644 index 000000000..27fd37191 --- /dev/null +++ b/src/jvm/backtype/storm/metric/MetricHolder.java @@ -0,0 +1,17 @@ +package backtype.storm.metric; + +public class MetricHolder { + private String _name; + private int _timeBucketIntervalInSecs; + private IMetric _metric; + + public MetricHolder(String name, IMetric metric, int timeBucketIntervalInSecs) { + _name = name; + _timeBucketIntervalInSecs = timeBucketIntervalInSecs; + _metric = metric; + } + + public String getName() { return _name; } + public int getTimeBucketIntervalInSecs() { return _timeBucketIntervalInSecs; } + public IMetric getMetric() { return _metric; } +} diff --git a/src/jvm/backtype/storm/metric/MetricsConsumerBolt.java b/src/jvm/backtype/storm/metric/MetricsConsumerBolt.java new file mode 100644 index 000000000..1acbd240a --- /dev/null +++ b/src/jvm/backtype/storm/metric/MetricsConsumerBolt.java @@ -0,0 +1,54 @@ +package backtype.storm.metric; + +import backtype.storm.Config; +import backtype.storm.task.IBolt; +import backtype.storm.task.OutputCollector; +import backtype.storm.task.TopologyContext; +import backtype.storm.tuple.Tuple; +import java.util.Map; + +public class MetricsConsumerBolt implements IBolt { + IMetricsConsumer _metricsConsumer; + String _consumerClassName; + OutputCollector _collector; + Object _registrationArgument; + + public MetricsConsumerBolt(String consumerClassName, Object registrationArgument) { + _consumerClassName = consumerClassName; + _registrationArgument = registrationArgument; + } + + @Override + public void prepare(Map stormConf, TopologyContext context, 
OutputCollector collector) { + try { + _metricsConsumer = (IMetricsConsumer)Class.forName(_consumerClassName).newInstance(); + } catch (Exception e) { + throw new RuntimeException("Could not instantiate a class listed in config under section " + + Config.TOPOLOGY_METRICS_CONSUMER_REGISTER + " with fully qualified name " + _consumerClassName, e); + } + _metricsConsumer.prepare(stormConf, _registrationArgument, context); + _collector = collector; + } + + @Override + public void execute(Tuple input) { + IMetricsConsumer.DataPoint d = new IMetricsConsumer.DataPoint(); + d.srcComponentId = input.getSourceComponent(); + d.srcTaskId = input.getSourceTask(); + d.srcWorkerHost = input.getString(0); + d.srcWorkerPort = input.getInteger(1); + d.updateIntervalSecs = input.getInteger(2); + d.timestamp = input.getLong(3); + d.name = input.getString(4); + d.value = input.getValue(5); + + _metricsConsumer.handleDataPoint(d); + _collector.ack(input); + } + + @Override + public void cleanup() { + _metricsConsumer.cleanup(); + } + +} diff --git a/src/jvm/backtype/storm/metric/ReducedMetric.java b/src/jvm/backtype/storm/metric/ReducedMetric.java new file mode 100644 index 000000000..e8830e1e7 --- /dev/null +++ b/src/jvm/backtype/storm/metric/ReducedMetric.java @@ -0,0 +1,21 @@ +package backtype.storm.metric; + +public class ReducedMetric implements IMetric { + private IReducer _reducer; + private Object _accumulator; + + public ReducedMetric(IReducer reducer) { + _reducer = reducer; + _accumulator = _reducer.init(); + } + + public void update(Object value) { + _accumulator = _reducer.reduce(_accumulator, value); + } + + public Object getValueAndReset() { + Object ret = _reducer.extractResult(_accumulator); + _accumulator = _reducer.init(); + return ret; + } +} diff --git a/src/jvm/backtype/storm/task/TopologyContext.java b/src/jvm/backtype/storm/task/TopologyContext.java index e9d499a00..68c0dc491 100644 --- a/src/jvm/backtype/storm/task/TopologyContext.java +++ 
b/src/jvm/backtype/storm/task/TopologyContext.java @@ -4,6 +4,8 @@ import backtype.storm.generated.Grouping; import backtype.storm.generated.StormTopology; import backtype.storm.hooks.ITaskHook; +import backtype.storm.metric.IMetric; +import backtype.storm.metric.MetricHolder; import backtype.storm.state.ISubscribedState; import backtype.storm.tuple.Fields; import backtype.storm.utils.Utils; @@ -29,6 +31,7 @@ public class TopologyContext extends WorkerTopologyContext { private Map _taskData = new HashMap(); private List _hooks = new ArrayList(); private Map _executorData; + private List _registeredMetrics; public TopologyContext(StormTopology topology, Map stormConf, @@ -36,12 +39,13 @@ public TopologyContext(StormTopology topology, Map stormConf, Map> componentToStreamToFields, String stormId, String codeDir, String pidDir, Integer taskId, Integer workerPort, List workerTasks, Map defaultResources, - Map userResources, Map executorData) { + Map userResources, Map executorData, List registeredMetrics) { super(topology, stormConf, taskToComponent, componentToSortedTasks, componentToStreamToFields, stormId, codeDir, pidDir, workerPort, workerTasks, defaultResources, userResources); _taskId = taskId; _executorData = executorData; + _registeredMetrics = registeredMetrics; } /** @@ -190,4 +194,8 @@ public void addTaskHook(ITaskHook hook) { public Collection getHooks() { return _hooks; } + + public void registerMetric(String name, IMetric metric, int timeBucketSizeInSecs) { + _registeredMetrics.add(new MetricHolder(name, metric, timeBucketSizeInSecs)); + } } \ No newline at end of file From a45098d7e32586484a611232cfe1727e008f46cb Mon Sep 17 00:00:00 2001 From: Jason Jackson Date: Wed, 31 Oct 2012 16:38:53 -0700 Subject: [PATCH 02/14] refactorings, metrics API has it's own namespace. 
--- src/clj/backtype/storm/daemon/executor.clj | 3 ++- .../backtype/storm/metric/MetricHolder.java | 2 ++ .../storm/metric/MetricsConsumerBolt.java | 1 + .../AssignableMetric.java} | 6 +++--- .../CountMetric.java} | 8 ++++--- .../storm/metric/{ => api}/IMetric.java | 2 +- .../metric/{ => api}/IMetricsConsumer.java | 2 +- .../storm/metric/{ => api}/IReducer.java | 2 +- .../storm/metric/{ => api}/MeanReducer.java | 4 +++- .../storm/metric/{ => api}/ReducedMetric.java | 2 +- .../backtype/storm/task/TopologyContext.java | 21 +++++++++++++++++-- 11 files changed, 39 insertions(+), 14 deletions(-) rename src/jvm/backtype/storm/metric/{FixedValueMetric.java => api/AssignableMetric.java} (59%) rename src/jvm/backtype/storm/metric/{IncrementedMetric.java => api/CountMetric.java} (64%) rename src/jvm/backtype/storm/metric/{ => api}/IMetric.java (66%) rename src/jvm/backtype/storm/metric/{ => api}/IMetricsConsumer.java (94%) rename src/jvm/backtype/storm/metric/{ => api}/IReducer.java (79%) rename src/jvm/backtype/storm/metric/{ => api}/MeanReducer.java (86%) rename src/jvm/backtype/storm/metric/{ => api}/ReducedMetric.java (93%) diff --git a/src/clj/backtype/storm/daemon/executor.clj b/src/clj/backtype/storm/daemon/executor.clj index 1c10b2a20..03d2393aa 100644 --- a/src/clj/backtype/storm/daemon/executor.clj +++ b/src/clj/backtype/storm/daemon/executor.clj @@ -6,7 +6,8 @@ (:import [backtype.storm.spout ISpoutWaitStrategy]) (:import [backtype.storm.hooks.info SpoutAckInfo SpoutFailInfo EmitInfo BoltFailInfo BoltAckInfo BoltExecuteInfo]) - (:import [backtype.storm.metric MetricHolder IMetric]) + (:import [backtype.storm.metric MetricHolder]) + (:import [backtype.storm.metric.api IMetric]) (:require [backtype.storm [tuple :as tuple]]) (:require [backtype.storm.daemon [task :as task]]) ) diff --git a/src/jvm/backtype/storm/metric/MetricHolder.java b/src/jvm/backtype/storm/metric/MetricHolder.java index 27fd37191..ca51e2b9c 100644 --- 
a/src/jvm/backtype/storm/metric/MetricHolder.java +++ b/src/jvm/backtype/storm/metric/MetricHolder.java @@ -1,5 +1,7 @@ package backtype.storm.metric; +import backtype.storm.metric.api.IMetric; + public class MetricHolder { private String _name; private int _timeBucketIntervalInSecs; diff --git a/src/jvm/backtype/storm/metric/MetricsConsumerBolt.java b/src/jvm/backtype/storm/metric/MetricsConsumerBolt.java index 1acbd240a..ce3fe2f94 100644 --- a/src/jvm/backtype/storm/metric/MetricsConsumerBolt.java +++ b/src/jvm/backtype/storm/metric/MetricsConsumerBolt.java @@ -1,5 +1,6 @@ package backtype.storm.metric; +import backtype.storm.metric.api.IMetricsConsumer; import backtype.storm.Config; import backtype.storm.task.IBolt; import backtype.storm.task.OutputCollector; diff --git a/src/jvm/backtype/storm/metric/FixedValueMetric.java b/src/jvm/backtype/storm/metric/api/AssignableMetric.java similarity index 59% rename from src/jvm/backtype/storm/metric/FixedValueMetric.java rename to src/jvm/backtype/storm/metric/api/AssignableMetric.java index 3e262ad4c..b38a57e91 100644 --- a/src/jvm/backtype/storm/metric/FixedValueMetric.java +++ b/src/jvm/backtype/storm/metric/api/AssignableMetric.java @@ -1,9 +1,9 @@ -package backtype.storm.metric; +package backtype.storm.metric.api; -public class FixedValueMetric implements IMetric { +public class AssignableMetric implements IMetric { Object _value; - public FixedValueMetric(Object value) { + public AssignableMetric(Object value) { _value = value; } diff --git a/src/jvm/backtype/storm/metric/IncrementedMetric.java b/src/jvm/backtype/storm/metric/api/CountMetric.java similarity index 64% rename from src/jvm/backtype/storm/metric/IncrementedMetric.java rename to src/jvm/backtype/storm/metric/api/CountMetric.java index 1f54feaad..2a2b24137 100644 --- a/src/jvm/backtype/storm/metric/IncrementedMetric.java +++ b/src/jvm/backtype/storm/metric/api/CountMetric.java @@ -1,9 +1,11 @@ -package backtype.storm.metric; +package 
backtype.storm.metric.api; -public class IncrementedMetric implements IMetric { +import backtype.storm.metric.api.IMetric; + +public class CountMetric implements IMetric { long _value = 0; - public IncrementedMetric() { + public CountMetric() { } public void inc() { diff --git a/src/jvm/backtype/storm/metric/IMetric.java b/src/jvm/backtype/storm/metric/api/IMetric.java similarity index 66% rename from src/jvm/backtype/storm/metric/IMetric.java rename to src/jvm/backtype/storm/metric/api/IMetric.java index ae57ff6c7..400994d4c 100644 --- a/src/jvm/backtype/storm/metric/IMetric.java +++ b/src/jvm/backtype/storm/metric/api/IMetric.java @@ -1,4 +1,4 @@ -package backtype.storm.metric; +package backtype.storm.metric.api; public interface IMetric { public Object getValueAndReset(); diff --git a/src/jvm/backtype/storm/metric/IMetricsConsumer.java b/src/jvm/backtype/storm/metric/api/IMetricsConsumer.java similarity index 94% rename from src/jvm/backtype/storm/metric/IMetricsConsumer.java rename to src/jvm/backtype/storm/metric/api/IMetricsConsumer.java index f0e5e2444..39708e6d6 100644 --- a/src/jvm/backtype/storm/metric/IMetricsConsumer.java +++ b/src/jvm/backtype/storm/metric/api/IMetricsConsumer.java @@ -1,4 +1,4 @@ -package backtype.storm.metric; +package backtype.storm.metric.api; import backtype.storm.task.TopologyContext; import java.util.Map; diff --git a/src/jvm/backtype/storm/metric/IReducer.java b/src/jvm/backtype/storm/metric/api/IReducer.java similarity index 79% rename from src/jvm/backtype/storm/metric/IReducer.java rename to src/jvm/backtype/storm/metric/api/IReducer.java index a9e00d33d..929c31781 100644 --- a/src/jvm/backtype/storm/metric/IReducer.java +++ b/src/jvm/backtype/storm/metric/api/IReducer.java @@ -1,4 +1,4 @@ -package backtype.storm.metric; +package backtype.storm.metric.api; public interface IReducer { T init(); diff --git a/src/jvm/backtype/storm/metric/MeanReducer.java b/src/jvm/backtype/storm/metric/api/MeanReducer.java similarity index 86% 
rename from src/jvm/backtype/storm/metric/MeanReducer.java rename to src/jvm/backtype/storm/metric/api/MeanReducer.java index 9ad3029cb..b9830ee45 100644 --- a/src/jvm/backtype/storm/metric/MeanReducer.java +++ b/src/jvm/backtype/storm/metric/api/MeanReducer.java @@ -1,4 +1,6 @@ -package backtype.storm.metric; +package backtype.storm.metric.api; + +import backtype.storm.metric.api.IReducer; class MeanReducerState { public int count = 0; diff --git a/src/jvm/backtype/storm/metric/ReducedMetric.java b/src/jvm/backtype/storm/metric/api/ReducedMetric.java similarity index 93% rename from src/jvm/backtype/storm/metric/ReducedMetric.java rename to src/jvm/backtype/storm/metric/api/ReducedMetric.java index e8830e1e7..d59842036 100644 --- a/src/jvm/backtype/storm/metric/ReducedMetric.java +++ b/src/jvm/backtype/storm/metric/api/ReducedMetric.java @@ -1,4 +1,4 @@ -package backtype.storm.metric; +package backtype.storm.metric.api; public class ReducedMetric implements IMetric { private IReducer _reducer; diff --git a/src/jvm/backtype/storm/task/TopologyContext.java b/src/jvm/backtype/storm/task/TopologyContext.java index 68c0dc491..1ea46f558 100644 --- a/src/jvm/backtype/storm/task/TopologyContext.java +++ b/src/jvm/backtype/storm/task/TopologyContext.java @@ -4,8 +4,10 @@ import backtype.storm.generated.Grouping; import backtype.storm.generated.StormTopology; import backtype.storm.hooks.ITaskHook; -import backtype.storm.metric.IMetric; +import backtype.storm.metric.api.IMetric; +import backtype.storm.metric.api.IReducer; import backtype.storm.metric.MetricHolder; +import backtype.storm.metric.api.ReducedMetric; import backtype.storm.state.ISubscribedState; import backtype.storm.tuple.Fields; import backtype.storm.utils.Utils; @@ -195,7 +197,22 @@ public Collection getHooks() { return _hooks; } - public void registerMetric(String name, IMetric metric, int timeBucketSizeInSecs) { + /* + * Register a IMetric instance. 
+ * Storm will then call getValueAndReset on the metric every timeBucketSizeInSecs + * and the returned value is sent to all metrics consumers. + * You must call this during IBolt::prepare or ISpout::open. + * @return The IMetric argument unchanged. + */ + public IMetric registerMetric(String name, IMetric metric, int timeBucketSizeInSecs) { _registeredMetrics.add(new MetricHolder(name, metric, timeBucketSizeInSecs)); + return metric; + } + + /* + * Convenience method for registering ReducedMetric. + */ + public IMetric registerMetric(String name, IReducer reducer, int timeBucketSizeInSecs) { + return registerMetric(name, new ReducedMetric(reducer), timeBucketSizeInSecs); } } \ No newline at end of file From d4d8e9bd4696f128f454b45da57c23a8283b0fc7 Mon Sep 17 00:00:00 2001 From: Jason Jackson Date: Wed, 31 Oct 2012 18:41:21 -0700 Subject: [PATCH 03/14] IMetricsConsumer handles data points in batches. --- conf/defaults.yaml | 4 +++ src/clj/backtype/storm/daemon/common.clj | 4 +-- src/clj/backtype/storm/daemon/executor.clj | 35 +++++++++++-------- src/clj/backtype/storm/daemon/task.clj | 2 +- src/clj/backtype/storm/testing.clj | 2 +- .../backtype/storm/metric/MetricHolder.java | 16 +++------ .../storm/metric/MetricsConsumerBolt.java | 13 ++----- .../storm/metric/api/IMetricsConsumer.java | 25 +++++++++++-- .../backtype/storm/task/TopologyContext.java | 18 ++++++++-- 9 files changed, 72 insertions(+), 47 deletions(-) diff --git a/conf/defaults.yaml b/conf/defaults.yaml index 6da607b79..a4d0e7933 100644 --- a/conf/defaults.yaml +++ b/conf/defaults.yaml @@ -103,5 +103,9 @@ topology.max.error.report.per.interval: 5 topology.kryo.factory: "backtype.storm.serialization.DefaultKryoFactory" topology.trident.batch.emit.interval.millis: 500 +### register classes used in implementation of metrics api. 
+topology.kryo.register: + - backtype.storm.metric.api.IMetricsConsumer$TaskInfo + - backtype.storm.metric.api.IMetricsConsumer$DataPoint dev.zookeeper.path: "/tmp/dev-storm-zookeeper" diff --git a/src/clj/backtype/storm/daemon/common.clj b/src/clj/backtype/storm/daemon/common.clj index 780c40726..393a468a0 100644 --- a/src/clj/backtype/storm/daemon/common.clj +++ b/src/clj/backtype/storm/daemon/common.clj @@ -212,7 +212,7 @@ (doseq [[_ component] (all-components topology) :let [common (.get_common component)]] (.put_to_streams common METRICS-STREAM-ID - (thrift/output-fields ["worker-host" "worker-port" "interval" "timestamp" "name" "value"])))) + (thrift/output-fields ["task-info" "data-points"])))) (defn add-system-streams! [^StormTopology topology] (doseq [[_ component] (all-components topology) @@ -230,7 +230,7 @@ (reverse))) (defn number-duplicates [coll] - "(number-duplicates [\"a\", \"b\", \"a\"]) => [\"a\", \"b\", \"a2\"]" + "(number-duplicates [\"a\", \"b\", \"a\"]) => [\"a\", \"b\", \"a#2\"]" (map-occurrences (fn [x occurences] (if (>= occurences 2) (str x "#" occurences) x)) coll)) (defn metrics-consumer-register-ids [storm-conf] diff --git a/src/clj/backtype/storm/daemon/executor.clj b/src/clj/backtype/storm/daemon/executor.clj index 03d2393aa..411f67082 100644 --- a/src/clj/backtype/storm/daemon/executor.clj +++ b/src/clj/backtype/storm/daemon/executor.clj @@ -7,7 +7,7 @@ (:import [backtype.storm.hooks.info SpoutAckInfo SpoutFailInfo EmitInfo BoltFailInfo BoltAckInfo BoltExecuteInfo]) (:import [backtype.storm.metric MetricHolder]) - (:import [backtype.storm.metric.api IMetric]) + (:import [backtype.storm.metric.api IMetric IMetricsConsumer$TaskInfo IMetricsConsumer$DataPoint]) (:require [backtype.storm [tuple :as tuple]]) (:require [backtype.storm.daemon [task :as task]]) ) @@ -214,7 +214,7 @@ :type executor-type ;; TODO: should refactor this to be part of the executor specific map (spout or bolt with :common field) :stats (mk-executor-stats <> 
(sampling-rate storm-conf)) - :registered-metrics (ArrayList.) + :interval->task->registered-metrics (HashMap.) :task->component (:task->component worker) :stream->component->grouper (outbound-components worker-context component-id) :report-error (throttled-report-error-fn <>) @@ -244,8 +244,8 @@ :kill-fn (:report-error-and-die executor-data)))) (defn setup-metrics! [executor-data] - (let [{:keys [storm-conf receive-queue worker-context registered-metrics]} executor-data - distinct-time-bucket-intervals (->> registered-metrics (map #(.getTimeBucketIntervalInSecs %)) distinct)] + (let [{:keys [storm-conf receive-queue worker-context interval->task->registered-metrics]} executor-data + distinct-time-bucket-intervals (keys interval->task->registered-metrics)] (doseq [interval distinct-time-bucket-intervals] (schedule-recurring (:user-timer (:worker executor-data)) @@ -257,18 +257,23 @@ [[nil (TupleImpl. worker-context [interval] -1 Constants/METRICS_TICK_STREAM_ID)]])))))) (defn metrics-tick [executor-data task-datas ^TupleImpl tuple] - (let [{:keys [registered-metrics ^WorkerTopologyContext worker-context]} executor-data + (let [{:keys [interval->task->registered-metrics ^WorkerTopologyContext worker-context]} executor-data interval (.getInteger tuple 0)] - (doseq [^MetricHolder mh registered-metrics] - (when (= interval (.getTimeBucketIntervalInSecs mh)) - (let [^IMetric metric (.getMetric mh) - name (.getName mh) - value (.getValueAndReset metric) - timestamp (System/currentTimeMillis) - worker-host (. (java.net.InetAddress/getLocalHost) getCanonicalHostName) - worker-port (.getThisWorkerPort worker-context)] - (doseq [[task-id task-data] task-datas] - (task/send-unanchored task-data Constants/METRICS_STREAM_ID [worker-host worker-port interval timestamp name value]))))))) + (doseq [[task-id task-data] task-datas + :let [metric-holders (-> interval->task->registered-metrics (get interval) (get task-id)) + task-info (IMetricsConsumer$TaskInfo. + (. 
(java.net.InetAddress/getLocalHost) getCanonicalHostName) + (.getThisWorkerPort worker-context) + (:component-id executor-data) + task-id + (System/currentTimeMillis) + interval) + data-points (->> metric-holders + (map (fn [^MetricHolder mh] + (IMetricsConsumer$DataPoint. (.name mh) + (.getValueAndReset ^IMetric (.metric mh))))) + (into []))]] + (task/send-unanchored task-data Constants/METRICS_STREAM_ID [task-info data-points])))) (defn setup-ticks! [worker executor-data] (let [storm-conf (:storm-conf executor-data) diff --git a/src/clj/backtype/storm/daemon/task.clj b/src/clj/backtype/storm/daemon/task.clj index 818e4fd88..7a5f1d9c8 100644 --- a/src/clj/backtype/storm/daemon/task.clj +++ b/src/clj/backtype/storm/daemon/task.clj @@ -28,7 +28,7 @@ (:default-shared-resources worker) (:user-shared-resources worker) (:shared-executor-data executor-data) - (:registered-metrics executor-data)))) + (:interval->task->registered-metrics executor-data)))) (defn system-topology-context [worker executor-data tid] ((mk-topology-context-builder diff --git a/src/clj/backtype/storm/testing.clj b/src/clj/backtype/storm/testing.clj index 10de04ddc..0c8f2160b 100644 --- a/src/clj/backtype/storm/testing.clj +++ b/src/clj/backtype/storm/testing.clj @@ -588,6 +588,6 @@ {} {} (HashMap.) - (ArrayList.))] + (HashMap.))] (TupleImpl. 
context values 1 stream) )) diff --git a/src/jvm/backtype/storm/metric/MetricHolder.java b/src/jvm/backtype/storm/metric/MetricHolder.java index ca51e2b9c..92ec07384 100644 --- a/src/jvm/backtype/storm/metric/MetricHolder.java +++ b/src/jvm/backtype/storm/metric/MetricHolder.java @@ -3,17 +3,11 @@ import backtype.storm.metric.api.IMetric; public class MetricHolder { - private String _name; - private int _timeBucketIntervalInSecs; - private IMetric _metric; + public String name; + public IMetric metric; - public MetricHolder(String name, IMetric metric, int timeBucketIntervalInSecs) { - _name = name; - _timeBucketIntervalInSecs = timeBucketIntervalInSecs; - _metric = metric; + public MetricHolder(String name, IMetric metric) { + this.name = name; + this.metric = metric; } - - public String getName() { return _name; } - public int getTimeBucketIntervalInSecs() { return _timeBucketIntervalInSecs; } - public IMetric getMetric() { return _metric; } } diff --git a/src/jvm/backtype/storm/metric/MetricsConsumerBolt.java b/src/jvm/backtype/storm/metric/MetricsConsumerBolt.java index ce3fe2f94..e73556c72 100644 --- a/src/jvm/backtype/storm/metric/MetricsConsumerBolt.java +++ b/src/jvm/backtype/storm/metric/MetricsConsumerBolt.java @@ -6,6 +6,7 @@ import backtype.storm.task.OutputCollector; import backtype.storm.task.TopologyContext; import backtype.storm.tuple.Tuple; +import java.util.Collection; import java.util.Map; public class MetricsConsumerBolt implements IBolt { @@ -33,17 +34,7 @@ public void prepare(Map stormConf, TopologyContext context, OutputCollector coll @Override public void execute(Tuple input) { - IMetricsConsumer.DataPoint d = new IMetricsConsumer.DataPoint(); - d.srcComponentId = input.getSourceComponent(); - d.srcTaskId = input.getSourceTask(); - d.srcWorkerHost = input.getString(0); - d.srcWorkerPort = input.getInteger(1); - d.updateIntervalSecs = input.getInteger(2); - d.timestamp = input.getLong(3); - d.name = input.getString(4); - d.value = 
input.getValue(5); - - _metricsConsumer.handleDataPoint(d); + _metricsConsumer.handleDataPoints((IMetricsConsumer.TaskInfo)input.getValue(0), (Collection)input.getValue(1)); _collector.ack(input); } diff --git a/src/jvm/backtype/storm/metric/api/IMetricsConsumer.java b/src/jvm/backtype/storm/metric/api/IMetricsConsumer.java index 39708e6d6..d43fa2e18 100644 --- a/src/jvm/backtype/storm/metric/api/IMetricsConsumer.java +++ b/src/jvm/backtype/storm/metric/api/IMetricsConsumer.java @@ -1,21 +1,40 @@ package backtype.storm.metric.api; import backtype.storm.task.TopologyContext; +import java.util.Collection; import java.util.Map; public interface IMetricsConsumer { - public static class DataPoint { + public static class TaskInfo { + public TaskInfo(String srcWorkerHost, int srcWorkerPort, String srcComponentId, int srcTaskId, long timestamp, int updateIntervalSecs) { + this.srcWorkerHost = srcWorkerHost; + this.srcWorkerPort = srcWorkerPort; + this.srcComponentId = srcComponentId; + this.srcTaskId = srcTaskId; + this.timestamp = timestamp; + this.updateIntervalSecs = updateIntervalSecs; + } public String srcWorkerHost; public int srcWorkerPort; public String srcComponentId; public int srcTaskId; public long timestamp; public int updateIntervalSecs; + } + public static class DataPoint { + public DataPoint(String name, Object value) { + this.name = name; + this.value = value; + } + @Override + public String toString() { + return "[" + name + " = " + value + "]"; + } public String name; public Object value; } - void prepare(Map stormConf, Object registrationOptions, TopologyContext context); - void handleDataPoint(DataPoint dataPoint); + void prepare(Map stormConf, Object registrationArgument, TopologyContext context); + void handleDataPoints(TaskInfo taskInfo, Collection dataPoints); void cleanup(); } \ No newline at end of file diff --git a/src/jvm/backtype/storm/task/TopologyContext.java b/src/jvm/backtype/storm/task/TopologyContext.java index 1ea46f558..90014739f 
100644 --- a/src/jvm/backtype/storm/task/TopologyContext.java +++ b/src/jvm/backtype/storm/task/TopologyContext.java @@ -33,7 +33,7 @@ public class TopologyContext extends WorkerTopologyContext { private Map _taskData = new HashMap(); private List _hooks = new ArrayList(); private Map _executorData; - private List _registeredMetrics; + private Map>> _registeredMetrics; public TopologyContext(StormTopology topology, Map stormConf, @@ -41,7 +41,7 @@ public TopologyContext(StormTopology topology, Map stormConf, Map> componentToStreamToFields, String stormId, String codeDir, String pidDir, Integer taskId, Integer workerPort, List workerTasks, Map defaultResources, - Map userResources, Map executorData, List registeredMetrics) { + Map userResources, Map executorData, Map registeredMetrics) { super(topology, stormConf, taskToComponent, componentToSortedTasks, componentToStreamToFields, stormId, codeDir, pidDir, workerPort, workerTasks, defaultResources, userResources); @@ -205,7 +205,19 @@ public Collection getHooks() { * @return The IMetric argument unchanged. */ public IMetric registerMetric(String name, IMetric metric, int timeBucketSizeInSecs) { - _registeredMetrics.add(new MetricHolder(name, metric, timeBucketSizeInSecs)); + Map m1 = _registeredMetrics; + if(!m1.containsKey(timeBucketSizeInSecs)) { + m1.put(timeBucketSizeInSecs, new HashMap()); + } + + Map m2 = (Map)m1.get(timeBucketSizeInSecs); + if(!m2.containsKey(_taskId)) { + m2.put(_taskId, new ArrayList()); + } + + Collection c1 = (Collection)m2.get(_taskId); + c1.add(new MetricHolder(name, metric)); + return metric; } From d210aec9c6880f7c388f014b2a76563617f45575 Mon Sep 17 00:00:00 2001 From: Jason Jackson Date: Wed, 31 Oct 2012 18:59:09 -0700 Subject: [PATCH 04/14] 1) added CombinedMetric 2) error if registerMetrics called after component was prepared/opened. 
--- src/clj/backtype/storm/daemon/task.clj | 3 ++- .../storm/metric/api/CombinedMetric.java | 21 +++++++++++++++++++ .../backtype/storm/metric/api/ICombiner.java | 6 ++++++ .../storm/metric/api/ReducedMetric.java | 2 +- .../backtype/storm/task/TopologyContext.java | 20 ++++++++++++++++-- 5 files changed, 48 insertions(+), 4 deletions(-) create mode 100644 src/jvm/backtype/storm/metric/api/CombinedMetric.java create mode 100644 src/jvm/backtype/storm/metric/api/ICombiner.java diff --git a/src/clj/backtype/storm/daemon/task.clj b/src/clj/backtype/storm/daemon/task.clj index 7a5f1d9c8..566680fd4 100644 --- a/src/clj/backtype/storm/daemon/task.clj +++ b/src/clj/backtype/storm/daemon/task.clj @@ -28,7 +28,8 @@ (:default-shared-resources worker) (:user-shared-resources worker) (:shared-executor-data executor-data) - (:interval->task->registered-metrics executor-data)))) + (:interval->task->registered-metrics executor-data) + (:open-or-prepare-was-called? executor-data)))) (defn system-topology-context [worker executor-data tid] ((mk-topology-context-builder diff --git a/src/jvm/backtype/storm/metric/api/CombinedMetric.java b/src/jvm/backtype/storm/metric/api/CombinedMetric.java new file mode 100644 index 000000000..cd7b08bc6 --- /dev/null +++ b/src/jvm/backtype/storm/metric/api/CombinedMetric.java @@ -0,0 +1,21 @@ +package backtype.storm.metric.api; + +public class CombinedMetric implements IMetric { + private final ICombiner _combiner; + private Object _value; + + public CombinedMetric(ICombiner combiner) { + _combiner = combiner; + _value = _combiner.identity(); + } + + public void update(Object value) { + _value = _combiner.combine(_value, value); + } + + public Object getValueAndReset() { + Object ret = _value; + _value = _combiner.identity(); + return ret; + } +} diff --git a/src/jvm/backtype/storm/metric/api/ICombiner.java b/src/jvm/backtype/storm/metric/api/ICombiner.java new file mode 100644 index 000000000..7eb468e4e --- /dev/null +++ 
b/src/jvm/backtype/storm/metric/api/ICombiner.java @@ -0,0 +1,6 @@ +package backtype.storm.metric.api; + +public interface ICombiner { + public T identity(); + public T combine(T a, T b); +} diff --git a/src/jvm/backtype/storm/metric/api/ReducedMetric.java b/src/jvm/backtype/storm/metric/api/ReducedMetric.java index d59842036..cfeef3b70 100644 --- a/src/jvm/backtype/storm/metric/api/ReducedMetric.java +++ b/src/jvm/backtype/storm/metric/api/ReducedMetric.java @@ -1,7 +1,7 @@ package backtype.storm.metric.api; public class ReducedMetric implements IMetric { - private IReducer _reducer; + private final IReducer _reducer; private Object _accumulator; public ReducedMetric(IReducer reducer) { diff --git a/src/jvm/backtype/storm/task/TopologyContext.java b/src/jvm/backtype/storm/task/TopologyContext.java index 90014739f..057d5a22c 100644 --- a/src/jvm/backtype/storm/task/TopologyContext.java +++ b/src/jvm/backtype/storm/task/TopologyContext.java @@ -6,8 +6,10 @@ import backtype.storm.hooks.ITaskHook; import backtype.storm.metric.api.IMetric; import backtype.storm.metric.api.IReducer; -import backtype.storm.metric.MetricHolder; +import backtype.storm.metric.api.ICombiner; import backtype.storm.metric.api.ReducedMetric; +import backtype.storm.metric.api.CombinedMetric; +import backtype.storm.metric.MetricHolder; import backtype.storm.state.ISubscribedState; import backtype.storm.tuple.Fields; import backtype.storm.utils.Utils; @@ -34,6 +36,7 @@ public class TopologyContext extends WorkerTopologyContext { private List _hooks = new ArrayList(); private Map _executorData; private Map>> _registeredMetrics; + private clojure.lang.Atom _openOrPrepareWasCalled; public TopologyContext(StormTopology topology, Map stormConf, @@ -41,13 +44,15 @@ public TopologyContext(StormTopology topology, Map stormConf, Map> componentToStreamToFields, String stormId, String codeDir, String pidDir, Integer taskId, Integer workerPort, List workerTasks, Map defaultResources, - Map userResources, Map 
executorData, Map registeredMetrics) { + Map userResources, Map executorData, Map registeredMetrics, + clojure.lang.Atom openOrPrepareWasCalled) { super(topology, stormConf, taskToComponent, componentToSortedTasks, componentToStreamToFields, stormId, codeDir, pidDir, workerPort, workerTasks, defaultResources, userResources); _taskId = taskId; _executorData = executorData; _registeredMetrics = registeredMetrics; + _openOrPrepareWasCalled = openOrPrepareWasCalled; } /** @@ -205,6 +210,11 @@ public Collection getHooks() { * @return The IMetric argument unchanged. */ public IMetric registerMetric(String name, IMetric metric, int timeBucketSizeInSecs) { + if((Boolean)_openOrPrepareWasCalled.deref() == true) { + throw new RuntimeException("TopologyContext.registerMetric can only be called from within overridden " + + "IBolt::prepare() or ISpout::open() method."); + } + Map m1 = _registeredMetrics; if(!m1.containsKey(timeBucketSizeInSecs)) { m1.put(timeBucketSizeInSecs, new HashMap()); @@ -227,4 +237,10 @@ public IMetric registerMetric(String name, IMetric metric, int timeBucketSizeInS public IMetric registerMetric(String name, IReducer reducer, int timeBucketSizeInSecs) { return registerMetric(name, new ReducedMetric(reducer), timeBucketSizeInSecs); } + /* + * Convinience method for registering ReducedMetric. 
+ */ + public IMetric registerMetric(String name, ICombiner combiner, int timeBucketSizeInSecs) { + return registerMetric(name, new CombinedMetric(combiner), timeBucketSizeInSecs); + } } \ No newline at end of file From a3885fddac794c2fb960fb86bf421f6dcd3512ef Mon Sep 17 00:00:00 2001 From: Jason Jackson Date: Wed, 31 Oct 2012 19:11:56 -0700 Subject: [PATCH 05/14] fixed comment --- src/jvm/backtype/storm/task/TopologyContext.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/jvm/backtype/storm/task/TopologyContext.java b/src/jvm/backtype/storm/task/TopologyContext.java index 057d5a22c..8492f06c7 100644 --- a/src/jvm/backtype/storm/task/TopologyContext.java +++ b/src/jvm/backtype/storm/task/TopologyContext.java @@ -238,7 +238,7 @@ public IMetric registerMetric(String name, IReducer reducer, int timeBucketSizeI return registerMetric(name, new ReducedMetric(reducer), timeBucketSizeInSecs); } /* - * Convinience method for registering ReducedMetric. + * Convinience method for registering CombinedMetric. */ public IMetric registerMetric(String name, ICombiner combiner, int timeBucketSizeInSecs) { return registerMetric(name, new CombinedMetric(combiner), timeBucketSizeInSecs); From b375d5a0c6954cdea52500ee0fdce84955c01399 Mon Sep 17 00:00:00 2001 From: Jason Jackson Date: Thu, 1 Nov 2012 12:10:55 -0700 Subject: [PATCH 06/14] added one metrics unit test. 
--- src/clj/backtype/storm/metric/testing.clj | 24 +++++++++ src/clj/backtype/storm/testing.clj | 3 +- test/clj/backtype/storm/metrics_test.clj | 62 +++++++++++++++++++++++ 3 files changed, 88 insertions(+), 1 deletion(-) create mode 100644 src/clj/backtype/storm/metric/testing.clj create mode 100644 test/clj/backtype/storm/metrics_test.clj diff --git a/src/clj/backtype/storm/metric/testing.clj b/src/clj/backtype/storm/metric/testing.clj new file mode 100644 index 000000000..414be012c --- /dev/null +++ b/src/clj/backtype/storm/metric/testing.clj @@ -0,0 +1,24 @@ +(ns backtype.storm.metric.testing + "This namespace is for AOT dependent metrics testing code." + (:gen-class)) + +(gen-class + :name clojure.storm.metric.testing.FakeMetricConsumer + :implements [backtype.storm.metric.api.IMetricsConsumer] + :prefix "impl-" + :state state + :init init) + +(defn impl-init [] [[] (atom [])]) + +(defn impl-prepare [this conf {:keys [ns var-name]} ctx] + (reset! (.state this) @(intern ns var-name)) + (reset! @(.state this) [])) + +(defn impl-cleanup [this] + (reset! @(.state this) [])) + +(defn impl-handleDataPoints [this task-info data-points] + (swap! @(.state this) conj [task-info data-points])) + + diff --git a/src/clj/backtype/storm/testing.clj b/src/clj/backtype/storm/testing.clj index 0c8f2160b..fe264d08c 100644 --- a/src/clj/backtype/storm/testing.clj +++ b/src/clj/backtype/storm/testing.clj @@ -588,6 +588,7 @@ {} {} (HashMap.) - (HashMap.))] + (HashMap.) + (atom false))] (TupleImpl. 
context values 1 stream) )) diff --git a/test/clj/backtype/storm/metrics_test.clj b/test/clj/backtype/storm/metrics_test.clj new file mode 100644 index 000000000..e794f686e --- /dev/null +++ b/test/clj/backtype/storm/metrics_test.clj @@ -0,0 +1,62 @@ +(ns backtype.storm.metrics-test + (:use [clojure test]) + (:import [backtype.storm.topology TopologyBuilder]) + (:import [backtype.storm.generated InvalidTopologyException SubmitOptions TopologyInitialStatus]) + (:import [backtype.storm.testing TestWordCounter TestWordSpout TestGlobalCount + TestAggregatesCounter TestConfBolt AckFailMapTracker]) + (:import [backtype.storm.metric.api CountMetric IMetricsConsumer$DataPoint IMetricsConsumer$TaskInfo]) + + (:use [backtype.storm bootstrap testing]) + (:use [backtype.storm.daemon common]) + (:use [backtype.storm.metric testing])) + + +(bootstrap) + +(defbolt count-acks {} {:prepare true} + [conf context collector] + + (let [ack-count (CountMetric.)] + (.registerMetric context "ack-count" ack-count 5) + (bolt + (execute [tuple] + (.inc ack-count) + (ack! collector tuple))))) + +(def datapoints-buffer (atom nil)) + +(defn metric-name->vals! [name] + (->> @datapoints-buffer + (mapcat (fn [[task-info data-points]] data-points)) + (filter #(= name (.name %))) + (map #(.value %)) + (into []))) + +(deftest test-time-buckets + (with-simulated-time-local-cluster + [cluster :daemon-conf {TOPOLOGY-METRICS-CONSUMER-REGISTER + [{"class" "clojure.storm.metric.testing.FakeMetricConsumer" + "argument" {:ns (.ns #'datapoints-buffer) :var-name 'datapoints-buffer}}]}] + (let [feeder (feeder-spout ["field1"]) + tracker (AckFailMapTracker.) + _ (.setAckFailDelegate feeder tracker) + topology (thrift/mk-topology + {"1" (thrift/mk-spout-spec feeder)} + {"2" (thrift/mk-bolt-spec {"1" :global} count-acks)})] + (submit-local-topology (:nimbus cluster) "metrics-tester" {} topology) + + (.feed feeder ["a"] 1) + (advance-cluster-time cluster 6) + (is (= [1] (metric-name->vals! 
"ack-count"))) + + (advance-cluster-time cluster 5) + (is (= [1 0] (metric-name->vals! "ack-count"))) + + (advance-cluster-time cluster 20) + (is (= [1 0 0 0 0 0] (metric-name->vals! "ack-count"))) + + (.feed feeder ["b"] 2) + (.feed feeder ["c"] 3) + (advance-cluster-time cluster 5) + (is (= [1 0 0 0 0 0 2] (metric-name->vals! "ack-count")))))) + From d32701f554bee472f9e848194a8d2070ec31f879 Mon Sep 17 00:00:00 2001 From: Jason Jackson Date: Fri, 2 Nov 2012 13:34:39 -0700 Subject: [PATCH 07/14] added parameter OutputCollector to IMetricsConsumer::prepare, so that MetricsConsumer can reportError. --- src/clj/backtype/storm/daemon/executor.clj | 5 +++-- src/jvm/backtype/storm/metric/MetricsConsumerBolt.java | 2 +- src/jvm/backtype/storm/metric/api/IMetricsConsumer.java | 3 ++- 3 files changed, 6 insertions(+), 4 deletions(-) diff --git a/src/clj/backtype/storm/daemon/executor.clj b/src/clj/backtype/storm/daemon/executor.clj index 411f67082..6a51c5126 100644 --- a/src/clj/backtype/storm/daemon/executor.clj +++ b/src/clj/backtype/storm/daemon/executor.clj @@ -266,14 +266,15 @@ (.getThisWorkerPort worker-context) (:component-id executor-data) task-id - (System/currentTimeMillis) + (long (/ (System/currentTimeMillis) 1000)) interval) data-points (->> metric-holders (map (fn [^MetricHolder mh] (IMetricsConsumer$DataPoint. (.name mh) (.getValueAndReset ^IMetric (.metric mh))))) (into []))]] - (task/send-unanchored task-data Constants/METRICS_STREAM_ID [task-info data-points])))) + (if data-points + (task/send-unanchored task-data Constants/METRICS_STREAM_ID [task-info data-points]))))) (defn setup-ticks! 
[worker executor-data] (let [storm-conf (:storm-conf executor-data) diff --git a/src/jvm/backtype/storm/metric/MetricsConsumerBolt.java b/src/jvm/backtype/storm/metric/MetricsConsumerBolt.java index e73556c72..98127e415 100644 --- a/src/jvm/backtype/storm/metric/MetricsConsumerBolt.java +++ b/src/jvm/backtype/storm/metric/MetricsConsumerBolt.java @@ -28,7 +28,7 @@ public void prepare(Map stormConf, TopologyContext context, OutputCollector coll throw new RuntimeException("Could not instantiate a class listed in config under section " + Config.TOPOLOGY_METRICS_CONSUMER_REGISTER + " with fully qualified name " + _consumerClassName, e); } - _metricsConsumer.prepare(stormConf, _registrationArgument, context); + _metricsConsumer.prepare(stormConf, _registrationArgument, context, collector); _collector = collector; } diff --git a/src/jvm/backtype/storm/metric/api/IMetricsConsumer.java b/src/jvm/backtype/storm/metric/api/IMetricsConsumer.java index d43fa2e18..ecaf87894 100644 --- a/src/jvm/backtype/storm/metric/api/IMetricsConsumer.java +++ b/src/jvm/backtype/storm/metric/api/IMetricsConsumer.java @@ -1,5 +1,6 @@ package backtype.storm.metric.api; +import backtype.storm.task.OutputCollector; import backtype.storm.task.TopologyContext; import java.util.Collection; import java.util.Map; @@ -34,7 +35,7 @@ public String toString() { public Object value; } - void prepare(Map stormConf, Object registrationArgument, TopologyContext context); + void prepare(Map stormConf, Object registrationArgument, TopologyContext context, OutputCollector outputCollector); void handleDataPoints(TaskInfo taskInfo, Collection dataPoints); void cleanup(); } \ No newline at end of file From f1a4cf3ec77680d2e543dbae97dbdbabdd0fbcdc Mon Sep 17 00:00:00 2001 From: Jason Jackson Date: Fri, 2 Nov 2012 13:36:14 -0700 Subject: [PATCH 08/14] tiny change for best practice. 
--- src/clj/backtype/storm/daemon/executor.clj | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/clj/backtype/storm/daemon/executor.clj b/src/clj/backtype/storm/daemon/executor.clj index 6a51c5126..87f974d78 100644 --- a/src/clj/backtype/storm/daemon/executor.clj +++ b/src/clj/backtype/storm/daemon/executor.clj @@ -273,7 +273,7 @@ (IMetricsConsumer$DataPoint. (.name mh) (.getValueAndReset ^IMetric (.metric mh))))) (into []))]] - (if data-points + (if (seq data-points) (task/send-unanchored task-data Constants/METRICS_STREAM_ID [task-info data-points]))))) (defn setup-ticks! [worker executor-data] From 69ada7049d110ff4aad5c5acc4471e3bf0383a6c Mon Sep 17 00:00:00 2001 From: Jason Jackson Date: Fri, 2 Nov 2012 15:11:17 -0700 Subject: [PATCH 09/14] Moved IOutputCollector::reportError into its own interface IErrorReporter --- src/clj/backtype/storm/metric/testing.clj | 2 +- src/jvm/backtype/storm/metric/MetricsConsumerBolt.java | 5 +++-- src/jvm/backtype/storm/metric/api/IMetricsConsumer.java | 4 ++-- src/jvm/backtype/storm/task/IErrorReporter.java | 5 +++++ src/jvm/backtype/storm/task/IOutputCollector.java | 3 +-- 5 files changed, 12 insertions(+), 7 deletions(-) create mode 100644 src/jvm/backtype/storm/task/IErrorReporter.java diff --git a/src/clj/backtype/storm/metric/testing.clj b/src/clj/backtype/storm/metric/testing.clj index 414be012c..d46050fac 100644 --- a/src/clj/backtype/storm/metric/testing.clj +++ b/src/clj/backtype/storm/metric/testing.clj @@ -11,7 +11,7 @@ (defn impl-init [] [[] (atom [])]) -(defn impl-prepare [this conf {:keys [ns var-name]} ctx] +(defn impl-prepare [this conf {:keys [ns var-name]} ctx error-reporter] (reset! (.state this) @(intern ns var-name)) (reset! 
@(.state this) [])) diff --git a/src/jvm/backtype/storm/metric/MetricsConsumerBolt.java b/src/jvm/backtype/storm/metric/MetricsConsumerBolt.java index 98127e415..a8a697506 100644 --- a/src/jvm/backtype/storm/metric/MetricsConsumerBolt.java +++ b/src/jvm/backtype/storm/metric/MetricsConsumerBolt.java @@ -1,8 +1,9 @@ package backtype.storm.metric; -import backtype.storm.metric.api.IMetricsConsumer; import backtype.storm.Config; +import backtype.storm.metric.api.IMetricsConsumer; import backtype.storm.task.IBolt; +import backtype.storm.task.IErrorReporter; import backtype.storm.task.OutputCollector; import backtype.storm.task.TopologyContext; import backtype.storm.tuple.Tuple; @@ -28,7 +29,7 @@ public void prepare(Map stormConf, TopologyContext context, OutputCollector coll throw new RuntimeException("Could not instantiate a class listed in config under section " + Config.TOPOLOGY_METRICS_CONSUMER_REGISTER + " with fully qualified name " + _consumerClassName, e); } - _metricsConsumer.prepare(stormConf, _registrationArgument, context, collector); + _metricsConsumer.prepare(stormConf, _registrationArgument, context, (IErrorReporter)collector); _collector = collector; } diff --git a/src/jvm/backtype/storm/metric/api/IMetricsConsumer.java b/src/jvm/backtype/storm/metric/api/IMetricsConsumer.java index ecaf87894..b5f3702bc 100644 --- a/src/jvm/backtype/storm/metric/api/IMetricsConsumer.java +++ b/src/jvm/backtype/storm/metric/api/IMetricsConsumer.java @@ -1,6 +1,6 @@ package backtype.storm.metric.api; -import backtype.storm.task.OutputCollector; +import backtype.storm.task.IErrorReporter; import backtype.storm.task.TopologyContext; import java.util.Collection; import java.util.Map; @@ -35,7 +35,7 @@ public String toString() { public Object value; } - void prepare(Map stormConf, Object registrationArgument, TopologyContext context, OutputCollector outputCollector); + void prepare(Map stormConf, Object registrationArgument, TopologyContext context, IErrorReporter 
errorReporter); void handleDataPoints(TaskInfo taskInfo, Collection dataPoints); void cleanup(); } \ No newline at end of file diff --git a/src/jvm/backtype/storm/task/IErrorReporter.java b/src/jvm/backtype/storm/task/IErrorReporter.java new file mode 100644 index 000000000..d2e7e5d33 --- /dev/null +++ b/src/jvm/backtype/storm/task/IErrorReporter.java @@ -0,0 +1,5 @@ +package backtype.storm.task; + +public interface IErrorReporter { + void reportError(Throwable error); +} diff --git a/src/jvm/backtype/storm/task/IOutputCollector.java b/src/jvm/backtype/storm/task/IOutputCollector.java index 8381895e6..8e56bfa60 100644 --- a/src/jvm/backtype/storm/task/IOutputCollector.java +++ b/src/jvm/backtype/storm/task/IOutputCollector.java @@ -4,7 +4,7 @@ import java.util.Collection; import java.util.List; -public interface IOutputCollector { +public interface IOutputCollector extends IErrorReporter { /** * Returns the task ids that received the tuples. */ @@ -12,5 +12,4 @@ public interface IOutputCollector { void emitDirect(int taskId, String streamId, Collection anchors, List tuple); void ack(Tuple input); void fail(Tuple input); - void reportError(Throwable error); } From 079ebf8192a97d8e280790546005b04c5743e4a1 Mon Sep 17 00:00:00 2001 From: Jason Jackson Date: Thu, 8 Nov 2012 14:17:51 -0800 Subject: [PATCH 10/14] fixed serialization exception due to metrics classes not being registered properly --- conf/defaults.yaml | 5 ----- src/clj/backtype/storm/daemon/common.clj | 2 +- src/jvm/backtype/storm/metric/api/IMetricsConsumer.java | 2 ++ .../backtype/storm/serialization/SerializationFactory.java | 2 ++ 4 files changed, 5 insertions(+), 6 deletions(-) diff --git a/conf/defaults.yaml b/conf/defaults.yaml index a4d0e7933..b469cf0c7 100644 --- a/conf/defaults.yaml +++ b/conf/defaults.yaml @@ -103,9 +103,4 @@ topology.max.error.report.per.interval: 5 topology.kryo.factory: "backtype.storm.serialization.DefaultKryoFactory" topology.trident.batch.emit.interval.millis: 500 -### 
register classes used in implementation of metrics api. -topology.kryo.register: - - backtype.storm.metric.api.IMetricsConsumer$TaskInfo - - backtype.storm.metric.api.IMetricsConsumer$DataPoint - dev.zookeeper.path: "/tmp/dev-storm-zookeeper" diff --git a/src/clj/backtype/storm/daemon/common.clj b/src/clj/backtype/storm/daemon/common.clj index 393a468a0..17b88c3e5 100644 --- a/src/clj/backtype/storm/daemon/common.clj +++ b/src/clj/backtype/storm/daemon/common.clj @@ -239,7 +239,7 @@ (->> (get storm-conf TOPOLOGY-METRICS-CONSUMER-REGISTER) (map #(get % "class")) (number-duplicates) - (map #(str Constants/METRICS_COMPONENT_ID_PREFIX %)))) + (map #(str Constants/METRICS_COMPONENT_ID_PREFIX "_" %)))) (defn metrics-consumer-bolt-specs [components-ids-that-emit-metrics storm-conf] (let [inputs (->> (for [comp-id components-ids-that-emit-metrics] diff --git a/src/jvm/backtype/storm/metric/api/IMetricsConsumer.java b/src/jvm/backtype/storm/metric/api/IMetricsConsumer.java index b5f3702bc..5bfece354 100644 --- a/src/jvm/backtype/storm/metric/api/IMetricsConsumer.java +++ b/src/jvm/backtype/storm/metric/api/IMetricsConsumer.java @@ -7,6 +7,7 @@ public interface IMetricsConsumer { public static class TaskInfo { + public TaskInfo() {} public TaskInfo(String srcWorkerHost, int srcWorkerPort, String srcComponentId, int srcTaskId, long timestamp, int updateIntervalSecs) { this.srcWorkerHost = srcWorkerHost; this.srcWorkerPort = srcWorkerPort; @@ -23,6 +24,7 @@ public TaskInfo(String srcWorkerHost, int srcWorkerPort, String srcComponentId, public int updateIntervalSecs; } public static class DataPoint { + public DataPoint() {} public DataPoint(String name, Object value) { this.name = name; this.value = value; diff --git a/src/jvm/backtype/storm/serialization/SerializationFactory.java b/src/jvm/backtype/storm/serialization/SerializationFactory.java index 43aebacc0..d9bd89206 100644 --- a/src/jvm/backtype/storm/serialization/SerializationFactory.java +++ 
b/src/jvm/backtype/storm/serialization/SerializationFactory.java @@ -38,6 +38,8 @@ public static Kryo getKryo(Map conf) { k.register(BigInteger.class, new BigIntegerSerializer()); k.register(TransactionAttempt.class); k.register(Values.class); + k.register(backtype.storm.metric.api.IMetricsConsumer.DataPoint.class); + k.register(backtype.storm.metric.api.IMetricsConsumer.TaskInfo.class); try { JavaBridge.registerPrimitives(k); JavaBridge.registerCollections(k); From 0f59a40950fe3af6c4b00ec9fb2b6c5d84620fca Mon Sep 17 00:00:00 2001 From: Jason Jackson Date: Thu, 8 Nov 2012 16:04:28 -0800 Subject: [PATCH 11/14] this change was causing a weird exception in nimbus, reverting it. --- src/clj/backtype/storm/daemon/common.clj | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/clj/backtype/storm/daemon/common.clj b/src/clj/backtype/storm/daemon/common.clj index 17b88c3e5..393a468a0 100644 --- a/src/clj/backtype/storm/daemon/common.clj +++ b/src/clj/backtype/storm/daemon/common.clj @@ -239,7 +239,7 @@ (->> (get storm-conf TOPOLOGY-METRICS-CONSUMER-REGISTER) (map #(get % "class")) (number-duplicates) - (map #(str Constants/METRICS_COMPONENT_ID_PREFIX "_" %)))) + (map #(str Constants/METRICS_COMPONENT_ID_PREFIX %)))) (defn metrics-consumer-bolt-specs [components-ids-that-emit-metrics storm-conf] (let [inputs (->> (for [comp-id components-ids-that-emit-metrics] From c177d096666496a06cf0e6282e2d6849f6641e61 Mon Sep 17 00:00:00 2001 From: Jason Jackson Date: Tue, 13 Nov 2012 00:54:27 -0800 Subject: [PATCH 12/14] Added built-in storm metrics, Added MultiReducedMetric. Refactored MultiCountMetric. Added Unit tests. 
--- conf/defaults.yaml | 1 + .../backtype/storm/daemon/builtin_metrics.clj | 64 +++++++ src/clj/backtype/storm/daemon/common.clj | 2 +- src/clj/backtype/storm/daemon/executor.clj | 55 +++--- src/clj/backtype/storm/daemon/task.clj | 19 +- src/clj/backtype/storm/metric/testing.clj | 49 +++-- src/jvm/backtype/storm/Config.java | 25 +++ .../backtype/storm/metric/MetricHolder.java | 13 -- .../storm/metric/api/CountMetric.java | 4 +- .../storm/metric/api/MeanReducer.java | 12 +- .../storm/metric/api/MultiCountMetric.java | 28 +++ .../storm/metric/api/MultiReducedMetric.java | 30 ++++ .../backtype/storm/task/TopologyContext.java | 13 +- test/clj/backtype/storm/metrics_test.clj | 169 +++++++++++++++--- 14 files changed, 402 insertions(+), 82 deletions(-) create mode 100644 src/clj/backtype/storm/daemon/builtin_metrics.clj delete mode 100644 src/jvm/backtype/storm/metric/MetricHolder.java create mode 100644 src/jvm/backtype/storm/metric/api/MultiCountMetric.java create mode 100644 src/jvm/backtype/storm/metric/api/MultiReducedMetric.java diff --git a/conf/defaults.yaml b/conf/defaults.yaml index b469cf0c7..a8ebbb459 100644 --- a/conf/defaults.yaml +++ b/conf/defaults.yaml @@ -87,6 +87,7 @@ topology.max.task.parallelism: null topology.max.spout.pending: null topology.state.synchronization.timeout.secs: 60 topology.stats.sample.rate: 0.05 +topology.builtin.metrics.bucket.size.secs: 60 topology.fall.back.on.java.serialization: true topology.worker.childopts: null topology.executor.receive.buffer.size: 1024 #batched diff --git a/src/clj/backtype/storm/daemon/builtin_metrics.clj b/src/clj/backtype/storm/daemon/builtin_metrics.clj new file mode 100644 index 000000000..057fd513f --- /dev/null +++ b/src/clj/backtype/storm/daemon/builtin_metrics.clj @@ -0,0 +1,64 @@ +(ns backtype.storm.daemon.builtin-metrics + (:import [backtype.storm.metric.api MultiCountMetric MultiReducedMetric MeanReducer]) + (:import [backtype.storm Config]) + (:use [backtype.storm.stats :only [stats-rate]])) + 
+(defrecord BuiltinSpoutMetrics [^MultiCountMetric ack-count + ^MultiReducedMetric complete-latency + ^MultiCountMetric fail-count + ^MultiCountMetric emit-count + ^MultiCountMetric transfer-count]) +(defrecord BuiltinBoltMetrics [^MultiCountMetric ack-count + ^MultiReducedMetric process-latency + ^MultiCountMetric fail-count + ^MultiCountMetric execute-count + ^MultiReducedMetric execute-latency + ^MultiCountMetric emit-count + ^MultiCountMetric transfer-count]) + +(defn make-data [executor-type] + (condp = executor-type + :spout (BuiltinSpoutMetrics. (MultiCountMetric.) + (MultiReducedMetric. (MeanReducer.)) + (MultiCountMetric.) + (MultiCountMetric.) + (MultiCountMetric.)) + :bolt (BuiltinBoltMetrics. (MultiCountMetric.) + (MultiReducedMetric. (MeanReducer.)) + (MultiCountMetric.) + (MultiCountMetric.) + (MultiReducedMetric. (MeanReducer.)) + (MultiCountMetric.) + (MultiCountMetric.)))) + +(defn register-all [builtin-metrics storm-conf topology-context] + (doseq [[kw imetric] builtin-metrics] + (.registerMetric topology-context (str "__" (name kw)) imetric + (int (get storm-conf Config/TOPOLOGY_BUILTIN_METRICS_BUCKET_SIZE_SECS))))) + +(defn spout-acked-tuple! [^BuiltinSpoutMetrics m stats stream latency-ms] + (-> m .ack-count (.scope stream) (.incrBy (stats-rate stats))) + (-> m .complete-latency (.scope stream) (.update latency-ms))) + +(defn spout-failed-tuple! [^BuiltinSpoutMetrics m stats stream] + (-> m .fail-count (.scope stream) (.incrBy (stats-rate stats)))) + +(defn bolt-execute-tuple! [^BuiltinBoltMetrics m stats comp-id stream latency-ms] + (let [scope (str comp-id ":" stream)] + (-> m .execute-count (.scope scope) (.incrBy (stats-rate stats))) + (-> m .execute-latency (.scope scope) (.update latency-ms)))) + +(defn bolt-acked-tuple! 
[^BuiltinBoltMetrics m stats comp-id stream latency-ms] + (let [scope (str comp-id ":" stream)] + (-> m .ack-count (.scope scope) (.incrBy (stats-rate stats))) + (-> m .process-latency (.scope scope) (.update latency-ms)))) + +(defn bolt-failed-tuple! [^BuiltinBoltMetrics m stats comp-id stream] + (let [scope (str comp-id ":" stream)] + (-> m .fail-count (.scope scope) (.incrBy (stats-rate stats))))) + +(defn emitted-tuple! [m stats stream] + (-> m :emit-count (.scope stream) (.incrBy (stats-rate stats)))) + +(defn transferred-tuple! [m stats stream num-out-tasks] + (-> m :transfer-count (.scope stream) (.incrBy (* num-out-tasks (stats-rate stats))))) diff --git a/src/clj/backtype/storm/daemon/common.clj b/src/clj/backtype/storm/daemon/common.clj index 393a468a0..da93fdd4c 100644 --- a/src/clj/backtype/storm/daemon/common.clj +++ b/src/clj/backtype/storm/daemon/common.clj @@ -278,8 +278,8 @@ (validate-basic! topology) (let [ret (.deepCopy topology)] (add-acker! storm-conf ret) - (add-metric-streams! ret) (add-metric-components! storm-conf ret) + (add-metric-streams! ret) (add-system-streams! ret) (add-system-components! ret) (validate-structure! 
ret) diff --git a/src/clj/backtype/storm/daemon/executor.clj b/src/clj/backtype/storm/daemon/executor.clj index 87f974d78..95aff4345 100644 --- a/src/clj/backtype/storm/daemon/executor.clj +++ b/src/clj/backtype/storm/daemon/executor.clj @@ -6,11 +6,10 @@ (:import [backtype.storm.spout ISpoutWaitStrategy]) (:import [backtype.storm.hooks.info SpoutAckInfo SpoutFailInfo EmitInfo BoltFailInfo BoltAckInfo BoltExecuteInfo]) - (:import [backtype.storm.metric MetricHolder]) - (:import [backtype.storm.metric.api IMetric IMetricsConsumer$TaskInfo IMetricsConsumer$DataPoint]) + (:import [backtype.storm.metric.api IMetric IMetricsConsumer$TaskInfo IMetricsConsumer$DataPoint]) (:require [backtype.storm [tuple :as tuple]]) (:require [backtype.storm.daemon [task :as task]]) - ) + (:require [backtype.storm.daemon.builtin-metrics :as builtin-metrics])) (bootstrap) @@ -214,7 +213,7 @@ :type executor-type ;; TODO: should refactor this to be part of the executor specific map (spout or bolt with :common field) :stats (mk-executor-stats <> (sampling-rate storm-conf)) - :interval->task->registered-metrics (HashMap.) + :interval->task->metric-registry (HashMap.) :task->component (:task->component worker) :stream->component->grouper (outbound-components worker-context component-id) :report-error (throttled-report-error-fn <>) @@ -244,8 +243,8 @@ :kill-fn (:report-error-and-die executor-data)))) (defn setup-metrics! [executor-data] - (let [{:keys [storm-conf receive-queue worker-context interval->task->registered-metrics]} executor-data - distinct-time-bucket-intervals (keys interval->task->registered-metrics)] + (let [{:keys [storm-conf receive-queue worker-context interval->task->metric-registry]} executor-data + distinct-time-bucket-intervals (keys interval->task->metric-registry)] (doseq [interval distinct-time-bucket-intervals] (schedule-recurring (:user-timer (:worker executor-data)) @@ -257,10 +256,10 @@ [[nil (TupleImpl. 
worker-context [interval] -1 Constants/METRICS_TICK_STREAM_ID)]])))))) (defn metrics-tick [executor-data task-datas ^TupleImpl tuple] - (let [{:keys [interval->task->registered-metrics ^WorkerTopologyContext worker-context]} executor-data + (let [{:keys [interval->task->metric-registry ^WorkerTopologyContext worker-context]} executor-data interval (.getInteger tuple 0)] (doseq [[task-id task-data] task-datas - :let [metric-holders (-> interval->task->registered-metrics (get interval) (get task-id)) + :let [name->imetric (-> interval->task->metric-registry (get interval) (get task-id)) task-info (IMetricsConsumer$TaskInfo. (. (java.net.InetAddress/getLocalHost) getCanonicalHostName) (.getThisWorkerPort worker-context) @@ -268,10 +267,9 @@ task-id (long (/ (System/currentTimeMillis) 1000)) interval) - data-points (->> metric-holders - (map (fn [^MetricHolder mh] - (IMetricsConsumer$DataPoint. (.name mh) - (.getValueAndReset ^IMetric (.metric mh))))) + data-points (->> name->imetric + (map (fn [[name imetric]] + (IMetricsConsumer$DataPoint. name (.getValueAndReset ^IMetric imetric)))) (into []))]] (if (seq data-points) (task/send-unanchored task-data Constants/METRICS_STREAM_ID [task-info data-points]))))) @@ -352,8 +350,8 @@ (.fail spout msg-id) (task/apply-hooks (:user-context task-data) .spoutFail (SpoutFailInfo. msg-id task-id time-delta)) (when time-delta - (stats/spout-failed-tuple! (:stats executor-data) (:stream tuple-info) time-delta) - ))) + (builtin-metrics/spout-failed-tuple! (:builtin-metrics task-data) (:stats executor-data) (:stream tuple-info)) + (stats/spout-failed-tuple! (:stats executor-data) (:stream tuple-info) time-delta)))) (defn- ack-spout-msg [executor-data task-data msg-id tuple-info time-delta] (let [storm-conf (:storm-conf executor-data) @@ -364,8 +362,8 @@ (.ack spout msg-id) (task/apply-hooks (:user-context task-data) .spoutAck (SpoutAckInfo. msg-id task-id time-delta)) (when time-delta - (stats/spout-acked-tuple! 
(:stats executor-data) (:stream tuple-info) time-delta) - ))) + (builtin-metrics/spout-acked-tuple! (:builtin-metrics task-data) (:stats executor-data) (:stream tuple-info) time-delta) + (stats/spout-acked-tuple! (:stats executor-data) (:stream tuple-info) time-delta)))) (defn mk-task-receiver [executor-data tuple-action-fn] (let [^KryoTupleDeserializer deserializer (:deserializer executor-data) @@ -492,6 +490,7 @@ (if (sampler) 0)))) (or out-tasks []) ))]] + (builtin-metrics/register-all (:builtin-metrics task-data) storm-conf (:user-context task-data)) (.open spout-obj storm-conf (:user-context task-data) @@ -609,11 +608,15 @@ (let [delta (tuple-execute-time-delta! tuple)] (task/apply-hooks user-context .boltExecute (BoltExecuteInfo. tuple task-id delta)) (when delta + (builtin-metrics/bolt-execute-tuple! (:builtin-metrics task-data) + executor-stats + (.getSourceComponent tuple) + (.getSourceStreamId tuple) + delta) (stats/bolt-execute-tuple! executor-stats (.getSourceComponent tuple) (.getSourceStreamId tuple) - delta) - ))))))] + delta)))))))] ;; TODO: can get any SubscribedState objects out of the context now @@ -649,6 +652,7 @@ stream (MessageId/makeId anchors-to-ids))))) (or out-tasks [])))]] + (builtin-metrics/register-all (:builtin-metrics task-data) storm-conf user-context) (.prepare bolt-obj storm-conf user-context @@ -669,11 +673,15 @@ (let [delta (tuple-time-delta! tuple)] (task/apply-hooks user-context .boltAck (BoltAckInfo. tuple task-id delta)) (when delta + (builtin-metrics/bolt-acked-tuple! (:builtin-metrics task-data) + executor-stats + (.getSourceComponent tuple) + (.getSourceStreamId tuple) + delta) (stats/bolt-acked-tuple! executor-stats (.getSourceComponent tuple) (.getSourceStreamId tuple) - delta) - ))) + delta)))) (^void fail [this ^Tuple tuple] (fast-list-iter [root (.. tuple getMessageId getAnchors)] (task/send-unanchored task-data @@ -682,11 +690,14 @@ (let [delta (tuple-time-delta! 
tuple)] (task/apply-hooks user-context .boltFail (BoltFailInfo. tuple task-id delta)) (when delta + (builtin-metrics/bolt-failed-tuple! (:builtin-metrics task-data) + executor-stats + (.getSourceComponent tuple) + (.getSourceStreamId tuple)) (stats/bolt-failed-tuple! executor-stats (.getSourceComponent tuple) (.getSourceStreamId tuple) - delta) - ))) + delta)))) (reportError [this error] (report-error error) ))))) diff --git a/src/clj/backtype/storm/daemon/task.clj b/src/clj/backtype/storm/daemon/task.clj index 566680fd4..d39d2c81d 100644 --- a/src/clj/backtype/storm/daemon/task.clj +++ b/src/clj/backtype/storm/daemon/task.clj @@ -5,8 +5,9 @@ (:import [backtype.storm.tuple Tuple]) (:import [backtype.storm.generated SpoutSpec Bolt StateSpoutSpec]) (:import [backtype.storm.hooks.info SpoutAckInfo SpoutFailInfo - EmitInfo BoltFailInfo BoltAckInfo]) - (:require [backtype.storm [tuple :as tuple]])) + EmitInfo BoltFailInfo BoltAckInfo]) + (:require [backtype.storm [tuple :as tuple]]) + (:require [backtype.storm.daemon.builtin-metrics :as builtin-metrics])) (bootstrap) @@ -28,7 +29,7 @@ (:default-shared-resources worker) (:user-shared-resources worker) (:shared-executor-data executor-data) - (:interval->task->registered-metrics executor-data) + (:interval->task->metric-registry executor-data) (:open-or-prepare-was-called? executor-data)))) (defn system-topology-context [worker executor-data tid] @@ -124,9 +125,11 @@ (throw (IllegalArgumentException. "Cannot emitDirect to a task expecting a regular grouping"))) (apply-hooks user-context .emit (EmitInfo. values stream task-id [out-task-id])) (when (emit-sampler) + (builtin-metrics/emitted-tuple! (:builtin-metrics task-data) executor-stats stream) (stats/emitted-tuple! executor-stats stream) (if out-task-id - (stats/transferred-tuples! executor-stats stream 1))) + (stats/transferred-tuples! executor-stats stream 1) + (builtin-metrics/transferred-tuple! 
(:builtin-metrics task-data) executor-stats stream 1))) (if out-task-id [out-task-id]) )) ([^String stream ^List values] @@ -145,7 +148,9 @@ (apply-hooks user-context .emit (EmitInfo. values stream task-id out-tasks)) (when (emit-sampler) (stats/emitted-tuple! executor-stats stream) - (stats/transferred-tuples! executor-stats stream (count out-tasks))) + (builtin-metrics/emitted-tuple! (:builtin-metrics task-data) executor-stats stream) + (stats/transferred-tuples! executor-stats stream (count out-tasks)) + (builtin-metrics/transferred-tuple! (:builtin-metrics task-data) executor-stats stream (count out-tasks))) out-tasks))) )) @@ -155,9 +160,9 @@ :task-id task-id :system-context (system-topology-context (:worker executor-data) executor-data task-id) :user-context (user-topology-context (:worker executor-data) executor-data task-id) + :builtin-metrics (builtin-metrics/make-data (:type executor-data)) :tasks-fn (mk-tasks-fn <>) - :object (get-task-object (.getRawTopology ^TopologyContext (:system-context <>)) (:component-id executor-data)) - )) + :object (get-task-object (.getRawTopology ^TopologyContext (:system-context <>)) (:component-id executor-data)))) (defn mk-task [executor-data task-id] diff --git a/src/clj/backtype/storm/metric/testing.clj b/src/clj/backtype/storm/metric/testing.clj index d46050fac..36aa954e5 100644 --- a/src/clj/backtype/storm/metric/testing.clj +++ b/src/clj/backtype/storm/metric/testing.clj @@ -2,23 +2,52 @@ "This namespace is for AOT dependent metrics testing code." (:gen-class)) +(letfn [(for- [threader arg seq-exprs body] + `(reduce #(%2 %1) + ~arg + (for ~seq-exprs + (fn [arg#] (~threader arg# ~@body)))))] + (defmacro for-> + "Apply a thread expression to a sequence. + eg. 
+ (-> 1 + (for-> [x [1 2 3]] + (+ x))) + => 7" + {:indent 1} + [arg seq-exprs & body] + (for- 'clojure.core/-> arg seq-exprs body))) + (gen-class :name clojure.storm.metric.testing.FakeMetricConsumer :implements [backtype.storm.metric.api.IMetricsConsumer] - :prefix "impl-" - :state state - :init init) + :prefix "impl-") -(defn impl-init [] [[] (atom [])]) +(def buffer (atom nil)) -(defn impl-prepare [this conf {:keys [ns var-name]} ctx error-reporter] - (reset! (.state this) @(intern ns var-name)) - (reset! @(.state this) [])) +(defn impl-prepare [this conf argument ctx error-reporter] + (reset! buffer {})) (defn impl-cleanup [this] - (reset! @(.state this) [])) + (reset! buffer {})) + +(defn vec-conj [coll x] (if coll + (conj coll x) + [x])) + +(defn expand-complex-datapoint [dp] + (if (or (map? (.value dp)) + (instance? java.util.AbstractMap (.value dp))) + (into [] (for [[k v] (.value dp)] + [(str (.name dp) "/" k) v])) + [[(.name dp) (.value dp)]])) -(defn impl-handleDataPoints [this task-info data-points] - (swap! @(.state this) conj [task-info data-points])) +(defn impl-handleDataPoints [this task-info data-points] + (swap! buffer + (fn [old] + (-> old + (for-> [dp data-points + [name val] (expand-complex-datapoint dp)] + (update-in [(.srcComponentId task-info) name (.srcTaskId task-info)] vec-conj val)))))) diff --git a/src/jvm/backtype/storm/Config.java b/src/jvm/backtype/storm/Config.java index 879fb32e2..77b49b420 100644 --- a/src/jvm/backtype/storm/Config.java +++ b/src/jvm/backtype/storm/Config.java @@ -460,6 +460,11 @@ public class Config extends HashMap { */ public static String TOPOLOGY_STATS_SAMPLE_RATE="topology.stats.sample.rate"; + /** + * The time period that builtin metrics data in bucketed into. + */ + public static String TOPOLOGY_BUILTIN_METRICS_BUCKET_SIZE_SECS="topology.builtin.metrics.bucket.size.secs"; + /** * Whether or not to use Java serialization in a topology. 
*/ @@ -653,6 +658,26 @@ public void registerSerialization(Class klass, Class seria registerSerialization(this, klass, serializerClass); } + public void registerMetricsConsumer(Class klass, Object argument, long parallelismHint) { + HashMap m = new HashMap(); + m.put("class", klass.getCanonicalName()); + m.put("parallelism.hint", parallelismHint); + m.put("argument", argument); + + List l = (List)this.get(TOPOLOGY_METRICS_CONSUMER_REGISTER); + if(l == null) { l = new ArrayList(); } + l.add(m); + this.put(TOPOLOGY_METRICS_CONSUMER_REGISTER, l); + } + + public void registerMetricsConsumer(Class klass, long parallelismHint) { + registerMetricsConsumer(klass, null, parallelismHint); + } + + public void registerMetricsConsumer(Class klass) { + registerMetricsConsumer(klass, null, 1L); + } + public static void registerDecorator(Map conf, Class klass) { getRegisteredDecorators(conf).add(klass.getName()); } diff --git a/src/jvm/backtype/storm/metric/MetricHolder.java b/src/jvm/backtype/storm/metric/MetricHolder.java deleted file mode 100644 index 92ec07384..000000000 --- a/src/jvm/backtype/storm/metric/MetricHolder.java +++ /dev/null @@ -1,13 +0,0 @@ -package backtype.storm.metric; - -import backtype.storm.metric.api.IMetric; - -public class MetricHolder { - public String name; - public IMetric metric; - - public MetricHolder(String name, IMetric metric) { - this.name = name; - this.metric = metric; - } -} diff --git a/src/jvm/backtype/storm/metric/api/CountMetric.java b/src/jvm/backtype/storm/metric/api/CountMetric.java index 2a2b24137..7a8f829cc 100644 --- a/src/jvm/backtype/storm/metric/api/CountMetric.java +++ b/src/jvm/backtype/storm/metric/api/CountMetric.java @@ -8,11 +8,11 @@ public class CountMetric implements IMetric { public CountMetric() { } - public void inc() { + public void incr() { _value++; } - public void inc(long incrementBy) { + public void incrBy(long incrementBy) { _value += incrementBy; } diff --git a/src/jvm/backtype/storm/metric/api/MeanReducer.java 
b/src/jvm/backtype/storm/metric/api/MeanReducer.java index b9830ee45..2133dd9f3 100644 --- a/src/jvm/backtype/storm/metric/api/MeanReducer.java +++ b/src/jvm/backtype/storm/metric/api/MeanReducer.java @@ -14,7 +14,17 @@ public MeanReducerState init() { public MeanReducerState reduce(MeanReducerState acc, Object input) { acc.count++; - acc.sum += (Double)input; + if(input instanceof Double) { + acc.sum += (Double)input; + } else if(input instanceof Long) { + acc.sum += ((Long)input).doubleValue(); + } else if(input instanceof Integer) { + acc.sum += ((Integer)input).doubleValue(); + } else { + throw new RuntimeException( + "MeanReducer::reduce called with unsupported input type `" + input.getClass() + + "`. Supported types are Double, Long, Integer."); + } return acc; } diff --git a/src/jvm/backtype/storm/metric/api/MultiCountMetric.java b/src/jvm/backtype/storm/metric/api/MultiCountMetric.java new file mode 100644 index 000000000..02473ca6a --- /dev/null +++ b/src/jvm/backtype/storm/metric/api/MultiCountMetric.java @@ -0,0 +1,28 @@ +package backtype.storm.metric.api; + +import backtype.storm.metric.api.IMetric; +import java.util.HashMap; +import java.util.Map; + +public class MultiCountMetric implements IMetric { + Map _value = new HashMap(); + + public MultiCountMetric() { + } + + public CountMetric scope(String key) { + CountMetric val = _value.get(key); + if(val == null) { + _value.put(key, val = new CountMetric()); + } + return val; + } + + public Object getValueAndReset() { + Map ret = new HashMap(); + for(Map.Entry e : _value.entrySet()) { + ret.put(e.getKey(), e.getValue().getValueAndReset()); + } + return ret; + } +} diff --git a/src/jvm/backtype/storm/metric/api/MultiReducedMetric.java b/src/jvm/backtype/storm/metric/api/MultiReducedMetric.java new file mode 100644 index 000000000..cfa39ec8b --- /dev/null +++ b/src/jvm/backtype/storm/metric/api/MultiReducedMetric.java @@ -0,0 +1,30 @@ +package backtype.storm.metric.api; + +import 
backtype.storm.metric.api.IMetric; +import java.util.HashMap; +import java.util.Map; + +public class MultiReducedMetric implements IMetric { + Map _value = new HashMap(); + IReducer _reducer; + + public MultiReducedMetric(IReducer reducer) { + _reducer = reducer; + } + + public ReducedMetric scope(String key) { + ReducedMetric val = _value.get(key); + if(val == null) { + _value.put(key, val = new ReducedMetric(_reducer)); + } + return val; + } + + public Object getValueAndReset() { + Map ret = new HashMap(); + for(Map.Entry e : _value.entrySet()) { + ret.put(e.getKey(), e.getValue().getValueAndReset()); + } + return ret; + } +} diff --git a/src/jvm/backtype/storm/task/TopologyContext.java b/src/jvm/backtype/storm/task/TopologyContext.java index 8492f06c7..872f8a95d 100644 --- a/src/jvm/backtype/storm/task/TopologyContext.java +++ b/src/jvm/backtype/storm/task/TopologyContext.java @@ -9,7 +9,6 @@ import backtype.storm.metric.api.ICombiner; import backtype.storm.metric.api.ReducedMetric; import backtype.storm.metric.api.CombinedMetric; -import backtype.storm.metric.MetricHolder; import backtype.storm.state.ISubscribedState; import backtype.storm.tuple.Fields; import backtype.storm.utils.Utils; @@ -35,7 +34,7 @@ public class TopologyContext extends WorkerTopologyContext { private Map _taskData = new HashMap(); private List _hooks = new ArrayList(); private Map _executorData; - private Map>> _registeredMetrics; + private Map>> _registeredMetrics; private clojure.lang.Atom _openOrPrepareWasCalled; @@ -222,11 +221,15 @@ public IMetric registerMetric(String name, IMetric metric, int timeBucketSizeInS Map m2 = (Map)m1.get(timeBucketSizeInSecs); if(!m2.containsKey(_taskId)) { - m2.put(_taskId, new ArrayList()); + m2.put(_taskId, new HashMap()); } - Collection c1 = (Collection)m2.get(_taskId); - c1.add(new MetricHolder(name, metric)); + Map m3 = (Map)m2.get(_taskId); + if(m3.containsKey(name)) { + throw new RuntimeException("The same metric name `" + name + "` was registered 
twice." ); + } else { + m3.put(name, metric); + } return metric; } diff --git a/test/clj/backtype/storm/metrics_test.clj b/test/clj/backtype/storm/metrics_test.clj index e794f686e..bd3f6c9b1 100644 --- a/test/clj/backtype/storm/metrics_test.clj +++ b/test/clj/backtype/storm/metrics_test.clj @@ -13,30 +13,72 @@ (bootstrap) +(defbolt acking-bolt {} {:prepare true} + [conf context collector] + (bolt + (execute [tuple] + (ack! collector tuple)))) + +(defbolt ack-every-other {} {:prepare true} + [conf context collector] + (let [state (atom -1)] + (bolt + (execute [tuple] + (let [val (swap! state -)] + (when (pos? val) + (ack! collector tuple) + )))))) + +(defn assert-loop [afn ids] + (while (not (every? afn ids)) + (Thread/sleep 1))) + +(defn assert-acked [tracker & ids] + (assert-loop #(.isAcked tracker %) ids)) + +(defn assert-failed [tracker & ids] + (assert-loop #(.isFailed tracker %) ids)) + (defbolt count-acks {} {:prepare true} [conf context collector] - (let [ack-count (CountMetric.)] - (.registerMetric context "ack-count" ack-count 5) + (let [mycustommetric (CountMetric.)] + (.registerMetric context "my-custom-metric" mycustommetric 5) (bolt (execute [tuple] - (.inc ack-count) + (.incr mycustommetric) (ack! collector tuple))))) -(def datapoints-buffer (atom nil)) +(def metrics-data backtype.storm.metric.testing/buffer) + +(defn wait-for-atleast-N-buckets! [N comp-id metric-name] + (while + (let [taskid->buckets (-> @metrics-data (get comp-id) (get metric-name))] + (or + (and (not= N 0) (nil? taskid->buckets)) + (not-every? #(<= N %) (map (comp count second) taskid->buckets)))) + (println "Waiting for at least" N "timebuckets to appear in FakeMetricsConsumer for component id" comp-id + "and metric name" metric-name) + (Thread/sleep 10))) -(defn metric-name->vals! [name] - (->> @datapoints-buffer - (mapcat (fn [[task-info data-points]] data-points)) - (filter #(= name (.name %))) - (map #(.value %)) - (into []))) +(defn lookup-bucket-by-comp-id-&-metric-name! 
[comp-id metric-name] + (-> @metrics-data + (get comp-id) + (get metric-name) + (first) ;; pick first task in the list, ignore other tasks' metric data. + (second) + (or []))) -(deftest test-time-buckets +(defmacro assert-buckets! [comp-id metric-name expected] + `(do + (let [N# (count ~expected)] + (wait-for-atleast-N-buckets! N# ~comp-id ~metric-name) + (is (= ~expected (subvec (lookup-bucket-by-comp-id-&-metric-name! ~comp-id ~metric-name) 0 N#)))))) + +(deftest test-custom-metric (with-simulated-time-local-cluster [cluster :daemon-conf {TOPOLOGY-METRICS-CONSUMER-REGISTER - [{"class" "clojure.storm.metric.testing.FakeMetricConsumer" - "argument" {:ns (.ns #'datapoints-buffer) :var-name 'datapoints-buffer}}]}] + [{"class" "clojure.storm.metric.testing.FakeMetricConsumer"}]}] (let [feeder (feeder-spout ["field1"]) tracker (AckFailMapTracker.) _ (.setAckFailDelegate feeder tracker) @@ -44,19 +86,104 @@ {"1" (thrift/mk-spout-spec feeder)} {"2" (thrift/mk-bolt-spec {"1" :global} count-acks)})] (submit-local-topology (:nimbus cluster) "metrics-tester" {} topology) - + (.feed feeder ["a"] 1) - (advance-cluster-time cluster 6) - (is (= [1] (metric-name->vals! "ack-count"))) - + (advance-cluster-time cluster 6) + (assert-buckets! "2" "my-custom-metric" [1]) + (advance-cluster-time cluster 5) - (is (= [1 0] (metric-name->vals! "ack-count"))) + (assert-buckets! "2" "my-custom-metric" [1 0]) (advance-cluster-time cluster 20) - (is (= [1 0 0 0 0 0] (metric-name->vals! "ack-count"))) - + (assert-buckets! "2" "my-custom-metric" [1 0 0 0 0 0]) + (.feed feeder ["b"] 2) (.feed feeder ["c"] 3) (advance-cluster-time cluster 5) - (is (= [1 0 0 0 0 0 2] (metric-name->vals! "ack-count")))))) + (assert-buckets! 
"2" "my-custom-metric" [1 0 0 0 0 0 2])))) + + +(deftest test-builtin-metrics-1 + (with-simulated-time-local-cluster + [cluster :daemon-conf {TOPOLOGY-METRICS-CONSUMER-REGISTER + [{"class" "clojure.storm.metric.testing.FakeMetricConsumer"}] + TOPOLOGY-STATS-SAMPLE-RATE 1.0 + TOPOLOGY-BUILTIN-METRICS-BUCKET-SIZE-SECS 60}] + (let [feeder (feeder-spout ["field1"]) + tracker (AckFailMapTracker.) + _ (.setAckFailDelegate feeder tracker) + topology (thrift/mk-topology + {"myspout" (thrift/mk-spout-spec feeder)} + {"mybolt" (thrift/mk-bolt-spec {"myspout" :shuffle} acking-bolt)})] + (submit-local-topology (:nimbus cluster) "metrics-tester" {} topology) + + (.feed feeder ["a"] 1) + (advance-cluster-time cluster 61) + (assert-buckets! "myspout" "__ack-count/default" [1]) + (assert-buckets! "myspout" "__emit-count/default" [1]) + (assert-buckets! "myspout" "__transfer-count/default" [1]) + (assert-buckets! "mybolt" "__ack-count/myspout:default" [1]) + (assert-buckets! "mybolt" "__execute-count/myspout:default" [1]) + + (advance-cluster-time cluster 120) + (assert-buckets! "myspout" "__ack-count/default" [1 0 0]) + (assert-buckets! "myspout" "__emit-count/default" [1 0 0]) + (assert-buckets! "myspout" "__transfer-count/default" [1 0 0]) + (assert-buckets! "mybolt" "__ack-count/myspout:default" [1 0 0]) + (assert-buckets! "mybolt" "__execute-count/myspout:default" [1 0 0]) + + (.feed feeder ["b"] 1) + (.feed feeder ["c"] 1) + (advance-cluster-time cluster 60) + (assert-buckets! "myspout" "__ack-count/default" [1 0 0 2]) + (assert-buckets! "myspout" "__emit-count/default" [1 0 0 2]) + (assert-buckets! "myspout" "__transfer-count/default" [1 0 0 2]) + (assert-buckets! "mybolt" "__ack-count/myspout:default" [1 0 0 2]) + (assert-buckets! 
"mybolt" "__execute-count/myspout:default" [1 0 0 2])))) + + +(deftest test-builtin-metrics-2 + (with-simulated-time-local-cluster + [cluster :daemon-conf {TOPOLOGY-METRICS-CONSUMER-REGISTER + [{"class" "clojure.storm.metric.testing.FakeMetricConsumer"}] + TOPOLOGY-ENABLE-MESSAGE-TIMEOUTS true + TOPOLOGY-STATS-SAMPLE-RATE 1.0 + TOPOLOGY-BUILTIN-METRICS-BUCKET-SIZE-SECS 5}] + (let [feeder (feeder-spout ["field1"]) + tracker (AckFailMapTracker.) + _ (.setAckFailDelegate feeder tracker) + topology (thrift/mk-topology + {"myspout" (thrift/mk-spout-spec feeder)} + {"mybolt" (thrift/mk-bolt-spec {"myspout" :shuffle} ack-every-other)})] + (submit-local-topology (:nimbus cluster) + "metrics-tester" + {TOPOLOGY-MESSAGE-TIMEOUT-SECS 20} + topology) + + (.feed feeder ["a"] 1) + (advance-cluster-time cluster 6) + (assert-acked tracker 1) + (assert-buckets! "myspout" "__fail-count/default" []) + (assert-buckets! "myspout" "__ack-count/default" [1]) + (assert-buckets! "myspout" "__emit-count/default" [1]) + (assert-buckets! "myspout" "__transfer-count/default" [1]) + (assert-buckets! "mybolt" "__ack-count/myspout:default" [1]) + (assert-buckets! "mybolt" "__execute-count/myspout:default" [1]) + + (.feed feeder ["b"] 2) + (advance-cluster-time cluster 5) + (assert-buckets! "myspout" "__fail-count/default" []) + (assert-buckets! "myspout" "__ack-count/default" [1 0]) + (assert-buckets! "myspout" "__emit-count/default" [1 1]) + (assert-buckets! "myspout" "__transfer-count/default" [1 1]) + (assert-buckets! "mybolt" "__ack-count/myspout:default" [1 0]) + (assert-buckets! "mybolt" "__execute-count/myspout:default" [1 1]) + (advance-cluster-time cluster 30) + (assert-failed tracker 2) + (assert-buckets! "myspout" "__fail-count/default" [1]) + (assert-buckets! "myspout" "__ack-count/default" [1 0 0 0 0]) + (assert-buckets! "myspout" "__emit-count/default" [1 1 0 0 0]) + (assert-buckets! "myspout" "__transfer-count/default" [1 1 0 0 0]) + (assert-buckets! 
"mybolt" "__ack-count/myspout:default" [1 0 0 0 0]) + (assert-buckets! "mybolt" "__execute-count/myspout:default" [1 1 0 0 0])))) From fe3fa6ccd6796263b1f87a3759f62886d8d746c3 Mon Sep 17 00:00:00 2001 From: Jason Jackson Date: Fri, 16 Nov 2012 13:54:27 -0800 Subject: [PATCH 13/14] trident metrics support. just expose underlying registerMetric storm API. --- project.clj | 2 +- .../trident/operation/TridentOperationContext.java | 11 +++++++++++ 2 files changed, 12 insertions(+), 1 deletion(-) diff --git a/project.clj b/project.clj index 555930795..1778180bf 100644 --- a/project.clj +++ b/project.clj @@ -3,7 +3,7 @@ (do (println (str "ERROR: requires Leiningen 1.x but you are using " lein-version)) (System/exit 1))) -(defproject storm "0.8.2-wip15" +(defproject storm "0.8.2-wip16" :source-path "src/clj" :test-path "test/clj" :java-source-path "src/jvm" diff --git a/src/jvm/storm/trident/operation/TridentOperationContext.java b/src/jvm/storm/trident/operation/TridentOperationContext.java index 0aad4c652..75251a501 100644 --- a/src/jvm/storm/trident/operation/TridentOperationContext.java +++ b/src/jvm/storm/trident/operation/TridentOperationContext.java @@ -1,5 +1,6 @@ package storm.trident.operation; +import backtype.storm.metric.api.*; import backtype.storm.task.TopologyContext; import backtype.storm.tuple.Fields; import storm.trident.tuple.TridentTuple; @@ -29,4 +30,14 @@ public int numPartitions() { public int getPartitionIndex() { return _topoContext.getThisTaskIndex(); } + + public IMetric registerMetric(String name, IMetric metric, int timeBucketSizeInSecs) { + return _topoContext.registerMetric(name, metric, timeBucketSizeInSecs); + } + public IMetric registerMetric(String name, IReducer reducer, int timeBucketSizeInSecs) { + return _topoContext.registerMetric(name, new ReducedMetric(reducer), timeBucketSizeInSecs); + } + public IMetric registerMetric(String name, ICombiner combiner, int timeBucketSizeInSecs) { + return _topoContext.registerMetric(name, new 
CombinedMetric(combiner), timeBucketSizeInSecs); + } } From a9fd4ab40c744b810ce00fae808f9f7e668a8332 Mon Sep 17 00:00:00 2001 From: Jason Jackson Date: Thu, 6 Dec 2012 13:55:06 -0800 Subject: [PATCH 14/14] MeanReducer no longer divides by zero. --- src/clj/backtype/storm/daemon/executor.clj | 5 ++++- src/jvm/backtype/storm/metric/api/MeanReducer.java | 6 +++++- 2 files changed, 9 insertions(+), 2 deletions(-) diff --git a/src/clj/backtype/storm/daemon/executor.clj b/src/clj/backtype/storm/daemon/executor.clj index 95aff4345..2e176bcd2 100644 --- a/src/clj/backtype/storm/daemon/executor.clj +++ b/src/clj/backtype/storm/daemon/executor.clj @@ -269,7 +269,10 @@ interval) data-points (->> name->imetric (map (fn [[name imetric]] - (IMetricsConsumer$DataPoint. name (.getValueAndReset ^IMetric imetric)))) + (let [value (.getValueAndReset ^IMetric imetric)] + (if value + (IMetricsConsumer$DataPoint. name value))))) + (filter identity) (into []))]] (if (seq data-points) (task/send-unanchored task-data Constants/METRICS_STREAM_ID [task-info data-points]))))) diff --git a/src/jvm/backtype/storm/metric/api/MeanReducer.java b/src/jvm/backtype/storm/metric/api/MeanReducer.java index 2133dd9f3..38f627507 100644 --- a/src/jvm/backtype/storm/metric/api/MeanReducer.java +++ b/src/jvm/backtype/storm/metric/api/MeanReducer.java @@ -29,6 +29,10 @@ public MeanReducerState reduce(MeanReducerState acc, Object input) { } public Object extractResult(MeanReducerState acc) { - return new Double(acc.sum / (double)acc.count); + if(acc.count > 0) { + return new Double(acc.sum / (double)acc.count); + } else { + return null; + } } }