Skip to content

Commit

Permalink
remove usage of 2 part zmq messages
Browse files Browse the repository at this point in the history
  • Loading branch information
nathanmarz committed Mar 25, 2013
1 parent 9956175 commit 98d2446
Showing 1 changed file with 18 additions and 17 deletions.
35 changes: 18 additions & 17 deletions src/clj/backtype/storm/messaging/zmq.clj
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,19 @@
(:require [zilch.mq :as mq]))


(defn parse-packet [^bytes part1 ^bytes part2]
(let [bb (ByteBuffer/wrap part1)
port (.getShort bb)]
[(int port) part2]
(defn mk-packet [task ^bytes message]
(let [bb (ByteBuffer/allocate (+ 2 (count message)))]
(.putShort bb (short task))
(.put bb message)
(.array bb)
))

(defn parse-packet [^bytes packet]
(let [bb (ByteBuffer/wrap packet)
port (.getShort bb)
msg (byte-array (- (count packet) 2))]
(.get bb msg)
[(int port) msg]
))

(defn get-bind-zmq-url [local? port]
Expand All @@ -26,27 +35,19 @@
(defprotocol ZMQContextQuery
(zmq-context [this]))

(def NOBLOCK-SNDMORE (bit-or ZMQ/SNDMORE ZMQ/NOBLOCK))

(deftype ZMQConnection [socket ^ByteBuffer bb]
(deftype ZMQConnection [socket]
Connection
(recv-with-flags [this flags]
(let [part1 (mq/recv socket flags)]
(when part1
(when-not (mq/recv-more? socket)
(throw (RuntimeException. "Should always receive two-part ZMQ messages")))
(parse-packet part1 (mq/recv socket)))))
(if-let [packet (mq/recv socket flags)]
(parse-packet packet)))
(send [this task message]
(.clear bb)
(.putShort bb (short task))
(mq/send socket (.array bb) NOBLOCK-SNDMORE)
(mq/send socket message ZMQ/NOBLOCK)) ;; TODO: how to do backpressure if doing noblock?... need to only unblock if the target disappears
(mq/send socket (mk-packet task message) ZMQ/NOBLOCK)) ;; TODO: how to do backpressure if doing noblock?... need to only unblock if the target disappears
(close [this]
(.close socket)
))

(defn mk-connection [socket]
(ZMQConnection. socket (ByteBuffer/allocate 2)))
(ZMQConnection. socket))

(deftype ZMQContext [context linger-ms hwm local?]
Context
Expand Down

3 comments on commit 98d2446

@jasonjckn
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@nathanmarz You didn't add type hints for the ByteBuffer in this commit. Does clojure infer this or something? This is a super hot code path.

@nathanmarz
Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You're referring to mk-packet and parse-packet? Clojure knows that bb is of type ByteBuffer.

@jasonjckn
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am. Okay cool I didn't know clojure did that!

Please sign in to comment.