Skip to content

Commit

Permalink
Remove brecv function that could block the current VM scheduler
Browse files Browse the repository at this point in the history
  • Loading branch information
evax committed Mar 18, 2011
1 parent d49e300 commit e4a8986
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 95 deletions.
107 changes: 35 additions & 72 deletions c_src/erlzmq_nif.c
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ NIF(erlzmq_nif_connect);
NIF(erlzmq_nif_setsockopt);
NIF(erlzmq_nif_getsockopt);
NIF(erlzmq_nif_send);
NIF(erlzmq_nif_brecv);
NIF(erlzmq_nif_recv);
NIF(erlzmq_nif_close);
NIF(erlzmq_nif_term);
Expand All @@ -58,7 +57,6 @@ static ErlNifFunc nif_funcs[] =
{"setsockopt", 3, erlzmq_nif_setsockopt},
{"getsockopt", 2, erlzmq_nif_getsockopt},
{"send", 3, erlzmq_nif_send},
{"brecv", 2, erlzmq_nif_brecv},
{"recv", 2, erlzmq_nif_recv},
{"close", 1, erlzmq_nif_close},
{"term", 1, erlzmq_nif_term}
Expand Down Expand Up @@ -384,48 +382,6 @@ NIF(erlzmq_nif_send)

}

int brecv(zmq_msg_t * msg, erlzmq_socket * socket, int flags) {
int error;
if ((error = zmq_msg_init(msg))) {
return zmq_errno();
}

if ((error = zmq_recv(socket->socket, msg, flags))) {
return zmq_errno();
}

return 0;
}

NIF(erlzmq_nif_brecv)
{
erlzmq_socket * socket;
int _flags;

if (!enif_get_resource(env, argv[0], erlzmq_nif_resource_socket, (void **) &socket)) {
return enif_make_badarg(env);
}

if (!enif_get_int(env, argv[1], &_flags)) {
return enif_make_badarg(env);
}

int error;
zmq_msg_t msg;

if ((error = brecv(&msg, socket, _flags))) {
return enif_make_tuple2(env, enif_make_atom(env, "error"), enif_make_int(env, error));
}

ErlNifBinary bin;
enif_alloc_binary(zmq_msg_size(&msg), &bin);
memcpy(bin.data, zmq_msg_data(&msg), zmq_msg_size(&msg));

zmq_msg_close(&msg);

return enif_make_tuple2(env, enif_make_atom(env, "ok"), enif_make_binary(env, &bin));
}

NIF(erlzmq_nif_recv)
{

Expand All @@ -446,45 +402,52 @@ NIF(erlzmq_nif_recv)
int error;
zmq_msg_t msg;

// try brecv with noblock

error = brecv(&msg, socket, ZMQ_NOBLOCK);
if (zmq_msg_init(&msg)) {
goto errno_out;
}

if (error == EAGAIN) { // if nothing is there, hand it off to the receiver thread
if (recv.flags & ZMQ_NOBLOCK) {
goto out;
}
recv.env = enif_alloc_env();
recv.ref = enif_make_ref(recv.env);
recv.socket = socket->socket;
// try recv with noblock
if (zmq_recv(socket->socket, &msg, ZMQ_NOBLOCK)) {
error = zmq_errno();
if (error == EAGAIN) { // if nothing is there, hand it off to the receiver thread
if (recv.flags & ZMQ_NOBLOCK) {
goto out;
}
recv.env = enif_alloc_env();
recv.ref = enif_make_ref(recv.env);
recv.socket = socket->socket;

if ((error = zmq_msg_init_size(&msg, sizeof(erlzmq_recv)))) {
if (zmq_msg_init_size(&msg, sizeof(erlzmq_recv))) {
goto q_err;
}
}

memcpy(zmq_msg_data(&msg), &recv, sizeof(erlzmq_recv));
memcpy(zmq_msg_data(&msg), &recv, sizeof(erlzmq_recv));

if ((error = zmq_send(socket->context->ipc_socket, &msg, 0))) {
if (zmq_send(socket->context->ipc_socket, &msg, 0)) {
zmq_msg_close(&msg);
goto q_err;
}
}

zmq_msg_close(&msg);
zmq_msg_close(&msg);

return enif_make_copy(env, recv.ref);
return enif_make_copy(env, recv.ref);
q_err:
enif_free_env(recv.env);
return enif_make_tuple2(env, enif_make_atom(env, "error"),
enif_make_int(env, zmq_errno()));
} else if (error == 0) { // return result immediately
ErlNifBinary bin;
enif_alloc_binary(zmq_msg_size(&msg), &bin);
memcpy(bin.data, zmq_msg_data(&msg), zmq_msg_size(&msg));
enif_free_env(recv.env);
goto errno_out;
} else {
goto out;
}
}
ErlNifBinary bin;
enif_alloc_binary(zmq_msg_size(&msg), &bin);
memcpy(bin.data, zmq_msg_data(&msg), zmq_msg_size(&msg));

zmq_msg_close(&msg);
zmq_msg_close(&msg);

return enif_make_tuple2(env, enif_make_atom(env, "ok"),
enif_make_binary(env, &bin));
}
return enif_make_tuple2(env, enif_make_atom(env, "ok"),
enif_make_binary(env, &bin));
errno_out:
error = zmq_errno();
out:
return enif_make_tuple2(env, enif_make_atom(env, "error"),
enif_make_int(env, error));
Expand Down
20 changes: 1 addition & 19 deletions src/erlzmq.erl
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,7 @@
%% @headerfile "erlzmq.hrl"
-include_lib("erlzmq.hrl").
-export([context/0, context/1, socket/2, bind/2, connect/2, send/2, send/3,
brecv/1, brecv/2, recv/1, recv/2, setsockopt/3, getsockopt/2,
close/1, term/1, term/2]).
recv/1, recv/2, setsockopt/3, getsockopt/2, close/1, term/1, term/2]).
-export_type([erlzmq_socket/0, erlzmq_context/0]).

