Skip to content

Commit

Permalink
Merge 49553eb into 9d31965
Browse files Browse the repository at this point in the history
  • Loading branch information
nnog committed Oct 12, 2023
2 parents 9d31965 + 49553eb commit 5483d46
Show file tree
Hide file tree
Showing 3 changed files with 117 additions and 43 deletions.
12 changes: 7 additions & 5 deletions doc/zmq_proxy_steerable.txt
Original file line number Diff line number Diff line change
Expand Up @@ -19,21 +19,21 @@ The _zmq_proxy_steerable()_ function is a variant of the _zmq_proxy()_ function.
It accepts a fourth _control_ socket. When the _control_ socket is _NULL_ the
two functions operate identically.

When a _control_ socket of type _REP_ is provided to the proxy function the
When a _control_ socket of type _REP_ or _PAIR_ is provided to the proxy function the
application may send commands to the proxy. The following commands are
supported.

_PAUSE_::
The proxy will cease transferring messages between its endpoints.
The proxy will cease transferring messages between its endpoints. _REP_ control socket will reply with an empty message. No response otherwise.

_RESUME_::
The proxy will resume transferring messages between its endpoints.
The proxy will resume transferring messages between its endpoints. _REP_ control socket will reply with an empty message. No response otherwise.

_TERMINATE_::
The proxy function will exit with a return value of 0.
The proxy function will exit with a return value of 0. _REP_ control socket will reply with an empty message. No response otherwise.

_STATISTICS_::
The proxy behavior will remain unchanged and reply with a set of simple summary values of the messages that have been sent through the proxy as described next.
The proxy state will remain unchanged and reply with a set of simple summary values of the messages that have been sent through the proxy as described next. Control socket must support sending.

There are eight statistics values, each of size _uint64_t_ in the multi-part
message reply to the _STATISTICS_ command. These are:
Expand All @@ -54,6 +54,8 @@ message reply to the _STATISTICS_ command. These are:

- number of bytes sent by the backend socket

Message totals count each part in a multipart message individually.


RETURN VALUE
------------
Expand Down
21 changes: 13 additions & 8 deletions src/proxy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -189,19 +189,24 @@ static int handle_control (class zmq::socket_base_t *control_,
return 0;
}

if (msiz == 5 && memcmp (command, "\x05PAUSE", 6)) {
state = active;
} else if (msiz == 6 && 0 == memcmp (command, "RESUME", 6)) {
if (msiz == 5 && 0 == memcmp (command, "PAUSE", 5)) {
state = paused;
} else if (msiz == 6 && 0 == memcmp (command, "RESUME", 6)) {
state = active;
} else if (msiz == 9 && 0 == memcmp (command, "TERMINATE", 9)) {
state = terminated;
}

// satisfy REP duty and reply no matter what.
cmsg.init_size (0);
rc = control_->send (&cmsg, 0);
if (unlikely (rc < 0)) {
return -1;
int type;
size_t sz = sizeof (type);
zmq_getsockopt (control_, ZMQ_TYPE, &type, &sz);
if (type == ZMQ_REP) {
// satisfy REP duty and reply no matter what.
cmsg.init_size (0);
rc = control_->send (&cmsg, 0);
if (unlikely (rc < 0)) {
return -1;
}
}
return 0;
}
Expand Down
127 changes: 97 additions & 30 deletions tests/test_proxy_steerable.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

#include <stdlib.h>
#include <string.h>
#include <inttypes.h>

#define CONTENT_SIZE 13
#define CONTENT_SIZE_MAX 32
Expand All @@ -25,6 +26,9 @@ void *g_clients_pkts_out = NULL;
void *g_workers_pkts_out = NULL;
void *control_context = NULL; // worker control, not proxy control

int g_proxy_control_socktype =
ZMQ_PAIR; //or ZMQ_PAIR, ZMQ_SUB (without statistics)

void setUp ()
{
setup_test_context ();
Expand Down Expand Up @@ -90,7 +94,7 @@ static void client_task (void *db_)

int request_nbr = 0;
bool run = true;
bool keep_sending = true;
bool enable_send = false;
while (run) {
// Tick once per 200 ms, pulling in arriving messages
int centitick;
Expand Down Expand Up @@ -126,14 +130,17 @@ static void client_task (void *db_)
break;
}
if (memcmp (content, "STOP", 4) == 0) {
keep_sending = false;
enable_send = false;
break;
}
if (memcmp (content, "START", 5) == 0) {
enable_send = true;
}
}
}
}

if (keep_sending) {
if (enable_send) {
snprintf (content, CONTENT_SIZE_MAX * sizeof (char),
"request #%03d", ++request_nbr); // CONTENT_SIZE
if (is_verbose)
Expand Down Expand Up @@ -203,9 +210,14 @@ void server_task (void * /*unused_*/)
}

// Proxy control socket
void *proxy_control = zmq_socket (get_test_context (), ZMQ_REP);
void *proxy_control =
zmq_socket (get_test_context (), g_proxy_control_socktype);
TEST_ASSERT_NOT_NULL (proxy_control);
TEST_ASSERT_SUCCESS_ERRNO (zmq_bind (proxy_control, proxy_control_address));
if (g_proxy_control_socktype == ZMQ_SUB) {
TEST_ASSERT_SUCCESS_ERRNO (
zmq_setsockopt (proxy_control, ZMQ_SUBSCRIBE, "", 0));
}

// Connect backend to frontend via a steerable proxy
int rc = zmq_proxy_steerable (frontend, backend, NULL, proxy_control);
Expand Down Expand Up @@ -319,6 +331,56 @@ static void server_worker (void * /*unused_*/)
//
// - 7/bsb: number of bytes sent out the backend socket

uint64_t read_stat_value (void *proxy_control)
{
zmq_msg_t stats_msg;
TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_init (&stats_msg));
TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_recv (&stats_msg, proxy_control, 0));
TEST_ASSERT_EQUAL_INT (sizeof (uint64_t),
zmq_msg_size (&stats_msg));
uint64_t val = *(uint64_t *) zmq_msg_data (&stats_msg);
TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_close (&stats_msg));
return val;
}

