Skip to content

Commit

Permalink
Merge pull request #1902 from bluca/zmq_unbind_api_breakage
Browse files Browse the repository at this point in the history
  • Loading branch information
c-rack committed Apr 18, 2016
2 parents 8d49650 + c8211bf commit 136870f
Show file tree
Hide file tree
Showing 4 changed files with 316 additions and 1 deletion.
5 changes: 5 additions & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,11 @@ matrix:
os: osx
- env: BUILD_TYPE=valgrind CURVE=tweetnacl
os: osx
include:
- env: BUILD_TYPE=default CURVE=tweetnacl IPv6=ON
os: linux
dist: precise
sudo: false

sudo: required

Expand Down
32 changes: 31 additions & 1 deletion src/socket_base.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1029,8 +1029,38 @@ int zmq::socket_base_t::term_endpoint (const char *addr_)
return 0;
}

std::string resolved_addr = std::string (addr_);
std::pair <endpoints_t::iterator, endpoints_t::iterator> range;

// The resolved last_endpoint is used as a key in the endpoints map.
// The address passed by the user might not match in the TCP case due to
// IPv4-in-IPv6 mapping (EG: tcp://[::ffff:127.0.0.1]:9999), so try to
// resolve before giving up. Given at this stage we don't know whether a
// socket is connected or bound, try with both.
if (protocol == "tcp") {
range = endpoints.equal_range (resolved_addr);
if (range.first == range.second) {
tcp_address_t *tcp_addr = new (std::nothrow) tcp_address_t ();
alloc_assert (tcp_addr);
rc = tcp_addr->resolve (address.c_str (), false, options.ipv6);

if (rc == 0) {
tcp_addr->to_string (resolved_addr);
range = endpoints.equal_range (resolved_addr);

if (range.first == range.second) {
rc = tcp_addr->resolve (address.c_str (), true, options.ipv6);
if (rc == 0) {
tcp_addr->to_string (resolved_addr);
}
}
}
LIBZMQ_DELETE(tcp_addr);
}
}

