forked from nathanmarz/storm
-
Notifications
You must be signed in to change notification settings - Fork 0
/
zmq.clj
46 lines (40 loc) · 1.1 KB
/
zmq.clj
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
(ns backtype.storm.messaging.zmq
(:refer-clojure :exclude [send])
(:use [backtype.storm.messaging protocol])
(:require [zilch.mq :as mq]
[zilch.virtual-port :as mqvp]))
(defprotocol ZMQContextQuery
  "Accessor protocol for objects that hold a context value."
  (zmq-context
    [this]
    "Returns the context value held by this object."))
;; Connection implementation that wraps a single `socket` and forwards each
;; protocol operation to zilch.mq / zilch.virtual-port or to the socket itself.
(deftype ZMQConnection [socket]
  Connection

  ;; Returns whatever mq/recv yields for the wrapped socket.
  (recv [_]
    (mq/recv socket))

  ;; Passes `task` and `message` to mqvp/virtual-send on the wrapped socket.
  (send [_ task message]
    (mqvp/virtual-send socket task message))

  ;; Invokes the socket's own .close method.
  (close [_]
    (.close socket)))
;; Context implementation holding a `context` object, a linger setting
;; (`linger-ms`) applied to outgoing sockets, and an `ipc?` flag selecting the
;; URL scheme used by `connect`.
(deftype ZMQContext [context linger-ms ipc?]
  Context

  ;; Creates an mq/pull socket on `context`, hands it to mqvp/virtual-bind
  ;; together with `port`, and wraps the result in a ZMQConnection.
  ;; `storm-id` is accepted but not used here.
  (bind [_ storm-id port]
    (let [pull-sock (mq/socket context mq/pull)
          bound     (mqvp/virtual-bind pull-sock port)]
      (ZMQConnection. bound)))

  ;; Creates an mq/push socket on `context`, applies `linger-ms`, connects it
  ;; to the target URL, and wraps the result in a ZMQConnection.
  ;; When `ipc?` is truthy the URL is "ipc://<port>.ipc" and `host` is ignored;
  ;; otherwise it is "tcp://<host>:<port>". `storm-id` is not used here.
  (connect [_ storm-id host port]
    (let [url       (if ipc?
                      (str "ipc://" port ".ipc")
                      (str "tcp://" host ":" port))
          push-sock (mq/socket context mq/push)
          lingering (mq/set-linger push-sock linger-ms)
          connected (mq/connect lingering url)]
      (ZMQConnection. connected)))

  ;; Invokes .term on the held context object.
  (term [_]
    (.term context))

  ZMQContextQuery

  ;; Exposes the held context object.
  (zmq-context [_]
    context))
(defn mk-zmq-context
  "Builds a ZMQContext.

  A context is created with `(mq/context num-threads)`; `linger` becomes the
  ZMQContext's `linger-ms` field and `local?` becomes its `ipc?` field."
  [num-threads linger local?]
  (let [ctx (mq/context num-threads)]
    (ZMQContext. ctx linger local?)))