Permalink
Newer
Older
100644 395 lines (364 sloc) 11.8 KB
1
%% -*- coding:utf-8;Mode:erlang;tab-width:4;c-basic-offset:4;indent-tabs-mode:nil -*-
2
%% ex: set softtabstop=4 tabstop=4 shiftwidth=4 expandtab fileencoding=utf-8:
3
%%
4
%% Copyright (c) 2011 Yurii Rashkovskii, Evax Software and Michael Truog
5
%%
6
%% Permission is hereby granted, free of charge, to any person obtaining a copy
7
%% of this software and associated documentation files (the "Software"), to deal
8
%% in the Software without restriction, including without limitation the rights
9
%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
10
%% copies of the Software, and to permit persons to whom the Software is
11
%% furnished to do so, subject to the following conditions:
12
%%
13
%% The above copyright notice and this permission notice shall be included in
14
%% all copies or substantial portions of the Software.
15
%%
16
%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17
%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18
%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
19
%% AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20
%% LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
21
%% OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
22
%% THE SOFTWARE.
23
24
-module(erlzmq).
25
%% @headerfile "erlzmq.hrl"
26
-include_lib("erlzmq.hrl").
27
%% Public API. close/2 and term/1 are defined and documented in this module
%% (context/1 tells callers to clean up with term/1), so they are exported
%% alongside their sibling arities.
-export([context/0,
         context/1,
         socket/2,
         bind/2,
         connect/2,
         send/2,
         send/3,
         recv/1,
         recv/2,
         setsockopt/3,
         getsockopt/2,
         close/1,
         close/2,
         term/1,
         term/2,
         version/0]).
43
-export_type([erlzmq_socket/0, erlzmq_context/0]).
44
45
%% @equiv context(1)
-spec context() -> {ok, erlzmq_context()} | erlzmq_error().
context() ->
    % default to a single io thread
    context(1).
51
52
%% @doc Create a new erlzmq context backed by `Threads' io threads.
%% <br />
%% On success the result is an 'ok' tuple wrapping the
%% {@type erlzmq_context()} handle of the new context; otherwise an
%% {@type erlzmq_error()} describing the failure is returned.
%% <br />
%% A non-integer `Threads' is rejected by the guard and raises
%% `function_clause'.
%% <br />
%% The context must later be released with {@link erlzmq:term/1. term/1}.
%% <br />
%% <i>For more information see
%% <a href="http://api.zeromq.org/master:zmq-init">zmq_init</a></i>
%% @end
-spec context(Threads :: pos_integer()) ->
    {ok, erlzmq_context()} | erlzmq_error().
context(IoThreads) when is_integer(IoThreads) ->
    erlzmq_nif:context(IoThreads).
69
70
71
%% @doc Create a socket.
%% <br />
%% This function creates a socket of the given
%% {@link erlzmq_socket_type(). type}, optionally setting it to active mode,
%% and associates it with the given {@link erlzmq_context(). context}.
%% <br />
%% The second argument is either a bare socket type, or a list holding the
%% socket type and at most one `{active, boolean()}' option, in either order.
%% Without an active option the socket is created passive.
%% <br />
%% If the socket can be created an 'ok' tuple containing a
%% {@type erlzmq_socket()} handle to the created socket is returned;
%% if not, it returns an {@type erlzmq_error()} describing the error.
%% <br />
%% In line with Erlang's socket paradigm, a socket can be either active or
%% passive. Passive sockets tend to have lower latency and have a higher
%% throughput for small message sizes. Active sockets on the contrary give
%% the highest throughput for messages above 32k. A benchmarking tool is
%% included in the source distribution.<br />
%% <i>For more information see
%% <a href="http://api.zeromq.org/master:zmq_socket">zmq_socket</a>.</i>
%% @end
-spec socket(Context :: erlzmq_context(),
             Type :: erlzmq_socket_type() |
                     list(erlzmq_socket_type() |
                          {active, boolean()})) ->
    {ok, erlzmq_socket()} |
    erlzmq_error().
socket(Context, Type) when is_atom(Type) ->
    socket(Context, [Type]);
socket(Context, [Type | Options]) when is_atom(Type) ->
    Mode = case Options of
               [] ->
                   % passive by default
                   % (to avoid latency on small messages (messages < 32KB))
                   {active, false};
               [Option] ->
                   Option
           end,
    socket(Context, Type, Mode);
socket(Context, [Mode, Type]) when is_tuple(Mode) ->
    % option given before the socket type
    socket(Context, Type, Mode).
