diff --git a/src/unix/stream.c b/src/unix/stream.c index a258088838..83e2ddbe2e 100644 --- a/src/unix/stream.c +++ b/src/unix/stream.c @@ -29,6 +29,8 @@ #include #include +#include + static void uv__stream_connect(uv_stream_t*); static void uv__write(uv_stream_t* stream); @@ -476,12 +478,17 @@ static void uv__write_callbacks(uv_stream_t* stream) { static void uv__read(uv_stream_t* stream) { uv_buf_t buf; ssize_t nread; + struct msghdr msg; + struct cmsghdr* cmsg; + char cmsg_space[64]; + int received_fd = -1; struct ev_loop* ev = stream->loop->ev; /* XXX: Maybe instead of having UV_READING we just test if * tcp->read_cb is NULL or not? */ - while (stream->read_cb && ((uv_handle_t*)stream)->flags & UV_READING) { + while ((stream->read_cb || stream->read2_cb) && + stream->flags & UV_READING) { assert(stream->alloc_cb); buf = stream->alloc_cb((uv_handle_t*)stream, 64 * 1024); @@ -489,10 +496,29 @@ static void uv__read(uv_stream_t* stream) { assert(buf.base); assert(stream->fd >= 0); - do { - nread = read(stream->fd, buf.base, buf.len); + if (stream->read_cb) { + do { + nread = read(stream->fd, buf.base, buf.len); + } + while (nread < 0 && errno == EINTR); + } else { + assert(stream->read2_cb); + /* read2_cb uses recvmsg */ + msg.msg_flags = 0; + msg.msg_iov = (struct iovec*) &buf; + msg.msg_iovlen = 1; + msg.msg_name = NULL; + msg.msg_namelen = 0; + /* Set up to receive a descriptor even if one isn't in the message */ + msg.msg_controllen = 64; + msg.msg_control = (void *) cmsg_space; + + do { + nread = recvmsg(stream->fd, &msg, 0); + } + while (nread < 0 && errno == EINTR); } - while (nread < 0 && errno == EINTR); + if (nread < 0) { /* Error */ @@ -502,24 +528,73 @@ static void uv__read(uv_stream_t* stream) { ev_io_start(ev, &stream->read_watcher); } uv__set_sys_error(stream->loop, EAGAIN); - stream->read_cb(stream, 0, buf); + + if (stream->read_cb) { + stream->read_cb(stream, 0, buf); + } else { + stream->read2_cb((uv_pipe_t*)stream, 0, buf, UV_UNKNOWN_HANDLE); + } + return; } else { /* Error. User should call uv_close(). */ uv__set_sys_error(stream->loop, errno); - stream->read_cb(stream, -1, buf); + + if (stream->read_cb) { + stream->read_cb(stream, -1, buf); + } else { + stream->read2_cb((uv_pipe_t*)stream, -1, buf, UV_UNKNOWN_HANDLE); + } + assert(!ev_is_active(&stream->read_watcher)); return; } + } else if (nread == 0) { /* EOF */ uv__set_artificial_error(stream->loop, UV_EOF); ev_io_stop(ev, &stream->read_watcher); - stream->read_cb(stream, -1, buf); + + if (stream->read_cb) { + stream->read_cb(stream, -1, buf); + } else { + stream->read2_cb((uv_pipe_t*)stream, -1, buf, UV_UNKNOWN_HANDLE); + } return; } else { /* Successful read */ - stream->read_cb(stream, nread, buf); + + if (stream->read_cb) { + stream->read_cb(stream, nread, buf); + } else { + assert(stream->read2_cb); + + /* + * XXX: Some implementations can send multiple file descriptors in a + * single message. We should be using CMSG_NXTHDR() to walk the + * chain to get at them all. This would require changing the API to + * hand these back up the caller, is a pain. + */ + + for (cmsg = CMSG_FIRSTHDR(&msg); + msg.msg_controllen > 0 && cmsg != NULL; + cmsg = CMSG_NXTHDR(&msg, cmsg)) { + + if (cmsg->cmsg_type == SCM_RIGHTS) { + if (stream->accepted_fd != -1) { + fprintf(stderr, "(libuv) ignoring extra FD received\n"); + } + + stream->accepted_fd = *(int *) CMSG_DATA(cmsg); + + } else { + fprintf(stderr, "ignoring non-SCM_RIGHTS ancillary data: %d\n", + cmsg->cmsg_type); + } + } + + stream->read2_cb((uv_pipe_t*)stream, nread, buf, UV_TCP); + } } } } @@ -780,8 +855,8 @@ int uv_write(uv_write_t* req, uv_stream_t* stream, uv_buf_t bufs[], int bufcnt, } -int uv_read_start(uv_stream_t* stream, uv_alloc_cb alloc_cb, - uv_read_cb read_cb) { +int uv__read_start_common(uv_stream_t* stream, uv_alloc_cb alloc_cb, + uv_read_cb read_cb, uv_read2_cb read2_cb) { assert(stream->type == UV_TCP || stream->type == UV_NAMED_PIPE || stream->type == UV_TTY); @@ -803,6 +878,7 @@ int uv_read_start(uv_stream_t* stream, uv_alloc_cb alloc_cb, assert(alloc_cb); stream->read_cb = read_cb; + stream->read2_cb = read2_cb; stream->alloc_cb = alloc_cb; /* These should have been set by uv_tcp_init. */ @@ -813,13 +889,15 @@ int uv_read_start(uv_stream_t* stream, uv_alloc_cb alloc_cb, } +int uv_read_start(uv_stream_t* stream, uv_alloc_cb alloc_cb, + uv_read_cb read_cb) { + return uv__read_start_common(stream, alloc_cb, read_cb, NULL); +} + + int uv_read2_start(uv_stream_t* stream, uv_alloc_cb alloc_cb, uv_read2_cb read_cb) { - int r; - r = uv_read_start(stream, alloc_cb, NULL); - assert(stream->read_cb == NULL); - stream->read2_cb = read_cb; - return r; + return uv__read_start_common(stream, alloc_cb, NULL, read_cb); } diff --git a/test/test-ipc.c b/test/test-ipc.c index c16f9ce732..ed263c10b0 100644 --- a/test/test-ipc.c +++ b/test/test-ipc.c @@ -45,8 +45,7 @@ static void ipc_on_connection(uv_stream_t* server, int status) { static void exit_cb(uv_process_t* process, int exit_status, int term_signal) { printf("exit_cb\n"); exit_cb_called++; - ASSERT(exit_status == 1); - ASSERT(term_signal == 0); + ASSERT(exit_status == 0); uv_close((uv_handle_t*)process, NULL); } @@ -60,6 +59,7 @@ static void on_read(uv_pipe_t* pipe, ssize_t nread, uv_buf_t buf, uv_handle_type pending) { int r; uv_buf_t outbuf; + uv_err_t err; if (nread == 0) { /* Everything OK, but nothing read. */ @@ -67,6 +67,17 @@ static void on_read(uv_pipe_t* pipe, ssize_t nread, uv_buf_t buf, return; } + if (nread < 0) { + err = uv_last_error(pipe->loop); + if (err.code == UV_EOF) { + free(buf.base); + return; + } + + printf("error recving on channel: %s\n", uv_strerror(err)); + abort(); + } + ASSERT(nread > 0 && buf.base && pending != UV_UNKNOWN_HANDLE); read2_cb_called++; @@ -82,7 +93,7 @@ static void on_read(uv_pipe_t* pipe, ssize_t nread, uv_buf_t buf, ASSERT(r == 0); /* Make sure that the expected data is correctly multiplexed. */ - ASSERT(memcmp("hello\n", buf.base, buf.len) == 0); + ASSERT(memcmp("hello\n", buf.base, nread) == 0); fprintf(stderr, "got %d bytes\n", (int)nread); outbuf = uv_buf_init("world\n", 6);