Skip to content

Commit

Permalink
Fix code style
Browse files Browse the repository at this point in the history
  • Loading branch information
emtr committed Mar 7, 2019
1 parent 52615c8 commit 6cda0a8
Showing 1 changed file with 44 additions and 53 deletions.
97 changes: 44 additions & 53 deletions perf/proxy_thr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,21 +48,21 @@
#endif


//
// Asynchronous proxy benchmark using ZMQ_XPUB_NODROP.
//
// Topology:
//
// XPUB SUB
// | |
// +-----> XSUB -> XPUB -----/
// | ^^^^^^^^^^^^
// XPUB ZMQ proxy
//
// All connections use "inproc" transport. The two XPUB sockets start
// flooding the proxy. The throughput is computed using the bytes received
// in the SUB socket.
//
/*
Asynchronous proxy benchmark using ZMQ_XPUB_NODROP.
Topology:
XPUB SUB
| |
+-----> XSUB -> XPUB -----/
| ^^^^^^^^^^^^
XPUB ZMQ proxy
All connections use "inproc" transport. The two XPUB sockets start
flooding the proxy. The throughput is computed using the bytes received
in the SUB socket.
*/


#define HWM 10000
Expand Down Expand Up @@ -96,7 +96,7 @@ int test_assert_success_message_errno_helper (int rc_,
if (rc_ == -1) {
char buffer[512];
buffer[sizeof (buffer) - 1] =
0; // to ensure defined behavior with VC++ <= 2013
0; // to ensure defined behavior with VC++ <= 2013
printf ("%s failed%s%s%s, errno = %i (%s)", expr_,
msg_ ? " (additional info: " : "", msg_ ? msg_ : "",
msg_ ? ")" : "", zmq_errno (), zmq_strerror (zmq_errno ()));
Expand Down Expand Up @@ -168,8 +168,8 @@ static void publisher_thread_main (void *pvoid)
zmq_connect (pubsocket, cfg->frontend_endpoint[idx]));


// Wait before starting TX operations till 1 subscriber has subscribed
// (in this test there's 1 subscriber only)
// Wait before starting TX operations till 1 subscriber has subscribed
// (in this test there's 1 subscriber only)
const char subscription_to_all_topics[] = {1, 0};
recv_string_expect_success (pubsocket, subscription_to_all_topics, 0);

Expand All @@ -187,7 +187,7 @@ static void publisher_thread_main (void *pvoid)
rc = zmq_msg_copy (&msg, &msg_orig);
assert (rc == 0);

// Send the message to the socket
// Send the message to the socket
rc = zmq_msg_send (&msg, pubsocket, 0);
if (rc != -1) {
send_count++;
Expand Down Expand Up @@ -215,7 +215,7 @@ static void subscriber_thread_main (void *pvoid)
TEST_ASSERT_SUCCESS_ERRNO (
zmq_connect (subsocket, cfg->backend_endpoint[idx]));

// receive message_count messages
// Receive message_count messages
uint64_t rxsuccess = 0;
bool success = true;
while (success) {
Expand All @@ -227,15 +227,13 @@ static void subscriber_thread_main (void *pvoid)
if (rc != -1) {
TEST_ASSERT_SUCCESS_ERRNO (zmq_msg_close (&msg));
rxsuccess++;
} else {
//
}

if (rxsuccess == message_count)
break;
}

// CLEANUP
// Cleanup

zmq_close (subsocket);
printf ("subscriber thread ended\n");
Expand All @@ -246,7 +244,7 @@ static void proxy_thread_main (void *pvoid)
const proxy_hwm_cfg_t *cfg = (proxy_hwm_cfg_t *) pvoid;
int rc;

// FRONTEND SUB
// FRONTEND SUB

void *frontend_xsub = zmq_socket (
cfg->context,
Expand All @@ -255,7 +253,7 @@ static void proxy_thread_main (void *pvoid)

set_hwm (frontend_xsub);

// bind FRONTEND
// Bind FRONTEND
for (unsigned int i = 0; i < ARRAY_SIZE (cfg->frontend_endpoint); i++) {
const char *ep = cfg->frontend_endpoint[i];
if (ep != NULL) {
Expand All @@ -265,11 +263,11 @@ static void proxy_thread_main (void *pvoid)
}
}

// BACKEND PUB
// BACKEND PUB

void *backend_xpub = zmq_socket (
cfg->context,
ZMQ_XPUB); // the backend is the one exposed to the external world (TCP)
ZMQ_XPUB); // the backend is the one exposed to the external world (TCP)
assert (backend_xpub);

int optval = 1;
Expand All @@ -279,7 +277,7 @@ static void proxy_thread_main (void *pvoid)

set_hwm (backend_xpub);

// bind BACKEND
// Bind BACKEND
for (unsigned int i = 0; i < ARRAY_SIZE (cfg->backend_endpoint); i++) {
const char *ep = cfg->backend_endpoint[i];
if (ep != NULL) {
Expand All @@ -289,18 +287,18 @@ static void proxy_thread_main (void *pvoid)
}
}

// CONTROL REP
// CONTROL REP

void *control_rep = zmq_socket (
cfg->context,
ZMQ_REP); // this one is used by the proxy to receive&reply to commands
ZMQ_REP); // This one is used by the proxy to receive&reply to commands
assert (control_rep);

// bind CONTROL
// Bind CONTROL
rc = zmq_bind (control_rep, cfg->control_endpoint);
assert (rc == 0);

// start proxying!
// Start proxying!

zmq_proxy_steerable (frontend_xsub, backend_xpub, NULL, control_rep);

Expand All @@ -312,27 +310,27 @@ static void proxy_thread_main (void *pvoid)

void terminate_proxy (const proxy_hwm_cfg_t *cfg)
{
// CONTROL REQ
// CONTROL REQ

void *control_req =
zmq_socket (cfg->context,
ZMQ_REQ); // this one can be used to send command to the proxy
ZMQ_REQ); // This one can be used to send command to the proxy
assert (control_req);

// connect CONTROL-REQ: a socket to which send commands
// Connect CONTROL-REQ: a socket to which send commands
int rc = zmq_connect (control_req, cfg->control_endpoint);
assert (rc == 0);

// Ask the proxy to exit: the subscriber has received all messages
// Ask the proxy to exit: the subscriber has received all messages

rc = zmq_send (control_req, "TERMINATE", 9, 0);
assert (rc == 9);

zmq_close (control_req);
}

