Permalink
Browse files

Provide a 2tuple/index envelope for matching on erlzmq sockets return…

…ed from the NIF.
  • Loading branch information...
1 parent fa6cb33 commit 9ed06d960158d1ceae5d786700a9eedafc3e52e5 @okeuday okeuday committed Mar 25, 2011
Showing with 122 additions and 91 deletions.
  1. +13 −3 c_src/erlzmq_nif.c
  2. +2 −2 src/erlzmq.app.src
  3. +104 −83 src/erlzmq.erl
  4. +3 −3 test/erlzmq_test.erl
View
@@ -38,6 +38,7 @@ typedef struct erlzmq_context {
void * ipc_socket;
char * ipc_socket_name;
int running;
+ int64_t socket_index;
ErlNifCond * cond;
ErlNifMutex * mutex;
ErlNifTid polling_tid;
@@ -49,6 +50,7 @@ typedef struct erlzmq_context {
typedef struct erlzmq_socket {
erlzmq_context_t * context;
+ int64_t socket_index;
void * socket_zmq;
int active;
} erlzmq_socket_t;
@@ -121,6 +123,7 @@ NIF(erlzmq_nif_context)
zmq_bind(handle->ipc_socket,socket_id);
handle->running = 0;
+ handle->socket_index = 1;
handle->mutex = enif_mutex_create("erlzmq_context_t_mutex");
handle->cond = enif_cond_create("erlzmq_context_t_cond");
@@ -174,12 +177,15 @@ NIF(erlzmq_nif_socket)
sizeof(erlzmq_socket_t));
handle->context = ctx;
+ handle->socket_index = ctx->socket_index++;
handle->socket_zmq = zmq_socket(ctx->context_zmq, socket_type);
handle->active = active;
- ERL_NIF_TERM result = enif_make_resource(env, handle);
+ ERL_NIF_TERM socket =
+ enif_make_tuple2(env, enif_make_uint64(env, handle->socket_index),
+ enif_make_resource(env, handle));
- return enif_make_tuple2(env, enif_make_atom(env, "ok"), result);
+ return enif_make_tuple2(env, enif_make_atom(env, "ok"), socket);
}
NIF(erlzmq_nif_bind)
@@ -647,10 +653,14 @@ static void * polling_thread(void * handle)
zmq_msg_close(&msg);
if (r->socket->active == ERLZMQ_ACTIVE_ON) {
+ ERL_NIF_TERM socket =
+ enif_make_tuple2(r->env,
+ enif_make_uint64(r->env, r->socket->socket_index),
+ enif_make_resource(r->env, r->socket));
enif_send(NULL, &r->pid, r->env,
enif_make_tuple3(r->env,
enif_make_atom(r->env, "zmq"),
- enif_make_resource(r->env, r->socket),
+ socket,
enif_make_binary(r->env, &binary)));
}
else {
View
@@ -1,12 +1,12 @@
{application, erlzmq,
[
- {description, ""},
+ {description, "Erlang ZeroMQ Driver"},
{vsn, "2.0"},
+ {modules, [erlzmq, erlzmq_nif]},
{registered, []},
{applications, [
kernel,
stdlib
]},
- {mod, { erlzmq_app, []}},
{env, []}
]}.
View
@@ -24,13 +24,26 @@
-module(erlzmq).
%% @headerfile "erlzmq.hrl"
-include_lib("erlzmq.hrl").
--export([context/0, context/1, socket/2, bind/2, connect/2, send/2, send/3,
- recv/1, recv/2, setsockopt/3, getsockopt/2, close/1, term/1, term/2]).
+-export([context/0,
+ context/1,
+ socket/2,
+ bind/2,
+ connect/2,
+ send/2,
+ send/3,
+ recv/1,
+ recv/2,
+ setsockopt/3,
+ getsockopt/2,
+ close/1,
+ term/1,
+ term/2]).
-export_type([erlzmq_socket/0, erlzmq_context/0]).
%% @equiv context(1)
-%% @spec context() -> {ok, erlzmq_context()} | erlzmq_error()
--spec context() -> {ok, erlzmq_context()} | erlzmq_error().
+-spec context() ->
+ {ok, erlzmq_context()} |
+ erlzmq_error().
context() ->
context(1).
@@ -46,9 +59,9 @@ context() ->
%% <i>For more information see
%% <a href="http://api.zeromq.org/master:zmq-init">zmq_init</a></i>
%% @end
-%% @spec context(pos_integer()) -> {ok, erlzmq_context()} | erlzmq_error()
--spec context(Threads :: pos_integer()) -> {ok, erlzmq_context()} | erlzmq_error().
-
+-spec context(Threads :: pos_integer()) ->
+ {ok, erlzmq_context()} |
+ erlzmq_error().
context(Threads) when is_integer(Threads) ->
erlzmq_nif:context(Threads).
@@ -65,9 +78,12 @@ context(Threads) when is_integer(Threads) ->
%% <i>For more information see
%% <a href="http://api.zeromq.org/master:zmq_socket">zmq_socket</a>.</i>
%% @end
-%% @spec socket(erlzmq_context(), erlzmq_socket_type() | list(erlzmq_socket_type() | {active, boolean()})) -> {ok, erlzmq_socket()} | erlzmq_error()
--spec socket(Context :: erlzmq_context(), Type :: erlzmq_socket_type() | list(erlzmq_socket_type() | {active, boolean()})) -> {ok, erlzmq_socket()} | erlzmq_error().
-
+-spec socket(Context :: erlzmq_context(),
+ Type :: erlzmq_socket_type() |
+ list(erlzmq_socket_type() |
+ {active, boolean()})) ->
+ {ok, {pos_integer(), erlzmq_socket()}} |
+ erlzmq_error().
socket(Context, Type) when is_atom(Type) ->
socket(Context, [Type]);
socket(Context, [H | _] = L) ->
@@ -90,39 +106,47 @@ socket(Context, [H | _] = L) ->
%% <i>For more information see
%% <a href="http://api.zeromq.org/master:zmq_bind">zmq_bind</a>.</i>
%% @end
-%% @spec bind(erlzmq_socket(), erlzmq_endpoint()) -> ok | erlzmq_error()
--spec bind(Socket :: erlzmq_socket(), Endpoint :: erlzmq_endpoint()) -> ok | erlzmq_error().
-
-bind(Socket, Endpoint) when is_list(Endpoint) ->
- erlzmq_result(erlzmq_nif:bind(Socket, Endpoint)).
+-spec bind(SocketTuple :: {pos_integer(), erlzmq_socket()},
+ Endpoint :: erlzmq_endpoint()) ->
+ ok |
+ erlzmq_error().
+bind({I, Socket}, Endpoint)
+ when is_integer(I), is_list(Endpoint) ->
+ erlzmq_nif:bind(Socket, Endpoint).
%% @doc Connect a socket.
%% <br />
%% <i>For more information see
%% <a href="http://api.zeromq.org/master:zmq_connect">zmq_connect</a>.</i>
%% @end
-%% @spec connect(erlzmq_socket(), erlzmq_endpoint()) -> ok | erlzmq_error()
--spec connect(Socket :: erlzmq_socket(), Endpoint :: erlzmq_endpoint()) -> ok | erlzmq_error().
-
-connect(Socket, Endpoint) when is_list(Endpoint) ->
- erlzmq_result(erlzmq_nif:connect(Socket, Endpoint)).
+-spec connect(SocketTuple :: {pos_integer(), erlzmq_socket()},
+ Endpoint :: erlzmq_endpoint()) ->
+ ok |
+ erlzmq_error().
+connect({I, Socket}, Endpoint)
+ when is_integer(I), is_list(Endpoint) ->
+ erlzmq_nif:connect(Socket, Endpoint).
%% @equiv send(Socket, Msg, [])
-%% @spec send(erlzmq_socket(), erlzmq_data()) -> ok | erlzmq_error()
--spec send(Socket :: erlzmq_socket(), Data :: erlzmq_data()) -> ok | erlzmq_error().
-
-send(Socket, Binary) when is_binary(Binary) ->
- send(Socket, Binary, []).
+-spec send(SocketTuple :: {pos_integer(), erlzmq_socket()},
+ Data :: erlzmq_data()) ->
+ ok |
+ erlzmq_error().
+send(SocketTuple, Binary) when is_binary(Binary) ->
+ send(SocketTuple, Binary, []).
%% @doc Send a message on a socket.
%% <br />
%% <i>For more information see
%% <a href="http://api.zeromq.org/master:zmq_send">zmq_send</a>.</i>
%% @end
-%% @spec send(ezma_socket(), erlzmq_data(), erlzmq_send_recv_flags()) -> ok | erlzmq_error()
--spec send(Socket :: erlzmq_socket(), Data :: erlzmq_data(), Flags :: erlzmq_send_recv_flags()) -> ok | erlzmq_error().
-
-send(Socket, Binary, Flags) when is_binary(Binary), is_list(Flags) ->
+-spec send(SocketTuple :: {pos_integer(), erlzmq_socket()},
+ Data :: erlzmq_data(),
+ Flags :: erlzmq_send_recv_flags()) ->
+ ok |
+ erlzmq_error().
+send({I, Socket}, Binary, Flags)
+ when is_integer(I), is_binary(Binary), is_list(Flags) ->
case erlzmq_nif:send(Socket, Binary, sendrecv_flags(Flags)) of
Ref when is_reference(Ref) ->
receive
@@ -132,78 +156,83 @@ send(Socket, Binary, Flags) when is_binary(Binary), is_list(Flags) ->
Error
end;
Result ->
- erlzmq_result(Result)
+ Result
end.
%% @equiv recv(Socket, 0)
-%% @spec recv(erlzmq_socket()) -> {ok, erlzmq_data()} | erlzmq_error()
--spec recv(Socket :: erlzmq_socket()) -> {ok, erlzmq_data()} | erlzmq_error().
-
-recv(Socket) ->
- recv(Socket, []).
+-spec recv(SocketTuple :: {pos_integer(), erlzmq_socket()}) ->
+ {ok, erlzmq_data()} |
+ erlzmq_error().
+recv(SocketTuple) ->
+ recv(SocketTuple, []).
%% @doc Receive a message from a socket.
%% <br />
%% <i>For more information see
%% <a href="http://api.zeromq.org/master:zmq_recv">zmq_recv</a>.</i>
%% @end
-%% @spec recv(erlzmq_socket(), erlzmq_send_recv_flags()) -> {ok, erlzmq_data()} | erlzmq_error()
--spec recv(Socket :: erlzmq_socket(), Flags :: erlzmq_send_recv_flags()) -> {ok, erlzmq_data()} | erlzmq_error() | {error, timeout, reference()}.
-
-recv(Socket, Flags) when is_list(Flags) ->
+-spec recv(SocketTuple :: {pos_integer(), erlzmq_socket()},
+ Flags :: erlzmq_send_recv_flags()) ->
+ {ok, erlzmq_data()} |
+ erlzmq_error() |
+ {error, {timeout, reference()}}.
+recv({I, Socket}, Flags)
+ when is_integer(I), is_list(Flags) ->
case erlzmq_nif:recv(Socket, sendrecv_flags(Flags)) of
Ref when is_reference(Ref) ->
Timeout = proplists:get_value(timeout, Flags, infinity),
receive
{Ref, Result} ->
{ok, Result}
after Timeout ->
- {error, timeout, Ref}
+ {error, {timeout, Ref}}
end;
Result ->
- erlzmq_result(Result)
+ Result
end.
%% @doc Set an {@link erlzmq_sockopt(). option} associated with a socket.
%% <br />
%% <i>For more information see
%% <a href="http://api.zeromq.org/master:zmq_setsockopt">zmq_setsockopt</a>.</i>
%% @end
-%% @spec setsockopt(erlzmq_socket(), erlzmq_sockopt(), erlzmq_sockopt_value()) -> ok | erlzmq_error()
--spec setsockopt(Socket :: erlzmq_socket(), Name :: erlzmq_sockopt(), erlzmq_sockopt_value()) -> ok | erlzmq_error().
-
-setsockopt(Socket, Name, Value) when is_list(Value) ->
- setsockopt(Socket, Name, erlang:list_to_binary(Value));
-setsockopt(Socket, Name, Value) when is_atom(Name) ->
- erlzmq_result(erlzmq_nif:setsockopt(Socket, option_name(Name), Value)).
+-spec setsockopt(SocketTuple :: {pos_integer(), erlzmq_socket()},
+ Name :: erlzmq_sockopt(),
+ erlzmq_sockopt_value()) ->
+ ok |
+ erlzmq_error().
+setsockopt(SocketTuple, Name, Value) when is_list(Value) ->
+ setsockopt(SocketTuple, Name, erlang:list_to_binary(Value));
+setsockopt({I, Socket}, Name, Value) when is_integer(I), is_atom(Name) ->
+ erlzmq_nif:setsockopt(Socket, option_name(Name), Value).
%% @doc Get an {@link erlzmq_sockopt(). option} associated with a socket.
%% <br />
%% <i>For more information see
%% <a href="http://api.zeromq.org/master:zmq_getsockopt">zmq_getsockopt</a>.</i>
%% @end
-%% @spec getsockopt(erlzmq_socket(), erlzmq_sockopt()) -> {ok, erlzmq_sockopt_value()} | erlzmq_error()
--spec getsockopt(Socket :: erlzmq_socket(), Name :: erlzmq_sockopt()) -> {ok, erlzmq_sockopt_value()} | erlzmq_error().
-
-getsockopt(Socket, Name) when is_atom(Name) ->
- erlzmq_result(erlzmq_nif:getsockopt(Socket, option_name(Name))).
-
+-spec getsockopt(SocketTuple :: {pos_integer(), erlzmq_socket()},
+ Name :: erlzmq_sockopt()) ->
+ {ok, erlzmq_sockopt_value()} |
+ erlzmq_error().
+getsockopt({I, Socket}, Name) when is_integer(I), is_atom(Name) ->
+ erlzmq_nif:getsockopt(Socket, option_name(Name)).
%% @doc Close the given socket.
%% <br />
%% <i>For more information see
%% <a href="http://api.zeromq.org/master:zmq_close">zmq_close</a>.</i>
%% @end
-%% @spec close(erlzmq_socket()) -> ok | erlzmq_error()
--spec close(Socket :: erlzmq_socket()) -> ok | erlzmq_error().
-
-close(Socket) ->
- erlzmq_result(erlzmq_nif:close(Socket)).
+-spec close(SocketTuple :: {pos_integer(), erlzmq_socket()}) ->
+ ok |
+ erlzmq_error().
+close({I, Socket}) when is_integer(I) ->
+ erlzmq_nif:close(Socket).
%% @equiv term(Context, infinity)
-%% @spec term(erlzmq_context()) -> ok | erlzmq_error()
--spec term(Context :: erlzmq_context()) -> ok | erlzmq_error().
-
+-spec term(Context :: erlzmq_context()) ->
+ ok |
+ erlzmq_error().
term(Context) ->
term(Context, infinity).
@@ -216,8 +245,11 @@ term(Context) ->
%% <i>For more information see
%% <a href="http://api.zeromq.org/master:zmq_term">zmq_term</a>.</i>
%% @end
-%% @spec term(erlzmq_context(), timeout()) -> ok | erlzmq_error()
--spec term(Context :: erlzmq_context(), Timeout :: timeout()) -> ok | erlzmq_error() | {error, timeout, reference()}.
+-spec term(Context :: erlzmq_context(),
+ Timeout :: timeout()) ->
+ ok |
+ erlzmq_error() |
+ {error, {timeout, reference()}}.
term(Context, Timeout) ->
case erlzmq_nif:term(Context) of
@@ -226,16 +258,17 @@ term(Context, Timeout) ->
{Ref, Result} ->
Result
after Timeout ->
- {error, timeout, Ref}
+ {error, {timeout, Ref}}
end;
Result ->
- erlzmq_result(Result)
+ Result
end.
%% Private
--spec socket_type(Type :: erlzmq_socket_type()) -> integer().
+-spec socket_type(Type :: erlzmq_socket_type()) ->
+ integer().
socket_type(pair) ->
?'ZMQ_PAIR';
@@ -260,7 +293,8 @@ socket_type(xpub) ->
socket_type(xsub) ->
?'ZMQ_XSUB'.
--spec sendrecv_flags(Flags :: erlzmq_send_recv_flags()) -> integer().
+-spec sendrecv_flags(Flags :: erlzmq_send_recv_flags()) ->
+ integer().
sendrecv_flags([]) ->
0;
@@ -271,7 +305,8 @@ sendrecv_flags([noblock|Rest]) ->
sendrecv_flags([sndmore|Rest]) ->
?'ZMQ_SNDMORE' bor sendrecv_flags(Rest).
--spec option_name(Name :: erlzmq_sockopt()) -> integer().
+-spec option_name(Name :: erlzmq_sockopt()) ->
+ integer().
option_name(hwm) ->
?'ZMQ_HWM';
@@ -312,17 +347,3 @@ option_name(recovery_ivl_msec) ->
option_name(reconnect_ivl_max) ->
?'ZMQ_RECONNECT_IVL_MAX'.
-
--spec erlzmq_result(ok) -> ok;
- ({ok, Value :: term()}) -> Value :: term();
- ({error, Value :: atom()}) -> Value :: atom();
- ({error, integer()}) -> {error, erlzmq_error_type()}.
-
-erlzmq_result(ok) ->
- ok;
-erlzmq_result({ok, _} = Result) ->
- Result;
-erlzmq_result({error, Code} = Error) when is_atom(Code) ->
- Error;
-erlzmq_result({error, Code} = Error) when is_integer(Code) ->
- Error.
Oops, something went wrong.

0 comments on commit 9ed06d9

Please sign in to comment.