108
109
%% Create a socket of the given type in either active or passive mode.
%% The mode is handed to the NIF as 1 (active) or 0 (passive).
-spec socket(Context :: erlzmq_context(),
             Type :: erlzmq_socket_type(),
             {active, boolean()}) ->
    {ok, erlzmq_socket()} |
    erlzmq_error().
socket(Context, Type, {active, false}) ->
    erlzmq_nif:socket(Context, socket_type(Type), 0);
socket(Context, Type, {active, true}) ->
    % active mode is refused for pub, push and xpub sockets:
    % the match below fails with badmatch for those types
    true = not lists:member(Type, [pub, push, xpub]),
    erlzmq_nif:socket(Context, socket_type(Type), 1).
119
120
121
%% @doc Accept connections on a socket.
%% <br />
%% `Endpoint' must be a string (list); the socket handle is a two-element
%% tuple whose first element is an integer. Anything else raises
%% `function_clause'.
%% <br />
%% <i>For more information see
%% <a href="http://api.zeromq.org/master:zmq_bind">zmq_bind</a>.</i>
%% @end
-spec bind(Socket :: erlzmq_socket(),
           Endpoint :: erlzmq_endpoint()) ->
    ok | erlzmq_error().
bind({Index, Handle}, Endpoint) when is_integer(Index), is_list(Endpoint) ->
    erlzmq_nif:bind(Handle, Endpoint).
133
134
%% @doc Connect a socket.
%% <br />
%% `Endpoint' must be a string (list); the socket handle is a two-element
%% tuple whose first element is an integer. Anything else raises
%% `function_clause'.
%% <br />
%% <i>For more information see
%% <a href="http://api.zeromq.org/master:zmq_connect">zmq_connect</a>.</i>
%% @end
-spec connect(Socket :: erlzmq_socket(),
              Endpoint :: erlzmq_endpoint()) ->
    ok | erlzmq_error().
connect({Index, Handle}, Endpoint) when is_integer(Index), is_list(Endpoint) ->
    erlzmq_nif:connect(Handle, Endpoint).
146
147
%% @equiv send(Socket, Binary, [])
-spec send(erlzmq_socket(),
           Binary :: binary()) ->
    ok |
    erlzmq_error().
send(Socket, Binary) when is_binary(Binary) ->
    % no flags
    send(Socket, Binary, []).
154
155
%% @doc Send a message on a socket.
%% <br />
%% `Binary' is the message payload and `Flags' a list of send flags.
%% When the NIF answers with a reference, the calling process blocks until
%% a reply tagged with that reference arrives and returns its 'ok' or
%% 'error' payload; any other NIF result is returned unchanged.
%% <br />
%% <i>For more information see
%% <a href="http://api.zeromq.org/master:zmq_send">zmq_send</a>.</i>
%% @end
-spec send(erlzmq_socket(),
           Binary :: binary(),
           Flags :: erlzmq_send_recv_flags()) ->
    ok |
    erlzmq_error().
send({I, Socket}, Binary, Flags)
    when is_integer(I), is_binary(Binary), is_list(Flags) ->
    case erlzmq_nif:send(Socket, Binary, sendrecv_flags(Flags)) of
        Ref when is_reference(Ref) ->
            % the outcome is delivered later as a message tagged with Ref
            receive
                {Ref, ok} ->
                    ok;
                {Ref, {error, _} = Error} ->
                    Error
            end;
        Result ->
            % the NIF answered immediately
            Result
    end.
179
%% @equiv recv(Socket, [])
-spec recv(Socket :: erlzmq_socket()) ->
    {ok, erlzmq_data()} |
    erlzmq_error().
recv(Socket) ->
    % no flags, so recv/2 waits without a timeout
    recv(Socket, []).
