Skip to content
This repository has been archived by the owner on May 4, 2018. It is now read-only.

Commit

Permalink
pipe: allow queueing pending handles
Browse files Browse the repository at this point in the history
If multiple handles arrive to the IPC pipe at the same time (happens on
some platforms), libuv will queue them internally, and call `read2_cb`
multiple times with a null-buffer and proper `handle_type`.
  • Loading branch information
indutny committed Dec 23, 2013
1 parent f166d6d commit 08aeaf6
Show file tree
Hide file tree
Showing 3 changed files with 91 additions and 15 deletions.
1 change: 1 addition & 0 deletions include/uv-unix.h
Expand Up @@ -229,6 +229,7 @@ typedef struct {
uv_connection_cb connection_cb; \
int delayed_error; \
int accepted_fd; \
int* queued_fds; \
UV_STREAM_PRIVATE_PLATFORM_FIELDS \

#define UV_TCP_PRIVATE_FIELDS /* empty */
Expand Down
3 changes: 3 additions & 0 deletions include/uv.h
Expand Up @@ -376,6 +376,9 @@ typedef void (*uv_read_cb)(uv_stream_t* stream,
* Just like the uv_read_cb except that if the pending parameter is true
* then you can use uv_accept() to pull the new handle into the process.
* If no handle is pending then pending will be UV_UNKNOWN_HANDLE.
*
* NOTE: The buffer may be a null buffer if multiple fds were accepted and
* read2_cb is called for pending ones.
*/
typedef void (*uv_read2_cb)(uv_pipe_t* pipe,
ssize_t nread,
Expand Down
102 changes: 87 additions & 15 deletions src/unix/stream.c
Expand Up @@ -119,6 +119,7 @@ void uv__stream_init(uv_loop_t* loop,
stream->connect_req = NULL;
stream->shutdown_req = NULL;
stream->accepted_fd = -1;
stream->queued_fds = NULL;
stream->delayed_error = 0;
QUEUE_INIT(&stream->write_queue);
QUEUE_INIT(&stream->write_completed_queue);
Expand Down Expand Up @@ -559,6 +560,7 @@ int uv_accept(uv_stream_t* server, uv_stream_t* client) {
if (server->accepted_fd == -1)
return -EAGAIN;

err = 0;
switch (client->type) {
case UV_NAMED_PIPE:
case UV_TCP:
Expand All @@ -568,27 +570,47 @@ int uv_accept(uv_stream_t* server, uv_stream_t* client) {
if (err) {
/* TODO handle error */
uv__close(server->accepted_fd);
server->accepted_fd = -1;
return err;
goto done;
}
break;

case UV_UDP:
err = uv_udp_open((uv_udp_t*) client, server->accepted_fd);
if (err) {
uv__close(server->accepted_fd);
server->accepted_fd = -1;
return err;
goto done;
}
break;

default:
assert(0);
}

uv__io_start(server->loop, &server->io_watcher, UV__POLLIN);
server->accepted_fd = -1;
return 0;
done:
/* Process queued fds */
if (server->queued_fds != NULL) {
/* Read first */
server->accepted_fd = server->queued_fds[2];

/* All read, free */
if (--server->queued_fds[0] == 0) {
free(server->queued_fds);
server->queued_fds = NULL;
} else {
/* Shift rest */
memmove(server->queued_fds + 2,
server->queued_fds + 3,
server->queued_fds[0]);
}

/* Invoke read_cb one more time */
uv__io_feed(server->loop, &server->io_watcher);
} else {
server->accepted_fd = -1;
if (err == 0)
uv__io_start(server->loop, &server->io_watcher, UV__POLLIN);
}
return err;
}


Expand Down Expand Up @@ -951,13 +973,58 @@ static void uv__stream_eof(uv_stream_t* stream, const uv_buf_t* buf) {
}


static int uv__stream_queue_fd(uv_stream_t* stream, int fd) {
int queue_offset;
int queue_len;

if (stream->queued_fds == NULL) {
queue_offset = 0;
queue_len = 8;
stream->queued_fds = malloc((queue_len + 2) * sizeof(*stream->queued_fds));
if (stream->queued_fds == NULL)
return UV_ENOMEM;
stream->queued_fds[1] = queue_len;
} else {
queue_offset = stream->queued_fds[0];
queue_len = stream->queued_fds[1];

/* Grow */
if (queue_offset == queue_len) {
queue_len += 8;
stream->queued_fds = realloc(stream->queued_fds,
(queue_len + 2) *
sizeof(*stream->queued_fds));
if (stream->queued_fds == NULL)
return UV_ENOMEM;
stream->queued_fds[1] = queue_len;
}
}

/* Put fd in a queue */
stream->queued_fds[0] = queue_offset;
stream->queued_fds[2 + queue_offset++] = fd;

return 0;
}


static void uv__read(uv_stream_t* stream) {
uv_buf_t buf;
ssize_t nread;
struct msghdr msg;
struct cmsghdr* cmsg;
char cmsg_space[64];
int count;
int err;

/* Has queued fds */
if (stream->accepted_fd != -1) {
static uv_buf_t buf = { NULL, 0 };
stream->read2_cb((uv_pipe_t*) stream,
0,
&buf,
uv__handle_type(stream->accepted_fd));
}

stream->flags &= ~UV_STREAM_READ_PARTIAL;

Expand Down Expand Up @@ -1046,17 +1113,20 @@ static void uv__read(uv_stream_t* stream) {
cmsg = CMSG_NXTHDR(&msg, cmsg)) {

if (cmsg->cmsg_type == SCM_RIGHTS) {
if (stream->accepted_fd != -1) {
fprintf(stderr, "(libuv) ignoring extra FD received\n");
}

/* silence aliasing warning */
{
void* pv = CMSG_DATA(cmsg);
int* pi = pv;
void* pv = CMSG_DATA(cmsg);
int* pi = pv;

/* Already has accepted fd, queue now */
if (stream->accepted_fd != -1) {
err = uv__stream_queue_fd(stream, *pi);
if (err != 0) {
uv__stream_read_cb(stream, err, NULL, UV_UNKNOWN_HANDLE);
return;
}
} else {
stream->accepted_fd = *pi;
}

} else {
fprintf(stderr, "ignoring non-SCM_RIGHTS ancillary data: %d\n",
cmsg->cmsg_type);
Expand Down Expand Up @@ -1493,6 +1563,8 @@ void uv__stream_close(uv_stream_t* handle) {
handle->accepted_fd = -1;
}

free(handle->queued_fds);

assert(!uv__io_active(&handle->io_watcher, UV__POLLIN | UV__POLLOUT));
}

Expand Down

0 comments on commit 08aeaf6

Please sign in to comment.