Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

storm metrics API initial implementation ready for review. No unit testing currently.
  • Loading branch information...
commit 4a01197115ea84e8be627fa102ae18c40f94c47b 1 parent 10e8639
Jason Jackson authored
View
7 conf/storm.yaml.example
@@ -21,3 +21,10 @@
# drpc.servers:
# - "server1"
# - "server2"
+
+## Metrics Consumers
+# topology.metrics.consumers.register:
+# - class: "org.mycompany.MyMetricsConsumer"
+# argument:
+# - endpoint: "metrics-collector.mycompany.org"
+# parallelism.hint: 1
View
73 src/clj/backtype/storm/daemon/common.clj
@@ -23,6 +23,8 @@
(def SYSTEM-COMPONENT-ID Constants/SYSTEM_COMPONENT_ID)
(def SYSTEM-TICK-STREAM-ID Constants/SYSTEM_TICK_STREAM_ID)
+(def METRICS-STREAM-ID Constants/METRICS_STREAM_ID)
+(def METRICS-TICK-STREAM-ID Constants/METRICS_TICK_STREAM_ID)
;; the task id is the virtual port
;; node->host is here so that tasks know who to talk to just from assignment
@@ -206,27 +208,78 @@
(.put_to_bolts ret "__acker" acker-bolt)
))
;; Declares the metrics stream (METRICS-STREAM-ID) on every component of the
;; topology so any task can emit metric data points on it. Field layout must
;; match what metrics-tick emits and MetricsConsumerBolt.execute reads.
(defn add-metric-streams! [^StormTopology topology]
  (doseq [[_ component] (all-components topology)
          :let [common (.get_common component)]]
    (.put_to_streams common METRICS-STREAM-ID
                     (thrift/output-fields ["worker-host" "worker-port" "interval" "timestamp" "name" "value"]))))
+
(defn add-system-streams! [^StormTopology topology]
(doseq [[_ component] (all-components topology)
:let [common (.get_common component)]]
- (.put_to_streams common SYSTEM-STREAM-ID (thrift/output-fields ["event"]))
- ;; TODO: consider adding a stats stream for stats aggregation
- ))
+ (.put_to_streams common SYSTEM-STREAM-ID (thrift/output-fields ["event"]))))
+
+
;; Maps (afn x occurrence-number) over coll, where occurrence-number counts
;; how many times x has been seen so far (1 for the first occurrence).
;; Builds the result reversed via cons, then reverses to restore input order.
(defn map-occurrences [afn coll]
  (->> coll
       (reduce (fn [[counts new-coll] x]
                 (let [occurs (inc (get counts x 0))]
                   [(assoc counts x occurs) (cons (afn x occurs) new-coll)]))
               [{} []])
       (second)
       (reverse)))
