From b0ddc4db8b18a6bfed2f044681975870c84cd0c0 Mon Sep 17 00:00:00 2001 From: James Xu Date: Sat, 21 Apr 2012 01:45:57 +0800 Subject: [PATCH] Optimize sending tuples between tasks colocated in same worker --- src/clj/backtype/storm/daemon/task.clj | 42 +++++++----- src/clj/backtype/storm/daemon/worker.clj | 68 ++++++++++++------- src/clj/backtype/storm/messaging/loader.clj | 17 ++++- src/clj/backtype/storm/messaging/local.clj | 16 ++--- src/clj/backtype/storm/messaging/protocol.clj | 3 +- src/clj/backtype/storm/messaging/zmq.clj | 8 +-- src/clj/zilch/virtual_port.clj | 61 +++++++++++------ 7 files changed, 133 insertions(+), 82 deletions(-) diff --git a/src/clj/backtype/storm/daemon/task.clj b/src/clj/backtype/storm/daemon/task.clj index 522ca551f..7c2bf5f09 100644 --- a/src/clj/backtype/storm/daemon/task.clj +++ b/src/clj/backtype/storm/daemon/task.clj @@ -4,6 +4,7 @@ (:use [clojure.contrib.seq :only [positions]]) (:import [java.util.concurrent ConcurrentLinkedQueue ConcurrentHashMap]) (:import [backtype.storm.hooks ITaskHook]) + (:import [backtype.storm.tuple Tuple]) (:import [backtype.storm.hooks.info SpoutAckInfo SpoutFailInfo EmitInfo BoltFailInfo BoltAckInfo]) (:require [backtype.storm [tuple :as tuple]])) @@ -157,8 +158,10 @@ (.getThisTaskId topology-context) stream)))) -(defn mk-task [conf storm-conf topology-context user-context storm-id mq-context cluster-state storm-active-atom transfer-fn suicide-fn] +(defn mk-task [conf storm-conf topology-context user-context storm-id mq-context cluster-state storm-active-atom transfer-fn suicide-fn + receive-queue] (let [task-id (.getThisTaskId topology-context) + worker-port (.getThisWorkerPort topology-context) component-id (.getThisComponentId topology-context) storm-conf (component-conf storm-conf topology-context component-id) _ (log-message "Loading task " component-id ":" task-id) @@ -197,9 +200,7 @@ stream->component->grouper (outbound-components topology-context user-context) component->tasks (reverse-map task-info) - ;; important it binds to virtual port before function returns - puller (msg/bind mq-context storm-id task-id) - + ;; TODO: consider DRYing things up and moving stats task-readable-name (get-readable-name topology-context) @@ -240,7 +241,7 @@ _ (send-unanchored topology-context tasks-fn transfer-fn SYSTEM-STREAM-ID ["startup"]) executor-threads (dofor [exec (with-error-reaction report-error-and-die - (mk-executors task-object storm-conf puller tasks-fn + (mk-executors task-object storm-conf receive-queue tasks-fn transfer-fn storm-active-atom topology-context user-context task-stats report-error))] @@ -255,8 +256,9 @@ [this] (log-message "Shutting down task " storm-id ":" task-id) (reset! active false) - ;; empty messages are skip messages (this unblocks the socket) - (msg/send-local-task-empty mq-context storm-id task-id) + ;; put an empty message into receive-queue + ;; empty messages are skip messages (this unblocks the receive-queue.take thread) + (.put receive-queue (byte-array [])) (doseq [t all-threads] (.interrupt t) (.join t)) @@ -264,7 +266,6 @@ (.cleanup hook)) (.remove-task-heartbeat! storm-cluster-state storm-id task-id) (.disconnect storm-cluster-state) - (.close puller) (close-component task-object) (log-message "Shut down task " storm-id ":" task-id)) DaemonCommon @@ -291,7 +292,7 @@ (stats/spout-acked-tuple! task-stats (:stream tuple-info) time-delta) )) -(defmethod mk-executors ISpout [^ISpout spout storm-conf puller tasks-fn transfer-fn storm-active-atom +(defmethod mk-executors ISpout [^ISpout spout storm-conf receive-queue tasks-fn transfer-fn storm-active-atom ^TopologyContext topology-context ^TopologyContext user-context task-stats report-error-fn] (let [wait-fn (fn [] @storm-active-atom) @@ -379,10 +380,15 @@ (Time/sleep 100))) )) (fn [] - (let [^bytes ser-msg (msg/recv puller)] + (let [msg (.take receive-queue) + is-ser-msg? (not (instance? Tuple msg)) + is-empty-msg? (or (nil? msg) (and is-ser-msg? (empty? msg)))] ;; skip empty messages (used during shutdown) - (when-not (empty? ser-msg) - (let [tuple (.deserialize deserializer ser-msg) + (when-not is-empty-msg? + (let [tuple (if is-ser-msg? + (.deserialize deserializer msg) + msg) + id (.getValue tuple 0) [spout-id tuple-finished-info start-time-ms] (.remove pending id)] (when spout-id @@ -406,7 +412,7 @@ ;; TODO: this portion is not thread safe (multiple threads updating same value at same time) (.put pending key (bit-xor curr id)))) -(defmethod mk-executors IBolt [^IBolt bolt storm-conf puller tasks-fn transfer-fn storm-active-atom +(defmethod mk-executors IBolt [^IBolt bolt storm-conf receive-queue tasks-fn transfer-fn storm-active-atom ^TopologyContext topology-context ^TopologyContext user-context task-stats report-error-fn] (let [deserializer (KryoTupleDeserializer. storm-conf topology-context) @@ -486,10 +492,14 @@ ;; should remember sync requests and include a random sync id in the request. drop anything not related to active sync requests ;; or just timeout the sync messages that are coming in until full sync is hit from that task ;; need to drop incremental updates from tasks where waiting for sync. otherwise, buffer the incremental updates - (let [^bytes ser (msg/recv puller)] - (when-not (empty? ser) ; skip empty messages (used during shutdown) + (let [msg (.take receive-queue) + is-ser-msg? (not (instance? Tuple msg)) + is-empty-msg? (or (nil? msg) (and is-ser-msg? (empty? msg)))] + (when-not is-empty-msg? ; skip empty messages (used during shutdown) (log-debug "Processing message") - (let [tuple (.deserialize deserializer ser)] + (let [tuple (if is-ser-msg? + (.deserialize deserializer msg) + msg)] ;; TODO: for state sync, need to check if tuple comes from state spout. if so, update state ;; TODO: how to handle incremental updates as well as synchronizations at same time ;; TODO: need to version tuples somehow diff --git a/src/clj/backtype/storm/daemon/worker.clj b/src/clj/backtype/storm/daemon/worker.clj index b99e43d34..066780802 100644 --- a/src/clj/backtype/storm/daemon/worker.clj +++ b/src/clj/backtype/storm/daemon/worker.clj @@ -53,11 +53,10 @@ (-> (reverse-map task->component) (select-keys components) vals))) )) -(defn mk-transfer-fn [storm-conf context transfer-queue] - (let [^KryoTupleSerializer serializer (KryoTupleSerializer. storm-conf context)] - (fn [task ^Tuple tuple] - (.put ^LinkedBlockingQueue transfer-queue [task (.serialize serializer tuple)]) - ))) +(defn mk-transfer-fn [transfer-queue] + (fn [task ^Tuple tuple] + (.put ^LinkedBlockingQueue transfer-queue [task tuple]) + )) ;; TODO: should worker even take the storm-id as input? this should be ;; deducable from cluster state (by searching through assignments) @@ -74,6 +73,7 @@ cluster-state (cluster/mk-distributed-cluster-state conf) storm-cluster-state (cluster/mk-storm-cluster-state cluster-state) task-ids (read-worker-task-ids storm-cluster-state storm-id supervisor-id port) + task-ids-set (set task-ids) ;; because in local mode, its not a separate ;; process. supervisor will register it in this case _ (when (= :distributed (cluster-mode conf)) @@ -118,15 +118,19 @@ task->node+port (atom {}) transfer-queue (LinkedBlockingQueue.) ; possibly bound the size of it + receive-queue-map (apply merge (dofor [tid task-ids] + {tid (LinkedBlockingQueue.)})) - transfer-fn (mk-transfer-fn storm-conf (mk-topology-context nil) transfer-queue) + ^KryoTupleSerializer serializer (KryoTupleSerializer. storm-conf (mk-topology-context nil)) + transfer-fn (mk-transfer-fn transfer-queue) refresh-connections (fn this ([] (this (fn [& ignored] (.add event-manager this)))) ([callback] (let [assignment (.assignment-info storm-cluster-state storm-id callback) my-assignment (select-keys (:task->node+port assignment) outbound-tasks) - needed-connections (set (vals my-assignment)) + ;; we dont need a connection for the local tasks anymore + needed-connections (set (vals (filter #(not (contains? task-ids-set (key %))) my-assignment))) current-connections (set (keys @node+port->socket)) new-connections (set/difference needed-connections current-connections) remove-connections (set/difference current-connections needed-connections)] @@ -175,7 +179,7 @@ ) :priority Thread/MAX_PRIORITY) suicide-fn (mk-suicide-fn conf active) - tasks (dofor [tid task-ids] (task/mk-task conf storm-conf (mk-topology-context tid) (mk-user-context tid) storm-id mq-context cluster-state storm-active-atom transfer-fn suicide-fn)) + tasks (dofor [tid task-ids] (task/mk-task conf storm-conf (mk-topology-context tid) (mk-user-context tid) storm-id mq-context cluster-state storm-active-atom transfer-fn suicide-fn (receive-queue-map tid))) threads [(async-loop (fn [] (.add event-manager refresh-connections) @@ -190,24 +194,42 @@ (read-locked endpoint-socket-lock (let [node+port->socket @node+port->socket task->node+port @task->node+port] - (doseq [[task ser-tuple] drainer] - (let [socket (node+port->socket (task->node+port task))] - (msg/send socket task ser-tuple) - )) - )) + (doseq [[task tuple] drainer] + ;; if its a local-task, add the tuple to its receive-queue directly + ;; otherwise, send it through the socket. + (if (contains? task-ids-set task) + (let [^LinkedBlockingQueue target-receive-queue (receive-queue-map task)] + (.put target-receive-queue tuple)) + (let [socket (node+port->socket (task->node+port task))] + (msg/send socket task (.serialize serializer tuple)) + )) + ))) (.clear drainer) 0 ) :args-fn (fn [] [(ArrayList.)])) heartbeat-thread] - virtual-port-shutdown (when (local-mode-zmq? conf) - (log-message "Launching virtual port for " supervisor-id ":" port) - (msg-loader/launch-virtual-port! - (= (conf STORM-CLUSTER-MODE) "local") - mq-context - port - :kill-fn (fn [t] - (halt-process! 11)) - :valid-ports task-ids)) + deserializer (KryoTupleDeserializer. storm-conf (mk-topology-context nil)) + virtual-port-shutdown (if (local-mode-zmq? conf) + (do + (log-message "Launching virtual port for " supervisor-id ":" port) + (msg-loader/launch-virtual-port! + (= (conf STORM-CLUSTER-MODE) "local") + mq-context + port + receive-queue-map + :kill-fn (fn [t] + (halt-process! 11)) + :valid-ports task-ids)) + (do + (log-message "Launching FAKE virtual port") + (msg-loader/launch-fake-virtual-port! + mq-context + storm-id + port + receive-queue-map + deserializer))) + + shutdown* (fn [] (log-message "Shutting down worker " storm-id " " supervisor-id " " port) (reset! active false) @@ -216,7 +238,7 @@ ;; this will do best effort flushing since the linger period ;; was set on creation (.close socket)) - (if virtual-port-shutdown (virtual-port-shutdown)) + (virtual-port-shutdown) (log-message "Terminating zmq context") (msg/term mq-context) (log-message "Disconnecting from storm cluster state context") diff --git a/src/clj/backtype/storm/messaging/loader.clj b/src/clj/backtype/storm/messaging/loader.clj index 82a0dbe5b..2fef1645c 100644 --- a/src/clj/backtype/storm/messaging/loader.clj +++ b/src/clj/backtype/storm/messaging/loader.clj @@ -1,4 +1,5 @@ -(ns backtype.storm.messaging.loader +(ns backtype.storm.messaging.loader + (:require [zilch.virtual-port :as mqvp]) (:require [backtype.storm.messaging.local :as local])) (defn mk-local-context [] @@ -11,7 +12,7 @@ var-get)] (apply afn args))) -(defn launch-virtual-port! [local? context port & args] +(defn launch-virtual-port! [local? context port receive-queue-map & args] (require '[zilch.virtual-port :as mqvp]) (require '[backtype.storm.messaging.zmq :as zmq]) (let [afn (-> 'zilch.virtual-port/launch-virtual-port! @@ -21,4 +22,14 @@ (str "ipc://" port ".ipc") (str "tcp://*:" port)) ] - (apply afn (concat [(.zmq-context context) url] args)))) + (apply afn (concat [(.zmq-context context) url receive-queue-map] args)))) + +(defn launch-fake-virtual-port! [context storm-id port receive-queue-map deserializer] + (mqvp/launch-fake-virtual-port! + context + storm-id + port + receive-queue-map + deserializer)) + + diff --git a/src/clj/backtype/storm/messaging/local.clj b/src/clj/backtype/storm/messaging/local.clj index 0d13cb8b6..f01c9307d 100644 --- a/src/clj/backtype/storm/messaging/local.clj +++ b/src/clj/backtype/storm/messaging/local.clj @@ -11,15 +11,15 @@ (swap! queues-map assoc id (LinkedBlockingQueue.)))) (@queues-map id))) -(deftype LocalConnection [storm-id queues-map lock queue] +(deftype LocalConnection [storm-id port queues-map lock queue] Connection (recv [this] (when-not queue (throw (IllegalArgumentException. "Cannot receive on this socket"))) (.take queue)) (send [this task message] - (let [send-queue (add-queue! queues-map lock storm-id task)] - (.put send-queue message) + (let [send-queue (add-queue! queues-map lock storm-id port)] + (.put send-queue [task message]) )) (close [this] )) @@ -27,15 +27,11 @@ (deftype LocalContext [queues-map lock] Context - (bind [this storm-id virtual-port] - (LocalConnection. storm-id queues-map lock (add-queue! queues-map lock storm-id virtual-port))) + (bind [this storm-id port] + (LocalConnection. storm-id port queues-map lock (add-queue! queues-map lock storm-id port))) (connect [this storm-id host port] - (LocalConnection. storm-id queues-map lock nil) + (LocalConnection. storm-id port queues-map lock nil) ) - (send-local-task-empty [this storm-id virtual-port] - (let [queue (add-queue! queues-map lock storm-id virtual-port)] - (.put queue (byte-array [])) - )) (term [this] )) diff --git a/src/clj/backtype/storm/messaging/protocol.clj b/src/clj/backtype/storm/messaging/protocol.clj index fb4c6763c..191bb9ad4 100644 --- a/src/clj/backtype/storm/messaging/protocol.clj +++ b/src/clj/backtype/storm/messaging/protocol.clj @@ -9,9 +9,8 @@ ) (defprotocol Context - (bind [context storm-id virtual-port]) + (bind [context storm-id port]) (connect [context storm-id host port]) - (send-local-task-empty [context storm-id virtual-port]) (term [context]) ) diff --git a/src/clj/backtype/storm/messaging/zmq.clj b/src/clj/backtype/storm/messaging/zmq.clj index 391bd60d1..8b561b5aa 100644 --- a/src/clj/backtype/storm/messaging/zmq.clj +++ b/src/clj/backtype/storm/messaging/zmq.clj @@ -19,10 +19,10 @@ (deftype ZMQContext [context linger-ms ipc?] Context - (bind [this storm-id virtual-port] + (bind [this storm-id port] (-> context (mq/socket mq/pull) - (mqvp/virtual-bind virtual-port) + (mqvp/virtual-bind port) (ZMQConnection.) )) (connect [this storm-id host port] @@ -34,10 +34,6 @@ (mq/set-linger linger-ms) (mq/connect url) (ZMQConnection.)))) - (send-local-task-empty [this storm-id virtual-port] - (let [pusher (-> context (mq/socket mq/push) (mqvp/virtual-connect virtual-port))] - (mq/send pusher (mq/barr)) - (.close pusher))) (term [this] (.term context)) ZMQContextQuery diff --git a/src/clj/zilch/virtual_port.clj b/src/clj/zilch/virtual_port.clj index c469eca52..fbf5e6e8b 100644 --- a/src/clj/zilch/virtual_port.clj +++ b/src/clj/zilch/virtual_port.clj @@ -1,10 +1,12 @@ (ns zilch.virtual-port (:use [clojure.contrib.def :only [defnk]]) (:use [backtype.storm util log]) + (:use [backtype.storm.bootstrap]) (:require [zilch [mq :as mq]]) (:import [java.nio ByteBuffer]) - (:import [java.util.concurrent Semaphore])) + (:import [java.util.concurrent Semaphore LinkedBlockingQueue])) +(bootstrap) (mq/zeromq-imports) (defn mk-packet [virtual-port ^bytes message] @@ -25,21 +27,6 @@ (defn virtual-url [port] (str "inproc://" port)) -(defn- get-virtual-socket! [context mapping-atom port] - (when-not (contains? @mapping-atom port) - (log-message "Connecting to virtual port " port) - (swap! mapping-atom - assoc - port - (-> context (mq/socket mq/push) (mq/connect (virtual-url port))) - )) - (@mapping-atom port)) - -(defn close-virtual-sockets! [mapping-atom] - (doseq [[_ virtual-socket] @mapping-atom] - (.close virtual-socket)) - (reset! mapping-atom {})) - (defn virtual-send ([^ZMQ$Socket socket virtual-port ^bytes message flags] (mq/send socket (mk-packet virtual-port message) flags)) @@ -47,29 +34,30 @@ (virtual-send socket virtual-port message ZMQ/NOBLOCK))) (defnk launch-virtual-port! - [context url :daemon true + [context url receive-queue-map + :daemon true :kill-fn (fn [t] (System/exit 1)) :priority Thread/NORM_PRIORITY :valid-ports nil] (let [valid-ports (set (map short valid-ports)) vthread (async-loop - (fn [^ZMQ$Socket socket virtual-mapping] + (fn [^ZMQ$Socket socket receive-queue-map] (let [[port msg] (parse-packet (mq/recv socket))] (if (= port -1) (do (log-message "Virtual port " url " received shutdown notice") - (close-virtual-sockets! virtual-mapping) (.close socket) nil ) (if (or (nil? valid-ports) (contains? valid-ports port)) - (let [^ZMQ$Socket virtual-socket (get-virtual-socket! context virtual-mapping port)] + (let [port (int port) + ^LinkedBlockingQueue receive-queue (receive-queue-map port)] ;; TODO: probably need to handle multi-part messages here or something - (mq/send virtual-socket msg) + (.put receive-queue msg) 0 ) (log-message "Received invalid message directed at port " port ". Dropping...") )))) - :args-fn (fn [] [(-> context (mq/socket mq/pull) (mq/bind url)) (atom {})]) + :args-fn (fn [] [(-> context (mq/socket mq/pull) (mq/bind url)) receive-queue-map]) :daemon daemon :kill-fn kill-fn :priority priority)] @@ -85,6 +73,35 @@ (log-message "Shutdown virtual port at url: " url) )))) +(defn launch-fake-virtual-port! [context storm-id port receive-queue-map deserializer] + (let [socket (-> context (msg/bind storm-id port)) + vthread (async-loop + (fn [] + (let [[port ser-msg] (msg/recv socket) + msg (if (nil? ser-msg) + nil + (.deserialize deserializer ser-msg)) + ] + (if (= (int port) -1) + (do + (log-message "FAKE virtual port received shutdown notice") + (.close socket) + nil ) + (let [port (int port) + receive-queue (receive-queue-map port)] + (.put receive-queue msg) + 0 + )))) + )] + (fn [] + (let [kill-socket (-> context (msg/connect storm-id nil port))] + (log-message "Shutting down FAKE virtual port") + (msg/send kill-socket -1 nil) + (.close kill-socket) + (log-message "Waiting for FAKE virtual port to die") + (.join vthread) + (log-message "Shutdown FAKE virtual port") + )))) (defn virtual-bind [^ZMQ$Socket socket virtual-port] (mq/bind socket (virtual-url virtual-port))