Skip to content

Commit

Permalink
Changing the recv() back to a 2-tuple response
Browse files Browse the repository at this point in the history
  • Loading branch information
ianbarber committed Jun 10, 2011
1 parent 0ec0347 commit 6cf222c
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 34 deletions.
42 changes: 15 additions & 27 deletions c_src/erlzmq_nif.c
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -539,9 +539,6 @@ NIF(erlzmq_nif_recv)
{ {
erlzmq_thread_request_t req; erlzmq_thread_request_t req;
erlzmq_socket_t * socket; erlzmq_socket_t * socket;
ERL_NIF_TERM flags_list;
size_t value_len = sizeof(int64_t);
int64_t flag_value = 0;


if (! enif_get_resource(env, argv[0], erlzmq_nif_resource_socket, if (! enif_get_resource(env, argv[0], erlzmq_nif_resource_socket,
(void **) &socket)) { (void **) &socket)) {
Expand Down Expand Up @@ -609,26 +606,16 @@ NIF(erlzmq_nif_recv)
} }
} }
else { else {
if(zmq_getsockopt(socket->socket_zmq, ZMQ_RCVMORE, &flag_value, &value_len)) {
return return_zmq_errno(env, zmq_errno());
}
enif_mutex_unlock(socket->mutex); enif_mutex_unlock(socket->mutex);


// Should we send the multipart flag
if(flag_value == 1) {
flags_list = enif_make_list1(env, enif_make_atom(env, "rcvmore"));
} else {
flags_list = enif_make_list(env, 0);
}

ErlNifBinary binary; ErlNifBinary binary;
enif_alloc_binary(zmq_msg_size(&msg), &binary); enif_alloc_binary(zmq_msg_size(&msg), &binary);
memcpy(binary.data, zmq_msg_data(&msg), zmq_msg_size(&msg)); memcpy(binary.data, zmq_msg_data(&msg), zmq_msg_size(&msg));


zmq_msg_close(&msg); zmq_msg_close(&msg);


return enif_make_tuple3(env, enif_make_atom(env, "ok"), return enif_make_tuple2(env, enif_make_atom(env, "ok"),
enif_make_binary(env, &binary), flags_list); enif_make_binary(env, &binary));
} }
} }


