Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

Add `pending` property to indicates need ZMQ_POLLOUT

When use 'PUSH/PULL' type sockets, if SNDHWM exceeds, then `_flush()`
will not write the messages to underly zeromq socket but buffer them.
When underly socket queue became drain, we need `_flush` buffered
message. But current version doesn't do this...
  • Loading branch information...
commit 9c5de3d99dd9ffbfa41371188c57ed369deb36c3 1 parent 4436503
Wang Wenlin soplwang authored
Showing with 28 additions and 2 deletions.
  1. +25 −0 binding.cc
  2. +3 −2 lib/index.js
25 binding.cc
View
@@ -80,6 +80,9 @@ namespace zmq {
static Handle<Value> GetState(Local<String> p, const AccessorInfo& info);
+ static Handle<Value> GetPending(Local<String> p, const AccessorInfo& info);
+ static void SetPending(Local<String> p, Local<Value> v, const AccessorInfo& info);
+
template<typename T>
Handle<Value> GetSockOpt(int option);
template<typename T>
@@ -108,6 +111,7 @@ namespace zmq {
Persistent<Object> context_;
void *socket_;
+ int32_t pending_;
uint8_t state_;
bool IsReady();
@@ -218,6 +222,8 @@ namespace zmq {
t->InstanceTemplate()->SetInternalFieldCount(1);
t->InstanceTemplate()->SetAccessor(
String::NewSymbol("state"), GetState, NULL);
+ t->InstanceTemplate()->SetAccessor(
+ String::NewSymbol("pending"), GetPending, SetPending);
NODE_SET_PROTOTYPE_METHOD(t, "bind", Bind);
NODE_SET_PROTOTYPE_METHOD(t, "bindSync", BindSync);
@@ -265,6 +271,8 @@ namespace zmq {
zmq_pollitem_t items[1];
items[0].socket = socket_;
items[0].events = ZMQ_POLLIN;
+ if (pending_ > 0)
+ items[0].events |= ZMQ_POLLOUT;
return zmq_poll(items, 1, 0);
}
@@ -299,6 +307,7 @@ namespace zmq {
Socket::Socket(Context *context, int type) : ObjectWrap() {
context_ = Persistent<Object>::New(context->handle_);
socket_ = zmq_socket(context->context_, type);
+ pending_ = 0;
state_ = STATE_READY;
poll_handle_ = new uv_poll_t;
@@ -337,6 +346,22 @@ namespace zmq {
return Integer::New(socket->state_);
}
+ Handle<Value>
+ Socket::GetPending(Local<String> p, const AccessorInfo& info) {
+ Socket* socket = ObjectWrap::Unwrap<Socket>(info.Holder());
+ return Integer::New(socket->pending_);
+ }
+
+ void
+ Socket::SetPending(Local<String> p, Local<Value> v, const AccessorInfo& info) {
+ if (!v->IsNumber())
+ ThrowException(Exception::TypeError(
+ String::New("Pending must be an integer")));
+
+ Socket* socket = ObjectWrap::Unwrap<Socket>(info.Holder());
+ socket->pending_ = v->Int32Value();
+ }
+
template<typename T>
Handle<Value> Socket::GetSockOpt(int option) {
T value = 0;
5 lib/index.js
View
@@ -290,8 +290,7 @@ Socket.prototype._flush = function() {
})(emitArgs);
if (zmq.STATE_READY != this._zmq.state) {
- this._flushing = false;
- return;
+ break;
}
}
@@ -308,6 +307,8 @@ Socket.prototype._flush = function() {
this.emit('error', err);
}
+ this._zmq.pending = this._outgoing.length;
+
this._flushing = false;
};
Please sign in to comment.
Something went wrong with that request. Please try again.