Skip to content

Commit

Permalink
cleanup patch from Jeremy Barnes
Browse files Browse the repository at this point in the history
  • Loading branch information
JustinTulloss committed Mar 27, 2011
1 parent 1f795b4 commit f5d0b31
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 24 deletions.
16 changes: 5 additions & 11 deletions binding.cc
Original file line number Diff line number Diff line change
Expand Up @@ -538,11 +538,6 @@ class Socket::IncomingMessage {
}

inline Local<Value> GetBuffer() {
#if 0
Handle<Object> buf
= Buffer::New(v8::String::New((char*)zmq_msg_data(*msgref_), zmq_msg_size(*msgref_)));
return buf;
#else
if (buf_.IsEmpty()) {
Buffer* buf_obj = Buffer::New(
(char*)zmq_msg_data(*msgref_), zmq_msg_size(*msgref_),
Expand All @@ -552,7 +547,6 @@ class Socket::IncomingMessage {
buf_ = Persistent<Object>::New(buf_obj->handle_);
}
return Local<Value>::New(buf_);
#endif
}

private:
Expand Down Expand Up @@ -655,7 +649,7 @@ class Socket::OutgoingMessage {
// Raise a flag indicating that we're done with the buffer
((BufferReference*)message)->noLongerNeeded_ = true;
}

// Called when V8 would like to GC buf_
static void WeakCheck(v8::Persistent<v8::Value> obj, void* data) {
if (((BufferReference*)data)->noLongerNeeded_) {
Expand Down Expand Up @@ -703,19 +697,19 @@ Handle<Value> Socket::Send(const Arguments &args) {
OutgoingMessage msg(args[0]->ToObject());
if (zmq_send(socket->socket_, msg, flags) < 0)
return ThrowException(ExceptionFromError());

#else // copying version that has no GC issues
zmq_msg_t msg;
Local<Object> buf = args[0]->ToObject();
size_t len = Buffer::Length(buf);
int res = zmq_msg_init_size(&msg, len);
if (res != 0)
return ThrowException(ExceptionFromError());
return ThrowException(ExceptionFromError());

char * cp = (char *)zmq_msg_data(&msg);
const char * dat = Buffer::Data(buf);
std::copy(dat, dat + len, cp);

if (zmq_send(socket->socket_, &msg, flags) < 0)
return ThrowException(ExceptionFromError());
#endif // zero copy / copying version
Expand Down
24 changes: 11 additions & 13 deletions zeromq.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
/*globals require exports process Buffer */

var util = require('util');
var EventEmitter = require('events').EventEmitter;
var IOWatcher = process.binding('io_watcher').IOWatcher;
Expand Down Expand Up @@ -78,6 +79,7 @@ var sockProp = function(name, option) {
return this._zmq.setsockopt(option, value);
});
};

sockProp('_fd', zmq.ZMQ_FD);
sockProp('_ioevents', zmq.ZMQ_EVENTS);
sockProp('_receiveMore', zmq.ZMQ_RCVMORE);
Expand Down Expand Up @@ -105,6 +107,8 @@ Socket.prototype.bind = function(addr, cb) {
cb(err);
});
};

/* Currently buggy.
Socket.prototype.bindSync = function(addr) {
var self = this;
self._watcher.stop();
Expand All @@ -116,6 +120,8 @@ Socket.prototype.bindSync = function(addr) {
}
self._watcher.start();
};
*/

Socket.prototype.connect = function(addr) {
this._zmq.connect(addr);
};
Expand All @@ -125,6 +131,7 @@ Socket.prototype.connect = function(addr) {
Socket.prototype.subscribe = function(filter) {
this._subscribe = filter;
};

Socket.prototype.unsubscribe = function(filter) {
this._unsubscribe = filter;
};
Expand All @@ -150,6 +157,7 @@ Socket.prototype.send = function() {
this._outgoing = this._outgoing.concat(parts);
this._flush();
};

Socket.prototype.currentSendBacklog = function() {
return this._outgoing.length;
};
Expand All @@ -161,17 +169,14 @@ Socket.prototype._flush = function() {

// Don't allow recursive flush invocation as it can lead to stack
// exhaustion and write starvation
if (this._inFlush === true) return;
if (this._inFlush === true) { return; }

this._inFlush = true;

//console.log("flushing with ", this._outgoing.length, " entries");

try {

while (true) {
var flags = this._ioevents;
//console.log("flags = ", flags, "outgoing = ", this._outgoing.length);
if (this._outgoing.length === 0) {
flags &= ~zmq.ZMQ_POLLOUT;
}
Expand All @@ -185,34 +190,27 @@ Socket.prototype._flush = function() {
emitArgs.push(this._zmq.recv());
} while (this._receiveMore);

//console.log("received ", emitArgs.length, " messages");

this.emit.apply(this, emitArgs);
if (this._zmq.state != zmq.STATE_READY) {
console.log("state not ready");
return;
}
}

// We send as much as possible in one burst so that we don't
// starve sends if we receive more than one message for each
// one sent.
//while (flags & zmq.ZMQ_POLLOUT)
while ((flags & zmq.ZMQ_POLLOUT) && (this._outgoing.length !== 0))
{
var sendArgs = this._outgoing.shift();
//console.log("sending ", sendArgs);
this._zmq.send.apply(this._zmq, sendArgs);
flags = this._ioevents;
}

}
}
catch (e) {
console.log("sending got error", e);
console.log("flags are ", flags, " in: ", flags & zmq.ZMQ_POLLIN,
" out: ", flags & zmq.ZMQ_POLLOUT);
console.log("outgoing is ", sys.inspect(this._outgoing));
e.flags = flags;
e.outgoing = sys.inspect(this._outgoing);
try {
this.emit('error', e);
} catch (e2) {
Expand Down

0 comments on commit f5d0b31

Please sign in to comment.