Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
flags parameter added to send function
  • Loading branch information
sustrik committed Aug 19, 2010
1 parent bc9ead2 commit d6d0206
Show file tree
Hide file tree
Showing 3 changed files with 34 additions and 14 deletions.
9 changes: 5 additions & 4 deletions c_src/zmq_drv.cpp 100644 → 100755
Expand Up @@ -391,7 +391,7 @@ zmqdrv_ready_input(ErlDrvData handle, ErlDrvEvent event)
// There was a pending unwritten message on this socket.
// Try to write it. If the write succeeds/fails clear the ZMQ_POLLOUT
// flag and notify the waiting caller of completion of operation.
rc = zmq_send(s, &(*si)->out_msg, ZMQ_NOBLOCK);
rc = zmq_send(s, &(*si)->out_msg, (*si)->out_flags | ZMQ_NOBLOCK);

zmqdrv_fprintf("resending message %p (size=%ld) on socket %p (ret=%d)\r\n",
zmq_msg_data(&(*si)->out_msg), zmq_msg_size(&(*si)->out_msg), s, rc);
Expand Down Expand Up @@ -748,8 +748,9 @@ zmqdrv_send(zmq_drv_t *drv, ErlIOVec *ev)
char* bytes = bin->orig_bytes;
uint32_t idx = ntohl(*(uint32_t*)(bytes+1));
zmq_sock_info* si = drv->get_socket_info(idx);
void* data = (void *)(bytes + 5);
size_t size = bin->orig_size - 5;
uint32_t flags = ntohl(*(uint32_t*)(bytes+5));
void* data = (void *)(bytes + 9);
size_t size = bin->orig_size - 9;

if (idx > drv->zmq_socket_count || !si) {
zmqdrv_error_code(drv, ENODEV);
Expand Down Expand Up @@ -779,7 +780,7 @@ zmqdrv_send(zmq_drv_t *drv, ErlIOVec *ev)
return;
}

if (zmq_send(si->socket, &si->out_msg, ZMQ_NOBLOCK) == 0) {
if (zmq_send(si->socket, &si->out_msg, flags | ZMQ_NOBLOCK) == 0) {
zmqdrv_ok(drv);
zmqdrv_ready_input((ErlDrvData)drv, (ErlDrvEvent)si->fd);
} else {
Expand Down
3 changes: 2 additions & 1 deletion c_src/zmq_drv.h 100644 → 100755
Expand Up @@ -56,6 +56,7 @@ struct zmq_sock_info {
int fd; // Signaling fd for this socket
ErlDrvTermData in_caller; // Caller's pid of the last recv() command in passive mode
zmq_msg_t out_msg; // Pending message to be written to 0MQ socket
int out_flags; // Send flags for the pending message
ErlDrvTermData out_caller; // Caller's pid of the last send() command
// if it resulted in EAGAIN error.
bool active_mode; // true - messages are delivered to owner
Expand All @@ -65,7 +66,7 @@ struct zmq_sock_info {

zmq_sock_info(zmq_socket_t _s, uint32_t _idx, ErlDrvTermData _owner, int _sig_fd)
: socket(_s), idx(_idx), owner(_owner), fd(_sig_fd), in_caller(0)
, out_caller(0), active_mode(true), prev(NULL), next(NULL)
, out_flags(0), out_caller(0), active_mode(true), prev(NULL), next(NULL)
{
zmq_msg_init(&out_msg);
}
Expand Down
36 changes: 27 additions & 9 deletions src/zmq.erl 100644 → 100755
Expand Up @@ -41,7 +41,7 @@
%% ZMQ API
-export([start_link/0, start_link/1,
socket/1, socket/2, close/1, setsockopt/2,
bind/2, connect/2, send/2, recv/1, format_error/1]).
bind/2, connect/2, send/2, send/3, recv/1, format_error/1]).

-export([port/0]).

Expand Down Expand Up @@ -160,16 +160,24 @@ connect({Port, S}, Address) when is_binary(Address) ->
Msg = encode_connect(S, Address),
driver(Port, Msg).

%%--------------------------------------------------------------------
%% @equiv send(Socket::zmq_socket(), Msg::binary(), [])
%% @end
%%--------------------------------------------------------------------
send(Socket, Data) ->
send(Socket, Data, []).

%%--------------------------------------------------------------------
%% @doc Send a message to a given 0MQ socket.
%% @spec (Socket::zmq_socket(), Msg::binary()) -> ok | {error, Reason}
%% @end
%%--------------------------------------------------------------------
send(Socket, Data) when is_integer(Socket), is_binary(Data) ->
gen_server:call(?MODULE, {send, Socket, Data});
send(Socket, Data, Flags)
when is_integer(Socket), is_binary(Data), is_list(Flags) ->
gen_server:call(?MODULE, {send, Socket, Data, Flags});
% Experimantal support of direct port communication
send({Port, S}, Data) ->
Msg = encode_msg_send(S, Data),
send({Port, S}, Data, Flags) ->
Msg = encode_msg_send(S, Data, Flags),
driver(Port, Msg).

%%--------------------------------------------------------------------
Expand Down Expand Up @@ -282,9 +290,9 @@ handle_call({connect, Socket, Address}, _From, State) ->
?log("~p addr:~s", [connect, binary_to_list(Address)]),
do_call(State, encode_connect(Socket, Address));

handle_call({send, Socket, Data}, _From, State) ->
handle_call({send, Socket, Data, Flags}, _From, State) ->
?log("~p", [send]),
do_call(State, encode_msg_send(Socket, Data));
do_call(State, encode_msg_send(Socket, Data, Flags));

handle_call({recv, Socket}, _From, State) ->
?log("~p", [recv]),
Expand Down Expand Up @@ -440,8 +448,18 @@ encode_bind(Socket, Address) ->
<<(?ZMQ_BIND):8, Socket:32, Address/binary>>.
encode_connect(Socket, Address) ->
<<(?ZMQ_CONNECT):8, Socket:32, Address/binary>>.
encode_msg_send(Socket, Data) ->
<<(?ZMQ_SEND):8, Socket:32, Data/binary>>.

send_flags_to_int([]) -> 0;
send_flags_to_int([H|T]) ->
send_flags_to_int(T) bor
case H of
sndmore -> 2;
_ -> throw({unknown_send_option, 0})
end.

encode_msg_send(Socket, Data, Flags) ->
F = send_flags_to_int(Flags),
<<(?ZMQ_SEND):8, Socket:32, F:32, Data/binary>>.
encode_msg_recv(Socket) ->
<<(?ZMQ_RECV):8, Socket:32>>.

Expand Down

0 comments on commit d6d0206

Please sign in to comment.