Skip to content
This repository has been archived by the owner. It is now read-only.
Permalink
Browse files

unix: handle passing kind of working

  • Loading branch information...
ry committed Oct 3, 2011
1 parent 2d826fe commit 7de7ff30b465c1a00bb668301d649c4e0f162b23
Showing with 107 additions and 18 deletions.
  1. +93 −15 src/unix/stream.c
  2. +14 −3 test/test-ipc.c
@@ -29,6 +29,8 @@
#include <string.h>
#include <sys/uio.h>

#include <stdio.h>


static void uv__stream_connect(uv_stream_t*);
static void uv__write(uv_stream_t* stream);
@@ -476,23 +478,47 @@ static void uv__write_callbacks(uv_stream_t* stream) {
static void uv__read(uv_stream_t* stream) {
uv_buf_t buf;
ssize_t nread;
struct msghdr msg;
struct cmsghdr* cmsg;
char cmsg_space[64];
int received_fd = -1;
struct ev_loop* ev = stream->loop->ev;

/* XXX: Maybe instead of having UV_READING we just test if
* tcp->read_cb is NULL or not?
*/
while (stream->read_cb && ((uv_handle_t*)stream)->flags & UV_READING) {
while ((stream->read_cb || stream->read2_cb) &&
stream->flags & UV_READING) {
assert(stream->alloc_cb);
buf = stream->alloc_cb((uv_handle_t*)stream, 64 * 1024);

assert(buf.len > 0);
assert(buf.base);
assert(stream->fd >= 0);

do {
nread = read(stream->fd, buf.base, buf.len);
if (stream->read_cb) {
do {
nread = read(stream->fd, buf.base, buf.len);
}
while (nread < 0 && errno == EINTR);
} else {
assert(stream->read2_cb);
/* read2_cb uses recvmsg */
msg.msg_flags = 0;
msg.msg_iov = (struct iovec*) &buf;
msg.msg_iovlen = 1;
msg.msg_name = NULL;
msg.msg_namelen = 0;
/* Set up to receive a descriptor even if one isn't in the message */
msg.msg_controllen = 64;
msg.msg_control = (void *) cmsg_space;

do {
nread = recvmsg(stream->fd, &msg, 0);
}
while (nread < 0 && errno == EINTR);
}
while (nread < 0 && errno == EINTR);


if (nread < 0) {
/* Error */
@@ -502,24 +528,73 @@ static void uv__read(uv_stream_t* stream) {
ev_io_start(ev, &stream->read_watcher);
}
uv__set_sys_error(stream->loop, EAGAIN);
stream->read_cb(stream, 0, buf);

if (stream->read_cb) {
stream->read_cb(stream, 0, buf);
} else {
stream->read2_cb((uv_pipe_t*)stream, 0, buf, UV_UNKNOWN_HANDLE);
}

return;
} else {
/* Error. User should call uv_close(). */
uv__set_sys_error(stream->loop, errno);
stream->read_cb(stream, -1, buf);

if (stream->read_cb) {
stream->read_cb(stream, -1, buf);
} else {
stream->read2_cb((uv_pipe_t*)stream, -1, buf, UV_UNKNOWN_HANDLE);
}

assert(!ev_is_active(&stream->read_watcher));
return;
}

} else if (nread == 0) {
/* EOF */
uv__set_artificial_error(stream->loop, UV_EOF);
ev_io_stop(ev, &stream->read_watcher);
stream->read_cb(stream, -1, buf);

if (stream->read_cb) {
stream->read_cb(stream, -1, buf);
} else {
stream->read2_cb((uv_pipe_t*)stream, -1, buf, UV_UNKNOWN_HANDLE);
}
return;
} else {
/* Successful read */
stream->read_cb(stream, nread, buf);

if (stream->read_cb) {
stream->read_cb(stream, nread, buf);
} else {
assert(stream->read2_cb);

/*
* XXX: Some implementations can send multiple file descriptors in a
* single message. We should be using CMSG_NXTHDR() to walk the
* chain to get at them all. This would require changing the API to
* hand these back up the caller, is a pain.
*/

for (cmsg = CMSG_FIRSTHDR(&msg);
msg.msg_controllen > 0 && cmsg != NULL;
cmsg = CMSG_NXTHDR(&msg, cmsg)) {

if (cmsg->cmsg_type == SCM_RIGHTS) {
if (stream->accepted_fd != -1) {
fprintf(stderr, "(libuv) ignoring extra FD received\n");
}

stream->accepted_fd = *(int *) CMSG_DATA(cmsg);

} else {
fprintf(stderr, "ignoring non-SCM_RIGHTS ancillary data: %d\n",
cmsg->cmsg_type);
}
}

stream->read2_cb((uv_pipe_t*)stream, nread, buf, UV_TCP);
}
}
}
}
@@ -780,8 +855,8 @@ int uv_write(uv_write_t* req, uv_stream_t* stream, uv_buf_t bufs[], int bufcnt,
}


int uv_read_start(uv_stream_t* stream, uv_alloc_cb alloc_cb,
uv_read_cb read_cb) {
int uv__read_start_common(uv_stream_t* stream, uv_alloc_cb alloc_cb,
uv_read_cb read_cb, uv_read2_cb read2_cb) {
assert(stream->type == UV_TCP || stream->type == UV_NAMED_PIPE ||
stream->type == UV_TTY);

@@ -803,6 +878,7 @@ int uv_read_start(uv_stream_t* stream, uv_alloc_cb alloc_cb,
assert(alloc_cb);

stream->read_cb = read_cb;
stream->read2_cb = read2_cb;
stream->alloc_cb = alloc_cb;

/* These should have been set by uv_tcp_init. */
@@ -813,13 +889,15 @@ int uv_read_start(uv_stream_t* stream, uv_alloc_cb alloc_cb,
}


int uv_read_start(uv_stream_t* stream, uv_alloc_cb alloc_cb,
uv_read_cb read_cb) {
return uv__read_start_common(stream, alloc_cb, read_cb, NULL);
}


int uv_read2_start(uv_stream_t* stream, uv_alloc_cb alloc_cb,
uv_read2_cb read_cb) {
int r;
r = uv_read_start(stream, alloc_cb, NULL);
assert(stream->read_cb == NULL);
stream->read2_cb = read_cb;
return r;
return uv__read_start_common(stream, alloc_cb, NULL, read_cb);
}


@@ -45,8 +45,7 @@ static void ipc_on_connection(uv_stream_t* server, int status) {
static void exit_cb(uv_process_t* process, int exit_status, int term_signal) {
printf("exit_cb\n");
exit_cb_called++;
ASSERT(exit_status == 1);
ASSERT(term_signal == 0);
ASSERT(exit_status == 0);
uv_close((uv_handle_t*)process, NULL);
}

@@ -60,13 +59,25 @@ static void on_read(uv_pipe_t* pipe, ssize_t nread, uv_buf_t buf,
uv_handle_type pending) {
int r;
uv_buf_t outbuf;
uv_err_t err;

if (nread == 0) {
/* Everything OK, but nothing read. */
free(buf.base);
return;
}

if (nread < 0) {
err = uv_last_error(pipe->loop);
if (err.code == UV_EOF) {
free(buf.base);
return;
}

printf("error recving on channel: %s\n", uv_strerror(err));
abort();
}

ASSERT(nread > 0 && buf.base && pending != UV_UNKNOWN_HANDLE);
read2_cb_called++;

@@ -82,7 +93,7 @@ static void on_read(uv_pipe_t* pipe, ssize_t nread, uv_buf_t buf,
ASSERT(r == 0);

/* Make sure that the expected data is correctly multiplexed. */
ASSERT(memcmp("hello\n", buf.base, buf.len) == 0);
ASSERT(memcmp("hello\n", buf.base, nread) == 0);
fprintf(stderr, "got %d bytes\n", (int)nread);

outbuf = uv_buf_init("world\n", 6);

0 comments on commit 7de7ff3

Please sign in to comment.
You can’t perform that action at this time.