// The main thread simply starts some publishers, a proxy,
// and a subscriber. Finish when all packets are received.
// The main thread simply starts some publishers, a proxy,
// and a subscriber. Finish when all packets are received.

int main (int argc, char *argv[])
{
Expand All @@ -352,17 +350,11 @@ int main (int argc, char *argv[])
int rv = zmq_ctx_set (context, ZMQ_IO_THREADS, 4);
assert (rv == 0);

// START ALL SECONDARY THREADS
// START ALL SECONDARY THREADS

#if 0
const char *pub1 = "tcp://127.0.0.1:7981";
const char *pub2 = "tcp://127.0.0.1:7982";
const char *sub1 = "tcp://127.0.0.1:7984";
#else
const char *pub1 = "inproc://perf_pub1";
const char *pub2 = "inproc://perf_pub2";
const char *sub1 = "inproc://perf_backend";
#endif

proxy_hwm_cfg_t cfg_global = {};
cfg_global.context = context;
Expand All @@ -371,12 +363,12 @@ int main (int argc, char *argv[])
cfg_global.backend_endpoint[0] = sub1;
cfg_global.control_endpoint = "inproc://ctrl";

// Proxy
// Proxy
proxy_hwm_cfg_t cfg_proxy = cfg_global;
void *proxy = zmq_threadstart (&proxy_thread_main, (void *) &cfg_proxy);
assert (proxy != 0);

// Subscriber 1
// Subscriber 1
proxy_hwm_cfg_t cfg_sub1 = cfg_global;
cfg_sub1.thread_idx = 0;
void *subscriber =
Expand All @@ -387,22 +379,21 @@ int main (int argc, char *argv[])
void *watch = zmq_stopwatch_start ();


// Publisher 1
// Publisher 1
proxy_hwm_cfg_t cfg_pub1 = cfg_global;
cfg_pub1.thread_idx = 0;
void *publisher1 =
zmq_threadstart (&publisher_thread_main, (void *) &cfg_pub1);
assert (publisher1 != 0);

// Publisher 2
// Publisher 2
proxy_hwm_cfg_t cfg_pub2 = cfg_global;
cfg_pub2.thread_idx = 1;
void *publisher2 =
zmq_threadstart (&publisher_thread_main, (void *) &cfg_pub2);
assert (publisher2 != 0);


// Wait for all packets to be received
// Wait for all packets to be received
zmq_threadclose (subscriber);

unsigned long elapsed = zmq_stopwatch_stop (watch);
Expand All @@ -419,11 +410,11 @@ int main (int argc, char *argv[])
printf ("mean throughput: %d [msg/s]\n", (int) throughput);
printf ("mean throughput: %.3f [Mb/s]\n", (double) megabits);

// Wait for the end of publishers...
// Wait for the end of publishers...
zmq_threadclose (publisher1);
zmq_threadclose (publisher2);

// ... then close the proxy
// ... then close the proxy
terminate_proxy (&cfg_proxy);
zmq_threadclose (proxy);

Expand Down

0 comments on commit 6cda0a8

Please sign in to comment.