Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

remove usage of 2 part zmq messages

  • Loading branch information...
commit 98d24466c3ab48f0a2f8fc71440d8f5e114bcd3a 1 parent 9956175
Nathan Marz nathanmarz authored
Showing with 18 additions and 17 deletions.
  1. +18 −17 src/clj/backtype/storm/messaging/zmq.clj
35 src/clj/backtype/storm/messaging/zmq.clj
View
@@ -6,10 +6,19 @@
(:require [zilch.mq :as mq]))
-(defn parse-packet [^bytes part1 ^bytes part2]
- (let [bb (ByteBuffer/wrap part1)
- port (.getShort bb)]
- [(int port) part2]
+(defn mk-packet [task ^bytes message]
+ (let [bb (ByteBuffer/allocate (+ 2 (count message)))]
+ (.putShort bb (short task))
+ (.put bb message)
+ (.array bb)
+ ))
+
+(defn parse-packet [^bytes packet]
+ (let [bb (ByteBuffer/wrap packet)
+ port (.getShort bb)
+ msg (byte-array (- (count packet) 2))]
+ (.get bb msg)
+ [(int port) msg]
))
(defn get-bind-zmq-url [local? port]
@@ -26,27 +35,19 @@
(defprotocol ZMQContextQuery
(zmq-context [this]))
-(def NOBLOCK-SNDMORE (bit-or ZMQ/SNDMORE ZMQ/NOBLOCK))
-
-(deftype ZMQConnection [socket ^ByteBuffer bb]
+(deftype ZMQConnection [socket]
Connection
(recv-with-flags [this flags]
- (let [part1 (mq/recv socket flags)]
- (when part1
- (when-not (mq/recv-more? socket)
- (throw (RuntimeException. "Should always receive two-part ZMQ messages")))
- (parse-packet part1 (mq/recv socket)))))
+ (if-let [packet (mq/recv socket flags)]
+ (parse-packet packet)))
(send [this task message]
- (.clear bb)
- (.putShort bb (short task))
- (mq/send socket (.array bb) NOBLOCK-SNDMORE)
- (mq/send socket message ZMQ/NOBLOCK)) ;; TODO: how to do backpressure if doing noblock?... need to only unblock if the target disappears
+ (mq/send socket (mk-packet task message) ZMQ/NOBLOCK)) ;; TODO: how to do backpressure if doing noblock?... need to only unblock if the target disappears
(close [this]
(.close socket)
))
(defn mk-connection [socket]
- (ZMQConnection. socket (ByteBuffer/allocate 2)))
+ (ZMQConnection. socket))
(deftype ZMQContext [context linger-ms hwm local?]
Context
Please sign in to comment.
Something went wrong with that request. Please try again.