diff --git a/src/services/query_service.cpp b/src/services/query_service.cpp index 845aad71..b3a01db9 100644 --- a/src/services/query_service.cpp +++ b/src/services/query_service.cpp @@ -57,12 +57,37 @@ void query_service::work() if (!started(bind(router, query_dealer, notify_dealer))) return; + zmq::poller poller; + poller.add(router); + poller.add(query_dealer); + poller.add(notify_dealer); + // TODO: tap in to failure conditions, such as high water. - // Relay messages between router and dealer (blocks on context). - //************************************************************************* - // TODO: integrate notify_dealer into relay. - relay(router, query_dealer); - //************************************************************************* + while (!poller.terminated() && !stopped()) + { + const auto signaled = poller.wait(); + + if (signaled.contains(router.id()) && + !forward(router, query_dealer)) + { + log::warning(LOG_SERVER) + << "Failed to forward from router to query_dealer."; + } + + if (signaled.contains(query_dealer.id()) && + !forward(query_dealer, router)) + { + log::warning(LOG_SERVER) + << "Failed to forward from query_dealer to router."; + } + + if (signaled.contains(notify_dealer.id()) && + !forward(notify_dealer, router)) + { + log::warning(LOG_SERVER) + << "Failed to forward from notify_dealer to router."; + } + } // Unbind the sockets and exit this thread. finished(unbind(router, query_dealer, notify_dealer)); diff --git a/src/workers/notification_worker.cpp b/src/workers/notification_worker.cpp index c729a10e..2c88a5a1 100644 --- a/src/workers/notification_worker.cpp +++ b/src/workers/notification_worker.cpp @@ -138,6 +138,7 @@ void notification_worker::work() // Other threads connect and disconnect dynamically to send updates. while (!poller.terminated() && !stopped()) { + // BUGBUG: this can fail on some platforms if interval is > 1000. poller.wait(interval); purge(); }