Skip to content
Browse files

Merge remote branch 'ry/master' into multipart-update

  • Loading branch information...
2 parents f7e3e42 + 77145ba commit fc59669037ca1fae1250e4a69286ab7297dd1ced @isaacs committed Feb 7, 2010
Showing with 196 additions and 83 deletions.
  1. +144 −62 deps/coupling/coupling.c
  2. +10 −8 doc/api.txt
  3. +8 −0 src/node_net.cc
  4. +9 −2 test/mjsunit/fixtures/echo.js
  5. +25 −11 test/mjsunit/test-stdio.js
View
206 deps/coupling/coupling.c
@@ -143,98 +143,176 @@ ring_buffer_push (ring_buffer *ring, int fd)
return r;
}
+/* PULL PUMP
+ *
+ * This is used to read data from a blocking file descriptor and pump it into
+ * a non-blocking pipe (or other non-blocking fd). The algorithm is this:
+ *
+ * while (true) {
+ * read(STDIN_FILENO) // blocking
+ *
+ * while (!ring.empty) {
+ * write(pipe) // non-blocking
+ * select(pipe, writable)
+ * }
+ * }
+ *
+ */
static void
-pump (int is_pull, int pullfd, int pushfd)
+pull_pump (int pullfd, int pushfd)
{
int r;
ring_buffer ring;
- fd_set readfds, writefds, exceptfds;
+
+ fd_set writefds, exceptfds;
+ FD_ZERO(&exceptfds);
+ FD_ZERO(&writefds);
+ FD_SET(pushfd, &exceptfds);
+ FD_SET(pushfd, &writefds);
ring_buffer_init(&ring);
- int maxfd;
+ while (pullfd >= 0) {
+ /* Blocking read from STDIN_FILENO */
+ r = ring_buffer_pull(&ring, pullfd);
- while (pushfd >= 0 && (pullfd >= 0 || !ring_buffer_empty_p(&ring))) {
- FD_ZERO(&exceptfds);
- FD_ZERO(&readfds);
- FD_ZERO(&writefds);
-
- maxfd = -1;
-
- if (is_pull) {
- if (!ring_buffer_empty_p(&ring)) {
- maxfd = pushfd;
- FD_SET(pushfd, &exceptfds);
- FD_SET(pushfd, &writefds);
- }
- } else {
- if (pullfd >= 0) {
- if (!ring_buffer_filled_p(&ring)) {
- maxfd = pullfd;
- FD_SET(pullfd, &exceptfds);
- FD_SET(pullfd, &readfds);
+ if (r == 0) {
+ /* eof */
+ close(pullfd);
+ pullfd = -1;
+ } else if (r < 0 && errno != EINTR && errno != EAGAIN) {
+ /* error */
+ perror("pull_pump read()");
+ close(pullfd);
+ pullfd = -1;
+ }
+
+ /* Push all of the data in the ring buffer out. */
+ while (!ring_buffer_empty_p(&ring)) {
+ /* non-blocking write() to the pipe */
+ r = ring_buffer_push(&ring, pushfd);
+
+ if (r < 0 && errno != EAGAIN && errno != EINTR) {
+ if (errno == EPIPE) {
+ /* This happens if someone closes the other end of the pipe. This
+ * is a normal forced close of STDIN. Hopefully there wasn't data
+ * in the ring buffer. Just close both ends and exit.
+ */
+ close(pushfd);
+ close(pullfd);
+ pushfd = pullfd = -1;
+ } else {
+ perror("pull_pump write()");
+ close(pushfd);
+ close(pullfd);
}
+ return;
}
- }
- if (maxfd >= 0) {
- r = select(maxfd+1, &readfds, &writefds, &exceptfds, NULL);
+ /* Select for writablity on the pipe end.
+ * Very rarely will this stick.
+ */
+ r = select(pushfd+1, NULL, &writefds, &exceptfds, NULL);
- if (r < 0 || (pullfd >= 0 && FD_ISSET(pushfd, &exceptfds))) {
+ if (r < 0 || FD_ISSET(pushfd, &exceptfds)) {
close(pushfd);
close(pullfd);
pushfd = pullfd = -1;
return;
}
}
+ }
+ assert(pullfd < 0);
+ assert(ring_buffer_empty_p(&ring));
+ close(pushfd);
+}
+
+/* PUSH PUMP
+ *
+ * This is used to push data out to a blocking file descriptor. It pulls
+ * data from a non-blocking pipe (pullfd) and pushes to STDOUT_FILENO
+ * (pushfd).
+ * When the pipe is closed, then the rest of the data is pushed out and then
+ * STDOUT_FILENO is closed.
+ *
+ * The algorithm looks roughly like this:
+ *
+ * while (true) {
+ * r = read(pipe) // nonblocking
+ *
+ * while (!ring.empty) {
+ * write(STDOUT_FILENO) // blocking
+ * }
+ *
+ * select(pipe, readable);
+ * }
+ */
+static void
+push_pump (int pullfd, int pushfd)
+{
+ int r;
+ ring_buffer ring;
+
+ fd_set readfds, exceptfds;
+ FD_ZERO(&exceptfds);
+ FD_ZERO(&readfds);
+ FD_SET(pullfd, &exceptfds);
+ FD_SET(pullfd, &readfds);
+
+ ring_buffer_init(&ring);
+
+ /* The pipe is open or there is data left to be pushed out
+ * NOTE: if pushfd (STDOUT_FILENO) ever errors out, then we just exit the
+ * loop.
+ */
+ while (pullfd >= 0 || !ring_buffer_empty_p(&ring)) {
- if (pullfd >= 0 && FD_ISSET(pullfd, &exceptfds)) {
+ /* Pull from the non-blocking pipe */
+ r = ring_buffer_pull(&ring, pullfd);
+
+ if (r == 0) {
+ /* eof */
+ close(pullfd);
+ pullfd = -1;
+ } else if (r < 0 && errno != EINTR && errno != EAGAIN) {
+ perror("push_pump read()");
close(pullfd);
pullfd = -1;
+ return;
}
- if (pullfd >= 0 && (is_pull || FD_ISSET(pullfd, &readfds))) {
- r = ring_buffer_pull(&ring, pullfd);
- if (r == 0) {
- /* eof */
- close(pullfd);
- pullfd = -1;
+ /* Push everything out to STDOUT */
+ while (!ring_buffer_empty_p(&ring)) {
+ /* Blocking write() to pushfd (STDOUT_FILENO) */
+ r = ring_buffer_push(&ring, pushfd);
+
+ /* If there was a problem, just exit the entire function */
- } else if (r < 0) {
- if (errno != EINTR && errno != EAGAIN) goto error;
+ if (r < 0 && errno != EINTR) {
+ close(pushfd);
+ close(pullfd);
+ pushfd = pullfd = -1;
+ return;
}
}
+
+ if (pullfd >= 0) {
+ /* select for readability on the pullfd */
+ r = select(pullfd+1, &readfds, NULL, &exceptfds, NULL);
- if (!is_pull || FD_ISSET(pushfd, &writefds)) {
- r = ring_buffer_push(&ring, pushfd);
- if (r < 0) {
- switch (errno) {
- case EINTR:
- case EAGAIN:
- continue;
-
- case EPIPE:
- /* TODO catch SIGPIPE? */
- close(pushfd);
- close(pullfd);
- pushfd = pullfd = -1;
- return;
-
- default:
- goto error;
- }
+ if (r < 0 || FD_ISSET(pullfd, &exceptfds)) {
+ close(pushfd);
+ close(pullfd);
+ pushfd = pullfd = -1;
+ return;
}
}
}
-
+ /* If we got here then we got eof on pullfd and pushed all the data out.
+ * so now just close pushfd */
+ assert(pullfd < 0);
+ assert(ring_buffer_empty_p(&ring));
close(pushfd);
- close(pullfd);
- return;
-
-error:
- close(pushfd);
- close(pullfd);
- perror("(coupling) pump");
}
static inline int
@@ -262,7 +340,11 @@ pump_thread (void *data)
{
struct coupling *c = (struct coupling*)data;
- pump(c->is_pull, c->pullfd, c->pushfd);
+ if (c->is_pull) {
+ pull_pump(c->pullfd, c->pushfd);
+ } else {
+ push_pump(c->pullfd, c->pushfd);
+ }
return NULL;
}
View
18 doc/api.txt
@@ -1543,14 +1543,14 @@ resolution.addCallback(function (addresses, ttl, cname) {
reversing.addCallback( function (domains, ttl, cname) {
sys.puts("reverse for " + a + ": " + JSON.stringify(domains));
});
- reversing.addErrback( function (code, msg) {
- sys.puts("reverse for " + a + " failed: " + msg);
+ reversing.addErrback( function (e) {
+ puts("reverse for " + a + " failed: " + e.message);
});
}
});
-resolution.addErrback(function (code, msg) {
- sys.puts("error: " + msg);
+resolution.addErrback(function (e) {
+ puts("error: " + e.message);
});
-------------------------------------------------------------------------
@@ -1566,8 +1566,9 @@ This function returns a promise.
canonical name for the query.
The type of each item in +addresses+ is determined by the record type, and
described in the documentation for the corresponding lookup methods below.
-- on error: returns +code, msg+. +code+ is one of the error codes listed
- below and +msg+ is a string describing the error in English.
+- on error: Returns an instanceof Error object, where the "errno" field is one
+ of the error codes listed below and the "message" field is a string
+ describing the error in English.
+dns.resolve4(domain)+::
@@ -1605,8 +1606,9 @@ Reverse resolves an ip address to an array of domain names.
- on success: returns +domains, ttl, cname+. +ttl+ (time-to-live) is an integer
specifying the number of seconds this result is valid for. +cname+ is the
canonical name for the query. +domains+ is an array of domains.
-- on error: returns +code, msg+. +code+ is one of the error codes listed
- below and +msg+ is a string describing the error in English.
+- on error: Returns an instanceof Error object, where the "errno" field is one
+ of the error codes listed below and the "message" field is a string
+ describing the error in English.
Each DNS query can return an error code.
View
8 src/node_net.cc
@@ -821,6 +821,14 @@ Handle<Value> Server::Listen(const Arguments& args) {
if (address_list) freeaddrinfo(address_list);
+ if (server->server_.errorno) {
+ Local<Value> e = Exception::Error(
+ String::NewSymbol(strerror(server->server_.errorno)));
+ Local<Object> obj = e->ToObject();
+ obj->Set(String::NewSymbol("errno"), Integer::New(server->server_.errorno));
+ return ThrowException(e);
+ }
+
return Undefined();
}
View
11 test/mjsunit/fixtures/echo.js
@@ -1,5 +1,12 @@
process.mixin(require("../common"));
process.stdio.open();
+
+print("hello world\r\n");
+
process.stdio.addListener("data", function (data) {
- puts(data);
-});
+ print(data);
+});
+
+process.stdio.addListener("close", function () {
+ process.stdio.close();
+});
View
36 test/mjsunit/test-stdio.js
@@ -2,20 +2,34 @@ process.mixin(require("./common"));
var sub = path.join(fixturesDir, 'echo.js');
-var result = false;
-
-var child = process.createChildProcess(path.join(libDir, "../bin/node"), [sub]);
+var gotHelloWorld = false;
+var gotEcho = false;
+
+var child = process.createChildProcess(process.argv[0], [sub]);
+
child.addListener("error", function (data){
puts("parent stderr: " + data);
});
+
child.addListener("output", function (data){
- if (data && data[0] == 't') {
- result = true;
+ if (data) {
+ puts('child said: ' + JSON.stringify(data));
+ if (!gotHelloWorld) {
+ assert.equal("hello world\r\n", data);
+ gotHelloWorld = true;
+ child.write('echo me\r\n');
+ } else {
+ assert.equal("echo me\r\n", data);
+ gotEcho = true;
+ child.close();
+ }
+ } else {
+ puts('child eof');
}
});
-setTimeout(function () {
- child.write('t\r\n');
-}, 100);
-setTimeout(function (){
- assert.ok(result);
-}, 500)
+
+
+process.addListener('exit', function () {
+ assert.ok(gotHelloWorld);
+ assert.ok(gotEcho);
+});

0 comments on commit fc59669

Please sign in to comment.
Something went wrong with that request. Please try again.