Permalink
Browse files

Merge pull request #9 from ianbarber/master

Multipart Message Support
  • Loading branch information...
2 parents b6a6abf + 75affe1 commit 0ec0347225898ed2db135863bf1b98134c44fece @yrashk yrashk committed Jun 10, 2011
Showing with 57 additions and 16 deletions.
  1. +35 −7 c_src/erlzmq_nif.c
  2. +2 −2 src/erlzmq.erl
  3. +20 −7 test/erlzmq_test.erl
View
@@ -539,6 +539,9 @@ NIF(erlzmq_nif_recv)
{
erlzmq_thread_request_t req;
erlzmq_socket_t * socket;
+ ERL_NIF_TERM flags_list;
+ size_t value_len = sizeof(int64_t);
+ int64_t flag_value = 0;
if (! enif_get_resource(env, argv[0], erlzmq_nif_resource_socket,
(void **) &socket)) {
@@ -606,16 +609,26 @@ NIF(erlzmq_nif_recv)
}
}
else {
+ if(zmq_getsockopt(socket->socket_zmq, ZMQ_RCVMORE, &flag_value, &value_len)) {
+ return return_zmq_errno(env, zmq_errno());
+ }
enif_mutex_unlock(socket->mutex);
+
+ // Should we send the multipart flag
+ if(flag_value == 1) {
+ flags_list = enif_make_list1(env, enif_make_atom(env, "rcvmore"));
+ } else {
+ flags_list = enif_make_list(env, 0);
+ }
ErlNifBinary binary;
enif_alloc_binary(zmq_msg_size(&msg), &binary);
memcpy(binary.data, zmq_msg_data(&msg), zmq_msg_size(&msg));
zmq_msg_close(&msg);
- return enif_make_tuple2(env, enif_make_atom(env, "ok"),
- enif_make_binary(env, &binary));
+ return enif_make_tuple3(env, enif_make_atom(env, "ok"),
+ enif_make_binary(env, &binary), flags_list);
}
}
@@ -749,14 +762,19 @@ static void * polling_thread(void * handle)
erlzmq_thread_request_t * r = vector_get(erlzmq_thread_request_t,
&requests, i);
if (item->revents & ZMQ_POLLIN) {
+ size_t value_len = sizeof(int64_t);
+ int64_t flag_value = 0;
+
assert(r->type == ERLZMQ_THREAD_REQUEST_RECV);
--count;
zmq_msg_t msg;
zmq_msg_init(&msg);
enif_mutex_lock(r->data.recv.socket->mutex);
if (zmq_recv(r->data.recv.socket->socket_zmq, &msg,
- r->data.recv.flags))
+ r->data.recv.flags) ||
+ zmq_getsockopt(r->data.recv.socket->socket_zmq,
+ ZMQ_RCVMORE, &flag_value, &value_len) )
{
enif_mutex_unlock(r->data.recv.socket->mutex);
if (r->data.recv.socket->active == ERLZMQ_SOCKET_ACTIVE_ON) {
@@ -784,25 +802,35 @@ static void * polling_thread(void * handle)
enif_alloc_binary(zmq_msg_size(&msg), &binary);
memcpy(binary.data, zmq_msg_data(&msg), zmq_msg_size(&msg));
zmq_msg_close(&msg);
+ ERL_NIF_TERM flags_list;
+
+ // Should we send the multipart flag
+ if(flag_value == 1) {
+ flags_list = enif_make_list1(r->data.recv.env, enif_make_atom(r->data.recv.env, "rcvmore"));
+ } else {
+ flags_list = enif_make_list(r->data.recv.env, 0);
+ }
if (r->data.recv.socket->active == ERLZMQ_SOCKET_ACTIVE_ON) {
enif_send(NULL, &r->data.recv.pid, r->data.recv.env,
- enif_make_tuple3(r->data.recv.env,
+ enif_make_tuple4(r->data.recv.env,
enif_make_atom(r->data.recv.env, "zmq"),
enif_make_tuple2(r->data.recv.env,
enif_make_uint64(r->data.recv.env,
r->data.recv.socket->socket_index),
enif_make_resource(r->data.recv.env, r->data.recv.socket)),
- enif_make_binary(r->data.recv.env, &binary)));
+ enif_make_binary(r->data.recv.env, &binary),
+ flags_list));
enif_free_env(r->data.recv.env);
r->data.recv.env = enif_alloc_env();
item->revents = 0;
}
else {
enif_send(NULL, &r->data.recv.pid, r->data.recv.env,
- enif_make_tuple2(r->data.recv.env,
+ enif_make_tuple3(r->data.recv.env,
enif_make_copy(r->data.recv.env, r->data.recv.ref),
- enif_make_binary(r->data.recv.env, &binary)));
+ enif_make_binary(r->data.recv.env, &binary),
+ flags_list));
enif_free_env(r->data.recv.env);
enif_release_resource(r->data.recv.socket);
View
@@ -190,8 +190,8 @@ recv({I, Socket}, Flags)
Ref when is_reference(Ref) ->
Timeout = proplists:get_value(timeout, Flags, infinity),
receive
- {Ref, Result} ->
- {ok, Result}
+ {Ref, Result, Flag} ->
+ {ok, Result, Flag}
after Timeout ->
{error, {timeout, Ref}}
end;
View
@@ -15,7 +15,7 @@ hwm_test() ->
ok = hwm_loop(10, S2),
- ?assertMatch({ok, <<"test">>}, erlzmq:recv(S1)),
+ ?assertMatch({ok, <<"test">>, _Flags}, erlzmq:recv(S1)),
?assertMatch(ok, erlzmq:send(S2, <<"test">>)),
ok = erlzmq:close(S1),
ok = erlzmq:close(S2),
@@ -129,44 +129,57 @@ create_bound_pair(Ctx, Type1, Type2, Mode, Transport) ->
{S1, S2}.
ping_pong({S1, S2}, Msg, active) ->
+ ok = erlzmq:send(S1, Msg, [sndmore]),
ok = erlzmq:send(S1, Msg),
receive
- {zmq, S2, Msg} ->
+ {zmq, S2, Msg, [rcvmore]} ->
+ ok
+ after
+ 1000 ->
+ ?assertMatch({ok, Msg}, timeout)
+ end,
+ receive
+ {zmq, S2, Msg, []} ->
ok
after
1000 ->
?assertMatch({ok, Msg}, timeout)
end,
ok = erlzmq:send(S2, Msg),
receive
- {zmq, S1, Msg} ->
+ {zmq, S1, Msg, []} ->
ok
after
1000 ->
?assertMatch({ok, Msg}, timeout)
end,
ok = erlzmq:send(S1, Msg),
receive
- {zmq, S2, Msg} ->
+ {zmq, S2, Msg, []} ->
ok
after
1000 ->
?assertMatch({ok, Msg}, timeout)
end,
ok = erlzmq:send(S2, Msg),
receive
- {zmq, S1, Msg} ->
+ {zmq, S1, Msg, []} ->
ok
after
1000 ->
?assertMatch({ok, Msg}, timeout)
end,
ok;
+
ping_pong({S1, S2}, Msg, passive) ->
ok = erlzmq:send(S1, Msg),
- ?assertMatch({ok, Msg}, erlzmq:recv(S2)),
+ ?assertMatch({ok, Msg, []}, erlzmq:recv(S2)),
ok = erlzmq:send(S2, Msg),
- ?assertMatch({ok, Msg}, erlzmq:recv(S1)),
+ ?assertMatch({ok, Msg, []}, erlzmq:recv(S1)),
+ ok = erlzmq:send(S1, Msg, [sndmore]),
+ ok = erlzmq:send(S1, Msg),
+ ?assertMatch({ok, Msg, [rcvmore]}, erlzmq:recv(S2)),
+ ?assertMatch({ok, Msg, []}, erlzmq:recv(S2)),
ok.
basic_tests(Transport, Type1, Type2, Mode) ->

0 comments on commit 0ec0347

Please sign in to comment.