Skip to content
Browse files

migrate from the 2.1 api to 3.1 api for zmq

  • Loading branch information...
1 parent 08fe70c commit 7dcd68fa00425bbc309d5a4e500bff4f4c8c666a @ericbmerritt ericbmerritt committed Mar 8, 2012
Showing with 413 additions and 108 deletions.
  1. +15 −21 README.md
  2. +54 −42 c_src/erlzmq_nif.c
  3. +24 −13 include/erlzmq.hrl
  4. +93 −23 src/erlzmq.erl
  5. +227 −9 test/erlzmq_test.erl
View
36 README.md
@@ -1,44 +1,41 @@
erlzmq2
-====
+=======
NIF based Erlang bindings for the ZeroMQ messaging library.
Copyright (c) 2011 Yurii Rashkovskii, Evax Software and Michael Truog
Overview
-========
+--------
The erlzmq2 application provides high-performance NIF based Erlang
bindings for the ZeroMQ messaging library.
Downloading
-===========
+-----------
The erlzmq2 source code can be found on
-[GitHub](https://github.com/yrashk/erlzmq2)
+[GitHub](https://github.com/zeromq/erlzmq2)
$ git clone http://github.com/zeromq/erlzmq2.git
-It is also available on [Agner](http://erlagner.org/):
-
- $ agner build erlzmq
-
-In order to build erlzmq2 against a specific version of ZeroMQ (not
-`v2.1.11`), use this:
-
- $ ZEROMQ_VERSION=v<VERSION> agner build erlzmq
-
Building
-========
+--------
+
+Please note that to behave properly on your system ZeroMQ might
+require [some tuning](http://www.zeromq.org/docs:tuning-zeromq).
Build the code
$ make
-If you want to build against a specific version of ZeroMQ (not
-`v2.1.11`), use this:
+If you want to build against a specific version of ZeroMQ in the 3.1
+series (not `v3.1.0`), use this:
$ ZEROMQ_VERSION=v<VERSION> make
+Be aware that this will almost assuredly not work correctly for any
+versions of zeromq that are not in the 3.1 series.
+
Build the docs
$ make docs
@@ -55,11 +52,8 @@ Run the benchmarks (requires [python](http://www.python.org) and
This will run performance tests and output png graphs in the graphs
directory.
-Please note that to behave properly on your system ZeroMQ might
-require [some tuning](http://www.zeromq.org/docs:tuning-zeromq).
-
Architecture
-============
+------------
The bindings use Erlang's
[NIF (native implemented functions)](http://www.erlang.org/doc/man/erl_nif.html)
@@ -68,7 +62,7 @@ pair of inproc sockets by context are used to simulate blocking recv
calls without affecting the Erlang virtual machine's responsiveness.
License
-=======
+-------
The project is released under the MIT license.
View
96 c_src/erlzmq_nif.c
@@ -309,21 +309,16 @@ NIF(erlzmq_nif_setsockopt)
size_t option_len;
switch (option_name) {
// uint64_t
- case ZMQ_HWM:
case ZMQ_AFFINITY:
- case ZMQ_SNDBUF:
- case ZMQ_RCVBUF:
if (! enif_get_uint64(env, argv[2], &value_uint64)) {
return enif_make_badarg(env);
}
option_value = &value_uint64;
option_len = sizeof(int64_t);
break;
+
// int64_t
- case ZMQ_SWAP:
- case ZMQ_RATE:
- case ZMQ_RECOVERY_IVL:
- case ZMQ_MCAST_LOOP:
+ case ZMQ_MAXMSGSIZE:
if (! enif_get_int64(env, argv[2], &value_int64)) {
return enif_make_badarg(env);
}
@@ -341,10 +336,21 @@ NIF(erlzmq_nif_setsockopt)
option_len = value_binary.size;
break;
// int
+ case ZMQ_SNDHWM:
+ case ZMQ_RCVHWM:
+ case ZMQ_RATE:
+ case ZMQ_RECOVERY_IVL:
+ case ZMQ_RCVBUF:
+ case ZMQ_SNDBUF:
case ZMQ_LINGER:
case ZMQ_RECONNECT_IVL:
+ case ZMQ_RECONNECT_IVL_MAX:
case ZMQ_BACKLOG:
- if (! enif_get_int(env, argv[1], &value_int)) {
+ case ZMQ_MULTICAST_HOPS:
+ case ZMQ_RCVTIMEO:
+ case ZMQ_SNDTIMEO:
+ case ZMQ_IPV4ONLY:
+ if (! enif_get_int(env, argv[2], &value_int)) {
return enif_make_badarg(env);
}
option_value = &value_int;
@@ -388,12 +394,7 @@ NIF(erlzmq_nif_getsockopt)
size_t option_len;
switch(option_name) {
// int64_t
- case ZMQ_RCVMORE:
- case ZMQ_SWAP:
- case ZMQ_RATE:
- case ZMQ_RECOVERY_IVL:
- case ZMQ_RECOVERY_IVL_MSEC:
- case ZMQ_MCAST_LOOP:
+ case ZMQ_MAXMSGSIZE:
option_len = sizeof(value_int64);
enif_mutex_lock(socket->mutex);
if (zmq_getsockopt(socket->socket_zmq, option_name,
@@ -405,10 +406,7 @@ NIF(erlzmq_nif_getsockopt)
return enif_make_tuple2(env, enif_make_atom(env, "ok"),
enif_make_int64(env, value_int64));
// uint64_t
- case ZMQ_HWM:
case ZMQ_AFFINITY:
- case ZMQ_SNDBUF:
- case ZMQ_RCVBUF:
option_len = sizeof(value_uint64);
enif_mutex_lock(socket->mutex);
if (zmq_getsockopt(socket->socket_zmq, option_name,
@@ -435,10 +433,22 @@ NIF(erlzmq_nif_getsockopt)
enif_make_binary(env, &value_binary));
// int
case ZMQ_TYPE:
+ case ZMQ_RCVMORE:
+ case ZMQ_SNDHWM:
+ case ZMQ_RCVHWM:
+ case ZMQ_RATE:
+ case ZMQ_RECOVERY_IVL:
+ case ZMQ_SNDBUF:
+ case ZMQ_RCVBUF:
case ZMQ_LINGER:
case ZMQ_RECONNECT_IVL:
case ZMQ_RECONNECT_IVL_MAX:
case ZMQ_BACKLOG:
+ case ZMQ_MULTICAST_HOPS:
+ case ZMQ_RCVTIMEO:
+ case ZMQ_SNDTIMEO:
+ case ZMQ_IPV4ONLY:
+ case ZMQ_EVENTS:
case ZMQ_FD: // FIXME: ZMQ_FD returns SOCKET on Windows
option_len = sizeof(value_int);
enif_mutex_lock(socket->mutex);
@@ -483,12 +493,12 @@ NIF(erlzmq_nif_send)
int polling_thread_send = 1;
if (! socket->active) {
enif_mutex_lock(socket->mutex);
- if (zmq_send(socket->socket_zmq, &req.data.send.msg,
- req.data.send.flags | ZMQ_NOBLOCK)) {
+ if (zmq_sendmsg(socket->socket_zmq, &req.data.send.msg,
+ req.data.send.flags | ZMQ_DONTWAIT) == -1) {
enif_mutex_unlock(socket->mutex);
int const error = zmq_errno();
if (error != EAGAIN ||
- (error == EAGAIN && (req.data.send.flags & ZMQ_NOBLOCK))) {
+ (error == EAGAIN && (req.data.send.flags & ZMQ_DONTWAIT))) {
zmq_msg_close(&req.data.send.msg);
return return_zmq_errno(env, error);
}
@@ -520,7 +530,7 @@ NIF(erlzmq_nif_send)
enif_mutex_unlock(socket->context->mutex);
return return_zmq_errno(env, ETERM);
}
- if (zmq_send(socket->context->thread_socket, &msg, 0)) {
+ if (zmq_sendmsg(socket->context->thread_socket, &msg, 0) == -1) {
enif_mutex_unlock(socket->context->mutex);
zmq_msg_close(&msg);
@@ -568,25 +578,25 @@ NIF(erlzmq_nif_recv)
if (zmq_msg_init(&msg)) {
return return_zmq_errno(env, zmq_errno());
}
-
// try recv with noblock
enif_mutex_lock(socket->mutex);
- if (zmq_recv(socket->socket_zmq, &msg, ZMQ_NOBLOCK)) {
+ if (zmq_recvmsg(socket->socket_zmq, &msg, ZMQ_DONTWAIT) == -1) {
enif_mutex_unlock(socket->mutex);
+ int const error = zmq_errno();
zmq_msg_close(&msg);
- int const error = zmq_errno();
if (error != EAGAIN ||
- (error == EAGAIN && (req.data.recv.flags & ZMQ_NOBLOCK))) {
+ (error == EAGAIN && (req.data.recv.flags & ZMQ_DONTWAIT))) {
return return_zmq_errno(env, error);
}
+
req.type = ERLZMQ_THREAD_REQUEST_RECV;
req.data.recv.env = enif_alloc_env();
req.data.recv.ref = enif_make_ref(req.data.recv.env);
enif_self(env, &req.data.recv.pid);
req.data.recv.socket = socket;
- if (zmq_msg_init_size(&msg, sizeof(erlzmq_thread_request_t))) {
+ if (zmq_msg_init_size(&msg, sizeof(erlzmq_thread_request_t)) == -1) {
enif_free_env(req.data.recv.env);
return return_zmq_errno(env, zmq_errno());
}
@@ -598,7 +608,7 @@ NIF(erlzmq_nif_recv)
enif_mutex_unlock(socket->context->mutex);
return return_zmq_errno(env, ETERM);
}
- if (zmq_send(socket->context->thread_socket, &msg, 0)) {
+ if (zmq_sendmsg(socket->context->thread_socket, &msg, 0) == -1) {
enif_mutex_unlock(socket->context->mutex);
zmq_msg_close(&msg);
@@ -665,7 +675,7 @@ NIF(erlzmq_nif_close)
enif_mutex_unlock(socket->context->mutex);
return enif_make_atom(env, "ok");
}
- if (zmq_send(socket->context->thread_socket, &msg, 0)) {
+ if (zmq_sendmsg(socket->context->thread_socket, &msg, 0) == -1) {
enif_mutex_unlock(socket->context->mutex);
zmq_msg_close(&msg);
enif_free_env(req.data.close.env);
@@ -704,7 +714,7 @@ NIF(erlzmq_nif_term)
memcpy(zmq_msg_data(&msg), &req, sizeof(erlzmq_thread_request_t));
enif_mutex_lock(context->mutex);
- if (zmq_send(context->thread_socket, &msg, 0)) {
+ if (zmq_sendmsg(context->thread_socket, &msg, 0) == -1) {
enif_mutex_unlock(context->mutex);
zmq_msg_close(&msg);
enif_free_env(req.data.term.env);
@@ -770,17 +780,17 @@ static void * polling_thread(void * handle)
if (item->revents & ZMQ_POLLIN) {
size_t value_len = sizeof(int64_t);
int64_t flag_value = 0;
-
+
assert(r->type == ERLZMQ_THREAD_REQUEST_RECV);
--count;
zmq_msg_t msg;
zmq_msg_init(&msg);
enif_mutex_lock(r->data.recv.socket->mutex);
- if (zmq_recv(r->data.recv.socket->socket_zmq, &msg,
- r->data.recv.flags) ||
- (r->data.recv.socket->active == ERLZMQ_SOCKET_ACTIVE_ON &&
- zmq_getsockopt(r->data.recv.socket->socket_zmq,
+ if (zmq_recvmsg(r->data.recv.socket->socket_zmq, &msg,
+ r->data.recv.flags) == -1 ||
+ (r->data.recv.socket->active == ERLZMQ_SOCKET_ACTIVE_ON &&
+ zmq_getsockopt(r->data.recv.socket->socket_zmq,
ZMQ_RCVMORE, &flag_value, &value_len)) )
{
enif_mutex_unlock(r->data.recv.socket->mutex);
@@ -812,14 +822,16 @@ static void * polling_thread(void * handle)
if (r->data.recv.socket->active == ERLZMQ_SOCKET_ACTIVE_ON) {
ERL_NIF_TERM flags_list;
-
+
// Should we send the multipart flag
if(flag_value == 1) {
- flags_list = enif_make_list1(r->data.recv.env, enif_make_atom(r->data.recv.env, "rcvmore"));
+ flags_list = enif_make_list1(r->data.recv.env,
+ enif_make_atom(r->data.recv.env,
+ "rcvmore"));
} else {
flags_list = enif_make_list(r->data.recv.env, 0);
}
-
+
enif_send(NULL, &r->data.recv.pid, r->data.recv.env,
enif_make_tuple4(r->data.recv.env,
enif_make_atom(r->data.recv.env, "zmq"),
@@ -854,8 +866,8 @@ static void * polling_thread(void * handle)
--count;
enif_mutex_lock(r->data.send.socket->mutex);
- if (zmq_send(r->data.send.socket->socket_zmq,
- &r->data.send.msg, r->data.send.flags)) {
+ if (zmq_sendmsg(r->data.send.socket->socket_zmq,
+ &r->data.send.msg, r->data.send.flags) == -1) {
enif_mutex_unlock(r->data.send.socket->mutex);
enif_send(NULL, &r->data.send.pid, r->data.send.env,
enif_make_tuple2(r->data.send.env,
@@ -885,9 +897,9 @@ static void * polling_thread(void * handle)
zmq_msg_t msg;
zmq_msg_init(&msg);
enif_mutex_lock(context->mutex);
- status = zmq_recv(thread_socket, &msg, 0);
+ status = zmq_recvmsg(thread_socket, &msg, 0);
enif_mutex_unlock(context->mutex);
- assert(status == 0);
+ assert(status != -1);
assert(zmq_msg_size(&msg) == sizeof(erlzmq_thread_request_t));
@@ -1024,7 +1036,7 @@ static ERL_NIF_TERM add_active_req(ErlNifEnv* env, erlzmq_socket_t * socket)
memcpy(zmq_msg_data(&msg), &req, sizeof(erlzmq_thread_request_t));
- if (zmq_send(socket->context->thread_socket, &msg, 0)) {
+ if (zmq_sendmsg(socket->context->thread_socket, &msg, 0) == -1) {
zmq_msg_close(&msg);
enif_free_env(req.data.recv.env);
return return_zmq_errno(env, zmq_errno());
View
37 include/erlzmq.hrl
@@ -13,15 +13,12 @@
-define('ZMQ_XSUB', 10).
% ZMQ socket options.
--define('ZMQ_HWM', 1).
--define('ZMQ_SWAP', 3).
-define('ZMQ_AFFINITY', 4).
-define('ZMQ_IDENTITY', 5).
-define('ZMQ_SUBSCRIBE', 6).
-define('ZMQ_UNSUBSCRIBE', 7).
-define('ZMQ_RATE', 8).
-define('ZMQ_RECOVERY_IVL', 9).
--define('ZMQ_MCAST_LOOP', 10).
-define('ZMQ_SNDBUF', 11).
-define('ZMQ_RCVBUF', 12).
-define('ZMQ_RCVMORE', 13).
@@ -33,10 +30,21 @@
-define('ZMQ_BACKLOG', 19).
-define('ZMQ_RECOVERY_IVL_MSEC', 20).
-define('ZMQ_RECONNECT_IVL_MAX', 21).
+-define('ZMQ_MAXMSGSIZE', 22).
+-define('ZMQ_SNDHWM', 23).
+-define('ZMQ_RCVHWM', 24).
+-define('ZMQ_MULTICAST_HOPS', 25).
+-define('ZMQ_RCVTIMEO', 27).
+-define('ZMQ_SNDTIMEO', 28).
+-define('ZMQ_IPV4ONLY', 31).
+-define('ZMQ_LAST_ENDPOINT', 32).
+
+% Message options
+-define('ZMQ_MORE', 1).
% ZMQ send/recv flags
--define('ZMQ_NOBLOCK', 1).
--define('ZMQ_SNDMORE', 2).
+-define('ZMQ_DONTWAIT', 1).
+-define('ZMQ_SNDMORE', 2).
%% Types
@@ -89,7 +97,7 @@
%% {unknown, integer()}.
%% Possible error types.
-type erlzmq_error_type() :: enotsup | eprotonosupport | enobufs | enetdown |
- eaddrinuse | eaddnotavail | econnrefused |
+ eaddrinuse | eaddnotavail | econnrefused |
einprogress | efsm | enocompatproto | eterm |
emthread | errno() | {unknown, integer()}.
@@ -116,7 +124,7 @@
%% <i>For more information see
%% <a href="http://api.zeromq.org/master:zmq_send">zmq_send</a> or
%% <a href="http://api.zeromq.org/master:zmq_recv">zmq_recv</a></i>
--type erlzmq_send_recv_flag() :: noblock | sndmore | recvmore | {timeout, timeout()}.
+-type erlzmq_send_recv_flag() :: dontwait | sndmore | recvmore | {timeout, timeout()}.
%% @type erlzmq_send_recv_flags() = list(erlzmq_send_recv_flag()).
%% A list of flags to use with {@link ezqm:send/3. send/3} and
@@ -126,17 +134,20 @@
%% @type erlzmq_sockopt() = hwm | swap | affinity | identity | subscribe |
%% unsubscribe | rate | recovery_ivl | mcast_loop | sndbuf | rcvbuf |
%% rcvmore | fd | events | linger | reconnect_ivl | backlog |
-%% recovery_ivl_msec | reconnect_ivl_max.
+%% | reconnect_ivl_max.
%% Available options for {@link erlzmq:setsockopt/3. setsockopt/3}
%% and {@link erlzmq:getsockopt/2. getsockopt/2}.<br />
%% <i>For more information see
%% <a href="http://api.zeromq.org/master:zmq_setsockopt">zmq_setsockopt</a>
%% and <a href="http://api.zeromq.org/master:zmq_getsockopt">zmq_getsockopt</a></i>
--type erlzmq_sockopt() :: hwm | swap | affinity | identity | subscribe |
- unsubscribe | rate | recovery_ivl | mcast_loop |
- sndbuf | rcvbuf | rcvmore | fd | events | linger |
- reconnect_ivl | backlog | recovery_ivl_msec |
- reconnect_ivl_max.
+-type erlzmq_sockopt() :: affinity | identity | subscribe |
+ unsubscribe | rate | recovery_ivl | sndbuf |
+ rcvbuf | rcvmore | fd | events | linger |
+ reconnect_ivl | backlog |reconnect_ivl_max
+ | maxmsgsize | sndhwm | rcvhwm |
+ multicast_hops | rcvtimeo | sndtimeo |
+ ipv4only.
+
%% @type erlzmq_sockopt_value() = integer() | iolist().
%% Possible option values for {@link erlzmq:setsockopt/3. setsockopt/3}.
View
116 src/erlzmq.erl
@@ -2,17 +2,17 @@
%% ex: set softtabstop=4 tabstop=4 shiftwidth=4 expandtab fileencoding=utf-8:
%%
%% Copyright (c) 2011 Yurii Rashkovskii, Evax Software and Michael Truog
-%%
+%%
%% Permission is hereby granted, free of charge, to any person obtaining a copy
%% of this software and associated documentation files (the "Software"), to deal
%% in the Software without restriction, including without limitation the rights
%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
%% copies of the Software, and to permit persons to whom the Software is
%% furnished to do so, subject to the following conditions:
-%%
+%%
%% The above copyright notice and this permission notice shall be included in
%% all copies or substantial portions of the Software.
-%%
+%%
%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
@@ -31,8 +31,12 @@
connect/2,
send/2,
send/3,
+ sendmsg/2,
+ sendmsg/3,
recv/1,
recv/2,
+ recvmsg/1,
+ recvmsg/2,
setsockopt/3,
getsockopt/2,
close/1,
@@ -116,7 +120,7 @@ socket(Context, Type, {active, true}) ->
erlzmq_nif:socket(Context, socket_type(Type), 1);
socket(Context, Type, {active, false}) ->
erlzmq_nif:socket(Context, socket_type(Type), 0).
-
+
%% @doc Accept connections on a socket.
%% <br />
@@ -171,11 +175,46 @@ send({I, Socket}, Binary, Flags)
ok;
{Ref, {error, _} = Error} ->
Error
+ after case erlzmq_nif:getsockopt(Socket,?'ZMQ_SNDTIMEO') of
+ {ok, -1} ->
+ infinity;
+ {ok, Else} ->
+ Else
+ end ->
+ {error, eagain}
end;
Result ->
Result
end.
+%% @equiv send(Socket, Msg, [])
+%% @doc This function exists for zeromq api compatibility and doesn't
+%% actually provide any different functionality then what you get with
+%% the {@link erlzmq:send/2} function. In fact this function just
+%% calls that function. So there is a slight bit of additional
+%% overhead as well.
+-spec sendmsg(erlzmq_socket(),
+ Binary :: binary()) ->
+ ok |
+ erlzmq_error().
+sendmsg(Socket, Binary) when is_binary(Binary) ->
+ send(Socket, Binary, []).
+
+%% @equiv send(Socket, Msg, Flags)
+%% @doc This function exists for zeromq api compatibility and doesn't
+%% actually provide any different functionality then what you get with
+%% the {@link erlzmq:send/3} function. In fact this function just
+%% calls that function. So there is a slight bit of additional
+%% overhead as well.
+-spec sendmsg(erlzmq_socket(),
+ Binary :: binary(),
+ Flags :: erlzmq_send_recv_flags()) ->
+ ok |
+ erlzmq_error().
+sendmsg(Socket, Binary, Flags) ->
+ send(Socket, Binary, Flags).
+
+
%% @equiv recv(Socket, 0)
-spec recv(Socket :: erlzmq_socket()) ->
{ok, erlzmq_data()} |
@@ -191,23 +230,51 @@ recv(Socket) ->
-spec recv(Socket :: erlzmq_socket(),
Flags :: erlzmq_send_recv_flags()) ->
{ok, erlzmq_data()} |
- erlzmq_error() |
- {error, {timeout, reference()}}.
+ erlzmq_error().
recv({I, Socket}, Flags)
when is_integer(I), is_list(Flags) ->
case erlzmq_nif:recv(Socket, sendrecv_flags(Flags)) of
Ref when is_reference(Ref) ->
- Timeout = proplists:get_value(timeout, Flags, infinity),
receive
{Ref, Result} ->
{ok, Result}
- after Timeout ->
- {error, {timeout, Ref}}
+ after case erlzmq_nif:getsockopt(Socket,?'ZMQ_RCVTIMEO') of
+ {ok, -1} ->
+ infinity;
+ {ok, Else} ->
+ Else
+ end ->
+ {error, eagain}
end;
Result ->
Result
end.
+%% @equiv recv(Socket, 0)
+%% @doc This function exists for zeromq api compatibility and doesn't
+%% actually provide any different functionality then what you get with
+%% the {@link erlzmq:recv/3} function. In fact this function just
+%% calls that function. So there is a slight bit of additional
+%% overhead as well.
+-spec recvmsg(Socket :: erlzmq_socket()) ->
+ {ok, erlzmq_data()} |
+ erlzmq_error().
+recvmsg(Socket) ->
+ recv(Socket, []).
+
+%% @equiv recv(Socket, Flags)
+%% @doc This function exists for zeromq api compatibility and doesn't
+%% actually provide any different functionality then what you get with
+%% the {@link erlzmq:recv/3} function. In fact this function just
+%% calls that function. So there is a slight bit of additional
+%% overhead as well.
+-spec recvmsg(Socket :: erlzmq_socket(),
+ Flags :: erlzmq_send_recv_flags()) ->
+ {ok, erlzmq_data()} |
+ erlzmq_error().
+recvmsg(Socket, Flags) ->
+ recv(Socket, Flags).
+
%% @doc Set an {@link erlzmq_sockopt(). option} associated with a socket.
%% <br />
%% <i>For more information see
@@ -343,20 +410,14 @@ socket_type(xsub) ->
sendrecv_flags([]) ->
0;
-sendrecv_flags([{timeout,_}]) ->
- 0;
-sendrecv_flags([noblock|Rest]) ->
- ?'ZMQ_NOBLOCK' bor sendrecv_flags(Rest);
+sendrecv_flags([dontwait|Rest]) ->
+ ?'ZMQ_DONTWAIT' bor sendrecv_flags(Rest);
sendrecv_flags([sndmore|Rest]) ->
?'ZMQ_SNDMORE' bor sendrecv_flags(Rest).
-spec option_name(Name :: erlzmq_sockopt()) ->
integer().
-option_name(hwm) ->
- ?'ZMQ_HWM';
-option_name(swap) ->
- ?'ZMQ_SWAP';
option_name(affinity) ->
?'ZMQ_AFFINITY';
option_name(identity) ->
@@ -369,8 +430,6 @@ option_name(rate) ->
?'ZMQ_RATE';
option_name(recovery_ivl) ->
?'ZMQ_RECOVERY_IVL';
-option_name(mcast_loop) ->
- ?'ZMQ_MCAST_LOOP';
option_name(sndbuf) ->
?'ZMQ_SNDBUF';
option_name(rcvbuf) ->
@@ -387,8 +446,19 @@ option_name(reconnect_ivl) ->
?'ZMQ_RECONNECT_IVL';
option_name(backlog) ->
?'ZMQ_BACKLOG';
-option_name(recovery_ivl_msec) ->
- ?'ZMQ_RECOVERY_IVL_MSEC';
option_name(reconnect_ivl_max) ->
- ?'ZMQ_RECONNECT_IVL_MAX'.
-
+ ?'ZMQ_RECONNECT_IVL_MAX';
+option_name(maxmsgsize) ->
+ ?'ZMQ_MAXMSGSIZE';
+option_name(sndhwm) ->
+ ?'ZMQ_SNDHWM';
+option_name(rcvhwm) ->
+ ?'ZMQ_RCVHWM';
+option_name(multicast_hops) ->
+ ?'ZMQ_MULTICAST_HOPS';
+option_name(rcvtimeo) ->
+ ?'ZMQ_RCVTIMEO';
+option_name(sndtimeo) ->
+ ?'ZMQ_SNDTIMEO';
+option_name(ipv4only) ->
+ ?'ZMQ_IPV4ONLY'.
View
236 test/erlzmq_test.erl
@@ -7,29 +7,71 @@ hwm_test() ->
{ok, S1} = erlzmq:socket(C, [pull, {active, false}]),
{ok, S2} = erlzmq:socket(C, [push, {active, false}]),
- ok = erlzmq:setsockopt(S2, linger, 0),
- ok = erlzmq:setsockopt(S2, hwm, 5),
+ ok = erlzmq:setsockopt(S1, rcvhwm, 2),
+ ok = erlzmq:setsockopt(S2, sndhwm, 2),
- ok = erlzmq:bind(S1, "tcp://127.0.0.1:5858"),
- ok = erlzmq:connect(S2, "tcp://127.0.0.1:5858"),
+
+ ok = erlzmq:bind(S1, "inproc://a"),
+ ok = erlzmq:connect(S2, "inproc://a"),
ok = hwm_loop(10, S2),
?assertMatch({ok, <<"test">>}, erlzmq:recv(S1)),
+ ?assertMatch({ok, <<"test">>}, erlzmq:recv(S1)),
+ ?assertMatch({ok, <<"test">>}, erlzmq:recv(S1)),
+ ?assertMatch({ok, <<"test">>}, erlzmq:recv(S1)),
+
?assertMatch(ok, erlzmq:send(S2, <<"test">>)),
+
+ ?assertMatch({ok, <<"test">>}, erlzmq:recv(S1)),
+
ok = erlzmq:close(S1),
ok = erlzmq:close(S2),
ok = erlzmq:term(C).
hwm_loop(0, _S) ->
ok;
-hwm_loop(N, S) when N > 5 ->
- ?assertMatch(ok, erlzmq:send(S, <<"test">>, [noblock])),
+hwm_loop(N, S) when N > 6 ->
+ ?assertMatch(ok, erlzmq:send(S, <<"test">>, [dontwait])),
hwm_loop(N-1, S);
hwm_loop(N, S) ->
- ?assertMatch({error, _} ,erlzmq:send(S, <<"test">>, [noblock])),
+ ?assertMatch({error, _} ,erlzmq:send(S, <<"test">>, [dontwait])),
hwm_loop(N-1, S).
+invalid_rep_test() ->
+ {ok, Ctx} = erlzmq:context(),
+
+ {ok, XrepSocket} = erlzmq:socket(Ctx, [xrep, {active, false}]),
+ {ok, ReqSocket} = erlzmq:socket(Ctx, [req, {active, false}]),
+
+ ok = erlzmq:setsockopt(XrepSocket, linger, 0),
+ ok = erlzmq:setsockopt(ReqSocket, linger, 0),
+ ok = erlzmq:bind(XrepSocket, "inproc://hi"),
+ ok = erlzmq:connect(ReqSocket, "inproc://hi"),
+
+ %% Initial request.
+ ok = erlzmq:send(ReqSocket, <<"r">>),
+
+ %% Receive the request.
+ {ok, Addr} = erlzmq:recv(XrepSocket),
+ {ok, Bottom} = erlzmq:recv(XrepSocket),
+ {ok, _Body} = erlzmq:recv(XrepSocket),
+
+ %% Send invalid reply.
+ ok = erlzmq:send(XrepSocket, Addr),
+
+ %% Send valid reply.
+ ok = erlzmq:send(XrepSocket, Addr, [sndmore]),
+ ok = erlzmq:send(XrepSocket, Bottom, [sndmore]),
+ ok = erlzmq:send(XrepSocket, <<"b">>),
+
+ %% Check whether we've got the valid reply.
+ {ok, <<"b">>} = erlzmq:recv(ReqSocket),
+
+ %% Tear down the wiring.
+ ok = erlzmq:close(XrepSocket),
+ ok = erlzmq:close(ReqSocket),
+ ok = erlzmq:term(Ctx).
pair_inproc_test() ->
basic_tests("inproc://tester", pair, pair, active),
@@ -43,6 +85,85 @@ pair_tcp_test() ->
basic_tests("tcp://127.0.0.1:5554", pair, pair, active),
basic_tests("tcp://127.0.0.1:5555", pair, pair, passive).
+reqrep_device_test() ->
+ {ok, Ctx} = erlzmq:context(),
+
+ %% Create a req/rep device.
+ {ok, Xreq} = erlzmq:socket(Ctx, [xreq, {active, false}]),
+ ok = erlzmq:bind(Xreq, "tcp://127.0.0.1:5560"),
+ {ok, Xrep} = erlzmq:socket(Ctx, [xrep, {active, false}]),
+ ok = erlzmq:bind(Xrep, "tcp://127.0.0.1:5561"),
+
+ %% Create a worker.
+ {ok, Rep} = erlzmq:socket(Ctx, [rep, {active, false}]),
+ ok= erlzmq:connect(Rep, "tcp://127.0.0.1:5560"),
+
+ %% Create a client.
+ {ok, Req} = erlzmq:socket(Ctx, [req, {active, false}]),
+ ok = erlzmq:connect(Req, "tcp://127.0.0.1:5561"),
+
+ %% Send a request.
+ ok = erlzmq:send(Req, <<"ABC">>, [sndmore]),
+ ok = erlzmq:send(Req, <<"DEF">>),
+
+
+ %% Pass the request through the device.
+ lists:foreach(fun(_) ->
+ {ok, Msg} = erlzmq:recv(Xrep),
+ {ok, RcvMore}= erlzmq:getsockopt(Xrep, rcvmore),
+ case RcvMore of
+ 0 ->
+ ok = erlzmq:send(Xreq, Msg);
+ _ ->
+ ok = erlzmq:send(Xreq, Msg, [sndmore])
+ end
+ end,
+ lists:seq(1, 4)),
+
+ %% Receive the request.
+ {ok, Buff0} = erlzmq:recv(Rep),
+ ?assertMatch(<<"ABC">>, Buff0),
+ {ok, RcvMore1} = erlzmq:getsockopt(Rep, rcvmore),
+ ?assert(RcvMore1 > 0),
+ {ok, Buff1} = erlzmq:recv(Rep),
+ ?assertMatch(<<"DEF">>, Buff1),
+ {ok, RcvMore2} = erlzmq:getsockopt(Rep, rcvmore),
+ ?assertMatch(0, RcvMore2),
+
+ %% Send the reply.
+ ok = erlzmq:send(Rep, <<"GHI">>, [sndmore]),
+ ok = erlzmq:send (Rep, <<"JKL">>),
+
+ %% Pass the reply through the device.
+ lists:foreach(fun(_) ->
+ {ok, Msg} = erlzmq:recv(Xreq),
+ {ok,RcvMore3} = erlzmq:getsockopt(Xreq, rcvmore),
+ case RcvMore3 of
+ 0 ->
+ ok = erlzmq:send(Xrep, Msg);
+ _ ->
+ ok = erlzmq:send(Xrep, Msg, [sndmore])
+ end
+ end, lists:seq(1, 4)),
+
+ %% Receive the reply.
+ {ok, Buff2} = erlzmq:recv(Req),
+ ?assertMatch(<<"GHI">>, Buff2),
+ {ok, RcvMore4} = erlzmq:getsockopt(Req, rcvmore),
+ ?assert(RcvMore4 > 0),
+ {ok, Buff3} = erlzmq:recv(Req),
+ ?assertMatch(<<"JKL">>, Buff3),
+ {ok, RcvMore5} = erlzmq:getsockopt(Req, rcvmore),
+ ?assertMatch(0, RcvMore5),
+
+ %% Clean up.
+ ok = erlzmq:close(Req),
+ ok = erlzmq:close(Rep),
+ ok = erlzmq:close(Xrep),
+ ok = erlzmq:close(Xreq),
+ ok = erlzmq:term(Ctx).
+
+
reqrep_inproc_test() ->
basic_tests("inproc://test", req, rep, active),
basic_tests("inproc://test", req, rep, passive).
@@ -55,6 +176,103 @@ reqrep_tcp_test() ->
basic_tests("tcp://127.0.0.1:5556", req, rep, active),
basic_tests("tcp://127.0.0.1:5557", req, rep, passive).
+
+sub_forward_test() ->
+ {ok, Ctx} = erlzmq:context(),
+
+ %% First, create an intermediate device.
+ {ok, Xpub} = erlzmq:socket(Ctx, [xpub, {active, false}]),
+
+ ok = erlzmq:bind(Xpub, "tcp://127.0.0.1:5560"),
+
+ {ok, Xsub} = erlzmq:socket(Ctx, [xsub, {active, false}]),
+
+ ok = erlzmq:bind(Xsub, "tcp://127.0.0.1:5561"),
+
+ %% Create a publisher.
+ {ok, Pub} = erlzmq:socket(Ctx, [pub, {active, false}]),
+
+ ok = erlzmq:connect(Pub, "tcp://127.0.0.1:5561"),
+
+ %% Create a subscriber.
+ {ok, Sub} = erlzmq:socket(Ctx, [sub, {active, false}]),
+
+ ok = erlzmq:connect(Sub, "tcp://127.0.0.1:5560"),
+
+ %% Subscribe for all messages.
+ ok = erlzmq:setsockopt(Sub, subscribe, <<"">>),
+
+ %% Pass the subscription upstream through the device.
+ {ok, Buff0} = erlzmq:recv(Xpub),
+ ok = erlzmq:send(Xsub, Buff0),
+
+ %% Wait a bit till the subscription gets to the publisher.
+ timer:sleep(1000),
+
+ %% Send an empty message.
+ ok = erlzmq:send(Pub, <<>>),
+
+ %% Pass the message downstream through the device.
+ {ok, Buff} = erlzmq:recv(Xsub),
+
+ ok = erlzmq:send(Xpub, Buff),
+
+ %% Receive the message in the subscriber.
+ {ok, Buff} = erlzmq:recv(Sub),
+
+ %% Clean up.
+ ok = erlzmq:close(Xpub),
+ ok = erlzmq:close(Xsub),
+ ok = erlzmq:close(Pub),
+ ok = erlzmq:close(Sub),
+ ok = erlzmq:term(Ctx).
+
+timeo_test() ->
+ {ok, Ctx} = erlzmq:context(),
+ %% Create a disconnected socket.
+ {ok, Sb} = erlzmq:socket(Ctx, [pull, {active, false}]),
+ ok = erlzmq:bind(Sb, "inproc://timeout_test"),
+ %% Check whether non-blocking recv returns immediately.
+ {error, eagain} = erlzmq:recv(Sb, [dontwait]),
+ %% Check whether recv timeout is honoured.
+ Timeout0 = 500,
+ ok = erlzmq:setsockopt(Sb, rcvtimeo, Timeout0),
+ {Elapsed0, _} =
+ timer:tc(fun() ->
+ ?assertMatch({error, eagain}, erlzmq:recv(Sb))
+ end),
+ ?assert(Elapsed0 > 440000 andalso Elapsed0 < 550000),
+
+ %% Check whether connection during the wait doesn't distort the timeout.
+ Timeout1 = 2000,
+ ok = erlzmq:setsockopt(Sb, rcvtimeo, Timeout1),
+ proc_lib:spawn(fun() ->
+ timer:sleep(1000),
+ {ok, Sc} = erlzmq:socket(Ctx, [push, {active, false}]),
+ ok = erlzmq:connect(Sc, "inproc://timeout_test"),
+ timer:sleep(1000),
+ ok = erlzmq:close(Sc)
+ end),
+ {Elapsed1, _} = timer:tc(fun() ->
+ ?assertMatch({error, eagain}, erlzmq:recv(Sb))
+ end),
+ ?assert(Elapsed1 > 1900000 andalso Elapsed1 < 2100000),
+
+ %% Check that timeouts don't break normal message transfer.
+ {ok, Sc} = erlzmq:socket(Ctx, [push, {active, false}]),
+ ok = erlzmq:setsockopt(Sb, rcvtimeo, Timeout1),
+ ok = erlzmq:setsockopt(Sb, sndtimeo, Timeout1),
+ ok = erlzmq:connect(Sc, "inproc://timeout_test"),
+
+ Buff = <<"12345678ABCDEFGH12345678abcdefgh">>,
+ ok = erlzmq:send(Sc, Buff),
+ {ok, Buff} = erlzmq:recv(Sb),
+ %% Clean-up.
+ ok = erlzmq:close(Sc),
+ ok = erlzmq:close(Sb),
+ ok = erlzmq:term (Ctx).
+
+
bad_init_test() ->
?assertEqual({error, einval}, erlzmq:context(-1)).
@@ -94,7 +312,7 @@ shutdown_blocking_unblocking_test() ->
?assertMatch({error, {timeout, _}}, V),
{error, {timeout, Ref}} = V,
erlzmq:close(S),
- receive
+ receive
{Ref, ok} ->
ok
end.
@@ -177,7 +395,7 @@ ping_pong({S1, S2}, Msg, active) ->
?assertMatch({ok, Msg}, timeout)
end,
ok;
-
+
ping_pong({S1, S2}, Msg, passive) ->
ok = erlzmq:send(S1, Msg),
?assertMatch({ok, Msg}, erlzmq:recv(S2)),

0 comments on commit 7dcd68f

Please sign in to comment.
Something went wrong with that request. Please try again.