Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

Non-blocking driver API. Added documentation

  • Loading branch information...
commit fa086a6ad24a7bf6fc24846d59f545ba7a5738cd 1 parent 706d572
Serge Aleynikov saleyn authored
3  .gitignore
View
@@ -0,0 +1,3 @@
+zmq.tgz
+doc
+priv
29 LICENSE
View
@@ -0,0 +1,29 @@
+Copyright (c) 2010 Dhammika Pathirana <dhammika@gmail.com>
+Copywight (c) 2010 Serge Aleynikov <saleyn@gmail.com>
+
+All rights reserved.
+
+Redistribution and use in source and binary forms, with or without
+modification, are permitted provided that the following conditions are met:
+
+ * Redistributions of source code must retain the above copyright notice,
+ this list of conditions and the following disclaimer.
+
+ * Redistributions in binary form must reproduce the above copyright notice,
+ this list of conditions and the following disclaimer in the documentation
+ and/or other materials provided with the distribution.
+
+ * Neither the name of Dhammika Pathirana or Serge Aleynikov nor the names
+ of its contributors may be used to endorse or promote products derived
+ from this software without specific prior written permission.
+
+THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
+ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
+FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
+SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
+CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
+OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
29 Makefile
View
@@ -1,7 +1,24 @@
-all:
- cd priv; ${MAKE}
- cd src; ${MAKE}
+TARBALL=zmq.tgz
+TARBALL_EXCLUDE=/tmp/exclude.xxx
+
+all clean:
+ @for d in c_src src; do \
+ ${MAKE} --directory=$$d $@; \
+ done
+
+docs:
+ @${MAKE} --directory=src $@
+
+tar:
+ @echo Creating $(TARBALL)
+ @DIR=$${PWD##*/} && pushd .. > /dev/null && \
+ echo -e "*.o\n*.d\n.git\n*.tgz\n.*.*\n~\n.~*#\ntags\n" > $(TARBALL_EXCLUDE) && \
+ for f in priv; do \
+ find $$DIR/$$f -type f -print >> $(TARBALL_EXCLUDE) ; \
+ done && \
+ tar zcf $(TARBALL) $$DIR --exclude-from $(TARBALL_EXCLUDE) && \
+ mv $(TARBALL) $$DIR && \
+ popd > /dev/null && \
+ rm -f $(TARBALL_EXCLUDE) && \
+ ls -l $(TARBALL)
-clean:
- cd priv; ${MAKE} clean
- cd src; ${MAKE} clean
22 README
View
@@ -1,12 +1,26 @@
Erlang bindings for ZeroMQ (http://www.zeromq.org).
+Generate documentation by running "make docs".
+
+See doc/index.html for full documentation.
+
Examples
PUB/SUB zmq_pubserver/zmq_subclient
REQ/REP zmq_repserver/zmq_reqclient
-License/Copyright
-The author disclaims copyright.
-If you find this code useful you can buy me a beer in return.
+ 1> zmq_pubserver:run().
+ 2> zmq_subclient:run().
+
+ You can run a server and any number of clients
+ in the same Erlang shell or on different nodes.
+
+License
+ BSD License
+
+Copyright
+ Copyright (c) 2010 Dhammika Pathirana <dhammika@gmail.com>
+ Copywight (c) 2010 Serge Aleynikov <saleyn@gmail.com>
Contacts
-Report bugs to <dhammika@gmail.com>
+ Report bugs to <dhammika@gmail.com> and <saleyn@gmail.com>
+
29 c_src/Makefile
View
@@ -0,0 +1,29 @@
+ZMQ_DIR=/opt/zeromq-2.0.6
+
+ERL ?= erl
+ERL_INTERFACE_DIR=$(shell \
+ $(ERL) -eval 'io:format("~s\n", [code:lib_dir(erl_interface)]).' \
+ -noshell -s erlang halt)
+ERL_INSTALL_DIR=$(dir $(dir $(dir $(dir $(ERL_INTERFACE_DIR)))))
+
+ifndef $(ERL_INSTALL_DIR)
+ $(error Erlang installation directory not found)
+endif
+
+LDFLAGS=-shared -fPIC ${ERL_INTERFACE_DIR}/lib/libei.a -L${ZMQ_DIR}/lib -lzmq -lpthread -luuid
+CFLAGS=-Wall -ggdb -O0 -I${ZMQ_DIR}/include \
+ -I$(ERL_INTERFACE_DIR)/include \
+ -I$(ERL_INSTALL_DIR)/usr/include
+CC=g++
+
+ifdef debug
+CFLAGS += -DZMQDRV_DEBUG
+endif
+
+all: ../priv/zmq_drv.so
+
+../priv/zmq_drv.so: zmq_drv.cpp
+ $(CC) -o $@ $< ${LDFLAGS} ${CFLAGS}
+
+clean:
+ rm -rf zmq_drv.o ../priv/zmq_drv.so
865 c_src/zmq_drv.cpp
View
@@ -0,0 +1,865 @@
+/*
+ * ------------------------------------------------------------------
+ * Erlang bindings for ZeroMQ.
+ * ------------------------------------------------------------------
+ * <dhammika@gmail.com> wrote this code, copyright disclaimed.
+ * <saleyn@gmail.com> bug fixes and many enhancements to the driver
+ * to support non-blocking I/O.
+ * ------------------------------------------------------------------
+ */
+
+#include <stdio.h>
+#include <string.h>
+#include <zmq.h>
+#include <ctype.h>
+#include <sstream>
+#include <assert.h>
+#include "zmq_drv.h"
+
+#ifdef ZMQDRV_DEBUG
+#define zmqdrv_fprintf(...) fprintf(stderr, __VA_ARGS__)
+#else
+#define zmqdrv_fprintf(...)
+#endif
+
+#define INIT_ATOM(NAME) am_ ## NAME = (ErlDrvTermData)driver_mk_atom((char*)#NAME)
+
+/* Callbacks */
+static ErlDrvEntry zmq_driver_entry = {
+ zmqdrv_driver_init, /* init */
+ zmqdrv_start, /* startup (defined below) */
+ zmqdrv_stop, /* shutdown (defined below) */
+ NULL, /* output */
+ zmqdrv_ready_input, /* ready_input */
+ NULL, /* ready_output */
+ (char*)"zmq_drv", /* driver name */
+ NULL, /* finish */
+ NULL, /* handle */
+ NULL, /* control */
+ NULL, /* timeout */
+ zmqdrv_outputv, /* outputv, binary output */
+ NULL, /* ready_async */
+ NULL, /* flush */
+ NULL, /* call */
+ NULL, /* event */
+ ERL_DRV_EXTENDED_MARKER, /* ERL_DRV_EXTENDED_MARKER */
+ ERL_DRV_EXTENDED_MAJOR_VERSION, /* ERL_DRV_EXTENDED_MAJOR_VERSION */
+ ERL_DRV_EXTENDED_MAJOR_VERSION, /* ERL_DRV_EXTENDED_MINOR_VERSION */
+ ERL_DRV_FLAG_USE_PORT_LOCKING, /* ERL_DRV_FLAGs */
+ NULL, /* handle2 (reserved */
+ zmqdrv_process_exit, /* process_exit */
+ NULL /* stop_select */
+};
+
+/* Driver internal, C hook to driver API. */
+extern "C" DRIVER_INIT(zmq_drv)
+{
+ return &zmq_driver_entry;
+}
+
+zmq_drv_t::~zmq_drv_t()
+{
+ for (zmq_pid_sockets_map_t::iterator it = zmq_pid_sockets.begin();
+ it != zmq_pid_sockets.end(); ++it)
+ driver_demonitor_process(port, &it->second.monitor);
+
+ for (zmq_fd_sockets_map_t::iterator it = zmq_fd_sockets.begin();
+ it != zmq_fd_sockets.end(); ++it)
+ driver_select(port, (ErlDrvEvent)it->first, ERL_DRV_READ, 0);
+
+ for (zmq_sock_info *it=zmq_sock_infos, *next=(it ? it->next : NULL); it; it = next) {
+ next = it->next;
+ delete (&*it);
+ }
+ zmq_sockets.clear();
+ zmq_idxs.clear();
+ zmq_pid_sockets.clear();
+ zmq_fd_sockets.clear();
+
+ if (zmq_context)
+ zmq_term(zmq_context);
+}
+
+void zmq_drv_t::add_socket(zmq_sock_info* s)
+{
+ // Insert the new socket info to the head of the list
+ if (zmq_sock_infos) zmq_sock_infos->prev = s;
+ s->next = zmq_sock_infos;
+ zmq_sock_infos = s;
+
+ // Update map: idx -> socket
+ zmq_sockets[s->idx] = s;
+ // Update map: socket -> idx
+ zmq_idxs[s->socket] = s;
+ {
+ // Update map: pid -> sockets
+ zmq_pid_sockets_map_t::iterator it = zmq_pid_sockets.find(s->owner);
+ if (it != zmq_pid_sockets.end())
+ it->second.sockets.insert(s);
+ else {
+ monitor_sockets_t ms;
+ driver_monitor_process(port, s->owner, &ms.monitor);
+ ms.sockets.insert(s);
+ zmq_pid_sockets[s->owner] = ms;
+ }
+ }
+ {
+ // Update map: fd -> sockets
+ zmq_fd_sockets_map_t::iterator it = zmq_fd_sockets.find(s->fd);
+ if (it != zmq_fd_sockets.end())
+ it->second.insert(s);
+ else {
+ zmq_sock_set_t set;
+ set.insert(s);
+ zmq_fd_sockets[s->fd] = set;
+ driver_select(port, (ErlDrvEvent)s->fd, ERL_DRV_READ, 1);
+ zmqdrv_fprintf("registered sig_fd(%d) with VM\r\n", s->fd);
+ }
+ }
+}
+
+int zmq_drv_t::del_socket(uint32_t idx)
+{
+ zmq_sock_info* s;
+ int ret = -1;
+
+ zmq_idx_socket_map_t::iterator it = zmq_sockets.find(idx);
+ if (it == zmq_sockets.end()) {
+ zmqdrv_fprintf("warning: socket info not found for idx %d\r\n", idx);
+ return ret;
+ }
+
+ s = it->second;
+ s->unlink();
+ if (s == zmq_sock_infos)
+ zmq_sock_infos = s->next;
+
+ zmq_sockets.erase(idx);
+ zmq_idxs.erase(s->socket);
+
+ {
+ // Remove the socket from a list of sockets owned by pid.
+ // If this was the last socket, demonitor pid.
+ zmq_pid_sockets_map_t::iterator it = zmq_pid_sockets.find(s->owner);
+ if (it != zmq_pid_sockets.end()) {
+ it->second.sockets.erase(s);
+ if (it->second.sockets.empty()) {
+ driver_demonitor_process(port, &it->second.monitor);
+ zmq_pid_sockets.erase(it);
+ }
+ }
+ }
+ {
+ zmq_fd_sockets_map_t::iterator it = zmq_fd_sockets.find(s->fd);
+ if (it != zmq_fd_sockets.end()) {
+ it->second.erase(s);
+ if (it->second.empty()) {
+ zmq_fd_sockets.erase(it->first);
+ driver_select(port, (ErlDrvEvent)it->first, ERL_DRV_READ, 0);
+ zmqdrv_fprintf("unregistered sig_fd(%d) with VM\r\n", it->first);
+ }
+ }
+ }
+
+ delete s;
+ return 0;
+}
+
+uint32_t zmq_drv_t::get_socket_idx(zmq_socket_t sock) const
+{
+ zmq_socket_idx_map_t::const_iterator it = zmq_idxs.find(sock);
+ return it == zmq_idxs.end() ? 0 : it->second->idx;
+}
+
+zmq_sock_info* zmq_drv_t::get_socket_info(uint32_t idx)
+{
+ zmq_idx_socket_map_t::const_iterator it = zmq_sockets.find(idx);
+ return it == zmq_sockets.end() ? NULL : it->second;
+}
+
+zmq_socket_t zmq_drv_t::get_zmq_socket(uint32_t idx) const
+{
+ zmq_idx_socket_map_t::const_iterator it = zmq_sockets.find(idx);
+ return it == zmq_sockets.end() ? NULL : it->second->socket;
+}
+
+static ErlDrvTermData error_atom(int err)
+{
+ char errstr[128];
+ char* s;
+ char* t;
+
+ switch (err) {
+ case ENOTSUP: strcpy(errstr, "enotsup"); break;
+ case EPROTONOSUPPORT: strcpy(errstr, "eprotonosupport"); break;
+ case ENOBUFS: strcpy(errstr, "enobufs"); break;
+ case ENETDOWN: strcpy(errstr, "enetdown"); break;
+ case EADDRINUSE: strcpy(errstr, "eaddrinuse"); break;
+ case EADDRNOTAVAIL: strcpy(errstr, "eaddrnotavail"); break;
+ case ECONNREFUSED: strcpy(errstr, "econnrefused"); break;
+ case EINPROGRESS: strcpy(errstr, "einprogress"); break;
+ case EMTHREAD: strcpy(errstr, "emthread"); break;
+ case EFSM: strcpy(errstr, "efsm"); break;
+ case ENOCOMPATPROTO: strcpy(errstr, "enocompatproto"); break;
+ default:
+ for (s = erl_errno_id(err), t = errstr; *s; s++, t++)
+ *t = tolower(*s);
+ *t = '\0';
+ }
+ return driver_mk_atom(errstr);
+}
+
+static void
+zmq_free_binary(void* /*data*/, void* hint)
+{
+ ErlDrvBinary* bin = (ErlDrvBinary*)hint;
+ driver_free_binary(bin);
+}
+
+static void
+zmqdrv_socket_error(zmq_drv_t *drv, ErlDrvTermData pid, uint32_t idx, int err) {
+ // Return {zmq, Socket::integer(), {error, Reason::atom()}}
+ ErlDrvTermData spec[] =
+ {ERL_DRV_ATOM, am_zmq,
+ ERL_DRV_UINT, idx,
+ ERL_DRV_ATOM, am_error,
+ ERL_DRV_ATOM, error_atom(err),
+ ERL_DRV_TUPLE, 2,
+ ERL_DRV_TUPLE, 3};
+ driver_send_term(drv->port, pid, spec, sizeof(spec)/sizeof(spec[0]));
+}
+
+static void
+zmqdrv_error(zmq_drv_t *drv, const char *errstr)
+{
+ ErlDrvTermData spec[] =
+ {ERL_DRV_ATOM, am_error,
+ ERL_DRV_STRING, (ErlDrvTermData)errstr, strlen(errstr),
+ ERL_DRV_TUPLE, 2};
+ driver_send_term(drv->port, driver_caller(drv->port), spec, sizeof(spec)/sizeof(spec[0]));
+}
+
+static void
+zmqdrv_error_code(zmq_drv_t *drv, int err)
+{
+ ErlDrvTermData spec[] =
+ {ERL_DRV_ATOM, am_error,
+ ERL_DRV_ATOM, error_atom(err),
+ ERL_DRV_TUPLE, 2};
+ driver_send_term(drv->port, driver_caller(drv->port), spec, sizeof(spec)/sizeof(spec[0]));
+}
+
+static void
+zmqdrv_ok(zmq_drv_t *drv)
+{
+ ErlDrvTermData spec[] = {ERL_DRV_ATOM, am_ok};
+ driver_send_term(drv->port, driver_caller(drv->port), spec, sizeof(spec)/sizeof(spec[0]));
+}
+
+static void
+zmqdrv_ok(zmq_drv_t *drv, ErlDrvTermData pid)
+{
+ ErlDrvTermData spec[] = {ERL_DRV_ATOM, am_ok};
+ driver_send_term(drv->port, pid, spec, sizeof(spec)/sizeof(spec[0]));
+}
+
+static void
+zmqdrv_binary_ok(zmq_drv_t *drv, ErlDrvTermData pid, void *data, size_t size)
+{
+ /* Copy payload. */
+ ErlDrvTermData spec[] =
+ {ERL_DRV_ATOM, am_ok,
+ ERL_DRV_BUF2BINARY, (ErlDrvTermData)data, size,
+ ERL_DRV_TUPLE, 2};
+
+ driver_send_term(drv->port, pid, spec, sizeof(spec)/sizeof(spec[0]));
+}
+
+static void
+zmqdrv_binary_ok(zmq_drv_t *drv, void *data, size_t size) {
+ zmqdrv_binary_ok(drv, driver_caller(drv->port), data, size);
+}
+
+//-------------------------------------------------------------------
+// Driver callbacks
+//-------------------------------------------------------------------
+
+int zmqdrv_driver_init(void)
+{
+ INIT_ATOM(ok);
+ INIT_ATOM(error);
+ INIT_ATOM(eagain);
+ INIT_ATOM(zmq);
+ INIT_ATOM(msg);
+ return 0;
+}
+
+/* Driver Start, called on port open. */
+static ErlDrvData
+zmqdrv_start(ErlDrvPort port, char* cmd)
+{
+ zmqdrv_fprintf("driver started by pid %ld\r\n", driver_connected(port));
+ return reinterpret_cast<ErlDrvData>(new zmq_drv_t(port));
+}
+
+/* Driver Stop, called on port close. */
+static void
+zmqdrv_stop(ErlDrvData handle)
+{
+ delete reinterpret_cast<zmq_drv_t*>(handle);
+}
+
+static void
+zmqdrv_ready_input(ErlDrvData handle, ErlDrvEvent event)
+{
+ zmq_drv_t *drv = (zmq_drv_t *)handle;
+
+ // Get 0MQ sockets managed by application thread's signaler
+ // identified by "event" fd.
+ zmq_fd_sockets_map_t::iterator it = drv->zmq_fd_sockets.find((long)event);
+
+ zmqdrv_fprintf("input ready on [idx=%ld]\r\n", (long)event);
+
+ assert(it != drv->zmq_fd_sockets.end());
+
+ zmq_sock_set_t::iterator si = it->second.begin();
+
+ assert(si != it->second.end());
+
+ // FIXME: Currently 0MQ design assumes one thread owning
+ // many 0MQ sockets - so we get the app_thread from the
+ // first socket in the list of sockets associated with app_thread's
+ // signaling fd. If 0MQ's implementation merges the app_thread
+ // concept with 0MQ socket object, this implementation would
+ // have to call zmq_process in the loop for every 0MQ socket.
+ zmq_app_thread_t app_thread;
+ zmq_getsockopt((*si)->socket, ZMQ_APP_THREAD, &app_thread, sizeof(app_thread));
+
+ if (app_thread == NULL) {
+ zmqdrv_fprintf("warning: no app_thread for socket %p [sig_fd=%ld]\r\n",
+ (*si)->socket, (long)event);
+ // If this happens, there's something severely wrong with the socket
+ // structure - bail out by crashing the driver.
+ driver_failure_atom(drv->port, (char*)"no_application_thread_found");
+ return;
+ }
+
+ zmq_process(app_thread);
+
+ for (; si != it->second.end(); ++si) {
+ zmq_socket_t s = (*si)->socket;
+ uint32_t idx = (*si)->idx;
+ ErlDrvTermData owner = (*si)->owner;
+ int rc = 0;
+ uint32_t events;
+
+ zmq_getsockopt(s, ZMQ_EVENTS | ZMQ_POLLIN, &events, sizeof(events));
+
+ while (((*si)->active_mode || (*si)->in_caller) && (events & ZMQ_POLLIN)) {
+ msg_t msg;
+
+ do {
+ rc = zmq_recv(s, &msg, ZMQ_NOBLOCK);
+ } while (rc == -1 && zmq_errno() == EINTR);
+
+ ErlDrvTermData pid = (*si)->active_mode ? owner : (*si)->in_caller;
+
+ if (rc == -1) {
+ if (zmq_errno() != EAGAIN) {
+ ErlDrvTermData spec[] =
+ {ERL_DRV_ATOM, am_zmq,
+ ERL_DRV_UINT, idx,
+ ERL_DRV_ATOM, error_atom(zmq_errno()),
+ ERL_DRV_TUPLE, 2,
+ ERL_DRV_TUPLE, 3};
+ driver_send_term(drv->port, owner, spec, sizeof(spec)/sizeof(spec[0]));
+ (*si)->in_caller = 0;
+ }
+ break;
+ }
+
+ if ((*si)->active_mode) {
+ // Send message {zmq, Socket, binary()} to the owner pid
+ ErlDrvTermData spec[] =
+ {ERL_DRV_ATOM, am_zmq,
+ ERL_DRV_UINT, idx,
+ ERL_DRV_BUF2BINARY, (ErlDrvTermData)zmq_msg_data(&msg), zmq_msg_size(&msg),
+ ERL_DRV_TUPLE, 3};
+ driver_send_term(drv->port, owner, spec, sizeof(spec)/sizeof(spec[0]));
+ } else {
+ // Return result {ok, binary()} to the waiting caller's pid
+ ErlDrvTermData spec[] =
+ {ERL_DRV_ATOM, am_ok,
+ ERL_DRV_BUF2BINARY, (ErlDrvTermData)zmq_msg_data(&msg), zmq_msg_size(&msg),
+ ERL_DRV_TUPLE, 2};
+ driver_send_term(drv->port, pid, spec, sizeof(spec)/sizeof(spec[0]));
+ (*si)->in_caller = 0;
+ }
+
+ // FIXME: add error handling
+ zmqdrv_fprintf("received %ld byte message relayed to pid %ld\r\n", zmq_msg_size(&msg), pid);
+ zmq_getsockopt(s, ZMQ_EVENTS | ZMQ_POLLIN, &events, sizeof(events));
+ }
+
+ zmq_getsockopt(s, ZMQ_EVENTS | ZMQ_POLLIN, &events, sizeof(events));
+
+ if ((*si)->out_caller != 0 && (events & ZMQ_POLLOUT)) {
+ // There was a pending unwritten message on this socket.
+ // Try to write it. If the write succeeds/fails clear the ZMQ_POLLOUT
+ // flag and notify the waiting caller of completion of operation.
+ do {
+ rc = zmq_send(s, &(*si)->out_msg, ZMQ_NOBLOCK);
+ } while (rc == -1 && zmq_errno() == EINTR);
+
+ zmqdrv_fprintf("resending message %p (size=%ld) on socket %p (ret=%d)\r\n",
+ zmq_msg_data(&(*si)->out_msg), zmq_msg_size(&(*si)->out_msg), s, rc);
+
+ if (rc == 0) {
+ zmq_msg_close(&(*si)->out_msg);
+ // Unblock the waiting caller's pid by returning result
+ zmqdrv_ok(drv, (*si)->out_caller);
+ (*si)->out_caller = 0;
+ } else if (zmq_errno() != EAGAIN) {
+ // Unblock the waiting caller's pid by returning result
+ zmq_msg_close(&(*si)->out_msg);
+ zmqdrv_socket_error(drv, (*si)->out_caller, idx, zmq_errno());
+ (*si)->out_caller = 0;
+ }
+ }
+
+ zmqdrv_fprintf("--> socket %p events=%d\r\n", s, events);
+ }
+}
+
+// Called when an Erlang process owning sockets died.
+// Perform cleanup of orphan sockets owned by pid.
+static void
+zmqdrv_process_exit(ErlDrvData handle, ErlDrvMonitor* monitor)
+{
+ zmq_drv_t* drv = (zmq_drv_t *)handle;
+ ErlDrvTermData pid = driver_get_monitored_process(drv->port, monitor);
+
+ zmqdrv_fprintf("detected death of %lu process\r\n", pid);
+
+ driver_demonitor_process(drv->port, monitor);
+
+ // Walk through the list of sockets and close the ones
+ // owned by pid.
+ zmq_pid_sockets_map_t::iterator it=drv->zmq_pid_sockets.find(pid);
+
+ if (it != drv->zmq_pid_sockets.end()) {
+ zmqdrv_fprintf("pid %lu has %lu sockets to be closed\r\n", pid, it->second.sockets.size());
+ for(zmq_sock_set_t::iterator sit = it->second.sockets.begin();
+ sit != it->second.sockets.end(); ++sit)
+ drv->del_socket((*sit)->idx);
+ }
+}
+
+/* Erlang command, called on binary input from VM. */
+static void
+zmqdrv_outputv(ErlDrvData handle, ErlIOVec *ev)
+{
+ zmq_drv_t* drv = (zmq_drv_t *)handle;
+ ErlDrvBinary* data = ev->binv[1];
+ unsigned char cmd = data->orig_bytes[0]; // First byte is the command
+
+ zmqdrv_fprintf("driver got command %d on thread %p\r\n", (int)cmd, erl_drv_thread_self());
+
+ switch (cmd) {
+ case ZMQ_INIT :
+ zmqdrv_init(drv, ev);
+ break;
+ case ZMQ_TERM :
+ zmqdrv_term(drv, ev);
+ break;
+ case ZMQ_SOCKET :
+ zmqdrv_socket(drv, ev);
+ break;
+ case ZMQ_CLOSE :
+ zmqdrv_close(drv, ev);
+ break;
+ case ZMQ_SETSOCKOPT :
+ zmqdrv_setsockopt(drv, ev);
+ break;
+ case ZMQ_GETSOCKOPT :
+ zmqdrv_getsockopt(drv, ev);
+ break;
+ case ZMQ_BIND :
+ zmqdrv_bind(drv, ev);
+ break;
+ case ZMQ_CONNECT :
+ zmqdrv_connect(drv, ev);
+ break;
+ case ZMQ_SEND :
+ zmqdrv_send(drv, ev);
+ break;
+ case ZMQ_RECV :
+ zmqdrv_recv(drv, ev);
+ break;
+ default :
+ zmqdrv_error(drv, "Invalid driver command");
+ }
+}
+
+static void
+zmqdrv_init(zmq_drv_t *drv, ErlIOVec *ev)
+{
+ /*
+ * FIXME
+ * Use ei_decode_* to decode input from erlang VM.
+ * This stuff is not documented anywhere, for now
+ * binary ErlIOVec is decoded by poking in iov struct.
+ *
+ * Serge: Dhammika, ei_decode can only be used to decode
+ * external binary format in the "output" callback function.
+ * It's not suitable for using inside "outputv" body that
+ * operates on I/O vectors unless you use term_to_binary/1
+ * call to explicitely convert a term to external binary format.
+ */
+
+ uint32_t app_threads;
+ uint32_t io_threads;
+ uint32_t flags;
+
+ ErlDrvBinary* input = ev->binv[1];
+ char* bytes = input->orig_bytes;
+ app_threads = ntohl(*(uint32_t *)(bytes + 1));
+ io_threads = ntohl(*(uint32_t *)(bytes + 5));
+ flags = ntohl(*(uint32_t *)(bytes + 9));
+
+ zmqdrv_fprintf("appthreads = %u, iothreads = %u\r\n", app_threads, io_threads);
+
+ if (drv->zmq_context) {
+ zmqdrv_error_code(drv, EBUSY);
+ return;
+ }
+
+ drv->zmq_context = (void *)zmq_init(app_threads, io_threads, flags);
+
+ if (!drv->zmq_context) {
+ zmqdrv_error_code(drv, zmq_errno());
+ return;
+ }
+
+ zmqdrv_ok(drv);
+}
+
+static void
+zmqdrv_term(zmq_drv_t *drv, ErlIOVec *ev)
+{
+ if (!drv->zmq_context) {
+ zmqdrv_error_code(drv, ENODEV);
+ return;
+ }
+
+ if (zmq_term(drv->zmq_context) < 0) {
+ zmqdrv_error_code(drv, zmq_errno());
+ return;
+ }
+
+ zmqdrv_ok(drv);
+ drv->zmq_context = NULL;
+}
+
+static void
+zmqdrv_socket(zmq_drv_t *drv, ErlIOVec *ev)
+{
+ ErlDrvBinary* bin = ev->binv[1];
+ char* bytes = bin->orig_bytes;
+ int type = *(bytes + 1);
+
+ void* s = zmq_socket(drv->zmq_context, type);
+ if (!s) {
+ zmqdrv_error_code(drv, zmq_errno());
+ return;
+ }
+
+ // If the socket app_thread's signaler is not registered
+ // with driver's poller, perform that registration now.
+ void* app_thr;
+ int sig_fd;
+ zmq_getsockopt(s, ZMQ_APP_THREAD, &app_thr, sizeof(app_thr));
+ zmq_getsockopt(s, ZMQ_WAITFD, &sig_fd, sizeof(sig_fd));
+
+ if (sig_fd < 0) {
+ std::stringstream str;
+ str << "Invalid signaler (app_thread=" << app_thr << ')';
+ zmqdrv_error(drv, str.str().c_str());
+ return;
+ }
+
+ // Register a new socket handle in order to avoid
+ // passing actual address of socket to Erlang. This
+ // way it's more safe and also portable between 32 and
+ // 64 bit OS's.
+ uint32_t n = ++drv->zmq_socket_count;
+
+ zmq_sock_info* zsi = new zmq_sock_info(s, n, driver_caller(drv->port), sig_fd);
+ if (!zsi) {
+ driver_failure_posix(drv->port, ENOMEM);
+ return;
+ }
+
+ drv->add_socket(zsi);
+
+ zmqdrv_fprintf("socket %p [idx=%d] owner=%ld\r\n", s, n, zsi->owner);
+
+ ErlDrvTermData spec[] = {ERL_DRV_ATOM, am_ok,
+ ERL_DRV_UINT, n,
+ ERL_DRV_TUPLE, 2};
+ driver_send_term(drv->port, zsi->owner, spec, sizeof(spec)/sizeof(spec[0]));
+}
+
+static void
+zmqdrv_close(zmq_drv_t *drv, ErlIOVec *ev)
+{
+ ErlDrvBinary* bin = ev->binv[1];
+ char* bytes = bin->orig_bytes;
+ uint32_t idx = ntohl(*(uint32_t*)(bytes+1));
+
+ if (idx > drv->zmq_socket_count) {
+ zmqdrv_error_code(drv, ENODEV);
+ return;
+ }
+
+ int ret = drv->del_socket(idx);
+
+ zmqdrv_fprintf("close [idx=%d] -> %d\r\n", idx, ret);
+
+ if (ret < 0) {
+ zmqdrv_error_code(drv, zmq_errno());
+ return;
+ }
+
+ zmqdrv_ok(drv);
+}
+
+static void
+zmqdrv_setsockopt(zmq_drv_t *drv, ErlIOVec *ev)
+{
+ ErlDrvBinary* bin = ev->binv[1];
+ char* bytes = bin->orig_bytes;
+ uint32_t idx = ntohl(*(uint32_t*)(bytes+1));
+ zmq_sock_info* si = drv->get_socket_info(idx);
+ uint8_t n = *(uint8_t*)(bytes+sizeof(idx)+1);
+ char* p = bytes + 1 + sizeof(idx) + 1;
+
+ if (idx > drv->zmq_socket_count || !si) {
+ zmqdrv_error_code(drv, ENODEV);
+ return;
+ }
+
+ zmqdrv_fprintf("setsockopt %p (setting %d options)\r\n", si->socket, (int)n);
+
+ for (uint8_t j=0; j < n; ++j) {
+ unsigned char option = *p++;
+ uint64_t optvallen = *p++;
+ void* optval = p;
+
+ switch (option) {
+ case ZMQ_HWM: assert(optvallen == 8); break;
+ case ZMQ_LWM: assert(optvallen == 8); break;
+ case ZMQ_SWAP: assert(optvallen == 8); break;
+ case ZMQ_AFFINITY: assert(optvallen == 8); break;
+ case ZMQ_IDENTITY: assert(optvallen < 256); break;
+ case ZMQ_SUBSCRIBE: assert(optvallen < 256); break;
+ case ZMQ_UNSUBSCRIBE: assert(optvallen < 256); break;
+ case ZMQ_RATE: assert(optvallen == 8); break;
+ case ZMQ_RECOVERY_IVL: assert(optvallen == 8); break;
+ case ZMQ_MCAST_LOOP: assert(optvallen == 8); break;
+ case ZMQ_SNDBUF: assert(optvallen == 8); break;
+ case ZMQ_RCVBUF: assert(optvallen == 8); break;
+ //case ZMQ_RCVMORE: assert(optvallen == 8); break;
+ case ZMQ_ACTIVE: assert(optvallen == 1); break;
+ }
+
+ zmqdrv_fprintf("setsockopt %p (%d)\r\n", si->socket, option);
+
+ if (option == ZMQ_ACTIVE)
+ si->active_mode = *(char*)optval;
+ else if (zmq_setsockopt(si->socket, option, optval, optvallen) < 0) {
+ zmqdrv_error_code(drv, zmq_errno());
+ return;
+ }
+
+ p += optvallen;
+ }
+
+ zmqdrv_ok(drv);
+}
+
+static void
+zmqdrv_getsockopt(zmq_drv_t *drv, ErlIOVec *ev)
+{
+ zmqdrv_error(drv, "Not implemented");
+}
+
+static void
+zmqdrv_bind(zmq_drv_t *drv, ErlIOVec *ev)
+{
+ ErlDrvBinary* bin = ev->binv[1];
+ char* bytes = bin->orig_bytes;
+ uint16_t size = bin->orig_size - 5;
+ uint32_t idx = ntohl(*(uint32_t*)(bytes+1));
+ void* s = drv->get_zmq_socket(idx);
+ char addr[512];
+
+ if (size > sizeof(addr) - 1) {
+ zmqdrv_error_code(drv, E2BIG);
+ return;
+ }
+
+ memcpy(addr, bytes + 5, size);
+ addr[size] = '\0';
+
+ if (idx > drv->zmq_socket_count || !s) {
+ zmqdrv_error_code(drv, ENODEV);
+ return;
+ } else if (addr[0] == '\0') {
+ zmqdrv_error_code(drv, EINVAL);
+ return;
+ }
+
+ if (zmq_bind(s, addr) < 0) {
+ zmqdrv_error_code(drv, zmq_errno());
+ return;
+ }
+
+ zmqdrv_ok(drv);
+}
+
+static void
+zmqdrv_connect(zmq_drv_t *drv, ErlIOVec *ev)
+{
+ ErlDrvBinary* bin = ev->binv[1];
+ char* bytes = bin->orig_bytes;
+ uint32_t idx = ntohl(*(uint32_t*)(bytes+1));
+ void* s = drv->get_zmq_socket(idx);
+ uint16_t size = bin->orig_size - 5;
+ char addr[512];
+
+ if (idx > drv->zmq_socket_count || !s) {
+ zmqdrv_error_code(drv, ENODEV);
+ return;
+ }
+
+ if (size > sizeof(addr) - 1) {
+ zmqdrv_error_code(drv, E2BIG);
+ return;
+ }
+
+ memcpy(addr, bytes + 5, size);
+ addr[size] = '\0';
+
+ zmqdrv_fprintf("connect %s\r\n", addr);
+
+ if (!addr[0]) {
+ zmqdrv_error_code(drv, EINVAL);
+ return;
+ }
+
+ if (zmq_connect(s, addr) < 0) {
+ zmqdrv_error_code(drv, zmq_errno());
+ return;
+ }
+
+ zmqdrv_ok(drv);
+}
+
+static void
+zmqdrv_send(zmq_drv_t *drv, ErlIOVec *ev)
+{
+ ErlDrvBinary* bin = ev->binv[1];
+ char* bytes = bin->orig_bytes;
+ uint32_t idx = ntohl(*(uint32_t*)(bytes+1));
+ zmq_sock_info* si = drv->get_socket_info(idx);
+ void* data = (void *)(bytes + 5);
+ size_t size = bin->orig_size - 5;
+
+ if (idx > drv->zmq_socket_count || !si) {
+ zmqdrv_error_code(drv, ENODEV);
+ return;
+ }
+
+#ifdef ZMQDRV_DEBUG
+ uint32_t events;
+ zmq_getsockopt(si->socket, ZMQ_EVENTS | ZMQ_POLLIN | ZMQ_POLLOUT, &events, sizeof(events));
+ zmqdrv_fprintf("sending %p [idx=%d] %lu bytes (events=%d)\r\n", si->socket, idx, size, events);
+#endif
+
+ if (si->out_caller != 0) {
+ // There's still an unwritten message pending
+ zmqdrv_error_code(drv, EBUSY);
+ return;
+ }
+
+ // Increment the reference count on binary so that zmq can
+ // take ownership of it.
+ driver_binary_inc_refc(bin);
+
+ if (zmq_msg_init_data(&si->out_msg, data, size, &zmq_free_binary, bin)) {
+ zmqdrv_error_code(drv, zmq_errno());
+ driver_binary_dec_refc(bin);
+ return;
+ }
+
+ if (zmq_send(si->socket, &si->out_msg, ZMQ_NOBLOCK) == 0) {
+ zmqdrv_ok(drv);
+ zmqdrv_ready_input((ErlDrvData)drv, (ErlDrvEvent)si->fd);
+ } else {
+ int e = zmq_errno();
+ if (e == EAGAIN) {
+ // No msg returned to caller - make him wait until async
+ // send succeeds
+ si->out_caller = driver_caller(drv->port);
+ return;
+ }
+ zmqdrv_error_code(drv, e);
+ }
+ zmq_msg_close(&si->out_msg);
+}
+
+static void
+zmqdrv_recv(zmq_drv_t *drv, ErlIOVec *ev)
+{
+ ErlDrvBinary* bin = ev->binv[1];
+ char* bytes = bin->orig_bytes;
+ uint32_t idx = ntohl(*(uint32_t*)(bytes+1));
+ zmq_sock_info* si = drv->get_socket_info(idx);
+
+ if (idx > drv->zmq_socket_count || !si) {
+ zmqdrv_error_code(drv, ENODEV);
+ return;
+ }
+
+ si->process_commands();
+
+ if (si->active_mode) {
+ zmqdrv_error_code(drv, EINVAL);
+ return;
+ }
+
+ if (si->in_caller != 0) {
+ // Previous recv() call in passive mode didn't complete.
+ // The owner must be blocked waiting for result.
+ zmqdrv_error_code(drv, EBUSY);
+ return;
+ }
+
+ uint32_t events;
+ zmq_getsockopt(si->socket, ZMQ_EVENTS | ZMQ_POLLIN, &events, sizeof(events));
+
+ if (events == 0)
+ si->in_caller = driver_caller(drv->port);
+ else {
+ msg_t msg;
+
+ if (zmq_recv(si->socket, &msg, ZMQ_NOBLOCK) == 0)
+ zmqdrv_binary_ok(drv, zmq_msg_data(&msg), zmq_msg_size(&msg));
+ else if (zmq_errno() == EAGAIN) {
+ // No input available. Make the caller wait by not returning result
+ si->in_caller = driver_caller(drv->port);
+ } else
+ zmqdrv_error_code(drv, zmq_errno());
+ }
+}
168 c_src/zmq_drv.h
View
@@ -0,0 +1,168 @@
+/*
+ * ------------------------------------------------------------------
+ * Erlang bindings for ZeroMQ.
+ * ------------------------------------------------------------------
+ * <dhammika@gmail.com> wrote this code, copyright disclaimed.
+ * ------------------------------------------------------------------
+ */
+
+#include <erl_driver.h>
+#include <ei.h>
+#include <map>
+#include <set>
+
+/* Erlang driver commands. */
+enum driver_commands {
+ ZMQ_INIT = 1
+ , ZMQ_TERM
+ , ZMQ_SOCKET
+ , ZMQ_CLOSE
+ , ZMQ_SETSOCKOPT
+ , ZMQ_GETSOCKOPT
+ , ZMQ_BIND
+ , ZMQ_CONNECT
+ , ZMQ_SEND
+ , ZMQ_RECV
+ , ZMQ_FLUSH = 255
+};
+
+// Erlang driver socket options
+enum driver_sockopts {
+ ZMQ_ACTIVE = 255
+};
+
+// Provides auto-cleanup
+struct msg_t: public zmq_msg_t {
+ msg_t() { zmq_msg_init(this); }
+ ~msg_t() { zmq_msg_close(this); }
+};
+
+typedef void* zmq_socket_t;
+typedef void* zmq_app_thread_t;
+
+// Structure encapsulating information about a single 0MQ socket
+// managed by the driver. The driver maintains a doubly linked
+// list of these structures where each 0MQ socket is onwed by
+// one Erlang process ("owner" member).
+struct zmq_sock_info {
+ zmq_socket_t socket; // 0MQ socket handle
+ uint32_t idx; // index of socket passed to Erlang process
+ ErlDrvTermData owner; // Erlang owner pid of this socket
+ int fd; // Signaling fd for this socket
+ ErlDrvTermData in_caller; // Caller's pid of the last recv() command in passive mode
+ zmq_msg_t out_msg; // Pending message to be written to 0MQ socket
+ ErlDrvTermData out_caller; // Caller's pid of the last send() command
+ // if it resulted in EAGAIN error.
+ bool active_mode; // true - messages are delivered to owner
+ // false - owner must explicitely call recv()
+ zmq_sock_info* prev; // Pointer to prev socket info structure
+ zmq_sock_info* next; // Pointer to next socket info structure
+
+ zmq_sock_info(zmq_socket_t _s, uint32_t _idx, ErlDrvTermData _owner, int _sig_fd)
+ : socket(_s), idx(_idx), owner(_owner), fd(_sig_fd), in_caller(0)
+ , out_caller(0), active_mode(true), prev(NULL), next(NULL)
+ {
+ zmq_msg_init(&out_msg);
+ }
+
+ ~zmq_sock_info() {
+ if (out_caller != 0) zmq_msg_close(&out_msg);
+ if (socket) zmq_close(socket);
+ }
+
+ // Delete current element from the linked list
+ void unlink() {
+ if (prev) prev->next = next;
+ if (next) next->prev = prev;
+ }
+
+ void process_commands() {
+ zmq_app_thread_t app_thread;
+ zmq_getsockopt(socket, ZMQ_APP_THREAD, &app_thread, sizeof(app_thread));
+ zmq_process(app_thread);
+ }
+
+ static void* operator new (size_t sz) { return driver_alloc(sz); }
+ static void operator delete (void* p) { driver_free(p); }
+};
+
+typedef std::set<zmq_sock_info*> zmq_sock_set_t;
+
+// Maintains a set of sockets managed by a monitored Erlang pid
+struct monitor_sockets_t {
+ ErlDrvMonitor monitor;
+ zmq_sock_set_t sockets;
+};
+
+typedef std::map<uint32_t, zmq_sock_info*> zmq_idx_socket_map_t;
+typedef std::map<zmq_socket_t, zmq_sock_info*> zmq_socket_idx_map_t;
+typedef std::map<ErlDrvTermData, monitor_sockets_t> zmq_pid_sockets_map_t;
+typedef std::map<int, zmq_sock_set_t> zmq_fd_sockets_map_t;
+
+// Driver state structure
+struct zmq_drv_t {
+ ErlDrvPort port;
+ //ErlDrvTermData owner;
+ void* zmq_context;
+ // Linked list of all 0MQ socket structures managed by driver
+ zmq_sock_info* zmq_sock_infos;
+ // Maps <socket index> -> <0MQ socket structure>
+ zmq_idx_socket_map_t zmq_sockets;
+ // Maps <0MQ socket handle> -> <0MQ socket structure>
+ zmq_socket_idx_map_t zmq_idxs;
+ // Maps <Erlang pid> -> list of 0MQ socket structs owned by Erlang pid
+ zmq_pid_sockets_map_t zmq_pid_sockets;
+ // Maps <thread's signaling fd> -> list of 0MQ socket structs managed by signaler
+ zmq_fd_sockets_map_t zmq_fd_sockets;
+ // Current socket struct index
+ uint32_t zmq_socket_count;
+
+ zmq_drv_t(ErlDrvPort _port)
+ : port(_port), zmq_context(NULL)
+ , zmq_sock_infos(NULL), zmq_socket_count(0)
+ {
+ //owner = driver_connected(_port);
+ }
+
+ ~zmq_drv_t();
+
+ void add_socket(zmq_sock_info* sock);
+ int del_socket(uint32_t idx);
+ uint32_t get_socket_idx(zmq_socket_t sock) const;
+ zmq_sock_info* get_socket_info(uint32_t idx);
+ zmq_socket_t get_zmq_socket(uint32_t idx) const;
+
+ static void* operator new (size_t sz) { return driver_alloc(sz); }
+ static void operator delete (void* p) { driver_free(p); }
+
+};
+
+/* Forward declarations */
+static int zmqdrv_driver_init(void);
+static ErlDrvData zmqdrv_start(ErlDrvPort port, char* cmd);
+static void zmqdrv_stop(ErlDrvData handle);
+static void zmqdrv_ready_input(ErlDrvData handle, ErlDrvEvent event);
+static void zmqdrv_outputv(ErlDrvData handle, ErlIOVec *ev);
+static void zmqdrv_process_exit(ErlDrvData handle, ErlDrvMonitor* monitor);
+static void zmqdrv_socket_error(zmq_drv_t *drv, ErlDrvTermData pid, uint32_t idx, int err);
+static void zmqdrv_error(zmq_drv_t *zmq_drv, const char *errstr);
+static void zmqdrv_error_code(zmq_drv_t *zmq_drv, int err);
+static void zmqdrv_ok(zmq_drv_t *zmq_drv);
+static void zmqdrv_binary_ok(zmq_drv_t *zmq_drv, void *data, size_t size);
+static void zmqdrv_init(zmq_drv_t *zmq_drv, ErlIOVec *ev);
+static void zmqdrv_term(zmq_drv_t *zmq_drv, ErlIOVec *ev);
+static void zmqdrv_socket(zmq_drv_t *zmq_drv, ErlIOVec *ev);
+static void zmqdrv_close(zmq_drv_t *zmq_drv, ErlIOVec *ev);
+static void zmqdrv_bind(zmq_drv_t *zmq_drv, ErlIOVec *ev);
+static void zmqdrv_connect(zmq_drv_t *zmq_drv, ErlIOVec *ev);
+static void zmqdrv_send(zmq_drv_t *zmq_drv, ErlIOVec *ev);
+static void zmqdrv_recv(zmq_drv_t *zmq_drv, ErlIOVec *ev);
+static void zmqdrv_setsockopt(zmq_drv_t *zmq_drv, ErlIOVec *ev);
+static void zmqdrv_getsockopt(zmq_drv_t *zmq_drv, ErlIOVec *ev);
+
+static ErlDrvTermData am_ok;
+static ErlDrvTermData am_error;
+static ErlDrvTermData am_eagain;
+static ErlDrvTermData am_zmq;
+static ErlDrvTermData am_msg;
+
60 include/zmq.hrl
View
@@ -0,0 +1,60 @@
+%%%-------------------------------------------------------------------
+%%% File: $Id$
+%%%-------------------------------------------------------------------
+
+-define('DRIVER_NAME', 'zmq_drv').
+
+%% ZMQ socket types.
+-define('ZMQ_P2P', 0).
+-define('ZMQ_PUB', 1).
+-define('ZMQ_SUB', 2).
+-define('ZMQ_REQ', 3).
+-define('ZMQ_REP', 4).
+-define('ZMQ_XREQ', 5).
+-define('ZMQ_XREP', 6).
+-define('ZMQ_UPSTREAM', 7).
+-define('ZMQ_DOWNSTREAM', 8).
+
+%% ZMQ socket options.
+-define('ZMQ_HWM', 1).
+-define('ZMQ_LWM', 2).
+-define('ZMQ_SWAP', 3).
+-define('ZMQ_AFFINITY', 4).
+-define('ZMQ_IDENTITY', 5).
+-define('ZMQ_SUBSCRIBE', 6).
+-define('ZMQ_UNSUBSCRIBE', 7).
+-define('ZMQ_RATE', 8).
+-define('ZMQ_RECOVERY_IVL', 9).
+-define('ZMQ_MCAST_LOOP', 10).
+-define('ZMQ_SNDBUF', 11).
+-define('ZMQ_RCVBUF', 12).
+-define('ZMQ_RCVMORE', 13).
+-define('ZMQ_ACTIVE', 255). % This is driver's socket option rather than 0MQ's
+
+%% ZMQ init options.
+-define('ZMQ_POLL', 1).
+
+%% ZMQ send/recv options.
+-define('ZMQ_NOBLOCK', 1).
+-define('ZMQ_SNDMORE', 2).
+
+%% ZMQ port options.
+-define('ZMQ_INIT', 1).
+-define('ZMQ_TERM', 2).
+-define('ZMQ_SOCKET', 3).
+-define('ZMQ_CLOSE', 4).
+-define('ZMQ_SETSOCKOPT', 5).
+-define('ZMQ_GETSOCKOPT', 6).
+-define('ZMQ_BIND', 7).
+-define('ZMQ_CONNECT', 8).
+-define('ZMQ_SEND', 9).
+-define('ZMQ_RECV', 10).
+
+%% Debug log.
+-ifdef(debug).
+-define(log(Msg, MsgArgs),
+ io:format("[~p:~p] ~s\n", [?MODULE, ?LINE, io_lib:format(Msg, MsgArgs)])).
+-else.
+-define(log(Msg, MsgArgs), true).
+-endif.
+
110 patches/zeromq-2.0.6.poll.patch
View
@@ -0,0 +1,110 @@
+--- include/zmq.h.orig 2010-03-16 13:24:30.000000000 -0400
++++ include/zmq.h 2010-05-25 07:38:23.218330648 -0400
+@@ -180,11 +180,18 @@
+ #define ZMQ_SNDBUF 11
+ #define ZMQ_RCVBUF 12
+
++/* For zmq_getsockopt(3) */
++#define ZMQ_APP_THREAD 126
++#define ZMQ_WAITFD 127
++#define ZMQ_EVENTS 128 /* To be used as bitmask ZMQ_EVENTS | ZMQ_POLLIN | ZMQ_POLLOUT */
++
+ #define ZMQ_NOBLOCK 1
+ #define ZMQ_NOFLUSH 2
+
+ ZMQ_EXPORT void *zmq_socket (void *context, int type);
+ ZMQ_EXPORT int zmq_close (void *s);
++ZMQ_EXPORT int zmq_getsockopt(void *s, int option, void *optval,
++ size_t optvallen);
+ ZMQ_EXPORT int zmq_setsockopt (void *s, int option, const void *optval,
+ size_t optvallen);
+ ZMQ_EXPORT int zmq_bind (void *s, const char *addr);
+@@ -215,6 +222,19 @@
+
+ ZMQ_EXPORT int zmq_poll (zmq_pollitem_t *items, int nitems, long timeout);
+
++// Process whatever outstanding work within application thread.
++// To be used in combination with zmq_getsockopt(3):
++// void* app_thread;
++// zmq_getsockopt(s, ZMQ_APP_THREAD, &app_thread, sizeof(void*));
++// int fd, events;
++// zmq_getsockopt(s, ZMQ_WAITFD, &fd, sizeof(int));
++// ... do poll(3) call on fd ...
++// zmq_process(app_thread);
++// zmq_getsockopt(s, ZMQ_EVENTS | ZMQ_POLLIN, &events, sizeof(int));
++// if (events & ZMQ_POLLIN)
++// ... 0MQ socket has data ...
++ZMQ_EXPORT int zmq_process (void *app_thread_);
++
+ ////////////////////////////////////////////////////////////////////////////////
+ // Experimental.
+ ////////////////////////////////////////////////////////////////////////////////
+--- src/zmq.cpp.orig 2010-03-16 13:24:30.000000000 -0400
++++ src/zmq.cpp 2010-05-25 10:36:24.784885719 -0400
+@@ -33,6 +33,7 @@
+ #include "config.hpp"
+ #include "err.hpp"
+ #include "fd.hpp"
++#include "app_thread.hpp"
+
+ #if defined ZMQ_HAVE_LINUX || defined ZMQ_HAVE_FREEBSD ||\
+ defined ZMQ_HAVE_OPENBSD || defined ZMQ_HAVE_SOLARIS ||\
+@@ -629,6 +630,58 @@
+ #endif
+ }
+
++int zmq_getsockopt(void *socket_, int option, void* optval, size_t optvallen)
++{
++ switch (option) {
++ case ZMQ_APP_THREAD: {
++ if (optvallen != sizeof(void*)) {
++ errno = EINVAL;
++ break;
++ }
++ *(void**)optval = ((zmq::socket_base_t*)socket_)->get_thread();
++ return 0;
++ }
++ case ZMQ_WAITFD: {
++ if (optvallen != sizeof(int)) {
++ errno = EINVAL;
++ break;
++ }
++ void* app_thread = ((zmq::socket_base_t*)socket_)->get_thread();
++ *(int*)optval = app_thread ?
++ ((zmq::app_thread_t*)app_thread)->get_signaler()->get_fd() : -1;
++ return 0;
++ }
++ case ZMQ_EVENTS | ZMQ_POLLIN:
++ case ZMQ_EVENTS | ZMQ_POLLOUT:
++ case ZMQ_EVENTS | ZMQ_POLLIN | ZMQ_POLLOUT: {
++ if (optvallen != sizeof(unsigned int)) {
++ errno = EINVAL;
++ break;
++ }
++ unsigned int events = (unsigned int) option ^ ZMQ_EVENTS;
++ *(unsigned int*)optval = 0;
++ if ((ZMQ_POLLOUT & events) && ((zmq::socket_base_t*)socket_)->has_out ())
++ *(unsigned int*)optval |= ZMQ_POLLOUT;
++ if ((ZMQ_POLLIN & events) && ((zmq::socket_base_t*)socket_)->has_in ())
++ *(unsigned int*)optval |= ZMQ_POLLIN;
++ return 0;
++ }
++ default:
++ errno = ENOPROTOOPT;
++ }
++ return -1;
++}
++
++int zmq_process(void *app_thread_)
++{
++ // Process 0MQ commands if needed.
++ if (app_thread_) {
++ ((zmq::app_thread_t*)app_thread_)->process_commands (false, false);
++ return 0;
++ }
++ return -1;
++}
++
+ int zmq_errno ()
+ {
+ return errno;
11 priv/Makefile
View
@@ -1,11 +0,0 @@
-ERL_INTERFACE_DIR=/usr/lib/erlang/lib/erl_interface-3.6.2
-ZMQ_DIR=/usr/local/zeromq
-
-LDFLAGS=-shared -fPIC ${ERL_INTERFACE_DIR}/lib/libei.a -lzmq -lpthread -luuid
-CFLAGS=-Wall -ggdb -O0 -I${ZMQ_DIR}/include -L${ZMQ_DIR}/lib
-
-all:
- gcc zmq_drv.c -o zmq_drv.so ${CFLAGS} ${LDFLAGS}
-
-clean:
- rm -rf zmq_drv.o zmq_drv.so
430 priv/zmq_drv.c
View
@@ -1,430 +0,0 @@
-/*
- * ------------------------------------------------------------------
- * Erlang bindings for ZeroMQ.
- * ------------------------------------------------------------------
- * <dhammika@gmail.com> wrote this code, copyright disclaimed.
- * ------------------------------------------------------------------
- */
-
-#include <stdio.h>
-#include <string.h>
-#include <zmq.h>
-#include "zmq_drv.h"
-
-#ifdef ZMQDRV_DEBUG
-#define zmqdrv_fprintf(...) fprintf(stderr, __VA_ARGS__)
-#else
-#define zmqdrv_fprintf(...)
-#endif
-
-/* Forward declarations */
-static ErlDrvData zmqdrv_start(ErlDrvPort port, char* cmd);
-static void zmqdrv_stop(ErlDrvData handle);
-static void zmqdrv_outputv(ErlDrvData handle, ErlIOVec *ev);
-static void zmqdrv_error(zmq_drv_t *zmq_drv, const char *errstr);
-static void zmqdrv_ok(zmq_drv_t *zmq_drv);
-static void zmqdrv_binary_ok(zmq_drv_t *zmq_drv, void *data, size_t size);
-static void zmqdrv_init(zmq_drv_t *zmq_drv, ErlIOVec *ev);
-static void zmqdrv_term(zmq_drv_t *zmq_drv, ErlIOVec *ev);
-static void zmqdrv_socket(zmq_drv_t *zmq_drv, ErlIOVec *ev);
-static void zmqdrv_close(zmq_drv_t *zmq_drv, ErlIOVec *ev);
-static void zmqdrv_bind(zmq_drv_t *zmq_drv, ErlIOVec *ev);
-static void zmqdrv_connect(zmq_drv_t *zmq_drv, ErlIOVec *ev);
-static void zmqdrv_send(zmq_drv_t *zmq_drv, ErlIOVec *ev);
-static void zmqdrv_recv(zmq_drv_t *zmq_drv, ErlIOVec *ev);
-static void zmqdrv_setsockopt(zmq_drv_t *zmq_drv, ErlIOVec *ev);
-static void zmqdrv_getsockopt(zmq_drv_t *zmq_drv, ErlIOVec *ev);
-
-/* Callbacks */
-static ErlDrvEntry basic_driver_entry = {
- NULL, /* init */
- zmqdrv_start, /* startup (defined below) */
- zmqdrv_stop, /* shutdown (defined below) */
- NULL, /* output */
- NULL, /* ready_input */
- NULL, /* ready_output */
- "zmq_drv", /* driver name */
- NULL, /* finish */
- NULL, /* handle */
- NULL, /* control */
- NULL, /* timeout */
- zmqdrv_outputv, /* outputv, binary output */
- NULL, /* ready_async */
- NULL, /* flush */
- NULL, /* call */
- NULL, /* event */
- ERL_DRV_EXTENDED_MARKER, /* ERL_DRV_EXTENDED_MARKER */
- ERL_DRV_EXTENDED_MAJOR_VERSION, /* ERL_DRV_EXTENDED_MAJOR_VERSION */
- ERL_DRV_EXTENDED_MAJOR_VERSION, /* ERL_DRV_EXTENDED_MINOR_VERSION */
- ERL_DRV_FLAG_USE_PORT_LOCKING /* ERL_DRV_FLAGs */
-};
-
-/* Driver internal, hook to driver API. */
-DRIVER_INIT(basic_driver)
-{
- return &basic_driver_entry;
-}
-
-/* Driver Start, called on port open. */
-static ErlDrvData
-zmqdrv_start(ErlDrvPort port, char* cmd)
-{
- zmq_drv_t* drv = (zmq_drv_t*)driver_alloc(sizeof(zmq_drv_t));
- drv->port = port;
- drv->zmq_context = NULL;
- return (ErlDrvData)drv;
-}
-
-/* Driver Stop, called on port close. */
-static void
-zmqdrv_stop(ErlDrvData handle)
-{
- zmq_drv_t *drv = (zmq_drv_t *)handle;
-
- if (drv->zmq_context)
- zmq_term(drv->zmq_context);
- driver_free(drv);
-}
-
-/* Erlang command, called on binary input from VM. */
-static void
-zmqdrv_outputv(ErlDrvData handle, ErlIOVec *ev)
-{
- uint32_t cmd;
-
- zmq_drv_t *drv = (zmq_drv_t *)handle;
-
- cmd = ntohl(*(uint32_t *)ev->iov[1].iov_base);
-
- switch (cmd) {
- case ZMQ_INIT :
- zmqdrv_init(drv, ev);
- break;
- case ZMQ_TERM :
- zmqdrv_term(drv, ev);
- break;
- case ZMQ_SOCKET :
- zmqdrv_socket(drv, ev);
- break;
- case ZMQ_CLOSE :
- zmqdrv_close(drv, ev);
- break;
- case ZMQ_SETSOCKOPT :
- zmqdrv_setsockopt(drv, ev);
- break;
- case ZMQ_GETSOCKOPT :
- zmqdrv_getsockopt(drv, ev);
- break;
- case ZMQ_BIND :
- zmqdrv_bind(drv, ev);
- break;
- case ZMQ_CONNECT :
- zmqdrv_connect(drv, ev);
- break;
- case ZMQ_SEND :
- zmqdrv_send(drv, ev);
- break;
- case ZMQ_RECV :
- zmqdrv_recv(drv, ev);
- break;
- default :
- zmqdrv_error(drv, "Invalid driver command");
- }
-}
-
-static void
-zmqdrv_error(zmq_drv_t *zmq_drv, const char *errstr)
-{
- ErlDrvTermData spec[] =
- {ERL_DRV_ATOM, driver_mk_atom((char *)"error"),
- ERL_DRV_ATOM, driver_mk_atom((char *)errstr), ERL_DRV_TUPLE, 2};
- driver_output_term(zmq_drv->port, spec, sizeof(spec)/sizeof(spec[0]));
-}
-
-static void
-zmqdrv_ok(zmq_drv_t *zmq_drv)
-{
- ErlDrvTermData spec[] =
- {ERL_DRV_ATOM, driver_mk_atom((char *)"ok")};
- driver_output_term(zmq_drv->port, spec, sizeof(spec)/sizeof(spec[0]));
-}
-
-static void
-zmqdrv_binary_ok(zmq_drv_t *zmq_drv, void *data, size_t size)
-{
- ErlDrvBinary *ev_data = driver_alloc_binary(size);
- /* Copy payload. */
- ev_data->orig_size = size;
- memcpy(ev_data->orig_bytes, data, size);
-
- ErlDrvTermData spec[] =
- {ERL_DRV_ATOM, driver_mk_atom((char *)"ok"),
- ERL_DRV_BINARY, (ErlDrvTermData)ev_data, size, 0,
- ERL_DRV_TUPLE, 2};
-
- driver_output_term(zmq_drv->port, spec, sizeof(spec)/sizeof(spec[0]));
- driver_free_binary(ev_data);
-}
-
-static void
-zmqdrv_init(zmq_drv_t *zmq_drv, ErlIOVec *ev)
-{
-#if 0
- /*
- * FIXME
- * Use ei_decode_* to decode input from erlang VM.
- * This stuff is not documented anywhere, for now
- * binary ErlIOVec is decoded by poking in iov struct.
- */
-
- ErlDrvBinary *ev_app_threads = ev->binv[2];
- ErlDrvBinary *ev_io_threads = ev->binv[3];
- ErlDrvBinary *ev_flags = ev->binv[4];
- int i = 0;
-
- if (ei_decode_long(ev_app_threads->orig_bytes, &i, &app_threads) < 0) {
- zmqdrv_error(zmq_drv, "Invalid command");
- return;
- }
-
- i = 0;
- if (ei_decode_long(ev_io_threads->orig_bytes, &i, &io_threads) < 0) {
- zmqdrv_error(zmq_drv, "Invalid command");
- return;
- }
-
- i = 0;
- if (ei_decode_long(ev_flags->orig_bytes, &i, &flags) < 0) {
- zmqdrv_error(zmq_drv, "Invalid command");
- return;
- }
-#endif
-
- uint32_t app_threads;
- uint32_t io_threads;
- uint32_t flags;
-
- app_threads = ntohl(*(uint32_t *)(ev->iov[1].iov_base + 4));
- io_threads = ntohl(*(uint32_t *)(ev->iov[1].iov_base + 8));
- flags = ntohl(*(uint32_t *)(ev->iov[1].iov_base + 12));
-
- zmqdrv_fprintf("appthreads = %u, iothreads = %u\n", app_threads, io_threads);
-
- zmq_drv->zmq_context = (void *)zmq_init(app_threads, io_threads, flags);
- if (!zmq_drv->zmq_context) {
- zmqdrv_error(zmq_drv, zmq_strerror(zmq_errno()));
- return;
- }
-
- zmqdrv_ok(zmq_drv);
-}
-
-static void
-zmqdrv_term(zmq_drv_t *zmq_drv, ErlIOVec *ev)
-{
- if (!zmq_drv->zmq_context) {
- zmqdrv_error(zmq_drv, "Uninitialized context");
- return;
- }
-
- if (zmq_term(zmq_drv->zmq_context) < 0) {
- zmqdrv_error(zmq_drv, zmq_strerror(zmq_errno()));
- return;
- }
-
- zmqdrv_ok(zmq_drv);
- zmq_drv->zmq_context = NULL;
-}
-
-static void
-zmqdrv_socket(zmq_drv_t *zmq_drv, ErlIOVec *ev)
-{
- uint32_t type;
- void *s;
-
- type = ntohl(*(uint32_t *)(ev->iov[1].iov_base + 4));
-
- s = zmq_socket(zmq_drv->zmq_context, type);
- if (!s) {
- zmqdrv_error(zmq_drv->zmq_context, zmq_strerror(zmq_errno()));
- return;
- }
-
- zmqdrv_fprintf("socket %p\n", s);
- zmqdrv_binary_ok(zmq_drv, (void *)&s, sizeof(s));
-}
-
-static void
-zmqdrv_close(zmq_drv_t *zmq_drv, ErlIOVec *ev)
-{
- void *s;
-
- memcpy(&s, (void *)(ev->iov[1].iov_base + 4), sizeof(s));
-
- zmqdrv_fprintf("close %p\n", s);
- if (zmq_close(s) < 0) {
- zmqdrv_error(zmq_drv->zmq_context, zmq_strerror(zmq_errno()));
- return;
- }
-
- zmqdrv_ok(zmq_drv);
-}
-
-static void
-zmqdrv_setsockopt(zmq_drv_t *zmq_drv, ErlIOVec *ev)
-{
- void *s;
- uint32_t option;
- void *optval;
- size_t optvallen;
-
- memcpy(&s, (void *)(ev->iov[1].iov_base + 4), sizeof(s));
- option = ntohl(*(uint32_t *)(ev->iov[1].iov_base + 4 + sizeof(s)));
- optval = (void *)(ev->iov[1].iov_base + 4 + sizeof(s) + 4);
- optvallen = ev->iov[1].iov_len - (4 + sizeof(s) + 4);
-
- if (!s) {
- zmqdrv_error(zmq_drv->zmq_context, "Invalid socket");
- return;
- }
-
- zmqdrv_fprintf("setsockopt %p\n", s);
- if (zmq_setsockopt(s, option, optval, optvallen) < 0) {
- zmqdrv_error(zmq_drv->zmq_context, zmq_strerror(zmq_errno()));
- return;
- }
-
- zmqdrv_ok(zmq_drv);
-}
-
-static void
-zmqdrv_getsockopt(zmq_drv_t *zmq_drv, ErlIOVec *ev)
-{
- zmqdrv_error(zmq_drv->zmq_context, "Not implemented yet, ha ha..");
-}
-
-static void
-zmqdrv_bind(zmq_drv_t *zmq_drv, ErlIOVec *ev)
-{
- void *s;
- char addr[512];
- uint16_t size;
-
- memcpy(&s, (void *)(ev->iov[1].iov_base + 4), sizeof(s));
- /*
- * FIXME
- * Address is prefixed with 131,107,x,x 4 byte header!
- * 131 - version, 107 - binary format, x,x string length.
- */
- size = ntohs(*(uint16_t *)(ev->iov[1].iov_base + 14));
- if (size > sizeof(addr) - 1) {
- zmqdrv_error(zmq_drv->zmq_context, "Invalid address");
- return;
- }
-
- memcpy(addr, ev->iov[1].iov_base + 16, size);
- addr[size] = '\0';
-
- if (!s || !addr[0]) {
- zmqdrv_error(zmq_drv->zmq_context, "Invalid argument");
- return;
- }
-
- if (zmq_bind(s, addr) < 0) {
- zmqdrv_error(zmq_drv->zmq_context, zmq_strerror(zmq_errno()));
- return;
- }
-
- zmqdrv_ok(zmq_drv);
-}
-
-static void
-zmqdrv_connect(zmq_drv_t *zmq_drv, ErlIOVec *ev)
-{
- void *s;
- char addr[512];
- uint16_t size;
-
- memcpy(&s, (void *)(ev->iov[1].iov_base + 4), sizeof(s));
- /*
- * FIXME
- * Address is prefixed with 131,107,x,x 4 byte header!
- * 131 - version, 107 - binary format, x,x string length.
- */
- size = ntohs(*(uint16_t *)(ev->iov[1].iov_base + 14));
- if (size > sizeof(addr) - 1) {
- zmqdrv_error(zmq_drv->zmq_context, "Invalid address");
- return;
- }
-
- memcpy(addr, ev->iov[1].iov_base + 16, size);
- addr[size] = '\0';
-
- if (!s || !addr[0]) {
- zmqdrv_error(zmq_drv->zmq_context, "Invalid argument");
- return;
- }
-
- if (zmq_connect(s, addr) < 0) {
- zmqdrv_error(zmq_drv->zmq_context, zmq_strerror(zmq_errno()));
- return;
- }
-
- zmqdrv_ok(zmq_drv);
-}
-
-static void
-zmqdrv_send(zmq_drv_t *zmq_drv, ErlIOVec *ev)
-{
- void *s;
- void *data;
- size_t size;
- void *buf;
- zmq_msg_t msg;
-
- memcpy(&s, (void *)(ev->iov[1].iov_base + 4), sizeof(s));
- data = (void *)(ev->iov[1].iov_base + 12);
- size = ev->iov[1].iov_len - 12;
-
- /* FIXME Is there a way to avoid this? */
- buf = malloc(size);
- if (!buf) {
- zmqdrv_error(zmq_drv->zmq_context, strerror(ENOMEM));
- return;
- }
- memcpy(buf, data, size);
-
- if (zmq_msg_init_data(&msg, buf, size, NULL, NULL)) {
- zmqdrv_error(zmq_drv->zmq_context, zmq_strerror(zmq_errno()));
- free(buf);
- return;
- }
-
- if (zmq_send(s, &msg, 0) < 0) {
- zmqdrv_error(zmq_drv->zmq_context, zmq_strerror(zmq_errno()));
- zmq_msg_close(&msg);
- return;
- }
-
- zmqdrv_ok(zmq_drv);
-}
-
-static void
-zmqdrv_recv(zmq_drv_t *zmq_drv, ErlIOVec *ev)
-{
- void *s;
- zmq_msg_t msg;
-
- memcpy(&s, (void *)(ev->iov[1].iov_base + 4), sizeof(s));
-
- zmq_msg_init(&msg);
-
- if (zmq_recv(s, &msg, 0) < 0) {
- zmqdrv_error(zmq_drv->zmq_context, zmq_strerror(zmq_errno()));
- zmq_msg_close(&msg);
- return;
- }
-
- zmqdrv_binary_ok(zmq_drv, zmq_msg_data(&msg), zmq_msg_size(&msg));
- zmq_msg_close(&msg);
-}
27 priv/zmq_drv.h
View
@@ -1,27 +0,0 @@
-/*
- * ------------------------------------------------------------------
- * Erlang bindings for ZeroMQ.
- * ------------------------------------------------------------------
- * <dhammika@gmail.com> wrote this code, copyright disclaimed.
- * ------------------------------------------------------------------
- */
-
-#include <erl_driver.h>
-#include <ei.h>
-
-/* Erlang driver commands. */
-#define ZMQ_INIT 1
-#define ZMQ_TERM 2
-#define ZMQ_SOCKET 3
-#define ZMQ_CLOSE 4
-#define ZMQ_SETSOCKOPT 5
-#define ZMQ_GETSOCKOPT 6
-#define ZMQ_BIND 7
-#define ZMQ_CONNECT 8
-#define ZMQ_SEND 9
-#define ZMQ_RECV 10
-
-typedef struct {
- ErlDrvPort port;
- void *zmq_context;
-} zmq_drv_t;
41 src/Makefile
View
@@ -1,9 +1,36 @@
-all:
- erlc -o ../ebin zmq.erl
- erlc -o ../ebin zmq_subclient.erl
- erlc -o ../ebin zmq_pubserver.erl
- erlc -o ../ebin zmq_reqclient.erl
- erlc -o ../ebin zmq_repserver.erl
+ERL_SOURCES=$(wildcard *.erl)
+EBIN_DIR=../ebin
+DOC_DIR=../doc
+ERL_FLAGS= +debug_info -I../include
+ERL_OBJECTS=$(ERL_SOURCES:%.erl=$(EBIN_DIR)/%.beam)
+#DOC_FILES=$(ERL_SOURCES:%.erl=$(DOC_DIR)/%.html)
+DOC_FILES=$(DOC_DIR)/zmq.html
+
+ifdef debug
+ERL_FLAGS += -Ddebug
+endif
+
+include ../vsn.mk
+
+all: $(ERL_OBJECTS) $(EBIN_DIR)/zmq.app
+
+$(EBIN_DIR)/%.beam: %.erl
+ erlc $(ERL_FLAGS) -o ../ebin $<
+
+$(EBIN_DIR)/zmq.app: zmq.app.src ../vsn.mk
+ sed 's/%VSN%/$(VSN)/g' $< > $@
clean:
- rm -r ../ebin/*.beam
+ rm -f $(ERL_OBJECTS)
+ rm -fr $(DOC_DIR)
+
+docs: $(DOC_DIR) $(DOC_FILES)
+
+$(DOC_DIR):
+ mkdir $@
+
+#$(DOC_DIR)/overview.html: overview.edoc
+# erl -noshell -run edoc_run file '"$<"' '[{dir, "$(DOC_DIR)"}]' -run init stop
+$(DOC_DIR)/zmq.html: zmq.erl overview.edoc
+ erl -noshell -run edoc_run application "'zmq'" '"."' '[{dir, "$(DOC_DIR)"}]' -run init stop
+
177 src/overview.edoc
View
@@ -0,0 +1,177 @@
+@title Erlang bindings for ZeroMQ messaging framework.
+
+@author Serge Aleynikov <saleyn at gmail dot com>
+@author Dhammika Pathirana <dhammika at gmail dot com>
+
+@copyright 2010 Serge Aleynikov and Dhammika Pathirana
+
+@version {@version}
+
+@doc
+
+== Building ==
+
+This implementation requires a feature of ZeroMQ that is
+not yet included in the 2.0.6 release, and therefore ZeroMQ needs
+to be patched by applying the `zeromq-2.0.6.poll.patch' patch.
+
+```
+ $ cp patches/zeromq-2.0.6.poll.patch /path/to/zeromq/source/
+ $ cd /path/to/zeromq/source/
+ $ patch -p0 < zeromq-2.0.6.poll.patch
+ $ make
+ $ make install
+'''
+
+After applying the patch modify the path in `c_src/Makefile'
+of the following variable to reflect location of ZeroMQ installation:
+
+```
+ZMQ_DIR=/opt/zeromq-2.0.6
+'''
+
+Make sure that Erlang is installed and `erl' is in the `PATH'.
+
+Run:
+
+```
+ $ make
+ $ make docs
+'''
+
+== Architecture ==
+
+A ZeroMQ context is created by calling zmq:start_link/0 or
+zmq:start_link/1 function optionally passing the number of I/O
+threads to be used by ZeroMQ framework. The number of
+application threads defaults to `1' and currently is not
+configurable.
+
+The ZeroMQ context is started and owned by the `zmq' process.
+On the contrary, ZeroMQ sockets are owned by individual Erlang
+processes that communicate directly with the driver bypassing `zmq'
+process. This way individual pids can issue blocking recv/1 or
+send/2 calls without synchronizing them through the driver owner's
+pid.
+
+The driver is aware of socket ownership by monitoring Erlang pids
+that created sockets. This means that a socket is automatically
+garbage collected when the socket owner Erlang pid dies.
+
+The semantics of socket creation function is slightly different
+from the corresponding `zmq_socket(3)' function in a way that
+it also accepts socket options:
+
+```
+ {ok, Socket} = zmq:socket(req, [{active, true}, {hwm, 1000}]).
+'''
+
+All socket types and option names map 1-to-1 to corresponding
+names of ZeroMQ socket types and options except for `ZMQ_' prefix
+being stripped.
+
+A new socket option is added called `active'. The option accepts
+a boolean argument and has the following meaning. If the option
+is `true', all incoming messages are automatically delivered by
+the driver to the owner's mailbox without explicit need to
+call zmq:recv/1.
+
+If the option is `false' the owner must explicitely call zmq:recv/1
+to receive a message from a ZeroMQ socket. Use `{active, true}'
+mode with causion since if the messages are arriving at a rate faster
+than the Erlang process can handle them the process's queue will
+grow and eventually run out of memory.
+
+zmq:send/2 and zmq:recv/1 calls may block the owner Erlang process
+but will never block the Erlang VM. The driver uses non-blocking
+calls to `zmq_send(2)' and `zmq_recv(1)' functions and will not
+deliver any result to the caller's pid until the message is
+successfully sent or received. Currently timeout option is not
+implemented on these two calls.
+
+Many Erlang pids owning many ZeroMQ sockets can be easily started
+in one Erlang VM. However, it's more typical to have a small number
+of ZeroMQ sockets in the system, since each socket can handle many
+transport connections.
+
+== Known Limitations ==
+
+<ul>
+<li>The driver attaches ZeroMQ file descriptors to Erlang VM's
+ event loop and this functionality has only been tested with
+ using was tested with kernel poll enabled (use "+K true"
+ startup option)</li>
+<li>The current architecture of ZeroMQ assumes that ZeroMQ sockets
+ are owned by the thread that created them and all operations
+ on these sockets should be done in the context of that thread.
+ This is not how Erlang driver currently works. All socket
+ calls are synchronized so that no two concurrent threads
+ would access the same ZeroMQ socket simultaneously, however
+ Erlang pid owning the ZeroMQ socket can be scheduled on
+ different OS kernel threads when Erlang is running with
+ SMP support enabled. We haven't tested this sufficiently to
+ say if this is or isn't an issue. There's an on-going
+ discussion (as of ZeroMQ version 2.0.7) to allow ZeroMQ
+ sockets to be migrated between threads.</li>
+</ul>
+
+== Usage ==
+
+=== Sample ZMQ_REQ client ===
+```
+ $ erl +K true -smp disable -pa /path/to/zmq/ebin
+
+ % Create ZeroMQ context
+ 1> zmq:start_link().
+ {ok,<0.34.0>}
+
+ % Create a ZeroMQ REQ socket and specify that messages
+ % are delivered to the current shell's mailbox.
+ 2> {ok, S} = zmq:socket(req, [{active, true}]).
+ {ok,{#Port<0.623>,1}}
+
+ % Connect to server
+ 3> zmq:connect(S, "tcp://127.0.0.1:5555").
+ ok
+
+ % Send a message to server
+ 4> zmq:send(S, <<"Test">>).
+ ok
+
+ % Receive a reply
+ 5> zmq:recv(S).
+ {error,einval}
+
+ % Note the error - in the active socket mode we cannot
+ % receive messages by calling zmq:recv/1. Instead
+ % use receive keyword to accomplish the task.
+
+ 6> f(M), receive M -> M end.
+ % The process blocks because there's no reply from server
+ % Once you start the server as shown in the following steps
+ % the receive call returns with the following message:
+ {zmq,1,<<"Reply">>}
+'''
+
+=== Sample ZMQ_REP server ===
+
+Start another shell either within the same Erlang VM
+by using ^G, or in a separate OS shell:
+
+```
+ $ erl +K true -smp disable -pa /path/to/zmq/ebin
+
+ 1> zmq:start_link().
+ {ok,<0.34.0>}
+ 2> {ok, S} = zmq:socket(rep, [{active, false}]).
+ {ok,{#Port<0.483>,1}}
+ 3> zmq:bind(S, "tcp://127.0.0.1:5555").
+ ok
+ 4> zmq:recv(S).
+ {ok,<<"Test">>}
+ 5> zmq:send(S, <<"Reply">>).
+ ok
+
+'''
+
+
11 src/zmq.app.src
View
@@ -0,0 +1,11 @@
+% This is an -*- erlang -*- file.
+
+{application, zmq,
+ [{description, "ZeroMQ"},
+ {vsn, "%VSN%"},
+ {modules, [zmq]},
+ {registered,[]},
+ {applications, [kernel,stdlib]},
+ {env, []}
+ ]
+}.
556 src/zmq.erl
View
@@ -1,112 +1,193 @@
%%%-------------------------------------------------------------------
-%%% @doc
-%%% Erlang bindings for ZeroMQ.
-%%% ------------------------------------------------------------------
-%%% <dhammika@gmail.com> wrote this code, copyright disclaimed.
-%%% ------------------------------------------------------------------
+%%% File: $Id$
+%%%-------------------------------------------------------------------
+%%% @doc Erlang bindings for ZeroMQ.
+%%%
+%%% @author Dhammika Pathirana <dhammika at gmail dot com>
+%%% @author Serge Aleynikov <saleyn at gmail dot com>.
+%%% @copyright 2010 Dhammika Pathirana and Serge Aleynikov
+%%% @end
+%%%-------------------------------------------------------------------
+%%% @type zmq_socket(). Opaque 0MQ socket type.
+%%% @type zmq_sockopt() = {hwm, integer()}
+%%% | {lwm, integer()}
+%%% | {swap, integer()}
+%%% | {affinity, integer()}
+%%% | {identity, string()}
+%%% | {subscribe, string()}
+%%% | {unsubscibe, string()}
+%%% | {rate, integer()}
+%%% | {recovery_ivl, integer()}
+%%% | {mcast_loop, boolean()}
+%%% | {sndbuf, integer()}
+%%% | {rcvbuf, integer()}
+%%% | {rcvmore, boolean()}
+%%% | {active, boolean()}.
+%%% 0MQ socket options. See 0MQ man pages for details.
+%%% One additional options `active' indicates to the driver
+%%% that incoming messages must be automatically delivered
+%%% to the process owner's mailbox instead of explicitely
+%%% requiring recv/1 call.
%%% @end
%%%-------------------------------------------------------------------
-module(zmq).
-author("dhammika@gmail.com").
+-author("saleyn@gmail.com").
+-id("$Id$").
-behaviour(gen_server).
%% ZMQ API
--export([start_link/0, init/3, term/0,
- socket/1, close/1, sockopt/2, bind/2, connect/2,
- send/2, recv/1]).
+-export([start_link/0, start_link/1,
+ socket/1, socket/2, close/1, setsockopt/2,
+ bind/2, connect/2, send/2, recv/1, format_error/1]).
+
+-export([port/0]).
%% gen_server callbacks.
-export([init/1,
handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
--define('DRIVER_NAME', 'zmq_drv').
--record(state, {port}).
+-include("zmq.hrl").
-%% ZMQ socket types.
--define('ZMQ_P2P', 0).
--define('ZMQ_PUB', 1).
--define('ZMQ_SUB', 2).
--define('ZMQ_REQ', 3).
--define('ZMQ_REP', 4).
--define('ZMQ_XREQ', 5).
--define('ZMQ_XREP', 6).
--define('ZMQ_UPSTREAM', 7).
--define('ZMQ_DOWNSTREAM', 8).
-
-%% ZMQ socket options.
--define('ZMQ_HWM', 1).
--define('ZMQ_LWM', 2).
--define('ZMQ_SWAP', 3).
--define('ZMQ_AFFINITY', 4).
--define('ZMQ_IDENTITY', 5).
--define('ZMQ_SUBSCRIBE', 6).
--define('ZMQ_UNSUBSCRIBE', 7).
--define('ZMQ_RATE', 8).
--define('ZMQ_RECOVERY_IVL', 9).
--define('ZMQ_MCAST_LOOP', 10).
--define('ZMQ_SNDBUF', 11).
--define('ZMQ_RCVBUF', 12).
--define('ZMQ_RCVMORE', 13).
-
-%% ZMQ send/recv options.
--define('ZMQ_NOBLOCK', 1).
--define('ZMQ_SNDMORE', 2).
-
-%% ZMQ port options.
--define('ZMQ_INIT', 1).
--define('ZMQ_TERM', 2).
--define('ZMQ_SOCKET', 3).
--define('ZMQ_CLOSE', 4).
--define('ZMQ_SETSOCKOPT', 5).
--define('ZMQ_GETSOCKOPT', 6).
--define('ZMQ_BIND', 7).
--define('ZMQ_CONNECT', 8).
--define('ZMQ_SEND', 9).
--define('ZMQ_RECV', 10).
-
-%% Debug log.
-log(Msg, MsgArgs) ->
- io:format(string:concat(string:concat("[~p:~p] ", Msg), "~n"),
- [?MODULE, ?LINE | MsgArgs]).
+-record(state, {port}).
%%%===================================================================
%%% ZMQ API
%%%===================================================================
%%--------------------------------------------------------------------
-%% @doc
-%% Start the server.
-%%
-%% @spec start_link() ->
-%% {ok, Pid} |
-%% {error, Error} |
-%% ignore
+%% @equiv start_link(1)
%% @end
%%--------------------------------------------------------------------
start_link() ->
- gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
-init(AppThreads, IoThreads, Flags) ->
- gen_server:call(?MODULE, {init, AppThreads, IoThreads, Flags}).
-term() ->
- gen_server:call(?MODULE, {term}).
-socket(Type) ->
- gen_server:call(?MODULE, {socket, Type}).
-close(Socket) ->
+ start_link(1).
+
+%%--------------------------------------------------------------------
+%% @doc Start the server.
+%% @spec (IoThreads) -> {ok, Pid} | {error, Error} | ignore
+%% @end
+%%--------------------------------------------------------------------
+start_link(IoThreads) when is_integer(IoThreads) ->
+ gen_server:start_link({local, ?MODULE}, ?MODULE, [IoThreads], []).
+
+socket(Type) when is_atom(Type) ->
+ socket(Type, []).
+
+%%--------------------------------------------------------------------
+%% @doc Create a 0MQ socket.
+%% @spec (Type, Options) -> {ok, Socket::zmq_socket()} | {error, Reason}
+%% Type = p2p | pub | sub | req | rep |
+%% xreq | xrep | upstream | downstream
+%% Options = [Option]
+%% Option = {active, boolean()}
+%% | {zmq_sockopt(), Value}
+%% @end
+%%--------------------------------------------------------------------
+socket(Type, Options) when is_atom(Type), is_list(Options) ->
+% gen_server:call(?MODULE, {socket, Type, Options}).
+ % We are using direct call to the driver to create the socket,
+ % because we need the driver to know the socket owner's pid, so
+ % that it can deliver messages to its mailbox in the passive mode
+ try gen_server:call(?MODULE, port) of
+ Port when is_port(Port) ->
+ [check_sockopt({O, V}) || {O,V} <- Options],
+ Msg = encode_msg_socket(Type),
+ {ok, S} = driver(Port, Msg),
+ case driver(Port, encode_sock_opts(S, Options)) of
+ ok ->
+ {ok, {Port, S}};
+ {error, Why} ->
+ driver(Port, encode_close(S)),
+ throw(Why)
+ end
+ catch _:Error ->
+ {error, Error}
+ end.
+
+%%--------------------------------------------------------------------
+%% @doc Close a 0MQ socket.
+%% @spec (Socket::zmq_socket()) -> ok | {error, Reason}
+%% @end
+%%--------------------------------------------------------------------
+close({Port, Socket}) when is_integer(Socket) ->
+ Msg = encode_close(Socket),
+ driver(Port, Msg);
+close(Socket) when is_integer(Socket) ->
gen_server:call(?MODULE, {close, Socket}).
-sockopt(Command, {Socket, Option, Value}) ->
- gen_server:call(?MODULE, {sockopt, Command, {Socket, Option, Value}}).
-%sockopt(Command, {Socket, Option}) ->
-% gen_server:call(?MODULE, {sockopt, Command, {Socket, Option}}).
-bind(Socket, Address) ->
- gen_server:call(?MODULE, {bind, Socket, Address}).
-connect(Socket, Address) ->
- gen_server:call(?MODULE, {connect, Socket, Address}).
-send(Socket, Data) ->
- gen_server:call(?MODULE, {send, Socket, Data}).
-recv(Socket) ->
- gen_server:call(?MODULE, {recv, Socket}).
+
+%%--------------------------------------------------------------------
+%% @doc Set socket options.
+%% @spec (Socket::zmq_socket(), Options) -> ok | {error, Reason}
+%% Options = [{zmq_sockopt(), Value}]
+%% @end
+%%--------------------------------------------------------------------
+setsockopt(Socket, Opts) when is_integer(Socket), is_list(Opts) ->
+ gen_server:call(?MODULE, {setsockopt, Socket, Opts}).
+
+%%--------------------------------------------------------------------
+%% @doc Bind a 0MQ socket to address.
+%% @spec (Socket::zmq_socket(), Address) -> ok | {error, Reason}
+%% Address = string() | binary()
+%% @end
+%%--------------------------------------------------------------------
+bind(Socket, Address) when is_integer(Socket), is_list(Address) ->
+ bind(Socket, list_to_binary(Address));
+bind(Socket, Address) when is_integer(Socket), is_binary(Address) ->
+ gen_server:call(?MODULE, {bind, Socket, Address});
+bind({Port, S}, Address) when is_integer(S), is_list(Address) ->
+ bind({Port, S}, list_to_binary(Address));
+bind({Port, S}, Address) when is_integer(S), is_binary(Address) ->
+ Msg = encode_bind(S, Address),
+ driver(Port, Msg).
+
+%%--------------------------------------------------------------------
+%% @doc Connect a 0MQ socket to address.
+%% @spec (Socket::zmq_socket(), Address) -> ok | {error, Reason}
+%% Address = string() | binary()
+%% @end
+%%--------------------------------------------------------------------
+connect(Socket, Address) when is_integer(Socket), is_list(Address) ->
+ connect(Socket, list_to_binary(Address));
+connect(Socket, Address) when is_integer(Socket), is_binary(Address) ->
+ gen_server:call(?MODULE, {connect, Socket, Address});
+% Experimantal support of direct port communication
+connect({Port, S}, Address) when is_list(Address)->
+ connect({Port, S}, list_to_binary(Address));
+connect({Port, S}, Address) when is_binary(Address) ->
+ Msg = encode_connect(S, Address),
+ driver(Port, Msg).
+
+%%--------------------------------------------------------------------
+%% @doc Send a message to a given 0MQ socket.
+%% @spec (Socket::zmq_socket(), Msg::binary()) -> ok | {error, Reason}
+%% @end
+%%--------------------------------------------------------------------
+send(Socket, Data) when is_integer(Socket), is_binary(Data) ->
+ gen_server:call(?MODULE, {send, Socket, Data});
+% Experimantal support of direct port communication
+send({Port, S}, Data) ->
+ Msg = encode_msg_send(S, Data),
+ driver(Port, Msg).
+
+%%--------------------------------------------------------------------
+%% @doc Receive a message from a given 0MQ socket.
+%% @spec (Socket::zmq_socket()) -> {ok, binary()} | {error, Reason}
+%% @end
+%%--------------------------------------------------------------------
+recv(Socket) when is_integer(Socket) ->
+ gen_server:call(?MODULE, {recv, Socket});
+% Experimantal support of direct port communication
+recv({Port, S}) ->
+ Msg = encode_msg_recv(S),
+ driver(Port, Msg).
+
+%% Experimental functions for direct communications with port
+%% bypassing serialization through ?MODULE server.
+
+port() ->
+ gen_server:call(?MODULE, port).
%%%===================================================================
%%% gen_server callbacks
@@ -114,8 +195,7 @@ recv(Socket) ->
%%--------------------------------------------------------------------
%% @private
-%% @doc
-%% Handle start.
+%% @doc Handle start.
%%
%% @spec init(Args) ->
%% {ok, State} |
@@ -125,22 +205,26 @@ recv(Socket) ->
%% ignore
%% @end
%%--------------------------------------------------------------------
-init([]) ->
+init([IoThreads]) ->
process_flag(trap_exit, true),
- SearchDir = filename:join(
- [filename:dirname(code:which(?MODULE)), "..", "priv"]),
- log("init, lib path:~p", [SearchDir]),
- case erl_ddll:load(SearchDir, atom_to_list(?DRIVER_NAME)) of
- ok ->
- {ok, #state{port=open_port({spawn, ?DRIVER_NAME}, [binary])}};
- {error, Reason} ->
- {stop, {error, Reason}}
+ DirName = re:replace(filename:dirname(code:which(?MODULE)),
+ "/?[^/]+/\\.\\.", "", [{return,list}]),
+ SearchDir = filename:join(filename:dirname(DirName), "priv"),
+ ?log("init, lib path: ~s", [SearchDir]),
+ try erl_ddll:load(SearchDir, ?DRIVER_NAME) of
+ ok ->
+ Port = open_port({spawn_driver, ?DRIVER_NAME}, [binary]),
+ init_context(Port, IoThreads),
+ {ok, #state{port=Port}};
+ {error, Reason} ->
+ throw(erl_ddll:format_error(Reason))
+ catch _:Error ->
+ {stop, Error}
end.
%%--------------------------------------------------------------------
%% @private
-%% @doc
-%% Handle synchronous call.
+%% @doc Handle synchronous call.
%%
%% @spec handle_call(Request, From, State) ->
%% {reply, Reply, NewState} |
@@ -153,105 +237,67 @@ init([]) ->
%% {stop, Reason, NewState}
%% @end
%%-------------------------------------------------------------------
-handle_call({init, AppThreads, IoThreads, Flags}, _From, State) ->
- log("~p, app threads:~B io threads:~B",
- [init, AppThreads, IoThreads]),
- Message = <<(?ZMQ_INIT):32, AppThreads:32, IoThreads:32, Flags:32>>,
- Reply = driver(State#state.port, Message),
- {reply, Reply, State};
-handle_call({term}, _From, State) ->
- log("~p", [term]),
- Message = <<(?ZMQ_TERM):32>>,
- Reply = driver(State#state.port, Message),
- {reply, Reply, State};
-
-handle_call({socket, Type}, _From, State)
- when is_atom(Type) ->
- log("~p, type:~s", [socket, Type]),
- Message =
- case Type of
- zmq_p2p -> <<(?ZMQ_SOCKET):32, (?ZMQ_P2P):32>>;
- zmq_pub -> <<(?ZMQ_SOCKET):32, (?ZMQ_PUB):32>>;
- zmq_sub -> <<(?ZMQ_SOCKET):32, (?ZMQ_SUB):32>>;
- zmq_req -> <<(?ZMQ_SOCKET):32, (?ZMQ_REQ):32>>;
- zmq_rep -> <<(?ZMQ_SOCKET):32, (?ZMQ_REP):32>>;
- zmq_xreq -> <<(?ZMQ_SOCKET):32, (?ZMQ_XREQ):32>>;
- zmq_xrep -> <<(?ZMQ_SOCKET):32, (?ZMQ_XREP):32>>;
- zmq_upstream -> <<(?ZMQ_SOCKET):32, (?ZMQ_UPSTREAM):32>>;
- zmq_downstream -> <<(?ZMQ_SOCKET):32, (?ZMQ_DOWNSTREAM):32>>;
- other -> {error, "Unknown socket type"}
- end,
- Reply = driver(State#state.port, Message),
- {reply, Reply, State};
-
-handle_call({close, Socket}, _From, State)
- when is_binary(Socket) ->
- log("~p", [close]),
- Message = <<(?ZMQ_CLOSE):32, Socket/binary>>,
- Reply = driver(State#state.port, Message),
- {reply, Reply, State};
-
-% FIXME Doesn't support getsockopt yet.
-handle_call({sockopt, _Command, {Socket, Option, Value}}, _From, State)
- when is_binary(Socket) ->
- log("~p", [socketopt]),
- Message =
- case Option of
- zmq_hwm -> <<(?ZMQ_SETSOCKOPT):32, Socket/binary, (?ZMQ_HWM):32, Value/binary>>;
- zmq_lwn -> <<(?ZMQ_SETSOCKOPT):32, Socket/binary, (?ZMQ_LWM):32, Value/binary>>;
- zmq_swap -> <<(?ZMQ_SETSOCKOPT):32, Socket/binary, (?ZMQ_SWAP):32, Value/binary>>;
- zmq_affinity -> <<(?ZMQ_SETSOCKOPT):32, Socket/binary, (?ZMQ_AFFINITY):32, Value/binary>>;
- zmq_identity -> <<(?ZMQ_SETSOCKOPT):32, Socket/binary, (?ZMQ_IDENTITY):32, Value/binary>>;
- zmq_subscribe -> <<(?ZMQ_SETSOCKOPT):32, Socket/binary, (?ZMQ_SUBSCRIBE):32, Value/binary>>;
- zmq_unsubscibe -> <<(?ZMQ_SETSOCKOPT):32, Socket/binary, (?ZMQ_UNSUBSCRIBE):32, Value/binary>>;
- zmq_rate -> <<(?ZMQ_SETSOCKOPT):32, Socket/binary, (?ZMQ_RATE):32, Value/binary>>;
- zmq_racovery_ivl -> <<(?ZMQ_SETSOCKOPT):32, Socket/binary, (?ZMQ_RECOVERY_IVL):32, Value/binary>>;
- zmq_mcast_loop -> <<(?ZMQ_SETSOCKOPT):32, Socket/binary, (?ZMQ_MCAST_LOOP):32, Value/binary>>;
- zmq_rcvbuf -> <<(?ZMQ_SETSOCKOPT):32, Socket/binary, (?ZMQ_RCVBUF):32, Value/binary>>;
- zmq_rcvmore -> <<(?ZMQ_SETSOCKOPT):32, Socket/binary, (?ZMQ_RCVMORE):32, Value/binary>>;
- other -> {error, "Unknown socket type"}
+% No need to support context termination - context allocation
+% is handled on port creation, and the resource will be
+% automatically reclaimed upon death of the port driver.
+%handle_call({term}, _From, State) ->
+% ?log("~p", [term]),
+% Message = <<(?ZMQ_TERM):8>>,
+% Reply = driver(State#state.port, Message),
+% {reply, Reply, State};
+
+handle_call({socket, Type, Options}, _From, #state{port=Port} = State) ->
+ ?log("~p, type:~s options:~p", [socket, Type, Options]),
+ try
+ [check_sockopt({O, V}) || {O,V} <- Options],
+ Msg = encode_msg_socket(Type),
+ {ok, S} = driver(Port, Msg),
+ case driver(Port, encode_sock_opts(S, Options)) of
+ ok ->
+ ok;
+ {error, Why} ->
+ driver(Port, encode_close(S)),
+ throw(Why)
end,
- Reply = driver(State#state.port, Message),
- {reply, Reply, State};
-
-handle_call({bind, Socket, Address}, _From, State)
- when is_binary(Socket) ->
- log("~p addr:~s", [bind, binary_to_term(Address)]),
- Message = <<(?ZMQ_BIND):32, Socket/binary, Address/binary>>,
- Reply = driver(State#state.port, Message),
- {reply, Reply, State};
-
-handle_call({connect, Socket, Address}, _From, State)
- when is_binary(Socket) and is_binary(Address) ->
- log("~p addr:~s", [connect, binary_to_term(Address)]),
- Message = <<(?ZMQ_CONNECT):32, Socket/binary, Address/binary>>,
- Reply = driver(State#state.port, Message),
- {reply, Reply, State};
-
-handle_call({send, Socket, Data}, _From, State)
- when is_binary(Socket) and is_binary(Data) ->
- log("~p", [send]),
- Message = <<(?ZMQ_SEND):32, Socket/binary, Data/binary>>,
- Reply = driver(State#state.port, Message),
- {reply, Reply, State};
-
-handle_call({recv, Socket}, _From, State)
- when is_binary(Socket) ->
- log("~p", [recv]),
- Message = <<(?ZMQ_RECV):32, Socket/binary>>,
- Reply = driver(State#state.port, Message),
- {reply, Reply, State};
-
-handle_call(_Request, _From, State) ->
- log("~p", ['unknown request']),
- Reply = {error, unkown_call},
- {reply, Reply, State}.
+ {reply, {ok, S}, State}
+ catch _:Error ->
+ {reply, {error, Error}, State}
+ end;
+
+handle_call({close, Socket}, _From, State) ->
+ ?log("~p", [close]),
+ do_call(State, encode_close(Socket));
+
+handle_call({setsockopt, Socket, Options}, _From, State) ->
+ ?log("~p", [socketopt]),
+ do_call(State, encode_sock_opts(Socket, Options));
+
+handle_call({bind, Socket, Address}, _From, State) ->
+ ?log("~p addr:~s", [bind, binary_to_list(Address)]),
+ do_call(State, encode_bind(Socket, Address));
+
+handle_call({connect, Socket, Address}, _From, State) ->
+ ?log("~p addr:~s", [connect, binary_to_list(Address)]),
+ do_call(State, encode_connect(Socket, Address));
+
+handle_call({send, Socket, Data}, _From, State) ->
+ ?log("~p", [send]),
+ do_call(State, encode_msg_send(Socket, Data));
+
+handle_call({recv, Socket}, _From, State) ->
+ ?log("~p", [recv]),
+ do_call(State, encode_msg_recv(Socket));
+
+handle_call(port, _From, #state{port = Port} = State) ->
+ {reply, Port, State};
+
+handle_call(Request, _From, State) ->
+ {stop, {unknown_call, Request}, State}.
%%--------------------------------------------------------------------
%% @private
-%% @doc
-%% Handle asynchronous call.
+%% @doc Handle asynchronous call.
%%
%% @spec handle_cast(Msg, State) ->
%% {noreply, NewState} |
@@ -260,13 +306,12 @@ handle_call(_Request, _From, State) ->
%% {stop, Reason, NewState}
%% @end
%%--------------------------------------------------------------------
-handle_cast(_Msg, State) ->
- {noreply, State}.
+handle_cast(Msg, State) ->
+ {stop, {unknown_cast, Msg}, State}.
%%--------------------------------------------------------------------
%% @private
-%% @doc
-%% Handle timeout.
+%% @doc Handle message.
%%
%% @spec handle_info(Info, State) ->
%% {noreply, NewState} |
@@ -276,12 +321,12 @@ handle_cast(_Msg, State) ->
%% @end
%%--------------------------------------------------------------------
handle_info(_Info, State) ->
+ ?log("unhandled message: ~p\n", [_Info]),
{noreply, State}.
%%--------------------------------------------------------------------
%% @private
-%% @doc
-%% Handle termination/shutdown.
+%% @doc Handle termination/shutdown.
%%
%% @spec terminate(Reason, State) -> void()
%% @end
@@ -292,8 +337,7 @@ terminate(_Reason, State) ->
%%--------------------------------------------------------------------
%% @private
-%% @doc
-%% Handle code change.
+%% @doc Handle code change.
%%
%% @spec code_change(OldVsn, State, Extra) ->
%% {ok, NewState}
@@ -302,15 +346,123 @@ terminate(_Reason, State) ->
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
+%%--------------------------------------------------------------------
+%% @doc Format error atom returned by the driver.
+%% @spec (Code::atom()) -> string()
+%% @end
+%%--------------------------------------------------------------------
+format_error(enotsup) -> "Not supported";
+format_error(eprotonosupport) -> "Protocol not supported";
+format_error(enobufs) -> "No buffer space available";
+format_error(enetdown) -> "Network is down";
+format_error(eaddrinuse) -> "Address in use";
+format_error(eaddrnotavail) -> "Address not available";
+format_error(emthread) -> "Number of preallocated application threads exceeded";
+format_error(efsm) -> "Operation cannot be accomplished in current state";
+format_error(enocompatproto) -> "The protocol is not compatible with the socket type";
+format_error(E) when is_atom(E) -> inet:format_error(E);
+format_error(E) when is_list(E) -> E;
+format_error(E) when is_tuple(E)-> io_lib:format("~p", [E]).
+
%%%===================================================================
-%%% zmq_drv port wrapper.
+%%% Internal functions