Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP

Loading…

3.x: merged gar1t's fork #46

Closed
wants to merge 33 commits into from

5 participants

@rakvat

merged gar1t's fork into 3.x and fixed tests

zguide erlang examples use erlzmq_device (e.g. in msgqueue.es) which was only available on gar1t's fork. I merged it in branch 3.x and adapted the tests. I request to pull my commits into 3.x to make life easier for erlang-zguide-readers.

gar1t and others added some commits
@gar1t gar1t Ignoring all generated html by default 1bfac14
@gar1t gar1t Queue device in user space (used in examples) cdf4408
@gar1t gar1t Utility module (used in examples) c0587bb
@gar1t gar1t Coerce rcvmore sockopt to boolean dc29b01
@gar1t gar1t Missing docs reference 1738937
@gar1t gar1t Merge remote-tracking branch 'zeromq/master' 0493c1d
@gar1t gar1t Using zeromq 2.1.10 912ce48
@ericbmerritt ericbmerritt Reformat README.md for reasonable line length 4b17cb0
@ericbmerritt ericbmerritt support pulling zmq3.1 in c_src makefile 08fe70c
@ericbmerritt ericbmerritt migrate from the 2.1 api to 3.1 api for zmq 7dcd68f
@ericbmerritt ericbmerritt cleanup tab/space issues that hide change in the diffs 830f9f2
@ericbmerritt ericbmerritt make the makefile a bit more resilent rexecution of test e9584a3
@ericbmerritt ericbmerritt switch to the more reliable git https protocol
Https seems to be more reliable on github and we dont need to push so
lets just use that.
982e85b
@ericbmerritt ericbmerritt remove reduntant edoc type information d90e94d
@ericbmerritt ericbmerritt add a bit of guidance with upgrading d062622
Yurii Rashkovskii Merge branch '3.1.0' of https://github.com/ericbmerritt/erlzmq2 into …
…ericbmerritt-3.1.0

Conflicts:
	c_src/erlzmq_nif.c
001a735
@gar1t gar1t zmq record c3acc99
@gar1t gar1t Helper to work with multipart messages 74dc962
@gar1t gar1t Merge branch 'master' of git://github.com/zeromq/erlzmq2 into zeromq e92777c
@gar1t gar1t Merge branch 'zeromq' 4b20827
@gar1t gar1t Merged version conflict f63704e
@ericbmerritt ericbmerritt Fix the version in app.src 2282fba
@yrashk yrashk Merge pull request #35 from ericbmerritt/3.1.0
3.0 version bump
767702b
@gar1t gar1t Helper for getting msg parts for active socket 170aa66
@gar1t gar1t Helper for receiving parts 5ae218b
@gar1t gar1t Merge branch 'master' of github.com:gar1t/erlzmq2 6185bc6
@gar1t gar1t Merge branch 'master' of git://github.com/zeromq/erlzmq2 into zeromq 063f200
@gar1t gar1t Merge branch 'zeromq' 34013a3
@jj1bdx jj1bdx Update c_src/erlzmq_nif.c
Added `#include <sys/types.h>` for the definition of int64_t
on FreeBSD 9.1.
3b0d57a
@yrashk yrashk Merge pull request #45 from jj1bdx/3.x-int64_t
Update c_src/erlzmq_nif.c
4f2350b
@rakvat rakvat merged gar1t/erlzmq2 (needed for zguide) 9ddf930
@rakvat rakvat adapted tests to gar1t's coerced getsockopt 3e0d100
@rakvat rakvat added test for erlzmq_device with router and dealer 7822b05
@rakvat rakvat closed this
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Commits on Aug 7, 2011
  1. @gar1t
  2. @gar1t
  3. @gar1t
  4. @gar1t
  5. @gar1t

    Missing docs reference

    gar1t authored
Commits on Dec 20, 2011
  1. @gar1t
  2. @gar1t

    Using zeromq 2.1.10

    gar1t authored
Commits on Mar 6, 2012
  1. @ericbmerritt
Commits on Mar 8, 2012
  1. @ericbmerritt
  2. @ericbmerritt
  3. @ericbmerritt
  4. @ericbmerritt
  5. @ericbmerritt

    switch to the more reliable git https protocol

    ericbmerritt authored
    Https seems to be more reliable on github and we dont need to push so
    lets just use that.
  6. @ericbmerritt
  7. @ericbmerritt
  8. Merge branch '3.1.0' of https://github.com/ericbmerritt/erlzmq2 into …

    Yurii Rashkovskii authored
    …ericbmerritt-3.1.0
    
    Conflicts:
    	c_src/erlzmq_nif.c
  9. @gar1t

    zmq record

    gar1t authored
  10. @gar1t
  11. @gar1t
  12. @gar1t

    Merge branch 'zeromq'

    gar1t authored
  13. @gar1t

    Merged version conflict

    gar1t authored
Commits on Mar 10, 2012
  1. @ericbmerritt
  2. @yrashk

    Merge pull request #35 from ericbmerritt/3.1.0

    yrashk authored
    3.0 version bump
Commits on Mar 20, 2012
  1. @gar1t
Commits on Mar 23, 2012
  1. @gar1t

    Helper for receiving parts

    gar1t authored
  2. @gar1t
Commits on Apr 4, 2012
  1. @gar1t
  2. @gar1t

    Merge branch 'zeromq'

    gar1t authored
Commits on Oct 23, 2012
  1. @jj1bdx

    Update c_src/erlzmq_nif.c

    jj1bdx authored
    Added `#include <sys/types.h>` for the definition of int64_t
    on FreeBSD 9.1.
  2. @yrashk

    Merge pull request #45 from jj1bdx/3.x-int64_t

    yrashk authored
    Update c_src/erlzmq_nif.c
Commits on Jan 20, 2013
  1. @rakvat
  2. @rakvat
  3. @rakvat
