Skip to content

Commit

Permalink
Merge branch 'master' into 0.9.0
Browse files Browse the repository at this point in the history
  • Loading branch information
Nathan Marz committed Sep 18, 2012
2 parents 3690995 + 4710bfe commit 2d7affe
Show file tree
Hide file tree
Showing 8 changed files with 93 additions and 5 deletions.
7 changes: 7 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,10 @@
## 0.8.2

* Added high water mark to ZeroMQ sockets (defaults to 10000) configurable with zmq.hwm
* Storm UI now uses nimbus.host to find Nimbus rather than always using localhost (thanks Frostman)
* Added report-error! to Clojure DSL
* Automatically throttle errors sent to Zookeeper/Storm UI when too many are reported in a time interval (all errors are still logged) Configured with TOPOLOGY_MAX_ERROR_REPORT_PER_INTERVAL and TOPOLOGY_ERROR_THROTTLE_INTERVAL_SECS

## 0.8.1

* Exposed Storm's unit testing facilities via the backtype.storm.Testing class. Notable functions are Testing/withLocalCluster and Testing/completeTopology (thanks xumingming)
Expand Down
1 change: 1 addition & 0 deletions README.markdown
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ You must not remove this notice, or any other, from this software.
* Sjoerd Mulder ([@sjoerdmulder](https://github.com/sjoerdmulder))
* Yuta Okamoto ([@okapies](https://github.com/okapies))
* Barry Hart ([@barrywhart](https://github.com/barrywhart))
* Sergey Lukjanov([@Frostman](https://github.com/Frostman))

## Acknowledgements

Expand Down
2 changes: 2 additions & 0 deletions conf/defaults.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -97,5 +97,7 @@ topology.worker.shared.thread.pool.size: 4
topology.disruptor.wait.strategy: "com.lmax.disruptor.BlockingWaitStrategy"
topology.spout.wait.strategy: "backtype.storm.spout.SleepSpoutWaitStrategy"
topology.sleep.spout.wait.strategy.time.ms: 1
topology.error.throttle.interval.secs: 10
topology.max.error.report.per.interval: 5

dev.zookeeper.path: "/tmp/dev-storm-zookeeper"
3 changes: 3 additions & 0 deletions src/clj/backtype/storm/clojure.clj
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,9 @@
(defn fail! [collector ^Tuple tuple]
(.fail ^OutputCollector (:output-collector collector) tuple))

(defn report-error! [collector ^Tuple tuple]
(.reportError ^OutputCollector (:output-collector collector) tuple))

(defnk emit-spout! [collector values
:stream Utils/DEFAULT_STREAM_ID :id nil]
(let [values (tuple-values values collector stream)]
Expand Down
23 changes: 19 additions & 4 deletions src/clj/backtype/storm/daemon/executor.clj
Original file line number Diff line number Diff line change
Expand Up @@ -143,9 +143,24 @@
(render-stats [this])
(get-executor-id [this]))

(defn report-error [executor error]
(log-error error)
(cluster/report-error (:storm-cluster-state executor) (:storm-id executor) (:component-id executor) error))
(defn throttled-report-error-fn [executor]
(let [storm-conf (:storm-conf executor)
error-interval-secs (storm-conf TOPOLOGY-ERROR-THROTTLE-INTERVAL-SECS)
max-per-interval (storm-conf TOPOLOGY-MAX-ERROR-REPORT-PER-INTERVAL)
interval-start-time (atom (current-time-secs))
interval-errors (atom 0)
]
(fn [error]
(log-error error)
(when (> (time-delta @interval-start-time)
error-interval-secs)
(reset! interval-errors 0)
(reset! interval-start-time (current-time-secs)))
(swap! interval-errors inc)

(when (<= @interval-errors max-per-interval)
(cluster/report-error (:storm-cluster-state executor) (:storm-id executor) (:component-id executor) error)
))))

;; in its own function so that it can be mocked out by tracked topologies
(defn mk-executor-transfer-fn [batch-transfer->worker]
Expand Down Expand Up @@ -184,7 +199,7 @@
:stats (mk-executor-stats <> (sampling-rate storm-conf))
:task->component (:task->component worker)
:stream->component->grouper (outbound-components worker-context component-id)
:report-error (partial report-error <>)
:report-error (throttled-report-error-fn <>)
:report-error-and-die (fn [error]
((:report-error <>) error)
((:suicide-fn <>)))
Expand Down
2 changes: 1 addition & 1 deletion src/clj/backtype/storm/ui/core.clj
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
(def ^:dynamic *STORM-CONF* (read-storm-config))

(defmacro with-nimbus [nimbus-sym & body]
`(thrift/with-nimbus-connection [~nimbus-sym "localhost" (*STORM-CONF* NIMBUS-THRIFT-PORT)]
`(thrift/with-nimbus-connection [~nimbus-sym (*STORM-CONF* NIMBUS-HOST) (*STORM-CONF* NIMBUS-THRIFT-PORT)]
~@body
))

Expand Down
12 changes: 12 additions & 0 deletions src/jvm/backtype/storm/Config.java
Original file line number Diff line number Diff line change
Expand Up @@ -501,6 +501,18 @@ public class Config extends HashMap<String, Object> {
*/
public static String TOPOLOGY_WORKER_SHARED_THREAD_POOL_SIZE="topology.worker.shared.thread.pool.size";

/**
* The interval in seconds to use for determining whether to throttle error reported to Zookeeper. For example,
* an interval of 10 seconds with topology.max.error.report.per.interval set to 5 will only allow 5 errors to be
* reported to Zookeeper per task for every 10 second interval of time.
*/
public static String TOPOLOGY_ERROR_THROTTLE_INTERVAL_SECS="topology.error.throttle.interval.secs";

/**
* See doc for TOPOLOGY_ERROR_THROTTLE_INTERVAL_SECS
*/
public static String TOPOLOGY_MAX_ERROR_REPORT_PER_INTERVAL="topology.max.error.report.per.interval";

/**
* Name of the topology. This config is automatically set by Storm when the topology is submitted.
*/
Expand Down
48 changes: 48 additions & 0 deletions test/clj/backtype/storm/integration_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -488,6 +488,54 @@
(read-tuples results "2")
)))))

(defbolt report-errors-bolt {}
[tuple collector]
(doseq [i (range (.getValue tuple 0))]
(report-error! collector (RuntimeException.)))
(ack! collector tuple))

(deftest test-throttled-errors
(with-simulated-time
(with-tracked-cluster [cluster]
(let [state (:storm-cluster-state cluster)
[feeder checker] (ack-tracking-feeder ["num"])
tracked (mk-tracked-topology
cluster
(topology
{"1" (spout-spec feeder)}
{"2" (bolt-spec {"1" :shuffle} report-errors-bolt)}))
_ (submit-local-topology (:nimbus cluster)
"test-errors"
{TOPOLOGY-ERROR-THROTTLE-INTERVAL-SECS 10
TOPOLOGY-MAX-ERROR-REPORT-PER-INTERVAL 4
TOPOLOGY-DEBUG true
}
(:topology tracked))
storm-id (get-storm-id state "test-errors")
errors-count (fn [] (count (.errors state storm-id "2")))]
;; so it launches the topology
(advance-cluster-time cluster 2)
(.feed feeder [6])
(tracked-wait tracked 1)
(is (= 4 (errors-count)))

(advance-time-secs! 5)
(.feed feeder [2])
(tracked-wait tracked 1)
(is (= 4 (errors-count)))

(advance-time-secs! 6)
(.feed feeder [2])
(tracked-wait tracked 1)
(is (= 6 (errors-count)))

(advance-time-secs! 6)
(.feed feeder [3])
(tracked-wait tracked 1)
(is (= 8 (errors-count)))

))))

(deftest test-acking-branching-complex
;; test acking with branching in the topology
)
Expand Down

0 comments on commit 2d7affe

Please sign in to comment.