Skip to content

Commit

Permalink
checking for available messages added to ypipe/pipe
Browse files Browse the repository at this point in the history
  • Loading branch information
Martin Sustrik committed Sep 30, 2009
1 parent 84d854a commit f2ff2c6
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 8 deletions.
11 changes: 11 additions & 0 deletions src/pipe.cpp
Expand Up @@ -36,6 +36,17 @@ zmq::reader_t::~reader_t ()
{
}

bool zmq::reader_t::check_read ()
{
// Check if there's an item in the pipe.
if (pipe->check_read ())
return true;

// If not, deactivate the pipe.
endpoint->kill (this);
return false;
}

bool zmq::reader_t::read (zmq_msg_t *msg_)
{
if (!pipe->read (msg_)) {
Expand Down
3 changes: 3 additions & 0 deletions src/pipe.hpp
Expand Up @@ -42,6 +42,9 @@ namespace zmq

void set_endpoint (i_endpoint *endpoint_);

// Returns true if there is at least one message to read in the pipe.
bool check_read ();

// Reads a message to the underlying pipe.
bool read (zmq_msg_t *msg_);

Expand Down
24 changes: 16 additions & 8 deletions src/ypipe.hpp
Expand Up @@ -106,16 +106,12 @@ namespace zmq
return true;
}

// Reads an item from the pipe. Returns false if there is no value.
// available.
inline bool read (T *value_)
// Check whether item is available for reading.
inline bool check_read ()
{
// Was the value prefetched already? If so, return it.
if (&queue.front () != r) {
*value_ = queue.front ();
queue.pop ();
// Was the value prefetched already? If so, return.
if (&queue.front () != r)
return true;
}

// There's no prefetched value, so let us prefetch more values.
// (Note that D is a template parameter. Becaue of that one of
Expand Down Expand Up @@ -165,6 +161,18 @@ namespace zmq
return false;
}

// There was at least one value prefetched.
return true;
}

// Reads an item from the pipe. Returns false if there is no value.
// available.
inline bool read (T *value_)
{
// Try to prefetch a value.
if (!check_read ())
return false;

// There was at least one value prefetched.
// Return it to the caller.
*value_ = queue.front ();
Expand Down

0 comments on commit f2ff2c6

Please sign in to comment.