This file was deleted.

This file was deleted.

This file was deleted.

This file was deleted.

This file was deleted.

This file was deleted.

This file was deleted.

This file was deleted.

This file was deleted.

This file was deleted.

@@ -55,7 +55,7 @@ static char *process_title;


int uv__platform_loop_init(uv_loop_t* loop, int default_loop) {
return 0;
return uv__kqueue_init(loop);
}


@@ -81,12 +81,14 @@ void uv__fsevents_cb(uv_async_t* cb, int status) {
handle = cb->data;

UV__FSEVENTS_WALK(handle, {
if (handle->fd != -1)
if (handle->event_watcher.fd != -1)
handle->cb(handle, event->path[0] ? event->path : NULL, event->events, 0);
});

if ((handle->flags & (UV_CLOSING | UV_CLOSED)) == 0 && handle->fd == -1)
if ((handle->flags & (UV_CLOSING | UV_CLOSED)) == 0 &&
handle->event_watcher.fd == -1) {
uv__fsevents_close(handle);
}
}


@@ -81,9 +81,35 @@
} \
while (0)

#define UV__IO_READ EV_READ
#define UV__IO_WRITE EV_WRITE
#define UV__IO_ERROR EV_ERROR
#if defined(__linux__)
# define UV__POLLIN UV__EPOLLIN
# define UV__POLLOUT UV__EPOLLOUT
# define UV__POLLERR UV__EPOLLERR
# define UV__POLLHUP UV__EPOLLHUP
#endif

#if defined(__sun)
# define UV__POLLIN POLLIN
# define UV__POLLOUT POLLOUT
# define UV__POLLERR POLLERR
# define UV__POLLHUP POLLHUP
#endif

#ifndef UV__POLLIN
# define UV__POLLIN 1
#endif

#ifndef UV__POLLOUT
# define UV__POLLOUT 2
#endif

#ifndef UV__POLLERR
# define UV__POLLERR 4
#endif

#ifndef UV__POLLHUP
# define UV__POLLHUP 8
#endif

