Skip to content

Commit

Permalink
working
Browse files Browse the repository at this point in the history
  • Loading branch information
ry committed Nov 30, 2010
1 parent 36773b2 commit f120638
Show file tree
Hide file tree
Showing 3 changed files with 43 additions and 86 deletions.
59 changes: 4 additions & 55 deletions src/node_io_watcher.cc
Expand Up @@ -75,8 +75,6 @@ void IOWatcher::Initialize(Handle<Object> target) {

dump_queue = Persistent<Object>::New(Object::New());
io_watcher->Set(String::NewSymbol("dumpQueue"), dump_queue);
dump_queue->Set(next_sym, dump_queue);
dump_queue->Set(prev_sym, dump_queue);
}


Expand Down Expand Up @@ -227,43 +225,6 @@ Handle<Value> IOWatcher::Set(const Arguments& args) {
#endif


namespace linklist {

inline void Remove(Handle<Object> b) {
Local<Object> b_next = b->Get(next_sym)->ToObject();
Local<Object> b_prev = b->Get(prev_sym)->ToObject();

// b.next.prev = b.prev
b_next->Set(prev_sym, b_prev);

// b.prev.next = b.next
b_prev->Set(next_sym, b_next);
}


inline void Append(Handle<Object> list, Handle<Object> b) {
// b.next = list
b->Set(next_sym, list);

// b.prev = list.prev
b->Set(prev_sym, list);

// list.prev.next = b
Local<Object> list_prev = list->Get(prev_sym)->ToObject();
list_prev->Set(next_sym, b);

// list.prev = b
list->Set(prev_sym, b);
}


inline bool Empty(Handle<Object> list) {
// list.next === list
return list->Get(next_sym)->StrictEquals(list);
}
}


void IOWatcher::Dump(EV_P_ ev_prepare *w, int revents) {
assert(revents == EV_PREPARE);
assert(w == &dumper);
Expand Down Expand Up @@ -329,33 +290,19 @@ void IOWatcher::Dump(EV_P_ ev_prepare *w, int revents) {
}
size_t first_offset = offset;

// Note that watcher.buckets points to a linked-list
// of buckets.
Local<Value> first_bucket_v = watcher->Get(first_bucket_sym);
if (!first_bucket_v->IsObject()) {
// It is possible that someone did
//
// socket.write("data"); socket.destroy();
//
// In which case we'll have a watcher in the dump_queue which is
// no longer associated with a socket. In this case we just
// skip it.
assert(!watcher->Get(last_bucket_sym)->IsObject());
continue;
}

// Loop over all the buckets for this particular watcher/socket in order
// to fill iov.
Local<Value> bucket_v;
Local<Object> bucket;
unsigned int bucket_index = 0;

for (bucket_v = first_bucket_v;
for (bucket_v = watcher->Get(first_bucket_sym);
// Break if we have an FD to send.
// sendmsg can only handle one FD at a time.
fd_to_send < 0 &&
// break if we've hit the end
!bucket_v->IsObject() &&
bucket_v->IsObject() &&
// break if iov contains a lot of data
to_write < max_to_write &&
// break if iov is running out of space
Expand Down Expand Up @@ -403,6 +350,8 @@ void IOWatcher::Dump(EV_P_ ev_prepare *w, int revents) {
}
}

if (to_write == 0) continue;

ssize_t written;

if (unix_socket) {
Expand Down
39 changes: 21 additions & 18 deletions test/simple/test-dumper-unix.js
Expand Up @@ -18,8 +18,15 @@ function test (N, b, cb) {
// Use writev/dumper to send data down the one of the sockets, fds[1].
// This requires a IOWatcher.
var w = new IOWatcher();
w.set(fds[1], false, true);
w.isUnixSocket = true;
w.set(fds[1], false, false);

w.callback = function (readable, writable) {
assert.ok(!readable && writable); // not really important.
// Insert watcher into dumpQueue
w.next = IOWatcher.dumpQueue.next;
IOWatcher.dumpQueue.next = w;
}

var ndrain = 0;
w.ondrain = function () {
Expand Down Expand Up @@ -61,11 +68,9 @@ function test (N, b, cb) {
stream.on('close', function () {
assert.equal(fdsSent, fdsRecv);
// check to make sure the watcher isn't in the dump queue.
var x = IOWatcher.dumpQueue;
do {
for (var x = IOWatcher.dumpQueue; x; x = x.next) {
assert.ok(x !== w);
x = x.next;
} while (x !== IOWatcher.dumpQueue);
}

ncomplete++;
if (cb) cb();
Expand All @@ -76,19 +81,17 @@ function test (N, b, cb) {
w.next = IOWatcher.dumpQueue.next;
IOWatcher.dumpQueue.next = w;

if (N > 0) {
w.firstBucket = { data: b };
w.lastBucket = w.firstBucket;

for (var i = 0; i < N-1; i++) {
var bucket = { data: b };
w.lastBucket.next = bucket;
w.lastBucket = bucket;
// Kind of randomly fill these buckets with fds.
if (fdsSent < 5 && i % 2 == 0) {
bucket.fd = 1; // send stdout
fdsSent++;
}
w.firstBucket = { data: b };
w.lastBucket = w.firstBucket;

for (var i = 0; i < N-1; i++) {
var bucket = { data: b };
w.lastBucket.next = bucket;
w.lastBucket = bucket;
// Kind of randomly fill these buckets with fds.
if (fdsSent < 5 && i % 2 == 0) {
bucket.fd = 1; // send stdout
fdsSent++;
}
}
}
Expand Down
31 changes: 18 additions & 13 deletions test/simple/test-dumper.js
Expand Up @@ -16,11 +16,19 @@ function test (N, b, cb) {

// Create a pipe
var fds = process.binding('net').pipe();
console.log("fds == %j", fds);

// Use writev/dumper to send data down the write end of the pipe, fds[1].
// This requires a IOWatcher.
var w = new IOWatcher();
w.set(fds[1], false, false);
w.set(fds[1], false, true);

w.callback = function (readable, writable) {
assert.ok(!readable && writable); // not really important.
// Insert watcher into dumpQueue
w.next = IOWatcher.dumpQueue.next;
IOWatcher.dumpQueue.next = w;
}

var ndrain = 0;
w.ondrain = function () {
Expand Down Expand Up @@ -57,11 +65,9 @@ function test (N, b, cb) {

stream.on('close', function () {
// check to make sure the watcher isn't in the dump queue.
var x = IOWatcher.dumpQueue;
do {
for (var x = IOWatcher.dumpQueue; x; x = x.next) {
assert.ok(x !== w);
x = x.next;
} while (x !== IOWatcher.dumpQueue);
}

ncomplete++;
if (cb) cb();
Expand All @@ -72,15 +78,14 @@ function test (N, b, cb) {
w.next = IOWatcher.dumpQueue.next;
IOWatcher.dumpQueue.next = w;

if (N > 0) {
w.firstBucket = { data: b };
w.lastBucket = w.firstBucket;
w.firstBucket = { data: b };
w.lastBucket = w.firstBucket;

for (var i = 0; i < N-1; i++) {
var bucket = { data: b };
w.lastBucket.next = bucket;
w.lastBucket = bucket;
}
for (var i = 0; i < N-1; i++) {
var bucket = { data: b };
assert.ok(!w.lastBucket.next);
w.lastBucket.next = bucket;
w.lastBucket = bucket;
}
}

Expand Down

0 comments on commit f120638

Please sign in to comment.