Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

Added handing of all socket options and unit tests.

  • Loading branch information...
commit 884a8d7eabd6228da97411d567e106a877c76410 1 parent ab65e48
@saleyn saleyn authored
Showing with 356 additions and 147 deletions.
  1. +154 −28 c_src/zmq_drv.cpp
  2. +3 −1 c_src/zmq_drv.h
  3. +20 −14 include/zmq.hrl
  4. +179 −104 src/zmq.erl
View
182 c_src/zmq_drv.cpp
@@ -255,7 +255,7 @@ zmqdrv_error_code(zmq_drv_t *drv, int err)
static void
zmqdrv_ok(zmq_drv_t *drv, ErlDrvTermData pid)
{
- ErlDrvTermData spec[] = {ERL_DRV_ATOM, am_zok};
+ ErlDrvTermData spec[] = {ERL_DRV_ATOM, am_zok};
driver_send_term(drv->port, pid, spec, sizeof(spec)/sizeof(spec[0]));
}
@@ -266,7 +266,29 @@ zmqdrv_ok(zmq_drv_t *drv)
}
static void
-zmqdrv_binary_ok(zmq_drv_t *drv, ErlDrvTermData pid, void *data, size_t size)
+zmqdrv_ok_bool(zmq_drv_t *drv, ErlDrvTermData pid, bool val)
+{
+ ErlDrvTermData spec[] = {
+ ERL_DRV_ATOM, am_zok,
+ ERL_DRV_ATOM, (val ? am_true : am_false),
+ ERL_DRV_TUPLE, 2
+ };
+ driver_send_term(drv->port, pid, spec, sizeof(spec)/sizeof(spec[0]));
+}
+
+static void
+zmqdrv_ok_int64(zmq_drv_t *drv, ErlDrvTermData pid, int64_t val)
+{
+ ErlDrvTermData spec[] = {
+ ERL_DRV_ATOM, am_zok,
+ ERL_DRV_INT64, TERM_DATA(&val),
+ ERL_DRV_TUPLE, 2
+ };
+ driver_send_term(drv->port, pid, spec, sizeof(spec)/sizeof(spec[0]));
+}
+
+static void
+zmqdrv_ok_binary(zmq_drv_t *drv, ErlDrvTermData pid, void *data, size_t size)
{
/* Copy payload. */
ErlDrvTermData spec[] =
@@ -277,11 +299,6 @@ zmqdrv_binary_ok(zmq_drv_t *drv, ErlDrvTermData pid, void *data, size_t size)
driver_send_term(drv->port, pid, spec, sizeof(spec)/sizeof(spec[0]));
}
-static void
-zmqdrv_binary_ok(zmq_drv_t *drv, void *data, size_t size) {
- zmqdrv_binary_ok(drv, driver_caller(drv->port), data, size);
-}
-
//-------------------------------------------------------------------
// Driver callbacks
//-------------------------------------------------------------------
@@ -666,29 +683,138 @@ zmqdrv_setsockopt(zmq_drv_t *drv, ErlIOVec *ev)
static void
zmqdrv_getsockopt(zmq_drv_t *drv, ErlIOVec *ev)
{
- ErlDrvBinary* bin = ev->binv[1];
- char* bytes = bin->orig_bytes;
- uint32_t idx = ntohl(*(uint32_t*)(bytes+1));
- void* s = drv->get_zmq_socket(idx);
- uint32_t opt = ntohl (*(uint32_t*)(bytes+sizeof(idx)+1));
-
- if (opt == ZMQ_RCVMORE) {
- int64_t val;
- size_t valsz = sizeof (val);
- if (zmq_getsockopt (s, opt, &val, &valsz) < 0) {
- zmqdrv_error_code(drv, zmq_errno());
- return;
- }
-
- ErlDrvTermData spec[] = {
- ERL_DRV_ATOM, am_zok,
- ERL_DRV_ATOM, (val ? am_true : am_false),
- ERL_DRV_TUPLE, 2};
- driver_send_term(drv->port, driver_caller(drv->port), spec, sizeof(spec)/sizeof(spec[0]));
+ ErlDrvBinary* bin = ev->binv[1];
+ char* bytes = bin->orig_bytes;
+ uint32_t idx = ntohl(*(uint32_t*)(bytes+1));
+ void* s = drv->get_zmq_socket(idx);
+ zmq_sock_info* si = drv->get_socket_info(idx);
+ uint32_t opt = ntohl (*(uint32_t*)(bytes+sizeof(idx)+1));
+ union {
+ uint8_t a[255];
+ uint64_t ui64;
+ int64_t i64;
+ int i;
+ uint32_t ui;
+ } val;
+ size_t vallen;
+
+ if (idx > drv->zmq_socket_count || !s || !si) {
+ zmqdrv_error_code(drv, ENODEV);
return;
}
- zmqdrv_error(drv, "Not implemented");
+ zmqdrv_fprintf("setsockopt %p (setting %d options)\r\n", si->socket, (int)n);
+
+ switch (opt) {
+ case ZMQ_AFFINITY:
+ vallen = sizeof(uint64_t);
+ if (zmq_getsockopt(s, opt, &val.ui64, &vallen) < 0)
+ zmqdrv_error_code(drv, zmq_errno());
+ zmqdrv_ok_int64(drv, driver_caller(drv->port), val.ui64);
+ break;
+ case ZMQ_BACKLOG:
+ vallen = sizeof(int);
+ if (zmq_getsockopt(s, opt, &val.i, &vallen) < 0)
+ zmqdrv_error_code(drv, zmq_errno());
+ zmqdrv_ok_int64(drv, driver_caller(drv->port), val.i);
+ break;
+ case ZMQ_EVENTS:
+ vallen = sizeof(uint32_t);
+ if (zmq_getsockopt(s, opt, &val.ui, &vallen) < 0)
+ zmqdrv_error_code(drv, zmq_errno());
+ zmqdrv_ok_int64(drv, driver_caller(drv->port), val.ui);
+ break;
+ case ZMQ_FD:
+ vallen = sizeof(int);
+ if (zmq_getsockopt(s, opt, &val.i, &vallen) < 0)
+ zmqdrv_error_code(drv, zmq_errno());
+ zmqdrv_ok_int64(drv, driver_caller(drv->port), val.i);
+ break;
+ case ZMQ_HWM:
+ vallen = sizeof(uint64_t);
+ if (zmq_getsockopt(s, opt, &val.ui64, &vallen) < 0)
+ zmqdrv_error_code(drv, zmq_errno());
+ zmqdrv_ok_int64(drv, driver_caller(drv->port), val.ui64);
+ break;
+ case ZMQ_IDENTITY:
+ vallen = sizeof(val);
+ if (zmq_getsockopt(s, opt, val.a, &vallen) < 0)
+ zmqdrv_error_code(drv, zmq_errno());
+ zmqdrv_ok_binary(drv, driver_caller(drv->port), val.a, vallen);
+ break;
+ case ZMQ_LINGER:
+ vallen = sizeof(int);
+ if (zmq_getsockopt(s, opt, &val.i, &vallen) < 0)
+ zmqdrv_error_code(drv, zmq_errno());
+ zmqdrv_ok_bool(drv, driver_caller(drv->port), !!val.i);
+ break;
+ case ZMQ_MCAST_LOOP:
+ vallen = sizeof(int64_t);
+ if (zmq_getsockopt(s, opt, &val.i64, &vallen) < 0)
+ zmqdrv_error_code(drv, zmq_errno());
+ zmqdrv_ok_bool(drv, driver_caller(drv->port), !!val.i64);
+ break;
+ case ZMQ_RATE:
+ vallen = sizeof(int64_t);
+ if (zmq_getsockopt(s, opt, &val.i64, &vallen) < 0)
+ zmqdrv_error_code(drv, zmq_errno());
+ zmqdrv_ok_int64(drv, driver_caller(drv->port), val.i64);
+ break;
+ case ZMQ_RCVBUF:
+ vallen = sizeof(uint64_t);
+ if (zmq_getsockopt(s, opt, &val.ui64, &vallen) < 0)
+ zmqdrv_error_code(drv, zmq_errno());
+ zmqdrv_ok_int64(drv, driver_caller(drv->port), val.ui64);
+ break;
+ case ZMQ_RCVMORE:
+ vallen = sizeof(int64_t);
+ if (zmq_getsockopt(s, opt, &val.i64, &vallen) < 0)
+ zmqdrv_error_code(drv, zmq_errno());
+ zmqdrv_ok_bool(drv, driver_caller(drv->port), !!val.i64);
+ break;
+ case ZMQ_RECONNECT_IVL:
+ vallen = sizeof(int);
+ if (zmq_getsockopt(s, opt, &val.i, &vallen) < 0)
+ zmqdrv_error_code(drv, zmq_errno());
+ zmqdrv_ok_int64(drv, driver_caller(drv->port), val.i);
+ break;
+ case ZMQ_RECOVERY_IVL:
+ vallen = sizeof(int64_t);
+ if (zmq_getsockopt(s, opt, &val.i64, &vallen) < 0)
+ zmqdrv_error_code(drv, zmq_errno());
+ zmqdrv_ok_int64(drv, driver_caller(drv->port), val.i64);
+ break;
+ case ZMQ_RECOVERY_IVL_MSEC:
+ vallen = sizeof(int64_t);
+ if (zmq_getsockopt(s, opt, &val.i64, &vallen) < 0)
+ zmqdrv_error_code(drv, zmq_errno());
+ zmqdrv_ok_int64(drv, driver_caller(drv->port), val.i64);
+ break;
+ case ZMQ_SNDBUF:
+ vallen = sizeof(uint64_t);
+ if (zmq_getsockopt(s, opt, &val.ui64, &vallen) < 0)
+ zmqdrv_error_code(drv, zmq_errno());
+ zmqdrv_ok_int64(drv, driver_caller(drv->port), val.ui64);
+ break;
+ case ZMQ_SWAP:
+ vallen = sizeof(int64_t);
+ if (zmq_getsockopt(s, opt, &val.i64, &vallen) < 0)
+ zmqdrv_error_code(drv, zmq_errno());
+ zmqdrv_ok_int64(drv, driver_caller(drv->port), val.i64);
+ break;
+ case ZMQ_TYPE:
+ vallen = sizeof(int);
+ if (zmq_getsockopt(s, opt, &val.i, &vallen) < 0)
+ zmqdrv_error_code(drv, zmq_errno());
+ zmqdrv_ok_int64(drv, driver_caller(drv->port), val.i);
+ break;
+ case ZMQ_ACTIVE:
+ zmqdrv_ok_bool(drv, driver_caller(drv->port), si->active_mode);
+ break;
+ default:
+ zmqdrv_error(drv, "Option not implemented!");
+ return;
+ }
}
static void
@@ -853,7 +979,7 @@ zmqdrv_recv(zmq_drv_t *drv, ErlIOVec *ev)
msg_t msg;
if (zmq_recv(si->socket, &msg, ZMQ_NOBLOCK) == 0)
- zmqdrv_binary_ok(drv, zmq_msg_data(&msg), zmq_msg_size(&msg));
+ zmqdrv_ok_binary(drv, driver_caller(drv->port), zmq_msg_data(&msg), zmq_msg_size(&msg));
else if (zmq_errno() == EAGAIN) {
// No input available. Make the caller wait by not returning result
si->in_caller = driver_caller(drv->port);
View
4 c_src/zmq_drv.h
@@ -148,7 +148,9 @@ static void zmqdrv_socket_error(zmq_drv_t *drv, ErlDrvTermData pid, uint32_t idx
static void zmqdrv_error(zmq_drv_t *zmq_drv, const char *errstr);
static void zmqdrv_error_code(zmq_drv_t *zmq_drv, int err);
static void zmqdrv_ok(zmq_drv_t *zmq_drv);
-static void zmqdrv_binary_ok(zmq_drv_t *zmq_drv, void *data, size_t size);
+static void zmqdrv_ok_bool(zmq_drv_t *zmq_drv, ErlDrvTermData pid, bool val);
+static void zmqdrv_ok_int64(zmq_drv_t *zmq_drv, ErlDrvTermData pid, int64_t val);
+static void zmqdrv_ok_binary(zmq_drv_t *zmq_drv, ErlDrvTermData pid, void *data, size_t size);
static void zmqdrv_init(zmq_drv_t *zmq_drv, ErlIOVec *ev);
static void zmqdrv_term(zmq_drv_t *zmq_drv, ErlIOVec *ev);
static void zmqdrv_socket(zmq_drv_t *zmq_drv, ErlIOVec *ev);
View
34 include/zmq.hrl
@@ -18,20 +18,26 @@
-define('ZMQ_DOWNSTREAM', 8).
%% ZMQ socket options.
--define('ZMQ_HWM', 1).
--define('ZMQ_SWAP', 3).
--define('ZMQ_AFFINITY', 4).
--define('ZMQ_IDENTITY', 5).
--define('ZMQ_SUBSCRIBE', 6).
--define('ZMQ_UNSUBSCRIBE', 7).
--define('ZMQ_RATE', 8).
--define('ZMQ_RECOVERY_IVL', 9).
--define('ZMQ_MCAST_LOOP', 10).
--define('ZMQ_SNDBUF', 11).
--define('ZMQ_RCVBUF', 12).
--define('ZMQ_RCVMORE', 13).
--define('ZMQ_FD', 14).
--define('ZMQ_ACTIVE', 255). % This is driver's socket option rather than 0MQ's
+-define('ZMQ_HWM', 1).
+-define('ZMQ_SWAP', 3).
+-define('ZMQ_AFFINITY', 4).
+-define('ZMQ_IDENTITY', 5).
+-define('ZMQ_SUBSCRIBE', 6).
+-define('ZMQ_UNSUBSCRIBE', 7).
+-define('ZMQ_RATE', 8).
+-define('ZMQ_RECOVERY_IVL', 9).
+-define('ZMQ_MCAST_LOOP', 10).
+-define('ZMQ_SNDBUF', 11).
+-define('ZMQ_RCVBUF', 12).
+-define('ZMQ_RCVMORE', 13).
+-define('ZMQ_FD', 14).
+-define('ZMQ_EVENTS', 15).
+-define('ZMQ_TYPE', 16).
+-define('ZMQ_LINGER', 17).
+-define('ZMQ_RECONNECT_IVL', 18).
+-define('ZMQ_BACKLOG', 19).
+-define('ZMQ_RECOVERY_IVL_MSEC',20).
+-define('ZMQ_ACTIVE', 255). % This is driver's socket option rather than 0MQ's
%% ZMQ send/recv options.
-define('ZMQ_NOBLOCK', 1).
View
283 src/zmq.erl
@@ -11,7 +11,6 @@
%%%-------------------------------------------------------------------
%%% @type zmq_socket(). Opaque 0MQ socket type.
%%% @type zmq_sockopt() = {hwm, integer()}
-%%% | {lwm, integer()}
%%% | {swap, integer()}
%%% | {affinity, integer()}
%%% | {identity, string()}
@@ -23,6 +22,10 @@
%%% | {sndbuf, integer()}
%%% | {rcvbuf, integer()}
%%% | {rcvmore, boolean()}
+%%% | {linger, boolean()}
+%%% | {reconnect_ivl, integer()}
+%%% | {backlog, integer()}
+%%% | {recovery_ivl_msec, integer()}
%%% | {active, boolean()}.
%%% 0MQ socket options. See 0MQ man pages for details.
%%% One additional options `active' indicates to the driver
@@ -42,9 +45,10 @@
-behaviour(gen_server).
%% ZMQ API
--export([start_link/0, start_link/1,
+-export([start/0, start/1, start_link/0, start_link/1,
socket/1, socket/2, close/1, setsockopt/2, getsockopt/2,
- bind/2, connect/2, send/2, send/3, recv/1, format_error/1]).
+ bind/2, connect/2, send/2, send/3, recv/1,
+ socket_int_to_type/1, format_error/1]).
-export([port/0]).
@@ -55,6 +59,10 @@
-include("zmq.hrl").
+-ifdef(TEST).
+-include_lib("eunit/include/eunit.hrl").
+-endif.
+
-record(state, {port}).
%%%===================================================================
@@ -62,6 +70,21 @@
%%%===================================================================
%%--------------------------------------------------------------------
+%% @equiv start(1)
+%% @end
+%%--------------------------------------------------------------------
+start() ->
+ start(1).
+
+%%--------------------------------------------------------------------
+%% @doc Start the server.
+%% @spec (IoThreads) -> {ok, Pid} | {error, Error} | ignore
+%% @end
+%%--------------------------------------------------------------------
+start(IoThreads) when is_integer(IoThreads) ->
+ gen_server:start({local, ?MODULE}, ?MODULE, [IoThreads], []).
+
+%%--------------------------------------------------------------------
%% @equiv start_link(1)
%% @end
%%--------------------------------------------------------------------
@@ -69,7 +92,7 @@ start_link() ->
start_link(1).
%%--------------------------------------------------------------------
-%% @doc Start the server.
+%% @doc Start the server as part of a supervision tree.
%% @spec (IoThreads) -> {ok, Pid} | {error, Error} | ignore
%% @end
%%--------------------------------------------------------------------
@@ -90,23 +113,29 @@ socket(Type) when is_atom(Type) ->
%% @end
%%--------------------------------------------------------------------
socket(Type, Options) when is_atom(Type), is_list(Options) ->
-% gen_server:call(?MODULE, {socket, Type, Options}).
+ EncType = encode_msg_socket(Type),
% We are using direct call to the driver to create the socket,
% because we need the driver to know the socket owner's pid, so
% that it can deliver messages to its mailbox in the passive mode
- try gen_server:call(?MODULE, port) of
+ case gen_server:call(?MODULE, port) of
Port when is_port(Port) ->
- [check_sockopt({O, V}) || {O,V} <- Options],
- Msg = encode_msg_socket(Type),
- {ok, S} = driver(Port, Msg),
- case driver(Port, encode_sock_opts(S, Options)) of
+ ok;
+ Else ->
+ Port = undefined,
+ throw(Else)
+ end,
+
+ {ok, S} = driver(Port, EncType),
+ try
+ EncOpts = encode_setsockopts(S, Options),
+ case driver(Port, EncOpts) of
ok ->
{ok, {Port, S}};
{error, Why} ->
- driver(Port, encode_close(S)),
throw(Why)
end
catch _:Error ->
+ driver(Port, encode_close(S)),
{error, Error}
end.
@@ -128,7 +157,10 @@ close(Socket) when is_integer(Socket) ->
%% @end
%%--------------------------------------------------------------------
setsockopt(Socket, Opts) when is_integer(Socket), is_list(Opts) ->
- gen_server:call(?MODULE, {setsockopt, Socket, Opts}).
+ gen_server:call(?MODULE, {setsockopt, Socket, Opts});
+setsockopt({Port, S}, Opts) when is_integer(S), is_list(Opts)->
+ Msg = encode_setsockopts(S, Opts),
+ driver(Port, Msg).
%%--------------------------------------------------------------------
%% @doc Get socket option.
@@ -137,7 +169,8 @@ setsockopt(Socket, Opts) when is_integer(Socket), is_list(Opts) ->
%% @end
%%--------------------------------------------------------------------
getsockopt(Socket, Option) when is_integer(Socket), is_atom(Option) ->
- gen_server:call(?MODULE, {getsockopt, Socket, Option});
+ Msg = encode_getsockopt(Socket, Option),
+ gen_server:call(?MODULE, {getsockopt, Msg});
% Experimantal support of direct port communication
getsockopt({Port, S}, Option) when is_atom(Option)->
Msg = encode_getsockopt(S, Option),
@@ -273,35 +306,16 @@ init([IoThreads]) ->
% Reply = driver(State#state.port, Message),
% {reply, Reply, State};
-handle_call({socket, Type, Options}, _From, #state{port=Port} = State) ->
- ?log("~p, type:~s options:~p", [socket, Type, Options]),
- try
- [check_sockopt({O, V}) || {O,V} <- Options],
- Msg = encode_msg_socket(Type),
- {ok, S} = driver(Port, Msg),
- case driver(Port, encode_sock_opts(S, Options)) of
- ok ->
- ok;
- {error, Why} ->
- driver(Port, encode_close(S)),
- throw(Why)
- end,
- {reply, {ok, S}, State}
- catch _:Error ->
- {reply, {error, Error}, State}
- end;
-
handle_call({close, Socket}, _From, State) ->
?log("~p", [close]),
do_call(State, encode_close(Socket));
handle_call({setsockopt, Socket, Options}, _From, State) ->
?log("~p", [socketopt]),
- do_call(State, encode_sock_opts(Socket, Options));
+ do_call(State, encode_setsockopts(Socket, Options));
-handle_call({getsockopt, Socket, Option}, _From, State) ->
- ?log("~p", [getsockopt]),
- do_call(State, encode_getsockopt(Socket, Option));
+handle_call({getsockopt, EncodedOpt}, _From, State) ->
+ do_call(State, EncodedOpt);
handle_call({bind, Socket, Address}, _From, State) ->
?log("~p addr:~s", [bind, binary_to_list(Address)]),
@@ -406,83 +420,104 @@ init_context(Port, IoThreads) ->
end.
encode_msg_socket(Type) ->
- case Type of
- pair -> <<(?ZMQ_SOCKET):8, (?ZMQ_PAIR):8>>;
- pub -> <<(?ZMQ_SOCKET):8, (?ZMQ_PUB):8>>;
- push -> <<(?ZMQ_SOCKET):8, (?ZMQ_PUSH):8>>;
- pull -> <<(?ZMQ_SOCKET):8, (?ZMQ_PULL):8>>;
- sub -> <<(?ZMQ_SOCKET):8, (?ZMQ_SUB):8>>;
- req -> <<(?ZMQ_SOCKET):8, (?ZMQ_REQ):8>>;
- rep -> <<(?ZMQ_SOCKET):8, (?ZMQ_REP):8>>;
- xreq -> <<(?ZMQ_SOCKET):8, (?ZMQ_XREQ):8>>;
- xrep -> <<(?ZMQ_SOCKET):8, (?ZMQ_XREP):8>>;
- upstream -> <<(?ZMQ_SOCKET):8, (?ZMQ_UPSTREAM):8>>;
- downstream -> <<(?ZMQ_SOCKET):8, (?ZMQ_DOWNSTREAM):8>>;
- _ -> throw({unknown_sock_type, Type})
- end.
+ <<(?ZMQ_SOCKET):8, (socket_type_to_int(Type)):8>>.
+
+socket_type_to_int(pair) -> ?ZMQ_PAIR;
+socket_type_to_int(pub) -> ?ZMQ_PUB;
+socket_type_to_int(sub) -> ?ZMQ_SUB;
+socket_type_to_int(push) -> ?ZMQ_PUSH;
+socket_type_to_int(pull) -> ?ZMQ_PULL;
+socket_type_to_int(req) -> ?ZMQ_REQ;
+socket_type_to_int(rep) -> ?ZMQ_REP;
+socket_type_to_int(xreq) -> ?ZMQ_XREQ;
+socket_type_to_int(xrep) -> ?ZMQ_XREP;
+socket_type_to_int(upstream) -> ?ZMQ_UPSTREAM;
+socket_type_to_int(downstream) -> ?ZMQ_DOWNSTREAM.
+
+socket_int_to_type(?ZMQ_PAIR) -> pair;
+socket_int_to_type(?ZMQ_PUB) -> pub;
+socket_int_to_type(?ZMQ_SUB) -> sub;
+socket_int_to_type(?ZMQ_PUSH) -> push;
+socket_int_to_type(?ZMQ_PULL) -> pull;
+socket_int_to_type(?ZMQ_REQ) -> req;
+socket_int_to_type(?ZMQ_REP) -> rep;
+socket_int_to_type(?ZMQ_XREQ) -> xreq;
+socket_int_to_type(?ZMQ_XREP) -> xrep.
encode_close(Socket) ->
<<(?ZMQ_CLOSE):8, Socket:32>>.
-encode_sock_opts(Socket, Options) when length(Options) =< 255 ->
- Opts = lists:map(fun({O, Value}) ->
- V = check_sockopt({O, Value}),
- case O of
- hwm -> <<?ZMQ_HWM, 8, V:64/native>>;
- swap -> <<?ZMQ_SWAP, 8, V:64/native>>;
- affinity -> <<?ZMQ_AFFINITY, 8, V:64/native>>;
- identity -> <<?ZMQ_IDENTITY, (byte_size(V)):8, V/binary>>;
- subscribe -> <<?ZMQ_SUBSCRIBE, (byte_size(V)):8, V/binary>>;
- unsubscribe -> <<?ZMQ_UNSUBSCRIBE, (byte_size(V)):8, V/binary>>;
- rate -> <<?ZMQ_RATE, 8, V:64/native>>;
- recovery_ivl-> <<?ZMQ_RECOVERY_IVL, 8, V:64/native>>;
- mcast_loop -> <<?ZMQ_MCAST_LOOP, 8, V:64/native>>;
- sndbuf -> <<?ZMQ_SNDBUF, 8, V:64/native>>;
- rcvbuf -> <<?ZMQ_RCVBUF, 8, V:64/native>>;
- % Driver's socket options
- active -> <<?ZMQ_ACTIVE, 1, V>>;
- _ -> throw({unknown_sock_option, O})
- end
- end, Options),
+encode_setsockopts(Socket, Options) when length(Options) =< 255 ->
+ Opts = [encode_setsockopt(V) || V <- Options],
<<(?ZMQ_SETSOCKOPT):8, Socket:32, (length(Opts)):8, (list_to_binary(Opts))/binary>>.
-check_sockopt({hwm, V}) when is_integer(V) -> V;
-check_sockopt({swap, V}) when is_integer(V) -> V;
-check_sockopt({affinity, V}) when is_integer(V) -> V;
-check_sockopt({identity, V}) when is_list(V), length(V) =< 255 -> list_to_binary(V);
-check_sockopt({identity, V}) when is_binary(V), byte_size(V) =< 255 -> V;
+encode_setsockopt({hwm, V}) when is_integer(V) ->
+ <<?ZMQ_HWM, 8, V:64/native>>;
+encode_setsockopt({swap, V}) when is_integer(V) ->
+ <<?ZMQ_SWAP, 8, V:64/native>>;
+encode_setsockopt({affinity, V}) when is_integer(V) ->
+ <<?ZMQ_AFFINITY, 8, V:64/native>>;
+encode_setsockopt({identity, V}) when is_list(V), length(V) =< 255 ->
+ Bin = list_to_binary(V),
+ <<?ZMQ_IDENTITY, (byte_size(Bin)):8, Bin/binary>>;
+encode_setsockopt({identity, V}) when is_binary(V), byte_size(V) =< 255 ->
+ <<?ZMQ_IDENTITY, (byte_size(V)):8, V/binary>>;
% Note that 0MQ doesn't limit the size of subscribe/unsubscribe options,
% but we do for simplicity.
-check_sockopt({subscribe, V}) when is_list(V), length(V) =< 255 -> list_to_binary(V);
-check_sockopt({subscribe, V}) when is_binary(V), byte_size(V) =< 255 -> V;
-check_sockopt({unsubscribe, V}) when is_list(V), length(V) =< 255 -> list_to_binary(V);
-check_sockopt({unsubscribe, V}) when is_binary(V), byte_size(V) =< 255 -> V;
-check_sockopt({rate, V}) when is_integer(V) -> V;
-check_sockopt({recovery_ivl, V}) when is_integer(V) -> V;
-check_sockopt({mcast_loop, true}) -> 1;
-check_sockopt({mcast_loop,false}) -> 0;
-check_sockopt({sndbuf, V}) when is_integer(V) -> V;
-check_sockopt({rcvbuf, V}) when is_integer(V) -> V;
-check_sockopt({active, true}) -> 1;
-check_sockopt({active, false}) -> 0;
-check_sockopt(Option) -> throw({unknown_option, Option}).
-
-sockopt_to_int(Option) ->
- case Option of
- hwm -> ?ZMQ_HWM;
- swap -> ?ZMQ_SWAP;
- affinity -> ?ZMQ_AFFINITY;
- identity -> ?ZMQ_IDENTITY;
- subscribe -> ?ZMQ_SUBSCRIBE;
- unsubscribe -> ?ZMQ_UNSUBSCRIBE;
- rate -> ?ZMQ_RATE;
- recovery_ivl-> ?ZMQ_RECOVERY_IVL;
- mcast_loop -> ?ZMQ_MCAST_LOOP;
- sndbuf -> ?ZMQ_SNDBUF;
- rcvbuf -> ?ZMQ_RCVBUF;
- rcvmore -> ?ZMQ_RCVMORE;
- _ -> throw({unknown_sock_option, Option})
- end.
+encode_setsockopt({subscribe, V}) when is_list(V), length(V) =< 255 ->
+ Bin = list_to_binary(V),
+ <<?ZMQ_SUBSCRIBE, (byte_size(Bin)):8, Bin/binary>>;
+encode_setsockopt({subscribe, V}) when is_binary(V), byte_size(V) =< 255 ->
+ <<?ZMQ_SUBSCRIBE, (byte_size(V)):8, V/binary>>;
+encode_setsockopt({unsubscribe, V}) when is_list(V), length(V) =< 255 ->
+ Bin = list_to_binary(V),
+ <<?ZMQ_UNSUBSCRIBE, (byte_size(Bin)):8, Bin/binary>>;
+encode_setsockopt({unsubscribe, V}) when is_binary(V), byte_size(V) =< 255 ->
+ <<?ZMQ_UNSUBSCRIBE, (byte_size(V)):8, V/binary>>;
+encode_setsockopt({rate, V}) when is_integer(V) ->
+ <<?ZMQ_RATE, 8, V:64/native>>;
+encode_setsockopt({recovery_ivl, V}) when is_integer(V) ->
+ <<?ZMQ_RECOVERY_IVL, 8, V:64/native>>;
+encode_setsockopt({recovery_ivl_msec,V}) when is_integer(V) ->
+ <<?ZMQ_RECOVERY_IVL_MSEC, 8, V:64/native>>;
+encode_setsockopt({mcast_loop, V}) when is_boolean(V) ->
+ <<?ZMQ_MCAST_LOOP, 8, (bool_to_int(V)):64/native>>;
+encode_setsockopt({sndbuf, V}) when is_integer(V) ->
+ <<?ZMQ_SNDBUF, 8, V:64/native>>;
+encode_setsockopt({rcvbuf, V}) when is_integer(V) ->
+ <<?ZMQ_RCVBUF, 8, V:64/native>>;
+%encode_setsockopt({type, V}) when is_atom(V) ->
+% <<?ZMQ_TYPE, 1, socket_type_to_int(V)>>;
+encode_setsockopt({linger, V}) when is_boolean(V) ->
+ <<?ZMQ_LINGER, 4, (bool_to_int(V)):32/native>>;
+encode_setsockopt({reconnect_ivl, V}) when is_integer(V) ->
+ <<?ZMQ_RECONNECT_IVL, 4, V:32/native>>;
+encode_setsockopt({backlog, V}) when is_integer(V) ->
+ <<?ZMQ_BACKLOG, 4, V:32/native>>;
+% Driver's internal socket options
+encode_setsockopt({active, V}) when is_boolean(V) ->
+ <<?ZMQ_ACTIVE, 1, (bool_to_int(V))>>.
+
+bool_to_int(true) -> 1;
+bool_to_int(false) -> 0.
+
+sockopt_to_int(hwm) -> ?ZMQ_HWM;
+sockopt_to_int(swap) -> ?ZMQ_SWAP;
+sockopt_to_int(affinity) -> ?ZMQ_AFFINITY;
+sockopt_to_int(identity) -> ?ZMQ_IDENTITY;
+sockopt_to_int(subscribe) -> ?ZMQ_SUBSCRIBE;
+sockopt_to_int(unsubscribe) -> ?ZMQ_UNSUBSCRIBE;
+sockopt_to_int(rate) -> ?ZMQ_RATE;
+sockopt_to_int(recovery_ivl) -> ?ZMQ_RECOVERY_IVL;
+sockopt_to_int(mcast_loop) -> ?ZMQ_MCAST_LOOP;
+sockopt_to_int(sndbuf) -> ?ZMQ_SNDBUF;
+sockopt_to_int(rcvbuf) -> ?ZMQ_RCVBUF;
+sockopt_to_int(rcvmore) -> ?ZMQ_RCVMORE;
+sockopt_to_int(linger) -> ?ZMQ_LINGER;
+sockopt_to_int(reconnect_ivl) -> ?ZMQ_RECONNECT_IVL;
+sockopt_to_int(backlog) -> ?ZMQ_BACKLOG;
+sockopt_to_int(recovery_ivl_msec) -> ?ZMQ_RECOVERY_IVL_MSEC;
+sockopt_to_int(active) -> ?ZMQ_ACTIVE.
encode_getsockopt(Socket, Option) ->
O = sockopt_to_int(Option),
@@ -522,3 +557,43 @@ driver(Port, Message) ->
Err = {error, _} ->
Err
end.
+
+%%%----------------------------------------------------------------------------
+%%% Test Cases
+%%%----------------------------------------------------------------------------
+
+-ifdef(EUNIT).
+
+setsockopts_test() ->
+ zmq:start(),
+ {ok, S} = zmq:socket(xrep, []),
+ ?assertMatch(ok, zmq:setsockopt(S, [{hwm, 100}])),
+ ?assertMatch({ok, 100}, zmq:getsockopt(S, hwm)),
+ ?assertMatch(ok, zmq:setsockopt(S, [{swap, 10}])),
+ ?assertMatch({ok, 10}, zmq:getsockopt(S, swap)),
+ ?assertMatch(ok, zmq:setsockopt(S, [{affinity, 1}])),
+ ?assertMatch({ok, 1}, zmq:getsockopt(S, affinity)),
+ ?assertMatch(ok, zmq:setsockopt(S, [{identity, <<"a">>}])),
+ ?assertMatch({ok,<<"a">>},zmq:getsockopt(S, identity)),
+ ?assertMatch(ok, zmq:setsockopt(S, [{rate, 10000}])),
+ ?assertMatch({ok, 10000}, zmq:getsockopt(S, rate)),
+ ?assertMatch(ok, zmq:setsockopt(S, [{recovery_ivl, 1234}])),
+ ?assertMatch({ok, 1234}, zmq:getsockopt(S, recovery_ivl)),
+ ?assertMatch(ok, zmq:setsockopt(S, [{mcast_loop, true}])),
+ ?assertMatch({ok, true}, zmq:getsockopt(S, mcast_loop)),
+ ?assertMatch(ok, zmq:setsockopt(S, [{sndbuf, 12300}])),
+ ?assertMatch({ok, 12300}, zmq:getsockopt(S, sndbuf)),
+ ?assertMatch(ok, zmq:setsockopt(S, [{rcvbuf, 22300}])),
+ ?assertMatch({ok, 22300}, zmq:getsockopt(S, rcvbuf)),
+ ?assertMatch(ok, zmq:setsockopt(S, [{linger, true}])),
+ ?assertMatch({ok, true}, zmq:getsockopt(S, linger)),
+ ?assertMatch({ok, false}, zmq:getsockopt(S, rcvmore)),
+ ?assertMatch(ok, zmq:setsockopt(S, [{reconnect_ivl, 3000}])),
+ ?assertMatch({ok, 3000}, zmq:getsockopt(S, reconnect_ivl)),
+ ?assertMatch(ok, zmq:setsockopt(S, [{recovery_ivl_msec, 5000}])),
+ ?assertMatch({ok, 5000}, zmq:getsockopt(S, recovery_ivl_msec)),
+ ?assertMatch(ok, zmq:setsockopt(S, [{active, false}])),
+ ?assertMatch({ok, false}, zmq:getsockopt(S, active)),
+ ok.
+
+-endif.
Please sign in to comment.
Something went wrong with that request. Please try again.