Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

Merge pull request #462 from hurtonm/raw_fixes

Fix raw mode on reconnect
  • Loading branch information...
commit ec0824142ec6bc4932919b53f5d511b6f983693e 2 parents 4e028ec + c1e960b
Pieter Hintjens hintjens authored
Showing with 109 additions and 9 deletions.
  1. +6 −9 src/session_base.cpp
  2. +103 −0 tests/test_raw_sock.cpp
15 src/session_base.cpp
View
@@ -120,11 +120,6 @@ zmq::session_base_t::session_base_t (class io_thread_t *io_thread_,
identity_received (false),
addr (addr_)
{
- // Identities are not exchanged for raw sockets
- if (options.raw_sock) {
- identity_sent = true;
- identity_received = true;
- }
}
zmq::session_base_t::~session_base_t ()
@@ -156,8 +151,9 @@ void zmq::session_base_t::attach_pipe (pipe_t *pipe_)
int zmq::session_base_t::pull_msg (msg_t *msg_)
{
- // First message to send is identity
- if (unlikely (!identity_sent)) {
+ // Unless the socket is in raw mode, the first
+ // message we send is its identity.
+ if (unlikely (!identity_sent && !options.raw_sock)) {
int rc = msg_->init_size (options.identity_size);
errno_assert (rc == 0);
memcpy (msg_->data (), options.identity, options.identity_size);
@@ -177,8 +173,9 @@ int zmq::session_base_t::pull_msg (msg_t *msg_)
int zmq::session_base_t::push_msg (msg_t *msg_)
{
- // First message to receive is identity
- if (unlikely (!identity_received)) {
+ // Unless the socket is in raw mode, the first
+ // message we receive is its identity.
+ if (unlikely (!identity_received && !options.raw_sock)) {
msg_->set_flags (msg_t::identity);
identity_received = true;
if (!options.recv_identity) {
103 tests/test_raw_sock.cpp
View
@@ -32,6 +32,7 @@
#include <fcntl.h>
#include <zmq.h>
#include <unistd.h>
+#include <poll.h>
//ToDo: Windows?
const char *test_str = "TEST-STRING";
@@ -63,6 +64,42 @@ int tcp_client ()
return sockfd;
}
+int tcp_server ()
+{
+ int listenfd = socket (AF_INET, SOCK_STREAM, 0);
+ assert (listenfd != -1);
+
+ int flag = 1;
+ int rc = setsockopt (listenfd, SOL_SOCKET, SO_REUSEADDR, &flag, sizeof flag);
+ assert (rc == 0);
+
+ struct sockaddr_in serv_addr;
+ bzero (&serv_addr, sizeof serv_addr);
+ serv_addr.sin_family = AF_INET;
+ serv_addr.sin_addr.s_addr = htonl (INADDR_ANY);
+ serv_addr.sin_port = htons (5555);
+
+ rc = bind (listenfd, (struct sockaddr *) &serv_addr, sizeof serv_addr);
+ assert (rc == 0);
+
+ rc = listen (listenfd, 8);
+ assert (rc == 0);
+
+ int sockfd = accept (listenfd, NULL, NULL);
+ assert (sockfd != -1);
+
+ rc = close (listenfd);
+ assert (rc == 0);
+
+ int flags = fcntl (sockfd, F_GETFL, 0);
+ if (flags == -1)
+ flags = 0;
+ rc = fcntl (sockfd, F_SETFL, flags | O_NONBLOCK);
+ assert (rc != -1);
+
+ return sockfd;
+}
+
void tcp_client_write (int sockfd, const void *buf, int buf_len)
{
assert (buf);
@@ -90,11 +127,75 @@ void tcp_client_read (int sockfd)
assert (memcmp (buffer, test_str, strlen (test_str)) == 0);
}
+size_t tcp_read (int s, char *buf, size_t bufsize)
+{
+ size_t bytes_read = 0;
+
+ struct pollfd pfd = {s, POLLIN};
+ int rc = poll (&pfd, 1, 100);
+
+ while (rc > 0 && bytes_read < bufsize) {
+ int n = read (s, buf + bytes_read, bufsize - bytes_read);
+ if (n <= 0)
+ return bytes_read;
+ bytes_read += n;
+ rc = poll (&pfd, 1, 100);
+ }
+
+ return bytes_read;
+}
+
void tcp_client_close (int sockfd)
{
close (sockfd);
}
+void test_zmq_connect ()
+{
+ void *ctx = zmq_init (1);
+ assert (ctx);
+
+ void *zs = zmq_socket (ctx, ZMQ_ROUTER);
+ assert (zs);
+
+ int rc = zmq_setsockopt (zs, ZMQ_IDENTITY, "X", 1);
+ assert (rc == 0);
+
+ int raw_sock = 1;
+ rc = zmq_setsockopt (zs, ZMQ_ROUTER_RAW, &raw_sock, sizeof raw_sock);
+ assert (rc == 0);
+
+ rc = zmq_connect (zs, "tcp://127.0.0.1:5555");
+ assert (rc == 0);
+
+ int i;
+ for (i = 0; i < 8; i++) {
+ int server_fd = tcp_server ();
+ assert (server_fd != -1);
+
+ zmq_msg_t msg;
+ rc = zmq_msg_init_size (&msg, strlen (test_str));
+ assert (rc == 0);
+ memcpy (zmq_msg_data (&msg), test_str, strlen (test_str));
+ rc = zmq_msg_send (&msg, zs, 0);
+
+ char buffer [128];
+ size_t bytes_read = tcp_read (server_fd, buffer, sizeof buffer);
+
+ assert (bytes_read == strlen (test_str)
+ || memcmp (buffer, test_str, bytes_read) == 0);
+
+ rc = close (server_fd);
+ assert (rc == 0);
+ }
+
+ rc = zmq_close (zs);
+ assert (rc == 0);
+
+ rc = zmq_term (ctx);
+ assert (rc == 0);
+}
+
int main ()
{
fprintf (stderr, "test_raw_sock running...\n");
@@ -148,6 +249,8 @@ int main ()
zmq_close (sb);
zmq_term (ctx);
+ test_zmq_connect ();
+
fprintf (stderr, "test_raw_sock PASSED.\n");
return 0;
Please sign in to comment.
Something went wrong with that request. Please try again.