185
186
%% @doc Receive a message from a socket.
%% <br />
%% `Flags' is a list of receive flags. It may carry a `{timeout, T}' entry,
%% which bounds how long the caller waits for the reply; without it the
%% wait is unbounded.
%% <br />
%% When the NIF answers with a reference, the calling process waits for a
%% reply tagged with that reference and returns it in an 'ok' tuple. If the
%% timeout elapses first, `{error, {timeout, Ref}}' is returned.
%% Any other NIF result is returned unchanged.
%% <br />
%% <i>For more information see
%% <a href="http://api.zeromq.org/master:zmq_recv">zmq_recv</a>.</i>
%% @end
-spec recv(Socket :: erlzmq_socket(),
           Flags :: erlzmq_send_recv_flags()) ->
    {ok, erlzmq_data()} |
    erlzmq_error() |
    {error, {timeout, reference()}}.
recv({I, Socket}, Flags)
    when is_integer(I), is_list(Flags) ->
    case erlzmq_nif:recv(Socket, sendrecv_flags(Flags)) of
        Ref when is_reference(Ref) ->
            Timeout = proplists:get_value(timeout, Flags, infinity),
            receive
                {Ref, Result} ->
                    {ok, Result}
            after Timeout ->
                    % Ref is handed back to the caller with the timeout error
                    {error, {timeout, Ref}}
            end;
        Result ->
            % the NIF answered immediately
            Result
    end.
210
211
%% @doc Set an {@link erlzmq_sockopt(). option} associated with a socket.
%% <br />
%% `Name' is the option atom. A list `Value' is converted to a binary
%% before it is handed to the NIF; any other value is passed through as is.
%% <br />
%% <i>For more information see
%% <a href="http://api.zeromq.org/master:zmq_setsockopt">zmq_setsockopt</a>.</i>
%% @end
-spec setsockopt(erlzmq_socket(),
                 Name :: erlzmq_sockopt(),
                 erlzmq_sockopt_value() | binary()) ->
    ok |
    erlzmq_error().
setsockopt(Socket, Name, Value) when is_list(Value) ->
    % normalise string values to binaries, then retry
    setsockopt(Socket, Name, erlang:list_to_binary(Value));
setsockopt({I, Socket}, Name, Value) when is_integer(I), is_atom(Name) ->
    erlzmq_nif:setsockopt(Socket, option_name(Name), Value).
225
226
%% @doc Get an {@link erlzmq_sockopt(). option} associated with a socket.
%% <br />
%% `Name' is the option atom; it is translated to its numeric constant
%% before the NIF is called.
%% <br />
%% <i>For more information see
%% <a href="http://api.zeromq.org/master:zmq_getsockopt">zmq_getsockopt</a>.</i>
%% @end
-spec getsockopt(Socket :: erlzmq_socket(),
                 Name :: erlzmq_sockopt()) ->
    {ok, erlzmq_sockopt_value()} | erlzmq_error().
getsockopt({Index, Handle}, Option) when is_integer(Index), is_atom(Option) ->
    OptionCode = option_name(Option),
    erlzmq_nif:getsockopt(Handle, OptionCode).
237
238
%% @equiv close(Socket, infinity)
-spec close(Socket :: erlzmq_socket()) ->
    ok |
    erlzmq_error().
close(Socket) ->
    % wait for the close to complete without a time limit
    close(Socket, infinity).
245
%% @doc Close the given socket, waiting up to `Timeout' for completion.
%% <br />
%% When the NIF answers with a reference, the calling process waits for a
%% reply tagged with that reference and returns its payload. If `Timeout'
%% elapses first, `{error, {timeout, Ref}}' is returned.
%% Any other NIF result is returned unchanged.
%% <br />
%% <i>For more information see
%% <a href="http://api.zeromq.org/master:zmq_close">zmq_close</a>.</i>
%% @end
-spec close(Socket :: erlzmq_socket(),
            Timeout :: timeout()) ->
    ok |
    erlzmq_error() |
    {error, {timeout, reference()}}.
close({I, Socket}, Timeout) when is_integer(I) ->
    case erlzmq_nif:close(Socket) of
        Ref when is_reference(Ref) ->
            receive
                {Ref, Result} ->
                    Result
            after
                Timeout ->
                    {error, {timeout, Ref}}
            end;
        Result ->
            % the NIF answered immediately
            Result
    end.
267
268
%% @equiv term(Context, infinity)
-spec term(Context :: erlzmq_context()) -> ok | erlzmq_error().
term(Context) ->
    % wait for termination without a time limit
    term(Context, infinity).
274
275
%% @doc Terminate the given context waiting up to Timeout ms.
%% <br />
%% This function should be called after all sockets associated with
%% the given context have been closed.<br />
%% If not it will block the given Timeout amount of time.
%% <br />
%% When the NIF answers with a reference, the calling process waits for a
%% reply tagged with that reference and returns its payload. If `Timeout'
%% elapses first, `{error, {timeout, Ref}}' is returned.
%% Any other NIF result is returned unchanged.
%% <br />
%% <i>For more information see
%% <a href="http://api.zeromq.org/master:zmq_term">zmq_term</a>.</i>
%% @end
-spec term(Context :: erlzmq_context(),
           Timeout :: timeout()) ->
    ok |
    erlzmq_error() |
    {error, {timeout, reference()}}.