+
(defn number-duplicates
  ;; Fix: the original put the docstring AFTER the params vector, where it is
  ;; just a discarded expression, and its example claimed "a2" while the code
  ;; produces "a#2".
  "Appends a #N suffix to repeated values, e.g.
   (number-duplicates [\"a\" \"b\" \"a\"]) => [\"a\" \"b\" \"a#2\"]"
  [coll]
  (map-occurrences (fn [x occurences] (if (>= occurences 2) (str x "#" occurences) x)) coll))
+
(defn metrics-consumer-register-ids
  ;; Fix: docstring was placed after the params vector, making it a no-op
  ;; expression rather than an attached docstring.
  "Generates a distinct component id for each registered metrics consumer,
   e.g. [\"__metrics_org.mycompany.MyMetricsConsumer\" ...]. Duplicate class
   names are disambiguated via number-duplicates (ClassName#2, ...)."
  [storm-conf]
  (->> (get storm-conf TOPOLOGY-METRICS-CONSUMER-REGISTER)
       (map #(get % "class"))
       (number-duplicates)
       (map #(str Constants/METRICS_COMPONENT_ID_PREFIX %))))
+
;; Builds [component-id bolt-spec] pairs, one per entry in
;; topology.metrics.consumer.register. Each consumer bolt shuffle-subscribes
;; to the metrics stream of EVERY metric-emitting component, and its
;; parallelism comes from the registration's "parallelism.hint" (default 1).
(defn metrics-consumer-bolt-specs [components-ids-that-emit-metrics storm-conf]
  (let [;; one shuffle-grouped input per emitting component's metrics stream
        inputs (->> (for [comp-id components-ids-that-emit-metrics]
                      {[comp-id METRICS-STREAM-ID] :shuffle})
                    (into {}))

        ;; wraps the consumer class + its registration argument in a
        ;; MetricsConsumerBolt; TOPOLOGY-TASKS pinned to the parallelism p
        mk-bolt-spec (fn [class arg p]
                       (thrift/mk-bolt-spec*
                         inputs
                         (backtype.storm.metric.MetricsConsumerBolt. class arg)
                         {} :p p :conf {TOPOLOGY-TASKS p}))]

    ;; pair generated ids with their registrations positionally — both seqs
    ;; derive from the same config list, so they line up one-to-one
    (map
      (fn [component-id register]
        [component-id (mk-bolt-spec (get register "class")
                                    (get register "argument")
                                    (or (get register "parallelism.hint") 1))])

      (metrics-consumer-register-ids storm-conf)
      (get storm-conf TOPOLOGY-METRICS-CONSUMER-REGISTER))))
+
;; Adds one MetricsConsumerBolt per configured metrics consumer to the
;; topology, each subscribed to the metrics stream of all existing components.
(defn add-metric-components! [storm-conf ^StormTopology topology]
  (doseq [[comp-id bolt-spec] (metrics-consumer-bolt-specs (keys (all-components topology)) storm-conf)]
    (.put_to_bolts topology comp-id bolt-spec)))
(defn add-system-components! [^StormTopology topology]
(let [system-spout (thrift/mk-spout-spec*
- (NoOpSpout.)
- {SYSTEM-TICK-STREAM-ID (thrift/output-fields ["rate_secs"])
- }
- :p 0
- :conf {TOPOLOGY-TASKS 0})]
- (.put_to_spouts topology SYSTEM-COMPONENT-ID system-spout)
- ))
+ (NoOpSpout.)
+ {SYSTEM-TICK-STREAM-ID (thrift/output-fields ["rate_secs"])
+ METRICS-TICK-STREAM-ID (thrift/output-fields ["interval"])}
+ :p 0
+ :conf {TOPOLOGY-TASKS 0})]
+ (.put_to_spouts topology SYSTEM-COMPONENT-ID system-spout)))
(defn system-topology! [storm-conf ^StormTopology topology]
(validate-basic! topology)
(let [ret (.deepCopy topology)]
(add-acker! storm-conf ret)
+ (add-metric-streams! ret)
+ (add-metric-components! storm-conf ret)
(add-system-streams! ret)
(add-system-components! ret)
(validate-structure! ret)
View
88 src/clj/backtype/storm/daemon/executor.clj
@@ -5,7 +5,8 @@
(:import [backtype.storm.tuple Tuple])
(:import [backtype.storm.spout ISpoutWaitStrategy])
(:import [backtype.storm.hooks.info SpoutAckInfo SpoutFailInfo
- EmitInfo BoltFailInfo BoltAckInfo BoltExecuteInfo])
+ EmitInfo BoltFailInfo BoltAckInfo BoltExecuteInfo])
+ (:import [backtype.storm.metric MetricHolder IMetric])
(:require [backtype.storm [tuple :as tuple]])
(:require [backtype.storm.daemon [task :as task]])
)
@@ -212,6 +213,7 @@
:type executor-type
;; TODO: should refactor this to be part of the executor specific map (spout or bolt with :common field)
:stats (mk-executor-stats <> (sampling-rate storm-conf))
+ :registered-metrics (ArrayList.)
:task->component (:task->component worker)
:stream->component->grouper (outbound-components worker-context component-id)
:report-error (throttled-report-error-fn <>)
@@ -238,7 +240,34 @@
(worker-transfer-fn serializer alist)
(.setObject cached-emit (ArrayList.))
)))
- :kill-fn (:report-error-and-die executor-data))))
+ :kill-fn (:report-error-and-die executor-data))))
+
;; For each distinct time-bucket interval among the executor's registered
;; metrics, schedules a recurring timer (on the worker's user timer) that
;; injects a metrics-tick tuple carrying that interval into the executor's
;; receive queue every `interval` seconds. metrics-tick then collects the
;; metrics matching the interval.
(defn setup-metrics! [executor-data]
  (let [{:keys [storm-conf receive-queue worker-context registered-metrics]} executor-data
        distinct-time-bucket-intervals (->> registered-metrics (map #(.getTimeBucketIntervalInSecs %)) distinct)]
    (doseq [interval distinct-time-bucket-intervals]
      (schedule-recurring
        (:user-timer (:worker executor-data))
        interval
        interval
        (fn []
          (disruptor/publish
            receive-queue
            ;; task id nil: the tick is addressed to the executor, not a task
            [[nil (TupleImpl. worker-context [interval] -1 Constants/METRICS_TICK_STREAM_ID)]]))))))
+
;; Handles a metrics-tick tuple: for every registered metric whose time-bucket
;; interval equals the tick's interval, reads-and-resets the metric and sends
;; an unanchored data point on the metrics stream from each of this executor's
;; tasks. Field order must match add-metric-streams! / MetricsConsumerBolt.
(defn metrics-tick [executor-data task-datas ^TupleImpl tuple]
  (let [{:keys [registered-metrics ^WorkerTopologyContext worker-context]} executor-data
        interval (.getInteger tuple 0)]
    (doseq [^MetricHolder mh registered-metrics]
      (when (= interval (.getTimeBucketIntervalInSecs mh))
        (let [^IMetric metric (.getMetric mh)
              name (.getName mh)
              ;; read and clear the metric atomically w.r.t. this executor
              value (.getValueAndReset metric)
              timestamp (System/currentTimeMillis)
              ;; NOTE(review): resolved per tick; assumes local DNS lookup is
              ;; cheap/cached — confirm, else hoist out of the hot path
              worker-host (. (java.net.InetAddress/getLocalHost) getCanonicalHostName)
              worker-port (.getThisWorkerPort worker-context)]
          (doseq [[task-id task-data] task-datas]
            (task/send-unanchored task-data Constants/METRICS_STREAM_ID [worker-host worker-port interval timestamp name value])))))))
(defn setup-ticks! [worker executor-data]
(let [storm-conf (:storm-conf executor-data)
@@ -279,7 +308,7 @@
(mk-threads executor-data task-datas))
threads (concat handlers system-threads)]
(setup-ticks! worker executor-data)
-
+
(log-message "Finished loading executor " component-id ":" (pr-str executor-id))
;; TODO: add method here to get rendered stats... have worker call that when heartbeating
(reify
@@ -377,8 +406,9 @@
))))
tuple-action-fn (fn [task-id ^TupleImpl tuple]
(let [stream-id (.getSourceStreamId tuple)]
- (if (= stream-id Constants/SYSTEM_TICK_STREAM_ID)
- (.rotate pending)
+ (condp = stream-id
+ Constants/SYSTEM_TICK_STREAM_ID (.rotate pending)
+ Constants/METRICS_TICK_STREAM_ID (metrics-tick executor-data task-datas tuple)
(let [id (.getValue tuple 0)
[stored-task-id spout-id tuple-finished-info start-time-ms] (.remove pending id)]
(when spout-id
@@ -389,7 +419,7 @@
ACKER-ACK-STREAM-ID (ack-spout-msg executor-data (get task-datas task-id)
spout-id tuple-finished-info time-delta)
ACKER-FAIL-STREAM-ID (fail-spout-msg executor-data (get task-datas task-id)
- spout-id tuple-finished-info time-delta)
+ spout-id tuple-finished-info time-delta)
)))
;; TODO: on failure, emit tuple to failure stream
))))
@@ -472,6 +502,7 @@
)))))
(reset! open-or-prepare-was-called? true)
(log-message "Opened spout " component-id ":" (keys task-datas))
+ (setup-metrics! executor-data)
(disruptor/consumer-started! (:receive-queue executor-data))
(fn []
@@ -550,28 +581,32 @@
;; TODO: for state sync, need to check if tuple comes from state spout. if so, update state
;; TODO: how to handle incremental updates as well as synchronizations at same time
;; TODO: need to version tuples somehow
+
;;(log-debug "Received tuple " tuple " at task " task-id)
;; need to do it this way to avoid reflection
- (let [task-data (get task-datas task-id)
- ^IBolt bolt-obj (:object task-data)
- user-context (:user-context task-data)
- sampler? (sampler)
- execute-sampler? (execute-sampler)
- now (if (or sampler? execute-sampler?) (System/currentTimeMillis))]
- (when sampler?
- (.setProcessSampleStartTime tuple now))
- (when execute-sampler?
- (.setExecuteSampleStartTime tuple now))
- (.execute bolt-obj tuple)
- (let [delta (tuple-execute-time-delta! tuple)]
- (task/apply-hooks user-context .boltExecute (BoltExecuteInfo. tuple task-id delta))
- (when delta
- (stats/bolt-execute-tuple! executor-stats
- (.getSourceComponent tuple)
- (.getSourceStreamId tuple)
- delta)
- ))))]
+ (let [stream-id (.getSourceStreamId tuple)]
+ (condp = stream-id
+ Constants/METRICS_TICK_STREAM_ID (metrics-tick executor-data task-datas tuple)
+ (let [task-data (get task-datas task-id)
+ ^IBolt bolt-obj (:object task-data)
+ user-context (:user-context task-data)
+ sampler? (sampler)
+ execute-sampler? (execute-sampler)
+ now (if (or sampler? execute-sampler?) (System/currentTimeMillis))]
+ (when sampler?
+ (.setProcessSampleStartTime tuple now))
+ (when execute-sampler?
+ (.setExecuteSampleStartTime tuple now))
+ (.execute bolt-obj tuple)
+ (let [delta (tuple-execute-time-delta! tuple)]
+ (task/apply-hooks user-context .boltExecute (BoltExecuteInfo. tuple task-id delta))
+ (when delta
+ (stats/bolt-execute-tuple! executor-stats
+ (.getSourceComponent tuple)
+ (.getSourceStreamId tuple)
+ delta)
+ ))))))]
;; TODO: can get any SubscribedState objects out of the context now
@@ -649,7 +684,8 @@
(report-error error)
)))))
(reset! open-or-prepare-was-called? true)
- (log-message "Prepared bolt " component-id ":" (keys task-datas))
+ (log-message "Prepared bolt " component-id ":" (keys task-datas))
+ (setup-metrics! executor-data)
(let [receive-queue (:receive-queue executor-data)
event-handler (mk-task-receiver executor-data tuple-action-fn)]
View
2  src/clj/backtype/storm/daemon/task.clj
@@ -28,7 +28,7 @@
(:default-shared-resources worker)
(:user-shared-resources worker)
(:shared-executor-data executor-data)
- )))
+ (:registered-metrics executor-data))))
(defn system-topology-context [worker executor-data tid]
((mk-topology-context-builder
View
5 src/clj/backtype/storm/testing.clj
@@ -8,7 +8,7 @@
(:require [backtype.storm [process-simulator :as psim]])
(:import [org.apache.commons.io FileUtils])
(:import [java.io File])
- (:import [java.util HashMap])
+ (:import [java.util HashMap ArrayList])
(:import [java.util.concurrent.atomic AtomicInteger])
(:import [java.util.concurrent ConcurrentHashMap])
(:import [backtype.storm.utils Time Utils RegisteredGlobalState])
@@ -587,6 +587,7 @@
[(int 1)]
{}
{}
- (HashMap.))]
+ (HashMap.)
+ (ArrayList.))]
(TupleImpl. context values 1 stream)
))
View
7 src/jvm/backtype/storm/Config.java
@@ -410,6 +410,13 @@
*/
public static String TOPOLOGY_SKIP_MISSING_KRYO_REGISTRATIONS= "topology.skip.missing.kryo.registrations";
+ /*
+ * A list of classes implementing IMetricsConsumer (See storm.yaml.example for exact config format).
+ * Each listed class will be routed all the metrics data generated by the storm metrics API.
+ * Each listed class maps 1:1 to a system bolt named __metrics_ClassName#N, and its parallelism is configurable.
+ */
+ public static String TOPOLOGY_METRICS_CONSUMER_REGISTER = "topology.metrics.consumer.register";
+
/**
* The maximum parallelism allowed for a component in this topology. This configuration is
View
4 src/jvm/backtype/storm/Constants.java
@@ -8,4 +8,8 @@
public static final String SYSTEM_COMPONENT_ID = "__system";
public static final String SYSTEM_TICK_STREAM_ID = "__tick";
+ public static final String METRICS_COMPONENT_ID_PREFIX = "__metrics";
+ public static final String METRICS_STREAM_ID = "__metrics";
+ public static final String METRICS_TICK_STREAM_ID = "__metrics_tick";
}
+
View
17 src/jvm/backtype/storm/metric/FixedValueMetric.java
@@ -0,0 +1,17 @@
+package backtype.storm.metric;
+
+public class FixedValueMetric implements IMetric {
+ Object _value;
+
+ public FixedValueMetric(Object value) {
+ _value = value;
+ }
+
+ public void setValue(Object value) {
+ _value = value;
+ }
+
+ public Object getValueAndReset() {
+ return _value;
+ }
+}
View
5 src/jvm/backtype/storm/metric/IMetric.java
@@ -0,0 +1,5 @@
+package backtype.storm.metric;
+
/**
 * A metric collected periodically by the metrics framework.
 */
