Skip to content

Commit

Permalink
A clean-room implementation of zmq_proxy_steerable().
Browse files Browse the repository at this point in the history
It is contriubted under the MPL-2.0.

I had no knowledge of the previous implementation of zmq_proxy_steerable().

This version was developed based on expectations set in the old man page with one exception.  This version uses a REP/REQ for the proxy control protocol sockets.  The old man page example used PUB/SUB which is nonsensical given the STATISTICS command requires two way communication.
  • Loading branch information
brettviren committed Oct 8, 2023
1 parent 8cdc4ed commit 26f5b11
Show file tree
Hide file tree
Showing 8 changed files with 726 additions and 43 deletions.
1 change: 1 addition & 0 deletions AUTHORS
Expand Up @@ -27,6 +27,7 @@ Bernd Prager
Bob Beaty
Brandon Carpenter
Brett Cameron
Brett Viren
Brian Buchanan
Burak Arslan
Carl Clemens
Expand Down
5 changes: 5 additions & 0 deletions Makefile.am
Expand Up @@ -483,6 +483,7 @@ test_apps = \
tests/test_issue_566 \
tests/test_proxy_hwm \
tests/test_proxy_single_socket \
tests/test_proxy_steerable \
tests/test_proxy_terminate \
tests/test_getsockopt_memset \
tests/test_setsockopt \
Expand Down Expand Up @@ -731,6 +732,10 @@ tests_test_proxy_single_socket_SOURCES = tests/test_proxy_single_socket.cpp
tests_test_proxy_single_socket_LDADD = ${TESTUTIL_LIBS} src/libzmq.la
tests_test_proxy_single_socket_CPPFLAGS = ${TESTUTIL_CPPFLAGS}

tests_test_proxy_steerable_SOURCES = tests/test_proxy_steerable.cpp
tests_test_proxy_steerable_LDADD = ${TESTUTIL_LIBS} src/libzmq.la
tests_test_proxy_steerable_CPPFLAGS = ${TESTUTIL_CPPFLAGS}

tests_test_proxy_terminate_SOURCES = tests/test_proxy_terminate.cpp
tests_test_proxy_terminate_LDADD = ${TESTUTIL_LIBS} src/libzmq.la
tests_test_proxy_terminate_CPPFLAGS = ${TESTUTIL_CPPFLAGS}
Expand Down
101 changes: 97 additions & 4 deletions doc/zmq_proxy_steerable.txt
Expand Up @@ -3,7 +3,7 @@ zmq_proxy_steerable(3)

NAME
----
zmq_proxy_steerable - DEPRECATED
zmq_proxy_steerable - built-in 0MQ proxy with control flow


SYNOPSIS
Expand All @@ -14,9 +14,102 @@ SYNOPSIS

DESCRIPTION
-----------
The _zmq_proxy_steerable()_ function is an empty stub that only returns an
*EOPNOTSUPP* error, as the author did not provide a relicense agreement for
the Mozilla Public License v2 relicense of libzmq.

The _zmq_proxy_steerable()_ function is a variant of the _zmq_proxy()_ function.
It accepts a fourth _control_ socket. When the _control_ socket is _NULL_ the
two functions operate identically.

When a _control_ socket of type _REP_ is provided to the proxy function the
application may send commands to the proxy. The following commands are
supported.

_PAUSE_::
The proxy will cease transferring messages between its endpoints.

_RESUME_::
The proxy will resume transferring messages between its endpoints.

_TERMINATE_::
The proxy function will exit with a return value of 0.

_STATISTICS_::
The proxy behavior will remain unchanged and reply with a set of simple summary values of the messages that have been sent through the proxy as described next.

There are eight statistics values, each of size _uint64_t_ in the multi-part
message reply to the _STATISTICS_ command. These are:

- number of messages received by the frontend socket

- number of bytes received by the frontend socket

- number of messages sent by the frontend socket

- number of bytes sent by the frontend socket

- number of messages received by the backend socket

- number of bytes received by the backend socket

- number of messages sent by the backend socket

- number of bytes sent by the backend socket


RETURN VALUE
------------
The _zmq_proxy_steerable()_ function returns 0 if TERMINATE is received on its
control socket. Otherwise, it returns -1 and errno set to ETERM or EINTR (the
0MQ context associated with either of the specified sockets was terminated) or
EFAULT (the provided frontend or backend was invalid).


EXAMPLE
-------
.Create a function to run the proxy
----
// Create the frontend and backend sockets to be proxied
void *frontend = zmq_socket (context, ZMQ_ROUTER);
void *backend = zmq_socket (context, ZMQ_DEALER);

// Create the proxy control socket
void *control = zmq_socket (context, ZMQ_REP);

// Bind the sockets.
zmq_bind (frontend, "tcp://*:5555");
zmq_bind (backend, "tcp://*:5556");
zmq_bind (control, "tcp://*:5557");

zmq_proxy_steerable(frontend, backend, NULL, control);
----
.Code in another thread/process to steer the proxy.
----
void *control = zmq_socket (context, ZMQ_REQ);
zmq_connect (control, "tcp://*:5557");

zmq_msg_t msg;

zmq_send (control, "PAUSE", 5, 0);
zmq_msg_recv (&msg, control, 0));

zmq_send (control, "RESUME", 6, 0);
zmq_msg_recv (&msg, control, 0));

zmq_send (control, "STATISTICS", 10, 0);
while (1) {
zmq_msg_recv (&msg, control, 0));
printf(" %lu", *(uint64_t *)zmq_msg_data (&msg));
if (!zmq_msg_get (&msg, ZMQ_MORE))
break;
}
printf("\n");

zmq_send (control, "TERMINATE", 9, 0);
zmq_msg_recv (&msg, control, 0));

zmq_close(frontend);
zmq_close(backend);
zmq_close(control);
----


SEE ALSO
Expand Down

0 comments on commit 26f5b11

Please sign in to comment.