Permalink
Browse files

getsockopt allows for getting RCVMORE option

  • Loading branch information...
1 parent 10b27d4 commit 042b0c571550e0fca8ae37449ceda3c8e80893b1 @sustrik sustrik committed Aug 20, 2010
Showing with 62 additions and 2 deletions.
  1. +22 −0 c_src/zmq_drv.cpp
  2. +40 −2 src/zmq.erl
View
22 c_src/zmq_drv.cpp
@@ -666,6 +666,28 @@ zmqdrv_setsockopt(zmq_drv_t *drv, ErlIOVec *ev)
static void
zmqdrv_getsockopt(zmq_drv_t *drv, ErlIOVec *ev)
{
+ ErlDrvBinary* bin = ev->binv[1];
+ char* bytes = bin->orig_bytes;
+ uint32_t idx = ntohl(*(uint32_t*)(bytes+1));
+ void* s = drv->get_zmq_socket(idx);
+ uint32_t opt = ntohl (*(uint32_t*)(bytes+sizeof(idx)+1));
+
+ if (opt == ZMQ_RCVMORE) {
+ int64_t val;
+ size_t valsz = sizeof (val);
+ if (zmq_getsockopt (s, opt, &val, &valsz) < 0) {
+ zmqdrv_error_code(drv, zmq_errno());
+ return;
+ }
+
+ ErlDrvTermData spec[] = {
+ ERL_DRV_ATOM, am_ok,
+ ERL_DRV_ATOM, val ? driver_mk_atom((char*) "true") : driver_mk_atom((char*) "false"),
+ ERL_DRV_TUPLE, 2};
+ driver_send_term(drv->port, driver_caller(drv->port), spec, sizeof(spec)/sizeof(spec[0]));
+ return;
+ }
+
zmqdrv_error(drv, "Not implemented");
}
View
42 src/zmq.erl
@@ -43,7 +43,7 @@
%% ZMQ API
-export([start_link/0, start_link/1,
- socket/1, socket/2, close/1, setsockopt/2,
+ socket/1, socket/2, close/1, setsockopt/2, getsockopt/2,
bind/2, connect/2, send/2, send/3, recv/1, format_error/1]).
-export([port/0]).
@@ -131,6 +131,19 @@ setsockopt(Socket, Opts) when is_integer(Socket), is_list(Opts) ->
gen_server:call(?MODULE, {setsockopt, Socket, Opts}).
%%--------------------------------------------------------------------
+%% @doc Get socket option.
+%% @spec (Socket::zmq_socket(), Option) -> {ok, Value} | {error, Reason}
+%% Option = zmq_sockopt()
+%% @end
+%%--------------------------------------------------------------------
+getsockopt(Socket, Option) when is_integer(Socket), is_atom(Option) ->
+ gen_server:call(?MODULE, {getsockopt, Socket, Option});
+% Experimantal support of direct port communication
+getsockopt({Port, S}, Option) when is_atom(Option)->
+ Msg = encode_getsockopt(S, Option),
+ driver(Port, Msg).
+
+%%--------------------------------------------------------------------
%% @doc Bind a 0MQ socket to address.
%% @spec (Socket::zmq_socket(), Address) -> ok | {error, Reason}
%% Address = string() | binary()
@@ -286,6 +299,10 @@ handle_call({setsockopt, Socket, Options}, _From, State) ->
?log("~p", [socketopt]),
do_call(State, encode_sock_opts(Socket, Options));
+handle_call({getsockopt, Socket, Option}, _From, State) ->
+ ?log("~p", [getsockopt]),
+ do_call(State, encode_getsockopt(Socket, Option));
+
handle_call({bind, Socket, Address}, _From, State) ->
?log("~p addr:~s", [bind, binary_to_list(Address)]),
do_call(State, encode_bind(Socket, Address));
@@ -448,6 +465,27 @@ check_sockopt({active, true}) -> 1;
check_sockopt({active, false}) -> 0;
check_sockopt(Option) -> throw({unknown_option, Option}).
+sockopt_to_int(Option) ->
+ case Option of
+ hwm -> ?ZMQ_HWM;
+ swap -> ?ZMQ_SWAP;
+ affinity -> ?ZMQ_AFFINITY;
+ identity -> ?ZMQ_IDENTITY;
+ subscribe -> ?ZMQ_SUBSCRIBE;
+ unsubscribe -> ?ZMQ_UNSUBSCRIBE;
+ rate -> ?ZMQ_RATE;
+ recovery_ivl-> ?ZMQ_RECOVERY_IVL;
+ mcast_loop -> ?ZMQ_MCAST_LOOP;
+ sndbuf -> ?ZMQ_SNDBUF;
+ rcvbuf -> ?ZMQ_RCVBUF;
+ rcvmore -> ?ZMQ_RCVMORE;
+ _ -> throw({unknown_sock_option, Option})
+ end.
+
+encode_getsockopt(Socket, Option) ->
+ O = sockopt_to_int(Option),
+ <<(?ZMQ_GETSOCKOPT):8, Socket:32, O:32>>.
+
encode_bind(Socket, Address) ->
<<(?ZMQ_BIND):8, Socket:32, Address/binary>>.
encode_connect(Socket, Address) ->
@@ -457,7 +495,7 @@ send_flags_to_int([]) -> 0;
send_flags_to_int([H|T]) ->
send_flags_to_int(T) bor
case H of
- sndmore -> 2;
+ sndmore -> ?ZMQ_SNDMORE;
_ -> throw({unknown_send_option, 0})
end.

0 comments on commit 042b0c5

Please sign in to comment.