Permalink
Browse files

Merge remote-tracking branch 'jason/metrics' into 0.9.0

  • Loading branch information...
2 parents 9e0a738 + 69ada70 commit 7207af53c0250c492bb4e00c383dfa8a5fe46f8c @nathanmarz committed Nov 2, 2012
View
@@ -103,5 +103,9 @@ topology.max.error.report.per.interval: 5
topology.kryo.factory: "backtype.storm.serialization.DefaultKryoFactory"
topology.trident.batch.emit.interval.millis: 500
+### Register classes used in the implementation of the metrics API.
+topology.kryo.register:
+ - backtype.storm.metric.api.IMetricsConsumer$TaskInfo
+ - backtype.storm.metric.api.IMetricsConsumer$DataPoint
dev.zookeeper.path: "/tmp/dev-storm-zookeeper"
View
@@ -21,3 +21,10 @@
# drpc.servers:
# - "server1"
# - "server2"
+
+## Metrics Consumers
+# topology.metrics.consumers.register:
+# - class: "org.mycompany.MyMetricsConsumer"
+# argument:
+# - endpoint: "metrics-collector.mycompany.org"
+# parallelism.hint: 1
@@ -23,6 +23,8 @@
(def SYSTEM-COMPONENT-ID Constants/SYSTEM_COMPONENT_ID)
(def SYSTEM-TICK-STREAM-ID Constants/SYSTEM_TICK_STREAM_ID)
+(def METRICS-STREAM-ID Constants/METRICS_STREAM_ID)
+(def METRICS-TICK-STREAM-ID Constants/METRICS_TICK_STREAM_ID)
;; the task id is the virtual port
;; node->host is here so that tasks know who to talk to just from assignment
@@ -206,27 +208,78 @@
(.put_to_bolts ret "__acker" acker-bolt)
))
+(defn add-metric-streams! [^StormTopology topology]
+ (doseq [[_ component] (all-components topology)
+ :let [common (.get_common component)]]
+ (.put_to_streams common METRICS-STREAM-ID
+ (thrift/output-fields ["task-info" "data-points"]))))
+
(defn add-system-streams! [^StormTopology topology]
(doseq [[_ component] (all-components topology)
:let [common (.get_common component)]]
- (.put_to_streams common SYSTEM-STREAM-ID (thrift/output-fields ["event"]))
- ;; TODO: consider adding a stats stream for stats aggregation
- ))
+ (.put_to_streams common SYSTEM-STREAM-ID (thrift/output-fields ["event"]))))
+
+
+(defn map-occurrences [afn coll]
+ (->> coll
+ (reduce (fn [[counts new-coll] x]
+ (let [occurs (inc (get counts x 0))]
+ [(assoc counts x occurs) (cons (afn x occurs) new-coll)]))
+ [{} []])
+ (second)
+ (reverse)))
+
+(defn number-duplicates [coll]
+ "(number-duplicates [\"a\", \"b\", \"a\"]) => [\"a\", \"b\", \"a#2\"]"
+ (map-occurrences (fn [x occurences] (if (>= occurences 2) (str x "#" occurences) x)) coll))
+
+(defn metrics-consumer-register-ids [storm-conf]
+ "Generates a list of component ids for each metrics consumer
+ e.g. [\"__metrics_org.mycompany.MyMetricsConsumer\", ..] "
+ (->> (get storm-conf TOPOLOGY-METRICS-CONSUMER-REGISTER)
+ (map #(get % "class"))
+ (number-duplicates)
+ (map #(str Constants/METRICS_COMPONENT_ID_PREFIX %))))
+
+(defn metrics-consumer-bolt-specs [components-ids-that-emit-metrics storm-conf]
+ (let [inputs (->> (for [comp-id components-ids-that-emit-metrics]
+ {[comp-id METRICS-STREAM-ID] :shuffle})
+ (into {}))
+
+ mk-bolt-spec (fn [class arg p]
+ (thrift/mk-bolt-spec*
+ inputs
+ (backtype.storm.metric.MetricsConsumerBolt. class arg)
+ {} :p p :conf {TOPOLOGY-TASKS p}))]
+
+ (map
+ (fn [component-id register]
+ [component-id (mk-bolt-spec (get register "class")
+ (get register "argument")
+ (or (get register "parallelism.hint") 1))])
+
+ (metrics-consumer-register-ids storm-conf)
+ (get storm-conf TOPOLOGY-METRICS-CONSUMER-REGISTER))))
+
+(defn add-metric-components! [storm-conf ^StormTopology topology]
+ (doseq [[comp-id bolt-spec] (metrics-consumer-bolt-specs (keys (all-components topology)) storm-conf)]
+ (.put_to_bolts topology comp-id bolt-spec)))
(defn add-system-components! [^StormTopology topology]
(let [system-spout (thrift/mk-spout-spec*
- (NoOpSpout.)
- {SYSTEM-TICK-STREAM-ID (thrift/output-fields ["rate_secs"])
- }
- :p 0
- :conf {TOPOLOGY-TASKS 0})]
- (.put_to_spouts topology SYSTEM-COMPONENT-ID system-spout)
- ))
+ (NoOpSpout.)
+ {SYSTEM-TICK-STREAM-ID (thrift/output-fields ["rate_secs"])
+ METRICS-TICK-STREAM-ID (thrift/output-fields ["interval"])}
+ :p 0
+ :conf {TOPOLOGY-TASKS 0})]
+ (.put_to_spouts topology SYSTEM-COMPONENT-ID system-spout)))
(defn system-topology! [storm-conf ^StormTopology topology]
(validate-basic! topology)
(let [ret (.deepCopy topology)]
(add-acker! storm-conf ret)
+ (add-metric-streams! ret)
+ (add-metric-components! storm-conf ret)
(add-system-streams! ret)
(add-system-components! ret)
(validate-structure! ret)
@@ -5,7 +5,9 @@
(:import [backtype.storm.tuple Tuple])
(:import [backtype.storm.spout ISpoutWaitStrategy])
(:import [backtype.storm.hooks.info SpoutAckInfo SpoutFailInfo
- EmitInfo BoltFailInfo BoltAckInfo BoltExecuteInfo])
+ EmitInfo BoltFailInfo BoltAckInfo BoltExecuteInfo])
+ (:import [backtype.storm.metric MetricHolder])
+ (:import [backtype.storm.metric.api IMetric IMetricsConsumer$TaskInfo IMetricsConsumer$DataPoint])
(:require [backtype.storm [tuple :as tuple]])
(:require [backtype.storm.daemon [task :as task]])
)
@@ -212,6 +214,7 @@
:type executor-type
;; TODO: should refactor this to be part of the executor specific map (spout or bolt with :common field)
:stats (mk-executor-stats <> (sampling-rate storm-conf))
+ :interval->task->registered-metrics (HashMap.)
:task->component (:task->component worker)
:stream->component->grouper (outbound-components worker-context component-id)
:report-error (throttled-report-error-fn <>)
@@ -238,7 +241,40 @@
(worker-transfer-fn serializer alist)
(.setObject cached-emit (ArrayList.))
)))
- :kill-fn (:report-error-and-die executor-data))))
+ :kill-fn (:report-error-and-die executor-data))))
+
+(defn setup-metrics! [executor-data]
+ (let [{:keys [storm-conf receive-queue worker-context interval->task->registered-metrics]} executor-data
+ distinct-time-bucket-intervals (keys interval->task->registered-metrics)]
+ (doseq [interval distinct-time-bucket-intervals]
+ (schedule-recurring
+ (:user-timer (:worker executor-data))
+ interval
+ interval
+ (fn []
+ (disruptor/publish
+ receive-queue
+ [[nil (TupleImpl. worker-context [interval] -1 Constants/METRICS_TICK_STREAM_ID)]]))))))
+
+(defn metrics-tick [executor-data task-datas ^TupleImpl tuple]
+ (let [{:keys [interval->task->registered-metrics ^WorkerTopologyContext worker-context]} executor-data
+ interval (.getInteger tuple 0)]
+ (doseq [[task-id task-data] task-datas
+ :let [metric-holders (-> interval->task->registered-metrics (get interval) (get task-id))
+ task-info (IMetricsConsumer$TaskInfo.
+ (. (java.net.InetAddress/getLocalHost) getCanonicalHostName)
+ (.getThisWorkerPort worker-context)
+ (:component-id executor-data)
+ task-id
+ (long (/ (System/currentTimeMillis) 1000))
+ interval)
+ data-points (->> metric-holders
+ (map (fn [^MetricHolder mh]
+ (IMetricsConsumer$DataPoint. (.name mh)
+ (.getValueAndReset ^IMetric (.metric mh)))))
+ (into []))]]
+ (if (seq data-points)
+ (task/send-unanchored task-data Constants/METRICS_STREAM_ID [task-info data-points])))))
(defn setup-ticks! [worker executor-data]
(let [storm-conf (:storm-conf executor-data)
@@ -279,7 +315,7 @@
(mk-threads executor-data task-datas))
threads (concat handlers system-threads)]
(setup-ticks! worker executor-data)
-
+
(log-message "Finished loading executor " component-id ":" (pr-str executor-id))
;; TODO: add method here to get rendered stats... have worker call that when heartbeating
(reify
@@ -377,8 +413,9 @@
))))
tuple-action-fn (fn [task-id ^TupleImpl tuple]
(let [stream-id (.getSourceStreamId tuple)]
- (if (= stream-id Constants/SYSTEM_TICK_STREAM_ID)
- (.rotate pending)
+ (condp = stream-id
+ Constants/SYSTEM_TICK_STREAM_ID (.rotate pending)
+ Constants/METRICS_TICK_STREAM_ID (metrics-tick executor-data task-datas tuple)
(let [id (.getValue tuple 0)
[stored-task-id spout-id tuple-finished-info start-time-ms] (.remove pending id)]
(when spout-id
@@ -389,7 +426,7 @@
ACKER-ACK-STREAM-ID (ack-spout-msg executor-data (get task-datas task-id)
spout-id tuple-finished-info time-delta)
ACKER-FAIL-STREAM-ID (fail-spout-msg executor-data (get task-datas task-id)
- spout-id tuple-finished-info time-delta)
+ spout-id tuple-finished-info time-delta)
)))
;; TODO: on failure, emit tuple to failure stream
))))
@@ -472,6 +509,7 @@
)))))
(reset! open-or-prepare-was-called? true)
(log-message "Opened spout " component-id ":" (keys task-datas))
+ (setup-metrics! executor-data)
(disruptor/consumer-started! (:receive-queue executor-data))
(fn []
@@ -550,28 +588,32 @@
;; TODO: for state sync, need to check if tuple comes from state spout. if so, update state
;; TODO: how to handle incremental updates as well as synchronizations at same time
;; TODO: need to version tuples somehow
+
;;(log-debug "Received tuple " tuple " at task " task-id)
;; need to do it this way to avoid reflection
- (let [task-data (get task-datas task-id)
- ^IBolt bolt-obj (:object task-data)
- user-context (:user-context task-data)
- sampler? (sampler)
- execute-sampler? (execute-sampler)
- now (if (or sampler? execute-sampler?) (System/currentTimeMillis))]
- (when sampler?
- (.setProcessSampleStartTime tuple now))
- (when execute-sampler?
- (.setExecuteSampleStartTime tuple now))
- (.execute bolt-obj tuple)
- (let [delta (tuple-execute-time-delta! tuple)]
- (task/apply-hooks user-context .boltExecute (BoltExecuteInfo. tuple task-id delta))
- (when delta
- (stats/bolt-execute-tuple! executor-stats
- (.getSourceComponent tuple)
- (.getSourceStreamId tuple)
- delta)
- ))))]
+ (let [stream-id (.getSourceStreamId tuple)]
+ (condp = stream-id
+ Constants/METRICS_TICK_STREAM_ID (metrics-tick executor-data task-datas tuple)
+ (let [task-data (get task-datas task-id)
+ ^IBolt bolt-obj (:object task-data)
+ user-context (:user-context task-data)
+ sampler? (sampler)
+ execute-sampler? (execute-sampler)
+ now (if (or sampler? execute-sampler?) (System/currentTimeMillis))]
+ (when sampler?
+ (.setProcessSampleStartTime tuple now))
+ (when execute-sampler?
+ (.setExecuteSampleStartTime tuple now))
+ (.execute bolt-obj tuple)
+ (let [delta (tuple-execute-time-delta! tuple)]
+ (task/apply-hooks user-context .boltExecute (BoltExecuteInfo. tuple task-id delta))
+ (when delta
+ (stats/bolt-execute-tuple! executor-stats
+ (.getSourceComponent tuple)
+ (.getSourceStreamId tuple)
+ delta)
+ ))))))]
;; TODO: can get any SubscribedState objects out of the context now
@@ -649,7 +691,8 @@
(report-error error)
)))))
(reset! open-or-prepare-was-called? true)
- (log-message "Prepared bolt " component-id ":" (keys task-datas))
+ (log-message "Prepared bolt " component-id ":" (keys task-datas))
+ (setup-metrics! executor-data)
(let [receive-queue (:receive-queue executor-data)
event-handler (mk-task-receiver executor-data tuple-action-fn)]
@@ -28,7 +28,8 @@
(:default-shared-resources worker)
(:user-shared-resources worker)
(:shared-executor-data executor-data)
- )))
+ (:interval->task->registered-metrics executor-data)
+ (:open-or-prepare-was-called? executor-data))))
(defn system-topology-context [worker executor-data tid]
((mk-topology-context-builder
@@ -0,0 +1,24 @@
+(ns backtype.storm.metric.testing
+  "This namespace is for AOT-dependent metrics testing code."
+ (:gen-class))
+
+(gen-class
+ :name clojure.storm.metric.testing.FakeMetricConsumer
+ :implements [backtype.storm.metric.api.IMetricsConsumer]
+ :prefix "impl-"
+ :state state
+ :init init)
+
+(defn impl-init [] [[] (atom [])])
+
+(defn impl-prepare [this conf {:keys [ns var-name]} ctx error-reporter]
+ (reset! (.state this) @(intern ns var-name))
+ (reset! @(.state this) []))
+
+(defn impl-cleanup [this]
+ (reset! @(.state this) []))
+
+(defn impl-handleDataPoints [this task-info data-points]
+ (swap! @(.state this) conj [task-info data-points]))
+
+
@@ -8,7 +8,7 @@
(:require [backtype.storm [process-simulator :as psim]])
(:import [org.apache.commons.io FileUtils])
(:import [java.io File])
- (:import [java.util HashMap])
+ (:import [java.util HashMap ArrayList])
(:import [java.util.concurrent.atomic AtomicInteger])
(:import [java.util.concurrent ConcurrentHashMap])
(:import [backtype.storm.utils Time Utils RegisteredGlobalState])
@@ -587,6 +587,8 @@
[(int 1)]
{}
{}
- (HashMap.))]
+ (HashMap.)
+ (HashMap.)
+ (atom false))]
(TupleImpl. context values 1 stream)
))
@@ -410,6 +410,13 @@
*/
public static String TOPOLOGY_SKIP_MISSING_KRYO_REGISTRATIONS= "topology.skip.missing.kryo.registrations";
+ /**
+ * A list of classes implementing IMetricsConsumer (see storm.yaml.example for exact config format).
+ * Each listed class will be routed all the metrics data generated by the storm metrics API.
+ * Each listed class maps 1:1 to a system bolt named __metrics_ClassName#N, and its parallelism is configurable.
+ */
+ public static String TOPOLOGY_METRICS_CONSUMER_REGISTER = "topology.metrics.consumer.register";
+
/**
* The maximum parallelism allowed for a component in this topology. This configuration is
@@ -8,4 +8,8 @@
public static final String SYSTEM_COMPONENT_ID = "__system";
public static final String SYSTEM_TICK_STREAM_ID = "__tick";
+ public static final String METRICS_COMPONENT_ID_PREFIX = "__metrics";
+ public static final String METRICS_STREAM_ID = "__metrics";
+ public static final String METRICS_TICK_STREAM_ID = "__metrics_tick";
}
+
@@ -0,0 +1,13 @@
+package backtype.storm.metric;
+
+import backtype.storm.metric.api.IMetric;
+
+public class MetricHolder {
+ public String name;
+ public IMetric metric;
+
+ public MetricHolder(String name, IMetric metric) {
+ this.name = name;
+ this.metric = metric;
+ }
+}
Oops, something went wrong.

0 comments on commit 7207af5

Please sign in to comment.