Skip to content

Commit

Permalink
Merge branch 'master' into 0.9.0
Browse files Browse the repository at this point in the history
  • Loading branch information
Nathan Marz committed Oct 30, 2012
2 parents 6b1796f + 10e8639 commit cf67a08
Show file tree
Hide file tree
Showing 6 changed files with 99 additions and 26 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
* Execute latency now tracked and shown in Storm UI * Execute latency now tracked and shown in Storm UI
* Adding testTuple methods for easily creating Tuple instances to Testing API (thanks xumingming) * Adding testTuple methods for easily creating Tuple instances to Testing API (thanks xumingming)
* Trident now throws an error during construction of a topology when trying to select fields that don't exist in a stream (thanks xumingming) * Trident now throws an error during construction of a topology when trying to select fields that don't exist in a stream (thanks xumingming)
* Bug fix: Fix deadlock bug due to variant of dining philosophers problem. Spouts now use an overflow buffer to prevent blocking and guarantee that it can consume the incoming queue of acks/fails.
* Bug fix: Fix race condition in supervisor that would lead to supervisor continuously crashing due to not finding "stormconf.ser" file for an already killed topology * Bug fix: Fix race condition in supervisor that would lead to supervisor continuously crashing due to not finding "stormconf.ser" file for an already killed topology
* Bug fix: bin/storm script now displays a helpful error message when an invalid command is specified * Bug fix: bin/storm script now displays a helpful error message when an invalid command is specified


Expand Down
3 changes: 2 additions & 1 deletion src/clj/backtype/storm/bootstrap.clj
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
(require (quote [backtype.storm [stats :as stats] [disruptor :as disruptor]])) (require (quote [backtype.storm [stats :as stats] [disruptor :as disruptor]]))
(import (quote [org.slf4j Logger])) (import (quote [org.slf4j Logger]))


(import (quote [com.lmax.disruptor InsufficientCapacityException]))
(import (quote [backtype.storm.generated Nimbus Nimbus$Processor (import (quote [backtype.storm.generated Nimbus Nimbus$Processor
Nimbus$Iface StormTopology ShellComponent Nimbus$Iface StormTopology ShellComponent
NotAliveException AlreadyAliveException GlobalStreamId NotAliveException AlreadyAliveException GlobalStreamId
Expand All @@ -42,6 +43,6 @@
SupervisorInfo WorkerHeartbeat])) SupervisorInfo WorkerHeartbeat]))
(import (quote [backtype.storm.grouping CustomStreamGrouping])) (import (quote [backtype.storm.grouping CustomStreamGrouping]))
(import (quote [java.io File FileOutputStream FileInputStream])) (import (quote [java.io File FileOutputStream FileInputStream]))
(import (quote [java.util Collection List Random Map HashMap Collections ArrayList])) (import (quote [java.util Collection List Random Map HashMap Collections ArrayList LinkedList]))
(import (quote [org.apache.commons.io FileUtils])) (import (quote [org.apache.commons.io FileUtils]))
)) ))
62 changes: 50 additions & 12 deletions src/clj/backtype/storm/daemon/executor.clj
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -164,8 +164,22 @@


;; in its own function so that it can be mocked out by tracked topologies ;; in its own function so that it can be mocked out by tracked topologies
(defn mk-executor-transfer-fn [batch-transfer->worker] (defn mk-executor-transfer-fn [batch-transfer->worker]
(fn [task tuple] (fn this
(disruptor/publish batch-transfer->worker [task tuple]))) ([task tuple block? ^List overflow-buffer]
(if (and overflow-buffer (not (.isEmpty overflow-buffer)))
(.add overflow-buffer [task tuple])
(try-cause
(disruptor/publish batch-transfer->worker [task tuple] block?)
(catch InsufficientCapacityException e
(if overflow-buffer
(.add overflow-buffer [task tuple])
(throw e))
))))
([task tuple overflow-buffer]
(this task tuple (nil? overflow-buffer) overflow-buffer))
([task tuple]
(this task tuple nil)
)))


