Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

Merge pull request #468 from hurtonm/issue_465

Resolve LIBZMQ-465
  • Loading branch information...
commit c179ad11730cfe742ccd74577372aef43fed2fa1 2 parents 99f7144 + c543b2c
@hintjens hintjens authored
View
5 src/decoder.cpp
@@ -57,11 +57,6 @@ void zmq::decoder_t::set_msg_sink (i_msg_sink *msg_sink_)
msg_sink = msg_sink_;
}
-bool zmq::decoder_t::stalled () const
-{
- return next == &decoder_t::message_ready;
-}
-
bool zmq::decoder_t::one_byte_size_ready ()
{
// First byte of size is read. If it is 0xff read 8-byte size.
View
24 src/decoder.hpp
@@ -143,6 +143,22 @@ namespace zmq
}
}
+ // Returns true if the decoder has been fed all required data
+ // but cannot proceed with the next decoding step.
+ // False is returned if the decoder has encountered an error.
+ bool stalled ()
+ {
+ while (!to_read) {
+ if (!(static_cast <T*> (this)->*next) ()) {
+ if (unlikely (!(static_cast <T*> (this)->next)))
+ return false;
+ return true;
+ }
+ }
+
+ return false;
+ }
+
inline bool message_ready_size (size_t msg_sz)
{
zmq_assert (false);
@@ -172,13 +188,13 @@ namespace zmq
next = NULL;
}
+ private:
+
// Next step. If set to NULL, it means that associated data stream
// is dead. Note that there can be still data in the process in such
// case.
step_t next;
- private:
-
// Where to store the read data.
unsigned char *read_pos;
@@ -205,10 +221,6 @@ namespace zmq
// Set the receiver of decoded messages.
void set_msg_sink (i_msg_sink *msg_sink_);
- // Returns true if there is a decoded message
- // waiting to be delivered to the session.
- bool stalled () const;
-
private:
bool one_byte_size_ready ();
View
2  src/i_decoder.hpp
@@ -40,7 +40,7 @@ namespace zmq
virtual size_t process_buffer (unsigned char *data_, size_t size_) = 0;
- virtual bool stalled () const = 0;
+ virtual bool stalled () = 0;
virtual bool message_ready_size (size_t msg_sz) = 0;
};
View
2  src/raw_decoder.cpp
@@ -53,7 +53,7 @@ void zmq::raw_decoder_t::set_msg_sink (i_msg_sink *msg_sink_)
msg_sink = msg_sink_;
}
-bool zmq::raw_decoder_t::stalled () const
+bool zmq::raw_decoder_t::stalled ()
{
return false;
}
View
2  src/raw_decoder.hpp
@@ -45,7 +45,7 @@ namespace zmq
// i_decoder interface.
virtual void set_msg_sink (i_msg_sink *msg_sink_);
- virtual bool stalled () const;
+ virtual bool stalled ();
virtual bool message_ready_size (size_t msg_sz);
View
17 src/stream_engine.cpp
@@ -52,10 +52,10 @@
zmq::stream_engine_t::stream_engine_t (fd_t fd_, const options_t &options_, const std::string &endpoint_) :
s (fd_),
+ io_enabled (false),
inpos (NULL),
insize (0),
decoder (NULL),
- input_error (false),
outpos (NULL),
outsize (0),
encoder (NULL),
@@ -134,6 +134,7 @@ void zmq::stream_engine_t::plug (io_thread_t *io_thread_,
// Connect to I/O threads poller object.
io_object_t::plug (io_thread_);
handle = add_fd (s);
+ io_enabled = true;
if (options.raw_sock) {
// no handshaking for raw sock, instantiate raw encoder and decoders
@@ -169,7 +170,10 @@ void zmq::stream_engine_t::unplug ()
plugged = false;
// Cancel all fd subscriptions.
- rm_fd (handle);
+ if (io_enabled) {
+ rm_fd (handle);
+ io_enabled = false;
+ }
// Disconnect from I/O threads poller object.
io_object_t::unplug ();
@@ -250,9 +254,10 @@ void zmq::stream_engine_t::in_event ()
// waiting for input events and postpone the termination
// until after the session has accepted the message.
if (disconnection) {
- input_error = true;
- if (decoder->stalled ())
- reset_pollin (handle);
+ if (decoder->stalled ()) {
+ rm_fd (handle);
+ io_enabled = false;
+ }
else
error ();
}
@@ -319,7 +324,7 @@ void zmq::stream_engine_t::activate_out ()
void zmq::stream_engine_t::activate_in ()
{
- if (input_error) {
+ if (unlikely (!io_enabled)) {
// There was an input error but the engine could not
// be terminated (due to the stalled decoder).
// Flush the pending message and terminate the engine now.
View
4 src/stream_engine.hpp
@@ -96,12 +96,14 @@ namespace zmq
// Preamble (10 bytes) + version (1 byte) + socket type (1 byte).
const static size_t greeting_size = 12;
+ // True iff we are registered with an I/O poller.
+ bool io_enabled;
+
handle_t handle;
unsigned char *inpos;
size_t insize;
i_decoder *decoder;
- bool input_error;
unsigned char *outpos;
size_t outsize;
View
5 src/v1_decoder.cpp
@@ -58,11 +58,6 @@ void zmq::v1_decoder_t::set_msg_sink (i_msg_sink *msg_sink_)
msg_sink = msg_sink_;
}
-bool zmq::v1_decoder_t::stalled () const
-{
- return next == &v1_decoder_t::message_ready;
-}
-
bool zmq::v1_decoder_t::flags_ready ()
{
msg_flags = 0;
View
2  src/v1_decoder.hpp
@@ -44,8 +44,6 @@ namespace zmq
// i_decoder interface.
virtual void set_msg_sink (i_msg_sink *msg_sink_);
- virtual bool stalled () const;
-
private:
bool flags_ready ();
Please sign in to comment.
Something went wrong with that request. Please try again.