This page is out of date. Refresh to see the latest.
View
11 .gitignore
@@ -1,5 +1,6 @@
ebin/*.app
ebin/*.beam
+test/*.beam
priv/*.so
c_src/*.o
deps
@@ -8,9 +9,9 @@ perf/*.beam
.eunit
doc/edoc-info
doc/erlang.png
-doc/ezmq.html
-doc/index.html
-doc/overview-summary.html
-doc/packages-frame.html
+doc/*.html
doc/stylesheet.css
-doc/modules-frame.html
+*~
+*.swa
+*.swp
+*.swo
View
50 README.md
@@ -1,41 +1,41 @@
erlzmq2
-====
+=======
NIF based Erlang bindings for the ZeroMQ messaging library.
Copyright (c) 2011 Yurii Rashkovskii, Evax Software and Michael Truog
Overview
-========
+--------
-The erlzmq2 application provides high-performance NIF based Erlang bindings
-for the ZeroMQ messaging library.
+The erlzmq2 application provides high-performance NIF based Erlang
+bindings for the ZeroMQ messaging library.
Downloading
-===========
+-----------
-The erlzmq2 source code can be found on [GitHub](https://github.com/yrashk/erlzmq2)
+The erlzmq2 source code can be found on
+[GitHub](https://github.com/zeromq/erlzmq2)
$ git clone http://github.com/zeromq/erlzmq2.git
-It is also available on [Agner](http://erlagner.org/):
-
- $ agner build erlzmq
-
-In order to build erlzmq2 against a specific version of ZeroMQ (not `v2.1.11`), use this:
-
- $ ZEROMQ_VERSION=v<VERSION> agner build erlzmq
-
Building
-========
+--------
+
+Please note that to behave properly on your system ZeroMQ might
+require [some tuning](http://www.zeromq.org/docs:tuning-zeromq).
Build the code
$ make
-If you want to build against a specific version of ZeroMQ (not `v2.1.11`), use this:
+If you want to build against a specific version of ZeroMQ in the 3.1
+series (not `v3.1.0`), use this:
$ ZEROMQ_VERSION=v<VERSION> make
+Be aware that this will almost assuredly not work correctly for any
+versions of zeromq that are not in the 3.1 series.
+
Build the docs
$ make docs
@@ -44,21 +44,25 @@ Run the test suite
$ make test
-Run the benchmarks (requires [python](http://www.python.org) and [matplotlib](http://matplotlib.sourceforge.net/))
+Run the benchmarks (requires [python](http://www.python.org) and
+[matplotlib](http://matplotlib.sourceforge.net/))
$ make bench
-This will run performance tests and output png graphs in the graphs directory.
-
-Please note that to behave properly on your system ZeroMQ might require [some tuning](http://www.zeromq.org/docs:tuning-zeromq).
+This will run performance tests and output png graphs in the graphs
+directory.
Architecture
-============
+------------
-The bindings use Erlang's [NIF (native implemented functions)](http://www.erlang.org/doc/man/erl_nif.html) interface to achieve the best performance. One extra OS thread and one pair of inproc sockets by context are used to simulate blocking recv calls without affecting the Erlang virtual machine's responsiveness.
+The bindings use Erlang's
+[NIF (native implemented functions)](http://www.erlang.org/doc/man/erl_nif.html)
+interface to achieve the best performance. One extra OS thread and one
+pair of inproc sockets by context are used to simulate blocking recv
+calls without affecting the Erlang virtual machine's responsiveness.
License
-=======
+-------
The project is released under the MIT license.
View
21 UPGRADING3.1.md
@@ -0,0 +1,21 @@
+Upgrading From 2.1 to 3.1
+=========================
+
+See the general upgrading guidelines [here](http://www.zeromq.org/docs:3-1-upgrade).
+
+Things to watchout for in erlzmq2
+---------------------------------
+
+The 'timeout' flag has been removed from the send receive flags. In
+3.1 the native zeromq `sndtimeo` and `rcvtimeo` flags where added. You
+should use these instead.
+
+Also, is in zeromq as a whole the hwm flag has been replaced with
+sndhwm and rcvhwm.
+
+Things not to worry about
+-------------------------
+
+The rest of the things in that upgrade guide can be ignored. The
+erlzmq2 nif binding abstracts that away for you.
+
View
22 c_src/Makefile
@@ -8,14 +8,14 @@ ZMQ_FLAGS=
endif
ifndef ZEROMQ_VERSION
-ZEROMQ_VERSION=v2.1.11
+ZEROMQ_VERSION=v3.1.0
endif
-all: $(DEPS)/zeromq2/src/.libs/libzmq.a
+all: $(DEPS)/zeromq3/src/.libs/libzmq.a
clean:
- if test -e $(DEPS)/zeromq2/Makefile; then \
- cd $(DEPS)/zeromq2; make clean; \
+ if test -e $(DEPS)/zeromq3/Makefile; then \
+ cd $(DEPS)/zeromq3; make clean; \
else \
true; \
fi
@@ -23,11 +23,13 @@ clean:
distclean:
@rm -rf $(DEPS)
-$(DEPS)/zeromq2:
- @mkdir $(DEPS)
- @git clone git://github.com/zeromq/zeromq2-x.git $(DEPS)/zeromq2
+$(DEPS):
+ @mkdir -p $(DEPS)
+
+$(DEPS)/zeromq3: $(DEPS)
+ @git clone https://github.com/zeromq/libzmq.git $(DEPS)/zeromq3
@echo $(ZEROMQ_VERSION)
- @cd $(DEPS)/zeromq2 && git checkout $(ZEROMQ_VERSION)
+ @cd $(DEPS)/zeromq3 && git checkout $(ZEROMQ_VERSION)
-$(DEPS)/zeromq2/src/.libs/libzmq.a: $(DEPS)/zeromq2
- @cd $(DEPS)/zeromq2 && ./autogen.sh && ./configure $(ZMQ_FLAGS) && make
+$(DEPS)/zeromq3/src/.libs/libzmq.a: $(DEPS)/zeromq3
+ @cd $(DEPS)/zeromq3 && ./autogen.sh && ./configure $(ZMQ_FLAGS) && make
View
115 c_src/erlzmq_nif.c
@@ -2,17 +2,17 @@
// ex: set softtabstop=2 tabstop=2 shiftwidth=2 expandtab fileencoding=utf-8:
//
// Copyright (c) 2011 Yurii Rashkovskii, Evax Software and Michael Truog
-//
+//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
-//
+//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
-//
+//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
@@ -27,6 +27,7 @@
#include <string.h>
#include <stdio.h>
#include <assert.h>
+#include <sys/types.h>
#define ERLZMQ_MAX_CONCURRENT_REQUESTS 16384
@@ -192,7 +193,7 @@ NIF(erlzmq_nif_socket)
if (! enif_get_int(env, argv[2], &active)) {
return enif_make_badarg(env);
}
-
+
erlzmq_socket_t * socket = enif_alloc_resource(erlzmq_nif_resource_socket,
sizeof(erlzmq_socket_t));
assert(socket);
@@ -309,21 +310,16 @@ NIF(erlzmq_nif_setsockopt)
size_t option_len;
switch (option_name) {
// uint64_t
- case ZMQ_HWM:
case ZMQ_AFFINITY:
- case ZMQ_SNDBUF:
- case ZMQ_RCVBUF:
if (! enif_get_uint64(env, argv[2], &value_uint64)) {
return enif_make_badarg(env);
}
option_value = &value_uint64;
option_len = sizeof(int64_t);
break;
+
// int64_t
- case ZMQ_SWAP:
- case ZMQ_RATE:
- case ZMQ_RECOVERY_IVL:
- case ZMQ_MCAST_LOOP:
+ case ZMQ_MAXMSGSIZE:
if (! enif_get_int64(env, argv[2], &value_int64)) {
return enif_make_badarg(env);
}
@@ -341,9 +337,20 @@ NIF(erlzmq_nif_setsockopt)
option_len = value_binary.size;
break;
// int
+ case ZMQ_SNDHWM:
+ case ZMQ_RCVHWM:
+ case ZMQ_RATE:
+ case ZMQ_RECOVERY_IVL:
+ case ZMQ_RCVBUF:
+ case ZMQ_SNDBUF:
case ZMQ_LINGER:
case ZMQ_RECONNECT_IVL:
+ case ZMQ_RECONNECT_IVL_MAX:
case ZMQ_BACKLOG:
+ case ZMQ_MULTICAST_HOPS:
+ case ZMQ_RCVTIMEO:
+ case ZMQ_SNDTIMEO:
+ case ZMQ_IPV4ONLY:
if (! enif_get_int(env, argv[2], &value_int)) {
return enif_make_badarg(env);
}
@@ -388,12 +395,7 @@ NIF(erlzmq_nif_getsockopt)
size_t option_len;
switch(option_name) {
// int64_t
- case ZMQ_RCVMORE:
- case ZMQ_SWAP:
- case ZMQ_RATE:
- case ZMQ_RECOVERY_IVL:
- case ZMQ_RECOVERY_IVL_MSEC:
- case ZMQ_MCAST_LOOP:
+ case ZMQ_MAXMSGSIZE:
option_len = sizeof(value_int64);
enif_mutex_lock(socket->mutex);
if (zmq_getsockopt(socket->socket_zmq, option_name,
@@ -405,10 +407,7 @@ NIF(erlzmq_nif_getsockopt)
return enif_make_tuple2(env, enif_make_atom(env, "ok"),
enif_make_int64(env, value_int64));
// uint64_t
- case ZMQ_HWM:
case ZMQ_AFFINITY:
- case ZMQ_SNDBUF:
- case ZMQ_RCVBUF:
option_len = sizeof(value_uint64);
enif_mutex_lock(socket->mutex);
if (zmq_getsockopt(socket->socket_zmq, option_name,
@@ -435,10 +434,22 @@ NIF(erlzmq_nif_getsockopt)
enif_make_binary(env, &value_binary));
// int
case ZMQ_TYPE:
+ case ZMQ_RCVMORE:
+ case ZMQ_SNDHWM:
+ case ZMQ_RCVHWM:
+ case ZMQ_RATE:
+ case ZMQ_RECOVERY_IVL:
+ case ZMQ_SNDBUF:
+ case ZMQ_RCVBUF:
case ZMQ_LINGER:
case ZMQ_RECONNECT_IVL:
case ZMQ_RECONNECT_IVL_MAX:
case ZMQ_BACKLOG:
+ case ZMQ_MULTICAST_HOPS:
+ case ZMQ_RCVTIMEO:
+ case ZMQ_SNDTIMEO:
+ case ZMQ_IPV4ONLY:
+ case ZMQ_EVENTS:
case ZMQ_FD: // FIXME: ZMQ_FD returns SOCKET on Windows
option_len = sizeof(value_int);
enif_mutex_lock(socket->mutex);
@@ -483,12 +494,12 @@ NIF(erlzmq_nif_send)
int polling_thread_send = 1;
if (! socket->active) {
enif_mutex_lock(socket->mutex);
- if (zmq_send(socket->socket_zmq, &req.data.send.msg,
- req.data.send.flags | ZMQ_NOBLOCK)) {
+ if (zmq_sendmsg(socket->socket_zmq, &req.data.send.msg,
+ req.data.send.flags | ZMQ_DONTWAIT) == -1) {
enif_mutex_unlock(socket->mutex);
int const error = zmq_errno();
if (error != EAGAIN ||
- (error == EAGAIN && (req.data.send.flags & ZMQ_NOBLOCK))) {
+ (error == EAGAIN && (req.data.send.flags & ZMQ_DONTWAIT))) {
zmq_msg_close(&req.data.send.msg);
return return_zmq_errno(env, error);
}
@@ -498,7 +509,7 @@ NIF(erlzmq_nif_send)
polling_thread_send = 0;
}
}
-
+
if (polling_thread_send) {
req.type = ERLZMQ_THREAD_REQUEST_SEND;
req.data.send.env = enif_alloc_env();
@@ -520,7 +531,7 @@ NIF(erlzmq_nif_send)
enif_mutex_unlock(socket->context->mutex);
return return_zmq_errno(env, ETERM);
}
- if (zmq_send(socket->context->thread_socket, &msg, 0)) {
+ if (zmq_sendmsg(socket->context->thread_socket, &msg, 0) == -1) {
enif_mutex_unlock(socket->context->mutex);
zmq_msg_close(&msg);
@@ -534,7 +545,7 @@ NIF(erlzmq_nif_send)
zmq_msg_close(&msg);
// each pointer to the socket in a request increments the reference
enif_keep_resource(socket);
-
+
return enif_make_copy(env, req.data.send.ref);
}
}
@@ -568,25 +579,25 @@ NIF(erlzmq_nif_recv)
if (zmq_msg_init(&msg)) {
return return_zmq_errno(env, zmq_errno());
}
-
// try recv with noblock
enif_mutex_lock(socket->mutex);
- if (zmq_recv(socket->socket_zmq, &msg, ZMQ_NOBLOCK)) {
+ if (zmq_recvmsg(socket->socket_zmq, &msg, ZMQ_DONTWAIT) == -1) {
enif_mutex_unlock(socket->mutex);
+ int const error = zmq_errno();
zmq_msg_close(&msg);
- int const error = zmq_errno();
if (error != EAGAIN ||
- (error == EAGAIN && (req.data.recv.flags & ZMQ_NOBLOCK))) {
+ (error == EAGAIN && (req.data.recv.flags & ZMQ_DONTWAIT))) {
return return_zmq_errno(env, error);
}
+
req.type = ERLZMQ_THREAD_REQUEST_RECV;
req.data.recv.env = enif_alloc_env();
req.data.recv.ref = enif_make_ref(req.data.recv.env);
enif_self(env, &req.data.recv.pid);
req.data.recv.socket = socket;
- if (zmq_msg_init_size(&msg, sizeof(erlzmq_thread_request_t))) {
+ if (zmq_msg_init_size(&msg, sizeof(erlzmq_thread_request_t)) == -1) {
enif_free_env(req.data.recv.env);
return return_zmq_errno(env, zmq_errno());
}
@@ -598,7 +609,7 @@ NIF(erlzmq_nif_recv)
enif_mutex_unlock(socket->context->mutex);
return return_zmq_errno(env, ETERM);
}
- if (zmq_send(socket->context->thread_socket, &msg, 0)) {
+ if (zmq_sendmsg(socket->context->thread_socket, &msg, 0) == -1) {
enif_mutex_unlock(socket->context->mutex);
zmq_msg_close(&msg);
@@ -611,19 +622,19 @@ NIF(erlzmq_nif_recv)
zmq_msg_close(&msg);
// each pointer to the socket in a request increments the reference
enif_keep_resource(socket);
-
+
return enif_make_copy(env, req.data.recv.ref);
}
}
else {
enif_mutex_unlock(socket->mutex);
-
+
ErlNifBinary binary;
enif_alloc_binary(zmq_msg_size(&msg), &binary);
memcpy(binary.data, zmq_msg_data(&msg), zmq_msg_size(&msg));
-
+
zmq_msg_close(&msg);
-
+
return enif_make_tuple2(env, enif_make_atom(env, "ok"),
enif_make_binary(env, &binary));
}
@@ -665,7 +676,7 @@ NIF(erlzmq_nif_close)
enif_mutex_unlock(socket->context->mutex);
return enif_make_atom(env, "ok");
}
- if (zmq_send(socket->context->thread_socket, &msg, 0)) {
+ if (zmq_sendmsg(socket->context->thread_socket, &msg, 0) == -1) {
enif_mutex_unlock(socket->context->mutex);
zmq_msg_close(&msg);
enif_free_env(req.data.close.env);
@@ -704,7 +715,7 @@ NIF(erlzmq_nif_term)
memcpy(zmq_msg_data(&msg), &req, sizeof(erlzmq_thread_request_t));
enif_mutex_lock(context->mutex);
- if (zmq_send(context->thread_socket, &msg, 0)) {
+ if (zmq_sendmsg(context->thread_socket, &msg, 0) == -1) {
enif_mutex_unlock(context->mutex);
zmq_msg_close(&msg);
enif_free_env(req.data.term.env);
@@ -770,17 +781,17 @@ static void * polling_thread(void * handle)
if (item->revents & ZMQ_POLLIN) {
size_t value_len = sizeof(int64_t);
int64_t flag_value = 0;
-
+
assert(r->type == ERLZMQ_THREAD_REQUEST_RECV);
--count;
zmq_msg_t msg;
zmq_msg_init(&msg);
enif_mutex_lock(r->data.recv.socket->mutex);
- if (zmq_recv(r->data.recv.socket->socket_zmq, &msg,
- r->data.recv.flags) ||
- (r->data.recv.socket->active == ERLZMQ_SOCKET_ACTIVE_ON &&
- zmq_getsockopt(r->data.recv.socket->socket_zmq,
+ if (zmq_recvmsg(r->data.recv.socket->socket_zmq, &msg,
+ r->data.recv.flags) == -1 ||
+ (r->data.recv.socket->active == ERLZMQ_SOCKET_ACTIVE_ON &&
+ zmq_getsockopt(r->data.recv.socket->socket_zmq,
ZMQ_RCVMORE, &flag_value, &value_len)) )
{
enif_mutex_unlock(r->data.recv.socket->mutex);
@@ -812,14 +823,16 @@ static void * polling_thread(void * handle)
if (r->data.recv.socket->active == ERLZMQ_SOCKET_ACTIVE_ON) {
ERL_NIF_TERM flags_list;
-
+
// Should we send the multipart flag
if(flag_value == 1) {
- flags_list = enif_make_list1(r->data.recv.env, enif_make_atom(r->data.recv.env, "rcvmore"));
+ flags_list = enif_make_list1(r->data.recv.env,
+ enif_make_atom(r->data.recv.env,
+ "rcvmore"));
} else {
flags_list = enif_make_list(r->data.recv.env, 0);
}
-
+
enif_send(NULL, &r->data.recv.pid, r->data.recv.env,
enif_make_tuple4(r->data.recv.env,
enif_make_atom(r->data.recv.env, "zmq"),
@@ -854,8 +867,8 @@ static void * polling_thread(void * handle)
--count;
enif_mutex_lock(r->data.send.socket->mutex);
- if (zmq_send(r->data.send.socket->socket_zmq,
- &r->data.send.msg, r->data.send.flags)) {
+ if (zmq_sendmsg(r->data.send.socket->socket_zmq,
+ &r->data.send.msg, r->data.send.flags) == -1) {
enif_mutex_unlock(r->data.send.socket->mutex);
enif_send(NULL, &r->data.send.pid, r->data.send.env,
enif_make_tuple2(r->data.send.env,
@@ -885,9 +898,9 @@ static void * polling_thread(void * handle)
zmq_msg_t msg;
zmq_msg_init(&msg);
enif_mutex_lock(context->mutex);
- status = zmq_recv(thread_socket, &msg, 0);
+ status = zmq_recvmsg(thread_socket, &msg, 0);
enif_mutex_unlock(context->mutex);
- assert(status == 0);
+ assert(status != -1);
assert(zmq_msg_size(&msg) == sizeof(erlzmq_thread_request_t));
@@ -1024,7 +1037,7 @@ static ERL_NIF_TERM add_active_req(ErlNifEnv* env, erlzmq_socket_t * socket)
memcpy(zmq_msg_data(&msg), &req, sizeof(erlzmq_thread_request_t));
- if (zmq_send(socket->context->thread_socket, &msg, 0)) {
+ if (zmq_sendmsg(socket->context->thread_socket, &msg, 0) == -1) {
zmq_msg_close(&msg);
enif_free_env(req.data.recv.env);
return return_zmq_errno(env, zmq_errno());
View
8 c_src/vector.c
@@ -2,13 +2,13 @@
* ex: set softtabstop=4 tabstop=4 shiftwidth=4 expandtab fileencoding=utf-8:
*
* BSD LICENSE
- *
+ *
* Copyright (c) 2011, Michael Truog <mjtruog at gmail dot com>
* All rights reserved.
- *
+ *
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
- *
+ *
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above copyright
@@ -21,7 +21,7 @@
* * The name of the author may not be used to endorse or promote
* products derived from this software without specific prior
* written permission
- *
+ *
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND
* CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES,
* INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
View
8 c_src/vector.h
@@ -2,13 +2,13 @@
* ex: set softtabstop=4 tabstop=4 shiftwidth=4 expandtab fileencoding=utf-8:
*
* BSD LICENSE
- *
+ *
* Copyright (c) 2011, Michael Truog <mjtruog at gmail dot com>
* All rights reserved.
- *
+ *
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
- *
+ *
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above copyright
@@ -21,7 +21,7 @@
* * The name of the author may not be used to endorse or promote
* products derived from this software without specific prior
* written permission
- *
+ *
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND
* CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES,
* INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
View
64 include/erlzmq.hrl
@@ -13,15 +13,12 @@
-define('ZMQ_XSUB', 10).
% ZMQ socket options.
--define('ZMQ_HWM', 1).
--define('ZMQ_SWAP', 3).
-define('ZMQ_AFFINITY', 4).
-define('ZMQ_IDENTITY', 5).
-define('ZMQ_SUBSCRIBE', 6).
-define('ZMQ_UNSUBSCRIBE', 7).
-define('ZMQ_RATE', 8).
-define('ZMQ_RECOVERY_IVL', 9).
--define('ZMQ_MCAST_LOOP', 10).
-define('ZMQ_SNDBUF', 11).
-define('ZMQ_RCVBUF', 12).
-define('ZMQ_RCVMORE', 13).
@@ -33,22 +30,30 @@
-define('ZMQ_BACKLOG', 19).
-define('ZMQ_RECOVERY_IVL_MSEC', 20).
-define('ZMQ_RECONNECT_IVL_MAX', 21).
+-define('ZMQ_MAXMSGSIZE', 22).
+-define('ZMQ_SNDHWM', 23).
+-define('ZMQ_RCVHWM', 24).
+-define('ZMQ_MULTICAST_HOPS', 25).
+-define('ZMQ_RCVTIMEO', 27).
+-define('ZMQ_SNDTIMEO', 28).
+-define('ZMQ_IPV4ONLY', 31).
+-define('ZMQ_LAST_ENDPOINT', 32).
+
+% Message options
+-define('ZMQ_MORE', 1).
% ZMQ send/recv flags
--define('ZMQ_NOBLOCK', 1).
--define('ZMQ_SNDMORE', 2).
+-define('ZMQ_DONTWAIT', 1).
+-define('ZMQ_SNDMORE', 2).
%% Types
-%% @type erlzmq_socket_type() = pair | pub | sub | req | rep | xreq | xrep |
-%% pull | push | xpub | xsub.
%% Possible types for an erlzmq socket.<br />
%% <i>For more information see
%% <a href="http://api.zeromq.org/master:zmq_socket">zmq_socket</a></i>
-type erlzmq_socket_type() :: pair | pub | sub | req | rep | dealer | router | xreq | xrep |
pull | push | xpub | xsub.
-%% @type erlzmq_endpoint() = string().
%% The endpoint argument is a string consisting of two parts:
%% <b>transport://address</b><br />
%% The following transports are defined:
@@ -59,16 +64,6 @@
%% <a href="http://api.zeromq.org/master:zmq_connect">zmq_connect</a></i>
-type erlzmq_endpoint() :: string().
-%% @type errno() = eperm | enoent | srch | eintr | eio | enxio | ebad |
-%% echild | edeadlk | enomem | eacces | efault | enotblk | ebusy | eexist |
-%% exdev | enodev | enotdir | eisdir | einval | enfile | emfile | enotty |
-%% etxtbsy | efbig | enospc | espipe | erofs | emlink | epipe | eagain |
-%% einprogress | ealready | enotsock | edestaddrreq | emsgsize |
-%% eprototype | enoprotoopt | eprotonosupport | esocktnosupport |
-%% enotsup | epfnosupport | eafnosupport | eaddrinuse | eaddrnotavail |
-%% enetdown | enetunreach | enetreset | econnaborted | econnreset |
-%% enobufs | eisconn | enotconn | eshutdown | etoomanyrefs | etimedout |
-%% econnrefused | eloop | enametoolong.
%% Standard error atoms.
-type errno() :: eperm | enoent | srch | eintr | eio | enxio | ebad |
echild | edeadlk | enomem | eacces | efault | enotblk |
@@ -83,62 +78,51 @@
enotconn | eshutdown | etoomanyrefs |
etimedout | econnrefused | eloop | enametoolong.
-%% @type erlzmq_error_type() = enotsup | eprotonosupport | enobufs |
-%% enetdown | eaddrinuse | eaddnotavail | econnrefused | einprogress |
-%% efsm | enocompatproto | eterm | emthread | errno() |
-%% {unknown, integer()}.
%% Possible error types.
-type erlzmq_error_type() :: enotsup | eprotonosupport | enobufs | enetdown |
- eaddrinuse | eaddnotavail | econnrefused |
+ eaddrinuse | eaddnotavail | econnrefused |
einprogress | efsm | enocompatproto | eterm |
emthread | errno() | {unknown, integer()}.
-%% @type erlzmq_error() = {error, erlzmq_error_type()}.
%% Error tuples returned by most API functions.
-type erlzmq_error() :: {error, erlzmq_error_type()}.
-%% @type erlzmq_data() = iolist().
%% Data to be sent with {@link erlzmq:send/3. send/3} or received with
%% {@link erlzmq:recv/2. recv/2}
-type erlzmq_data() :: iolist().
-%% @type erlzmq_context() = binary().
%% An opaque handle to an erlzmq context.
-opaque erlzmq_context() :: binary().
-%% @type erlzmq_socket() = binary().
%% An opaque handle to an erlzmq socket.
-opaque erlzmq_socket() :: {pos_integer(), binary()}.
-%% @type erlzmq_send_recv_flag() = noblock | sndmore | recvmore | {timeout, timeout()}.
%% The individual flags to use with {@link erlzmq:send/3. send/3}
%% and {@link erlzmq:recv/2. recv/2}.<br />
%% <i>For more information see
%% <a href="http://api.zeromq.org/master:zmq_send">zmq_send</a> or
%% <a href="http://api.zeromq.org/master:zmq_recv">zmq_recv</a></i>
--type erlzmq_send_recv_flag() :: noblock | sndmore | recvmore | {timeout, timeout()}.
+-type erlzmq_send_recv_flag() :: dontwait | sndmore | recvmore | {timeout, timeout()}.
-%% @type erlzmq_send_recv_flags() = list(erlzmq_send_recv_flag()).
%% A list of flags to use with {@link ezqm:send/3. send/3} and
%% {@link erlzmq:recv/2. recv/2}
-type erlzmq_send_recv_flags() :: list(erlzmq_send_recv_flag()).
-%% @type erlzmq_sockopt() = hwm | swap | affinity | identity | subscribe |
-%% unsubscribe | rate | recovery_ivl | mcast_loop | sndbuf | rcvbuf |
-%% rcvmore | fd | events | linger | reconnect_ivl | backlog |
-%% recovery_ivl_msec | reconnect_ivl_max.
%% Available options for {@link erlzmq:setsockopt/3. setsockopt/3}
%% and {@link erlzmq:getsockopt/2. getsockopt/2}.<br />
%% <i>For more information see
%% <a href="http://api.zeromq.org/master:zmq_setsockopt">zmq_setsockopt</a>
%% and <a href="http://api.zeromq.org/master:zmq_getsockopt">zmq_getsockopt</a></i>
--type erlzmq_sockopt() :: hwm | swap | affinity | identity | subscribe |
- unsubscribe | rate | recovery_ivl | mcast_loop |
- sndbuf | rcvbuf | rcvmore | fd | events | linger |
- reconnect_ivl | backlog | recovery_ivl_msec |
- reconnect_ivl_max.
+-type erlzmq_sockopt() :: affinity | identity | subscribe |
+ unsubscribe | rate | recovery_ivl | sndbuf |
+ rcvbuf | rcvmore | fd | events | linger |
+ reconnect_ivl | backlog |reconnect_ivl_max
+ | maxmsgsize | sndhwm | rcvhwm |
+ multicast_hops | rcvtimeo | sndtimeo |
+ ipv4only.
+
-%% @type erlzmq_sockopt_value() = integer() | iolist().
%% Possible option values for {@link erlzmq:setsockopt/3. setsockopt/3}.
-type erlzmq_sockopt_value() :: integer() | iolist().
+-record(zmq, {socket, data, flags}).
View
6 rebar.config
@@ -1,9 +1,9 @@
{erl_opts, [debug_info, warnings_as_errors]}.
{port_envs,
- [{"DRV_LDFLAGS","deps/zeromq2/src/.libs/libzmq.a -shared $ERL_LDFLAGS -lstdc++ -luuid"},
- {"darwin", "DRV_LDFLAGS", "deps/zeromq2/src/.libs/libzmq.a -bundle -flat_namespace -undefined suppress $ERL_LDFLAGS"},
- {"DRV_CFLAGS","-Ic_src -Ideps/zeromq2/include -g -Wall -fPIC $ERL_CFLAGS"}]}.
+ [{"DRV_LDFLAGS","deps/zeromq3/src/.libs/libzmq.a -shared $ERL_LDFLAGS -lstdc++"},
+ {"darwin", "DRV_LDFLAGS", "deps/zeromq3/src/.libs/libzmq.a -bundle -flat_namespace -undefined suppress $ERL_LDFLAGS"},
+ {"DRV_CFLAGS","-Ic_src -Ideps/zeromq3/include -g -Wall -fPIC $ERL_CFLAGS"}]}.
{pre_hooks,[{compile,"make -C c_src"},
{clean, "make -C c_src clean"}]}.
View
2  src/erlzmq.app.src
@@ -1,7 +1,7 @@
{application, erlzmq,
[
{description, "Erlang ZeroMQ Driver"},
- {vsn, "2.0"},
+ {vsn, "3.0"},
{modules, [erlzmq, erlzmq_nif]},
{registered, []},
{applications, [
View
125 src/erlzmq.erl
@@ -2,17 +2,17 @@
%% ex: set softtabstop=4 tabstop=4 shiftwidth=4 expandtab fileencoding=utf-8:
%%
%% Copyright (c) 2011 Yurii Rashkovskii, Evax Software and Michael Truog
-%%
+%%
%% Permission is hereby granted, free of charge, to any person obtaining a copy
%% of this software and associated documentation files (the "Software"), to deal
%% in the Software without restriction, including without limitation the rights
%% to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
%% copies of the Software, and to permit persons to whom the Software is
%% furnished to do so, subject to the following conditions:
-%%
+%%
%% The above copyright notice and this permission notice shall be included in
%% all copies or substantial portions of the Software.
-%%
+%%
%% THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
%% IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
%% FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
@@ -31,8 +31,12 @@
connect/2,
send/2,
send/3,
+ sendmsg/2,
+ sendmsg/3,
recv/1,
recv/2,
+ recvmsg/1,
+ recvmsg/2,
setsockopt/3,
getsockopt/2,
close/1,
@@ -116,7 +120,7 @@ socket(Context, Type, {active, true}) ->
erlzmq_nif:socket(Context, socket_type(Type), 1);
socket(Context, Type, {active, false}) ->
erlzmq_nif:socket(Context, socket_type(Type), 0).
-
+
%% @doc Accept connections on a socket.
%% <br />
@@ -171,11 +175,46 @@ send({I, Socket}, Binary, Flags)
ok;
{Ref, {error, _} = Error} ->
Error
+ after case erlzmq_nif:getsockopt(Socket,?'ZMQ_SNDTIMEO') of
+ {ok, -1} ->
+ infinity;
+ {ok, Else} ->
+ Else
+ end ->
+ {error, eagain}
end;
Result ->
Result
end.
+%% @equiv send(Socket, Msg, [])
+%% @doc This function exists for zeromq api compatibility and doesn't
+%% actually provide any different functionality then what you get with
+%% the {@link erlzmq:send/2} function. In fact this function just
+%% calls that function. So there is a slight bit of additional
+%% overhead as well.
+-spec sendmsg(erlzmq_socket(),
+ Binary :: binary()) ->
+ ok |
+ erlzmq_error().
+sendmsg(Socket, Binary) when is_binary(Binary) ->
+ send(Socket, Binary, []).
+
+%% @equiv send(Socket, Msg, Flags)
+%% @doc This function exists for zeromq api compatibility and doesn't
+%% actually provide any different functionality then what you get with
+%% the {@link erlzmq:send/3} function. In fact this function just
+%% calls that function. So there is a slight bit of additional
+%% overhead as well.
+-spec sendmsg(erlzmq_socket(),
+ Binary :: binary(),
+ Flags :: erlzmq_send_recv_flags()) ->
+ ok |
+ erlzmq_error().
+sendmsg(Socket, Binary, Flags) ->
+ send(Socket, Binary, Flags).
+
+
%% @equiv recv(Socket, 0)
-spec recv(Socket :: erlzmq_socket()) ->
{ok, erlzmq_data()} |
@@ -191,23 +230,51 @@ recv(Socket) ->
-spec recv(Socket :: erlzmq_socket(),
Flags :: erlzmq_send_recv_flags()) ->
{ok, erlzmq_data()} |
- erlzmq_error() |
- {error, {timeout, reference()}}.
+ erlzmq_error().
recv({I, Socket}, Flags)
when is_integer(I), is_list(Flags) ->
case erlzmq_nif:recv(Socket, sendrecv_flags(Flags)) of
Ref when is_reference(Ref) ->
- Timeout = proplists:get_value(timeout, Flags, infinity),
receive
{Ref, Result} ->
{ok, Result}
- after Timeout ->
- {error, {timeout, Ref}}
+ after case erlzmq_nif:getsockopt(Socket,?'ZMQ_RCVTIMEO') of
+ {ok, -1} ->
+ infinity;
+ {ok, Else} ->
+ Else
+ end ->
+ {error, eagain}
end;
Result ->
Result
end.
+%% @equiv recv(Socket, 0)
+%% @doc This function exists for zeromq api compatibility and doesn't
+%% actually provide any different functionality then what you get with
+%% the {@link erlzmq:recv/3} function. In fact this function just
+%% calls that function. So there is a slight bit of additional
+%% overhead as well.
+-spec recvmsg(Socket :: erlzmq_socket()) ->
+ {ok, erlzmq_data()} |
+ erlzmq_error().
+recvmsg(Socket) ->
+ recv(Socket, []).
+
+%% @equiv recv(Socket, Flags)
+%% @doc This function exists for zeromq api compatibility and doesn't
+%% actually provide any different functionality then what you get with
+%% the {@link erlzmq:recv/3} function. In fact this function just
+%% calls that function. So there is a slight bit of additional
+%% overhead as well.
+-spec recvmsg(Socket :: erlzmq_socket(),
+ Flags :: erlzmq_send_recv_flags()) ->
+ {ok, erlzmq_data()} |
+ erlzmq_error().
+recvmsg(Socket, Flags) ->
+ recv(Socket, Flags).
+
%% @doc Set an {@link erlzmq_sockopt(). option} associated with a socket.
%% <br />
%% <i>For more information see
@@ -233,7 +300,11 @@ setsockopt({I, Socket}, Name, Value) when is_integer(I), is_atom(Name) ->
{ok, erlzmq_sockopt_value()} |
erlzmq_error().
getsockopt({I, Socket}, Name) when is_integer(I), is_atom(Name) ->
- erlzmq_nif:getsockopt(Socket, option_name(Name)).
+ coerce_sockopt(Name, erlzmq_nif:getsockopt(Socket, option_name(Name))).
+
+coerce_sockopt(rcvmore, {ok, 0}) -> {ok, false};
+coerce_sockopt(rcvmore, {ok, 1}) -> {ok, true};
+coerce_sockopt(_Other, Val) -> Val.
%% @equiv close(Socket, infinity)
-spec close(Socket :: erlzmq_socket()) ->
@@ -301,6 +372,9 @@ term(Context, Timeout) ->
end.
%% @doc Returns the 0MQ library version.
+%% <br />
+%% <i>For more information see
+%% <a href="http://api.zeromq.org/master:zmq_version">zmq_version</a>.</i>
%% @end
-spec version() -> {integer(), integer(), integer()}.
@@ -343,20 +417,14 @@ socket_type(xsub) ->
sendrecv_flags([]) ->
0;
-sendrecv_flags([{timeout,_}]) ->
- 0;
-sendrecv_flags([noblock|Rest]) ->
- ?'ZMQ_NOBLOCK' bor sendrecv_flags(Rest);
+sendrecv_flags([dontwait|Rest]) ->
+ ?'ZMQ_DONTWAIT' bor sendrecv_flags(Rest);
sendrecv_flags([sndmore|Rest]) ->
?'ZMQ_SNDMORE' bor sendrecv_flags(Rest).
-spec option_name(Name :: erlzmq_sockopt()) ->
integer().
-option_name(hwm) ->
- ?'ZMQ_HWM';
-option_name(swap) ->
- ?'ZMQ_SWAP';
option_name(affinity) ->
?'ZMQ_AFFINITY';
option_name(identity) ->
@@ -369,8 +437,6 @@ option_name(rate) ->
?'ZMQ_RATE';
option_name(recovery_ivl) ->
?'ZMQ_RECOVERY_IVL';
-option_name(mcast_loop) ->
- ?'ZMQ_MCAST_LOOP';
option_name(sndbuf) ->
?'ZMQ_SNDBUF';
option_name(rcvbuf) ->
@@ -387,8 +453,19 @@ option_name(reconnect_ivl) ->
?'ZMQ_RECONNECT_IVL';
option_name(backlog) ->
?'ZMQ_BACKLOG';
-option_name(recovery_ivl_msec) ->
- ?'ZMQ_RECOVERY_IVL_MSEC';
option_name(reconnect_ivl_max) ->
- ?'ZMQ_RECONNECT_IVL_MAX'.
-
+ ?'ZMQ_RECONNECT_IVL_MAX';
+option_name(maxmsgsize) ->
+ ?'ZMQ_MAXMSGSIZE';
+option_name(sndhwm) ->
+ ?'ZMQ_SNDHWM';
+option_name(rcvhwm) ->
+ ?'ZMQ_RCVHWM';
+option_name(multicast_hops) ->
+ ?'ZMQ_MULTICAST_HOPS';
+option_name(rcvtimeo) ->
+ ?'ZMQ_RCVTIMEO';
+option_name(sndtimeo) ->
+ ?'ZMQ_SNDTIMEO';
+option_name(ipv4only) ->
+ ?'ZMQ_IPV4ONLY'.
View
60 src/erlzmq_device.erl
@@ -0,0 +1,60 @@
+-module(erlzmq_device).
+
+-export([queue/2]).
+
+-import(proplists, [get_bool/2]).
+
+%%--------------------------------------------------------------------
+%% @doc A queue device implemented in Erlang.
+%%
+%% Frontend and Backend must be sockets in active mode.
+%%
+%% This function will not return.
+%%
+%% @spec queue(Frontend, Backend) -> any()
+%% Frontend = erlzmq_socket()
+%% Backend = erlzmq_socket()
+%% @end
+%%--------------------------------------------------------------------
+
+queue(Frontend, Backend) ->
+ receive
+ {zmq, Frontend, Msg, Flags} ->
+ Parts = lists:reverse(queue_recv_acc(Frontend, Flags, [Msg])),
+ queue_send(Backend, Parts),
+ queue(Frontend, Backend);
+ {zmq, Backend, Msg, Flags} ->
+ Parts = lists:reverse(queue_recv_acc(Backend, Flags, [Msg])),
+ queue_send(Frontend, Parts),
+ queue(Frontend, Backend);
+ {shutdown} ->
+ ok
+ end.
+
+%%--------------------------------------------------------------------
+%% @doc Accumulates messages from Socket.
+%% @spec queue_recv_acc(Socket, Flags, Acc0) -> Acc
+%% @end
+%%--------------------------------------------------------------------
+
+queue_recv_acc(Socket, Flags0, Acc) ->
+ case get_bool(rcvmore, Flags0) of
+ true ->
+ receive
+ {zmq, Socket, Msg, Flags} ->
+ queue_recv_acc(Socket, Flags, [Msg|Acc])
+ end;
+ false -> Acc
+ end.
+
+%%--------------------------------------------------------------------
+%% @doc Sends a multipart message to Out.
+%% @spec queue_send(erlzmq_socket(), Parts) -> ok
+%% @end
+%%--------------------------------------------------------------------
+
+queue_send(Out, [LastPart]) ->
+ ok = erlzmq:send(Out, LastPart);
+queue_send(Out, [Part|Rest]) ->
+ ok = erlzmq:send(Out, Part, [sndmore]),
+ queue_send(Out, Rest).
View
26 src/erlzmq_parts.erl
@@ -0,0 +1,26 @@
+-module(erlzmq_parts).
+
+-export([new/0, part_in/2]).
+
+new() -> [].
+
+part_in({zmq, Socket, Part, [rcvmore]}, Parts) ->
+ {rcvmore, add_part(Socket, Part, Parts)};
+part_in({zmq, Socket, Part, []}, Parts) ->
+ {SocketParts, NewParts} =
+ find_socket_parts(Socket, add_part(Socket, Part, Parts)),
+ {msg, Socket, SocketParts, NewParts}.
+
+add_part(Socket, Part, Parts) ->
+ [{Socket, Part}|Parts].
+
+find_socket_parts(Socket, Parts) ->
+ find_socket_parts_acc(Socket, Parts, [], []).
+
+find_socket_parts_acc(_Socket, [], SocketParts, NewParts) ->
+ {SocketParts, lists:reverse(NewParts)};
+find_socket_parts_acc(Socket, [{Socket, Part}|Rest], SocketParts, NewParts) ->
+ find_socket_parts_acc(Socket, Rest, [Part|SocketParts], NewParts);
+find_socket_parts_acc(Socket, [OtherPart|Rest], SocketParts, NewParts) ->
+ find_socket_parts_acc(Socket, Rest, SocketParts, [OtherPart|NewParts]).
+
View
99 src/erlzmq_util.erl
@@ -0,0 +1,99 @@
+-module(erlzmq_util).
+
+-export([dump/1, recv_parts/1, recv_parts/2]).
+
+%%--------------------------------------------------------------------
+%% @doc Reads available messages from Socket, printing them to stdout.
+%% @spec dump(Socket) -> any()
+%% Socket = erlzmq_socket()
+%% @end
+%%--------------------------------------------------------------------
+
+dump(Socket) ->
+ {ok, Msg} = erlzmq:recv(Socket),
+ io:format("----------------------------------------~n"),
+ dump_msg(Msg, Socket).
+
+%%--------------------------------------------------------------------
+%% @doc Receives message parts for Socket, returning them as a list.
+%% @equiv recv_parts(Socket, infinity)
+%% @end
+%%--------------------------------------------------------------------
+
+recv_parts(Socket) ->
+ recv_parts(Socket, infinity).
+
+%%--------------------------------------------------------------------
+%% @doc Receives message parts for Socket, returning them as a list.
+%% @spec recv_parts(Socket, Timeout) -> [binary()]
+%% Socket = erlzmq_socket()
+%% Timeout = integer() | infinity
+%% @end
+%%--------------------------------------------------------------------
+
+recv_parts(Socket, Timeout) ->
+ recv_parts(Socket, Timeout, []).
+
+%%--------------------------------------------------------------------
+%% @doc Accumulator for recv_parts/2
+%% @spec recv_parts(Socket, Timeout, Acc0) -> Acc
+%% @end
+%%--------------------------------------------------------------------
+recv_parts(Socket, Timeout, Acc) ->
+ receive
+ {zmq, Socket, Part, []} ->
+ lists:reverse([Part|Acc]);
+ {zmq, Socket, Part, [rcvmore]} ->
+ recv_parts(Socket, Timeout, [Part|Acc])
+ after
+ Timeout -> exit({zmq_recv_timeout, Socket})
+ end.
+
+%%--------------------------------------------------------------------
+%% @doc Print a socket message, including subsequent parts.
+%% @spec dump_msg(Msg, Socket) -> ok
+%% @end
+%%--------------------------------------------------------------------
+
+dump_msg(Msg, Socket) ->
+ io:format("[~3..0B] ", [size(Msg)]),
+ Str = binary_to_list(Msg),
+ case io_lib:printable_list(Str) of
+ true -> io:format(Str);
+ false -> io:format(bin_to_hex(Msg))
+ end,
+ io:format("~n"),
+ case erlzmq:getsockopt(Socket, rcvmore) of
+ {ok, true} ->
+ {ok, Next} = erlzmq:recv(Socket),
+ dump_msg(Next, Socket);
+ {ok, false} ->
+ ok
+ end.
+
+%%--------------------------------------------------------------------
+%% @doc Convert a binary to a hex string.
+%% @spec bin_to_hex(binary()) -> list()
+%% @end
+%%--------------------------------------------------------------------
+
+bin_to_hex(B) when is_binary(B) ->
+ lists:flatten(lists:map(fun int_to_hex/1, binary_to_list(B))).
+
+%%--------------------------------------------------------------------
+%% @doc Convert an int to a two char hex string.
+%% @spec int_to_hex(integer()) -> string()
+%% @end
+%%--------------------------------------------------------------------
+
+int_to_hex(N) when N < 256 ->
+ [hex(N div 16), hex(N rem 16)].
+
+%%--------------------------------------------------------------------
+%% @doc Converts an integer to a hex char.
+%% @spec hex(integer()) -> char()
+%% @end
+%%--------------------------------------------------------------------
+
+hex(N) when N < 10 -> $0 + N;
+hex(N) when N >= 10, N < 16 -> $a + (N - 10).
View
292 test/erlzmq_test.erl
@@ -7,29 +7,71 @@ hwm_test() ->
{ok, S1} = erlzmq:socket(C, [pull, {active, false}]),
{ok, S2} = erlzmq:socket(C, [push, {active, false}]),
- ok = erlzmq:setsockopt(S2, linger, 0),
- ok = erlzmq:setsockopt(S2, hwm, 5),
+ ok = erlzmq:setsockopt(S1, rcvhwm, 2),
+ ok = erlzmq:setsockopt(S2, sndhwm, 2),
- ok = erlzmq:bind(S1, "tcp://127.0.0.1:5858"),
- ok = erlzmq:connect(S2, "tcp://127.0.0.1:5858"),
+
+ ok = erlzmq:bind(S1, "inproc://a"),
+ ok = erlzmq:connect(S2, "inproc://a"),
ok = hwm_loop(10, S2),
?assertMatch({ok, <<"test">>}, erlzmq:recv(S1)),
+ ?assertMatch({ok, <<"test">>}, erlzmq:recv(S1)),
+ ?assertMatch({ok, <<"test">>}, erlzmq:recv(S1)),
+ ?assertMatch({ok, <<"test">>}, erlzmq:recv(S1)),
+
?assertMatch(ok, erlzmq:send(S2, <<"test">>)),
+
+ ?assertMatch({ok, <<"test">>}, erlzmq:recv(S1)),
+
ok = erlzmq:close(S1),
ok = erlzmq:close(S2),
ok = erlzmq:term(C).
hwm_loop(0, _S) ->
ok;
-hwm_loop(N, S) when N > 5 ->
- ?assertMatch(ok, erlzmq:send(S, <<"test">>, [noblock])),
+hwm_loop(N, S) when N > 6 ->
+ ?assertMatch(ok, erlzmq:send(S, <<"test">>, [dontwait])),
hwm_loop(N-1, S);
hwm_loop(N, S) ->
- ?assertMatch({error, _} ,erlzmq:send(S, <<"test">>, [noblock])),
+ ?assertMatch({error, _} ,erlzmq:send(S, <<"test">>, [dontwait])),
hwm_loop(N-1, S).
+invalid_rep_test() ->
+ {ok, Ctx} = erlzmq:context(),
+
+ {ok, XrepSocket} = erlzmq:socket(Ctx, [xrep, {active, false}]),
+ {ok, ReqSocket} = erlzmq:socket(Ctx, [req, {active, false}]),
+
+ ok = erlzmq:setsockopt(XrepSocket, linger, 0),
+ ok = erlzmq:setsockopt(ReqSocket, linger, 0),
+ ok = erlzmq:bind(XrepSocket, "inproc://hi"),
+ ok = erlzmq:connect(ReqSocket, "inproc://hi"),
+
+ %% Initial request.
+ ok = erlzmq:send(ReqSocket, <<"r">>),
+
+ %% Receive the request.
+ {ok, Addr} = erlzmq:recv(XrepSocket),
+ {ok, Bottom} = erlzmq:recv(XrepSocket),
+ {ok, _Body} = erlzmq:recv(XrepSocket),
+
+ %% Send invalid reply.
+ ok = erlzmq:send(XrepSocket, Addr),
+
+ %% Send valid reply.
+ ok = erlzmq:send(XrepSocket, Addr, [sndmore]),
+ ok = erlzmq:send(XrepSocket, Bottom, [sndmore]),
+ ok = erlzmq:send(XrepSocket, <<"b">>),
+
+ %% Check whether we've got the valid reply.
+ {ok, <<"b">>} = erlzmq:recv(ReqSocket),
+
+ %% Tear down the wiring.
+ ok = erlzmq:close(XrepSocket),
+ ok = erlzmq:close(ReqSocket),
+ ok = erlzmq:term(Ctx).
pair_inproc_test() ->
basic_tests("inproc://tester", pair, pair, active),
@@ -43,6 +85,141 @@ pair_tcp_test() ->
basic_tests("tcp://127.0.0.1:5554", pair, pair, active),
basic_tests("tcp://127.0.0.1:5555", pair, pair, passive).
+reqrep_device_test() ->
+ {ok, Ctx} = erlzmq:context(),
+
+ %% Create a req/rep device.
+ {ok, Xreq} = erlzmq:socket(Ctx, [xreq, {active, false}]),
+ ok = erlzmq:bind(Xreq, "tcp://127.0.0.1:5560"),
+ {ok, Xrep} = erlzmq:socket(Ctx, [xrep, {active, false}]),
+ ok = erlzmq:bind(Xrep, "tcp://127.0.0.1:5561"),
+
+ %% Create a worker.
+ {ok, Rep} = erlzmq:socket(Ctx, [rep, {active, false}]),
+ ok= erlzmq:connect(Rep, "tcp://127.0.0.1:5560"),
+
+ %% Create a client.
+ {ok, Req} = erlzmq:socket(Ctx, [req, {active, false}]),
+ ok = erlzmq:connect(Req, "tcp://127.0.0.1:5561"),
+
+ %% Send a request.
+ ok = erlzmq:send(Req, <<"ABC">>, [sndmore]),
+ ok = erlzmq:send(Req, <<"DEF">>),
+
+
+ %% Pass the request through the device.
+ lists:foreach(fun(_) ->
+ {ok, Msg} = erlzmq:recv(Xrep),
+ {ok, RcvMore}= erlzmq:getsockopt(Xrep, rcvmore),
+ case RcvMore of
+ false ->
+ ok = erlzmq:send(Xreq, Msg);
+ true ->
+ ok = erlzmq:send(Xreq, Msg, [sndmore])
+ end
+ end,
+ lists:seq(1, 4)),
+
+ %% Receive the request.
+ {ok, Buff0} = erlzmq:recv(Rep),
+ ?assertMatch(<<"ABC">>, Buff0),
+ {ok, RcvMore1} = erlzmq:getsockopt(Rep, rcvmore),
+ ?assertMatch(true, RcvMore1),
+ {ok, Buff1} = erlzmq:recv(Rep),
+ ?assertMatch(<<"DEF">>, Buff1),
+ {ok, RcvMore2} = erlzmq:getsockopt(Rep, rcvmore),
+ ?assertMatch(false, RcvMore2),
+
+ %% Send the reply.
+ ok = erlzmq:send(Rep, <<"GHI">>, [sndmore]),
+ ok = erlzmq:send (Rep, <<"JKL">>),
+
+ %% Pass the reply through the device.
+ lists:foreach(fun(_) ->
+ {ok, Msg} = erlzmq:recv(Xreq),
+ {ok,RcvMore3} = erlzmq:getsockopt(Xreq, rcvmore),
+ case RcvMore3 of
+ false ->
+ ok = erlzmq:send(Xrep, Msg);
+ true ->
+ ok = erlzmq:send(Xrep, Msg, [sndmore])
+ end
+ end, lists:seq(1, 4)),
+
+ %% Receive the reply.
+ {ok, Buff2} = erlzmq:recv(Req),
+ ?assertMatch(<<"GHI">>, Buff2),
+ {ok, RcvMore4} = erlzmq:getsockopt(Req, rcvmore),
+ ?assertMatch(true, RcvMore4),
+ {ok, Buff3} = erlzmq:recv(Req),
+ ?assertMatch(<<"JKL">>, Buff3),
+ {ok, RcvMore5} = erlzmq:getsockopt(Req, rcvmore),
+ ?assertMatch(false, RcvMore5),
+
+ %% Clean up.
+ ok = erlzmq:close(Req),
+ ok = erlzmq:close(Rep),
+ ok = erlzmq:close(Xrep),
+ ok = erlzmq:close(Xreq),
+ ok = erlzmq:term(Ctx).
+
+reqrep_erlzmq_device_test() ->
+ {ok, Ctx} = erlzmq:context(),
+
+ %% Create a router/dealer device.
+ DevicePid = spawn(
+ fun() ->
+ {ok, Router} = erlzmq:socket(Ctx, [router, {active, true}]),
+ ok = erlzmq:bind(Router, "tcp://127.0.0.1:5561"),
+ {ok, Dealer} = erlzmq:socket(Ctx, [dealer, {active, true}]),
+ ok = erlzmq:bind(Dealer, "tcp://127.0.0.1:5560"),
+ erlzmq_device:queue(Router, Dealer),
+ ok = erlzmq:close(Router),
+ ok = erlzmq:close(Dealer)
+ end),
+
+ %% Create a worker.
+ {ok, Rep} = erlzmq:socket(Ctx, rep),
+ ok= erlzmq:connect(Rep, "tcp://127.0.0.1:5560"),
+
+ %% Create a client.
+ {ok, Req} = erlzmq:socket(Ctx, req),
+ ok = erlzmq:connect(Req, "tcp://127.0.0.1:5561"),
+
+ %% Send a request.
+ ok = erlzmq:send(Req, <<"ABC">>, [sndmore]),
+ ok = erlzmq:send(Req, <<"DEF">>),
+
+ %% Receive the request.
+ {ok, Buff0} = erlzmq:recv(Rep),
+ ?assertMatch(<<"ABC">>, Buff0),
+ {ok, RcvMore1} = erlzmq:getsockopt(Rep, rcvmore),
+ ?assertMatch(true, RcvMore1),
+ {ok, Buff1} = erlzmq:recv(Rep),
+ ?assertMatch(<<"DEF">>, Buff1),
+ {ok, RcvMore2} = erlzmq:getsockopt(Rep, rcvmore),
+ ?assertMatch(false, RcvMore2),
+
+ %% Send the reply.
+ ok = erlzmq:send(Rep, <<"GHI">>, [sndmore]),
+ ok = erlzmq:send (Rep, <<"JKL">>),
+
+ %% Receive the reply.
+ {ok, Buff2} = erlzmq:recv(Req),
+ ?assertMatch(<<"GHI">>, Buff2),
+ {ok, RcvMore4} = erlzmq:getsockopt(Req, rcvmore),
+ ?assertMatch(true, RcvMore4),
+ {ok, Buff3} = erlzmq:recv(Req),
+ ?assertMatch(<<"JKL">>, Buff3),
+ {ok, RcvMore5} = erlzmq:getsockopt(Req, rcvmore),
+ ?assertMatch(false, RcvMore5),
+
+ %% Clean up.
+ ok = erlzmq:close(Req),
+ ok = erlzmq:close(Rep),
+ DevicePid ! {shutdown},
+ ok = erlzmq:term(Ctx).
+
reqrep_inproc_test() ->
basic_tests("inproc://test", req, rep, active),
basic_tests("inproc://test", req, rep, passive).
@@ -55,6 +232,103 @@ reqrep_tcp_test() ->
basic_tests("tcp://127.0.0.1:5556", req, rep, active),
basic_tests("tcp://127.0.0.1:5557", req, rep, passive).
+
+sub_forward_test() ->
+ {ok, Ctx} = erlzmq:context(),
+
+ %% First, create an intermediate device.
+ {ok, Xpub} = erlzmq:socket(Ctx, [xpub, {active, false}]),
+
+ ok = erlzmq:bind(Xpub, "tcp://127.0.0.1:5560"),
+
+ {ok, Xsub} = erlzmq:socket(Ctx, [xsub, {active, false}]),
+
+ ok = erlzmq:bind(Xsub, "tcp://127.0.0.1:5561"),
+
+ %% Create a publisher.
+ {ok, Pub} = erlzmq:socket(Ctx, [pub, {active, false}]),
+
+ ok = erlzmq:connect(Pub, "tcp://127.0.0.1:5561"),
+
+ %% Create a subscriber.
+ {ok, Sub} = erlzmq:socket(Ctx, [sub, {active, false}]),
+
+ ok = erlzmq:connect(Sub, "tcp://127.0.0.1:5560"),
+
+ %% Subscribe for all messages.
+ ok = erlzmq:setsockopt(Sub, subscribe, <<"">>),
+
+ %% Pass the subscription upstream through the device.
+ {ok, Buff0} = erlzmq:recv(Xpub),
+ ok = erlzmq:send(Xsub, Buff0),
+
+ %% Wait a bit till the subscription gets to the publisher.
+ timer:sleep(1000),
+
+ %% Send an empty message.
+ ok = erlzmq:send(Pub, <<>>),
+
+ %% Pass the message downstream through the device.
+ {ok, Buff} = erlzmq:recv(Xsub),
+
+ ok = erlzmq:send(Xpub, Buff),
+
+ %% Receive the message in the subscriber.
+ {ok, Buff} = erlzmq:recv(Sub),
+
+ %% Clean up.
+ ok = erlzmq:close(Xpub),
+ ok = erlzmq:close(Xsub),
+ ok = erlzmq:close(Pub),
+ ok = erlzmq:close(Sub),
+ ok = erlzmq:term(Ctx).
+
+timeo_test() ->
+ {ok, Ctx} = erlzmq:context(),
+ %% Create a disconnected socket.
+ {ok, Sb} = erlzmq:socket(Ctx, [pull, {active, false}]),
+ ok = erlzmq:bind(Sb, "inproc://timeout_test"),
+ %% Check whether non-blocking recv returns immediately.
+ {error, eagain} = erlzmq:recv(Sb, [dontwait]),
+ %% Check whether recv timeout is honoured.
+ Timeout0 = 500,
+ ok = erlzmq:setsockopt(Sb, rcvtimeo, Timeout0),
+ {Elapsed0, _} =
+ timer:tc(fun() ->
+ ?assertMatch({error, eagain}, erlzmq:recv(Sb))
+ end),
+ ?assert(Elapsed0 > 440000 andalso Elapsed0 < 550000),
+
+ %% Check whether connection during the wait doesn't distort the timeout.
+ Timeout1 = 2000,
+ ok = erlzmq:setsockopt(Sb, rcvtimeo, Timeout1),
+ proc_lib:spawn(fun() ->
+ timer:sleep(1000),
+ {ok, Sc} = erlzmq:socket(Ctx, [push, {active, false}]),
+ ok = erlzmq:connect(Sc, "inproc://timeout_test"),
+ timer:sleep(1000),
+ ok = erlzmq:close(Sc)
+ end),
+ {Elapsed1, _} = timer:tc(fun() ->
+ ?assertMatch({error, eagain}, erlzmq:recv(Sb))
+ end),
+ ?assert(Elapsed1 > 1900000 andalso Elapsed1 < 2100000),
+
+ %% Check that timeouts don't break normal message transfer.
+ {ok, Sc} = erlzmq:socket(Ctx, [push, {active, false}]),
+ ok = erlzmq:setsockopt(Sb, rcvtimeo, Timeout1),
+ ok = erlzmq:setsockopt(Sb, sndtimeo, Timeout1),
+ ok = erlzmq:connect(Sc, "inproc://timeout_test"),
+
+ Buff = <<"12345678ABCDEFGH12345678abcdefgh">>,
+ ok = erlzmq:send(Sc, Buff),
+ {ok, Buff} = erlzmq:recv(Sb),
+ %% Clean-up.
+ ok = erlzmq:close(Sc),
+ ok = erlzmq:close(Sb),
+ ok = erlzmq:term (Ctx).
+
+
bad_init_test() ->
?assertEqual({error, einval}, erlzmq:context(-1)).
@@ -94,7 +368,7 @@ shutdown_blocking_unblocking_test() ->
?assertMatch({error, {timeout, _}}, V),
{error, {timeout, Ref}} = V,
erlzmq:close(S),
- receive
+ receive
{Ref, ok} ->
ok
end.
@@ -177,7 +451,7 @@ ping_pong({S1, S2}, Msg, active) ->
?assertMatch({ok, Msg}, timeout)
end,
ok;
-
+
ping_pong({S1, S2}, Msg, passive) ->
ok = erlzmq:send(S1, Msg),
?assertMatch({ok, Msg}, erlzmq:recv(S2)),
Something went wrong with that request. Please try again.