Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

Fix issue 349

https://zeromq.jira.com/browse/LIBZMQ-349
Add the send/recv socket options from 3.1.0.
  • Loading branch information...
commit 1e5c6fa0077dcd5767bc557fbccaf383dee98339 1 parent 6ab8ef0
@jhawk28 jhawk28 authored
View
1  .gitignore
@@ -29,6 +29,7 @@ tests/test_reqrep_ipc
tests/test_reqrep_tcp
tests/test_shutdown_stress
tests/test_hwm
+tests/test_timeo
src/platform.hpp*
src/stamp-h1
devices/zmq_forwarder/zmq_forwarder
View
2  NEWS
@@ -4,6 +4,8 @@
Bug fixes
---------
+* Fixed issue 349, add send/recv socket options
+
* Fixed issue 301, fix builds on HP-UX 11iv3 when using either gcc or aCC.
* Fixed issue 305, memory leakage when using dynamic subscriptions.
View
32 doc/zmq_getsockopt.txt
@@ -394,6 +394,38 @@ assert (rc == 0);
----
+ZMQ_RCVTIMEO: Maximum time before a socket operation returns with EAGAIN
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+Retrieve the timeout for recv operation on the socket. If the value is `0`,
+_zmq_recv(3)_ will return immediately, with a EAGAIN error if there is no
+message to receive. If the value is `-1`, it will block until a message is
+available. For all other values, it will wait for a message for that amount
+of time before returning with an EAGAIN error.
+
+[horizontal]
+Option value type:: int
+Option value unit:: milliseconds
+Default value:: -1 (infinite)
+Applicable socket types:: all
+
+
+ZMQ_SNDTIMEO: Maximum time before a socket operation returns with EAGAIN
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+Retrieve the timeout for send operation on the socket. If the value is `0`,
+_zmq_send(3)_ will return immediately, with a EAGAIN error if the message
+cannot be sent. If the value is `-1`, it will block until the message is sent.
+For all other values, it will try to send the message for that amount of time
+before returning with an EAGAIN error.
+
+[horizontal]
+Option value type:: int
+Option value unit:: milliseconds
+Default value:: -1 (infinite)
+Applicable socket types:: all
+
+
SEE ALSO
--------
linkzmq:zmq_setsockopt[3]
View
32 doc/zmq_setsockopt.txt
@@ -369,6 +369,38 @@ assert (rc);
----
+ZMQ_RCVTIMEO: Maximum time before a recv operation returns with EAGAIN
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+Sets the timeout for receive operation on the socket. If the value is `0`,
+_zmq_recv(3)_ will return immediately, with a EAGAIN error if there is no
+message to receive. If the value is `-1`, it will block until a message is
+available. For all other values, it will wait for a message for that amount
+of time before returning with an EAGAIN error.
+
+[horizontal]
+Option value type:: int
+Option value unit:: milliseconds
+Default value:: -1 (infinite)
+Applicable socket types:: all
+
+
+ZMQ_SNDTIMEO: Maximum time before a send operation returns with EAGAIN
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+Sets the timeout for send operation on the socket. If the value is `0`,
+_zmq_send(3)_ will return immediately, with a EAGAIN error if the message
+cannot be sent. If the value is `-1`, it will block until the message is sent.
+For all other values, it will try to send the message for that amount of time
+before returning with an EAGAIN error.
+
+[horizontal]
+Option value type:: int
+Option value unit:: milliseconds
+Default value:: -1 (infinite)
+Applicable socket types:: all
+
+
SEE ALSO
--------
linkzmq:zmq_getsockopt[3]
View
4 include/zmq.h
@@ -211,7 +211,9 @@ ZMQ_EXPORT int zmq_term (void *context);
#define ZMQ_BACKLOG 19
#define ZMQ_RECOVERY_IVL_MSEC 20 /* opt. recovery time, reconcile in 3.x */
#define ZMQ_RECONNECT_IVL_MAX 21
-
+#define ZMQ_RCVTIMEO 27
+#define ZMQ_SNDTIMEO 28
+
/* Send/recv options. */
#define ZMQ_NOBLOCK 1
#define ZMQ_SNDMORE 2
View
39 src/options.cpp
@@ -18,6 +18,7 @@
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
+#include <stdio.h>
#include <string.h>
#ifndef ZMQ_HAVE_WINDOWS
#include <sys/stat.h>
@@ -44,7 +45,9 @@ zmq::options_t::options_t () :
backlog (100),
requires_in (false),
requires_out (false),
- immediate_connect (true)
+ immediate_connect (true),
+ rcvtimeo (-1),
+ sndtimeo (-1)
{
}
@@ -195,6 +198,22 @@ int zmq::options_t::setsockopt (int option_, const void *optval_,
backlog = *((int*) optval_);
return 0;
+ case ZMQ_RCVTIMEO:
+ if (optvallen_ != sizeof (int)) {
+ errno = EINVAL;
+ return -1;
+ }
+ rcvtimeo = *((int*) optval_);
+ return 0;
+
+ case ZMQ_SNDTIMEO:
+ if (optvallen_ != sizeof (int)) {
+ errno = EINVAL;
+ return -1;
+ }
+ sndtimeo = *((int*) optval_);
+ return 0;
+
}
errno = EINVAL;
@@ -341,6 +360,24 @@ int zmq::options_t::getsockopt (int option_, void *optval_, size_t *optvallen_)
*optvallen_ = sizeof (int);
return 0;
+ case ZMQ_RCVTIMEO:
+ if (*optvallen_ < sizeof (int)) {
+ errno = EINVAL;
+ return -1;
+ }
+ *((int*) optval_) = rcvtimeo;
+ *optvallen_ = sizeof (int);
+ return 0;
+
+ case ZMQ_SNDTIMEO:
+ if (*optvallen_ < sizeof (int)) {
+ errno = EINVAL;
+ return -1;
+ }
+ *((int*) optval_) = sndtimeo;
+ *optvallen_ = sizeof (int);
+ return 0;
+
}
errno = EINVAL;
View
4 src/options.hpp
@@ -80,6 +80,10 @@ namespace zmq
// is not aware of the peer's identity, however, it is able to send
// messages straight away.
bool immediate_connect;
+
+ // The timeout for send/recv operations for this socket.
+ int rcvtimeo;
+ int sndtimeo;
};
}
View
8 src/socket_base.cpp
@@ -510,13 +510,13 @@ int zmq::socket_base_t::send (::zmq_msg_t *msg_, int flags_)
// In case of non-blocking send we'll simply propagate
// the error - including EAGAIN - upwards.
- if (flags_ & ZMQ_NOBLOCK)
+ if (flags_ & ZMQ_NOBLOCK || options.sndtimeo == 0)
return -1;
// Compute the time when the timeout should occur.
// If the timeout is infite, don't care.
clock_t clock ;
- int timeout = -1;
+ int timeout = options.sndtimeo;
uint64_t end = timeout < 0 ? 0 : (clock.now_ms () + timeout);
// Oops, we couldn't send the message. Wait for the next
@@ -589,7 +589,7 @@ int zmq::socket_base_t::recv (::zmq_msg_t *msg_, int flags_)
// For non-blocking recv, commands are processed in case there's an
// activate_reader command already waiting int a command pipe.
// If it's not, return EAGAIN.
- if (flags_ & ZMQ_NOBLOCK) {
+ if (flags_ & ZMQ_NOBLOCK || options.rcvtimeo == 0) {
if (errno != EAGAIN)
return -1;
if (unlikely (process_commands (0, false) != 0))
@@ -608,7 +608,7 @@ int zmq::socket_base_t::recv (::zmq_msg_t *msg_, int flags_)
// Compute the time when the timeout should occur.
// If the timeout is infite, don't care.
clock_t clock ;
- int timeout = -1;
+ int timeout = options.rcvtimeo;
uint64_t end = timeout < 0 ? 0 : (clock.now_ms () + timeout);
// In blocking scenario, commands are processed over and over again until
View
4 tests/Makefile.am
@@ -10,7 +10,8 @@ noinst_PROGRAMS = test_pair_inproc \
if !ON_MINGW
noinst_PROGRAMS += test_shutdown_stress \
test_pair_ipc \
- test_reqrep_ipc
+ test_reqrep_ipc \
+ test_timeo
endif
test_pair_inproc_SOURCES = test_pair_inproc.cpp testutil.hpp
@@ -25,6 +26,7 @@ if !ON_MINGW
test_shutdown_stress_SOURCES = test_shutdown_stress.cpp
test_pair_ipc_SOURCES = test_pair_ipc.cpp testutil.hpp
test_reqrep_ipc_SOURCES = test_reqrep_ipc.cpp testutil.hpp
+test_timeo_SOURCES = test_timeo.cpp
endif
TESTS = $(noinst_PROGRAMS)
View
125 tests/test_timeo.cpp
@@ -0,0 +1,125 @@
+/*
+ Copyright (c) 2010-2011 250bpm s.r.o.
+ Copyright (c) 2010-2011 Other contributors as noted in the AUTHORS file
+
+ This file is part of 0MQ.
+
+ 0MQ is free software; you can redistribute it and/or modify it under
+ the terms of the GNU Lesser General Public License as published by
+ the Free Software Foundation; either version 3 of the License, or
+ (at your option) any later version.
+
+ 0MQ is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU Lesser General Public License for more details.
+
+ You should have received a copy of the GNU Lesser General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+
+#include <assert.h>
+#include <string.h>
+#include <pthread.h>
+#include <stdio.h>
+
+#include "../include/zmq.h"
+#include "../include/zmq_utils.h"
+
+extern "C"
+{
+ void *worker(void *ctx)
+ {
+ // Worker thread connects after delay of 1 second. Then it waits
+ // for 1 more second, so that async connect has time to succeed.
+ zmq_sleep (1);
+ void *sc = zmq_socket (ctx, ZMQ_PUSH);
+ assert (sc);
+ int rc = zmq_connect (sc, "inproc://timeout_test");
+ assert (rc == 0);
+ zmq_sleep (1);
+ rc = zmq_close (sc);
+ assert (rc == 0);
+ return NULL;
+ }
+}
+
+int main (int argc, char *argv [])
+{
+ fprintf (stderr, "test_timeo running...\n");
+
+ void *ctx = zmq_init (1);
+ assert (ctx);
+
+ // Create a disconnected socket.
+ void *sb = zmq_socket (ctx, ZMQ_PULL);
+ assert (sb);
+ int rc = zmq_bind (sb, "inproc://timeout_test");
+ assert (rc == 0);
+
+ // Check whether non-blocking recv returns immediately.
+ zmq_msg_t msg;
+ zmq_msg_init (&msg);
+ rc = zmq_recv (sb, &msg, ZMQ_NOBLOCK);
+ assert (rc == -1);
+ assert (zmq_errno() == EAGAIN);
+
+ // Check whether recv timeout is honoured.
+ int timeout = 500;
+ size_t timeout_size = sizeof timeout;
+ rc = zmq_setsockopt(sb, ZMQ_RCVTIMEO, &timeout, timeout_size);
+ assert (rc == 0);
+ void *watch = zmq_stopwatch_start ();
+ rc = zmq_recv (sb, &msg, 0);
+ assert (rc == -1);
+ assert (zmq_errno () == EAGAIN);
+ unsigned long elapsed = zmq_stopwatch_stop (watch);
+ assert (elapsed > 440000 && elapsed < 550000);
+
+ // Check whether connection during the wait doesn't distort the timeout.
+ timeout = 2000;
+ rc = zmq_setsockopt(sb, ZMQ_RCVTIMEO, &timeout, timeout_size);
+ assert (rc == 0);
+ pthread_t thread;
+ rc = pthread_create (&thread, NULL, worker, ctx);
+ assert (rc == 0);
+ watch = zmq_stopwatch_start ();
+ rc = zmq_recv (sb, &msg, 0);
+ assert (rc == -1);
+ assert (zmq_errno () == EAGAIN);
+ elapsed = zmq_stopwatch_stop (watch);
+ assert (elapsed > 1900000 && elapsed < 2100000);
+ rc = pthread_join (thread, NULL);
+ assert (rc == 0);
+
+ // Check that timeouts don't break normal message transfer.
+ void *sc = zmq_socket (ctx, ZMQ_PUSH);
+ assert (sc);
+ rc = zmq_setsockopt(sb, ZMQ_RCVTIMEO, &timeout, timeout_size);
+ assert (rc == 0);
+ rc = zmq_setsockopt(sb, ZMQ_SNDTIMEO, &timeout, timeout_size);
+ assert (rc == 0);
+ rc = zmq_connect (sc, "inproc://timeout_test");
+ assert (rc == 0);
+
+ zmq_msg_init (&msg);
+ zmq_msg_init_size (&msg, 32);
+ memcpy (zmq_msg_data (&msg), "12345678ABCDEFGH12345678abcdefgh", 32);
+ rc = zmq_send (sc, &msg, 0);
+ assert (rc == 0);
+
+ rc = zmq_recv (sb, &msg, 0);
+ assert (rc == 0);
+ assert (32 == zmq_msg_size(&msg));
+
+ // Clean-up.
+ rc = zmq_close (sc);
+ assert (rc == 0);
+ rc = zmq_close (sb);
+ assert (rc == 0);
+ rc = zmq_term (ctx);
+ assert (rc == 0);
+
+ return 0 ;
+}
+
Please sign in to comment.
Something went wrong with that request. Please try again.