From d6d020694ab87c9be3d8fe062f206d7ac71ae5ff Mon Sep 17 00:00:00 2001 From: Martin Sustrik Date: Thu, 19 Aug 2010 08:47:22 +0200 Subject: [PATCH] flags parameter added to send function --- c_src/zmq_drv.cpp | 9 +++++---- c_src/zmq_drv.h | 3 ++- src/zmq.erl | 36 +++++++++++++++++++++++++++--------- 3 files changed, 34 insertions(+), 14 deletions(-) mode change 100644 => 100755 c_src/zmq_drv.cpp mode change 100644 => 100755 c_src/zmq_drv.h mode change 100644 => 100755 src/zmq.erl diff --git a/c_src/zmq_drv.cpp b/c_src/zmq_drv.cpp old mode 100644 new mode 100755 index 0a0e11b..7306ed2 --- a/c_src/zmq_drv.cpp +++ b/c_src/zmq_drv.cpp @@ -391,7 +391,7 @@ zmqdrv_ready_input(ErlDrvData handle, ErlDrvEvent event) // There was a pending unwritten message on this socket. // Try to write it. If the write succeeds/fails clear the ZMQ_POLLOUT // flag and notify the waiting caller of completion of operation. - rc = zmq_send(s, &(*si)->out_msg, ZMQ_NOBLOCK); + rc = zmq_send(s, &(*si)->out_msg, (*si)->out_flags | ZMQ_NOBLOCK); zmqdrv_fprintf("resending message %p (size=%ld) on socket %p (ret=%d)\r\n", zmq_msg_data(&(*si)->out_msg), zmq_msg_size(&(*si)->out_msg), s, rc); @@ -748,8 +748,9 @@ zmqdrv_send(zmq_drv_t *drv, ErlIOVec *ev) char* bytes = bin->orig_bytes; uint32_t idx = ntohl(*(uint32_t*)(bytes+1)); zmq_sock_info* si = drv->get_socket_info(idx); - void* data = (void *)(bytes + 5); - size_t size = bin->orig_size - 5; + uint32_t flags = ntohl(*(uint32_t*)(bytes+5)); + void* data = (void *)(bytes + 9); + size_t size = bin->orig_size - 9; if (idx > drv->zmq_socket_count || !si) { zmqdrv_error_code(drv, ENODEV); @@ -779,7 +780,7 @@ zmqdrv_send(zmq_drv_t *drv, ErlIOVec *ev) return; } - if (zmq_send(si->socket, &si->out_msg, ZMQ_NOBLOCK) == 0) { + if (zmq_send(si->socket, &si->out_msg, flags | ZMQ_NOBLOCK) == 0) { zmqdrv_ok(drv); zmqdrv_ready_input((ErlDrvData)drv, (ErlDrvEvent)si->fd); } else { diff --git a/c_src/zmq_drv.h b/c_src/zmq_drv.h old mode 100644 new mode 100755 index 18a29db..13d5596 --- a/c_src/zmq_drv.h +++ b/c_src/zmq_drv.h @@ -56,6 +56,7 @@ struct zmq_sock_info { int fd; // Signaling fd for this socket ErlDrvTermData in_caller; // Caller's pid of the last recv() command in passive mode zmq_msg_t out_msg; // Pending message to be written to 0MQ socket + int out_flags; // Send flags for the pending message ErlDrvTermData out_caller; // Caller's pid of the last send() command // if it resulted in EAGAIN error. bool active_mode; // true - messages are delivered to owner @@ -65,7 +66,7 @@ struct zmq_sock_info { zmq_sock_info(zmq_socket_t _s, uint32_t _idx, ErlDrvTermData _owner, int _sig_fd) : socket(_s), idx(_idx), owner(_owner), fd(_sig_fd), in_caller(0) - , out_caller(0), active_mode(true), prev(NULL), next(NULL) + , out_flags(0), out_caller(0), active_mode(true), prev(NULL), next(NULL) { zmq_msg_init(&out_msg); } diff --git a/src/zmq.erl b/src/zmq.erl old mode 100644 new mode 100755 index 022ed05..211606d --- a/src/zmq.erl +++ b/src/zmq.erl @@ -41,7 +41,7 @@ %% ZMQ API -export([start_link/0, start_link/1, socket/1, socket/2, close/1, setsockopt/2, - bind/2, connect/2, send/2, recv/1, format_error/1]). + bind/2, connect/2, send/2, send/3, recv/1, format_error/1]). -export([port/0]). @@ -160,16 +160,24 @@ connect({Port, S}, Address) when is_binary(Address) -> Msg = encode_connect(S, Address), driver(Port, Msg). +%%-------------------------------------------------------------------- +%% @equiv send(Socket::zmq_socket(), Msg::binary(), []) +%% @end +%%-------------------------------------------------------------------- +send(Socket, Data) -> + send(Socket, Data, []). + %%-------------------------------------------------------------------- %% @doc Send a message to a given 0MQ socket. %% @spec (Socket::zmq_socket(), Msg::binary()) -> ok | {error, Reason} %% @end %%-------------------------------------------------------------------- -send(Socket, Data) when is_integer(Socket), is_binary(Data) -> - gen_server:call(?MODULE, {send, Socket, Data}); +send(Socket, Data, Flags) + when is_integer(Socket), is_binary(Data), is_list(Flags) -> + gen_server:call(?MODULE, {send, Socket, Data, Flags}); % Experimantal support of direct port communication -send({Port, S}, Data) -> - Msg = encode_msg_send(S, Data), +send({Port, S}, Data, Flags) -> + Msg = encode_msg_send(S, Data, Flags), driver(Port, Msg). %%-------------------------------------------------------------------- @@ -282,9 +290,9 @@ handle_call({connect, Socket, Address}, _From, State) -> ?log("~p addr:~s", [connect, binary_to_list(Address)]), do_call(State, encode_connect(Socket, Address)); -handle_call({send, Socket, Data}, _From, State) -> +handle_call({send, Socket, Data, Flags}, _From, State) -> ?log("~p", [send]), - do_call(State, encode_msg_send(Socket, Data)); + do_call(State, encode_msg_send(Socket, Data, Flags)); handle_call({recv, Socket}, _From, State) -> ?log("~p", [recv]), @@ -440,8 +448,18 @@ encode_bind(Socket, Address) -> <<(?ZMQ_BIND):8, Socket:32, Address/binary>>. encode_connect(Socket, Address) -> <<(?ZMQ_CONNECT):8, Socket:32, Address/binary>>. -encode_msg_send(Socket, Data) -> - <<(?ZMQ_SEND):8, Socket:32, Data/binary>>. + +send_flags_to_int([]) -> 0; +send_flags_to_int([H|T]) -> + send_flags_to_int(T) bor + case H of + sndmore -> 2; + _ -> throw({unknown_send_option, 0}) + end. + +encode_msg_send(Socket, Data, Flags) -> + F = send_flags_to_int(Flags), + <<(?ZMQ_SEND):8, Socket:32, F:32, Data/binary>>. encode_msg_recv(Socket) -> <<(?ZMQ_RECV):8, Socket:32>>.