%% @equiv context(1)
Expand Down Expand Up @@ -89,23 +88,6 @@ send(Socket, Binary) ->
send(Socket, Binary, Flags) when is_list(Flags) ->
erlzmq_result(erlzmq_nif:send(Socket, Binary, sendrecv_flags(Flags))).

%% @equiv brecv(Socket, 0)
%% @spec brecv(erlzmq_socket()) -> {ok, erlzmq_data()} | erlzmq_error()
-spec brecv(Socket :: erlzmq_socket()) -> {ok, erlzmq_data()} | erlzmq_error().

brecv(Socket) ->
erlzmq_result(brecv(Socket, [])).

%% @doc Receive a message from a socket in a blocking way.
%% This function can block the current VM scheduler. <b>DO NOT USE IT UNLESS YOU REALLY KNOW WHAT YOU ARE DOING</b>.
%% @end
%% @spec brecv(erlzmq_socket(), erlzmq_send_recv_flags()) -> {ok, erlzmq_data()} | erlzmq_error()
-spec brecv(Socket :: erlzmq_socket(), Flags :: erlzmq_send_recv_flags()) -> {ok, erlzmq_data()} | erlzmq_error().

brecv(Socket, Flags) when is_list(Flags) ->
erlzmq_result( erlzmq_nif:brecv(Socket, sendrecv_flags(Flags))).


%% @equiv recv(Socket, 0)
%% @spec recv(erlzmq_socket()) -> {ok, erlzmq_data()} | erlzmq_error()
-spec recv(Socket :: erlzmq_socket()) -> {ok, erlzmq_data()} | erlzmq_error().
Expand Down
5 changes: 1 addition & 4 deletions src/erlzmq_nif.erl
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
%% @hidden
-module(erlzmq_nif).

-export([context/1, socket/2, bind/2, connect/2, send/3, brecv/2, recv/2, setsockopt/3, getsockopt/2, close/1, term/1]).
-export([context/1, socket/2, bind/2, connect/2, send/3, recv/2, setsockopt/3, getsockopt/2, close/1, term/1]).

-on_load(init/0).

Expand Down Expand Up @@ -32,9 +32,6 @@ connect(_Socket, _Endpoint) ->
send(_Socket, _Binary, _Flags) ->
erlang:nif_error(not_loaded).

brecv(_Socket, _Flags) ->
erlang:nif_error(not_loaded).

recv(_Socket, _Flags) ->
erlang:nif_error(not_loaded).

Expand Down

0 comments on commit e4a8986

Please sign in to comment.