Skip to content

Commit

Permalink
zmq_queue implementation added
Browse files Browse the repository at this point in the history
  • Loading branch information
sustrik committed Mar 13, 2010
1 parent 22db38b commit dcb9836
Showing 1 changed file with 109 additions and 5 deletions.
114 changes: 109 additions & 5 deletions devices/zmq_queue/zmq_queue.cpp
Expand Up @@ -20,6 +20,113 @@
#include "../../include/zmq.hpp"
#include "../../foreign/xmlParser/xmlParser.cpp"

class queue
{
public:

queue (zmq::socket_t& reply, zmq::socket_t& request) :
xrep (reply),
xreq (request)
{
items [0].socket = reply;
items [0].fd = 0;
items [0].events = ZMQ_POLLIN;
items [0].revents = 0;

items [1].socket = request;
items [1].fd = 0;
items [1].events = ZMQ_POLLIN;
items [1].revents = 0;

m_next_request_method = &queue::get_request;
m_next_response_method = &queue::get_response;
}

void run()
{
while (true) {
int rc = zmq::poll (&items [0], 2, -1);
if (rc < 0)
break;
next_request();
next_response();
}
}

private:

void next_request()
{
(this->*m_next_request_method) ();
}

void next_response()
{
(this->*m_next_response_method) ();
}

void get_request()
{
if (items [0].revents & ZMQ_POLLIN ) {
int rc = xrep.recv (&request_msg, ZMQ_NOBLOCK);
if (!rc)
return;
items [0].events &= ~ZMQ_POLLIN;
items [1].events |= ZMQ_POLLOUT;
m_next_request_method = &queue::send_request;
}
}

void send_request()
{
if (items [1].revents & ZMQ_POLLOUT) {
int rc = xreq.send (request_msg, ZMQ_NOBLOCK);
if (!rc) return;
items [1].events &= ~ZMQ_POLLOUT;
items [0].events |= ZMQ_POLLIN;
m_next_request_method = &queue::get_request;
}
}

void get_response()
{
if ( items [1].revents & ZMQ_POLLIN ) {
int rc = xreq.recv (&response_msg, ZMQ_NOBLOCK);
if (!rc)
return;
items [1].events &= ~ZMQ_POLLIN;
items [0].events |= ZMQ_POLLOUT;
m_next_response_method = &queue::send_response;
}
}

void send_response()
{
if (items [0].revents & ZMQ_POLLOUT) {
int rc = xrep.send (response_msg, ZMQ_NOBLOCK);
if (!rc)
return;
items [0].events &= ~ZMQ_POLLOUT;
items [1].events |= ZMQ_POLLIN;
m_next_response_method = &queue::get_response;
}
}

zmq::socket_t & xrep;
zmq::socket_t & xreq;
zmq_pollitem_t items [2];
zmq::message_t request_msg;
zmq::message_t response_msg;

typedef void (queue::*next_method) ();

next_method m_next_request_method;
next_method m_next_response_method;

queue (queue const &);
void operator = (queue const &);
};

int main (int argc, char *argv [])
{
if (argc != 2) {
Expand Down Expand Up @@ -112,11 +219,8 @@ int main (int argc, char *argv [])
n++;
}

zmq::message_t msg;
while (true) {
in_socket.recv (&msg);
out_socket.send (msg);
}
queue q(in_socket, out_socket);
q.run();

return 0;
}

0 comments on commit dcb9836

Please sign in to comment.