/* handle flags */
enum {
@@ -117,12 +143,12 @@ int uv__dup(int fd);
int uv_async_stop(uv_async_t* handle);
void uv__make_close_pending(uv_handle_t* handle);

void uv__io_init(uv__io_t* handle, uv__io_cb cb, int fd, int events);
void uv__io_set(uv__io_t* handle, uv__io_cb cb, int fd, int events);
void uv__io_start(uv_loop_t* loop, uv__io_t* handle);
void uv__io_stop(uv_loop_t* loop, uv__io_t* handle);
void uv__io_feed(uv_loop_t* loop, uv__io_t* handle, int event);
int uv__io_active(uv__io_t* handle);
void uv__io_init(uv__io_t* w, uv__io_cb cb, int fd);
void uv__io_start(uv_loop_t* loop, uv__io_t* w, unsigned int events);
void uv__io_stop(uv_loop_t* loop, uv__io_t* w, unsigned int events);
void uv__io_feed(uv_loop_t* loop, uv__io_t* w);
int uv__io_active(const uv__io_t* w, unsigned int events);
void uv__io_poll(uv_loop_t* loop, int timeout); /* in milliseconds or -1 */

/* loop */
int uv__loop_init(uv_loop_t* loop, int default_loop);
@@ -140,13 +166,13 @@ void uv__stream_init(uv_loop_t* loop, uv_stream_t* stream,
uv_handle_type type);
int uv__stream_open(uv_stream_t*, int fd, int flags);
void uv__stream_destroy(uv_stream_t* stream);
void uv__server_io(uv_loop_t* loop, uv__io_t* watcher, int events);
void uv__server_io(uv_loop_t* loop, uv__io_t* w, unsigned int events);
int uv__accept(int sockfd);

/* tcp */
int uv_tcp_listen(uv_tcp_t* tcp, int backlog, uv_connection_cb cb);
int uv__tcp_nodelay(uv_tcp_t* handle, int enable);
int uv__tcp_keepalive(uv_tcp_t* handle, int enable, unsigned int delay);
int uv__tcp_nodelay(int fd, int on);
int uv__tcp_keepalive(int fd, int on, unsigned int delay);

/* pipe */
int uv_pipe_listen(uv_pipe_t* handle, int backlog, uv_connection_cb cb);
@@ -168,6 +194,7 @@ void uv__work_submit(uv_loop_t* loop,
void uv__work_done(uv_async_t* handle, int status);

/* platform specific */
int uv__kqueue_init(uv_loop_t* loop);
int uv__platform_loop_init(uv_loop_t* loop, int default_loop);
void uv__platform_loop_delete(uv_loop_t* loop);

@@ -29,57 +29,254 @@
#include <sys/sysctl.h>
#include <sys/types.h>
#include <sys/event.h>
#include <sys/time.h>
#include <unistd.h>
#include <fcntl.h>
#include <time.h>

static void uv__fs_event(EV_P_ ev_io* w, int revents);
static void uv__fs_event(uv_loop_t* loop, uv__io_t* w, unsigned int fflags);


static void uv__fs_event_start(uv_fs_event_t* handle) {
ev_io_init(&handle->event_watcher,
uv__fs_event,
handle->fd,
EV_LIBUV_KQUEUE_HACK);
ev_io_start(handle->loop->ev, &handle->event_watcher);
int uv__kqueue_init(uv_loop_t* loop) {
loop->backend_fd = kqueue();

if (loop->backend_fd == -1)
return -1;

uv__cloexec(loop->backend_fd, 1);

return 0;
}


static void uv__fs_event_stop(uv_fs_event_t* handle) {
ev_io_stop(handle->loop->ev, &handle->event_watcher);
void uv__io_poll(uv_loop_t* loop, int timeout) {
struct kevent events[1024];
struct kevent* ev;
struct timespec spec;
unsigned int nevents;
unsigned int revents;
ngx_queue_t* q;
uint64_t base;
uint64_t diff;
uv__io_t* w;
int filter;
int fflags;
int count;
int nfds;
int fd;
int op;
int i;

if (loop->nfds == 0) {
assert(ngx_queue_empty(&loop->watcher_queue));
return;
}

nevents = 0;

while (!ngx_queue_empty(&loop->watcher_queue)) {
q = ngx_queue_head(&loop->watcher_queue);
ngx_queue_remove(q);
ngx_queue_init(q);

w = ngx_queue_data(q, uv__io_t, watcher_queue);
assert(w->pevents != 0);
assert(w->fd >= 0);
assert(w->fd < (int) loop->nwatchers);

/* Filter out no-op changes. This is for compatibility with the event ports
* backend, see uv__io_start().
*/
if (w->events == w->pevents)
continue;

if ((w->events & UV__POLLIN) == 0 && (w->pevents & UV__POLLIN) != 0) {
filter = EVFILT_READ;
fflags = 0;
op = EV_ADD;

if (w->cb == uv__fs_event) {
filter = EVFILT_VNODE;
fflags = NOTE_ATTRIB | NOTE_WRITE | NOTE_RENAME
| NOTE_DELETE | NOTE_EXTEND | NOTE_REVOKE;
op = EV_ADD | EV_ONESHOT; /* Stop the event from firing repeatedly. */
}

EV_SET(events + nevents, w->fd, filter, op, fflags, 0, 0);

if (++nevents == ARRAY_SIZE(events)) {
if (kevent(loop->backend_fd, events, nevents, NULL, 0, NULL))
abort();
nevents = 0;
}
}

if ((w->events & UV__POLLOUT) == 0 && (w->pevents & UV__POLLOUT) != 0) {
EV_SET(events + nevents, w->fd, EVFILT_WRITE, EV_ADD, 0, 0, 0);

if (++nevents == ARRAY_SIZE(events)) {
if (kevent(loop->backend_fd, events, nevents, NULL, 0, NULL))
abort();
nevents = 0;
}
}

w->events = w->pevents;
}

assert(timeout >= -1);
base = loop->time;
count = 48; /* Benchmarks suggest this gives the best throughput. */

for (;; nevents = 0) {
if (timeout != -1) {
spec.tv_sec = timeout / 1000;
spec.tv_nsec = (timeout % 1000) * 1000000;
}

nfds = kevent(loop->backend_fd,
events,
nevents,
events,
ARRAY_SIZE(events),
timeout == -1 ? NULL : &spec);

if (nfds == 0) {
assert(timeout != -1);
return;
}

if (nfds == -1) {
if (errno != EINTR)
abort();

if (timeout == 0)
return;

if (timeout == -1)
continue;

/* Interrupted by a signal. Update timeout and poll again. */
goto update_timeout;
}

nevents = 0;

for (i = 0; i < nfds; i++) {
ev = events + i;
fd = ev->ident;
w = loop->watchers[fd];

if (w == NULL) {
/* File descriptor that we've stopped watching, disarm it. */
/* TODO batch up */
struct kevent events[1];

EV_SET(events + 0, fd, ev->filter, EV_DELETE, 0, 0, 0);
if (kevent(loop->backend_fd, events, 1, NULL, 0, NULL))
if (errno != EBADF && errno != ENOENT)
abort();

continue;
}

if (ev->filter == EVFILT_VNODE) {
assert(w->events == UV__POLLIN);
assert(w->pevents == UV__POLLIN);
w->cb(loop, w, ev->fflags); /* XXX always uv__fs_event() */
nevents++;
continue;
}

revents = 0;

if (ev->filter == EVFILT_READ) {
if (w->events & UV__POLLIN)
revents |= UV__POLLIN;
else {
/* TODO batch up */
struct kevent events[1];
EV_SET(events + 0, fd, ev->filter, EV_DELETE, 0, 0, 0);
if (kevent(loop->backend_fd, events, 1, NULL, 0, NULL)) abort();
}
}

if (ev->filter == EVFILT_WRITE) {
if (w->events & UV__POLLOUT)
revents |= UV__POLLOUT;
else {
/* TODO batch up */
struct kevent events[1];
EV_SET(events + 0, fd, ev->filter, EV_DELETE, 0, 0, 0);
if (kevent(loop->backend_fd, events, 1, NULL, 0, NULL)) abort();
}
}

if (ev->flags & EV_ERROR)
revents |= UV__POLLERR;

if (revents == 0)
continue;

w->cb(loop, w, revents);
nevents++;
}

if (nevents != 0) {
if (nfds == ARRAY_SIZE(events) && --count != 0) {
/* Poll for more events but don't block this time. */
timeout = 0;
continue;
}
return;
}

if (timeout == 0)
return;

if (timeout == -1)
continue;

update_timeout:
assert(timeout > 0);

diff = uv_hrtime() / 1000000;
assert(diff >= base);
diff -= base;

if (diff >= (uint64_t) timeout)
return;

timeout -= diff;
}
}


static void uv__fs_event(EV_P_ ev_io* w, int revents) {
static void uv__fs_event(uv_loop_t* loop, uv__io_t* w, unsigned int fflags) {
uv_fs_event_t* handle;
struct kevent ev;
int events;

assert(revents == EV_LIBUV_KQUEUE_HACK);

handle = container_of(w, uv_fs_event_t, event_watcher);

if (handle->fflags & (NOTE_ATTRIB | NOTE_EXTEND))
if (fflags & (NOTE_ATTRIB | NOTE_EXTEND))
events = UV_CHANGE;
else
events = UV_RENAME;

handle->cb(handle, NULL, events, 0);

if (handle->fd == -1)
if (handle->event_watcher.fd == -1)
return;

/* File watcher operates in one-shot mode, re-arm it. */
uv__fs_event_stop(handle);
uv__fs_event_start(handle);
}

/* Watcher operates in one-shot mode, re-arm it. */
fflags = NOTE_ATTRIB | NOTE_WRITE | NOTE_RENAME
| NOTE_DELETE | NOTE_EXTEND | NOTE_REVOKE;

/* Called by libev, don't touch. */
void uv__kqueue_hack(EV_P_ int fflags, ev_io *w) {
uv_fs_event_t* handle;
EV_SET(&ev, w->fd, EVFILT_VNODE, EV_ADD | EV_ONESHOT, fflags, 0, 0);

handle = container_of(w, uv_fs_event_t, event_watcher);
handle->fflags = fflags;
if (kevent(loop->backend_fd, &ev, 1, NULL, 0, NULL))
abort();
}


@@ -88,10 +285,10 @@ int uv_fs_event_init(uv_loop_t* loop,
const char* filename,
uv_fs_event_cb cb,
int flags) {
int fd;
#if defined(__APPLE__)
struct stat statbuf;
#endif /* defined(__APPLE__) */
int fd;

/* TODO open asynchronously - but how do we report back errors? */
if ((fd = open(filename, O_RDONLY)) == -1) {
@@ -101,10 +298,9 @@ int uv_fs_event_init(uv_loop_t* loop,

uv__handle_init(loop, (uv_handle_t*)handle, UV_FS_EVENT);
uv__handle_start(handle); /* FIXME shouldn't start automatically */
uv__io_init(&handle->event_watcher, uv__fs_event, fd);
handle->filename = strdup(filename);
handle->fflags = 0;
handle->cb = cb;
handle->fd = fd;

#if defined(__APPLE__)
/* Nullify field to perform checks later */
@@ -124,7 +320,7 @@ int uv_fs_event_init(uv_loop_t* loop,
fallback:
#endif /* defined(__APPLE__) */

uv__fs_event_start(handle);
uv__io_start(loop, &handle->event_watcher, UV__POLLIN);

return 0;
}
@@ -133,13 +329,16 @@ int uv_fs_event_init(uv_loop_t* loop,
void uv__fs_event_close(uv_fs_event_t* handle) {
#if defined(__APPLE__)
if (uv__fsevents_close(handle))
uv__fs_event_stop(handle);
uv__io_stop(handle->loop, &handle->event_watcher, UV__POLLIN);
#else
uv__fs_event_stop(handle);
uv__io_stop(handle->loop, &handle->event_watcher, UV__POLLIN);
#endif /* defined(__APPLE__) */

uv__handle_stop(handle);

free(handle->filename);
close(handle->fd);
handle->fd = -1;
handle->filename = NULL;

close(handle->event_watcher.fd);
handle->event_watcher.fd = -1;
}
@@ -64,7 +64,9 @@ static int compare_watchers(const struct watcher_list* a,
RB_GENERATE_STATIC(watcher_root, watcher_list, entry, compare_watchers)


static void uv__inotify_read(uv_loop_t* loop, uv__io_t* w, int revents);
static void uv__inotify_read(uv_loop_t* loop,
uv__io_t* w,
unsigned int revents);


static int new_inotify_fd(void) {
@@ -98,11 +100,8 @@ static int init_inotify(uv_loop_t* loop) {
return -1;
}

uv__io_init(&loop->inotify_read_watcher,
uv__inotify_read,
loop->inotify_fd,
UV__IO_READ);
uv__io_start(loop, &loop->inotify_read_watcher);
uv__io_init(&loop->inotify_read_watcher, uv__inotify_read, loop->inotify_fd);
uv__io_start(loop, &loop->inotify_read_watcher, UV__POLLIN);

return 0;
}
@@ -115,7 +114,9 @@ static struct watcher_list* find_watcher(uv_loop_t* loop, int wd) {
}


static void uv__inotify_read(uv_loop_t* loop, uv__io_t* dummy, int events) {
static void uv__inotify_read(uv_loop_t* loop,
uv__io_t* dummy,
unsigned int events) {
const struct uv__inotify_event* e;
struct watcher_list* w;
uv_fs_event_t* h;
@@ -78,20 +78,183 @@ static void free_args_mem(void) {


int uv__platform_loop_init(uv_loop_t* loop, int default_loop) {
loop->inotify_watchers = NULL;
int fd;

fd = uv__epoll_create1(UV__EPOLL_CLOEXEC);

/* epoll_create1() can fail either because it's not implemented (old kernel)
* or because it doesn't understand the EPOLL_CLOEXEC flag.
*/
if (fd == -1 && (errno == ENOSYS || errno == EINVAL)) {
fd = uv__epoll_create(256);

if (fd != -1)
uv__cloexec(fd, 1);
}

loop->backend_fd = fd;
loop->inotify_fd = -1;
loop->inotify_watchers = NULL;

if (fd == -1)
return -1;

return 0;
}


void uv__platform_loop_delete(uv_loop_t* loop) {
if (loop->inotify_fd == -1) return;
uv__io_stop(loop, &loop->inotify_read_watcher);
uv__io_stop(loop, &loop->inotify_read_watcher, UV__POLLIN);
close(loop->inotify_fd);
loop->inotify_fd = -1;
}


void uv__io_poll(uv_loop_t* loop, int timeout) {
struct uv__epoll_event events[1024];
struct uv__epoll_event* pe;
struct uv__epoll_event e;
ngx_queue_t* q;
uv__io_t* w;
uint64_t base;
uint64_t diff;
int nevents;
int count;
int nfds;
int fd;
int op;
int i;

if (loop->nfds == 0) {
assert(ngx_queue_empty(&loop->watcher_queue));
return;
}

while (!ngx_queue_empty(&loop->watcher_queue)) {
q = ngx_queue_head(&loop->watcher_queue);
ngx_queue_remove(q);
ngx_queue_init(q);

w = ngx_queue_data(q, uv__io_t, watcher_queue);
assert(w->pevents != 0);
assert(w->fd >= 0);
assert(w->fd < (int) loop->nwatchers);

/* Filter out no-op changes. This is for compatibility with the event ports
* backend, see the comment in uv__io_start().
*/
if (w->events == w->pevents)
continue;

e.events = w->pevents;
e.data = w->fd;

if (w->events == 0)
op = UV__EPOLL_CTL_ADD;
else
op = UV__EPOLL_CTL_MOD;

/* XXX Future optimization: do EPOLL_CTL_MOD lazily if we stop watching
* events, skip the syscall and squelch the events after epoll_wait().
*/
if (uv__epoll_ctl(loop->backend_fd, op, w->fd, &e)) {
if (errno != EEXIST)
abort();

assert(op == UV__EPOLL_CTL_ADD);

/* We've reactivated a file descriptor that's been watched before. */
if (uv__epoll_ctl(loop->backend_fd, UV__EPOLL_CTL_MOD, w->fd, &e))
abort();
}

w->events = w->pevents;
}

assert(timeout >= -1);
base = loop->time;
count = 48; /* Benchmarks suggest this gives the best throughput. */

for (;;) {
nfds = uv__epoll_wait(loop->backend_fd,
events,
ARRAY_SIZE(events),
timeout);

if (nfds == 0) {
assert(timeout != -1);
return;
}

if (nfds == -1) {
if (errno != EINTR)
abort();

if (timeout == -1)
continue;

if (timeout == 0)
return;

/* Interrupted by a signal. Update timeout and poll again. */
goto update_timeout;
}

nevents = 0;

for (i = 0; i < nfds; i++) {
pe = events + i;
fd = pe->data;

assert(fd >= 0);
assert((unsigned) fd < loop->nwatchers);

w = loop->watchers[fd];

if (w == NULL) {
/* File descriptor that we've stopped watching, disarm it. */
if (uv__epoll_ctl(loop->backend_fd, UV__EPOLL_CTL_DEL, fd, pe))
if (errno != EBADF && errno != ENOENT)
abort();

continue;
}

w->cb(loop, w, pe->events);
nevents++;
}

if (nevents != 0) {
if (nfds == ARRAY_SIZE(events) && --count != 0) {
/* Poll for more events but don't block this time. */
timeout = 0;
continue;
}
return;
}

if (timeout == 0)
return;

if (timeout == -1)
continue;

update_timeout:
assert(timeout > 0);

diff = uv_hrtime() / 1000000;
assert(diff >= base);
diff -= base;

if (diff >= (uint64_t) timeout)
return;

timeout -= diff;
}
}


uint64_t uv_hrtime() {
struct timespec ts;
clock_gettime(CLOCK_MONOTONIC, &ts);
@@ -29,16 +29,9 @@

int uv__loop_init(uv_loop_t* loop, int default_loop) {
unsigned int i;
int flags;

uv__signal_global_once_init();

#if HAVE_KQUEUE
flags = EVBACKEND_KQUEUE;
#else
flags = EVFLAG_AUTO;
#endif

memset(loop, 0, sizeof(*loop));
RB_INIT(&loop->timer_handles);
ngx_queue_init(&loop->wq);
@@ -48,15 +41,24 @@ int uv__loop_init(uv_loop_t* loop, int default_loop) {
ngx_queue_init(&loop->check_handles);
ngx_queue_init(&loop->prepare_handles);
ngx_queue_init(&loop->handle_queue);

loop->nfds = 0;
loop->watchers = NULL;
loop->nwatchers = 0;
ngx_queue_init(&loop->pending_queue);
ngx_queue_init(&loop->watcher_queue);

loop->closing_handles = NULL;
loop->time = uv_hrtime() / 1000000;
loop->async_pipefd[0] = -1;
loop->async_pipefd[1] = -1;
loop->signal_pipefd[0] = -1;
loop->signal_pipefd[1] = -1;
loop->backend_fd = -1;
loop->emfile_fd = -1;
loop->ev = (default_loop ? ev_default_loop : ev_loop_new)(flags);
ev_set_userdata(loop->ev, loop);

if (uv__platform_loop_init(loop, default_loop))
return -1;

uv_signal_init(loop, &loop->child_watcher);
uv__handle_unref(&loop->child_watcher);
@@ -74,17 +76,13 @@ int uv__loop_init(uv_loop_t* loop, int default_loop) {
uv__handle_unref(&loop->wq_async);
loop->wq_async.flags |= UV__HANDLE_INTERNAL;

if (uv__platform_loop_init(loop, default_loop))
return -1;

return 0;
}


void uv__loop_delete(uv_loop_t* loop) {
uv__signal_loop_cleanup(loop);
uv__platform_loop_delete(loop);
ev_loop_destroy(loop->ev);

if (loop->async_pipefd[0] != -1) {
close(loop->async_pipefd[0]);
@@ -101,8 +99,23 @@ void uv__loop_delete(uv_loop_t* loop) {
loop->emfile_fd = -1;
}

if (loop->backend_fd != -1) {
close(loop->backend_fd);
loop->backend_fd = -1;
}

uv_mutex_lock(&loop->wq_mutex);
assert(ngx_queue_empty(&loop->wq) && "thread pool work queue not empty!");
uv_mutex_unlock(&loop->wq_mutex);
uv_mutex_destroy(&loop->wq_mutex);

#if 0
assert(ngx_queue_empty(&loop->pending_queue));
assert(ngx_queue_empty(&loop->watcher_queue));
assert(loop->nfds == 0);
#endif

free(loop->watchers);
loop->watchers = NULL;
loop->nwatchers = 0;
}
@@ -48,7 +48,7 @@ static char *process_title;


int uv__platform_loop_init(uv_loop_t* loop, int default_loop) {
return 0;
return uv__kqueue_init(loop);
}


@@ -44,7 +44,7 @@ static char *process_title;


int uv__platform_loop_init(uv_loop_t* loop, int default_loop) {
return 0;
return uv__kqueue_init(loop);
}


@@ -29,7 +29,7 @@
#include <unistd.h>
#include <stdlib.h>

static void uv__pipe_accept(uv_loop_t* loop, uv__io_t* w, int events);
static void uv__pipe_accept(uv_loop_t* loop, uv__io_t* w, unsigned int events);


int uv_pipe_init(uv_loop_t* loop, uv_pipe_t* handle, int ipc) {
@@ -57,7 +57,7 @@ int uv_pipe_bind(uv_pipe_t* handle, const char* name) {
bound = 0;

/* Already bound? */
if (handle->fd >= 0) {
if (handle->io_watcher.fd >= 0) {
uv__set_artificial_error(handle->loop, UV_EINVAL);
goto out;
}
@@ -89,7 +89,7 @@ int uv_pipe_bind(uv_pipe_t* handle, const char* name) {

/* Success. */
handle->pipe_fname = pipe_fname; /* Is a strdup'ed copy. */
handle->fd = sockfd;
handle->io_watcher.fd = sockfd;
status = 0;

out:
@@ -117,21 +117,18 @@ int uv_pipe_listen(uv_pipe_t* handle, int backlog, uv_connection_cb cb) {
saved_errno = errno;
status = -1;

if (handle->fd == -1) {
if (handle->io_watcher.fd == -1) {
uv__set_artificial_error(handle->loop, UV_EINVAL);
goto out;
}
assert(handle->fd >= 0);
assert(handle->io_watcher.fd >= 0);

if ((status = listen(handle->fd, backlog)) == -1) {
if ((status = listen(handle->io_watcher.fd, backlog)) == -1) {
uv__set_sys_error(handle->loop, errno);
} else {
handle->connection_cb = cb;
uv__io_init(&handle->read_watcher,
uv__pipe_accept,
handle->fd,
UV__IO_READ);
uv__io_start(handle->loop, &handle->read_watcher);
handle->io_watcher.cb = uv__pipe_accept;
uv__io_start(handle->loop, &handle->io_watcher, UV__POLLIN);
}

out:
@@ -175,11 +172,11 @@ void uv_pipe_connect(uv_connect_t* req,
int r;

saved_errno = errno;
new_sock = (handle->fd == -1);
new_sock = (handle->io_watcher.fd == -1);
err = -1;

if (new_sock)
if ((handle->fd = uv__socket(AF_UNIX, SOCK_STREAM, 0)) == -1)
if ((handle->io_watcher.fd = uv__socket(AF_UNIX, SOCK_STREAM, 0)) == -1)
goto out;

memset(&saddr, 0, sizeof saddr);
@@ -190,7 +187,7 @@ void uv_pipe_connect(uv_connect_t* req,
* is either there or not.
*/
do {
r = connect(handle->fd, (struct sockaddr*)&saddr, sizeof saddr);
r = connect(handle->io_watcher.fd, (struct sockaddr*)&saddr, sizeof saddr);
}
while (r == -1 && errno == EINTR);

@@ -199,12 +196,11 @@ void uv_pipe_connect(uv_connect_t* req,

if (new_sock)
if (uv__stream_open((uv_stream_t*)handle,
handle->fd,
handle->io_watcher.fd,
UV_STREAM_READABLE | UV_STREAM_WRITABLE))
goto out;

uv__io_start(handle->loop, &handle->read_watcher);
uv__io_start(handle->loop, &handle->write_watcher);
uv__io_start(handle->loop, &handle->io_watcher, UV__POLLIN | UV__POLLOUT);
err = 0;

out:
@@ -217,7 +213,7 @@ void uv_pipe_connect(uv_connect_t* req,
ngx_queue_init(&req->queue);

/* Run callback on next tick. */
uv__io_feed(handle->loop, &handle->write_watcher, UV__IO_WRITE);
uv__io_feed(handle->loop, &handle->io_watcher);

/* Mimic the Windows pipe implementation, always
* return 0 and let the callback handle errors.
@@ -227,17 +223,17 @@ void uv_pipe_connect(uv_connect_t* req,


/* TODO merge with uv__server_io()? */
static void uv__pipe_accept(uv_loop_t* loop, uv__io_t* w, int events) {
static void uv__pipe_accept(uv_loop_t* loop, uv__io_t* w, unsigned int events) {
uv_pipe_t* pipe;
int saved_errno;
int sockfd;

saved_errno = errno;
pipe = container_of(w, uv_pipe_t, read_watcher);
pipe = container_of(w, uv_pipe_t, io_watcher);

assert(pipe->type == UV_NAMED_PIPE);

sockfd = uv__accept(pipe->fd);
sockfd = uv__accept(pipe->io_watcher.fd);
if (sockfd == -1) {
if (errno != EAGAIN && errno != EWOULDBLOCK) {
uv__set_sys_error(pipe->loop, errno);
@@ -248,7 +244,7 @@ static void uv__pipe_accept(uv_loop_t* loop, uv__io_t* w, int events) {
pipe->connection_cb((uv_stream_t*)pipe, 0);
if (pipe->accepted_fd == sockfd) {
/* The user hasn't called uv_accept() yet */
uv__io_stop(pipe->loop, &pipe->read_watcher);
uv__io_stop(pipe->loop, &pipe->io_watcher, UV__POLLIN);
}
}

@@ -27,25 +27,24 @@
#include <errno.h>


static void uv__poll_io(uv_loop_t* loop, uv__io_t* w, int events) {
static void uv__poll_io(uv_loop_t* loop, uv__io_t* w, unsigned int events) {
uv_poll_t* handle;
int pevents;

handle = container_of(w, uv_poll_t, io_watcher);

if (events & UV__IO_ERROR) {
/* An error happened. Libev has implicitly stopped the watcher, but we */
/* need to fix the refcount. */
if (events & UV__POLLERR) {
uv__io_stop(loop, w, UV__POLLIN | UV__POLLOUT);
uv__handle_stop(handle);
uv__set_sys_error(handle->loop, EBADF);
handle->poll_cb(handle, -1, 0);
return;
}

pevents = 0;
if (events & UV__IO_READ)
if (events & UV__POLLIN)
pevents |= UV_READABLE;
if (events & UV__IO_WRITE)
if (events & UV__POLLOUT)
pevents |= UV_WRITABLE;

handle->poll_cb(handle, 0, pevents);
@@ -54,10 +53,8 @@ static void uv__poll_io(uv_loop_t* loop, uv__io_t* w, int events) {

int uv_poll_init(uv_loop_t* loop, uv_poll_t* handle, int fd) {
uv__handle_init(loop, (uv_handle_t*) handle, UV_POLL);
handle->fd = fd;
uv__io_init(&handle->io_watcher, uv__poll_io, fd);
handle->poll_cb = NULL;
uv__io_init(&handle->io_watcher, uv__poll_io, fd, 0);

return 0;
}

@@ -69,7 +66,7 @@ int uv_poll_init_socket(uv_loop_t* loop, uv_poll_t* handle,


static void uv__poll_stop(uv_poll_t* handle) {
uv__io_stop(handle->loop, &handle->io_watcher);
uv__io_stop(handle->loop, &handle->io_watcher, UV__POLLIN | UV__POLLOUT);
uv__handle_stop(handle);
}

@@ -87,23 +84,20 @@ int uv_poll_start(uv_poll_t* handle, int pevents, uv_poll_cb poll_cb) {
assert((pevents & ~(UV_READABLE | UV_WRITABLE)) == 0);
assert(!(handle->flags & (UV_CLOSING | UV_CLOSED)));

if (pevents == 0) {
uv__poll_stop(handle);
uv__poll_stop(handle);

if (pevents == 0)
return 0;
}

events = 0;
if (pevents & UV_READABLE)
events |= UV__IO_READ;
events |= UV__POLLIN;
if (pevents & UV_WRITABLE)
events |= UV__IO_WRITE;
events |= UV__POLLOUT;

uv__io_stop(handle->loop, &handle->io_watcher);
uv__io_set(&handle->io_watcher, uv__poll_io, handle->fd, events);
uv__io_start(handle->loop, &handle->io_watcher);

handle->poll_cb = poll_cb;
uv__io_start(handle->loop, &handle->io_watcher, events);
uv__handle_start(handle);
handle->poll_cb = poll_cb;

return 0;
}
@@ -204,7 +204,7 @@ static int uv__process_init_stdio(uv_stdio_container_t* container, int fds[2]) {
if (container->flags & UV_INHERIT_FD) {
fd = container->data.fd;
} else {
fd = container->data.stream->fd;
fd = container->data.stream->io_watcher.fd;
}

if (fd == -1) {
@@ -38,7 +38,7 @@ RB_HEAD(uv__signal_tree_s, uv_signal_s);


static int uv__signal_unlock();
static void uv__signal_event(uv_loop_t* loop, uv__io_t* watcher, int events);
static void uv__signal_event(uv_loop_t* loop, uv__io_t* w, unsigned int events);
static int uv__signal_compare(uv_signal_t* w1, uv_signal_t* w2);
static void uv__signal_stop(uv_signal_t* handle);

@@ -189,17 +189,16 @@ static uv_err_t uv__signal_register_handler(int signum) {
static void uv__signal_unregister_handler(int signum) {
/* When this function is called, the signal lock must be held. */
struct sigaction sa;
int r;

memset(&sa, 0, sizeof(sa));
sa.sa_handler = SIG_DFL;

r = sigaction(signum, &sa, NULL);
/* sigaction can only fail with EINVAL or EFAULT; an attempt to deregister a
* signal implies that it was successfully registered earlier, so EINVAL
* should never happen.
*/
assert(r == 0);
if (sigaction(signum, &sa, NULL))
abort();
}


@@ -213,9 +212,8 @@ static int uv__signal_loop_once_init(uv_loop_t* loop) {

uv__io_init(&loop->signal_io_watcher,
uv__signal_event,
loop->signal_pipefd[0],
UV__IO_READ);
uv__io_start(loop, &loop->signal_io_watcher);
loop->signal_pipefd[0]);
uv__io_start(loop, &loop->signal_io_watcher, UV__POLLIN);

return 0;
}
@@ -330,7 +328,7 @@ int uv_signal_start(uv_signal_t* handle, uv_signal_cb signal_cb, int signum) {
}


static void uv__signal_event(uv_loop_t* loop, uv__io_t* watcher, int events) {
static void uv__signal_event(uv_loop_t* loop, uv__io_t* w, unsigned int events) {
uv__signal_msg_t* msg;
uv_signal_t* handle;
char buf[sizeof(uv__signal_msg_t) * 32];
@@ -439,6 +437,7 @@ static void uv__signal_stop(uv_signal_t* handle) {

removed_handle = RB_REMOVE(uv__signal_tree_s, &uv__signal_tree, handle);
assert(removed_handle == handle);
(void) removed_handle;

/* Check if there are other active signal watchers observing this signal. If
* not, unregister the signal handler.

Large diffs are not rendered by default.

@@ -65,14 +65,163 @@

int uv__platform_loop_init(uv_loop_t* loop, int default_loop) {
loop->fs_fd = -1;
loop->backend_fd = port_create();

if (loop->backend_fd == -1)
return -1;

uv__cloexec(loop->backend_fd, 1);

return 0;
}


void uv__platform_loop_delete(uv_loop_t* loop) {
if (loop->fs_fd == -1) return;
close(loop->fs_fd);
loop->fs_fd = -1;
if (loop->fs_fd != -1) {
close(loop->fs_fd);
loop->fs_fd = -1;
}

if (loop->backend_fd != -1) {
close(loop->backend_fd);
loop->backend_fd = -1;
}
}


void uv__io_poll(uv_loop_t* loop, int timeout) {
struct port_event events[1024];
struct port_event* pe;
struct timespec spec;
ngx_queue_t* q;
uv__io_t* w;
uint64_t base;
uint64_t diff;
unsigned int nfds;
unsigned int i;
int saved_errno;
int nevents;
int count;
int fd;

if (loop->nfds == 0) {
assert(ngx_queue_empty(&loop->watcher_queue));
return;
}

while (!ngx_queue_empty(&loop->watcher_queue)) {
q = ngx_queue_head(&loop->watcher_queue);
ngx_queue_remove(q);
ngx_queue_init(q);

w = ngx_queue_data(q, uv__io_t, watcher_queue);
assert(w->pevents != 0);

if (port_associate(loop->backend_fd, PORT_SOURCE_FD, w->fd, w->pevents, 0))
abort();

w->events = w->pevents;
}

assert(timeout >= -1);
base = loop->time;
count = 48; /* Benchmarks suggest this gives the best throughput. */

for (;;) {
if (timeout != -1) {
spec.tv_sec = timeout / 1000;
spec.tv_nsec = (timeout % 1000) * 1000000;
}

/* Work around a kernel bug where nfds is not updated. */
events[0].portev_source = 0;

nfds = 1;
saved_errno = 0;
if (port_getn(loop->backend_fd,
events,
ARRAY_SIZE(events),
&nfds,
timeout == -1 ? NULL : &spec)) {
/* Work around another kernel bug: port_getn() may return events even
* on error.
*/
if (errno == EINTR || errno == ETIME)
saved_errno = errno;
else
abort();
}

if (events[0].portev_source == 0) {
if (timeout == 0)
return;

if (timeout == -1)
continue;

goto update_timeout;
}

if (nfds == 0) {
assert(timeout != -1);
return;
}

nevents = 0;

for (i = 0; i < nfds; i++) {
pe = events + i;
fd = pe->portev_object;

assert(fd >= 0);
assert((unsigned) fd < loop->nwatchers);

w = loop->watchers[fd];

/* File descriptor that we've stopped watching, ignore. */
if (w == NULL)
continue;

w->cb(loop, w, pe->portev_events);
nevents++;

/* Events Ports operates in oneshot mode, rearm timer on next run. */
if (w->pevents != 0 && ngx_queue_empty(&w->watcher_queue))
ngx_queue_insert_tail(&loop->watcher_queue, &w->watcher_queue);
}

if (nevents != 0) {
if (nfds == ARRAY_SIZE(events) && --count != 0) {
/* Poll for more events but don't block this time. */
timeout = 0;
continue;
}
return;
}

if (saved_errno == ETIME) {
assert(timeout != -1);
return;
}

if (timeout == 0)
return;

if (timeout == -1)
continue;

update_timeout:
assert(timeout > 0);

diff = uv_hrtime() / 1000000;
assert(diff >= base);
diff -= base;

if (diff >= (uint64_t) timeout)
return;

timeout -= diff;
}
}


@@ -139,7 +288,9 @@ static void uv__fs_event_rearm(uv_fs_event_t *handle) {
}


static void uv__fs_event_read(uv_loop_t* loop, uv__io_t* w, int revents) {
static void uv__fs_event_read(uv_loop_t* loop,
uv__io_t* w,
unsigned int revents) {
uv_fs_event_t *handle = NULL;
timespec_t timeout;
port_event_t pe;
@@ -216,8 +367,8 @@ int uv_fs_event_init(uv_loop_t* loop,
uv__fs_event_rearm(handle);

if (first_run) {
uv__io_init(&loop->fs_event_watcher, uv__fs_event_read, portfd, UV__IO_READ);
uv__io_start(loop, &loop->fs_event_watcher);
uv__io_init(&loop->fs_event_watcher, uv__fs_event_read, portfd);
uv__io_start(loop, &loop->fs_event_watcher, UV__POLLIN);
}

return 0;
@@ -37,7 +37,7 @@ int uv_tcp_init(uv_loop_t* loop, uv_tcp_t* tcp) {
static int maybe_new_socket(uv_tcp_t* handle, int domain, int flags) {
int sockfd;

if (handle->fd != -1)
if (handle->io_watcher.fd != -1)
return 0;

sockfd = uv__socket(domain, SOCK_STREAM, 0);
@@ -68,7 +68,7 @@ static int uv__bind(uv_tcp_t* tcp,
return -1;

tcp->delayed_error = 0;
if (bind(tcp->fd, addr, addrsize) == -1) {
if (bind(tcp->io_watcher.fd, addr, addrsize) == -1) {
if (errno == EADDRINUSE) {
tcp->delayed_error = errno;
} else {
@@ -105,7 +105,7 @@ static int uv__connect(uv_connect_t* req,
handle->delayed_error = 0;

do
r = connect(handle->fd, addr, addrlen);
r = connect(handle->io_watcher.fd, addr, addrlen);
while (r == -1 && errno == EINTR);

if (r == -1) {
@@ -127,10 +127,10 @@ static int uv__connect(uv_connect_t* req,
ngx_queue_init(&req->queue);
handle->connect_req = req;

uv__io_start(handle->loop, &handle->write_watcher);
uv__io_start(handle->loop, &handle->io_watcher, UV__POLLOUT);

if (handle->delayed_error)
uv__io_feed(handle->loop, &handle->write_watcher, UV__IO_WRITE);
uv__io_feed(handle->loop, &handle->io_watcher);

return 0;
}
@@ -174,7 +174,7 @@ int uv_tcp_getsockname(uv_tcp_t* handle, struct sockaddr* name,
goto out;
}

if (handle->fd < 0) {
if (handle->io_watcher.fd < 0) {
uv__set_sys_error(handle->loop, EINVAL);
rv = -1;
goto out;
@@ -183,7 +183,7 @@ int uv_tcp_getsockname(uv_tcp_t* handle, struct sockaddr* name,
/* sizeof(socklen_t) != sizeof(int) on some systems. */
socklen = (socklen_t)*namelen;

if (getsockname(handle->fd, name, &socklen) == -1) {
if (getsockname(handle->io_watcher.fd, name, &socklen) == -1) {
uv__set_sys_error(handle->loop, errno);
rv = -1;
} else {
@@ -211,7 +211,7 @@ int uv_tcp_getpeername(uv_tcp_t* handle, struct sockaddr* name,
goto out;
}

if (handle->fd < 0) {
if (handle->io_watcher.fd < 0) {
uv__set_sys_error(handle->loop, EINVAL);
rv = -1;
goto out;
@@ -220,7 +220,7 @@ int uv_tcp_getpeername(uv_tcp_t* handle, struct sockaddr* name,
/* sizeof(socklen_t) != sizeof(int) on some systems. */
socklen = (socklen_t)*namelen;

if (getpeername(handle->fd, name, &socklen) == -1) {
if (getpeername(handle->io_watcher.fd, name, &socklen) == -1) {
uv__set_sys_error(handle->loop, errno);
rv = -1;
} else {
@@ -250,14 +250,14 @@ int uv_tcp_listen(uv_tcp_t* tcp, int backlog, uv_connection_cb cb) {
if (maybe_new_socket(tcp, AF_INET, UV_STREAM_READABLE))
return -1;

if (listen(tcp->fd, backlog))
if (listen(tcp->io_watcher.fd, backlog))
return uv__set_sys_error(tcp->loop, errno);

tcp->connection_cb = cb;

/* Start listening for connections. */
uv__io_set(&tcp->read_watcher, uv__server_io, tcp->fd, UV__IO_READ);
uv__io_start(tcp->loop, &tcp->read_watcher);
tcp->io_watcher.cb = uv__server_io;
uv__io_start(tcp->loop, &tcp->io_watcher, UV__POLLIN);

return 0;
}
@@ -293,63 +293,38 @@ int uv__tcp_connect6(uv_connect_t* req,
}


int uv__tcp_nodelay(uv_tcp_t* handle, int enable) {
if (setsockopt(handle->fd,
IPPROTO_TCP,
TCP_NODELAY,
&enable,
sizeof enable) == -1) {
uv__set_sys_error(handle->loop, errno);
return -1;
}
return 0;
int uv__tcp_nodelay(int fd, int on) {
return setsockopt(fd, IPPROTO_TCP, TCP_NODELAY, &on, sizeof(on));
}


int uv__tcp_keepalive(uv_tcp_t* handle, int enable, unsigned int delay) {
if (setsockopt(handle->fd,
SOL_SOCKET,
SO_KEEPALIVE,
&enable,
sizeof enable) == -1) {
uv__set_sys_error(handle->loop, errno);
int uv__tcp_keepalive(int fd, int on, unsigned int delay) {
if (setsockopt(fd, SOL_SOCKET, SO_KEEPALIVE, &on, sizeof(on)))
return -1;
}

#ifdef TCP_KEEPIDLE
if (enable && setsockopt(handle->fd,
IPPROTO_TCP,
TCP_KEEPIDLE,
&delay,
sizeof delay) == -1) {
uv__set_sys_error(handle->loop, errno);
if (on && setsockopt(fd, IPPROTO_TCP, TCP_KEEPIDLE, &delay, sizeof(delay)))
return -1;
}
#endif

/* Solaris/SmartOS, if you don't support keep-alive,
* then don't advertise it in your system headers...
*/
#if defined(TCP_KEEPALIVE) && !defined(__sun)
if (enable && setsockopt(handle->fd,
IPPROTO_TCP,
TCP_KEEPALIVE,
&delay,
sizeof delay) == -1) {
uv__set_sys_error(handle->loop, errno);
if (on && setsockopt(fd, IPPROTO_TCP, TCP_KEEPALIVE, &delay, sizeof(delay)))
return -1;
}
#endif

return 0;
}


int uv_tcp_nodelay(uv_tcp_t* handle, int enable) {
if (handle->fd != -1 && uv__tcp_nodelay(handle, enable))
return -1;
int uv_tcp_nodelay(uv_tcp_t* handle, int on) {
if (handle->io_watcher.fd != -1)
if (uv__tcp_nodelay(handle->io_watcher.fd, on))
return -1;

if (enable)
if (on)
handle->flags |= UV_TCP_NODELAY;
else
handle->flags &= ~UV_TCP_NODELAY;
@@ -358,25 +333,26 @@ int uv_tcp_nodelay(uv_tcp_t* handle, int enable) {
}


int uv_tcp_keepalive(uv_tcp_t* handle, int enable, unsigned int delay) {
if (handle->fd != -1 && uv__tcp_keepalive(handle, enable, delay))
return -1;
int uv_tcp_keepalive(uv_tcp_t* handle, int on, unsigned int delay) {
if (handle->io_watcher.fd != -1)
if (uv__tcp_keepalive(handle->io_watcher.fd, on, delay))
return -1;

if (enable)
if (on)
handle->flags |= UV_TCP_KEEPALIVE;
else
handle->flags &= ~UV_TCP_KEEPALIVE;

/* TODO Store delay if handle->fd == -1 but don't want to enlarge
* uv_tcp_t with an int that's almost never used...
/* TODO Store delay if handle->io_watcher.fd == -1 but don't want to enlarge
* uv_tcp_t with an int that's almost never used...
*/

return 0;
}


int uv_tcp_simultaneous_accepts(uv_tcp_t* handle, int enable) {
if (enable)
int uv_tcp_simultaneous_accepts(uv_tcp_t* handle, int on) {
if (on)
handle->flags |= UV_TCP_SINGLE_ACCEPT;
else
handle->flags &= ~UV_TCP_SINGLE_ACCEPT;
@@ -51,8 +51,10 @@ int uv_tty_init(uv_loop_t* loop, uv_tty_t* tty, int fd, int readable) {


int uv_tty_set_mode(uv_tty_t* tty, int mode) {
int fd = tty->fd;
struct termios raw;
int fd;

fd = tty->io_watcher.fd;

if (mode && tty->mode == 0) {
/* on */
@@ -103,7 +105,7 @@ int uv_tty_set_mode(uv_tty_t* tty, int mode) {
int uv_tty_get_winsize(uv_tty_t* tty, int* width, int* height) {
struct winsize ws;

if (ioctl(tty->fd, TIOCGWINSZ, &ws) < 0) {
if (ioctl(tty->io_watcher.fd, TIOCGWINSZ, &ws) < 0) {
uv__set_sys_error(tty->loop, errno);
return -1;
}