merge master

commit d99f13d0ac93feeea1cb53c8eebc9a4009e51b75 (2 parents: b7d7962 + a942da6)
@nathanmarz authored
6 CHANGELOG.md
@@ -1,5 +1,8 @@
## Unreleased
+NOTE: The change from 0.7.0 in which OutputCollector no longer assumes immutable inputs has been reverted to support optimized sending of tuples to colocated tasks
+
+ * Messages sent to colocated tasks are sent in-memory, skipping serialization (useful in conjunction with localOrShuffle grouping) (thanks xumingming)
* Upgrade to Clojure 1.4 (thanks sorenmacbeth)
* Can override the hostname that supervisors report using "storm.local.hostname" config.
* Make request timeout within DRPC server configurable via "drpc.request.timeout.secs"
@@ -10,6 +13,7 @@
* Added close method to OpaqueTransactionalSpout coordinator
* Added "storm dev-zookeeper" command for launching a local zookeeper server. Useful for testing a one node Storm cluster locally. Zookeeper dir configured with "dev.zookeeper.path"
* Use new style classes for Python multilang adapter (thanks hellp)
+ * Added "storm version" command
* Bug fix: Fixed critical bug in opaque transactional topologies that would lead to duplicate messages when using pipelining
* Bug fix: Workers will now die properly if a ShellBolt subprocess dies (thanks tomo)
* Bug fix: Hide the BasicOutputCollector#getOutputter method, since it shouldn't be a publicly available method.
@@ -69,7 +73,7 @@
* Logging now always goes to logs/ in the Storm directory, regardless of where you launched the daemon (thanks haitaoyao)
* Improved Clojure DSL: can emit maps and Tuples implement the appropriate interfaces to integrate with Clojure's seq functions (thanks schleyfox)
* Added "ui.childopts" config (thanks ddillinger)
- * Bug fix: OutputCollector no longer assumes immutable inputs
+ * Bug fix: OutputCollector no longer assumes immutable inputs [NOTE: this was reverted in 0.7.2 because it conflicts with sending tuples to colocated tasks without serialization]
* Bug fix: DRPC topologies now throw a proper error when no DRPC servers are configured instead of NPE (thanks danharvey)
* Bug fix: Fix local mode so multiple topologies can be run on one LocalCluster
* Bug fix: "storm supervisor" now uses supervisor.childopts instead of nimbus.childopts (thanks ddillinger)
2  README.markdown
@@ -2,6 +2,8 @@ Storm is a distributed realtime computation system. Similar to how Hadoop provid
The [Rationale page](https://github.com/nathanmarz/storm/wiki/Rationale) on the wiki explains what Storm is and why it was built. The [video](http://www.infoq.com/presentations/Storm) and [slides](http://www.slideshare.net/nathanmarz/storm-distributed-and-faulttolerant-realtime-computation) of Storm's launch presentation are also good introductions to the project.
+Follow [@stormprocessor](https://twitter.com/stormprocessor) on Twitter for updates on the project.
+
## Documentation
Documentation and tutorials can be found on the [Storm wiki](http://github.com/nathanmarz/storm/wiki).
15 bin/storm
@@ -132,7 +132,7 @@ def activate(*args):
jvmtype="-client",
extrajars=[CONF_DIR, STORM_DIR + "/bin"])
-def list(*args):
+def listtopos(*args):
"""Syntax: [storm list]
List the running topologies and their statuses.
@@ -273,6 +273,17 @@ def dev_zookeeper():
jvmtype="-server",
extrajars=[STORM_DIR + "/log4j", STORM_DIR + "/conf"])
+def version():
+ """Syntax: [storm version]
+
+ Prints the version number of this Storm release.
+ """
+ releasefile = STORM_DIR + "/RELEASE"
+ if os.path.exists(releasefile):
+ print open(releasefile).readline().strip()
+ else:
+ print "Unknown"
+
def print_classpath():
"""Syntax: [storm classpath]
@@ -305,7 +316,7 @@ COMMANDS = {"jar": jar, "kill": kill, "shell": shell, "nimbus": nimbus, "ui": ui
"drpc": drpc, "supervisor": supervisor, "localconfvalue": print_localconfvalue,
"remoteconfvalue": print_remoteconfvalue, "repl": repl, "classpath": print_classpath,
"activate": activate, "deactivate": deactivate, "rebalance": rebalance, "help": print_usage,
- "list": list, "dev-zookeeper": dev_zookeeper}
+ "list": listtopos, "dev-zookeeper": dev_zookeeper, "version": version}
def parse_config(config_list):
global CONFIG_OPTS
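
For reference, the new command just prints the first line of the RELEASE file, falling back to "Unknown" when the file is absent (the version shown here is illustrative):

```
$ storm version
0.7.2
```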
113 src/clj/backtype/storm/daemon/task.clj
@@ -1,8 +1,9 @@
(ns backtype.storm.daemon.task
(:use [backtype.storm.daemon common])
(:use [backtype.storm bootstrap])
- (:import [java.util.concurrent ConcurrentLinkedQueue ConcurrentHashMap])
+ (:import [java.util.concurrent ConcurrentLinkedQueue ConcurrentHashMap LinkedBlockingQueue])
(:import [backtype.storm.hooks ITaskHook])
+ (:import [backtype.storm.tuple Tuple])
(:import [backtype.storm.hooks.info SpoutAckInfo SpoutFailInfo
EmitInfo BoltFailInfo BoltAckInfo])
(:require [backtype.storm [tuple :as tuple]]))
@@ -156,8 +157,10 @@
(.getThisTaskId topology-context)
stream))))
-(defn mk-task [conf storm-conf topology-context user-context storm-id mq-context cluster-state storm-active-atom transfer-fn suicide-fn]
+(defn mk-task [conf storm-conf topology-context user-context storm-id cluster-state storm-active-atom transfer-fn suicide-fn
+ receive-queue]
(let [task-id (.getThisTaskId topology-context)
+ worker-port (.getThisWorkerPort topology-context)
component-id (.getThisComponentId topology-context)
storm-conf (component-conf storm-conf topology-context component-id)
_ (log-message "Loading task " component-id ":" task-id)
@@ -196,9 +199,7 @@
stream->component->grouper (outbound-components topology-context user-context)
component->tasks (reverse-map task-info)
- ;; important it binds to virtual port before function returns
- puller (msg/bind mq-context storm-id task-id)
-
+
;; TODO: consider DRYing things up and moving stats
task-readable-name (get-readable-name topology-context)
@@ -239,7 +240,7 @@
_ (send-unanchored topology-context tasks-fn transfer-fn SYSTEM-STREAM-ID ["startup"])
executor-threads (dofor
[exec (with-error-reaction report-error-and-die
- (mk-executors task-object storm-conf puller tasks-fn
+ (mk-executors task-object storm-conf receive-queue tasks-fn
transfer-fn
storm-active-atom topology-context
user-context task-stats report-error))]
@@ -254,8 +255,9 @@
[this]
(log-message "Shutting down task " storm-id ":" task-id)
(reset! active false)
- ;; empty messages are skip messages (this unblocks the socket)
- (msg/send-local-task-empty mq-context storm-id task-id)
+ ;; put an empty message into receive-queue
+ ;; empty messages are skip messages (this unblocks the receive-queue.take thread)
+ (.put receive-queue (byte-array []))
(doseq [t all-threads]
(.interrupt t)
(.join t))
@@ -263,7 +265,6 @@
(.cleanup hook))
(.remove-task-heartbeat! storm-cluster-state storm-id task-id)
(.disconnect storm-cluster-state)
- (.close puller)
(close-component task-object)
(log-message "Shut down task " storm-id ":" task-id))
DaemonCommon
@@ -290,7 +291,18 @@
(stats/spout-acked-tuple! task-stats (:stream tuple-info) time-delta)
))
-(defmethod mk-executors ISpout [^ISpout spout storm-conf puller tasks-fn transfer-fn storm-active-atom
+(defn mk-task-receiver [^LinkedBlockingQueue receive-queue ^KryoTupleDeserializer deserializer tuple-action-fn]
+ (fn []
+ (let [msg (.take receive-queue)
+ is-tuple? (instance? Tuple msg)]
+ (when (or is-tuple? (not (empty? msg))) ; skip empty messages (used during shutdown)
+ (log-debug "Processing message " msg)
+ (let [^Tuple tuple (if is-tuple? msg (.deserialize deserializer msg))]
+ (tuple-action-fn tuple)
+ ))
+ )))
+
+(defmethod mk-executors ISpout [^ISpout spout storm-conf ^LinkedBlockingQueue receive-queue tasks-fn transfer-fn storm-active-atom
^TopologyContext topology-context ^TopologyContext user-context
task-stats report-error-fn]
(let [wait-fn (fn [] @storm-active-atom)
@@ -349,10 +361,24 @@
)
(reportError [this error]
(report-error-fn error)
- ))]
+ ))
+ tuple-action-fn (fn [^Tuple tuple]
+ (let [id (.getValue tuple 0)
+ [spout-id tuple-finished-info start-time-ms] (.remove pending id)]
+ (when spout-id
+ (let [time-delta (time-delta-ms start-time-ms)]
+ (condp = (.getSourceStreamId tuple)
+ ACKER-ACK-STREAM-ID (.add event-queue #(ack-spout-msg spout user-context storm-conf spout-id
+ tuple-finished-info time-delta task-stats sampler))
+ ACKER-FAIL-STREAM-ID (.add event-queue #(fail-spout-msg spout user-context storm-conf spout-id
+ tuple-finished-info time-delta task-stats sampler))
+ )))
+ ;; TODO: on failure, emit tuple to failure stream
+ ))]
(log-message "Opening spout " component-id ":" task-id)
(.open spout storm-conf user-context (SpoutOutputCollector. output-collector))
(log-message "Opened spout " component-id ":" task-id)
+ ;; TODO: should redesign this to only use one thread
[(fn []
;; This design requires that spouts be non-blocking
(loop []
@@ -377,23 +403,7 @@
;; TODO: log that it's getting throttled
(Time/sleep 100)))
))
- (fn []
- (let [^bytes ser-msg (msg/recv puller)]
- ;; skip empty messages (used during shutdown)
- (when-not (empty? ser-msg)
- (let [tuple (.deserialize deserializer ser-msg)
- id (.getValue tuple 0)
- [spout-id tuple-finished-info start-time-ms] (.remove pending id)]
- (when spout-id
- (let [time-delta (time-delta-ms start-time-ms)]
- (condp = (.getSourceStreamId tuple)
- ACKER-ACK-STREAM-ID (.add event-queue #(ack-spout-msg spout user-context storm-conf spout-id
- tuple-finished-info time-delta task-stats sampler))
- ACKER-FAIL-STREAM-ID (.add event-queue #(fail-spout-msg spout user-context storm-conf spout-id
- tuple-finished-info time-delta task-stats sampler))
- ))))
- ;; TODO: on failure, emit tuple to failure stream
- )))
+ (mk-task-receiver receive-queue deserializer tuple-action-fn)
]
))
@@ -405,7 +415,7 @@
;; TODO: this portion is not thread safe (multiple threads updating same value at same time)
(.put pending key (bit-xor curr id))))
-(defmethod mk-executors IBolt [^IBolt bolt storm-conf puller tasks-fn transfer-fn storm-active-atom
+(defmethod mk-executors IBolt [^IBolt bolt storm-conf ^LinkedBlockingQueue receive-queue tasks-fn transfer-fn storm-active-atom
^TopologyContext topology-context ^TopologyContext user-context
task-stats report-error-fn]
(let [deserializer (KryoTupleDeserializer. storm-conf topology-context)
@@ -466,7 +476,26 @@
)))
(reportError [this error]
(report-error-fn error)
- ))]
+ ))
+ tuple-action-fn (fn [^Tuple tuple]
+ ;; synchronization needs to be done with a key provided by this bolt, otherwise:
+ ;; spout 1 sends synchronization (s1), dies, same spout restarts somewhere else, sends synchronization (s2) and incremental update. s2 and update finish before s1 -> lose the incremental update
+ ;; TODO: for state sync, need to first send sync messages in a loop and receive tuples until synchronization
+ ;; buffer other tuples until fully synchronized, then process all of those tuples
+ ;; then go into normal loop
+ ;; spill to disk?
+ ;; could be receiving incremental updates while waiting for sync or even a partial sync because of another failed task
+ ;; should remember sync requests and include a random sync id in the request. drop anything not related to active sync requests
+ ;; or just timeout the sync messages that are coming in until full sync is hit from that task
+ ;; need to drop incremental updates from tasks where waiting for sync. otherwise, buffer the incremental updates
+ ;; TODO: for state sync, need to check if tuple comes from state spout. if so, update state
+ ;; TODO: how to handle incremental updates as well as synchronizations at same time
+ ;; TODO: need to version tuples somehow
+
+ (log-debug "Received tuple " tuple " at task " (.getThisTaskId topology-context))
+ (.put tuple-start-times tuple (System/currentTimeMillis))
+
+ (.execute bolt tuple))]
(log-message "Preparing bolt " component-id ":" task-id)
(.prepare bolt
storm-conf
@@ -474,29 +503,7 @@
(OutputCollector. output-collector))
(log-message "Prepared bolt " component-id ":" task-id)
;; TODO: can get any SubscribedState objects out of the context now
- [(fn []
- ;; synchronization needs to be done with a key provided by this bolt, otherwise:
- ;; spout 1 sends synchronization (s1), dies, same spout restarts somewhere else, sends synchronization (s2) and incremental update. s2 and update finish before s1 -> lose the incremental update
- ;; TODO: for state sync, need to first send sync messages in a loop and receive tuples until synchronization
- ;; buffer other tuples until fully synchronized, then process all of those tuples
- ;; then go into normal loop
- ;; spill to disk?
- ;; could be receiving incremental updates while waiting for sync or even a partial sync because of another failed task
- ;; should remember sync requests and include a random sync id in the request. drop anything not related to active sync requests
- ;; or just timeout the sync messages that are coming in until full sync is hit from that task
- ;; need to drop incremental updates from tasks where waiting for sync. otherwise, buffer the incremental updates
- (let [^bytes ser (msg/recv puller)]
- (when-not (empty? ser) ; skip empty messages (used during shutdown)
- (log-debug "Processing message")
- (let [tuple (.deserialize deserializer ser)]
- ;; TODO: for state sync, need to check if tuple comes from state spout. if so, update state
- ;; TODO: how to handle incremental updates as well as synchronizations at same time
- ;; TODO: need to version tuples somehow
- (log-debug "Received tuple " tuple " at task " (.getThisTaskId topology-context))
- (.put tuple-start-times tuple (System/currentTimeMillis))
-
- (.execute bolt tuple)
- ))))]
+ [(mk-task-receiver receive-queue deserializer tuple-action-fn)]
))
(defmethod close-component ISpout [spout]
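
The new mk-task-receiver consolidates the receive logic previously duplicated across the spout and bolt executors: take from the worker-local queue, pass live Tuples straight through (colocated senders), deserialize byte arrays (remote senders), and treat an empty byte array as the shutdown skip message. A rough Java transliteration, with Storm's Tuple type and Kryo deserializer reduced to placeholders:

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;
import java.util.function.Function;

// Sketch only: Object stands in for backtype.storm.tuple.Tuple, and
// `deserialize` for KryoTupleDeserializer.deserialize.
class TaskReceiver {
    private final LinkedBlockingQueue<Object> receiveQueue;
    private final Function<byte[], Object> deserialize;
    private final Consumer<Object> tupleAction;

    TaskReceiver(LinkedBlockingQueue<Object> receiveQueue,
                 Function<byte[], Object> deserialize,
                 Consumer<Object> tupleAction) {
        this.receiveQueue = receiveQueue;
        this.deserialize = deserialize;
        this.tupleAction = tupleAction;
    }

    // One pass of the receive loop, mirroring mk-task-receiver above.
    void runOnce() throws InterruptedException {
        Object msg = receiveQueue.take();                // blocks until a message arrives
        if (msg instanceof byte[]) {
            byte[] ser = (byte[]) msg;
            if (ser.length == 0) return;                 // empty byte[]: shutdown skip message
            tupleAction.accept(deserialize.apply(ser));  // remote sender: Kryo bytes
        } else {
            tupleAction.accept(msg);                     // colocated sender: the live Tuple itself
        }
    }
}
```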
62 src/clj/backtype/storm/daemon/worker.clj
@@ -9,11 +9,6 @@
(defmulti mk-suicide-fn cluster-mode)
-(defn local-mode-zmq? [conf]
- (or (= (conf STORM-CLUSTER-MODE) "distributed")
- (conf STORM-LOCAL-MODE-ZMQ)))
-
-
(defn read-worker-task-ids [storm-cluster-state storm-id supervisor-id port]
(let [assignment (:task->node+port (.assignment-info storm-cluster-state storm-id nil))]
(doall
@@ -53,11 +48,13 @@
(-> (reverse-map task->component) (select-keys components) vals)))
))
-(defn mk-transfer-fn [storm-conf context transfer-queue]
- (let [^KryoTupleSerializer serializer (KryoTupleSerializer. storm-conf context)]
- (fn [task ^Tuple tuple]
- (.put ^LinkedBlockingQueue transfer-queue [task (.serialize serializer tuple)])
- )))
+(defn mk-transfer-fn [transfer-queue receive-queue-map serializer]
+ (fn [task ^Tuple tuple]
+ (if (contains? receive-queue-map task)
+ (.put (receive-queue-map task) tuple)
+ (let [tuple (.serialize serializer tuple)]
+ (.put ^LinkedBlockingQueue transfer-queue [task tuple]))
+ )))
;; TODO: should worker even take the storm-id as input? this should be
;; deducible from cluster state (by searching through assignments)
@@ -73,7 +70,7 @@
storm-active-atom (atom false)
cluster-state (cluster/mk-distributed-cluster-state conf)
storm-cluster-state (cluster/mk-storm-cluster-state cluster-state)
- task-ids (read-worker-task-ids storm-cluster-state storm-id supervisor-id port)
+ task-ids (set (read-worker-task-ids storm-cluster-state storm-id supervisor-id port))
;; because in local mode, its not a separate
;; process. supervisor will register it in this case
_ (when (= :distributed (cluster-mode conf))
@@ -96,7 +93,7 @@
(worker-pids-root conf worker-id)
%
port
- task-ids)
+ (vec task-ids))
mk-user-context #(TopologyContext. topology
storm-conf
task->component
@@ -106,7 +103,7 @@
(worker-pids-root conf worker-id)
%
port
- task-ids)
+ (vec task-ids))
mq-context (if mq-context
mq-context
(msg-loader/mk-zmq-context (storm-conf ZMQ-THREADS)
@@ -118,15 +115,22 @@
task->node+port (atom {})
transfer-queue (LinkedBlockingQueue.) ; possibly bound the size of it
+ receive-queue-map (apply merge (dofor [tid task-ids]
+ {tid (LinkedBlockingQueue.)}))
- transfer-fn (mk-transfer-fn storm-conf (mk-topology-context nil) transfer-queue)
+ ^KryoTupleSerializer serializer (KryoTupleSerializer. storm-conf (mk-topology-context nil))
+ transfer-fn (mk-transfer-fn transfer-queue receive-queue-map serializer)
refresh-connections (fn this
([]
(this (fn [& ignored] (.add event-manager this))))
([callback]
(let [assignment (.assignment-info storm-cluster-state storm-id callback)
my-assignment (select-keys (:task->node+port assignment) outbound-tasks)
- needed-connections (set (vals my-assignment))
+ ;; we dont need a connection for the local tasks anymore
+ needed-connections (->> my-assignment
+ (filter-key (complement task-ids))
+ vals
+ set)
current-connections (set (keys @node+port->socket))
new-connections (set/difference needed-connections current-connections)
remove-connections (set/difference current-connections needed-connections)]
@@ -175,7 +179,7 @@
)
:priority Thread/MAX_PRIORITY)
suicide-fn (mk-suicide-fn conf active)
- tasks (dofor [tid task-ids] (task/mk-task conf storm-conf (mk-topology-context tid) (mk-user-context tid) storm-id mq-context cluster-state storm-active-atom transfer-fn suicide-fn))
+ tasks (dofor [tid task-ids] (task/mk-task conf storm-conf (mk-topology-context tid) (mk-user-context tid) storm-id cluster-state storm-active-atom transfer-fn suicide-fn (receive-queue-map tid)))
threads [(async-loop
(fn []
(.add event-manager refresh-connections)
@@ -193,21 +197,23 @@
(doseq [[task ser-tuple] drainer]
(let [socket (node+port->socket (task->node+port task))]
(msg/send socket task ser-tuple)
- ))
- ))
+ )
+ )))
(.clear drainer)
0 )
:args-fn (fn [] [(ArrayList.)]))
heartbeat-thread]
- virtual-port-shutdown (when (local-mode-zmq? conf)
- (log-message "Launching virtual port for " supervisor-id ":" port)
- (msg-loader/launch-virtual-port!
- (= (conf STORM-CLUSTER-MODE) "local")
- mq-context
- port
- :kill-fn (fn [t]
- (halt-process! 11))
- :valid-ports task-ids))
+ deserializer (KryoTupleDeserializer. storm-conf (mk-topology-context nil))
+ receive-thread-shutdown (do
+ (log-message "Launching receive-thread for " supervisor-id ":" port)
+ (msg-loader/launch-receive-thread!
+ mq-context
+ storm-id
+ port
+ receive-queue-map
+ :kill-fn (fn [t]
+ (halt-process! 11))))
+
shutdown* (fn []
(log-message "Shutting down worker " storm-id " " supervisor-id " " port)
(reset! active false)
@@ -216,7 +222,7 @@
;; this will do best effort flushing since the linger period
;; was set on creation
(.close socket))
- (if virtual-port-shutdown (virtual-port-shutdown))
+ (receive-thread-shutdown)
(log-message "Terminating zmq context")
(msg/term mq-context)
(log-message "Disconnecting from storm cluster state context")
52 src/clj/backtype/storm/messaging/loader.clj
@@ -1,5 +1,7 @@
-(ns backtype.storm.messaging.loader
- (:require [backtype.storm.messaging.local :as local]))
+(ns backtype.storm.messaging.loader
+ (:use [backtype.storm util log])
+ (:import [java.util.concurrent LinkedBlockingQueue])
+ (:require [backtype.storm.messaging [local :as local] [protocol :as msg]]))
(defn mk-local-context []
(local/mk-local-context))
@@ -11,14 +13,38 @@
var-get)]
(apply afn args)))
-(defn launch-virtual-port! [local? context port & args]
- (require '[zilch.virtual-port :as mqvp])
- (require '[backtype.storm.messaging.zmq :as zmq])
- (let [afn (-> 'zilch.virtual-port/launch-virtual-port!
- find-var
- var-get)
- url (if local?
- (str "ipc://" port ".ipc")
- (str "tcp://*:" port))
- ]
- (apply afn (concat [(.zmq-context context) url] args))))
+(defnk launch-receive-thread!
+ [context storm-id port receive-queue-map
+ :daemon true
+ :kill-fn (fn [t] (System/exit 1))
+ :priority Thread/NORM_PRIORITY]
+ (let [vthread (async-loop
+ (fn [socket receive-queue-map]
+ (let [[task msg] (msg/recv socket)]
+ (if (= task -1)
+ (do
+ (log-message "Receiving-thread:[" storm-id ", " port "] received shutdown notice")
+ (.close socket)
+ nil )
+ (do
+ (if (contains? receive-queue-map task)
+ (.put ^LinkedBlockingQueue (receive-queue-map task) msg)
+ (log-message "Receiving-thread:[" storm-id ", " port "] received invalid message for unknown task " task ". Dropping..."))
+ 0 ))))
+ :args-fn (fn [] [(msg/bind context storm-id port) receive-queue-map])
+ :daemon daemon
+ :kill-fn kill-fn
+ :priority priority)]
+ (fn []
+ (let [kill-socket (msg/connect context storm-id "localhost" port)]
+ (log-message "Shutting down receiving-thread: [" storm-id ", " port "]")
+ (msg/send kill-socket
+ -1
+ (byte-array []))
+ (log-message "Waiting for receiving-thread:[" storm-id ", " port "] to die")
+ (.join vthread)
+ (.close kill-socket)
+ (log-message "Shutdown receiving-thread: [" storm-id ", " port "]")
+ ))))
+
+
16 src/clj/backtype/storm/messaging/local.clj
@@ -11,15 +11,15 @@
(swap! queues-map assoc id (LinkedBlockingQueue.))))
(@queues-map id)))
-(deftype LocalConnection [storm-id queues-map lock queue]
+(deftype LocalConnection [storm-id port queues-map lock queue]
Connection
(recv [this]
(when-not queue
(throw (IllegalArgumentException. "Cannot receive on this socket")))
(.take queue))
(send [this task message]
- (let [send-queue (add-queue! queues-map lock storm-id task)]
- (.put send-queue message)
+ (let [send-queue (add-queue! queues-map lock storm-id port)]
+ (.put send-queue [task message])
))
(close [this]
))
@@ -27,15 +27,11 @@
(deftype LocalContext [queues-map lock]
Context
- (bind [this storm-id virtual-port]
- (LocalConnection. storm-id queues-map lock (add-queue! queues-map lock storm-id virtual-port)))
+ (bind [this storm-id port]
+ (LocalConnection. storm-id port queues-map lock (add-queue! queues-map lock storm-id port)))
(connect [this storm-id host port]
- (LocalConnection. storm-id queues-map lock nil)
+ (LocalConnection. storm-id port queues-map lock nil)
)
- (send-local-task-empty [this storm-id virtual-port]
- (let [queue (add-queue! queues-map lock storm-id virtual-port)]
- (.put queue (byte-array []))
- ))
(term [this]
))
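
In local mode the same shape applies: queues are now keyed by the worker endpoint (storm id plus port) and carry [task, message] pairs for the receiver to demultiplex, instead of one queue per task "virtual port". A sketch under those assumptions:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;

// Sketch of local-mode messaging after this change: one queue per worker
// endpoint, with the task id carried inside each message.
class LocalEndpoints {
    private final Map<String, LinkedBlockingQueue<Object[]>> queues = new ConcurrentHashMap<>();

    private LinkedBlockingQueue<Object[]> queueFor(String stormId, int port) {
        return queues.computeIfAbsent(stormId + "/" + port, k -> new LinkedBlockingQueue<>());
    }

    void send(String stormId, int port, int task, byte[] message) throws InterruptedException {
        queueFor(stormId, port).put(new Object[] { task, message }); // receiver demuxes by task id
    }
}
```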
3  src/clj/backtype/storm/messaging/protocol.clj
@@ -9,9 +9,8 @@
)
(defprotocol Context
- (bind [context storm-id virtual-port])
+ (bind [context storm-id port])
(connect [context storm-id host port])
- (send-local-task-empty [context storm-id virtual-port])
(term [context])
)
59 src/clj/backtype/storm/messaging/zmq.clj
@@ -1,8 +1,35 @@
(ns backtype.storm.messaging.zmq
(:refer-clojure :exclude [send])
(:use [backtype.storm.messaging protocol])
- (:require [zilch.mq :as mq]
- [zilch.virtual-port :as mqvp]))
+ (:import [java.nio ByteBuffer])
+ (:import [org.zeromq ZMQ])
+ (:require [zilch.mq :as mq]))
+
+(defn mk-packet [task ^bytes message]
+ (let [bb (ByteBuffer/allocate (+ 2 (count message)))]
+ (.putShort bb (short task))
+ (.put bb message)
+ (.array bb)
+ ))
+
+(defn parse-packet [^bytes packet]
+ (let [bb (ByteBuffer/wrap packet)
+ port (.getShort bb)
+ msg (byte-array (- (count packet) 2))]
+ (.get bb msg)
+ [port msg]
+ ))
+
+(defn get-bind-zmq-url [local? port]
+ (if local?
+ (str "ipc://" port ".ipc")
+ (str "tcp://*:" port)))
+
+(defn get-connect-zmq-url [local? host port]
+ (if local?
+ (str "ipc://" port ".ipc")
+ (str "tcp://" host ":" port)))
+
(defprotocol ZMQContextQuery
(zmq-context [this]))
@@ -10,41 +37,33 @@
(deftype ZMQConnection [socket]
Connection
(recv [this]
- (mq/recv socket))
+ (parse-packet (mq/recv socket)))
(send [this task message]
- (mqvp/virtual-send socket task message))
+ (mq/send socket (mk-packet task message) ZMQ/NOBLOCK))
(close [this]
(.close socket)
))
-(deftype ZMQContext [context linger-ms ipc?]
+(deftype ZMQContext [context linger-ms local?]
Context
- (bind [this storm-id virtual-port]
+ (bind [this storm-id port]
(-> context
(mq/socket mq/pull)
- (mqvp/virtual-bind virtual-port)
+ (mq/bind (get-bind-zmq-url local? port))
(ZMQConnection.)
))
(connect [this storm-id host port]
- (let [url (if ipc?
- (str "ipc://" port ".ipc")
- (str "tcp://" host ":" port))]
- (-> context
- (mq/socket mq/push)
- (mq/set-linger linger-ms)
- (mq/connect url)
- (ZMQConnection.))))
- (send-local-task-empty [this storm-id virtual-port]
- (let [pusher (-> context (mq/socket mq/push) (mqvp/virtual-connect virtual-port))]
- (mq/send pusher (mq/barr))
- (.close pusher)))
+ (-> context
+ (mq/socket mq/push)
+ (mq/set-linger linger-ms)
+ (mq/connect (get-connect-zmq-url local? host port))
+ (ZMQConnection.)))
(term [this]
(.term context))
ZMQContextQuery
(zmq-context [this]
context))
-
(defn mk-zmq-context [num-threads linger local?]
(ZMQContext. (mq/context num-threads) linger local?))
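
mk-packet and parse-packet define the framing that replaces zilch's virtual ports: a 2-byte big-endian task id followed by the serialized tuple, which is why task id -1 can serve as the shutdown sentinel. The same logic rendered directly in Java:

```java
import java.nio.ByteBuffer;

// Direct rendering of mk-packet/parse-packet: 2-byte big-endian task id
// prefix, then the message bytes.
final class Packets {
    static byte[] mkPacket(short task, byte[] message) {
        ByteBuffer bb = ByteBuffer.allocate(2 + message.length);
        bb.putShort(task);   // ByteBuffer is big-endian by default
        bb.put(message);
        return bb.array();
    }

    static Object[] parsePacket(byte[] packet) {
        ByteBuffer bb = ByteBuffer.wrap(packet);
        short task = bb.getShort();
        byte[] msg = new byte[packet.length - 2];
        bb.get(msg);
        return new Object[] { task, msg };
    }
}
```

One consequence of the short prefix: task ids must fit in a signed 16-bit value.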
3  src/clj/backtype/storm/testing.clj
@@ -87,8 +87,7 @@
))
(defn mk-shared-context [conf]
- (if (and (= (conf STORM-CLUSTER-MODE) "local")
- (not (conf STORM-LOCAL-MODE-ZMQ)))
+ (if-not (conf STORM-LOCAL-MODE-ZMQ)
(msg-loader/mk-local-context)
))
95 src/clj/zilch/virtual_port.clj
@@ -1,95 +0,0 @@
-(ns zilch.virtual-port
- (:use [backtype.storm util log])
- (:require [zilch [mq :as mq]])
- (:import [java.nio ByteBuffer])
- (:import [java.util.concurrent Semaphore]))
-
-(mq/zeromq-imports)
-
-(defn mk-packet [virtual-port ^bytes message]
- (let [bb (ByteBuffer/allocate (+ 2 (count message)))]
- (.putShort bb (short virtual-port))
- (.put bb message)
- (.array bb)
- ))
-
-(defn parse-packet [^bytes packet]
- (let [bb (ByteBuffer/wrap packet)
- port (.getShort bb)
- msg (byte-array (- (count packet) 2))]
- (.get bb msg)
- [port msg]
- ))
-
-(defn virtual-url [port]
- (str "inproc://" port))
-
-(defn- get-virtual-socket! [context mapping-atom port]
- (when-not (contains? @mapping-atom port)
- (log-message "Connecting to virtual port " port)
- (swap! mapping-atom
- assoc
- port
- (-> context (mq/socket mq/push) (mq/connect (virtual-url port)))
- ))
- (@mapping-atom port))
-
-(defn close-virtual-sockets! [mapping-atom]
- (doseq [[_ virtual-socket] @mapping-atom]
- (.close virtual-socket))
- (reset! mapping-atom {}))
-
-(defn virtual-send
- ([^ZMQ$Socket socket virtual-port ^bytes message flags]
- (mq/send socket (mk-packet virtual-port message) flags))
- ([^ZMQ$Socket socket virtual-port ^bytes message]
- (virtual-send socket virtual-port message ZMQ/NOBLOCK)))
-
-(defnk launch-virtual-port!
- [context url :daemon true
- :kill-fn (fn [t] (System/exit 1))
- :priority Thread/NORM_PRIORITY
- :valid-ports nil]
- (let [valid-ports (set (map short valid-ports))
- vthread (async-loop
- (fn [^ZMQ$Socket socket virtual-mapping]
- (let [[port msg] (parse-packet (mq/recv socket))]
- (if (= port -1)
- (do
- (log-message "Virtual port " url " received shutdown notice")
- (close-virtual-sockets! virtual-mapping)
- (.close socket)
- nil )
- (if (or (nil? valid-ports) (contains? valid-ports port))
- (let [^ZMQ$Socket virtual-socket (get-virtual-socket! context virtual-mapping port)]
- ;; TODO: probably need to handle multi-part messages here or something
- (mq/send virtual-socket msg)
- 0
- )
- (log-message "Received invalid message directed at port " port ". Dropping...")
- ))))
- :args-fn (fn [] [(-> context (mq/socket mq/pull) (mq/bind url)) (atom {})])
- :daemon daemon
- :kill-fn kill-fn
- :priority priority)]
- (fn []
- (let [kill-socket (-> context (mq/socket mq/push) (mq/connect url))]
- (log-message "Shutting down virtual port at url: " url)
- (virtual-send kill-socket
- -1
- (mq/barr 1))
- (.close kill-socket)
- (log-message "Waiting for virtual port at url " url " to die")
- (.join vthread)
- (log-message "Shutdown virtual port at url: " url)
- ))))
-
-(defn virtual-bind
- [^ZMQ$Socket socket virtual-port]
- (mq/bind socket (virtual-url virtual-port))
- )
-
-(defn virtual-connect
- [^ZMQ$Socket socket virtual-port]
- (mq/connect socket (virtual-url virtual-port))
- )
24 src/jvm/backtype/storm/spout/SpoutOutputCollector.java
@@ -23,7 +23,8 @@ public SpoutOutputCollector(ISpoutOutputCollector delegate) {
* When Storm detects that this tuple has been fully processed, or has failed
* to be fully processed, the spout will receive an ack or fail callback respectively
* with the messageId as long as the messageId was not null. If the messageId was null,
- * Storm will not track the tuple and no callback will be received.
+ * Storm will not track the tuple and no callback will be received. The emitted values must be
+ * immutable.
*
* @return the list of task ids that this tuple was sent to
*/
@@ -36,7 +37,8 @@ public SpoutOutputCollector(ISpoutOutputCollector delegate) {
* When Storm detects that this tuple has been fully processed, or has failed
* to be fully processed, the spout will receive an ack or fail callback respectively
* with the messageId as long as the messageId was not null. If the messageId was null,
- * Storm will not track the tuple and no callback will be received.
+ * Storm will not track the tuple and no callback will be received. The emitted values must be
+ * immutable.
*
* @return the list of task ids that this tuple was sent to
*/
@@ -46,7 +48,8 @@ public SpoutOutputCollector(ISpoutOutputCollector delegate) {
/**
* Emits a tuple to the default output stream with a null message id. Storm will
- * not track this message so ack and fail will never be called for this tuple.
+ * not track this message so ack and fail will never be called for this tuple. The
+ * emitted values must be immutable.
*/
public List<Integer> emit(List<Object> tuple) {
return emit(tuple, null);
@@ -54,7 +57,8 @@ public SpoutOutputCollector(ISpoutOutputCollector delegate) {
/**
* Emits a tuple to the specified output stream with a null message id. Storm will
- * not track this message so ack and fail will never be called for this tuple.
+ * not track this message so ack and fail will never be called for this tuple. The
+ * emitted values must be immutable.
*/
public List<Integer> emit(String streamId, List<Object> tuple) {
return emit(streamId, tuple, null);
@@ -63,7 +67,8 @@ public SpoutOutputCollector(ISpoutOutputCollector delegate) {
/**
* Emits a tuple to the specified task on the specified output stream. This output
* stream must have been declared as a direct stream, and the specified task must
- * use a direct grouping on this stream to receive the message.
+ * use a direct grouping on this stream to receive the message. The emitted values must be
+ * immutable.
*/
public void emitDirect(int taskId, String streamId, List<Object> tuple, Object messageId) {
_delegate.emitDirect(taskId, streamId, tuple, messageId);
@@ -72,7 +77,8 @@ public void emitDirect(int taskId, String streamId, List<Object> tuple, Object m
/**
* Emits a tuple to the specified task on the default output stream. This output
* stream must have been declared as a direct stream, and the specified task must
- * use a direct grouping on this stream to receive the message.
+ * use a direct grouping on this stream to receive the message. The emitted values must be
+ * immutable.
*/
public void emitDirect(int taskId, List<Object> tuple, Object messageId) {
emitDirect(taskId, Utils.DEFAULT_STREAM_ID, tuple, messageId);
@@ -81,7 +87,8 @@ public void emitDirect(int taskId, List<Object> tuple, Object messageId) {
/**
* Emits a tuple to the specified task on the specified output stream. This output
* stream must have been declared as a direct stream, and the specified task must
- * use a direct grouping on this stream to receive the message.
+ * use a direct grouping on this stream to receive the message. The emitted values must be
+ * immutable.
*
* <p> Because no message id is specified, Storm will not track this message
* so ack and fail will never be called for this tuple.</p>
@@ -93,7 +100,8 @@ public void emitDirect(int taskId, String streamId, List<Object> tuple) {
/**
* Emits a tuple to the specified task on the default output stream. This output
* stream must have been declared as a direct stream, and the specified task must
- * use a direct grouping on this stream to receive the message.
+ * use a direct grouping on this stream to receive the message. The emitted values must be
+ * immutable.
*
* <p> Because no message id is specified, Storm will not track this message
* so ack and fail will never be called for this tuple.</p>
28 src/jvm/backtype/storm/task/OutputCollector.java
@@ -20,7 +20,8 @@ public OutputCollector(IOutputCollector delegate) {
}
/**
- * Emits a new tuple to a specific stream with a single anchor.
+ * Emits a new tuple to a specific stream with a single anchor. The emitted values must be
+ * immutable.
*
* @param streamId the stream to emit to
* @param anchor the tuple to anchor to
@@ -34,7 +35,8 @@ public OutputCollector(IOutputCollector delegate) {
/**
* Emits a new unanchored tuple to the specified stream. Because it's unanchored,
* if a failure happens downstream, this new tuple won't affect whether any
- * spout tuples are considered failed or not.
+ * spout tuples are considered failed or not. The emitted values must be
+ * immutable.
*
* @param streamId the stream to emit to
* @param tuple the new output tuple from this bolt
@@ -45,7 +47,8 @@ public OutputCollector(IOutputCollector delegate) {
}
/**
- * Emits a new tuple to the default stream anchored on a group of input tuples.
+ * Emits a new tuple to the default stream anchored on a group of input tuples. The emitted
+ * values must be immutable.
*
* @param anchors the tuples to anchor to
* @param tuple the new output tuple from this bolt
@@ -57,7 +60,8 @@ public OutputCollector(IOutputCollector delegate) {
/**
- * Emits a new tuple to the default stream anchored on a single tuple.
+ * Emits a new tuple to the default stream anchored on a single tuple. The emitted values must be
+ * immutable.
*
* @param anchor the tuple to anchor to
* @param tuple the new output tuple from this bolt
@@ -70,7 +74,8 @@ public OutputCollector(IOutputCollector delegate) {
/**
* Emits a new unanchored tuple to the default stream. Because it's unanchored,
* if a failure happens downstream, this new tuple won't affect whether any
- * spout tuples are considered failed or not.
+ * spout tuples are considered failed or not. The emitted values must be
+ * immutable.
*
* @param tuple the new output tuple from this bolt
* @return the list of task ids that this new tuple was sent to
@@ -84,7 +89,8 @@ public OutputCollector(IOutputCollector delegate) {
* If the target bolt does not subscribe to this bolt using a direct grouping,
* the tuple will not be sent. If the specified output stream is not declared
* as direct, or the target bolt subscribes with a non-direct grouping,
- * an error will occur at runtime.
+ * an error will occur at runtime. The emitted values must be
+ * immutable.
*
* @param taskId the taskId to send the new tuple to
* @param streamId the stream to send the tuple on. It must be declared as a direct stream in the topology definition.
@@ -102,6 +108,7 @@ public void emitDirect(int taskId, String streamId, Tuple anchor, List<Object> t
* as direct, or the target bolt subscribes with a non-direct grouping,
* an error will occur at runtime. Note that this method does not use anchors,
* so downstream failures won't affect the failure status of any spout tuples.
+ * The emitted values must be immutable.
*
* @param taskId the taskId to send the new tuple to
* @param streamId the stream to send the tuple on. It must be declared as a direct stream in the topology definition.
@@ -116,7 +123,8 @@ public void emitDirect(int taskId, String streamId, List<Object> tuple) {
* If the target bolt does not subscribe to this bolt using a direct grouping,
* the tuple will not be sent. If the specified output stream is not declared
* as direct, or the target bolt subscribes with a non-direct grouping,
- * an error will occur at runtime.
+ * an error will occur at runtime. The emitted values must be
+ * immutable.
*
* <p>The default stream must be declared as direct in the topology definition.
* See OutputDeclarer#declare for how this is done when defining topologies
@@ -135,7 +143,8 @@ public void emitDirect(int taskId, Collection<Tuple> anchors, List<Object> tuple
* If the target bolt does not subscribe to this bolt using a direct grouping,
* the tuple will not be sent. If the specified output stream is not declared
* as direct, or the target bolt subscribes with a non-direct grouping,
- * an error will occur at runtime.
+ * an error will occur at runtime. The emitted values must be
+ * immutable.
*
* <p>The default stream must be declared as direct in the topology definition.
* See OutputDeclarer#declare for how this is done when defining topologies
@@ -155,7 +164,8 @@ public void emitDirect(int taskId, Tuple anchor, List<Object> tuple) {
* If the target bolt does not subscribe to this bolt using a direct grouping,
* the tuple will not be sent. If the specified output stream is not declared
* as direct, or the target bolt subscribes with a non-direct grouping,
- * an error will occur at runtime.
+ * an error will occur at runtime. The emitted values must be
+ * immutable.
*
* <p>The default stream must be declared as direct in the topology definition.
* See OutputDeclarer#declare for how this is done when defining topologies
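
The "emitted values must be immutable" additions throughout these javadocs are the flip side of the in-memory optimization: a colocated consumer now receives the very List you emitted, not a deserialized copy. A hypothetical bolt fragment contrasting the unsafe and safe patterns:

```java
import java.util.List;
import backtype.storm.task.OutputCollector;
import backtype.storm.tuple.Values;

// Hypothetical fragment: mutating a list after emitting it can race with a
// colocated task that received the object itself.
class EmitPatterns {
    void emitCounts(OutputCollector collector) {
        List<Object> reused = new Values("word", 1);
        collector.emit(reused);
        // reused.set(1, 2);  // WRONG: downstream may be reading `reused` right now

        collector.emit(new Values("word", 1)); // RIGHT: fresh values per emit,
        collector.emit(new Values("word", 2)); // never touched after the call
    }
}
```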