Expand Down Expand Up @@ -773,8 +760,9 @@ static void * polling_thread(void * handle)
enif_mutex_lock(r->data.recv.socket->mutex); enif_mutex_lock(r->data.recv.socket->mutex);
if (zmq_recv(r->data.recv.socket->socket_zmq, &msg, if (zmq_recv(r->data.recv.socket->socket_zmq, &msg,
r->data.recv.flags) || r->data.recv.flags) ||
(r->data.recv.socket->active == ERLZMQ_SOCKET_ACTIVE_ON &&
zmq_getsockopt(r->data.recv.socket->socket_zmq, zmq_getsockopt(r->data.recv.socket->socket_zmq,
ZMQ_RCVMORE, &flag_value, &value_len) ) ZMQ_RCVMORE, &flag_value, &value_len)) )
{ {
enif_mutex_unlock(r->data.recv.socket->mutex); enif_mutex_unlock(r->data.recv.socket->mutex);
if (r->data.recv.socket->active == ERLZMQ_SOCKET_ACTIVE_ON) { if (r->data.recv.socket->active == ERLZMQ_SOCKET_ACTIVE_ON) {
Expand Down Expand Up @@ -802,16 +790,17 @@ static void * polling_thread(void * handle)
enif_alloc_binary(zmq_msg_size(&msg), &binary); enif_alloc_binary(zmq_msg_size(&msg), &binary);
memcpy(binary.data, zmq_msg_data(&msg), zmq_msg_size(&msg)); memcpy(binary.data, zmq_msg_data(&msg), zmq_msg_size(&msg));
zmq_msg_close(&msg); zmq_msg_close(&msg);
ERL_NIF_TERM flags_list;

// Should we send the multipart flag
if(flag_value == 1) {
flags_list = enif_make_list1(r->data.recv.env, enif_make_atom(r->data.recv.env, "rcvmore"));
} else {
flags_list = enif_make_list(r->data.recv.env, 0);
}


if (r->data.recv.socket->active == ERLZMQ_SOCKET_ACTIVE_ON) { if (r->data.recv.socket->active == ERLZMQ_SOCKET_ACTIVE_ON) {
ERL_NIF_TERM flags_list;

// Should we send the multipart flag
if(flag_value == 1) {
flags_list = enif_make_list1(r->data.recv.env, enif_make_atom(r->data.recv.env, "rcvmore"));
} else {
flags_list = enif_make_list(r->data.recv.env, 0);
}

enif_send(NULL, &r->data.recv.pid, r->data.recv.env, enif_send(NULL, &r->data.recv.pid, r->data.recv.env,
enif_make_tuple4(r->data.recv.env, enif_make_tuple4(r->data.recv.env,
enif_make_atom(r->data.recv.env, "zmq"), enif_make_atom(r->data.recv.env, "zmq"),
Expand All @@ -827,10 +816,9 @@ static void * polling_thread(void * handle)
} }
else { else {
enif_send(NULL, &r->data.recv.pid, r->data.recv.env, enif_send(NULL, &r->data.recv.pid, r->data.recv.env,
enif_make_tuple3(r->data.recv.env, enif_make_tuple2(r->data.recv.env,
enif_make_copy(r->data.recv.env, r->data.recv.ref), enif_make_copy(r->data.recv.env, r->data.recv.ref),
enif_make_binary(r->data.recv.env, &binary), enif_make_binary(r->data.recv.env, &binary)));
flags_list));


enif_free_env(r->data.recv.env); enif_free_env(r->data.recv.env);
enif_release_resource(r->data.recv.socket); enif_release_resource(r->data.recv.socket);
Expand Down
4 changes: 2 additions & 2 deletions src/erlzmq.erl
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -190,8 +190,8 @@ recv({I, Socket}, Flags)
Ref when is_reference(Ref) -> Ref when is_reference(Ref) ->
Timeout = proplists:get_value(timeout, Flags, infinity), Timeout = proplists:get_value(timeout, Flags, infinity),
receive receive
{Ref, Result, Flag} -> {Ref, Result} ->
{ok, Result, Flag} {ok, Result}
after Timeout -> after Timeout ->
{error, {timeout, Ref}} {error, {timeout, Ref}}
end; end;
Expand Down
10 changes: 5 additions & 5 deletions test/erlzmq_test.erl
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ hwm_test() ->


ok = hwm_loop(10, S2), ok = hwm_loop(10, S2),


?assertMatch({ok, <<"test">>, _Flags}, erlzmq:recv(S1)), ?assertMatch({ok, <<"test">>}, erlzmq:recv(S1)),
?assertMatch(ok, erlzmq:send(S2, <<"test">>)), ?assertMatch(ok, erlzmq:send(S2, <<"test">>)),
ok = erlzmq:close(S1), ok = erlzmq:close(S1),
ok = erlzmq:close(S2), ok = erlzmq:close(S2),
Expand Down Expand Up @@ -173,13 +173,13 @@ ping_pong({S1, S2}, Msg, active) ->


ping_pong({S1, S2}, Msg, passive) -> ping_pong({S1, S2}, Msg, passive) ->
ok = erlzmq:send(S1, Msg), ok = erlzmq:send(S1, Msg),
?assertMatch({ok, Msg, []}, erlzmq:recv(S2)), ?assertMatch({ok, Msg}, erlzmq:recv(S2)),
ok = erlzmq:send(S2, Msg), ok = erlzmq:send(S2, Msg),
?assertMatch({ok, Msg, []}, erlzmq:recv(S1)), ?assertMatch({ok, Msg}, erlzmq:recv(S1)),
ok = erlzmq:send(S1, Msg, [sndmore]), ok = erlzmq:send(S1, Msg, [sndmore]),
ok = erlzmq:send(S1, Msg), ok = erlzmq:send(S1, Msg),
?assertMatch({ok, Msg, [rcvmore]}, erlzmq:recv(S2)), ?assertMatch({ok, Msg}, erlzmq:recv(S2)),
?assertMatch({ok, Msg, []}, erlzmq:recv(S2)), ?assertMatch({ok, Msg}, erlzmq:recv(S2)),
ok. ok.


basic_tests(Transport, Type1, Type2, Mode) -> basic_tests(Transport, Type1, Type2, Mode) ->
Expand Down

0 comments on commit 6cf222c

Please sign in to comment.