public interface IMetric {
    /**
     * Returns the metric's current value and resets its internal state so
     * the next time bucket starts fresh.
     */
    public Object getValueAndReset();
}
View
21 src/jvm/backtype/storm/metric/IMetricsConsumer.java
@@ -0,0 +1,21 @@
+package backtype.storm.metric;
+
+import backtype.storm.task.TopologyContext;
+import java.util.Map;
+
/**
 * Receives metric data points emitted by a topology's tasks. Implementations
 * are registered under Config.TOPOLOGY_METRICS_CONSUMER_REGISTER and are run
 * inside a MetricsConsumerBolt.
 */
public interface IMetricsConsumer {
    /** One metric sample, tagged with its source worker/component/task and time. */
    public static class DataPoint {
        public String srcWorkerHost;    // hostname of the worker that produced the sample
        public int srcWorkerPort;       // port of the worker that produced the sample
        public String srcComponentId;   // component the sample originated from
        public int srcTaskId;           // task the sample originated from
        public long timestamp;          // epoch millis when the sample was collected
        public int updateIntervalSecs;  // time-bucket interval the metric was collected over
        public String name;             // metric name as registered
        public Object value;            // value returned by IMetric.getValueAndReset()
    }

    /** Called once before any data points, with the registration's "argument" value. */
    void prepare(Map stormConf, Object registrationOptions, TopologyContext context);
    /** Called for each received metric sample. */
    void handleDataPoint(DataPoint dataPoint);
    /** Called when the enclosing bolt is being shut down. */
    void cleanup();
}
View
7 src/jvm/backtype/storm/metric/IReducer.java
@@ -0,0 +1,7 @@
+package backtype.storm.metric;
+
/**
 * A fold over a stream of values, used by ReducedMetric to aggregate
 * updates within one time bucket.
 *
 * @param <T> accumulator type
 */