term(Context, Timeout) ->
    case erlzmq_nif:term(Context) of
        Ref when is_reference(Ref) ->
            receive
                {Ref, Result} ->
                    Result
            after
                Timeout ->
                    {error, {timeout, Ref}}
            end;
        Result ->
            % the NIF answered immediately
            Result
    end.
302
Jun 9, 2011
303
%% @doc Returns the 0MQ library version as a tuple of three integers,
%% exactly as reported by the NIF.
%% @end
-spec version() -> {integer(), integer(), integer()}.
version() ->
    erlzmq_nif:version().
308
309
%% Private
310
311
%% Translate a socket type atom into the matching ZMQ_* constant.
%% An unknown atom raises function_clause. Clauses are kept in
%% alphabetical order.
-spec socket_type(Type :: erlzmq_socket_type()) -> integer().
socket_type(dealer) -> ?'ZMQ_DEALER';
socket_type(pair)   -> ?'ZMQ_PAIR';
socket_type(pub)    -> ?'ZMQ_PUB';
socket_type(pull)   -> ?'ZMQ_PULL';
socket_type(push)   -> ?'ZMQ_PUSH';
socket_type(rep)    -> ?'ZMQ_REP';
socket_type(req)    -> ?'ZMQ_REQ';
socket_type(router) -> ?'ZMQ_ROUTER';
socket_type(sub)    -> ?'ZMQ_SUB';
socket_type(xpub)   -> ?'ZMQ_XPUB';
socket_type(xrep)   -> ?'ZMQ_XREP';
socket_type(xreq)   -> ?'ZMQ_XREQ';
socket_type(xsub)   -> ?'ZMQ_XSUB'.
340
341
%% Fold a list of send/recv flags into the integer bitmask handed to the NIF.
%% noblock and sndmore contribute their ZMQ_* bits. A {timeout, _} entry
%% contributes none, because recv/2 reads it from the flag list itself.
%% It is accepted at any position so that lists such as
%% [{timeout, T}, noblock] no longer fail with function_clause.
%% Any other flag raises function_clause.
-spec sendrecv_flags(Flags :: erlzmq_send_recv_flags()) ->
    integer().

sendrecv_flags([]) ->
    0;
sendrecv_flags([{timeout, _} | Rest]) ->
    sendrecv_flags(Rest);
sendrecv_flags([noblock | Rest]) ->
    ?'ZMQ_NOBLOCK' bor sendrecv_flags(Rest);
sendrecv_flags([sndmore | Rest]) ->
    ?'ZMQ_SNDMORE' bor sendrecv_flags(Rest).
352
353
%% Translate a socket option atom into the matching ZMQ_* constant.
%% An unknown atom raises function_clause. Clauses are kept in
%% alphabetical order.
-spec option_name(Name :: erlzmq_sockopt()) -> integer().
option_name(affinity)          -> ?'ZMQ_AFFINITY';
option_name(backlog)           -> ?'ZMQ_BACKLOG';
option_name(events)            -> ?'ZMQ_EVENTS';
option_name(fd)                -> ?'ZMQ_FD';
option_name(hwm)               -> ?'ZMQ_HWM';
option_name(identity)          -> ?'ZMQ_IDENTITY';
option_name(linger)            -> ?'ZMQ_LINGER';
option_name(mcast_loop)        -> ?'ZMQ_MCAST_LOOP';
option_name(rate)              -> ?'ZMQ_RATE';
option_name(rcvbuf)            -> ?'ZMQ_RCVBUF';
option_name(rcvmore)           -> ?'ZMQ_RCVMORE';
option_name(reconnect_ivl)     -> ?'ZMQ_RECONNECT_IVL';
option_name(reconnect_ivl_max) -> ?'ZMQ_RECONNECT_IVL_MAX';
option_name(recovery_ivl)      -> ?'ZMQ_RECOVERY_IVL';
option_name(recovery_ivl_msec) -> ?'ZMQ_RECOVERY_IVL_MSEC';
option_name(sndbuf)            -> ?'ZMQ_SNDBUF';
option_name(subscribe)         -> ?'ZMQ_SUBSCRIBE';
option_name(swap)              -> ?'ZMQ_SWAP';
option_name(unsubscribe)       -> ?'ZMQ_UNSUBSCRIBE'.
394