/
mdcliapi2.java
132 lines (115 loc) · 3.88 KB
/
mdcliapi2.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
/**
* (c) 2011 Arkadiusz Orzechowski
*
* This file is part of ZGuide
*
* ZGuide is free software; you can redistribute it and/or modify it under
* the terms of the Lesser GNU General Public License as published by
* the Free Software Foundation; either version 3 of the License, or
* (at your option) any later version.
*
* ZGuide is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* Lesser GNU General Public License for more details.
*
* You should have received a copy of the Lesser GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
import java.util.Formatter;
import org.zeromq.ZContext;
import org.zeromq.ZFrame;
import org.zeromq.ZMQ;
import org.zeromq.ZMsg;
/**
 * Majordomo Protocol Client API, asynchronous Java version. Implements the
 * MDP/Client spec at http://rfc.zeromq.org/spec:7.
 *
 * <p>Requests are queued with {@link #send(String, ZMsg)} and replies are
 * collected independently with {@link #recv()}, so several requests may be
 * outstanding at once. Call {@link #destroy()} when finished to release the
 * underlying ZeroMQ context.
 *
 * @author Arkadiusz Orzechowski <aorzecho@gmail.com>
 */
public class mdcliapi2 {
    private final String broker;
    private final ZContext ctx;
    private ZMQ.Socket client;
    // Poller bound to the current client socket (always registered at index 0).
    private ZMQ.Poller poller;
    // Reply timeout in milliseconds.
    private long timeout = 2500;
    private final boolean verbose;
    private final Formatter log = new Formatter(System.out);

    /**
     * @return how long {@link #recv()} waits for a reply, in milliseconds
     */
    public long getTimeout() {
        return timeout;
    }

    /**
     * @param timeout how long {@link #recv()} should wait for a reply, in
     *                milliseconds
     */
    public void setTimeout(long timeout) {
        this.timeout = timeout;
    }

    /**
     * Creates a client with its own ZeroMQ context and connects it to the
     * broker.
     *
     * @param broker  endpoint of the broker to connect to
     * @param verbose when true, connection events and every message sent or
     *                received are written to standard output
     */
    public mdcliapi2(String broker, boolean verbose) {
        this.broker = broker;
        this.verbose = verbose;
        ctx = new ZContext();
        reconnectToBroker();
    }

    /**
     * Connect or reconnect to broker. Any existing socket is destroyed and
     * replaced by a fresh DEALER socket, and the poller used by
     * {@link #recv()} is rebuilt around the new socket.
     */
    void reconnectToBroker() {
        if (client != null) {
            ctx.destroySocket(client);
        }
        client = ctx.createSocket(ZMQ.DEALER);
        client.connect(broker);

        // One poller per socket, reused by every recv() call, instead of
        // allocating a new poller for each poll.
        poller = ctx.getContext().poller();
        poller.register(client, ZMQ.Poller.POLLIN);

        if (verbose) {
            log.format("I: connecting to broker at %s...\n", broker);
        }
    }

    /**
     * Returns the reply message or NULL if there was no reply. Does not attempt
     * to recover from a broker failure, this is not possible without storing
     * all unanswered requests and resending them all…
     *
     * <p>The protocol envelope (empty delimiter, protocol header and service
     * name) is stripped, so the returned message holds only the remaining
     * frames.
     *
     * @return the reply with its envelope removed, or {@code null} if the
     *         timeout expired or the wait was interrupted
     */
    public ZMsg recv() {
        // Poll socket for a reply, with timeout. The timeout is passed through
        // unchanged: poll() takes milliseconds, the same unit as the field.
        if (poller.poll(timeout) == -1) {
            return null; // Interrupted
        }
        if (!poller.pollin(0)) {
            return null; // Timed out
        }

        ZMsg msg = ZMsg.recvMsg(client);
        if (msg == null) {
            return null; // Interrupted while receiving
        }
        if (verbose) {
            log.format("I: received reply: \n");
            msg.dump(log.out());
        }

        // Don't try to handle errors, just assert noisily
        assert (msg.size() >= 4);

        ZFrame empty = msg.pop();
        assert (empty.getData().length == 0);
        empty.destroy();

        ZFrame header = msg.pop();
        assert (isClientHeader(header));
        header.destroy();

        // The service name is part of the envelope and is not returned.
        ZFrame replyService = msg.pop();
        replyService.destroy();

        return msg;
    }

    /**
     * Checks that a frame carries the MDP/Client protocol header.
     *
     * <p>Frame contents are compared directly. MDP.C_CLIENT is a frame-producing
     * constant rather than a String, so calling equals() on it with the
     * frame's string form would not be expected to ever match.
     */
    private static boolean isClientHeader(ZFrame header) {
        ZFrame expected = MDP.C_CLIENT.newFrame();
        try {
            return expected.hasSameData(header);
        } finally {
            expected.destroy();
        }
    }

    /**
     * Sends a request to the broker without waiting for the reply; use
     * {@link #recv()} to collect replies. Takes ownership of the request
     * message and destroys it when sent.
     *
     * @param service name of the service the request is addressed to
     * @param request request body; the protocol frames are prepended to it in
     *                place, so it must not be null and must not be reused by
     *                the caller afterwards
     */
    public void send(String service, ZMsg request) {
        assert (request != null);

        // Prefix request with protocol frames
        // Frame 0: empty (REQ emulation)
        // Frame 1: "MDPCxy" (six bytes, MDP/Client x.y)
        // Frame 2: Service name (printable string)
        request.addFirst(service);
        request.addFirst(MDP.C_CLIENT.newFrame());
        request.addFirst("");
        if (verbose) {
            log.format("I: send request to '%s' service: \n", service);
            request.dump(log.out());
        }
        request.send(client);
    }

    /**
     * Destroys the ZeroMQ context owned by this client. The client must not
     * be used afterwards.
     */
    public void destroy() {
        ctx.destroy();
    }
}