Skip to content
Browse files

GENERIC socket type and COMMAND flag added

GENERIC allows to use 0MQ as a dumb networking framework.
It provides user with connect/disconnect notifications.
Also, each inbound message is labeled by ID of the connection
it originated from. Outbound messages should be labeled by
the ID of the connection to send them to.

To distinguish connect/disconnect notifications from common
messages, COMMAND flag was introduced.

Signed-off-by: Martin Sustrik <sustrik@250bpm.com>
  • Loading branch information...
1 parent c8e8f2a commit bf78e230ad4736da9fce6e0b4d1655affb8f466b @sustrik sustrik committed
Showing with 46 additions and 20 deletions.
  1. +3 −0 include/zmq.h
  2. +2 −0 src/Makefile.am
  3. +1 −0 src/msg.hpp
  4. +33 −20 src/socket_base.cpp
  5. +7 −0 src/socket_base.hpp
View
3 include/zmq.h
@@ -158,6 +158,7 @@ ZMQ_EXPORT int zmq_term (void *context);
#define ZMQ_PUSH 8
#define ZMQ_XPUB 9
#define ZMQ_XSUB 10
+#define ZMQ_GENERIC 13
/* Socket options. */
#define ZMQ_AFFINITY 4
@@ -182,11 +183,13 @@ ZMQ_EXPORT int zmq_term (void *context);
#define ZMQ_RCVTIMEO 27
#define ZMQ_SNDTIMEO 28
#define ZMQ_RCVLABEL 29
+#define ZMQ_RCVCMD 30
/* Send/recv options. */
#define ZMQ_DONTWAIT 1
#define ZMQ_SNDMORE 2
#define ZMQ_SNDLABEL 4
+#define ZMQ_SNDCMD 8
ZMQ_EXPORT void *zmq_socket (void *context, int type);
ZMQ_EXPORT int zmq_close (void *s);
View
2 src/Makefile.am
@@ -23,6 +23,7 @@ libzmq_la_SOURCES = \
err.hpp \
fd.hpp \
fq.hpp \
+ generic.hpp \
io_object.hpp \
io_thread.hpp \
ip.hpp \
@@ -88,6 +89,7 @@ libzmq_la_SOURCES = \
epoll.cpp \
err.cpp \
fq.cpp \
+ generic.cpp \
io_object.cpp \
io_thread.cpp \
ip.cpp \
View
1 src/msg.hpp
@@ -48,6 +48,7 @@ namespace zmq
enum
{
label = 1,
+ command = 2,
shared = 64,
more = 128
};
View
53 src/socket_base.cpp
@@ -58,6 +58,7 @@
#include "xrep.hpp"
#include "xpub.hpp"
#include "xsub.hpp"
+#include "generic.hpp"
bool zmq::socket_base_t::check_tag ()
{
@@ -103,6 +104,9 @@ zmq::socket_base_t *zmq::socket_base_t::create (int type_, class ctx_t *parent_,
case ZMQ_XSUB:
s = new (std::nothrow) xsub_t (parent_, tid_);
break;
+ case ZMQ_GENERIC:
+ s = new (std::nothrow) generic_t (parent_, tid_);
+ break;
default:
errno = EINVAL;
return NULL;
@@ -119,6 +123,7 @@ zmq::socket_base_t::socket_base_t (ctx_t *parent_, uint32_t tid_) :
last_tsc (0),
ticks (0),
rcvlabel (false),
+ rcvcmd (false),
rcvmore (false)
{
}
@@ -259,6 +264,16 @@ int zmq::socket_base_t::getsockopt (int option_, void *optval_,
return 0;
}
+ if (option_ == ZMQ_RCVCMD) {
+ if (*optvallen_ < sizeof (int)) {
+ errno = EINVAL;
+ return -1;
+ }
+ *((int*) optval_) = rcvcmd ? 1 : 0;
+ *optvallen_ = sizeof (int);
+ return 0;
+ }
+
if (option_ == ZMQ_RCVMORE) {
if (*optvallen_ < sizeof (int)) {
errno = EINVAL;
@@ -469,11 +484,13 @@ int zmq::socket_base_t::send (msg_t *msg_, int flags_)
if (unlikely (rc != 0))
return -1;
- // At this point we impose the LABEL & MORE flags on the message.
+ // At this point we impose the flags on the message.
if (flags_ & ZMQ_SNDLABEL)
msg_->set_flags (msg_t::label);
if (flags_ & ZMQ_SNDMORE)
msg_->set_flags (msg_t::more);
+ if (flags_ & ZMQ_SNDCMD)
+ msg_->set_flags (msg_t::command);
// Try to send the message.
rc = xsend (msg_, flags_);
@@ -550,12 +567,7 @@ int zmq::socket_base_t::recv (msg_t *msg_, int flags_)
// If we have the message, return immediately.
if (rc == 0) {
- rcvlabel = msg_->flags () & msg_t::label;
- if (rcvlabel)
- msg_->reset_flags (msg_t::label);
- rcvmore = msg_->flags () & msg_t::more ? true : false;
- if (rcvmore)
- msg_->reset_flags (msg_t::more);
+ extract_flags (msg_);
return 0;
}
@@ -571,12 +583,7 @@ int zmq::socket_base_t::recv (msg_t *msg_, int flags_)
rc = xrecv (msg_, flags_);
if (rc < 0)
return rc;
- rcvlabel = msg_->flags () & msg_t::label;
- if (rcvlabel)
- msg_->reset_flags (msg_t::label);
- rcvmore = msg_->flags () & msg_t::more ? true : false;
- if (rcvmore)
- msg_->reset_flags (msg_t::more);
+ extract_flags (msg_);
return 0;
}
@@ -609,13 +616,7 @@ int zmq::socket_base_t::recv (msg_t *msg_, int flags_)
}
}
- // Extract LABEL & MORE flags from the message.
- rcvlabel = msg_->flags () & msg_t::label;
- if (rcvlabel)
- msg_->reset_flags (msg_t::label);
- rcvmore = msg_->flags () & msg_t::more ? true : false;
- if (rcvmore)
- msg_->reset_flags (msg_t::more);
+ extract_flags (msg_);
return 0;
}
@@ -856,3 +857,15 @@ void zmq::socket_base_t::terminated (pipe_t *pipe_)
unregister_term_ack ();
}
+void zmq::socket_base_t::extract_flags (msg_t *msg_)
+{
+ rcvlabel = msg_->flags () & msg_t::label;
+ if (rcvlabel)
+ msg_->reset_flags (msg_t::label);
+ rcvmore = msg_->flags () & msg_t::more ? true : false;
+ if (rcvmore)
+ msg_->reset_flags (msg_t::more);
+ rcvcmd = msg_->flags () & msg_t::command ? true : false;
+ if (rcvcmd)
+ msg_->reset_flags (msg_t::command);
+}
View
7 src/socket_base.hpp
@@ -128,6 +128,10 @@ namespace zmq
// handlers explicitly. If required, it will deallocate the socket.
void check_destroy ();
+ // Moves the flags from the message to local variables,
+ // to be later retrieved by getsockopt.
+ void extract_flags (msg_t *msg_);
+
// Used to check whether the object is a socket.
uint32_t tag;
@@ -182,6 +186,9 @@ namespace zmq
// True if the last message received had LABEL flag set.
bool rcvlabel;
+ // True if the last message received had COMMAND flag set.
+ bool rcvcmd;
+
// True if the last message received had MORE flag set.
bool rcvmore;

0 comments on commit bf78e23

Please sign in to comment.
Something went wrong with that request. Please try again.