fix disruptor cache / consumer started problem for single threaded claim strategy
Nathan Marz committed May 29, 2012
1 parent 2892dd8 commit debb8da
Showing 5 changed files with 45 additions and 36 deletions.
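Background for the fix, as the diffs below spell out: DisruptorQueue caches anything published before a consumer has attached, and flushes that cache once consumerStarted() is called. A minimal Java sketch of the pattern; publish(), consumerStarted() and flushCache() mirror the real class, everything else is an illustrative stand-in:

import java.util.concurrent.ConcurrentLinkedQueue;

// Minimal sketch of the cache / consumer-started pattern this commit fixes.
class CachingQueueSketch {
    private final ConcurrentLinkedQueue<Object> cache = new ConcurrentLinkedQueue<Object>();
    private volatile boolean consumerStartedFlag = false;

    public void publish(Object obj) {
        if (consumerStartedFlag) {
            claimAndPublish(obj);   // claims a slot on the LMAX ring buffer
        } else {
            cache.add(obj);         // no consumer attached yet: park the message
        }
    }

    public void consumerStarted() {
        consumerStartedFlag = true;
        flushCache();               // pre-commit version: no idempotence guard
    }

    private void flushCache() {
        Object o;
        while ((o = cache.poll()) != null) {
            claimAndPublish(o);     // a second thread claiming slots concurrently
        }
    }

    private void claimAndPublish(Object obj) {
        // ring buffer next() / publish() would go here
    }
}

Under SingleThreadedClaimStrategy the LMAX ring buffer assumes exactly one claiming thread, so a producer that passes the flag check while another thread is inside flushCache() gives the buffer two claiming threads at once. The commit closes the hole from both ends: queues built with the single-threaded strategy never cache (the DisruptorQueue constructor change), and executors now start each such queue's consumer before anything that publishes to it (the executor.clj changes).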
65 changes: 33 additions & 32 deletions src/clj/backtype/storm/daemon/executor.clj
@@ -163,7 +163,8 @@
     :conf (:conf worker)
     :storm-active-atom (:storm-active-atom worker)
     :batch-transfer-queue batch-transfer->worker
-    :transfer-fn (fn [task tuple] (disruptor/publish batch-transfer->worker [task tuple]))
+    :transfer-fn (fn [task tuple]
+                   (disruptor/publish batch-transfer->worker [task tuple]))
     :suicide-fn (:suicide-fn worker)
     :storm-cluster-state (cluster/mk-storm-cluster-state (:cluster-state worker))
     :type executor-type
@@ -227,15 +228,13 @@
         report-error-and-die (:report-error-and-die executor-data)
         component-id (:component-id executor-data)

+        ;; starting the batch-transfer->worker ensures that anything publishing to that queue
+        ;; doesn't block (because it's a single threaded queue and the caching/consumer started
+        ;; trick isn't thread-safe)
+        system-threads [(start-batch-transfer->worker-handler! worker executor-data)]
         handlers (with-error-reaction report-error-and-die
                    (mk-threads executor-data task-datas))
-        threads (concat handlers
-                        [(start-batch-transfer->worker-handler! worker executor-data)
-                         ])]
-    ;;technically this is called twice for bolts, but that's ok
-    (disruptor/consumer-started! (:receive-queue executor-data))
+        threads (concat handlers system-threads)]
     (setup-ticks! worker executor-data)

     (log-message "Finished loading executor " component-id ":" (pr-str executor-id))
@@ -251,10 +250,12 @@
       [this]
       (log-message "Shutting down executor " component-id ":" (pr-str executor-id))
       (disruptor/halt-with-interrupt! (:receive-queue executor-data))
-      (disruptor/halt-with-interrupt! (:batch-transfer-queue executor-data))
       (doseq [t threads]
         (.interrupt t)
         (.join t))
+      ;; must do this after the threads are killed, this ensures that the interrupt message
+      ;; goes through properly
+      (disruptor/halt-with-interrupt! (:batch-transfer-queue executor-data))

       (doseq [user-context (map :user-context (vals task-datas))]
         (doseq [hook (.getHooks user-context)]
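One reading of that ordering constraint, consistent with the rest of the commit: halt-with-interrupt! presumably publishes an interrupt token into the queue, and the batch-transfer queue uses the single-threaded claim strategy, so that publish must not race the executor threads that normally feed the queue; joining them first leaves the shutdown thread as the sole publisher.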
@@ -403,33 +404,33 @@
           ))
         )))
     (log-message "Opened spout " component-id ":" (keys task-datas))
-    ;; TODO: should redesign this to only use one thread
     [(async-loop
       (fn []
-        ;; This design requires that spouts be non-blocking
-        (disruptor/consume-batch receive-queue event-handler)
-        (if (or (not max-spout-pending)
-                (< (.size pending) max-spout-pending))
-          (if-let [active? (wait-fn)]
-            (do
-              (when-not @last-active
-                (reset! last-active true)
-                (log-message "Activating spout " component-id ":" (keys task-datas))
-                (fast-list-iter [^ISpout spout spouts] (.activate spout)))
-              (fast-list-iter [^ISpout spout spouts] (.nextTuple spout)))
-            (do
-              (when @last-active
-                (reset! last-active false)
-                (log-message "Deactivating spout " component-id ":" (keys task-datas))
-                (fast-list-iter [^ISpout spout spouts] (.deactivate spout)))
-              ;; TODO: log that it's getting throttled
-              (Time/sleep 100))))
-        0)
+        (disruptor/consumer-started! (:receive-queue executor-data))
+        (fn []
+          ;; This design requires that spouts be non-blocking
+          (disruptor/consume-batch receive-queue event-handler)
+          (if (or (not max-spout-pending)
+                  (< (.size pending) max-spout-pending))
+            (if-let [active? (wait-fn)]
+              (do
+                (when-not @last-active
+                  (reset! last-active true)
+                  (log-message "Activating spout " component-id ":" (keys task-datas))
+                  (fast-list-iter [^ISpout spout spouts] (.activate spout)))
+                (fast-list-iter [^ISpout spout spouts] (.nextTuple spout)))
+              (do
+                (when @last-active
+                  (reset! last-active false)
+                  (log-message "Deactivating spout " component-id ":" (keys task-datas))
+                  (fast-list-iter [^ISpout spout spouts] (.deactivate spout)))
+                ;; TODO: log that it's getting throttled
+                (Time/sleep 100))))
+          0))
       :kill-fn (:report-error-and-die executor-data)
-      )
-    ;; TODO: need to start the consumer
-    ]
+      :factory? true
+      )]
     ))

 (defn- tuple-time-delta! [^TupleImpl tuple]
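:factory? true is the new piece of async-loop plumbing here: the supplied fn is treated as a factory that runs once on the spawned thread and returns the real loop body, which is how (disruptor/consumer-started! ...) comes to execute on the spout's own thread instead of the setup thread. A rough Java sketch of that factory-loop shape (illustrative; not Storm's async-loop implementation):

import java.util.concurrent.Callable;

// The factory runs once on the new thread for per-thread setup (here, where
// consumer-started! would go), then returns the body the loop calls repeatedly.
public class FactoryLoop {
    public interface Factory { Callable<Long> make() throws Exception; }

    public static Thread asyncLoop(final Factory factory) {
        Thread t = new Thread(new Runnable() {
            public void run() {
                try {
                    Callable<Long> body = factory.make();  // setup on the loop thread
                    while (!Thread.currentThread().isInterrupted()) {
                        long sleepMs = body.call();        // returns how long to sleep
                        if (sleepMs > 0) Thread.sleep(sleepMs);
                    }
                } catch (InterruptedException e) {
                    // asked to stop
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
            }
        });
        t.start();
        return t;
    }
}

In the diff above, the outer (fn [] ...) is the factory: it calls consumer-started! and returns the inner (fn [] ... 0) that does the actual spout work.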
2 changes: 2 additions & 0 deletions src/clj/backtype/storm/daemon/task.clj
@@ -150,6 +150,8 @@
         storm-conf (:storm-conf executor-data)]
     (doseq [klass (storm-conf TOPOLOGY-AUTO-TASK-HOOKS)]
       (.addTaskHook ^TopologyContext (:user-context task-data) (-> klass Class/forName .newInstance)))
+    ;; when this is called, the threads for the executor haven't been started yet,
+    ;; so we won't be risking trampling on the single-threaded claim strategy disruptor queue
     (send-unanchored task-data SYSTEM-STREAM-ID ["startup"])
     task-data
     ))
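The new comment is the startup-side counterpart of the executor.clj change: at this point only the thread constructing the task is running, so this publish onto the single-threaded-claim transfer queue cannot race another publisher; the executor threads that will own the queue are spawned afterwards.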
3 changes: 2 additions & 1 deletion src/clj/backtype/storm/disruptor.clj
@@ -55,7 +55,8 @@
               (fn []
                 (consume-batch-when-available queue handler)
                 0 )
-              :kill-fn kill-fn)]
+              :kill-fn kill-fn
+              )]
     (consumer-started! queue)
     ret
     ))
1 change: 0 additions & 1 deletion src/jvm/backtype/storm/tuple/TupleImpl.java
@@ -1,7 +1,6 @@
 package backtype.storm.tuple;

 import backtype.storm.task.GeneralTopologyContext;
-import backtype.storm.task.TopologyContext;
 import java.util.List;

 public class TupleImpl extends Tuple {
10 changes: 8 additions & 2 deletions src/jvm/backtype/storm/utils/DisruptorQueue.java
@@ -7,6 +7,7 @@
 import com.lmax.disruptor.RingBuffer;
 import com.lmax.disruptor.Sequence;
 import com.lmax.disruptor.SequenceBarrier;
+import com.lmax.disruptor.SingleThreadedClaimStrategy;
 import com.lmax.disruptor.WaitStrategy;
 import java.util.concurrent.ConcurrentLinkedQueue;
@@ -32,6 +33,9 @@ public DisruptorQueue(ClaimStrategy claim, WaitStrategy wait) {
         _consumer = new Sequence();
         _barrier = _buffer.newBarrier();
         _buffer.setGatingSequences(_consumer);
+        if(claim instanceof SingleThreadedClaimStrategy) {
+            consumerStartedFlag = true;
+        }
     }

     public void consumeBatch(EventHandler<Object> handler) {
@@ -95,8 +99,10 @@ public void publish(Object obj) {
     }

     public void consumerStarted() {
-        consumerStartedFlag = true;
-        flushCache();
+        if(!consumerStartedFlag) {
+            consumerStartedFlag = true;
+            flushCache();
+        }
     }

     private void flushCache() {
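Taken together, the two DisruptorQueue changes mean a queue built with SingleThreadedClaimStrategy is born with consumerStartedFlag set: nothing is ever cached on it, and consumerStarted() degrades to a no-op there, so flushCache(), the one path that could claim ring-buffer slots from a second thread, can no longer run against it. The guard also makes repeated consumer-started! calls on any queue harmless.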
