Permalink
Browse files

Optimize sending tuples between tasks colocated in same worker

  • Loading branch information...
1 parent 35c70ed commit b0ddc4db8b18a6bfed2f044681975870c84cd0c0 @xumingming xumingming committed Apr 20, 2012
@@ -4,6 +4,7 @@
(:use [clojure.contrib.seq :only [positions]])
(:import [java.util.concurrent ConcurrentLinkedQueue ConcurrentHashMap])
(:import [backtype.storm.hooks ITaskHook])
+ (:import [backtype.storm.tuple Tuple])
(:import [backtype.storm.hooks.info SpoutAckInfo SpoutFailInfo
EmitInfo BoltFailInfo BoltAckInfo])
(:require [backtype.storm [tuple :as tuple]]))
@@ -157,8 +158,10 @@
(.getThisTaskId topology-context)
stream))))
-(defn mk-task [conf storm-conf topology-context user-context storm-id mq-context cluster-state storm-active-atom transfer-fn suicide-fn]
+(defn mk-task [conf storm-conf topology-context user-context storm-id mq-context cluster-state storm-active-atom transfer-fn suicide-fn
+ receive-queue]
(let [task-id (.getThisTaskId topology-context)
+ worker-port (.getThisWorkerPort topology-context)
component-id (.getThisComponentId topology-context)
storm-conf (component-conf storm-conf topology-context component-id)
_ (log-message "Loading task " component-id ":" task-id)
@@ -197,9 +200,7 @@
stream->component->grouper (outbound-components topology-context user-context)
component->tasks (reverse-map task-info)
- ;; important it binds to virtual port before function returns
- puller (msg/bind mq-context storm-id task-id)
-
+
;; TODO: consider DRYing things up and moving stats
task-readable-name (get-readable-name topology-context)
@@ -240,7 +241,7 @@
_ (send-unanchored topology-context tasks-fn transfer-fn SYSTEM-STREAM-ID ["startup"])
executor-threads (dofor
[exec (with-error-reaction report-error-and-die
- (mk-executors task-object storm-conf puller tasks-fn
+ (mk-executors task-object storm-conf receive-queue tasks-fn
transfer-fn
storm-active-atom topology-context
user-context task-stats report-error))]
@@ -255,16 +256,16 @@
[this]
(log-message "Shutting down task " storm-id ":" task-id)
(reset! active false)
- ;; empty messages are skip messages (this unblocks the socket)
- (msg/send-local-task-empty mq-context storm-id task-id)
+ ;; put an empty message into receive-queue
+ ;; empty messages are skip messages (this unblocks the receive-queue.take thread)
+ (.put receive-queue (byte-array []))
(doseq [t all-threads]
(.interrupt t)
(.join t))
(doseq [hook (.getHooks user-context)]
(.cleanup hook))
(.remove-task-heartbeat! storm-cluster-state storm-id task-id)
(.disconnect storm-cluster-state)
- (.close puller)
(close-component task-object)
(log-message "Shut down task " storm-id ":" task-id))
DaemonCommon
@@ -291,7 +292,7 @@
(stats/spout-acked-tuple! task-stats (:stream tuple-info) time-delta)
))
-(defmethod mk-executors ISpout [^ISpout spout storm-conf puller tasks-fn transfer-fn storm-active-atom
+(defmethod mk-executors ISpout [^ISpout spout storm-conf receive-queue tasks-fn transfer-fn storm-active-atom
^TopologyContext topology-context ^TopologyContext user-context
task-stats report-error-fn]
(let [wait-fn (fn [] @storm-active-atom)
@@ -379,10 +380,15 @@
(Time/sleep 100)))
))
(fn []
- (let [^bytes ser-msg (msg/recv puller)]
+ (let [msg (.take receive-queue)
+ is-ser-msg? (not (instance? Tuple msg))
+ is-empty-msg? (or (nil? msg) (and is-ser-msg? (empty? msg)))]
;; skip empty messages (used during shutdown)
- (when-not (empty? ser-msg)
- (let [tuple (.deserialize deserializer ser-msg)
+ (when-not is-empty-msg?
+ (let [tuple (if is-ser-msg?
+ (.deserialize deserializer msg)
+ msg)
+
id (.getValue tuple 0)
[spout-id tuple-finished-info start-time-ms] (.remove pending id)]
(when spout-id
@@ -406,7 +412,7 @@
;; TODO: this portion is not thread safe (multiple threads updating same value at same time)
(.put pending key (bit-xor curr id))))
-(defmethod mk-executors IBolt [^IBolt bolt storm-conf puller tasks-fn transfer-fn storm-active-atom
+(defmethod mk-executors IBolt [^IBolt bolt storm-conf receive-queue tasks-fn transfer-fn storm-active-atom
^TopologyContext topology-context ^TopologyContext user-context
task-stats report-error-fn]
(let [deserializer (KryoTupleDeserializer. storm-conf topology-context)
@@ -486,10 +492,14 @@
;; should remember sync requests and include a random sync id in the request. drop anything not related to active sync requests
;; or just timeout the sync messages that are coming in until full sync is hit from that task
;; need to drop incremental updates from tasks where waiting for sync. otherwise, buffer the incremental updates
- (let [^bytes ser (msg/recv puller)]
- (when-not (empty? ser) ; skip empty messages (used during shutdown)
+ (let [msg (.take receive-queue)
+ is-ser-msg? (not (instance? Tuple msg))
+ is-empty-msg? (or (nil? msg) (and is-ser-msg? (empty? msg)))]
+ (when-not is-empty-msg? ; skip empty messages (used during shutdown)
(log-debug "Processing message")
- (let [tuple (.deserialize deserializer ser)]
+ (let [tuple (if is-ser-msg?
+ (.deserialize deserializer msg)
+ msg)]
;; TODO: for state sync, need to check if tuple comes from state spout. if so, update state
;; TODO: how to handle incremental updates as well as synchronizations at same time
;; TODO: need to version tuples somehow
@@ -53,11 +53,10 @@
(-> (reverse-map task->component) (select-keys components) vals)))
))
-(defn mk-transfer-fn [storm-conf context transfer-queue]
- (let [^KryoTupleSerializer serializer (KryoTupleSerializer. storm-conf context)]
- (fn [task ^Tuple tuple]
- (.put ^LinkedBlockingQueue transfer-queue [task (.serialize serializer tuple)])
- )))
;; Builds the transfer function handed to tasks.
;; Returns a fn of [task tuple] that enqueues the pair [task tuple] onto
;; transfer-queue (type-hinted as a LinkedBlockingQueue). The Tuple object is
;; enqueued as-is; this function performs no serialization.
+(defn mk-transfer-fn [transfer-queue]
+ (fn [task ^Tuple tuple]
+ (.put ^LinkedBlockingQueue transfer-queue [task tuple])
+ ))
;; TODO: should worker even take the storm-id as input? this should be
;; deducible from cluster state (by searching through assignments)
@@ -74,6 +73,7 @@
cluster-state (cluster/mk-distributed-cluster-state conf)
storm-cluster-state (cluster/mk-storm-cluster-state cluster-state)
task-ids (read-worker-task-ids storm-cluster-state storm-id supervisor-id port)
+ task-ids-set (set task-ids)
;; because in local mode, it's not a separate
;; process. supervisor will register it in this case
_ (when (= :distributed (cluster-mode conf))
@@ -118,15 +118,19 @@
task->node+port (atom {})
transfer-queue (LinkedBlockingQueue.) ; possibly bound the size of it
+ receive-queue-map (apply merge (dofor [tid task-ids]
+ {tid (LinkedBlockingQueue.)}))
- transfer-fn (mk-transfer-fn storm-conf (mk-topology-context nil) transfer-queue)
+ ^KryoTupleSerializer serializer (KryoTupleSerializer. storm-conf (mk-topology-context nil))
+ transfer-fn (mk-transfer-fn transfer-queue)
refresh-connections (fn this
([]
(this (fn [& ignored] (.add event-manager this))))
([callback]
(let [assignment (.assignment-info storm-cluster-state storm-id callback)
my-assignment (select-keys (:task->node+port assignment) outbound-tasks)
- needed-connections (set (vals my-assignment))
+ ;; we don't need a connection for the local tasks anymore
+ needed-connections (set (vals (filter #(not (contains? task-ids-set (key %))) my-assignment)))
current-connections (set (keys @node+port->socket))
new-connections (set/difference needed-connections current-connections)
remove-connections (set/difference current-connections needed-connections)]
@@ -175,7 +179,7 @@
)
:priority Thread/MAX_PRIORITY)
suicide-fn (mk-suicide-fn conf active)
- tasks (dofor [tid task-ids] (task/mk-task conf storm-conf (mk-topology-context tid) (mk-user-context tid) storm-id mq-context cluster-state storm-active-atom transfer-fn suicide-fn))
+ tasks (dofor [tid task-ids] (task/mk-task conf storm-conf (mk-topology-context tid) (mk-user-context tid) storm-id mq-context cluster-state storm-active-atom transfer-fn suicide-fn (receive-queue-map tid)))
threads [(async-loop
(fn []
(.add event-manager refresh-connections)
@@ -190,24 +194,42 @@
(read-locked endpoint-socket-lock
(let [node+port->socket @node+port->socket
task->node+port @task->node+port]
- (doseq [[task ser-tuple] drainer]
- (let [socket (node+port->socket (task->node+port task))]
- (msg/send socket task ser-tuple)
- ))
- ))
+ (doseq [[task tuple] drainer]
+ ;; if it's a local-task, add the tuple to its receive-queue directly
+ ;; otherwise, send it through the socket.
+ (if (contains? task-ids-set task)
+ (let [^LinkedBlockingQueue target-receive-queue (receive-queue-map task)]
+ (.put target-receive-queue tuple))
+ (let [socket (node+port->socket (task->node+port task))]
+ (msg/send socket task (.serialize serializer tuple))
+ ))
+ )))
(.clear drainer)
0 )
:args-fn (fn [] [(ArrayList.)]))
heartbeat-thread]
- virtual-port-shutdown (when (local-mode-zmq? conf)
- (log-message "Launching virtual port for " supervisor-id ":" port)
- (msg-loader/launch-virtual-port!
- (= (conf STORM-CLUSTER-MODE) "local")
- mq-context
- port
- :kill-fn (fn [t]
- (halt-process! 11))
- :valid-ports task-ids))
+ deserializer (KryoTupleDeserializer. storm-conf (mk-topology-context nil))
+ virtual-port-shutdown (if (local-mode-zmq? conf)
+ (do
+ (log-message "Launching virtual port for " supervisor-id ":" port)
+ (msg-loader/launch-virtual-port!
+ (= (conf STORM-CLUSTER-MODE) "local")
+ mq-context
+ port
+ receive-queue-map
+ :kill-fn (fn [t]
+ (halt-process! 11))
+ :valid-ports task-ids))
+ (do
+ (log-message "Launching FAKE virtual port")
+ (msg-loader/launch-fake-virtual-port!
+ mq-context
+ storm-id
+ port
+ receive-queue-map
+ deserializer)))
+
+
shutdown* (fn []
(log-message "Shutting down worker " storm-id " " supervisor-id " " port)
(reset! active false)
@@ -216,7 +238,7 @@
;; this will do best effort flushing since the linger period
;; was set on creation
(.close socket))
- (if virtual-port-shutdown (virtual-port-shutdown))
+ (virtual-port-shutdown)
(log-message "Terminating zmq context")
(msg/term mq-context)
(log-message "Disconnecting from storm cluster state context")
@@ -1,4 +1,5 @@
-(ns backtype.storm.messaging.loader
+(ns backtype.storm.messaging.loader
+ (:require [zilch.virtual-port :as mqvp])
(:require [backtype.storm.messaging.local :as local]))
(defn mk-local-context []
@@ -11,7 +12,7 @@
var-get)]
(apply afn args)))
-(defn launch-virtual-port! [local? context port & args]
+(defn launch-virtual-port! [local? context port receive-queue-map & args]
(require '[zilch.virtual-port :as mqvp])
(require '[backtype.storm.messaging.zmq :as zmq])
(let [afn (-> 'zilch.virtual-port/launch-virtual-port!
@@ -21,4 +22,14 @@
(str "ipc://" port ".ipc")
(str "tcp://*:" port))
]
- (apply afn (concat [(.zmq-context context) url] args))))
+ (apply afn (concat [(.zmq-context context) url receive-queue-map] args))))
+
;; Thin wrapper around mqvp/launch-fake-virtual-port! (mqvp is the namespace
;; alias for zilch.virtual-port). Forwards context, storm-id, port,
;; receive-queue-map and deserializer unchanged, in that order, and returns
;; whatever the delegate returns.
;; NOTE(review): the worker calls the returned value as a zero-arg shutdown
;; fn -- confirm the delegate returns one.
+(defn launch-fake-virtual-port! [context storm-id port receive-queue-map deserializer]
+ (mqvp/launch-fake-virtual-port!
+ context
+ storm-id
+ port
+ receive-queue-map
+ deserializer))
+
+
@@ -11,31 +11,27 @@
(swap! queues-map assoc id (LinkedBlockingQueue.))))
(@queues-map id)))
-(deftype LocalConnection [storm-id queues-map lock queue]
+(deftype LocalConnection [storm-id port queues-map lock queue]
Connection
(recv [this]
(when-not queue
(throw (IllegalArgumentException. "Cannot receive on this socket")))
(.take queue))
(send [this task message]
- (let [send-queue (add-queue! queues-map lock storm-id task)]
- (.put send-queue message)
+ (let [send-queue (add-queue! queues-map lock storm-id port)]
+ (.put send-queue [task message])
))
(close [this]
))
(deftype LocalContext [queues-map lock]
Context
- (bind [this storm-id virtual-port]
- (LocalConnection. storm-id queues-map lock (add-queue! queues-map lock storm-id virtual-port)))
+ (bind [this storm-id port]
+ (LocalConnection. storm-id port queues-map lock (add-queue! queues-map lock storm-id port)))
(connect [this storm-id host port]
- (LocalConnection. storm-id queues-map lock nil)
+ (LocalConnection. storm-id port queues-map lock nil)
)
- (send-local-task-empty [this storm-id virtual-port]
- (let [queue (add-queue! queues-map lock storm-id virtual-port)]
- (.put queue (byte-array []))
- ))
(term [this]
))
@@ -9,9 +9,8 @@
)
(defprotocol Context
- (bind [context storm-id virtual-port])
+ (bind [context storm-id port])
(connect [context storm-id host port])
- (send-local-task-empty [context storm-id virtual-port])
(term [context])
)
@@ -19,10 +19,10 @@
(deftype ZMQContext [context linger-ms ipc?]
Context
- (bind [this storm-id virtual-port]
+ (bind [this storm-id port]
(-> context
(mq/socket mq/pull)
- (mqvp/virtual-bind virtual-port)
+ (mqvp/virtual-bind port)
(ZMQConnection.)
))
(connect [this storm-id host port]
@@ -34,10 +34,6 @@
(mq/set-linger linger-ms)
(mq/connect url)
(ZMQConnection.))))
- (send-local-task-empty [this storm-id virtual-port]
- (let [pusher (-> context (mq/socket mq/push) (mqvp/virtual-connect virtual-port))]
- (mq/send pusher (mq/barr))
- (.close pusher)))
(term [this]
(.term context))
ZMQContextQuery
Oops, something went wrong.

0 comments on commit b0ddc4d

Please sign in to comment.