Skip to content
Permalink
Browse files

ZMQII-18: Implement I/O multiplexing (first approximation)

  • Loading branch information...
Martin Sustrik
Martin Sustrik committed Oct 1, 2009
1 parent f2ff2c6 commit cc631c4c6649b0d67114db13386a949426e35dbf
Showing with 269 additions and 3 deletions.
  1. +33 −0 bindings/c/zmq.h
  2. +5 −0 bindings/cpp/zmq.hpp
  3. +0 −2 src/fd_signaler.hpp
  4. +6 −0 src/i_signaler.hpp
  5. +11 −0 src/p2p.cpp
  6. +2 −0 src/p2p.hpp
  7. +11 −0 src/pub.cpp
  8. +2 −0 src/pub.hpp
  9. +17 −0 src/rep.cpp
  10. +2 −0 src/rep.hpp
  11. +13 −0 src/req.cpp
  12. +2 −0 src/req.hpp
  13. +15 −0 src/socket_base.cpp
  14. +12 −0 src/socket_base.hpp
  15. +13 −0 src/sub.cpp
  16. +2 −0 src/sub.hpp
  17. +6 −0 src/ypollset.cpp
  18. +1 −0 src/ypollset.hpp
  19. +116 −1 src/zmq.cpp
@@ -353,6 +353,39 @@ ZMQ_EXPORT int zmq_flush (void *s);
// EFSM - function cannot be called at the moment.
ZMQ_EXPORT int zmq_recv (void *s, zmq_msg_t *msg, int flags);

////////////////////////////////////////////////////////////////////////////////
// I/O multiplexing.
////////////////////////////////////////////////////////////////////////////////

#define ZMQ_POLLIN 1
#define ZMQ_POLLOUT 2

// 'socket' is a 0MQ socket we want to poll on. If set to NULL, native file
// descriptor (socket) 'fd' will be used instead. 'events' defines event we
// are going to poll on - combination of ZMQ_POLLIN and ZMQ_POLLOUT. Error
// event does not exist for portability reasons. Errors from native sockets
// are reported as ZMQ_POLLIN. It's client's responsibilty to identify the
// error afterwards. 'revents' field is filled in after function returns. It's
// a combination of ZMQ_POLLIN and/or ZMQ_POLLOUT depending on the state of the
// socket.
typedef struct
{
void *socket;
int fd;
short events;
short revents;
} zmq_pollitem_t;

// Polls for the items specified by 'items'. Number of items in the array is
// determined by 'nitems' argument. Returns number of items signaled, -1
// in the case of error.
//
// Errors: EFAULT - there's a 0MQ socket in the pollset belonging to
// a different thread.
// ENOTSUP - 0MQ context was initialised without ZMQ_POLL flag.
// I/O multiplexing is disabled.
ZMQ_EXPORT int zmq_poll (zmq_pollitem_t *items, int nitems);

////////////////////////////////////////////////////////////////////////////////
// Helper functions.
////////////////////////////////////////////////////////////////////////////////
@@ -200,6 +200,11 @@ namespace zmq
throw error_t ();
}

inline operator void* ()
{
return ptr;
}

inline void setsockopt (int option_, const void *optval_,
size_t optvallen_)
{
@@ -44,8 +44,6 @@ namespace zmq
void signal (int signal_);
uint64_t poll ();
uint64_t check ();

// Get the file descriptor associated with the object.
fd_t get_fd ();

private:
@@ -21,6 +21,7 @@
#define __ZMQ_I_SIGNALER_HPP_INCLUDED__

#include "stdint.hpp"
#include "fd.hpp"

namespace zmq
{
@@ -42,6 +43,11 @@ namespace zmq
// Same as poll, however, if there is no signal available,
// function returns zero immediately instead of waiting for a signal.
virtual uint64_t check () = 0;

// Returns file descriptor that allows waiting for signals. Specific
// signalers may not support this functionality. If so, the function
// returns retired_fd.
virtual fd_t get_fd () = 0;
};

}
@@ -84,4 +84,15 @@ int zmq::p2p_t::xrecv (zmq_msg_t *msg_, int flags_)
return 0;
}

bool zmq::p2p_t::xhas_in ()
{
zmq_assert (false);
return false;
}

bool zmq::p2p_t::xhas_out ()
{
zmq_assert (false);
return false;
}

@@ -42,6 +42,8 @@ namespace zmq
int xsend (zmq_msg_t *msg_, int flags_);
int xflush ();
int xrecv (zmq_msg_t *msg_, int flags_);
bool xhas_in ();
bool xhas_out ();

private:

@@ -156,3 +156,14 @@ int zmq::pub_t::xrecv (zmq_msg_t *msg_, int flags_)
return -1;
}

