Permalink
Browse files

Patched to support ZeroMQ 2.0.7 (thanks to Chris)

  • Loading branch information...
saleyn committed Jul 24, 2010
1 parent 45c95ba commit bfc69ab8c340ea68aca364850e3ce78aba587b06
Showing with 50 additions and 94 deletions.
  1. +9 −3 README
  2. +27 −54 c_src/zmq_drv.cpp
  3. +0 −6 c_src/zmq_drv.h
  4. +2 −5 include/zmq.hrl
  5. +9 −19 src/zmq.erl
  6. +1 −2 src/zmq_pubserver.erl
  7. +1 −2 src/zmq_repserver.erl
  8. +1 −2 src/zmq_reqclient.erl
  9. +0 −1 src/zmq_subclient.erl
View
12 README
@@ -1,6 +1,11 @@
Erlang bindings for ZeroMQ (http://www.zeromq.org).
Building
+ The bindings work against the master branch of ZeroMQ:
+ http://github.com/sustrik/zeromq2
+
+ $ git clone git@github.com:zeromq/erlzmq.git erlzmq
+ $ cd erlzmq
$ ./bootstrap
$ ./configure --with-zeromq=/path/to/zeromq
$ make
@@ -17,8 +22,9 @@ Examples
PUB/SUB zmq_pubserver/zmq_subclient
REQ/REP zmq_repserver/zmq_reqclient
- 1> zmq_pubserver:run().
- 2> zmq_subclient:run().
+ 1> zmq:start_link().
+ 2> zmq_pubserver:run().
+ 3> zmq_subclient:run().
You can run a server and any number of clients
in the same Erlang shell or on different nodes.
@@ -31,7 +37,7 @@ Copyright
Copywight (c) 2010 Serge Aleynikov
Contacts
- Report bugs to <dhammika at gmail dot com> and <saleyn at gmail dot com>
+ http://github.com/zeromq/erlzmq/issues
Copying
Erlang bindings are released under open-source BSD License (see LICENSE file)
View
@@ -78,8 +78,11 @@ zmq_drv_t::~zmq_drv_t()
zmq_pid_sockets.clear();
zmq_fd_sockets.clear();
- if (zmq_context)
+ if (zmq_context) {
+ zmqdrv_fprintf("calling zmq_term(context) ...\r\n");
zmq_term(zmq_context);
+ zmqdrv_fprintf("terminated zmq context\r\n");
+ }
}
void zmq_drv_t::add_socket(zmq_sock_info* s)
@@ -200,7 +203,6 @@ static ErlDrvTermData error_atom(int err)
case EADDRNOTAVAIL: strcpy(errstr, "eaddrnotavail"); break;
case ECONNREFUSED: strcpy(errstr, "econnrefused"); break;
case EINPROGRESS: strcpy(errstr, "einprogress"); break;
- case EMTHREAD: strcpy(errstr, "emthread"); break;
case EFSM: strcpy(errstr, "efsm"); break;
case ENOCOMPATPROTO: strcpy(errstr, "enocompatproto"); break;
default:
@@ -309,6 +311,7 @@ static void
zmqdrv_stop(ErlDrvData handle)
{
delete reinterpret_cast<zmq_drv_t*>(handle);
+ zmqdrv_fprintf("driver stopped by pid\r\n");
}
static void
@@ -328,41 +331,20 @@ zmqdrv_ready_input(ErlDrvData handle, ErlDrvEvent event)
assert(si != it->second.end());
- // FIXME: Currently 0MQ design assumes one thread owning
- // many 0MQ sockets - so we get the app_thread from the
- // first socket in the list of sockets associated with app_thread's
- // signaling fd. If 0MQ's implementation merges the app_thread
- // concept with 0MQ socket object, this implementation would
- // have to call zmq_process in the loop for every 0MQ socket.
- zmq_app_thread_t app_thread;
- zmq_getsockopt((*si)->socket, ZMQ_APP_THREAD, &app_thread, sizeof(app_thread));
-
- if (app_thread == NULL) {
- zmqdrv_fprintf("warning: no app_thread for socket %p [sig_fd=%ld]\r\n",
- (*si)->socket, (long)event);
- // If this happens, there's something severely wrong with the socket
- // structure - bail out by crashing the driver.
- driver_failure_atom(drv->port, (char*)"no_application_thread_found");
- return;
- }
-
- zmq_process(app_thread);
-
for (; si != it->second.end(); ++si) {
zmq_socket_t s = (*si)->socket;
uint32_t idx = (*si)->idx;
ErlDrvTermData owner = (*si)->owner;
int rc = 0;
uint32_t events;
+ size_t events_size = sizeof(events);
- zmq_getsockopt(s, ZMQ_EVENTS | ZMQ_POLLIN, &events, sizeof(events));
+ zmq_getsockopt(s, ZMQ_EVENTS, &events, &events_size);
while (((*si)->active_mode || (*si)->in_caller) && (events & ZMQ_POLLIN)) {
msg_t msg;
- do {
- rc = zmq_recv(s, &msg, ZMQ_NOBLOCK);
- } while (rc == -1 && zmq_errno() == EINTR);
+ rc = zmq_recv(s, &msg, ZMQ_NOBLOCK);
ErlDrvTermData pid = (*si)->active_mode ? owner : (*si)->in_caller;
@@ -400,18 +382,16 @@ zmqdrv_ready_input(ErlDrvData handle, ErlDrvEvent event)
// FIXME: add error handling
zmqdrv_fprintf("received %ld byte message relayed to pid %ld\r\n", zmq_msg_size(&msg), pid);
- zmq_getsockopt(s, ZMQ_EVENTS | ZMQ_POLLIN, &events, sizeof(events));
+ zmq_getsockopt(s, ZMQ_EVENTS, &events, &events_size);
}
- zmq_getsockopt(s, ZMQ_EVENTS | ZMQ_POLLIN, &events, sizeof(events));
+ zmq_getsockopt(s, ZMQ_EVENTS, &events, &events_size);
if ((*si)->out_caller != 0 && (events & ZMQ_POLLOUT)) {
// There was a pending unwritten message on this socket.
// Try to write it. If the write succeeds/fails clear the ZMQ_POLLOUT
// flag and notify the waiting caller of completion of operation.
- do {
- rc = zmq_send(s, &(*si)->out_msg, ZMQ_NOBLOCK);
- } while (rc == -1 && zmq_errno() == EINTR);
+ rc = zmq_send(s, &(*si)->out_msg, ZMQ_NOBLOCK);
zmqdrv_fprintf("resending message %p (size=%ld) on socket %p (ret=%d)\r\n",
zmq_msg_data(&(*si)->out_msg), zmq_msg_size(&(*si)->out_msg), s, rc);
@@ -519,24 +499,20 @@ zmqdrv_init(zmq_drv_t *drv, ErlIOVec *ev)
* call to explicitely convert a term to external binary format.
*/
- uint32_t app_threads;
uint32_t io_threads;
- uint32_t flags;
ErlDrvBinary* input = ev->binv[1];
char* bytes = input->orig_bytes;
- app_threads = ntohl(*(uint32_t *)(bytes + 1));
- io_threads = ntohl(*(uint32_t *)(bytes + 5));
- flags = ntohl(*(uint32_t *)(bytes + 9));
+ io_threads = ntohl(*(uint32_t *)(bytes + 1));
- zmqdrv_fprintf("appthreads = %u, iothreads = %u\r\n", app_threads, io_threads);
+ zmqdrv_fprintf("iothreads = %u\r\n", io_threads);
if (drv->zmq_context) {
zmqdrv_error_code(drv, EBUSY);
return;
}
- drv->zmq_context = (void *)zmq_init(app_threads, io_threads, flags);
+ drv->zmq_context = (void *)zmq_init(io_threads);
if (!drv->zmq_context) {
zmqdrv_error_code(drv, zmq_errno());
@@ -554,7 +530,11 @@ zmqdrv_term(zmq_drv_t *drv, ErlIOVec *ev)
return;
}
- if (zmq_term(drv->zmq_context) < 0) {
+ zmqdrv_fprintf("calling zmq_term(context) ...\r\n");
+ int rc = zmq_term(drv->zmq_context);
+ zmqdrv_fprintf("terminated zmq context\r\n");
+
+ if (rc < 0) {
zmqdrv_error_code(drv, zmq_errno());
return;
}
@@ -576,17 +556,12 @@ zmqdrv_socket(zmq_drv_t *drv, ErlIOVec *ev)
return;
}
- // If the socket app_thread's signaler is not registered
- // with driver's poller, perform that registration now.
- void* app_thr;
- int sig_fd;
- zmq_getsockopt(s, ZMQ_APP_THREAD, &app_thr, sizeof(app_thr));
- zmq_getsockopt(s, ZMQ_WAITFD, &sig_fd, sizeof(sig_fd));
+ int sig_fd;
+ size_t sig_size = sizeof(sig_fd);
+ zmq_getsockopt(s, ZMQ_FD, &sig_fd, &sig_size);
if (sig_fd < 0) {
- std::stringstream str;
- str << "Invalid signaler (app_thread=" << app_thr << ')';
- zmqdrv_error(drv, str.str().c_str());
+ zmqdrv_error(drv, "Invalid signaler");
return;
}
@@ -660,7 +635,6 @@ zmqdrv_setsockopt(zmq_drv_t *drv, ErlIOVec *ev)
switch (option) {
case ZMQ_HWM: assert(optvallen == 8); break;
- case ZMQ_LWM: assert(optvallen == 8); break;
case ZMQ_SWAP: assert(optvallen == 8); break;
case ZMQ_AFFINITY: assert(optvallen == 8); break;
case ZMQ_IDENTITY: assert(optvallen < 256); break;
@@ -671,7 +645,6 @@ zmqdrv_setsockopt(zmq_drv_t *drv, ErlIOVec *ev)
case ZMQ_MCAST_LOOP: assert(optvallen == 8); break;
case ZMQ_SNDBUF: assert(optvallen == 8); break;
case ZMQ_RCVBUF: assert(optvallen == 8); break;
- //case ZMQ_RCVMORE: assert(optvallen == 8); break;
case ZMQ_ACTIVE: assert(optvallen == 1); break;
}
@@ -785,7 +758,8 @@ zmqdrv_send(zmq_drv_t *drv, ErlIOVec *ev)
#ifdef ZMQDRV_DEBUG
uint32_t events;
- zmq_getsockopt(si->socket, ZMQ_EVENTS | ZMQ_POLLIN | ZMQ_POLLOUT, &events, sizeof(events));
+ size_t events_size = sizeof(events);
+ zmq_getsockopt(si->socket, ZMQ_EVENTS, &events, &events_size);
zmqdrv_fprintf("sending %p [idx=%d] %lu bytes (events=%d)\r\n", si->socket, idx, size, events);
#endif
@@ -834,8 +808,6 @@ zmqdrv_recv(zmq_drv_t *drv, ErlIOVec *ev)
return;
}
- si->process_commands();
-
if (si->active_mode) {
zmqdrv_error_code(drv, EINVAL);
return;
@@ -849,7 +821,8 @@ zmqdrv_recv(zmq_drv_t *drv, ErlIOVec *ev)
}
uint32_t events;
- zmq_getsockopt(si->socket, ZMQ_EVENTS | ZMQ_POLLIN, &events, sizeof(events));
+ size_t events_size = sizeof(events);
+ zmq_getsockopt(si->socket, ZMQ_EVENTS, &events, &events_size);
if (events == 0)
si->in_caller = driver_caller(drv->port);
View
@@ -81,12 +81,6 @@ struct zmq_sock_info {
if (next) next->prev = prev;
}
- void process_commands() {
- zmq_app_thread_t app_thread;
- zmq_getsockopt(socket, ZMQ_APP_THREAD, &app_thread, sizeof(app_thread));
- zmq_process(app_thread);
- }
-
static void* operator new (size_t sz) { return driver_alloc(sz); }
static void operator delete (void* p) { driver_free(p); }
};
View
@@ -5,7 +5,7 @@
-define('DRIVER_NAME', 'zmq_drv').
%% ZMQ socket types.
--define('ZMQ_P2P', 0).
+-define('ZMQ_PAIR', 0).
-define('ZMQ_PUB', 1).
-define('ZMQ_SUB', 2).
-define('ZMQ_REQ', 3).
@@ -17,7 +17,6 @@
%% ZMQ socket options.
-define('ZMQ_HWM', 1).
--define('ZMQ_LWM', 2).
-define('ZMQ_SWAP', 3).
-define('ZMQ_AFFINITY', 4).
-define('ZMQ_IDENTITY', 5).
@@ -29,11 +28,9 @@
-define('ZMQ_SNDBUF', 11).
-define('ZMQ_RCVBUF', 12).
-define('ZMQ_RCVMORE', 13).
+-define('ZMQ_FD', 14).
-define('ZMQ_ACTIVE', 255). % This is driver's socket option rather than 0MQ's
-%% ZMQ init options.
--define('ZMQ_POLL', 1).
-
%% ZMQ send/recv options.
-define('ZMQ_NOBLOCK', 1).
-define('ZMQ_SNDMORE', 2).
View
@@ -48,7 +48,7 @@
%% gen_server callbacks.
-export([init/1,
handle_call/3, handle_cast/2, handle_info/2,
- terminate/2, code_change/3]).
+ terminate/2, code_change/3]).
-include("zmq.hrl").
@@ -79,7 +79,7 @@ socket(Type) when is_atom(Type) ->
%%--------------------------------------------------------------------
%% @doc Create a 0MQ socket.
%% @spec (Type, Options) -> {ok, Socket::zmq_socket()} | {error, Reason}
-%% Type = p2p | pub | sub | req | rep |
+%% Type = pair | pub | sub | req | rep |
%% xreq | xrep | upstream | downstream
%% Options = [Option]
%% Option = {active, boolean()}
@@ -213,10 +213,10 @@ init([IoThreads]) ->
SearchDir = filename:join(filename:dirname(DirName), "priv"),
?log("init, lib path: ~s", [SearchDir]),
try erl_ddll:load(SearchDir, ?DRIVER_NAME) of
- ok ->
+ ok ->
Port = open_port({spawn_driver, ?DRIVER_NAME}, [binary]),
init_context(Port, IoThreads),
- {ok, #state{port=Port}};
+ {ok, #state{port=Port}};
{error, Reason} ->
throw(erl_ddll:format_error(Reason))
catch _:Error ->
@@ -358,7 +358,6 @@ format_error(enobufs) -> "No buffer space available";
format_error(enetdown) -> "Network is down";
format_error(eaddrinuse) -> "Address in use";
format_error(eaddrnotavail) -> "Address not available";
-format_error(emthread) -> "Number of preallocated application threads exceeded";
format_error(efsm) -> "Operation cannot be accomplished in current state";
format_error(enocompatproto) -> "The protocol is not compatible with the socket type";
format_error(E) when is_atom(E) -> inet:format_error(E);
@@ -370,20 +369,16 @@ format_error(E) when is_tuple(E)-> io_lib:format("~p", [E]).
%%%===================================================================
init_context(Port, IoThreads) ->
- % For now the driver only 1 app_thread and multiplexing using poll()
- AppThreads = 1,
- IntFlag = ?ZMQ_POLL,
- ?log("~p, app threads:~B io threads:~B",
- [init, AppThreads, IoThreads]),
- Message = <<(?ZMQ_INIT):8, AppThreads:32, IoThreads:32, IntFlag:32>>,
+ ?log("~p, io threads:~B", [init, IoThreads]),
+ Message = <<(?ZMQ_INIT):8, IoThreads:32>>,
case driver(Port, Message) of
ok -> ok;
{error, Error} -> throw(format_error(Error))
end.
encode_msg_socket(Type) ->
case Type of
- p2p -> <<(?ZMQ_SOCKET):8, (?ZMQ_P2P):8>>;
+ pair -> <<(?ZMQ_SOCKET):8, (?ZMQ_PAIR):8>>;
pub -> <<(?ZMQ_SOCKET):8, (?ZMQ_PUB):8>>;
sub -> <<(?ZMQ_SOCKET):8, (?ZMQ_SUB):8>>;
req -> <<(?ZMQ_SOCKET):8, (?ZMQ_REQ):8>>;
@@ -403,7 +398,6 @@ encode_sock_opts(Socket, Options) when length(Options) =< 255 ->
V = check_sockopt({O, Value}),
case O of
hwm -> <<?ZMQ_HWM, 8, V:64/native>>;
- lwm -> <<?ZMQ_LWM, 8, V:64/native>>;
swap -> <<?ZMQ_SWAP, 8, V:64/native>>;
affinity -> <<?ZMQ_AFFINITY, 8, V:64/native>>;
identity -> <<?ZMQ_IDENTITY, (byte_size(V)):8, V/binary>>;
@@ -414,7 +408,6 @@ encode_sock_opts(Socket, Options) when length(Options) =< 255 ->
mcast_loop -> <<?ZMQ_MCAST_LOOP, 8, V:64/native>>;
sndbuf -> <<?ZMQ_SNDBUF, 8, V:64/native>>;
rcvbuf -> <<?ZMQ_RCVBUF, 8, V:64/native>>;
- rcvmore -> <<?ZMQ_RCVMORE, 8, V:64/native>>;
% Driver's socket options
active -> <<?ZMQ_ACTIVE, 1, V>>;
_ -> throw({unknown_sock_option, O})
@@ -423,7 +416,6 @@ encode_sock_opts(Socket, Options) when length(Options) =< 255 ->
<<(?ZMQ_SETSOCKOPT):8, Socket:32, (length(Opts)):8, (list_to_binary(Opts))/binary>>.
check_sockopt({hwm, V}) when is_integer(V) -> V;
-check_sockopt({lwm, V}) when is_integer(V) -> V;
check_sockopt({swap, V}) when is_integer(V) -> V;
check_sockopt({affinity, V}) when is_integer(V) -> V;
check_sockopt({identity, V}) when is_list(V), length(V) =< 255 -> list_to_binary(V);
@@ -440,8 +432,6 @@ check_sockopt({mcast_loop, true}) -> 1;
check_sockopt({mcast_loop,false}) -> 0;
check_sockopt({sndbuf, V}) when is_integer(V) -> V;
check_sockopt({rcvbuf, V}) when is_integer(V) -> V;
-check_sockopt({rcvmore, true}) -> 1;
-check_sockopt({rcvmore, false}) -> 0;
check_sockopt({active, true}) -> 1;
check_sockopt({active, false}) -> 0;
check_sockopt(Option) -> throw({unknown_option, Option}).
@@ -463,7 +453,7 @@ driver(Port, Message) ->
?log("port command ~p", [Message]),
port_command(Port, Message),
receive
- Data ->
- Data
+ Data ->
+ Data
end.
Oops, something went wrong.

0 comments on commit bfc69ab

Please sign in to comment.