(defn executor-data [worker executor-id] (defn executor-data [worker executor-id]
(let [worker-context (worker-context worker) (let [worker-context (worker-context worker)
Expand Down Expand Up @@ -383,7 +397,16 @@
event-handler (mk-task-receiver executor-data tuple-action-fn) event-handler (mk-task-receiver executor-data tuple-action-fn)
has-ackers? (has-ackers? storm-conf) has-ackers? (has-ackers? storm-conf)
emitted-count (MutableLong. 0) emitted-count (MutableLong. 0)
empty-emit-streak (MutableLong. 0)] empty-emit-streak (MutableLong. 0)

;; the overflow buffer is used to ensure that spouts never block when emitting
;; this ensures that the spout can always clear the incoming buffer (acks and fails), which
;; prevents deadlock from occurring across the topology (e.g. Spout -> Bolt -> Acker -> Spout, and all
;; buffers filled up)
;; when the overflow buffer is full, spouts stop calling nextTuple until it's able to clear the overflow buffer
;; this limits the size of the overflow buffer to however many tuples a spout emits in one call of nextTuple,
;; preventing memory issues
overflow-buffer (LinkedList.)]


[(async-loop [(async-loop
(fn [] (fn []
Expand All @@ -406,13 +429,16 @@
(fast-list-iter [out-task out-tasks id out-ids] (fast-list-iter [out-task out-tasks id out-ids]
(let [tuple-id (if rooted? (let [tuple-id (if rooted?
(MessageId/makeRootId root-id id) (MessageId/makeRootId root-id id)
(MessageId/makeUnanchored))] (MessageId/makeUnanchored))
out-tuple (TupleImpl. worker-context
values
task-id
out-stream-id
tuple-id)]
(transfer-fn out-task (transfer-fn out-task
(TupleImpl. worker-context out-tuple
values overflow-buffer)
task-id ))
out-stream-id
tuple-id))))
(if rooted? (if rooted?
(do (do
(.put pending root-id [task-id (.put pending root-id [task-id
Expand All @@ -421,7 +447,8 @@
(if (sampler) (System/currentTimeMillis))]) (if (sampler) (System/currentTimeMillis))])
(task/send-unanchored task-data (task/send-unanchored task-data
ACKER-INIT-STREAM-ID ACKER-INIT-STREAM-ID
[root-id (bit-xor-vals out-ids) task-id])) [root-id (bit-xor-vals out-ids) task-id]
overflow-buffer))
(when message-id (when message-id
(ack-spout-msg executor-data task-data message-id (ack-spout-msg executor-data task-data message-id
{:stream out-stream-id :values values} {:stream out-stream-id :values values}
Expand Down Expand Up @@ -450,10 +477,21 @@
(fn [] (fn []
;; This design requires that spouts be non-blocking ;; This design requires that spouts be non-blocking
(disruptor/consume-batch receive-queue event-handler) (disruptor/consume-batch receive-queue event-handler)

;; try to clear the overflow-buffer
(try-cause
(while (not (.isEmpty overflow-buffer))
(let [[out-task out-tuple] (.peek overflow-buffer)]
(transfer-fn out-task out-tuple false nil)
(.removeFirst overflow-buffer)))
(catch InsufficientCapacityException e
))

(let [active? @(:storm-active-atom executor-data) (let [active? @(:storm-active-atom executor-data)
curr-count (.get emitted-count)] curr-count (.get emitted-count)]
(if (or (not max-spout-pending) (if (and (.isEmpty overflow-buffer)
(< (.size pending) max-spout-pending)) (or (not max-spout-pending)
(< (.size pending) max-spout-pending)))
(if active? (if active?
(do (do
(when-not @last-active (when-not @last-active
Expand Down
27 changes: 17 additions & 10 deletions src/clj/backtype/storm/daemon/task.clj
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -82,16 +82,23 @@




;; TODO: this is all expensive... should be precomputed ;; TODO: this is all expensive... should be precomputed
(defn send-unanchored [task-data stream values] (defn send-unanchored
(let [^TopologyContext topology-context (:system-context task-data) ([task-data stream values overflow-buffer]
tasks-fn (:tasks-fn task-data) (let [^TopologyContext topology-context (:system-context task-data)
transfer-fn (-> task-data :executor-data :transfer-fn)] tasks-fn (:tasks-fn task-data)
(fast-list-iter [t (tasks-fn stream values)] transfer-fn (-> task-data :executor-data :transfer-fn)
(transfer-fn t out-tuple (TupleImpl. topology-context
(TupleImpl. topology-context values
values (.getThisTaskId topology-context)
(.getThisTaskId topology-context) stream)]
stream))))) (fast-list-iter [t (tasks-fn stream values)]
(transfer-fn t
out-tuple
overflow-buffer)
)))
([task-data stream values]
(send-unanchored task-data stream values nil)
))


(defn mk-tasks-fn [task-data] (defn mk-tasks-fn [task-data]
(let [task-id (:task-id task-data) (let [task-id (:task-id task-data)
Expand Down
10 changes: 8 additions & 2 deletions src/clj/backtype/storm/disruptor.clj
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -46,8 +46,14 @@
(defmacro handler [& args] (defmacro handler [& args]
`(clojure-handler (fn ~@args))) `(clojure-handler (fn ~@args)))


(defn publish [^DisruptorQueue q o] (defn publish
(.publish q o)) ([^DisruptorQueue q o block?]
(.publish q o block?))
([q o]
(publish q o true)))

(defn try-publish [^DisruptorQueue q o]
(.tryPublish q o))


(defn consume-batch [^DisruptorQueue queue handler] (defn consume-batch [^DisruptorQueue queue handler]
(.consumeBatch queue handler)) (.consumeBatch queue handler))
Expand Down
22 changes: 21 additions & 1 deletion src/jvm/backtype/storm/utils/DisruptorQueue.java
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -4,13 +4,16 @@
import com.lmax.disruptor.ClaimStrategy; import com.lmax.disruptor.ClaimStrategy;
import com.lmax.disruptor.EventFactory; import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.EventHandler; import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.InsufficientCapacityException;
import com.lmax.disruptor.RingBuffer; import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.Sequence; import com.lmax.disruptor.Sequence;
import com.lmax.disruptor.SequenceBarrier; import com.lmax.disruptor.SequenceBarrier;
import com.lmax.disruptor.SingleThreadedClaimStrategy; import com.lmax.disruptor.SingleThreadedClaimStrategy;
import com.lmax.disruptor.WaitStrategy; import com.lmax.disruptor.WaitStrategy;
import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;


/** /**
* *
Expand Down Expand Up @@ -92,8 +95,25 @@ private void consumeBatchToCursor(long cursor, EventHandler<Object> handler) {
* Caches until consumerStarted is called, upon which the cache is flushed to the consumer * Caches until consumerStarted is called, upon which the cache is flushed to the consumer
*/ */
public void publish(Object obj) { public void publish(Object obj) {
try {
publish(obj, true);
} catch (InsufficientCapacityException ex) {
throw new RuntimeException("This code should be unreachable!");
}
}

public void tryPublish(Object obj) throws InsufficientCapacityException {
publish(obj, false);
}

public void publish(Object obj, boolean block) throws InsufficientCapacityException {
if(consumerStartedFlag) { if(consumerStartedFlag) {
final long id = _buffer.next(); final long id;
if(block) {
id = _buffer.next();
} else {
id = _buffer.tryNext(1);
}
final MutableObject m = _buffer.get(id); final MutableObject m = _buffer.get(id);
m.setObject(obj); m.setObject(obj);
_buffer.publish(id); _buffer.publish(id);
Expand Down

0 comments on commit cf67a08

Please sign in to comment.