Permalink
Browse files

Merge branch 'master' of git://github.com/zeromq/erlzmq2 into version

  • Loading branch information...
2 parents c1604ba + 5442d91 commit bd1aaa4d7efdd7fb98463ddb8d6d07fd1d4857b7 @gar1t gar1t committed Jun 11, 2011
Showing with 37 additions and 8 deletions.
  1. +20 −4 c_src/erlzmq_nif.c
  2. +17 −4 test/erlzmq_test.erl
View
@@ -609,7 +609,7 @@ NIF(erlzmq_nif_recv)
}
else {
enif_mutex_unlock(socket->mutex);
-
+
ErlNifBinary binary;
enif_alloc_binary(zmq_msg_size(&msg), &binary);
memcpy(binary.data, zmq_msg_data(&msg), zmq_msg_size(&msg));
@@ -760,14 +760,20 @@ static void * polling_thread(void * handle)
erlzmq_thread_request_t * r = vector_get(erlzmq_thread_request_t,
&requests, i);
if (item->revents & ZMQ_POLLIN) {
+ size_t value_len = sizeof(int64_t);
+ int64_t flag_value = 0;
+
assert(r->type == ERLZMQ_THREAD_REQUEST_RECV);
--count;
zmq_msg_t msg;
zmq_msg_init(&msg);
enif_mutex_lock(r->data.recv.socket->mutex);
if (zmq_recv(r->data.recv.socket->socket_zmq, &msg,
- r->data.recv.flags))
+ r->data.recv.flags) ||
+ (r->data.recv.socket->active == ERLZMQ_SOCKET_ACTIVE_ON &&
+ zmq_getsockopt(r->data.recv.socket->socket_zmq,
+ ZMQ_RCVMORE, &flag_value, &value_len)) )
{
enif_mutex_unlock(r->data.recv.socket->mutex);
if (r->data.recv.socket->active == ERLZMQ_SOCKET_ACTIVE_ON) {
@@ -797,14 +803,24 @@ static void * polling_thread(void * handle)
zmq_msg_close(&msg);
if (r->data.recv.socket->active == ERLZMQ_SOCKET_ACTIVE_ON) {
+ ERL_NIF_TERM flags_list;
+
+ // Should we send the multipart flag
+ if(flag_value == 1) {
+ flags_list = enif_make_list1(r->data.recv.env, enif_make_atom(r->data.recv.env, "rcvmore"));
+ } else {
+ flags_list = enif_make_list(r->data.recv.env, 0);
+ }
+
enif_send(NULL, &r->data.recv.pid, r->data.recv.env,
- enif_make_tuple3(r->data.recv.env,
+ enif_make_tuple4(r->data.recv.env,
enif_make_atom(r->data.recv.env, "zmq"),
enif_make_tuple2(r->data.recv.env,
enif_make_uint64(r->data.recv.env,
r->data.recv.socket->socket_index),
enif_make_resource(r->data.recv.env, r->data.recv.socket)),
- enif_make_binary(r->data.recv.env, &binary)));
+ enif_make_binary(r->data.recv.env, &binary),
+ flags_list));
enif_free_env(r->data.recv.env);
r->data.recv.env = enif_alloc_env();
item->revents = 0;
View
@@ -132,44 +132,57 @@ create_bound_pair(Ctx, Type1, Type2, Mode, Transport) ->
{S1, S2}.
ping_pong({S1, S2}, Msg, active) ->
+ ok = erlzmq:send(S1, Msg, [sndmore]),
ok = erlzmq:send(S1, Msg),
receive
- {zmq, S2, Msg} ->
+ {zmq, S2, Msg, [rcvmore]} ->
+ ok
+ after
+ 1000 ->
+ ?assertMatch({ok, Msg}, timeout)
+ end,
+ receive
+ {zmq, S2, Msg, []} ->
ok
after
1000 ->
?assertMatch({ok, Msg}, timeout)
end,
ok = erlzmq:send(S2, Msg),
receive
- {zmq, S1, Msg} ->
+ {zmq, S1, Msg, []} ->
ok
after
1000 ->
?assertMatch({ok, Msg}, timeout)
end,
ok = erlzmq:send(S1, Msg),
receive
- {zmq, S2, Msg} ->
+ {zmq, S2, Msg, []} ->
ok
after
1000 ->
?assertMatch({ok, Msg}, timeout)
end,
ok = erlzmq:send(S2, Msg),
receive
- {zmq, S1, Msg} ->
+ {zmq, S1, Msg, []} ->
ok
after
1000 ->
?assertMatch({ok, Msg}, timeout)
end,
ok;
+
ping_pong({S1, S2}, Msg, passive) ->
ok = erlzmq:send(S1, Msg),
?assertMatch({ok, Msg}, erlzmq:recv(S2)),
ok = erlzmq:send(S2, Msg),
?assertMatch({ok, Msg}, erlzmq:recv(S1)),
+ ok = erlzmq:send(S1, Msg, [sndmore]),
+ ok = erlzmq:send(S1, Msg),
+ ?assertMatch({ok, Msg}, erlzmq:recv(S2)),
+ ?assertMatch({ok, Msg}, erlzmq:recv(S2)),
ok.
basic_tests(Transport, Type1, Type2, Mode) ->

0 comments on commit bd1aaa4

Please sign in to comment.