Permalink
Browse files

Added ezmq:close/1 and ezmq:term/1 functions as GC cleanup magic was …

…often resulting in some hard to debug behaviour.
  • Loading branch information...
1 parent 8e258f8 commit 307d4ee38cda531aa9bfa1c5a5dc48fabf990977 @yrashk yrashk committed Mar 4, 2011
Showing with 68 additions and 24 deletions.
  1. +41 −17 c_src/ezmq_nif.c
  2. +3 −1 perf/local_lat.erl
  3. +4 −1 perf/local_thr.erl
  4. +3 −1 perf/remote_lat.erl
  5. +3 −2 perf/remote_thr.erl
  6. +7 −1 src/ezmq.erl
  7. +7 −1 src/ezmq_nif.erl
View
58 c_src/ezmq_nif.c
@@ -44,6 +44,8 @@ NIF(ezmq_nif_getsockopt);
NIF(ezmq_nif_send);
NIF(ezmq_nif_brecv);
NIF(ezmq_nif_recv);
+NIF(ezmq_nif_close);
+NIF(ezmq_nif_term);
static ErlNifFunc nif_funcs[] =
{
@@ -55,7 +57,9 @@ static ErlNifFunc nif_funcs[] =
{"getsockopt", 2, ezmq_nif_getsockopt},
{"send", 3, ezmq_nif_send},
{"brecv", 2, ezmq_nif_brecv},
- {"recv", 2, ezmq_nif_recv}
+ {"recv", 2, ezmq_nif_recv},
+ {"close", 1, ezmq_nif_close},
+ {"term", 1, ezmq_nif_term}
};
void * polling_thread(void * handle);
@@ -570,9 +574,30 @@ void * polling_thread(void * handle)
return NULL;
}
-static void ezmq_nif_resource_context_cleanup(ErlNifEnv* env, void* arg)
+NIF(ezmq_nif_close)
{
- ezmq_context * ctx = (ezmq_context *)arg;
+
+ ezmq_socket * socket;
+
+ if (!enif_get_resource(env, argv[0], ezmq_nif_resource_socket, (void **) &socket)) {
+ return enif_make_badarg(env);
+ }
+
+ if (-1 == zmq_close(socket->socket)) {
+ return enif_make_tuple2(env, enif_make_atom(env, "error"), enif_make_int(env, zmq_errno()));
+ } else {
+ return enif_make_atom(env, "ok");
+ }
+}
+
+NIF(ezmq_nif_term)
+{
+ ezmq_context * ctx;
+
+ if (!enif_get_resource(env, argv[0], ezmq_nif_resource_context, (void **) &ctx)) {
+ return enif_make_badarg(env);
+ }
+
zmq_msg_t msg;
ezmq_recv recv;
recv.env = NULL;
@@ -585,29 +610,28 @@ static void ezmq_nif_resource_context_cleanup(ErlNifEnv* env, void* arg)
free(ctx->ipc_socket_name);
enif_mutex_destroy(ctx->mutex);
enif_cond_destroy(ctx->cond);
- zmq_term(ctx->context);
-}
-static void ezmq_nif_resource_socket_cleanup(ErlNifEnv* env, void* arg)
-{
- ezmq_socket * socket = (ezmq_socket *)arg;
- zmq_close(socket->socket);
+ if (-1 == zmq_term(ctx->context)) {
+ return enif_make_tuple2(env, enif_make_atom(env, "error"), enif_make_int(env, zmq_errno()));
+ } else {
+ return enif_make_atom(env, "ok");
+ }
}
static int on_load(ErlNifEnv* env, void** priv_data, ERL_NIF_TERM load_info)
{
ezmq_nif_resource_context =
enif_open_resource_type(env, "ezmq_nif",
- "ezmq_nif_resource_context",
- &ezmq_nif_resource_context_cleanup,
- ERL_NIF_RT_CREATE | ERL_NIF_RT_TAKEOVER,
- 0);
+ "ezmq_nif_resource_context",
+ NULL,
+ ERL_NIF_RT_CREATE | ERL_NIF_RT_TAKEOVER,
+ 0);
ezmq_nif_resource_socket =
enif_open_resource_type(env, "ezmq_nif",
- "ezmq_nif_resource_socket",
- &ezmq_nif_resource_socket_cleanup,
- ERL_NIF_RT_CREATE | ERL_NIF_RT_TAKEOVER,
- 0);
+ "ezmq_nif_resource_socket",
+ NULL,
+ ERL_NIF_RT_CREATE | ERL_NIF_RT_TAKEOVER,
+ 0);
return 0;
}
View
4 perf/local_lat.erl
@@ -13,5 +13,7 @@ main([BindTo,MessageSizeStr,RoundtripCountStr]) ->
RMsg = Msg,
ezmq:send(Socket, Msg)
end,
- [ Do() || _I <- lists:seq(1,RoundtripCount) ].
+ [ Do() || _I <- lists:seq(1,RoundtripCount) ],
+ ezmq:close(Socket),
+ ezmq:term(Context).
View
5 perf/local_thr.erl
@@ -21,5 +21,8 @@ main([BindTo,MessageSizeStr,MessageCountStr]) ->
"message count: ~p~n"
"mean throughput: ~p [msg/s]~n"
"mean throughput: ~p [Mb/s]~n",
- [MessageSize, MessageCount, Throughput, Megabits]).
+ [MessageSize, MessageCount, Throughput, Megabits]),
+
+ ezmq:close(Socket),
+ ezmq:term(Context).
View
4 perf/remote_lat.erl
@@ -22,4 +22,6 @@ main([ConnectTo,MessageSizeStr,RoundtripCountStr]) ->
io:format("message size: ~p [B]~n"
"roundtrip count: ~p~n"
"average latency: ~p [us]~n",
- [MessageSize, RoundtripCount, Latency]).
+ [MessageSize, RoundtripCount, Latency]),
+ ezmq:close(Socket),
+ ezmq:term(Context).
View
5 perf/remote_thr.erl
@@ -8,5 +8,6 @@ main([ConnectTo,MessageSizeStr,MessageCountStr]) ->
{ok, Socket} = ezmq:socket(Context,pub),
ezmq:connect(Socket, ConnectTo),
Msg = list_to_binary(lists:duplicate(MessageSize, 0)),
- ezmq_perf:send_loop(MessageCount, Socket, Msg).
-
+ ezmq_perf:send_loop(MessageCount, Socket, Msg),
+ ezmq:close(Socket),
+ ezmq:term(Context).
View
8 src/ezmq.erl
@@ -1,6 +1,6 @@
-module(ezmq).
-include_lib("ezmq.hrl").
--export([context/0, context/1, socket/2, bind/2, connect/2, send/2, send/3, brecv/1, brecv/2, recv/1, recv/2, setsockopt/3, getsockopt/2]).
+-export([context/0, context/1, socket/2, bind/2, connect/2, send/2, send/3, brecv/1, brecv/2, recv/1, recv/2, setsockopt/3, getsockopt/2, close/1, term/1]).
context() ->
context(1).
@@ -52,6 +52,12 @@ setsockopt(Socket, Name, Value) ->
getsockopt(Socket, Name) ->
ezmq_nif:getsockopt(Socket, option_name(Name)).
+close(Socket) ->
+ ezmq_nif:close(Socket).
+
+term(Context) ->
+ ezmq_nif:term(Context).
+
%% Private
socket_type(pair) ->
View
8 src/ezmq_nif.erl
@@ -1,6 +1,6 @@
-module(ezmq_nif).
--export([context/1, socket/2, bind/2, connect/2, send/3, brecv/2, recv/2, setsockopt/3, getsockopt/2]).
+-export([context/1, socket/2, bind/2, connect/2, send/3, brecv/2, recv/2, setsockopt/3, getsockopt/2, close/1, term/1]).
-on_load(init/0).
@@ -42,3 +42,9 @@ setsockopt(_Socket, _OptionName, _OptionValue) ->
getsockopt(_Socket, _OptionName) ->
erlang:nif_error(not_loaded).
+
+close(_Socket) ->
+ erlang:nif_error(not_loaded).
+
+term(_Context) ->
+ erlang:nif_error(not_loaded).

0 comments on commit 307d4ee

Please sign in to comment.