public interface IReducer<T> {
    /** Returns a fresh, empty accumulator. */
    T init();
    /** Folds one input value into the accumulator and returns it. */
    T reduce(T accumulator, Object input);
    /** Extracts the reduced result from the accumulator. */
    Object extractResult(T accumulator);
}
View
22 src/jvm/backtype/storm/metric/IncrementedMetric.java
@@ -0,0 +1,22 @@
+package backtype.storm.metric;
+
+public class IncrementedMetric implements IMetric {
+ long _value = 0;
+
+ public IncrementedMetric() {
+ }
+
+ public void inc() {
+ _value++;
+ }
+
+ public void inc(long incrementBy) {
+ _value += incrementBy;
+ }
+
+ public Object getValueAndReset() {
+ long ret = _value;
+ _value = 0;
+ return ret;
+ }
+}
View
22 src/jvm/backtype/storm/metric/MeanReducer.java
@@ -0,0 +1,22 @@
+package backtype.storm.metric;
+
+class MeanReducerState {
+ public int count = 0;
+ public double sum = 0.0;
+}
+
+public class MeanReducer implements IReducer<MeanReducerState> {
+ public MeanReducerState init() {
+ return new MeanReducerState();
+ }
+
+ public MeanReducerState reduce(MeanReducerState acc, Object input) {
+ acc.count++;
+ acc.sum += (Double)input;
+ return acc;
+ }
+
+ public Object extractResult(MeanReducerState acc) {
+ return new Double(acc.sum / (double)acc.count);
+ }
+}
View
17 src/jvm/backtype/storm/metric/MetricHolder.java
@@ -0,0 +1,17 @@
+package backtype.storm.metric;
+
+public class MetricHolder {
+ private String _name;
+ private int _timeBucketIntervalInSecs;
+ private IMetric _metric;
+
+ public MetricHolder(String name, IMetric metric, int timeBucketIntervalInSecs) {
+ _name = name;
+ _timeBucketIntervalInSecs = timeBucketIntervalInSecs;
+ _metric = metric;
+ }
+
+ public String getName() { return _name; }
+ public int getTimeBucketIntervalInSecs() { return _timeBucketIntervalInSecs; }
+ public IMetric getMetric() { return _metric; }
+}
View
54 src/jvm/backtype/storm/metric/MetricsConsumerBolt.java
@@ -0,0 +1,54 @@
+package backtype.storm.metric;
+
+import backtype.storm.Config;
+import backtype.storm.task.IBolt;
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.tuple.Tuple;
+import java.util.Map;
+
+public class MetricsConsumerBolt implements IBolt {
+ IMetricsConsumer _metricsConsumer;
+ String _consumerClassName;
+ OutputCollector _collector;
+ Object _registrationArgument;
+
+ public MetricsConsumerBolt(String consumerClassName, Object registrationArgument) {
+ _consumerClassName = consumerClassName;
+ _registrationArgument = registrationArgument;
+ }
+
+ @Override
+ public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
+ try {
+ _metricsConsumer = (IMetricsConsumer)Class.forName(_consumerClassName).newInstance();
+ } catch (Exception e) {
+ throw new RuntimeException("Could not instantiate a class listed in config under section " +
+ Config.TOPOLOGY_METRICS_CONSUMER_REGISTER + " with fully qualified name " + _consumerClassName, e);
+ }
+ _metricsConsumer.prepare(stormConf, _registrationArgument, context);
+ _collector = collector;
+ }
+
+ @Override
+ public void execute(Tuple input) {
+ IMetricsConsumer.DataPoint d = new IMetricsConsumer.DataPoint();
+ d.srcComponentId = input.getSourceComponent();
+ d.srcTaskId = input.getSourceTask();
+ d.srcWorkerHost = input.getString(0);
+ d.srcWorkerPort = input.getInteger(1);
+ d.updateIntervalSecs = input.getInteger(2);
+ d.timestamp = input.getLong(3);
+ d.name = input.getString(4);
+ d.value = input.getValue(5);
+
+ _metricsConsumer.handleDataPoint(d);
+ _collector.ack(input);
+ }
+
+ @Override
+ public void cleanup() {
+ _metricsConsumer.cleanup();
+ }
+
+}
View
21 src/jvm/backtype/storm/metric/ReducedMetric.java
@@ -0,0 +1,21 @@
+package backtype.storm.metric;
+
+public class ReducedMetric implements IMetric {
+ private IReducer _reducer;
+ private Object _accumulator;
+
+ public ReducedMetric(IReducer reducer) {
+ _reducer = reducer;
+ _accumulator = _reducer.init();
+ }
+
+ public void update(Object value) {
+ _accumulator = _reducer.reduce(_accumulator, value);
+ }
+
+ public Object getValueAndReset() {
+ Object ret = _reducer.extractResult(_accumulator);
+ _accumulator = _reducer.init();
+ return ret;
+ }
+}
View
10 src/jvm/backtype/storm/task/TopologyContext.java
@@ -4,6 +4,8 @@
import backtype.storm.generated.Grouping;
import backtype.storm.generated.StormTopology;
import backtype.storm.hooks.ITaskHook;
+import backtype.storm.metric.IMetric;
+import backtype.storm.metric.MetricHolder;
import backtype.storm.state.ISubscribedState;
import backtype.storm.tuple.Fields;
import backtype.storm.utils.Utils;
@@ -29,6 +31,7 @@
private Map<String, Object> _taskData = new HashMap<String, Object>();
private List<ITaskHook> _hooks = new ArrayList<ITaskHook>();
private Map<String, Object> _executorData;
+ private List<MetricHolder> _registeredMetrics;
public TopologyContext(StormTopology topology, Map stormConf,
@@ -36,12 +39,13 @@ public TopologyContext(StormTopology topology, Map stormConf,
Map<String, Map<String, Fields>> componentToStreamToFields,
String stormId, String codeDir, String pidDir, Integer taskId,
Integer workerPort, List<Integer> workerTasks, Map<String, Object> defaultResources,
- Map<String, Object> userResources, Map<String, Object> executorData) {
+ Map<String, Object> userResources, Map<String, Object> executorData, List<MetricHolder> registeredMetrics) {
super(topology, stormConf, taskToComponent, componentToSortedTasks,
componentToStreamToFields, stormId, codeDir, pidDir,
workerPort, workerTasks, defaultResources, userResources);
_taskId = taskId;
_executorData = executorData;
+ _registeredMetrics = registeredMetrics;
}
/**
@@ -190,4 +194,8 @@ public void addTaskHook(ITaskHook hook) {
public Collection<ITaskHook> getHooks() {
return _hooks;
}
+
+ public void registerMetric(String name, IMetric metric, int timeBucketSizeInSecs) {
+ _registeredMetrics.add(new MetricHolder(name, metric, timeBucketSizeInSecs));
+ }
}
Please sign in to comment.
Something went wrong with that request. Please try again.