//return total bytes proxied, so we can test PAUSE/RESUME
uint64_t statistics (void *proxy_control, const char *runctx)
{
if (is_verbose) {
printf ("steer: sending STATISTICS - %s\n", runctx);
}

TEST_ASSERT_SUCCESS_ERRNO (zmq_send (proxy_control, "STATISTICS", 10, 0));

uint64_t total_bytes_proxied = 0;
for (int count = 0; count < 8; ++count) {
uint64_t val = read_stat_value (proxy_control);
if (is_verbose) {
if (count == 0) {
printf ("stats: client pkts out: %d worker pkts out: %d { ",
zmq_atomic_counter_value (g_clients_pkts_out),
zmq_atomic_counter_value (g_workers_pkts_out));
}
printf ("%" PRIu64, val);
if (count == 7) {
printf ("}\n");
}
}
switch (count) {
case 3: //bytes sent on frontend
case 7: //bytes sent on backend
total_bytes_proxied += val;
}
}

int rcvmore;
size_t sz = sizeof (rcvmore);
zmq_getsockopt (proxy_control, ZMQ_RCVMORE, &rcvmore, &sz);
TEST_ASSERT_EQUAL_INT (rcvmore, 0);
return total_bytes_proxied;
}


// The main thread simply starts several clients and a server, and then
// waits for the server to finish.

Expand All @@ -328,32 +390,18 @@ void steer (void *proxy_control, const char *command, const char *runctx)
printf ("steer: sending %s - %s\n", command, runctx);
}

// Start with proxy paused
TEST_ASSERT_SUCCESS_ERRNO (
zmq_send (proxy_control, command, strlen (command), 0));

zmq_msg_t stats_msg;
int count = -1;
while (1) {
count = count + 1;
if (g_proxy_control_socktype == ZMQ_REP) {
//expect an empty reply from REP for commands that need no response
zmq_msg_t stats_msg;
TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_init (&stats_msg));
TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_recv (&stats_msg, proxy_control, 0));


if (is_verbose && zmq_msg_size (&stats_msg)) {
if (count == 0) {
printf ("steer:");
}
printf (" %lu", *(unsigned long int *) zmq_msg_data (&stats_msg));
if (count == 7) {
printf ("\n");
}
}
if (!zmq_msg_get (&stats_msg, ZMQ_MORE))
break;
TEST_ASSERT_EQUAL_INT (zmq_msg_size (&stats_msg), 0);
TEST_ASSERT (!zmq_msg_get (&stats_msg, ZMQ_MORE));
TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_close (&stats_msg));
}
TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_close (&stats_msg));
}

void test_proxy_steerable ()
Expand Down Expand Up @@ -382,31 +430,50 @@ void test_proxy_steerable ()
msleep (500); // Run for 500 ms then quit

// Proxy control socket
void *proxy_control = zmq_socket (get_test_context (), ZMQ_REQ);
int control_socktype = ZMQ_PAIR;
switch (g_proxy_control_socktype) {
case ZMQ_REP:
control_socktype = ZMQ_REQ;
break;
case ZMQ_SUB:
control_socktype = ZMQ_PUB;
break;
default:
break;
}
void *proxy_control = zmq_socket (get_test_context (), control_socktype);
TEST_ASSERT_NOT_NULL (proxy_control);
linger = 0;
TEST_ASSERT_SUCCESS_ERRNO (
zmq_setsockopt (proxy_control, ZMQ_LINGER, &linger, sizeof (linger)));
TEST_ASSERT_SUCCESS_ERRNO (
zmq_connect (proxy_control, proxy_control_address));

TEST_ASSERT (statistics (proxy_control, "should be all 0s before clients start") == 0);

send_string_expect_success (control, "START", 0);

msleep (500); // Run for 500 ms then quit

steer (proxy_control, "STATISTICS", "started clients");
steer (proxy_control, "PAUSE", "started server");
TEST_ASSERT (statistics (proxy_control, "started clients") > 0);
steer (proxy_control, "PAUSE", "pausing proxying after 500ms");
uint64_t bytes = statistics (proxy_control, "post-pause");

msleep (500); // Run for 500 ms then quit

steer (proxy_control, "RESUME", "started clients");
TEST_ASSERT (statistics (proxy_control, "post-pause") == bytes);

steer (proxy_control, "RESUME", "resuming proxying after another 500ms");

msleep (500); // Run for 500 ms then quit

steer (proxy_control, "STATISTICS", "ran for a while");
TEST_ASSERT (statistics (proxy_control, "ran for a while") > bytes);

if (is_verbose)
printf ("stopping all clients and server workers\n");
send_string_expect_success (control, "STOP", 0);

steer (proxy_control, "STATISTICS", "stopped clients and workers");
statistics (proxy_control, "stopped clients and workers");

msleep (500); // Wait for all clients and workers to STOP

Expand All @@ -415,7 +482,7 @@ void test_proxy_steerable ()
send_string_expect_success (control, "TERMINATE", 0);

msleep (500);
steer (proxy_control, "STATISTICS", "terminate clients and server workers");
statistics (proxy_control, "terminate clients and server workers");

msleep (500); // Wait for all clients and workers to terminate
steer (proxy_control, "TERMINATE", "terminate proxy");
Expand Down

0 comments on commit 5483d46

Please sign in to comment.