bool zmq::pub_t::xhas_in ()
{
return false;
}

bool zmq::pub_t::xhas_out ()
{
// TODO: Reimplement when queue limits are added.
return true;
}

@@ -43,6 +43,8 @@ namespace zmq
int xsend (zmq_msg_t *msg_, int flags_);
int xflush ();
int xrecv (zmq_msg_t *msg_, int flags_);
bool xhas_in ();
bool xhas_out ();

private:

@@ -195,4 +195,21 @@ int zmq::rep_t::xrecv (zmq_msg_t *msg_, int flags_)
return -1;
}

bool zmq::rep_t::xhas_in ()
{
for (int count = active; count != 0; count--) {
if (in_pipes [current]->check_read ())
return !waiting_for_reply;
current++;
if (current >= active)
current = 0;
}

return false;
}

bool zmq::rep_t::xhas_out ()
{
return waiting_for_reply;
}

@@ -43,6 +43,8 @@ namespace zmq
int xsend (zmq_msg_t *msg_, int flags_);
int xflush ();
int xrecv (zmq_msg_t *msg_, int flags_);
bool xhas_in ();
bool xhas_out ();

private:

@@ -195,4 +195,17 @@ int zmq::req_t::xrecv (zmq_msg_t *msg_, int flags_)
return 0;
}

bool zmq::req_t::xhas_in ()
{
if (reply_pipe->check_read ())
return waiting_for_reply;

return false;
}

bool zmq::req_t::xhas_out ()
{
return !waiting_for_reply;
}


@@ -43,6 +43,8 @@ namespace zmq
int xsend (zmq_msg_t *msg_, int flags_);
int xflush ();
int xrecv (zmq_msg_t *msg_, int flags_);
bool xhas_in ();
bool xhas_out ();

private:

@@ -364,6 +364,21 @@ int zmq::socket_base_t::close ()
return 0;
}

zmq::app_thread_t *zmq::socket_base_t::get_thread ()
{
return app_thread;
}

bool zmq::socket_base_t::has_in ()
{
return xhas_in ();
}

bool zmq::socket_base_t::has_out ()
{
return xhas_out ();
}

bool zmq::socket_base_t::register_session (const char *name_,
session_t *session_)
{
@@ -54,6 +54,16 @@ namespace zmq
int recv (zmq_msg_t *msg_, int flags_);
int close ();

// This function is used by the polling mechanism to determine
// whether the socket belongs to the application thread the poll
// is called from.
class app_thread_t *get_thread ();

// These functions are used by the polling mechanism to determine
// which events are to be reported from this socket.
bool has_in ();
bool has_out ();

// The list of sessions cannot be accessed via inter-thread
// commands as it is unacceptable to wait for the completion of the
// action till user application yields control of the application
@@ -88,6 +98,8 @@ namespace zmq
virtual int xsend (zmq_msg_t *msg_, int options_) = 0;
virtual int xflush () = 0;
virtual int xrecv (zmq_msg_t *msg_, int options_) = 0;
virtual bool xhas_in () = 0;
virtual bool xhas_out () = 0;

// Socket options.
options_t options;
@@ -197,3 +197,16 @@ int zmq::sub_t::fq (zmq_msg_t *msg_, int flags_)
errno = EAGAIN;
return -1;
}

bool zmq::sub_t::xhas_in ()
{
// TODO: This is more complex as we have to ignore all the messages that
// don't fit the filter.
zmq_assert (false);
return false;
}

bool zmq::sub_t::xhas_out ()
{
return false;
}
@@ -48,6 +48,8 @@ namespace zmq
int xsend (zmq_msg_t *msg_, int flags_);
int xflush ();
int xrecv (zmq_msg_t *msg_, int flags_);
bool xhas_in ();
bool xhas_out ();

private:

@@ -29,6 +29,7 @@ zmq::ypollset_t::~ypollset_t ()

void zmq::ypollset_t::signal (int signal_)
{
printf ("++signal\n");
zmq_assert (signal_ >= 0 && signal_ < wait_signal);
if (bits.btsr (signal_, wait_signal))
sem.post ();
@@ -58,3 +59,8 @@ uint64_t zmq::ypollset_t::check ()
{
return (uint64_t) bits.xchg (0);
}

zmq::fd_t zmq::ypollset_t::get_fd ()
{
return retired_fd;
}
@@ -42,6 +42,7 @@ namespace zmq
void signal (int signal_);
uint64_t poll ();
uint64_t check ();
fd_t get_fd ();

private:

0 comments on commit cc631c4

Please sign in to comment.
You can’t perform that action at this time.