// Find the endpoints range (if any) corresponding to the addr_ string.
std::pair <endpoints_t::iterator, endpoints_t::iterator> range = endpoints.equal_range (std::string (addr_));
range = endpoints.equal_range (resolved_addr);
if (range.first == range.second) {
errno = ENOENT;
EXIT_MUTEX ();
Expand Down
228 changes: 228 additions & 0 deletions tests/test_unbind_wildcard.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,16 @@
int main (void)
{
setup_test_environment();
int ipv6 = is_ipv6_available ();
void *ctx = zmq_ctx_new ();
assert (ctx);

/* Address wildcard, IPv6 disabled */
void *sb = zmq_socket (ctx, ZMQ_REP);
assert (sb);
void *sc = zmq_socket (ctx, ZMQ_REQ);
assert (sc);

int rc = zmq_bind (sb, "tcp://*:5555");
assert (rc == 0);

Expand All @@ -35,12 +40,235 @@ int main (void)
rc = zmq_getsockopt (sb, ZMQ_LAST_ENDPOINT, endpoint, &endpoint_len);
assert (rc == 0);

rc = zmq_connect (sc, endpoint);
assert (rc == 0);

bounce (sb, sc);

rc = zmq_disconnect (sc, endpoint);
assert (rc == 0);
rc = zmq_unbind (sb, endpoint);
assert (rc == 0);

rc = zmq_close (sc);
assert (rc == 0);
rc = zmq_close (sb);
assert (rc == 0);

/* Address wildcard, IPv6 enabled */
sb = zmq_socket (ctx, ZMQ_REP);
assert (sb);
sc = zmq_socket (ctx, ZMQ_REQ);
assert (sc);

rc = zmq_setsockopt (sb, ZMQ_IPV6, &ipv6, sizeof (int));
assert (rc == 0);
rc = zmq_setsockopt (sc, ZMQ_IPV6, &ipv6, sizeof (int));
assert (rc == 0);

rc = zmq_bind (sb, "tcp://*:5556");
assert (rc == 0);

endpoint_len = sizeof (endpoint);
memset(endpoint, 0, endpoint_len);
rc = zmq_getsockopt (sb, ZMQ_LAST_ENDPOINT, endpoint, &endpoint_len);
assert (rc == 0);

rc = zmq_connect (sc, endpoint);
assert (rc == 0);

bounce (sb, sc);

rc = zmq_disconnect (sc, endpoint);
assert (rc == 0);
rc = zmq_unbind (sb, endpoint);
assert (rc == 0);

rc = zmq_close (sc);
assert (rc == 0);
rc = zmq_close (sb);
assert (rc == 0);

/* Port wildcard, IPv4 address, IPv6 disabled */
sb = zmq_socket (ctx, ZMQ_REP);
assert (sb);
sc = zmq_socket (ctx, ZMQ_REQ);
assert (sc);

rc = zmq_bind (sb, "tcp://127.0.0.1:*");
assert (rc == 0);

endpoint_len = sizeof (endpoint);
memset(endpoint, 0, endpoint_len);
rc = zmq_getsockopt (sb, ZMQ_LAST_ENDPOINT, endpoint, &endpoint_len);
assert (rc == 0);

rc = zmq_connect (sc, endpoint);
assert (rc == 0);

bounce (sb, sc);

rc = zmq_disconnect (sc, endpoint);
assert (rc == 0);
rc = zmq_unbind (sb, endpoint);
assert (rc == 0);

rc = zmq_close (sc);
assert (rc == 0);
rc = zmq_close (sb);
assert (rc == 0);

/* Port wildcard, IPv4 address, IPv6 enabled */
sb = zmq_socket (ctx, ZMQ_REP);
assert (sb);
sc = zmq_socket (ctx, ZMQ_REQ);
assert (sc);

rc = zmq_setsockopt (sb, ZMQ_IPV6, &ipv6, sizeof (int));
assert (rc == 0);
rc = zmq_setsockopt (sc, ZMQ_IPV6, &ipv6, sizeof (int));
assert (rc == 0);

rc = zmq_bind (sb, "tcp://127.0.0.1:*");
assert (rc == 0);

endpoint_len = sizeof (endpoint);
memset(endpoint, 0, endpoint_len);
rc = zmq_getsockopt (sb, ZMQ_LAST_ENDPOINT, endpoint, &endpoint_len);
assert (rc == 0);

rc = zmq_connect (sc, endpoint);
assert (rc == 0);

bounce (sb, sc);

rc = zmq_disconnect (sc, endpoint);
assert (rc == 0);
rc = zmq_unbind (sb, endpoint);
assert (rc == 0);

rc = zmq_close (sc);
assert (rc == 0);
rc = zmq_close (sb);
assert (rc == 0);

if (ipv6) {
/* Port wildcard, IPv6 address, IPv6 enabled */
sb = zmq_socket (ctx, ZMQ_REP);
assert (sb);
sc = zmq_socket (ctx, ZMQ_REQ);
assert (sc);

rc = zmq_setsockopt (sb, ZMQ_IPV6, &ipv6, sizeof (int));
assert (rc == 0);
rc = zmq_setsockopt (sc, ZMQ_IPV6, &ipv6, sizeof (int));
assert (rc == 0);

rc = zmq_bind (sb, "tcp://[::1]:*");
assert (rc == 0);

endpoint_len = sizeof (endpoint);
memset(endpoint, 0, endpoint_len);
rc = zmq_getsockopt (sb, ZMQ_LAST_ENDPOINT, endpoint, &endpoint_len);
assert (rc == 0);

rc = zmq_connect (sc, endpoint);
assert (rc == 0);

bounce (sb, sc);

rc = zmq_disconnect (sc, endpoint);
assert (rc == 0);
rc = zmq_unbind (sb, endpoint);
assert (rc == 0);

rc = zmq_close (sc);
assert (rc == 0);
rc = zmq_close (sb);
assert (rc == 0);
}

/* No wildcard, IPv4 address, IPv6 disabled */
sb = zmq_socket (ctx, ZMQ_REP);
assert (sb);
sc = zmq_socket (ctx, ZMQ_REQ);
assert (sc);

rc = zmq_bind (sb, "tcp://127.0.0.1:5557");
assert (rc == 0);
rc = zmq_connect (sc, "tcp://127.0.0.1:5557");
assert (rc == 0);

bounce (sb, sc);

rc = zmq_disconnect (sc, "tcp://127.0.0.1:5557");
assert (rc == 0);
rc = zmq_unbind (sb, "tcp://127.0.0.1:5557");
assert (rc == 0);

rc = zmq_close (sc);
assert (rc == 0);
rc = zmq_close (sb);
assert (rc == 0);

/* No wildcard, IPv4 address, IPv6 enabled */
sb = zmq_socket (ctx, ZMQ_REP);
assert (sb);
sc = zmq_socket (ctx, ZMQ_REQ);
assert (sc);

rc = zmq_setsockopt (sb, ZMQ_IPV6, &ipv6, sizeof (int));
assert (rc == 0);
rc = zmq_setsockopt (sc, ZMQ_IPV6, &ipv6, sizeof (int));
assert (rc == 0);

rc = zmq_bind (sb, "tcp://127.0.0.1:5558");
assert (rc == 0);
rc = zmq_connect (sc, "tcp://127.0.0.1:5558");
assert (rc == 0);

bounce (sb, sc);

rc = zmq_disconnect (sc, "tcp://127.0.0.1:5558");
assert (rc == 0);
rc = zmq_unbind (sb, "tcp://127.0.0.1:5558");
assert (rc == 0);

rc = zmq_close (sc);
assert (rc == 0);
rc = zmq_close (sb);
assert (rc == 0);

if (ipv6) {
/* No wildcard, IPv6 address, IPv6 enabled */
sb = zmq_socket (ctx, ZMQ_REP);
assert (sb);
sc = zmq_socket (ctx, ZMQ_REQ);
assert (sc);

rc = zmq_setsockopt (sb, ZMQ_IPV6, &ipv6, sizeof (int));
assert (rc == 0);
rc = zmq_setsockopt (sc, ZMQ_IPV6, &ipv6, sizeof (int));
assert (rc == 0);

rc = zmq_bind (sb, "tcp://[::1]:5559");
assert (rc == 0);
rc = zmq_connect (sc, "tcp://[::1]:5559");
assert (rc == 0);

bounce (sb, sc);

rc = zmq_disconnect (sc, "tcp://[::1]:5559");
assert (rc == 0);
rc = zmq_unbind (sb, "tcp://[::1]:5559");
assert (rc == 0);

rc = zmq_close (sc);
assert (rc == 0);
rc = zmq_close (sb);
assert (rc == 0);
}

rc = zmq_ctx_term (ctx);
assert (rc == 0);

Expand Down
52 changes: 52 additions & 0 deletions tests/testutil.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
#include <string.h>

#if defined _WIN32
# include "windows.hpp"
# if defined _MSC_VER
# include <crtdbg.h>
# pragma warning(disable:4996)
Expand All @@ -61,6 +62,9 @@
# include <signal.h>
# include <stdlib.h>
# include <sys/wait.h>
# include <sys/socket.h>
# include <netinet/in.h>
# include <arpa/inet.h>
#endif

// Bounce a message from client to server and back
Expand Down Expand Up @@ -318,5 +322,53 @@ msleep (int milliseconds)
#endif
}

// check if IPv6 is available (0/false if not, 1/true if it is)
// only way to reliably check is to actually open a socket and try to bind it
int
is_ipv6_available(void)
{
int rc, ipv6 = 1;
struct sockaddr_in6 test_addr;

memset (&test_addr, 0, sizeof (test_addr));
test_addr.sin6_family = AF_INET6;
inet_pton (AF_INET6, "::1", &(test_addr.sin6_addr));

#ifdef ZMQ_HAVE_WINDOWS
SOCKET fd = socket (AF_INET6, SOCK_STREAM, IPPROTO_IP);
if (fd == INVALID_SOCKET)
ipv6 = 0;
else {
setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, (const char *)&ipv6, sizeof(int));
rc = setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, (const char *)&ipv6, sizeof(int));
if (rc == SOCKET_ERROR)
ipv6 = 0;
else {
rc = bind (fd, (struct sockaddr *)&test_addr, sizeof (test_addr));
if (rc == SOCKET_ERROR)
ipv6 = 0;
}
closesocket (fd);
}
#else
int fd = socket (AF_INET6, SOCK_STREAM, IPPROTO_IP);
if (fd == -1)
ipv6 = 0;
else {
setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &ipv6, sizeof(int));
rc = setsockopt(fd, IPPROTO_IPV6, IPV6_V6ONLY, &ipv6, sizeof(int));
if (rc != 0)
ipv6 = 0;
else {
rc = bind (fd, (struct sockaddr *)&test_addr, sizeof (test_addr));
if (rc != 0)
ipv6 = 0;
}
close (fd);
}
#endif

return ipv6;
}

#endif

0 comments on commit 136870f

Please sign in to comment.