Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 30 additions & 5 deletions src/services/query_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,12 +57,37 @@ void query_service::work()
if (!started(bind(router, query_dealer, notify_dealer)))
return;

zmq::poller poller;
poller.add(router);
poller.add(query_dealer);
poller.add(notify_dealer);

// TODO: tap in to failure conditions, such as high water.
// Relay messages between router and dealer (blocks on context).
//*************************************************************************
// TODO: integrate notify_dealer into relay.
relay(router, query_dealer);
//*************************************************************************
while (!poller.terminated() && !stopped())
{
const auto signaled = poller.wait();

if (signaled.contains(router.id()) &&
!forward(router, query_dealer))
{
log::warning(LOG_SERVER)
<< "Failed to forward from router to query_dealer.";
}

if (signaled.contains(query_dealer.id()) &&
!forward(query_dealer, router))
{
log::warning(LOG_SERVER)
<< "Failed to forward from query_dealer to router.";
}

if (signaled.contains(notify_dealer.id()) &&
!forward(notify_dealer, router))
{
log::warning(LOG_SERVER)
<< "Failed to forward from notify_dealer to router.";
}
}

// Unbind the sockets and exit this thread.
finished(unbind(router, query_dealer, notify_dealer));
Expand Down
1 change: 1 addition & 0 deletions src/workers/notification_worker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ void notification_worker::work()
// Other threads connect and disconnect dynamically to send updates.
while (!poller.terminated() && !stopped())
{
// BUGBUG: this can fail on some platforms if interval is > 1000.
poller.wait(interval);
purge();
}
Expand Down