Skip to content

Commit

Permalink
Optimize sending tuples between tasks colocated in same worker
Browse files Browse the repository at this point in the history
  • Loading branch information
xumingming committed Apr 20, 2012
1 parent 35c70ed commit b0ddc4d
Show file tree
Hide file tree
Showing 7 changed files with 133 additions and 82 deletions.
42 changes: 26 additions & 16 deletions src/clj/backtype/storm/daemon/task.clj
Expand Up @@ -4,6 +4,7 @@
(:use [clojure.contrib.seq :only [positions]])
(:import [java.util.concurrent ConcurrentLinkedQueue ConcurrentHashMap])
(:import [backtype.storm.hooks ITaskHook])
(:import [backtype.storm.tuple Tuple])
(:import [backtype.storm.hooks.info SpoutAckInfo SpoutFailInfo
EmitInfo BoltFailInfo BoltAckInfo])
(:require [backtype.storm [tuple :as tuple]]))
Expand Down Expand Up @@ -157,8 +158,10 @@
(.getThisTaskId topology-context)
stream))))

(defn mk-task [conf storm-conf topology-context user-context storm-id mq-context cluster-state storm-active-atom transfer-fn suicide-fn]
(defn mk-task [conf storm-conf topology-context user-context storm-id mq-context cluster-state storm-active-atom transfer-fn suicide-fn
receive-queue]
(let [task-id (.getThisTaskId topology-context)
worker-port (.getThisWorkerPort topology-context)
component-id (.getThisComponentId topology-context)
storm-conf (component-conf storm-conf topology-context component-id)
_ (log-message "Loading task " component-id ":" task-id)
Expand Down Expand Up @@ -197,9 +200,7 @@

stream->component->grouper (outbound-components topology-context user-context)
component->tasks (reverse-map task-info)
;; important it binds to virtual port before function returns
puller (msg/bind mq-context storm-id task-id)


;; TODO: consider DRYing things up and moving stats
task-readable-name (get-readable-name topology-context)

Expand Down Expand Up @@ -240,7 +241,7 @@
_ (send-unanchored topology-context tasks-fn transfer-fn SYSTEM-STREAM-ID ["startup"])
executor-threads (dofor
[exec (with-error-reaction report-error-and-die
(mk-executors task-object storm-conf puller tasks-fn
(mk-executors task-object storm-conf receive-queue tasks-fn
transfer-fn
storm-active-atom topology-context
user-context task-stats report-error))]
Expand All @@ -255,16 +256,16 @@
[this]
(log-message "Shutting down task " storm-id ":" task-id)
(reset! active false)
;; empty messages are skip messages (this unblocks the socket)
(msg/send-local-task-empty mq-context storm-id task-id)
;; put an empty message into receive-queue
;; empty messages are skip messages (this unblocks the receive-queue.take thread)
(.put receive-queue (byte-array []))
(doseq [t all-threads]
(.interrupt t)
(.join t))
(doseq [hook (.getHooks user-context)]
(.cleanup hook))
(.remove-task-heartbeat! storm-cluster-state storm-id task-id)
(.disconnect storm-cluster-state)
(.close puller)
(close-component task-object)
(log-message "Shut down task " storm-id ":" task-id))
DaemonCommon
Expand All @@ -291,7 +292,7 @@
(stats/spout-acked-tuple! task-stats (:stream tuple-info) time-delta)
))

(defmethod mk-executors ISpout [^ISpout spout storm-conf puller tasks-fn transfer-fn storm-active-atom
(defmethod mk-executors ISpout [^ISpout spout storm-conf receive-queue tasks-fn transfer-fn storm-active-atom
^TopologyContext topology-context ^TopologyContext user-context
task-stats report-error-fn]
(let [wait-fn (fn [] @storm-active-atom)
Expand Down Expand Up @@ -379,10 +380,15 @@
(Time/sleep 100)))
))
(fn []
(let [^bytes ser-msg (msg/recv puller)]
(let [msg (.take receive-queue)
is-ser-msg? (not (instance? Tuple msg))
is-empty-msg? (or (nil? msg) (and is-ser-msg? (empty? msg)))]
;; skip empty messages (used during shutdown)
(when-not (empty? ser-msg)
(let [tuple (.deserialize deserializer ser-msg)
(when-not is-empty-msg?
(let [tuple (if is-ser-msg?
(.deserialize deserializer msg)
msg)

id (.getValue tuple 0)
[spout-id tuple-finished-info start-time-ms] (.remove pending id)]
(when spout-id
Expand All @@ -406,7 +412,7 @@
;; TODO: this portion is not thread safe (multiple threads updating same value at same time)
(.put pending key (bit-xor curr id))))

(defmethod mk-executors IBolt [^IBolt bolt storm-conf puller tasks-fn transfer-fn storm-active-atom
(defmethod mk-executors IBolt [^IBolt bolt storm-conf receive-queue tasks-fn transfer-fn storm-active-atom
^TopologyContext topology-context ^TopologyContext user-context
task-stats report-error-fn]
(let [deserializer (KryoTupleDeserializer. storm-conf topology-context)
Expand Down Expand Up @@ -486,10 +492,14 @@
;; should remember sync requests and include a random sync id in the request. drop anything not related to active sync requests
;; or just timeout the sync messages that are coming in until full sync is hit from that task
;; need to drop incremental updates from tasks where waiting for sync. otherwise, buffer the incremental updates
(let [^bytes ser (msg/recv puller)]
(when-not (empty? ser) ; skip empty messages (used during shutdown)
(let [msg (.take receive-queue)
is-ser-msg? (not (instance? Tuple msg))
is-empty-msg? (or (nil? msg) (and is-ser-msg? (empty? msg)))]
(when-not is-empty-msg? ; skip empty messages (used during shutdown)
(log-debug "Processing message")
(let [tuple (.deserialize deserializer ser)]
(let [tuple (if is-ser-msg?
(.deserialize deserializer msg)
msg)]
;; TODO: for state sync, need to check if tuple comes from state spout. if so, update state
;; TODO: how to handle incremental updates as well as synchronizations at same time
;; TODO: need to version tuples somehow
Expand Down
68 changes: 45 additions & 23 deletions src/clj/backtype/storm/daemon/worker.clj
Expand Up @@ -53,11 +53,10 @@
(-> (reverse-map task->component) (select-keys components) vals)))
))

(defn mk-transfer-fn [storm-conf context transfer-queue]
(let [^KryoTupleSerializer serializer (KryoTupleSerializer. storm-conf context)]
(fn [task ^Tuple tuple]
(.put ^LinkedBlockingQueue transfer-queue [task (.serialize serializer tuple)])
)))
(defn mk-transfer-fn [transfer-queue]
(fn [task ^Tuple tuple]
(.put ^LinkedBlockingQueue transfer-queue [task tuple])
))

;; TODO: should worker even take the storm-id as input? this should be
;; deducable from cluster state (by searching through assignments)
Expand All @@ -74,6 +73,7 @@
cluster-state (cluster/mk-distributed-cluster-state conf)
storm-cluster-state (cluster/mk-storm-cluster-state cluster-state)
task-ids (read-worker-task-ids storm-cluster-state storm-id supervisor-id port)
task-ids-set (set task-ids)
;; because in local mode, its not a separate
;; process. supervisor will register it in this case
_ (when (= :distributed (cluster-mode conf))
Expand Down Expand Up @@ -118,15 +118,19 @@
task->node+port (atom {})

transfer-queue (LinkedBlockingQueue.) ; possibly bound the size of it
receive-queue-map (apply merge (dofor [tid task-ids]
{tid (LinkedBlockingQueue.)}))

transfer-fn (mk-transfer-fn storm-conf (mk-topology-context nil) transfer-queue)
^KryoTupleSerializer serializer (KryoTupleSerializer. storm-conf (mk-topology-context nil))
transfer-fn (mk-transfer-fn transfer-queue)
refresh-connections (fn this
([]
(this (fn [& ignored] (.add event-manager this))))
([callback]
(let [assignment (.assignment-info storm-cluster-state storm-id callback)
my-assignment (select-keys (:task->node+port assignment) outbound-tasks)
needed-connections (set (vals my-assignment))
;; we dont need a connection for the local tasks anymore
needed-connections (set (vals (filter #(not (contains? task-ids-set (key %))) my-assignment)))
current-connections (set (keys @node+port->socket))
new-connections (set/difference needed-connections current-connections)
remove-connections (set/difference current-connections needed-connections)]
Expand Down Expand Up @@ -175,7 +179,7 @@
)
:priority Thread/MAX_PRIORITY)
suicide-fn (mk-suicide-fn conf active)
tasks (dofor [tid task-ids] (task/mk-task conf storm-conf (mk-topology-context tid) (mk-user-context tid) storm-id mq-context cluster-state storm-active-atom transfer-fn suicide-fn))
tasks (dofor [tid task-ids] (task/mk-task conf storm-conf (mk-topology-context tid) (mk-user-context tid) storm-id mq-context cluster-state storm-active-atom transfer-fn suicide-fn (receive-queue-map tid)))
threads [(async-loop
(fn []
(.add event-manager refresh-connections)
Expand All @@ -190,24 +194,42 @@
(read-locked endpoint-socket-lock
(let [node+port->socket @node+port->socket
task->node+port @task->node+port]
(doseq [[task ser-tuple] drainer]
(let [socket (node+port->socket (task->node+port task))]
(msg/send socket task ser-tuple)
))
))
(doseq [[task tuple] drainer]
;; if its a local-task, add the tuple to its receive-queue directly
;; otherwise, send it through the socket.
(if (contains? task-ids-set task)
(let [^LinkedBlockingQueue target-receive-queue (receive-queue-map task)]
(.put target-receive-queue tuple))
(let [socket (node+port->socket (task->node+port task))]
(msg/send socket task (.serialize serializer tuple))
))
)))
(.clear drainer)
0 )
:args-fn (fn [] [(ArrayList.)]))
heartbeat-thread]
virtual-port-shutdown (when (local-mode-zmq? conf)
(log-message "Launching virtual port for " supervisor-id ":" port)
(msg-loader/launch-virtual-port!
(= (conf STORM-CLUSTER-MODE) "local")
mq-context
port
:kill-fn (fn [t]
(halt-process! 11))
:valid-ports task-ids))
deserializer (KryoTupleDeserializer. storm-conf (mk-topology-context nil))
virtual-port-shutdown (if (local-mode-zmq? conf)
(do
(log-message "Launching virtual port for " supervisor-id ":" port)
(msg-loader/launch-virtual-port!
(= (conf STORM-CLUSTER-MODE) "local")
mq-context
port
receive-queue-map
:kill-fn (fn [t]
(halt-process! 11))
:valid-ports task-ids))
(do
(log-message "Launching FAKE virtual port")
(msg-loader/launch-fake-virtual-port!
mq-context
storm-id
port
receive-queue-map
deserializer)))


shutdown* (fn []
(log-message "Shutting down worker " storm-id " " supervisor-id " " port)
(reset! active false)
Expand All @@ -216,7 +238,7 @@
;; this will do best effort flushing since the linger period
;; was set on creation
(.close socket))
(if virtual-port-shutdown (virtual-port-shutdown))
(virtual-port-shutdown)
(log-message "Terminating zmq context")
(msg/term mq-context)
(log-message "Disconnecting from storm cluster state context")
Expand Down
17 changes: 14 additions & 3 deletions src/clj/backtype/storm/messaging/loader.clj
@@ -1,4 +1,5 @@
(ns backtype.storm.messaging.loader
(ns backtype.storm.messaging.loader
(:require [zilch.virtual-port :as mqvp])
(:require [backtype.storm.messaging.local :as local]))

(defn mk-local-context []
Expand All @@ -11,7 +12,7 @@
var-get)]
(apply afn args)))

(defn launch-virtual-port! [local? context port & args]
(defn launch-virtual-port! [local? context port receive-queue-map & args]
(require '[zilch.virtual-port :as mqvp])
(require '[backtype.storm.messaging.zmq :as zmq])
(let [afn (-> 'zilch.virtual-port/launch-virtual-port!
Expand All @@ -21,4 +22,14 @@
(str "ipc://" port ".ipc")
(str "tcp://*:" port))
]
(apply afn (concat [(.zmq-context context) url] args))))
(apply afn (concat [(.zmq-context context) url receive-queue-map] args))))

(defn launch-fake-virtual-port! [context storm-id port receive-queue-map deserializer]
(mqvp/launch-fake-virtual-port!
context
storm-id
port
receive-queue-map
deserializer))


16 changes: 6 additions & 10 deletions src/clj/backtype/storm/messaging/local.clj
Expand Up @@ -11,31 +11,27 @@
(swap! queues-map assoc id (LinkedBlockingQueue.))))
(@queues-map id)))

(deftype LocalConnection [storm-id queues-map lock queue]
(deftype LocalConnection [storm-id port queues-map lock queue]
Connection
(recv [this]
(when-not queue
(throw (IllegalArgumentException. "Cannot receive on this socket")))
(.take queue))
(send [this task message]
(let [send-queue (add-queue! queues-map lock storm-id task)]
(.put send-queue message)
(let [send-queue (add-queue! queues-map lock storm-id port)]
(.put send-queue [task message])
))
(close [this]
))


(deftype LocalContext [queues-map lock]
Context
(bind [this storm-id virtual-port]
(LocalConnection. storm-id queues-map lock (add-queue! queues-map lock storm-id virtual-port)))
(bind [this storm-id port]
(LocalConnection. storm-id port queues-map lock (add-queue! queues-map lock storm-id port)))
(connect [this storm-id host port]
(LocalConnection. storm-id queues-map lock nil)
(LocalConnection. storm-id port queues-map lock nil)
)
(send-local-task-empty [this storm-id virtual-port]
(let [queue (add-queue! queues-map lock storm-id virtual-port)]
(.put queue (byte-array []))
))
(term [this]
))

Expand Down
3 changes: 1 addition & 2 deletions src/clj/backtype/storm/messaging/protocol.clj
Expand Up @@ -9,9 +9,8 @@
)

(defprotocol Context
(bind [context storm-id virtual-port])
(bind [context storm-id port])
(connect [context storm-id host port])
(send-local-task-empty [context storm-id virtual-port])
(term [context])
)

8 changes: 2 additions & 6 deletions src/clj/backtype/storm/messaging/zmq.clj
Expand Up @@ -19,10 +19,10 @@

(deftype ZMQContext [context linger-ms ipc?]
Context
(bind [this storm-id virtual-port]
(bind [this storm-id port]
(-> context
(mq/socket mq/pull)
(mqvp/virtual-bind virtual-port)
(mqvp/virtual-bind port)
(ZMQConnection.)
))
(connect [this storm-id host port]
Expand All @@ -34,10 +34,6 @@
(mq/set-linger linger-ms)
(mq/connect url)
(ZMQConnection.))))
(send-local-task-empty [this storm-id virtual-port]
(let [pusher (-> context (mq/socket mq/push) (mqvp/virtual-connect virtual-port))]
(mq/send pusher (mq/barr))
(.close pusher)))
(term [this]
(.term context))
ZMQContextQuery
Expand Down

0 comments on commit b0ddc4d

Please sign in to comment.