unix: remove dependency on libev

1 parent 59a2c63 commit 1282d64868b9c560c074b9c9630391f3b18ef633 @bnoordhuis committed Aug 23, 2012
4 README.md
@@ -1,8 +1,8 @@
# libuv [![Build Status](https://secure.travis-ci.org/joyent/libuv.png)](http://travis-ci.org/joyent/libuv)
libuv is a new platform layer for Node. Its purpose is to abstract IOCP on
-Windows and libev on Unix systems. We intend to eventually contain all
-platform differences in this library.
+Windows and epoll/kqueue/event ports/etc. on Unix systems. We intend to
+eventually contain all platform differences in this library.
http://nodejs.org/
4 include/uv-private/uv-bsd.h
@@ -23,8 +23,6 @@
#define UV_BSD_H
#define UV_PLATFORM_FS_EVENT_FIELDS \
- ev_io event_watcher; \
- int fflags; \
- int fd; \
+ uv__io_t event_watcher; \
#endif /* UV_BSD_H */
4 include/uv-private/uv-darwin.h
@@ -39,9 +39,7 @@
ngx_queue_t cf_signals; \
#define UV_PLATFORM_FS_EVENT_FIELDS \
- ev_io event_watcher; \
- int fflags; \
- int fd; \
+ uv__io_t event_watcher; \
char* realpath; \
int realpath_len; \
int cf_flags; \
29 include/uv-private/uv-unix.h
@@ -24,8 +24,6 @@
#include "ngx-queue.h"
-#include "ev.h"
-
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
@@ -46,11 +44,18 @@
struct uv__io_s;
struct uv_loop_s;
+typedef void (*uv__io_cb)(struct uv_loop_s* loop,
+ struct uv__io_s* w,
+ unsigned int events);
typedef struct uv__io_s uv__io_t;
-typedef void (*uv__io_cb)(struct uv_loop_s* loop, uv__io_t* handle, int events);
struct uv__io_s {
- ev_io io_watcher;
+ uv__io_cb cb;
+ ngx_queue_t pending_queue;
+ ngx_queue_t watcher_queue;
+ unsigned int pevents; /* Pending event mask i.e. mask at next tick. */
+ unsigned int events; /* Current event mask. */
+ int fd;
};
struct uv__work {
@@ -135,7 +140,12 @@ typedef struct {
#define UV_LOOP_PRIVATE_FIELDS \
unsigned long flags; \
- struct ev_loop* ev; \
+ int backend_fd; \
+ ngx_queue_t pending_queue; \
+ ngx_queue_t watcher_queue; \
+ uv__io_t** watchers; \
+ unsigned int nwatchers; \
+ unsigned int nfds; \
ngx_queue_t wq; \
uv_mutex_t wq_mutex; \
uv_async_t wq_async; \
@@ -193,31 +203,26 @@ typedef struct {
#define UV_STREAM_PRIVATE_FIELDS \
uv_connect_t *connect_req; \
uv_shutdown_t *shutdown_req; \
- uv__io_t read_watcher; \
- uv__io_t write_watcher; \
+ uv__io_t io_watcher; \
ngx_queue_t write_queue; \
ngx_queue_t write_completed_queue; \
uv_connection_cb connection_cb; \
int delayed_error; \
int accepted_fd; \
- int fd; \
#define UV_TCP_PRIVATE_FIELDS /* empty */
#define UV_UDP_PRIVATE_FIELDS \
- int fd; \
uv_alloc_cb alloc_cb; \
uv_udp_recv_cb recv_cb; \
- uv__io_t read_watcher; \
- uv__io_t write_watcher; \
+ uv__io_t io_watcher; \
ngx_queue_t write_queue; \
ngx_queue_t write_completed_queue; \
#define UV_PIPE_PRIVATE_FIELDS \
const char* pipe_fname; /* strdup'ed */
#define UV_POLL_PRIVATE_FIELDS \
- int fd; \
uv__io_t io_watcher;
#define UV_PREPARE_PRIVATE_FIELDS \
28 include/uv.h
@@ -1044,9 +1044,8 @@ UV_EXTERN int uv_poll_stop(uv_poll_t* handle);
/*
* uv_prepare_t is a subclass of uv_handle_t.
*
- * libev wrapper. Every active prepare handle gets its callback called
- * exactly once per loop iteration, just before the system blocks to wait
- * for completed i/o.
+ * Every active prepare handle gets its callback called exactly once per loop
+ * iteration, just before the system blocks to wait for completed i/o.
*/
struct uv_prepare_s {
UV_HANDLE_FIELDS
@@ -1063,8 +1062,8 @@ UV_EXTERN int uv_prepare_stop(uv_prepare_t* prepare);
/*
* uv_check_t is a subclass of uv_handle_t.
*
- * libev wrapper. Every active check handle gets its callback called exactly
- * once per loop iteration, just after the system returns from blocking.
+ * Every active check handle gets its callback called exactly once per loop
+ * iteration, just after the system returns from blocking.
*/
struct uv_check_s {
UV_HANDLE_FIELDS
@@ -1081,10 +1080,10 @@ UV_EXTERN int uv_check_stop(uv_check_t* check);
/*
* uv_idle_t is a subclass of uv_handle_t.
*
- * libev wrapper. Every active idle handle gets its callback called
- * repeatedly until it is stopped. This happens after all other types of
- * callbacks are processed. When there are multiple "idle" handles active,
- * their callbacks are called in turn.
+ * Every active idle handle gets its callback called repeatedly until it is
+ * stopped. This happens after all other types of callbacks are processed.
+ * When there are multiple "idle" handles active, their callbacks are called
+ * in turn.
*/
struct uv_idle_s {
UV_HANDLE_FIELDS
@@ -1101,12 +1100,11 @@ UV_EXTERN int uv_idle_stop(uv_idle_t* idle);
/*
* uv_async_t is a subclass of uv_handle_t.
*
- * libev wrapper. uv_async_send wakes up the event
- * loop and calls the async handle's callback There is no guarantee that
- * every uv_async_send call leads to exactly one invocation of the callback;
- * The only guarantee is that the callback function is called at least once
- * after the call to async_send. Unlike all other libuv functions,
- * uv_async_send can be called from another thread.
+ * uv_async_send wakes up the event loop and calls the async handle's callback.
+ * There is no guarantee that every uv_async_send call leads to exactly one
+ * invocation of the callback; the only guarantee is that the callback function
+ * is called at least once after the call to async_send. Unlike all other
+ * libuv functions, uv_async_send can be called from another thread.
*/
struct uv_async_s {
UV_HANDLE_FIELDS
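
The comment blocks above spell out the prepare, check, idle and async handle contracts now that the "libev wrapper" notes are gone. Below is a minimal usage sketch, not part of this commit, showing those contracts from user code; it assumes the libuv API of this era (callbacks take a trailing status argument, uv_run() takes only the loop), and the handle names and output are illustrative only.

#include <stdio.h>
#include "uv.h"

static uv_idle_t idle_handle;
static uv_async_t async_handle;

/* Called once per loop iteration, after other callback types, until stopped. */
static void idle_cb(uv_idle_t* handle, int status) {
  static int ticks;
  (void) status;
  if (++ticks == 3)
    uv_idle_stop(handle);
}

/* Called at least once per batch of uv_async_send() calls; sends may coalesce. */
static void async_cb(uv_async_t* handle, int status) {
  (void) status;
  printf("async wakeup\n");
  uv_close((uv_handle_t*) handle, NULL);
}

int main(void) {
  uv_loop_t* loop = uv_default_loop();

  uv_idle_init(loop, &idle_handle);
  uv_idle_start(&idle_handle, idle_cb);

  uv_async_init(loop, &async_handle, async_cb);
  uv_async_send(&async_handle);  /* Safe to call from another thread. */
  uv_async_send(&async_handle);  /* May be coalesced with the previous send. */

  return uv_run(loop);  /* Exits once no active handles remain. */
}
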
11 src/unix/async.c
@@ -27,7 +27,7 @@
#include <unistd.h>
static int uv__async_init(uv_loop_t* loop);
-static void uv__async_io(uv_loop_t* loop, uv__io_t* handle, int events);
+static void uv__async_io(uv_loop_t* loop, uv__io_t* w, unsigned int events);
static int uv__async_make_pending(volatile sig_atomic_t* ptr) {
@@ -104,17 +104,14 @@ static int uv__async_init(uv_loop_t* loop) {
if (uv__make_pipe(loop->async_pipefd, UV__F_NONBLOCK))
return -1;
- uv__io_init(&loop->async_watcher,
- uv__async_io,
- loop->async_pipefd[0],
- UV__IO_READ);
- uv__io_start(loop, &loop->async_watcher);
+ uv__io_init(&loop->async_watcher, uv__async_io, loop->async_pipefd[0]);
+ uv__io_start(loop, &loop->async_watcher, UV__IO_READ);
return 0;
}
-static void uv__async_io(uv_loop_t* loop, uv__io_t* handle, int events) {
+static void uv__async_io(uv_loop_t* loop, uv__io_t* w, unsigned int events) {
char buf[1024];
ngx_queue_t* q;
uv_async_t* h;
153 src/unix/core.c
@@ -35,7 +35,7 @@
#include <sys/un.h>
#include <netinet/in.h>
#include <arpa/inet.h>
-#include <limits.h> /* PATH_MAX */
+#include <limits.h> /* INT_MAX, PATH_MAX */
#include <sys/uio.h> /* writev */
#ifdef __linux__
@@ -60,6 +60,8 @@
# include <sys/wait.h>
#endif
+static void uv__run_pending(uv_loop_t* loop);
+
static uv_loop_t default_loop_struct;
static uv_loop_t* default_loop_ptr;
@@ -167,9 +169,6 @@ static void uv__finish_close(uv_handle_t* handle) {
case UV_NAMED_PIPE:
case UV_TCP:
case UV_TTY:
- assert(!uv__io_active(&((uv_stream_t*)handle)->read_watcher));
- assert(!uv__io_active(&((uv_stream_t*)handle)->write_watcher));
- assert(((uv_stream_t*)handle)->fd == -1);
uv__stream_destroy((uv_stream_t*)handle);
break;
@@ -263,20 +262,13 @@ static unsigned int uv__poll_timeout(uv_loop_t* loop) {
}
-static void uv__poll(uv_loop_t* loop) {
- void ev__run(EV_P_ ev_tstamp waittime);
- ev_invoke_pending(loop->ev);
- ev__run(loop->ev, uv__poll_timeout(loop) / 1000.);
- ev_invoke_pending(loop->ev);
-}
-
-
static int uv__run(uv_loop_t* loop) {
uv_update_time(loop);
uv__run_timers(loop);
uv__run_idle(loop);
uv__run_prepare(loop);
- uv__poll(loop);
+ uv__run_pending(loop);
+ uv__io_poll(loop, uv__poll_timeout(loop));
uv__run_check(loop);
uv__run_closing_handles(loop);
return uv__has_active_handles(loop) || uv__has_active_reqs(loop);
@@ -534,49 +526,136 @@ void uv_disable_stdio_inheritance(void) {
}
-static void uv__io_set_cb(uv__io_t* handle, uv__io_cb cb) {
- union { void* data; uv__io_cb cb; } u;
- u.cb = cb;
- handle->io_watcher.data = u.data;
+static void uv__run_pending(uv_loop_t* loop) {
+ ngx_queue_t* q;
+ uv__io_t* w;
+
+ while (!ngx_queue_empty(&loop->pending_queue)) {
+ q = ngx_queue_head(&loop->pending_queue);
+ ngx_queue_remove(q);
+ ngx_queue_init(q);
+
+ w = ngx_queue_data(q, uv__io_t, pending_queue);
+ w->cb(loop, w, UV__IO_WRITE);
+ }
}
-static void uv__io_rw(struct ev_loop* ev, ev_io* w, int events) {
- union { void* data; uv__io_cb cb; } u;
- uv_loop_t* loop = ev_userdata(ev);
- uv__io_t* handle = container_of(w, uv__io_t, io_watcher);
- u.data = handle->io_watcher.data;
- u.cb(loop, handle, events & (EV_READ|EV_WRITE|EV_ERROR));
+static unsigned int next_power_of_two(unsigned int val) {
+ val -= 1;
+ val |= val >> 1;
+ val |= val >> 2;
+ val |= val >> 4;
+ val |= val >> 8;
+ val |= val >> 16;
+ val += 1;
+ return val;
}
+static void maybe_resize(uv_loop_t* loop, unsigned int len) {
+ uv__io_t** watchers;
+ unsigned int nwatchers;
+ unsigned int i;
-void uv__io_init(uv__io_t* handle, uv__io_cb cb, int fd, int events) {
- ev_io_init(&handle->io_watcher, uv__io_rw, fd, events & (EV_READ|EV_WRITE));
- uv__io_set_cb(handle, cb);
+ if (len <= loop->nwatchers)
+ return;
+
+ nwatchers = next_power_of_two(len);
+ watchers = realloc(loop->watchers, nwatchers * sizeof(loop->watchers[0]));
+
+ if (watchers == NULL)
+ abort();
+
+ for (i = loop->nwatchers; i < nwatchers; i++)
+ watchers[i] = NULL;
+
+ loop->watchers = watchers;
+ loop->nwatchers = nwatchers;
}
-void uv__io_set(uv__io_t* handle, uv__io_cb cb, int fd, int events) {
- ev_io_set(&handle->io_watcher, fd, events);
- uv__io_set_cb(handle, cb);
+void uv__io_init(uv__io_t* w, uv__io_cb cb, int fd) {
+ assert(cb != NULL);
+ assert(fd >= -1);
+ ngx_queue_init(&w->pending_queue);
+ ngx_queue_init(&w->watcher_queue);
+ w->cb = cb;
+ w->fd = fd;
+ w->events = 0;
+ w->pevents = 0;
}
-void uv__io_start(uv_loop_t* loop, uv__io_t* handle) {
- ev_io_start(loop->ev, &handle->io_watcher);
+/* Note that uv__io_start() and uv__io_stop() can't simply remove the watcher
+ * from the queue when the new event mask equals the old one. The event ports
+ * backend operates exclusively in single-shot mode and needs to rearm all fds
+ * before each call to port_getn(). It's up to the individual backends to
+ * filter out superfluous event mask modifications.
+ */
+
+
+void uv__io_start(uv_loop_t* loop, uv__io_t* w, unsigned int events) {
+ assert(0 == (events & ~(UV__IO_READ | UV__IO_WRITE)));
+ assert(0 != events);
+ assert(w->fd >= 0);
+ assert(w->fd < INT_MAX);
+
+ w->pevents |= events;
+ maybe_resize(loop, w->fd + 1);
+
+ if (ngx_queue_empty(&w->watcher_queue))
+ ngx_queue_insert_tail(&loop->watcher_queue, &w->watcher_queue);
+
+ if (loop->watchers[w->fd] == NULL) {
+ loop->watchers[w->fd] = w;
+ loop->nfds++;
+ }
}
-void uv__io_stop(uv_loop_t* loop, uv__io_t* handle) {
- ev_io_stop(loop->ev, &handle->io_watcher);
+void uv__io_stop(uv_loop_t* loop, uv__io_t* w, unsigned int events) {
+ assert(0 == (events & ~(UV__IO_READ | UV__IO_WRITE)));
+ assert(0 != events);
+
+ if (w->fd == -1)
+ return;
+
+ assert(w->fd >= 0);
+
+ /* Happens when uv__io_stop() is called on a handle that was never started. */
+ if ((unsigned) w->fd >= loop->nwatchers)
+ return;
+
+ w->pevents &= ~events;
+
+ if (w->pevents == 0) {
+ ngx_queue_remove(&w->pending_queue);
+ ngx_queue_init(&w->pending_queue);
+
+ ngx_queue_remove(&w->watcher_queue);
+ ngx_queue_init(&w->watcher_queue);
+
+ if (loop->watchers[w->fd] != NULL) {
+ assert(loop->watchers[w->fd] == w);
+ assert(loop->nfds > 0);
+ loop->watchers[w->fd] = NULL;
+ loop->nfds--;
+ w->events = 0;
+ }
+ }
+ else if (ngx_queue_empty(&w->watcher_queue))
+ ngx_queue_insert_tail(&loop->watcher_queue, &w->watcher_queue);
}
-void uv__io_feed(uv_loop_t* loop, uv__io_t* handle, int event) {
- ev_feed_event(loop->ev, &handle->io_watcher, event);
+void uv__io_feed(uv_loop_t* loop, uv__io_t* w) {
+ if (ngx_queue_empty(&w->pending_queue))
+ ngx_queue_insert_tail(&loop->pending_queue, &w->pending_queue);
}
-int uv__io_active(uv__io_t* handle) {
- return ev_is_active(&handle->io_watcher);
+int uv__io_active(const uv__io_t* w, unsigned int events) {
+ assert(0 == (events & ~(UV__IO_READ | UV__IO_WRITE)));
+ assert(0 != events);
+ return 0 != (w->pevents & events);
}
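
uv__run_pending() and the uv__io_* helpers above are the commit's replacement for the old ev_io wrappers. The sketch below mirrors how the call sites elsewhere in this commit (async.c, signal.c, pipe.c) drive that API; it is illustrative only, would build only inside src/unix/ against internal.h, and my_io_cb / my_handle_* are made-up names, not code from the commit.

#include "internal.h"

static void my_io_cb(uv_loop_t* loop, uv__io_t* w, unsigned int events) {
  if (events & UV__IO_READ) {
    /* Read from w->fd until it would block. */
  }
  if (events & UV__IO_WRITE) {
    /* Flush queued writes to w->fd. */
  }
}

static void my_handle_start(uv_loop_t* loop, uv__io_t* w, int fd) {
  /* Bind the callback and fd once; interest is declared per event mask. */
  uv__io_init(w, my_io_cb, fd);
  uv__io_start(loop, w, UV__IO_READ | UV__IO_WRITE);
}

static void my_handle_stop_reading(uv_loop_t* loop, uv__io_t* w) {
  /* Drop one event. The fd is unregistered from the backend only when the
   * pending event mask (w->pevents) reaches zero. */
  uv__io_stop(loop, w, UV__IO_READ);

  if (!uv__io_active(w, UV__IO_WRITE)) {
    /* Nothing left to watch; the poller no longer tracks this fd. */
  }
}

static void my_handle_defer(uv_loop_t* loop, uv__io_t* w) {
  /* Have uv__run_pending() invoke my_io_cb with UV__IO_WRITE on the next
   * loop iteration, without a round trip through the kernel poller. */
  uv__io_feed(loop, w);
}
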
3 src/unix/darwin.c
@@ -55,6 +55,9 @@ int uv__platform_loop_init(uv_loop_t* loop, int default_loop) {
CFRunLoopSourceContext ctx;
int r;
+ if (uv__kqueue_init(loop))
+ return -1;
+
loop->cf_loop = NULL;
if ((r = uv_mutex_init(&loop->cf_mutex)))
return r;
2 src/unix/freebsd.c
@@ -55,7 +55,7 @@ static char *process_title;
int uv__platform_loop_init(uv_loop_t* loop, int default_loop) {
- return 0;
+ return uv__kqueue_init(loop);
}
6 src/unix/fsevents.c
@@ -81,12 +81,14 @@ void uv__fsevents_cb(uv_async_t* cb, int status) {
handle = cb->data;
UV__FSEVENTS_WALK(handle, {
- if (handle->fd != -1)
+ if (handle->event_watcher.fd != -1)
handle->cb(handle, event->path[0] ? event->path : NULL, event->events, 0);
});
- if ((handle->flags & (UV_CLOSING | UV_CLOSED)) == 0 && handle->fd == -1)
+ if ((handle->flags & (UV_CLOSING | UV_CLOSED)) == 0 &&
+ handle->event_watcher.fd == -1) {
uv__fsevents_close(handle);
+ }
}
51 src/unix/internal.h
@@ -82,9 +82,35 @@
} \
while (0)
-#define UV__IO_READ EV_READ
-#define UV__IO_WRITE EV_WRITE
-#define UV__IO_ERROR EV_ERROR
+#if defined(__linux__)
+# define UV__IO_READ UV__EPOLLIN
+# define UV__IO_WRITE UV__EPOLLOUT
+# define UV__IO_ERROR UV__EPOLLERR
+# define UV__IO_HUP UV__EPOLLHUP
+#endif
+
+#if defined(__sun)
+# define UV__IO_READ POLLIN
+# define UV__IO_WRITE POLLOUT
+# define UV__IO_ERROR POLLERR
+# define UV__IO_HUP POLLHUP
+#endif
+
+#ifndef UV__IO_READ
+# define UV__IO_READ 1
+#endif
+
+#ifndef UV__IO_WRITE
+# define UV__IO_WRITE 2
+#endif
+
+#ifndef UV__IO_ERROR
+# define UV__IO_ERROR 4
+#endif
+
+#ifndef UV__IO_HUP
+# define UV__IO_HUP 8
+#endif
/* handle flags */
enum {
@@ -118,12 +144,12 @@ int uv__dup(int fd);
int uv_async_stop(uv_async_t* handle);
void uv__make_close_pending(uv_handle_t* handle);
-void uv__io_init(uv__io_t* handle, uv__io_cb cb, int fd, int events);
-void uv__io_set(uv__io_t* handle, uv__io_cb cb, int fd, int events);
-void uv__io_start(uv_loop_t* loop, uv__io_t* handle);
-void uv__io_stop(uv_loop_t* loop, uv__io_t* handle);
-void uv__io_feed(uv_loop_t* loop, uv__io_t* handle, int event);
-int uv__io_active(uv__io_t* handle);
+void uv__io_init(uv__io_t* w, uv__io_cb cb, int fd);
+void uv__io_start(uv_loop_t* loop, uv__io_t* w, unsigned int events);
+void uv__io_stop(uv_loop_t* loop, uv__io_t* w, unsigned int events);
+void uv__io_feed(uv_loop_t* loop, uv__io_t* w);
+int uv__io_active(const uv__io_t* w, unsigned int events);
+void uv__io_poll(uv_loop_t* loop, int timeout); /* in milliseconds or -1 */
/* loop */
int uv__loop_init(uv_loop_t* loop, int default_loop);
@@ -141,13 +167,13 @@ void uv__stream_init(uv_loop_t* loop, uv_stream_t* stream,
uv_handle_type type);
int uv__stream_open(uv_stream_t*, int fd, int flags);
void uv__stream_destroy(uv_stream_t* stream);
-void uv__server_io(uv_loop_t* loop, uv__io_t* watcher, int events);
+void uv__server_io(uv_loop_t* loop, uv__io_t* w, unsigned int events);
int uv__accept(int sockfd);
/* tcp */
int uv_tcp_listen(uv_tcp_t* tcp, int backlog, uv_connection_cb cb);
-int uv__tcp_nodelay(uv_tcp_t* handle, int enable);
-int uv__tcp_keepalive(uv_tcp_t* handle, int enable, unsigned int delay);
+int uv__tcp_nodelay(int fd, int on);
+int uv__tcp_keepalive(int fd, int on, unsigned int delay);
/* pipe */
int uv_pipe_listen(uv_pipe_t* handle, int backlog, uv_connection_cb cb);
@@ -169,6 +195,7 @@ void uv__work_submit(uv_loop_t* loop,
void uv__work_done(uv_async_t* handle, int status);
/* platform specific */
+int uv__kqueue_init(uv_loop_t* loop);
int uv__platform_loop_init(uv_loop_t* loop, int default_loop);
void uv__platform_loop_delete(uv_loop_t* loop);
263 src/unix/kqueue.c
@@ -29,57 +29,254 @@
#include <sys/sysctl.h>
#include <sys/types.h>
#include <sys/event.h>
+#include <sys/time.h>
#include <unistd.h>
#include <fcntl.h>
#include <time.h>
-static void uv__fs_event(EV_P_ ev_io* w, int revents);
+static void uv__fs_event(uv_loop_t* loop, uv__io_t* w, unsigned int fflags);
-static void uv__fs_event_start(uv_fs_event_t* handle) {
- ev_io_init(&handle->event_watcher,
- uv__fs_event,
- handle->fd,
- EV_LIBUV_KQUEUE_HACK);
- ev_io_start(handle->loop->ev, &handle->event_watcher);
+int uv__kqueue_init(uv_loop_t* loop) {
+ loop->backend_fd = kqueue();
+
+ if (loop->backend_fd == -1)
+ return -1;
+
+ uv__cloexec(loop->backend_fd, 1);
+
+ return 0;
}
-static void uv__fs_event_stop(uv_fs_event_t* handle) {
- ev_io_stop(handle->loop->ev, &handle->event_watcher);
+void uv__io_poll(uv_loop_t* loop, int timeout) {
+ struct kevent events[1024];
+ struct kevent* ev;
+ struct timespec spec;
+ unsigned int nevents;
+ unsigned int revents;
+ ngx_queue_t* q;
+ uint64_t base;
+ uint64_t diff;
+ uv__io_t* w;
+ int filter;
+ int fflags;
+ int count;
+ int nfds;
+ int fd;
+ int op;
+ int i;
+
+ if (loop->nfds == 0) {
+ assert(ngx_queue_empty(&loop->watcher_queue));
+ return;
+ }
+
+ nevents = 0;
+
+ while (!ngx_queue_empty(&loop->watcher_queue)) {
+ q = ngx_queue_head(&loop->watcher_queue);
+ ngx_queue_remove(q);
+ ngx_queue_init(q);
+
+ w = ngx_queue_data(q, uv__io_t, watcher_queue);
+ assert(w->pevents != 0);
+ assert(w->fd >= 0);
+ assert(w->fd < (int) loop->nwatchers);
+
+ /* Filter out no-op changes. This is for compatibility with the event ports
+ * backend, see uv__io_start().
+ */
+ if (w->events == w->pevents)
+ continue;
+
+ if ((w->events & UV__IO_READ) == 0 && (w->pevents & UV__IO_READ) != 0) {
+ filter = EVFILT_READ;
+ fflags = 0;
+ op = EV_ADD;
+
+ if (w->cb == uv__fs_event) {
+ filter = EVFILT_VNODE;
+ fflags = NOTE_ATTRIB | NOTE_WRITE | NOTE_RENAME
+ | NOTE_DELETE | NOTE_EXTEND | NOTE_REVOKE;
+ op = EV_ADD | EV_ONESHOT; /* Stop the event from firing repeatedly. */
+ }
+
+ EV_SET(events + nevents, w->fd, filter, op, fflags, 0, 0);
+
+ if (++nevents == ARRAY_SIZE(events)) {
+ if (kevent(loop->backend_fd, events, nevents, NULL, 0, NULL))
+ abort();
+ nevents = 0;
+ }
+ }
+
+ if ((w->events & UV__IO_WRITE) == 0 && (w->pevents & UV__IO_WRITE) != 0) {
+ EV_SET(events + nevents, w->fd, EVFILT_WRITE, EV_ADD, 0, 0, 0);
+
+ if (++nevents == ARRAY_SIZE(events)) {
+ if (kevent(loop->backend_fd, events, nevents, NULL, 0, NULL))
+ abort();
+ nevents = 0;
+ }
+ }
+
+ w->events = w->pevents;
+ }
+
+ assert(timeout >= -1);
+ base = loop->time;
+ count = 48; /* Benchmarks suggest this gives the best throughput. */
+
+ for (;; nevents = 0) {
+ if (timeout != -1) {
+ spec.tv_sec = timeout / 1000;
+ spec.tv_nsec = (timeout % 1000) * 1000000;
+ }
+
+ nfds = kevent(loop->backend_fd,
+ events,
+ nevents,
+ events,
+ ARRAY_SIZE(events),
+ timeout == -1 ? NULL : &spec);
+
+ if (nfds == 0) {
+ assert(timeout != -1);
+ return;
+ }
+
+ if (nfds == -1) {
+ if (errno != EINTR)
+ abort();
+
+ if (timeout == 0)
+ return;
+
+ if (timeout == -1)
+ continue;
+
+ /* Interrupted by a signal. Update timeout and poll again. */
+ goto update_timeout;
+ }
+
+ nevents = 0;
+
+ for (i = 0; i < nfds; i++) {
+ ev = events + i;
+ fd = ev->ident;
+ w = loop->watchers[fd];
+
+ if (w == NULL) {
+ /* File descriptor that we've stopped watching, disarm it. */
+ /* TODO batch up */
+ struct kevent events[1];
+
+ EV_SET(events + 0, fd, ev->filter, EV_DELETE, 0, 0, 0);
+ if (kevent(loop->backend_fd, events, 1, NULL, 0, NULL))
+ if (errno != EBADF && errno != ENOENT)
+ abort();
+
+ continue;
+ }
+
+ if (ev->filter == EVFILT_VNODE) {
+ assert(w->events == UV__IO_READ);
+ assert(w->pevents == UV__IO_READ);
+ w->cb(loop, w, ev->fflags); /* XXX always uv__fs_event() */
+ nevents++;
+ continue;
+ }
+
+ revents = 0;
+
+ if (ev->filter == EVFILT_READ) {
+ if (w->events & UV__IO_READ)
+ revents |= UV__IO_READ;
+ else {
+ /* TODO batch up */
+ struct kevent events[1];
+ EV_SET(events + 0, fd, ev->filter, EV_DELETE, 0, 0, 0);
+ if (kevent(loop->backend_fd, events, 1, NULL, 0, NULL)) abort();
+ }
+ }
+
+ if (ev->filter == EVFILT_WRITE) {
+ if (w->events & UV__IO_WRITE)
+ revents |= UV__IO_WRITE;
+ else {
+ /* TODO batch up */
+ struct kevent events[1];
+ EV_SET(events + 0, fd, ev->filter, EV_DELETE, 0, 0, 0);
+ if (kevent(loop->backend_fd, events, 1, NULL, 0, NULL)) abort();
+ }
+ }
+
+ if (ev->flags & EV_ERROR)
+ revents |= UV__IO_ERROR;
+
+ if (revents == 0)
+ continue;
+
+ w->cb(loop, w, revents);
+ nevents++;
+ }
+
+ if (nevents != 0) {
+ if (nfds == ARRAY_SIZE(events) && --count != 0) {
+ /* Poll for more events but don't block this time. */
+ timeout = 0;
+ continue;
+ }
+ return;
+ }
+
+ if (timeout == 0)
+ return;
+
+ if (timeout == -1)
+ continue;
+
+update_timeout:
+ assert(timeout > 0);
+
+ diff = uv_hrtime() / 1000000;
+ assert(diff >= base);
+ diff -= base;
+
+ if (diff >= (uint64_t) timeout)
+ return;
+
+ timeout -= diff;
+ }
}
-static void uv__fs_event(EV_P_ ev_io* w, int revents) {
+static void uv__fs_event(uv_loop_t* loop, uv__io_t* w, unsigned int fflags) {
uv_fs_event_t* handle;
+ struct kevent ev;
int events;
- assert(revents == EV_LIBUV_KQUEUE_HACK);
-
handle = container_of(w, uv_fs_event_t, event_watcher);
- if (handle->fflags & (NOTE_ATTRIB | NOTE_EXTEND))
+ if (fflags & (NOTE_ATTRIB | NOTE_EXTEND))
events = UV_CHANGE;
else
events = UV_RENAME;
handle->cb(handle, NULL, events, 0);
- if (handle->fd == -1)
+ if (handle->event_watcher.fd == -1)
return;
- /* File watcher operates in one-shot mode, re-arm it. */
- uv__fs_event_stop(handle);
- uv__fs_event_start(handle);
-}
-
+ /* Watcher operates in one-shot mode, re-arm it. */
+ fflags = NOTE_ATTRIB | NOTE_WRITE | NOTE_RENAME
+ | NOTE_DELETE | NOTE_EXTEND | NOTE_REVOKE;
-/* Called by libev, don't touch. */
-void uv__kqueue_hack(EV_P_ int fflags, ev_io *w) {
- uv_fs_event_t* handle;
+ EV_SET(&ev, w->fd, EVFILT_VNODE, EV_ADD | EV_ONESHOT, fflags, 0, 0);
- handle = container_of(w, uv_fs_event_t, event_watcher);
- handle->fflags = fflags;
+ if (kevent(loop->backend_fd, &ev, 1, NULL, 0, NULL))
+ abort();
}
@@ -88,10 +285,10 @@ int uv_fs_event_init(uv_loop_t* loop,
const char* filename,
uv_fs_event_cb cb,
int flags) {
- int fd;
#if defined(__APPLE__)
struct stat statbuf;
#endif /* defined(__APPLE__) */
+ int fd;
/* TODO open asynchronously - but how do we report back errors? */
if ((fd = open(filename, O_RDONLY)) == -1) {
@@ -101,10 +298,9 @@ int uv_fs_event_init(uv_loop_t* loop,
uv__handle_init(loop, (uv_handle_t*)handle, UV_FS_EVENT);
uv__handle_start(handle); /* FIXME shouldn't start automatically */
+ uv__io_init(&handle->event_watcher, uv__fs_event, fd);
handle->filename = strdup(filename);
- handle->fflags = 0;
handle->cb = cb;
- handle->fd = fd;
#if defined(__APPLE__)
/* Nullify field to perform checks later */
@@ -124,7 +320,7 @@ int uv_fs_event_init(uv_loop_t* loop,
fallback:
#endif /* defined(__APPLE__) */
- uv__fs_event_start(handle);
+ uv__io_start(loop, &handle->event_watcher, UV__IO_READ);
return 0;
}
@@ -133,13 +329,16 @@ int uv_fs_event_init(uv_loop_t* loop,
void uv__fs_event_close(uv_fs_event_t* handle) {
#if defined(__APPLE__)
if (uv__fsevents_close(handle))
- uv__fs_event_stop(handle);
+ uv__io_stop(handle->loop, &handle->event_watcher, UV__IO_READ);
#else
- uv__fs_event_stop(handle);
+ uv__io_stop(handle->loop, &handle->event_watcher, UV__IO_READ);
#endif /* defined(__APPLE__) */
uv__handle_stop(handle);
+
free(handle->filename);
- close(handle->fd);
- handle->fd = -1;
+ handle->filename = NULL;
+
+ close(handle->event_watcher.fd);
+ handle->event_watcher.fd = -1;
}
15 src/unix/linux/inotify.c
@@ -64,7 +64,9 @@ static int compare_watchers(const struct watcher_list* a,
RB_GENERATE_STATIC(watcher_root, watcher_list, entry, compare_watchers)
-static void uv__inotify_read(uv_loop_t* loop, uv__io_t* w, int revents);
+static void uv__inotify_read(uv_loop_t* loop,
+ uv__io_t* w,
+ unsigned int revents);
static int new_inotify_fd(void) {
@@ -98,11 +100,8 @@ static int init_inotify(uv_loop_t* loop) {
return -1;
}
- uv__io_init(&loop->inotify_read_watcher,
- uv__inotify_read,
- loop->inotify_fd,
- UV__IO_READ);
- uv__io_start(loop, &loop->inotify_read_watcher);
+ uv__io_init(&loop->inotify_read_watcher, uv__inotify_read, loop->inotify_fd);
+ uv__io_start(loop, &loop->inotify_read_watcher, UV__IO_READ);
return 0;
}
@@ -115,7 +114,9 @@ static struct watcher_list* find_watcher(uv_loop_t* loop, int wd) {
}
-static void uv__inotify_read(uv_loop_t* loop, uv__io_t* dummy, int events) {
+static void uv__inotify_read(uv_loop_t* loop,
+ uv__io_t* dummy,
+ unsigned int events) {
const struct uv__inotify_event* e;
struct watcher_list* w;
uv_fs_event_t* h;
167 src/unix/linux/linux-core.c
@@ -78,20 +78,183 @@ static void free_args_mem(void) {
int uv__platform_loop_init(uv_loop_t* loop, int default_loop) {
- loop->inotify_watchers = NULL;
+ int fd;
+
+ fd = uv__epoll_create1(UV__EPOLL_CLOEXEC);
+
+ /* epoll_create1() can fail either because it's not implemented (old kernel)
+ * or because it doesn't understand the EPOLL_CLOEXEC flag.
+ */
+ if (fd == -1 && (errno == ENOSYS || errno == EINVAL)) {
+ fd = uv__epoll_create(256);
+
+ if (fd != -1)
+ uv__cloexec(fd, 1);
+ }
+
+ loop->backend_fd = fd;
loop->inotify_fd = -1;
+ loop->inotify_watchers = NULL;
+
+ if (fd == -1)
+ return -1;
+
return 0;
}
void uv__platform_loop_delete(uv_loop_t* loop) {
if (loop->inotify_fd == -1) return;
- uv__io_stop(loop, &loop->inotify_read_watcher);
+ uv__io_stop(loop, &loop->inotify_read_watcher, UV__IO_READ);
close(loop->inotify_fd);
loop->inotify_fd = -1;
}
+void uv__io_poll(uv_loop_t* loop, int timeout) {
+ struct uv__epoll_event events[1024];
+ struct uv__epoll_event* pe;
+ struct uv__epoll_event e;
+ ngx_queue_t* q;
+ uv__io_t* w;
+ uint64_t base;
+ uint64_t diff;
+ int nevents;
+ int count;
+ int nfds;
+ int fd;
+ int op;
+ int i;
+
+ if (loop->nfds == 0) {
+ assert(ngx_queue_empty(&loop->watcher_queue));
+ return;
+ }
+
+ while (!ngx_queue_empty(&loop->watcher_queue)) {
+ q = ngx_queue_head(&loop->watcher_queue);
+ ngx_queue_remove(q);
+ ngx_queue_init(q);
+
+ w = ngx_queue_data(q, uv__io_t, watcher_queue);
+ assert(w->pevents != 0);
+ assert(w->fd >= 0);
+ assert(w->fd < (int) loop->nwatchers);
+
+ /* Filter out no-op changes. This is for compatibility with the event ports
+ * backend, see the comment in uv__io_start().
+ */
+ if (w->events == w->pevents)
+ continue;
+
+ e.events = w->pevents;
+ e.data = w->fd;
+
+ if (w->events == 0)
+ op = UV__EPOLL_CTL_ADD;
+ else
+ op = UV__EPOLL_CTL_MOD;
+
+ /* XXX Future optimization: do EPOLL_CTL_MOD lazily if we stop watching
+ * events, skip the syscall and squelch the events after epoll_wait().
+ */
+ if (uv__epoll_ctl(loop->backend_fd, op, w->fd, &e)) {
+ if (errno != EEXIST)
+ abort();
+
+ assert(op == UV__EPOLL_CTL_ADD);
+
+ /* We've reactivated a file descriptor that's been watched before. */
+ if (uv__epoll_ctl(loop->backend_fd, UV__EPOLL_CTL_MOD, w->fd, &e))
+ abort();
+ }
+
+ w->events = w->pevents;
+ }
+
+ assert(timeout >= -1);
+ base = loop->time;
+ count = 48; /* Benchmarks suggest this gives the best throughput. */
+
+ for (;;) {
+ nfds = uv__epoll_wait(loop->backend_fd,
+ events,
+ ARRAY_SIZE(events),
+ timeout);
+
+ if (nfds == 0) {
+ assert(timeout != -1);
+ return;
+ }
+
+ if (nfds == -1) {
+ if (errno != EINTR)
+ abort();
+
+ if (timeout == -1)
+ continue;
+
+ if (timeout == 0)
+ return;
+
+ /* Interrupted by a signal. Update timeout and poll again. */
+ goto update_timeout;
+ }
+
+ nevents = 0;
+
+ for (i = 0; i < nfds; i++) {
+ pe = events + i;
+ fd = pe->data;
+
+ assert(fd >= 0);
+ assert((unsigned) fd < loop->nwatchers);
+
+ w = loop->watchers[fd];
+
+ if (w == NULL) {
+ /* File descriptor that we've stopped watching, disarm it. */
+ if (uv__epoll_ctl(loop->backend_fd, UV__EPOLL_CTL_DEL, fd, pe))
+ if (errno != EBADF && errno != ENOENT)
+ abort();
+
+ continue;
+ }
+
+ w->cb(loop, w, pe->events);
+ nevents++;
+ }
+
+ if (nevents != 0) {
+ if (nfds == ARRAY_SIZE(events) && --count != 0) {
+ /* Poll for more events but don't block this time. */
+ timeout = 0;
+ continue;
+ }
+ return;
+ }
+
+ if (timeout == 0)
+ return;
+
+ if (timeout == -1)
+ continue;
+
+update_timeout:
+ assert(timeout > 0);
+
+ diff = uv_hrtime() / 1000000;
+ assert(diff >= base);
+ diff -= base;
+
+ if (diff >= (uint64_t) timeout)
+ return;
+
+ timeout -= diff;
+ }
+}
+
+
uint64_t uv_hrtime() {
struct timespec ts;
clock_gettime(CLOCK_MONOTONIC, &ts);
39 src/unix/loop.c
@@ -29,16 +29,9 @@
int uv__loop_init(uv_loop_t* loop, int default_loop) {
unsigned int i;
- int flags;
uv__signal_global_once_init();
-#if HAVE_KQUEUE
- flags = EVBACKEND_KQUEUE;
-#else
- flags = EVFLAG_AUTO;
-#endif
-
memset(loop, 0, sizeof(*loop));
RB_INIT(&loop->timer_handles);
ngx_queue_init(&loop->wq);
@@ -48,15 +41,24 @@ int uv__loop_init(uv_loop_t* loop, int default_loop) {
ngx_queue_init(&loop->check_handles);
ngx_queue_init(&loop->prepare_handles);
ngx_queue_init(&loop->handle_queue);
+
+ loop->nfds = 0;
+ loop->watchers = NULL;
+ loop->nwatchers = 0;
+ ngx_queue_init(&loop->pending_queue);
+ ngx_queue_init(&loop->watcher_queue);
+
loop->closing_handles = NULL;
loop->time = uv_hrtime() / 1000000;
loop->async_pipefd[0] = -1;
loop->async_pipefd[1] = -1;
loop->signal_pipefd[0] = -1;
loop->signal_pipefd[1] = -1;
+ loop->backend_fd = -1;
loop->emfile_fd = -1;
- loop->ev = (default_loop ? ev_default_loop : ev_loop_new)(flags);
- ev_set_userdata(loop->ev, loop);
+
+ if (uv__platform_loop_init(loop, default_loop))
+ return -1;
uv_signal_init(loop, &loop->child_watcher);
uv__handle_unref(&loop->child_watcher);
@@ -74,17 +76,13 @@ int uv__loop_init(uv_loop_t* loop, int default_loop) {
uv__handle_unref(&loop->wq_async);
loop->wq_async.flags |= UV__HANDLE_INTERNAL;
- if (uv__platform_loop_init(loop, default_loop))
- return -1;
-
return 0;
}
void uv__loop_delete(uv_loop_t* loop) {
uv__signal_loop_cleanup(loop);
uv__platform_loop_delete(loop);
- ev_loop_destroy(loop->ev);
if (loop->async_pipefd[0] != -1) {
close(loop->async_pipefd[0]);
@@ -101,8 +99,23 @@ void uv__loop_delete(uv_loop_t* loop) {
loop->emfile_fd = -1;
}
+ if (loop->backend_fd != -1) {
+ close(loop->backend_fd);
+ loop->backend_fd = -1;
+ }
+
uv_mutex_lock(&loop->wq_mutex);
assert(ngx_queue_empty(&loop->wq) && "thread pool work queue not empty!");
uv_mutex_unlock(&loop->wq_mutex);
uv_mutex_destroy(&loop->wq_mutex);
+
+#if 0
+ assert(ngx_queue_empty(&loop->pending_queue));
+ assert(ngx_queue_empty(&loop->watcher_queue));
+ assert(loop->nfds == 0);
+#endif
+
+ free(loop->watchers);
+ loop->watchers = NULL;
+ loop->nwatchers = 0;
}
2 src/unix/netbsd.c
@@ -48,7 +48,7 @@ static char *process_title;
int uv__platform_loop_init(uv_loop_t* loop, int default_loop) {
- return 0;
+ return uv__kqueue_init(loop);
}
2 src/unix/openbsd.c
@@ -44,7 +44,7 @@ static char *process_title;
int uv__platform_loop_init(uv_loop_t* loop, int default_loop) {
- return 0;
+ return uv__kqueue_init(loop);
}
40 src/unix/pipe.c
@@ -29,7 +29,7 @@
#include <unistd.h>
#include <stdlib.h>
-static void uv__pipe_accept(uv_loop_t* loop, uv__io_t* w, int events);
+static void uv__pipe_accept(uv_loop_t* loop, uv__io_t* w, unsigned int events);
int uv_pipe_init(uv_loop_t* loop, uv_pipe_t* handle, int ipc) {
@@ -57,7 +57,7 @@ int uv_pipe_bind(uv_pipe_t* handle, const char* name) {
bound = 0;
/* Already bound? */
- if (handle->fd >= 0) {
+ if (handle->io_watcher.fd >= 0) {
uv__set_artificial_error(handle->loop, UV_EINVAL);
goto out;
}
@@ -89,7 +89,7 @@ int uv_pipe_bind(uv_pipe_t* handle, const char* name) {
/* Success. */
handle->pipe_fname = pipe_fname; /* Is a strdup'ed copy. */
- handle->fd = sockfd;
+ handle->io_watcher.fd = sockfd;
status = 0;
out:
@@ -117,21 +117,18 @@ int uv_pipe_listen(uv_pipe_t* handle, int backlog, uv_connection_cb cb) {
saved_errno = errno;
status = -1;
- if (handle->fd == -1) {
+ if (handle->io_watcher.fd == -1) {
uv__set_artificial_error(handle->loop, UV_EINVAL);
goto out;
}
- assert(handle->fd >= 0);
+ assert(handle->io_watcher.fd >= 0);
- if ((status = listen(handle->fd, backlog)) == -1) {
+ if ((status = listen(handle->io_watcher.fd, backlog)) == -1) {
uv__set_sys_error(handle->loop, errno);
} else {
handle->connection_cb = cb;
- uv__io_init(&handle->read_watcher,
- uv__pipe_accept,
- handle->fd,
- UV__IO_READ);
- uv__io_start(handle->loop, &handle->read_watcher);
+ handle->io_watcher.cb = uv__pipe_accept;
+ uv__io_start(handle->loop, &handle->io_watcher, UV__IO_READ);
}
out:
@@ -175,11 +172,11 @@ void uv_pipe_connect(uv_connect_t* req,
int r;
saved_errno = errno;
- new_sock = (handle->fd == -1);
+ new_sock = (handle->io_watcher.fd == -1);
err = -1;
if (new_sock)
- if ((handle->fd = uv__socket(AF_UNIX, SOCK_STREAM, 0)) == -1)
+ if ((handle->io_watcher.fd = uv__socket(AF_UNIX, SOCK_STREAM, 0)) == -1)
goto out;
memset(&saddr, 0, sizeof saddr);
@@ -190,7 +187,7 @@ void uv_pipe_connect(uv_connect_t* req,
* is either there or not.
*/
do {
- r = connect(handle->fd, (struct sockaddr*)&saddr, sizeof saddr);
+ r = connect(handle->io_watcher.fd, (struct sockaddr*)&saddr, sizeof saddr);
}
while (r == -1 && errno == EINTR);
@@ -199,12 +196,11 @@ void uv_pipe_connect(uv_connect_t* req,
if (new_sock)
if (uv__stream_open((uv_stream_t*)handle,
- handle->fd,
+ handle->io_watcher.fd,
UV_STREAM_READABLE | UV_STREAM_WRITABLE))
goto out;
- uv__io_start(handle->loop, &handle->read_watcher);
- uv__io_start(handle->loop, &handle->write_watcher);
+ uv__io_start(handle->loop, &handle->io_watcher, UV__IO_READ|UV__IO_WRITE);
err = 0;
out:
@@ -217,7 +213,7 @@ void uv_pipe_connect(uv_connect_t* req,
ngx_queue_init(&req->queue);
/* Run callback on next tick. */
- uv__io_feed(handle->loop, &handle->write_watcher, UV__IO_WRITE);
+ uv__io_feed(handle->loop, &handle->io_watcher);
/* Mimic the Windows pipe implementation, always
* return 0 and let the callback handle errors.
@@ -227,17 +223,17 @@ void uv_pipe_connect(uv_connect_t* req,
/* TODO merge with uv__server_io()? */
-static void uv__pipe_accept(uv_loop_t* loop, uv__io_t* w, int events) {
+static void uv__pipe_accept(uv_loop_t* loop, uv__io_t* w, unsigned int events) {
uv_pipe_t* pipe;
int saved_errno;
int sockfd;
saved_errno = errno;
- pipe = container_of(w, uv_pipe_t, read_watcher);
+ pipe = container_of(w, uv_pipe_t, io_watcher);
assert(pipe->type == UV_NAMED_PIPE);
- sockfd = uv__accept(pipe->fd);
+ sockfd = uv__accept(pipe->io_watcher.fd);
if (sockfd == -1) {
if (errno != EAGAIN && errno != EWOULDBLOCK) {
uv__set_sys_error(pipe->loop, errno);
@@ -248,7 +244,7 @@ static void uv__pipe_accept(uv_loop_t* loop, uv__io_t* w, int events) {
pipe->connection_cb((uv_stream_t*)pipe, 0);
if (pipe->accepted_fd == sockfd) {
/* The user hasn't called uv_accept() yet */
- uv__io_stop(pipe->loop, &pipe->read_watcher);
+ uv__io_stop(pipe->loop, &pipe->io_watcher, UV__IO_READ);
}
}
24 src/unix/poll.c
@@ -27,15 +27,14 @@
#include <errno.h>
-static void uv__poll_io(uv_loop_t* loop, uv__io_t* w, int events) {
+static void uv__poll_io(uv_loop_t* loop, uv__io_t* w, unsigned int events) {
uv_poll_t* handle;
int pevents;
handle = container_of(w, uv_poll_t, io_watcher);
if (events & UV__IO_ERROR) {
- /* An error happened. Libev has implicitly stopped the watcher, but we */
- /* need to fix the refcount. */
+ uv__io_stop(loop, w, UV__IO_READ | UV__IO_WRITE);
uv__handle_stop(handle);
uv__set_sys_error(handle->loop, EBADF);
handle->poll_cb(handle, -1, 0);
@@ -54,10 +53,8 @@ static void uv__poll_io(uv_loop_t* loop, uv__io_t* w, int events) {
int uv_poll_init(uv_loop_t* loop, uv_poll_t* handle, int fd) {
uv__handle_init(loop, (uv_handle_t*) handle, UV_POLL);
- handle->fd = fd;
+ uv__io_init(&handle->io_watcher, uv__poll_io, fd);
handle->poll_cb = NULL;
- uv__io_init(&handle->io_watcher, uv__poll_io, fd, 0);
-
return 0;
}
@@ -69,7 +66,7 @@ int uv_poll_init_socket(uv_loop_t* loop, uv_poll_t* handle,
static void uv__poll_stop(uv_poll_t* handle) {
- uv__io_stop(handle->loop, &handle->io_watcher);
+ uv__io_stop(handle->loop, &handle->io_watcher, UV__IO_READ | UV__IO_WRITE);
uv__handle_stop(handle);
}
@@ -87,23 +84,20 @@ int uv_poll_start(uv_poll_t* handle, int pevents, uv_poll_cb poll_cb) {
assert((pevents & ~(UV_READABLE | UV_WRITABLE)) == 0);
assert(!(handle->flags & (UV_CLOSING | UV_CLOSED)));
- if (pevents == 0) {
- uv__poll_stop(handle);
+ uv__poll_stop(handle);
+
+ if (pevents == 0)
return 0;
- }
events = 0;
if (pevents & UV_READABLE)
events |= UV__IO_READ;
if (pevents & UV_WRITABLE)
events |= UV__IO_WRITE;
- uv__io_stop(handle->loop, &handle->io_watcher);
- uv__io_set(&handle->io_watcher, uv__poll_io, handle->fd, events);
- uv__io_start(handle->loop, &handle->io_watcher);
-
- handle->poll_cb = poll_cb;
+ uv__io_start(handle->loop, &handle->io_watcher, events);
uv__handle_start(handle);
+ handle->poll_cb = poll_cb;
return 0;
}
2 src/unix/process.c
@@ -204,7 +204,7 @@ static int uv__process_init_stdio(uv_stdio_container_t* container, int fds[2]) {
if (container->flags & UV_INHERIT_FD) {
fd = container->data.fd;
} else {
- fd = container->data.stream->fd;
+ fd = container->data.stream->io_watcher.fd;
}
if (fd == -1) {
9 src/unix/signal.c
@@ -38,7 +38,7 @@ RB_HEAD(uv__signal_tree_s, uv_signal_s);
static int uv__signal_unlock();
-static void uv__signal_event(uv_loop_t* loop, uv__io_t* watcher, int events);
+static void uv__signal_event(uv_loop_t* loop, uv__io_t* w, unsigned int events);
static int uv__signal_compare(uv_signal_t* w1, uv_signal_t* w2);
static void uv__signal_stop(uv_signal_t* handle);
@@ -212,9 +212,8 @@ static int uv__signal_loop_once_init(uv_loop_t* loop) {
uv__io_init(&loop->signal_io_watcher,
uv__signal_event,
- loop->signal_pipefd[0],
- UV__IO_READ);
- uv__io_start(loop, &loop->signal_io_watcher);
+ loop->signal_pipefd[0]);
+ uv__io_start(loop, &loop->signal_io_watcher, UV__IO_READ);
return 0;
}
@@ -329,7 +328,7 @@ int uv_signal_start(uv_signal_t* handle, uv_signal_cb signal_cb, int signum) {
}
-static void uv__signal_event(uv_loop_t* loop, uv__io_t* watcher, int events) {
+static void uv__signal_event(uv_loop_t* loop, uv__io_t* w, unsigned int events) {
uv__signal_msg_t* msg;
uv_signal_t* handle;
char buf[sizeof(uv__signal_msg_t) * 32];
143 src/unix/stream.c
@@ -38,7 +38,7 @@
static void uv__stream_connect(uv_stream_t*);
static void uv__write(uv_stream_t* stream);
static void uv__read(uv_stream_t* stream);
-static void uv__stream_io(uv_loop_t* loop, uv__io_t* w, int events);
+static void uv__stream_io(uv_loop_t* loop, uv__io_t* w, unsigned int events);
/* Used by the accept() EMFILE party trick. */
@@ -88,7 +88,6 @@ void uv__stream_init(uv_loop_t* loop,
stream->connect_req = NULL;
stream->shutdown_req = NULL;
stream->accepted_fd = -1;
- stream->fd = -1;
stream->delayed_error = 0;
ngx_queue_init(&stream->write_queue);
ngx_queue_init(&stream->write_completed_queue);
@@ -97,42 +96,32 @@ void uv__stream_init(uv_loop_t* loop,
if (loop->emfile_fd == -1)
loop->emfile_fd = uv__open_cloexec("/", O_RDONLY);
- uv__io_init(&stream->read_watcher, uv__stream_io, -1, 0);
- uv__io_init(&stream->write_watcher, uv__stream_io, -1, 0);
+ uv__io_init(&stream->io_watcher, uv__stream_io, -1);
}
int uv__stream_open(uv_stream_t* stream, int fd, int flags) {
socklen_t yes;
assert(fd >= 0);
- stream->fd = fd;
-
stream->flags |= flags;
if (stream->type == UV_TCP) {
/* Reuse the port address if applicable. */
yes = 1;
- if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof yes) == -1) {
- uv__set_sys_error(stream->loop, errno);
- return -1;
- }
- if ((stream->flags & UV_TCP_NODELAY) &&
- uv__tcp_nodelay((uv_tcp_t*)stream, 1)) {
- return -1;
- }
+ if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof yes) == -1)
+ return uv__set_sys_error(stream->loop, errno);
+
+ if ((stream->flags & UV_TCP_NODELAY) && uv__tcp_nodelay(fd, 1))
+ return uv__set_sys_error(stream->loop, errno);
/* TODO Use delay the user passed in. */
- if ((stream->flags & UV_TCP_KEEPALIVE) &&
- uv__tcp_keepalive((uv_tcp_t*)stream, 1, 60)) {
- return -1;
- }
+ if ((stream->flags & UV_TCP_KEEPALIVE) && uv__tcp_keepalive(fd, 1, 60))
+ return uv__set_sys_error(stream->loop, errno);
}
- /* Associate the fd with each watcher. */
- uv__io_set(&stream->read_watcher, uv__stream_io, fd, UV__IO_READ);
- uv__io_set(&stream->write_watcher, uv__stream_io, fd, UV__IO_WRITE);
+ stream->io_watcher.fd = fd;
return 0;
}
@@ -142,6 +131,7 @@ void uv__stream_destroy(uv_stream_t* stream) {
uv_write_t* req;
ngx_queue_t* q;
+ assert(!uv__io_active(&stream->io_watcher, UV__IO_READ | UV__IO_WRITE));
assert(stream->flags & UV_CLOSED);
if (stream->connect_req) {
@@ -232,27 +222,26 @@ static int uv__emfile_trick(uv_loop_t* loop, int accept_fd) {
}
-void uv__server_io(uv_loop_t* loop, uv__io_t* w, int events) {
+void uv__server_io(uv_loop_t* loop, uv__io_t* w, unsigned int events) {
static int use_emfile_trick = -1;
uv_stream_t* stream;
int fd;
int r;
- stream = container_of(w, uv_stream_t, read_watcher);
+ stream = container_of(w, uv_stream_t, io_watcher);
assert(events == UV__IO_READ);
+ assert(stream->accepted_fd == -1);
assert(!(stream->flags & UV_CLOSING));
- if (stream->accepted_fd >= 0) {
- uv__io_stop(loop, &stream->read_watcher);
- return;
- }
+ if (stream->accepted_fd == -1)
+ uv__io_start(stream->loop, &stream->io_watcher, UV__IO_READ);
/* connection_cb can close the server socket while we're
* in the loop so check it on each iteration.
*/
- while (stream->fd != -1) {
- assert(stream->accepted_fd < 0);
- fd = uv__accept(stream->fd);
+ while (stream->io_watcher.fd != -1) {
+ assert(stream->accepted_fd == -1);
+ fd = uv__accept(stream->io_watcher.fd);
if (fd == -1) {
switch (errno) {
@@ -273,7 +262,7 @@ void uv__server_io(uv_loop_t* loop, uv__io_t* w, int events) {
}
if (use_emfile_trick) {
- SAVE_ERRNO(r = uv__emfile_trick(loop, stream->fd));
+ SAVE_ERRNO(r = uv__emfile_trick(loop, stream->io_watcher.fd));
if (r == 0)
continue;
}
@@ -292,7 +281,7 @@ void uv__server_io(uv_loop_t* loop, uv__io_t* w, int events) {
if (stream->accepted_fd != -1) {
/* The user hasn't yet called uv_accept() */
- uv__io_stop(loop, &stream->read_watcher);
+ uv__io_stop(loop, &stream->io_watcher, UV__IO_READ);
return;
}
@@ -333,7 +322,7 @@ int uv_accept(uv_stream_t* server, uv_stream_t* client) {
goto out;
}
- uv__io_start(streamServer->loop, &streamServer->read_watcher);
+ uv__io_start(streamServer->loop, &streamServer->io_watcher, UV__IO_READ);
streamServer->accepted_fd = -1;
status = 0;
@@ -393,7 +382,7 @@ static void uv__drain(uv_stream_t* stream) {
assert(!uv_write_queue_head(stream));
assert(stream->write_queue_size == 0);
- uv__io_stop(stream->loop, &stream->write_watcher);
+ uv__io_stop(stream->loop, &stream->io_watcher, UV__IO_WRITE);
/* Shutdown? */
if ((stream->flags & UV_STREAM_SHUTTING) &&
@@ -405,7 +394,7 @@ static void uv__drain(uv_stream_t* stream) {
stream->shutdown_req = NULL;
uv__req_unregister(stream->loop, req);
- if (shutdown(stream->fd, SHUT_WR)) {
+ if (shutdown(stream->io_watcher.fd, SHUT_WR)) {
/* Error. Report it. User should call uv_close(). */
uv__set_sys_error(stream->loop, errno);
if (req->cb) {
@@ -447,7 +436,7 @@ static void uv__write_req_finish(uv_write_t* req) {
* callback called in the near future.
*/
ngx_queue_insert_tail(&stream->write_completed_queue, &req->queue);
- uv__io_feed(stream->loop, &stream->write_watcher, UV__IO_WRITE);
+ uv__io_feed(stream->loop, &stream->io_watcher);
}
@@ -469,7 +458,7 @@ static void uv__write(uv_stream_t* stream) {
start:
- assert(stream->fd >= 0);
+ assert(stream->io_watcher.fd >= 0);
/* Get the request at the head of the queue. */
req = uv_write_queue_head(stream);
@@ -497,7 +486,7 @@ static void uv__write(uv_stream_t* stream) {
struct msghdr msg;
char scratch[64];
struct cmsghdr *cmsg;
- int fd_to_send = req->send_handle->fd;
+ int fd_to_send = req->send_handle->io_watcher.fd;
assert(fd_to_send >= 0);
@@ -523,15 +512,15 @@ static void uv__write(uv_stream_t* stream) {
}
do {
- n = sendmsg(stream->fd, &msg, 0);
+ n = sendmsg(stream->io_watcher.fd, &msg, 0);
}
while (n == -1 && errno == EINTR);
} else {
do {
if (iovcnt == 1) {
- n = write(stream->fd, iov[0].iov_base, iov[0].iov_len);
+ n = write(stream->io_watcher.fd, iov[0].iov_base, iov[0].iov_len);
} else {
- n = writev(stream->fd, iov, iovcnt);
+ n = writev(stream->io_watcher.fd, iov, iovcnt);
}
}
while (n == -1 && errno == EINTR);
@@ -603,7 +592,7 @@ static void uv__write(uv_stream_t* stream) {
assert(!(stream->flags & UV_STREAM_BLOCKING));
/* We're not done. */
- uv__io_start(stream->loop, &stream->write_watcher);
+ uv__io_start(stream->loop, &stream->io_watcher, UV__IO_WRITE);
}
@@ -680,11 +669,11 @@ static void uv__read(uv_stream_t* stream) {
assert(buf.len > 0);
assert(buf.base);
- assert(stream->fd >= 0);
+ assert(stream->io_watcher.fd >= 0);
if (stream->read_cb) {
do {
- nread = read(stream->fd, buf.base, buf.len);
+ nread = read(stream->io_watcher.fd, buf.base, buf.len);
}
while (nread < 0 && errno == EINTR);
} else {
@@ -700,7 +689,7 @@ static void uv__read(uv_stream_t* stream) {
msg.msg_control = (void *) cmsg_space;
do {
- nread = recvmsg(stream->fd, &msg, 0);
+ nread = recvmsg(stream->io_watcher.fd, &msg, 0);
}
while (nread < 0 && errno == EINTR);
}
@@ -711,7 +700,7 @@ static void uv__read(uv_stream_t* stream) {
if (errno == EAGAIN || errno == EWOULDBLOCK) {
/* Wait for the next one. */
if (stream->flags & UV_STREAM_READING) {
- uv__io_start(stream->loop, &stream->read_watcher);
+ uv__io_start(stream->loop, &stream->io_watcher, UV__IO_READ);
}
uv__set_sys_error(stream->loop, EAGAIN);
@@ -732,15 +721,16 @@ static void uv__read(uv_stream_t* stream) {
stream->read2_cb((uv_pipe_t*)stream, -1, buf, UV_UNKNOWN_HANDLE);
}
- assert(!uv__io_active(&stream->read_watcher));
+ assert(!uv__io_active(&stream->io_watcher, UV__IO_READ));
return;
}
} else if (nread == 0) {
/* EOF */
uv__set_artificial_error(stream->loop, UV_EOF);
- uv__io_stop(stream->loop, &stream->read_watcher);
- if (!uv__io_active(&stream->write_watcher))
+ uv__io_stop(stream->loop, &stream->io_watcher, UV__IO_READ);
+
+ if (!uv__io_active(&stream->io_watcher, UV__IO_WRITE))
uv__handle_stop(stream);
if (stream->read_cb) {
@@ -808,7 +798,7 @@ static void uv__read(uv_stream_t* stream) {
int uv_shutdown(uv_shutdown_t* req, uv_stream_t* stream, uv_shutdown_cb cb) {
assert((stream->type == UV_TCP || stream->type == UV_NAMED_PIPE) &&
"uv_shutdown (unix) only supports uv_handle_t right now");
- assert(stream->fd >= 0);
+ assert(stream->io_watcher.fd >= 0);
if (!(stream->flags & UV_STREAM_WRITABLE) ||
stream->flags & UV_STREAM_SHUT ||
@@ -825,36 +815,38 @@ int uv_shutdown(uv_shutdown_t* req, uv_stream_t* stream, uv_shutdown_cb cb) {
stream->shutdown_req = req;
stream->flags |= UV_STREAM_SHUTTING;
- uv__io_start(stream->loop, &stream->write_watcher);
+ uv__io_start(stream->loop, &stream->io_watcher, UV__IO_WRITE);
return 0;
}
-static void uv__stream_io(uv_loop_t* loop, uv__io_t* w, int events) {
+static void uv__stream_io(uv_loop_t* loop, uv__io_t* w, unsigned int events) {
uv_stream_t* stream;
- /* either UV__IO_READ or UV__IO_WRITE but not both */
- assert(!!(events & UV__IO_READ) ^ !!(events & UV__IO_WRITE));
-
- if (events & UV__IO_READ)
- stream = container_of(w, uv_stream_t, read_watcher);
- else
- stream = container_of(w, uv_stream_t, write_watcher);
+ stream = container_of(w, uv_stream_t, io_watcher);
assert(stream->type == UV_TCP ||
stream->type == UV_NAMED_PIPE ||
stream->type == UV_TTY);
assert(!(stream->flags & UV_CLOSING));
- if (stream->connect_req)
+ if (stream->connect_req) {
uv__stream_connect(stream);
- else if (events & UV__IO_READ) {
- assert(stream->fd >= 0);
+ return;
+ }
+
+ if (events & UV__IO_READ) {
+ assert(stream->io_watcher.fd >= 0);
+
uv__read(stream);
+
+ if (stream->io_watcher.fd == -1)
+ return; /* read_cb closed stream. */
}
- else {
- assert(stream->fd >= 0);
+
+ if (events & UV__IO_WRITE) {
+ assert(stream->io_watcher.fd >= 0);
uv__write(stream);
uv__write_callbacks(stream);
}
@@ -883,8 +875,8 @@ static void uv__stream_connect(uv_stream_t* stream) {
stream->delayed_error = 0;
} else {
/* Normal situation: we need to get the socket error from the kernel. */
- assert(stream->fd >= 0);
- getsockopt(stream->fd, SOL_SOCKET, SO_ERROR, &error, &errorsize);
+ assert(stream->io_watcher.fd >= 0);
+ getsockopt(stream->io_watcher.fd, SOL_SOCKET, SO_ERROR, &error, &errorsize);
}
if (error == EINPROGRESS)
@@ -914,7 +906,7 @@ int uv_write2(uv_write_t* req,
stream->type == UV_TTY) &&
"uv_write (unix) does not yet support other types of streams");
- if (stream->fd < 0) {
+ if (stream->io_watcher.fd < 0) {
uv__set_sys_error(stream->loop, EBADF);
return -1;
}
@@ -966,7 +958,7 @@ int uv_write2(uv_write_t* req,
* sufficiently flushed in uv__write.
*/
assert(!(stream->flags & UV_STREAM_BLOCKING));
- uv__io_start(stream->loop, &stream->write_watcher);
+ uv__io_start(stream->loop, &stream->io_watcher, UV__IO_WRITE);
}
return 0;
@@ -1001,14 +993,14 @@ int uv__read_start_common(uv_stream_t* stream, uv_alloc_cb alloc_cb,
/* TODO: keep track of tcp state. If we've gotten an EOF then we should
* not start the IO watcher.
*/
- assert(stream->fd >= 0);
+ assert(stream->io_watcher.fd >= 0);
assert(alloc_cb);
stream->read_cb = read_cb;
stream->read2_cb = read2_cb;
stream->alloc_cb = alloc_cb;
- uv__io_start(stream->loop, &stream->read_watcher);
+ uv__io_start(stream->loop, &stream->io_watcher, UV__IO_READ);
uv__handle_start(stream);
return 0;
@@ -1028,7 +1020,7 @@ int uv_read2_start(uv_stream_t* stream, uv_alloc_cb alloc_cb,
int uv_read_stop(uv_stream_t* stream) {
- uv__io_stop(stream->loop, &stream->read_watcher);
+ uv__io_stop(stream->loop, &stream->io_watcher, UV__IO_READ);
uv__handle_stop(stream);
stream->flags &= ~UV_STREAM_READING;
stream->read_cb = NULL;
@@ -1050,16 +1042,15 @@ int uv_is_writable(const uv_stream_t* stream) {
void uv__stream_close(uv_stream_t* handle) {
uv_read_stop(handle);
- uv__io_stop(handle->loop, &handle->write_watcher);
+ uv__io_stop(handle->loop, &handle->io_watcher, UV__IO_WRITE);
- close(handle->fd);
- handle->fd = -1;
+ close(handle->io_watcher.fd);
+ handle->io_watcher.fd = -1;
if (handle->accepted_fd >= 0) {
close(handle->accepted_fd);
handle->accepted_fd = -1;
}
- assert(!uv__io_active(&handle->read_watcher));
- assert(!uv__io_active(&handle->write_watcher));
+ assert(!uv__io_active(&handle->io_watcher, UV__IO_READ|UV__IO_WRITE));
}
163 src/unix/sunos.c
@@ -65,14 +65,163 @@
int uv__platform_loop_init(uv_loop_t* loop, int default_loop) {
loop->fs_fd = -1;
+ loop->backend_fd = port_create();
+
+ if (loop->backend_fd == -1)
+ return -1;
+
+ uv__cloexec(loop->backend_fd, 1);
+
return 0;
}
void uv__platform_loop_delete(uv_loop_t* loop) {
- if (loop->fs_fd == -1) return;
- close(loop->fs_fd);
- loop->fs_fd = -1;
+ if (loop->fs_fd != -1) {
+ close(loop->fs_fd);
+ loop->fs_fd = -1;
+ }
+
+ if (loop->backend_fd != -1) {
+ close(loop->backend_fd);
+ loop->backend_fd = -1;
+ }
+}
+
+
+void uv__io_poll(uv_loop_t* loop, int timeout) {
+ struct port_event events[1024];
+ struct port_event* pe;
+ struct timespec spec;
+ ngx_queue_t* q;
+ uv__io_t* w;
+ uint64_t base;
+ uint64_t diff;
+ unsigned int nfds;
+ unsigned int i;
+ int saved_errno;
+ int nevents;
+ int count;
+ int fd;
+
+ if (loop->nfds == 0) {
+ assert(ngx_queue_empty(&loop->watcher_queue));
+ return;
+ }
+
+ while (!ngx_queue_empty(&loop->watcher_queue)) {
+ q = ngx_queue_head(&loop->watcher_queue);
+ ngx_queue_remove(q);
+ ngx_queue_init(q);
+
+ w = ngx_queue_data(q, uv__io_t, watcher_queue);
+ assert(w->pevents != 0);
+
+ if (port_associate(loop->backend_fd, PORT_SOURCE_FD, w->fd, w->pevents, 0))
+ abort();
+
+ w->events = w->pevents;
+ }
+
+ assert(timeout >= -1);
+ base = loop->time;
+ count = 48; /* Benchmarks suggest this gives the best throughput. */
+
+ for (;;) {
+ if (timeout != -1) {
+ spec.tv_sec = timeout / 1000;
+ spec.tv_nsec = (timeout % 1000) * 1000000;
+ }
+
+ /* Work around a kernel bug where nfds is not updated. */
+ events[0].portev_source = 0;
+
+ nfds = 1;
+ saved_errno = 0;
+ if (port_getn(loop->backend_fd,
+ events,
+ ARRAY_SIZE(events),
+ &nfds,
+ timeout == -1 ? NULL : &spec)) {
+ /* Work around another kernel bug: port_getn() may return events even
+ * on error.
+ */
+ if (errno == EINTR || errno == ETIME)
+ saved_errno = errno;
+ else
+ abort();
+ }
+
+ if (events[0].portev_source == 0) {
+ if (timeout == 0)
+ return;
+
+ if (timeout == -1)
+ continue;
+
+ goto update_timeout;
+ }
+
+ if (nfds == 0) {
+ assert(timeout != -1);
+ return;
+ }
+
+ nevents = 0;
+
+ for (i = 0; i < nfds; i++) {
+ pe = events + i;
+ fd = pe->portev_object;
+
+ assert(fd >= 0);
+ assert((unsigned) fd < loop->nwatchers);
+
+ w = loop->watchers[fd];
+
+ /* File descriptor that we've stopped watching, ignore. */
+ if (w == NULL)
+ continue;
+
+ w->cb(loop, w, pe->portev_events);
+ nevents++;
+
+ /* Events Ports operates in oneshot mode, rearm timer on next run. */
+ if (w->pevents != 0 && ngx_queue_empty(&w->watcher_queue))
+ ngx_queue_insert_tail(&loop->watcher_queue, &w->watcher_queue);
+ }
+
+ if (nevents != 0) {
+ if (nfds == ARRAY_SIZE(events) && --count != 0) {
+ /* Poll for more events but don't block this time. */
+ timeout = 0;
+ continue;
+ }
+ return;
+ }
+
+ if (saved_errno == ETIME) {
+ assert(timeout != -1);
+ return;
+ }
+
+ if (timeout == 0)
+ return;
+
+ if (timeout == -1)
+ continue;
+
+update_timeout:
+ assert(timeout > 0);
+
+ diff = uv_hrtime() / 1000000;
+ assert(diff >= base);
+ diff -= base;
+
+ if (diff >= (uint64_t) timeout)
+ return;
+
+ timeout -= diff;
+ }
}
@@ -139,7 +288,9 @@ static void uv__fs_event_rearm(uv_fs_event_t *handle) {
}
-static void uv__fs_event_read(uv_loop_t* loop, uv__io_t* w, int revents) {
+static void uv__fs_event_read(uv_loop_t* loop,
+ uv__io_t* w,
+ unsigned int revents) {
uv_fs_event_t *handle = NULL;
timespec_t timeout;
port_event_t pe;
@@ -216,8 +367,8 @@ int uv_fs_event_init(uv_loop_t* loop,
uv__fs_event_rearm(handle);
if (first_run) {
- uv__io_init(&loop->fs_event_watcher, uv__fs_event_read, portfd, UV__IO_READ);
- uv__io_start(loop, &loop->fs_event_watcher);
+ uv__io_init(&loop->fs_event_watcher, uv__fs_event_read, portfd);
+ uv__io_start(loop, &loop->fs_event_watcher, UV__IO_READ);
}
return 0;
88 src/unix/tcp.c
@@ -37,7 +37,7 @@ int uv_tcp_init(uv_loop_t* loop, uv_tcp_t* tcp) {
static int maybe_new_socket(uv_tcp_t* handle, int domain, int flags) {
int sockfd;
- if (handle->fd != -1)
+ if (handle->io_watcher.fd != -1)
return 0;
sockfd = uv__socket(domain, SOCK_STREAM, 0);
@@ -68,7 +68,7 @@ static int uv__bind(uv_tcp_t* tcp,
return -1;
tcp->delayed_error = 0;
- if (bind(tcp->fd, addr, addrsize) == -1) {
+ if (bind(tcp->io_watcher.fd, addr, addrsize) == -1) {
if (errno == EADDRINUSE) {
tcp->delayed_error = errno;
} else {
@@ -105,7 +105,7 @@ static int uv__connect(uv_connect_t* req,
handle->delayed_error = 0;
do
- r = connect(handle->fd, addr, addrlen);
+ r = connect(handle->io_watcher.fd, addr, addrlen);
while (r == -1 && errno == EINTR);
if (r == -1) {
@@ -127,10 +127,10 @@ static int uv__connect(uv_connect_t* req,
ngx_queue_init(&req->queue);
handle->connect_req = req;
- uv__io_start(handle->loop, &handle->write_watcher);
+ uv__io_start(handle->loop, &handle->io_watcher, UV__IO_WRITE);
if (handle->delayed_error)
- uv__io_feed(handle->loop, &handle->write_watcher, UV__IO_WRITE);
+ uv__io_feed(handle->loop, &handle->io_watcher);
return 0;
}
@@ -174,7 +174,7 @@ int uv_tcp_getsockname(uv_tcp_t* handle, struct sockaddr* name,
goto out;
}
- if (handle->fd < 0) {
+ if (handle->io_watcher.fd < 0) {